agnes-the-ai-analyst/tests/test_api_query_guardrail.py
ZdenekSrotyr 896c43c7a2 feat(query): #160 cost guardrail + bq.* RBAC + quota integration on /api/query
The headline implementation for issue #160. POST /api/query now gates
direct `bq."<dataset>"."<source_table>"` references behind the registry
and bounds the BQ scan cost behind a configurable cap. Wired through
the same singleton QuotaTracker as /api/v2/scan so daily-byte budgets
are shared across both BQ-touching paths.

Changes in app/api/query.py:

- Add module-level `BQ_PATH` regex matching the 16 syntax variants
  verified empirically (fully-quoted, unquoted, mixed quoting,
  case-insensitive, inside CTE bodies, multi-path, …).
- Add `bigquery_query` to the SQL keyword blocklist. Closes the
  pre-existing function-call backdoor where a user could run an
  arbitrary BQ jobs API call against any reachable dataset, bypassing
  the registry and RBAC. Wrapper views internal to the BQ extractor
  still call bigquery_query(), but they are resolved by DuckDB at
  query time rather than arriving as user-submitted SQL, so the
  blocklist does not break them.
- Add `_bq_guardrail_inputs` helper: walks user SQL twice — once for
  bare-name matches against accessible registered remote-BQ names
  (contributes to dry_run_set), once for direct `bq.X.Y` matches
  (gated against `find_by_bq_path` lookups, returns 403 with
  structured detail on miss or grant violation).
- Add `_enforce_remote_bq_quota_and_cap` helper: pre-flight
  `check_daily_budget` (over-cap → 429), then `with quota.acquire(...)`
  wraps a per-path BQ dry-run, sums bytes, raises 400
  `remote_scan_too_large` when total > cap.
- Cap default 5 GiB; configurable via `api.query.bq_max_scan_bytes`
  in /admin/server-config (next phase wires the UI).
- Post-flight `record_bytes` against the user's daily counter.
- Module-level imports of `_bq_dry_run_bytes`, `_build_quota_tracker`,
  `get_bq_access` so tests can monkeypatch via `app.api.query.<name>`.

Tests:
- All 23 RED tests from the previous commit now pass (regex matrix,
  blocklist with detail-string assertion, RBAC unregistered/admin-bypass,
  guardrail dry-run-called/over-cap-rejected, quota pre-flight 429).
- mock_dry_run fixture stubs both `_bq_dry_run_bytes` and `get_bq_access`
  so guardrail tests don't require a live BQ project.
- Quota test uses `admin1` (the seeded_app fixture's actual user id, not
  `admin`).

Smoke: 887 passed across query/bq/admin/extractor/registry/quota
domains. No regressions.
2026-05-04 10:31:35 +02:00

136 lines
4.6 KiB
Python

