Merge pull request #202 from keboola/zs/perf-followup-0.41.0

fix(0.41.0): orchestrator filesystem fallback for materialized parquets
This commit is contained in:
ZdenekSrotyr 2026-05-06 17:16:38 +02:00 committed by GitHub
commit 1b49de1568
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 391 additions and 1 deletions

View file

@ -10,6 +10,35 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
## [Unreleased] ## [Unreleased]
## [0.41.0] — 2026-05-06
### Fixed
- **Orchestrator filesystem fallback for materialized parquets that
couldn't register in `extract.duckdb`'s `_meta`**
(`src/orchestrator.py:_attach_and_create_views`). The 0.40.0 fix in
`materialize_query` opens `extract.duckdb` from a fresh DuckDB handle
to write the `_meta` row + inner view; in production the same uvicorn
process already holds `extract.duckdb` ATTACHed read-only as the
source-name alias under the orchestrator's analytics connection, and
DuckDB's single-process file-handle uniqueness rejects the second
open with `Binder Error: Unique file handle conflict: Cannot attach
"extract" — already attached by database "<source>"`. The 0.40.0
helper logs WARNING and falls through; parquet stays canonical, but
the master view never appears via the meta path.
This release adds a second pass at the end of
`_attach_and_create_views`: scan `<extract_dir>/data/*.parquet` and
create a master view via `read_parquet('<path>')` for any parquet
whose `<id>` is not already in the per-source `tables` list (i.e. the
meta path didn't pick it up). Decoupled from `materialize_query`'s
open-handle race; robust against any registration drift between
materialize and rebuild. Honors the same `view_ownership` / cross-
connector collision rules as the meta path (first-come-first-served
via `view_repo.claim`). Tests cover: fallback fires when meta row is
missing; fallback skips when meta path already created the view (no
shadow); invalid identifier in parquet stem is skipped without crash;
source without `data/` subdir doesn't crash the scan.
## [0.40.0] — 2026-05-06 ## [0.40.0] — 2026-05-06
### Fixed ### Fixed

View file

@ -1,6 +1,6 @@
[project] [project]
name = "agnes-the-ai-analyst" name = "agnes-the-ai-analyst"
version = "0.40.0" version = "0.41.0"
description = "Agnes — AI Data Analyst platform for AI analytical systems" description = "Agnes — AI Data Analyst platform for AI analytical systems"
requires-python = ">=3.11,<3.14" requires-python = ">=3.11,<3.14"
license = "MIT" license = "MIT"

View file

@ -395,6 +395,126 @@ class SyncOrchestrator:
source_name, table_name, e, source_name, table_name, e,
) )
# Filesystem-fallback master views (0.41.0). The 0.40.0 fix in
# `materialize_query` tries to register the parquet in
# `extract.duckdb`'s `_meta` + inner view, but the open-as-
# second-write-handle from the same uvicorn process collides
# with the existing read-only ATTACH that `rebuild()` itself
# holds (`Unique file handle conflict: Cannot attach "extract"
# — already attached by database "<source>"`). The 0.40.0
# helper logs a WARNING and falls through, parquet is
# canonical, but the master view never appears via the meta
# path. This second pass scans `<extract_dir>/data/*.parquet`
# directly and creates a master view via `read_parquet()` for
# any parquet that didn't already get one through the meta
# path. Decoupled from materialize_query's open-handle race;
# robust against any registration drift between materialize
# and rebuild.
try:
extracts_dir = _get_extracts_dir()
except Exception:
extracts_dir = None
if extracts_dir is not None:
data_dir = extracts_dir / source_name / "data"
if data_dir.exists():
# Resolve the set of registry-known table_ids for this
# source. The fallback is a master-view recovery path
# for parquets that materialize_query wrote but
# couldn't register in `_meta`; an **orphan** parquet
# (registry row deleted by `DELETE /api/admin/registry`
# but parquet not yet cleaned up) must NOT get a
# master view — that would resurrect a deleted table.
# Pre-existing test `test_orchestrator_skips_orphan_
# parquet_in_extracts` pins this contract.
registered_ids: Optional[set] = None
try:
from src.db import get_system_db
from src.repositories.table_registry import (
TableRegistryRepository,
)
sys_conn = get_system_db()
try:
rows = TableRegistryRepository(sys_conn).list_all()
# Match parquet stems against registry rows for
# THIS source where query_mode='materialized'.
# The parquet filename is keyed by registry
# `name` (per `_run_materialized_pass` /
# `materialize_query` convention).
registered_ids = {
str(r.get("name"))
for r in rows
if (r.get("source_type") or "") == source_name
and (r.get("query_mode") or "") == "materialized"
and r.get("name")
}
finally:
try:
sys_conn.close()
except Exception:
pass
except Exception as e:
# No registry access (test fixture, transient DB
# error) — skip the fallback rather than risk
# exposing orphan parquets.
logger.warning(
"filesystem-fallback: registry read failed (%s); "
"skipping fallback scan for %s — orphan parquets "
"from a prior DELETE could otherwise be exposed.",
e, source_name,
)
registered_ids = None
if registered_ids is not None:
already_created = set(tables)
for parquet_path in sorted(data_dir.glob("*.parquet")):
table_id = parquet_path.stem
if not _validate_identifier(table_id, "fs_fallback table_id"):
continue
if table_id in already_created:
continue
# Only register parquets that have a live
# materialized registry row. Orphans skip.
if table_id not in registered_ids:
logger.debug(
"filesystem-fallback: skipping orphan "
"parquet %s/%s (no registry row)",
source_name, table_id,
)
continue
# view_repo claim — same first-come-first-served
# rule as the meta-path branch above.
if view_repo is not None:
if not view_repo.claim(table_id, source_name):
prior_owner = (
view_repo.get_owner(table_id)
or existing_owners.get(table_id, "<unknown>")
)
logger.error(
"view_ownership collision: %s already owns view %r; "
"%s.%s (filesystem-fallback) will NOT be exposed.",
prior_owner, table_id, source_name, table_id,
)
continue
if claimed_pairs is not None:
claimed_pairs.append((source_name, table_id))
try:
safe_path = str(parquet_path).replace("'", "''")
conn.execute(
f"CREATE OR REPLACE VIEW \"{table_id}\" AS "
f"SELECT * FROM read_parquet('{safe_path}')"
)
tables.append(table_id)
logger.info(
"filesystem-fallback master view created: "
"%s/%s (parquet at %s) — meta row was missing",
source_name, table_id, parquet_path,
)
except Exception as e:
logger.error(
"filesystem-fallback master view failed for %s/%s: %s",
source_name, table_id, e,
)
# Update sync_state in system DB # Update sync_state in system DB
self._update_sync_state(meta_rows, source_name) self._update_sync_state(meta_rows, source_name)

