From dd46461c6c9e37be618ee30172845b3350eb282e Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Fri, 1 May 2026 22:54:11 +0200 Subject: [PATCH] fix(admin+orchestrator): DELETE registry drops parquet + sync_state; rebuild skips orphan parquets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit E2E sub-agent finding: register a materialized BQ row → sync to materialize the parquet at `/data/extracts/bigquery/data/.parquet` → DELETE the registry row. The DB row goes away but: - the parquet file stays on disk forever, AND - the sync_state row stays, so `/api/sync/manifest` keeps advertising the dropped table to `da sync`, AND - the orchestrator's next rebuild can resurrect a master view by picking up the leftover parquet. Two-part fix in `unregister_table`: 1. For materialized rows on bigquery/keboola, remove `${DATA_DIR}/extracts//data/.parquet` (and any stale `.parquet.tmp` from a crashed prior materialize). Filename is keyed on `table_registry.name` to match sync_state bookkeeping. File-removal errors are logged but don't fail the DELETE — the registry row is already gone, and an orphan parquet won't get a master view at next rebuild because the orchestrator's _meta-driven scan never picks up bare parquet files. 2. Always clear `sync_state` + `sync_history` rows for the dropped table_id so the manifest stops advertising the table — applies to all source types and modes, not just materialized, since any synced row had a sync_state entry. Orchestrator-side defensive guard (Finding 2b) is a no-op in the current implementation: `_attach_and_create_views` only creates master views from `_meta` rows in each connector's `extract.duckdb`, so a parquet without a matching `_meta` entry is already invisible to the rebuild. The new test `test_orchestrator_skips_orphan_parquet_in_extracts` is kept as a regression guard for that contract. 5 tests cover: BQ + Keboola materialized DELETE removes parquet, remote DELETE doesn't error trying to remove a non-existent file, sync_state cleared on DELETE, orchestrator orphan-skip invariant. --- CHANGELOG.md | 8 + app/api/admin.py | 59 ++++++ tests/test_admin_unregister_cleanup.py | 260 +++++++++++++++++++++++++ 3 files changed, 327 insertions(+) create mode 100644 tests/test_admin_unregister_cleanup.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e62d5d..d4d2e2a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,14 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C either parse-errored or matched zero rows and no parquet ever landed at `/data/extracts//data/.parquet`. Fix catches the bad SQL at registration time so the row never lands in the registry. +- **admin API**: `DELETE /api/admin/registry/{id}` now removes the canonical + materialized parquet (`${DATA_DIR}/extracts//data/.parquet` + plus any stale `.parquet.tmp`) AND clears the matching `sync_state` / + `sync_history` rows. Pre-fix the registry row was dropped but the parquet + + sync_state row stayed, so `GET /api/sync/manifest` kept advertising the + dropped table to `da sync` and analysts kept downloading it. Defensive + failure handling — file-removal errors are logged but don't fail the + DELETE. ### Added - **admin API**: `GET /api/admin/registry` enriches each table row with diff --git a/app/api/admin.py b/app/api/admin.py index 42c10c1..537ff8f 100644 --- a/app/api/admin.py +++ b/app/api/admin.py @@ -2273,6 +2273,14 @@ async def unregister_table( For BQ rows, schedules a background rebuild so the dropped row's master view is removed from analytics.duckdb (rather than hanging around until the next scheduled sync). + + For materialized rows, also removes the canonical parquet at + `${DATA_DIR}/extracts//data/.parquet` and clears + the matching `sync_state` row. Without these two cleanups, the + manifest endpoint kept advertising the dropped table to `da sync` + (sync_state-driven) and the orchestrator's next rebuild could + resurrect a master view from the leftover parquet (E2E sub-agent + finding 2026-05-01). """ repo = TableRegistryRepository(conn) existing = repo.get(table_id) @@ -2280,8 +2288,59 @@ async def unregister_table( raise HTTPException(status_code=404, detail="Table not found") was_bigquery = existing.get("source_type") == "bigquery" + was_materialized = existing.get("query_mode") == "materialized" + source_type = existing.get("source_type") or "" + name = existing.get("name") or table_id + repo.unregister(table_id) + # Drop the canonical parquet for materialized rows. Path layout: + # `${DATA_DIR}/extracts//data/.parquet` — the + # filename is keyed by `table_registry.name` (matches sync_state + # bookkeeping convention; see _run_materialized_pass + the manifest + # builder for the same name-keyed lookup). Defensively remove the + # `.parquet.tmp` sibling too in case a prior materialize crashed + # mid-COPY. Failure to remove (file missing, permission error) is + # logged but doesn't fail the DELETE — the registry row is already + # gone, and the orphan parquet will not produce a master view at + # next rebuild because the orchestrator's _meta-driven scan never + # picks up bare parquet files. + if was_materialized and source_type in ("bigquery", "keboola"): + try: + data_dir = Path(os.environ.get("DATA_DIR", "./data")) + base = data_dir / "extracts" / source_type / "data" + for candidate in ( + base / f"{name}.parquet", + base / f"{name}.parquet.tmp", + ): + if candidate.exists(): + candidate.unlink() + logger.info( + "Removed materialized parquet for unregistered table %s: %s", + table_id, candidate, + ) + except Exception as e: + logger.warning( + "Failed to remove materialized parquet for %s: %s — registry row is " + "still dropped; clean up the file manually if it lingers", + table_id, e, + ) + + # Clear sync_state for any source/mode (a row that was synced at any + # point — local/materialized — has a sync_state entry that the manifest + # serves regardless of registry state). Pre-fix, the manifest still + # advertised the dropped table to `da sync` because sync_state was + # never cleaned up, and analysts kept getting it through the manifest. + try: + conn.execute("DELETE FROM sync_state WHERE table_id = ?", [name]) + conn.execute("DELETE FROM sync_history WHERE table_id = ?", [name]) + except Exception as e: + logger.warning( + "Failed to clear sync_state for unregistered table %s: %s — " + "manifest may still advertise the dropped row to da sync", + table_id, e, + ) + AuditRepository(conn).log( user_id=user.get("id"), action="unregister_table", diff --git a/tests/test_admin_unregister_cleanup.py b/tests/test_admin_unregister_cleanup.py new file mode 100644 index 0000000..1d44a42 --- /dev/null +++ b/tests/test_admin_unregister_cleanup.py @@ -0,0 +1,260 @@ +"""DELETE /api/admin/registry/{id} for materialized rows must remove the +materialized parquet file too — otherwise sync_state still has the row, +the manifest still serves it, and `da sync` keeps trying to download +data for a table that no longer has a registry entry. The orchestrator's +rebuild path additionally skips parquets that lack a matching +table_registry row, so a transient race (or operator-deleted parquet) +can't resurrect a master view for a dropped table. + +E2E sub-agent finding 2026-05-01: registering a materialized BQ row, +syncing, then DELETEing the registry row left the parquet at +`/data/extracts/bigquery/data/.parquet` and a master view in +`analytics.duckdb` — `/api/sync/manifest` and the master view both still +exposed the table. +""" +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock + +import duckdb +import pytest + + +def _auth(token: str) -> dict: + return {"Authorization": f"Bearer {token}"} + + +@pytest.fixture +def bq_instance(monkeypatch): + fake_cfg = { + "data_source": { + "type": "bigquery", + "bigquery": {"project": "my-test-project", "location": "us"}, + }, + } + monkeypatch.setattr( + "app.instance_config.load_instance_config", lambda: fake_cfg, raising=False, + ) + from app.instance_config import reset_cache + reset_cache() + yield fake_cfg + reset_cache() + + +@pytest.fixture +def stub_bq_extractor(monkeypatch): + """Bypass post-register rebuild's BQ traffic so the test stays offline.""" + rebuild_mock = MagicMock(return_value={ + "project_id": "my-test-project", + "tables_registered": 1, "errors": [], "skipped": False, + }) + monkeypatch.setattr( + "connectors.bigquery.extractor.rebuild_from_registry", + rebuild_mock, + ) + monkeypatch.setattr( + "src.orchestrator.SyncOrchestrator", + lambda *a, **kw: MagicMock(), + ) + return rebuild_mock + + +def test_delete_materialized_bq_row_removes_parquet( + seeded_app, bq_instance, stub_bq_extractor, +): + """DELETE on a materialized BQ registry row removes the canonical parquet + file at /data/extracts/bigquery/data/.parquet so the orchestrator's + next rebuild can't resurrect a master view for the dropped row.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + data_dir = seeded_app["env"]["data_dir"] + + # Seed a materialized row + drop a fake parquet at the canonical path. + r = c.post( + "/api/admin/register-table", + json={ + "name": "drop_me", + "source_type": "bigquery", + "query_mode": "materialized", + "source_query": 'SELECT 1 FROM bq."ds"."t"', + }, + headers=_auth(token), + ) + assert r.status_code == 201, r.json() + table_id = r.json()["id"] + + parquet_path = data_dir / "extracts" / "bigquery" / "data" / f"{table_id}.parquet" + parquet_path.parent.mkdir(parents=True, exist_ok=True) + parquet_path.write_bytes(b"PAR1\x00fake-parquet-content") + # Also drop a stale .tmp to verify defensive cleanup. + tmp_path = parquet_path.parent / f"{table_id}.parquet.tmp" + tmp_path.write_bytes(b"PAR1\x00partial") + + assert parquet_path.exists() + assert tmp_path.exists() + + r2 = c.delete(f"/api/admin/registry/{table_id}", headers=_auth(token)) + assert r2.status_code == 204 + + assert not parquet_path.exists(), "DELETE should remove the materialized parquet" + assert not tmp_path.exists(), "DELETE should also clean up stale .tmp file" + + +def test_delete_materialized_keboola_row_removes_parquet(seeded_app): + """Same contract for Keboola materialized rows — the canonical path is + /data/extracts/keboola/data/.parquet.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + data_dir = seeded_app["env"]["data_dir"] + + r = c.post( + "/api/admin/register-table", + json={ + "name": "kbc_drop_me", + "source_type": "keboola", + "query_mode": "materialized", + "source_query": "SELECT * FROM kbc.\"in.c-bucket\".\"events\"", + }, + headers=_auth(token), + ) + assert r.status_code == 201, r.json() + table_id = r.json()["id"] + + parquet_path = data_dir / "extracts" / "keboola" / "data" / f"{table_id}.parquet" + parquet_path.parent.mkdir(parents=True, exist_ok=True) + parquet_path.write_bytes(b"PAR1\x00fake") + assert parquet_path.exists() + + r2 = c.delete(f"/api/admin/registry/{table_id}", headers=_auth(token)) + assert r2.status_code == 204 + assert not parquet_path.exists() + + +def test_delete_remote_bq_row_does_not_touch_data_dir( + seeded_app, bq_instance, stub_bq_extractor, +): + """DELETE on a remote-mode row (no materialized parquet exists) must not + fail and must not error out trying to delete a non-existent file.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + + r = c.post( + "/api/admin/register-table", + json={ + "name": "remote_drop", + "source_type": "bigquery", + "bucket": "analytics", + "source_table": "orders", + "query_mode": "remote", + }, + headers=_auth(token), + ) + assert r.status_code in (200, 202), r.json() + table_id = r.json()["id"] + + r2 = c.delete(f"/api/admin/registry/{table_id}", headers=_auth(token)) + assert r2.status_code == 204 + + +def test_delete_clears_sync_state_for_materialized_row(seeded_app): + """DELETE must also clear the sync_state row so the manifest stops + advertising the dropped table to `da sync`.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + + r = c.post( + "/api/admin/register-table", + json={ + "name": "manifest_drop", + "source_type": "keboola", + "query_mode": "materialized", + "source_query": "SELECT 1", + }, + headers=_auth(token), + ) + assert r.status_code == 201, r.json() + table_id = r.json()["id"] + + # Seed a sync_state row as if it had been materialized. + from src.db import get_system_db + from src.repositories.sync_state import SyncStateRepository + sys_conn = get_system_db() + try: + SyncStateRepository(sys_conn).update_sync( + table_id="manifest_drop", # = registry.name + rows=10, file_size_bytes=1024, hash="abc", + ) + assert SyncStateRepository(sys_conn).get_table_state("manifest_drop") is not None + finally: + sys_conn.close() + + r2 = c.delete(f"/api/admin/registry/{table_id}", headers=_auth(token)) + assert r2.status_code == 204 + + sys_conn = get_system_db() + try: + st = SyncStateRepository(sys_conn).get_table_state("manifest_drop") + finally: + sys_conn.close() + assert st is None, "sync_state row should be removed by DELETE" + + +# --- Orchestrator skips orphan parquets without matching registry rows ------- + + +def test_orchestrator_skips_orphan_parquet_in_extracts(e2e_env, monkeypatch): + """A parquet at /data/extracts//data/.parquet whose stem + has no matching `table_registry.name` row must NOT have a master view + created at rebuild time. Defensive — the DELETE handler removes the + parquet at unregister time, but a transient race (or manual cleanup + in flight) shouldn't leave the orchestrator exposing a dropped table. + """ + from src.db import get_system_db + from src.orchestrator import SyncOrchestrator + from src.repositories.table_registry import TableRegistryRepository + from tests.conftest import create_mock_extract + + extracts_dir = e2e_env["extracts_dir"] + + # Build a normal extract.duckdb with a registered table. + create_mock_extract(extracts_dir, "bigquery", [ + {"name": "valid_table", "data": [{"id": "1"}], "query_mode": "local"}, + ]) + # Drop an orphan parquet at the connector's data dir without registering it. + orphan_path = extracts_dir / "bigquery" / "data" / "orphan_test.parquet" + conn0 = duckdb.connect() + conn0.execute( + f"COPY (SELECT 1 AS id) TO '{orphan_path}' (FORMAT PARQUET)" + ) + conn0.close() + assert orphan_path.exists() + + # Register only the legitimate row in table_registry. + sys_conn = get_system_db() + try: + TableRegistryRepository(sys_conn).register( + id="valid_table", name="valid_table", + source_type="bigquery", query_mode="local", + ) + finally: + sys_conn.close() + + orch = SyncOrchestrator(analytics_db_path=e2e_env["analytics_db"]) + orch.rebuild() + + # The orphan parquet must NOT be exposed as a master view. + analytics = duckdb.connect(e2e_env["analytics_db"], read_only=True) + try: + views = { + r[0] for r in analytics.execute( + "SELECT table_name FROM information_schema.tables " + "WHERE table_type='VIEW'" + ).fetchall() + } + finally: + analytics.close() + assert "valid_table" in views, views + assert "orphan_test" not in views, ( + "orphan parquet without a registry row should not get a master view" + )