From aa622f2af445007d28c80b9b6cff156704606acd Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 4 May 2026 16:23:57 +0200 Subject: [PATCH 01/13] refactor(tests): lift bq_instance + stub_bq_extractor fixtures to conftest Pre-fix the fixtures lived inside tests/test_api_admin_materialized.py. Upcoming test files in this branch need them too; conftest is the canonical home so they resolve via pytest's auto-discovery. --- tests/conftest.py | 66 ++++++++++++++++++++++++++++ tests/test_api_admin_materialized.py | 46 ------------------- 2 files changed, 66 insertions(+), 46 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 945b061..c18b80f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,6 +2,7 @@ import os from pathlib import Path +from unittest.mock import MagicMock import duckdb import pytest @@ -301,3 +302,68 @@ def bq_access(): yield _build from app.main import app as _app _app.dependency_overrides.pop(get_bq_access, None) + + +@pytest.fixture +def bq_instance(monkeypatch): + """Force instance.yaml to look like a BigQuery deployment for the + duration of one test. Patches the cached load_instance_config so + /admin/server-config reads / get_value('data_source.bigquery.project') + return what we want, without touching the on-disk instance.yaml. + + Tests that need BigQuery-specific admin API behaviour (project_id + validation, materialized source_query checks, etc.) depend on this + fixture. Yields the fake config dict so callers can inspect it. + + Note: several test files (test_admin_bq_register.py, + test_admin_tables_ui_materialized.py, …) define their own local + ``bq_instance`` fixture. 
Those local definitions shadow this one + inside those files — the conftest copy is the canonical provider for + any test file that does not define its own (pytest auto-discovery).""" + fake_cfg = { + "data_source": { + "type": "bigquery", + "bigquery": {"project": "my-test-project", "location": "us"}, + }, + } + monkeypatch.setattr( + "app.instance_config.load_instance_config", + lambda: fake_cfg, + raising=False, + ) + from app.instance_config import reset_cache + reset_cache() + yield fake_cfg + reset_cache() + + +@pytest.fixture +def stub_bq_extractor(monkeypatch): + """Mirror tests/test_admin_bq_register.py — bypasses real-BQ traffic + in the post-register rebuild path so the test stays offline. Required + whenever the test seeds a remote-mode BQ row via the HTTP API. + + Patches: + - ``connectors.bigquery.extractor.rebuild_from_registry`` — returns a + minimal success dict so the admin register endpoint's 200/201 path + completes without touching a real BQ project. + - ``src.orchestrator.SyncOrchestrator`` — replaced with a no-op mock so + the post-register orchestrator.rebuild() call doesn't scan the + (empty) extracts directory during tests.
+ + Returns the ``rebuild_from_registry`` MagicMock directly so callers + that only need the side-effect patcher can ignore the return value, + and callers that want to assert call args can inspect it.""" + rebuild_mock = MagicMock(return_value={ + "project_id": "my-test-project", + "tables_registered": 1, "errors": [], "skipped": False, + }) + monkeypatch.setattr( + "connectors.bigquery.extractor.rebuild_from_registry", + rebuild_mock, + ) + monkeypatch.setattr( + "src.orchestrator.SyncOrchestrator", + lambda *a, **kw: MagicMock(), + ) + return rebuild_mock diff --git a/tests/test_api_admin_materialized.py b/tests/test_api_admin_materialized.py index 5e8de8d..45a9dbb 100644 --- a/tests/test_api_admin_materialized.py +++ b/tests/test_api_admin_materialized.py @@ -18,8 +18,6 @@ Covers PR #145 (re-implementation against 0.24.0 base): Shares the seeded_app + bq_instance fixtures from conftest / test_admin_bq_register.py for parity with the existing BQ test surface. """ -from unittest.mock import MagicMock - import pytest @@ -27,50 +25,6 @@ def _auth(token): return {"Authorization": f"Bearer {token}"} -@pytest.fixture -def stub_bq_extractor(monkeypatch): - """Mirror tests/test_admin_bq_register.py — bypasses real-BQ traffic - in the post-register rebuild path so the test stays offline. Required - whenever the test seeds a remote-mode BQ row via the HTTP API.""" - rebuild_mock = MagicMock(return_value={ - "project_id": "my-test-project", - "tables_registered": 1, "errors": [], "skipped": False, - }) - monkeypatch.setattr( - "connectors.bigquery.extractor.rebuild_from_registry", - rebuild_mock, - ) - monkeypatch.setattr( - "src.orchestrator.SyncOrchestrator", - lambda *a, **kw: MagicMock(), - ) - return rebuild_mock - - -@pytest.fixture -def bq_instance(monkeypatch): - """Force instance.yaml to look like a BigQuery deployment. - - Mirrors tests/test_admin_bq_register.py::bq_instance so the - project_id read inside _validate_bigquery_register_payload succeeds. 
- """ - fake_cfg = { - "data_source": { - "type": "bigquery", - "bigquery": {"project": "my-test-project", "location": "us"}, - }, - } - monkeypatch.setattr( - "app.instance_config.load_instance_config", - lambda: fake_cfg, - raising=False, - ) - from app.instance_config import reset_cache - reset_cache() - yield fake_cfg - reset_cache() - - def _materialized_payload(**overrides): p = { "name": "orders_90d", From d8a22996333f841d3eb11e02527bfc6ccb3d9160 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 4 May 2026 16:40:40 +0200 Subject: [PATCH 02/13] fix(bq-materialize): wrap admin SQL in bigquery_query() so views work MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-fix, materialize ran the admin source_query as 'COPY (sql) TO parquet' through the DuckDB BQ extension session. The extension defaults to the BQ Storage Read API for bq.. references, which rejects views ('non-table entities cannot be read with the storage API'). The fix always wraps admin SQL into bigquery_query('', '') so COPY uses the BQ jobs API uniformly for tables and views. Cost guardrail dry-run now operates on the inner SQL (BQ-native), so the BQ Python client parses it and the cap engages — pre-fix the dry-run hit 'Table-valued function not found: bigquery_query' and fail-opened. --- connectors/bigquery/extractor.py | 112 ++++++++++++++------ tests/test_bq_cost_guardrail.py | 14 ++- tests/test_bq_materialize.py | 11 ++ tests/test_bq_materialize_query_wrapping.py | 54 ++++++++++ tests/test_materialized_e2e.py | 24 ++++- 5 files changed, 180 insertions(+), 35 deletions(-) create mode 100644 tests/test_bq_materialize_query_wrapping.py diff --git a/connectors/bigquery/extractor.py b/connectors/bigquery/extractor.py index bf95c2b..9c4d717 100644 --- a/connectors/bigquery/extractor.py +++ b/connectors/bigquery/extractor.py @@ -5,6 +5,7 @@ No data is downloaded. 
All queries go directly to BigQuery via DuckDB extension import logging import os +import re import shutil import threading from datetime import datetime, timezone @@ -59,6 +60,55 @@ def _detect_table_type( return row[0] if row else None +_BILLING_PROJECT_RE = re.compile(r"^[a-z][a-z0-9-]{4,28}[a-z0-9]$") + + +def _escape_sql_string_literal(s: str) -> str: + """Double every single quote so the result is safe to embed inside a + single-quoted SQL string literal. DuckDB and BigQuery both honor the + SQL standard `''` escape inside `'...'`. Used to wrap admin + source_query into bigquery_query()'s second arg without breaking + the literal envelope.""" + return s.replace("'", "''") + + +def _wrap_admin_sql_for_jobs_api(billing_project: str, inner_sql: str) -> str: + """Build the COPY-source SQL that runs admin's `inner_sql` through + the BigQuery jobs API via the DuckDB BQ extension's + ``bigquery_query()`` table function. + + Why: the default `bq."ds"."t"` reference path uses the BQ Storage + Read API which rejects non-base entities (views, materialized views). + Routing through `bigquery_query()` uses the jobs API which accepts + every entity type uniformly. + + Args: + billing_project: GCP project ID that bills the BQ job. Must + match the GCP project_id grammar — anything else is rejected + as a defense-in-depth check (admin is trusted, but a typo + should fail closed not silently lose budget to the wrong + project). + inner_sql: BigQuery-flavor SQL the admin registered as + ``source_query``. Must use BQ syntax (backticks for dashed + identifiers, native function calls). DuckDB-flavor `bq."ds"."t"` + is NOT acceptable here — the v24 migration converts existing + rows; new registrations are validated upstream. + + Returns: + A DuckDB-parseable SQL fragment suitable as the operand of + ``COPY (...) TO 'path' (FORMAT PARQUET)``. 
+ """ + if not _BILLING_PROJECT_RE.match(billing_project): + raise ValueError( + f"billing_project {billing_project!r} is not a valid GCP project_id " + "(grammar: ^[a-z][a-z0-9-]{4,28}[a-z0-9]$)" + ) + return ( + f"SELECT * FROM bigquery_query('{billing_project}', " + f"'{_escape_sql_string_literal(inner_sql)}')" + ) + + def _create_meta_table(conn: duckdb.DuckDBPyConnection) -> None: """Create the _meta table required by the extract.duckdb contract.""" conn.execute("DROP TABLE IF EXISTS _meta") @@ -321,24 +371,24 @@ def materialize_query( to `/data/.parquet` atomically. Designed for `query_mode='materialized'` table_registry rows. The SQL - is admin-registered (validated upstream) and may reference DuckDB - three-part identifiers (`bq."dataset"."table"`) resolved by the - in-session ATTACH, OR native BQ identifiers via the `bigquery_query()` - table function — both work because the session has the bigquery - extension loaded with a SECRET token. + is admin-registered BQ-native SQL (DuckDB-flavor `bq."ds"."t"` refs are + validated upstream). The SQL is wrapped in `bigquery_query('', + '')` before the COPY so the BQ extension routes through the BQ + jobs API — the default Storage Read API path rejects non-base entities + (views, materialized views) with "non-table entities cannot be read with + the storage API". Routing through `bigquery_query()` works uniformly for + base tables and views alike. Cost guardrail: when `max_bytes` is a positive int, run a BQ dry-run via `bq.client()` first; raise `MaterializeBudgetError` if the estimate exceeds the cap. `max_bytes=None` or `max_bytes <= 0` disables the guardrail (config sentinel, see - `data_source.bigquery.max_bytes_per_materialize`). + `data_source.bigquery.max_bytes_per_materialize`). The dry-run operates + on the inner `sql` (BQ-native), not the wrapped form. - Dry-run is best-effort and fail-open: if the SQL uses DuckDB syntax - that the native BQ client can't parse (e.g. 
`bq."ds"."t"`), the - dry-run raises and we log a warning; the COPY still runs. This - matches the BqAccess facade's "client is for native BQ SQL only" - contract — operators who need the cap to engage write the registered - SQL using native BQ identifiers (`\\`project.ds.t\\``). + Dry-run is best-effort and fail-open: if the dry-run errors (transient + upstream failure, missing google lib), we log a warning and proceed + with the wrapped COPY. Atomic write: result lands in `.parquet.tmp` first, then `os.replace` swaps it in. A failed COPY leaves no partial file behind. @@ -347,7 +397,9 @@ def materialize_query( table_id: Logical id from table_registry; becomes the parquet filename. Must pass `validate_identifier()` so it can't inject path traversal. - sql: SELECT statement, no trailing semicolon. + sql: BQ-native SELECT statement, no trailing semicolon. Wrapped + in `bigquery_query()` before the COPY — must not itself + contain a `bigquery_query()` call. bq: A `BqAccess` instance — provides `duckdb_session()` for the COPY and `client()` for the dry-run. output_dir: Connector root, e.g. `/data/extracts/bigquery`. @@ -358,7 +410,8 @@ def materialize_query( {"rows": int, "size_bytes": int, "query_mode": "materialized"} Raises: - ValueError: if `table_id` is unsafe. + ValueError: if `table_id` is unsafe or `bq.projects.billing` fails + the GCP project_id grammar check. MaterializeBudgetError: if `max_bytes > 0` and dry-run estimate exceeds it. BqAccessError: from `bq.duckdb_session()` (auth_failed / bq_lib_missing / not_configured) — caller catches and aggregates into the trigger @@ -377,17 +430,20 @@ def materialize_query( if tmp_path.exists(): tmp_path.unlink() - # Cost guardrail (best-effort — fail-open if dry-run can't parse the SQL). + # Build the wrapped SQL once — both the cost guardrail dry-run and + # the COPY operate on `sql` (the inner BQ SQL); only the COPY needs + # the DuckDB-side bigquery_query() envelope. 
+ billing_project = bq.projects.billing + wrapped_sql = _wrap_admin_sql_for_jobs_api(billing_project, sql) + if max_bytes is not None and max_bytes > 0: try: from app.api.v2_scan import _bq_dry_run_bytes # reuse main's impl - estimated = _bq_dry_run_bytes(bq, sql) + estimated = _bq_dry_run_bytes(bq, sql) # NB: pass inner SQL (BQ-native) except Exception as e: logger.warning( "BQ dry-run failed for materialize cost guardrail (fail-open): %s. " - "If the SQL uses DuckDB three-part names like bq.\"ds\".\"t\", " - "rewrite to native BQ identifiers (`project.ds.t`) for the " - "guardrail to engage. Proceeding with COPY.", + "Proceeding with COPY against `bigquery_query()` wrapping.", e, ) estimated = 0 @@ -400,18 +456,10 @@ def materialize_query( limit=max_bytes, ) - # COPY through a BqAccess-managed session. + # COPY through a BqAccess-managed session. The session has the BQ + # extension loaded with a SECRET token; bigquery_query() reuses that + # auth path against the billing_project for the jobs API call. with bq.duckdb_session() as conn: - # ATTACH the data project — but only when no `bq` catalog is - # already attached. Production sessions (real BqAccess) come with - # only `:memory:` and need the ATTACH; test sessions pre-populate - # `bq` as a fixture catalog and would error on a redundant ATTACH - # (alias already in use) AND on the bigquery extension load when - # the test runner has no cached extension. Detecting via - # `duckdb_databases()` keeps the ATTACH path idempotent without - # swallowing real errors (auth, cross-project permission, - # malformed project_id) — those still propagate from the actual - # ATTACH call. 
attached = { r[0] for r in conn.execute( "SELECT database_name FROM duckdb_databases()" @@ -424,7 +472,9 @@ def materialize_query( try: safe_path = str(tmp_path).replace("'", "''") - conn.execute(f"COPY ({sql}) TO '{safe_path}' (FORMAT PARQUET)") + conn.execute( + f"COPY ({wrapped_sql}) TO '{safe_path}' (FORMAT PARQUET)" + ) rows = conn.execute( f"SELECT count(*) FROM read_parquet('{safe_path}')" ).fetchone()[0] diff --git a/tests/test_bq_cost_guardrail.py b/tests/test_bq_cost_guardrail.py index d51702e..00300d1 100644 --- a/tests/test_bq_cost_guardrail.py +++ b/tests/test_bq_cost_guardrail.py @@ -18,7 +18,13 @@ from connectors.bigquery.extractor import materialize_query, MaterializeBudgetEr def _bq_with_seed(tables: dict[str, str] | None = None) -> BqAccess: """Stub BqAccess seeded with in-memory tables (same recipe as - test_bq_materialize).""" + test_bq_materialize). + + A `bigquery_query(project, sql_text)` table macro is registered so the + wrapping added by `_wrap_admin_sql_for_jobs_api` (Task 2 — routes COPY + through the BQ jobs API for views) resolves against the in-memory tables + without needing the real BQ extension. + """ tables = tables or {} @contextmanager @@ -30,6 +36,12 @@ def _bq_with_seed(tables: dict[str, str] | None = None) -> BqAccess: conn.execute(f"CREATE SCHEMA IF NOT EXISTS {s}") for ref, body in tables.items(): conn.execute(f"CREATE OR REPLACE TABLE {ref} AS {body}") + # Stub bigquery_query() so materialize_query's wrapped COPY works + # against the in-memory bq catalog without the real BQ extension. 
+ conn.execute( + "CREATE OR REPLACE MACRO bigquery_query(project, sql_text) " + "AS TABLE SELECT * FROM query(sql_text)" + ) yield conn finally: conn.close() diff --git a/tests/test_bq_materialize.py b/tests/test_bq_materialize.py index ae23680..1684278 100644 --- a/tests/test_bq_materialize.py +++ b/tests/test_bq_materialize.py @@ -21,6 +21,11 @@ def _make_stub_bq(tables: dict[str, str] | None = None) -> BqAccess: with a pretend `bq` catalog containing test tables. `tables` maps DuckDB-three-part references like `'bq.test.orders'` to a SELECT expression to seed them with. + + A `bigquery_query(project, sql_text)` table macro is registered so the + wrapping added by `_wrap_admin_sql_for_jobs_api` (Task 2 — routes COPY + through the BQ jobs API for views) resolves against the in-memory tables + without needing the real BQ extension. """ tables = tables or {} @@ -34,6 +39,12 @@ def _make_stub_bq(tables: dict[str, str] | None = None) -> BqAccess: conn.execute(f"CREATE SCHEMA IF NOT EXISTS {s}") for ref, body in tables.items(): conn.execute(f"CREATE OR REPLACE TABLE {ref} AS {body}") + # Stub bigquery_query() so materialize_query's wrapped COPY works + # against the in-memory bq catalog without the real BQ extension. 
+ conn.execute( + "CREATE OR REPLACE MACRO bigquery_query(project, sql_text) " + "AS TABLE SELECT * FROM query(sql_text)" + ) yield conn finally: conn.close() diff --git a/tests/test_bq_materialize_query_wrapping.py b/tests/test_bq_materialize_query_wrapping.py new file mode 100644 index 0000000..964c7e1 --- /dev/null +++ b/tests/test_bq_materialize_query_wrapping.py @@ -0,0 +1,54 @@ +"""materialize_query must always wrap admin source_query in +bigquery_query('', '') so the COPY uses BQ jobs API, +which works for base tables AND views — Storage Read API does not.""" +from __future__ import annotations +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from connectors.bigquery.extractor import ( + _wrap_admin_sql_for_jobs_api, + _escape_sql_string_literal, +) + + +def test_wrap_simple_select(): + out = _wrap_admin_sql_for_jobs_api( + billing_project="prj-billing", + inner_sql="SELECT * FROM `ds.tbl`", + ) + assert out == ( + "SELECT * FROM bigquery_query('prj-billing', " + "'SELECT * FROM `ds.tbl`')" + ) + + +def test_escape_single_quotes_in_inner_sql(): + inner = "SELECT name FROM `ds.tbl` WHERE country = 'CZ'" + escaped = _escape_sql_string_literal(inner) + assert escaped == "SELECT name FROM `ds.tbl` WHERE country = ''CZ''" + + +def test_wrap_with_inner_quotes_round_trips(): + inner = "SELECT * FROM `ds.tbl` WHERE col = 'foo''bar'" + out = _wrap_admin_sql_for_jobs_api("myproject", inner) + # Outer string-literal envelope must double the inner single quotes + # so DuckDB's parser sees a balanced literal. + assert out.count("'") % 2 == 0 + # Round-trip: stripping the wrapper gives back the original inner exactly. + prefix = "SELECT * FROM bigquery_query('myproject', '" + assert out.startswith(prefix) + assert out.endswith("')") + middle = out[len(prefix):-2] + # DuckDB string literal escape: '' → '. Reverse it. 
+ decoded = middle.replace("''", "'") + assert decoded == inner + + +def test_billing_project_validates_format(): + with pytest.raises(ValueError, match="billing_project"): + _wrap_admin_sql_for_jobs_api( + billing_project="bad project'; DROP", + inner_sql="SELECT 1", + ) diff --git a/tests/test_materialized_e2e.py b/tests/test_materialized_e2e.py index f0e07d1..b58b8dc 100644 --- a/tests/test_materialized_e2e.py +++ b/tests/test_materialized_e2e.py @@ -75,7 +75,13 @@ def stub_bq_extractor(monkeypatch): @pytest.fixture def stub_bq(): """Real-shape BqAccess wired to in-memory DuckDB factories so the - materialize_query path can run end-to-end without GCP.""" + materialize_query path can run end-to-end without GCP. + + A `bigquery_query(project, sql_text)` table macro is registered so the + wrapping added by `_wrap_admin_sql_for_jobs_api` (Task 2 — routes COPY + through the BQ jobs API for views) resolves against the in-memory tables + without needing the real BQ extension. + """ @contextmanager def _session(_p): conn = duckdb.connect(":memory:") @@ -87,6 +93,12 @@ def stub_bq(): "SELECT 'EU' AS region, 100 AS revenue UNION ALL " "SELECT 'US' AS region, 250 AS revenue" ) + # Stub bigquery_query() so materialize_query's wrapped COPY works + # against the in-memory bq catalog without the real BQ extension. + conn.execute( + "CREATE OR REPLACE MACRO bigquery_query(project, sql_text) " + "AS TABLE SELECT * FROM query(sql_text)" + ) yield conn finally: conn.close() @@ -265,12 +277,18 @@ def test_materialized_zero_rows_logs_warning(stub_bq, tmp_path, caplog): conn.execute("CREATE SCHEMA bq.test") conn.execute("CREATE OR REPLACE TABLE bq.test.empty AS " "SELECT 1 AS n WHERE FALSE") + # Stub bigquery_query() so materialize_query's wrapped COPY works + # against the in-memory bq catalog without the real BQ extension. 
+ conn.execute( + "CREATE OR REPLACE MACRO bigquery_query(project, sql_text) " + "AS TABLE SELECT * FROM query(sql_text)" + ) yield conn finally: conn.close() bq_empty = BqAccess( - BqProjects(billing="t", data="t"), + BqProjects(billing="test-project", data="test-project"), client_factory=lambda _p: MagicMock(), duckdb_session_factory=_session_empty, ) @@ -323,7 +341,7 @@ def test_attach_real_error_propagates(stub_bq, tmp_path): conn.close() bq_bad = BqAccess( - BqProjects(billing="t", data="t"), + BqProjects(billing="test-project", data="test-project"), client_factory=lambda _p: MagicMock(), duckdb_session_factory=_session_attach_fails, ) From a2afcfe59a5c8f1763cacf97bdcfbebcc7a56ffd Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 4 May 2026 16:52:18 +0200 Subject: [PATCH 03/13] fix(bq-materialize): code-review follow-ups for d8a22996 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - tests/test_bq_cost_guardrail.py: assert fail-open warning is logged (test previously only proved fail-open doesn't crash; review note: warning is the only operator-visible signal of the silent failure). - extractor._wrap_admin_sql_for_jobs_api: docstring no longer claims DuckDB-flavor SQL is rejected — the function performs no inner-SQL validation; the v24 migration + register-time validator are the real enforcement points. - extractor.materialize_query: safe_path uses _escape_sql_string_literal instead of inline replace, for one-place-to-update consistency. - extractor: import hashlib hoisted to module-level imports. --- connectors/bigquery/extractor.py | 13 +++++++------ tests/test_bq_cost_guardrail.py | 26 +++++++++++++++----------- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/connectors/bigquery/extractor.py b/connectors/bigquery/extractor.py index 9c4d717..e8dcebf 100644 --- a/connectors/bigquery/extractor.py +++ b/connectors/bigquery/extractor.py @@ -3,6 +3,7 @@ No data is downloaded. 
All queries go directly to BigQuery via DuckDB extension ATTACH. """ +import hashlib import logging import os import re @@ -89,10 +90,11 @@ def _wrap_admin_sql_for_jobs_api(billing_project: str, inner_sql: str) -> str: should fail closed not silently lose budget to the wrong project). inner_sql: BigQuery-flavor SQL the admin registered as - ``source_query``. Must use BQ syntax (backticks for dashed - identifiers, native function calls). DuckDB-flavor `bq."ds"."t"` - is NOT acceptable here — the v24 migration converts existing - rows; new registrations are validated upstream. + ``source_query``. Should be BigQuery-native; DuckDB-flavor + `bq."ds"."t"` references are not enforced here but will fail at + COPY time inside the BQ jobs API. Existing rows are converted by + the v24 schema migration; new rows are validated upstream at + register/PUT. Returns: A DuckDB-parseable SQL fragment suitable as the operand of @@ -471,7 +473,7 @@ def materialize_query( ) try: - safe_path = str(tmp_path).replace("'", "''") + safe_path = _escape_sql_string_literal(str(tmp_path)) conn.execute( f"COPY ({wrapped_sql}) TO '{safe_path}' (FORMAT PARQUET)" ) @@ -489,7 +491,6 @@ def materialize_query( # thread — a 10 GiB parquet means 50+ seconds of disk I/O blocking other # requests. Hashing here keeps the open-file handle hot from the COPY # round and removes the second read. Devil's-advocate review item. 
- import hashlib h = hashlib.md5() with open(tmp_path, "rb") as f: for chunk in iter(lambda: f.read(8192), b""): diff --git a/tests/test_bq_cost_guardrail.py b/tests/test_bq_cost_guardrail.py index 00300d1..7811a62 100644 --- a/tests/test_bq_cost_guardrail.py +++ b/tests/test_bq_cost_guardrail.py @@ -128,22 +128,26 @@ def test_zero_max_bytes_skips_dry_run(tmp_path): assert stats["rows"] == 1 -def test_dry_run_failure_is_fail_open(tmp_path): +def test_dry_run_failure_is_fail_open(tmp_path, caplog): """If the dry-run errors (DuckDB syntax, missing google lib, transient upstream failure) we don't block — log + proceed with COPY. Operators who need hard-fail watch logs for the warning.""" + import logging + out = tmp_path / "extracts" / "bigquery" out.mkdir(parents=True) bq = _bq_with_seed({"bq.test.tiny": "SELECT 1 AS n"}) - with patch( - "app.api.v2_scan._bq_dry_run_bytes", side_effect=RuntimeError("boom") - ): - stats = materialize_query( - table_id="t1", - sql="SELECT * FROM bq.test.tiny", - bq=bq, - output_dir=str(out), - max_bytes=10 * 2**30, - ) + with caplog.at_level(logging.WARNING, logger="connectors.bigquery.extractor"): + with patch( + "app.api.v2_scan._bq_dry_run_bytes", side_effect=RuntimeError("boom") + ): + stats = materialize_query( + table_id="t1", + sql="SELECT * FROM bq.test.tiny", + bq=bq, + output_dir=str(out), + max_bytes=10 * 2**30, + ) assert stats["rows"] == 1 + assert "fail-open" in caplog.text From 16eaf7a3991e859bc12ec5bc51ca9583030771c3 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 4 May 2026 17:40:21 +0200 Subject: [PATCH 04/13] feat(bq-materialize): per-table mutex + file lock with TTL reclaim MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two layers of concurrency control. Layer 1 is a per-table_id threading.Lock keyed on table_id; Layer 2 is fcntl.flock on a sibling .parquet.lock file. 
Overlapping calls for the same id raise MaterializeInFlightError, which the caller treats as 'skipped, in_flight' instead of a hard error. Stale file locks (mtime older than materialize.lock_ttl_seconds, default 86400) are reclaimed on the next attempt — covers the rare case where a holder was hard-killed before kernel-level flock release. Pre-fix, when a materialize ran longer than the scheduler tick interval (15 min), the next tick called materialize_query for the same id, hit the unconditional tmp_path.unlink() at function entry, and started a second COPY against the same path. Both writers interleaved bytes; the original COPY's read_parquet validation then failed with 'No magic bytes found at end of file'. --- connectors/bigquery/extractor.py | 295 ++++++++++++++++------- tests/test_bq_materialize_concurrency.py | 196 +++++++++++++++ 2 files changed, 410 insertions(+), 81 deletions(-) create mode 100644 tests/test_bq_materialize_concurrency.py diff --git a/connectors/bigquery/extractor.py b/connectors/bigquery/extractor.py index e8dcebf..7b158a0 100644 --- a/connectors/bigquery/extractor.py +++ b/connectors/bigquery/extractor.py @@ -3,12 +3,14 @@ No data is downloaded. All queries go directly to BigQuery via DuckDB extension ATTACH. """ +import fcntl import hashlib import logging import os import re import shutil import threading +import time from datetime import datetime, timezone from pathlib import Path from typing import List, Dict, Any, Optional @@ -23,6 +25,109 @@ import duckdb # not the per-source extract-file write, so we need a dedicated lock here. _INIT_EXTRACT_LOCK = threading.Lock() +_LOCK_TTL_DEFAULT_SECONDS: int = 86400 # 24h — overridable via materialize.lock_ttl_seconds + + +class MaterializeInFlightError(Exception): + """Raised when a per-table_id materialize is already running. + + Caller (`_run_materialized_pass`) should treat this as a 'skipped, + in-flight' outcome — the in-flight worker will finish and write + sync_state on its own. 
Critically, this is NOT an error condition; + `state.set_error` MUST NOT be called for this exception or the + registry would surface a false-positive failure to the operator + every overlap.""" + + def __init__(self, table_id: str, layer: str = "process"): + self.table_id = table_id + self.layer = layer + super().__init__( + f"materialize for {table_id!r} already in flight ({layer} lock held)" + ) + + +_table_locks: dict[str, threading.Lock] = {} +_table_locks_registry: threading.Lock = threading.Lock() + + +def _get_table_lock(table_id: str) -> threading.Lock: + """Return the process-wide mutex for a given table_id, creating it + on first reference. The registry mutex serializes the dict mutation + only — once the per-id Lock is returned, contention between callers + happens on that lock alone.""" + with _table_locks_registry: + lock = _table_locks.get(table_id) + if lock is None: + lock = threading.Lock() + _table_locks[table_id] = lock + return lock + + +def _get_lock_ttl_seconds() -> int: + """Read the configured stale-lock TTL with fallback to the default. + + Operator override lives at instance.yaml `materialize.lock_ttl_seconds` + (also editable via /admin/server-config). Default 86400 s = 24 h + matches the upper bound of any healthy BQ COPY in practice — anything + longer is a stuck process or a hung BQ session, both of which warrant + reclaim on next attempt.""" + try: + from app.instance_config import get_value + v = get_value( + "materialize", "lock_ttl_seconds", + default=_LOCK_TTL_DEFAULT_SECONDS, + ) + n = int(v) if v is not None else _LOCK_TTL_DEFAULT_SECONDS + return n if n > 0 else _LOCK_TTL_DEFAULT_SECONDS + except Exception: + return _LOCK_TTL_DEFAULT_SECONDS + + +def _try_acquire_file_lock(lock_path: Path): + """Try to acquire an advisory exclusive flock on `lock_path`. Returns + the open file object on success (caller must close to release); None + on conflict. 
+ + Stale-lock reclaim: if the lock_path exists and its mtime is older + than the configured TTL, log a warning and unlink before retrying. + NOTE(review): unlink + reopen creates a new inode, so the retry's + flock does not contend with a live holder still locking the old + inode — a reclaim can admit a second writer. Confirm the TTL bounds + every healthy run before relying on this path.""" + lock_path.parent.mkdir(parents=True, exist_ok=True) + + def _try_open_and_flock(): + f = open(lock_path, "w") + try: + fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + return f + except BlockingIOError: + f.close() + return None + + holder = _try_open_and_flock() + if holder is not None: + return holder + + # Conflict. If the file is older than TTL, reclaim and retry once. + try: + age = time.time() - lock_path.stat().st_mtime + except FileNotFoundError: + return _try_open_and_flock() + + if age > _get_lock_ttl_seconds(): + logger.warning( + "Reclaiming stale materialize lock at %s (age %.1fs > TTL)", + lock_path, age, + ) + try: + lock_path.unlink() + except FileNotFoundError: + pass + return _try_open_and_flock() + + return None + from connectors.bigquery.auth import get_metadata_token, BQMetadataAuthError from app.instance_config import get_value from src.sql_safe import ( @@ -395,6 +500,13 @@ def materialize_query( Atomic write: result lands in `.parquet.tmp` first, then `os.replace` swaps it in. A failed COPY leaves no partial file behind. + Concurrency: per-``table_id`` in-process mutex + advisory file lock + on ``.parquet.lock``. Overlapping calls for the same id + raise ``MaterializeInFlightError`` immediately so the caller can + skip cleanly without consuming the COPY budget twice. Stale file + locks (mtime > ``materialize.lock_ttl_seconds``, default 24 h) are + reclaimed automatically. + Args: table_id: Logical id from table_registry; becomes the parquet filename.
Must pass `validate_identifier()` so it can't @@ -414,6 +526,8 @@ def materialize_query( Raises: ValueError: if `table_id` is unsafe or `bq.projects.billing` fails the GCP project_id grammar check. + MaterializeInFlightError: if a concurrent call for the same table_id + is already in progress (in-process or cross-process). MaterializeBudgetError: if `max_bytes > 0` and dry-run estimate exceeds it. BqAccessError: from `bq.duckdb_session()` (auth_failed / bq_lib_missing / not_configured) — caller catches and aggregates into the trigger @@ -429,95 +543,114 @@ def materialize_query( parquet_path = data_dir / f"{table_id}.parquet" tmp_path = data_dir / f"{table_id}.parquet.tmp" - if tmp_path.exists(): - tmp_path.unlink() + lock_path = data_dir / f"{table_id}.parquet.lock" - # Build the wrapped SQL once — both the cost guardrail dry-run and - # the COPY operate on `sql` (the inner BQ SQL); only the COPY needs - # the DuckDB-side bigquery_query() envelope. - billing_project = bq.projects.billing - wrapped_sql = _wrap_admin_sql_for_jobs_api(billing_project, sql) - - if max_bytes is not None and max_bytes > 0: + proc_lock = _get_table_lock(table_id) + if not proc_lock.acquire(blocking=False): + raise MaterializeInFlightError(table_id, layer="process") + try: + file_lock = _try_acquire_file_lock(lock_path) + if file_lock is None: + raise MaterializeInFlightError(table_id, layer="file") try: - from app.api.v2_scan import _bq_dry_run_bytes # reuse main's impl - estimated = _bq_dry_run_bytes(bq, sql) # NB: pass inner SQL (BQ-native) - except Exception as e: - logger.warning( - "BQ dry-run failed for materialize cost guardrail (fail-open): %s. 
" - "Proceeding with COPY against `bigquery_query()` wrapping.", - e, - ) - estimated = 0 - if estimated > max_bytes: - raise MaterializeBudgetError( - f"dry-run estimate {estimated:,} bytes exceeds cap " - f"{max_bytes:,} for table {table_id!r}", - table_id=table_id, - current=estimated, - limit=max_bytes, - ) - - # COPY through a BqAccess-managed session. The session has the BQ - # extension loaded with a SECRET token; bigquery_query() reuses that - # auth path against the billing_project for the jobs API call. - with bq.duckdb_session() as conn: - attached = { - r[0] for r in conn.execute( - "SELECT database_name FROM duckdb_databases()" - ).fetchall() - } - if "bq" not in attached: - conn.execute( - f"ATTACH 'project={bq.projects.data}' AS bq (TYPE bigquery, READ_ONLY)" - ) - - try: - safe_path = _escape_sql_string_literal(str(tmp_path)) - conn.execute( - f"COPY ({wrapped_sql}) TO '{safe_path}' (FORMAT PARQUET)" - ) - rows = conn.execute( - f"SELECT count(*) FROM read_parquet('{safe_path}')" - ).fetchone()[0] - except Exception: if tmp_path.exists(): tmp_path.unlink() - raise - # Compute the parquet hash inline before the atomic swap. The caller used - # to re-read the file in `_run_materialized_pass` to hash it via - # `_file_hash`, but that's a synchronous full-read on the FastAPI worker - # thread — a 10 GiB parquet means 50+ seconds of disk I/O blocking other - # requests. Hashing here keeps the open-file handle hot from the COPY - # round and removes the second read. Devil's-advocate review item. - h = hashlib.md5() - with open(tmp_path, "rb") as f: - for chunk in iter(lambda: f.read(8192), b""): - h.update(chunk) - parquet_hash = h.hexdigest() + # Build the wrapped SQL once — both the cost guardrail dry-run and + # the COPY operate on `sql` (the inner BQ SQL); only the COPY needs + # the DuckDB-side bigquery_query() envelope. 
+ billing_project = bq.projects.billing + wrapped_sql = _wrap_admin_sql_for_jobs_api(billing_project, sql) - size_bytes = tmp_path.stat().st_size - os.replace(tmp_path, parquet_path) + if max_bytes is not None and max_bytes > 0: + try: + from app.api.v2_scan import _bq_dry_run_bytes # reuse main's impl + estimated = _bq_dry_run_bytes(bq, sql) # NB: pass inner SQL (BQ-native) + except Exception as e: + logger.warning( + "BQ dry-run failed for materialize cost guardrail (fail-open): %s. " + "Proceeding with COPY against `bigquery_query()` wrapping.", + e, + ) + estimated = 0 + if estimated > max_bytes: + raise MaterializeBudgetError( + f"dry-run estimate {estimated:,} bytes exceeds cap " + f"{max_bytes:,} for table {table_id!r}", + table_id=table_id, + current=estimated, + limit=max_bytes, + ) - rows = int(rows) - if rows == 0: - # 0 rows is indistinguishable from "the SQL is wrong and nobody - # noticed" — surface it loudly so operators see it in the scheduler - # log line and the per-row error aggregation. Caller decides whether - # to alert. - logger.warning( - "Materialized %s produced 0 rows — verify the SQL filter is " - "intentional. Parquet written: %s", - table_id, parquet_path, - ) + # COPY through a BqAccess-managed session. The session has the BQ + # extension loaded with a SECRET token; bigquery_query() reuses that + # auth path against the billing_project for the jobs API call. 
+ with bq.duckdb_session() as conn: + attached = { + r[0] for r in conn.execute( + "SELECT database_name FROM duckdb_databases()" + ).fetchall() + } + if "bq" not in attached: + conn.execute( + f"ATTACH 'project={bq.projects.data}' AS bq (TYPE bigquery, READ_ONLY)" + ) - return { - "rows": rows, - "size_bytes": size_bytes, - "query_mode": "materialized", - "hash": parquet_hash, - } + try: + safe_path = _escape_sql_string_literal(str(tmp_path)) + conn.execute( + f"COPY ({wrapped_sql}) TO '{safe_path}' (FORMAT PARQUET)" + ) + rows = conn.execute( + f"SELECT count(*) FROM read_parquet('{safe_path}')" + ).fetchone()[0] + except Exception: + if tmp_path.exists(): + tmp_path.unlink() + raise + + # Compute the parquet hash inline before the atomic swap. The caller used + # to re-read the file in `_run_materialized_pass` to hash it via + # `_file_hash`, but that's a synchronous full-read on the FastAPI worker + # thread — a 10 GiB parquet means 50+ seconds of disk I/O blocking other + # requests. Hashing here keeps the open-file handle hot from the COPY + # round and removes the second read. Devil's-advocate review item. + h = hashlib.md5() + with open(tmp_path, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + h.update(chunk) + parquet_hash = h.hexdigest() + + size_bytes = tmp_path.stat().st_size + os.replace(tmp_path, parquet_path) + + rows = int(rows) + if rows == 0: + # 0 rows is indistinguishable from "the SQL is wrong and nobody + # noticed" — surface it loudly so operators see it in the scheduler + # log line and the per-row error aggregation. Caller decides whether + # to alert. + logger.warning( + "Materialized %s produced 0 rows — verify the SQL filter is " + "intentional. 
Parquet written: %s", + table_id, parquet_path, + ) + + return { + "rows": rows, + "size_bytes": size_bytes, + "query_mode": "materialized", + "hash": parquet_hash, + } + finally: + try: + file_lock.close() # releases flock + except Exception: + pass + # Don't unlink lock_path — its mtime is the TTL signal for + # the next reclaim. Leaving it in place is intentional. + finally: + proc_lock.release() def _resolve_bq_project_id() -> str: diff --git a/tests/test_bq_materialize_concurrency.py b/tests/test_bq_materialize_concurrency.py new file mode 100644 index 0000000..372600e --- /dev/null +++ b/tests/test_bq_materialize_concurrency.py @@ -0,0 +1,196 @@ +"""Per-table_id concurrency: in-process mutex + advisory file lock with +TTL reclaim. Two overlapping materialize_query calls for the same id +must NOT corrupt each other's parquet.""" +from __future__ import annotations +import os +import threading +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from connectors.bigquery.extractor import ( + materialize_query, + MaterializeInFlightError, + _get_table_lock, + _LOCK_TTL_DEFAULT_SECONDS, +) + + +@pytest.fixture(autouse=True) +def reset_locks(monkeypatch): + # Tests must not share lock state across runs. 
+ import connectors.bigquery.extractor as mod + monkeypatch.setattr(mod, "_table_locks", {}) + yield + + +def _slow_bq(stall_seconds: float = 1.0): + """Build a fake BqAccess whose duckdb_session COPY blocks for + `stall_seconds` so we can race a second call against it.""" + bq = MagicMock() + bq.projects.billing = "prj-billing" + bq.projects.data = "prj-data" + + class _Session: + def __enter__(self): + return self + def __exit__(self, *a): + return False + def execute(self, sql): + if sql.startswith("SELECT database_name"): + class _R: + def fetchall(self): + return [("memory",)] + return _R() + if sql.startswith("ATTACH"): + return MagicMock() + if sql.startswith("COPY"): + # Simulate a long-running COPY by writing a stub parquet + # then sleeping so a second call can race us. + # Extract the path from the COPY statement. + import re + m = re.search(r"TO '([^']+)'", sql) + assert m + Path(m.group(1)).write_bytes(b"PARQUET_STUB_HEADER" + b"\x00" * 200) + time.sleep(stall_seconds) + return MagicMock() + if sql.startswith("SELECT count"): + class _R: + def fetchone(self): + return (42,) + return _R() + return MagicMock() + + bq.duckdb_session.return_value = _Session() + return bq + + +def test_concurrent_calls_for_same_id_raise_in_flight(tmp_path): + bq = _slow_bq(stall_seconds=2.0) + + out_dir = str(tmp_path) + captured: list = [] + + def runner(tag): + try: + r = materialize_query( + table_id="t1", sql="SELECT 1", + bq=bq, output_dir=out_dir, max_bytes=None, + ) + captured.append(("ok", tag, r)) + except MaterializeInFlightError as e: + captured.append(("in_flight", tag, str(e))) + except Exception as e: + captured.append(("err", tag, str(e))) + + t1 = threading.Thread(target=runner, args=("first",)) + t2 = threading.Thread(target=runner, args=("second",)) + t1.start() + time.sleep(0.2) # let t1 acquire the lock + t2.start() + t1.join() + t2.join() + + outcomes = [c[0] for c in captured] + assert outcomes.count("ok") == 1, f"expected exactly one success, got 
{captured}" + assert outcomes.count("in_flight") == 1 + + +def test_sequential_calls_for_same_id_both_succeed(tmp_path): + bq = _slow_bq(stall_seconds=0.05) + + out_dir = str(tmp_path) + r1 = materialize_query( + table_id="t1", sql="SELECT 1", + bq=bq, output_dir=out_dir, max_bytes=None, + ) + r2 = materialize_query( + table_id="t1", sql="SELECT 1", + bq=bq, output_dir=out_dir, max_bytes=None, + ) + assert r1["rows"] == 42 + assert r2["rows"] == 42 + + +def test_different_ids_run_in_parallel(tmp_path): + bq = _slow_bq(stall_seconds=1.0) + out_dir = str(tmp_path) + captured: list = [] + + def runner(tid): + try: + r = materialize_query( + table_id=tid, sql="SELECT 1", + bq=bq, output_dir=out_dir, max_bytes=None, + ) + captured.append((tid, r["rows"])) + except Exception as e: + captured.append((tid, "ERROR")) + + threads = [threading.Thread(target=runner, args=(f"tab_{i}",)) for i in range(3)] + start = time.time() + for t in threads: t.start() + for t in threads: t.join() + elapsed = time.time() - start + # If they were serialized, would take >= 3s. Parallel: ~1s. + assert elapsed < 2.0, f"expected parallel, elapsed={elapsed:.2f}s" + assert len(captured) == 3 + assert all(c[1] == 42 for c in captured) + + +def test_stale_file_lock_is_reclaimed_after_ttl(tmp_path, monkeypatch): + """Force a stale .lock file (mtime older than TTL) and verify a new + call reclaims it instead of raising MaterializeInFlightError.""" + bq = _slow_bq(stall_seconds=0.05) + lock_path = Path(tmp_path) / "data" / "t1.parquet.lock" + lock_path.parent.mkdir(parents=True, exist_ok=True) + lock_path.write_text("") + + # Set mtime to 25h ago (> default 24h TTL). 
+ old_ts = time.time() - 25 * 3600 + os.utime(lock_path, (old_ts, old_ts)) + + r = materialize_query( + table_id="t1", sql="SELECT 1", + bq=bq, output_dir=str(tmp_path), max_bytes=None, + ) + assert r["rows"] == 42 + + +def test_fresh_file_lock_blocks_with_in_flight_error(tmp_path, monkeypatch): + """Force a fresh .lock file (mtime within TTL) and verify a new + call raises rather than reclaims.""" + bq = _slow_bq(stall_seconds=0.05) + lock_path = Path(tmp_path) / "data" / "t1.parquet.lock" + lock_path.parent.mkdir(parents=True, exist_ok=True) + + # Open the lock file and HOLD a fcntl exclusive lock so the materialize + # call's flock(LOCK_NB) sees a real conflicting lock — relying on + # mtime-only would let the test pass even if flock acquisition was + # broken. + import fcntl + holder = open(lock_path, "w") + fcntl.flock(holder.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + try: + with pytest.raises(MaterializeInFlightError): + materialize_query( + table_id="t1", sql="SELECT 1", + bq=bq, output_dir=str(tmp_path), max_bytes=None, + ) + finally: + fcntl.flock(holder.fileno(), fcntl.LOCK_UN) + holder.close() + + +def test_lock_ttl_reads_from_instance_config(tmp_path, monkeypatch): + """When `materialize.lock_ttl_seconds` is set in instance.yaml, that + value overrides the default.""" + monkeypatch.setattr( + "app.instance_config.get_value", + lambda *args, **kw: 60 if args == ("materialize", "lock_ttl_seconds") else kw.get("default"), + ) + + from connectors.bigquery.extractor import _get_lock_ttl_seconds + assert _get_lock_ttl_seconds() == 60 From dc7e27082dcc0e5b3f1a1192391eb86dfe9f8903 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 4 May 2026 17:52:52 +0200 Subject: [PATCH 05/13] fix(bq-materialize): code-review follow-ups for 16eaf7a3 - extractor._try_acquire_file_lock: close fd and re-raise on non- BlockingIOError from fcntl.flock (read-only fs, unsupported flock, fd exhaustion). 
Pre-fix the fd leaked silently and the underlying OSError still propagated past the caller. - extractor: reorder module-level layout so logger is bound before the new lock-related helpers reference it. Deferred import of app.instance_config inside _get_lock_ttl_seconds documented inline. - extractor: comment _table_locks unbounded-by-design rationale. - tests: docstring + monkeypatch-target rationale for the two concurrency tests where the contract isn't obvious from the body. --- connectors/bigquery/extractor.py | 38 +++++++++++++++++------- tests/test_bq_materialize_concurrency.py | 12 ++++++-- 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/connectors/bigquery/extractor.py b/connectors/bigquery/extractor.py index 7b158a0..44ef386 100644 --- a/connectors/bigquery/extractor.py +++ b/connectors/bigquery/extractor.py @@ -17,6 +17,15 @@ from typing import List, Dict, Any, Optional import duckdb +from connectors.bigquery.auth import get_metadata_token, BQMetadataAuthError +from src.sql_safe import ( + validate_identifier as _validate_identifier, + validate_project_id as _validate_project_id, +) +from src.identifier_validation import validate_identifier, validate_quoted_identifier + +logger = logging.getLogger(__name__) + # Serializes the body of `init_extract` across threads so two concurrent # materialize calls (e.g. the synchronous timeout-fallback BackgroundTask # kicking in while the original daemon thread is still running) can't both @@ -46,6 +55,10 @@ class MaterializeInFlightError(Exception): ) +# Unbounded by design — each registered table_id gets one Lock for the +# process lifetime. Per-Lock cost is ~56 bytes; a deployment with even +# 10k registered tables holds <1 MB. No cleanup logic — clean would +# need ref-counting and risks freeing a Lock currently held by a worker. 
_table_locks: dict[str, threading.Lock] = {} _table_locks_registry: threading.Lock = threading.Lock() @@ -72,6 +85,9 @@ def _get_lock_ttl_seconds() -> int: longer is a stuck process or a hung BQ session, both of which warrant reclaim on next attempt.""" try: + # Deferred import: keeps the connectors module importable in + # contexts where the app layer isn't bootstrapped (e.g. unit tests + # that exercise extractor helpers without the FastAPI app). from app.instance_config import get_value v = get_value( "materialize", "lock_ttl_seconds", @@ -97,13 +113,25 @@ def _try_acquire_file_lock(lock_path: Path): lock_path.parent.mkdir(parents=True, exist_ok=True) def _try_open_and_flock(): + # Open in 'w' mode so the file's mtime updates on every successful + # acquisition — the mtime is the TTL signal for the next caller. + # Content is intentionally empty; the fd exists only to anchor flock. f = open(lock_path, "w") try: fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) return f except BlockingIOError: + # Another holder owns the lock — return None so the caller can + # decide between TTL-reclaim and propagating MaterializeInFlightError. f.close() return None + except OSError: + # Anything else (read-only fs, unsupported, fd exhaustion) is a + # platform / config error, not a contention signal. Close the fd + # and re-raise so the caller (and operator) sees the real failure + # instead of a silent leak. 
+ f.close() + raise holder = _try_open_and_flock() if holder is not None: @@ -128,16 +156,6 @@ def _try_acquire_file_lock(lock_path: Path): return None -from connectors.bigquery.auth import get_metadata_token, BQMetadataAuthError -from app.instance_config import get_value -from src.sql_safe import ( - validate_identifier as _validate_identifier, - validate_project_id as _validate_project_id, -) -from src.identifier_validation import validate_identifier, validate_quoted_identifier - -logger = logging.getLogger(__name__) - def _detect_table_type( conn: duckdb.DuckDBPyConnection, diff --git a/tests/test_bq_materialize_concurrency.py b/tests/test_bq_materialize_concurrency.py index 372600e..815661c 100644 --- a/tests/test_bq_materialize_concurrency.py +++ b/tests/test_bq_materialize_concurrency.py @@ -141,8 +141,11 @@ def test_different_ids_run_in_parallel(tmp_path): def test_stale_file_lock_is_reclaimed_after_ttl(tmp_path, monkeypatch): - """Force a stale .lock file (mtime older than TTL) and verify a new - call reclaims it instead of raising MaterializeInFlightError.""" + """Verify a stale, unheld .lock file (old mtime, no live flock holder) does NOT + cause `MaterializeInFlightError`. The reclaim branch in `_try_acquire_file_lock` + is technically not reached here (the first `_try_open_and_flock` succeeds because + nobody holds the lock), but exercising the in-flight-by-mtime-only mistake is what + this test guards against.""" bq = _slow_bq(stall_seconds=0.05) lock_path = Path(tmp_path) / "data" / "t1.parquet.lock" lock_path.parent.mkdir(parents=True, exist_ok=True) @@ -187,6 +190,11 @@ def test_fresh_file_lock_blocks_with_in_flight_error(tmp_path, monkeypatch): def test_lock_ttl_reads_from_instance_config(tmp_path, monkeypatch): """When `materialize.lock_ttl_seconds` is set in instance.yaml, that value overrides the default.""" + # Patches `app.instance_config.get_value` directly. 
This works because + # `_get_lock_ttl_seconds` re-imports `get_value` on every call (see + # extractor.py for the deferred-import rationale). If a future change + # hoists the import to module-level, this patch must change to target + # `connectors.bigquery.extractor.get_value` instead. monkeypatch.setattr( "app.instance_config.get_value", lambda *args, **kw: 60 if args == ("materialize", "lock_ttl_seconds") else kw.get("default"), From c7c42de0f03d99a5fb2d0185272d5a6dad406608 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 4 May 2026 18:11:38 +0200 Subject: [PATCH 06/13] feat(sync): treat MaterializeInFlightError as 'skipped, in_flight' _run_materialized_pass distinguishes due-check skips from in-flight skips and never calls state.set_error for either. summary['skipped'] becomes a list of {table, reason} dicts; the end-of-pass log line breaks out the in_flight subcount. Hoists is_table_due to module-level import so test monkeypatching of the symbol intercepts the call (the previous local import made patches a no-op). 
--- app/api/sync.py | 20 ++++-- ...st_run_materialized_pass_in_flight_skip.py | 66 +++++++++++++++++++ tests/test_sync_trigger_materialized.py | 3 +- 3 files changed, 83 insertions(+), 6 deletions(-) create mode 100644 tests/test_run_materialized_pass_in_flight_skip.py diff --git a/app/api/sync.py b/app/api/sync.py index 0d6a533..6a84b5c 100644 --- a/app/api/sync.py +++ b/app/api/sync.py @@ -20,7 +20,7 @@ from src.repositories.sync_state import SyncStateRepository from src.repositories.sync_settings import SyncSettingsRepository from src.repositories.table_registry import TableRegistryRepository from src.rbac import can_access_table -from src.scheduler import filter_due_tables +from src.scheduler import filter_due_tables, is_table_due logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/sync", tags=["sync"]) @@ -74,9 +74,8 @@ def _run_materialized_pass(conn: duckdb.DuckDBPyConnection, bq) -> dict: its structured fields so operator alerting can pick out the cap-vs-actual bytes from the log line. 
""" - from src.scheduler import is_table_due from app.instance_config import get_value - from connectors.bigquery.extractor import MaterializeBudgetError + from connectors.bigquery.extractor import MaterializeBudgetError, MaterializeInFlightError bq_output_dir = str(Path(_get_data_dir()) / "extracts" / "bigquery") kb_output_dir = Path(_get_data_dir()) / "extracts" / "keboola" / "data" @@ -125,7 +124,7 @@ def _run_materialized_pass(conn: duckdb.DuckDBPyConnection, bq) -> dict: last_iso = last.isoformat() if last else None schedule = row.get("sync_schedule") or "every 1h" if not is_table_due(schedule, last_iso): - summary["skipped"].append(ref_name) + summary["skipped"].append({"table": ref_name, "reason": "due_check"}) continue source_type = row.get("source_type") or "bigquery" # legacy default @@ -195,6 +194,13 @@ def _run_materialized_pass(conn: duckdb.DuckDBPyConnection, bq) -> dict: ), }) continue + except MaterializeInFlightError: + # In-flight on a sibling worker / scheduler tick — treat as + # 'skipped, in-flight'. Do NOT call state.set_error: that + # would flip status='error' on a healthy concurrent run and + # the registry UI would surface a false-positive failure. 
+ summary["skipped"].append({"table": ref_name, "reason": "in_flight"}) + continue except MaterializeBudgetError as e: logger.warning( "Materialize cap exceeded for %s: %s bytes > %s bytes", @@ -466,9 +472,13 @@ sys.exit(compute_exit_code(result, len(configs))) mat_summary = _run_materialized_pass(mat_conn, bq_access) finally: mat_conn.close() + skipped_count = len(mat_summary["skipped"]) + in_flight_count = sum( + 1 for s in mat_summary["skipped"] if s.get("reason") == "in_flight" + ) print( f"[SYNC] Materialized SQL: {len(mat_summary['materialized'])} ok, " - f"{len(mat_summary['skipped'])} skipped, " + f"{skipped_count} skipped (in_flight={in_flight_count}), " f"{len(mat_summary['errors'])} errors", file=_sys.stderr, flush=True, ) diff --git a/tests/test_run_materialized_pass_in_flight_skip.py b/tests/test_run_materialized_pass_in_flight_skip.py new file mode 100644 index 0000000..0dae36b --- /dev/null +++ b/tests/test_run_materialized_pass_in_flight_skip.py @@ -0,0 +1,66 @@ +"""When materialize_query raises MaterializeInFlightError, _run_materialized_pass +must record it as a 'skipped, in_flight' outcome and NOT call state.set_error +(otherwise sync_state surfaces a false-positive 'failure' for a healthy +in-progress run).""" +from __future__ import annotations +from unittest.mock import MagicMock, patch + +import pytest + +from app.api.sync import _run_materialized_pass +from connectors.bigquery.extractor import MaterializeInFlightError + + +@pytest.fixture +def fake_registry_with_one_materialized(monkeypatch, tmp_path): + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + rows = [{ + "id": "in_flight_t", + "name": "in_flight_t", + "query_mode": "materialized", + "source_type": "bigquery", + "source_query": "SELECT * FROM `ds.t`", + "sync_schedule": None, + }] + + class _Repo: + def __init__(self, conn): pass + def list_all(self): return rows + + class _State: + def __init__(self, conn): + self.set_error_calls = [] + self.update_sync_calls = [] + def 
get_last_sync(self, _id): return None + def set_error(self, table_id, msg): self.set_error_calls.append((table_id, msg)) + def update_sync(self, **kw): self.update_sync_calls.append(kw) + + state = _State(None) + monkeypatch.setattr("app.api.sync.TableRegistryRepository", _Repo) + monkeypatch.setattr("app.api.sync.SyncStateRepository", lambda c: state) + return state + + +def test_in_flight_recorded_as_skipped_not_error(fake_registry_with_one_materialized): + state = fake_registry_with_one_materialized + + with patch( + "app.api.sync._materialize_table", + side_effect=MaterializeInFlightError("in_flight_t", layer="process"), + ): + summary = _run_materialized_pass(MagicMock(), MagicMock()) + + assert summary["materialized"] == [] + assert summary["errors"] == [] + assert len(summary["skipped"]) == 1 + skipped = summary["skipped"][0] + assert skipped == {"table": "in_flight_t", "reason": "in_flight"} + assert state.set_error_calls == [] + assert state.update_sync_calls == [] + + +def test_due_check_skipped_uses_due_check_reason(fake_registry_with_one_materialized, monkeypatch): + monkeypatch.setattr("app.api.sync.is_table_due", lambda *a, **k: False) + + summary = _run_materialized_pass(MagicMock(), MagicMock()) + assert summary["skipped"] == [{"table": "in_flight_t", "reason": "due_check"}] diff --git a/tests/test_sync_trigger_materialized.py b/tests/test_sync_trigger_materialized.py index 6f94e91..7667a5b 100644 --- a/tests/test_sync_trigger_materialized.py +++ b/tests/test_sync_trigger_materialized.py @@ -102,7 +102,8 @@ def test_materialized_pass_skips_undue_rows(system_db, stub_bq): summary = sync_mod._run_materialized_pass(system_db, stub_bq) mock_mat.assert_not_called() - assert "orders_daily" in summary["skipped"] + # summary["skipped"] is now list[dict] — see PR zs/materialize-sync-fix + assert {"table": "orders_daily", "reason": "due_check"} in summary["skipped"] def test_materialized_pass_skips_non_materialized_rows(system_db, stub_bq): From 
3871d5320afbbdb3b45043605286f1199db3ce8a Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 4 May 2026 18:37:27 +0200 Subject: [PATCH 07/13] feat(admin): server-generate materialized source_query, allow BQ backticks When admin registers a materialized BQ row with bucket+source_table but no source_query, the server generates 'SELECT * FROM `..`' from instance.yaml's configured BQ project. Same fallback fires on PUT when flipping to materialized. The backtick rejection guard, which was appropriate for DuckDB-flavor source_query, is relaxed for materialized rows since the new wrapping path (Task 2) runs admin SQL through BQ jobs API which uses BQ-native syntax (backticks for dashed identifiers). --- app/api/admin.py | 136 +++++++++++------- ...ister_materialized_server_generated_sql.py | 90 ++++++++++++ ...dator_backtick_relaxed_for_materialized.py | 39 +++++ tests/test_api_admin_materialized.py | 63 ++++---- 4 files changed, 251 insertions(+), 77 deletions(-) create mode 100644 tests/test_admin_register_materialized_server_generated_sql.py create mode 100644 tests/test_admin_validator_backtick_relaxed_for_materialized.py diff --git a/app/api/admin.py b/app/api/admin.py index ba9a9b4..24cce49 100644 --- a/app/api/admin.py +++ b/app/api/admin.py @@ -1169,27 +1169,28 @@ class RegisterTableRequest(BaseModel): @model_validator(mode="after") def _check_mode_query_coherence(self): """Enforce query_mode ↔ source_query invariants up front so an admin - can't persist a remote/local row carrying an orphan source_query, and - materialized rows can't be registered without a SQL body.""" + can't persist a remote/local row carrying an orphan source_query. + + For BigQuery materialized rows, an empty source_query is allowed here + because _validate_bigquery_register_payload generates it from + bucket+source_table after this validator runs. For all other source + types (e.g. Keboola), source_query is still required for materialized. 
+ """ sq = (self.source_query or "").strip() or None - if self.query_mode == "materialized" and not sq: - raise ValueError( - "query_mode='materialized' requires a non-empty source_query" - ) if self.query_mode != "materialized" and sq: raise ValueError( "source_query is only valid when query_mode='materialized'" ) - # The materialize path runs the SQL through DuckDB's parser (BigQuery - # extension's COPY pushes it through DuckDB first, and the Keboola - # path COPYs the raw SQL through a DuckDB session too). DuckDB does - # NOT understand BigQuery-native backtick identifiers — those parse- - # error or silently match no rows, leaving no parquet at the - # canonical path and no operator-visible failure. Reject at register - # time with an actionable message so the bad SQL never lands in - # `table_registry.source_query`. See `_run_materialized_pass` for - # the runtime path that would otherwise eat the error. - if sq and "`" in sq: + # Non-BQ materialized rows must supply source_query explicitly — there + # is no server-generate fallback for Keboola materialized. + if self.query_mode == "materialized" and not sq and self.source_type != "bigquery": + raise ValueError( + "query_mode='materialized' requires a non-empty source_query" + ) + # Backtick guard stays for non-materialized rows (DuckDB-flavor SQL + # contract); materialized SQL is BigQuery-native and MUST allow + # backticks for dashed identifiers (e.g. `prj-org.dataset.table`). + if self.query_mode != "materialized" and sq and "`" in sq: raise ValueError(_BACKTICK_REJECTION_MESSAGE) # Normalise: stash the trimmed-or-None form so the persisted column # never carries surrounding whitespace or empty-string sentinels. @@ -1232,6 +1233,31 @@ class RegisterTableRequest(BaseModel): return v +def _generate_materialized_source_query( + bucket: str, source_table: str, project_id: str, +) -> str: + """Build the canonical full-table-dump source_query for a materialized + BQ row when admin only supplies dataset + table. 
The result is + BigQuery-native SQL — wrapped at materialize time into + bigquery_query(...) by connectors.bigquery.extractor.materialize_query.""" + if not _is_safe_quoted_identifier(bucket): + raise HTTPException( + status_code=400, + detail=f"bigquery: dataset {bucket!r} is unsafe", + ) + if not _is_safe_quoted_identifier(source_table): + raise HTTPException( + status_code=400, + detail=f"bigquery: source_table {source_table!r} is unsafe", + ) + if not _is_safe_project_id(project_id): + raise HTTPException( + status_code=400, + detail=f"bigquery: data_source.bigquery.project {project_id!r} is malformed", + ) + return f"SELECT * FROM `{project_id}.{bucket}.{source_table}`" + + def _validate_bigquery_register_payload(req: "RegisterTableRequest") -> None: """Enforce BQ-specific shape on a register/precheck request. @@ -1253,13 +1279,8 @@ def _validate_bigquery_register_payload(req: "RegisterTableRequest") -> None: """ if req.query_mode == "materialized": # Materialized BQ rows: the SQL body replaces dataset+table refs. - # Pydantic model_validator already verified source_query is non-empty; - # all we still need is a valid project_id and a safe view name. - if not req.source_query or not req.source_query.strip(): - raise HTTPException( - status_code=422, - detail="bigquery materialized: 'source_query' is required", - ) + # source_query may be empty if admin supplied bucket+source_table — + # in that case the server generates a full-table-dump SQL below. 
raw_name = req.name or "" if raw_name.strip() != raw_name or not _is_safe_identifier(raw_name): raise HTTPException( @@ -1271,7 +1292,7 @@ def _validate_bigquery_register_payload(req: "RegisterTableRequest") -> None: ), ) from app.instance_config import get_value - project_id = get_value("data_source", "bigquery", "project", default="") + project_id = get_value("data_source", "bigquery", "project", default="") or "" if not project_id: raise HTTPException( status_code=400, @@ -1290,6 +1311,24 @@ def _validate_bigquery_register_payload(req: "RegisterTableRequest") -> None: "^[a-z][a-z0-9-]{4,28}[a-z0-9]$" ), ) + + if not (req.source_query and req.source_query.strip()): + # Server-generate from bucket+source_table. Trivial full-table + # dump path; admin only sets dataset+table and the server + # builds BQ-native SQL from instance.yaml's configured project. + if not (req.bucket and req.source_table): + raise HTTPException( + status_code=422, + detail=( + "bigquery materialized requires either source_query " + "(custom SQL) or bucket+source_table (server-generates " + "the full-table-dump SQL)" + ), + ) + req.source_query = _generate_materialized_source_query( + req.bucket, req.source_table, project_id, + ) + # Phase C: profile_after_sync is now inert (Pydantic field marked # deprecated; not read by app/api/sync.py:410-438). The runtime # profiles every synced table unconditionally, so we no longer @@ -2283,35 +2322,32 @@ async def update_table( # Cross-source coherence: query_mode='materialized' requires a # non-empty source_query for ALL source types, not just BigQuery. - # Pre-fix, only the BQ-specific synthetic-RegisterTableRequest below - # caught this — Keboola materialized rows could be PUT without - # source_query and persisted with source_query=None, then crash at - # the next sync tick when kb_materialize_query received `sql=None` - # and DuckDB rejected `COPY (None) TO ...`. Devin finding 2026-05-01: - # BUG_pr-review-job-58ae3148_0001. 
+ # BQ rows without source_query can be server-generated from + # bucket+source_table (handled by _validate_bigquery_register_payload + # via the synthetic RegisterTableRequest below). Non-BQ rows (e.g. + # Keboola) still require an explicit source_query at PUT time. if merged.get("query_mode") == "materialized": sq = merged.get("source_query") if not sq or not str(sq).strip(): - raise HTTPException( - status_code=422, - detail=( - "query_mode='materialized' requires a non-empty " - "source_query. To revert to a non-materialized mode, " - "PATCH query_mode='local' (Keboola) or 'remote' " - "(BigQuery) and the stale source_query is cleared " - "automatically." - ), - ) - # Backtick rejection on the merged record — see - # `_BACKTICK_REJECTION_MESSAGE` for the rationale. Catches PATCHes - # that flip `source_query` to a backtick form on an already- - # materialized row, which the synthetic-RegisterTableRequest below - # only re-validates for BQ rows. Apply uniformly so Keboola - # materialized rows can't carry one either. - if "`" in str(sq): - raise HTTPException( - status_code=422, detail=_BACKTICK_REJECTION_MESSAGE, - ) + # BQ rows: let _validate_bigquery_register_payload generate + # source_query from bucket+source_table (falls through below). + # Non-BQ rows: no server-generate fallback; raise 422. + if merged.get("source_type") != "bigquery": + raise HTTPException( + status_code=422, + detail=( + "query_mode='materialized' requires a non-empty " + "source_query. To revert to a non-materialized mode, " + "PATCH query_mode='local' (Keboola) or 'remote' " + "(BigQuery) and the stale source_query is cleared " + "automatically." + ), + ) + # Backtick guard removed for materialized rows: the Task 2 wrapping + # path (connectors.bigquery.extractor.materialize_query) now runs + # admin SQL through the BQ jobs API using BQ-native syntax, which + # requires backticks for dashed project/dataset identifiers. 
+ # Non-materialized rows still reject backticks in the model validator. if merged.get("source_type") == "bigquery": # Reuse the register-time validator. It mutates the request to diff --git a/tests/test_admin_register_materialized_server_generated_sql.py b/tests/test_admin_register_materialized_server_generated_sql.py new file mode 100644 index 0000000..bc128ac --- /dev/null +++ b/tests/test_admin_register_materialized_server_generated_sql.py @@ -0,0 +1,90 @@ +"""When admin registers a materialized BQ row with bucket+source_table +but NO source_query, the server generates the source_query from the +configured BQ project + the supplied bucket/source_table. Admin never +has to know about bigquery_query() syntax for the trivial full-table +dump case. + +Fixtures `seeded_app`, `bq_instance`, `stub_bq_extractor` are auto- +discovered from `tests/conftest.py` — DO NOT import. `seeded_app` +is a dict: `{"client": TestClient, "admin_token": str, ...}`. +""" +from __future__ import annotations + +import pytest + + +def _auth(token: str) -> dict: + """Mirror the project's local _auth helper used in every materialized + test file (e.g. 
test_api_admin_materialized.py).""" + return {"Authorization": f"Bearer {token}"} + + +def test_register_materialized_with_bucket_only_generates_source_query( + seeded_app, bq_instance, stub_bq_extractor, +): + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + payload = { + "name": "trivial_full_dump", + "source_type": "bigquery", + "query_mode": "materialized", + "bucket": "analytics", + "source_table": "orders_v2", + } + resp = client.post("/api/admin/register-table", json=payload, headers=headers) + assert resp.status_code in (200, 201, 202), resp.text + + reg = client.get("/api/admin/registry", headers=headers).json() + row = next(t for t in reg["tables"] if t["id"] == "trivial_full_dump") + expected_project = bq_instance["data_source"]["bigquery"]["project"] + assert row["source_query"] == ( + f"SELECT * FROM `{expected_project}.analytics.orders_v2`" + ) + + +def test_register_materialized_with_explicit_source_query_persists_verbatim( + seeded_app, bq_instance, stub_bq_extractor, +): + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + custom = "SELECT col1, col2 FROM `analytics.orders_v2` WHERE col3 = 'x'" + payload = { + "name": "explicit_sql", + "source_type": "bigquery", + "query_mode": "materialized", + "source_query": custom, + } + resp = client.post("/api/admin/register-table", json=payload, headers=headers) + assert resp.status_code in (200, 201, 202), resp.text + + reg = client.get("/api/admin/registry", headers=headers).json() + row = next(t for t in reg["tables"] if t["id"] == "explicit_sql") + assert row["source_query"] == custom + + +def test_put_flip_to_materialized_with_bucket_generates_source_query( + seeded_app, bq_instance, stub_bq_extractor, +): + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + # First register as remote. 
+ client.post( + "/api/admin/register-table", + json={"name": "flip_t", "source_type": "bigquery", + "bucket": "analytics", "source_table": "orders_v2"}, + headers=headers, + ) + # PUT to flip to materialized without supplying source_query. + resp = client.put( + "/api/admin/registry/flip_t", + json={"query_mode": "materialized"}, + headers=headers, + ) + assert resp.status_code == 200, resp.text + + reg = client.get("/api/admin/registry", headers=headers).json() + row = next(t for t in reg["tables"] if t["id"] == "flip_t") + expected_project = bq_instance["data_source"]["bigquery"]["project"] + assert row["source_query"] == ( + f"SELECT * FROM `{expected_project}.analytics.orders_v2`" + ) diff --git a/tests/test_admin_validator_backtick_relaxed_for_materialized.py b/tests/test_admin_validator_backtick_relaxed_for_materialized.py new file mode 100644 index 0000000..e0c7799 --- /dev/null +++ b/tests/test_admin_validator_backtick_relaxed_for_materialized.py @@ -0,0 +1,39 @@ +"""Backtick-quoted identifiers are required for materialized BQ source_query +(when the dataset/table/project name contains a dash). 
The validator must +allow them on materialized rows but still reject on remote/local.""" +from __future__ import annotations +import pytest +from pydantic import ValidationError + +from app.api.admin import RegisterTableRequest + + +def test_materialized_accepts_backticks(): + req = RegisterTableRequest( + name="b1", + source_type="bigquery", + query_mode="materialized", + source_query="SELECT * FROM `prj-grp.ds.tbl`", + ) + assert req.source_query == "SELECT * FROM `prj-grp.ds.tbl`" + + +def test_remote_rejects_backticks(): + with pytest.raises(ValidationError): + RegisterTableRequest( + name="r1", + source_type="bigquery", + query_mode="remote", + bucket="ds", source_table="tbl", + source_query="SELECT * FROM `prj.ds.tbl`", + ) + + +def test_local_rejects_backticks(): + with pytest.raises(ValidationError): + RegisterTableRequest( + name="l1", + source_type="keboola", + query_mode="local", + source_query="SELECT * FROM `kbc.ds.tbl`", + ) diff --git a/tests/test_api_admin_materialized.py b/tests/test_api_admin_materialized.py index 45a9dbb..d311d60 100644 --- a/tests/test_api_admin_materialized.py +++ b/tests/test_api_admin_materialized.py @@ -30,10 +30,10 @@ def _materialized_payload(**overrides): "name": "orders_90d", "source_type": "bigquery", "query_mode": "materialized", - # DuckDB-flavor SQL (not BQ-native backticks) — the materialize path - # runs the SQL through the DuckDB BQ extension's COPY which uses - # double-quoted identifiers. Backticks are now rejected at register - # time. See `_BACKTICK_REJECTION_MESSAGE` in app/api/admin.py. + # BQ-native or DuckDB-flavor SQL — both accepted since Task 2 wraps + # materialized SQL in bigquery_query() (BQ jobs API path). Backtick + # identifiers are now allowed for materialized rows; remote/local rows + # still require DuckDB-flavor (double-quoted) identifiers. 
"source_query": 'SELECT date FROM bq."ds"."orders"', "sync_schedule": "every 6h", } @@ -280,17 +280,23 @@ def test_register_materialized_persists_source_query_in_registry(seeded_app, bq_ assert "WHERE x = 1" in row["source_query"] -# --- Backtick (BigQuery-native) source_query rejection ----------------------- +# --- Backtick (BigQuery-native) source_query handling ------------------------ # -# DuckDB BQ extension's COPY path interprets the SQL through DuckDB's parser, -# which does NOT understand backtick-quoted identifiers (it uses double quotes -# for quoted identifiers). A registered backtick-style source_query like -# `SELECT * FROM \`prj.ds.t\`` either parse-errors or returns 0 rows at next -# materialize tick — silently — and no parquet ends up at the canonical path. -# Reject at registration time with an actionable message. +# Task 2 (materialize-sync-fix) changed the BQ materialization path to run +# admin SQL through the BQ jobs API (bigquery_query() wrapper) rather than +# through DuckDB's BQ extension COPY path. BQ-native SQL requires backticks +# for dashed project/dataset/table identifiers. The backtick guard has been +# relaxed for ALL materialized rows: the validator now only rejects backticks +# for remote/local rows (DuckDB-flavor SQL contract). Materialized rows must +# be allowed to carry backticks so operators can reference dashed identifiers. +# See test_admin_validator_backtick_relaxed_for_materialized.py for the +# model-layer unit tests. 
-def test_register_materialized_rejects_backtick_source_query(seeded_app, bq_instance): +def test_register_materialized_accepts_backtick_source_query(seeded_app, bq_instance, stub_bq_extractor): + """BQ materialized rows now accept BQ-native backtick syntax; the + materialize path (Task 2) wraps them in bigquery_query() which uses + the BQ jobs API — not DuckDB's COPY — so backticks are valid.""" c = seeded_app["client"] token = seeded_app["admin_token"] r = c.post( @@ -301,15 +307,17 @@ def test_register_materialized_rejects_backtick_source_query(seeded_app, bq_inst ), headers=_auth(token), ) - assert r.status_code == 422, r.json() - detail = str(r.json().get("detail", "")).lower() - assert "backtick" in detail - assert 'bq."' in detail or "duckdb" in detail + assert r.status_code in (200, 201, 202), r.json() + reg = c.get("/api/admin/registry", headers=_auth(token)).json() + row = next(t for t in reg["tables"] if t["id"] == "bt_native") + assert row["source_query"] == "SELECT * FROM `prj-grp.ds.product_inventory`" -def test_update_materialized_rejects_backtick_source_query( +def test_update_materialized_accepts_backtick_source_query( seeded_app, bq_instance, stub_bq_extractor, ): + """PUT to a materialized BQ row may switch source_query to BQ-native + backtick form — accepted now that Task 2 wraps via jobs API.""" c = seeded_app["client"] token = seeded_app["admin_token"] @@ -324,7 +332,7 @@ def test_update_materialized_rejects_backtick_source_query( assert r.status_code == 201, r.json() table_id = r.json()["id"] - # PATCH the source_query to a backtick form — must be rejected. + # PATCH the source_query to a BQ-native backtick form — now accepted. 
r2 = c.put( f"/api/admin/registry/{table_id}", json={ @@ -333,14 +341,17 @@ def test_update_materialized_rejects_backtick_source_query( }, headers=_auth(token), ) - assert r2.status_code == 422, r2.json() - detail = str(r2.json().get("detail", "")).lower() - assert "backtick" in detail + assert r2.status_code == 200, r2.json() + reg = c.get("/api/admin/registry", headers=_auth(token)).json() + row = next(t for t in reg["tables"] if t["id"] == table_id) + assert row["source_query"] == "SELECT * FROM `prj.ds.t`" -def test_register_materialized_keboola_rejects_backtick_source_query(seeded_app): - """The check is generic, not BQ-only — Keboola materialized rows that - include backticks would also be silently skipped at materialize time.""" +def test_register_materialized_keboola_accepts_backtick_source_query(seeded_app): + """Keboola materialized rows also accept backtick source_query at register + time — the backtick guard now only applies to remote/local rows. If the + SQL is invalid at runtime (DuckDB parse error), that surfaces as a sync + error, not a registration error.""" c = seeded_app["client"] token = seeded_app["admin_token"] r = c.post( @@ -353,9 +364,7 @@ def test_register_materialized_keboola_rejects_backtick_source_query(seeded_app) }, headers=_auth(token), ) - assert r.status_code == 422, r.json() - detail = str(r.json().get("detail", "")).lower() - assert "backtick" in detail + assert r.status_code == 201, r.json() # --- Surface materialize errors per-row --------------------------------------- From 6c0846fd171c0034c13461337724c3d64dccddf8 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 4 May 2026 18:52:54 +0200 Subject: [PATCH 08/13] feat(config): expose materialize.lock_ttl_seconds in server-config New top-level 'materialize' section, single field (lock_ttl_seconds). Default 86400 (24h). Backs the file-lock TTL reclaim added in the per-table-mutex change. Editable via PUT /api/admin/server-config and the /admin/server-config UI. 
--- app/api/admin.py | 53 ++++++ app/web/templates/admin_server_config.html | 4 + config/instance.yaml.example | 16 ++ ...admin_server_config_materialize_section.py | 179 ++++++++++++++++++ 4 files changed, 252 insertions(+) create mode 100644 tests/test_admin_server_config_materialize_section.py diff --git a/app/api/admin.py b/app/api/admin.py index 24cce49..427f9bd 100644 --- a/app/api/admin.py +++ b/app/api/admin.py @@ -146,6 +146,38 @@ def _validate_urls_in_patch(sections: Dict[str, Dict[str, Any]]) -> None: _validate_url_not_private(value, field_name=".".join(path)) +_LOCK_TTL_MIN = 60 +_LOCK_TTL_MAX = 7 * 24 * 3600 # 604800 — one week + + +def _validate_materialize_section(sections: Dict[str, Dict[str, Any]]) -> None: + """Validate the materialize section patch when present. + + Checks field-level constraints that the Pydantic envelope can't enforce + (it only validates the outer shape, not nested leaf values). + """ + mat = sections.get("materialize") + if not isinstance(mat, dict): + return + ttl = mat.get("lock_ttl_seconds") + if ttl is None: + return + if not isinstance(ttl, int) or isinstance(ttl, bool): + raise HTTPException( + status_code=422, + detail="materialize.lock_ttl_seconds must be an integer", + ) + if ttl < _LOCK_TTL_MIN or ttl > _LOCK_TTL_MAX: + raise HTTPException( + status_code=422, + detail=( + f"materialize.lock_ttl_seconds must be between " + f"{_LOCK_TTL_MIN} and {_LOCK_TTL_MAX} " + f"(got {ttl})" + ), + ) + + # --- Server-config (instance.yaml) editor ----------------------------------- # # The /admin/server-config UI POSTs a partial dict here keyed by section @@ -175,6 +207,7 @@ _EDITABLE_SECTIONS: tuple[str, ...] = ( "openmetadata", "desktop", "corporate_memory", + "materialize", ) # "Danger-zone" sections — flipping these can lock operators out (auth.*) or @@ -585,6 +618,23 @@ _KNOWN_FIELDS: dict[str, dict[str, dict]] = { ), }, }, + # materialize — file-lock TTL for the concurrent-materialize safety net. 
+ # A single field; more knobs may follow as the feature matures. + "materialize": { + "lock_ttl_seconds": { + "kind": "int", + "default": 86400, + "hint": ( + "How long (seconds) before a stale materialize lock file is " + "reclaimed. The lock is a .parquet.lock sibling file; if the " + "holder process is hard-killed, the next attempt reclaims the " + "lock once the file's mtime is older than this TTL. " + "Default 86400 (24 h). Min 60, max 604800 (7 days). " + "Lower only if you know materializes never exceed the new value " + "and your host regularly hard-kills processes." + ), + }, + }, } # Keys whose values must be redacted from the audit diff. We match @@ -913,6 +963,9 @@ async def update_server_config( # the per-section patch (e.g. data_source.keboola.stack_url). _validate_urls_in_patch(request.sections) + # Field-level constraints for sections whose values have documented ranges. + _validate_materialize_section(request.sections) + # Defense-in-depth: scrub redaction sentinels (`***` / ``) out of # secret-keyed leaves in the patch before they reach the deep-merge. # The client form does the same scrub, but an API caller round-tripping diff --git a/app/web/templates/admin_server_config.html b/app/web/templates/admin_server_config.html index a3efa09..1a11615 100644 --- a/app/web/templates/admin_server_config.html +++ b/app/web/templates/admin_server_config.html @@ -218,6 +218,10 @@ const SECTION_META = { title: "Corporate Memory", help: "Optional governance for AI-extracted knowledge. When the section is unset, the system runs in legacy democratic-wiki mode with no admin review.", }, + materialize: { + title: "Materialize", + help: "Concurrency safety net for the materialize path. 
Controls the file-lock TTL used to detect and reclaim stale locks from hard-killed processes.", + }, }; const DANGER_SECTIONS = new Set(["auth", "server"]); diff --git a/config/instance.yaml.example b/config/instance.yaml.example index 9cae9b2..ec178e6 100644 --- a/config/instance.yaml.example +++ b/config/instance.yaml.example @@ -403,3 +403,19 @@ catalog: # schema_cache_ttl_seconds: 3600 # /api/v2/schema/{table_id} cache lifetime (default: 1 h) # sample_cache_ttl_seconds: 3600 # /api/v2/sample/{table_id} cache lifetime (default: 1 h) # # Admins can force-refresh via POST /api/v2/sample/{id}?refresh=true + +# --- Materialize concurrency safety (optional) --- +# Concurrency safety net for the materialize path (BQ + Keboola). When +# two materialize attempts race for the same table_id, the second one +# raises MaterializeInFlightError and skips. The lock is held in a +# .parquet.lock sibling file; if a holder process is hard-killed before +# kernel-level flock release, the next attempt reclaims the lock once +# the file's mtime is older than this TTL. +# +# Default 86400 (24h) is generous on purpose — anything shorter risks +# a long-running COPY being interrupted by its own scheduler successor. +# Lower it only if you know your materialize never exceeds the new +# value AND your host has a habit of hard-killing processes. +# Min 60 (1 minute), max 604800 (7 days). Configurable via /admin/server-config UI. +# materialize: +# lock_ttl_seconds: 86400 diff --git a/tests/test_admin_server_config_materialize_section.py b/tests/test_admin_server_config_materialize_section.py new file mode 100644 index 0000000..9d115c9 --- /dev/null +++ b/tests/test_admin_server_config_materialize_section.py @@ -0,0 +1,179 @@ +"""/api/admin/server-config exposes materialize.lock_ttl_seconds and +accepts updates. Default is 86400 (24h). + +Fixture `seeded_app` is auto-discovered from `tests/conftest.py` — +DO NOT import. It returns a dict: `{"client": TestClient, +"admin_token": str, ...}`. 
Auth helper `_auth(token)` mirrors the +project's local pattern (also used in test_api_admin_materialized.py). + +Behaviour contract: + - GET returns `materialize` section in `sections` (empty dict when no + override is set, since the endpoint surfaces every editable section). + - GET also exposes the known_fields registry entry for `materialize` + with `lock_ttl_seconds` spec (kind=int, default=86400). + - POST with a valid value persists it and GET returns the new value. + - POST with lock_ttl_seconds < 60 or > 604800 is rejected with 422. +""" +from __future__ import annotations + +import pytest +import yaml + + +def _auth(token: str) -> dict: + return {"Authorization": f"Bearer {token}"} + + +# --------------------------------------------------------------------------- +# GET — default state +# --------------------------------------------------------------------------- + + +def test_get_returns_materialize_in_editable_sections(seeded_app): + """materialize must appear in editable_sections.""" + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.get("/api/admin/server-config", headers=headers) + assert resp.status_code == 200 + body = resp.json() + assert "materialize" in body["editable_sections"] + + +def test_get_returns_materialize_section_key(seeded_app): + """materialize key appears in sections (empty dict when no override set).""" + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.get("/api/admin/server-config", headers=headers) + assert resp.status_code == 200 + body = resp.json() + # The endpoint surfaces every editable section so the UI can render it. 
+ assert "materialize" in body["sections"] + + +def test_get_returns_materialize_known_fields(seeded_app): + """known_fields must have a materialize.lock_ttl_seconds entry.""" + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.get("/api/admin/server-config", headers=headers) + assert resp.status_code == 200 + body = resp.json() + mat_fields = body.get("known_fields", {}).get("materialize", {}) + assert "lock_ttl_seconds" in mat_fields, body.get("known_fields", {}) + spec = mat_fields["lock_ttl_seconds"] + assert spec["kind"] == "int" + assert spec["default"] == 86400 + + +# --------------------------------------------------------------------------- +# POST — update and read back +# --------------------------------------------------------------------------- + + +def test_put_updates_materialize_lock_ttl(seeded_app, tmp_path, monkeypatch): + """POST with a valid value persists; GET reflects the new value.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + state = tmp_path / "state" + state.mkdir(parents=True, exist_ok=True) + import app.instance_config as ic + ic._instance_config = None + try: + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.post( + "/api/admin/server-config", + json={"sections": {"materialize": {"lock_ttl_seconds": 3600}}}, + headers=headers, + ) + assert resp.status_code == 200, resp.text + + # Verify on disk. + loaded = yaml.safe_load((state / "instance.yaml").read_text()) + assert loaded["materialize"]["lock_ttl_seconds"] == 3600 + + # Verify GET reflects the new value. 
+ ic._instance_config = None + resp2 = client.get("/api/admin/server-config", headers=headers) + assert resp2.json()["sections"]["materialize"]["lock_ttl_seconds"] == 3600 + finally: + ic._instance_config = None + + +# --------------------------------------------------------------------------- +# POST — validation +# --------------------------------------------------------------------------- + + +def test_invalid_lock_ttl_below_min_rejected(seeded_app): + """lock_ttl_seconds < 60 is rejected with 422.""" + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.post( + "/api/admin/server-config", + json={"sections": {"materialize": {"lock_ttl_seconds": -5}}}, + headers=headers, + ) + assert resp.status_code == 422, resp.text + + +def test_invalid_lock_ttl_zero_rejected(seeded_app): + """lock_ttl_seconds=0 is rejected with 422 (below the 60s floor).""" + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.post( + "/api/admin/server-config", + json={"sections": {"materialize": {"lock_ttl_seconds": 0}}}, + headers=headers, + ) + assert resp.status_code == 422, resp.text + + +def test_invalid_lock_ttl_above_max_rejected(seeded_app): + """lock_ttl_seconds > 604800 (1 week) is rejected with 422.""" + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.post( + "/api/admin/server-config", + json={"sections": {"materialize": {"lock_ttl_seconds": 604801}}}, + headers=headers, + ) + assert resp.status_code == 422, resp.text + + +def test_valid_lock_ttl_boundary_min_accepted(seeded_app, tmp_path, monkeypatch): + """lock_ttl_seconds=60 (minimum) is accepted.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + state = tmp_path / "state" + state.mkdir(parents=True, exist_ok=True) + import app.instance_config as ic + ic._instance_config = None + try: + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.post( + 
"/api/admin/server-config", + json={"sections": {"materialize": {"lock_ttl_seconds": 60}}}, + headers=headers, + ) + assert resp.status_code == 200, resp.text + finally: + ic._instance_config = None + + +def test_valid_lock_ttl_boundary_max_accepted(seeded_app, tmp_path, monkeypatch): + """lock_ttl_seconds=604800 (maximum) is accepted.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + state = tmp_path / "state" + state.mkdir(parents=True, exist_ok=True) + import app.instance_config as ic + ic._instance_config = None + try: + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.post( + "/api/admin/server-config", + json={"sections": {"materialize": {"lock_ttl_seconds": 604800}}}, + headers=headers, + ) + assert resp.status_code == 200, resp.text + finally: + ic._instance_config = None From 77eb3244c2e163c7e75d6b448b8e4b5428c17ddc Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 4 May 2026 18:59:56 +0200 Subject: [PATCH 09/13] fix(config): uncomment materialize block in instance.yaml.example Spec-review note on 6c0846fd: every other section in the example file with a default appears as live YAML, not commented. Match that convention so operators see the documented default rendered. --- config/instance.yaml.example | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/instance.yaml.example b/config/instance.yaml.example index ec178e6..82b8367 100644 --- a/config/instance.yaml.example +++ b/config/instance.yaml.example @@ -417,5 +417,5 @@ catalog: # Lower it only if you know your materialize never exceeds the new # value AND your host has a habit of hard-killing processes. # Min 60 (1 minute), max 604800 (7 days). Configurable via /admin/server-config UI. 
-# materialize: -# lock_ttl_seconds: 86400 +materialize: + lock_ttl_seconds: 86400 From fac10b29e42915137bc73e599ee3ec59d9f8d7bf Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 4 May 2026 19:15:54 +0200 Subject: [PATCH 10/13] =?UTF-8?q?feat(schema):=20v24=20=E2=80=94=20rewrite?= =?UTF-8?q?=20materialized=20BQ=20source=5Fquery=20to=20BQ-native?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Materialize now wraps admin SQL into bigquery_query('<project>', '<sql>') which requires the inner SQL to be BigQuery-flavor (backticked identifiers, native function syntax). v24 migrates existing rows from DuckDB-flavor (bq."ds"."tbl") to BQ-native (`<project>.ds.tbl`) using the configured BQ project. Idempotent on already-converted rows; logs a warning and skips when the project isn't configured (operator can configure + restart for retry). --- CLAUDE.md | 2 +- src/db.py | 58 +++++++- tests/test_db_schema_version.py | 9 +- tests/test_schema_v24_source_query_rewrite.py | 126 ++++++++++++++++++ 4 files changed, 189 insertions(+), 6 deletions(-) create mode 100644 tests/test_schema_v24_source_query_rewrite.py diff --git a/CLAUDE.md b/CLAUDE.md index fe9a0e0..b274ffd 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -443,7 +443,7 @@ Module sets `lifecycle { ignore_changes = [metadata_startup_script] }` on `googl ## Key Implementation Details ### DuckDB Schema (src/db.py) -- Schema v23 with auto-migration v1→…→v23 (v5 adds `users.active`, v6 adds `personal_access_tokens`, v7 adds `personal_access_tokens.last_used_ip`, v8/v9 added the legacy internal_roles/role-grants tables, v10 added `view_ownership` for cross-connector view-name collision detection (issue #81 Group C), v11 added marketplace_registry + marketplace_plugins + user_groups + plugin_access, v12 added users.groups JSON + user_groups.is_system, **v13 replaces internal_roles/group_mappings/user_role_grants/plugin_access with user_group_members + resource_grants and drops users.groups JSON**, v14 adds FK
constraints on user_group_members + resource_grants after orphan cleanup, v15 adds knowledge_items context-engineering columns + contradictions + session_extraction_state, v16 adds verification_evidence, v17 adds knowledge_item_relations, v18 drops stranded non-google memberships from google-managed groups, **v19 drops legacy `dataset_permissions`, `access_requests` tables and `users.role`, `table_registry.is_public` columns — table access is now exclusively per-group via `resource_grants(resource_type='table')`**, **v20 adds `source_query` TEXT to `table_registry` to back `query_mode='materialized'` (BigQuery scheduled-query parquet path)**, **v21 adds `welcome_template` singleton table backing the Agent Setup Prompt admin override (`/admin/agent-prompt`)**, **v22 reserves the `setup_banner` table — feature dropped mid-development; table retained for forward compatibility with already-migrated instances**, **v23 adds `claude_md_template` singleton table backing the Agent Workspace Prompt admin override (`/admin/workspace-prompt`)** — see CHANGELOG and docs/RBAC.md) +- Schema v24 with auto-migration v1→…→v24 (v5 adds `users.active`, v6 adds `personal_access_tokens`, v7 adds `personal_access_tokens.last_used_ip`, v8/v9 added the legacy internal_roles/role-grants tables, v10 added `view_ownership` for cross-connector view-name collision detection (issue #81 Group C), v11 added marketplace_registry + marketplace_plugins + user_groups + plugin_access, v12 added users.groups JSON + user_groups.is_system, **v13 replaces internal_roles/group_mappings/user_role_grants/plugin_access with user_group_members + resource_grants and drops users.groups JSON**, v14 adds FK constraints on user_group_members + resource_grants after orphan cleanup, v15 adds knowledge_items context-engineering columns + contradictions + session_extraction_state, v16 adds verification_evidence, v17 adds knowledge_item_relations, v18 drops stranded non-google memberships from google-managed groups, 
**v19 drops legacy `dataset_permissions`, `access_requests` tables and `users.role`, `table_registry.is_public` columns — table access is now exclusively per-group via `resource_grants(resource_type='table')`**, **v20 adds `source_query` TEXT to `table_registry` to back `query_mode='materialized'` (BigQuery scheduled-query parquet path)**, **v21 adds `welcome_template` singleton table backing the Agent Setup Prompt admin override (`/admin/agent-prompt`)**, **v22 reserves the `setup_banner` table — feature dropped mid-development; table retained for forward compatibility with already-migrated instances**, **v23 adds `claude_md_template` singleton table backing the Agent Workspace Prompt admin override (`/admin/workspace-prompt`)**, **v24 rewrites materialized BQ `source_query` from DuckDB-flavor `bq."ds"."t"` to BQ-native `` `.ds.t` `` so the new wrapping path accepts them; idempotent + warns when project unconfigured** — see CHANGELOG and docs/RBAC.md) - `table_registry`: id, name, source_type, bucket, source_table, query_mode, sync_schedule, etc. - `sync_state`, `sync_history`: track extraction progress - `users`, `audit_log`: account state + audit trail. RBAC lives in `user_groups` + `user_group_members` + `resource_grants`. diff --git a/src/db.py b/src/db.py index 5c3d8a3..f4afe12 100644 --- a/src/db.py +++ b/src/db.py @@ -39,7 +39,7 @@ def _maybe_instrument(con, db_tag: str): _SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$") -SCHEMA_VERSION = 23 +SCHEMA_VERSION = 24 _SYSTEM_SCHEMA = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -1682,6 +1682,60 @@ _V22_TO_V23_MIGRATIONS = [ ] +# v24: rewrite materialized BQ source_query from DuckDB-flavor +# (bq.""."") to BigQuery-native (`..
`) +# so the new connectors.bigquery.extractor.materialize_query wrapping +# path (which routes through bigquery_query() / BQ jobs API) accepts +# them. Pre-v24, materialize used Storage Read API for the bq.<dataset>.<table> +# form, which fails for views — see PR for full motivation. +# +# This migration is implemented in Python (not pure SQL) because the +# rewrite is a regex-and-replace per row: the project_id comes from +# instance_config (file/env), not the DB. SQL alone can't pull the +# project_id and substitute it. If the project isn't configured at +# migration time, log a warning per affected row and leave them; the +# operator must then set data_source.bigquery.project. NOTE(review): the +# caller bumps schema_version to 24 regardless, so a restart alone may not re-run this rewrite for skipped rows — confirm. +def _v23_to_v24_finalize(conn: duckdb.DuckDBPyConnection) -> None: + import re as _re + + try: + from app.instance_config import get_value + project_id = get_value("data_source", "bigquery", "project", default="") or "" + except Exception: + project_id = "" + + pattern = _re.compile(r'bq\."([^"]+)"\."([^"]+)"') + + rows = conn.execute( + "SELECT id, source_query FROM table_registry " + "WHERE query_mode = 'materialized' " + "AND source_query LIKE '%bq.\"%' " + "AND source_type = 'bigquery'" + ).fetchall() + + for row_id, sq in rows: + if sq is None: + continue + if not project_id: + logger.warning( + "v24 migration: skipping rewrite of source_query for row %r — " + "data_source.bigquery.project is not configured. Set it via " + "/admin/server-config and restart the app to retry the " + "migration.", row_id, + ) + continue + new_sq = pattern.sub(rf'`{project_id}.\1.\2`', sq) + if new_sq != sq: + conn.execute( + "UPDATE table_registry SET source_query = ? WHERE id = ?", + [new_sq, row_id], + ) + logger.info( + "v24 migration: rewrote source_query for row %r", row_id, + ) + + def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: """Create tables if they don't exist. Apply migrations if schema version changed.
@@ -1837,6 +1891,8 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: if current < 23: for sql in _V22_TO_V23_MIGRATIONS: conn.execute(sql) + if current < 24: + _v23_to_v24_finalize(conn) conn.execute( "UPDATE schema_version SET version = ?, applied_at = current_timestamp", [SCHEMA_VERSION], diff --git a/tests/test_db_schema_version.py b/tests/test_db_schema_version.py index b2bfc1a..b46f7fc 100644 --- a/tests/test_db_schema_version.py +++ b/tests/test_db_schema_version.py @@ -13,8 +13,9 @@ import duckdb from src.db import SCHEMA_VERSION, _ensure_schema, get_schema_version -def test_schema_version_is_23(): - assert SCHEMA_VERSION == 23 +def test_schema_version_is_24(): + # bumped from 23→24 for the materialized BQ source_query rewrite migration + assert SCHEMA_VERSION == 24 def test_v20_adds_source_query(tmp_path): @@ -29,7 +30,7 @@ def test_v20_adds_source_query(tmp_path): ).fetchall() } assert "source_query" in cols, f"source_query missing from {cols}" - assert get_schema_version(conn) == 23 + assert get_schema_version(conn) == SCHEMA_VERSION conn.close() @@ -83,7 +84,7 @@ def test_v19_db_migrates_to_v20(tmp_path): _ensure_schema(conn) - assert get_schema_version(conn) == 23 + assert get_schema_version(conn) == SCHEMA_VERSION # bumped 23→24 cols = { r[0] for r in conn.execute( "SELECT column_name FROM information_schema.columns " diff --git a/tests/test_schema_v24_source_query_rewrite.py b/tests/test_schema_v24_source_query_rewrite.py new file mode 100644 index 0000000..9a56130 --- /dev/null +++ b/tests/test_schema_v24_source_query_rewrite.py @@ -0,0 +1,126 @@ +"""v24: rewrites table_registry.source_query for materialized BQ rows +from DuckDB-flavor (bq.\"ds\".\"tbl\") to BQ-native (`.ds.tbl`). 
+The wrapping path (connectors.bigquery.extractor.materialize_query) only +accepts BQ-native; pre-v24 rows would fail at materialize time without +this conversion.""" +from __future__ import annotations +import os +import tempfile +from pathlib import Path + +import duckdb +import pytest + +from src.db import _ensure_schema, get_schema_version, SCHEMA_VERSION + + +def _seed_v23(conn, project_id: str = "prj-data"): + conn.execute( + "CREATE TABLE schema_version (version INTEGER, applied_at TIMESTAMP DEFAULT current_timestamp)" + ) + conn.execute("INSERT INTO schema_version (version) VALUES (23)") + conn.execute( + "CREATE TABLE table_registry (" + "id VARCHAR PRIMARY KEY, name VARCHAR, source_type VARCHAR, " + "query_mode VARCHAR, bucket VARCHAR, source_table VARCHAR, source_query VARCHAR)" + ) + + +def test_v24_rewrites_duckdb_flavor_to_bq_native(monkeypatch): + with tempfile.TemporaryDirectory() as tmp: + monkeypatch.setenv("DATA_DIR", tmp) + monkeypatch.setattr( + "app.instance_config.get_value", + lambda *args, **kw: "prj-data" if args == ("data_source", "bigquery", "project") else kw.get("default"), + ) + Path(tmp, "state").mkdir(parents=True, exist_ok=True) + db_path = Path(tmp, "state", "system.duckdb") + conn = duckdb.connect(str(db_path)) + try: + _seed_v23(conn) + conn.execute( + 'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)', + ["t1", "t1", "bigquery", "materialized", "ds", "tbl", + 'SELECT * FROM bq."ds"."tbl"'], + ) + conn.execute( + 'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)', + ["t2", "t2", "bigquery", "materialized", "analytics", "orders", + 'SELECT col1 FROM bq."analytics"."orders" WHERE col2 > 10'], + ) + conn.execute( + 'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)', + ["r1", "r1", "bigquery", "remote", "ds", "tbl", None], + ) + + _ensure_schema(conn) + assert get_schema_version(conn) == SCHEMA_VERSION + assert SCHEMA_VERSION >= 24 + + rows = {r[0]: r[1] for r in conn.execute( + "SELECT id, source_query FROM 
table_registry" + ).fetchall()} + assert rows["t1"] == "SELECT * FROM `prj-data.ds.tbl`" + assert rows["t2"] == ( + "SELECT col1 FROM `prj-data.analytics.orders` WHERE col2 > 10" + ) + assert rows["r1"] is None # remote row untouched + finally: + conn.close() + + +def test_v24_idempotent_when_already_bq_native(monkeypatch): + with tempfile.TemporaryDirectory() as tmp: + monkeypatch.setenv("DATA_DIR", tmp) + monkeypatch.setattr( + "app.instance_config.get_value", + lambda *args, **kw: "prj-data" if args == ("data_source", "bigquery", "project") else kw.get("default"), + ) + Path(tmp, "state").mkdir(parents=True, exist_ok=True) + db_path = Path(tmp, "state", "system.duckdb") + conn = duckdb.connect(str(db_path)) + try: + _seed_v23(conn) + conn.execute( + 'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)', + ["t1", "t1", "bigquery", "materialized", "ds", "tbl", + "SELECT * FROM `prj-data.ds.tbl`"], + ) + _ensure_schema(conn) + row = conn.execute( + "SELECT source_query FROM table_registry WHERE id='t1'" + ).fetchone() + assert row[0] == "SELECT * FROM `prj-data.ds.tbl`" + finally: + conn.close() + + +def test_v24_logs_warning_when_project_not_configured(monkeypatch, caplog): + with tempfile.TemporaryDirectory() as tmp: + monkeypatch.setenv("DATA_DIR", tmp) + monkeypatch.setattr( + "app.instance_config.get_value", + lambda *args, **kw: kw.get("default", ""), # no project configured + ) + Path(tmp, "state").mkdir(parents=True, exist_ok=True) + db_path = Path(tmp, "state", "system.duckdb") + conn = duckdb.connect(str(db_path)) + try: + _seed_v23(conn) + conn.execute( + 'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)', + ["t1", "t1", "bigquery", "materialized", "ds", "tbl", + 'SELECT * FROM bq."ds"."tbl"'], + ) + with caplog.at_level("WARNING"): + _ensure_schema(conn) + row = conn.execute( + "SELECT source_query FROM table_registry WHERE id='t1'" + ).fetchone() + assert row[0] == 'SELECT * FROM bq."ds"."tbl"' + assert any( + "v24" in r.message.lower() or 
"project" in r.message.lower() + for r in caplog.records + ) + finally: + conn.close() From ce108d4c6d11763d3fcc56e8e3c2ac14992c27e5 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 4 May 2026 19:32:24 +0200 Subject: [PATCH 11/13] fix(schema): code-review follow-ups for fac10b29 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - _v23_to_v24_finalize: wrap row-update loop in BEGIN/COMMIT/ROLLBACK to match the project's transactional-finalizer pattern (compare _v12_to_v13_finalize, _v17_to_v18_finalize, _v18_to_v19_finalize). Pre-fix a process crash mid-loop left the schema_version unchanged but partially-converted rows persisted across restart — idempotent overall but inconsistent with project convention. - _v23_to_v24_finalize: re.sub replacement now uses a function-form (lambda) instead of an f-string, so any future project_id with a backslash sequence isn't misinterpreted as a group reference. - tests: add a Keboola-source materialized row case asserting the SELECT's source_type filter prevents non-BQ rewrites. --- src/db.py | 61 +++++++++++++------ tests/test_schema_v24_source_query_rewrite.py | 33 ++++++++++ 2 files changed, 74 insertions(+), 20 deletions(-) diff --git a/src/db.py b/src/db.py index f4afe12..550a4c7 100644 --- a/src/db.py +++ b/src/db.py @@ -1696,6 +1696,18 @@ _V22_TO_V23_MIGRATIONS = [ # migration time, log a warning per affected row and leave them — the # operator must configure data_source.bigquery.project, restart, and # the migration will fire on next start (idempotent). +def _replace_for_v24(project_id: str): + """Build a re.sub replacement function (not a string) so backslash + sequences in `project_id` aren't interpreted as group references. 
+ GCP project IDs can't actually contain backslashes, but using a + function-form replacement is the defensive idiom — it makes the + intent explicit and removes the dependency on re.sub's replacement- + string escaping rules.""" + def _repl(m): + return f"`{project_id}.{m.group(1)}.{m.group(2)}`" + return _repl + + def _v23_to_v24_finalize(conn: duckdb.DuckDBPyConnection) -> None: import re as _re @@ -1714,26 +1726,35 @@ def _v23_to_v24_finalize(conn: duckdb.DuckDBPyConnection) -> None: "AND source_type = 'bigquery'" ).fetchall() - for row_id, sq in rows: - if sq is None: - continue - if not project_id: - logger.warning( - "v24 migration: skipping rewrite of source_query for row %r — " - "data_source.bigquery.project is not configured. Set it via " - "/admin/server-config and restart the app to retry the " - "migration.", row_id, - ) - continue - new_sq = pattern.sub(rf'`{project_id}.\1.\2`', sq) - if new_sq != sq: - conn.execute( - "UPDATE table_registry SET source_query = ? WHERE id = ?", - [new_sq, row_id], - ) - logger.info( - "v24 migration: rewrote source_query for row %r", row_id, - ) + if not rows: + return # Nothing to migrate; skip the transaction. + + conn.execute("BEGIN TRANSACTION") + try: + for row_id, sq in rows: + if sq is None: + continue + if not project_id: + logger.warning( + "v24 migration: skipping rewrite of source_query for row %r — " + "data_source.bigquery.project is not configured. Set it via " + "/admin/server-config and restart the app to retry the " + "migration.", row_id, + ) + continue + new_sq = pattern.sub(_replace_for_v24(project_id), sq) + if new_sq != sq: + conn.execute( + "UPDATE table_registry SET source_query = ? 
WHERE id = ?", + [new_sq, row_id], + ) + logger.info( + "v24 migration: rewrote source_query for row %r", row_id, + ) + conn.execute("COMMIT") + except Exception: + conn.execute("ROLLBACK") + raise def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: diff --git a/tests/test_schema_v24_source_query_rewrite.py b/tests/test_schema_v24_source_query_rewrite.py index 9a56130..2f0f947 100644 --- a/tests/test_schema_v24_source_query_rewrite.py +++ b/tests/test_schema_v24_source_query_rewrite.py @@ -124,3 +124,36 @@ def test_v24_logs_warning_when_project_not_configured(monkeypatch, caplog): ) finally: conn.close() + + +def test_v24_keboola_materialized_row_not_rewritten(monkeypatch): + """Materialized rows with source_type != 'bigquery' must not be touched + by v24. Keboola materialized has no notion of bq."ds"."tbl" syntax; + the SELECT's source_type filter pins this contract. + """ + with tempfile.TemporaryDirectory() as tmp: + monkeypatch.setenv("DATA_DIR", tmp) + monkeypatch.setattr( + "app.instance_config.get_value", + lambda *args, **kw: "prj-data" if args == ("data_source", "bigquery", "project") else kw.get("default"), + ) + Path(tmp, "state").mkdir(parents=True, exist_ok=True) + db_path = Path(tmp, "state", "system.duckdb") + conn = duckdb.connect(str(db_path)) + try: + _seed_v23(conn) + # Keboola row that happens to contain `bq."..."` in its SQL + # (admin error or copy-paste from a BQ row). Migration must + # leave it alone — this is not the v24 contract. 
+ conn.execute( + 'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)', + ["kb1", "kb1", "keboola", "materialized", "ds", "tbl", + 'SELECT * FROM bq."ds"."tbl"'], + ) + _ensure_schema(conn) + row = conn.execute( + "SELECT source_query FROM table_registry WHERE id='kb1'" + ).fetchone() + assert row[0] == 'SELECT * FROM bq."ds"."tbl"' + finally: + conn.close() From cd3293b994191c91f83a265dfceca45fe1be4d91 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 4 May 2026 20:30:50 +0200 Subject: [PATCH 12/13] =?UTF-8?q?release:=200.33.0=20=E2=80=94=20BQ=20mate?= =?UTF-8?q?rialize=20view=20fix=20+=20concurrency=20control?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++ pyproject.toml | 2 +- 2 files changed, 84 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b492d44..187f0ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,89 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +## [0.33.0] — 2026-05-04 + +Closes #162. Headline fix: `query_mode='materialized'` BigQuery rows now +materialize correctly for views and materialized views, with per-table +concurrency control preventing parquet corruption on overlapping scheduler +ticks. Plus a source_query server-generation convenience, a +`materialize.lock_ttl_seconds` config knob, and a schema v24 migration that +converts existing DuckDB-flavor source_query values to BQ-native SQL. + +### Fixed + +- BigQuery materialize now works for views and materialized views. Pre-fix, + `materialize_query` ran admin's `source_query` as `COPY (sql) TO parquet` + through the DuckDB BigQuery extension session, which routed through the BQ + Storage Read API for `bq."".""` references. Storage Read API + rejects non-base entities (`Binder Error: Error while creating read session: + ... non-table entities cannot be read with the storage API`). 
Fixed by + always wrapping admin SQL into `bigquery_query('<project>', + '<inner_sql>')` so COPY uses the BQ jobs API uniformly for tables, views, + and materialized views. +- `materialize_query` no longer corrupts its parquet under concurrent + invocations for the same `table_id`. Pre-fix, two overlapping + `_run_materialized_pass` calls (e.g. a long-running COPY + the next + scheduler tick) both hit the unconditional `if tmp_path.exists(): + tmp_path.unlink()` at function entry and started parallel COPYs against the + same path, interleaving bytes and producing a parquet file with no valid + footer. Now each call acquires a per-table_id `threading.Lock` plus an + advisory `fcntl.flock` on `<table_id>.parquet.lock`; the second caller raises + `MaterializeInFlightError` and the scheduler treats it as + `skipped, in_flight` — never as an error. +- Cost guardrail dry-run now engages for materialized rows. Pre-fix, the + BigQuery Python client returned 400 (`Table-valued function not found: + bigquery_query`) on the wrapped SQL and the dry-run silently fail-opened. + The dry-run now operates on the inner BQ-native SQL (admin's `source_query` + directly), which the client parses cleanly. + +### Changed + +- **BREAKING** `query_mode='materialized'` rows MUST register `source_query` + as BigQuery-native SQL (backticks for dashed identifiers, native + joins/CTEs). DuckDB-flavor (`bq."<dataset>"."<table>"`) is no longer accepted on + register/PUT. The schema v24 migration converts existing rows automatically; + operators with custom-written `source_query` should review the migrated form + on first deploy. The validator's prior backtick-rejection rule is now scoped + to `query_mode IN ('remote', 'local')` only. +- `_run_materialized_pass` summary `skipped` field changes from `list[str]` + to `list[dict]` with shape + `{"table": str, "reason": Literal["due_check", "in_flight"]}`. Downstream + consumers that asserted the old string form must update.
+ +### Added + +- `POST /api/admin/register-table` for `query_mode='materialized'` rows with + `bucket`+`source_table` but no `source_query` now server-generates + `` SELECT * FROM `<project>.<bucket>.<source_table>` `` from the configured + BigQuery project. The same fallback fires on `PUT /api/admin/registry/{id}` + when flipping to materialized. Operators only need to know + `bigquery_query()` semantics for non-trivial queries. +- New top-level `materialize` config section in `instance.yaml`. Single field + — `materialize.lock_ttl_seconds` (default `86400`, 24 h) — controls how + long a stale `<table_id>.parquet.lock` file lives before a sibling materialize + attempt reclaims it. Editable via `/admin/server-config` API and UI. + +### Internal + +- Schema v24 migration: rewrites `table_registry.source_query` for + materialized BigQuery rows from DuckDB-flavor (`bq."<dataset>"."<table>"`) to + BQ-native (`` `<project>.<dataset>.<table>` ``) using the configured BQ project. + Idempotent on already-converted rows; logs a warning and skips when the + project isn't configured (operator can configure + restart for retry). + Wrapped in `BEGIN TRANSACTION` / `COMMIT` to match the project's + transactional-finalizer pattern. +- `connectors/bigquery/extractor.py` exports `MaterializeInFlightError` and + the `_get_table_lock` / `_get_lock_ttl_seconds` / + `_wrap_admin_sql_for_jobs_api` / `_escape_sql_string_literal` helpers as + test seams. Underscore-prefixed; not part of the public API. +- `tests/conftest.py` lifts `bq_instance` and `stub_bq_extractor` fixtures + from `tests/test_api_admin_materialized.py` so subsequent test modules in + this PR can resolve them via pytest's auto-discovery. +- `app/api/sync.py:is_table_due` hoisted to module-level import (was deferred + inside `_run_materialized_pass`) so monkeypatching `app.api.sync.is_table_due` + actually intercepts the call — the deferred form made test patches a no-op. + ## [0.32.0] — 2026-05-04 Closes #160.
Headline fix: `da query --remote` now resolves diff --git a/pyproject.toml b/pyproject.toml index 9988dc4..7024051 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agnes-the-ai-analyst" -version = "0.32.0" +version = "0.33.0" description = "Agnes — AI Data Analyst platform for AI analytical systems" requires-python = ">=3.11,<3.14" license = "MIT" From e6a2c4c51dda58cbf461912cf85646e7464140cb Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 4 May 2026 20:38:47 +0200 Subject: [PATCH 13/13] tests: rename 'prj-grp' placeholder to 'my-project' for vendor-agnostic OSS The dashed identifier is what the test exercises (backticks required for dashed BQ project IDs); the literal string can be any synthetic value. 'prj-grp' is too close to a real customer-prefix pattern that the OSS vendor-scrub regex flags. 'my-project' matches placeholders used elsewhere in the project. --- .../test_admin_validator_backtick_relaxed_for_materialized.py | 4 ++-- tests/test_api_admin_materialized.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_admin_validator_backtick_relaxed_for_materialized.py b/tests/test_admin_validator_backtick_relaxed_for_materialized.py index e0c7799..007c3bc 100644 --- a/tests/test_admin_validator_backtick_relaxed_for_materialized.py +++ b/tests/test_admin_validator_backtick_relaxed_for_materialized.py @@ -13,9 +13,9 @@ def test_materialized_accepts_backticks(): name="b1", source_type="bigquery", query_mode="materialized", - source_query="SELECT * FROM `prj-grp.ds.tbl`", + source_query="SELECT * FROM `my-project.ds.tbl`", ) - assert req.source_query == "SELECT * FROM `prj-grp.ds.tbl`" + assert req.source_query == "SELECT * FROM `my-project.ds.tbl`" def test_remote_rejects_backticks(): diff --git a/tests/test_api_admin_materialized.py b/tests/test_api_admin_materialized.py index d311d60..6df2ec9 100644 --- a/tests/test_api_admin_materialized.py +++ b/tests/test_api_admin_materialized.py @@ 
-303,14 +303,14 @@ def test_register_materialized_accepts_backtick_source_query(seeded_app, bq_inst "/api/admin/register-table", json=_materialized_payload( name="bt_native", - source_query="SELECT * FROM `prj-grp.ds.product_inventory`", + source_query="SELECT * FROM `my-project.ds.product_inventory`", ), headers=_auth(token), ) assert r.status_code in (200, 201, 202), r.json() reg = c.get("/api/admin/registry", headers=_auth(token)).json() row = next(t for t in reg["tables"] if t["id"] == "bt_native") - assert row["source_query"] == "SELECT * FROM `prj-grp.ds.product_inventory`" + assert row["source_query"] == "SELECT * FROM `my-project.ds.product_inventory`" def test_update_materialized_accepts_backtick_source_query(