diff --git a/CHANGELOG.md b/CHANGELOG.md index 56fd2cc..9a598e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,48 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +### Performance +- **DuckDB BigQuery-extension session pool** + (`connectors/bigquery/access.py`). `BqAccess.duckdb_session()` now acquires + pre-warmed connections from a bounded process-local pool instead of running + `INSTALL bigquery; LOAD bigquery; CREATE SECRET; ATTACH …` on every request. + Each acquire saves the ~0.5 s extension-load + secret-creation cost when + the pool has a warm entry; auth SECRET is refreshed on acquire so a + long-lived pooled entry doesn't keep a stale GCE metadata token past its + TTL. Pool size is configurable via `data_source.bigquery.session_pool_size` + (default 4; sentinel `0` disables pooling). Affects every BQ-touching path + — `/api/query`, `/api/v2/scan`, `/api/v2/sample`, `/api/v2/schema`, + materialize, and the orchestrator's remote-attach. + +### Fixed +- **BigQuery `responseTooLarge` no longer surfaces as a generic 400 / 502 with + the raw upstream message** (`connectors/bigquery/access.py`). The + `translate_bq_error` helper now classifies "Response too large to return" + errors via a dedicated `bq_response_too_large` kind (HTTP 400) with an + actionable hint pointing at the WHERE / aggregation / materialized-table + remediations. Pre-fix this failure mode fell through to the generic + `bq_bad_request` mapping, which implied the user's SQL had a syntax error + — wrong root cause. Affects every BQ-touching path (`/api/query`, + `/api/v2/scan`, `/api/v2/sample`, `/api/v2/schema`, materialize) since + they all share `translate_bq_error`. + +- **`/api/query` (and `agnes query --remote`) now rewrites user SQL referencing + `query_mode='remote'` BigQuery rows into a single `bigquery_query()` call + before execute** (`app/api/query.py`). 
Pre-fix the master view + (`CREATE VIEW <name> AS SELECT * FROM bigquery.<dataset>.<table>`) did + not push WHERE / SELECT / LIMIT into BQ — the DuckDB BQ extension opened a + Storage Read API session over the entire upstream table, scanning the full + partitioned dataset before the local DuckDB filter ran. On 100M+ row + remote-mode tables this was 50-100× slower than the equivalent direct + `bigquery_query()` call (70-150 s vs 1.5 s) and frequently failed with + `Response too large to return`. The rewriter (shared core with the existing + dry-run helper) wraps the user's whole SQL in `bigquery_query('<project>', + '<sql>')` so the BQ planner receives the full query and applies + partition pruning + projection pushdown server-side. Conservative + fall-through: cross-source JOINs (BQ ↔ Keboola/Jira local), queries already + containing `bigquery_query(`, and unconfigured BQ project all keep the + original ATTACH-catalog path so behavior degrades gracefully. + ## [0.38.3] — 2026-05-06 ### Changed diff --git a/app/api/query.py b/app/api/query.py index a758dc4..473bd39 100644 --- a/app/api/query.py +++ b/app/api/query.py @@ -192,8 +192,32 @@ def execute_query( else contextlib.nullcontext() ) with guard: + # Performance fix: rewrite user SQL referencing BQ-remote tables + # to a single ``bigquery_query()`` call so WHERE / projection / + # LIMIT push into BQ via jobs.query (1-2 s) instead of falling + # through DuckDB's ATTACH-catalog Storage Read API session over + # the full table (often 70-150 s, fails with "Response too + # large to return" on >100M-row sources). Helper returns the + # original SQL unchanged when rewriting would be unsafe + # (cross-source JOIN, no BQ tables referenced, double-wrap).
+ execution_sql, did_rewrite = _rewrite_user_sql_for_bigquery_query( + request.sql, conn, + ) + if did_rewrite: + logger.info( + "query_rewrite_to_bigquery_query: user_id=%s — wrapped " + "SQL in bigquery_query() for BQ predicate pushdown", + user_id, + ) + else: + logger.debug( + "query_rewrite_skipped: user_id=%s — running original " + "SQL via ATTACH-catalog path", + user_id, + ) + # Open in read-only mode for extra safety - result = analytics.execute(request.sql).fetchmany(request.limit + 1) + result = analytics.execute(execution_sql).fetchmany(request.limit + 1) columns = [desc[0] for desc in analytics.description] if analytics.description else [] truncated = len(result) > request.limit rows = result[:request.limit] @@ -432,12 +456,11 @@ def _bq_guardrail_inputs( return dry_run, name_lookups, None -def _rewrite_user_sql_for_bq_dry_run( +def _rewrite_bq_table_refs_to_native( sql: str, name_lookups: list, project: str, ) -> str: - """Rewrite user SQL from DuckDB-flavor to BQ-native so a single - `_bq_dry_run_bytes` call can estimate scan size for the EXACT query - the user submitted (issue #171). + """Core identifier rewrite: DuckDB-flavor table references → BQ-native + backtick form. Shared between dry-run and execution-path rewriters. Two transformations: @@ -463,13 +486,15 @@ def _rewrite_user_sql_for_bq_dry_run( inside a string literal (e.g. an `IN (...)` value or a `LIKE` pattern) will also be rewritten. This is acceptable because (a) it's vanishingly rare to have a string literal exactly matching a registered table name, - and (b) when it does happen the dry-run errors out and the caller falls - back to the per-table SELECT * estimate (current behavior, no regression). + and (b) when it does happen the caller's error path covers the case + (dry-run falls back to per-table SELECT * estimate; execution falls + through to the ATTACH-catalog path). 
CTE shadowing: a `WITH unit_economics AS (...)` followed by `FROM unit_economics` would also rewrite the `FROM` reference. BQ then treats - the CTE as unreferenced (legal) and the dry-run estimates the rewritten - physical table — likely an over-estimate. Same fallback path covers this. + the CTE as unreferenced (legal) and the rewriter's caller deals with + the consequence — over-estimation for dry-run, fall-through-to-ATTACH + via BQ parse error for execution. """ out = sql @@ -509,6 +534,172 @@ def _rewrite_user_sql_for_bq_dry_run( return out +def _rewrite_user_sql_for_bq_dry_run( + sql: str, name_lookups: list, project: str, +) -> str: + """Rewrite user SQL from DuckDB-flavor to BQ-native so a single + `_bq_dry_run_bytes` call can estimate scan size for the EXACT query + the user submitted (issue #171). Thin wrapper around the shared + core; kept as a stable name for callers in /api/query's cap-guard. + """ + return _rewrite_bq_table_refs_to_native(sql, name_lookups, project) + + +def _rewrite_user_sql_for_bigquery_query( + user_sql: str, conn: duckdb.DuckDBPyConnection, +) -> tuple[str, bool]: + """Rewrite user SQL so the entire query ships to BQ as a single + ``bigquery_query(<project>, <sql>)`` call. + + Returns ``(rewritten_sql, did_rewrite)``. When ``did_rewrite`` is + ``False``, the caller MUST execute the original ``user_sql`` via the + ATTACH-catalog path (slow but correct); the rewriter is conservative + on purpose — wrapping cross-source queries in ``bigquery_query()`` + would silently lose the local-side data. + + Why this matters + ---------------- + The orchestrator's master view (``CREATE VIEW name AS SELECT * FROM + bigquery.<dataset>.<table>``) does not push WHERE / projections + into BQ when DuckDB resolves the query — the BQ extension opens a + Storage Read API session over the entire table, which on multi-100M-row + tables is 50-100× slower than letting BQ run the query server-side.
+ Wrapping the user's SQL in ``bigquery_query('<project>', '<sql>')`` + makes the BQ extension issue a ``jobs.query`` instead, with full + predicate pushdown. + + Skip rules (returns ``(user_sql, False)``) + ------------------------------------------ + 1. No registered ``query_mode='remote'`` BQ row referenced in the SQL. + Nothing to rewrite — original SQL passes through unchanged. + 2. User SQL already contains ``bigquery_query(`` — never double-wrap. + (The /api/query keyword denylist also blocks this in production; + defensive guard for callers in other contexts.) + 3. SQL also references a non-BQ master view (Keboola/Jira local-mode + table). Wrapping would lose those references — fall through to + ATTACH-catalog so the cross-source query still runs. + 4. ``get_bq_access()`` returns the unconfigured sentinel + (``data == ''``). No project to fill into ``bigquery_query()``. + + Edge cases preserved by design + ------------------------------ + - CTEs / sub-queries referencing BQ tables: the table-name rewrite + happens at every match position, then the whole SQL is wrapped in + one ``bigquery_query()``. BQ supports CTEs, so this works. + - Multiple BQ tables, same project: combined into ONE wrap (single + jobs.query). DuckDB's BQ extension doesn't support multi-project + JOINs in a single ``bigquery_query()`` call today; if/when the + registry grows per-table source_project, this helper would need to + gate on cross-project mixing. + - ``bq."ds"."tbl"`` direct paths: rewritten to BQ-native backticks + via the same shared core as dry-run. + """ + # Skip 2: don't double-wrap. Cheap pre-check before any registry I/O. + if "bigquery_query(" in user_sql.lower(): + return user_sql, False + + # Find all referenced BQ remote-mode rows (bare-name + direct bq.path). + # Mirrors the non-RBAC parts of `_bq_guardrail_inputs`.
+ sql_lower = user_sql.lower() + name_lookups: list = [] + seen_paths: set = set() + + try: + repo = TableRegistryRepository(conn) + bq_rows = repo.list_by_source("bigquery") + all_rows = repo.list_all() + except Exception: + # Registry read failure — let the original SQL run through the + # ATTACH-catalog path. The handler's generic error path will + # surface anything user-visible. + return user_sql, False + + for r in bq_rows: + if (r.get("query_mode") or "") != "remote": + continue + bucket = r.get("bucket") + source_table = r.get("source_table") + name = r.get("name") + if not (bucket and source_table and name): + continue + pattern = r'\b' + re.escape(str(name).lower()) + r'\b' + if re.search(pattern, sql_lower): + key = (bucket.lower(), source_table.lower()) + if key not in seen_paths: + seen_paths.add(key) + name_lookups.append((str(name), bucket, source_table)) + + # Direct bq."ds"."tbl" references — pull the registered (bucket, + # source_table) pair so the inner SQL receives a backticked BQ-native + # path. Mismatched / unregistered paths are caught upstream by the + # guardrail; here we just collect the mappings the rewriter needs. + direct_paths: set[tuple[str, str]] = set() + for m in BQ_PATH.finditer(user_sql): + bucket_raw = m.group(1).strip('"') + source_table_raw = m.group(2).strip('"') + direct_paths.add((bucket_raw, source_table_raw)) + + if not name_lookups and not direct_paths: + # Skip 1: no BQ tables referenced. + return user_sql, False + + # Skip 3: cross-source query (BQ + local-mode). If user SQL also + # references a non-BQ master view, we can't push the whole thing to + # BQ — DuckDB needs to do the join. 
+ bq_names_lc = {n.lower() for n, _, _ in name_lookups} + for r in all_rows: + st = (r.get("source_type") or "").lower() + qm = (r.get("query_mode") or "").lower() + if st == "bigquery" and qm == "remote": + continue # already handled + name = r.get("name") + if not name: + continue + name_lc = str(name).lower() + if name_lc in bq_names_lc: + # Same name registered both BQ-remote and local? Pathological; + # skip as a safety measure. + return user_sql, False + if re.search(r'\b' + re.escape(name_lc) + r'\b', sql_lower): + logger.info( + "rewrite_skip_cross_source: user SQL references both " + "BQ-remote and local-mode tables; falling back to " + "ATTACH-catalog path", + ) + return user_sql, False + + # Skip 4: BQ project not configured. + try: + bq = get_bq_access() + project = bq.projects.data + except Exception: + return user_sql, False + if not project: + return user_sql, False + + # Rewrite identifiers, then wrap the whole thing in bigquery_query(). + # The DuckDB BQ extension's UDF expects (<project>, + # <sql>); we use the data project so the inner SQL's + # backticked paths resolve to the same project. + inner_sql = _rewrite_bq_table_refs_to_native(user_sql, name_lookups, project) + + # SQL string literal escaping: the handler runs + # ``analytics.execute(rewritten_sql)`` with no bind parameters and + # treats the SQL as opaque, so a parameterised + # ``bigquery_query(?, ?)`` form is not available here. The inner + # SQL is therefore embedded as a single-quoted DuckDB string + # literal with every single quote doubled (the standard SQL + # escape). DuckDB undoes the doubling when it parses the literal, + # so ``bigquery_query()`` receives the inner SQL exactly as + # rewritten above. NOTE(review): ``project`` is interpolated + # without escaping — presumably a trusted config value; confirm.
+ escaped_inner = inner_sql.replace("'", "''") + rewritten = ( + f"SELECT * FROM bigquery_query('{project}', '{escaped_inner}')" + ) + return rewritten, True + + @contextlib.contextmanager def _bq_quota_and_cap_guard( *, diff --git a/config/instance.yaml.example b/config/instance.yaml.example index c836144..be2c8a0 100644 --- a/config/instance.yaml.example +++ b/config/instance.yaml.example @@ -135,6 +135,13 @@ data_source: # # view-backed datasets -- bumped to 600 000 ms = 10 min by default. # # Set 0 to fall through to the extension default. Configurable via # # /admin/server-config UI. + # session_pool_size: 4 + # # Number of pre-warmed DuckDB+bigquery-extension sessions kept + # # in a process-local pool. Each acquire amortizes the + # # ~0.5 s INSTALL/LOAD/CREATE-SECRET cost across requests; a fresh + # # build only happens when the pool is empty. Default 4. Set 0 + # # to disable pooling (every acquire builds + closes a fresh + # # session; matches pre-pool behavior). # --- OpenMetadata catalog (optional) --- # Enriches table and column metadata from OpenMetadata REST API. diff --git a/connectors/bigquery/access.py b/connectors/bigquery/access.py index 48e4e9f..5dee173 100644 --- a/connectors/bigquery/access.py +++ b/connectors/bigquery/access.py @@ -8,6 +8,8 @@ from __future__ import annotations import functools import logging +import threading +from collections import deque from contextlib import contextmanager from dataclasses import dataclass from typing import Callable, Iterator, Literal @@ -42,6 +44,12 @@ class BqAccessError(Exception): "bq_forbidden": 502, # other Forbidden from BQ "bq_bad_request": 400, # 400 from BQ when caller flagged it as client-derived "bq_upstream_error": 502, # all other upstream BQ failures + # `responseTooLarge` is a BQ refusal whose root cause is query shape + # (the user asked for too many rows back inline), not auth or syntax. 
+ # 400 with a specific actionable hint instead of the generic + # bq_bad_request / bq_upstream_error mappings, which surfaced the + # raw BQ message and gave operators no path forward. + "bq_response_too_large": 400, } def __init__(self, kind: str, message: str, details: dict | None = None): @@ -51,6 +59,43 @@ class BqAccessError(Exception): super().__init__(message) +_RESPONSE_TOO_LARGE_HINT = ( + "BigQuery refused to return the result inline; the query exceeded BQ's " + "response size limit. Narrow the WHERE clause, aggregate further, " + "select fewer columns, or query a materialized table that's already " + "been bounded server-side." +) + + +def _classify_response_too_large(msg: str, projects: BqProjects) -> BqAccessError: + """Build the `bq_response_too_large` BqAccessError with the canonical + actionable hint and the original BQ message preserved in details for + operator debugging.""" + return BqAccessError( + "bq_response_too_large", + _RESPONSE_TOO_LARGE_HINT, + details={ + "original": msg, + "billing_project": projects.billing, + "data_project": projects.data, + }, + ) + + +def _is_response_too_large(msg: str) -> bool: + """Detect BQ's `responseTooLarge` failure mode by message substring. + + The reason code is stable across HTTP transports (gax.BadRequest from + google-cloud-bigquery, duckdb.IOException from the BQ extension's own + HTTP layer); both surface 'Response too large to return' verbatim in + the message body. Match case-insensitively + tolerate the slight + variant 'response too large' that some surfaces emit without the + 'to return' suffix. + """ + ml = msg.lower() + return "response too large" in ml + + def translate_bq_error( e: Exception, projects: BqProjects, @@ -67,12 +112,24 @@ def translate_bq_error( 2. Forbidden + 'serviceusage' in str(e).lower() -> cross_project_forbidden (with hint) 3. Forbidden -> bq_forbidden - 4. BadRequest, bad_request_status='client_error' + 4. 
'response too large' in str(e).lower() + -> bq_response_too_large (HTTP 400, with + actionable hint pointing at WHERE / + aggregate / materialized remediations) + 5. BadRequest, bad_request_status='client_error' -> bq_bad_request (HTTP 400) - 5. BadRequest, bad_request_status='upstream_error' + 6. BadRequest, bad_request_status='upstream_error' -> bq_upstream_error (HTTP 502) - 6. GoogleAPICallError (other) -> bq_upstream_error - 7. Anything else -> RE-RAISED unchanged (don't swallow programmer errors) + 7. GoogleAPICallError (other) -> bq_upstream_error + 8. Anything else -> RE-RAISED unchanged (don't swallow programmer errors) + + The `responseTooLarge` mapping (4) sits ahead of the generic BadRequest + cases on purpose: BQ surfaces this failure mode as a 400 with a + specific reason, but the actionable remediation is "shape your query + differently" — not "your SQL has a syntax error" (the typical + bq_bad_request user-facing meaning) and not "BQ is broken" + (bq_upstream_error). Routing it via its own kind keeps the user-facing + message tight + correct. """ if isinstance(e, BqAccessError): return e @@ -106,6 +163,13 @@ def translate_bq_error( details={"billing_project": projects.billing, "data_project": projects.data}, ) + # Special-case: `responseTooLarge` arrives as gax.BadRequest (HTTP 400) + # but has a unique reason code with a specific, actionable remediation. + # Catch it BEFORE the generic BadRequest mapping below so it doesn't + # surface as a confusing "bad request" (which implies bad SQL). + if _is_response_too_large(msg): + return _classify_response_too_large(msg, projects) + if isinstance(e, gax.BadRequest): if bad_request_status == "client_error": return BqAccessError("bq_bad_request", msg) @@ -196,15 +260,40 @@ def _default_client_factory(projects: BqProjects): ) -@contextmanager -def _default_duckdb_session_factory(projects: BqProjects): - """Yield an in-memory DuckDB conn with bigquery extension loaded + SECRET set - from get_metadata_token(). 
Auto-cleanup. Translates auth/install failures - to BqAccessError(kind='auth_failed' or 'bq_lib_missing'). +def _default_pool_size() -> int: + """Resolve the BQ DuckDB-extension session pool size from instance.yaml. - Note: `projects.billing` is not used by this factory directly — bigquery_query() - callers pass it themselves as the first positional arg to identify the billing - project. The factory keeps the parameter for symmetry with _default_client_factory. + Reads ``data_source.bigquery.session_pool_size`` (default 4). Sentinel + ``0`` disables pooling (every acquire builds + closes a fresh session; + matches pre-pool behavior). Negative / non-numeric values fall back to + the default — the pool is a perf optimization, not a correctness + boundary, so an unparseable config shouldn't fail-stop the app. + """ + try: + from app.instance_config import get_value + except Exception: + return 4 + raw = get_value("data_source", "bigquery", "session_pool_size", default=4) + try: + n = int(raw) if raw is not None else 4 + except (TypeError, ValueError): + logger.warning( + "BQ session_pool_size=%r is not an int; falling back to default 4", + raw, + ) + return 4 + if n < 0: + return 4 + return n + + +def _build_fresh_bq_session(): + """Build a single fresh in-memory DuckDB conn with the bigquery extension + INSTALL/LOAD'd, the auth SECRET created from get_metadata_token(), and + per-session settings applied. Translates auth / install failures to + BqAccessError. Caller owns the close. + + Used internally by the pool; also used directly when pooling is disabled.
""" import duckdb # type: ignore from connectors.bigquery.auth import get_metadata_token, BQMetadataAuthError @@ -220,22 +309,160 @@ def _default_duckdb_session_factory(projects: BqProjects): conn = duckdb.connect(":memory:") try: + conn.execute("INSTALL bigquery FROM community; LOAD bigquery;") + escaped = token.replace("'", "''") + conn.execute( + f"CREATE OR REPLACE SECRET bq_s (TYPE bigquery, ACCESS_TOKEN '{escaped}')" + ) + except Exception as e: + # Build failed — must close the half-initialised conn, otherwise it + # leaks across the pool's lifetime. try: - conn.execute("INSTALL bigquery FROM community; LOAD bigquery;") - escaped = token.replace("'", "''") - conn.execute( - f"CREATE OR REPLACE SECRET bq_s (TYPE bigquery, ACCESS_TOKEN '{escaped}')" - ) - except Exception as e: - raise BqAccessError( - "bq_lib_missing", - f"failed to install/load BigQuery DuckDB extension: {e}", - details={"original": str(e)}, - ) - apply_bq_session_settings(conn) + conn.close() + except Exception: + pass + raise BqAccessError( + "bq_lib_missing", + f"failed to install/load BigQuery DuckDB extension: {e}", + details={"original": str(e)}, + ) + apply_bq_session_settings(conn) + return conn + + +def _refresh_bq_secret(conn) -> None: + """Refresh the auth SECRET on a pooled connection so token rotation + (default GCE metadata token TTL ~1 hr) doesn't break long-lived + pooled entries. + + Cheap when the token cache is warm (a few µs). Any failure is + re-raised as ``BqAccessError(kind='auth_failed')`` so the pool + drops this entry and falls back to building a fresh session. + """ + from connectors.bigquery.auth import get_metadata_token + try: + token = get_metadata_token() + escaped = token.replace("'", "''") + conn.execute( + f"CREATE OR REPLACE SECRET bq_s (TYPE bigquery, ACCESS_TOKEN '{escaped}')" + ) + except Exception as e: + # Bubble up so the pool drops this entry and rebuilds.
+ raise BqAccessError( + "auth_failed", + f"could not refresh BQ secret on pooled session: {e}", + details={"original": str(e)}, + ) + + +def _is_pool_entry_alive(conn) -> bool: + """Cheap liveness probe — `SELECT 1`. Returns False on any error so + the pool reaper drops the entry and builds a fresh one.""" + try: + result = conn.execute("SELECT 1").fetchone() + return result is not None and result[0] == 1 + except Exception: + return False + + +# Module-level pool state. Process-cached (mirrors get_bq_access's lifetime). +# Not fork-safe — single uvicorn worker process is the supported deployment +# shape per CLAUDE.md. +_pool: deque = deque() +_pool_lock = threading.Lock() + + +def _reset_session_pool_for_tests() -> None: + """Drop and close every pooled entry. Test helper — production code + should not call this. Exposed so test fixtures + the existing + test_bq_access tests can pin pre-test pool state to empty.""" + with _pool_lock: + while _pool: + entry = _pool.popleft() + try: + entry.close() + except Exception: + pass + + +@contextmanager +def _default_duckdb_session_factory(projects: BqProjects): + """Yield a pooled in-memory DuckDB conn with bigquery extension loaded + + SECRET set from get_metadata_token(). Translates auth / install + failures to BqAccessError(kind='auth_failed' or 'bq_lib_missing'). + + Pooling: amortizes the ~0.5 s INSTALL/LOAD/ATTACH cost across requests + by keeping pre-warmed connections in a bounded deque. Acquire reuses + an existing entry when available (refreshing its auth SECRET so + token rotation doesn't break long-lived entries) and probes liveness + cheaply via ``SELECT 1`` before handing it to the caller. On normal + exit the connection returns to the pool; on exception it's closed + instead (the underlying session may carry dirty state). + + Pool size is ``data_source.bigquery.session_pool_size`` (default 4; + sentinel ``0`` disables pooling entirely, matching pre-pool + behavior). Process-cached, not fork-safe. 
+ + Note: `projects.billing` is not used by this factory directly — bigquery_query() + callers pass it themselves as the first positional arg to identify the billing + project. The factory keeps the parameter for symmetry with _default_client_factory. + """ + pool_size = _default_pool_size() + + # Acquire: prefer a warm entry, fall back to fresh build. + conn = None + if pool_size > 0: + while True: + with _pool_lock: + entry = _pool.popleft() if _pool else None + if entry is None: + break + if not _is_pool_entry_alive(entry): + # Reaper: drop broken entries. + try: + entry.close() + except Exception: + pass + continue + try: + # Refresh the auth SECRET so a long-lived pool entry + # doesn't keep a stale token past its TTL. Cheap when + # the token cache is warm. + _refresh_bq_secret(entry) + except BqAccessError: + try: + entry.close() + except Exception: + pass + continue + conn = entry + break + + if conn is None: + conn = _build_fresh_bq_session() + + try: yield conn - finally: - conn.close() + except Exception: + # Caller saw an exception — the conn may be in a dirty state. + # Don't return to pool; close to release native resources. + try: + conn.close() + except Exception: + pass + raise + else: + # Normal exit — return to pool if there's room. + if pool_size > 0: + with _pool_lock: + if len(_pool) < pool_size: + _pool.append(conn) + return + # Pool disabled or full — close. 
+ try: + conn.close() + except Exception: + pass def apply_bq_session_settings(conn) -> None: diff --git a/tests/test_bq_access.py b/tests/test_bq_access.py index cfa1282..5f64918 100644 --- a/tests/test_bq_access.py +++ b/tests/test_bq_access.py @@ -1,5 +1,6 @@ """Tests for connectors/bigquery/access.py — the BqAccess facade.""" import pytest +import threading class TestBqProjects: @@ -36,6 +37,12 @@ class TestBqAccessError: "bq_forbidden": 502, "bq_bad_request": 400, "bq_upstream_error": 502, + # User-facing class for "Response too large to return" — an + # upstream BQ refusal, but caused by query shape (too many rows + # to fit in a single jobs.query response) rather than auth or + # syntax. 400 so the user sees an actionable error and not a + # 502 that suggests "BQ is broken". + "bq_response_too_large": 400, } assert BqAccessError.HTTP_STATUS == expected @@ -143,6 +150,70 @@ class TestTranslateBqError: translate_bq_error(ValueError("not a BQ error"), self.projects, bad_request_status="client_error") + def test_response_too_large_via_gax_bad_request(self): + """BQ ``responseTooLarge`` arrives as ``gax.BadRequest`` (HTTP 400 + with a specific `reason` field). Pre-fix this fell through to the + generic ``bq_bad_request`` mapping — surfacing as a 400 with the + raw upstream message and no actionable hint. Now it routes to a + dedicated ``bq_response_too_large`` kind whose message tells the + user exactly what to do (narrow WHERE / aggregate / use materialized). + """ + from google.api_core.exceptions import BadRequest + from connectors.bigquery.access import translate_bq_error + e = BadRequest("Response too large to return. 
Consider setting allowLargeResults to true ...") + result = translate_bq_error( + e, self.projects, bad_request_status="client_error", + ) + assert result.kind == "bq_response_too_large", ( + f"got {result.kind!r}; expected dedicated mapping for " + "'Response too large' to avoid the generic bq_bad_request 400 " + "with no actionable hint" + ) + # User-facing message must point at the actionable remediations, + # not just echo the raw BQ string. + assert "exceeded" in result.message.lower() or "too large" in result.message.lower() + assert "where" in result.message.lower() or "aggregate" in result.message.lower() or "materialized" in result.message.lower() + # Original upstream text preserved in details for operator debugging. + assert "original" in result.details + assert "Response too large" in result.details["original"] + + def test_response_too_large_via_duckdb_native_string(self): + """DuckDB-native exceptions (the BQ extension's C++ HTTP path) + carry the same 'Response too large' marker in plain ``Exception`` + messages — must classify the same way as the gax.BadRequest case.""" + from connectors.bigquery.access import translate_bq_error + e = Exception("HTTP 400: Response too large to return.") + result = translate_bq_error( + e, self.projects, bad_request_status="upstream_error", + ) + assert result.kind == "bq_response_too_large" + + def test_response_too_large_classification_is_status_independent(self): + """The mapping must fire regardless of ``bad_request_status`` + (some callers route via 'upstream_error', others via 'client_error'). 
+ It's the BQ error shape that matters, not who's calling.""" + from google.api_core.exceptions import BadRequest + from connectors.bigquery.access import translate_bq_error + e = BadRequest("Response too large to return") + for status in ("client_error", "upstream_error"): + result = translate_bq_error(e, self.projects, bad_request_status=status) + assert result.kind == "bq_response_too_large", ( + f"bad_request_status={status!r} routed to {result.kind!r}; " + "expected bq_response_too_large for both" + ) + + def test_response_too_large_does_not_trigger_on_unrelated_bad_request(self): + """Other BadRequests (syntax errors, malformed identifiers, …) + must keep going through the generic bq_bad_request mapping — only + the 'Response too large' substring triggers the dedicated kind.""" + from google.api_core.exceptions import BadRequest + from connectors.bigquery.access import translate_bq_error + e = BadRequest("Syntax error at [1:23] near unexpected token") + result = translate_bq_error( + e, self.projects, bad_request_status="client_error", + ) + assert result.kind == "bq_bad_request" + class TestDefaultClientFactory: def test_constructs_client_with_billing_project_as_quota(self, monkeypatch): @@ -208,8 +279,21 @@ class TestDefaultClientFactory: class TestDefaultDuckdbSessionFactory: - def test_yields_duckdb_conn_with_secret_then_closes(self, monkeypatch): - from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects + def test_yields_duckdb_conn_with_secret_set_via_pool(self, monkeypatch): + """The pool's first acquire on an empty pool runs the full + INSTALL/LOAD/SECRET sequence. After the with-block exits the + connection is RETURNED to the pool (not closed) so the next + acquire amortizes the extension-load cost. + + Pre-pool semantics (close-on-exit) are preserved on broken + entries + on the explicit pool-reset path; covered in + TestBqSessionPool. 
+ """ + from connectors.bigquery.access import ( + _default_duckdb_session_factory, BqProjects, + _reset_session_pool_for_tests, + ) + _reset_session_pool_for_tests() executed_sql = [] @@ -218,7 +302,10 @@ class TestDefaultDuckdbSessionFactory: self.closed = False def execute(self, sql, params=None): executed_sql.append((sql, params)) - return self + class _Result: + def fetchone(self_inner): + return (1,) + return _Result() def close(self): self.closed = True @@ -228,19 +315,36 @@ class TestDefaultDuckdbSessionFactory: with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn: assert conn is fake_conn - assert fake_conn.closed is True + # Pool retains the conn — close happens at pool reset / shutdown. + assert fake_conn.closed is False # Verify INSTALL/LOAD/SECRET sequence ran assert any("INSTALL bigquery" in sql for sql, _ in executed_sql) assert any("LOAD bigquery" in sql for sql, _ in executed_sql) assert any("CREATE OR REPLACE SECRET" in sql and "tok123" in sql for sql, _ in executed_sql) + # Explicit pool reset closes the retained entry. + _reset_session_pool_for_tests() + assert fake_conn.closed is True + def test_closes_on_exception_inside_with_block(self, monkeypatch): - from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects + """Exceptions inside the with-block leave the underlying conn in + an unknown state (half-completed query, dirty session); the pool + treats it as broken and closes it rather than returning to pool. 
+ """ + from connectors.bigquery.access import ( + _default_duckdb_session_factory, BqProjects, + _reset_session_pool_for_tests, + ) + _reset_session_pool_for_tests() class FakeConn: closed = False - def execute(self, *a, **kw): return self + def execute(self, *a, **kw): + class _Result: + def fetchone(self_inner): + return (1,) + return _Result() def close(self): self.closed = True fake_conn = FakeConn() @@ -449,3 +553,222 @@ class TestGetBqAccess: assert a is b assert isinstance(a, BqAccess) assert a.projects.billing == "" + + +# --------------------------------------------------------------------------- +# DuckDB BQ-extension session pool — amortizes the ~0.5 s INSTALL/LOAD/ATTACH +# cost across requests by keeping pre-warmed DuckDB connections in a +# bounded pool. Each acquire reuses an existing connection (refreshing the +# auth SECRET so token rotation doesn't break long-lived entries) instead +# of spinning up a fresh DuckDB+extension load every time. +# --------------------------------------------------------------------------- + + +class _PoolFakeConn: + """Fake DuckDB connection that records executed SQL and supports + ``close()``. Used across pool tests so we can pin behavior without + booting the real BigQuery extension.""" + _serial = 0 + + def __init__(self): + type(self)._serial += 1 + self.id = type(self)._serial + self.closed = False + self.executed: list[str] = [] + + def execute(self, sql, params=None): + self.executed.append(sql) + # Liveness probe: SELECT 1 returns something fetchable. 
+ class _Result: + def fetchone(self_inner): + return (1,) + def fetchall(self_inner): + return [(1,)] + return _Result() + + def close(self): + self.closed = True + + +@pytest.fixture +def reset_pool(monkeypatch): + """Reset the BQ session pool singleton between tests so leak-detection + assertions don't carry state.""" + from connectors.bigquery import access as bq_access_mod + if hasattr(bq_access_mod, "_reset_session_pool_for_tests"): + bq_access_mod._reset_session_pool_for_tests() + monkeypatch.setattr( + "connectors.bigquery.auth.get_metadata_token", + lambda: "tok-pool", + ) + yield + if hasattr(bq_access_mod, "_reset_session_pool_for_tests"): + bq_access_mod._reset_session_pool_for_tests() + + +class TestBqSessionPool: + def test_pool_reuses_connections_across_acquires(self, monkeypatch, reset_pool): + """Acquiring a session, releasing, then acquiring again must return + the SAME underlying DuckDB connection — no INSTALL/LOAD overhead on + the second request. This is the whole point of the pool.""" + from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects + + # Each duckdb.connect() yields a fresh _PoolFakeConn so we can tell + # them apart by id. + connections_made = [] + def fake_connect(_path): + c = _PoolFakeConn() + connections_made.append(c) + return c + monkeypatch.setattr("duckdb.connect", fake_connect) + + # First acquire: pool is empty, factory builds a new entry. + with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn1: + id1 = conn1.id + + # Second acquire: pool has a warm entry, must hand back the same conn. + with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn2: + id2 = conn2.id + + assert id1 == id2, ( + "expected the same pooled connection across two acquires; " + f"got id1={id1}, id2={id2}" + ) + # And we must NOT have re-INSTALLed/LOADed the extension on reuse — + # only one duckdb.connect() call ever happened. 
+ assert len(connections_made) == 1, ( + f"pool re-built the conn on second acquire; created {len(connections_made)}" + ) + + def test_pool_size_is_configurable(self, monkeypatch, reset_pool): + """``data_source.bigquery.session_pool_size`` controls the upper + bound on warm entries. Above the cap, releasing extra entries + closes them rather than retaining.""" + from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects + + def fake_get_value(*keys, default=None): + if keys == ("data_source", "bigquery", "session_pool_size"): + return 2 # tiny pool + if keys == ("data_source", "bigquery", "query_timeout_ms"): + return 0 # don't try to SET timeout in tests + return default + + monkeypatch.setattr("app.instance_config.get_value", fake_get_value) + monkeypatch.setattr("duckdb.connect", lambda _: _PoolFakeConn()) + + # Acquire 3 in parallel to force 3 simultaneous entries. + cm1 = _default_duckdb_session_factory(BqProjects(billing="b", data="d")) + c1 = cm1.__enter__() + cm2 = _default_duckdb_session_factory(BqProjects(billing="b", data="d")) + c2 = cm2.__enter__() + cm3 = _default_duckdb_session_factory(BqProjects(billing="b", data="d")) + c3 = cm3.__enter__() + + # Release all three. The 3rd release should close the conn since + # the pool already has 2. + cm1.__exit__(None, None, None) + cm2.__exit__(None, None, None) + cm3.__exit__(None, None, None) + + # At least one of the three connections must be closed (pool overflow). + closed_count = sum(1 for c in (c1, c2, c3) if c.closed) + assert closed_count >= 1, ( + "pool retained more than its configured size; expected at least " + f"one close. closed_count={closed_count}" + ) + # Pool retained at most `size` entries, so total live + closed = 3, + # closed >= 1 means pool size <= 2. 
+ assert closed_count == 1 + + def test_pool_replaces_broken_connection(self, monkeypatch, reset_pool): + """If a pooled entry's liveness check fails on acquire (the + underlying DuckDB conn was closed externally, BQ extension state + corrupted, etc.), the pool must drop it and build a fresh entry — + not hand the broken one to the caller.""" + from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects + + # First acquire creates entry #1; we'll then mark it broken. + all_conns: list[_PoolFakeConn] = [] + def fake_connect(_path): + c = _PoolFakeConn() + all_conns.append(c) + return c + monkeypatch.setattr("duckdb.connect", fake_connect) + + with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn1: + id1 = conn1.id + # Simulate corruption: make execute() raise on next call. + def broken_execute(*a, **kw): + raise RuntimeError("connection broken") + conn1.execute = broken_execute # type: ignore[assignment] + + # Second acquire must skip the broken entry and build a fresh one. + with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn2: + id2 = conn2.id + + assert id1 != id2, ( + f"expected a fresh conn after broken-pool reaper; both acquires " + f"returned id={id1}" + ) + assert len(all_conns) >= 2 + + def test_pool_handles_reentrant_acquires_thread_safe(self, monkeypatch, reset_pool): + """Concurrent acquires from multiple threads must never hand the + same underlying DuckDB conn to two threads at once. The pool's + lock acquires/releases are the load-bearing invariant here. 
+ """ + from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects + + monkeypatch.setattr("duckdb.connect", lambda _: _PoolFakeConn()) + + active_ids: set = set() + active_lock = threading.Lock() + violations: list = [] + + def worker(): + for _ in range(20): + with _default_duckdb_session_factory( + BqProjects(billing="b", data="d"), + ) as conn: + with active_lock: + if conn.id in active_ids: + violations.append(conn.id) + active_ids.add(conn.id) + # Hold briefly to give other threads a chance to race. + time.sleep(0.001) + with active_lock: + active_ids.discard(conn.id) + + import time + threads = [threading.Thread(target=worker) for _ in range(4)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not violations, ( + f"pool handed the same conn to multiple threads concurrently: " + f"{violations}" + ) + + def test_pool_does_not_apply_when_factory_is_injected(self, monkeypatch, reset_pool): + """Test fixtures that inject a custom ``duckdb_session_factory`` + (e.g. tests/conftest.py's ``bq_access`` fixture) MUST bypass the + pool entirely — otherwise their nullcontext-wrapped fake would + get retained between tests and corrupt downstream assertions. + """ + from connectors.bigquery.access import BqAccess, BqProjects + from contextlib import contextmanager + + sentinel = object() + + @contextmanager + def custom_factory(_projects): + yield sentinel + + bq = BqAccess( + BqProjects(billing="b", data="d"), + duckdb_session_factory=custom_factory, + ) + with bq.duckdb_session() as conn: + assert conn is sentinel diff --git a/tests/test_query_remote_rewrite.py b/tests/test_query_remote_rewrite.py new file mode 100644 index 0000000..7d78a3e --- /dev/null +++ b/tests/test_query_remote_rewrite.py @@ -0,0 +1,444 @@ +"""Unit tests for ``_rewrite_user_sql_for_bigquery_query``. 
+ +The helper rewrites user SQL referencing query_mode='remote' BigQuery +tables so the entire query ships to BQ via the DuckDB BQ extension's +``bigquery_query(project, sql)`` UDF — engaging WHERE / SELECT / +LIMIT predicate pushdown instead of falling through to ATTACH-catalog +mode (which opens a Storage Read API session over the whole table). + +These tests pin down each conservative-skip rule plus the happy-path +rewrites. Edge cases (CTE shadowing, double-wrap, mixed-source JOIN) +are intentionally explicit so a future refactor doesn't quietly +loosen the guard. +""" +from __future__ import annotations + +import pytest + + +# --------------------------------------------------------------------------- +# Test infrastructure: an in-memory DuckDB seeded with table_registry rows +# matching the shapes the production registry produces. Avoids the full app +# bootstrap path; the rewriter only needs ``conn.execute("SELECT * FROM +# table_registry ...")`` to resolve names. +# --------------------------------------------------------------------------- + + +@pytest.fixture +def seeded_registry(tmp_path, monkeypatch): + """Build a fresh ``system.duckdb`` in tmp_path with the schema migrated. + + Returns the open connection so tests can pass it to the rewriter. + Cleanup is automatic via tmp_path teardown — but we close the + open singleton handle first so a different DATA_DIR in the next + test doesn't see the previous tmp's lock. 
+ """ + from src.db import get_system_db, close_system_db + + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + (tmp_path / "state").mkdir(parents=True, exist_ok=True) + close_system_db() + conn = get_system_db() + yield conn + close_system_db() + + +def _register_bq_remote(conn, *, table_id, name, bucket, source_table): + from src.repositories.table_registry import TableRegistryRepository + TableRegistryRepository(conn).register( + id=table_id, + name=name, + source_type="bigquery", + bucket=bucket, + source_table=source_table, + query_mode="remote", + ) + + +def _register_local(conn, *, table_id, name, source_type="keboola"): + from src.repositories.table_registry import TableRegistryRepository + TableRegistryRepository(conn).register( + id=table_id, + name=name, + source_type=source_type, + bucket="bkt", + source_table=name, + query_mode="local", + ) + + +def _set_bq_project(monkeypatch, project="test-prj"): + """Stub get_bq_access so the rewriter sees a real-looking project ID.""" + from connectors.bigquery.access import BqAccess, BqProjects, get_bq_access + bq = BqAccess( + BqProjects(billing=project, data=project), + client_factory=lambda projects: object(), + ) + monkeypatch.setattr( + "app.api.query.get_bq_access", + lambda: bq, + raising=False, + ) + get_bq_access.cache_clear() + + +# --------------------------------------------------------------------------- +# Happy-path rewrites +# --------------------------------------------------------------------------- + + +def test_simple_select_where_against_one_bq_table_rewrites(seeded_registry, monkeypatch): + """Single-table SELECT-WHERE against a registered BQ remote row → + full SQL wrapped in ``bigquery_query('project', 'sql')``. 
+ The bare-name reference gets translated to BQ-native backtick form.""" + from app.api.query import _rewrite_user_sql_for_bigquery_query + _register_bq_remote(seeded_registry, table_id="bq.fin.ue", name="ue", + bucket="fin", source_table="ue") + _set_bq_project(monkeypatch, "test-prj") + + rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query( + "SELECT count(*) FROM ue WHERE event_date = '2026-01-01'", + seeded_registry, + ) + + assert did_rewrite is True + # Outer wrap must be a single bigquery_query() FROM-source. + assert "bigquery_query(" in rewritten + assert "test-prj" in rewritten + # Inner SQL: bare name rewritten to backticked BQ-native path. + assert "`test-prj.fin.ue`" in rewritten + # WHERE predicate is preserved (single-quote-doubled for embedding + # inside the outer string literal — standard SQL escaping that + # both DuckDB and BQ honor). + assert "event_date = ''2026-01-01''" in rewritten + + +def test_direct_bq_path_rewrites(seeded_registry, monkeypatch): + """User wrote the direct ``bq."ds"."tbl"`` form. The rewriter must + still translate to BQ-native backtick form before wrapping.""" + from app.api.query import _rewrite_user_sql_for_bigquery_query + _register_bq_remote(seeded_registry, table_id="bq.fin.ue", name="ue", + bucket="fin", source_table="ue") + _set_bq_project(monkeypatch, "test-prj") + + rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query( + 'SELECT * FROM bq."fin"."ue" LIMIT 10', + seeded_registry, + ) + + assert did_rewrite is True + assert "bigquery_query(" in rewritten + assert "`test-prj.fin.ue`" in rewritten + # Original duckdb-flavor path must NOT remain (it'd parse-fail under BQ). 
+ assert 'bq."fin"."ue"' not in rewritten + + +def test_cte_referencing_bq_table_rewrites_inside_cte(seeded_registry, monkeypatch): + """A WITH clause whose body references a BQ table must rewrite that + inner reference; the wrapping happens at the top level so BQ sees a + valid BQ-flavor CTE.""" + from app.api.query import _rewrite_user_sql_for_bigquery_query + _register_bq_remote(seeded_registry, table_id="bq.fin.orders", name="orders", + bucket="fin", source_table="orders") + _set_bq_project(monkeypatch, "test-prj") + + rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query( + "WITH x AS (SELECT id FROM orders WHERE total > 0) SELECT count(*) FROM x", + seeded_registry, + ) + assert did_rewrite is True + # Inner reference is rewritten. + assert "`test-prj.fin.orders`" in rewritten + # The whole thing is wrapped — bigquery_query is the outermost FROM. + assert rewritten.lower().count("bigquery_query(") == 1 + + +def test_subquery_referencing_bq_table_rewrites(seeded_registry, monkeypatch): + """Subquery in FROM position — same handling as a CTE: rewrite the + inner table reference, wrap the whole at the top.""" + from app.api.query import _rewrite_user_sql_for_bigquery_query + _register_bq_remote(seeded_registry, table_id="bq.fin.ue", name="ue", + bucket="fin", source_table="ue") + _set_bq_project(monkeypatch, "test-prj") + + rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query( + "SELECT s.cnt FROM (SELECT count(*) AS cnt FROM ue) s", + seeded_registry, + ) + assert did_rewrite is True + assert "`test-prj.fin.ue`" in rewritten + assert rewritten.lower().count("bigquery_query(") == 1 + + +def test_multiple_bq_tables_one_project_combine(seeded_registry, monkeypatch): + """Two registered BQ tables in the same project → single + ``bigquery_query()`` wraps the whole SQL with both refs rewritten + inline. 
No separate parallel calls.""" + from app.api.query import _rewrite_user_sql_for_bigquery_query + _register_bq_remote(seeded_registry, table_id="bq.fin.orders", name="orders", + bucket="fin", source_table="orders") + _register_bq_remote(seeded_registry, table_id="bq.fin.users", name="users", + bucket="fin", source_table="users") + _set_bq_project(monkeypatch, "test-prj") + + rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query( + "SELECT u.id, count(o.id) " + "FROM users u JOIN orders o ON u.id = o.user_id " + "GROUP BY u.id", + seeded_registry, + ) + assert did_rewrite is True + # Both rewritten. + assert "`test-prj.fin.users`" in rewritten + assert "`test-prj.fin.orders`" in rewritten + # Single wrap. + assert rewritten.lower().count("bigquery_query(") == 1 + + +# --------------------------------------------------------------------------- +# Conservative-skip cases +# --------------------------------------------------------------------------- + + +def test_join_bq_to_local_skips_rewrite(seeded_registry, monkeypatch): + """A JOIN between a BQ table and a local-mode (Keboola/Jira) table + is a cross-source query — wrapping it in bigquery_query() would lose + the local table. The rewriter must fall through to the ATTACH-catalog + path (slow but correct). 
+ """ + from app.api.query import _rewrite_user_sql_for_bigquery_query + _register_bq_remote(seeded_registry, table_id="bq.fin.ue", name="ue", + bucket="fin", source_table="ue") + _register_local(seeded_registry, table_id="kbc.in.local_orders", + name="local_orders") + _set_bq_project(monkeypatch, "test-prj") + + user_sql = ( + "SELECT u.id, lo.total " + "FROM ue u JOIN local_orders lo ON u.id = lo.user_id" + ) + rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query( + user_sql, seeded_registry, + ) + assert did_rewrite is False + assert rewritten == user_sql # untouched + + +def test_no_bq_tables_passes_through(seeded_registry, monkeypatch): + """User SQL referencing only local-source tables → no rewrite, + no log spam, original SQL returned.""" + from app.api.query import _rewrite_user_sql_for_bigquery_query + _register_local(seeded_registry, table_id="kbc.in.orders", name="orders") + _set_bq_project(monkeypatch, "test-prj") + + user_sql = "SELECT * FROM orders WHERE id = 1" + rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query( + user_sql, seeded_registry, + ) + assert did_rewrite is False + assert rewritten == user_sql + + +def test_already_contains_bigquery_query_passes_through(seeded_registry, monkeypatch): + """User SQL already calls bigquery_query() — never double-wrap. + + Note: the /api/query endpoint blocks ``bigquery_query`` in user SQL + via the keyword denylist, so this scenario can't reach the rewriter + in production today. Defensive guard for callers from other paths. 
+ """ + from app.api.query import _rewrite_user_sql_for_bigquery_query + _register_bq_remote(seeded_registry, table_id="bq.fin.ue", name="ue", + bucket="fin", source_table="ue") + _set_bq_project(monkeypatch, "test-prj") + + user_sql = ( + "SELECT * FROM bigquery_query('test-prj', 'SELECT * FROM `test-prj.fin.ue`')" + ) + rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query( + user_sql, seeded_registry, + ) + assert did_rewrite is False + assert rewritten == user_sql + + +def test_unconfigured_bq_project_skips(seeded_registry, monkeypatch): + """If get_bq_access() is the not-configured sentinel (data=''), + don't rewrite — there's no project to fill into bigquery_query().""" + from app.api.query import _rewrite_user_sql_for_bigquery_query + _register_bq_remote(seeded_registry, table_id="bq.fin.ue", name="ue", + bucket="fin", source_table="ue") + + # Override to sentinel (empty data project). + from connectors.bigquery.access import BqAccess, BqProjects, get_bq_access + monkeypatch.setattr( + "app.api.query.get_bq_access", + lambda: BqAccess(BqProjects(billing="", data="")), + raising=False, + ) + get_bq_access.cache_clear() + + user_sql = "SELECT * FROM ue" + rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query( + user_sql, seeded_registry, + ) + assert did_rewrite is False + assert rewritten == user_sql + + +# --------------------------------------------------------------------------- +# Backwards-compat: dry-run helper still available + behaves the same +# --------------------------------------------------------------------------- + + +def test_existing_dry_run_helper_still_callable(): + """The original ``_rewrite_user_sql_for_bq_dry_run`` is now a thin + wrapper around the shared core rewriter (Pass 1 + Pass 2). Callers + that pass an explicit ``project`` argument keep working unchanged. 
+ """ + from app.api.query import _rewrite_user_sql_for_bq_dry_run + + rewritten = _rewrite_user_sql_for_bq_dry_run( + sql="SELECT * FROM ue", + name_lookups=[("ue", "fin", "ue")], + project="some-prj", + ) + assert "`some-prj.fin.ue`" in rewritten + # The dry-run helper does NOT add a bigquery_query() wrapper; that's + # only the new execution-path helper's job. + assert "bigquery_query(" not in rewritten + + +# --------------------------------------------------------------------------- +# End-to-end: the /api/query handler must invoke the rewriter and execute +# the rewritten SQL (not the original) when there's a BQ-remote table. +# --------------------------------------------------------------------------- + + +def _auth(token: str) -> dict: + return {"Authorization": f"Bearer {token}"} + + +def _register_bq_remote_row(name: str, bucket: str, source_table: str) -> None: + from src.db import get_system_db + from src.repositories.table_registry import TableRegistryRepository + sys_conn = get_system_db() + try: + TableRegistryRepository(sys_conn).register( + id=f"bq.{bucket}.{source_table}", + name=name, + source_type="bigquery", + bucket=bucket, + source_table=source_table, + query_mode="remote", + ) + finally: + sys_conn.close() + + +@pytest.fixture +def stub_bq_for_endpoint(monkeypatch): + """Stub _bq_dry_run_bytes + get_bq_access at the endpoint level so the + cap-guard sees a real-looking BQ project but doesn't issue real RPCs. 
+ """ + monkeypatch.setattr( + "app.api.query._bq_dry_run_bytes", + lambda *a, **k: 1024, # tiny — pass cap + raising=False, + ) + + class _FakeProjects: + data = "test-data-prj" + billing = "test-billing-prj" + + class _FakeBqAccess: + projects = _FakeProjects() + + monkeypatch.setattr( + "app.api.query.get_bq_access", + lambda: _FakeBqAccess(), + raising=False, + ) + + +def test_endpoint_executes_rewritten_sql_against_analytics( + seeded_app, stub_bq_for_endpoint, monkeypatch, +): + """The /api/query handler must call ``analytics.execute(rewritten_sql)`` + — NOT the user's original SQL — when a BQ-remote table is referenced. + Capture what reaches DuckDB and assert the bigquery_query() wrap is + present. + """ + _register_bq_remote_row("ue", "fin", "ue") + + # Capture analytics.execute calls. The handler does + # `analytics = get_analytics_db_readonly(); analytics.execute(sql)`, + # so we patch the connection factory to return a stub. + captured = {"sql": None} + + class _StubAnalytics: + description = [("c0",)] + def execute(self, sql, *args, **kwargs): + captured["sql"] = sql + class _R: + def fetchmany(self, _n): + return [] + return _R() + def close(self): + pass + + monkeypatch.setattr( + "app.api.query.get_analytics_db_readonly", + lambda: _StubAnalytics(), + raising=False, + ) + + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/query", + json={"sql": "SELECT count(*) FROM ue WHERE country = 'CZ'"}, + headers=_auth(token), + ) + assert r.status_code == 200, r.json() + sent = captured["sql"] + assert sent is not None, "analytics.execute was never called" + assert "bigquery_query(" in sent, ( + f"endpoint did not wrap user SQL in bigquery_query(); sent: {sent!r}" + ) + assert "test-data-prj" in sent + assert "`test-data-prj.fin.ue`" in sent + + +def test_endpoint_passes_original_sql_when_no_bq_table( + seeded_app, stub_bq_for_endpoint, monkeypatch, +): + """For queries that don't touch any BQ-remote registered name, the + 
handler must pass the original SQL through unchanged — the + ATTACH-catalog path handles local-source tables natively and any + rewrite would be wasted work.""" + captured = {"sql": None} + + class _StubAnalytics: + description = [("c0",)] + def execute(self, sql, *args, **kwargs): + captured["sql"] = sql + class _R: + def fetchmany(self, _n): + return [] + return _R() + def close(self): + pass + + monkeypatch.setattr( + "app.api.query.get_analytics_db_readonly", + lambda: _StubAnalytics(), + raising=False, + ) + + c = seeded_app["client"] + token = seeded_app["admin_token"] + user_sql = "SELECT 1 AS x" + r = c.post("/api/query", json={"sql": user_sql}, headers=_auth(token)) + assert r.status_code == 200, r.json() + assert captured["sql"] == user_sql + assert "bigquery_query(" not in captured["sql"]