diff --git a/CHANGELOG.md b/CHANGELOG.md index 150f918..3e62d5d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,27 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +### Fixed +- **admin API**: `POST /api/admin/register-table` and `PUT /api/admin/registry/{id}` + now reject `source_query` containing BigQuery-native backtick identifiers + (e.g. `` `prj.ds.t` ``) with HTTP 422 and a message pointing operators at + the DuckDB-flavor equivalent (`bq."dataset"."table"`). Backtick SQL would + silently no-op at the next materialize tick — the BQ extension's COPY runs + through DuckDB's parser, which doesn't recognize backticks, so the query + either parse-errored or matched zero rows and no parquet ever landed at + `/data/extracts/{source}/data/{table}.parquet`. Fix catches the bad SQL at + registration time so the row never lands in the registry. + +### Added +- **admin API**: `GET /api/admin/registry` enriches each table row with + `last_sync_error` (string or null) sourced from `sync_state.error`. The + scheduler's `_run_materialized_pass` now writes per-row failures via + `SyncStateRepository.set_error` so cap-exceeded / auth-failure / bad-SQL + errors surface to the admin UI and `da admin status` instead of vanishing + into scheduler stderr. A row that recovers on the next tick clears the + error automatically (the success path of `update_sync` resets + `status='ok'` / `error=NULL` on the upsert). + ## [0.30.0] — 2026-05-01 ### Added diff --git a/app/api/admin.py b/app/api/admin.py index 19f447b..42c10c1 100644 --- a/app/api/admin.py +++ b/app/api/admin.py @@ -1098,6 +1098,25 @@ def _sanitize_for_audit(payload: Dict[str, Any]) -> Dict[str, Any]: return out +# Both the BigQuery and Keboola materialize paths funnel `source_query` +# through DuckDB (BQ via the bigquery extension's COPY translation, Keboola +# via an ATTACH'd extension and a direct COPY). 
DuckDB uses double quotes +# for quoted identifiers — backticks are a BigQuery-native syntactic form +# DuckDB's parser does not honor, so a backtick-quoted source_query either +# parse-errors at COPY time or silently scans nothing. Surfaced from the +# field validator on RegisterTableRequest AND the merged-record path in +# `update_table` so neither route can persist a backtick query. +_BACKTICK_REJECTION_MESSAGE = ( + "source_query uses BigQuery-native backtick identifiers (e.g. " + "`project.dataset.table`), but the materialize path runs the SQL " + "through DuckDB's BigQuery extension which uses DuckDB-flavor " + "identifiers. Rewrite to DuckDB syntax: bq.\"dataset\".\"table\" " + "(with the attached catalog alias `bq` plus double-quoted dataset/" + "table). The instance is configured with the data project, so you " + "don't need to repeat it in the FROM clause." +) + + class RegisterTableRequest(BaseModel): name: str folder: Optional[str] = None @@ -1152,6 +1171,17 @@ class RegisterTableRequest(BaseModel): raise ValueError( "source_query is only valid when query_mode='materialized'" ) + # The materialize path runs the SQL through DuckDB's parser (BigQuery + # extension's COPY pushes it through DuckDB first, and the Keboola + # path COPYs the raw SQL through a DuckDB session too). DuckDB does + # NOT understand BigQuery-native backtick identifiers — those parse- + # error or silently match no rows, leaving no parquet at the + # canonical path and no operator-visible failure. Reject at register + # time with an actionable message so the bad SQL never lands in + # `table_registry.source_query`. See `_run_materialized_pass` for + # the runtime path that would otherwise eat the error. + if sq and "`" in sq: + raise ValueError(_BACKTICK_REJECTION_MESSAGE) # Normalise: stash the trimmed-or-None form so the persisted column # never carries surrounding whitespace or empty-string sentinels. 
self.source_query = sq @@ -1596,9 +1626,37 @@ async def list_registry( user: dict = Depends(require_admin), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): - """Get full table registry.""" + """Get full table registry. + + Each table row is enriched with `last_sync_error` from sync_state so + operators can see WHY a row isn't materializing without trawling + scheduler logs. None for rows that have never errored or have already + recovered (status='ok'); the per-row error message string otherwise. + """ repo = TableRegistryRepository(conn) tables = repo.list_all() + + # Single batched read of sync_state errors — avoid N+1 GETs against + # `sync_state` for large registries. The sync_state row is keyed on + # `table_id` which mirrors `table_registry.name` (see comment in + # _run_materialized_pass / _build_manifest_for_user about name vs id). + error_by_name: Dict[str, Optional[str]] = {} + try: + rows = conn.execute( + "SELECT table_id, error FROM sync_state " + "WHERE status = 'error' AND error IS NOT NULL AND error <> ''" + ).fetchall() + error_by_name = {r[0]: r[1] for r in rows} + except Exception: + # Defensive: if sync_state is unreadable for any reason, the + # registry response still serializes — operators just lose the + # last_sync_error column on this call. + logger.exception("Failed to read sync_state errors for registry") + + for t in tables: + # Sync_state.table_id == table_registry.name by convention. + t["last_sync_error"] = error_by_name.get(t.get("name")) + return {"tables": tables, "count": len(tables)} @@ -2147,6 +2205,16 @@ async def update_table( "automatically." ), ) + # Backtick rejection on the merged record — see + # `_BACKTICK_REJECTION_MESSAGE` for the rationale. Catches PATCHes + # that flip `source_query` to a backtick form on an already- + # materialized row, which the synthetic-RegisterTableRequest below + # only re-validates for BQ rows. Apply uniformly so Keboola + # materialized rows can't carry one either. 
+ if "`" in str(sq): + raise HTTPException( + status_code=422, detail=_BACKTICK_REJECTION_MESSAGE, + ) if merged.get("source_type") == "bigquery": # Reuse the register-time validator. It mutates the request to diff --git a/app/api/sync.py b/app/api/sync.py index bd2453e..0d6a533 100644 --- a/app/api/sync.py +++ b/app/api/sync.py @@ -206,10 +206,16 @@ def _run_materialized_pass(conn: duckdb.DuckDBPyConnection, bq) -> dict: "current": e.current, "limit": e.limit, }) + # Persist the failure so `GET /api/admin/registry` can surface + # `last_sync_error` to the admin UI / `da admin status`. + # Without this, scheduler stderr was the only place the cap + # failure showed up and operators had no API path to it. + state.set_error(ref_name, str(e)) continue except Exception as e: logger.exception("Materialize failed for %s", ref_name) summary["errors"].append({"table": ref_name, "error": str(e)}) + state.set_error(ref_name, str(e)) continue # `materialize_query` returns the parquet's MD5 inline — hashing @@ -223,6 +229,12 @@ def _run_materialized_pass(conn: duckdb.DuckDBPyConnection, bq) -> dict: ) parquet_path = Path(output_dir_for_hash) / "data" / f"{ref_name}.parquet" parquet_hash = _file_hash(parquet_path) + # `update_sync` resets `status='ok'` / `error=NULL` on the upsert + # path (its argument defaults), so a row that previously errored + # has the failure cleared by this call. No separate clear_error + # needed here — the test invariant is that a successful materialize + # leaves status='ok' and error='', which `update_sync` already + # establishes. 
state.update_sync( table_id=ref_name, rows=stats["rows"], diff --git a/src/repositories/sync_state.py b/src/repositories/sync_state.py index 5744dd0..661991f 100644 --- a/src/repositories/sync_state.py +++ b/src/repositories/sync_state.py @@ -80,3 +80,40 @@ class SyncStateRepository: [table_id, limit], ).fetchall() return self._rows_to_dicts(results) + + def set_error(self, table_id: str, error_message: str) -> None: + """Record a per-table sync failure on the existing `error` /`status` + columns so admin endpoints can surface it (`GET /api/admin/registry` + joins this column into each row's `last_sync_error`). + + Upserts a sync_state row when one doesn't exist yet (a row that + errored on its first ever materialize had no prior `update_sync` + write). `last_sync` is left NULL on first-ever-error so the manifest + doesn't claim a sync happened. Existing rows keep their last + successful `last_sync` / `rows` / `hash` fields — only `status` and + `error` flip — so analysts who already pulled the prior good + parquet via `da sync` keep serving from it while the operator fixes + the source. + """ + self.conn.execute( + """INSERT INTO sync_state (table_id, status, error) + VALUES (?, 'error', ?) + ON CONFLICT (table_id) DO UPDATE SET + status = 'error', + error = excluded.error""", + [table_id, error_message], + ) + + def clear_error(self, table_id: str) -> None: + """Clear an `error` / `status='error'` flag without disturbing the + rest of the sync_state row. Called after a successful materialize so + the registry response stops surfacing stale failure messages. + Idempotent — silently no-ops on rows that don't exist or already + have status='ok'. + """ + self.conn.execute( + """UPDATE sync_state + SET status = 'ok', error = '' + WHERE table_id = ? 
AND status = 'error'""", + [table_id], + ) diff --git a/tests/test_api_admin_materialized.py b/tests/test_api_admin_materialized.py index 3783118..5e8de8d 100644 --- a/tests/test_api_admin_materialized.py +++ b/tests/test_api_admin_materialized.py @@ -76,7 +76,11 @@ def _materialized_payload(**overrides): "name": "orders_90d", "source_type": "bigquery", "query_mode": "materialized", - "source_query": "SELECT date FROM `prj.ds.orders`", + # DuckDB-flavor SQL (not BQ-native backticks) — the materialize path + # runs the SQL through the DuckDB BQ extension's COPY which uses + # double-quoted identifiers. Backticks are now rejected at register + # time. See `_BACKTICK_REJECTION_MESSAGE` in app/api/admin.py. + "source_query": 'SELECT date FROM bq."ds"."orders"', "sync_schedule": "every 6h", } p.update(overrides) @@ -308,7 +312,7 @@ def test_register_materialized_persists_source_query_in_registry(seeded_app, bq_ "/api/admin/register-table", json=_materialized_payload( name="persist_q", - source_query="SELECT col FROM `prj.ds.t` WHERE x = 1", + source_query='SELECT col FROM bq."ds"."t" WHERE x = 1', ), headers=_auth(token), ) @@ -320,3 +324,219 @@ def test_register_materialized_persists_source_query_in_registry(seeded_app, bq_ assert row is not None assert row["query_mode"] == "materialized" assert "WHERE x = 1" in row["source_query"] + + +# --- Backtick (BigQuery-native) source_query rejection ----------------------- +# +# DuckDB BQ extension's COPY path interprets the SQL through DuckDB's parser, +# which does NOT understand backtick-quoted identifiers (it uses double quotes +# for quoted identifiers). A registered backtick-style source_query like +# `SELECT * FROM \`prj.ds.t\`` either parse-errors or returns 0 rows at next +# materialize tick — silently — and no parquet ends up at the canonical path. +# Reject at registration time with an actionable message. 
+ + +def test_register_materialized_rejects_backtick_source_query(seeded_app, bq_instance): + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/admin/register-table", + json=_materialized_payload( + name="bt_native", + source_query="SELECT * FROM `prj-grp.ds.product_inventory`", + ), + headers=_auth(token), + ) + assert r.status_code == 422, r.json() + detail = str(r.json().get("detail", "")).lower() + assert "backtick" in detail + assert 'bq."' in detail or "duckdb" in detail + + +def test_update_materialized_rejects_backtick_source_query( + seeded_app, bq_instance, stub_bq_extractor, +): + c = seeded_app["client"] + token = seeded_app["admin_token"] + + r = c.post( + "/api/admin/register-table", + json=_materialized_payload( + name="bt_update", + source_query='SELECT * FROM bq."ds"."t"', + ), + headers=_auth(token), + ) + assert r.status_code == 201, r.json() + table_id = r.json()["id"] + + # PATCH the source_query to a backtick form — must be rejected. 
+ r2 = c.put( + f"/api/admin/registry/{table_id}", + json={ + "query_mode": "materialized", + "source_query": "SELECT * FROM `prj.ds.t`", + }, + headers=_auth(token), + ) + assert r2.status_code == 422, r2.json() + detail = str(r2.json().get("detail", "")).lower() + assert "backtick" in detail + + +def test_register_materialized_keboola_rejects_backtick_source_query(seeded_app): + """The check is generic, not BQ-only — Keboola materialized rows that + include backticks would also be silently skipped at materialize time.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/admin/register-table", + json={ + "name": "kbc_bt", + "source_type": "keboola", + "query_mode": "materialized", + "source_query": "SELECT * FROM `bucket.table`", + }, + headers=_auth(token), + ) + assert r.status_code == 422, r.json() + detail = str(r.json().get("detail", "")).lower() + assert "backtick" in detail + + +# --- Surface materialize errors per-row --------------------------------------- +# +# Errors that bubble out of `_run_materialized_pass` per-row used to disappear +# into scheduler stderr. Operators have no API surface to find out WHY a row +# isn't materializing. The trigger pass now writes the failure into +# `sync_state.error` (existing column) so `GET /api/admin/registry` can include +# `last_sync_error` per row, exposed to `da admin status` / the admin UI. + + +def test_run_materialized_pass_surfaces_error_in_sync_state(seeded_app, bq_instance): + """When a per-row materialize call raises, `_run_materialized_pass` writes + the error to sync_state.error so it can be surfaced via the registry API. + """ + from app.api.sync import _run_materialized_pass + from src.repositories.sync_state import SyncStateRepository + from src.repositories.table_registry import TableRegistryRepository + from src.db import get_system_db + + sys_conn = get_system_db() + try: + # Seed a materialized BQ row. 
+ TableRegistryRepository(sys_conn).register( + id="boom", + name="boom", + source_type="bigquery", + query_mode="materialized", + source_query='SELECT * FROM bq."ds"."missing"', + sync_schedule="every 1m", + ) + + # Stub the materialize seam so the per-row branch raises. + from unittest.mock import patch + with patch( + "app.api.sync._materialize_table", + side_effect=RuntimeError("boom: missing table"), + ): + summary = _run_materialized_pass(sys_conn, bq=None) + + assert any(e["table"] == "boom" for e in summary["errors"]), summary + + state = SyncStateRepository(sys_conn).get_table_state("boom") + assert state is not None, "sync_state row should be created on error" + assert (state.get("status") or "") == "error" + assert "boom: missing table" in (state.get("error") or "") + finally: + # Cleanup so the next test starts clean. + try: + sys_conn.execute("DELETE FROM table_registry WHERE id='boom'") + sys_conn.execute("DELETE FROM sync_state WHERE table_id='boom'") + except Exception: + pass + sys_conn.close() + + +def test_run_materialized_pass_clears_error_on_success(seeded_app, bq_instance): + """When a row that previously errored materializes cleanly, the prior + sync_state.error is cleared so the registry response stops surfacing + a stale failure message.""" + from app.api.sync import _run_materialized_pass + from src.repositories.sync_state import SyncStateRepository + from src.repositories.table_registry import TableRegistryRepository + from src.db import get_system_db + + sys_conn = get_system_db() + try: + TableRegistryRepository(sys_conn).register( + id="recover", + name="recover", + source_type="bigquery", + query_mode="materialized", + source_query='SELECT * FROM bq."ds"."t"', + sync_schedule="every 1m", + ) + + # Pre-seed sync_state with an error so we can verify it gets cleared. 
+ SyncStateRepository(sys_conn).set_error("recover", "previous run failed") + state_before = SyncStateRepository(sys_conn).get_table_state("recover") + assert (state_before.get("status") or "") == "error" + + from unittest.mock import patch + # Successful materialize returns a stats dict. + with patch( + "app.api.sync._materialize_table", + return_value={ + "rows": 5, "size_bytes": 100, "hash": "abc123", + "query_mode": "materialized", + }, + ): + summary = _run_materialized_pass(sys_conn, bq=None) + + assert "recover" in summary["materialized"], summary + state_after = SyncStateRepository(sys_conn).get_table_state("recover") + assert (state_after.get("status") or "") == "ok" + assert (state_after.get("error") or "") == "" + finally: + try: + sys_conn.execute("DELETE FROM table_registry WHERE id='recover'") + sys_conn.execute("DELETE FROM sync_state WHERE table_id='recover'") + except Exception: + pass + sys_conn.close() + + +def test_get_registry_exposes_last_sync_error_per_table(seeded_app, bq_instance): + """GET /api/admin/registry includes `last_sync_error` populated from + sync_state.error so operators have a UI/API surface to see why a + materialize is failing without trawling scheduler logs.""" + from src.repositories.sync_state import SyncStateRepository + from src.repositories.table_registry import TableRegistryRepository + from src.db import get_system_db + + sys_conn = get_system_db() + try: + TableRegistryRepository(sys_conn).register( + id="failing_row", + name="failing_row", + source_type="bigquery", + query_mode="materialized", + source_query='SELECT * FROM bq."ds"."t"', + ) + SyncStateRepository(sys_conn).set_error( + "failing_row", "USER_PROJECT_DENIED on project xxx", + ) + finally: + sys_conn.close() + + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.get("/api/admin/registry", headers=_auth(token)) + assert r.status_code == 200, r.json() + row = next( + (t for t in r.json()["tables"] if t["id"] == "failing_row"), None, + ) + 
assert row is not None, r.json() + assert "last_sync_error" in row, list(row.keys()) + assert "USER_PROJECT_DENIED" in (row["last_sync_error"] or "")