release: 0.40.0 — materialize_query writes _meta + inner view so master views appear

Pre-fix flow:
1. extractor subprocess writes _meta with N remote rows + creates N inner views in extract.duckdb (rebuild_from_registry skips materialized rows per design — explicit `continue` at line 389)
2. _run_materialized_pass calls materialize_query, which writes parquet atomically + returns stats — but never updates _meta
3. orchestrator.rebuild scans _meta, finds only the N remote rows, creates master views only for them. Materialized parquet is on disk but invisible to /api/query → 400 'not yet materialized'

Symptom appears after every container recreate (the previous run's _meta state is wiped because docker compose down nukes the named volume that backs extract.duckdb on some compose layouts; even on volumes that persist, the next extractor pass calls _create_meta_table which DROPs + CREATEs _meta cleanly).

Fix: after os.replace(tmp_path, parquet_path) in materialize_query, open extract.duckdb (read-write), DELETE existing _meta row for table_id, INSERT new one with query_mode='materialized', and CREATE OR REPLACE VIEW <table_id> AS SELECT * FROM read_parquet(<path>). All inside a single transaction so concurrent reads see either old or new state, not torn rows. Fail-soft on lock contention or schema drift — parquet remains canonical, next sync pass recovers.

Tests: 3 new in test_bq_materialize.py covering:
- meta + inner view registered after materialize, alongside existing remote rows
- re-run replaces (not duplicates) the meta row
- skips inner-view registration when extract.duckdb doesn't exist yet (fresh BQ-only deployment edge case)
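
The registration step described in the fix can be sketched as follows. This is an illustration under stated assumptions, not the project's code: Python's standard-library `sqlite3` stands in for DuckDB so the example runs without extra dependencies, a plain table `source_data` stands in for `read_parquet(<path>)`, and the helper name `register_materialized` is hypothetical. SQLite has no `CREATE OR REPLACE VIEW`, so the sketch drops and recreates the view inside the same transaction.

```python
import sqlite3


def register_materialized(conn: sqlite3.Connection, table_id: str, rows: int) -> None:
    """Replace the _meta row and the inner view for table_id in one transaction.

    Hypothetical helper; sqlite3 is a stand-in for DuckDB in this sketch.
    """
    # Double embedded quotes so the name is safe as a quoted identifier.
    ident = '"' + table_id.replace('"', '""') + '"'
    conn.execute("BEGIN")
    try:
        # Delete-then-insert keeps exactly one row per table_name even
        # though _meta has no UNIQUE constraint.
        conn.execute("DELETE FROM _meta WHERE table_name = ?", (table_id,))
        conn.execute(
            "INSERT INTO _meta (table_name, rows, query_mode) "
            "VALUES (?, ?, 'materialized')",
            (table_id, rows),
        )
        conn.execute(f"DROP VIEW IF EXISTS {ident}")
        conn.execute(f"CREATE VIEW {ident} AS SELECT * FROM source_data")
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise


# isolation_level=None means autocommit, so BEGIN/COMMIT above are explicit.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE _meta (table_name TEXT NOT NULL, rows INTEGER, "
    "query_mode TEXT DEFAULT 'remote')"
)
conn.execute("CREATE TABLE source_data (region TEXT, revenue INTEGER)")
conn.execute("INSERT INTO _meta (table_name, rows) VALUES ('s1_session_landings', 0)")

register_materialized(conn, "orders_summary", 1)
register_materialized(conn, "orders_summary", 2)  # re-run replaces the row

meta = conn.execute(
    "SELECT table_name, rows, query_mode FROM _meta ORDER BY table_name"
).fetchall()
```

After the two calls, `meta` holds one materialized row for `orders_summary` next to the untouched remote row, which mirrors the first two test cases listed above.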
parent 6de7084c9f
commit b5b16e98a0
4 changed files with 280 additions and 2 deletions
26 CHANGELOG.md
@@ -10,7 +10,31 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C

 ## [Unreleased]

-## [0.39.0] — 2026-05-06
+## [0.40.0] — 2026-05-06
+
+### Fixed
+- **Materialized BigQuery parquets now register themselves in
+  `extract.duckdb` so the master view actually appears**
+  (`connectors/bigquery/extractor.py:materialize_query`). Pre-fix the
+  function wrote the `<id>.parquet` to disk and returned the row count,
+  but **never** wrote a `_meta` row or an inner view in the connector's
+  `extract.duckdb`. The orchestrator's `rebuild()` scans `_meta` to
+  decide which master views to create, so materialized tables remained
+  invisible: `agnes query "SELECT … FROM <id>"` returned HTTP 400
+  *"registered as query_mode='materialized' but is not yet materialized
+  in this instance's analytics views"* even though the parquet was
+  sitting there. Symptom appeared after every container recreate (image
+  upgrade) and after every `_create_meta_table` cycle in the extractor
+  subprocess (which `DROP TABLE IF EXISTS _meta` + `CREATE TABLE`
+  cleanly each pass — wiping any prior materialized rows). Fix: after
+  the atomic `os.replace(tmp_path, parquet_path)`, open
+  `extract.duckdb` and `DELETE FROM _meta WHERE table_name = ? + INSERT
+  + CREATE OR REPLACE VIEW <id> AS SELECT * FROM read_parquet('<path>')`
+  inside a single transaction. Idempotent, fail-soft (parquet remains
+  canonical, the next sync pass recovers any registration drift).
+  When `extract.duckdb` doesn't exist yet (fresh BQ-only deployment),
+  the fix logs and continues — the next extractor pass creates the
+  file and the master view appears on the rebuild after that.

 ### Performance
 - **`/api/query` (and `agnes query --remote`) now rewrites user SQL referencing
connectors/bigquery/extractor.py
@@ -269,6 +269,103 @@ def _create_meta_table(conn: duckdb.DuckDBPyConnection) -> None:
     )""")


+def _ensure_meta_table(conn: duckdb.DuckDBPyConnection) -> None:
+    """Idempotent variant of `_create_meta_table` — creates the table if
+    missing, leaves existing rows untouched. Used by `materialize_query`
+    to register the materialized parquet without wiping the
+    extractor-subprocess-written remote rows that share the same
+    extract.duckdb."""
+    conn.execute("""CREATE TABLE IF NOT EXISTS _meta (
+        table_name VARCHAR NOT NULL,
+        description VARCHAR,
+        rows BIGINT,
+        size_bytes BIGINT,
+        extracted_at TIMESTAMP,
+        query_mode VARCHAR DEFAULT 'remote'
+    )""")
+
+
+def _persist_materialized_inner_view(
+    extract_db_path: Path,
+    table_id: str,
+    parquet_path: Path,
+    rows: int,
+    size_bytes: int,
+) -> None:
+    """Write the materialized parquet's inner view + ``_meta`` row into
+    ``extract.duckdb`` so the orchestrator's master-view rebuild picks it
+    up uniformly with remote-mode rows. Without this, ``materialize_query``
+    leaves the parquet on disk but no record of it in ``_meta``, and the
+    orchestrator's ``rebuild()`` scan never creates the master view —
+    ``agnes query`` then 400s with "registered as query_mode='materialized'
+    but is not yet materialized" even though the parquet exists.
+
+    Idempotent: existing ``_meta`` row for the same ``table_name`` is
+    replaced, existing inner view is recreated. Fail-soft — the parquet
+    is the canonical artifact; if extract.duckdb registration fails (lock
+    contention, missing file, schema drift), log and continue. The
+    caller's ``rebuild_from_registry`` rebuild will get a chance to fix
+    it next pass.
+    """
+    if not extract_db_path.exists():
+        # Fresh BQ-only deployment hasn't run the extractor subprocess
+        # yet, so extract.duckdb doesn't exist. Nothing to update — the
+        # next extractor pass + materialize cycle will populate it.
+        logger.info(
+            "materialize: extract.duckdb at %s does not exist yet; "
+            "skipping inner-view registration. Next extractor pass will "
+            "create it and the master view will appear on the rebuild "
+            "after that.",
+            extract_db_path,
+        )
+        return
+
+    safe_table = _escape_sql_string_literal(table_id)
+    safe_path = _escape_sql_string_literal(str(parquet_path))
+    try:
+        with duckdb.connect(str(extract_db_path), read_only=False) as ext_conn:
+            _ensure_meta_table(ext_conn)
+            # `_meta` has no UNIQUE on table_name (legacy schema), so we
+            # do a manual delete-then-insert. Wrap in a transaction so
+            # concurrent reads of `_meta` either see the old row or the
+            # new one, never both / neither.
+            ext_conn.execute("BEGIN")
+            try:
+                ext_conn.execute(
+                    "DELETE FROM _meta WHERE table_name = ?", [table_id]
+                )
+                ext_conn.execute(
+                    "INSERT INTO _meta VALUES (?, '', ?, ?, CURRENT_TIMESTAMP, 'materialized')",
+                    [table_id, rows, size_bytes],
+                )
+                # Inner view backing the master view. Orchestrator scans
+                # information_schema.tables for the attached extract.duckdb
+                # and only creates a master view when an inner object
+                # exists by the same name. read_parquet() is hot per-call,
+                # so the master view path goes through the same disk.
+                ext_conn.execute(
+                    f"CREATE OR REPLACE VIEW \"{table_id}\" AS "
+                    f"SELECT * FROM read_parquet('{safe_path}')"
+                )
+                ext_conn.execute("COMMIT")
+            except Exception:
+                try:
+                    ext_conn.execute("ROLLBACK")
+                except Exception:
+                    pass
+                raise
+    except Exception as e:
+        # Fail-soft: parquet is on disk, registry stays consistent, the
+        # next extractor + orchestrator pass will recover. Loud log so
+        # operators can spot persistent breakage.
+        logger.warning(
+            "materialize: failed to register %s in extract.duckdb (%s) — "
+            "parquet at %s is fine, master view will appear after the "
+            "next sync cycle. Error: %s",
+            table_id, extract_db_path, parquet_path, e,
+        )
+
+
 def _create_remote_attach_table(
     conn: duckdb.DuckDBPyConnection, project_id: str
 ) -> None:
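
The `CREATE OR REPLACE VIEW` statement in this hunk is built by string interpolation, with the path embedded as a string literal and the table name as a double-quoted identifier. The body of `_escape_sql_string_literal` is not part of this diff, so the following is only a sketch of the usual technique, assuming standard SQL quoting rules (double the quote character). The helper names `quote_literal`, `quote_identifier` and `build_view_sql` are hypothetical.

```python
def quote_literal(value: str) -> str:
    """Wrap value as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def quote_identifier(name: str) -> str:
    """Wrap name as a SQL quoted identifier, doubling embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_view_sql(table_id: str, parquet_path: str) -> str:
    """Assemble the view DDL with both interpolated parts quoted."""
    return (
        f"CREATE OR REPLACE VIEW {quote_identifier(table_id)} AS "
        f"SELECT * FROM read_parquet({quote_literal(parquet_path)})"
    )


sql = build_view_sql("orders_summary", "/data/it's/orders_summary.parquet")
```

In the hunk above, `safe_table` is computed but the view name is built from `table_id` directly. A helper along the lines of `quote_identifier` would also cover table names that contain a double quote, if such names can reach this code path.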
@@ -667,6 +764,23 @@ def materialize_query(
     os.replace(tmp_path, parquet_path)

     rows = int(rows)
+
+    # Register the parquet in extract.duckdb so the orchestrator's
+    # master-view rebuild can pick it up uniformly with remote-mode
+    # rows. Without this, the parquet sits on disk but the master
+    # view is never created — `agnes query "SELECT … FROM <id>"`
+    # 400s with "not yet materialized in this instance's analytics
+    # views". Fail-soft — the parquet is canonical, the next
+    # extractor + orchestrator pass will recover any registration
+    # drift.
+    _persist_materialized_inner_view(
+        extract_db_path=out_path / "extract.duckdb",
+        table_id=table_id,
+        parquet_path=parquet_path,
+        rows=rows,
+        size_bytes=size_bytes,
+    )
+
     if rows == 0:
         # 0 rows is indistinguishable from "the SQL is wrong and nobody
         # noticed" — surface it loudly so operators see it in the scheduler
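
The new registration call runs only after `os.replace(tmp_path, parquet_path)` has put the finished file in place. The surrounding write logic is not shown in this diff, so the following is a generic sketch of the write-to-temp-then-rename pattern using only the standard library. The function name `atomic_write_bytes` and the byte payloads are assumptions for illustration, not the project's code.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_bytes(target: Path, payload: bytes) -> int:
    """Write payload to target so readers never observe a partial file."""
    target.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives in the target directory so os.replace stays on
    # one filesystem, where the rename is atomic.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(payload)
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp_name, target)
    except BaseException:
        # Clean up the temp file if anything failed before the rename.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return target.stat().st_size


with tempfile.TemporaryDirectory() as workdir:
    out = Path(workdir) / "data" / "t1.parquet"
    first = atomic_write_bytes(out, b"old")
    second = atomic_write_bytes(out, b"newer")
    final_bytes = out.read_bytes()
    leftovers = [p.name for p in out.parent.iterdir() if p.name != out.name]
```

Registering the file in `_meta` only after the rename means a reader that finds the row can rely on the complete parquet already being on disk.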
pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "agnes-the-ai-analyst"
-version = "0.39.0"
+version = "0.40.0"
 description = "Agnes — AI Data Analyst platform for AI analytical systems"
 requires-python = ">=3.11,<3.14"
 license = "MIT"
test_bq_materialize.py
@@ -148,3 +148,143 @@ def test_materialize_overwrites_existing_parquet(tmp_path):
         f"SELECT n FROM read_parquet('{out}/data/t1.parquet')"
     ).fetchall()
     assert rows == [(2,)]
+
+
+def test_materialize_persists_meta_and_inner_view_in_extract_db(tmp_path):
+    """0.40.0 fix: after materialize_query writes the parquet, it must also
+    register the table in extract.duckdb (`_meta` row + inner view) so the
+    orchestrator's master-view rebuild picks it up uniformly with remote-mode
+    rows. Without this, the parquet sits on disk but the master view never
+    materializes — `agnes query` 400s with "not yet materialized".
+    """
+    out = tmp_path / "extracts" / "bigquery"
+    out.mkdir(parents=True)
+
+    # Pre-create extract.duckdb (as the extractor subprocess would have done
+    # on this connector's first pass) with the canonical _meta table + a
+    # remote-mode row. We must verify the materialize call adds its row
+    # without wiping the existing remote rows.
+    extract_db = out / "extract.duckdb"
+    with duckdb.connect(str(extract_db)) as ext:
+        ext.execute("""CREATE TABLE _meta (
+            table_name VARCHAR NOT NULL,
+            description VARCHAR,
+            rows BIGINT,
+            size_bytes BIGINT,
+            extracted_at TIMESTAMP,
+            query_mode VARCHAR DEFAULT 'remote'
+        )""")
+        ext.execute(
+            "INSERT INTO _meta VALUES ('s1_session_landings', '', 0, 0, "
+            "CURRENT_TIMESTAMP, 'remote')"
+        )
+
+    bq = _make_stub_bq({
+        "bq.test.orders": (
+            "SELECT 'EU' AS region, 100 AS revenue UNION ALL "
+            "SELECT 'US' AS region, 250 AS revenue"
+        )
+    })
+
+    materialize_query(
+        table_id="orders_summary",
+        sql="SELECT region, SUM(revenue) AS revenue FROM bq.test.orders GROUP BY 1",
+        bq=bq,
+        output_dir=str(out),
+    )
+
+    # Parquet exists.
+    parquet_path = out / "data" / "orders_summary.parquet"
+    assert parquet_path.exists()
+
+    # _meta has BOTH the legacy remote row AND the new materialized row.
+    with duckdb.connect(str(extract_db), read_only=True) as ext:
+        rows = ext.execute(
+            "SELECT table_name, query_mode, rows FROM _meta ORDER BY table_name"
+        ).fetchall()
+        assert ("orders_summary", "materialized", 2) in [
+            (r[0], r[1], r[2]) for r in rows
+        ]
+        assert ("s1_session_landings", "remote", 0) in [
+            (r[0], r[1], r[2]) for r in rows
+        ]
+        # Inner view backing the master view exists, points at the parquet.
+        view_rows = ext.execute(
+            "SELECT * FROM \"orders_summary\" ORDER BY region"
+        ).fetchall()
+        assert view_rows == [("EU", 100), ("US", 250)]
+
+
+def test_materialize_replaces_meta_row_on_re_run(tmp_path):
+    """A second materialize for the same table_id must REPLACE the existing
+    `_meta` row, not duplicate it. Otherwise the orchestrator scan sees two
+    rows for the same name and creates the master view twice (or worse,
+    against stale row stats)."""
+    out = tmp_path / "extracts" / "bigquery"
+    out.mkdir(parents=True)
+    # Pre-create extract.duckdb (the extractor subprocess would do this on
+    # the first sync pass; we shortcut so the test exercises the
+    # delete-then-insert branch on re-run, not the "no extract.duckdb yet"
+    # skip branch.
+    extract_db = out / "extract.duckdb"
+    with duckdb.connect(str(extract_db)) as ext:
+        ext.execute("""CREATE TABLE _meta (
+            table_name VARCHAR NOT NULL,
+            description VARCHAR,
+            rows BIGINT,
+            size_bytes BIGINT,
+            extracted_at TIMESTAMP,
+            query_mode VARCHAR DEFAULT 'remote'
+        )""")
+
+    bq = _make_stub_bq({
+        "bq.test.t1": "SELECT 'EU' AS region, 100 AS revenue",
+        "bq.test.t2": (
+            "SELECT 'EU' AS region, 100 AS revenue UNION ALL "
+            "SELECT 'US' AS region, 250 AS revenue"
+        ),
+    })
+
+    # First pass — 1 row.
+    materialize_query(
+        table_id="orders_summary",
+        sql="SELECT region, revenue FROM bq.test.t1",
+        bq=bq, output_dir=str(out),
+    )
+    # Second pass — different SQL, 2 rows. Must overwrite, not duplicate.
+    materialize_query(
+        table_id="orders_summary",
+        sql="SELECT region, revenue FROM bq.test.t2",
+        bq=bq, output_dir=str(out),
+    )
+
+    extract_db = out / "extract.duckdb"
+    with duckdb.connect(str(extract_db), read_only=True) as ext:
+        rows = ext.execute(
+            "SELECT COUNT(*), MAX(rows) FROM _meta WHERE table_name = 'orders_summary'"
+        ).fetchone()
+        assert rows[0] == 1, "must be exactly one _meta row, not duplicated"
+        assert rows[1] == 2, "row count reflects the latest run, not the first"
+
+
+def test_materialize_skips_inner_view_when_extract_db_missing(tmp_path):
+    """Fresh BQ-only deployment may not have run the extractor subprocess
+    yet, so extract.duckdb doesn't exist. materialize_query must not crash
+    on that path — it logs and continues, the next extractor pass +
+    rebuild will pick up the parquet via the registered registry row."""
+    out = tmp_path / "extracts" / "bigquery"
+    out.mkdir(parents=True)
+    # Deliberately do NOT create extract.duckdb.
+
+    bq = _make_stub_bq({"bq.test.t": "SELECT 1 AS n"})
+
+    # Should NOT raise — fail-soft.
+    stats = materialize_query(
+        table_id="solo_table",
+        sql="SELECT n FROM bq.test.t",
+        bq=bq, output_dir=str(out),
+    )
+    assert stats["rows"] == 1
+    # Parquet is on disk, extract.duckdb still doesn't exist (no force-create).
+    assert (out / "data" / "solo_table.parquet").exists()
+    assert not (out / "extract.duckdb").exists()