"""POST /api/query cost guardrail for query_mode='remote' BigQuery rows.
When user SQL references a registered remote-BQ name (or a direct
`bq."<ds>"."<tbl>"` path), run a BQ dry-run before execute. If the
estimated scan exceeds the configured cap, reject with 400 +
`remote_scan_too_large` so the operator pivots to `da fetch`.
Default cap: 5 GiB per request. Configurable via
`api.query.bq_max_scan_bytes` in /admin/server-config (#160 §4.4).
"""
from __future__ import annotations
import pytest
def _auth(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
def _register_bq_remote_row(name: str, bucket: str, source_table: str) -> None:
    """Register a BigQuery row with ``query_mode="remote"`` in the table registry.

    The row id is built as ``bq.<bucket>.<source_table>``. The system DB
    connection is closed in a ``finally`` so it is released even when
    ``register`` raises.

    Args:
        name: Registered name of the row.
        bucket: Value stored as the row's ``bucket``.
        source_table: Value stored as the row's ``source_table``.
    """
    from src.db import get_system_db
    from src.repositories.table_registry import TableRegistryRepository

    row_fields = {
        "id": f"bq.{bucket}.{source_table}",
        "name": name,
        "source_type": "bigquery",
        "bucket": bucket,
        "source_table": source_table,
        "query_mode": "remote",
    }

    conn = get_system_db()
    try:
        registry = TableRegistryRepository(conn)
        registry.register(**row_fields)
    finally:
        conn.close()
@pytest.fixture
def mock_dry_run(monkeypatch):
    """Patch the BigQuery touch points referenced from ``app.api.query``.

    ``_bq_dry_run_bytes`` is replaced by a stub that reports whatever is
    currently stored under ``mock_dry_run["bytes"]``, so each test controls
    the estimate that /api/query sees. ``get_bq_access`` is replaced as well
    so the guardrail does not need a real BQ connection in the test env.

    Returns:
        The mutable state dict (initially ``{"bytes": 0}``) read by the stub.
    """

    class _FakeProjects:
        data = "test-data-prj"
        billing = "test-billing-prj"

    class _FakeBqAccess:
        projects = _FakeProjects()

    stub_state = {"bytes": 0}

    def _reported_bytes(*_args, **_kwargs):
        # Read at call time so a test may change the value after setup.
        return stub_state["bytes"]

    def _fake_access():
        # Stand-in for get_bq_access so the guardrail's BqAccess construction
        # doesn't fail with `not_configured` when no real BQ is set up.
        return _FakeBqAccess()

    replacements = (
        ("app.api.query._bq_dry_run_bytes", _reported_bytes),
        ("app.api.query.get_bq_access", _fake_access),
    )
    for target, stub in replacements:
        monkeypatch.setattr(target, stub, raising=False)

    return stub_state
def test_query_under_cap_calls_dry_run(seeded_app, mock_dry_run, monkeypatch):
    """The guardrail runs a dry-run when SQL names a registered remote BQ row.

    A counting stub replaces ``_bq_dry_run_bytes``; the test passes when the
    stub was invoked at least once by the POST to /api/query. The response
    itself is not inspected.
    """
    _register_bq_remote_row("ue", "finance", "ue")

    mock_dry_run["bytes"] = 1 * 1024 * 1024  # 1 MiB
    mock_dry_run["call_count"] = 0

    def counting_fake(*_args, **_kwargs):
        # Count the invocation, then report the configured estimate.
        mock_dry_run["call_count"] += 1
        return mock_dry_run["bytes"]

    monkeypatch.setattr(
        "app.api.query._bq_dry_run_bytes", counting_fake, raising=False
    )

    client = seeded_app["client"]
    headers = _auth(seeded_app["admin_token"])
    payload = {"sql": "SELECT count(*) FROM ue"}
    client.post("/api/query", json=payload, headers=headers)

    assert mock_dry_run["call_count"] >= 1, (
        "guardrail must invoke _bq_dry_run_bytes when SQL references a registered remote BQ row"
    )
def test_query_over_cap_rejected_400(seeded_app, mock_dry_run, monkeypatch):
    """A 10 GiB dry-run estimate exceeds the default 5 GiB cap, giving a 400.

    When the response ``detail`` is a dict, it must also carry the
    ``remote_scan_too_large`` reason, the scanned byte count, a suggestion
    mentioning fetch, and the offending table. A non-dict ``detail`` is
    accepted with only the status-code check.
    """
    ten_gib = 10 * 1024 * 1024 * 1024  # 10 GiB

    _register_bq_remote_row("ue", "finance", "ue")
    mock_dry_run["bytes"] = ten_gib

    client = seeded_app["client"]
    headers = _auth(seeded_app["admin_token"])
    r = client.post(
        "/api/query",
        json={"sql": "SELECT * FROM ue"},
        headers=headers,
    )

    assert r.status_code == 400, r.json()

    detail = r.json().get("detail", {})
    if not isinstance(detail, dict):
        # Unstructured detail: nothing further to verify.
        return

    assert detail.get("reason") == "remote_scan_too_large", detail
    assert detail.get("scan_bytes") >= ten_gib

    suggestion = detail.get("suggestion", "").lower()
    assert "da fetch" in suggestion or "fetch" in suggestion

    tables = detail.get("tables", [])
    assert "ue" in tables or any("ue" in t for t in tables)
def test_no_bq_row_reference_skips_dry_run(seeded_app, monkeypatch):
    """Plain non-BQ SQL must never reach ``_bq_dry_run_bytes``.

    The stub counts invocations; a query that touches no registered remote
    BQ row has to leave the counter at zero, i.e. the guardrail adds no
    dry-run latency to such queries.
    """
    state = {"calls": 0}
    huge_estimate = 100 * 1024 * 1024 * 1024  # 100 GiB — irrelevant if not called

    def counting_fake(*_args, **_kwargs):
        state["calls"] += 1
        return huge_estimate

    monkeypatch.setattr(
        "app.api.query._bq_dry_run_bytes", counting_fake, raising=False
    )

    client = seeded_app["client"]
    headers = _auth(seeded_app["admin_token"])
    client.post("/api/query", json={"sql": "SELECT 1 AS x"}, headers=headers)

    assert state["calls"] == 0, \
        f"guardrail must skip dry-run on non-BQ queries; got {state['calls']} calls"