agnes-the-ai-analyst/tests/test_api_query_rbac_bq_path.py
ZdenekSrotyr 77cdb65f76 sec(query): #160 BQ_PATH catches quoted "bq" catalog token (Phase 3 review)
Phase 3 review identified an RBAC + cost-cap bypass: `SELECT * FROM
"bq"."ds"."tbl"` (catalog token quoted as a DuckDB identifier) was NOT
matched by the BQ_PATH regex, so direct quoted-form references skipped
both the registry check and the cost-cap dry-run. DuckDB resolves
`"bq"` to the same ATTACHed BQ catalog, so the bypass is real.

Widen the catalog-token alternation: `(?:"bq"|bq)` matches both forms.
Negative lookbehind `(?<![\w.])` still rejects look-alike prefixes
(`other_bq`, `my_bq`); the new "my_bq".ds.tbl negative test locks that
in alongside `other_bq.ds.tbl`.

Tests:
- 2 new positive cases in tests/test_query_bq_regex.py for the quoted
  form (`"bq"."finance"."ue"` and uppercase `"BQ"."ds"."tbl"`).
- 1 new negative case rejecting `"my_bq".ds.tbl` so the quoted-form
  widening doesn't open a different evasion.
- 1 new RBAC test in tests/test_api_query_rbac_bq_path.py: admin
  hitting an unregistered quoted path returns the same
  bq_path_not_registered 403 as the unquoted form.

All 33 Phase 3 tests pass after the fix.
2026-05-04 10:31:35 +02:00

141 lines
5.7 KiB
Python

"""POST /api/query gates direct `bq."<dataset>"."<source_table>"` references.
Pre-existing RBAC hole: the forbidden-table check at `app/api/query.py`
only blocks names matching an existing master view. Direct `bq.*` syntax
doesn't match any master view → bypasses the check entirely. Closed in
#160 via the BQ_PATH regex + find_by_bq_path lookup: every `bq.*` in user
SQL must point at a registered query_mode='remote' BigQuery row.
Tests cover: unregistered path → 403; registered + caller has grant →
allowed (request reaches BQ — we only check that RBAC didn't block it
before BQ runs); admin → bypasses per-name grant but still requires
registration.
"""
from __future__ import annotations
def _auth(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
def _register_bq_remote_row(name: str, bucket: str, source_table: str) -> None:
"""Insert a query_mode='remote' BQ row directly via the system DB."""
from src.db import get_system_db
from src.repositories.table_registry import TableRegistryRepository
sys_conn = get_system_db()
try:
TableRegistryRepository(sys_conn).register(
id=f"bq.{bucket}.{source_table}",
name=name,
source_type="bigquery",
bucket=bucket,
source_table=source_table,
query_mode="remote",
)
finally:
sys_conn.close()
def test_quoted_bq_catalog_token_rejected_403(seeded_app):
"""Phase 3 review evasion: `SELECT * FROM "bq"."ds"."tbl"` (catalog
token quoted) must be caught by the same RBAC check as the unquoted
form. DuckDB resolves `"bq"` to the same ATTACHed BQ catalog, so the
quoted variant is a real bypass we have to close."""
c = seeded_app["client"]
token = seeded_app["admin_token"]
r = c.post(
"/api/query",
json={"sql": 'SELECT * FROM "bq"."secret_ds"."secret_tbl"'},
headers=_auth(token),
)
assert r.status_code == 403, r.json()
detail = r.json().get("detail", {})
if isinstance(detail, dict):
assert detail.get("reason") == "bq_path_not_registered", detail
def test_unregistered_bq_path_rejected_403(seeded_app):
"""Direct reference to a `bq.<ds>.<tbl>` that no registry row points at:
403 with `bq_path_not_registered`. Caller has admin token (no per-name
RBAC); the registration check still fires because admin must register
first to query a new dataset."""
c = seeded_app["client"]
token = seeded_app["admin_token"]
r = c.post(
"/api/query",
json={"sql": 'SELECT * FROM bq."secret_ds"."secret_tbl"'},
headers=_auth(token),
)
assert r.status_code == 403, r.json()
detail = r.json().get("detail", {})
if isinstance(detail, dict):
assert detail.get("reason") == "bq_path_not_registered", detail
assert "secret_ds" in detail.get("path", "")
assert "secret_tbl" in detail.get("path", "")
else:
# Fallback: detail must at least mention the unregistered path.
assert "bq_path_not_registered" in str(detail) or "secret_ds" in str(detail)
def test_registered_bq_path_admin_passes_rbac(seeded_app):
"""When the path IS registered, an admin caller sails past the RBAC
check. The query may still fail downstream (e.g. cost guardrail or
actual BQ execution) — but NOT with `bq_path_not_registered` or
`bq_path_access_denied`."""
_register_bq_remote_row("ue", "finance", "ue")
c = seeded_app["client"]
token = seeded_app["admin_token"]
r = c.post(
"/api/query",
json={"sql": 'SELECT * FROM bq."finance"."ue"'},
headers=_auth(token),
)
# Either allowed (200/400/etc on downstream paths) but NOT 403 from RBAC.
if r.status_code == 403:
detail = r.json().get("detail", {})
if isinstance(detail, dict):
assert detail.get("reason") not in (
"bq_path_not_registered",
"bq_path_access_denied",
), f"admin with registered path should not be RBAC-rejected: {detail}"
def test_unregistered_bq_path_case_insensitive_match(seeded_app):
"""`bq."Finance"."UE"` resolves to the registered `(finance, ue)` row
via case-insensitive lookup — admin sails through, no 403."""
_register_bq_remote_row("ue", "finance", "ue")
c = seeded_app["client"]
token = seeded_app["admin_token"]
r = c.post(
"/api/query",
json={"sql": 'SELECT * FROM bq."Finance"."UE"'},
headers=_auth(token),
)
if r.status_code == 403:
detail = r.json().get("detail", {})
if isinstance(detail, dict):
assert detail.get("reason") not in (
"bq_path_not_registered",
"bq_path_access_denied",
), f"case-insensitive lookup should match: {detail}"
def test_string_literal_matching_bq_path_rejected_403(seeded_app):
"""Documented false-positive: `WHERE c = 'bq.unreg.tbl'` regex-matches
the literal. Strict-deny: 403 if the path isn't registered. Operator
can rephrase or register the path."""
c = seeded_app["client"]
token = seeded_app["admin_token"]
r = c.post(
"/api/query",
json={"sql": "SELECT count(*) FROM ue WHERE c = 'bq.unreg.tbl'"},
headers=_auth(token),
)
# The literal contains a `bq.<unreg>.<tbl>` pattern that the regex
# matches; the RBAC patch must reject 403 (strict-deny on a security
# boundary) — pre-existing master-view check would only return 400 if
# `ue` weren't registered. Test: explicitly verify bq_path_not_registered.
assert r.status_code == 403, r.json()
detail = r.json().get("detail", {})
if isinstance(detail, dict):
assert detail.get("reason") == "bq_path_not_registered", detail