The iterative bare-name rewriter (one re.sub per name, longest-first)
was vulnerable to cross-contamination when the GCP project ID contained
a registered table name as a hyphen-delimited word.
Concrete repro:
project = 'my-ue-project'
registered = ['orders', 'ue']
user SQL = 'SELECT * FROM orders JOIN ue ON ...'
iter 1 (orders): produces 'FROM `my-ue-project.fin.orders` JOIN ue ...'
iter 2 (ue): '\bue\b' matches the 'ue' INSIDE 'my-ue-project' (a hyphen
creates a word boundary on both sides), which corrupts the backticked
path that iter 1 inserted.
Fallback at query.py:576 caught the resulting BQ parse error and fell
back to per-table SELECT * estimate, so impact was over-estimation,
not fail-open — but the #171 partition-pruning fix silently degraded
to pre-fix behavior whenever a project name shared a hyphen-segment
with a registered table.
Fix: single re.sub call with an alternation regex sorted longest-first.
Single-pass means each source position is processed exactly once, so
freshly-inserted backticked text from one match isn't re-scanned by
later names in the alternation.
The regression test
test_rewrite_helper_does_not_corrupt_when_project_id_contains_registered_name
covers the exact Devin repro.
675 lines
31 KiB
Python
675 lines
31 KiB
Python
"""Query endpoint — execute SQL against server DuckDB."""
|
||
|
||
import contextlib
|
||
import logging
|
||
import os
|
||
import re
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
from fastapi import APIRouter, Depends, HTTPException
|
||
from pydantic import BaseModel
|
||
import duckdb
|
||
|
||
from app.auth.access import is_user_admin
|
||
from app.auth.dependencies import get_current_user, _get_db
|
||
from app.instance_config import get_value
|
||
from src.db import get_analytics_db_readonly
|
||
from src.rbac import get_accessible_tables
|
||
from src.repositories.table_registry import TableRegistryRepository
|
||
|
||
# Imported at module level so tests can monkeypatch via
|
||
# `app.api.query._bq_dry_run_bytes` without resolving lazy imports inside
|
||
# the handler (reaches the patched attribute on each call). Same for
|
||
# get_bq_access — sibling module, dep direction doesn't matter (both are
|
||
# leaves under app.api).
|
||
from app.api.v2_quota import _build_quota_tracker, QuotaExceededError
|
||
from app.api.v2_scan import _bq_dry_run_bytes
|
||
from connectors.bigquery.access import get_bq_access, BqAccessError
|
||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Every route declared on this router lives under the /api/query prefix.
router = APIRouter(prefix="/api/query", tags=["query"])

# Issue #160 §4.3.1 — direct `bq.<dataset>.<source_table>` references in user
# SQL. Catalog token accepts both `bq` (the unquoted DuckDB-style name) and
# `"bq"` (quoted identifier). DuckDB resolves both to the same ATTACHed
# catalog, so the security-boundary regex must accept both — Phase 3 review
# caught the quoted variant as an RBAC + cost-cap bypass.
# Lookahead `(?=\W|$)` works where `\b` doesn't (after a closing quote).
# Negative lookbehind `(?<![\w.])` rejects `other_bq.x.y`, `my_bq.ds.tbl`,
# and `x.bq.y.z` so the regex doesn't fire on column qualifiers or
# look-alike-prefixed identifiers.
#
# Capture groups: group 1 is the dataset token and group 2 is the table
# token. Either may still carry its surrounding double quotes, so callers
# in this module strip `"` before using them.
BQ_PATH = re.compile(
    r'(?<![\w.])(?:"bq"|bq)\s*\.\s*("[^"]+"|\w+)\s*\.\s*("[^"]+"|\w+)(?=\W|$)',
    re.IGNORECASE,
)
|
||
|
||
|
||
def _default_remote_query_cap_bytes() -> int:
    """Return the byte cap applied to /api/query BQ-touching scans.

    The cap is read from `data_source.bigquery.bq_max_scan_bytes`
    (configurable in /admin/server-config, where it sits next to
    `max_bytes_per_materialize` for visual symmetry).

    Returns:
        The configured value converted with `int()`, or the 5 GiB default
        when the setting is missing, `None`, or cannot be converted.
    """
    default_cap = 5_368_709_120  # 5 GiB
    raw = get_value(
        "data_source", "bigquery", "bq_max_scan_bytes", default=default_cap,
    )
    if raw is None:
        return default_cap
    try:
        return int(raw)
    except (TypeError, ValueError, OverflowError):
        # TypeError / ValueError: value is not numeric (e.g. a list or a
        # non-numeric string). OverflowError: an infinite float, for which
        # `int()` raises instead of returning. Fall back to the default in
        # every case rather than failing the request.
        return default_cap
