From b5b16e98a0792bc620d6cf443e1fe2531188c062 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Wed, 6 May 2026 16:04:58 +0200 Subject: [PATCH] =?UTF-8?q?release:=200.40.0=20=E2=80=94=20materialize=5Fq?= =?UTF-8?q?uery=20writes=20=5Fmeta=20+=20inner=20view=20so=20master=20view?= =?UTF-8?q?s=20appear?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-fix flow: 1. extractor subprocess writes _meta with N remote rows + creates N inner views in extract.duckdb (rebuild_from_registry skips materialized rows per design — explicit `continue` at line 389) 2. _run_materialized_pass calls materialize_query, which writes parquet atomically + returns stats — but never updates _meta 3. orchestrator.rebuild scans _meta, finds only the N remote rows, creates master views only for them. Materialized parquet is on disk but invisible to /api/query → 400 'not yet materialized' Symptom appears after every container recreate (the previous run's _meta state is wiped because docker compose down nukes the named volume that backs extract.duckdb on some compose layouts; even on volumes that persist, the next extractor pass calls _create_meta_table which DROPs + CREATEs _meta cleanly). Fix: after os.replace(tmp_path, parquet_path) in materialize_query, open extract.duckdb (read-write), DELETE existing _meta row for table_id, INSERT new one with query_mode='materialized', and CREATE OR REPLACE VIEW "{table_id}" AS SELECT * FROM read_parquet('{parquet_path}'). All inside a single transaction so concurrent reads see either old or new state, not torn rows. Fail-soft on lock contention or schema drift — parquet remains canonical, next sync pass recovers. 
Tests: 3 new in test_bq_materialize.py covering: - meta + inner view registered after materialize, alongside existing remote rows - re-run replaces (not duplicates) the meta row - skips inner-view registration when extract.duckdb doesn't exist yet (fresh BQ-only deployment edge case) --- CHANGELOG.md | 26 +++++- connectors/bigquery/extractor.py | 114 +++++++++++++++++++++++++ pyproject.toml | 2 +- tests/test_bq_materialize.py | 140 +++++++++++++++++++++++++++++++ 4 files changed, 280 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b5d694..8afc6c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,31 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] -## [0.39.0] — 2026-05-06 +## [0.40.0] — 2026-05-06 + +### Fixed +- **Materialized BigQuery parquets now register themselves in + `extract.duckdb` so the master view actually appears** + (`connectors/bigquery/extractor.py:materialize_query`). Pre-fix the + function wrote the `.parquet` to disk and returned the row count, + but **never** wrote a `_meta` row or an inner view in the connector's + `extract.duckdb`. The orchestrator's `rebuild()` scans `_meta` to + decide which master views to create, so materialized tables remained + invisible: `agnes query "SELECT … FROM {table_id}"` returned HTTP 400 + *"registered as query_mode='materialized' but is not yet materialized + in this instance's analytics views"* even though the parquet was + sitting there. Symptom appeared after every container recreate (image + upgrade) and after every `_create_meta_table` cycle in the extractor + subprocess (which `DROP TABLE IF EXISTS _meta` + `CREATE TABLE` + cleanly each pass — wiping any prior materialized rows). Fix: after + the atomic `os.replace(tmp_path, parquet_path)`, open + `extract.duckdb` and `DELETE FROM _meta WHERE table_name = ? + INSERT + + CREATE OR REPLACE VIEW "{table_id}" AS SELECT * FROM read_parquet('{parquet_path}')` + inside a single transaction. 
Idempotent, fail-soft (parquet remains + canonical, the next sync pass recovers any registration drift). + When `extract.duckdb` doesn't exist yet (fresh BQ-only deployment), + the fix logs and continues — the next extractor pass creates the + file and the master view appears on the rebuild after that. ### Performance - **`/api/query` (and `agnes query --remote`) now rewrites user SQL referencing diff --git a/connectors/bigquery/extractor.py b/connectors/bigquery/extractor.py index d90005d..0baafda 100644 --- a/connectors/bigquery/extractor.py +++ b/connectors/bigquery/extractor.py @@ -269,6 +269,103 @@ def _create_meta_table(conn: duckdb.DuckDBPyConnection) -> None: )""") +def _ensure_meta_table(conn: duckdb.DuckDBPyConnection) -> None: + """Idempotent variant of `_create_meta_table` — creates the table if + missing, leaves existing rows untouched. Used by `materialize_query` + to register the materialized parquet without wiping the + extractor-subprocess-written remote rows that share the same + extract.duckdb.""" + conn.execute("""CREATE TABLE IF NOT EXISTS _meta ( + table_name VARCHAR NOT NULL, + description VARCHAR, + rows BIGINT, + size_bytes BIGINT, + extracted_at TIMESTAMP, + query_mode VARCHAR DEFAULT 'remote' + )""") + + +def _persist_materialized_inner_view( + extract_db_path: Path, + table_id: str, + parquet_path: Path, + rows: int, + size_bytes: int, +) -> None: + """Write the materialized parquet's inner view + ``_meta`` row into + ``extract.duckdb`` so the orchestrator's master-view rebuild picks it + up uniformly with remote-mode rows. Without this, ``materialize_query`` + leaves the parquet on disk but no record of it in ``_meta``, and the + orchestrator's ``rebuild()`` scan never creates the master view — + ``agnes query`` then 400s with "registered as query_mode='materialized' + but is not yet materialized" even though the parquet exists. 
+ + Idempotent: existing ``_meta`` row for the same ``table_name`` is + replaced, existing inner view is recreated. Fail-soft — the parquet + is the canonical artifact; if extract.duckdb registration fails (lock + contention, missing file, schema drift), log and continue. The + caller's ``rebuild_from_registry`` rebuild will get a chance to fix + it next pass. + """ + if not extract_db_path.exists(): + # Fresh BQ-only deployment hasn't run the extractor subprocess + # yet, so extract.duckdb doesn't exist. Nothing to update — the + # next extractor pass + materialize cycle will populate it. + logger.info( + "materialize: extract.duckdb at %s does not exist yet; " + "skipping inner-view registration. Next extractor pass will " + "create it and the master view will appear on the rebuild " + "after that.", + extract_db_path, + ) + return + + safe_table = _escape_sql_string_literal(table_id) + safe_path = _escape_sql_string_literal(str(parquet_path)) + try: + with duckdb.connect(str(extract_db_path), read_only=False) as ext_conn: + _ensure_meta_table(ext_conn) + # `_meta` has no UNIQUE on table_name (legacy schema), so we + # do a manual delete-then-insert. Wrap in a transaction so + # concurrent reads of `_meta` either see the old row or the + # new one, never both / neither. + ext_conn.execute("BEGIN") + try: + ext_conn.execute( + "DELETE FROM _meta WHERE table_name = ?", [table_id] + ) + ext_conn.execute( + "INSERT INTO _meta VALUES (?, '', ?, ?, CURRENT_TIMESTAMP, 'materialized')", + [table_id, rows, size_bytes], + ) + # Inner view backing the master view. Orchestrator scans + # information_schema.tables for the attached extract.duckdb + # and only creates a master view when an inner object + # exists by the same name. read_parquet() is hot per-call, + # so the master view path goes through the same disk. 
+ ext_conn.execute( + f"CREATE OR REPLACE VIEW \"{table_id}\" AS " + f"SELECT * FROM read_parquet('{safe_path}')" + ) + ext_conn.execute("COMMIT") + except Exception: + try: + ext_conn.execute("ROLLBACK") + except Exception: + pass + raise + except Exception as e: + # Fail-soft: parquet is on disk, registry stays consistent, the + # next extractor + orchestrator pass will recover. Loud log so + # operators can spot persistent breakage. + logger.warning( + "materialize: failed to register %s in extract.duckdb (%s) — " + "parquet at %s is fine, master view will appear after the " + "next sync cycle. Error: %s", + table_id, extract_db_path, parquet_path, e, + ) + + def _create_remote_attach_table( conn: duckdb.DuckDBPyConnection, project_id: str ) -> None: @@ -667,6 +764,23 @@ def materialize_query( os.replace(tmp_path, parquet_path) rows = int(rows) + + # Register the parquet in extract.duckdb so the orchestrator's + # master-view rebuild can pick it up uniformly with remote-mode + # rows. Without this, the parquet sits on disk but the master + # view is never created — `agnes query "SELECT … FROM "` + # 400s with "not yet materialized in this instance's analytics + # views". Fail-soft — the parquet is canonical, the next + # extractor + orchestrator pass will recover any registration + # drift. 
+ _persist_materialized_inner_view( + extract_db_path=out_path / "extract.duckdb", + table_id=table_id, + parquet_path=parquet_path, + rows=rows, + size_bytes=size_bytes, + ) + if rows == 0: # 0 rows is indistinguishable from "the SQL is wrong and nobody # noticed" — surface it loudly so operators see it in the scheduler diff --git a/pyproject.toml b/pyproject.toml index 0093060..aafb4bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agnes-the-ai-analyst" -version = "0.39.0" +version = "0.40.0" description = "Agnes — AI Data Analyst platform for AI analytical systems" requires-python = ">=3.11,<3.14" license = "MIT" diff --git a/tests/test_bq_materialize.py b/tests/test_bq_materialize.py index 1684278..bea73b2 100644 --- a/tests/test_bq_materialize.py +++ b/tests/test_bq_materialize.py @@ -148,3 +148,143 @@ def test_materialize_overwrites_existing_parquet(tmp_path): f"SELECT n FROM read_parquet('{out}/data/t1.parquet')" ).fetchall() assert rows == [(2,)] + + +def test_materialize_persists_meta_and_inner_view_in_extract_db(tmp_path): + """0.40.0 fix: after materialize_query writes the parquet, it must also + register the table in extract.duckdb (`_meta` row + inner view) so the + orchestrator's master-view rebuild picks it up uniformly with remote-mode + rows. Without this, the parquet sits on disk but the master view never + materializes — `agnes query` 400s with "not yet materialized". + """ + out = tmp_path / "extracts" / "bigquery" + out.mkdir(parents=True) + + # Pre-create extract.duckdb (as the extractor subprocess would have done + # on this connector's first pass) with the canonical _meta table + a + # remote-mode row. We must verify the materialize call adds its row + # without wiping the existing remote rows. 
+ extract_db = out / "extract.duckdb" + with duckdb.connect(str(extract_db)) as ext: + ext.execute("""CREATE TABLE _meta ( + table_name VARCHAR NOT NULL, + description VARCHAR, + rows BIGINT, + size_bytes BIGINT, + extracted_at TIMESTAMP, + query_mode VARCHAR DEFAULT 'remote' + )""") + ext.execute( + "INSERT INTO _meta VALUES ('s1_session_landings', '', 0, 0, " + "CURRENT_TIMESTAMP, 'remote')" + ) + + bq = _make_stub_bq({ + "bq.test.orders": ( + "SELECT 'EU' AS region, 100 AS revenue UNION ALL " + "SELECT 'US' AS region, 250 AS revenue" + ) + }) + + materialize_query( + table_id="orders_summary", + sql="SELECT region, SUM(revenue) AS revenue FROM bq.test.orders GROUP BY 1", + bq=bq, + output_dir=str(out), + ) + + # Parquet exists. + parquet_path = out / "data" / "orders_summary.parquet" + assert parquet_path.exists() + + # _meta has BOTH the legacy remote row AND the new materialized row. + with duckdb.connect(str(extract_db), read_only=True) as ext: + rows = ext.execute( + "SELECT table_name, query_mode, rows FROM _meta ORDER BY table_name" + ).fetchall() + assert ("orders_summary", "materialized", 2) in [ + (r[0], r[1], r[2]) for r in rows + ] + assert ("s1_session_landings", "remote", 0) in [ + (r[0], r[1], r[2]) for r in rows + ] + # Inner view backing the master view exists, points at the parquet. + view_rows = ext.execute( + "SELECT * FROM \"orders_summary\" ORDER BY region" + ).fetchall() + assert view_rows == [("EU", 100), ("US", 250)] + + +def test_materialize_replaces_meta_row_on_re_run(tmp_path): + """A second materialize for the same table_id must REPLACE the existing + `_meta` row, not duplicate it. 
Otherwise the orchestrator scan sees two + rows for the same name and creates the master view twice (or worse, + against stale row stats).""" + out = tmp_path / "extracts" / "bigquery" + out.mkdir(parents=True) + # Pre-create extract.duckdb (the extractor subprocess would do this on + # the first sync pass; we shortcut so the test exercises the + # delete-then-insert branch on re-run, not the "no extract.duckdb yet" + # skip branch. + extract_db = out / "extract.duckdb" + with duckdb.connect(str(extract_db)) as ext: + ext.execute("""CREATE TABLE _meta ( + table_name VARCHAR NOT NULL, + description VARCHAR, + rows BIGINT, + size_bytes BIGINT, + extracted_at TIMESTAMP, + query_mode VARCHAR DEFAULT 'remote' + )""") + + bq = _make_stub_bq({ + "bq.test.t1": "SELECT 'EU' AS region, 100 AS revenue", + "bq.test.t2": ( + "SELECT 'EU' AS region, 100 AS revenue UNION ALL " + "SELECT 'US' AS region, 250 AS revenue" + ), + }) + + # First pass — 1 row. + materialize_query( + table_id="orders_summary", + sql="SELECT region, revenue FROM bq.test.t1", + bq=bq, output_dir=str(out), + ) + # Second pass — different SQL, 2 rows. Must overwrite, not duplicate. + materialize_query( + table_id="orders_summary", + sql="SELECT region, revenue FROM bq.test.t2", + bq=bq, output_dir=str(out), + ) + + extract_db = out / "extract.duckdb" + with duckdb.connect(str(extract_db), read_only=True) as ext: + rows = ext.execute( + "SELECT COUNT(*), MAX(rows) FROM _meta WHERE table_name = 'orders_summary'" + ).fetchone() + assert rows[0] == 1, "must be exactly one _meta row, not duplicated" + assert rows[1] == 2, "row count reflects the latest run, not the first" + + +def test_materialize_skips_inner_view_when_extract_db_missing(tmp_path): + """Fresh BQ-only deployment may not have run the extractor subprocess + yet, so extract.duckdb doesn't exist. 
materialize_query must not crash + on that path — it logs and continues, the next extractor pass + + rebuild will pick up the parquet via the registered registry row.""" + out = tmp_path / "extracts" / "bigquery" + out.mkdir(parents=True) + # Deliberately do NOT create extract.duckdb. + + bq = _make_stub_bq({"bq.test.t": "SELECT 1 AS n"}) + + # Should NOT raise — fail-soft. + stats = materialize_query( + table_id="solo_table", + sql="SELECT n FROM bq.test.t", + bq=bq, output_dir=str(out), + ) + assert stats["rows"] == 1 + # Parquet is on disk, extract.duckdb still doesn't exist (no force-create). + assert (out / "data" / "solo_table.parquet").exists() + assert not (out / "extract.duckdb").exists()