diff --git a/connectors/bigquery/extractor.py b/connectors/bigquery/extractor.py index bf95c2b..9c4d717 100644 --- a/connectors/bigquery/extractor.py +++ b/connectors/bigquery/extractor.py @@ -5,6 +5,7 @@ No data is downloaded. All queries go directly to BigQuery via DuckDB extension import logging import os +import re import shutil import threading from datetime import datetime, timezone @@ -59,6 +60,55 @@ def _detect_table_type( return row[0] if row else None +_BILLING_PROJECT_RE = re.compile(r"^[a-z][a-z0-9-]{4,28}[a-z0-9]$") + + +def _escape_sql_string_literal(s: str) -> str: + """Double every single quote so the result is safe to embed inside a + single-quoted SQL string literal. DuckDB and BigQuery both honor the + SQL standard `''` escape inside `'...'`. Used to wrap admin + source_query into bigquery_query()'s second arg without breaking + the literal envelope.""" + return s.replace("'", "''") + + +def _wrap_admin_sql_for_jobs_api(billing_project: str, inner_sql: str) -> str: + """Build the COPY-source SQL that runs admin's `inner_sql` through + the BigQuery jobs API via the DuckDB BQ extension's + ``bigquery_query()`` table function. + + Why: the default `bq."ds"."t"` reference path uses the BQ Storage + Read API which rejects non-base entities (views, materialized views). + Routing through `bigquery_query()` uses the jobs API which accepts + every entity type uniformly. + + Args: + billing_project: GCP project ID that bills the BQ job. Must + match the GCP project_id grammar — anything else is rejected + as a defense-in-depth check (admin is trusted, but a typo + should fail closed not silently lose budget to the wrong + project). + inner_sql: BigQuery-flavor SQL the admin registered as + ``source_query``. Must use BQ syntax (backticks for dashed + identifiers, native function calls). DuckDB-flavor `bq."ds"."t"` + is NOT acceptable here — the v24 migration converts existing + rows; new registrations are validated upstream. 
+ + Returns: + A DuckDB-parseable SQL fragment suitable as the operand of + ``COPY (...) TO 'path' (FORMAT PARQUET)``. + """ + if not _BILLING_PROJECT_RE.match(billing_project): + raise ValueError( + f"billing_project {billing_project!r} is not a valid GCP project_id " + "(grammar: ^[a-z][a-z0-9-]{4,28}[a-z0-9]$)" + ) + return ( + f"SELECT * FROM bigquery_query('{billing_project}', " + f"'{_escape_sql_string_literal(inner_sql)}')" + ) + + def _create_meta_table(conn: duckdb.DuckDBPyConnection) -> None: """Create the _meta table required by the extract.duckdb contract.""" conn.execute("DROP TABLE IF EXISTS _meta") @@ -321,24 +371,24 @@ def materialize_query( to `/data/.parquet` atomically. Designed for `query_mode='materialized'` table_registry rows. The SQL - is admin-registered (validated upstream) and may reference DuckDB - three-part identifiers (`bq."dataset"."table"`) resolved by the - in-session ATTACH, OR native BQ identifiers via the `bigquery_query()` - table function — both work because the session has the bigquery - extension loaded with a SECRET token. + is admin-registered BQ-native SQL (DuckDB-flavor `bq."ds"."t"` refs are + validated upstream). The SQL is wrapped in `bigquery_query('', + '')` before the COPY so the BQ extension routes through the BQ + jobs API — the default Storage Read API path rejects non-base entities + (views, materialized views) with "non-table entities cannot be read with + the storage API". Routing through `bigquery_query()` works uniformly for + base tables and views alike. Cost guardrail: when `max_bytes` is a positive int, run a BQ dry-run via `bq.client()` first; raise `MaterializeBudgetError` if the estimate exceeds the cap. `max_bytes=None` or `max_bytes <= 0` disables the guardrail (config sentinel, see - `data_source.bigquery.max_bytes_per_materialize`). + `data_source.bigquery.max_bytes_per_materialize`). The dry-run operates + on the inner `sql` (BQ-native), not the wrapped form. 
- Dry-run is best-effort and fail-open: if the SQL uses DuckDB syntax - that the native BQ client can't parse (e.g. `bq."ds"."t"`), the - dry-run raises and we log a warning; the COPY still runs. This - matches the BqAccess facade's "client is for native BQ SQL only" - contract — operators who need the cap to engage write the registered - SQL using native BQ identifiers (`\\`project.ds.t\\``). + Dry-run is best-effort and fail-open: if the dry-run errors (transient + upstream failure, missing google lib), we log a warning and proceed + with the wrapped COPY. Atomic write: result lands in `.parquet.tmp` first, then `os.replace` swaps it in. A failed COPY leaves no partial file behind. @@ -347,7 +397,9 @@ def materialize_query( table_id: Logical id from table_registry; becomes the parquet filename. Must pass `validate_identifier()` so it can't inject path traversal. - sql: SELECT statement, no trailing semicolon. + sql: BQ-native SELECT statement, no trailing semicolon. Wrapped + in `bigquery_query()` before the COPY — must not itself + contain a `bigquery_query()` call. bq: A `BqAccess` instance — provides `duckdb_session()` for the COPY and `client()` for the dry-run. output_dir: Connector root, e.g. `/data/extracts/bigquery`. @@ -358,7 +410,8 @@ def materialize_query( {"rows": int, "size_bytes": int, "query_mode": "materialized"} Raises: - ValueError: if `table_id` is unsafe. + ValueError: if `table_id` is unsafe or `bq.projects.billing` fails + the GCP project_id grammar check. MaterializeBudgetError: if `max_bytes > 0` and dry-run estimate exceeds it. BqAccessError: from `bq.duckdb_session()` (auth_failed / bq_lib_missing / not_configured) — caller catches and aggregates into the trigger @@ -377,17 +430,20 @@ def materialize_query( if tmp_path.exists(): tmp_path.unlink() - # Cost guardrail (best-effort — fail-open if dry-run can't parse the SQL). 
+ # Build the wrapped SQL once — both the cost guardrail dry-run and + # the COPY operate on `sql` (the inner BQ SQL); only the COPY needs + # the DuckDB-side bigquery_query() envelope. + billing_project = bq.projects.billing + wrapped_sql = _wrap_admin_sql_for_jobs_api(billing_project, sql) + if max_bytes is not None and max_bytes > 0: try: from app.api.v2_scan import _bq_dry_run_bytes # reuse main's impl - estimated = _bq_dry_run_bytes(bq, sql) + estimated = _bq_dry_run_bytes(bq, sql) # NB: pass inner SQL (BQ-native) except Exception as e: logger.warning( "BQ dry-run failed for materialize cost guardrail (fail-open): %s. " - "If the SQL uses DuckDB three-part names like bq.\"ds\".\"t\", " - "rewrite to native BQ identifiers (`project.ds.t`) for the " - "guardrail to engage. Proceeding with COPY.", + "Proceeding with COPY against `bigquery_query()` wrapping.", e, ) estimated = 0 @@ -400,18 +456,10 @@ def materialize_query( limit=max_bytes, ) - # COPY through a BqAccess-managed session. + # COPY through a BqAccess-managed session. The session has the BQ + # extension loaded with a SECRET token; bigquery_query() reuses that + # auth path against the billing_project for the jobs API call. with bq.duckdb_session() as conn: - # ATTACH the data project — but only when no `bq` catalog is - # already attached. Production sessions (real BqAccess) come with - # only `:memory:` and need the ATTACH; test sessions pre-populate - # `bq` as a fixture catalog and would error on a redundant ATTACH - # (alias already in use) AND on the bigquery extension load when - # the test runner has no cached extension. Detecting via - # `duckdb_databases()` keeps the ATTACH path idempotent without - # swallowing real errors (auth, cross-project permission, - # malformed project_id) — those still propagate from the actual - # ATTACH call. 
attached = { r[0] for r in conn.execute( "SELECT database_name FROM duckdb_databases()" @@ -424,7 +472,9 @@ def materialize_query( try: safe_path = str(tmp_path).replace("'", "''") - conn.execute(f"COPY ({sql}) TO '{safe_path}' (FORMAT PARQUET)") + conn.execute( + f"COPY ({wrapped_sql}) TO '{safe_path}' (FORMAT PARQUET)" + ) rows = conn.execute( f"SELECT count(*) FROM read_parquet('{safe_path}')" ).fetchone()[0] diff --git a/tests/test_bq_cost_guardrail.py b/tests/test_bq_cost_guardrail.py index d51702e..00300d1 100644 --- a/tests/test_bq_cost_guardrail.py +++ b/tests/test_bq_cost_guardrail.py @@ -18,7 +18,13 @@ from connectors.bigquery.extractor import materialize_query, MaterializeBudgetEr def _bq_with_seed(tables: dict[str, str] | None = None) -> BqAccess: """Stub BqAccess seeded with in-memory tables (same recipe as - test_bq_materialize).""" + test_bq_materialize). + + A `bigquery_query(project, sql_text)` table macro is registered so the + wrapping added by `_wrap_admin_sql_for_jobs_api` (Task 2 — routes COPY + through the BQ jobs API for views) resolves against the in-memory tables + without needing the real BQ extension. + """ tables = tables or {} @contextmanager @@ -30,6 +36,12 @@ def _bq_with_seed(tables: dict[str, str] | None = None) -> BqAccess: conn.execute(f"CREATE SCHEMA IF NOT EXISTS {s}") for ref, body in tables.items(): conn.execute(f"CREATE OR REPLACE TABLE {ref} AS {body}") + # Stub bigquery_query() so materialize_query's wrapped COPY works + # against the in-memory bq catalog without the real BQ extension. 
+ conn.execute( + "CREATE OR REPLACE MACRO bigquery_query(project, sql_text) " + "AS TABLE SELECT * FROM query(sql_text)" + ) yield conn finally: conn.close() diff --git a/tests/test_bq_materialize.py b/tests/test_bq_materialize.py index ae23680..1684278 100644 --- a/tests/test_bq_materialize.py +++ b/tests/test_bq_materialize.py @@ -21,6 +21,11 @@ def _make_stub_bq(tables: dict[str, str] | None = None) -> BqAccess: with a pretend `bq` catalog containing test tables. `tables` maps DuckDB-three-part references like `'bq.test.orders'` to a SELECT expression to seed them with. + + A `bigquery_query(project, sql_text)` table macro is registered so the + wrapping added by `_wrap_admin_sql_for_jobs_api` (Task 2 — routes COPY + through the BQ jobs API for views) resolves against the in-memory tables + without needing the real BQ extension. """ tables = tables or {} @@ -34,6 +39,12 @@ def _make_stub_bq(tables: dict[str, str] | None = None) -> BqAccess: conn.execute(f"CREATE SCHEMA IF NOT EXISTS {s}") for ref, body in tables.items(): conn.execute(f"CREATE OR REPLACE TABLE {ref} AS {body}") + # Stub bigquery_query() so materialize_query's wrapped COPY works + # against the in-memory bq catalog without the real BQ extension. 
def test_wrap_simple_select():
    """A quote-free inner query is embedded verbatim in the envelope."""
    expected = (
        "SELECT * FROM bigquery_query('prj-billing', "
        "'SELECT * FROM `ds.tbl`')"
    )
    wrapped = _wrap_admin_sql_for_jobs_api(
        billing_project="prj-billing",
        inner_sql="SELECT * FROM `ds.tbl`",
    )
    assert wrapped == expected


def test_escape_single_quotes_in_inner_sql():
    """Each single quote in the inner SQL is doubled by the escaper."""
    assert (
        _escape_sql_string_literal(
            "SELECT name FROM `ds.tbl` WHERE country = 'CZ'"
        )
        == "SELECT name FROM `ds.tbl` WHERE country = ''CZ''"
    )


def test_wrap_with_inner_quotes_round_trips():
    """Un-escaping the wrapped literal yields the original inner SQL."""
    original_sql = "SELECT * FROM `ds.tbl` WHERE col = 'foo''bar'"
    wrapped = _wrap_admin_sql_for_jobs_api("myproject", original_sql)

    # A balanced outer literal implies an even number of quote characters.
    assert wrapped.count("'") % 2 == 0

    head = "SELECT * FROM bigquery_query('myproject', '"
    tail = "')"
    assert wrapped.startswith(head)
    assert wrapped.endswith(tail)

    # Peel off the envelope, then undo the '' -> ' doubling.
    payload = wrapped[len(head):-len(tail)]
    assert payload.replace("''", "'") == original_sql


def test_billing_project_validates_format():
    """A project id outside the GCP grammar is rejected with ValueError."""
    malformed_project = "bad project'; DROP"
    with pytest.raises(ValueError, match="billing_project"):
        _wrap_admin_sql_for_jobs_api(
            billing_project=malformed_project,
            inner_sql="SELECT 1",
        )
+ conn.execute( + "CREATE OR REPLACE MACRO bigquery_query(project, sql_text) " + "AS TABLE SELECT * FROM query(sql_text)" + ) yield conn finally: conn.close() bq_empty = BqAccess( - BqProjects(billing="t", data="t"), + BqProjects(billing="test-project", data="test-project"), client_factory=lambda _p: MagicMock(), duckdb_session_factory=_session_empty, ) @@ -323,7 +341,7 @@ def test_attach_real_error_propagates(stub_bq, tmp_path): conn.close() bq_bad = BqAccess( - BqProjects(billing="t", data="t"), + BqProjects(billing="test-project", data="test-project"), client_factory=lambda _p: MagicMock(), duckdb_session_factory=_session_attach_fails, )