fix(0.41.0): orphan parquet skip in filesystem fallback (CI regression)
The pre-existing test test_orchestrator_skips_orphan_parquet_in_extracts caught the regression: my filesystem fallback created master views for ANY parquet on disk, including orphans — cases where DELETE /api/admin/registry removed the registry row but the parquet was not fully cleaned up. Fix: before the scan, load from table_registry the set of registered materialized table_ids for THIS source (the registry `name` values, which are what parquet filenames are keyed by), and skip any parquet whose stem is not in that set. If the registry read fails (test fixture, transient DB error), skip the fallback entirely — exposing an orphan is worse than missing a master-view recovery. The pre-existing test now passes, and a new regression test pins the orphan-skip contract specifically for the filesystem-fallback path.
This commit is contained in:
parent
dfb7f25e76
commit
7781c3f331
2 changed files with 168 additions and 38 deletions
|
|
@ -417,46 +417,103 @@ class SyncOrchestrator:
|
||||||
if extracts_dir is not None:
|
if extracts_dir is not None:
|
||||||
data_dir = extracts_dir / source_name / "data"
|
data_dir = extracts_dir / source_name / "data"
|
||||||
if data_dir.exists():
|
if data_dir.exists():
|
||||||
already_created = set(tables)
|
# Resolve the set of registry-known table_ids for this
|
||||||
for parquet_path in sorted(data_dir.glob("*.parquet")):
|
# source. The fallback is a master-view recovery path
|
||||||
table_id = parquet_path.stem
|
# for parquets that materialize_query wrote but
|
||||||
if not _validate_identifier(table_id, "fs_fallback table_id"):
|
# couldn't register in `_meta`; an **orphan** parquet
|
||||||
continue
|
# (registry row deleted by `DELETE /api/admin/registry`
|
||||||
if table_id in already_created:
|
# but parquet not yet cleaned up) must NOT get a
|
||||||
continue
|
# master view — that would resurrect a deleted table.
|
||||||
# view_repo claim — same first-come-first-served
|
# Pre-existing test `test_orchestrator_skips_orphan_
|
||||||
# rule as the meta-path branch above.
|
# parquet_in_extracts` pins this contract.
|
||||||
if view_repo is not None:
|
registered_ids: Optional[set] = None
|
||||||
if not view_repo.claim(table_id, source_name):
|
try:
|
||||||
prior_owner = (
|
from src.db import get_system_db
|
||||||
view_repo.get_owner(table_id)
|
from src.repositories.table_registry import (
|
||||||
or existing_owners.get(table_id, "<unknown>")
|
TableRegistryRepository,
|
||||||
)
|
)
|
||||||
logger.error(
|
sys_conn = get_system_db()
|
||||||
"view_ownership collision: %s already owns view %r; "
|
try:
|
||||||
"%s.%s (filesystem-fallback) will NOT be exposed.",
|
rows = TableRegistryRepository(sys_conn).list_all()
|
||||||
prior_owner, table_id, source_name, table_id,
|
# Match parquet stems against registry rows for
|
||||||
|
# THIS source where query_mode='materialized'.
|
||||||
|
# The parquet filename is keyed by registry
|
||||||
|
# `name` (per `_run_materialized_pass` /
|
||||||
|
# `materialize_query` convention).
|
||||||
|
registered_ids = {
|
||||||
|
str(r.get("name"))
|
||||||
|
for r in rows
|
||||||
|
if (r.get("source_type") or "") == source_name
|
||||||
|
and (r.get("query_mode") or "") == "materialized"
|
||||||
|
and r.get("name")
|
||||||
|
}
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
sys_conn.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
# No registry access (test fixture, transient DB
|
||||||
|
# error) — skip the fallback rather than risk
|
||||||
|
# exposing orphan parquets.
|
||||||
|
logger.warning(
|
||||||
|
"filesystem-fallback: registry read failed (%s); "
|
||||||
|
"skipping fallback scan for %s — orphan parquets "
|
||||||
|
"from a prior DELETE could otherwise be exposed.",
|
||||||
|
e, source_name,
|
||||||
|
)
|
||||||
|
registered_ids = None
|
||||||
|
|
||||||
|
if registered_ids is not None:
|
||||||
|
already_created = set(tables)
|
||||||
|
for parquet_path in sorted(data_dir.glob("*.parquet")):
|
||||||
|
table_id = parquet_path.stem
|
||||||
|
if not _validate_identifier(table_id, "fs_fallback table_id"):
|
||||||
|
continue
|
||||||
|
if table_id in already_created:
|
||||||
|
continue
|
||||||
|
# Only register parquets that have a live
|
||||||
|
# materialized registry row. Orphans skip.
|
||||||
|
if table_id not in registered_ids:
|
||||||
|
logger.debug(
|
||||||
|
"filesystem-fallback: skipping orphan "
|
||||||
|
"parquet %s/%s (no registry row)",
|
||||||
|
source_name, table_id,
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
if claimed_pairs is not None:
|
# view_repo claim — same first-come-first-served
|
||||||
claimed_pairs.append((source_name, table_id))
|
# rule as the meta-path branch above.
|
||||||
try:
|
if view_repo is not None:
|
||||||
safe_path = str(parquet_path).replace("'", "''")
|
if not view_repo.claim(table_id, source_name):
|
||||||
conn.execute(
|
prior_owner = (
|
||||||
f"CREATE OR REPLACE VIEW \"{table_id}\" AS "
|
view_repo.get_owner(table_id)
|
||||||
f"SELECT * FROM read_parquet('{safe_path}')"
|
or existing_owners.get(table_id, "<unknown>")
|
||||||
)
|
)
|
||||||
tables.append(table_id)
|
logger.error(
|
||||||
logger.info(
|
"view_ownership collision: %s already owns view %r; "
|
||||||
"filesystem-fallback master view created: "
|
"%s.%s (filesystem-fallback) will NOT be exposed.",
|
||||||
"%s/%s (parquet at %s) — meta row was missing",
|
prior_owner, table_id, source_name, table_id,
|
||||||
source_name, table_id, parquet_path,
|
)
|
||||||
)
|
continue
|
||||||
except Exception as e:
|
if claimed_pairs is not None:
|
||||||
logger.error(
|
claimed_pairs.append((source_name, table_id))
|
||||||
"filesystem-fallback master view failed for %s/%s: %s",
|
try:
|
||||||
source_name, table_id, e,
|
safe_path = str(parquet_path).replace("'", "''")
|
||||||
)
|
conn.execute(
|
||||||
|
f"CREATE OR REPLACE VIEW \"{table_id}\" AS "
|
||||||
|
f"SELECT * FROM read_parquet('{safe_path}')"
|
||||||
|
)
|
||||||
|
tables.append(table_id)
|
||||||
|
logger.info(
|
||||||
|
"filesystem-fallback master view created: "
|
||||||
|
"%s/%s (parquet at %s) — meta row was missing",
|
||||||
|
source_name, table_id, parquet_path,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(
|
||||||
|
"filesystem-fallback master view failed for %s/%s: %s",
|
||||||
|
source_name, table_id, e,
|
||||||
|
)
|
||||||
|
|
||||||
# Update sync_state in system DB
|
# Update sync_state in system DB
|
||||||
self._update_sync_state(meta_rows, source_name)
|
self._update_sync_state(meta_rows, source_name)
|
||||||
|
|
|
||||||
|
|
@ -702,6 +702,33 @@ def _write_minimal_parquet(path: Path, n_rows: int = 3) -> None:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _seed_registry_materialized_row(table_id: str, source_type: str = "bigquery") -> None:
    """Register ``table_id`` in table_registry as a materialized table.

    The orchestrator's filesystem-fallback scan only creates a master view
    for a parquet whose stem matches the `name` of a registry row with
    `query_mode='materialized'` for the same source; anything else is
    treated as an orphan left behind by a deleted registry row and is
    skipped. Tests that expect the fallback to pick a parquet up call this
    helper first.

    The row is written through the same `get_system_db` +
    `TableRegistryRepository` path the orchestrator's fallback reads from.

    Args:
        table_id: Used as the row's `id`, `name` and `source_table`.
        source_type: Value stored in the row's `source_type` column.
    """
    from src.db import get_system_db
    from src.repositories.table_registry import TableRegistryRepository

    system_db = get_system_db()
    try:
        registry = TableRegistryRepository(system_db)
        registry.register(
            id=table_id,
            name=table_id,
            source_type=source_type,
            bucket="bkt",
            source_table=table_id,
            source_query="SELECT 1 AS id",
            query_mode="materialized",
        )
    finally:
        # Best-effort close: a failure while closing must not mask the
        # outcome of the register() call above.
        try:
            system_db.close()
        except Exception:
            pass
|
||||||
|
|
||||||
|
|
||||||
class TestFilesystemFallbackMasterViews:
|
class TestFilesystemFallbackMasterViews:
|
||||||
"""If a parquet exists on disk but never made it into _meta (the
|
"""If a parquet exists on disk but never made it into _meta (the
|
||||||
open-handle race that bit 0.40.0), the orchestrator must still
|
open-handle race that bit 0.40.0), the orchestrator must still
|
||||||
|
|
@ -724,6 +751,9 @@ class TestFilesystemFallbackMasterViews:
|
||||||
# registration hit lock conflict" scenario.
|
# registration hit lock conflict" scenario.
|
||||||
data_dir = setup_env["extracts_dir"] / "bigquery" / "data"
|
data_dir = setup_env["extracts_dir"] / "bigquery" / "data"
|
||||||
_write_minimal_parquet(data_dir / "order_economics.parquet", n_rows=42)
|
_write_minimal_parquet(data_dir / "order_economics.parquet", n_rows=42)
|
||||||
|
# Seed the matching materialized registry row — orphan parquets
|
||||||
|
# without a registry row are deliberately skipped.
|
||||||
|
_seed_registry_materialized_row("order_economics")
|
||||||
|
|
||||||
orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"])
|
orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"])
|
||||||
result = orch.rebuild()
|
result = orch.rebuild()
|
||||||
|
|
@ -766,6 +796,7 @@ class TestFilesystemFallbackMasterViews:
|
||||||
# Also drop a parquet of the same name on disk.
|
# Also drop a parquet of the same name on disk.
|
||||||
data_dir = setup_env["extracts_dir"] / "bigquery" / "data"
|
data_dir = setup_env["extracts_dir"] / "bigquery" / "data"
|
||||||
_write_minimal_parquet(data_dir / "order_economics.parquet", n_rows=99)
|
_write_minimal_parquet(data_dir / "order_economics.parquet", n_rows=99)
|
||||||
|
_seed_registry_materialized_row("order_economics")
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"])
|
orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"])
|
||||||
|
|
@ -806,6 +837,15 @@ class TestFilesystemFallbackMasterViews:
|
||||||
# Identifier validator rejects names starting with a digit.
|
# Identifier validator rejects names starting with a digit.
|
||||||
data_dir = setup_env["extracts_dir"] / "bigquery" / "data"
|
data_dir = setup_env["extracts_dir"] / "bigquery" / "data"
|
||||||
_write_minimal_parquet(data_dir / "9bad_name.parquet")
|
_write_minimal_parquet(data_dir / "9bad_name.parquet")
|
||||||
|
# Even though the registry would let it through, the identifier
|
||||||
|
# validator should still reject. (Seed registry to isolate the
|
||||||
|
# validator path from the orphan-skip path.)
|
||||||
|
try:
|
||||||
|
_seed_registry_materialized_row("9bad_name")
|
||||||
|
except Exception:
|
||||||
|
# Registry row insert may itself reject the bad id; that's
|
||||||
|
# fine — the orchestrator scan never sees it then either.
|
||||||
|
pass
|
||||||
|
|
||||||
orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"])
|
orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"])
|
||||||
result = orch.rebuild()
|
result = orch.rebuild()
|
||||||
|
|
@ -824,6 +864,39 @@ class TestFilesystemFallbackMasterViews:
|
||||||
finally:
|
finally:
|
||||||
master.close()
|
master.close()
|
||||||
|
|
||||||
|
def test_filesystem_fallback_skips_orphan_parquet(self, setup_env):
    """An orphan parquet must NOT get a master view.

    An orphan is a file left on disk with no registry row — either
    because `DELETE /api/admin/registry/{id}` crashed mid-cleanup, or
    because the operator dropped the row and the file lingered. If the
    fallback exposed it, the deleted table would resurrect on the next
    rebuild, defeating the unregister contract.

    The existing test `test_orchestrator_skips_orphan_parquet_in_extracts`
    in `tests/test_admin_unregister_cleanup.py` pins this rule for the
    wider unregister flow; this test pins it specifically for the new
    filesystem-fallback path.
    """
    from src.orchestrator import SyncOrchestrator

    extracts_root = setup_env["extracts_dir"]
    remote_only_tables = [
        {"name": "remote_one", "data": [{"x": "1"}], "query_mode": "remote"},
    ]
    _create_mock_extract(extracts_root, "bigquery", tables=remote_only_tables)

    # Drop a parquet on disk and deliberately do NOT seed a registry row
    # for it (no _seed_registry_materialized_row call) — the missing row
    # is exactly what makes it an orphan.
    orphan_path = extracts_root / "bigquery" / "data" / "deleted_table.parquet"
    _write_minimal_parquet(orphan_path)

    orchestrator = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"])
    rebuilt = orchestrator.rebuild()

    bq_tables = rebuilt.get("bigquery", [])
    assert "deleted_table" not in bq_tables, (
        f"orphan parquet must NOT resurrect as a master view; got {bq_tables}"
    )
|
||||||
|
|
||||||
def test_filesystem_fallback_no_data_dir_is_safe(self, setup_env):
|
def test_filesystem_fallback_no_data_dir_is_safe(self, setup_env):
|
||||||
"""Sources without a `<extract_dir>/data/` directory (e.g. the
|
"""Sources without a `<extract_dir>/data/` directory (e.g. the
|
||||||
BigQuery extractor in remote-only mode pre-#160) must not crash
|
BigQuery extractor in remote-only mode pre-#160) must not crash
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue