agnes-the-ai-analyst/app/api/query.py
ZdenekSrotyr 500db8cd3c fix(query-guardrail): dry-run user SQL not synthetic SELECT * (#171)
Closes #171. The /api/query cost guardrail used to dry-run a synthetic
`SELECT * FROM <table>` for each registered remote-BQ row referenced
by the user SQL — which made BigQuery estimate a full table scan, with
column projection, predicate pushdown, and partition pruning all
disabled. Narrow queries on big partitioned/clustered tables (the
documented happy path for `agnes query --remote`) hit ~30,000×
over-estimates and got rejected with 400 `remote_scan_too_large` even
when BQ's own dry-run reported single-digit MB.

Pavel's report on #171 traced the root cause and proposed the fix:
rewrite the user SQL to BQ-native syntax and dry-run it as a single
job, exactly the way `bq query --dry_run` works.

Implementation:
- New helper _rewrite_user_sql_for_bq_dry_run rewrites bare registered
  names (word-boundary, case-insensitive, longest-first to avoid prefix
  collisions) + bq."<ds>"."<tbl>" forms to backticked
  `<project>.<ds>.<tbl>` paths.
- _bq_quota_and_cap_guard runs ONE dry-run on the rewritten SQL. Cap
  check uses the real estimate.
- Fallback path: if BQ rejects with bq_bad_request (e.g. DuckDB-only
  syntax like ::INT casts), the guard falls back to the pre-fix
  per-table SELECT * approach so non-portable queries still get a
  (loose) cap estimate instead of fail-opening. Non-parse BQ errors
  (forbidden, upstream) still propagate as 502.
- _bq_guardrail_inputs now also returns name_lookups so the rewriter
  has the (registered_name, bucket, source_table) mapping it needs.
- Per-table breakdown is unavailable from a composite dry-run; total
  bytes are pinned to dry_run_set[0] for the post-flight
  record_bytes(sum(...)) call to keep returning the right total.

Tests (7 new, 3 existing still pass):
- dry-run receives rewritten user SQL with WHERE clause intact (the
  load-bearing assertion for #171)
- single dry-run per request even with multiple registered tables
  (JOIN, UNION) referenced
- fallback to per-table SELECT * on bq_bad_request
- non-parse BQ errors (forbidden) still 502
- rewriter unit tests: bare + bq.path in same SQL, longest-name-wins
  on prefix collision, case-insensitive bare-name match
2026-05-04 21:08:21 +02:00

650 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Query endpoint — execute SQL against server DuckDB."""
import contextlib
import logging
import os
import re
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
import duckdb
from app.auth.access import is_user_admin
from app.auth.dependencies import get_current_user, _get_db
from app.instance_config import get_value
from src.db import get_analytics_db_readonly
from src.rbac import get_accessible_tables
from src.repositories.table_registry import TableRegistryRepository
# Imported at module level so tests can monkeypatch via
# `app.api.query._bq_dry_run_bytes` without resolving lazy imports inside
# the handler (reaches the patched attribute on each call). Same for
# get_bq_access — sibling module, dep direction doesn't matter (both are
# leaves under app.api).
from app.api.v2_quota import _build_quota_tracker, QuotaExceededError
from app.api.v2_scan import _bq_dry_run_bytes
from connectors.bigquery.access import get_bq_access, BqAccessError
# Module logger; the handlers below log the quota record_bytes failure and
# the dry-run fallback warning through it.
logger = logging.getLogger(__name__)
# Every endpoint in this module mounts under /api/query (OpenAPI tag "query").
router = APIRouter(prefix="/api/query", tags=["query"])
# Issue #160 §4.3.1 — direct `bq.<dataset>.<source_table>` references in user
# SQL. Catalog token accepts both `bq` (the unquoted DuckDB-style name) and
# `"bq"` (quoted identifier). DuckDB resolves both to the same ATTACHed
# catalog, so the security-boundary regex must accept both — Phase 3 review
# caught the quoted variant as an RBAC + cost-cap bypass.
# Lookahead `(?=\W|$)` works where `\b` doesn't (after a closing quote).
# Negative lookbehind `(?<![\w.])` rejects `other_bq.x.y`, `my_bq.ds.tbl`,
# and `x.bq.y.z` so the regex doesn't fire on column qualifiers or
# look-alike-prefixed identifiers.
# `\s*` around each dot tolerates `bq . ds . tbl`. Capture groups:
# 1 = dataset token, 2 = table token; each may still carry its surrounding
# double quotes, which callers remove with `.strip('"')`.
BQ_PATH = re.compile(
    r'(?<![\w.])(?:"bq"|bq)\s*\.\s*("[^"]+"|\w+)\s*\.\s*("[^"]+"|\w+)(?=\W|$)',
    re.IGNORECASE,
)
# Default /api/query BigQuery scan cap: 5 GiB (== 5_368_709_120 bytes).
_DEFAULT_REMOTE_QUERY_CAP_BYTES = 5 * 1024 ** 3


def _default_remote_query_cap_bytes() -> int:
    """Return the per-request cap, in bytes, on /api/query BQ-touching scans.

    Configurable via `data_source.bigquery.bq_max_scan_bytes` in
    /admin/server-config — sits next to `max_bytes_per_materialize` for
    visual symmetry. A missing/``None`` value, or one ``int()`` cannot
    convert, falls back to the 5 GiB default. Values <= 0 are returned
    unchanged; the cap guard only enforces the cap when it is > 0.
    """
    raw = get_value(
        "data_source", "bigquery", "bq_max_scan_bytes",
        default=_DEFAULT_REMOTE_QUERY_CAP_BYTES,
    )
    if raw is None:
        return _DEFAULT_REMOTE_QUERY_CAP_BYTES
    try:
        return int(raw)
    except (TypeError, ValueError):
        # Misconfigured value (e.g. "5GiB") — keep the safe default rather
        # than disabling the cap or failing the request.
        return _DEFAULT_REMOTE_QUERY_CAP_BYTES
class QueryRequest(BaseModel):
    # Request body for POST /api/query.
    # Raw user SQL; execute_query only accepts a single SELECT/WITH statement.
    sql: str
    # Maximum rows returned; the handler fetches one extra row to detect
    # truncation.
    limit: int = 1000
class QueryResponse(BaseModel):
    # Response body for POST /api/query.
    # Column names from the DuckDB cursor description ([] when there is none).
    columns: list
    # One list per row; values that are not None/int/float/bool/str are
    # stringified by the handler.
    rows: list
    # len(rows) after truncation to the requested limit.
    row_count: int
    # True when the query produced more than `limit` rows.
    truncated: bool = False
@router.post("", response_model=QueryResponse)
async def execute_query(
    request: QueryRequest,
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Execute a single read-only SELECT/WITH statement against the server
    analytics DuckDB and return at most ``request.limit`` rows.

    Pipeline:
      1. Substring blocklist + SELECT/WITH prefix check on the lowered SQL,
         then a non-negative ``limit`` check (400 on any violation).
      2. Non-admins: reject SQL that mentions any analytics view the caller
         holds no grant on (403).
      3. #160 BigQuery guardrail: collect registered remote-BQ rows the SQL
         touches and reject unregistered / ungranted direct ``bq.*`` paths
         (403). When any remote row is touched, the quota + cap guard wraps
         ``analytics.execute`` so the per-user concurrent slot is held for
         the real scan.
      4. Post-flight, bill the dry-run estimate to the user's daily quota.

    Raises:
        HTTPException: 400 for rejected SQL, a negative limit, or a query
            error (with a materialize-aware hint when applicable); 403 for
            access denials; whatever the BQ guard raises (429/400/502) is
            re-raised unchanged.
    """
    sql_lower = request.sql.strip().lower()
    # Block everything except SELECT
    blocked = [
        "drop ", "delete ", "insert ", "update ", "alter ", "create ",
        "copy ", "attach ", "detach ", "load ", "install ",
        "export ", "import ", "pragma ", "call ",
        # File access functions
        "read_csv", "read_json", "read_parquet", "read_text",
        "write_csv", "write_parquet", "read_blob", "read_ndjson",
        "parquet_scan", "parquet_metadata", "parquet_schema",
        "json_scan", "csv_scan",
        "query_table", "iceberg_scan", "delta_scan",
        # #160: bigquery_query() bypasses the registry / RBAC entirely
        # (it runs an arbitrary BQ jobs API call against any reachable
        # dataset). Wrap views created by the BQ extractor use it inside
        # CREATE VIEW bodies, but those run via DuckDB's view resolution at
        # query time — user-submitted SQL never contains the function name.
        "bigquery_query",
        "glob(", "list_files",
        "'/", '"/', 'http://', 'https://', 's3://', 'gcs://',
        # DuckDB metadata (leaks schema info regardless of RBAC)
        "information_schema", "duckdb_tables", "duckdb_columns",
        "duckdb_databases", "duckdb_settings", "duckdb_functions",
        "duckdb_views", "duckdb_indexes", "duckdb_schemas",
        "pragma_table_info", "pragma_storage_info",
        # Relative path traversal
        "'../", '"../',
        # Multiple statements
        ";",
    ]
    if any(keyword in sql_lower for keyword in blocked):
        raise HTTPException(status_code=400, detail="Only single SELECT queries are allowed")
    # Accept any whitespace (newline, tab, space) after the keyword so
    # multi-line SQL doesn't 400 on `SELECT\n col, ...`. Uses the
    # module-level `re` import.
    if not re.match(r"^(select|with)\s", sql_lower):
        raise HTTPException(status_code=400, detail="Query must start with SELECT or WITH")
    # A negative limit would reach `fetchmany(limit + 1)` and the
    # `result[:limit]` slice below; a negative slice bound drops rows from
    # the END instead of capping the count. Reject it before touching the DB.
    if request.limit < 0:
        raise HTTPException(status_code=400, detail="limit must be a non-negative integer")
    # Get allowed tables for this user
    allowed = get_accessible_tables(user, conn)
    analytics = get_analytics_db_readonly()
    try:
        if allowed is not None:  # None = admin, sees all
            # Get all views in analytics DB
            all_views = {row[0] for row in analytics.execute(
                "SELECT table_name FROM information_schema.tables WHERE table_type='VIEW'"
            ).fetchall()}
            # `allowed` carries registry IDs (resource_grants.resource_id);
            # DuckDB master views are named by registry display `name`.
            # Build a name->id map so the forbidden check compares apples to
            # apples — when id != name, the prior `all_views - set(allowed)`
            # over-denied authorized users (Devin Review iter #5 on PR #168;
            # pre-existing class of name/id mismatch flagged across this
            # PR's BQ guardrail too).
            allowed_ids = set(allowed)
            registry_rows = TableRegistryRepository(conn).list_all()
            allowed_view_names = {
                r["name"] for r in registry_rows
                if r.get("name") and r.get("id") in allowed_ids
            }
            # Check if query references any forbidden tables (word-boundary match)
            forbidden = all_views - allowed_view_names
            for table in forbidden:
                pattern = r'\b' + re.escape(table.lower()) + r'\b'
                if re.search(pattern, sql_lower):
                    raise HTTPException(status_code=403, detail=f"Access denied to table '{table}'")
        # ---- #160 BQ remote-row guardrail + RBAC patch -------------------
        # Runs for admins too (allowed=None): the cost cap applies to everyone.
        dry_run_set, name_lookups, blocked_bq_path = _bq_guardrail_inputs(
            request.sql, sql_lower, conn, user, allowed,
        )
        if blocked_bq_path is not None:
            raise HTTPException(status_code=403, detail=blocked_bq_path)
        # Issue #160 §4.3.3 — concurrent-slot guard MUST wrap the actual
        # `analytics.execute(request.sql)` call (which is what triggers the
        # BQ scan when DuckDB resolves the master view), not just the
        # dry-run. Devin Review on PR #168 caught this — earlier
        # implementation released the slot before execute. Use a context
        # manager so dry-run + cap check + execute + record_bytes all run
        # inside the slot.
        # Match /api/v2/scan's user_id key shape (`email or "anon"`) so the
        # shared QuotaTracker singleton sees the SAME key for both endpoints.
        # Earlier `id or email` ordering keyed BQ bytes on UUID for /api/query
        # vs email for /api/v2/scan — the per-user daily cap was effectively
        # doubled because the two paths tracked under different keys.
        # Devin Review #2 caught this on PR #168.
        user_id = user.get("email") or user.get("id") or "anon"
        guard = (
            _bq_quota_and_cap_guard(
                user_id=user_id,
                dry_run_set=dry_run_set,
                name_lookups=name_lookups,
                sql=request.sql,
            )
            if dry_run_set
            else contextlib.nullcontext()
        )
        with guard:
            # Open in read-only mode for extra safety
            result = analytics.execute(request.sql).fetchmany(request.limit + 1)
            columns = [desc[0] for desc in analytics.description] if analytics.description else []
            truncated = len(result) > request.limit
            rows = result[:request.limit]
            # Post-flight: bill the dry-run estimate against the user's daily
            # quota. Do this AFTER execute so a downstream failure (e.g. BQ
            # outage) doesn't strand the user with charged-but-unrun bytes.
            # Stays inside the guard so the concurrent-slot release happens
            # after record_bytes completes.
            if dry_run_set:
                try:
                    _build_quota_tracker().record_bytes(
                        user_id, sum(b for _, _, b in dry_run_set),
                    )
                except Exception:
                    # record_bytes is documented as never-raising; defensive
                    # guard. Keep the traceback so a real failure is
                    # diagnosable instead of a bare one-line warning.
                    logger.warning(
                        "quota record_bytes failed for user=%s", user_id,
                        exc_info=True,
                    )
        # Convert to serializable types
        serializable_rows = [
            [
                str(v) if v is not None and not isinstance(v, (int, float, bool, str)) else v
                for v in row
            ]
            for row in rows
        ]
        return QueryResponse(
            columns=columns,
            rows=serializable_rows,
            row_count=len(serializable_rows),
            truncated=truncated,
        )
    except HTTPException:
        raise
    except Exception as e:
        # If DuckDB raised "Table … does not exist" for a referenced name,
        # check whether that name belongs to a registry row in
        # `query_mode='materialized'` that hasn't yet been materialized in
        # this instance's analytics.duckdb. Materialized rows produce a
        # parquet at `${DATA_DIR}/extracts/<source>/data/<id>.parquet` but
        # the orchestrator is `_meta`-driven and only creates master views
        # for connectors that emit `_meta` rows — so on a fresh instance
        # (or before the first scheduler tick) the master view doesn't
        # exist yet and the operator gets a confusing "table does not
        # exist" with no path forward. Surface a materialize-aware hint
        # instead of DuckDB's bare error.
        msg = str(e)
        helpful = _materialized_hint_for_query_error(conn, request.sql, msg)
        if helpful:
            raise HTTPException(status_code=400, detail=helpful)
        raise HTTPException(status_code=400, detail=f"Query error: {msg}")
    finally:
        analytics.close()
def _materialized_hint_for_query_error(
conn: duckdb.DuckDBPyConnection, sql: str, error_msg: str,
) -> Optional[str]:
"""Return a materialize-aware error message if the failed query
references a registry row whose `query_mode='materialized'` and which
has no master view in analytics.duckdb yet, OR ``None`` to fall back
to DuckDB's raw error.
The detection scans each materialized row's id/name against the SQL
text; a hit means the operator picked a name that exists in the
registry but isn't queryable in this instance. The hint is the same
in both arms of the OR — it tells them what the table needs and what
they can do today (`agnes pull` or query `bq."dataset"."table"`
directly using the bucket/source_table from the registry row).
"""
# Cheap fast-path — only inspect the registry when DuckDB's error
# actually mentions a missing table. Avoids registry round-trip on
# every parse/cast/permission failure.
el = error_msg.lower()
if "does not exist" not in el and "table with name" not in el:
return None
try:
repo = TableRegistryRepository(conn)
rows = repo.list_all()
except Exception:
# Registry read failed for whatever reason — don't compound the
# error response by hiding the original DuckDB message.
return None
sql_l = sql.lower()
for r in rows:
if (r.get("query_mode") or "") != "materialized":
continue
# Match by id or by name; either could appear in the SQL.
candidates = {r.get("id"), r.get("name")}
for cand in candidates:
if not cand:
continue
cand_l = str(cand).lower()
# Word-boundary-ish check — `\b` doesn't match `.` so
# `bq.dataset.cand` would still hit, which is fine for the
# hint path (the operator is referring to the same table).
if re.search(r"\b" + re.escape(cand_l) + r"\b", sql_l):
return _build_materialized_hint(r)
return None
def _build_materialized_hint(row: dict) -> str:
"""Format the user-facing hint for a materialized row that's not yet
queryable. Includes the table id, the bucket/source_table when the
row carries them, and concrete operator next steps."""
tid = row.get("id") or row.get("name") or "<unknown>"
bucket = row.get("bucket")
source_table = row.get("source_table")
direct_hint = ""
if bucket and source_table:
# BigQuery: `bq."dataset"."table"`; Keboola: `kbc."bucket"."table"`.
# Pick the alias by source_type so the hint is copy-pasteable.
alias = "bq" if (row.get("source_type") or "") == "bigquery" else "kbc"
direct_hint = (
f' or query the source directly via {alias}."{bucket}".'
f'"{source_table}"'
)
return (
f"Table {tid!r} is registered as query_mode='materialized' but is "
f"not yet materialized in this instance's analytics views. Run "
f"`agnes pull` (or wait for the scheduler tick / hit POST "
f"/api/sync/trigger) to materialize the parquet"
f"{direct_hint}."
)
def _bq_guardrail_inputs(
    sql: str,
    sql_lower: str,
    sys_conn: duckdb.DuckDBPyConnection,
    user: dict,
    allowed: Optional[list],
):
    """Two-pass scan over user SQL for the upcoming BQ guardrail + RBAC patch.
    Returns a tuple `(dry_run_set, name_lookups, blocked_bq_path)`:
    - `dry_run_set` is a list of `(bucket, source_table, est_bytes)` triples
      identifying every BigQuery row the request will scan. The caller dry-runs
      the rewritten user SQL once and distributes the total here for quota
      bookkeeping. Entries are de-duplicated case-insensitively on
      `(bucket, source_table)` and start with est_bytes == 0.
    - `name_lookups` is a list of `(registered_name, bucket, source_table)`
      triples — only the bare-name matches from pass 1, NOT the direct
      `bq."<ds>"."<tbl>"` matches. Issue #171 fix: the cap-guard rewrites
      these name → ``\\`<project>.<bucket>.<source_table>\\``` when building
      the BQ-native SQL for dry-run, so partition pruning + column projection
      + predicate pushdown all engage.
    - `blocked_bq_path` is a structured-detail dict for the caller to raise
      HTTPException(403) with, when user SQL contains a direct
      `bq."<ds>"."<tbl>"` reference that either points at an unregistered
      path (`bq_path_not_registered`) or registered but the caller has no
      grant on the registered name (`bq_path_access_denied`). None when the
      RBAC check passes. When it is not None, both lists are returned empty.

    `allowed` is None for admins (see execute_query); otherwise it holds the
    registry ids the caller has grants on.
    """
    repo = TableRegistryRepository(sys_conn)
    # 1. Bare-name pass: look up registered remote-BQ names that appear in
    # the user SQL as word-boundary tokens. Reuses the same regex shape as
    # the existing forbidden-table loop above.
    #
    # `accessible_set` comes from `get_accessible_tables()` which returns
    # `resource_grants.resource_id` values — i.e. table registry IDs, NOT
    # display names. Devin Review iter #3 caught the mismatch: when
    # `id != name` (e.g. id="bq.finance.ue", name="ue"), legitimate
    # accessible rows were skipped, under-counting dry-run bytes for the
    # cost cap. The user SQL still references the display `name` (that's
    # what shows in `agnes catalog`), so the regex match below uses `name`,
    # but the access gate uses `id`.
    #
    # NOTE(review): the bare-name regex runs over the whole SQL text, so a
    # name that also appears inside a `bq."ds"."tbl"` path, a string literal
    # or a column qualifier is counted too. That only adds extra entries
    # (over-estimation in the per-table fallback), never removes any.
    dry_run: list = []
    name_lookups: list = []
    seen_paths: set = set()
    accessible_set = set(allowed) if allowed is not None else None
    for r in repo.list_by_source("bigquery"):
        if (r.get("query_mode") or "") != "remote":
            continue
        bucket = r.get("bucket")
        source_table = r.get("source_table")
        name = r.get("name")
        row_id = r.get("id")
        if not (bucket and source_table and name and row_id):
            continue
        if accessible_set is not None and row_id not in accessible_set:
            # Forbidden-table loop above will have rejected the request
            # before we get here. Defensive skip.
            continue
        pattern = r'\b' + re.escape(str(name).lower()) + r'\b'
        if re.search(pattern, sql_lower):
            key = (bucket.lower(), source_table.lower())
            if key not in seen_paths:
                seen_paths.add(key)
                dry_run.append((bucket, source_table, 0))  # bytes filled at dry-run
            # Record the (name, bucket, source_table) mapping separately so the
            # cap-guard's SQL rewriter can find every occurrence — even if the
            # user references the same physical table under two registered
            # names (rare but possible: aliased catalog rows).
            name_lookups.append((str(name), bucket, source_table))
    # 2. Direct bq.<ds>.<tbl> pass: every match must point at a registered
    # row. Run BEFORE adding to dry_run so unregistered paths fail-fast.
    # NOTE(review): is_user_admin is evaluated on every call, even when the
    # SQL contains no bq.* path and the result goes unused.
    is_admin = is_user_admin(user.get("id") or user.get("email") or "", sys_conn)
    for m in BQ_PATH.finditer(sql):
        bucket_raw = m.group(1).strip('"')
        source_table_raw = m.group(2).strip('"')
        row = repo.find_by_bq_path(bucket_raw, source_table_raw)
        if row is None:
            return [], [], {
                "reason": "bq_path_not_registered",
                "path": f'bq."{bucket_raw}"."{source_table_raw}"',
                "hint": (
                    "Direct bq.* references must point to a registered table. "
                    "Register via `agnes admin register-table` or use the "
                    "registered name from `agnes catalog`."
                ),
            }
        # Row exists. Per-id grant check (non-admin only).
        # `accessible_set` is keyed by registry id (resource_grants
        # resource_id), so use `row["id"]` here, not display name.
        # Devin Review iter #3.
        if not is_admin:
            if accessible_set is None or row["id"] not in accessible_set:
                return [], [], {
                    "reason": "bq_path_access_denied",
                    "path": f'bq."{bucket_raw}"."{source_table_raw}"',
                    "registered_as": row["name"],
                }
        # Add to dry-run set if not already covered by bare-name pass.
        bucket = row["bucket"]
        source_table = row["source_table"]
        if bucket and source_table:
            key = (bucket.lower(), source_table.lower())
            if key not in seen_paths:
                seen_paths.add(key)
                dry_run.append((bucket, source_table, 0))
    return dry_run, name_lookups, None
def _rewrite_user_sql_for_bq_dry_run(
    sql: str, name_lookups: list, project: str,
) -> str:
    """Rewrite user SQL from DuckDB flavour to BigQuery-native so a single
    `_bq_dry_run_bytes` call can estimate scan size for the EXACT query
    the user submitted (issue #171).

    Two kinds of reference are rewritten, in ONE left-to-right pass:

    1. ``bq."<ds>"."<tbl>"`` and the unquoted variant (anything `BQ_PATH`
       matches) → ``\\`<project>.<ds>.<tbl>\\```.
    2. Each registered remote-BQ name in `name_lookups` (word-boundary,
       case-insensitive) → ``\\`<project>.<bucket>.<source_table>\\```.
       Longer names are tried first, so `unit_economics_summary` wins
       over its prefix `unit_economics`.

    At any position a `BQ_PATH` match takes priority over a bare name, and
    replacement text is never re-scanned. The former two-pass version ran
    sequential `re.sub` calls and broke both rules:

    - A bare name equal to a dataset/table segment corrupted a direct
      ``bq."ds"."tbl"`` path before the BQ_PATH pass could see it.
    - A shorter registered name equal to the project, a bucket or a
      source_table got substituted inside a backticked path already
      inserted for a longer name.

    Both produced invalid BQ SQL and forced the loose per-table fallback.

    Args:
        sql: The user's DuckDB SQL.
        name_lookups: ``(registered_name, bucket, source_table)`` triples.
            Empty names are ignored; they would otherwise match at every
            word boundary.
        project: BigQuery project id used to qualify every path.

    Returns:
        The rewritten SQL; unchanged when nothing matches.

    Limitations (regex only, no SQL parser):

    - A registered name inside a string literal (an `IN (...)` value, a
      `LIKE` pattern) or used as a column qualifier is rewritten too. If
      BigQuery then rejects the SQL as a bad request, the caller falls back
      to the per-table SELECT * estimate.
    - CTE shadowing: in ``WITH unit_economics AS (...) ... FROM
      unit_economics``, the `FROM` reference is rewritten to the physical
      table. The dry-run then likely over-estimates; the same fallback
      covers a rejection.
    """
    # Longest names first: regex alternation is ordered, so this ordering
    # is what makes the longer of two prefix-colliding names win.
    ordered = sorted(
        (entry for entry in name_lookups if entry[0]),
        key=lambda t: -len(t[0]),
    )
    bare_targets = [
        f"`{project}.{bucket}.{source_table}`"
        for _, bucket, source_table in ordered
    ]
    pattern = f"(?:{BQ_PATH.pattern})"
    if ordered:
        # One capturing group per name so the callback can tell which name
        # matched via `m.lastindex` without re-folding case. Groups
        # 1..BQ_PATH.groups belong to the BQ_PATH alternative.
        names_alt = "|".join(f"({re.escape(name)})" for name, _, _ in ordered)
        pattern += r"|\b(?:" + names_alt + r")\b"
    combined = re.compile(pattern, BQ_PATH.flags | re.IGNORECASE)
    first_name_group = BQ_PATH.groups + 1

    def _repl(m: re.Match) -> str:
        if m.lastindex is not None and m.lastindex >= first_name_group:
            return bare_targets[m.lastindex - first_name_group]
        # BQ_PATH alternative: groups 1/2 are dataset/table, maybe quoted.
        ds = m.group(1).strip('"')
        tbl = m.group(2).strip('"')
        return f"`{project}.{ds}.{tbl}`"

    return combined.sub(_repl, sql)
def _bq_access_error_to_http_502(exc: BqAccessError) -> HTTPException:
    """Build the HTTP 502 the guard raises for a `BqAccessError`.

    Detail is ``{"kind", "message", **exc.details}``. `exc.details` is
    spread last, so its entries win on key collision — the same shape the
    guard previously built inline in three places.
    """
    return HTTPException(status_code=502, detail={
        "kind": exc.kind,
        "message": exc.message,
        **(exc.details or {}),
    })


@contextlib.contextmanager
def _bq_quota_and_cap_guard(
    *,
    user_id: str,
    dry_run_set: list,
    name_lookups: list,
    sql: str,
):
    """Pre-flight check + dry-run + cap enforcement for /api/query BQ paths.

    Context-manager shape (Devin Review #5 on PR #168): the per-user
    concurrent slot is acquired before the dry-run and held until the
    caller's `with` body exits. That body includes the BQ-touching
    `analytics.execute(...)`, so the concurrent cap limits real scans, not
    just dry-runs (spec §4.3.3).

    Issue #171: the dry-run runs ONCE on the user's actual SQL, translated to
    BQ-native by `_rewrite_user_sql_for_bq_dry_run`. WHERE filters, column
    projection and partition pruning therefore count. The old per-table
    synthetic ``SELECT * FROM <table>`` over-estimated up to ~30,000× on
    partitioned/clustered tables.

    Fallback: if BigQuery rejects the rewritten SQL with
    ``kind == "bq_bad_request"`` (e.g. DuckDB-only syntax such as ``::INT``),
    the guard falls back to one ``SELECT *`` dry-run per entry in
    `dry_run_set`. An over-estimate is preferred over fail-open. Every
    other `BqAccessError` becomes HTTP 502.

    Flow:
      1. `check_daily_budget` — over-cap users get 429 BEFORE any BQ work.
      2. `quota.acquire(user_id)` opened — concurrent slot held throughout;
         a full slot maps to 429 `concurrent_slot_exceeded`.
      3. Single dry-run of the rewritten SQL → `total_bytes`, or the
         per-table fallback sum.
      4. If the cap is > 0 and total > cap → 400 `remote_scan_too_large`.
      5. Yield `total_bytes`. Caller runs `analytics.execute(...)` +
         `record_bytes(...)`.
      6. On exit, slot released.

    Mutates `dry_run_set` in place, filling each triple's bytes element so the
    caller can bill ``sum(b for _, _, b in dry_run_set)`` post-flight. The
    fallback stores per-table estimates. The single-dry-run path puts
    `total_bytes` on entry 0 and 0 on the rest, because a composite dry-run
    has no per-table breakdown. The sum equals `total_bytes` either way.

    NOTE(review): SQL that also references local (non-BQ) views is sent to
    BigQuery with those names unrewritten. Whether that surfaces as
    ``bq_bad_request`` (fallback) or another kind (502) depends on how
    `_bq_dry_run_bytes` classifies BigQuery's error — confirm.
    """
    quota = _build_quota_tracker()
    try:
        quota.check_daily_budget(user_id)
    except QuotaExceededError as exc:
        raise HTTPException(status_code=429, detail={
            "reason": "daily_byte_cap_exceeded",
            "kind": exc.kind,
            "current": exc.current,
            "limit": exc.limit,
            "retry_after_seconds": exc.retry_after_seconds,
        }) from exc
    try:
        bq = get_bq_access()
    except BqAccessError as exc:
        raise _bq_access_error_to_http_502(exc) from exc
    cap_bytes = _default_remote_query_cap_bytes()
    # `quota.acquire(user_id)` raises QuotaExceededError(KIND_CONCURRENT)
    # via __enter__ when the per-user concurrent-scan slot is at cap.
    # Catch around the `with` and map to HTTP 429 with the typed detail
    # shape — same shape as the daily-budget rejection above. Without
    # this, the exception propagates through @contextlib.contextmanager
    # and is caught by execute_query's generic `except Exception` →
    # returns HTTP 400 with a flattened "Query error: concurrent_scans:
    # N/M" string, dropping the typed retry_after_seconds field.
    # Devin Review #2 on PR #168.
    try:
        with quota.acquire(user_id):
            project = bq.projects.data
            rewritten_sql = _rewrite_user_sql_for_bq_dry_run(
                sql, name_lookups, project,
            )
            # Single-dry-run path first (issue #171). Only a BQ parse error
            # (kind="bq_bad_request") falls back to per-table SELECT *.
            total_bytes = 0
            used_fallback = False
            try:
                total_bytes = _bq_dry_run_bytes(bq, rewritten_sql)
            except BqAccessError as exc:
                if exc.kind != "bq_bad_request":
                    raise _bq_access_error_to_http_502(exc) from exc
                logger.warning(
                    "BQ dry-run rejected the rewritten SQL "
                    "(kind=%s, message=%s). Falling back to per-table "
                    "SELECT * estimate; the cap check will over-estimate "
                    "scan bytes for this query. Consider rewriting to "
                    "BQ-native syntax for a tight pre-check.",
                    exc.kind, exc.message,
                )
                used_fallback = True
            if used_fallback:
                # Pre-#171 path: estimate per registered table from a
                # synthetic SELECT *. Over-estimates partitioned scans but
                # never under-estimates, so the cap still bounds risk.
                for i, (bucket, source_table, _) in enumerate(dry_run_set):
                    fallback_sql = (
                        f"SELECT * FROM `{project}.{bucket}.{source_table}`"
                    )
                    try:
                        est = _bq_dry_run_bytes(bq, fallback_sql)
                    except BqAccessError as exc:
                        raise _bq_access_error_to_http_502(exc) from exc
                    dry_run_set[i] = (bucket, source_table, est)
                    total_bytes += est
            else:
                # Composite dry-run: pin the total on entry 0, zero the rest,
                # so the caller's post-flight sum equals total_bytes.
                for i, (bucket, source_table, _) in enumerate(dry_run_set):
                    dry_run_set[i] = (
                        bucket, source_table, total_bytes if i == 0 else 0,
                    )
            if cap_bytes > 0 and total_bytes > cap_bytes:
                tables = [f"{b}.{t}" for b, t, _ in dry_run_set]
                raise HTTPException(status_code=400, detail={
                    "reason": "remote_scan_too_large",
                    "scan_bytes": total_bytes,
                    "limit_bytes": cap_bytes,
                    "tables": tables,
                    "suggestion": (
                        "Use `agnes snapshot create <id> --select <cols> --where <predicate> "
                        "--estimate` to materialize a filtered subset, then query "
                        "the snapshot locally."
                    ),
                })
            # Yield control to the handler — slot stays acquired while the
            # caller runs analytics.execute() + record_bytes().
            yield total_bytes
    except QuotaExceededError as exc:
        # Only KIND_CONCURRENT can land here (daily-budget already mapped
        # above; record_bytes never raises). Map to 429 with structured
        # detail consistent with the daily-budget shape.
        raise HTTPException(status_code=429, detail={
            "reason": "concurrent_slot_exceeded",
            "kind": exc.kind,
            "current": exc.current,
            "limit": exc.limit,
            "retry_after_seconds": exc.retry_after_seconds,
        }) from exc