From dfb7f25e76976ca2ff6b632c5295ee1dcf468863 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Wed, 6 May 2026 16:58:18 +0200 Subject: [PATCH 1/2] =?UTF-8?q?release:=200.41.0=20=E2=80=94=20orchestrato?= =?UTF-8?q?r=20filesystem=20fallback=20for=20missing=20=5Fmeta=20materiali?= =?UTF-8?q?zed=20rows?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 0.40.0 added _persist_materialized_inner_view in materialize_query, which tried to open extract.duckdb from a fresh DuckDB handle to write the _meta row + inner view. In production this conflicts with the same uvicorn process's existing read-only ATTACH (orchestrator's analytics conn holds extract.duckdb ATTACHed as alias), and DuckDB single-process file-handle uniqueness rejects with: Binder Error: Unique file handle conflict: Cannot attach "extract" — already attached by database "" The helper logs WARNING fail-soft, parquet stays canonical, but the master view never appears via the meta path. Fix: at the end of _attach_and_create_views, scan /data/*.parquet and CREATE OR REPLACE VIEW AS SELECT * FROM read_parquet('') for any parquet whose is not already in the per-source tables list (= meta path didn't pick it up). Decoupled from materialize_query open-handle race. Honors the same view_ownership cross-connector collision rules as the meta path (first-come-first-served via view_repo.claim). Tests: - filesystem-fallback fires when _meta row missing - skipped when meta path already created the view (no shadow) - skips invalid identifiers (e.g. parquet stem starting with a digit) - doesn't crash when source has no data/ subdir --- CHANGELOG.md | 29 +++++++ pyproject.toml | 2 +- src/orchestrator.py | 63 ++++++++++++++ tests/test_orchestrator.py | 168 +++++++++++++++++++++++++++++++++++++ 4 files changed, 261 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8afc6c2..adc8edd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,35 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +## [0.41.0] — 2026-05-06 + +### Fixed +- **Orchestrator filesystem fallback for materialized parquets that + couldn't register in `extract.duckdb`'s `_meta`** + (`src/orchestrator.py:_attach_and_create_views`). The 0.40.0 fix in + `materialize_query` opens `extract.duckdb` from a fresh DuckDB handle + to write the `_meta` row + inner view; in production the same uvicorn + process already holds `extract.duckdb` ATTACHed read-only as the + source-name alias under the orchestrator's analytics connection, and + DuckDB's single-process file-handle uniqueness rejects the second + open with `Binder Error: Unique file handle conflict: Cannot attach + "extract" — already attached by database ""`. The 0.40.0 + helper logs WARNING and falls through; parquet stays canonical, but + the master view never appears via the meta path. + + This release adds a second pass at the end of + `_attach_and_create_views`: scan `/data/*.parquet` and + create a master view via `read_parquet('')` for any parquet + whose `` is not already in the per-source `tables` list (i.e. the + meta path didn't pick it up). Decoupled from `materialize_query`'s + open-handle race; robust against any registration drift between + materialize and rebuild. Honors the same `view_ownership` / cross- + connector collision rules as the meta path (first-come-first-served + via `view_repo.claim`). Tests cover: fallback fires when meta row is + missing; fallback skips when meta path already created the view (no + shadow); invalid identifier in parquet stem is skipped without crash; + source without `data/` subdir doesn't crash the scan. + ## [0.40.0] — 2026-05-06 ### Fixed diff --git a/pyproject.toml b/pyproject.toml index aafb4bc..c98c2c7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agnes-the-ai-analyst" -version = "0.40.0" +version = "0.41.0" description = "Agnes — AI Data Analyst platform for AI analytical systems" requires-python = ">=3.11,<3.14" license = "MIT" diff --git a/src/orchestrator.py b/src/orchestrator.py index 47f1737..13da638 100644 --- a/src/orchestrator.py +++ b/src/orchestrator.py @@ -395,6 +395,69 @@ class SyncOrchestrator: source_name, table_name, e, ) + # Filesystem-fallback master views (0.41.0). The 0.40.0 fix in + # `materialize_query` tries to register the parquet in + # `extract.duckdb`'s `_meta` + inner view, but the open-as- + # second-write-handle from the same uvicorn process collides + # with the existing read-only ATTACH that `rebuild()` itself + # holds (`Unique file handle conflict: Cannot attach "extract" + # — already attached by database ""`). The 0.40.0 + # helper logs a WARNING and falls through, parquet is + # canonical, but the master view never appears via the meta + # path. This second pass scans `/data/*.parquet` + # directly and creates a master view via `read_parquet()` for + # any parquet that didn't already get one through the meta + # path. Decoupled from materialize_query's open-handle race; + # robust against any registration drift between materialize + # and rebuild. + try: + extracts_dir = _get_extracts_dir() + except Exception: + extracts_dir = None + if extracts_dir is not None: + data_dir = extracts_dir / source_name / "data" + if data_dir.exists(): + already_created = set(tables) + for parquet_path in sorted(data_dir.glob("*.parquet")): + table_id = parquet_path.stem + if not _validate_identifier(table_id, "fs_fallback table_id"): + continue + if table_id in already_created: + continue + # view_repo claim — same first-come-first-served + # rule as the meta-path branch above. + if view_repo is not None: + if not view_repo.claim(table_id, source_name): + prior_owner = ( + view_repo.get_owner(table_id) + or existing_owners.get(table_id, "") + ) + logger.error( + "view_ownership collision: %s already owns view %r; " + "%s.%s (filesystem-fallback) will NOT be exposed.", + prior_owner, table_id, source_name, table_id, + ) + continue + if claimed_pairs is not None: + claimed_pairs.append((source_name, table_id)) + try: + safe_path = str(parquet_path).replace("'", "''") + conn.execute( + f"CREATE OR REPLACE VIEW \"{table_id}\" AS " + f"SELECT * FROM read_parquet('{safe_path}')" + ) + tables.append(table_id) + logger.info( + "filesystem-fallback master view created: " + "%s/%s (parquet at %s) — meta row was missing", + source_name, table_id, parquet_path, + ) + except Exception as e: + logger.error( + "filesystem-fallback master view failed for %s/%s: %s", + source_name, table_id, e, + ) + # Update sync_state in system DB self._update_sync_state(meta_rows, source_name) diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py index 0f061e1..5a69f01 100644 --- a/tests/test_orchestrator.py +++ b/tests/test_orchestrator.py @@ -677,3 +677,171 @@ class TestOrchestratorFailureModes: assert "corrupt_a" not in result assert "corrupt_b" not in result assert "keboola" in result + + +# ---------------------------------------------------------------------------- +# 0.41.0 — filesystem-fallback master views for materialized parquets that +# couldn't register themselves in extract.duckdb's _meta (the open-as-second- +# write-handle race that 0.40.0's _persist_materialized_inner_view hits). +# ---------------------------------------------------------------------------- + + +import struct + + +def _write_minimal_parquet(path: Path, n_rows: int = 3) -> None: + """Write a tiny valid parquet file using DuckDB's COPY. Used to seed + the filesystem-fallback test cases where we want a parquet on disk + that the extractor never registered in _meta.""" + conn = duckdb.connect(":memory:") + try: + conn.execute(f"CREATE TABLE t AS SELECT range AS id FROM range({n_rows})") + safe = str(path).replace("'", "''") + conn.execute(f"COPY (SELECT * FROM t) TO '{safe}' (FORMAT PARQUET)") + finally: + conn.close() + + +class TestFilesystemFallbackMasterViews: + """If a parquet exists on disk but never made it into _meta (the + open-handle race that bit 0.40.0), the orchestrator must still + create a master view over it via read_parquet().""" + + def test_filesystem_fallback_creates_master_view(self, setup_env): + from src.orchestrator import SyncOrchestrator + + # Set up an extract.duckdb with ONLY remote rows in _meta. + _create_mock_extract( + setup_env["extracts_dir"], + "bigquery", + tables=[ + {"name": "remote_one", "data": [{"x": "1"}], "query_mode": "remote"}, + ], + ) + + # Drop a parquet on disk for a table that's NOT in _meta — this + # is the materialize_query "atomic swap succeeded but _meta + # registration hit lock conflict" scenario. + data_dir = setup_env["extracts_dir"] / "bigquery" / "data" + _write_minimal_parquet(data_dir / "order_economics.parquet", n_rows=42) + + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + result = orch.rebuild() + + # Both views should appear: the meta-path one and the + # filesystem-fallback one. + assert "bigquery" in result + assert "remote_one" in result["bigquery"] + assert "order_economics" in result["bigquery"], ( + "filesystem-fallback master view must be created when parquet " + "exists on disk but _meta row is missing" + ) + + # Verify the master view actually queries the parquet. + master = duckdb.connect(setup_env["analytics_db"], read_only=True) + try: + n = master.execute("SELECT COUNT(*) FROM order_economics").fetchone()[0] + assert n == 42, f"master view should return parquet rows, got {n}" + finally: + master.close() + + def test_filesystem_fallback_does_not_duplicate_meta_path(self, setup_env, caplog): + """When the same name is in BOTH _meta (with an inner view) AND + on disk as a parquet, the meta path wins — filesystem fallback + must not create a second / replacement view that shadows the + meta-driven one.""" + from src.orchestrator import SyncOrchestrator + + _create_mock_extract( + setup_env["extracts_dir"], + "bigquery", + tables=[ + { + "name": "order_economics", + "data": [{"x": "from_meta"}], + "query_mode": "materialized", + }, + ], + ) + # Also drop a parquet of the same name on disk. + data_dir = setup_env["extracts_dir"] / "bigquery" / "data" + _write_minimal_parquet(data_dir / "order_economics.parquet", n_rows=99) + + import logging + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + with caplog.at_level(logging.INFO, logger="src.orchestrator"): + result = orch.rebuild() + + # `order_economics` should appear exactly once in the result list — + # meta path won, fallback skipped because the name was already + # in the per-source `tables` set. + bq_tables = result.get("bigquery", []) + assert bq_tables.count("order_economics") == 1, ( + f"name should appear exactly once, got {bq_tables}" + ) + # Fallback log line must NOT have fired for this name. + fallback_lines = [ + r.getMessage() for r in caplog.records + if "filesystem-fallback master view created" in r.getMessage() + and "order_economics" in r.getMessage() + ] + assert fallback_lines == [], ( + f"filesystem-fallback must not fire when meta path is viable; got: {fallback_lines}" + ) + + def test_filesystem_fallback_skips_invalid_table_id(self, setup_env, tmp_path): + """A parquet whose stem doesn't pass identifier validation + (e.g. starts with a digit or contains spaces) must be skipped, + not crash the rebuild.""" + from src.orchestrator import SyncOrchestrator + + _create_mock_extract( + setup_env["extracts_dir"], + "bigquery", + tables=[ + {"name": "remote_one", "data": [{"x": "1"}], "query_mode": "remote"}, + ], + ) + + # Identifier validator rejects names starting with a digit. + data_dir = setup_env["extracts_dir"] / "bigquery" / "data" + _write_minimal_parquet(data_dir / "9bad_name.parquet") + + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + result = orch.rebuild() + + # remote_one still works; bad-named parquet is silently skipped. + assert "remote_one" in result["bigquery"] + assert "9bad_name" not in result["bigquery"] + # Master DB has no such view. + master = duckdb.connect(setup_env["analytics_db"], read_only=True) + try: + tables = [r[0] for r in master.execute( + "SELECT table_name FROM information_schema.tables " + "WHERE table_schema='main'" + ).fetchall()] + assert "9bad_name" not in tables + finally: + master.close() + + def test_filesystem_fallback_no_data_dir_is_safe(self, setup_env): + """Sources without a `/data/` directory (e.g. the + BigQuery extractor in remote-only mode pre-#160) must not crash + the fallback scan.""" + from src.orchestrator import SyncOrchestrator + + # Create a source with extract.duckdb but no `data/` subdir. + source_dir = setup_env["extracts_dir"] / "bigquery" + source_dir.mkdir() + db_path = source_dir / "extract.duckdb" + conn = duckdb.connect(str(db_path)) + conn.execute( + "CREATE TABLE _meta (table_name VARCHAR, description VARCHAR, " + "rows BIGINT, size_bytes BIGINT, extracted_at TIMESTAMP, " + "query_mode VARCHAR)" + ) + conn.close() + + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + # Must not crash. + orch.rebuild() From 7781c3f331262a14cd9fc3e83f5f00d9c72a64d2 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Wed, 6 May 2026 17:06:20 +0200 Subject: [PATCH 2/2] fix(0.41.0): orphan parquet skip in filesystem fallback (CI regression) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-existing test_orchestrator_skips_orphan_parquet_in_extracts caught the regression: my filesystem fallback created master views for ANY parquet on disk, including orphans where DELETE /api/admin/registry removed the registry row but the parquet wasn't fully cleaned up. Fix: load the set of registered materialized table_ids for THIS source from table_registry before the scan, and skip any parquet whose stem isn't in that set. If the registry read fails (test fixture, transient DB error), skip the fallback entirely — orphan exposure is worse than missing master view recovery. Pre-existing test now passes. New regression test pins the orphan-skip contract specifically for the filesystem-fallback path. --- src/orchestrator.py | 133 ++++++++++++++++++++++++++----------- tests/test_orchestrator.py | 73 ++++++++++++++++++++ 2 files changed, 168 insertions(+), 38 deletions(-) diff --git a/src/orchestrator.py b/src/orchestrator.py index 13da638..0747554 100644 --- a/src/orchestrator.py +++ b/src/orchestrator.py @@ -417,46 +417,103 @@ class SyncOrchestrator: if extracts_dir is not None: data_dir = extracts_dir / source_name / "data" if data_dir.exists(): - already_created = set(tables) - for parquet_path in sorted(data_dir.glob("*.parquet")): - table_id = parquet_path.stem - if not _validate_identifier(table_id, "fs_fallback table_id"): - continue - if table_id in already_created: - continue - # view_repo claim — same first-come-first-served - # rule as the meta-path branch above. - if view_repo is not None: - if not view_repo.claim(table_id, source_name): - prior_owner = ( - view_repo.get_owner(table_id) - or existing_owners.get(table_id, "") - ) - logger.error( - "view_ownership collision: %s already owns view %r; " - "%s.%s (filesystem-fallback) will NOT be exposed.", - prior_owner, table_id, source_name, table_id, + # Resolve the set of registry-known table_ids for this + # source. The fallback is a master-view recovery path + # for parquets that materialize_query wrote but + # couldn't register in `_meta`; an **orphan** parquet + # (registry row deleted by `DELETE /api/admin/registry` + # but parquet not yet cleaned up) must NOT get a + # master view — that would resurrect a deleted table. + # Pre-existing test `test_orchestrator_skips_orphan_ + # parquet_in_extracts` pins this contract. + registered_ids: Optional[set] = None + try: + from src.db import get_system_db + from src.repositories.table_registry import ( + TableRegistryRepository, + ) + sys_conn = get_system_db() + try: + rows = TableRegistryRepository(sys_conn).list_all() + # Match parquet stems against registry rows for + # THIS source where query_mode='materialized'. + # The parquet filename is keyed by registry + # `name` (per `_run_materialized_pass` / + # `materialize_query` convention). + registered_ids = { + str(r.get("name")) + for r in rows + if (r.get("source_type") or "") == source_name + and (r.get("query_mode") or "") == "materialized" + and r.get("name") + } + finally: + try: + sys_conn.close() + except Exception: + pass + except Exception as e: + # No registry access (test fixture, transient DB + # error) — skip the fallback rather than risk + # exposing orphan parquets. + logger.warning( + "filesystem-fallback: registry read failed (%s); " + "skipping fallback scan for %s — orphan parquets " + "from a prior DELETE could otherwise be exposed.", + e, source_name, + ) + registered_ids = None + + if registered_ids is not None: + already_created = set(tables) + for parquet_path in sorted(data_dir.glob("*.parquet")): + table_id = parquet_path.stem + if not _validate_identifier(table_id, "fs_fallback table_id"): + continue + if table_id in already_created: + continue + # Only register parquets that have a live + # materialized registry row. Orphans skip. + if table_id not in registered_ids: + logger.debug( + "filesystem-fallback: skipping orphan " + "parquet %s/%s (no registry row)", + source_name, table_id, ) continue - if claimed_pairs is not None: - claimed_pairs.append((source_name, table_id)) - try: - safe_path = str(parquet_path).replace("'", "''") - conn.execute( - f"CREATE OR REPLACE VIEW \"{table_id}\" AS " - f"SELECT * FROM read_parquet('{safe_path}')" - ) - tables.append(table_id) - logger.info( - "filesystem-fallback master view created: " - "%s/%s (parquet at %s) — meta row was missing", - source_name, table_id, parquet_path, - ) - except Exception as e: - logger.error( - "filesystem-fallback master view failed for %s/%s: %s", - source_name, table_id, e, - ) + # view_repo claim — same first-come-first-served + # rule as the meta-path branch above. + if view_repo is not None: + if not view_repo.claim(table_id, source_name): + prior_owner = ( + view_repo.get_owner(table_id) + or existing_owners.get(table_id, "") + ) + logger.error( + "view_ownership collision: %s already owns view %r; " + "%s.%s (filesystem-fallback) will NOT be exposed.", + prior_owner, table_id, source_name, table_id, + ) + continue + if claimed_pairs is not None: + claimed_pairs.append((source_name, table_id)) + try: + safe_path = str(parquet_path).replace("'", "''") + conn.execute( + f"CREATE OR REPLACE VIEW \"{table_id}\" AS " + f"SELECT * FROM read_parquet('{safe_path}')" + ) + tables.append(table_id) + logger.info( + "filesystem-fallback master view created: " + "%s/%s (parquet at %s) — meta row was missing", + source_name, table_id, parquet_path, + ) + except Exception as e: + logger.error( + "filesystem-fallback master view failed for %s/%s: %s", + source_name, table_id, e, + ) # Update sync_state in system DB self._update_sync_state(meta_rows, source_name) diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py index 5a69f01..3aaf0b5 100644 --- a/tests/test_orchestrator.py +++ b/tests/test_orchestrator.py @@ -702,6 +702,33 @@ def _write_minimal_parquet(path: Path, n_rows: int = 3) -> None: conn.close() +def _seed_registry_materialized_row(table_id: str, source_type: str = "bigquery") -> None: + """Insert a `query_mode='materialized'` row into table_registry so + the filesystem-fallback scan recognises the parquet as live (not + orphan from a deleted registry row). + + Uses the same `get_system_db` + `TableRegistryRepository` path the + orchestrator's fallback uses internally.""" + from src.db import get_system_db + from src.repositories.table_registry import TableRegistryRepository + conn = get_system_db() + try: + TableRegistryRepository(conn).register( + id=table_id, + name=table_id, + source_type=source_type, + bucket="bkt", + source_table=table_id, + source_query=f"SELECT 1 AS id", + query_mode="materialized", + ) + finally: + try: + conn.close() + except Exception: + pass + + class TestFilesystemFallbackMasterViews: """If a parquet exists on disk but never made it into _meta (the open-handle race that bit 0.40.0), the orchestrator must still @@ -724,6 +751,9 @@ class TestFilesystemFallbackMasterViews: # registration hit lock conflict" scenario. data_dir = setup_env["extracts_dir"] / "bigquery" / "data" _write_minimal_parquet(data_dir / "order_economics.parquet", n_rows=42) + # Seed the matching materialized registry row — orphan parquets + # without a registry row are deliberately skipped. + _seed_registry_materialized_row("order_economics") orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) result = orch.rebuild() @@ -766,6 +796,7 @@ class TestFilesystemFallbackMasterViews: # Also drop a parquet of the same name on disk. data_dir = setup_env["extracts_dir"] / "bigquery" / "data" _write_minimal_parquet(data_dir / "order_economics.parquet", n_rows=99) + _seed_registry_materialized_row("order_economics") import logging orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) @@ -806,6 +837,15 @@ class TestFilesystemFallbackMasterViews: # Identifier validator rejects names starting with a digit. data_dir = setup_env["extracts_dir"] / "bigquery" / "data" _write_minimal_parquet(data_dir / "9bad_name.parquet") + # Even though the registry would let it through, the identifier + # validator should still reject. (Seed registry to isolate the + # validator path from the orphan-skip path.) + try: + _seed_registry_materialized_row("9bad_name") + except Exception: + # Registry row insert may itself reject the bad id; that's + # fine — the orchestrator scan never sees it then either. + pass orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) result = orch.rebuild() @@ -824,6 +864,39 @@ class TestFilesystemFallbackMasterViews: finally: master.close() + def test_filesystem_fallback_skips_orphan_parquet(self, setup_env): + """Orphan parquets — files left on disk after `DELETE + /api/admin/registry/{id}` either crashed mid-cleanup or the + operator dropped the row but the file lingers — must NOT get a + master view. Otherwise the deleted table would resurrect on next + rebuild, defeating the unregister contract. + + Existing test `test_orchestrator_skips_orphan_parquet_in_extracts` + in `tests/test_admin_unregister_cleanup.py` pins this rule for the + wider unregister flow; this test pins it specifically for the + new filesystem-fallback path.""" + from src.orchestrator import SyncOrchestrator + + _create_mock_extract( + setup_env["extracts_dir"], + "bigquery", + tables=[ + {"name": "remote_one", "data": [{"x": "1"}], "query_mode": "remote"}, + ], + ) + # Orphan parquet — NO registry row. + data_dir = setup_env["extracts_dir"] / "bigquery" / "data" + _write_minimal_parquet(data_dir / "deleted_table.parquet") + # NOT calling _seed_registry_materialized_row here — that's the + # whole point: registry row is missing. + + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + result = orch.rebuild() + bq_tables = result.get("bigquery", []) + assert "deleted_table" not in bq_tables, ( + f"orphan parquet must NOT resurrect as a master view; got {bq_tables}" + ) + def test_filesystem_fallback_no_data_dir_is_safe(self, setup_env): """Sources without a `/data/` directory (e.g. the BigQuery extractor in remote-only mode pre-#160) must not crash