|
||
|
||
|
||
class QueryRequest(BaseModel):
    # Request body accepted by the POST /api/query handler in this module.

    # SQL text to run. The handler validates a stripped, lower-cased copy
    # against its blocklist and requires it to start with SELECT or WITH.
    sql: str
    # Maximum number of rows to return. The handler fetches `limit + 1`
    # rows so it can report whether the result was truncated.
    limit: int = 1000
|
||
|
||
|
||
class QueryResponse(BaseModel):
    # Response body returned by the POST /api/query handler in this module.

    # Column names, taken from the cursor description in result order.
    columns: list
    # Result rows; each row is a list whose non-primitive values have been
    # converted with `str()` by the handler.
    rows: list
    # Number of rows in `rows` (after truncation to the requested limit).
    row_count: int
    # True when the query produced more rows than the requested limit.
    truncated: bool = False
|
||
|
||
|
||
@router.post("", response_model=QueryResponse)
async def execute_query(
    request: QueryRequest,
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Execute SQL against the server analytics DuckDB."""
    # Handler flow:
    #   1. Reject SQL containing a blocked fragment or `;`, and SQL that does
    #      not start with SELECT / WITH (HTTP 400).
    #   2. Reject a negative `limit` (HTTP 400).
    #   3. For non-admin callers (`allowed is not None`), reject references
    #      to views the caller holds no grant on (HTTP 403).
    #   4. Run the BigQuery guardrail; when the query touches remote BQ
    #      tables, hold the quota/cap guard around the real execute and
    #      record the dry-run bytes afterwards.
    #   5. Return at most `limit` rows and flag truncation.
    # Any non-HTTP exception raised while querying is converted to HTTP 400.
    sql_lower = request.sql.strip().lower()

    # Block everything except SELECT
    blocked = [
        "drop ", "delete ", "insert ", "update ", "alter ", "create ",
        "copy ", "attach ", "detach ", "load ", "install ",
        "export ", "import ", "pragma ", "call ",
        # File access functions
        "read_csv", "read_json", "read_parquet", "read_text",
        "write_csv", "write_parquet", "read_blob", "read_ndjson",
        "parquet_scan", "parquet_metadata", "parquet_schema",
        "json_scan", "csv_scan",
        "query_table", "iceberg_scan", "delta_scan",
        # #160: bigquery_query() bypasses the registry / RBAC entirely
        # (it runs an arbitrary BQ jobs API call against any reachable
        # dataset). Wrap views created by the BQ extractor use it inside
        # CREATE VIEW bodies, but those run via DuckDB's view resolution at
        # query time — user-submitted SQL never contains the function name.
        "bigquery_query",
        "glob(", "list_files",
        "'/", '"/', 'http://', 'https://', 's3://', 'gcs://',
        # DuckDB metadata (leaks schema info regardless of RBAC)
        "information_schema", "duckdb_tables", "duckdb_columns",
        "duckdb_databases", "duckdb_settings", "duckdb_functions",
        "duckdb_views", "duckdb_indexes", "duckdb_schemas",
        "pragma_table_info", "pragma_storage_info",
        # Relative path traversal
        "'../", '"../',
        # Multiple statements
        ";",
    ]
    if any(keyword in sql_lower for keyword in blocked):
        raise HTTPException(status_code=400, detail="Only single SELECT queries are allowed")

    # Accept any whitespace (newline, tab, space) after the keyword so
    # multi-line SQL doesn't 400 on `SELECT\n  col, ...`.
    if not re.match(r"^(select|with)\s", sql_lower):
        raise HTTPException(status_code=400, detail="Query must start with SELECT or WITH")

    # A negative limit would flow into `fetchmany(limit + 1)` and into the
    # `result[:limit]` slice below, where a negative bound drops rows from
    # the END of the result instead of capping it. Reject it up front.
    if request.limit < 0:
        raise HTTPException(status_code=400, detail="limit must be non-negative")

    # Get allowed tables for this user
    allowed = get_accessible_tables(user, conn)

    analytics = get_analytics_db_readonly()
    try:
        if allowed is not None:  # None = admin, sees all
            # Get all views in analytics DB
            all_views = {row[0] for row in analytics.execute(
                "SELECT table_name FROM information_schema.tables WHERE table_type='VIEW'"
            ).fetchall()}

            # `allowed` carries registry IDs (resource_grants.resource_id);
            # DuckDB master views are named by registry display `name`.
            # Build a name->id map so the forbidden check compares apples to
            # apples — when id != name, the prior `all_views - set(allowed)`
            # over-denied authorized users (Devin Review iter #5 on PR #168;
            # pre-existing class of name/id mismatch flagged across this
            # PR's BQ guardrail too).
            allowed_ids = set(allowed)
            registry_rows = TableRegistryRepository(conn).list_all()
            allowed_view_names = {
                r["name"] for r in registry_rows
                if r.get("name") and r.get("id") in allowed_ids
            }

            # Check if query references any forbidden tables (word-boundary match)
            forbidden = all_views - allowed_view_names
            for table in forbidden:
                pattern = r'\b' + re.escape(table.lower()) + r'\b'
                if re.search(pattern, sql_lower):
                    raise HTTPException(status_code=403, detail=f"Access denied to table '{table}'")

        # ---- #160 BQ remote-row guardrail + RBAC patch -------------------
        dry_run_set, name_lookups, blocked_bq_path = _bq_guardrail_inputs(
            request.sql, sql_lower, conn, user, allowed,
        )
        if blocked_bq_path is not None:
            raise HTTPException(status_code=403, detail=blocked_bq_path)

        # Issue #160 §4.3.3 — concurrent-slot guard MUST wrap the actual
        # `analytics.execute(request.sql)` call (which is what triggers the
        # BQ scan when DuckDB resolves the master view), not just the
        # dry-run. Devin Review on PR #168 caught this — earlier
        # implementation released the slot before execute. Use a context
        # manager so dry-run + cap check + execute + record_bytes all run
        # inside the slot.
        # Match /api/v2/scan's user_id key shape (`email or "anon"`) so the
        # shared QuotaTracker singleton sees the SAME key for both endpoints.
        # Earlier `id or email` ordering keyed BQ bytes on UUID for /api/query
        # vs email for /api/v2/scan — the per-user daily cap was effectively
        # doubled because the two paths tracked under different keys.
        # Devin Review #2 caught this on PR #168.
        user_id = user.get("email") or user.get("id") or "anon"
        guard = (
            _bq_quota_and_cap_guard(
                user_id=user_id,
                dry_run_set=dry_run_set,
                name_lookups=name_lookups,
                sql=request.sql,
            )
            if dry_run_set
            else contextlib.nullcontext()
        )
        with guard:
            # Run on the connection obtained from
            # `get_analytics_db_readonly()` above. Fetch one row more than
            # requested so truncation can be detected.
            result = analytics.execute(request.sql).fetchmany(request.limit + 1)
            columns = [desc[0] for desc in analytics.description] if analytics.description else []
            truncated = len(result) > request.limit
            rows = result[:request.limit]

            # Post-flight: bill the dry-run estimate against the user's daily
            # quota. Do this AFTER execute so a downstream failure (e.g. BQ
            # outage) doesn't strand the user with charged-but-unrun bytes.
            # Stays inside the `with quota.acquire(...)` block so the slot
            # release happens after record_bytes completes.
            if dry_run_set:
                try:
                    _build_quota_tracker().record_bytes(
                        user_id, sum(b for _, _, b in dry_run_set),
                    )
                except Exception:
                    # record_bytes is documented as never-raising; defensive
                    # guard. Keep the traceback so the cause is not lost.
                    logger.warning(
                        "quota record_bytes failed for user=%s", user_id,
                        exc_info=True,
                    )

        # Convert to serializable types
        serializable_rows = []
        for row in rows:
            serializable_rows.append([
                str(v) if v is not None and not isinstance(v, (int, float, bool, str)) else v
                for v in row
            ])
        return QueryResponse(
            columns=columns,
            rows=serializable_rows,
            row_count=len(serializable_rows),
            truncated=truncated,
        )
    except HTTPException:
        raise
    except Exception as e:
        # If DuckDB raised "Table … does not exist" for a referenced name,
        # check whether that name belongs to a registry row in
        # `query_mode='materialized'` that hasn't yet been materialized in
        # this instance's analytics.duckdb. Materialized rows produce a
        # parquet at `${DATA_DIR}/extracts/<source>/data/<id>.parquet` but
        # the orchestrator is `_meta`-driven and only creates master views
        # for connectors that emit `_meta` rows — so on a fresh instance
        # (or before the first scheduler tick) the master view doesn't
        # exist yet and the operator gets a confusing "table does not
        # exist" with no path forward. Surface a materialize-aware hint
        # instead of DuckDB's bare error.
        msg = str(e)
        helpful = _materialized_hint_for_query_error(conn, request.sql, msg)
        if helpful:
            raise HTTPException(status_code=400, detail=helpful) from e
        raise HTTPException(status_code=400, detail=f"Query error: {msg}") from e
    finally:
        analytics.close()
|
||
|
||
|
||
def _materialized_hint_for_query_error(
    conn: duckdb.DuckDBPyConnection, sql: str, error_msg: str,
) -> Optional[str]:
    """Translate a "missing table" query failure into a materialize hint.

    Returns the message built by `_build_materialized_hint` for the first
    registry row with `query_mode='materialized'` whose id or name appears
    in `sql` as a word-boundary token. Returns ``None`` — meaning "show the
    raw DuckDB error" — when the error text does not look like a
    missing-table error, when the registry cannot be read, or when no
    materialized row is referenced.
    """
    # Fast path: skip the registry round-trip unless the error text talks
    # about a missing table (parse/cast/permission failures bail out here).
    lowered_error = error_msg.lower()
    mentions_missing_table = (
        "does not exist" in lowered_error or "table with name" in lowered_error
    )
    if not mentions_missing_table:
        return None

    try:
        registry_rows = TableRegistryRepository(conn).list_all()
    except Exception:
        # A failed registry read must not mask the original DuckDB message.
        return None

    lowered_sql = sql.lower()
    for entry in registry_rows:
        if (entry.get("query_mode") or "") != "materialized":
            continue
        # Either the id or the display name may be what the SQL used.
        identifiers = {entry.get("id"), entry.get("name")}
        # `\b` also matches right after a `.`, so `bq.dataset.<name>` hits
        # too — acceptable here because it refers to the same table.
        referenced = any(
            re.search(r"\b" + re.escape(str(ident).lower()) + r"\b", lowered_sql)
            for ident in identifiers
            if ident
        )
        if referenced:
            return _build_materialized_hint(entry)
    return None
|
||
|
||
|
||
def _build_materialized_hint(row: dict) -> str:
    """Compose the user-facing message for a not-yet-queryable materialized row.

    The message names the table (id, else name, else ``<unknown>``), tells
    the operator how to get it materialized, and — when the row carries
    both `bucket` and `source_table` — adds a copy-pasteable direct path.
    """
    table_label = row.get("id") or row.get("name") or "<unknown>"
    dataset = row.get("bucket")
    source = row.get("source_table")

    parts = [
        f"Table {table_label!r} is registered as query_mode='materialized' but is ",
        "not yet materialized in this instance's analytics views. Run ",
        "`agnes pull` (or wait for the scheduler tick / hit POST ",
        "/api/sync/trigger) to materialize the parquet",
    ]
    if dataset and source:
        # BigQuery rows are reachable as `bq."dataset"."table"`; every other
        # source type gets the `kbc."bucket"."table"` form.
        if (row.get("source_type") or "") == "bigquery":
            catalog = "bq"
        else:
            catalog = "kbc"
        parts.append(f' or query the source directly via {catalog}."{dataset}".')
        parts.append(f'"{source}"')
    parts.append(".")
    return "".join(parts)
|
||
|
||
|
||
def _bq_guardrail_inputs(
    sql: str,
    sql_lower: str,
    sys_conn: duckdb.DuckDBPyConnection,
    user: dict,
    allowed: Optional[list],
):
    """Scan user SQL in two passes to feed the BQ cost guardrail and RBAC check.

    Returns a tuple `(dry_run_set, name_lookups, blocked_bq_path)`:

    - `dry_run_set`: list of `(bucket, source_table, est_bytes)` triples, one
      per distinct BigQuery table the request references. `est_bytes` is 0
      here; the cap guard fills it in after the dry-run.
    - `name_lookups`: list of `(registered_name, bucket, source_table)`
      triples for the bare-name matches of pass 1 only (direct
      `bq."<ds>"."<tbl>"` matches are NOT included). The cap guard uses them
      to rewrite names into fully-qualified BQ paths (issue #171).
    - `blocked_bq_path`: ``None`` when every direct `bq.<ds>.<tbl>` reference
      is allowed; otherwise a detail dict for an HTTP 403 whose `reason` is
      `bq_path_not_registered` (no registry row for the path) or
      `bq_path_access_denied` (row exists but the caller has no grant). In
      that case the other two elements are empty lists.
    """
    registry = TableRegistryRepository(sys_conn)

    # `allowed` holds registry IDs (resource_grants.resource_id), not display
    # names; ``None`` means the caller sees everything. User SQL references
    # the display `name`, so matching below uses `name` while the access
    # gate uses `id` (Devin Review iter #3).
    granted_ids = None if allowed is None else set(allowed)

    scan_targets: list = []
    bare_name_hits: list = []
    tracked_paths: set = set()

    def _track(bkt, tbl) -> None:
        # Queue a physical table for the dry-run once, keyed case-insensitively.
        path_key = (bkt.lower(), tbl.lower())
        if path_key in tracked_paths:
            return
        tracked_paths.add(path_key)
        scan_targets.append((bkt, tbl, 0))  # bytes filled at dry-run

    # Pass 1 — registered remote-BQ names used as word-boundary tokens.
    for entry in registry.list_by_source("bigquery"):
        if (entry.get("query_mode") or "") != "remote":
            continue
        bkt = entry.get("bucket")
        tbl = entry.get("source_table")
        display_name = entry.get("name")
        entry_id = entry.get("id")
        if not bkt or not tbl or not display_name or not entry_id:
            continue
        if granted_ids is not None and entry_id not in granted_ids:
            # The caller's forbidden-table loop is expected to have rejected
            # the request already; skip defensively.
            continue
        token = r'\b' + re.escape(str(display_name).lower()) + r'\b'
        if not re.search(token, sql_lower):
            continue
        _track(bkt, tbl)
        # Recorded per NAME (not per physical table) so the SQL rewriter
        # finds every occurrence even when two registered names alias the
        # same table.
        bare_name_hits.append((str(display_name), bkt, tbl))

    # Pass 2 — direct bq.<ds>.<tbl> references: each must resolve to a
    # registered row the caller may read, otherwise fail fast.
    caller_is_admin = is_user_admin(user.get("id") or user.get("email") or "", sys_conn)
    for hit in BQ_PATH.finditer(sql):
        dataset_token = hit.group(1).strip('"')
        table_token = hit.group(2).strip('"')
        shown_path = f'bq."{dataset_token}"."{table_token}"'

        registered = registry.find_by_bq_path(dataset_token, table_token)
        if registered is None:
            return [], [], {
                "reason": "bq_path_not_registered",
                "path": shown_path,
                "hint": (
                    "Direct bq.* references must point to a registered table. "
                    "Register via `agnes admin register-table` or use the "
                    "registered name from `agnes catalog`."
                ),
            }

        # Grant check is by registry id and applies to non-admins only.
        if not caller_is_admin and (
            granted_ids is None or registered["id"] not in granted_ids
        ):
            return [], [], {
                "reason": "bq_path_access_denied",
                "path": shown_path,
                "registered_as": registered["name"],
            }

        # Queue for dry-run unless pass 1 already covered this table.
        bkt = registered["bucket"]
        tbl = registered["source_table"]
        if bkt and tbl:
            _track(bkt, tbl)

    return scan_targets, bare_name_hits, None
|
||
|
||
|
||
def _rewrite_user_sql_for_bq_dry_run(
    sql: str, name_lookups: list, project: str,
) -> str:
    """Rewrite user SQL from DuckDB-flavor to BQ-native so a single
    `_bq_dry_run_bytes` call can estimate scan size for the EXACT query
    the user submitted (issue #171).

    Two kinds of reference are rewritten, both in ONE regex pass:

    1. ``bq."<ds>"."<tbl>"`` (and the unquoted / ``"bq"`` variants matched
       by `BQ_PATH`) becomes a backticked ``<project>.<ds>.<tbl>`` path.
    2. Each registered remote-BQ name (word-boundary, case-insensitive)
       becomes a backticked ``<project>.<bucket>.<source_table>`` path.

    Why a single pass: every source position is processed exactly once, so
    text inserted by one replacement is never re-scanned by another rule.
    Separate passes corrupt the output in three ways:

    - one `re.sub` per name: a project ID such as ``my-ue-project`` contains
      the registered name ``ue`` as a hyphen-delimited word, so a later
      name matched inside an earlier replacement;
    - names first, then `BQ_PATH`: a project ID ending in ``-bq`` (e.g.
      ``acme-bq``) makes `BQ_PATH` match inside the freshly inserted
      backticked path;
    - names first, then `BQ_PATH`: for ``bq.fin.orders`` with ``orders``
      registered, the name rule rewrote the last segment and `BQ_PATH` no
      longer matched the direct path.

    The `BQ_PATH` alternative is listed first so that, at a position where
    a direct path starts, the whole path is consumed before the bare-name
    rule can see its segments. Names are sorted longest-first.

    Limitations (regex-only, no SQL parser): a registered name inside a
    string literal, a qualified column reference, or a CTE with the same
    name is rewritten too. The caller treats a BQ parse error on the
    rewritten SQL as a signal to fall back to per-table ``SELECT *``
    estimates, so these cases over-estimate rather than fail open.

    Args:
        sql: User SQL exactly as submitted.
        name_lookups: `(registered_name, bucket, source_table)` triples.
        project: BigQuery project ID used as the first path segment.

    Returns:
        The rewritten SQL; `sql` unchanged when nothing matches.
    """
    # Lower-cased name → backticked target. Empty names are skipped because
    # an empty alternative would match at every word boundary.
    name_to_target: dict[str, str] = {
        str(name).lower(): f"`{project}.{bucket}.{source_table}`"
        for name, bucket, source_table in name_lookups
        if name
    }

    # BQ_PATH goes first so its two capture groups keep numbers 1 and 2.
    alternatives = [BQ_PATH.pattern]
    if name_to_target:
        longest_first = sorted(name_to_target, key=len, reverse=True)
        alternatives.append(
            r"\b(?P<bare_name>"
            + "|".join(re.escape(n) for n in longest_first)
            + r")\b"
        )
    combined = re.compile(
        "|".join(f"(?:{alt})" for alt in alternatives),
        BQ_PATH.flags | re.IGNORECASE,
    )

    def _repl(m: re.Match) -> str:
        bare = m.groupdict().get("bare_name")
        if bare is not None:
            # IGNORECASE can match characters whose `.lower()` form is not
            # the dict key; leave such text untouched instead of raising
            # KeyError.
            return name_to_target.get(bare.lower(), m.group(0))
        ds = m.group(1).strip('"')
        tbl = m.group(2).strip('"')
        return f"`{project}.{ds}.{tbl}`"

    return combined.sub(_repl, sql)
|
||
|
||
|
||
@contextlib.contextmanager
def _bq_quota_and_cap_guard(
    *,
    user_id: str,
    dry_run_set: list,
    name_lookups: list,
    sql: str,
):
    """Pre-flight check + dry-run + cap enforcement for /api/query BQ paths.

    Context-manager shape (Devin Review #5 on PR #168). Earlier implementation
    ran the dry-run + cap check inside `with quota.acquire(user_id):`, then
    returned — releasing the concurrent slot BEFORE the actual BQ-touching
    `analytics.execute(...)` ran. Spec §4.3.3 wants execute to be inside the
    slot so the per-user concurrent cap actually limits BQ scans, not just
    dry-runs.

    Now: the helper is a context manager that yields after the cap check.
    The caller's `with` block holds the slot through both dry-run AND the
    subsequent `analytics.execute(...)` until the body exits.

    Issue #171 fix: dry-run runs ONCE on the user's actual SQL (translated
    to BQ-native via `_rewrite_user_sql_for_bq_dry_run`). Pre-fix the
    pre-check did N dry-runs of synthetic ``SELECT * FROM <table>`` per
    referenced table — which ignored WHERE filters, column projection, and
    partition pruning, over-estimating scan size up to ~30,000× on
    partitioned/clustered tables and rejecting narrow queries that BQ
    itself would dry-run as a few MB.

    Fallback: if the dry-run of the rewritten SQL raises `BqAccessError`
    with ``kind == "bq_bad_request"`` (e.g. DuckDB-only syntax like
    ``::INT`` casts that don't translate to BQ), fall back to the pre-#171
    per-table SELECT * approach so the cap-guard still functions —
    over-estimate is preferred over fail-open. Every other `BqAccessError`
    kind propagates as HTTP 502.

    Flow:
      1. `check_daily_budget` — over-cap users get 429 BEFORE any BQ work.
      2. `quota.acquire(user_id)` opened — concurrent-slot held throughout.
      3. Single dry-run of rewritten user SQL → `total_bytes`.
         On parse error, fall back to per-table SELECT * → sum.
      4. If total > cap → 400 `remote_scan_too_large`.
      5. Yield. Caller runs `analytics.execute(...)` + `record_bytes(...)`.
      6. On exit, slot released.

    Mutates `dry_run_set` in place: the third tuple element (bytes) is
    populated so the caller can sum and record bytes against the user's
    quota post-flight. Single-dry-run path puts `total_bytes` on the first
    entry and zero on the rest (BQ doesn't expose per-table bytes for a
    composite query); the caller's `sum(b for _, _, b in dry_run_set)`
    still equals `total_bytes`.

    Args:
        user_id: Key under which quota is checked and the slot acquired.
        dry_run_set: `(bucket, source_table, bytes)` triples; mutated in
            place as described above.
        name_lookups: `(registered_name, bucket, source_table)` triples
            passed to the SQL rewriter.
        sql: User SQL as submitted.

    Yields:
        The estimated scan size in bytes (`total_bytes`).

    Raises:
        HTTPException: 429 when the daily budget or the concurrent slot is
            exhausted, 502 when BQ access or a dry-run fails for any reason
            other than ``bq_bad_request``, 400 when the estimate exceeds a
            positive cap.
    """
    quota = _build_quota_tracker()
    try:
        quota.check_daily_budget(user_id)
    except QuotaExceededError as exc:
        raise HTTPException(status_code=429, detail={
            "reason": "daily_byte_cap_exceeded",
            "kind": exc.kind,
            "current": exc.current,
            "limit": exc.limit,
            "retry_after_seconds": exc.retry_after_seconds,
        })

    try:
        bq = get_bq_access()
    except BqAccessError as exc:
        raise HTTPException(status_code=502, detail={
            "kind": exc.kind,
            "message": exc.message,
            **(exc.details or {}),
        })

    # A cap of 0 or less disables the size check below (`cap_bytes > 0`).
    cap_bytes = _default_remote_query_cap_bytes()

    # `quota.acquire(user_id)` raises QuotaExceededError(KIND_CONCURRENT)
    # via __enter__ when the per-user concurrent-scan slot is at cap.
    # Catch around the `with` and map to HTTP 429 with the typed detail
    # shape — same shape as the daily-budget rejection above. Without
    # this, the exception propagates through @contextlib.contextmanager
    # and is caught by execute_query's generic `except Exception` →
    # returns HTTP 400 with a flattened "Query error: concurrent_scans:
    # N/M" string, dropping the typed retry_after_seconds field.
    # Devin Review #2 on PR #168.
    try:
        with quota.acquire(user_id):
            project = bq.projects.data
            rewritten_sql = _rewrite_user_sql_for_bq_dry_run(
                sql, name_lookups, project,
            )

            # Try the single-dry-run path first (issue #171). Falls back
            # to the per-table SELECT * approach only on BQ parse errors
            # (kind="bq_bad_request" — DuckDB-only syntax that BQ can't
            # translate). All other BQ errors propagate as 502 below.
            total_bytes = 0
            used_fallback = False
            try:
                total_bytes = _bq_dry_run_bytes(bq, rewritten_sql)
            except BqAccessError as exc:
                if exc.kind == "bq_bad_request":
                    logger.warning(
                        "BQ dry-run rejected the rewritten SQL "
                        "(kind=%s, message=%s). Falling back to per-table "
                        "SELECT * estimate; the cap check will over-estimate "
                        "scan bytes for this query. Consider rewriting to "
                        "BQ-native syntax for a tight pre-check.",
                        exc.kind, exc.message,
                    )
                    used_fallback = True
                else:
                    raise HTTPException(status_code=502, detail={
                        "kind": exc.kind,
                        "message": exc.message,
                        **(exc.details or {}),
                    })

            if used_fallback:
                # Pre-#171 path: estimate per registered table from a
                # synthetic SELECT *. Over-estimates partitioned scans but
                # never under-estimates, so the cap still bounds risk.
                for i, (bucket, source_table, _) in enumerate(dry_run_set):
                    fallback_sql = (
                        f"SELECT * FROM `{project}.{bucket}.{source_table}`"
                    )
                    try:
                        est = _bq_dry_run_bytes(bq, fallback_sql)
                    except BqAccessError as exc:
                        # In the fallback every BQ error, including
                        # bq_bad_request, is surfaced as 502.
                        raise HTTPException(status_code=502, detail={
                            "kind": exc.kind,
                            "message": exc.message,
                            **(exc.details or {}),
                        })
                    dry_run_set[i] = (bucket, source_table, est)
                    total_bytes += est
            else:
                # Single-dry-run path. Distribute the total to dry_run_set
                # so the caller's `record_bytes(sum(...))` stays correct.
                # Per-table breakdown is unavailable from a composite
                # dry-run; pin total to entry 0, zero the rest.
                if dry_run_set:
                    b0, t0, _ = dry_run_set[0]
                    dry_run_set[0] = (b0, t0, total_bytes)
                    for i in range(1, len(dry_run_set)):
                        bi, ti, _ = dry_run_set[i]
                        dry_run_set[i] = (bi, ti, 0)

            if cap_bytes > 0 and total_bytes > cap_bytes:
                tables = [f"{b}.{t}" for b, t, _ in dry_run_set]
                raise HTTPException(status_code=400, detail={
                    "reason": "remote_scan_too_large",
                    "scan_bytes": total_bytes,
                    "limit_bytes": cap_bytes,
                    "tables": tables,
                    "suggestion": (
                        "Use `agnes snapshot create <id> --select <cols> --where <predicate> "
                        "--estimate` to materialize a filtered subset, then query "
                        "the snapshot locally."
                    ),
                })

            # Yield control to the handler — slot stays acquired while the
            # caller runs analytics.execute() + record_bytes().
            yield total_bytes
    except QuotaExceededError as exc:
        # Only KIND_CONCURRENT can land here (daily-budget already mapped
        # above; record_bytes never raises). Map to 429 with structured
        # detail consistent with the daily-budget shape.
        raise HTTPException(status_code=429, detail={
            "reason": "concurrent_slot_exceeded",
            "kind": exc.kind,
            "current": exc.current,
            "limit": exc.limit,
            "retry_after_seconds": exc.retry_after_seconds,
        })
|