fix(admin+orchestrator): DELETE registry drops parquet + sync_state; rebuild skips orphan parquets

E2E sub-agent finding: register a materialized BQ row → sync to materialize
the parquet at `/data/extracts/bigquery/data/<id>.parquet` → DELETE the
registry row. The DB row goes away but:

- the parquet file stays on disk forever, AND
- the sync_state row stays, so `/api/sync/manifest` keeps advertising the
  dropped table to `da sync`, AND
- the orchestrator's next rebuild can resurrect a master view by picking
  up the leftover parquet.

Two-part fix in `unregister_table`:

1. For materialized rows on bigquery/keboola, remove
   `${DATA_DIR}/extracts/<source_type>/data/<name>.parquet` (and any stale
   `<name>.parquet.tmp` from a crashed prior materialize). Filename is
   keyed on `table_registry.name` to match sync_state bookkeeping.
   File-removal errors are logged but don't fail the DELETE — the registry
   row is already gone, and an orphan parquet won't get a master view at
   next rebuild because the orchestrator's _meta-driven scan never picks
   up bare parquet files.

2. Always clear `sync_state` + `sync_history` rows for the dropped table_id
   so the manifest stops advertising the table — applies to all source
   types and modes, not just materialized, since any synced row had a
   sync_state entry.

Orchestrator-side defensive guard (Finding 2b) is a no-op in the current
implementation: `_attach_and_create_views` only creates master views from
`_meta` rows in each connector's `extract.duckdb`, so a parquet without a
matching `_meta` entry is already invisible to the rebuild. The new
test `test_orchestrator_skips_orphan_parquet_in_extracts` is kept as a
regression guard for that contract.

5 tests cover: BQ + Keboola materialized DELETE removes parquet, remote
DELETE doesn't error trying to remove a non-existent file, sync_state
cleared on DELETE, orchestrator orphan-skip invariant.
This commit is contained in:
ZdenekSrotyr 2026-05-01 22:54:11 +02:00
parent f0979f997a
commit dd46461c6c
3 changed files with 327 additions and 0 deletions

View file

@ -20,6 +20,14 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
either parse-errored or matched zero rows and no parquet ever landed at either parse-errored or matched zero rows and no parquet ever landed at
`/data/extracts/<source>/data/<id>.parquet`. Fix catches the bad SQL at `/data/extracts/<source>/data/<id>.parquet`. Fix catches the bad SQL at
registration time so the row never lands in the registry. registration time so the row never lands in the registry.
- **admin API**: `DELETE /api/admin/registry/{id}` now removes the canonical
materialized parquet (`${DATA_DIR}/extracts/<source_type>/data/<name>.parquet`
plus any stale `.parquet.tmp`) AND clears the matching `sync_state` /
`sync_history` rows. Pre-fix the registry row was dropped but the parquet
+ sync_state row stayed, so `GET /api/sync/manifest` kept advertising the
dropped table to `da sync` and analysts kept downloading it. Defensive
failure handling — file-removal errors are logged but don't fail the
DELETE.
### Added ### Added
- **admin API**: `GET /api/admin/registry` enriches each table row with - **admin API**: `GET /api/admin/registry` enriches each table row with

View file

