fix(bq-materialize): wrap admin SQL in bigquery_query() so views work
Pre-fix, materialize ran the admin source_query as 'COPY (sql) TO parquet'
through the DuckDB BQ extension session. The extension defaults to the
BQ Storage Read API for bq.<ds>.<tbl> references, which rejects views
('non-table entities cannot be read with the storage API'). The fix
always wraps admin SQL into bigquery_query('<billing>', '<inner>') so
COPY uses the BQ jobs API uniformly for tables and views.
The cost-guardrail dry-run now operates on the inner SQL (BQ-native), so
the BQ Python client can parse it and the cap engages. Pre-fix, the dry-run
hit 'Table-valued function not found: bigquery_query' and failed open.
This commit is contained in:
parent
aa622f2af4
commit
d8a2299633
5 changed files with 180 additions and 35 deletions
|
|
@ -5,6 +5,7 @@ No data is downloaded. All queries go directly to BigQuery via DuckDB extension
|
|||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import threading
|
||||
from datetime import datetime, timezone
|
||||
|
|
@ -59,6 +60,55 @@ def _detect_table_type(
|
|||
return row[0] if row else None
|
||||
|
||||
|
||||
_BILLING_PROJECT_RE = re.compile(r"^[a-z][a-z0-9-]{4,28}[a-z0-9]$")
|
||||
|
||||
|
||||
def _escape_sql_string_literal(s: str) -> str:
|
||||
"""Double every single quote so the result is safe to embed inside a
|
||||
single-quoted SQL string literal. DuckDB and BigQuery both honor the
|
||||
SQL standard `''` escape inside `'...'`. Used to wrap admin
|
||||
source_query into bigquery_query()'s second arg without breaking
|
||||
the literal envelope."""
|
||||
return s.replace("'", "''")
|
||||
|
||||
|
||||
def _wrap_admin_sql_for_jobs_api(billing_project: str, inner_sql: str) -> str:
|
||||
"""Build the COPY-source SQL that runs admin's `inner_sql` through
|
||||
the BigQuery jobs API via the DuckDB BQ extension's
|
||||
``bigquery_query()`` table function.
|
||||
|
||||
Why: the default `bq."ds"."t"` reference path uses the BQ Storage
|
||||
Read API which rejects non-base entities (views, materialized views).
|
||||
Routing through `bigquery_query()` uses the jobs API which accepts
|
||||
every entity type uniformly.
|
||||
|
||||
Args:
|
||||
billing_project: GCP project ID that bills the BQ job. Must
|
||||
match the GCP project_id grammar — anything else is rejected
|
||||
as a defense-in-depth check (admin is trusted, but a typo
|
||||
should fail closed not silently lose budget to the wrong
|
||||
project).
|
||||
inner_sql: BigQuery-flavor SQL the admin registered as
|
||||
``source_query``. Must use BQ syntax (backticks for dashed
|
||||
identifiers, native function calls). DuckDB-flavor `bq."ds"."t"`
|
||||
is NOT acceptable here — the v24 migration converts existing
|
||||
rows; new registrations are validated upstream.
|
||||
|
||||
Returns:
|
||||
A DuckDB-parseable SQL fragment suitable as the operand of
|
||||
``COPY (...) TO 'path' (FORMAT PARQUET)``.
|
||||
"""
|
||||
if not _BILLING_PROJECT_RE.match(billing_project):
|
||||
raise ValueError(
|
||||
f"billing_project {billing_project!r} is not a valid GCP project_id "
|
||||
"(grammar: ^[a-z][a-z0-9-]{4,28}[a-z0-9]$)"
|
||||
)
|
||||
return (
|
||||
f"SELECT * FROM bigquery_query('{billing_project}', "
|
||||
f"'{_escape_sql_string_literal(inner_sql)}')"
|
||||
)
|
||||
|
||||
|
||||
def _create_meta_table(conn: duckdb.DuckDBPyConnection) -> None:
|
||||
"""Create the _meta table required by the extract.duckdb contract."""
|
||||
conn.execute("DROP TABLE IF EXISTS _meta")
|
||||
|
|
@ -321,24 +371,24 @@ def materialize_query(
|
|||
to `<output_dir>/data/<table_id>.parquet` atomically.
|
||||
|
||||
Designed for `query_mode='materialized'` table_registry rows. The SQL
|
||||
is admin-registered (validated upstream) and may reference DuckDB
|
||||
three-part identifiers (`bq."dataset"."table"`) resolved by the
|
||||
in-session ATTACH, OR native BQ identifiers via the `bigquery_query()`
|
||||
table function — both work because the session has the bigquery
|
||||
extension loaded with a SECRET token.
|
||||
is admin-registered BQ-native SQL (DuckDB-flavor `bq."ds"."t"` refs are
|
||||
validated upstream). The SQL is wrapped in `bigquery_query('<billing>',
|
||||
'<inner>')` before the COPY so the BQ extension routes through the BQ
|
||||
jobs API — the default Storage Read API path rejects non-base entities
|
||||
(views, materialized views) with "non-table entities cannot be read with
|
||||
the storage API". Routing through `bigquery_query()` works uniformly for
|
||||
base tables and views alike.
|
||||
|
||||
Cost guardrail: when `max_bytes` is a positive int, run a BQ dry-run
|
||||
via `bq.client()` first; raise `MaterializeBudgetError` if the
|
||||
estimate exceeds the cap. `max_bytes=None` or `max_bytes <= 0`
|
||||
disables the guardrail (config sentinel, see
|
||||
`data_source.bigquery.max_bytes_per_materialize`).
|
||||
`data_source.bigquery.max_bytes_per_materialize`). The dry-run operates
|
||||
on the inner `sql` (BQ-native), not the wrapped form.
|
||||
|
||||
Dry-run is best-effort and fail-open: if the SQL uses DuckDB syntax
|
||||
that the native BQ client can't parse (e.g. `bq."ds"."t"`), the
|
||||
dry-run raises and we log a warning; the COPY still runs. This
|
||||
matches the BqAccess facade's "client is for native BQ SQL only"
|
||||
contract — operators who need the cap to engage write the registered
|
||||
SQL using native BQ identifiers (`\\`project.ds.t\\``).
|
||||
Dry-run is best-effort and fail-open: if the dry-run errors (transient
|
||||
upstream failure, missing google lib), we log a warning and proceed
|
||||
with the wrapped COPY.
|
||||
|
||||
Atomic write: result lands in `<id>.parquet.tmp` first, then
|
||||
`os.replace` swaps it in. A failed COPY leaves no partial file behind.
|
||||
|
|
@ -347,7 +397,9 @@ def materialize_query(
|
|||
table_id: Logical id from table_registry; becomes the parquet
|
||||
filename. Must pass `validate_identifier()` so it can't
|
||||
inject path traversal.
|
||||
sql: SELECT statement, no trailing semicolon.
|
||||
sql: BQ-native SELECT statement, no trailing semicolon. Wrapped
|
||||
in `bigquery_query()` before the COPY — must not itself
|
||||
contain a `bigquery_query()` call.
|
||||
bq: A `BqAccess` instance — provides `duckdb_session()` for the
|
||||
COPY and `client()` for the dry-run.
|
||||
output_dir: Connector root, e.g. `/data/extracts/bigquery`.
|
||||
|
|
@ -358,7 +410,8 @@ def materialize_query(
|
|||
{"rows": int, "size_bytes": int, "query_mode": "materialized"}
|
||||
|
||||
Raises:
|
||||
ValueError: if `table_id` is unsafe.
|
||||
ValueError: if `table_id` is unsafe or `bq.projects.billing` fails
|
||||
the GCP project_id grammar check.
|
||||
MaterializeBudgetError: if `max_bytes > 0` and dry-run estimate exceeds it.
|
||||
BqAccessError: from `bq.duckdb_session()` (auth_failed / bq_lib_missing /
|
||||
not_configured) — caller catches and aggregates into the trigger
|
||||
|
|
@ -377,17 +430,20 @@ def materialize_query(
|
|||
if tmp_path.exists():
|
||||
tmp_path.unlink()
|
||||
|
||||
# Cost guardrail (best-effort — fail-open if dry-run can't parse the SQL).
|
||||
# Build the wrapped SQL once — both the cost guardrail dry-run and
|
||||
# the COPY operate on `sql` (the inner BQ SQL); only the COPY needs
|
||||
# the DuckDB-side bigquery_query() envelope.
|
||||
billing_project = bq.projects.billing
|
||||
wrapped_sql = _wrap_admin_sql_for_jobs_api(billing_project, sql)
|
||||
|
||||
if max_bytes is not None and max_bytes > 0:
|
||||
try:
|
||||
from app.api.v2_scan import _bq_dry_run_bytes # reuse main's impl
|
||||
estimated = _bq_dry_run_bytes(bq, sql)
|
||||
estimated = _bq_dry_run_bytes(bq, sql) # NB: pass inner SQL (BQ-native)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"BQ dry-run failed for materialize cost guardrail (fail-open): %s. "
|
||||
"If the SQL uses DuckDB three-part names like bq.\"ds\".\"t\", "
|
||||
"rewrite to native BQ identifiers (`project.ds.t`) for the "
|
||||
"guardrail to engage. Proceeding with COPY.",
|
||||
"Proceeding with COPY against `bigquery_query()` wrapping.",
|
||||
e,
|
||||
)
|
||||
estimated = 0
|
||||
|
|
@ -400,18 +456,10 @@ def materialize_query(
|
|||
limit=max_bytes,
|
||||
)
|
||||
|
||||
# COPY through a BqAccess-managed session.
|
||||
# COPY through a BqAccess-managed session. The session has the BQ
|
||||
# extension loaded with a SECRET token; bigquery_query() reuses that
|
||||
# auth path against the billing_project for the jobs API call.
|
||||
with bq.duckdb_session() as conn:
|
||||
# ATTACH the data project — but only when no `bq` catalog is
|
||||
# already attached. Production sessions (real BqAccess) come with
|
||||
# only `:memory:` and need the ATTACH; test sessions pre-populate
|
||||
# `bq` as a fixture catalog and would error on a redundant ATTACH
|
||||
# (alias already in use) AND on the bigquery extension load when
|
||||
# the test runner has no cached extension. Detecting via
|
||||
# `duckdb_databases()` keeps the ATTACH path idempotent without
|
||||
# swallowing real errors (auth, cross-project permission,
|
||||
# malformed project_id) — those still propagate from the actual
|
||||
# ATTACH call.
|
||||
attached = {
|
||||
r[0] for r in conn.execute(
|
||||
"SELECT database_name FROM duckdb_databases()"
|
||||
|
|
@ -424,7 +472,9 @@ def materialize_query(
|
|||
|
||||
try:
|
||||
safe_path = str(tmp_path).replace("'", "''")
|
||||
conn.execute(f"COPY ({sql}) TO '{safe_path}' (FORMAT PARQUET)")
|
||||
conn.execute(
|
||||
f"COPY ({wrapped_sql}) TO '{safe_path}' (FORMAT PARQUET)"
|
||||
)
|
||||
rows = conn.execute(
|
||||
f"SELECT count(*) FROM read_parquet('{safe_path}')"
|
||||
).fetchone()[0]
|
||||
|
|
|
|||
|
|
@ -18,7 +18,13 @@ from connectors.bigquery.extractor import materialize_query, MaterializeBudgetEr
|
|||
|
||||
def _bq_with_seed(tables: dict[str, str] | None = None) -> BqAccess:
|
||||
"""Stub BqAccess seeded with in-memory tables (same recipe as
|
||||
test_bq_materialize)."""
|
||||
test_bq_materialize).
|
||||
|
||||
A `bigquery_query(project, sql_text)` table macro is registered so the
|
||||
wrapping added by `_wrap_admin_sql_for_jobs_api` (Task 2 — routes COPY
|
||||
through the BQ jobs API for views) resolves against the in-memory tables
|
||||
without needing the real BQ extension.
|
||||
"""
|
||||
tables = tables or {}
|
||||
|
||||
@contextmanager
|
||||
|
|
@ -30,6 +36,12 @@ def _bq_with_seed(tables: dict[str, str] | None = None) -> BqAccess:
|
|||
conn.execute(f"CREATE SCHEMA IF NOT EXISTS {s}")
|
||||
for ref, body in tables.items():
|
||||
conn.execute(f"CREATE OR REPLACE TABLE {ref} AS {body}")
|
||||
# Stub bigquery_query() so materialize_query's wrapped COPY works
|
||||
# against the in-memory bq catalog without the real BQ extension.
|
||||
conn.execute(
|
||||
"CREATE OR REPLACE MACRO bigquery_query(project, sql_text) "
|
||||
"AS TABLE SELECT * FROM query(sql_text)"
|
||||
)
|
||||
yield conn
|
||||
finally:
|
||||
conn.close()
|
||||
|
|
|
|||
|
|
@ -21,6 +21,11 @@ def _make_stub_bq(tables: dict[str, str] | None = None) -> BqAccess:
|
|||
with a pretend `bq` catalog containing test tables. `tables` maps
|
||||
DuckDB-three-part references like `'bq.test.orders'` to a SELECT
|
||||
expression to seed them with.
|
||||
|
||||
A `bigquery_query(project, sql_text)` table macro is registered so the
|
||||
wrapping added by `_wrap_admin_sql_for_jobs_api` (Task 2 — routes COPY
|
||||
through the BQ jobs API for views) resolves against the in-memory tables
|
||||
without needing the real BQ extension.
|
||||
"""
|
||||
tables = tables or {}
|
||||
|
||||
|
|
@ -34,6 +39,12 @@ def _make_stub_bq(tables: dict[str, str] | None = None) -> BqAccess:
|
|||
conn.execute(f"CREATE SCHEMA IF NOT EXISTS {s}")
|
||||
for ref, body in tables.items():
|
||||
conn.execute(f"CREATE OR REPLACE TABLE {ref} AS {body}")
|
||||
# Stub bigquery_query() so materialize_query's wrapped COPY works
|
||||
# against the in-memory bq catalog without the real BQ extension.
|
||||
conn.execute(
|
||||
"CREATE OR REPLACE MACRO bigquery_query(project, sql_text) "
|
||||
"AS TABLE SELECT * FROM query(sql_text)"
|
||||
)
|
||||
yield conn
|
||||
finally:
|
||||
conn.close()
|
||||
|
|
|
|||
54
tests/test_bq_materialize_query_wrapping.py
Normal file
54
tests/test_bq_materialize_query_wrapping.py
Normal file
|
|
@ -0,0 +1,54 @@
|
|||
"""materialize_query must always wrap admin source_query in
|
||||
bigquery_query('<billing>', '<admin>') so the COPY uses BQ jobs API,
|
||||
which works for base tables AND views — Storage Read API does not."""
|
||||
from __future__ import annotations
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from connectors.bigquery.extractor import (
|
||||
_wrap_admin_sql_for_jobs_api,
|
||||
_escape_sql_string_literal,
|
||||
)
|
||||
|
||||
|
||||
def test_wrap_simple_select():
    """A quote-free SELECT is embedded verbatim as bigquery_query()'s second
    argument, with the billing project as the first."""
    expected = (
        "SELECT * FROM bigquery_query('prj-billing', "
        "'SELECT * FROM `ds.tbl`')"
    )
    wrapped = _wrap_admin_sql_for_jobs_api(
        billing_project="prj-billing",
        inner_sql="SELECT * FROM `ds.tbl`",
    )
    assert wrapped == expected
|
||||
|
||||
|
||||
def test_escape_single_quotes_in_inner_sql():
    """Each single quote in the input comes back doubled; everything else
    is left untouched."""
    raw_sql = "SELECT name FROM `ds.tbl` WHERE country = 'CZ'"
    want = "SELECT name FROM `ds.tbl` WHERE country = ''CZ''"
    assert _escape_sql_string_literal(raw_sql) == want
|
||||
|
||||
|
||||
def test_wrap_with_inner_quotes_round_trips():
    """Wrapping SQL that already contains quotes (including an embedded
    `''` escape) yields a balanced literal that decodes back to the input."""
    original_sql = "SELECT * FROM `ds.tbl` WHERE col = 'foo''bar'"
    wrapped = _wrap_admin_sql_for_jobs_api("myproject", original_sql)

    # Every inner quote is doubled by the envelope, so the total quote count
    # must be even for DuckDB's parser to see a balanced literal.
    assert wrapped.count("'") % 2 == 0

    # Peel off the fixed wrapper text on both sides to isolate the literal body.
    head = "SELECT * FROM bigquery_query('myproject', '"
    tail = "')"
    assert wrapped.startswith(head)
    assert wrapped.endswith(tail)
    literal_body = wrapped[len(head):-len(tail)]

    # Undo the string-literal escape ('' → ') and compare with the input.
    assert literal_body.replace("''", "'") == original_sql
|
||||
|
||||
|
||||
def test_billing_project_validates_format():
    """A billing project containing spaces/quotes is rejected with a
    ValueError that names `billing_project`."""
    malformed_project = "bad project'; DROP"
    with pytest.raises(ValueError, match="billing_project"):
        _wrap_admin_sql_for_jobs_api(
            billing_project=malformed_project,
            inner_sql="SELECT 1",
        )
|
||||
|
|
@ -75,7 +75,13 @@ def stub_bq_extractor(monkeypatch):
|
|||
@pytest.fixture
|
||||
def stub_bq():
|
||||
"""Real-shape BqAccess wired to in-memory DuckDB factories so the
|
||||
materialize_query path can run end-to-end without GCP."""
|
||||
materialize_query path can run end-to-end without GCP.
|
||||
|
||||
A `bigquery_query(project, sql_text)` table macro is registered so the
|
||||
wrapping added by `_wrap_admin_sql_for_jobs_api` (Task 2 — routes COPY
|
||||
through the BQ jobs API for views) resolves against the in-memory tables
|
||||
without needing the real BQ extension.
|
||||
"""
|
||||
@contextmanager
|
||||
def _session(_p):
|
||||
conn = duckdb.connect(":memory:")
|
||||
|
|
@ -87,6 +93,12 @@ def stub_bq():
|
|||
"SELECT 'EU' AS region, 100 AS revenue UNION ALL "
|
||||
"SELECT 'US' AS region, 250 AS revenue"
|
||||
)
|
||||
# Stub bigquery_query() so materialize_query's wrapped COPY works
|
||||
# against the in-memory bq catalog without the real BQ extension.
|
||||
conn.execute(
|
||||
"CREATE OR REPLACE MACRO bigquery_query(project, sql_text) "
|
||||
"AS TABLE SELECT * FROM query(sql_text)"
|
||||
)
|
||||
yield conn
|
||||
finally:
|
||||
conn.close()
|
||||
|
|
@ -265,12 +277,18 @@ def test_materialized_zero_rows_logs_warning(stub_bq, tmp_path, caplog):
|
|||
conn.execute("CREATE SCHEMA bq.test")
|
||||
conn.execute("CREATE OR REPLACE TABLE bq.test.empty AS "
|
||||
"SELECT 1 AS n WHERE FALSE")
|
||||
# Stub bigquery_query() so materialize_query's wrapped COPY works
|
||||
# against the in-memory bq catalog without the real BQ extension.
|
||||
conn.execute(
|
||||
"CREATE OR REPLACE MACRO bigquery_query(project, sql_text) "
|
||||
"AS TABLE SELECT * FROM query(sql_text)"
|
||||
)
|
||||
yield conn
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
bq_empty = BqAccess(
|
||||
BqProjects(billing="t", data="t"),
|
||||
BqProjects(billing="test-project", data="test-project"),
|
||||
client_factory=lambda _p: MagicMock(),
|
||||
duckdb_session_factory=_session_empty,
|
||||
)
|
||||
|
|
@ -323,7 +341,7 @@ def test_attach_real_error_propagates(stub_bq, tmp_path):
|
|||
conn.close()
|
||||
|
||||
bq_bad = BqAccess(
|
||||
BqProjects(billing="t", data="t"),
|
||||
BqProjects(billing="test-project", data="test-project"),
|
||||
client_factory=lambda _p: MagicMock(),
|
||||
duckdb_session_factory=_session_attach_fails,
|
||||
)
|
||||
|
|
|
|||
Loading…
Reference in a new issue