fix(bq-materialize): wrap admin SQL in bigquery_query() so views work

Pre-fix, materialize ran the admin source_query as 'COPY (sql) TO parquet'
through the DuckDB BQ extension session. The extension defaults to the
BQ Storage Read API for bq.<ds>.<tbl> references, which rejects views
('non-table entities cannot be read with the storage API'). The fix
always wraps admin SQL into bigquery_query('<billing>', '<inner>') so
COPY uses the BQ jobs API uniformly for tables and views.

Cost guardrail dry-run now operates on the inner SQL (BQ-native), so
the BQ Python client parses it and the cap engages — pre-fix, the dry-run
hit 'Table-valued function not found: bigquery_query' and failed open.
This commit is contained in:
ZdenekSrotyr 2026-05-04 16:40:40 +02:00
parent aa622f2af4
commit d8a2299633
5 changed files with 180 additions and 35 deletions

View file

@ -5,6 +5,7 @@ No data is downloaded. All queries go directly to BigQuery via DuckDB extension
import logging
import os
import re
import shutil
import threading
from datetime import datetime, timezone
@ -59,6 +60,55 @@ def _detect_table_type(
return row[0] if row else None
# Grammar accepted for a billing project id: 6-30 characters in total, the
# first a lowercase letter, the last a lowercase letter or digit, and only
# lowercase letters, digits and hyphens in between.
# NOTE(review): `$` also matches just before a trailing newline, so strict
# validation against this pattern should use `fullmatch()` rather than `match()`.
_BILLING_PROJECT_RE = re.compile(r"^[a-z][a-z0-9-]{4,28}[a-z0-9]$")
def _escape_sql_string_literal(s: str) -> str:
"""Double every single quote so the result is safe to embed inside a
single-quoted SQL string literal. DuckDB and BigQuery both honor the
SQL standard `''` escape inside `'...'`. Used to wrap admin
source_query into bigquery_query()'s second arg without breaking
the literal envelope."""
return s.replace("'", "''")
def _wrap_admin_sql_for_jobs_api(billing_project: str, inner_sql: str) -> str:
    """Build the COPY-source SQL that runs admin's `inner_sql` through
    the BigQuery jobs API via the DuckDB BQ extension's
    ``bigquery_query()`` table function.

    Why: the default `bq."ds"."t"` reference path uses the BQ Storage
    Read API which rejects non-base entities (views, materialized views).
    Routing through `bigquery_query()` uses the jobs API which accepts
    every entity type uniformly.

    Args:
        billing_project: GCP project ID that bills the BQ job. Must
            match the GCP project_id grammar — anything else is rejected
            as a defense-in-depth check (admin is trusted, but a typo
            should fail closed, not silently lose budget to the wrong
            project).
        inner_sql: BigQuery-flavor SQL the admin registered as
            ``source_query``. Must use BQ syntax (backticks for dashed
            identifiers, native function calls). DuckDB-flavor `bq."ds"."t"`
            is NOT acceptable here — the v24 migration converts existing
            rows; new registrations are validated upstream.

    Returns:
        A DuckDB-parseable SQL fragment suitable as the operand of
        ``COPY (...) TO 'path' (FORMAT PARQUET)``.

    Raises:
        ValueError: if `billing_project` is not a string or does not match
            the project_id grammar in its entirety.
    """
    # fullmatch(), not match(): the pattern ends in `$`, which also matches
    # just before a trailing newline, so match() would accept "my-project\n"
    # and embed the newline into the generated SQL. The isinstance guard turns
    # a non-string (e.g. an unconfigured None) into the documented ValueError
    # instead of a TypeError from the regex engine.
    if not isinstance(billing_project, str) or not _BILLING_PROJECT_RE.fullmatch(
        billing_project
    ):
        raise ValueError(
            f"billing_project {billing_project!r} is not a valid GCP project_id "
            "(grammar: ^[a-z][a-z0-9-]{4,28}[a-z0-9]$)"
        )
    # billing_project is grammar-checked above (no quotes possible); the inner
    # SQL is arbitrary text, so its single quotes are doubled before embedding.
    return (
        f"SELECT * FROM bigquery_query('{billing_project}', "
        f"'{_escape_sql_string_literal(inner_sql)}')"
    )
def _create_meta_table(conn: duckdb.DuckDBPyConnection) -> None:
"""Create the _meta table required by the extract.duckdb contract."""
conn.execute("DROP TABLE IF EXISTS _meta")
@ -321,24 +371,24 @@ def materialize_query(
to `<output_dir>/data/<table_id>.parquet` atomically.
Designed for `query_mode='materialized'` table_registry rows. The SQL
is admin-registered (validated upstream) and may reference DuckDB
three-part identifiers (`bq."dataset"."table"`) resolved by the
in-session ATTACH, OR native BQ identifiers via the `bigquery_query()`
table function both work because the session has the bigquery
extension loaded with a SECRET token.
is admin-registered BQ-native SQL (DuckDB-flavor `bq."ds"."t"` refs are
validated upstream). The SQL is wrapped in `bigquery_query('<billing>',
'<inner>')` before the COPY so the BQ extension routes through the BQ
jobs API — the default Storage Read API path rejects non-base entities
(views, materialized views) with "non-table entities cannot be read with
the storage API". Routing through `bigquery_query()` works uniformly for
base tables and views alike.
Cost guardrail: when `max_bytes` is a positive int, run a BQ dry-run
via `bq.client()` first; raise `MaterializeBudgetError` if the
estimate exceeds the cap. `max_bytes=None` or `max_bytes <= 0`
disables the guardrail (config sentinel, see
`data_source.bigquery.max_bytes_per_materialize`).
`data_source.bigquery.max_bytes_per_materialize`). The dry-run operates
on the inner `sql` (BQ-native), not the wrapped form.
Dry-run is best-effort and fail-open: if the SQL uses DuckDB syntax
that the native BQ client can't parse (e.g. `bq."ds"."t"`), the
dry-run raises and we log a warning; the COPY still runs. This
matches the BqAccess facade's "client is for native BQ SQL only"
contract operators who need the cap to engage write the registered
SQL using native BQ identifiers (`\\`project.ds.t\\``).
Dry-run is best-effort and fail-open: if the dry-run errors (transient
upstream failure, missing google lib), we log a warning and proceed
with the wrapped COPY.
Atomic write: result lands in `<id>.parquet.tmp` first, then
`os.replace` swaps it in. A failed COPY leaves no partial file behind.
@ -347,7 +397,9 @@ def materialize_query(
table_id: Logical id from table_registry; becomes the parquet
filename. Must pass `validate_identifier()` so it can't
inject path traversal.
sql: SELECT statement, no trailing semicolon.
sql: BQ-native SELECT statement, no trailing semicolon. Wrapped
in `bigquery_query()` before the COPY — must not itself
contain a `bigquery_query()` call.
bq: A `BqAccess` instance — provides `duckdb_session()` for the
COPY and `client()` for the dry-run.
output_dir: Connector root, e.g. `/data/extracts/bigquery`.
@ -358,7 +410,8 @@ def materialize_query(
{"rows": int, "size_bytes": int, "query_mode": "materialized"}
Raises:
ValueError: if `table_id` is unsafe.
ValueError: if `table_id` is unsafe or `bq.projects.billing` fails
the GCP project_id grammar check.
MaterializeBudgetError: if `max_bytes > 0` and dry-run estimate exceeds it.
BqAccessError: from `bq.duckdb_session()` (auth_failed / bq_lib_missing /
not_configured) caller catches and aggregates into the trigger
@ -377,17 +430,20 @@ def materialize_query(
if tmp_path.exists():
tmp_path.unlink()
# Cost guardrail (best-effort — fail-open if dry-run can't parse the SQL).
# Build the wrapped SQL once — both the cost guardrail dry-run and
# the COPY operate on `sql` (the inner BQ SQL); only the COPY needs
# the DuckDB-side bigquery_query() envelope.
billing_project = bq.projects.billing
wrapped_sql = _wrap_admin_sql_for_jobs_api(billing_project, sql)
if max_bytes is not None and max_bytes > 0:
try:
from app.api.v2_scan import _bq_dry_run_bytes # reuse main's impl
estimated = _bq_dry_run_bytes(bq, sql)
estimated = _bq_dry_run_bytes(bq, sql) # NB: pass inner SQL (BQ-native)
except Exception as e:
logger.warning(
"BQ dry-run failed for materialize cost guardrail (fail-open): %s. "
"If the SQL uses DuckDB three-part names like bq.\"ds\".\"t\", "
"rewrite to native BQ identifiers (`project.ds.t`) for the "
"guardrail to engage. Proceeding with COPY.",
"Proceeding with COPY against `bigquery_query()` wrapping.",
e,
)
estimated = 0
@ -400,18 +456,10 @@ def materialize_query(
limit=max_bytes,
)
# COPY through a BqAccess-managed session.
# COPY through a BqAccess-managed session. The session has the BQ
# extension loaded with a SECRET token; bigquery_query() reuses that
# auth path against the billing_project for the jobs API call.
with bq.duckdb_session() as conn:
# ATTACH the data project — but only when no `bq` catalog is
# already attached. Production sessions (real BqAccess) come with
# only `:memory:` and need the ATTACH; test sessions pre-populate
# `bq` as a fixture catalog and would error on a redundant ATTACH
# (alias already in use) AND on the bigquery extension load when
# the test runner has no cached extension. Detecting via
# `duckdb_databases()` keeps the ATTACH path idempotent without
# swallowing real errors (auth, cross-project permission,
# malformed project_id) — those still propagate from the actual
# ATTACH call.
attached = {
r[0] for r in conn.execute(
"SELECT database_name FROM duckdb_databases()"
@ -424,7 +472,9 @@ def materialize_query(
try:
safe_path = str(tmp_path).replace("'", "''")
conn.execute(f"COPY ({sql}) TO '{safe_path}' (FORMAT PARQUET)")
conn.execute(
f"COPY ({wrapped_sql}) TO '{safe_path}' (FORMAT PARQUET)"
)
rows = conn.execute(
f"SELECT count(*) FROM read_parquet('{safe_path}')"
).fetchone()[0]

View file

@ -18,7 +18,13 @@ from connectors.bigquery.extractor import materialize_query, MaterializeBudgetEr
def _bq_with_seed(tables: dict[str, str] | None = None) -> BqAccess:
"""Stub BqAccess seeded with in-memory tables (same recipe as
test_bq_materialize)."""
test_bq_materialize).
A `bigquery_query(project, sql_text)` table macro is registered so the
wrapping added by `_wrap_admin_sql_for_jobs_api` (Task 2 routes COPY
through the BQ jobs API for views) resolves against the in-memory tables
without needing the real BQ extension.
"""
tables = tables or {}
@contextmanager
@ -30,6 +36,12 @@ def _bq_with_seed(tables: dict[str, str] | None = None) -> BqAccess:
conn.execute(f"CREATE SCHEMA IF NOT EXISTS {s}")
for ref, body in tables.items():
conn.execute(f"CREATE OR REPLACE TABLE {ref} AS {body}")
# Stub bigquery_query() so materialize_query's wrapped COPY works
# against the in-memory bq catalog without the real BQ extension.
conn.execute(
"CREATE OR REPLACE MACRO bigquery_query(project, sql_text) "
"AS TABLE SELECT * FROM query(sql_text)"
)
yield conn
finally:
conn.close()

View file

@ -21,6 +21,11 @@ def _make_stub_bq(tables: dict[str, str] | None = None) -> BqAccess:
with a pretend `bq` catalog containing test tables. `tables` maps
DuckDB-three-part references like `'bq.test.orders'` to a SELECT
expression to seed them with.
A `bigquery_query(project, sql_text)` table macro is registered so the
wrapping added by `_wrap_admin_sql_for_jobs_api` (Task 2 routes COPY
through the BQ jobs API for views) resolves against the in-memory tables
without needing the real BQ extension.
"""
tables = tables or {}
@ -34,6 +39,12 @@ def _make_stub_bq(tables: dict[str, str] | None = None) -> BqAccess:
conn.execute(f"CREATE SCHEMA IF NOT EXISTS {s}")
for ref, body in tables.items():
conn.execute(f"CREATE OR REPLACE TABLE {ref} AS {body}")
# Stub bigquery_query() so materialize_query's wrapped COPY works
# against the in-memory bq catalog without the real BQ extension.
conn.execute(
"CREATE OR REPLACE MACRO bigquery_query(project, sql_text) "
"AS TABLE SELECT * FROM query(sql_text)"
)
yield conn
finally:
conn.close()

View file

@ -0,0 +1,54 @@
"""materialize_query must always wrap admin source_query in
bigquery_query('<billing>', '<admin>') so the COPY uses BQ jobs API,
which works for base tables AND views — Storage Read API does not."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from connectors.bigquery.extractor import (
_wrap_admin_sql_for_jobs_api,
_escape_sql_string_literal,
)
def test_wrap_simple_select():
    """A quote-free SELECT is embedded verbatim as bigquery_query()'s second argument."""
    expected = (
        "SELECT * FROM bigquery_query('prj-billing', "
        "'SELECT * FROM `ds.tbl`')"
    )
    wrapped = _wrap_admin_sql_for_jobs_api(
        billing_project="prj-billing",
        inner_sql="SELECT * FROM `ds.tbl`",
    )
    assert wrapped == expected
def test_escape_single_quotes_in_inner_sql():
    """Each single quote in the input comes back doubled."""
    raw_sql = "SELECT name FROM `ds.tbl` WHERE country = 'CZ'"
    expected = "SELECT name FROM `ds.tbl` WHERE country = ''CZ''"
    assert _escape_sql_string_literal(raw_sql) == expected
def test_wrap_with_inner_quotes_round_trips():
    """Wrapping SQL that already contains quotes stays balanced and is reversible."""
    original_sql = "SELECT * FROM `ds.tbl` WHERE col = 'foo''bar'"
    wrapped = _wrap_admin_sql_for_jobs_api("myproject", original_sql)

    # The outer literal envelope doubles every inner single quote, so the
    # total number of quotes DuckDB's parser sees must be even.
    quote_total = wrapped.count("'")
    assert quote_total % 2 == 0

    # Peel the wrapper off again: fixed head, fixed two-character tail.
    head = "SELECT * FROM bigquery_query('myproject', '"
    tail = "')"
    assert wrapped.startswith(head)
    assert wrapped.endswith(tail)
    payload = wrapped[len(head):len(wrapped) - len(tail)]

    # Undo the string-literal escape ('' -> ') and compare with the input.
    assert payload.replace("''", "'") == original_sql
def test_billing_project_validates_format():
    """A billing project that breaks the project_id grammar is rejected."""
    malformed_project = "bad project'; DROP"
    with pytest.raises(ValueError, match="billing_project"):
        _wrap_admin_sql_for_jobs_api(
            billing_project=malformed_project, inner_sql="SELECT 1"
        )

View file

@ -75,7 +75,13 @@ def stub_bq_extractor(monkeypatch):
@pytest.fixture
def stub_bq():
"""Real-shape BqAccess wired to in-memory DuckDB factories so the
materialize_query path can run end-to-end without GCP."""
materialize_query path can run end-to-end without GCP.
A `bigquery_query(project, sql_text)` table macro is registered so the
wrapping added by `_wrap_admin_sql_for_jobs_api` (Task 2 routes COPY
through the BQ jobs API for views) resolves against the in-memory tables
without needing the real BQ extension.
"""
@contextmanager
def _session(_p):
conn = duckdb.connect(":memory:")
@ -87,6 +93,12 @@ def stub_bq():
"SELECT 'EU' AS region, 100 AS revenue UNION ALL "
"SELECT 'US' AS region, 250 AS revenue"
)
# Stub bigquery_query() so materialize_query's wrapped COPY works
# against the in-memory bq catalog without the real BQ extension.
conn.execute(
"CREATE OR REPLACE MACRO bigquery_query(project, sql_text) "
"AS TABLE SELECT * FROM query(sql_text)"
)
yield conn
finally:
conn.close()
@ -265,12 +277,18 @@ def test_materialized_zero_rows_logs_warning(stub_bq, tmp_path, caplog):
conn.execute("CREATE SCHEMA bq.test")
conn.execute("CREATE OR REPLACE TABLE bq.test.empty AS "
"SELECT 1 AS n WHERE FALSE")
# Stub bigquery_query() so materialize_query's wrapped COPY works
# against the in-memory bq catalog without the real BQ extension.
conn.execute(
"CREATE OR REPLACE MACRO bigquery_query(project, sql_text) "
"AS TABLE SELECT * FROM query(sql_text)"
)
yield conn
finally:
conn.close()
bq_empty = BqAccess(
BqProjects(billing="t", data="t"),
BqProjects(billing="test-project", data="test-project"),
client_factory=lambda _p: MagicMock(),
duckdb_session_factory=_session_empty,
)
@ -323,7 +341,7 @@ def test_attach_real_error_propagates(stub_bq, tmp_path):
conn.close()
bq_bad = BqAccess(
BqProjects(billing="t", data="t"),
BqProjects(billing="test-project", data="test-project"),
client_factory=lambda _p: MagicMock(),
duckdb_session_factory=_session_attach_fails,
)