diff --git a/src/orchestrator.py b/src/orchestrator.py index 13da638..0747554 100644 --- a/src/orchestrator.py +++ b/src/orchestrator.py @@ -417,46 +417,103 @@ class SyncOrchestrator: if extracts_dir is not None: data_dir = extracts_dir / source_name / "data" if data_dir.exists(): - already_created = set(tables) - for parquet_path in sorted(data_dir.glob("*.parquet")): - table_id = parquet_path.stem - if not _validate_identifier(table_id, "fs_fallback table_id"): - continue - if table_id in already_created: - continue - # view_repo claim — same first-come-first-served - # rule as the meta-path branch above. - if view_repo is not None: - if not view_repo.claim(table_id, source_name): - prior_owner = ( - view_repo.get_owner(table_id) - or existing_owners.get(table_id, "") - ) - logger.error( - "view_ownership collision: %s already owns view %r; " - "%s.%s (filesystem-fallback) will NOT be exposed.", - prior_owner, table_id, source_name, table_id, + # Resolve the set of registry-known table_ids for this + # source. The fallback is a master-view recovery path + # for parquets that materialize_query wrote but + # couldn't register in `_meta`; an **orphan** parquet + # (registry row deleted by `DELETE /api/admin/registry` + # but parquet not yet cleaned up) must NOT get a + # master view — that would resurrect a deleted table. + # Pre-existing test `test_orchestrator_skips_orphan_ + # parquet_in_extracts` pins this contract. + registered_ids: Optional[set] = None + try: + from src.db import get_system_db + from src.repositories.table_registry import ( + TableRegistryRepository, + ) + sys_conn = get_system_db() + try: + rows = TableRegistryRepository(sys_conn).list_all() + # Match parquet stems against registry rows for + # THIS source where query_mode='materialized'. + # The parquet filename is keyed by registry + # `name` (per `_run_materialized_pass` / + # `materialize_query` convention). + registered_ids = { + str(r.get("name")) + for r in rows + if (r.get("source_type") or "") == source_name + and (r.get("query_mode") or "") == "materialized" + and r.get("name") + } + finally: + try: + sys_conn.close() + except Exception: + pass + except Exception as e: + # No registry access (test fixture, transient DB + # error) — skip the fallback rather than risk + # exposing orphan parquets. + logger.warning( + "filesystem-fallback: registry read failed (%s); " + "skipping fallback scan for %s — orphan parquets " + "from a prior DELETE could otherwise be exposed.", + e, source_name, + ) + registered_ids = None + + if registered_ids is not None: + already_created = set(tables) + for parquet_path in sorted(data_dir.glob("*.parquet")): + table_id = parquet_path.stem + if not _validate_identifier(table_id, "fs_fallback table_id"): + continue + if table_id in already_created: + continue + # Only register parquets that have a live + # materialized registry row. Orphans skip. + if table_id not in registered_ids: + logger.debug( + "filesystem-fallback: skipping orphan " + "parquet %s/%s (no registry row)", + source_name, table_id, ) continue - if claimed_pairs is not None: - claimed_pairs.append((source_name, table_id)) - try: - safe_path = str(parquet_path).replace("'", "''") - conn.execute( - f"CREATE OR REPLACE VIEW \"{table_id}\" AS " - f"SELECT * FROM read_parquet('{safe_path}')" - ) - tables.append(table_id) - logger.info( - "filesystem-fallback master view created: " - "%s/%s (parquet at %s) — meta row was missing", - source_name, table_id, parquet_path, - ) - except Exception as e: - logger.error( - "filesystem-fallback master view failed for %s/%s: %s", - source_name, table_id, e, - ) + # view_repo claim — same first-come-first-served + # rule as the meta-path branch above. + if view_repo is not None: + if not view_repo.claim(table_id, source_name): + prior_owner = ( + view_repo.get_owner(table_id) + or existing_owners.get(table_id, "") + ) + logger.error( + "view_ownership collision: %s already owns view %r; " + "%s.%s (filesystem-fallback) will NOT be exposed.", + prior_owner, table_id, source_name, table_id, + ) + continue + if claimed_pairs is not None: + claimed_pairs.append((source_name, table_id)) + try: + safe_path = str(parquet_path).replace("'", "''") + conn.execute( + f"CREATE OR REPLACE VIEW \"{table_id}\" AS " + f"SELECT * FROM read_parquet('{safe_path}')" + ) + tables.append(table_id) + logger.info( + "filesystem-fallback master view created: " + "%s/%s (parquet at %s) — meta row was missing", + source_name, table_id, parquet_path, + ) + except Exception as e: + logger.error( + "filesystem-fallback master view failed for %s/%s: %s", + source_name, table_id, e, + ) # Update sync_state in system DB self._update_sync_state(meta_rows, source_name) diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py index 5a69f01..3aaf0b5 100644 --- a/tests/test_orchestrator.py +++ b/tests/test_orchestrator.py @@ -702,6 +702,33 @@ def _write_minimal_parquet(path: Path, n_rows: int = 3) -> None: conn.close() +def _seed_registry_materialized_row(table_id: str, source_type: str = "bigquery") -> None: + """Insert a `query_mode='materialized'` row into table_registry so + the filesystem-fallback scan recognises the parquet as live (not + orphan from a deleted registry row). + + Uses the same `get_system_db` + `TableRegistryRepository` path the + orchestrator's fallback uses internally.""" + from src.db import get_system_db + from src.repositories.table_registry import TableRegistryRepository + conn = get_system_db() + try: + TableRegistryRepository(conn).register( + id=table_id, + name=table_id, + source_type=source_type, + bucket="bkt", + source_table=table_id, + source_query=f"SELECT 1 AS id", + query_mode="materialized", + ) + finally: + try: + conn.close() + except Exception: + pass + + class TestFilesystemFallbackMasterViews: """If a parquet exists on disk but never made it into _meta (the open-handle race that bit 0.40.0), the orchestrator must still @@ -724,6 +751,9 @@ class TestFilesystemFallbackMasterViews: # registration hit lock conflict" scenario. data_dir = setup_env["extracts_dir"] / "bigquery" / "data" _write_minimal_parquet(data_dir / "order_economics.parquet", n_rows=42) + # Seed the matching materialized registry row — orphan parquets + # without a registry row are deliberately skipped. + _seed_registry_materialized_row("order_economics") orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) result = orch.rebuild() @@ -766,6 +796,7 @@ class TestFilesystemFallbackMasterViews: # Also drop a parquet of the same name on disk. data_dir = setup_env["extracts_dir"] / "bigquery" / "data" _write_minimal_parquet(data_dir / "order_economics.parquet", n_rows=99) + _seed_registry_materialized_row("order_economics") import logging orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) @@ -806,6 +837,15 @@ class TestFilesystemFallbackMasterViews: # Identifier validator rejects names starting with a digit. data_dir = setup_env["extracts_dir"] / "bigquery" / "data" _write_minimal_parquet(data_dir / "9bad_name.parquet") + # Even though the registry would let it through, the identifier + # validator should still reject. (Seed registry to isolate the + # validator path from the orphan-skip path.) + try: + _seed_registry_materialized_row("9bad_name") + except Exception: + # Registry row insert may itself reject the bad id; that's + # fine — the orchestrator scan never sees it then either. + pass orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) result = orch.rebuild() @@ -824,6 +864,39 @@ class TestFilesystemFallbackMasterViews: finally: master.close() + def test_filesystem_fallback_skips_orphan_parquet(self, setup_env): + """Orphan parquets — files left on disk after `DELETE + /api/admin/registry/{id}` either crashed mid-cleanup or the + operator dropped the row but the file lingers — must NOT get a + master view. Otherwise the deleted table would resurrect on next + rebuild, defeating the unregister contract. + + Existing test `test_orchestrator_skips_orphan_parquet_in_extracts` + in `tests/test_admin_unregister_cleanup.py` pins this rule for the + wider unregister flow; this test pins it specifically for the + new filesystem-fallback path.""" + from src.orchestrator import SyncOrchestrator + + _create_mock_extract( + setup_env["extracts_dir"], + "bigquery", + tables=[ + {"name": "remote_one", "data": [{"x": "1"}], "query_mode": "remote"}, + ], + ) + # Orphan parquet — NO registry row. + data_dir = setup_env["extracts_dir"] / "bigquery" / "data" + _write_minimal_parquet(data_dir / "deleted_table.parquet") + # NOT calling _seed_registry_materialized_row here — that's the + # whole point: registry row is missing. + + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + result = orch.rebuild() + bq_tables = result.get("bigquery", []) + assert "deleted_table" not in bq_tables, ( + f"orphan parquet must NOT resurrect as a master view; got {bq_tables}" + ) + def test_filesystem_fallback_no_data_dir_is_safe(self, setup_env): """Sources without a `/data/` directory (e.g. the BigQuery extractor in remote-only mode pre-#160) must not crash