merge: server-side perf (BQ rewrite + session pool + error mapping)

This commit is contained in:
ZdenekSrotyr 2026-05-06 13:13:48 +02:00
commit c96ea3ad49
6 changed files with 1275 additions and 41 deletions

View file

@ -10,6 +10,48 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
## [Unreleased] ## [Unreleased]
### Performance
- **DuckDB BigQuery-extension session pool**
(`connectors/bigquery/access.py`). `BqAccess.duckdb_session()` now acquires
pre-warmed connections from a bounded process-local pool instead of running
`INSTALL bigquery; LOAD bigquery; CREATE SECRET; ATTACH …` on every request.
Each acquire saves the ~0.5 s extension-load + secret-creation cost when
the pool has a warm entry; auth SECRET is refreshed on acquire so a
long-lived pooled entry doesn't keep a stale GCE metadata token past its
TTL. Pool size is configurable via `data_source.bigquery.session_pool_size`
(default 4; sentinel `0` disables pooling). Affects every BQ-touching path
`/api/query`, `/api/v2/scan`, `/api/v2/sample`, `/api/v2/schema`,
materialize, and the orchestrator's remote-attach.
### Fixed
- **BigQuery `responseTooLarge` no longer surfaces as a generic 400 / 502 with
the raw upstream message** (`connectors/bigquery/access.py`). The
`translate_bq_error` helper now classifies "Response too large to return"
errors via a dedicated `bq_response_too_large` kind (HTTP 400) with an
actionable hint pointing at the WHERE / aggregation / materialized-table
remediations. Pre-fix this failure mode fell through to the generic
`bq_bad_request` mapping, which implied the user's SQL had a syntax error
— wrong root cause. Affects every BQ-touching path (`/api/query`,
`/api/v2/scan`, `/api/v2/sample`, `/api/v2/schema`, materialize) since
they all share `translate_bq_error`.
- **`/api/query` (and `agnes query --remote`) now rewrites user SQL referencing
`query_mode='remote'` BigQuery rows into a single `bigquery_query()` call
before execute** (`app/api/query.py`). Pre-fix the master view
(`CREATE VIEW <name> AS SELECT * FROM bigquery.<bucket>.<source_table>`) did
not push WHERE / SELECT / LIMIT into BQ — the DuckDB BQ extension opened a
Storage Read API session over the entire upstream table, scanning the full
partitioned dataset before the local DuckDB filter ran. On 100M+ row
remote-mode tables this was 50-100× slower than the equivalent direct
`bigquery_query()` call (70-150 s vs 1.5 s) and frequently failed with
`Response too large to return`. The rewriter (shared core with the existing
dry-run helper) wraps the user's whole SQL in `bigquery_query('<project>',
'<inner-sql>')` so the BQ planner receives the full query and applies
partition pruning + projection pushdown server-side. Conservative
fall-through: cross-source JOINs (BQ ↔ Keboola/Jira local), queries already
containing `bigquery_query(`, and unconfigured BQ project all keep the
original ATTACH-catalog path so behavior degrades gracefully.
## [0.38.3] — 2026-05-06 ## [0.38.3] — 2026-05-06
### Changed ### Changed

View file

@ -192,8 +192,32 @@ def execute_query(
else contextlib.nullcontext() else contextlib.nullcontext()
) )
with guard: with guard:
# Performance fix: rewrite user SQL referencing BQ-remote tables
# to a single ``bigquery_query()`` call so WHERE / projection /
# LIMIT push into BQ via jobs.query (1-2 s) instead of falling
# through DuckDB's ATTACH-catalog Storage Read API session over
# the full table (often 70-150 s, fails with "Response too
# large to return" on >100M-row sources). Helper returns the
# original SQL unchanged when rewriting would be unsafe
# (cross-source JOIN, no BQ tables referenced, double-wrap).
execution_sql, did_rewrite = _rewrite_user_sql_for_bigquery_query(
request.sql, conn,
)
if did_rewrite:
logger.info(
"query_rewrite_to_bigquery_query: user_id=%s — wrapped "
"SQL in bigquery_query() for BQ predicate pushdown",
user_id,
)
else:
logger.debug(
"query_rewrite_skipped: user_id=%s — running original "
"SQL via ATTACH-catalog path",
user_id,
)
# Open in read-only mode for extra safety # Open in read-only mode for extra safety
result = analytics.execute(request.sql).fetchmany(request.limit + 1) result = analytics.execute(execution_sql).fetchmany(request.limit + 1)
columns = [desc[0] for desc in analytics.description] if analytics.description else [] columns = [desc[0] for desc in analytics.description] if analytics.description else []
truncated = len(result) > request.limit truncated = len(result) > request.limit
rows = result[:request.limit] rows = result[:request.limit]
@ -432,12 +456,11 @@ def _bq_guardrail_inputs(
return dry_run, name_lookups, None return dry_run, name_lookups, None
def _rewrite_user_sql_for_bq_dry_run( def _rewrite_bq_table_refs_to_native(
sql: str, name_lookups: list, project: str, sql: str, name_lookups: list, project: str,
) -> str: ) -> str:
"""Rewrite user SQL from DuckDB-flavor to BQ-native so a single """Core identifier rewrite: DuckDB-flavor table references → BQ-native
`_bq_dry_run_bytes` call can estimate scan size for the EXACT query backtick form. Shared between dry-run and execution-path rewriters.
the user submitted (issue #171).
Two transformations: Two transformations:
@ -463,13 +486,15 @@ def _rewrite_user_sql_for_bq_dry_run(
inside a string literal (e.g. an `IN (...)` value or a `LIKE` pattern) inside a string literal (e.g. an `IN (...)` value or a `LIKE` pattern)
will also be rewritten. This is acceptable because (a) it's vanishingly will also be rewritten. This is acceptable because (a) it's vanishingly
rare to have a string literal exactly matching a registered table name, rare to have a string literal exactly matching a registered table name,
and (b) when it does happen the dry-run errors out and the caller falls and (b) when it does happen the caller's error path covers the case
back to the per-table SELECT * estimate (current behavior, no regression). (dry-run falls back to per-table SELECT * estimate; execution falls
through to the ATTACH-catalog path).
CTE shadowing: a `WITH unit_economics AS (...)` followed by `FROM CTE shadowing: a `WITH unit_economics AS (...)` followed by `FROM
unit_economics` would also rewrite the `FROM` reference. BQ then treats unit_economics` would also rewrite the `FROM` reference. BQ then treats
the CTE as unreferenced (legal) and the dry-run estimates the rewritten the CTE as unreferenced (legal) and the rewriter's caller deals with
physical table likely an over-estimate. Same fallback path covers this. the consequence over-estimation for dry-run, fall-through-to-ATTACH
via BQ parse error for execution.
""" """
out = sql out = sql
@ -509,6 +534,172 @@ def _rewrite_user_sql_for_bq_dry_run(
return out return out
def _rewrite_user_sql_for_bq_dry_run(
sql: str, name_lookups: list, project: str,
) -> str:
"""Rewrite user SQL from DuckDB-flavor to BQ-native so a single
`_bq_dry_run_bytes` call can estimate scan size for the EXACT query
the user submitted (issue #171). Thin wrapper around the shared
core; kept as a stable name for callers in /api/query's cap-guard.
"""
return _rewrite_bq_table_refs_to_native(sql, name_lookups, project)
def _rewrite_user_sql_for_bigquery_query(
user_sql: str, conn: duckdb.DuckDBPyConnection,
) -> tuple[str, bool]:
"""Rewrite user SQL so the entire query ships to BQ as a single
``bigquery_query(<project>, <inner-sql>)`` call.
Returns ``(rewritten_sql, did_rewrite)``. When ``did_rewrite`` is
``False``, the caller MUST execute the original ``user_sql`` via the
ATTACH-catalog path (slow but correct); the rewriter is conservative
on purpose wrapping cross-source queries in ``bigquery_query()``
would silently lose the local-side data.
Why this matters
----------------
The orchestrator's master view (``CREATE VIEW name AS SELECT * FROM
bigquery.<bucket>.<source_table>``) does not push WHERE / projections
into BQ when DuckDB resolves the query the BQ extension opens a
Storage Read API session over the entire table, which on multi-100M-row
tables is 50-100× slower than letting BQ run the query server-side.
Wrapping the user's SQL in ``bigquery_query('<project>', '<inner>')``
makes the BQ extension issue a ``jobs.query`` instead, with full
predicate pushdown.
Skip rules (returns ``(user_sql, False)``)
------------------------------------------
1. No registered ``query_mode='remote'`` BQ row referenced in the SQL.
Nothing to rewrite original SQL passes through unchanged.
2. User SQL already contains ``bigquery_query(`` never double-wrap.
(The /api/query keyword denylist also blocks this in production;
defensive guard for callers in other contexts.)
3. SQL also references a non-BQ master view (Keboola/Jira local-mode
table). Wrapping would lose those references fall through to
ATTACH-catalog so the cross-source query still runs.
4. ``get_bq_access()`` returns the unconfigured sentinel
(``data == ''``). No project to fill into ``bigquery_query()``.
Edge cases preserved by design
------------------------------
- CTEs / sub-queries referencing BQ tables: the table-name rewrite
happens at every match position, then the whole SQL is wrapped in
one ``bigquery_query()``. BQ supports CTEs, so this works.
- Multiple BQ tables, same project: combined into ONE wrap (single
jobs.query). DuckDB's BQ extension doesn't support multi-project
JOINs in a single ``bigquery_query()`` call today; if/when the
registry grows per-table source_project, this helper would need to
gate on cross-project mixing.
- ``bq."ds"."tbl"`` direct paths: rewritten to BQ-native backticks
via the same shared core as dry-run.
"""
# Skip 2: don't double-wrap. Cheap pre-check before any registry I/O.
if "bigquery_query(" in user_sql.lower():
return user_sql, False
# Find all referenced BQ remote-mode rows (bare-name + direct bq.path).
# Mirrors the non-RBAC parts of `_bq_guardrail_inputs`.
sql_lower = user_sql.lower()
name_lookups: list = []
seen_paths: set = set()
try:
repo = TableRegistryRepository(conn)
bq_rows = repo.list_by_source("bigquery")
all_rows = repo.list_all()
except Exception:
# Registry read failure — let the original SQL run through the
# ATTACH-catalog path. The handler's generic error path will
# surface anything user-visible.
return user_sql, False
for r in bq_rows:
if (r.get("query_mode") or "") != "remote":
continue
bucket = r.get("bucket")
source_table = r.get("source_table")
name = r.get("name")
if not (bucket and source_table and name):
continue
pattern = r'\b' + re.escape(str(name).lower()) + r'\b'
if re.search(pattern, sql_lower):
key = (bucket.lower(), source_table.lower())
if key not in seen_paths:
seen_paths.add(key)
name_lookups.append((str(name), bucket, source_table))
# Direct bq."ds"."tbl" references — pull the registered (bucket,
# source_table) pair so the inner SQL receives a backticked BQ-native
# path. Mismatched / unregistered paths are caught upstream by the
# guardrail; here we just collect the mappings the rewriter needs.
direct_paths: set[tuple[str, str]] = set()
for m in BQ_PATH.finditer(user_sql):
bucket_raw = m.group(1).strip('"')
source_table_raw = m.group(2).strip('"')
direct_paths.add((bucket_raw, source_table_raw))
if not name_lookups and not direct_paths:
# Skip 1: no BQ tables referenced.
return user_sql, False
# Skip 3: cross-source query (BQ + local-mode). If user SQL also
# references a non-BQ master view, we can't push the whole thing to
# BQ — DuckDB needs to do the join.
bq_names_lc = {n.lower() for n, _, _ in name_lookups}
for r in all_rows:
st = (r.get("source_type") or "").lower()
qm = (r.get("query_mode") or "").lower()
if st == "bigquery" and qm == "remote":
continue # already handled
name = r.get("name")
if not name:
continue
name_lc = str(name).lower()
if name_lc in bq_names_lc:
# Same name registered both BQ-remote and local? Pathological;
# skip as a safety measure.
return user_sql, False
if re.search(r'\b' + re.escape(name_lc) + r'\b', sql_lower):
logger.info(
"rewrite_skip_cross_source: user SQL references both "
"BQ-remote and local-mode tables; falling back to "
"ATTACH-catalog path",
)
return user_sql, False
# Skip 4: BQ project not configured.
try:
bq = get_bq_access()
project = bq.projects.data
except Exception:
return user_sql, False
if not project:
return user_sql, False
# Rewrite identifiers, then wrap the whole thing in bigquery_query().
# The DuckDB BQ extension's UDF expects (<billing-project-string>,
# <inner-sql-string>); we use the data project so the inner SQL's
# backticked paths resolve to the same project.
inner_sql = _rewrite_bq_table_refs_to_native(user_sql, name_lookups, project)
# SQL string literal escaping: BQ accepts standard SQL doubled-quote
# escaping inside single-quoted strings. We pass `inner_sql` as a
# parameter to DuckDB's prepared statement at execute-time
# (analytics.execute(sql, [project, inner_sql]) shape) — but the
# current handler runs ``analytics.execute(rewritten_sql)`` with no
# params, so we MUST embed the inner SQL safely. Use parameterised
# form: emit ``bigquery_query(?, ?)`` and signal the params via a
# sentinel? No — the handler treats SQL as opaque. Instead, embed
# using single-quote doubling (standard SQL escape; both DuckDB and
# BQ honor it).
escaped_inner = inner_sql.replace("'", "''")
rewritten = (
f"SELECT * FROM bigquery_query('{project}', '{escaped_inner}')"
)
return rewritten, True
@contextlib.contextmanager @contextlib.contextmanager
def _bq_quota_and_cap_guard( def _bq_quota_and_cap_guard(
*, *,

View file

@ -135,6 +135,13 @@ data_source:
# # view-backed datasets -- bumped to 600 000 ms = 10 min by default. # # view-backed datasets -- bumped to 600 000 ms = 10 min by default.
# # Set 0 to fall through to the extension default. Configurable via # # Set 0 to fall through to the extension default. Configurable via
# # /admin/server-config UI. # # /admin/server-config UI.
# session_pool_size: 4
# # Number of pre-warmed DuckDB+bigquery-extension sessions kept
# # in a process-local pool. Each acquire amortizes the
# # ~0.5 s INSTALL/LOAD/CREATE-SECRET cost across requests; a fresh
# # build only happens when the pool is empty. Default 4. Set 0
# # to disable pooling (every acquire builds + closes a fresh
# # session; matches pre-pool behavior).
# --- OpenMetadata catalog (optional) --- # --- OpenMetadata catalog (optional) ---
# Enriches table and column metadata from OpenMetadata REST API. # Enriches table and column metadata from OpenMetadata REST API.

View file

@ -8,6 +8,8 @@ from __future__ import annotations
import functools import functools
import logging import logging
import threading
from collections import deque
from contextlib import contextmanager from contextlib import contextmanager
from dataclasses import dataclass from dataclasses import dataclass
from typing import Callable, Iterator, Literal from typing import Callable, Iterator, Literal
@ -42,6 +44,12 @@ class BqAccessError(Exception):
"bq_forbidden": 502, # other Forbidden from BQ "bq_forbidden": 502, # other Forbidden from BQ
"bq_bad_request": 400, # 400 from BQ when caller flagged it as client-derived "bq_bad_request": 400, # 400 from BQ when caller flagged it as client-derived
"bq_upstream_error": 502, # all other upstream BQ failures "bq_upstream_error": 502, # all other upstream BQ failures
# `responseTooLarge` is a BQ refusal whose root cause is query shape
# (the user asked for too many rows back inline), not auth or syntax.
# 400 with a specific actionable hint instead of the generic
# bq_bad_request / bq_upstream_error mappings, which surfaced the
# raw BQ message and gave operators no path forward.
"bq_response_too_large": 400,
} }
def __init__(self, kind: str, message: str, details: dict | None = None): def __init__(self, kind: str, message: str, details: dict | None = None):
@ -51,6 +59,43 @@ class BqAccessError(Exception):
super().__init__(message) super().__init__(message)
_RESPONSE_TOO_LARGE_HINT = (
"BigQuery refused to return the result inline; the query exceeded BQ's "
"response size limit. Narrow the WHERE clause, aggregate further, "
"select fewer columns, or query a materialized table that's already "
"been bounded server-side."
)
def _classify_response_too_large(msg: str, projects: BqProjects) -> BqAccessError:
"""Build the `bq_response_too_large` BqAccessError with the canonical
actionable hint and the original BQ message preserved in details for
operator debugging."""
return BqAccessError(
"bq_response_too_large",
_RESPONSE_TOO_LARGE_HINT,
details={
"original": msg,
"billing_project": projects.billing,
"data_project": projects.data,
},
)
def _is_response_too_large(msg: str) -> bool:
"""Detect BQ's `responseTooLarge` failure mode by message substring.
The reason code is stable across HTTP transports (gax.BadRequest from
google-cloud-bigquery, duckdb.IOException from the BQ extension's own
HTTP layer); both surface 'Response too large to return' verbatim in
the message body. Match case-insensitively + tolerate the slight
variant 'response too large' that some surfaces emit without the
'to return' suffix.
"""
ml = msg.lower()
return "response too large" in ml
def translate_bq_error( def translate_bq_error(
e: Exception, e: Exception,
projects: BqProjects, projects: BqProjects,
@ -67,12 +112,24 @@ def translate_bq_error(
2. Forbidden + 'serviceusage' in str(e).lower() 2. Forbidden + 'serviceusage' in str(e).lower()
-> cross_project_forbidden (with hint) -> cross_project_forbidden (with hint)
3. Forbidden -> bq_forbidden 3. Forbidden -> bq_forbidden
4. BadRequest, bad_request_status='client_error' 4. 'response too large' in str(e).lower()
-> bq_response_too_large (HTTP 400, with
actionable hint pointing at WHERE /
aggregate / materialized remediations)
5. BadRequest, bad_request_status='client_error'
-> bq_bad_request (HTTP 400) -> bq_bad_request (HTTP 400)
5. BadRequest, bad_request_status='upstream_error' 6. BadRequest, bad_request_status='upstream_error'
-> bq_upstream_error (HTTP 502) -> bq_upstream_error (HTTP 502)
6. GoogleAPICallError (other) -> bq_upstream_error 7. GoogleAPICallError (other) -> bq_upstream_error
7. Anything else -> RE-RAISED unchanged (don't swallow programmer errors) 8. Anything else -> RE-RAISED unchanged (don't swallow programmer errors)
The `responseTooLarge` mapping (4) sits ahead of the generic BadRequest
cases on purpose: BQ surfaces this failure mode as a 400 with a
specific reason, but the actionable remediation is "shape your query
differently" — not "your SQL has a syntax error" (the typical
bq_bad_request user-facing meaning) and not "BQ is broken"
(bq_upstream_error). Routing it via its own kind keeps the user-facing
message tight + correct.
""" """
if isinstance(e, BqAccessError): if isinstance(e, BqAccessError):
return e return e
@ -106,6 +163,13 @@ def translate_bq_error(
details={"billing_project": projects.billing, "data_project": projects.data}, details={"billing_project": projects.billing, "data_project": projects.data},
) )
# Special-case: `responseTooLarge` arrives as gax.BadRequest (HTTP 400)
# but has a unique reason code with a specific, actionable remediation.
# Catch it BEFORE the generic BadRequest mapping below so it doesn't
# surface as a confusing "bad request" (which implies bad SQL).
if _is_response_too_large(msg):
return _classify_response_too_large(msg, projects)
if isinstance(e, gax.BadRequest): if isinstance(e, gax.BadRequest):
if bad_request_status == "client_error": if bad_request_status == "client_error":
return BqAccessError("bq_bad_request", msg) return BqAccessError("bq_bad_request", msg)
@ -196,15 +260,40 @@ def _default_client_factory(projects: BqProjects):
) )
@contextmanager def _default_pool_size() -> int:
def _default_duckdb_session_factory(projects: BqProjects): """Resolve the BQ DuckDB-extension session pool size from instance.yaml.
"""Yield an in-memory DuckDB conn with bigquery extension loaded + SECRET set
from get_metadata_token(). Auto-cleanup. Translates auth/install failures
to BqAccessError(kind='auth_failed' or 'bq_lib_missing').
Note: `projects.billing` is not used by this factory directly bigquery_query() Reads ``data_source.bigquery.session_pool_size`` (default 4). Sentinel
callers pass it themselves as the first positional arg to identify the billing ``0`` disables pooling (every acquire builds + closes a fresh session;
project. The factory keeps the parameter for symmetry with _default_client_factory. matches pre-pool behavior). Negative / non-numeric values fall back to
the default the pool is a perf optimization, not a correctness
boundary, so an unparseable config shouldn't fail-stop the app.
"""
try:
from app.instance_config import get_value
except Exception:
return 4
raw = get_value("data_source", "bigquery", "session_pool_size", default=4)
try:
n = int(raw) if raw is not None else 4
except (TypeError, ValueError):
logger.warning(
"BQ session_pool_size=%r is not an int; falling back to default 4",
raw,
)
return 4
if n < 0:
return 4
return n
def _build_fresh_bq_session():
"""Build a single fresh in-memory DuckDB conn with the bigquery extension
INSTALL/LOAD'd, the auth SECRET created from get_metadata_token(), and
per-session settings applied. Translates auth / install failures to
BqAccessError. Caller owns the close.
Used internally by the pool; also used directly when pooling is disabled.
""" """
import duckdb # type: ignore import duckdb # type: ignore
from connectors.bigquery.auth import get_metadata_token, BQMetadataAuthError from connectors.bigquery.auth import get_metadata_token, BQMetadataAuthError
@ -220,22 +309,160 @@ def _default_duckdb_session_factory(projects: BqProjects):
conn = duckdb.connect(":memory:") conn = duckdb.connect(":memory:")
try: try:
conn.execute("INSTALL bigquery FROM community; LOAD bigquery;")
escaped = token.replace("'", "''")
conn.execute(
f"CREATE OR REPLACE SECRET bq_s (TYPE bigquery, ACCESS_TOKEN '{escaped}')"
)
except Exception as e:
# Build failed — must close the half-initialised conn, otherwise it
# leaks across the pool's lifetime.
try: try:
conn.execute("INSTALL bigquery FROM community; LOAD bigquery;") conn.close()
escaped = token.replace("'", "''") except Exception:
conn.execute( pass
f"CREATE OR REPLACE SECRET bq_s (TYPE bigquery, ACCESS_TOKEN '{escaped}')" raise BqAccessError(
) "bq_lib_missing",
except Exception as e: f"failed to install/load BigQuery DuckDB extension: {e}",
raise BqAccessError( details={"original": str(e)},
"bq_lib_missing", )
f"failed to install/load BigQuery DuckDB extension: {e}", apply_bq_session_settings(conn)
details={"original": str(e)}, return conn
)
apply_bq_session_settings(conn)
def _refresh_bq_secret(conn) -> None:
"""Refresh the auth SECRET on a pooled connection so token rotation
(default GCE metadata token TTL ~1 hr) doesn't break long-lived
pooled entries.
Cheap when the token cache is warm (a few µs). Failures are
non-fatal here the pool's liveness probe + per-acquire build
fallback will catch genuinely-broken entries.
"""
from connectors.bigquery.auth import get_metadata_token
try:
token = get_metadata_token()
escaped = token.replace("'", "''")
conn.execute(
f"CREATE OR REPLACE SECRET bq_s (TYPE bigquery, ACCESS_TOKEN '{escaped}')"
)
except Exception as e:
# Bubble up so the pool drops this entry and rebuilds.
raise BqAccessError(
"auth_failed",
f"could not refresh BQ secret on pooled session: {e}",
details={"original": str(e)},
)
def _is_pool_entry_alive(conn) -> bool:
"""Cheap liveness probe — `SELECT 1`. Returns False on any error so
the pool reaper drops the entry and builds a fresh one."""
try:
result = conn.execute("SELECT 1").fetchone()
return result is not None and result[0] == 1
except Exception:
return False
# Module-level pool state. Process-cached (mirrors get_bq_access's lifetime).
# Not fork-safe — single uvicorn worker process is the supported deployment
# shape per CLAUDE.md.
_pool: deque = deque()
_pool_lock = threading.Lock()
def _reset_session_pool_for_tests() -> None:
"""Drop and close every pooled entry. Test helper — production code
should not call this. Exposed so test fixtures + the existing
test_bq_access tests can pin pre-test pool state to empty."""
with _pool_lock:
while _pool:
entry = _pool.popleft()
try:
entry.close()
except Exception:
pass
@contextmanager
def _default_duckdb_session_factory(projects: BqProjects):
"""Yield a pooled in-memory DuckDB conn with bigquery extension loaded
+ SECRET set from get_metadata_token(). Translates auth / install
failures to BqAccessError(kind='auth_failed' or 'bq_lib_missing').
Pooling: amortizes the ~0.5 s INSTALL/LOAD/ATTACH cost across requests
by keeping pre-warmed connections in a bounded deque. Acquire reuses
an existing entry when available (refreshing its auth SECRET so
token rotation doesn't break long-lived entries) and probes liveness
cheaply via ``SELECT 1`` before handing it to the caller. On normal
exit the connection returns to the pool; on exception it's closed
instead (the underlying session may carry dirty state).
Pool size is ``data_source.bigquery.session_pool_size`` (default 4;
sentinel ``0`` disables pooling entirely, matching pre-pool
behavior). Process-cached, not fork-safe.
Note: `projects.billing` is not used by this factory directly bigquery_query()
callers pass it themselves as the first positional arg to identify the billing
project. The factory keeps the parameter for symmetry with _default_client_factory.
"""
pool_size = _default_pool_size()
# Acquire: prefer a warm entry, fall back to fresh build.
conn = None
if pool_size > 0:
while True:
with _pool_lock:
entry = _pool.popleft() if _pool else None
if entry is None:
break
if not _is_pool_entry_alive(entry):
# Reaper: drop broken entries.
try:
entry.close()
except Exception:
pass
continue
try:
# Refresh the auth SECRET so a long-lived pool entry
# doesn't keep a stale token past its TTL. Cheap when
# the token cache is warm.
_refresh_bq_secret(entry)
except BqAccessError:
try:
entry.close()
except Exception:
pass
continue
conn = entry
break
if conn is None:
conn = _build_fresh_bq_session()
try:
yield conn yield conn
finally: except Exception:
conn.close() # Caller saw an exception — the conn may be in a dirty state.
# Don't return to pool; close to release native resources.
try:
conn.close()
except Exception:
pass
raise
else:
# Normal exit — return to pool if there's room.
if pool_size > 0:
with _pool_lock:
if len(_pool) < pool_size:
_pool.append(conn)
return
# Pool disabled or full — close.
try:
conn.close()
except Exception:
pass
def apply_bq_session_settings(conn) -> None: def apply_bq_session_settings(conn) -> None:

View file

@ -1,5 +1,6 @@
"""Tests for connectors/bigquery/access.py — the BqAccess facade.""" """Tests for connectors/bigquery/access.py — the BqAccess facade."""
import pytest import pytest
import threading
class TestBqProjects: class TestBqProjects:
@ -36,6 +37,12 @@ class TestBqAccessError:
"bq_forbidden": 502, "bq_forbidden": 502,
"bq_bad_request": 400, "bq_bad_request": 400,
"bq_upstream_error": 502, "bq_upstream_error": 502,
# User-facing class for "Response too large to return" — an
# upstream BQ refusal, but caused by query shape (too many rows
# to fit in a single jobs.query response) rather than auth or
# syntax. 400 so the user sees an actionable error and not a
# 502 that suggests "BQ is broken".
"bq_response_too_large": 400,
} }
assert BqAccessError.HTTP_STATUS == expected assert BqAccessError.HTTP_STATUS == expected
@ -143,6 +150,70 @@ class TestTranslateBqError:
translate_bq_error(ValueError("not a BQ error"), self.projects, translate_bq_error(ValueError("not a BQ error"), self.projects,
bad_request_status="client_error") bad_request_status="client_error")
def test_response_too_large_via_gax_bad_request(self):
"""BQ ``responseTooLarge`` arrives as ``gax.BadRequest`` (HTTP 400
with a specific `reason` field). Pre-fix this fell through to the
generic ``bq_bad_request`` mapping surfacing as a 400 with the
raw upstream message and no actionable hint. Now it routes to a
dedicated ``bq_response_too_large`` kind whose message tells the
user exactly what to do (narrow WHERE / aggregate / use materialized).
"""
from google.api_core.exceptions import BadRequest
from connectors.bigquery.access import translate_bq_error
e = BadRequest("Response too large to return. Consider setting allowLargeResults to true ...")
result = translate_bq_error(
e, self.projects, bad_request_status="client_error",
)
assert result.kind == "bq_response_too_large", (
f"got {result.kind!r}; expected dedicated mapping for "
"'Response too large' to avoid the generic bq_bad_request 400 "
"with no actionable hint"
)
# User-facing message must point at the actionable remediations,
# not just echo the raw BQ string.
assert "exceeded" in result.message.lower() or "too large" in result.message.lower()
assert "where" in result.message.lower() or "aggregate" in result.message.lower() or "materialized" in result.message.lower()
# Original upstream text preserved in details for operator debugging.
assert "original" in result.details
assert "Response too large" in result.details["original"]
def test_response_too_large_via_duckdb_native_string(self):
"""DuckDB-native exceptions (the BQ extension's C++ HTTP path)
carry the same 'Response too large' marker in plain ``Exception``
messages must classify the same way as the gax.BadRequest case."""
from connectors.bigquery.access import translate_bq_error
e = Exception("HTTP 400: Response too large to return.")
result = translate_bq_error(
e, self.projects, bad_request_status="upstream_error",
)
assert result.kind == "bq_response_too_large"
def test_response_too_large_classification_is_status_independent(self):
"""The mapping must fire regardless of ``bad_request_status``
(some callers route via 'upstream_error', others via 'client_error').
It's the BQ error shape that matters, not who's calling."""
from google.api_core.exceptions import BadRequest
from connectors.bigquery.access import translate_bq_error
e = BadRequest("Response too large to return")
for status in ("client_error", "upstream_error"):
result = translate_bq_error(e, self.projects, bad_request_status=status)
assert result.kind == "bq_response_too_large", (
f"bad_request_status={status!r} routed to {result.kind!r}; "
"expected bq_response_too_large for both"
)
def test_response_too_large_does_not_trigger_on_unrelated_bad_request(self):
"""Other BadRequests (syntax errors, malformed identifiers, …)
must keep going through the generic bq_bad_request mapping only
the 'Response too large' substring triggers the dedicated kind."""
from google.api_core.exceptions import BadRequest
from connectors.bigquery.access import translate_bq_error
e = BadRequest("Syntax error at [1:23] near unexpected token")
result = translate_bq_error(
e, self.projects, bad_request_status="client_error",
)
assert result.kind == "bq_bad_request"
class TestDefaultClientFactory: class TestDefaultClientFactory:
def test_constructs_client_with_billing_project_as_quota(self, monkeypatch): def test_constructs_client_with_billing_project_as_quota(self, monkeypatch):
@ -208,8 +279,21 @@ class TestDefaultClientFactory:
class TestDefaultDuckdbSessionFactory: class TestDefaultDuckdbSessionFactory:
def test_yields_duckdb_conn_with_secret_then_closes(self, monkeypatch): def test_yields_duckdb_conn_with_secret_set_via_pool(self, monkeypatch):
from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects """The pool's first acquire on an empty pool runs the full
INSTALL/LOAD/SECRET sequence. After the with-block exits the
connection is RETURNED to the pool (not closed) so the next
acquire amortizes the extension-load cost.
Pre-pool semantics (close-on-exit) are preserved on broken
entries + on the explicit pool-reset path; covered in
TestBqSessionPool.
"""
from connectors.bigquery.access import (
_default_duckdb_session_factory, BqProjects,
_reset_session_pool_for_tests,
)
_reset_session_pool_for_tests()
executed_sql = [] executed_sql = []
@ -218,7 +302,10 @@ class TestDefaultDuckdbSessionFactory:
self.closed = False self.closed = False
def execute(self, sql, params=None): def execute(self, sql, params=None):
executed_sql.append((sql, params)) executed_sql.append((sql, params))
return self class _Result:
def fetchone(self_inner):
return (1,)
return _Result()
def close(self): def close(self):
self.closed = True self.closed = True
@ -228,19 +315,36 @@ class TestDefaultDuckdbSessionFactory:
with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn: with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn:
assert conn is fake_conn assert conn is fake_conn
assert fake_conn.closed is True # Pool retains the conn — close happens at pool reset / shutdown.
assert fake_conn.closed is False
# Verify INSTALL/LOAD/SECRET sequence ran # Verify INSTALL/LOAD/SECRET sequence ran
assert any("INSTALL bigquery" in sql for sql, _ in executed_sql) assert any("INSTALL bigquery" in sql for sql, _ in executed_sql)
assert any("LOAD bigquery" in sql for sql, _ in executed_sql) assert any("LOAD bigquery" in sql for sql, _ in executed_sql)
assert any("CREATE OR REPLACE SECRET" in sql and "tok123" in sql for sql, _ in executed_sql) assert any("CREATE OR REPLACE SECRET" in sql and "tok123" in sql for sql, _ in executed_sql)
# Explicit pool reset closes the retained entry.
_reset_session_pool_for_tests()
assert fake_conn.closed is True
def test_closes_on_exception_inside_with_block(self, monkeypatch): def test_closes_on_exception_inside_with_block(self, monkeypatch):
from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects """Exceptions inside the with-block leave the underlying conn in
an unknown state (half-completed query, dirty session); the pool
treats it as broken and closes it rather than returning to pool.
"""
from connectors.bigquery.access import (
_default_duckdb_session_factory, BqProjects,
_reset_session_pool_for_tests,
)
_reset_session_pool_for_tests()
class FakeConn: class FakeConn:
closed = False closed = False
def execute(self, *a, **kw): return self def execute(self, *a, **kw):
class _Result:
def fetchone(self_inner):
return (1,)
return _Result()
def close(self): self.closed = True def close(self): self.closed = True
fake_conn = FakeConn() fake_conn = FakeConn()
@ -449,3 +553,222 @@ class TestGetBqAccess:
assert a is b assert a is b
assert isinstance(a, BqAccess) assert isinstance(a, BqAccess)
assert a.projects.billing == "" assert a.projects.billing == ""
# ---------------------------------------------------------------------------
# DuckDB BQ-extension session pool — amortizes the ~0.5 s INSTALL/LOAD/ATTACH
# cost across requests by keeping pre-warmed DuckDB connections in a
# bounded pool. Each acquire reuses an existing connection (refreshing the
# auth SECRET so token rotation doesn't break long-lived entries) instead
# of spinning up a fresh DuckDB+extension load every time.
# ---------------------------------------------------------------------------
class _PoolFakeConn:
"""Fake DuckDB connection that records executed SQL and supports
``close()``. Used across pool tests so we can pin behavior without
booting the real BigQuery extension."""
_serial = 0
def __init__(self):
type(self)._serial += 1
self.id = type(self)._serial
self.closed = False
self.executed: list[str] = []
def execute(self, sql, params=None):
self.executed.append(sql)
# Liveness probe: SELECT 1 returns something fetchable.
class _Result:
def fetchone(self_inner):
return (1,)
def fetchall(self_inner):
return [(1,)]
return _Result()
def close(self):
self.closed = True
@pytest.fixture
def reset_pool(monkeypatch):
"""Reset the BQ session pool singleton between tests so leak-detection
assertions don't carry state."""
from connectors.bigquery import access as bq_access_mod
if hasattr(bq_access_mod, "_reset_session_pool_for_tests"):
bq_access_mod._reset_session_pool_for_tests()
monkeypatch.setattr(
"connectors.bigquery.auth.get_metadata_token",
lambda: "tok-pool",
)
yield
if hasattr(bq_access_mod, "_reset_session_pool_for_tests"):
bq_access_mod._reset_session_pool_for_tests()
class TestBqSessionPool:
def test_pool_reuses_connections_across_acquires(self, monkeypatch, reset_pool):
"""Acquiring a session, releasing, then acquiring again must return
the SAME underlying DuckDB connection no INSTALL/LOAD overhead on
the second request. This is the whole point of the pool."""
from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects
# Each duckdb.connect() yields a fresh _PoolFakeConn so we can tell
# them apart by id.
connections_made = []
def fake_connect(_path):
c = _PoolFakeConn()
connections_made.append(c)
return c
monkeypatch.setattr("duckdb.connect", fake_connect)
# First acquire: pool is empty, factory builds a new entry.
with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn1:
id1 = conn1.id
# Second acquire: pool has a warm entry, must hand back the same conn.
with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn2:
id2 = conn2.id
assert id1 == id2, (
"expected the same pooled connection across two acquires; "
f"got id1={id1}, id2={id2}"
)
# And we must NOT have re-INSTALLed/LOADed the extension on reuse —
# only one duckdb.connect() call ever happened.
assert len(connections_made) == 1, (
f"pool re-built the conn on second acquire; created {len(connections_made)}"
)
def test_pool_size_is_configurable(self, monkeypatch, reset_pool):
"""``data_source.bigquery.session_pool_size`` controls the upper
bound on warm entries. Above the cap, releasing extra entries
closes them rather than retaining."""
from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects
def fake_get_value(*keys, default=None):
if keys == ("data_source", "bigquery", "session_pool_size"):
return 2 # tiny pool
if keys == ("data_source", "bigquery", "query_timeout_ms"):
return 0 # don't try to SET timeout in tests
return default
monkeypatch.setattr("app.instance_config.get_value", fake_get_value)
monkeypatch.setattr("duckdb.connect", lambda _: _PoolFakeConn())
# Acquire 3 in parallel to force 3 simultaneous entries.
cm1 = _default_duckdb_session_factory(BqProjects(billing="b", data="d"))
c1 = cm1.__enter__()
cm2 = _default_duckdb_session_factory(BqProjects(billing="b", data="d"))
c2 = cm2.__enter__()
cm3 = _default_duckdb_session_factory(BqProjects(billing="b", data="d"))
c3 = cm3.__enter__()
# Release all three. The 3rd release should close the conn since
# the pool already has 2.
cm1.__exit__(None, None, None)
cm2.__exit__(None, None, None)
cm3.__exit__(None, None, None)
# At least one of the three connections must be closed (pool overflow).
closed_count = sum(1 for c in (c1, c2, c3) if c.closed)
assert closed_count >= 1, (
"pool retained more than its configured size; expected at least "
f"one close. closed_count={closed_count}"
)
# Pool retained at most `size` entries, so total live + closed = 3,
# closed >= 1 means pool size <= 2.
assert closed_count == 1
def test_pool_replaces_broken_connection(self, monkeypatch, reset_pool):
"""If a pooled entry's liveness check fails on acquire (the
underlying DuckDB conn was closed externally, BQ extension state
corrupted, etc.), the pool must drop it and build a fresh entry
not hand the broken one to the caller."""
from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects
# First acquire creates entry #1; we'll then mark it broken.
all_conns: list[_PoolFakeConn] = []
def fake_connect(_path):
c = _PoolFakeConn()
all_conns.append(c)
return c
monkeypatch.setattr("duckdb.connect", fake_connect)
with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn1:
id1 = conn1.id
# Simulate corruption: make execute() raise on next call.
def broken_execute(*a, **kw):
raise RuntimeError("connection broken")
conn1.execute = broken_execute # type: ignore[assignment]
# Second acquire must skip the broken entry and build a fresh one.
with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn2:
id2 = conn2.id
assert id1 != id2, (
f"expected a fresh conn after broken-pool reaper; both acquires "
f"returned id={id1}"
)
assert len(all_conns) >= 2
def test_pool_handles_reentrant_acquires_thread_safe(self, monkeypatch, reset_pool):
"""Concurrent acquires from multiple threads must never hand the
same underlying DuckDB conn to two threads at once. The pool's
lock acquires/releases are the load-bearing invariant here.
"""
from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects
monkeypatch.setattr("duckdb.connect", lambda _: _PoolFakeConn())
active_ids: set = set()
active_lock = threading.Lock()
violations: list = []
def worker():
for _ in range(20):
with _default_duckdb_session_factory(
BqProjects(billing="b", data="d"),
) as conn:
with active_lock:
if conn.id in active_ids:
violations.append(conn.id)
active_ids.add(conn.id)
# Hold briefly to give other threads a chance to race.
time.sleep(0.001)
with active_lock:
active_ids.discard(conn.id)
import time
threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
assert not violations, (
f"pool handed the same conn to multiple threads concurrently: "
f"{violations}"
)
def test_pool_does_not_apply_when_factory_is_injected(self, monkeypatch, reset_pool):
"""Test fixtures that inject a custom ``duckdb_session_factory``
(e.g. tests/conftest.py's ``bq_access`` fixture) MUST bypass the
pool entirely otherwise their nullcontext-wrapped fake would
get retained between tests and corrupt downstream assertions.
"""
from connectors.bigquery.access import BqAccess, BqProjects
from contextlib import contextmanager
sentinel = object()
@contextmanager
def custom_factory(_projects):
yield sentinel
bq = BqAccess(
BqProjects(billing="b", data="d"),
duckdb_session_factory=custom_factory,
)
with bq.duckdb_session() as conn:
assert conn is sentinel

View file

@ -0,0 +1,444 @@
"""Unit tests for ``_rewrite_user_sql_for_bigquery_query``.
The helper rewrites user SQL referencing query_mode='remote' BigQuery
tables so the entire query ships to BQ via the DuckDB BQ extension's
``bigquery_query(<project>, <sql>)`` UDF engaging WHERE / SELECT /
LIMIT predicate pushdown instead of falling through to ATTACH-catalog
mode (which opens a Storage Read API session over the whole table).
These tests pin down each conservative-skip rule plus the happy-path
rewrites. Edge cases (CTE shadowing, double-wrap, mixed-source JOIN)
are intentionally explicit so a future refactor doesn't quietly
loosen the guard.
"""
from __future__ import annotations
import pytest
# ---------------------------------------------------------------------------
# Test infrastructure: an in-memory DuckDB seeded with table_registry rows
# matching the shapes the production registry produces. Avoids the full app
# bootstrap path; the rewriter only needs ``conn.execute("SELECT * FROM
# table_registry ...")`` to resolve names.
# ---------------------------------------------------------------------------
@pytest.fixture
def seeded_registry(tmp_path, monkeypatch):
"""Build a fresh ``system.duckdb`` in tmp_path with the schema migrated.
Returns the open connection so tests can pass it to the rewriter.
Cleanup is automatic via tmp_path teardown but we close the
open singleton handle first so a different DATA_DIR in the next
test doesn't see the previous tmp's lock.
"""
from src.db import get_system_db, close_system_db
monkeypatch.setenv("DATA_DIR", str(tmp_path))
(tmp_path / "state").mkdir(parents=True, exist_ok=True)
close_system_db()
conn = get_system_db()
yield conn
close_system_db()
def _register_bq_remote(conn, *, table_id, name, bucket, source_table):
from src.repositories.table_registry import TableRegistryRepository
TableRegistryRepository(conn).register(
id=table_id,
name=name,
source_type="bigquery",
bucket=bucket,
source_table=source_table,
query_mode="remote",
)
def _register_local(conn, *, table_id, name, source_type="keboola"):
from src.repositories.table_registry import TableRegistryRepository
TableRegistryRepository(conn).register(
id=table_id,
name=name,
source_type=source_type,
bucket="bkt",
source_table=name,
query_mode="local",
)
def _set_bq_project(monkeypatch, project="test-prj"):
"""Stub get_bq_access so the rewriter sees a real-looking project ID."""
from connectors.bigquery.access import BqAccess, BqProjects, get_bq_access
bq = BqAccess(
BqProjects(billing=project, data=project),
client_factory=lambda projects: object(),
)
monkeypatch.setattr(
"app.api.query.get_bq_access",
lambda: bq,
raising=False,
)
get_bq_access.cache_clear()
# ---------------------------------------------------------------------------
# Happy-path rewrites
# ---------------------------------------------------------------------------
def test_simple_select_where_against_one_bq_table_rewrites(seeded_registry, monkeypatch):
"""Single-table SELECT-WHERE against a registered BQ remote row →
full SQL wrapped in ``bigquery_query('project', '<rewritten>')``.
The bare-name reference gets translated to BQ-native backtick form."""
from app.api.query import _rewrite_user_sql_for_bigquery_query
_register_bq_remote(seeded_registry, table_id="bq.fin.ue", name="ue",
bucket="fin", source_table="ue")
_set_bq_project(monkeypatch, "test-prj")
rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query(
"SELECT count(*) FROM ue WHERE event_date = '2026-01-01'",
seeded_registry,
)
assert did_rewrite is True
# Outer wrap must be a single bigquery_query() FROM-source.
assert "bigquery_query(" in rewritten
assert "test-prj" in rewritten
# Inner SQL: bare name rewritten to backticked BQ-native path.
assert "`test-prj.fin.ue`" in rewritten
# WHERE predicate is preserved (single-quote-doubled for embedding
# inside the outer string literal — standard SQL escaping that
# both DuckDB and BQ honor).
assert "event_date = ''2026-01-01''" in rewritten
def test_direct_bq_path_rewrites(seeded_registry, monkeypatch):
"""User wrote the direct ``bq."ds"."tbl"`` form. The rewriter must
still translate to BQ-native backtick form before wrapping."""
from app.api.query import _rewrite_user_sql_for_bigquery_query
_register_bq_remote(seeded_registry, table_id="bq.fin.ue", name="ue",
bucket="fin", source_table="ue")
_set_bq_project(monkeypatch, "test-prj")
rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query(
'SELECT * FROM bq."fin"."ue" LIMIT 10',
seeded_registry,
)
assert did_rewrite is True
assert "bigquery_query(" in rewritten
assert "`test-prj.fin.ue`" in rewritten
# Original duckdb-flavor path must NOT remain (it'd parse-fail under BQ).
assert 'bq."fin"."ue"' not in rewritten
def test_cte_referencing_bq_table_rewrites_inside_cte(seeded_registry, monkeypatch):
"""A WITH clause whose body references a BQ table must rewrite that
inner reference; the wrapping happens at the top level so BQ sees a
valid BQ-flavor CTE."""
from app.api.query import _rewrite_user_sql_for_bigquery_query
_register_bq_remote(seeded_registry, table_id="bq.fin.orders", name="orders",
bucket="fin", source_table="orders")
_set_bq_project(monkeypatch, "test-prj")
rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query(
"WITH x AS (SELECT id FROM orders WHERE total > 0) SELECT count(*) FROM x",
seeded_registry,
)
assert did_rewrite is True
# Inner reference is rewritten.
assert "`test-prj.fin.orders`" in rewritten
# The whole thing is wrapped — bigquery_query is the outermost FROM.
assert rewritten.lower().count("bigquery_query(") == 1
def test_subquery_referencing_bq_table_rewrites(seeded_registry, monkeypatch):
"""Subquery in FROM position — same handling as a CTE: rewrite the
inner table reference, wrap the whole at the top."""
from app.api.query import _rewrite_user_sql_for_bigquery_query
_register_bq_remote(seeded_registry, table_id="bq.fin.ue", name="ue",
bucket="fin", source_table="ue")
_set_bq_project(monkeypatch, "test-prj")
rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query(
"SELECT s.cnt FROM (SELECT count(*) AS cnt FROM ue) s",
seeded_registry,
)
assert did_rewrite is True
assert "`test-prj.fin.ue`" in rewritten
assert rewritten.lower().count("bigquery_query(") == 1
def test_multiple_bq_tables_one_project_combine(seeded_registry, monkeypatch):
"""Two registered BQ tables in the same project → single
``bigquery_query()`` wraps the whole SQL with both refs rewritten
inline. No separate parallel calls."""
from app.api.query import _rewrite_user_sql_for_bigquery_query
_register_bq_remote(seeded_registry, table_id="bq.fin.orders", name="orders",
bucket="fin", source_table="orders")
_register_bq_remote(seeded_registry, table_id="bq.fin.users", name="users",
bucket="fin", source_table="users")
_set_bq_project(monkeypatch, "test-prj")
rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query(
"SELECT u.id, count(o.id) "
"FROM users u JOIN orders o ON u.id = o.user_id "
"GROUP BY u.id",
seeded_registry,
)
assert did_rewrite is True
# Both rewritten.
assert "`test-prj.fin.users`" in rewritten
assert "`test-prj.fin.orders`" in rewritten
# Single wrap.
assert rewritten.lower().count("bigquery_query(") == 1
# ---------------------------------------------------------------------------
# Conservative-skip cases
# ---------------------------------------------------------------------------
def test_join_bq_to_local_skips_rewrite(seeded_registry, monkeypatch):
"""A JOIN between a BQ table and a local-mode (Keboola/Jira) table
is a cross-source query wrapping it in bigquery_query() would lose
the local table. The rewriter must fall through to the ATTACH-catalog
path (slow but correct).
"""
from app.api.query import _rewrite_user_sql_for_bigquery_query
_register_bq_remote(seeded_registry, table_id="bq.fin.ue", name="ue",
bucket="fin", source_table="ue")
_register_local(seeded_registry, table_id="kbc.in.local_orders",
name="local_orders")
_set_bq_project(monkeypatch, "test-prj")
user_sql = (
"SELECT u.id, lo.total "
"FROM ue u JOIN local_orders lo ON u.id = lo.user_id"
)
rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query(
user_sql, seeded_registry,
)
assert did_rewrite is False
assert rewritten == user_sql # untouched
def test_no_bq_tables_passes_through(seeded_registry, monkeypatch):
"""User SQL referencing only local-source tables → no rewrite,
no log spam, original SQL returned."""
from app.api.query import _rewrite_user_sql_for_bigquery_query
_register_local(seeded_registry, table_id="kbc.in.orders", name="orders")
_set_bq_project(monkeypatch, "test-prj")
user_sql = "SELECT * FROM orders WHERE id = 1"
rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query(
user_sql, seeded_registry,
)
assert did_rewrite is False
assert rewritten == user_sql
def test_already_contains_bigquery_query_passes_through(seeded_registry, monkeypatch):
"""User SQL already calls bigquery_query() — never double-wrap.
Note: the /api/query endpoint blocks ``bigquery_query`` in user SQL
via the keyword denylist, so this scenario can't reach the rewriter
in production today. Defensive guard for callers from other paths.
"""
from app.api.query import _rewrite_user_sql_for_bigquery_query
_register_bq_remote(seeded_registry, table_id="bq.fin.ue", name="ue",
bucket="fin", source_table="ue")
_set_bq_project(monkeypatch, "test-prj")
user_sql = (
"SELECT * FROM bigquery_query('test-prj', 'SELECT * FROM `test-prj.fin.ue`')"
)
rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query(
user_sql, seeded_registry,
)
assert did_rewrite is False
assert rewritten == user_sql
def test_unconfigured_bq_project_skips(seeded_registry, monkeypatch):
"""If get_bq_access() is the not-configured sentinel (data=''),
don't rewrite — there's no project to fill into bigquery_query()."""
from app.api.query import _rewrite_user_sql_for_bigquery_query
_register_bq_remote(seeded_registry, table_id="bq.fin.ue", name="ue",
bucket="fin", source_table="ue")
# Override to sentinel (empty data project).
from connectors.bigquery.access import BqAccess, BqProjects, get_bq_access
monkeypatch.setattr(
"app.api.query.get_bq_access",
lambda: BqAccess(BqProjects(billing="", data="")),
raising=False,
)
get_bq_access.cache_clear()
user_sql = "SELECT * FROM ue"
rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query(
user_sql, seeded_registry,
)
assert did_rewrite is False
assert rewritten == user_sql
# ---------------------------------------------------------------------------
# Backwards-compat: dry-run helper still available + behaves the same
# ---------------------------------------------------------------------------
def test_existing_dry_run_helper_still_callable():
"""The original ``_rewrite_user_sql_for_bq_dry_run`` is now a thin
wrapper around the shared core rewriter (Pass 1 + Pass 2). Callers
that pass an explicit ``project`` argument keep working unchanged.
"""
from app.api.query import _rewrite_user_sql_for_bq_dry_run
rewritten = _rewrite_user_sql_for_bq_dry_run(
sql="SELECT * FROM ue",
name_lookups=[("ue", "fin", "ue")],
project="some-prj",
)
assert "`some-prj.fin.ue`" in rewritten
# The dry-run helper does NOT add a bigquery_query() wrapper; that's
# only the new execution-path helper's job.
assert "bigquery_query(" not in rewritten
# ---------------------------------------------------------------------------
# End-to-end: the /api/query handler must invoke the rewriter and execute
# the rewritten SQL (not the original) when there's a BQ-remote table.
# ---------------------------------------------------------------------------
def _auth(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
def _register_bq_remote_row(name: str, bucket: str, source_table: str) -> None:
from src.db import get_system_db
from src.repositories.table_registry import TableRegistryRepository
sys_conn = get_system_db()
try:
TableRegistryRepository(sys_conn).register(
id=f"bq.{bucket}.{source_table}",
name=name,
source_type="bigquery",
bucket=bucket,
source_table=source_table,
query_mode="remote",
)
finally:
sys_conn.close()
@pytest.fixture
def stub_bq_for_endpoint(monkeypatch):
"""Stub _bq_dry_run_bytes + get_bq_access at the endpoint level so the
cap-guard sees a real-looking BQ project but doesn't issue real RPCs.
"""
monkeypatch.setattr(
"app.api.query._bq_dry_run_bytes",
lambda *a, **k: 1024, # tiny — pass cap
raising=False,
)
class _FakeProjects:
data = "test-data-prj"
billing = "test-billing-prj"
class _FakeBqAccess:
projects = _FakeProjects()
monkeypatch.setattr(
"app.api.query.get_bq_access",
lambda: _FakeBqAccess(),
raising=False,
)
def test_endpoint_executes_rewritten_sql_against_analytics(
seeded_app, stub_bq_for_endpoint, monkeypatch,
):
"""The /api/query handler must call ``analytics.execute(rewritten_sql)``
NOT the user's original SQL — when a BQ-remote table is referenced.
Capture what reaches DuckDB and assert the bigquery_query() wrap is
present.
"""
_register_bq_remote_row("ue", "fin", "ue")
# Capture analytics.execute calls. The handler does
# `analytics = get_analytics_db_readonly(); analytics.execute(sql)`,
# so we patch the connection factory to return a stub.
captured = {"sql": None}
class _StubAnalytics:
description = [("c0",)]
def execute(self, sql, *args, **kwargs):
captured["sql"] = sql
class _R:
def fetchmany(self, _n):
return []
return _R()
def close(self):
pass
monkeypatch.setattr(
"app.api.query.get_analytics_db_readonly",
lambda: _StubAnalytics(),
raising=False,
)
c = seeded_app["client"]
token = seeded_app["admin_token"]
r = c.post(
"/api/query",
json={"sql": "SELECT count(*) FROM ue WHERE country = 'CZ'"},
headers=_auth(token),
)
assert r.status_code == 200, r.json()
sent = captured["sql"]
assert sent is not None, "analytics.execute was never called"
assert "bigquery_query(" in sent, (
f"endpoint did not wrap user SQL in bigquery_query(); sent: {sent!r}"
)
assert "test-data-prj" in sent
assert "`test-data-prj.fin.ue`" in sent
def test_endpoint_passes_original_sql_when_no_bq_table(
seeded_app, stub_bq_for_endpoint, monkeypatch,
):
"""For queries that don't touch any BQ-remote registered name, the
handler must pass the original SQL through unchanged the
ATTACH-catalog path handles local-source tables natively and any
rewrite would be wasted work."""
captured = {"sql": None}
class _StubAnalytics:
description = [("c0",)]
def execute(self, sql, *args, **kwargs):
captured["sql"] = sql
class _R:
def fetchmany(self, _n):
return []
return _R()
def close(self):
pass
monkeypatch.setattr(
"app.api.query.get_analytics_db_readonly",
lambda: _StubAnalytics(),
raising=False,
)
c = seeded_app["client"]
token = seeded_app["admin_token"]
user_sql = "SELECT 1 AS x"
r = c.post("/api/query", json={"sql": user_sql}, headers=_auth(token))
assert r.status_code == 200, r.json()
assert captured["sql"] == user_sql
assert "bigquery_query(" not in captured["sql"]