diff --git a/CHANGELOG.md b/CHANGELOG.md index c89d3f9..9a75489 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,7 +20,7 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C to the data_source Save button; on failure the inline result uses the same structured shape as the CLI renderer so operators see the same hint format admins do. -- **`api.query.bq_max_scan_bytes` server-config knob** (default 5 GiB): +- **`data_source.bigquery.bq_max_scan_bytes` server-config knob** (default 5 GiB): caps the BigQuery scan that `da query --remote` will issue against `query_mode='remote'` BQ rows. Exceeded queries are rejected with a structured `400 remote_scan_too_large` detail naming the bytes, diff --git a/app/api/query.py b/app/api/query.py index ce80abd..5ea2179 100644 --- a/app/api/query.py +++ b/app/api/query.py @@ -1,5 +1,6 @@ """Query endpoint — execute SQL against server DuckDB.""" +import contextlib import logging import os import re @@ -141,31 +142,41 @@ async def execute_query( if blocked_bq_path is not None: raise HTTPException(status_code=403, detail=blocked_bq_path) - if dry_run_set: - _enforce_remote_bq_quota_and_cap( - user_id=user.get("id") or user.get("email") or "anon", - dry_run_set=dry_run_set, - sql=request.sql, + # Issue #160 §4.3.3 — concurrent-slot guard MUST wrap the actual + # `analytics.execute(request.sql)` call (which is what triggers the + # BQ scan when DuckDB resolves the master view), not just the + # dry-run. Devin Review on PR #168 caught this — earlier + # implementation released the slot before execute. Use a context + # manager so dry-run + cap check + execute + record_bytes all run + # inside the slot. + user_id = user.get("id") or user.get("email") or "anon" + guard = ( + _bq_quota_and_cap_guard( + user_id=user_id, dry_run_set=dry_run_set, sql=request.sql, ) + if dry_run_set + else contextlib.nullcontext() + ) + with guard: + # Open in read-only mode for extra safety + result = analytics.execute(request.sql).fetchmany(request.limit + 1) + columns = [desc[0] for desc in analytics.description] if analytics.description else [] + truncated = len(result) > request.limit + rows = result[:request.limit] - # Open in read-only mode for extra safety - result = analytics.execute(request.sql).fetchmany(request.limit + 1) - columns = [desc[0] for desc in analytics.description] if analytics.description else [] - truncated = len(result) > request.limit - rows = result[:request.limit] - - # Post-flight: bill the dry-run estimate against the user's daily - # quota. Do this AFTER execute so a downstream failure (e.g. BQ - # outage) doesn't strand the user with charged-but-unrun bytes. - if dry_run_set: - user_id = user.get("id") or user.get("email") or "anon" - try: - _build_quota_tracker().record_bytes( - user_id, sum(b for _, _, b in dry_run_set), - ) - except Exception: - # record_bytes is documented as never-raising; defensive guard. - logger.warning("quota record_bytes failed for user=%s", user_id) + # Post-flight: bill the dry-run estimate against the user's daily + # quota. Do this AFTER execute so a downstream failure (e.g. BQ + # outage) doesn't strand the user with charged-but-unrun bytes. + # Stays inside the `with quota.acquire(...)` block so the slot + # release happens after record_bytes completes. + if dry_run_set: + try: + _build_quota_tracker().record_bytes( + user_id, sum(b for _, _, b in dry_run_set), + ) + except Exception: + # record_bytes is documented as never-raising; defensive guard. + logger.warning("quota record_bytes failed for user=%s", user_id) # Convert to serializable types serializable_rows = [] @@ -360,14 +371,27 @@ def _bq_guardrail_inputs( return dry_run, None -def _enforce_remote_bq_quota_and_cap(*, user_id: str, dry_run_set: list, sql: str) -> None: +@contextlib.contextmanager +def _bq_quota_and_cap_guard(*, user_id: str, dry_run_set: list, sql: str): """Pre-flight check + dry-run + cap enforcement for /api/query BQ paths. + Context-manager shape (Devin Review #5 on PR #168). Earlier implementation + ran the dry-run + cap check inside `with quota.acquire(user_id):`, then + returned — releasing the concurrent slot BEFORE the actual BQ-touching + `analytics.execute(...)` ran. Spec §4.3.3 wants execute to be inside the + slot so the per-user concurrent cap actually limits BQ scans, not just + dry-runs. + + Now: the helper is a context manager that yields after the cap check. + The caller's `with` block holds the slot through both dry-run AND the + subsequent `analytics.execute(...)` until the body exits. + 1. `check_daily_budget` — over-cap users get 429 BEFORE any BQ work. - 2. `with quota.acquire(user_id)` — concurrent slot guard. - 3. Dry-run each `(bucket, source_table)` via the existing - `_bq_dry_run_bytes` helper. Sum bytes. - 4. If sum > cap → 400 `remote_scan_too_large` with structured detail. + 2. `quota.acquire(user_id)` opened — concurrent-slot held throughout. + 3. Dry-run each `(bucket, source_table)` via `_bq_dry_run_bytes`. + 4. If sum > cap → 400 `remote_scan_too_large`. + 5. Yield. Caller runs `analytics.execute(...)` + `record_bytes(...)`. + 6. On exit, slot released. Mutates `dry_run_set` in place: the third tuple element (bytes) is populated with the per-path dry-run result so the caller can sum and @@ -424,3 +448,7 @@ def _enforce_remote_bq_quota_and_cap(*, user_id: str, dry_run_set: list, sql: st "the snapshot locally." ), }) + + # Yield control to the handler — slot stays acquired while the + # caller runs analytics.execute() + record_bytes(). + yield total_bytes diff --git a/app/web/templates/admin_server_config.html b/app/web/templates/admin_server_config.html index 1dbf1c4..a3efa09 100644 --- a/app/web/templates/admin_server_config.html +++ b/app/web/templates/admin_server_config.html @@ -322,9 +322,13 @@ function renderLeafInput(fieldId, section, pathSegments, kind, value, opts, isUn // surface the access.py:339-340 billing→data fallback in the UI. let placeholderAttr = ""; if (isUnset && opts && opts.spec && Array.isArray(opts.spec.placeholder_from)) { + // `original` is the full GET /api/admin/server-config response shape: + // {sections: {data_source: ...}, editable_sections: [...], ...}. + // `placeholder_from` is a section-relative path (e.g. ["data_source", + // "bigquery", "project"]) so walk `original.sections` not `original`. const resolved = opts.spec.placeholder_from.reduce( (cur, k) => (cur && typeof cur === "object" ? cur[k] : undefined), - original, + original && original.sections ? original.sections : original, ); if (resolved !== undefined && resolved !== null && resolved !== "") { placeholderAttr = ` placeholder="(defaults to ${escHtml(String(resolved))})"`; diff --git a/app/web/templates/admin_tables.html b/app/web/templates/admin_tables.html index 2e3aa17..75d7f5f 100644 --- a/app/web/templates/admin_tables.html +++ b/app/web/templates/admin_tables.html @@ -894,7 +894,7 @@
Live access: BASE TABLEs query via bq."dataset"."table" (Storage Read API; predicate pushdown). VIEWs and MATERIALIZED_VIEWs query via the BQ jobs API (full-scan estimate; - cost-guarded by max_bytes_per_remote_query). + cost-guarded by bq_max_scan_bytes). da query --remote works for both.
Synced access: handles both table and view transparently — the scheduler runs SELECT * through the jobs API and writes a @@ -1051,7 +1051,7 @@
Live access: BASE TABLEs query via bq."dataset"."table" (Storage Read API; predicate pushdown). VIEWs and MATERIALIZED_VIEWs query via the BQ jobs API (full-scan estimate; - cost-guarded by max_bytes_per_remote_query). + cost-guarded by bq_max_scan_bytes). da query --remote works for both.
Synced access: handles both transparently — the scheduler runs SELECT * through the jobs API and writes a