agnes-the-ai-analyst/tests/test_query_remote_rewrite.py
ZdenekSrotyr b2c1ff143c fix(query): rewrite BQ-backed user SQL via bigquery_query() to enable predicate pushdown
User SQL hitting query_mode='remote' BigQuery rows was 50-100x slower
than the equivalent direct bigquery_query() call because DuckDB's master
view (CREATE VIEW … AS SELECT * FROM bigquery.<ds>.<tbl>) does not push
WHERE/SELECT/LIMIT into BQ in ATTACH-catalog mode. The BQ extension opens
a Storage Read API session over the entire upstream table; on >100M-row
sources this was 70-150s and frequently failed with 'Response too large
to return'.

Extract the existing dry-run rewriter's core (table-name → BQ-native
backtick path) into a shared helper. Add an execution-path rewriter
that wraps the whole user SQL in bigquery_query('<project>', '<inner>')
so the BQ planner sees the full query and engages partition pruning +
projection pushdown server-side.

Conservative fall-through: cross-source JOINs (BQ ↔ Keboola/Jira local),
queries that already contain a bigquery_query( call, and an unconfigured
BQ project all skip the rewrite and run the original SQL via ATTACH-catalog,
so behavior degrades gracefully.
2026-05-06 13:02:34 +02:00

444 lines
16 KiB
Python

"""Unit tests for ``_rewrite_user_sql_for_bigquery_query``.
The helper rewrites user SQL referencing query_mode='remote' BigQuery
tables so the entire query ships to BQ via the DuckDB BQ extension's
``bigquery_query(<project>, <sql>)`` UDF — engaging WHERE / SELECT /
LIMIT predicate pushdown instead of falling through to ATTACH-catalog
mode (which opens a Storage Read API session over the whole table).
These tests pin down each conservative-skip rule plus the happy-path
rewrites. Edge cases (CTE shadowing, double-wrap, mixed-source JOIN)
are intentionally explicit so a future refactor doesn't quietly
loosen the guard.
"""
from __future__ import annotations
import pytest
# ---------------------------------------------------------------------------
# Test infrastructure: an in-memory DuckDB seeded with table_registry rows
# matching the shapes the production registry produces. Avoids the full app
# bootstrap path; the rewriter only needs ``conn.execute("SELECT * FROM
# table_registry ...")`` to resolve names.
# ---------------------------------------------------------------------------
@pytest.fixture
def seeded_registry(tmp_path, monkeypatch):
    """Yield a system-DB connection rooted in a per-test ``DATA_DIR``.

    ``DATA_DIR`` is pointed at ``tmp_path`` and a ``state`` subdirectory is
    created before the connection is opened, so ``get_system_db()`` is
    expected to build a fresh ``system.duckdb`` (schema migrated) there.

    The singleton handle is closed both before opening and after the test
    so a different ``DATA_DIR`` in the next test doesn't see the previous
    tmp dir's lock; removing the files is left to ``tmp_path`` teardown.
    """
    from src.db import get_system_db, close_system_db

    monkeypatch.setenv("DATA_DIR", str(tmp_path))
    state_dir = tmp_path / "state"
    state_dir.mkdir(parents=True, exist_ok=True)

    # Drop any handle a previous test left open before re-opening.
    close_system_db()
    system_conn = get_system_db()
    yield system_conn
    close_system_db()
def _register_bq_remote(conn, *, table_id, name, bucket, source_table):
    """Register a BigQuery row with ``query_mode='remote'`` via ``conn``.

    ``table_id``, ``name``, ``bucket`` and ``source_table`` are forwarded
    unchanged to ``TableRegistryRepository.register``.
    """
    from src.repositories.table_registry import TableRegistryRepository

    registry = TableRegistryRepository(conn)
    registry.register(
        id=table_id,
        name=name,
        source_type="bigquery",
        bucket=bucket,
        source_table=source_table,
        query_mode="remote",
    )
def _register_local(conn, *, table_id, name, source_type="keboola"):
    """Register a ``query_mode='local'`` row via ``conn``.

    The bucket is fixed to ``"bkt"`` and ``source_table`` mirrors ``name``;
    ``source_type`` defaults to ``"keboola"``.
    """
    from src.repositories.table_registry import TableRegistryRepository

    registry = TableRegistryRepository(conn)
    registry.register(
        id=table_id,
        name=name,
        source_type=source_type,
        bucket="bkt",
        source_table=name,
        query_mode="local",
    )
def _set_bq_project(monkeypatch, project="test-prj"):
    """Patch ``app.api.query.get_bq_access`` to return a stubbed ``BqAccess``.

    The stub uses ``project`` for both the billing and data project, so the
    rewriter sees a real-looking project ID. The client factory returns a
    bare ``object()``.
    """
    from connectors.bigquery.access import BqAccess, BqProjects, get_bq_access

    def _fake_client(projects):
        return object()

    stub_projects = BqProjects(billing=project, data=project)
    stub_access = BqAccess(stub_projects, client_factory=_fake_client)

    monkeypatch.setattr(
        "app.api.query.get_bq_access",
        lambda: stub_access,
        raising=False,
    )
    # Clear the cache on the real accessor as well.
    get_bq_access.cache_clear()
# ---------------------------------------------------------------------------
# Happy-path rewrites
# ---------------------------------------------------------------------------
def test_simple_select_where_against_one_bq_table_rewrites(seeded_registry, monkeypatch):
    """Single-table SELECT-WHERE against a registered BQ remote row →
    full SQL wrapped in ``bigquery_query('project', '<rewritten>')``.

    The bare-name reference gets translated to BQ-native backtick form.
    """
    from app.api.query import _rewrite_user_sql_for_bigquery_query

    _register_bq_remote(seeded_registry, table_id="bq.fin.ue", name="ue",
                        bucket="fin", source_table="ue")
    _set_bq_project(monkeypatch, "test-prj")

    rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query(
        "SELECT count(*) FROM ue WHERE event_date = '2026-01-01'",
        seeded_registry,
    )

    assert did_rewrite is True
    # Outer wrap must be a single bigquery_query() FROM-source. Count the
    # occurrences (as the CTE / subquery / multi-table tests do) so an
    # accidental double-wrap fails here instead of slipping past a bare
    # substring check.
    assert rewritten.lower().count("bigquery_query(") == 1
    assert "test-prj" in rewritten
    # Inner SQL: bare name rewritten to backticked BQ-native path.
    assert "`test-prj.fin.ue`" in rewritten
    # WHERE predicate is preserved (single-quote-doubled for embedding
    # inside the outer string literal — standard SQL escaping that
    # both DuckDB and BQ honor).
    assert "event_date = ''2026-01-01''" in rewritten
def test_direct_bq_path_rewrites(seeded_registry, monkeypatch):
    """The direct ``bq."ds"."tbl"`` spelling is rewritten too.

    The rewriter must translate the DuckDB-flavor path into the BQ-native
    backtick form before wrapping the query.
    """
    from app.api.query import _rewrite_user_sql_for_bigquery_query

    _register_bq_remote(
        seeded_registry,
        table_id="bq.fin.ue",
        name="ue",
        bucket="fin",
        source_table="ue",
    )
    _set_bq_project(monkeypatch, "test-prj")

    duckdb_flavor_sql = 'SELECT * FROM bq."fin"."ue" LIMIT 10'
    out_sql, was_rewritten = _rewrite_user_sql_for_bigquery_query(
        duckdb_flavor_sql, seeded_registry
    )

    assert was_rewritten is True
    assert "bigquery_query(" in out_sql
    assert "`test-prj.fin.ue`" in out_sql
    # The DuckDB-flavor path must be gone: it would parse-fail under BQ.
    assert 'bq."fin"."ue"' not in out_sql
def test_cte_referencing_bq_table_rewrites_inside_cte(seeded_registry, monkeypatch):
    """A BQ table referenced inside a WITH body is rewritten in place.

    The wrapping happens once at the top level, so BQ receives a valid
    BQ-flavor CTE.
    """
    from app.api.query import _rewrite_user_sql_for_bigquery_query

    _register_bq_remote(
        seeded_registry,
        table_id="bq.fin.orders",
        name="orders",
        bucket="fin",
        source_table="orders",
    )
    _set_bq_project(monkeypatch, "test-prj")

    cte_sql = (
        "WITH x AS (SELECT id FROM orders WHERE total > 0) SELECT count(*) FROM x"
    )
    out_sql, was_rewritten = _rewrite_user_sql_for_bigquery_query(
        cte_sql, seeded_registry
    )

    assert was_rewritten is True
    # The reference inside the CTE body is rewritten.
    assert "`test-prj.fin.orders`" in out_sql
    # Exactly one wrap: bigquery_query is the outermost FROM.
    wrap_count = out_sql.lower().count("bigquery_query(")
    assert wrap_count == 1
def test_subquery_referencing_bq_table_rewrites(seeded_registry, monkeypatch):
    """A subquery in FROM position is handled like a CTE.

    The inner table reference is rewritten and the whole statement is
    wrapped once at the top.
    """
    from app.api.query import _rewrite_user_sql_for_bigquery_query

    _register_bq_remote(
        seeded_registry,
        table_id="bq.fin.ue",
        name="ue",
        bucket="fin",
        source_table="ue",
    )
    _set_bq_project(monkeypatch, "test-prj")

    subquery_sql = "SELECT s.cnt FROM (SELECT count(*) AS cnt FROM ue) s"
    out_sql, was_rewritten = _rewrite_user_sql_for_bigquery_query(
        subquery_sql, seeded_registry
    )

    assert was_rewritten is True
    assert "`test-prj.fin.ue`" in out_sql
    wrap_count = out_sql.lower().count("bigquery_query(")
    assert wrap_count == 1
def test_multiple_bq_tables_one_project_combine(seeded_registry, monkeypatch):
    """Two BQ tables in one project share a single ``bigquery_query()`` wrap.

    Both references are rewritten inline; there are no separate parallel
    calls.
    """
    from app.api.query import _rewrite_user_sql_for_bigquery_query

    # Same registration order as before: orders first, then users.
    for table in ("orders", "users"):
        _register_bq_remote(
            seeded_registry,
            table_id=f"bq.fin.{table}",
            name=table,
            bucket="fin",
            source_table=table,
        )
    _set_bq_project(monkeypatch, "test-prj")

    join_sql = (
        "SELECT u.id, count(o.id) FROM users u JOIN orders o "
        "ON u.id = o.user_id GROUP BY u.id"
    )
    out_sql, was_rewritten = _rewrite_user_sql_for_bigquery_query(
        join_sql, seeded_registry
    )

    assert was_rewritten is True
    # Both table references are rewritten.
    assert "`test-prj.fin.users`" in out_sql
    assert "`test-prj.fin.orders`" in out_sql
    # The statement is wrapped exactly once.
    wrap_count = out_sql.lower().count("bigquery_query(")
    assert wrap_count == 1
# ---------------------------------------------------------------------------
# Conservative-skip cases
# ---------------------------------------------------------------------------
def test_join_bq_to_local_skips_rewrite(seeded_registry, monkeypatch):
    """A BQ ↔ local-mode (Keboola/Jira) JOIN must not be rewritten.

    Wrapping a cross-source query in bigquery_query() would lose the local
    table, so the rewriter has to fall through to the ATTACH-catalog path
    (slow but correct).
    """
    from app.api.query import _rewrite_user_sql_for_bigquery_query

    _register_bq_remote(
        seeded_registry,
        table_id="bq.fin.ue",
        name="ue",
        bucket="fin",
        source_table="ue",
    )
    _register_local(
        seeded_registry,
        table_id="kbc.in.local_orders",
        name="local_orders",
    )
    _set_bq_project(monkeypatch, "test-prj")

    user_sql = (
        "SELECT u.id, lo.total FROM ue u "
        "JOIN local_orders lo ON u.id = lo.user_id"
    )
    out_sql, was_rewritten = _rewrite_user_sql_for_bigquery_query(
        user_sql, seeded_registry
    )

    assert was_rewritten is False
    # The SQL comes back untouched.
    assert out_sql == user_sql
def test_no_bq_tables_passes_through(seeded_registry, monkeypatch):
    """SQL that touches only local-source tables is returned unchanged.

    No rewrite, no log spam.
    """
    from app.api.query import _rewrite_user_sql_for_bigquery_query

    _register_local(seeded_registry, table_id="kbc.in.orders", name="orders")
    _set_bq_project(monkeypatch, "test-prj")

    user_sql = "SELECT * FROM orders WHERE id = 1"
    out_sql, was_rewritten = _rewrite_user_sql_for_bigquery_query(
        user_sql, seeded_registry
    )

    assert was_rewritten is False
    assert out_sql == user_sql
def test_already_contains_bigquery_query_passes_through(seeded_registry, monkeypatch):
    """SQL that already calls bigquery_query() is never double-wrapped.

    Note: the /api/query endpoint blocks ``bigquery_query`` in user SQL
    via the keyword denylist, so this scenario can't reach the rewriter
    in production today. This is a defensive guard for callers from other
    paths.
    """
    from app.api.query import _rewrite_user_sql_for_bigquery_query

    _register_bq_remote(
        seeded_registry,
        table_id="bq.fin.ue",
        name="ue",
        bucket="fin",
        source_table="ue",
    )
    _set_bq_project(monkeypatch, "test-prj")

    user_sql = "SELECT * FROM bigquery_query('test-prj', 'SELECT * FROM `test-prj.fin.ue`')"
    out_sql, was_rewritten = _rewrite_user_sql_for_bigquery_query(
        user_sql, seeded_registry
    )

    assert was_rewritten is False
    assert out_sql == user_sql
def test_unconfigured_bq_project_skips(seeded_registry, monkeypatch):
    """With the not-configured sentinel (data=''), nothing is rewritten.

    There is no project to fill into bigquery_query(), so the original SQL
    must come back untouched.
    """
    from app.api.query import _rewrite_user_sql_for_bigquery_query

    _register_bq_remote(
        seeded_registry,
        table_id="bq.fin.ue",
        name="ue",
        bucket="fin",
        source_table="ue",
    )

    from connectors.bigquery.access import BqAccess, BqProjects, get_bq_access

    def _unconfigured_access():
        # Sentinel shape: empty billing and data projects; built per call.
        return BqAccess(BqProjects(billing="", data=""))

    monkeypatch.setattr(
        "app.api.query.get_bq_access",
        _unconfigured_access,
        raising=False,
    )
    get_bq_access.cache_clear()

    user_sql = "SELECT * FROM ue"
    out_sql, was_rewritten = _rewrite_user_sql_for_bigquery_query(
        user_sql, seeded_registry
    )

    assert was_rewritten is False
    assert out_sql == user_sql
# ---------------------------------------------------------------------------
# Backwards-compat: dry-run helper still available + behaves the same
# ---------------------------------------------------------------------------
def test_existing_dry_run_helper_still_callable():
    """``_rewrite_user_sql_for_bq_dry_run`` keeps its explicit-project API.

    The original helper is now a thin wrapper around the shared core
    rewriter (Pass 1 + Pass 2); callers that pass an explicit ``project``
    argument keep working unchanged.
    """
    from app.api.query import _rewrite_user_sql_for_bq_dry_run

    dry_run_sql = _rewrite_user_sql_for_bq_dry_run(
        sql="SELECT * FROM ue",
        name_lookups=[("ue", "fin", "ue")],
        project="some-prj",
    )

    assert "`some-prj.fin.ue`" in dry_run_sql
    # Adding the bigquery_query() wrapper is only the execution-path
    # helper's job; the dry-run helper must not add one.
    assert "bigquery_query(" not in dry_run_sql
# ---------------------------------------------------------------------------
# End-to-end: the /api/query handler must invoke the rewriter and execute
# the rewritten SQL (not the original) when there's a BQ-remote table.
# ---------------------------------------------------------------------------
def _auth(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
def _register_bq_remote_row(name: str, bucket: str, source_table: str) -> None:
    """Register a remote BigQuery row through a system-DB connection.

    The row id is derived as ``bq.<bucket>.<source_table>``. The connection
    obtained from ``get_system_db()`` is closed afterwards, even when
    registration raises.
    """
    from src.db import get_system_db
    from src.repositories.table_registry import TableRegistryRepository

    conn = get_system_db()
    try:
        registry = TableRegistryRepository(conn)
        registry.register(
            id=f"bq.{bucket}.{source_table}",
            name=name,
            source_type="bigquery",
            bucket=bucket,
            source_table=source_table,
            query_mode="remote",
        )
    finally:
        conn.close()
@pytest.fixture
def stub_bq_for_endpoint(monkeypatch):
    """Stub ``_bq_dry_run_bytes`` and ``get_bq_access`` on ``app.api.query``.

    The cap-guard then sees a real-looking BQ project without any real RPC
    being issued.
    """

    def _tiny_dry_run(*args, **kwargs):
        # Tiny byte estimate, so the cap check passes.
        return 1024

    class _FakeProjects:
        data = "test-data-prj"
        billing = "test-billing-prj"

    class _FakeBqAccess:
        projects = _FakeProjects()

    def _fake_get_bq_access():
        return _FakeBqAccess()

    monkeypatch.setattr(
        "app.api.query._bq_dry_run_bytes",
        _tiny_dry_run,
        raising=False,
    )
    monkeypatch.setattr(
        "app.api.query.get_bq_access",
        _fake_get_bq_access,
        raising=False,
    )
def test_endpoint_executes_rewritten_sql_against_analytics(
    seeded_app, stub_bq_for_endpoint, monkeypatch,
):
    """The /api/query handler must execute the rewritten SQL.

    When a BQ-remote table is referenced, the handler has to call
    ``analytics.execute(rewritten_sql)`` — NOT the user's original SQL.
    Capture what reaches DuckDB and assert that the bigquery_query() wrap
    is present.
    """
    _register_bq_remote_row("ue", "fin", "ue")

    # The handler does
    # `analytics = get_analytics_db_readonly(); analytics.execute(sql)`,
    # so the connection factory is patched to hand back a recording stub.
    captured = {"sql": None}

    class _EmptyResult:
        def fetchmany(self, _n):
            return []

    class _StubAnalytics:
        description = [("c0",)]

        def execute(self, sql, *args, **kwargs):
            captured["sql"] = sql
            return _EmptyResult()

        def close(self):
            pass

    monkeypatch.setattr(
        "app.api.query.get_analytics_db_readonly",
        lambda: _StubAnalytics(),
        raising=False,
    )

    client = seeded_app["client"]
    admin_token = seeded_app["admin_token"]
    payload = {"sql": "SELECT count(*) FROM ue WHERE country = 'CZ'"}
    r = client.post("/api/query", json=payload, headers=_auth(admin_token))
    assert r.status_code == 200, r.json()

    sent = captured["sql"]
    assert sent is not None, "analytics.execute was never called"
    assert "bigquery_query(" in sent, (
        f"endpoint did not wrap user SQL in bigquery_query(); sent: {sent!r}"
    )
    assert "test-data-prj" in sent
    assert "`test-data-prj.fin.ue`" in sent
def test_endpoint_passes_original_sql_when_no_bq_table(
    seeded_app, stub_bq_for_endpoint, monkeypatch,
):
    """Queries touching no BQ-remote registered name pass through as-is.

    The handler must hand the original SQL to DuckDB unchanged — the
    ATTACH-catalog path handles local-source tables natively and any
    rewrite would be wasted work.
    """
    captured = {"sql": None}

    class _EmptyResult:
        def fetchmany(self, _n):
            return []

    class _StubAnalytics:
        description = [("c0",)]

        def execute(self, sql, *args, **kwargs):
            captured["sql"] = sql
            return _EmptyResult()

        def close(self):
            pass

    monkeypatch.setattr(
        "app.api.query.get_analytics_db_readonly",
        lambda: _StubAnalytics(),
        raising=False,
    )

    client = seeded_app["client"]
    admin_token = seeded_app["admin_token"]
    user_sql = "SELECT 1 AS x"
    r = client.post(
        "/api/query",
        json={"sql": user_sql},
        headers=_auth(admin_token),
    )
    assert r.status_code == 200, r.json()

    assert captured["sql"] == user_sql
    assert "bigquery_query(" not in captured["sql"]