* docs(spec): #134 unify BigQuery access behind BqAccess facade

Brainstorm output for issue #134. Captures:
- root cause (incl. correction of the issue's hypothesis about commit 33a9964)
- BqAccess facade API + project resolution rules
- error contract: typed BqAccessError mapped to HTTP 502 for upstream BQ failures, 500 for deployment/config bugs
- migration plan for v2_scan, v2_sample, RemoteQueryEngine
- test rewrite eliminating the _bq_client_factory injection point
- E2E verification protocol on agnes-development as success criterion

* docs(spec): #134 revise after first review

Incorporates code-reviewer findings:

Must-fix:
- Add v2_schema (2 copies of the INSTALL/LOAD/SECRET dance) to migration scope.
- Reframe v2_scan headline: the missing try/except around BQ calls is the actual cause of bare 500s, not project resolution (which 33a9964 fixed).
- List two more deferred call sites (extractor.py, register_bq_table) with explicit rationale.

Important:
- Drop the billing != data clause from the cross_project_forbidden heuristic; rely only on the 'serviceusage' substring. billing != data is normal for a cross-project setup, so the clause was over-classifying.
- Split bq_bad_request into _user (400) and _server (502) variants; add an sql_origin parameter to translate_bq_error so call sites declare whether the SQL contains user input.
- Add @functools.cache to BqAccess.from_config; document that tests bypass it via dependency_overrides.
- Replace the monkey-patched-classmethod test pattern with BqAccess(client_factory=...) injection at construction time. Cleaner than today's _bq_client_factory and a 1:1 migration shape.
- Keep BqProjects.data (the reviewer assumed the registry has source_project; it doesn't). Multi-project is explicitly listed as a non-goal, with a note.

Nice-to-have:
- Add an 'Implementation strategy' section: 2 staged commits (the bug fix alone is revertible; the refactor follows).
- Extend the E2E protocol to cover all three endpoints, not just /sample.
- Note removal of the stale docstring at src/remote_query.py:204.
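A minimal sketch of the construction-time injection the first review asks for, assuming `BqProjects` is a frozen dataclass of billing and data project IDs (as the later refactor commit describes) and that a client factory is any callable that takes the billing project and returns a client. The reduced `BqAccess`, `default_client_factory`, and `FakeClient` below are illustrative, not the contents of `connectors/bigquery/access.py`.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class BqProjects:
    billing: str  # project that runs and pays for query jobs
    data: str     # project that owns the tables named in FROM


# Assumed factory shape for this sketch: billing project in, client object out.
ClientFactory = Callable[[str], Any]


def default_client_factory(billing_project: str) -> Any:
    # Imported lazily so code paths that never query BigQuery don't need the library.
    from google.cloud import bigquery
    return bigquery.Client(project=billing_project)


class BqAccess:
    """Hypothetical, reduced facade: holds project IDs and knows how to build a client."""

    def __init__(self, projects: BqProjects,
                 client_factory: ClientFactory = default_client_factory) -> None:
        self.projects = projects
        self._client_factory = client_factory

    def client(self) -> Any:
        return self._client_factory(self.projects.billing)


class FakeClient:
    """Test double: records which project it was built for."""

    def __init__(self, project: str) -> None:
        self.project = project


# In a test: inject the fake at construction time; nothing global is patched.
bq = BqAccess(BqProjects(billing="billing-proj", data="data-proj"), client_factory=FakeClient)
assert bq.client().project == "billing-proj"
```

The seam lives in the constructor, so a test builds its own `BqAccess` and hands it to the code under test. There is no classmethod to monkey-patch and no global state to restore afterwards.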
* docs(spec): #134 revision 3: incorporates second-round review

Must-fix from second review:
- v2_schema split into two migration cases: _fetch_bq_schema translates errors via translate_bq_error; _fetch_bq_table_options preserves its swallow-all 'except Exception → return {}' so /schema doesn't 502 on partition-info failures.
- RemoteQueryEngine.__init__ now resolves BqAccess lazily (in _get_bq_client, not in __init__). Without this, ~7 DuckDB-only tests in test_remote_query.py would suddenly fail with not_configured.
- translate_bq_error pass-through for BqAccessError is now load-bearing (clause 1, before any Google-API branch). bq.client() raises BqAccessError for bq_lib_missing/auth_failed; without an explicit pass-through those fall to 'unknown' and are re-raised as bare 500s.
- Commit 1 now emits the SAME structured response shape as commit 2 to avoid contract churn between commits.
- BIGQUERY_PROJECT env-var precedence is BREAKING for env-only deployments; flagged in CHANGELOG ### Changed.

Editorial:
- sql_origin renamed to bad_request_status with values 'client_error' / 'upstream_error' (clearer about what the parameter actually decides). The bq_bad_request_user/_server kinds collapsed to bq_bad_request (400) and bq_upstream_error (502).
- CLI (cli/commands/query.py) noted as an external RemoteQueryEngine caller; unaffected because the new bq_access kwarg defaults to None.
- Added unit/integration tests for the new contracts: test_translate_passes_through_BqAccessError, test_v2_scan_returns_500_on_bq_lib_missing, test_v2_schema_returns_200_with_empty_partition_on_bq_failure, test_resolve_succeeds_after_config_set.
- E2E protocol now covers /schema as the fourth endpoint.
- Documented the functools.cache-doesn't-cache-exceptions semantics and the fixture nullcontext-doesn't-close caveat for nested sessions.
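The functools.cache-doesn't-cache-exceptions semantics mentioned above can be checked in isolation. In this sketch `resolve_projects` and the `ENV` dict are hypothetical stand-ins for project resolution and its config source. A call that raises leaves nothing in the cache, so the next call runs the body again. A successful result stays cached until `cache_clear()` is called.

```python
import functools

ENV: dict[str, str] = {}   # stand-in for env vars / instance.yaml
calls = 0                  # counts how often the cached body actually runs


@functools.cache
def resolve_projects() -> str:
    global calls
    calls += 1
    project = ENV.get("BIGQUERY_PROJECT")
    if not project:
        raise LookupError("not_configured")
    return project


try:
    resolve_projects()                  # raises; nothing is stored in the cache
except LookupError:
    pass

ENV["BIGQUERY_PROJECT"] = "proj-a"
assert resolve_projects() == "proj-a"   # body runs again and now succeeds
ENV["BIGQUERY_PROJECT"] = "proj-b"
assert resolve_projects() == "proj-a"   # cached: the config change is ignored
resolve_projects.cache_clear()
assert resolve_projects() == "proj-b"   # re-read only after an explicit clear
assert calls == 3
```

The last three calls show why a process-level cache needs an explicit invalidation hook (or a restart) if the configuration it read can change at runtime.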
* docs(spec): #134 revision 4: incorporates third-round review

Third reviewer verdict: 'implementation-ready with two trivial edits'; explicitly noted that prior rounds did the heavy lifting.

Edits:
1. get_bq_access() module-level function instead of @classmethod @functools.cache from_config. Removes the classmethod-cache stacking footgun (different Python versions wrap differently) and gives FastAPI's dependency introspection a clean function signature. Drops the 'Do not subclass BqAccess' caveat, which no longer applies.
2. Commit 1 strategy made explicit: wrap _fetch_bq_sample (v2_sample), _bq_dry_run_bytes + _run_bq_scan (v2_scan), and _fetch_bq_schema (v2_schema strict block). Do NOT touch the _fetch_bq_table_options swallow-all in commit 1; it is preserved as-is, then migrated (still preserved) in commit 2. All three endpoints emit the same structured body shape, so client parsers see one consistent contract throughout the staged rollout. No more half-rolled-out window where /sample is a bare 500 while /scan is a structured 502.

* docs(plan): #134 implementation plan: Phase 1 (atomic bug fix) + Phase 2 (BqAccess refactor) + Phase 3 (verification)

Bite-sized TDD tasks. 3 phases, 16 tasks total:

Phase 1 (Commit 1): atomic bug fix across all four v2 endpoints:
- Tasks 1.1-1.5 wrap _fetch_bq_sample, _bq_dry_run_bytes, _run_bq_scan, _fetch_bq_schema with structured 502/400 try/except. _fetch_bq_table_options preserved untouched. CHANGELOG Fixed entries.

Phase 2 (Commit 2): BqAccess facade extraction + migration:
- Tasks 2.1-2.5 build connectors/bigquery/access.py bottom-up (BqProjects, BqAccessError, translate_bq_error, default factories, BqAccess class, get_bq_access module-level cached).
- Task 2.6 adds the conftest.py fixture.
- Tasks 2.7-2.9 migrate v2_scan, v2_sample, v2_schema to BqAccess.
- Tasks 2.10-2.11 migrate RemoteQueryEngine + tests (lazy bq_access, drop _bq_client_factory).
- Task 2.12 CHANGELOG Changed BREAKING + Internal.

Phase 3 (Verification):
- 3.1 full pytest.
- 3.2 squash into two PR-shape commits.
- 3.3 manual E2E on agnes-development per spec protocol → close #134.

Self-review table maps spec sections to implementing tasks; no gaps.

* fix(v2): #134 structured 502/400 on BQ errors across /scan, /scan/estimate, /sample, /schema

Wraps the BigQuery call sites in v2_scan, v2_sample, and v2_schema (strict block only) with try/except for google.api_core exceptions, translating them to HTTPException with a structured body shape: {error, message, details}.

Fixes Pavel's report (#134), where these endpoints returned a bare HTTP 500 with no body when the service account (SA) on agnes-development hit a cross-project Forbidden on serviceusage.services.use. Also fixes /sample's missing billing_project fallback (the bug 33a9964 fixed for /scan never landed there).

Status code split:
- /scan, /scan/estimate: BadRequest -> 400 (bq_bad_request), since the SQL is user-derived from req.select/where/order_by.
- /sample, /schema: BadRequest -> 502 (bq_upstream_error), since the SQL is server-constructed from validated identifiers.
- All Forbidden -> 502 with cross_project_forbidden if 'serviceusage' is in the error message (with a hint pointing at data_source.bigquery.billing_project), else bq_forbidden.

The body shape matches what the upcoming BqAccess refactor (next commit) will produce, so client-side parsers see one consistent contract throughout the staged rollout.

_fetch_bq_table_options is preserved exactly as-is: its swallow-all-and-return-empty contract is intentional and survives into the refactor; /schema continues to return 200 with empty partition info when partition queries fail.

The outer wraps in scan_endpoint, scan_estimate_endpoint, and the sample and schema endpoints exist only to make the test pattern (monkeypatching whole _fetch_* functions) work, and are tagged TODO(#134 Phase 2) for removal once BqAccess centralizes translation.
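The status-code split above can be summarized as one classification function. This is a sketch, not the project's `translate_bq_error`. `BadRequest` and `Forbidden` are local stand-ins for the `google.api_core.exceptions` types. `classify`, `to_http`, the hint text, and the `HTTP_STATUS` table are all assumptions. The table follows the kinds and statuses named in these commits, and the response body follows the `{error, message, details}` shape.

```python
from __future__ import annotations

from typing import Literal


class BadRequest(Exception):
    """Stand-in for google.api_core.exceptions.BadRequest (HTTP 400 from BigQuery)."""


class Forbidden(Exception):
    """Stand-in for google.api_core.exceptions.Forbidden (HTTP 403 from BigQuery)."""


class BqAccessError(Exception):
    # Assumed kind -> status table; endpoints use HTTP_STATUS.get(kind, 500).
    HTTP_STATUS = {
        "bq_bad_request": 400,           # user-derived SQL rejected by BigQuery
        "bq_upstream_error": 502,        # server-built SQL rejected: upstream problem
        "cross_project_forbidden": 502,  # 'serviceusage' permission missing
        "bq_forbidden": 502,
        "auth_failed": 502,
        "bq_lib_missing": 500,           # deployment bug
        "not_configured": 500,           # configuration bug
    }

    def __init__(self, kind: str, message: str, details: dict | None = None) -> None:
        super().__init__(message)
        self.kind = kind
        self.message = message
        self.details = details or {}


def classify(exc: Exception,
             bad_request_status: Literal["client_error", "upstream_error"]) -> Exception:
    """Map an exception to a typed BqAccessError; anything unrecognized is returned unchanged."""
    if isinstance(exc, BqAccessError):
        return exc  # already typed (e.g. auth_failed from client construction): pass through
    msg = str(exc)
    if isinstance(exc, BadRequest):
        kind = "bq_bad_request" if bad_request_status == "client_error" else "bq_upstream_error"
        return BqAccessError(kind, msg)
    if isinstance(exc, Forbidden):
        if "serviceusage" in msg:
            return BqAccessError("cross_project_forbidden", msg,
                                 {"hint": "set data_source.bigquery.billing_project"})
        return BqAccessError("bq_forbidden", msg)
    return exc


def to_http(err: BqAccessError) -> tuple[int, dict]:
    """Status code plus the {error, message, details} body the endpoints return."""
    return (BqAccessError.HTTP_STATUS.get(err.kind, 500),
            {"error": err.kind, "message": err.message, "details": err.details})


status, body = to_http(classify(BadRequest("Unrecognized name: foo"), "client_error"))
# status == 400, body["error"] == "bq_bad_request"
```

The function returns the unrecognized exception instead of raising it. That lets a call site write `raise classify(e, ...)` in every case, which matches the `raise translate_bq_error(...)` call shape used in `v2_scan.py`.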
* refactor(bq): #134 BqAccess facade: unify v2_scan, v2_sample, v2_schema, RemoteQueryEngine

Extracts the duplicated BigQuery-access pattern (project resolution + client construction + DuckDB-extension session + Google-API error translation) into connectors/bigquery/access.py. Migrates four call sites to use it:
- app/api/v2_scan.py: _bq_dry_run_bytes, _run_bq_scan
- app/api/v2_sample.py: _fetch_bq_sample
- app/api/v2_schema.py: _fetch_bq_schema (strict translation), _fetch_bq_table_options (preserves the swallow-all best-effort contract)
- src/remote_query.py: RemoteQueryEngine, lazy bq_access kwarg

The new module exposes:
- BqProjects (frozen dataclass: billing + data project IDs)
- BqAccessError (typed exception with HTTP_STATUS class mapping)
- BqAccess (facade with injectable client_factory/duckdb_session_factory for tests; the defaults call the real google-cloud-bigquery + DuckDB extension)
- get_bq_access (module-level @functools.cache; FastAPI Depends target)
- translate_bq_error (Google API exception → BqAccessError mapper, with BqAccessError pass-through, a 'serviceusage'-substring heuristic for cross_project_forbidden, and a bad_request_status param distinguishing user-derived (400) from server-constructed (502) SQL)
- _default_client_factory, _default_duckdb_session_factory

RemoteQueryEngine.__init__ no longer accepts _bq_client_factory; tests migrate to bq_access=BqAccess(projects, client_factory=...). DuckDB-only RemoteQueryEngine tests need no changes: bq_access defaults to None and get_bq_access() is only invoked on the first BQ call (lazy resolution).

BqAccessError raised internally is translated to RemoteQueryError(error_type="bq_error") in _get_bq_client to preserve the engine's existing public contract; CLI and /api/query/hybrid callers see no change.

Endpoint tests (test_v2_scan, test_v2_scan_estimate, test_v2_sample, test_v2_schema) migrate from monkey-patching whole _fetch_* functions to the new bq_access fixture in tests/conftest.py. The fixture exercises the REAL translation path through BqAccess + translate_bq_error, closing the test gap flagged in Task 1.1's review.

Side-effect behavior change: v2_sample's FROM clause now uses the data project (instance.yaml data_source.bigquery.project), not the conflated billing_project from Phase 1. Documented in CHANGELOG ### Internal.

BREAKING for deployments combining the BIGQUERY_PROJECT env var with data_source.bigquery.project in instance.yaml: the env var now overrides the data project too. See CHANGELOG ### Changed.

Two known-duplicate BQ-access sites (connectors/bigquery/extractor.py, scripts/duckdb_manager.register_bq_table) are explicitly out of scope; tracked as follow-up.

Removed the stale docstring at the previous src/remote_query.py:204 that referenced scripts.duckdb_manager._create_bq_client as the default BQ client factory (RemoteQueryEngine never actually used that function).

Test counts: tests/test_bq_access.py +27 (new); tests/test_v2_*.py + tests/test_remote_query.py migrated to the bq_access fixture (counts unchanged or +1-2 per file). Full suite: 2086 passed, 8 pre-existing failures (DB migration tests with an unrelated internal_roles DependencyException; not introduced by this PR).

* fix(bq_access): translate DefaultCredentialsError to BqAccessError(auth_failed)

CI on PR #138 caught this: bigquery.Client(...) resolves Application Default Credentials at construction time; without ADC (CI without an SA key, a dev laptop without 'gcloud auth application-default login') it raises google.auth.exceptions.DefaultCredentialsError synchronously.

Pre-fix, _default_client_factory only caught ImportError, so DefaultCredentialsError propagated as a raw exception, and from production endpoints it would surface as a bare 500 (the exact failure mode #134 sets out to fix).
Now translates to BqAccessError(kind='auth_failed', details.hint='Run gcloud auth application-default login...'). The endpoint catch chain returns HTTP 502 with a structured body.

Adds unit test test_raises_auth_failed_on_default_credentials_error.

The third-round spec review flagged this case in passing, but the fix didn't land; CI's auth-less environment surfaced it.

* fix(bq_access): get_bq_access() returns a sentinel instead of raising when not configured

Devin BUG_0001 on PR #138 review: 'get_bq_access() as FastAPI Depends breaks all v2 endpoints for non-BigQuery instances'.

Pre-fix: get_bq_access() raised BqAccessError(not_configured) when neither the BIGQUERY_PROJECT env var nor data_source.bigquery.project was set. Because FastAPI resolves Depends() BEFORE the endpoint body runs, this exception fires during dependency injection, and the endpoint's try/except BqAccessError clause never gets a chance to catch it. Result: every v2 request on Keboola-only or CSV-only instances returned a bare HTTP 500, even for local-source tables that never touch BigQuery.

Fix: get_bq_access() now returns a sentinel BqAccess with empty BqProjects and factories that raise BqAccessError(not_configured) on actual use. Construction succeeds, FastAPI's dependency injection cleanly yields the sentinel, and the endpoint runs. The local-source code path in build_sample / build_schema / etc. never calls bq.client() or bq.duckdb_session() (it reads parquet directly), so non-BQ tables return 200 as before. Only when an endpoint actually tries to query BQ (source_type == 'bigquery') does the sentinel raise, and the endpoint's existing except BqAccessError catches it normally, returning a structured 500 (not_configured) with a hint.

Test get_bq_access::test_raises_not_configured_when_neither_set renamed and rewritten to test_returns_sentinel_when_neither_set: asserts BqAccess is returned, then asserts client() and duckdb_session() each raise BqAccessError(not_configured) on call.
Test test_does_not_cache_exceptions removed (no longer applicable) and replaced with test_sentinel_is_cached_per_process, documenting the operator-restart-on-config-change contract.

* docs(spec+plan): #134 genericize customer-specific tokens (CLAUDE.md OSS rule)

Devin BUG_0001/0002 round 3 on PR #138: the spec and plan docs contained customer-specific deployment hostnames, deployment names, and a GCP project ID that violated CLAUDE.md's vendor-agnostic OSS rule ('Nothing customer-specific belongs in code, configuration defaults, comments, docs, commit messages, PR titles, or PR bodies').

Replacements:
- agnes-development.groupondev.com -> <your-agnes-host>
- agnes-development -> <your-dev-instance>
- prj-grp-dataview-prod-1ff9 -> <your-data-project>
- s1_session_landings -> <bq_table_id>

E2E verification semantics unchanged: operators still run the same four curls + config flip + retry, just substituting their own host / deployment name / project / table.

* fix(bq_access): hook get_bq_access.cache_clear into instance_config.reset_cache

Devin ANALYSIS_0004 on PR #138: get_bq_access is @functools.cache'd at process level, so it captures BigQuery project IDs at first call and ignores subsequent instance.yaml changes. Pre-Phase-2, the v2 endpoints re-read get_value() on every request, so admin /api/admin/server-config saves (which call instance_config.reset_cache()) hot-reloaded the BQ project. Without this fix, my refactor silently regresses that contract: operators editing instance.yaml via the admin UI would see no effect on v2 endpoints until a container restart.

instance_config.reset_cache() now also calls connectors.bigquery.access.get_bq_access.cache_clear() (lazy import, swallowed if the connectors module isn't loaded, which keeps instance_config usable in isolated unit tests).

Adds test_instance_config_reset_cache_invalidates_get_bq_access as a regression guard.

Updates the CHANGELOG Internal entry to mention the hot-reload contract + the not-configured sentinel behavior (the round-3 fix from Devin BUG_0001 was previously only in a commit message).

* fix(bq_access): surface not_configured before identifier validation + plan path genericize

Devin BUG_0001 + BUG_0002 round 5 on PR #138.

BUG_0001 (plan doc): a personal filesystem path violated the CLAUDE.md vendor-agnostic rule. Replaced with a '<worktree-root>' placeholder.

BUG_0002 (sentinel error path): when get_bq_access() returns the sentinel BqAccess (BQ not configured), the empty bq.projects.data was reaching validate_quoted_identifier first and raising ValueError, which the endpoint mapped to HTTP 400 'unsafe_identifier' instead of a structured 500 'not_configured' with a hint. Each fetch helper now checks 'if not bq.projects.data: bq.client()' as its first step, which triggers the sentinel's BqAccessError(not_configured). The endpoint catches the typed error and returns HTTP 500 with a hint pointing at data_source.bigquery.project. The best-effort _fetch_bq_table_options returns {} silently in this case (preserving the swallow-all contract).

* fix(bq_access): classify DuckDB-native exceptions from bigquery_query() via string match

Devin ANALYSIS on PR #138 review (latest round). The DuckDB bigquery extension is a C++ plugin making its own HTTP calls: when BQ returns 403, it throws duckdb.IOException with the BQ error embedded as text, not gax.Forbidden. translate_bq_error's isinstance checks would miss these, falling to case 7 → bare 500 in production for v2_scan, v2_sample, and v2_schema (the bigquery_query() paths).

Fix: a last-resort string-match heuristic before the re-raise. 'forbidden' / '403' / 'bad request' / '400' appearing in the lowercased message classifies the error via the same kind hierarchy. The 'serviceusage' substring still distinguishes cross_project_forbidden from bq_forbidden. The keywords are specific enough that random exceptions without HTTP-error keywords still re-raise.

Adds 4 unit tests covering the new heuristic + the 'don't swallow random exceptions' invariant.

* chore(release): cut 0.22.0

PR #138 contains the issue #134 user-visible behavior changes:
- BREAKING: the BIGQUERY_PROJECT env var now overrides instance.yaml data_source.bigquery.project for v2 endpoints (previously RemoteQueryEngine billing only).
- Fixed: structured 502/400 on /api/v2/sample, /scan, /scan/estimate, /schema when BigQuery raises Forbidden/BadRequest (was bare 500).
- Internal: BqAccess facade refactor unifying four duplicate BQ-access call sites; instance_config.reset_cache() now invalidates the BqAccess cache too, so admin server-config saves hot-reload BQ project IDs.

Bumps to 0.22.0 because PR #137 merged first and took 0.21.0.
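The last-resort string match from the DuckDB-exception fix can be sketched as follows. The sketch assumes the extension's error text carries the HTTP status or reason phrase somewhere in the message. `classify_by_message` and its return values are illustrative, and the real heuristic in `translate_bq_error` may match differently. Matching runs against the lowercased message, so the keywords are written in lowercase.

```python
from typing import Optional


def classify_by_message(exc: Exception, bad_request_status: str) -> Optional[str]:
    """Return a BqAccessError kind for HTTP-looking error text, or None so the caller re-raises."""
    msg = str(exc).lower()
    if "forbidden" in msg or "403" in msg:
        # Same split as the isinstance path: a serviceusage denial means a billing-project problem.
        return "cross_project_forbidden" if "serviceusage" in msg else "bq_forbidden"
    if "bad request" in msg or "400" in msg:
        return "bq_bad_request" if bad_request_status == "client_error" else "bq_upstream_error"
    return None  # not recognizably an HTTP error: leave it for the caller to re-raise unchanged
```

One trade-off of a text heuristic is that a bare `403` or `400` substring can also match an unrelated number inside a message (a row count, for example). That is why this check runs only after the typed `isinstance` branches have had their chance.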
426 lines
17 KiB
Python
"""POST /api/v2/scan and POST /api/v2/scan/estimate (spec §3.4 + §3.5)."""

from __future__ import annotations

import logging
import re
from typing import Optional

import pyarrow as pa
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import Response
from pydantic import BaseModel, Field
import duckdb

from app.auth.dependencies import get_current_user, _get_db
from app.instance_config import get_value
from src.rbac import can_access_table
from src.repositories.table_registry import TableRegistryRepository
from app.api.where_validator import (
    validate_where, safe_where_predicate, WhereValidationError,
)
from app.api.v2_schema import build_schema  # reused for column resolution
from app.api.v2_arrow import arrow_table_to_ipc_bytes, CONTENT_TYPE
from app.api.v2_quota import QuotaTracker, QuotaExceededError
from connectors.bigquery.access import BqAccess, BqAccessError, get_bq_access

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v2", tags=["v2"])


class ScanRequest(BaseModel):
    table_id: str
    select: Optional[list[str]] = None
    where: Optional[str] = None
    limit: Optional[int] = Field(default=None, ge=1)
    order_by: Optional[list[str]] = None


def _resolve_schema(conn, user, table_id: str, bq: BqAccess) -> dict:
    """Get a {column: type} dict for the target table (used by the validator and the projection check)."""
    s = build_schema(conn, user, table_id, bq=bq)
    return {c["name"]: c["type"] for c in s.get("columns", [])}


def _bq_dry_run_bytes(bq: BqAccess, sql: str) -> int:
    """Run a BQ dry-run via the google-cloud-bigquery client and return totalBytesProcessed.

    SQL here is user-derived (built from req.select/where/order_by), so BadRequest → 400
    (`bad_request_status="client_error"`).
    """
    from connectors.bigquery.access import translate_bq_error

    # Build the client BEFORE importing google.cloud.bigquery: bq.client() raises typed
    # BqAccessError (bq_lib_missing / auth_failed / not_configured), which the endpoint
    # maps to a structured response. Importing first would let a bare ImportError escape
    # the endpoint's except chain as an unstructured 500.
    client = bq.client()
    from google.cloud import bigquery

    try:
        job = client.query(
            sql,
            job_config=bigquery.QueryJobConfig(dry_run=True, use_query_cache=False),
        )
        return int(job.total_bytes_processed or 0)
    except Exception as e:
        raise translate_bq_error(e, bq.projects, bad_request_status="client_error")


_COLUMN_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]{0,63}$")
_ORDER_BY_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(\s+(ASC|DESC))?$", re.IGNORECASE)


def _validate_select_columns(select: list[str] | None, schema: dict) -> None:
    """Reject SELECT column names that don't fit the safe-identifier shape.

    Schema existence is checked separately; this guard is defense-in-depth
    so a backtick / double-quote in a column name can't break out of the
    `…` (BQ) or "…" (DuckDB) identifier wrapper in `_build_bq_sql` and the
    local-scan path. Today, schema names from BQ INFORMATION_SCHEMA never
    contain those characters, but Devin called this out as relying on an
    implicit upstream constraint. Make it explicit."""
    if not select:
        return
    for entry in select:
        if not _COLUMN_NAME_RE.match(entry or ""):
            raise ValueError(f"invalid column name: {entry!r}")


def _validate_order_by(order_by: list[str] | None, schema: dict) -> None:
    """Reject anything other than `<column>` or `<column> ASC|DESC` against the schema.
    Without this, `order_by` is concatenated raw into the ORDER BY clause of the generated SQL (exploitable)."""
    if not order_by:
        return
    known = {c.lower() for c in schema}
    for entry in order_by:
        s = (entry or "").strip()
        if not _ORDER_BY_RE.match(s):
            raise ValueError(f"invalid order_by entry: {entry!r}")
        col = s.split()[0].lower()
        if col not in known:
            raise ValueError(f"unknown order_by column: {entry!r}")


def _quote_order_by_bq(entry: str) -> str:
    """Backtick-quote the column part of an order_by entry, preserve direction."""
    parts = entry.strip().split()
    return f"`{parts[0]}`" + ("" if len(parts) == 1 else " " + " ".join(parts[1:]))


def _quote_order_by_duckdb(entry: str) -> str:
    """Double-quote the column part of an order_by entry (DuckDB identifier quoting), preserve direction."""
    parts = entry.strip().split()
    return f'"{parts[0]}"' + ("" if len(parts) == 1 else " " + " ".join(parts[1:]))


def _build_bq_sql(
    table_row: dict, project_id: str, req: ScanRequest, *, safe_where: str | None = None,
) -> str:
    """Build the BQ SQL string. ``safe_where`` MUST be the comment-stripped
    fragment from ``safe_where_predicate``; splicing ``req.where`` raw lets a
    `1=1 --` predicate comment out everything that follows (LIMIT/ORDER BY).

    Identifier quoting: column names are validated against the schema before
    we get here, but reserved words (`order`, `group`, `timestamp`, …) still
    need backticks to parse as identifiers in BQ.
    """
    from src.identifier_validation import validate_quoted_identifier
    bucket = table_row.get('bucket') or ''
    src_table = table_row.get('source_table') or req.table_id
    if not (validate_quoted_identifier(project_id, "BQ project")
            and validate_quoted_identifier(bucket, "BQ dataset")
            and validate_quoted_identifier(src_table, "BQ source_table")):
        raise ValueError("unsafe BQ identifier in registry — refusing to build SQL")

    select_sql = ", ".join(f"`{c}`" for c in req.select) if req.select else "*"
    table_ref = f"`{project_id}.{bucket}.{src_table}`"
    sql = f"SELECT {select_sql} FROM {table_ref}"
    if safe_where:
        sql += f" WHERE {safe_where}"
    if req.order_by:
        sql += f" ORDER BY {', '.join(_quote_order_by_bq(e) for e in req.order_by)}"
    if req.limit:
        sql += f" LIMIT {int(req.limit)}"
    return sql


def estimate(conn, user, raw_request: dict, *, bq: BqAccess) -> dict:
    req = ScanRequest(**raw_request)
    repo = TableRegistryRepository(conn)
    row = repo.get(req.table_id)
    if not row:
        raise FileNotFoundError(req.table_id)
    if user.get("role") != "admin" and not can_access_table(user, req.table_id, conn):
        raise PermissionError(req.table_id)

    schema = _resolve_schema(conn, user, req.table_id, bq)
    dialect = "bigquery" if (row.get("source_type") or "") == "bigquery" else "duckdb"

    # Validate WHERE and capture the comment-stripped fragment for splicing.
    safe_where = (
        safe_where_predicate(req.where, req.table_id, schema, dialect=dialect)
        if req.where else None
    )
    # Validate select columns exist (case-insensitive, matching order_by).
    if req.select:
        _validate_select_columns(req.select, schema)
        known = {c.lower() for c in schema}
        unknown = [c for c in req.select if c.lower() not in known]
        if unknown:
            raise ValueError(f"unknown columns: {unknown}")
    _validate_order_by(req.order_by, schema)

    if (row.get("source_type") or "") != "bigquery":
        return {
            "table_id": req.table_id,
            "estimated_scan_bytes": 0,
            "estimated_result_rows": None,
            "estimated_result_bytes": None,
            "bq_cost_estimate_usd": 0.0,
        }

    bq_sql = _build_bq_sql(row, bq.projects.data, req, safe_where=safe_where)
    scan_bytes = _bq_dry_run_bytes(bq, bq_sql)

    cost_per_tb = float(get_value("api", "scan", "bq_cost_per_tb_usd", default=5.0) or 5.0)
    cost = (scan_bytes / 1_099_511_627_776) * cost_per_tb  # 1 TiB = 2^40

    # Heuristic for result row/byte estimate. A row contains all selected
    # columns, so per-row bytes = sum of per-column estimates (NOT average).
    # If req.select is set, narrow to those columns; otherwise use full schema.
    # Case-insensitive lookup matches the SELECT-validation policy: analysts
    # often write a lowercased column name where INFORMATION_SCHEMA returned
    # mixed-case; the schema lookup must follow.
    schema_lower = {k.lower(): v for k, v in schema.items()}
    cols_for_estimate = (
        [schema_lower[c.lower()] for c in (req.select or []) if c.lower() in schema_lower]
        or list(schema.values())
    )
    avg_row_bytes = max(1, sum(_avg_bytes_for_type(t) for t in cols_for_estimate))
    rows_est = scan_bytes // max(avg_row_bytes, 1)
    if req.limit:
        rows_est = min(rows_est, req.limit)

    return {
        "table_id": req.table_id,
        "estimated_scan_bytes": int(scan_bytes),
        "estimated_result_rows": int(rows_est),
        "estimated_result_bytes": int(rows_est * avg_row_bytes),
        "bq_cost_estimate_usd": round(cost, 4),
    }


def _avg_bytes_for_type(t: str) -> int:
    t = (t or "").upper()
    if t in ("INT64", "FLOAT64", "DATE", "TIMESTAMP", "DATETIME", "TIME"):
        return 8
    if t == "STRING":
        return 32  # rough average
    if t == "BYTES":
        return 64
    if t == "BOOL":
        return 1
    return 16


@router.post("/scan/estimate")
async def scan_estimate_endpoint(
    raw: dict,
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
    bq: BqAccess = Depends(get_bq_access),
):
    try:
        return estimate(conn, user, raw, bq=bq)
    except WhereValidationError as e:
        raise HTTPException(
            status_code=400,
            detail={"error": "validator_rejected", "kind": e.kind, "details": e.detail or {}},
        )
    except PermissionError:
        raise HTTPException(status_code=403, detail="not authorized for this table")
    except FileNotFoundError as e:
        raise HTTPException(status_code=404, detail=f"table {e!s} not found")
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except BqAccessError as e:
        raise HTTPException(
            status_code=BqAccessError.HTTP_STATUS.get(e.kind, 500),
            detail={"error": e.kind, "message": e.message, "details": e.details},
        )


# Module-level singleton (process-local quota state per spec §3.8). FastAPI
# dispatches sync handlers via a thread pool, so two concurrent first-time
# requests can both observe `_quota_singleton is None` and each construct a
# separate tracker; the second assignment wins and the first reference leaks
# split-brain quota state. Guard with an init lock + double-check.
import threading as _threading
_quota_init_lock = _threading.Lock()
_quota_singleton: QuotaTracker | None = None


def _build_quota_tracker() -> QuotaTracker:
    """Returns or constructs the process-local quota tracker (thread-safe)."""
    global _quota_singleton
    if _quota_singleton is not None:
        return _quota_singleton
    with _quota_init_lock:
        if _quota_singleton is None:
            _quota_singleton = QuotaTracker(
                max_concurrent_per_user=int(get_value("api", "scan", "max_concurrent_per_user", default=5) or 5),
                max_daily_bytes_per_user=int(get_value("api", "scan", "max_daily_bytes_per_user", default=53687091200) or 53687091200),
            )
    return _quota_singleton


def _max_result_bytes() -> int:
    return int(get_value("api", "scan", "max_result_bytes", default=2_147_483_648) or 2_147_483_648)


def _max_limit() -> int:
    return int(get_value("api", "scan", "max_limit", default=10_000_000) or 10_000_000)


def _run_bq_scan(bq: BqAccess, sql: str) -> pa.Table:
    """Run a BQ query via the DuckDB BQ extension. Returns an Arrow table.

    SQL here is user-derived → BadRequest → 400 (`bad_request_status="client_error"`).
    """
    from connectors.bigquery.access import translate_bq_error

    with bq.duckdb_session() as conn:
        try:
            return conn.execute(
                "SELECT * FROM bigquery_query(?, ?)",
                [bq.projects.billing, sql],
            ).arrow()
        except Exception as e:
            raise translate_bq_error(e, bq.projects, bad_request_status="client_error")


def run_scan(
    conn: duckdb.DuckDBPyConnection,
    user: dict,
    raw_request: dict,
    *,
    bq: BqAccess,
    quota: QuotaTracker,
) -> bytes:
    """Validate → quota → execute → serialize. Returns Arrow IPC bytes.

    Raises:
        WhereValidationError, QuotaExceededError, FileNotFoundError, PermissionError,
        ValueError, BqAccessError
    """
    req = ScanRequest(**raw_request)
    repo = TableRegistryRepository(conn)
    row = repo.get(req.table_id)
    if not row:
        raise FileNotFoundError(req.table_id)
    if user.get("role") != "admin" and not can_access_table(user, req.table_id, conn):
        raise PermissionError(req.table_id)

    if req.limit and req.limit > _max_limit():
        raise ValueError(f"limit {req.limit} exceeds max {_max_limit()}")

    schema = _resolve_schema(conn, user, req.table_id, bq)
    dialect = "bigquery" if (row.get("source_type") or "") == "bigquery" else "duckdb"
    # Validate WHERE and capture the comment-stripped fragment for splicing.
    safe_where = (
        safe_where_predicate(req.where, req.table_id, schema, dialect=dialect)
        if req.where else None
    )
    if req.select:
        # Case-insensitive (BQ identifiers are case-insensitive; mixed-case
        # names from INFORMATION_SCHEMA.COLUMNS shouldn't 400-reject the
        # lowercased form a typical analyst writes).
        _validate_select_columns(req.select, schema)
        known = {c.lower() for c in schema}
        unknown = [c for c in req.select if c.lower() not in known]
        if unknown:
            raise ValueError(f"unknown columns: {unknown}")
    _validate_order_by(req.order_by, schema)

    source_type = row.get("source_type") or ""
    user_id = user.get("email") or "anon"

    # Pre-flight quota check: fail BEFORE running the BQ scan so the user
    # doesn't pay for a query whose result we'd then refuse to return.
    quota.check_daily_budget(user=user_id)

    with quota.acquire(user=user_id):
        if source_type != "bigquery":
            # Local source: query parquet directly. `source_type` is extracted above
            # because `row["source_type"]` could be NULL for legacy registry rows
            # and `Path(...) / None` raises TypeError.
            from app.utils import get_data_dir
            parquet = (
                get_data_dir() / "extracts" / source_type / "data" / f"{req.table_id}.parquet"
            )
            local = duckdb.connect(":memory:")
            try:
                projection = ", ".join(f'"{c}"' for c in req.select) if req.select else "*"
                sql = f"SELECT {projection} FROM read_parquet(?)"
                if safe_where:
                    sql += f" WHERE {safe_where}"
                if req.order_by:
                    sql += f" ORDER BY {', '.join(_quote_order_by_duckdb(e) for e in req.order_by)}"
                if req.limit:
                    sql += f" LIMIT {int(req.limit)}"
                table = local.execute(sql, [str(parquet)]).arrow()
            finally:
                local.close()
        else:
            bq_sql = _build_bq_sql(row, bq.projects.data, req, safe_where=safe_where)
            table = _run_bq_scan(bq, bq_sql)

    ipc = arrow_table_to_ipc_bytes(table)

    # Enforce max_result_bytes guard (spec §3.4 step 8)
    if len(ipc) > _max_result_bytes():
        # Truncate to roughly as many rows as fit: estimate the average serialized
        # bytes per row from the full payload, keep max_result_bytes // avg rows,
        # then re-serialize. Approximate, since the IPC framing is not per-row.
        row_count = table.num_rows
        avg = max(1, len(ipc) // max(row_count, 1))
        keep = min(row_count, _max_result_bytes() // max(avg, 1))
        table = table.slice(0, keep)
        ipc = arrow_table_to_ipc_bytes(table)

    # Record bytes for daily quota
    quota.record_bytes(user=user_id, n=len(ipc))
    return ipc


@router.post("/scan")
async def scan_endpoint(
    raw: dict,
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
    bq: BqAccess = Depends(get_bq_access),
):
    quota = _build_quota_tracker()
    try:
        ipc = run_scan(conn, user, raw, bq=bq, quota=quota)
        return Response(content=ipc, media_type=CONTENT_TYPE)
    except WhereValidationError as e:
        raise HTTPException(
            status_code=400,
            detail={"error": "validator_rejected", "kind": e.kind, "details": e.detail or {}},
        )
    except QuotaExceededError as e:
        raise HTTPException(
            status_code=429,
            detail={
                "error": "quota_exceeded",
                "kind": e.kind,
                "current": e.current,
                "limit": e.limit,
                "retry_after_seconds": e.retry_after_seconds,
            },
        )
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="table not found")
    except PermissionError:
        raise HTTPException(status_code=403, detail="not authorized")
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except BqAccessError as e:
        raise HTTPException(
            status_code=BqAccessError.HTTP_STATUS.get(e.kind, 500),
            detail={"error": e.kind, "message": e.message, "details": e.details},
        )
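The cost and row heuristics in `estimate()` are easier to follow with concrete numbers. The standalone sketch below reproduces its arithmetic. Scanned bytes are divided by 2^40 (one TiB) and multiplied by `bq_cost_per_tb_usd` (default 5.0). The row estimate is scanned bytes divided by the sum of the per-column byte guesses from `_avg_bytes_for_type`, capped by `limit`. `estimate_numbers` is a hypothetical helper, and the 10 GiB scan and three-column schema are made-up inputs.

```python
from __future__ import annotations

TIB = 1 << 40  # 1 TiB = 1_099_511_627_776 bytes, the divisor estimate() uses

# Per-type byte guesses mirroring _avg_bytes_for_type (16 for anything unlisted).
_TYPE_BYTES = {
    "INT64": 8, "FLOAT64": 8, "DATE": 8, "TIMESTAMP": 8, "DATETIME": 8, "TIME": 8,
    "STRING": 32, "BYTES": 64, "BOOL": 1,
}


def avg_bytes_for_type(t: str) -> int:
    return _TYPE_BYTES.get((t or "").upper(), 16)


def estimate_numbers(scan_bytes: int, column_types: list[str],
                     cost_per_tb: float = 5.0, limit: int | None = None) -> dict:
    """Hypothetical helper reproducing estimate()'s arithmetic for a BigQuery table."""
    row_bytes = max(1, sum(avg_bytes_for_type(t) for t in column_types))  # sum, not mean
    rows = scan_bytes // row_bytes
    if limit:
        rows = min(rows, limit)
    return {
        "estimated_scan_bytes": scan_bytes,
        "estimated_result_rows": rows,
        "estimated_result_bytes": rows * row_bytes,
        "bq_cost_estimate_usd": round(scan_bytes / TIB * cost_per_tb, 4),
    }


# 10 GiB scanned over INT64 + STRING + TIMESTAMP columns (8 + 32 + 8 = 48 bytes/row):
result = estimate_numbers(10 * 2**30, ["INT64", "STRING", "TIMESTAMP"])
# result["estimated_result_rows"] == 223696213, result["bq_cost_estimate_usd"] == 0.0488
```

Here 10 GiB is 10/1024 of a TiB, so the cost is 5.0 × 0.009765625 = 0.048828125, which rounds to 0.0488. With `limit=1000` the row estimate collapses to 1000 and the result-bytes estimate to 48000. The scan cost is unchanged, because a LIMIT does not reduce the bytes BigQuery reads.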