agnes-the-ai-analyst/app/api/v2_scan.py
ZdenekSrotyr 83adf01bde
fix(v2): #134 BigQuery cross-project errors return structured 502/400 + BqAccess facade (#138)
* docs(spec): #134 unify BigQuery access behind BqAccess facade

Brainstorm output for issue #134. Captures:
- root cause (incl. correction of the issue's hypothesis about commit 33a9964)
- BqAccess facade API + project resolution rules
- error contract — typed BqAccessError mapped to HTTP 502 for upstream
  BQ failures, 500 for deployment/config bugs
- migration plan for v2_scan, v2_sample, RemoteQueryEngine
- test rewrite eliminating _bq_client_factory injection point
- E2E verification protocol on agnes-development as success criterion

* docs(spec): #134 revise after first review

Incorporates code-reviewer findings:

Must-fix:
- Add v2_schema (2 copies of INSTALL/LOAD/SECRET dance) to migration scope.
- Reframe v2_scan headline: missing try/except around BQ calls is the
  actual cause of bare 500s, not project resolution (which 33a9964 fixed).
- List two more deferred call sites (extractor.py, register_bq_table)
  with explicit rationale.

Important:
- Drop billing != data clause from cross_project_forbidden heuristic;
  rely only on 'serviceusage' substring. billing != data is normal
  for cross-project setup, was over-classifying.
- Split bq_bad_request into _user (400) and _server (502) variants;
  add sql_origin parameter to translate_bq_error so call sites declare
  whether SQL contains user input.
- Add @functools.cache to BqAccess.from_config; document tests bypass
  via dependency_overrides.
- Replace monkey-patched-classmethod test pattern with
  BqAccess(client_factory=...) injection at construction time. Cleaner
  than today's _bq_client_factory and 1:1 migration shape.
- Keep BqProjects.data (reviewer assumed registry has source_project;
  it doesn't). Multi-project explicitly listed as non-goal with note.

Nice-to-have:
- Add 'Implementation strategy' section: 2 staged commits (bug fix
  alone is revertable; refactor follows).
- Extend E2E protocol to cover all three endpoints, not just /sample.
- Note removal of stale docstring at src/remote_query.py:204.

* docs(spec): #134 revision 3 — incorporates second-round review

Must-fix from second review:
- v2_schema split into two migration cases: _fetch_bq_schema translates
  errors via translate_bq_error; _fetch_bq_table_options preserves its
  swallow-all 'except Exception → return {}' so /schema doesn't 502 on
  partition-info failures.
- RemoteQueryEngine.__init__ now resolves BqAccess lazily (in
  _get_bq_client, not in __init__). Without this, ~7 DuckDB-only tests
  in test_remote_query.py would suddenly fail with not_configured.
- translate_bq_error pass-through for BqAccessError is now load-bearing
  (clause 1, before any Google-API branch). bq.client() raises BqAccessError
  for bq_lib_missing/auth_failed; without explicit pass-through those
  fall to 'unknown' and re-raise as bare 500.
- Commit 1 now emits the SAME structured response shape as commit 2 to
  avoid contract churn between commits.
- BIGQUERY_PROJECT env-var precedence is BREAKING for env-only deployments
  — flagged in CHANGELOG ### Changed.

Editorial:
- sql_origin renamed to bad_request_status with values 'client_error' /
  'upstream_error' (clearer about what the parameter actually decides).
  bq_bad_request_user/_server kinds collapsed to bq_bad_request (400)
  and bq_upstream_error (502).
- CLI (cli/commands/query.py) noted as external RemoteQueryEngine caller;
  unaffected because new bq_access kwarg has default None.
- Added unit/integration tests for the new contracts:
  test_translate_passes_through_BqAccessError,
  test_v2_scan_returns_500_on_bq_lib_missing,
  test_v2_schema_returns_200_with_empty_partition_on_bq_failure,
  test_resolve_succeeds_after_config_set.
- E2E protocol now covers /schema as the fourth endpoint.
- Documented functools.cache-doesn't-cache-exceptions semantics and
  fixture nullcontext-doesn't-close caveat for nested sessions.

* docs(spec): #134 revision 4 — incorporates third-round review

Third reviewer verdict: 'implementation-ready with two trivial edits';
explicitly noted prior rounds did the heavy lifting.

Edits:
1. get_bq_access() module-level function instead of @classmethod
   @functools.cache from_config. Removes the classmethod-cache stacking
   footgun (different Python versions wrap differently) and gives FastAPI's
   dependency introspection a clean function signature. Drops the
   'Do not subclass BqAccess' caveat that no longer applies.

2. Commit 1 strategy explicitly: wrap _fetch_bq_sample (v2_sample),
   _bq_dry_run_bytes + _run_bq_scan (v2_scan), and _fetch_bq_schema
   (v2_schema strict block). Do NOT touch _fetch_bq_table_options swallow-all
   in commit 1 — preserved as-is, then migrated (still preserved) in commit 2.
   All three endpoints emit the same structured body shape so client parsers
   see one consistent contract throughout the staged rollout. No more
   half-rolled-out window where /sample is bare 500 while /scan is
   structured 502.

* docs(plan): #134 implementation plan — Phase 1 (atomic bug fix) + Phase 2 (BqAccess refactor) + Phase 3 (verification)

Bite-sized TDD tasks. 3 phases, 16 tasks total:

Phase 1 (Commit 1) — atomic bug fix across all four v2 endpoints:
  Tasks 1.1-1.5 wrap _fetch_bq_sample, _bq_dry_run_bytes, _run_bq_scan,
  _fetch_bq_schema with structured 502/400 try/except. _fetch_bq_table_options
  preserved untouched. CHANGELOG Fixed entries.

Phase 2 (Commit 2) — BqAccess facade extraction + migration:
  Tasks 2.1-2.5 build connectors/bigquery/access.py bottom-up
  (BqProjects, BqAccessError, translate_bq_error, default factories,
  BqAccess class, get_bq_access module-level cached). Task 2.6 adds
  conftest.py fixture. Tasks 2.7-2.9 migrate v2_scan, v2_sample, v2_schema
  to BqAccess. Tasks 2.10-2.11 migrate RemoteQueryEngine + tests
  (lazy bq_access, drop _bq_client_factory). Task 2.12 CHANGELOG
  Changed BREAKING + Internal.

Phase 3 — Verification:
  3.1 full pytest. 3.2 squash into two PR-shape commits. 3.3 manual
  E2E on agnes-development per spec protocol → close #134.

Self-review table maps spec sections to implementing tasks; no gaps.

* fix(v2): #134 structured 502/400 on BQ errors across /scan, /scan/estimate, /sample, /schema

Wraps the BigQuery call sites in v2_scan, v2_sample, and v2_schema (strict
block only) with try/except for google.api_core exceptions, translating to
HTTPException with a structured body shape: {error, message, details}.

Fixes Pavel's report (#134) where these endpoints returned bare HTTP 500
with no body when the SA on agnes-development hit cross-project Forbidden
on serviceusage.services.use.

Also fixes /sample's missing billing_project fallback (the bug 33a9964
fixed for /scan never landed here).

Status code split:
  - /scan, /scan/estimate: BadRequest -> 400 (bq_bad_request) since SQL is
    user-derived from req.select/where/order_by.
  - /sample, /schema: BadRequest -> 502 (bq_upstream_error) since SQL is
    server-constructed from validated identifiers.
  - All Forbidden -> 502 with cross_project_forbidden if 'serviceusage' in
    error message (with hint pointing at data_source.bigquery.billing_project),
    else bq_forbidden.

Body shape matches what the upcoming BqAccess refactor (next commit) will
produce, so client-side parsers see one consistent contract throughout
the staged rollout.

_fetch_bq_table_options preserved exactly as-is — its swallow-all-and-return-empty
contract is intentional and survives into the refactor; /schema continues to
return 200 with empty partition info when partition queries fail.

Outer wraps in scan_endpoint, scan_estimate_endpoint, sample, and schema
endpoints exist only to make the test pattern (monkeypatching whole
_fetch_* functions) work, and are tagged TODO(#134 Phase 2) for removal
once BqAccess centralizes translation.

* refactor(bq): #134 BqAccess facade — unify v2_scan, v2_sample, v2_schema, RemoteQueryEngine

Extracts the duplicated BigQuery-access pattern (project resolution +
client construction + DuckDB-extension session + Google-API error
translation) into connectors/bigquery/access.py. Migrates four
call sites to use it:

- app/api/v2_scan.py — _bq_dry_run_bytes, _run_bq_scan
- app/api/v2_sample.py — _fetch_bq_sample
- app/api/v2_schema.py — _fetch_bq_schema (strict translation),
  _fetch_bq_table_options (preserves swallow-all best-effort contract)
- src/remote_query.py — RemoteQueryEngine, lazy bq_access kwarg

The new module exposes:
- BqProjects (frozen dataclass: billing + data project IDs)
- BqAccessError (typed exception with HTTP_STATUS class mapping)
- BqAccess (facade with injectable client_factory/duckdb_session_factory
  for tests; defaults call the real google-cloud-bigquery + DuckDB extension)
- get_bq_access (module-level @functools.cache; FastAPI Depends target)
- translate_bq_error (Google API exception → BqAccessError mapper, with
  BqAccessError pass-through, 'serviceusage'-substring heuristic for
  cross_project_forbidden, and bad_request_status param distinguishing
  user-derived (400) from server-constructed (502) SQL)
- _default_client_factory, _default_duckdb_session_factory

RemoteQueryEngine.__init__ no longer accepts _bq_client_factory; tests
migrate to bq_access=BqAccess(projects, client_factory=...). DuckDB-only
RemoteQueryEngine tests need no changes — bq_access defaults to None and
get_bq_access() is only invoked on first BQ call (lazy resolution).
BqAccessError raised internally is translated to RemoteQueryError(
error_type="bq_error") in _get_bq_client to preserve the engine's
existing public contract — CLI and /api/query/hybrid callers see no change.

Endpoint tests (test_v2_scan, test_v2_scan_estimate, test_v2_sample,
test_v2_schema) migrate from monkey-patching whole _fetch_* functions
to using the new bq_access fixture in tests/conftest.py — which
exercises the REAL translation path through BqAccess + translate_bq_error,
closing the test gap flagged in Task 1.1's review.

Side-effect behavior change: v2_sample's FROM clause now uses the data
project (instance.yaml data_source.bigquery.project), not the conflated
billing_project from Phase 1. Documented in CHANGELOG ### Internal.

BREAKING for deployments combining BIGQUERY_PROJECT env var with
data_source.bigquery.project in instance.yaml — env var now overrides
data project too. See CHANGELOG ### Changed.

Two known-duplicate BQ-access sites (connectors/bigquery/extractor.py,
scripts/duckdb_manager.register_bq_table) explicitly out of scope;
tracked as follow-up.

Removed stale docstring at the previous src/remote_query.py:204
that referenced scripts.duckdb_manager._create_bq_client as the default
BQ client factory (RemoteQueryEngine never actually used that function).

Test counts: tests/test_bq_access.py +27 (new), tests/test_v2_*.py +
tests/test_remote_query.py migrated to bq_access fixture (counts unchanged
or +1-2 per file). Full suite: 2086 passed, 8 pre-existing failures
(DB migration tests with unrelated internal_roles DependencyException —
not introduced by this PR).

* fix(bq_access): translate DefaultCredentialsError to BqAccessError(auth_failed)

CI on PR #138 caught: bigquery.Client(...) resolves Application Default
Credentials at construction time; without ADC (CI without SA key, dev
laptop without 'gcloud auth application-default login') it raises
google.auth.exceptions.DefaultCredentialsError synchronously.

Pre-fix _default_client_factory only caught ImportError, so DefaultCredentialsError
propagated as raw exception — and from production endpoints would surface
as bare 500 (the exact failure mode #134 sets out to fix).

Now translates to BqAccessError(kind='auth_failed', details.hint='Run
gcloud auth application-default login...'). Endpoint catch chain returns
HTTP 502 with structured body. Adds unit test
test_raises_auth_failed_on_default_credentials_error.

Third-round spec review flagged this case in passing; the fix didn't land.
CI's auth-less environment surfaced it.

* fix(bq_access): get_bq_access() returns sentinel instead of raising when not configured

Devin BUG_0001 on PR #138 review: 'get_bq_access() as FastAPI Depends
breaks all v2 endpoints for non-BigQuery instances'.

Pre-fix: get_bq_access() raised BqAccessError(not_configured) when
neither BIGQUERY_PROJECT env nor data_source.bigquery.project was set.
Because FastAPI resolves Depends() BEFORE the endpoint body runs, this
exception fires during dep-injection — the endpoint's try/except
BqAccessError clause never gets a chance to catch it. Result: every
v2 request on Keboola-only or CSV-only instances returned bare HTTP
500, even for local-source tables that never touch BigQuery.

Fix: get_bq_access() now returns a sentinel BqAccess with empty
BqProjects and factories that raise BqAccessError(not_configured)
on actual use. Construction succeeds, FastAPI's dep-injection cleanly
yields the sentinel, the endpoint runs. The local-source code path
in build_sample / build_schema / etc. never calls bq.client() or
bq.duckdb_session() (it reads parquet directly), so non-BQ tables
return 200 as before. Only when an endpoint actually tries to query
BQ (source_type == 'bigquery') does the sentinel raise — and the
endpoint's existing except BqAccessError catches it normally,
returning structured 502 with hint.

Test get_bq_access::test_raises_not_configured_when_neither_set
renamed and rewritten to test_returns_sentinel_when_neither_set:
asserts BqAccess is returned, then asserts client() and
duckdb_session() each raise BqAccessError(not_configured) on call.

Test test_does_not_cache_exceptions removed (no longer applicable)
and replaced with test_sentinel_is_cached_per_process documenting
the operator-restart-on-config-change contract.

* docs(spec+plan): #134 genericize customer-specific tokens (CLAUDE.md OSS rule)

Devin BUG_0001/0002 round 3 on PR #138: spec and plan docs contained
customer-specific deployment hostnames, deployment names, and a GCP
project ID that violated CLAUDE.md's vendor-agnostic OSS rule
('Nothing customer-specific belongs in code, configuration defaults,
comments, docs, commit messages, PR titles, or PR bodies').

Replacements:
  agnes-development.groupondev.com -> <your-agnes-host>
  agnes-development                -> <your-dev-instance>
  prj-grp-dataview-prod-1ff9       -> <your-data-project>
  s1_session_landings              -> <bq_table_id>

E2E verification semantics unchanged — operators still run the same
four curls + config flip + retry, just substituting their own host /
deployment name / project / table.

* fix(bq_access): hook get_bq_access.cache_clear into instance_config.reset_cache

Devin ANALYSIS_0004 on PR #138: get_bq_access is @functools.cache'd at
process level, so it captures BigQuery project IDs at first call and
ignores subsequent instance.yaml changes. Pre-Phase-2 the v2 endpoints
re-read get_value() on every request, so admin /api/admin/server-config
saves (which call instance_config.reset_cache()) hot-reloaded the BQ
project. Without this fix, my refactor silently regresses that contract
— operators editing instance.yaml via the admin UI would see no effect
on v2 endpoints until container restart.

instance_config.reset_cache() now also calls
connectors.bigquery.access.get_bq_access.cache_clear() (lazy import,
swallowed if connectors module isn't loaded — keeps instance_config
usable in isolated unit tests).

Adds test_instance_config_reset_cache_invalidates_get_bq_access as
regression guard. Updates CHANGELOG Internal entry to mention the
hot-reload contract + the not-configured sentinel behavior (round-3
fix from Devin BUG_0001 was previously only in commit message).

* fix(bq_access): surface not_configured before identifier validation + plan path genericize

Devin BUG_0001 + BUG_0002 round 5 on PR #138.

BUG_0001 (plan doc): personal filesystem path violated CLAUDE.md
vendor-agnostic rule. Replaced with '<worktree-root>' placeholder.

BUG_0002 (sentinel error path): when get_bq_access() returns the sentinel
BqAccess (BQ not configured), the empty bq.projects.data was reaching
validate_quoted_identifier first and raising ValueError -> endpoint
mapped to HTTP 400 'unsafe_identifier' instead of structured 500
'not_configured' with hint.

Each fetch helper now checks 'if not bq.projects.data: bq.client()' as
the first step, which triggers the sentinel's BqAccessError(not_configured).
Endpoint catches the typed error and returns HTTP 500 with hint pointing
at data_source.bigquery.project. Best-effort _fetch_bq_table_options
returns {} silently in this case (preserves the swallow-all contract).

* fix(bq_access): classify DuckDB-native exceptions from bigquery_query() via string match

Devin ANALYSIS on PR #138 review (latest round). The DuckDB bigquery
extension is a C++ plugin making its own HTTP calls — when BQ returns
403, it throws duckdb.IOException with the BQ error embedded as text,
not gax.Forbidden. translate_bq_error's isinstance checks would miss
these, falling to case 7 → bare 500 in production for v2_scan, v2_sample,
and v2_schema (the bigquery_query() paths).

Fix: last-resort string-match heuristic before the re-raise. 'Forbidden'
/ '403' / 'Bad Request' / '400' in the lowercased message classifies via
the same kind hierarchy. The 'serviceusage' substring still distinguishes
cross_project_forbidden from bq_forbidden. Specific enough that random
exceptions without HTTP-error keywords still re-raise.

Adds 4 unit tests covering the new heuristic + the 'don't swallow random
exceptions' invariant.

* chore(release): cut 0.22.0

PR #138 contains issue #134 user-visible behavior changes:
- BREAKING: BIGQUERY_PROJECT env var now overrides instance.yaml
  data_source.bigquery.project for v2 endpoints (previously
  RemoteQueryEngine billing only).
- Fixed: structured 502/400 on /api/v2/sample, /scan, /scan/estimate,
  /schema when BigQuery raises Forbidden/BadRequest (was bare 500).
- Internal: BqAccess facade refactor unifying four duplicate BQ-access
  call sites; instance_config.reset_cache() now invalidates BqAccess
  cache too so admin server-config saves hot-reload BQ project IDs.

Bumps to 0.22.0 because PR #137 merged first and took 0.21.0.
2026-04-30 10:11:20 +02:00

426 lines
17 KiB
Python

"""POST /api/v2/scan and POST /api/v2/scan/estimate (spec §3.4 + §3.5)."""
from __future__ import annotations
import logging
import re
from typing import Optional
import pyarrow as pa
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import Response
from pydantic import BaseModel, Field
import duckdb
from app.auth.dependencies import get_current_user, _get_db
from app.instance_config import get_value
from src.rbac import can_access_table
from src.repositories.table_registry import TableRegistryRepository
from app.api.where_validator import (
validate_where, safe_where_predicate, WhereValidationError,
)
from app.api.v2_schema import build_schema # reused for column resolution
from app.api.v2_arrow import arrow_table_to_ipc_bytes, CONTENT_TYPE
from app.api.v2_quota import QuotaTracker, QuotaExceededError
from connectors.bigquery.access import BqAccess, BqAccessError, get_bq_access
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v2", tags=["v2"])
class ScanRequest(BaseModel):
table_id: str
select: Optional[list[str]] = None
where: Optional[str] = None
limit: Optional[int] = Field(default=None, ge=1)
order_by: Optional[list[str]] = None
def _resolve_schema(conn, user, table_id: str, bq: BqAccess) -> dict:
"""Get {column: type} dict for the target table — used by validator + projection check."""
s = build_schema(conn, user, table_id, bq=bq)
return {c["name"]: c["type"] for c in s.get("columns", [])}
def _bq_dry_run_bytes(bq: BqAccess, sql: str) -> int:
"""Run a BQ dry-run via the google-cloud-bigquery client and return totalBytesProcessed.
SQL here is user-derived (built from req.select/where/order_by), so BadRequest → 400
(`bad_request_status="client_error"`).
"""
from google.cloud import bigquery
from connectors.bigquery.access import translate_bq_error
client = bq.client() # raises BqAccessError(bq_lib_missing/auth_failed) — propagates
try:
job = client.query(
sql,
job_config=bigquery.QueryJobConfig(dry_run=True, use_query_cache=False),
)
return int(job.total_bytes_processed or 0)
except Exception as e:
raise translate_bq_error(e, bq.projects, bad_request_status="client_error")
_COLUMN_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]{0,63}$")
_ORDER_BY_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(\s+(ASC|DESC))?$", re.IGNORECASE)
def _validate_select_columns(select: list[str] | None, schema: dict) -> None:
"""Reject SELECT column names that don't fit the safe-identifier shape.
Schema-existence is checked separately; this guard is defense-in-depth
so a backtick / double-quote in a column name can't break out of the
`…` (BQ) or "" (DuckDB) identifier wrapper in `_build_bq_sql` and the
local-scan path. Today, schema names from BQ INFORMATION_SCHEMA never
contain those characters — but Devin called this out as relying on an
implicit upstream constraint. Make it explicit."""
if not select:
return
for entry in select:
if not _COLUMN_NAME_RE.match(entry or ""):
raise ValueError(f"invalid column name: {entry!r}")
def _validate_order_by(order_by: list[str] | None, schema: dict) -> None:
"""Reject anything other than `<column>` or `<column> ASC|DESC` against the schema.
Without this, `order_by` is concatenated raw into the FROM clause SQL — exploitable."""
if not order_by:
return
known = {c.lower() for c in schema}
for entry in order_by:
s = (entry or "").strip()
if not _ORDER_BY_RE.match(s):
raise ValueError(f"invalid order_by entry: {entry!r}")
col = s.split()[0].lower()
if col not in known:
raise ValueError(f"unknown order_by column: {entry!r}")
def _quote_order_by_bq(entry: str) -> str:
"""Backtick-quote the column part of an order_by entry, preserve direction."""
parts = entry.strip().split()
return f"`{parts[0]}`" + ("" if len(parts) == 1 else " " + " ".join(parts[1:]))
def _quote_order_by_duckdb(entry: str) -> str:
parts = entry.strip().split()
return f'"{parts[0]}"' + ("" if len(parts) == 1 else " " + " ".join(parts[1:]))
def _build_bq_sql(
table_row: dict, project_id: str, req: ScanRequest, *, safe_where: str | None = None,
) -> str:
"""Build the BQ SQL string. ``safe_where`` MUST be the comment-stripped
fragment from ``safe_where_predicate`` — splicing ``req.where`` raw lets a
`1=1 --` predicate comment out everything that follows (LIMIT/ORDER BY).
Identifier quoting: column names are validated against the schema before
we get here, but reserved words (`order`, `group`, `timestamp`, …) still
need backticks to parse as identifiers in BQ.
"""
from src.identifier_validation import validate_quoted_identifier
bucket = table_row.get('bucket') or ''
src_table = table_row.get('source_table') or req.table_id
if not (validate_quoted_identifier(project_id, "BQ project")
and validate_quoted_identifier(bucket, "BQ dataset")
and validate_quoted_identifier(src_table, "BQ source_table")):
raise ValueError("unsafe BQ identifier in registry — refusing to build SQL")
select_sql = ", ".join(f"`{c}`" for c in req.select) if req.select else "*"
table_ref = f"`{project_id}.{bucket}.{src_table}`"
sql = f"SELECT {select_sql} FROM {table_ref}"
if safe_where:
sql += f" WHERE {safe_where}"
if req.order_by:
sql += f" ORDER BY {', '.join(_quote_order_by_bq(e) for e in req.order_by)}"
if req.limit:
sql += f" LIMIT {int(req.limit)}"
return sql
def estimate(conn, user, raw_request: dict, *, bq: BqAccess) -> dict:
req = ScanRequest(**raw_request)
repo = TableRegistryRepository(conn)
row = repo.get(req.table_id)
if not row:
raise FileNotFoundError(req.table_id)
if user.get("role") != "admin" and not can_access_table(user, req.table_id, conn):
raise PermissionError(req.table_id)
schema = _resolve_schema(conn, user, req.table_id, bq)
dialect = "bigquery" if (row.get("source_type") or "") == "bigquery" else "duckdb"
# Validate WHERE and capture the comment-stripped fragment for splicing.
safe_where = (
safe_where_predicate(req.where, req.table_id, schema, dialect=dialect)
if req.where else None
)
# Validate select columns exist (case-insensitive, matching order_by).
if req.select:
_validate_select_columns(req.select, schema)
known = {c.lower() for c in schema}
unknown = [c for c in req.select if c.lower() not in known]
if unknown:
raise ValueError(f"unknown columns: {unknown}")
_validate_order_by(req.order_by, schema)
if (row.get("source_type") or "") != "bigquery":
return {
"table_id": req.table_id,
"estimated_scan_bytes": 0,
"estimated_result_rows": None,
"estimated_result_bytes": None,
"bq_cost_estimate_usd": 0.0,
}
bq_sql = _build_bq_sql(row, bq.projects.data, req, safe_where=safe_where)
scan_bytes = _bq_dry_run_bytes(bq, bq_sql)
cost_per_tb = float(get_value("api", "scan", "bq_cost_per_tb_usd", default=5.0) or 5.0)
cost = (scan_bytes / 1_099_511_627_776) * cost_per_tb # 1 TiB = 2^40
# Heuristic for result row/byte estimate. A row contains all selected
# columns, so per-row bytes = sum of per-column estimates (NOT average).
# If req.select is set, narrow to those columns; otherwise use full schema.
# Case-insensitive lookup matches the SELECT-validation policy — analysts
# often write a lowercased column name where INFORMATION_SCHEMA returned
# mixed-case; the schema lookup must follow.
schema_lower = {k.lower(): v for k, v in schema.items()}
cols_for_estimate = (
[schema_lower[c.lower()] for c in (req.select or []) if c.lower() in schema_lower]
or list(schema.values())
)
avg_row_bytes = max(1, sum(_avg_bytes_for_type(t) for t in cols_for_estimate))
rows_est = scan_bytes // max(avg_row_bytes, 1)
if req.limit:
rows_est = min(rows_est, req.limit)
return {
"table_id": req.table_id,
"estimated_scan_bytes": int(scan_bytes),
"estimated_result_rows": int(rows_est),
"estimated_result_bytes": int(rows_est * avg_row_bytes),
"bq_cost_estimate_usd": round(cost, 4),
}
def _avg_bytes_for_type(t: str) -> int:
t = (t or "").upper()
if t in ("INT64", "FLOAT64", "DATE", "TIMESTAMP", "DATETIME", "TIME"):
return 8
if t == "STRING":
return 32 # rough average
if t == "BYTES":
return 64
if t == "BOOL":
return 1
return 16
@router.post("/scan/estimate")
async def scan_estimate_endpoint(
raw: dict,
user: dict = Depends(get_current_user),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
bq: BqAccess = Depends(get_bq_access),
):
try:
return estimate(conn, user, raw, bq=bq)
except WhereValidationError as e:
raise HTTPException(
status_code=400,
detail={"error": "validator_rejected", "kind": e.kind, "details": e.detail or {}},
)
except PermissionError:
raise HTTPException(status_code=403, detail="not authorized for this table")
except FileNotFoundError as e:
raise HTTPException(status_code=404, detail=f"table {e!s} not found")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except BqAccessError as e:
raise HTTPException(
status_code=BqAccessError.HTTP_STATUS.get(e.kind, 500),
detail={"error": e.kind, "message": e.message, "details": e.details},
)
# Module-level singleton (process-local quota state per spec §3.8). FastAPI
# dispatches sync handlers via a thread pool, so two concurrent first-time
# requests can both observe `_quota_singleton is None` and each construct a
# separate tracker; the second assignment wins and the first reference leaks
# split-brain quota state. Guard with an init lock + double-check.
import threading as _threading
_quota_init_lock = _threading.Lock()
_quota_singleton: QuotaTracker | None = None
def _build_quota_tracker() -> QuotaTracker:
"""Returns or constructs the process-local quota tracker (thread-safe)."""
global _quota_singleton
if _quota_singleton is not None:
return _quota_singleton
with _quota_init_lock:
if _quota_singleton is None:
_quota_singleton = QuotaTracker(
max_concurrent_per_user=int(get_value("api", "scan", "max_concurrent_per_user", default=5) or 5),
max_daily_bytes_per_user=int(get_value("api", "scan", "max_daily_bytes_per_user", default=53687091200) or 53687091200),
)
return _quota_singleton
def _max_result_bytes() -> int:
return int(get_value("api", "scan", "max_result_bytes", default=2_147_483_648) or 2_147_483_648)
def _max_limit() -> int:
return int(get_value("api", "scan", "max_limit", default=10_000_000) or 10_000_000)
def _run_bq_scan(bq: BqAccess, sql: str) -> pa.Table:
"""Run a BQ query via DuckDB BQ extension. Returns Arrow table.
SQL here is user-derived → BadRequest → 400 (`bad_request_status="client_error"`).
"""
from connectors.bigquery.access import translate_bq_error
with bq.duckdb_session() as conn:
try:
return conn.execute(
"SELECT * FROM bigquery_query(?, ?)",
[bq.projects.billing, sql],
).arrow()
except Exception as e:
raise translate_bq_error(e, bq.projects, bad_request_status="client_error")
def run_scan(
conn: duckdb.DuckDBPyConnection,
user: dict,
raw_request: dict,
*,
bq: BqAccess,
quota: QuotaTracker,
) -> bytes:
"""Validate → quota → execute → serialize. Returns Arrow IPC bytes.
Raises:
WhereValidationError, QuotaExceededError, FileNotFoundError, PermissionError,
ValueError, BqAccessError
"""
req = ScanRequest(**raw_request)
repo = TableRegistryRepository(conn)
row = repo.get(req.table_id)
if not row:
raise FileNotFoundError(req.table_id)
if user.get("role") != "admin" and not can_access_table(user, req.table_id, conn):
raise PermissionError(req.table_id)
if req.limit and req.limit > _max_limit():
raise ValueError(f"limit {req.limit} exceeds max {_max_limit()}")
schema = _resolve_schema(conn, user, req.table_id, bq)
dialect = "bigquery" if (row.get("source_type") or "") == "bigquery" else "duckdb"
# Validate WHERE and capture the comment-stripped fragment for splicing.
safe_where = (
safe_where_predicate(req.where, req.table_id, schema, dialect=dialect)
if req.where else None
)
if req.select:
# Case-insensitive (BQ identifiers are case-insensitive; mixed-case
# names from INFORMATION_SCHEMA.COLUMNS shouldn't 400-reject the
# lowercased form a typical analyst writes).
_validate_select_columns(req.select, schema)
known = {c.lower() for c in schema}
unknown = [c for c in req.select if c.lower() not in known]
if unknown:
raise ValueError(f"unknown columns: {unknown}")
_validate_order_by(req.order_by, schema)
source_type = row.get("source_type") or ""
user_id = user.get("email") or "anon"
# Pre-flight quota check — fail BEFORE running the BQ scan so the user
# doesn't pay for a query whose result we'd then refuse to return.
quota.check_daily_budget(user=user_id)
with quota.acquire(user=user_id):
if source_type != "bigquery":
# Local source: query parquet directly. `source_type` extracted above
# because `row["source_type"]` could be NULL for legacy registry rows
# and `Path(...) / None` raises TypeError.
from app.utils import get_data_dir
parquet = (
get_data_dir() / "extracts" / source_type / "data" / f"{req.table_id}.parquet"
)
local = duckdb.connect(":memory:")
try:
projection = ", ".join(f'"{c}"' for c in req.select) if req.select else "*"
sql = f"SELECT {projection} FROM read_parquet(?)"
if safe_where:
sql += f" WHERE {safe_where}"
if req.order_by:
sql += f" ORDER BY {', '.join(_quote_order_by_duckdb(e) for e in req.order_by)}"
if req.limit:
sql += f" LIMIT {int(req.limit)}"
table = local.execute(sql, [str(parquet)]).arrow()
finally:
local.close()
else:
bq_sql = _build_bq_sql(row, bq.projects.data, req, safe_where=safe_where)
table = _run_bq_scan(bq, bq_sql)
ipc = arrow_table_to_ipc_bytes(table)
# Enforce max_result_bytes guard (spec §3.4 step 8)
if len(ipc) > _max_result_bytes():
# Truncate by taking only as many rows as fit roughly
# Simple heuristic: cap rows to estimated avg per max_bytes
row_count = table.num_rows
avg = max(1, len(ipc) // max(row_count, 1))
keep = min(row_count, _max_result_bytes() // max(avg, 1))
table = table.slice(0, keep)
ipc = arrow_table_to_ipc_bytes(table)
# Record bytes for daily quota
quota.record_bytes(user=user_id, n=len(ipc))
return ipc
@router.post("/scan")
async def scan_endpoint(
raw: dict,
user: dict = Depends(get_current_user),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
bq: BqAccess = Depends(get_bq_access),
):
quota = _build_quota_tracker()
try:
ipc = run_scan(conn, user, raw, bq=bq, quota=quota)
return Response(content=ipc, media_type=CONTENT_TYPE)
except WhereValidationError as e:
raise HTTPException(
status_code=400,
detail={"error": "validator_rejected", "kind": e.kind, "details": e.detail or {}},
)
except QuotaExceededError as e:
raise HTTPException(
status_code=429,
detail={
"error": "quota_exceeded",
"kind": e.kind,
"current": e.current,
"limit": e.limit,
"retry_after_seconds": e.retry_after_seconds,
},
)
except FileNotFoundError:
raise HTTPException(status_code=404, detail="table not found")
except PermissionError:
raise HTTPException(status_code=403, detail="not authorized")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except BqAccessError as e:
raise HTTPException(
status_code=BqAccessError.HTTP_STATUS.get(e.kind, 500),
detail={"error": e.kind, "message": e.message, "details": e.details},
)