@ -2273,6 +2273,14 @@ async def unregister_table(
For BQ rows, schedules a background rebuild so the dropped row's For BQ rows, schedules a background rebuild so the dropped row's
master view is removed from analytics.duckdb (rather than hanging master view is removed from analytics.duckdb (rather than hanging
around until the next scheduled sync). around until the next scheduled sync).
For materialized rows, also removes the canonical parquet at
`${DATA_DIR}/extracts/<source_type>/data/<id>.parquet` and clears
the matching `sync_state` row. Without these two cleanups, the
manifest endpoint kept advertising the dropped table to `da sync`
(sync_state-driven) and the orchestrator's next rebuild could
resurrect a master view from the leftover parquet (E2E sub-agent
finding 2026-05-01).
""" """
repo = TableRegistryRepository(conn) repo = TableRegistryRepository(conn)
existing = repo.get(table_id) existing = repo.get(table_id)
@ -2280,8 +2288,59 @@ async def unregister_table(
raise HTTPException(status_code=404, detail="Table not found") raise HTTPException(status_code=404, detail="Table not found")
was_bigquery = existing.get("source_type") == "bigquery" was_bigquery = existing.get("source_type") == "bigquery"
was_materialized = existing.get("query_mode") == "materialized"
source_type = existing.get("source_type") or ""
name = existing.get("name") or table_id
repo.unregister(table_id) repo.unregister(table_id)
# Drop the canonical parquet for materialized rows. Path layout:
# `${DATA_DIR}/extracts/<source_type>/data/<name>.parquet` — the
# filename is keyed by `table_registry.name` (matches sync_state
# bookkeeping convention; see _run_materialized_pass + the manifest
# builder for the same name-keyed lookup). Defensively remove the
# `.parquet.tmp` sibling too in case a prior materialize crashed
# mid-COPY. Failure to remove (file missing, permission error) is
# logged but doesn't fail the DELETE — the registry row is already
# gone, and the orphan parquet will not produce a master view at
# next rebuild because the orchestrator's _meta-driven scan never
# picks up bare parquet files.
if was_materialized and source_type in ("bigquery", "keboola"):
try:
data_dir = Path(os.environ.get("DATA_DIR", "./data"))
base = data_dir / "extracts" / source_type / "data"
for candidate in (
base / f"{name}.parquet",
base / f"{name}.parquet.tmp",
):
if candidate.exists():
candidate.unlink()
logger.info(
"Removed materialized parquet for unregistered table %s: %s",
table_id, candidate,
)
except Exception as e:
logger.warning(
"Failed to remove materialized parquet for %s: %s — registry row is "
"still dropped; clean up the file manually if it lingers",
table_id, e,
)
# Clear sync_state for any source/mode (a row that was synced at any
# point — local/materialized — has a sync_state entry that the manifest
# serves regardless of registry state). Pre-fix, the manifest still
# advertised the dropped table to `da sync` because sync_state was
# never cleaned up, and analysts kept getting it through the manifest.
try:
conn.execute("DELETE FROM sync_state WHERE table_id = ?", [name])
conn.execute("DELETE FROM sync_history WHERE table_id = ?", [name])
except Exception as e:
logger.warning(
"Failed to clear sync_state for unregistered table %s: %s"
"manifest may still advertise the dropped row to da sync",
table_id, e,
)
AuditRepository(conn).log( AuditRepository(conn).log(
user_id=user.get("id"), user_id=user.get("id"),
action="unregister_table", action="unregister_table",

View file

@ -0,0 +1,260 @@
"""DELETE /api/admin/registry/{id} for materialized rows must remove the
materialized parquet file too otherwise sync_state still has the row,
the manifest still serves it, and `da sync` keeps trying to download
data for a table that no longer has a registry entry. The orchestrator's
rebuild path additionally skips parquets that lack a matching
table_registry row, so a transient race (or operator-deleted parquet)
can't resurrect a master view for a dropped table.
E2E sub-agent finding 2026-05-01: registering a materialized BQ row,
syncing, then DELETEing the registry row left the parquet at
`/data/extracts/bigquery/data/<id>.parquet` and a master view in
`analytics.duckdb` `/api/sync/manifest` and the master view both still
exposed the table.
"""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock
import duckdb
import pytest
def _auth(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
@pytest.fixture
def bq_instance(monkeypatch):
fake_cfg = {
"data_source": {
"type": "bigquery",
"bigquery": {"project": "my-test-project", "location": "us"},
},
}
monkeypatch.setattr(
"app.instance_config.load_instance_config", lambda: fake_cfg, raising=False,
)
from app.instance_config import reset_cache
reset_cache()
yield fake_cfg
reset_cache()
@pytest.fixture
def stub_bq_extractor(monkeypatch):
"""Bypass post-register rebuild's BQ traffic so the test stays offline."""
rebuild_mock = MagicMock(return_value={
"project_id": "my-test-project",
"tables_registered": 1, "errors": [], "skipped": False,
})
monkeypatch.setattr(
"connectors.bigquery.extractor.rebuild_from_registry",
rebuild_mock,
)
monkeypatch.setattr(
"src.orchestrator.SyncOrchestrator",
lambda *a, **kw: MagicMock(),
)
return rebuild_mock
def test_delete_materialized_bq_row_removes_parquet(
seeded_app, bq_instance, stub_bq_extractor,
):
"""DELETE on a materialized BQ registry row removes the canonical parquet
file at /data/extracts/bigquery/data/<id>.parquet so the orchestrator's
next rebuild can't resurrect a master view for the dropped row."""
c = seeded_app["client"]
token = seeded_app["admin_token"]
data_dir = seeded_app["env"]["data_dir"]
# Seed a materialized row + drop a fake parquet at the canonical path.
r = c.post(
"/api/admin/register-table",
json={
"name": "drop_me",
"source_type": "bigquery",
"query_mode": "materialized",
"source_query": 'SELECT 1 FROM bq."ds"."t"',
},
headers=_auth(token),
)
assert r.status_code == 201, r.json()
table_id = r.json()["id"]
parquet_path = data_dir / "extracts" / "bigquery" / "data" / f"{table_id}.parquet"
parquet_path.parent.mkdir(parents=True, exist_ok=True)
parquet_path.write_bytes(b"PAR1\x00fake-parquet-content")
# Also drop a stale .tmp to verify defensive cleanup.
tmp_path = parquet_path.parent / f"{table_id}.parquet.tmp"
tmp_path.write_bytes(b"PAR1\x00partial")
assert parquet_path.exists()
assert tmp_path.exists()
r2 = c.delete(f"/api/admin/registry/{table_id}", headers=_auth(token))
assert r2.status_code == 204
assert not parquet_path.exists(), "DELETE should remove the materialized parquet"
assert not tmp_path.exists(), "DELETE should also clean up stale .tmp file"
def test_delete_materialized_keboola_row_removes_parquet(seeded_app):
"""Same contract for Keboola materialized rows — the canonical path is
/data/extracts/keboola/data/<id>.parquet."""
c = seeded_app["client"]
token = seeded_app["admin_token"]
data_dir = seeded_app["env"]["data_dir"]
r = c.post(
"/api/admin/register-table",
json={
"name": "kbc_drop_me",
"source_type": "keboola",
"query_mode": "materialized",
"source_query": "SELECT * FROM kbc.\"in.c-bucket\".\"events\"",
},
headers=_auth(token),
)
assert r.status_code == 201, r.json()
table_id = r.json()["id"]
parquet_path = data_dir / "extracts" / "keboola" / "data" / f"{table_id}.parquet"
parquet_path.parent.mkdir(parents=True, exist_ok=True)
parquet_path.write_bytes(b"PAR1\x00fake")
assert parquet_path.exists()
r2 = c.delete(f"/api/admin/registry/{table_id}", headers=_auth(token))
assert r2.status_code == 204
assert not parquet_path.exists()
def test_delete_remote_bq_row_does_not_touch_data_dir(
seeded_app, bq_instance, stub_bq_extractor,
):
"""DELETE on a remote-mode row (no materialized parquet exists) must not
fail and must not error out trying to delete a non-existent file."""
c = seeded_app["client"]
token = seeded_app["admin_token"]
r = c.post(
"/api/admin/register-table",
json={
"name": "remote_drop",
"source_type": "bigquery",
"bucket": "analytics",
"source_table": "orders",
"query_mode": "remote",
},
headers=_auth(token),
)
assert r.status_code in (200, 202), r.json()
table_id = r.json()["id"]
r2 = c.delete(f"/api/admin/registry/{table_id}", headers=_auth(token))
assert r2.status_code == 204
def test_delete_clears_sync_state_for_materialized_row(seeded_app):
"""DELETE must also clear the sync_state row so the manifest stops
advertising the dropped table to `da sync`."""
c = seeded_app["client"]
token = seeded_app["admin_token"]
r = c.post(
"/api/admin/register-table",
json={
"name": "manifest_drop",
"source_type": "keboola",
"query_mode": "materialized",
"source_query": "SELECT 1",
},
headers=_auth(token),
)
assert r.status_code == 201, r.json()
table_id = r.json()["id"]
# Seed a sync_state row as if it had been materialized.
from src.db import get_system_db
from src.repositories.sync_state import SyncStateRepository
sys_conn = get_system_db()
try:
SyncStateRepository(sys_conn).update_sync(
table_id="manifest_drop", # = registry.name
rows=10, file_size_bytes=1024, hash="abc",
)
assert SyncStateRepository(sys_conn).get_table_state("manifest_drop") is not None
finally:
sys_conn.close()
r2 = c.delete(f"/api/admin/registry/{table_id}", headers=_auth(token))
assert r2.status_code == 204
sys_conn = get_system_db()
try:
st = SyncStateRepository(sys_conn).get_table_state("manifest_drop")
finally:
sys_conn.close()
assert st is None, "sync_state row should be removed by DELETE"
# --- Orchestrator skips orphan parquets without matching registry rows -------
def test_orchestrator_skips_orphan_parquet_in_extracts(e2e_env, monkeypatch):
"""A parquet at /data/extracts/<source>/data/<id>.parquet whose stem
has no matching `table_registry.name` row must NOT have a master view
created at rebuild time. Defensive the DELETE handler removes the
parquet at unregister time, but a transient race (or manual cleanup
in flight) shouldn't leave the orchestrator exposing a dropped table.
"""
from src.db import get_system_db
from src.orchestrator import SyncOrchestrator
from src.repositories.table_registry import TableRegistryRepository
from tests.conftest import create_mock_extract
extracts_dir = e2e_env["extracts_dir"]
# Build a normal extract.duckdb with a registered table.
create_mock_extract(extracts_dir, "bigquery", [
{"name": "valid_table", "data": [{"id": "1"}], "query_mode": "local"},
])
# Drop an orphan parquet at the connector's data dir without registering it.
orphan_path = extracts_dir / "bigquery" / "data" / "orphan_test.parquet"
conn0 = duckdb.connect()
conn0.execute(
f"COPY (SELECT 1 AS id) TO '{orphan_path}' (FORMAT PARQUET)"
)
conn0.close()
assert orphan_path.exists()
# Register only the legitimate row in table_registry.
sys_conn = get_system_db()
try:
TableRegistryRepository(sys_conn).register(
id="valid_table", name="valid_table",
source_type="bigquery", query_mode="local",
)
finally:
sys_conn.close()
orch = SyncOrchestrator(analytics_db_path=e2e_env["analytics_db"])
orch.rebuild()
# The orphan parquet must NOT be exposed as a master view.
analytics = duckdb.connect(e2e_env["analytics_db"], read_only=True)
try:
views = {
r[0] for r in analytics.execute(
"SELECT table_name FROM information_schema.tables "
"WHERE table_type='VIEW'"
).fetchall()
}
finally:
analytics.close()
assert "valid_table" in views, views
assert "orphan_test" not in views, (
"orphan parquet without a registry row should not get a master view"
)