diff --git a/app/api/query.py b/app/api/query.py index 4312407..453504d 100644 --- a/app/api/query.py +++ b/app/api/query.py @@ -1,5 +1,6 @@ """Query endpoint — execute SQL against server DuckDB.""" +import logging import os import re from pathlib import Path @@ -9,13 +10,47 @@ from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel import duckdb +from app.auth.access import is_user_admin from app.auth.dependencies import get_current_user, _get_db +from app.instance_config import get_value from src.db import get_analytics_db_readonly from src.rbac import get_accessible_tables from src.repositories.table_registry import TableRegistryRepository +# Imported at module level so tests can monkeypatch via +# `app.api.query._bq_dry_run_bytes` without resolving lazy imports inside +# the handler (reaches the patched attribute on each call). Same for +# get_bq_access — sibling module, dep direction doesn't matter (both are +# leaves under app.api). +from app.api.v2_quota import _build_quota_tracker, QuotaExceededError +from app.api.v2_scan import _bq_dry_run_bytes +from connectors.bigquery.access import get_bq_access, BqAccessError + +logger = logging.getLogger(__name__) + router = APIRouter(prefix="/api/query", tags=["query"]) +# Issue #160 §4.3.1 — direct `bq..` references in user +# SQL. Matches all 16 cases verified empirically (fully-quoted, unquoted, +# mixed quoting, case-insensitive, inside CTE bodies, multiple in one stmt). +# Lookahead `(?=\W|$)` works where `\b` doesn't (after a closing quote). +# Negative lookbehind `(? int: + """5 GiB default cap on /api/query BQ-touching scans. Configurable via + `api.query.bq_max_scan_bytes` in /admin/server-config. + """ + raw = get_value("api", "query", "bq_max_scan_bytes", default=5_368_709_120) + try: + return int(raw) if raw is not None else 5_368_709_120 + except (TypeError, ValueError): + return 5_368_709_120 + class QueryRequest(BaseModel): sql: str @@ -49,6 +84,12 @@ async def execute_query( "parquet_scan", "parquet_metadata", "parquet_schema", "json_scan", "csv_scan", "query_table", "iceberg_scan", "delta_scan", + # #160: bigquery_query() bypasses the registry / RBAC entirely + # (it runs an arbitrary BQ jobs API call against any reachable + # dataset). Wrap views created by the BQ extractor use it inside + # CREATE VIEW bodies, but those run via DuckDB's view resolution at + # query time — user-submitted SQL never contains the function name. + "bigquery_query", "glob(", "list_files", "'/", '"/','http://', 'https://', 's3://', 'gcs://', # DuckDB metadata (leaks schema info regardless of RBAC) @@ -88,11 +129,39 @@ async def execute_query( if re.search(pattern, sql_lower): raise HTTPException(status_code=403, detail=f"Access denied to table '{table}'") + # ---- #160 BQ remote-row guardrail + RBAC patch ------------------- + dry_run_set, blocked_bq_path = _bq_guardrail_inputs( + request.sql, sql_lower, conn, user, allowed, + ) + if blocked_bq_path is not None: + raise HTTPException(status_code=403, detail=blocked_bq_path) + + if dry_run_set: + _enforce_remote_bq_quota_and_cap( + user_id=user.get("id") or user.get("email") or "anon", + dry_run_set=dry_run_set, + sql=request.sql, + ) + # Open in read-only mode for extra safety result = analytics.execute(request.sql).fetchmany(request.limit + 1) columns = [desc[0] for desc in analytics.description] if analytics.description else [] truncated = len(result) > request.limit rows = result[:request.limit] + + # Post-flight: bill the dry-run estimate against the user's daily + # quota. Do this AFTER execute so a downstream failure (e.g. BQ + # outage) doesn't strand the user with charged-but-unrun bytes. + if dry_run_set: + user_id = user.get("id") or user.get("email") or "anon" + try: + _build_quota_tracker().record_bytes( + user_id, sum(b for _, _, b in dry_run_set), + ) + except Exception: + # record_bytes is documented as never-raising; defensive guard. + logger.warning("quota record_bytes failed for user=%s", user_id) + # Convert to serializable types serializable_rows = [] for row in rows: @@ -198,3 +267,155 @@ def _build_materialized_hint(row: dict) -> str: f"/api/sync/trigger) to materialize the parquet" f"{direct_hint}." ) + + +def _bq_guardrail_inputs( + sql: str, + sql_lower: str, + sys_conn: duckdb.DuckDBPyConnection, + user: dict, + allowed: Optional[list], +): + """Two-pass scan over user SQL for the upcoming BQ guardrail + RBAC patch. + + Returns a tuple `(dry_run_set, blocked_bq_path)`: + + - `dry_run_set` is a list of `(bucket, source_table, est_bytes)` triples + identifying every BigQuery row the request will scan. The caller dry-runs + each and bills the sum against the user's daily quota. + + - `blocked_bq_path` is a structured-detail dict for the caller to raise + HTTPException(403) with, when user SQL contains a direct + `bq."".""` reference that either points at an unregistered + path (`bq_path_not_registered`) or registered but the caller has no + grant on the registered name (`bq_path_access_denied`). None when the + RBAC check passes. + """ + repo = TableRegistryRepository(sys_conn) + + # 1. Bare-name pass: look up registered remote-BQ names that appear in + # the user SQL as word-boundary tokens. Reuses the same regex shape as + # the existing forbidden-table loop above. + dry_run: list = [] + seen_paths: set = set() + accessible_set = set(allowed) if allowed is not None else None + for r in repo.list_by_source("bigquery"): + if (r.get("query_mode") or "") != "remote": + continue + bucket = r.get("bucket") + source_table = r.get("source_table") + name = r.get("name") + if not (bucket and source_table and name): + continue + if accessible_set is not None and name not in accessible_set: + # Forbidden-table loop above will have rejected the request + # before we get here. Defensive skip. + continue + pattern = r'\b' + re.escape(str(name).lower()) + r'\b' + if re.search(pattern, sql_lower): + key = (bucket.lower(), source_table.lower()) + if key not in seen_paths: + seen_paths.add(key) + dry_run.append((bucket, source_table, 0)) # bytes filled at dry-run + + # 2. Direct bq.. pass: every match must point at a registered + # row. Run BEFORE adding to dry_run so unregistered paths fail-fast. + is_admin = is_user_admin(user.get("id") or user.get("email") or "", sys_conn) + for m in BQ_PATH.finditer(sql): + bucket_raw = m.group(1).strip('"') + source_table_raw = m.group(2).strip('"') + row = repo.find_by_bq_path(bucket_raw, source_table_raw) + if row is None: + return [], { + "reason": "bq_path_not_registered", + "path": f'bq."{bucket_raw}"."{source_table_raw}"', + "hint": ( + "Direct bq.* references must point to a registered table. " + "Register via `da admin register-table` or use the " + "registered name from `da catalog`." + ), + } + # Row exists. Per-name grant check (non-admin only). + if not is_admin: + if accessible_set is None or row["name"] not in accessible_set: + return [], { + "reason": "bq_path_access_denied", + "path": f'bq."{bucket_raw}"."{source_table_raw}"', + "registered_as": row["name"], + } + # Add to dry-run set if not already covered by bare-name pass. + bucket = row["bucket"] + source_table = row["source_table"] + if bucket and source_table: + key = (bucket.lower(), source_table.lower()) + if key not in seen_paths: + seen_paths.add(key) + dry_run.append((bucket, source_table, 0)) + + return dry_run, None + + +def _enforce_remote_bq_quota_and_cap(*, user_id: str, dry_run_set: list, sql: str) -> None: + """Pre-flight check + dry-run + cap enforcement for /api/query BQ paths. + + 1. `check_daily_budget` — over-cap users get 429 BEFORE any BQ work. + 2. `with quota.acquire(user_id)` — concurrent slot guard. + 3. Dry-run each `(bucket, source_table)` via the existing + `_bq_dry_run_bytes` helper. Sum bytes. + 4. If sum > cap → 400 `remote_scan_too_large` with structured detail. + + Mutates `dry_run_set` in place: the third tuple element (bytes) is + populated with the per-path dry-run result so the caller can sum and + record the bytes against the user's quota post-flight. + """ + quota = _build_quota_tracker() + try: + quota.check_daily_budget(user_id) + except QuotaExceededError as exc: + raise HTTPException(status_code=429, detail={ + "reason": "daily_byte_cap_exceeded", + "kind": exc.kind, + "current": exc.current, + "limit": exc.limit, + "retry_after_seconds": exc.retry_after_seconds, + }) + + try: + bq = get_bq_access() + except BqAccessError as exc: + raise HTTPException(status_code=502, detail={ + "kind": exc.kind, + "message": exc.message, + **(exc.details or {}), + }) + + cap_bytes = _default_remote_query_cap_bytes() + + with quota.acquire(user_id): + total_bytes = 0 + for i, (bucket, source_table, _) in enumerate(dry_run_set): + bq_sql = f"SELECT * FROM `{bq.projects.data}.{bucket}.{source_table}`" + try: + est = _bq_dry_run_bytes(bq, bq_sql) + except BqAccessError as exc: + raise HTTPException(status_code=502, detail={ + "kind": exc.kind, + "message": exc.message, + **(exc.details or {}), + }) + dry_run_set[i] = (bucket, source_table, est) + total_bytes += est + + if cap_bytes > 0 and total_bytes > cap_bytes: + tables = [f"{b}.{t}" for b, t, _ in dry_run_set] + raise HTTPException(status_code=400, detail={ + "reason": "remote_scan_too_large", + "scan_bytes": total_bytes, + "limit_bytes": cap_bytes, + "tables": tables, + "suggestion": ( + "Use `da fetch --select --where " + "--estimate` to materialize a filtered subset, then query " + "the snapshot locally." + ), + }) diff --git a/tests/test_api_query_guardrail.py b/tests/test_api_query_guardrail.py index 1babe3f..4352b98 100644 --- a/tests/test_api_query_guardrail.py +++ b/tests/test_api_query_guardrail.py @@ -37,13 +37,30 @@ def _register_bq_remote_row(name: str, bucket: str, source_table: str) -> None: @pytest.fixture def mock_dry_run(monkeypatch): """Replace `_bq_dry_run_bytes` with a controllable stub. Each test sets - `mock_dry_run.bytes_to_return` to control what /api/query sees.""" + `mock_dry_run["bytes"]` to control what /api/query sees. Also stubs + `get_bq_access` so the guardrail doesn't require a real BQ connection + in the test env.""" state = {"bytes": 0} - def fake(*args, **kwargs): + def fake_dry_run(*args, **kwargs): return state["bytes"] - monkeypatch.setattr("app.api.query._bq_dry_run_bytes", fake, raising=False) + monkeypatch.setattr("app.api.query._bq_dry_run_bytes", fake_dry_run, raising=False) + + # Stub get_bq_access so the guardrail's BqAccess construction doesn't + # fail with `not_configured` in tests that don't set up real BQ. + class _FakeProjects: + data = "test-data-prj" + billing = "test-billing-prj" + + class _FakeBqAccess: + projects = _FakeProjects() + + monkeypatch.setattr( + "app.api.query.get_bq_access", + lambda: _FakeBqAccess(), + raising=False, + ) return state diff --git a/tests/test_api_query_quota.py b/tests/test_api_query_quota.py index 585a510..702b9fd 100644 --- a/tests/test_api_query_quota.py +++ b/tests/test_api_query_quota.py @@ -68,7 +68,7 @@ def test_query_records_bytes_against_shared_quota(seeded_app, fresh_quota, mock_ # Pre-flight: tracker has zero usage for this user. tracker = fresh_quota._build_quota_tracker() - user_id = "admin" # seeded_app's admin user id + user_id = "admin1" # seeded_app's admin user id before = tracker.bytes_used_today(user_id) r = c.post( @@ -93,7 +93,7 @@ def test_query_pre_flight_rejects_user_over_daily_cap(seeded_app, fresh_quota, m # Plant the user's daily counter already at the cap by injecting bytes. tracker = fresh_quota._build_quota_tracker() - user_id = "admin" + user_id = "admin1" # Push counter past the cap (default 50 GiB). tracker.record_bytes(user_id, tracker._max_daily_bytes + 1) @@ -111,7 +111,7 @@ def test_non_bq_query_skips_quota_path(seeded_app, fresh_quota, mock_dry_run): """A query that doesn't touch any registered remote BQ row must NOT decrement quota. Quota wiring runs only when dry_run_set is non-empty.""" tracker = fresh_quota._build_quota_tracker() - user_id = "admin" + user_id = "admin1" before = tracker.bytes_used_today(user_id) c = seeded_app["client"]