diff --git a/tests/test_api_query_guardrail.py b/tests/test_api_query_guardrail.py new file mode 100644 index 0000000..1babe3f --- /dev/null +++ b/tests/test_api_query_guardrail.py @@ -0,0 +1,119 @@ +"""POST /api/query cost guardrail for query_mode='remote' BigQuery rows. + +When user SQL references a registered remote-BQ name (or a direct +`bq."".""` path), run a BQ dry-run before execute. If the +estimated scan exceeds the configured cap, reject with 400 + +`remote_scan_too_large` so the operator pivots to `da fetch`. + +Default cap: 5 GiB per request. Configurable via +`api.query.bq_max_scan_bytes` in /admin/server-config (#160 §4.4). +""" +from __future__ import annotations + +import pytest + + +def _auth(token: str) -> dict: + return {"Authorization": f"Bearer {token}"} + + +def _register_bq_remote_row(name: str, bucket: str, source_table: str) -> None: + from src.db import get_system_db + from src.repositories.table_registry import TableRegistryRepository + sys_conn = get_system_db() + try: + TableRegistryRepository(sys_conn).register( + id=f"bq.{bucket}.{source_table}", + name=name, + source_type="bigquery", + bucket=bucket, + source_table=source_table, + query_mode="remote", + ) + finally: + sys_conn.close() + + +@pytest.fixture +def mock_dry_run(monkeypatch): + """Replace `_bq_dry_run_bytes` with a controllable stub. Each test sets + `mock_dry_run.bytes_to_return` to control what /api/query sees.""" + state = {"bytes": 0} + + def fake(*args, **kwargs): + return state["bytes"] + + monkeypatch.setattr("app.api.query._bq_dry_run_bytes", fake, raising=False) + return state + + +def test_query_under_cap_calls_dry_run(seeded_app, mock_dry_run, monkeypatch): + """Dry-run is invoked when SQL references a registered remote BQ row. + Use a sentinel side-effect to confirm: the mock records call counts.""" + _register_bq_remote_row("ue", "finance", "ue") + state = mock_dry_run + state["bytes"] = 1 * 1024 * 1024 # 1 MiB + state["call_count"] = 0 + + def counting_fake(*args, **kwargs): + state["call_count"] += 1 + return state["bytes"] + + monkeypatch.setattr("app.api.query._bq_dry_run_bytes", counting_fake, raising=False) + + c = seeded_app["client"] + token = seeded_app["admin_token"] + c.post( + "/api/query", + json={"sql": "SELECT count(*) FROM ue"}, + headers=_auth(token), + ) + assert state["call_count"] >= 1, \ + "guardrail must invoke _bq_dry_run_bytes when SQL references a registered remote BQ row" + + +def test_query_over_cap_rejected_400(seeded_app, mock_dry_run, monkeypatch): + """Dry-run reports 10 GiB; default cap (5 GiB) is exceeded → 400 with + structured detail naming bytes + tables + suggestion.""" + _register_bq_remote_row("ue", "finance", "ue") + mock_dry_run["bytes"] = 10 * 1024 * 1024 * 1024 # 10 GiB + + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/query", + json={"sql": "SELECT * FROM ue"}, + headers=_auth(token), + ) + assert r.status_code == 400, r.json() + detail = r.json().get("detail", {}) + if isinstance(detail, dict): + assert detail.get("reason") == "remote_scan_too_large", detail + assert detail.get("scan_bytes") >= 10 * 1024 * 1024 * 1024 + assert "da fetch" in detail.get("suggestion", "").lower() or \ + "fetch" in detail.get("suggestion", "").lower() + assert "ue" in detail.get("tables", []) or \ + any("ue" in t for t in detail.get("tables", [])) + + +def test_no_bq_row_reference_skips_dry_run(seeded_app, monkeypatch): + """A query that doesn't touch any registered BQ remote row must NOT + invoke `_bq_dry_run_bytes` — guardrail incurs zero new latency on + plain non-BQ queries.""" + state = {"calls": 0} + + def counting_fake(*args, **kwargs): + state["calls"] += 1 + return 100 * 1024 * 1024 * 1024 # 100 GiB — irrelevant if not called + + monkeypatch.setattr("app.api.query._bq_dry_run_bytes", counting_fake, raising=False) + + c = seeded_app["client"] + token = seeded_app["admin_token"] + c.post( + "/api/query", + json={"sql": "SELECT 1 AS x"}, + headers=_auth(token), + ) + assert state["calls"] == 0, \ + f"guardrail must skip dry-run on non-BQ queries; got {state['calls']} calls" diff --git a/tests/test_api_query_quota.py b/tests/test_api_query_quota.py new file mode 100644 index 0000000..585a510 --- /dev/null +++ b/tests/test_api_query_quota.py @@ -0,0 +1,126 @@ +"""POST /api/query enforces the same per-user quota as /api/v2/scan. + +Daily-byte cap is checked pre-flight (before dry-run); concurrent-slot is +acquired around dry-run + execute and released on exit; record_bytes is +called post-flight after the result lands. The quota tracker is the +process-local singleton in app/api/v2_quota.py — shared with /api/v2/scan +so both paths bill against the same daily budget. + +Closes part of #160 §4.3.3. +""" +from __future__ import annotations + +import pytest + + +def _auth(token: str) -> dict: + return {"Authorization": f"Bearer {token}"} + + +def _register_bq_remote_row(name: str, bucket: str, source_table: str) -> None: + from src.db import get_system_db + from src.repositories.table_registry import TableRegistryRepository + sys_conn = get_system_db() + try: + TableRegistryRepository(sys_conn).register( + id=f"bq.{bucket}.{source_table}", + name=name, + source_type="bigquery", + bucket=bucket, + source_table=source_table, + query_mode="remote", + ) + finally: + sys_conn.close() + + +@pytest.fixture +def fresh_quota(monkeypatch): + """Reset the process-local quota singleton + return a fresh tracker + bound to the v2_quota module so the test owns its state. Without + this, prior tests' usage bleeds into the daily-bytes counter.""" + import app.api.v2_quota as q + monkeypatch.setattr(q, "_quota_singleton", None, raising=False) + return q + + +@pytest.fixture +def mock_dry_run(monkeypatch): + state = {"bytes": 1024} + + def fake(*args, **kwargs): + return state["bytes"] + + monkeypatch.setattr("app.api.query._bq_dry_run_bytes", fake, raising=False) + return state + + +def test_query_records_bytes_against_shared_quota(seeded_app, fresh_quota, mock_dry_run): + """A successful BQ-touching query bumps the user's daily-byte counter + on the SAME singleton tracker that /api/v2/scan uses — so a user who + has consumed daily budget via /api/v2/scan can't dodge the cap by + routing through /api/query.""" + _register_bq_remote_row("ue", "finance", "ue") + mock_dry_run["bytes"] = 4096 # 4 KiB + + c = seeded_app["client"] + token = seeded_app["admin_token"] + + # Pre-flight: tracker has zero usage for this user. + tracker = fresh_quota._build_quota_tracker() + user_id = "admin" # seeded_app's admin user id + before = tracker.bytes_used_today(user_id) + + r = c.post( + "/api/query", + json={"sql": "SELECT count(*) FROM ue"}, + headers=_auth(token), + ) + # The query may fail (no real BQ) but bytes recording should happen + # before any post-execute failure. Accept either 200 or 400; what + # matters is the byte counter advanced. + after = tracker.bytes_used_today(user_id) + if r.status_code == 200: + assert after - before >= 4096, \ + f"successful BQ-touching query must record bytes; before={before} after={after}" + + +def test_query_pre_flight_rejects_user_over_daily_cap(seeded_app, fresh_quota, mock_dry_run): + """If the user is already over their daily byte cap on the shared + tracker, /api/query rejects 429 BEFORE running the dry-run — no free + BQ work for over-cap users via this back door.""" + _register_bq_remote_row("ue", "finance", "ue") + + # Plant the user's daily counter already at the cap by injecting bytes. + tracker = fresh_quota._build_quota_tracker() + user_id = "admin" + # Push counter past the cap (default 50 GiB). + tracker.record_bytes(user_id, tracker._max_daily_bytes + 1) + + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/query", + json={"sql": "SELECT count(*) FROM ue"}, + headers=_auth(token), + ) + assert r.status_code == 429, r.json() + + +def test_non_bq_query_skips_quota_path(seeded_app, fresh_quota, mock_dry_run): + """A query that doesn't touch any registered remote BQ row must NOT + decrement quota. Quota wiring runs only when dry_run_set is non-empty.""" + tracker = fresh_quota._build_quota_tracker() + user_id = "admin" + before = tracker.bytes_used_today(user_id) + + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/query", + json={"sql": "SELECT 1 AS x"}, + headers=_auth(token), + ) + after = tracker.bytes_used_today(user_id) + assert after == before, \ + f"non-BQ query must not record bytes; before={before} after={after}" diff --git a/tests/test_api_query_rbac_bq_path.py b/tests/test_api_query_rbac_bq_path.py new file mode 100644 index 0000000..5ab7154 --- /dev/null +++ b/tests/test_api_query_rbac_bq_path.py @@ -0,0 +1,123 @@ +"""POST /api/query gates direct `bq."".""` references. + +Pre-existing RBAC hole: the forbidden-table check at `app/api/query.py` +only blocks names matching an existing master view. Direct `bq.*` syntax +doesn't match any master view → bypasses the check entirely. Closed in +#160 via the BQ_PATH regex + find_by_bq_path lookup: every `bq.*` in user +SQL must point at a registered query_mode='remote' BigQuery row. + +Tests cover: unregistered path → 403; registered + caller has grant → +allowed (request reaches BQ — we only check that RBAC didn't block it +before BQ runs); admin → bypasses per-name grant but still requires +registration. +""" +from __future__ import annotations + + +def _auth(token: str) -> dict: + return {"Authorization": f"Bearer {token}"} + + +def _register_bq_remote_row(name: str, bucket: str, source_table: str) -> None: + """Insert a query_mode='remote' BQ row directly via the system DB.""" + from src.db import get_system_db + from src.repositories.table_registry import TableRegistryRepository + sys_conn = get_system_db() + try: + TableRegistryRepository(sys_conn).register( + id=f"bq.{bucket}.{source_table}", + name=name, + source_type="bigquery", + bucket=bucket, + source_table=source_table, + query_mode="remote", + ) + finally: + sys_conn.close() + + +def test_unregistered_bq_path_rejected_403(seeded_app): + """Direct reference to a `bq..` that no registry row points at: + 403 with `bq_path_not_registered`. Caller has admin token (no per-name + RBAC); the registration check still fires because admin must register + first to query a new dataset.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/query", + json={"sql": 'SELECT * FROM bq."secret_ds"."secret_tbl"'}, + headers=_auth(token), + ) + assert r.status_code == 403, r.json() + detail = r.json().get("detail", {}) + if isinstance(detail, dict): + assert detail.get("reason") == "bq_path_not_registered", detail + assert "secret_ds" in detail.get("path", "") + assert "secret_tbl" in detail.get("path", "") + else: + # Fallback: detail must at least mention the unregistered path. + assert "bq_path_not_registered" in str(detail) or "secret_ds" in str(detail) + + +def test_registered_bq_path_admin_passes_rbac(seeded_app): + """When the path IS registered, an admin caller sails past the RBAC + check. The query may still fail downstream (e.g. cost guardrail or + actual BQ execution) — but NOT with `bq_path_not_registered` or + `bq_path_access_denied`.""" + _register_bq_remote_row("ue", "finance", "ue") + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/query", + json={"sql": 'SELECT * FROM bq."finance"."ue"'}, + headers=_auth(token), + ) + # Either allowed (200/400/etc on downstream paths) but NOT 403 from RBAC. + if r.status_code == 403: + detail = r.json().get("detail", {}) + if isinstance(detail, dict): + assert detail.get("reason") not in ( + "bq_path_not_registered", + "bq_path_access_denied", + ), f"admin with registered path should not be RBAC-rejected: {detail}" + + +def test_unregistered_bq_path_case_insensitive_match(seeded_app): + """`bq."Finance"."UE"` resolves to the registered `(finance, ue)` row + via case-insensitive lookup — admin sails through, no 403.""" + _register_bq_remote_row("ue", "finance", "ue") + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/query", + json={"sql": 'SELECT * FROM bq."Finance"."UE"'}, + headers=_auth(token), + ) + if r.status_code == 403: + detail = r.json().get("detail", {}) + if isinstance(detail, dict): + assert detail.get("reason") not in ( + "bq_path_not_registered", + "bq_path_access_denied", + ), f"case-insensitive lookup should match: {detail}" + + +def test_string_literal_matching_bq_path_rejected_403(seeded_app): + """Documented false-positive: `WHERE c = 'bq.unreg.tbl'` regex-matches + the literal. Strict-deny: 403 if the path isn't registered. Operator + can rephrase or register the path.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/query", + json={"sql": "SELECT count(*) FROM ue WHERE c = 'bq.unreg.tbl'"}, + headers=_auth(token), + ) + # The literal contains a `bq..` pattern that the regex + # matches; the RBAC patch must reject 403 (strict-deny on a security + # boundary) — pre-existing master-view check would only return 400 if + # `ue` weren't registered. Test: explicitly verify bq_path_not_registered. + assert r.status_code == 403, r.json() + detail = r.json().get("detail", {}) + if isinstance(detail, dict): + assert detail.get("reason") == "bq_path_not_registered", detail diff --git a/tests/test_query_bigquery_query_blocked.py b/tests/test_query_bigquery_query_blocked.py new file mode 100644 index 0000000..d8f4170 --- /dev/null +++ b/tests/test_query_bigquery_query_blocked.py @@ -0,0 +1,68 @@ +"""POST /api/query must reject direct `bigquery_query()` function calls. + +This is a pre-existing RBAC bypass: `bigquery_query('proj', 'SELECT * FROM +ds.tbl')` runs a BQ jobs API call against any reachable dataset, ignoring +the master-view forbidden-table check that gates registered names. Closes +that hole by adding `bigquery_query` to the SQL keyword blocklist. + +Internal wrap views (created by the BQ extractor) use bigquery_query() +inside their CREATE VIEW body — those run via DuckDB's view resolution at +query time, NOT via user-submitted SQL, so the blocklist doesn't break +them. Closes part of #160. +""" +from __future__ import annotations + + +def _auth(token: str) -> dict: + return {"Authorization": f"Bearer {token}"} + + +def test_bigquery_query_function_call_rejected(seeded_app): + """Plain `SELECT * FROM bigquery_query(...)` is blocked at the + keyword-blocklist layer with the canonical "Only single SELECT + queries are allowed" detail.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + sql = "SELECT * FROM bigquery_query('proj', 'SELECT 1 AS x')" + r = c.post( + "/api/query", + json={"sql": sql}, + headers=_auth(token), + ) + assert r.status_code == 400, f"expected 400; got {r.status_code} body={r.json()}" + detail = str(r.json().get("detail", "")) + # The canonical blocklist message proves this was rejected by the + # blocklist (not by some other path like master-view-forbidden). + assert "single SELECT" in detail, \ + f"expected canonical blocklist message; got detail={detail!r}" + + +def test_bigquery_query_mixed_case_rejected(seeded_app): + """Existing blocklist runs `sql.strip().lower()` first, so any case + variant is blocked uniformly.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/query", + json={"sql": "SELECT * FROM BigQuery_Query('proj', 'SELECT 1')"}, + headers=_auth(token), + ) + assert r.status_code == 400, r.json() + detail = str(r.json().get("detail", "")) + assert "single SELECT" in detail, \ + f"expected canonical blocklist message; got detail={detail!r}" + + +def test_bigquery_query_with_whitespace_before_paren_rejected(seeded_app): + """Substring match catches `bigquery_query (...)` with space too.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/query", + json={"sql": "SELECT * FROM bigquery_query ('proj', 'SELECT 1')"}, + headers=_auth(token), + ) + assert r.status_code == 400, r.json() + detail = str(r.json().get("detail", "")) + assert "single SELECT" in detail, \ + f"expected canonical blocklist message; got detail={detail!r}" diff --git a/tests/test_query_bq_regex.py b/tests/test_query_bq_regex.py new file mode 100644 index 0000000..219f345 --- /dev/null +++ b/tests/test_query_bq_regex.py @@ -0,0 +1,75 @@ +"""BQ_PATH regex used by /api/query to detect direct `bq.X.Y` references in +user SQL. The regex powers both the cost guardrail (matches contribute +dry-run targets) and the RBAC patch (matches must point at a registered row). + +Adversarial cases verified empirically before commit; see #160 spec §4.3.1. +""" +import pytest + + +@pytest.fixture +def regex(): + from app.api.query import BQ_PATH + return BQ_PATH + + +# --- positive: must match (3-part `bq..`) ------------- + +@pytest.mark.parametrize("sql,expected_groups", [ + # Fully quoted + ('SELECT * FROM bq."finance"."ue"', ('"finance"', '"ue"')), + # Unquoted + ('SELECT * FROM bq.finance.ue', ('finance', 'ue')), + # Mixed quoting + ('SELECT * FROM bq."finance".ue', ('"finance"', 'ue')), + ('SELECT * FROM bq.finance."ue"', ('finance', '"ue"')), + # Case-insensitive + ('select * from BQ.Finance.UE', ('Finance', 'UE')), + # With WHERE + ('SELECT a FROM bq.ds.tbl WHERE x=1', ('ds', 'tbl')), + # Inside CTE body + ('WITH x AS (SELECT * FROM bq.ds.tbl) SELECT * FROM x', ('ds', 'tbl')), + # Trailing punctuation + ('SELECT * FROM bq.ds.tbl;', ('ds', 'tbl')), +]) +def test_regex_matches_direct_bq_paths(regex, sql, expected_groups): + matches = list(regex.finditer(sql)) + assert len(matches) >= 1, f"expected at least one match in {sql!r}" + assert matches[0].groups() == expected_groups + + +def test_regex_finds_multiple_paths_in_one_statement(regex): + """Two-path query: SELECT FROM bq.a.b JOIN bq.c.d — must match both.""" + sql = "SELECT * FROM bq.ds.tbl, bq.ds2.tbl2" + matches = list(regex.finditer(sql)) + assert len(matches) == 2 + assert matches[0].groups() == ('ds', 'tbl') + assert matches[1].groups() == ('ds2', 'tbl2') + + +# --- negative: must NOT match ------------------------------------------------- + +@pytest.mark.parametrize("sql,reason", [ + ('SELECT * FROM unit_economics', 'bare registered name (no bq prefix)'), + ('SELECT * FROM "unit_economics"', 'quoted bare name'), + ('SELECT bq.col FROM tbl', '2-part bq.col is column qualifier, not catalog'), + ('SELECT count(*) FROM unit_economics', 'aggregate on bare name'), + ('SELECT * FROM other_bq.ds.tbl', 'prefix that contains bq'), + ('SELECT * FROM x.bq.ds.tbl', 'bq is middle component, not catalog'), +]) +def test_regex_rejects_non_bq_paths(regex, sql, reason): + matches = list(regex.finditer(sql)) + assert matches == [], \ + f"regex should not match {sql!r} ({reason}); matched {[m.group(0) for m in matches]}" + + +# --- accepted false-positives (documented in §4.3.1) ------------------------- + +def test_regex_matches_string_literal_containing_bq_path(regex): + """Known false-positive: `WHERE c = 'bq.foo.bar'` matches. Cost guardrail + runs a wasted dry-run (cheap), RBAC fires 403 if the path isn't + registered — strict-deny is the right choice on a security boundary.""" + sql = "SELECT * FROM x WHERE c = 'bq.foo.bar'" + matches = list(regex.finditer(sql)) + assert len(matches) == 1 + assert matches[0].groups() == ('foo', 'bar')