View file

@ -677,3 +677,244 @@ class TestOrchestratorFailureModes:
assert "corrupt_a" not in result assert "corrupt_a" not in result
assert "corrupt_b" not in result assert "corrupt_b" not in result
assert "keboola" in result assert "keboola" in result
# ----------------------------------------------------------------------------
# 0.41.0 — filesystem-fallback master views for materialized parquets that
# couldn't register themselves in extract.duckdb's _meta (the open-as-second-
# write-handle race that 0.40.0's _persist_materialized_inner_view hits).
# ----------------------------------------------------------------------------
import struct
def _write_minimal_parquet(path: Path, n_rows: int = 3) -> None:
"""Write a tiny valid parquet file using DuckDB's COPY. Used to seed
the filesystem-fallback test cases where we want a parquet on disk
that the extractor never registered in _meta."""
conn = duckdb.connect(":memory:")
try:
conn.execute(f"CREATE TABLE t AS SELECT range AS id FROM range({n_rows})")
safe = str(path).replace("'", "''")
conn.execute(f"COPY (SELECT * FROM t) TO '{safe}' (FORMAT PARQUET)")
finally:
conn.close()
def _seed_registry_materialized_row(table_id: str, source_type: str = "bigquery") -> None:
"""Insert a `query_mode='materialized'` row into table_registry so
the filesystem-fallback scan recognises the parquet as live (not
orphan from a deleted registry row).
Uses the same `get_system_db` + `TableRegistryRepository` path the
orchestrator's fallback uses internally."""
from src.db import get_system_db
from src.repositories.table_registry import TableRegistryRepository
conn = get_system_db()
try:
TableRegistryRepository(conn).register(
id=table_id,
name=table_id,
source_type=source_type,
bucket="bkt",
source_table=table_id,
source_query=f"SELECT 1 AS id",
query_mode="materialized",
)
finally:
try:
conn.close()
except Exception:
pass
class TestFilesystemFallbackMasterViews:
"""If a parquet exists on disk but never made it into _meta (the
open-handle race that bit 0.40.0), the orchestrator must still
create a master view over it via read_parquet()."""
def test_filesystem_fallback_creates_master_view(self, setup_env):
from src.orchestrator import SyncOrchestrator
# Set up an extract.duckdb with ONLY remote rows in _meta.
_create_mock_extract(
setup_env["extracts_dir"],
"bigquery",
tables=[
{"name": "remote_one", "data": [{"x": "1"}], "query_mode": "remote"},
],
)
# Drop a parquet on disk for a table that's NOT in _meta — this
# is the materialize_query "atomic swap succeeded but _meta
# registration hit lock conflict" scenario.
data_dir = setup_env["extracts_dir"] / "bigquery" / "data"
_write_minimal_parquet(data_dir / "order_economics.parquet", n_rows=42)
# Seed the matching materialized registry row — orphan parquets
# without a registry row are deliberately skipped.
_seed_registry_materialized_row("order_economics")
orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"])
result = orch.rebuild()
# Both views should appear: the meta-path one and the
# filesystem-fallback one.
assert "bigquery" in result
assert "remote_one" in result["bigquery"]
assert "order_economics" in result["bigquery"], (
"filesystem-fallback master view must be created when parquet "
"exists on disk but _meta row is missing"
)
# Verify the master view actually queries the parquet.
master = duckdb.connect(setup_env["analytics_db"], read_only=True)
try:
n = master.execute("SELECT COUNT(*) FROM order_economics").fetchone()[0]
assert n == 42, f"master view should return parquet rows, got {n}"
finally:
master.close()
def test_filesystem_fallback_does_not_duplicate_meta_path(self, setup_env, caplog):
"""When the same name is in BOTH _meta (with an inner view) AND
on disk as a parquet, the meta path wins filesystem fallback
must not create a second / replacement view that shadows the
meta-driven one."""
from src.orchestrator import SyncOrchestrator
_create_mock_extract(
setup_env["extracts_dir"],
"bigquery",
tables=[
{
"name": "order_economics",
"data": [{"x": "from_meta"}],
"query_mode": "materialized",
},
],
)
# Also drop a parquet of the same name on disk.
data_dir = setup_env["extracts_dir"] / "bigquery" / "data"
_write_minimal_parquet(data_dir / "order_economics.parquet", n_rows=99)
_seed_registry_materialized_row("order_economics")
import logging
orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"])
with caplog.at_level(logging.INFO, logger="src.orchestrator"):
result = orch.rebuild()
# `order_economics` should appear exactly once in the result list —
# meta path won, fallback skipped because the name was already
# in the per-source `tables` set.
bq_tables = result.get("bigquery", [])
assert bq_tables.count("order_economics") == 1, (
f"name should appear exactly once, got {bq_tables}"
)
# Fallback log line must NOT have fired for this name.
fallback_lines = [
r.getMessage() for r in caplog.records
if "filesystem-fallback master view created" in r.getMessage()
and "order_economics" in r.getMessage()
]
assert fallback_lines == [], (
f"filesystem-fallback must not fire when meta path is viable; got: {fallback_lines}"
)
def test_filesystem_fallback_skips_invalid_table_id(self, setup_env, tmp_path):
"""A parquet whose stem doesn't pass identifier validation
(e.g. starts with a digit or contains spaces) must be skipped,
not crash the rebuild."""
from src.orchestrator import SyncOrchestrator
_create_mock_extract(
setup_env["extracts_dir"],
"bigquery",
tables=[
{"name": "remote_one", "data": [{"x": "1"}], "query_mode": "remote"},
],
)
# Identifier validator rejects names starting with a digit.
data_dir = setup_env["extracts_dir"] / "bigquery" / "data"
_write_minimal_parquet(data_dir / "9bad_name.parquet")
# Even though the registry would let it through, the identifier
# validator should still reject. (Seed registry to isolate the
# validator path from the orphan-skip path.)
try:
_seed_registry_materialized_row("9bad_name")
except Exception:
# Registry row insert may itself reject the bad id; that's
# fine — the orchestrator scan never sees it then either.
pass
orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"])
result = orch.rebuild()
# remote_one still works; bad-named parquet is silently skipped.
assert "remote_one" in result["bigquery"]
assert "9bad_name" not in result["bigquery"]
# Master DB has no such view.
master = duckdb.connect(setup_env["analytics_db"], read_only=True)
try:
tables = [r[0] for r in master.execute(
"SELECT table_name FROM information_schema.tables "
"WHERE table_schema='main'"
).fetchall()]
assert "9bad_name" not in tables
finally:
master.close()
def test_filesystem_fallback_skips_orphan_parquet(self, setup_env):
"""Orphan parquets — files left on disk after `DELETE
/api/admin/registry/{id}` either crashed mid-cleanup or the
operator dropped the row but the file lingers must NOT get a
master view. Otherwise the deleted table would resurrect on next
rebuild, defeating the unregister contract.
Existing test `test_orchestrator_skips_orphan_parquet_in_extracts`
in `tests/test_admin_unregister_cleanup.py` pins this rule for the
wider unregister flow; this test pins it specifically for the
new filesystem-fallback path."""
from src.orchestrator import SyncOrchestrator
_create_mock_extract(
setup_env["extracts_dir"],
"bigquery",
tables=[
{"name": "remote_one", "data": [{"x": "1"}], "query_mode": "remote"},
],
)
# Orphan parquet — NO registry row.
data_dir = setup_env["extracts_dir"] / "bigquery" / "data"
_write_minimal_parquet(data_dir / "deleted_table.parquet")
# NOT calling _seed_registry_materialized_row here — that's the
# whole point: registry row is missing.
orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"])
result = orch.rebuild()
bq_tables = result.get("bigquery", [])
assert "deleted_table" not in bq_tables, (
f"orphan parquet must NOT resurrect as a master view; got {bq_tables}"
)
def test_filesystem_fallback_no_data_dir_is_safe(self, setup_env):
"""Sources without a `<extract_dir>/data/` directory (e.g. the
BigQuery extractor in remote-only mode pre-#160) must not crash
the fallback scan."""
from src.orchestrator import SyncOrchestrator
# Create a source with extract.duckdb but no `data/` subdir.
source_dir = setup_env["extracts_dir"] / "bigquery"
source_dir.mkdir()
db_path = source_dir / "extract.duckdb"
conn = duckdb.connect(str(db_path))
conn.execute(
"CREATE TABLE _meta (table_name VARCHAR, description VARCHAR, "
"rows BIGINT, size_bytes BIGINT, extracted_at TIMESTAMP, "
"query_mode VARCHAR)"
)
conn.close()
orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"])
# Must not crash.
orch.rebuild()