diff --git a/connectors/bigquery/extractor.py b/connectors/bigquery/extractor.py index 9c4d717..e8dcebf 100644 --- a/connectors/bigquery/extractor.py +++ b/connectors/bigquery/extractor.py @@ -3,6 +3,7 @@ No data is downloaded. All queries go directly to BigQuery via DuckDB extension ATTACH. """ +import hashlib import logging import os import re @@ -89,10 +90,11 @@ def _wrap_admin_sql_for_jobs_api(billing_project: str, inner_sql: str) -> str: should fail closed not silently lose budget to the wrong project). inner_sql: BigQuery-flavor SQL the admin registered as - ``source_query``. Must use BQ syntax (backticks for dashed - identifiers, native function calls). DuckDB-flavor `bq."ds"."t"` - is NOT acceptable here — the v24 migration converts existing - rows; new registrations are validated upstream. + ``source_query``. Should be BigQuery-native; DuckDB-flavor + `bq."ds"."t"` references are not enforced here but will fail at + COPY time inside the BQ jobs API. Existing rows are converted by + the v24 schema migration; new rows are validated upstream at + register/PUT. Returns: A DuckDB-parseable SQL fragment suitable as the operand of @@ -471,7 +473,7 @@ def materialize_query( ) try: - safe_path = str(tmp_path).replace("'", "''") + safe_path = _escape_sql_string_literal(str(tmp_path)) conn.execute( f"COPY ({wrapped_sql}) TO '{safe_path}' (FORMAT PARQUET)" ) @@ -489,7 +491,6 @@ def materialize_query( # thread — a 10 GiB parquet means 50+ seconds of disk I/O blocking other # requests. Hashing here keeps the open-file handle hot from the COPY # round and removes the second read. Devil's-advocate review item. - import hashlib h = hashlib.md5() with open(tmp_path, "rb") as f: for chunk in iter(lambda: f.read(8192), b""): diff --git a/tests/test_bq_cost_guardrail.py b/tests/test_bq_cost_guardrail.py index 00300d1..7811a62 100644 --- a/tests/test_bq_cost_guardrail.py +++ b/tests/test_bq_cost_guardrail.py @@ -128,22 +128,26 @@ def test_zero_max_bytes_skips_dry_run(tmp_path): assert stats["rows"] == 1 -def test_dry_run_failure_is_fail_open(tmp_path): +def test_dry_run_failure_is_fail_open(tmp_path, caplog): """If the dry-run errors (DuckDB syntax, missing google lib, transient upstream failure) we don't block — log + proceed with COPY. Operators who need hard-fail watch logs for the warning.""" + import logging + out = tmp_path / "extracts" / "bigquery" out.mkdir(parents=True) bq = _bq_with_seed({"bq.test.tiny": "SELECT 1 AS n"}) - with patch( - "app.api.v2_scan._bq_dry_run_bytes", side_effect=RuntimeError("boom") - ): - stats = materialize_query( - table_id="t1", - sql="SELECT * FROM bq.test.tiny", - bq=bq, - output_dir=str(out), - max_bytes=10 * 2**30, - ) + with caplog.at_level(logging.WARNING, logger="connectors.bigquery.extractor"): + with patch( + "app.api.v2_scan._bq_dry_run_bytes", side_effect=RuntimeError("boom") + ): + stats = materialize_query( + table_id="t1", + sql="SELECT * FROM bq.test.tiny", + bq=bq, + output_dir=str(out), + max_bytes=10 * 2**30, + ) assert stats["rows"] == 1 + assert "fail-open" in caplog.text