fix(admin-api): reject backtick BQ-native source_query at register; surface materialize errors per-row
E2E testing showed admin POSTs of materialized BQ rows whose source_query uses BigQuery-native backtick identifiers (`prj.ds.t`) silently no-op'd at the next sync tick — the materialize path runs the SQL through the DuckDB BQ extension's COPY which uses DuckDB's parser; backticks aren't recognized and the query either parse-errors or matches zero rows. No parquet lands at the canonical path and no error reaches an operator-visible surface. Two-part fix: 1. RegisterTableRequest's _check_mode_query_coherence model_validator now rejects any source_query containing a backtick with a 422 + actionable message pointing at the DuckDB equivalent (bq."dataset"."table"). Same check is applied in update_table on the merged record so PATCHes that flip a stored source_query to backtick form are also caught. Covers BQ AND Keboola materialized rows since both connectors funnel source_query through DuckDB's COPY. 2. _run_materialized_pass now persists per-row failures via the new SyncStateRepository.set_error / clear_error methods (existing sync_state.error / status columns — no schema migration). GET /api/admin/registry enriches each row with `last_sync_error` from a single batched SELECT against sync_state, so the admin UI / da admin status can show "this table failed last sync because: X" instead of operators having to trawl scheduler logs. Recovered rows have the error cleared automatically — update_sync's success path resets status='ok' / error=NULL on the upsert. The materialized-path test fixture's _materialized_payload helper is updated to use DuckDB-flavor SQL (the prior backtick example pre-dated the fix). 6 new tests cover register/update rejection on BQ + Keboola, the sync_state error persistence, and the registry response surface.
This commit is contained in:
parent
a4339ce679
commit
f0979f997a
5 changed files with 361 additions and 3 deletions
21
CHANGELOG.md
21
CHANGELOG.md
|
|
@ -10,6 +10,27 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
|
|||
|
||||
## [Unreleased]
|
||||
|
||||
### Fixed
|
||||
- **admin API**: `POST /api/admin/register-table` and `PUT /api/admin/registry/{id}`
|
||||
now reject `source_query` containing BigQuery-native backtick identifiers
|
||||
(e.g. `` `prj.ds.t` ``) with HTTP 422 and a message pointing operators at
|
||||
the DuckDB-flavor equivalent (`bq."dataset"."table"`). Backtick SQL would
|
||||
silently no-op at the next materialize tick — the BQ extension's COPY runs
|
||||
through DuckDB's parser, which doesn't recognize backticks, so the query
|
||||
either parse-errored or matched zero rows and no parquet ever landed at
|
||||
`/data/extracts/<source>/data/<id>.parquet`. Fix catches the bad SQL at
|
||||
registration time so the row never lands in the registry.
|
||||
|
||||
### Added
|
||||
- **admin API**: `GET /api/admin/registry` enriches each table row with
|
||||
`last_sync_error` (string or null) sourced from `sync_state.error`. The
|
||||
scheduler's `_run_materialized_pass` now writes per-row failures via
|
||||
`SyncStateRepository.set_error` so cap-exceeded / auth-failure / bad-SQL
|
||||
errors surface to the admin UI and `da admin status` instead of vanishing
|
||||
into scheduler stderr. A row that recovers on the next tick clears the
|
||||
error automatically (the success path of `update_sync` resets
|
||||
`status='ok'` / `error=NULL` on the upsert).
|
||||
|
||||
## [0.30.0] — 2026-05-01
|
||||
|
||||
### Added
|
||||
|
|
|
|||
|
|
@ -1098,6 +1098,25 @@ def _sanitize_for_audit(payload: Dict[str, Any]) -> Dict[str, Any]:
|
|||
return out
|
||||
|
||||
|
||||
# Both the BigQuery and Keboola materialize paths funnel `source_query`
|
||||
# through DuckDB (BQ via the bigquery extension's COPY translation, Keboola
|
||||
# via an ATTACH'd extension and a direct COPY). DuckDB uses double quotes
|
||||
# for quoted identifiers — backticks are a BigQuery-native syntactic form
|
||||
# DuckDB's parser does not honor, so a backtick-quoted source_query either
|
||||
# parse-errors at COPY time or silently scans nothing. Surfaced from the
|
||||
# field validator on RegisterTableRequest AND the merged-record path in
|
||||
# `update_table` so neither route can persist a backtick query.
|
||||
_BACKTICK_REJECTION_MESSAGE = (
|
||||
"source_query uses BigQuery-native backtick identifiers (e.g. "
|
||||
"`project.dataset.table`), but the materialize path runs the SQL "
|
||||
"through DuckDB's BigQuery extension which uses DuckDB-flavor "
|
||||
"identifiers. Rewrite to DuckDB syntax: bq.\"dataset\".\"table\" "
|
||||
"(with the attached catalog alias `bq` plus double-quoted dataset/"
|
||||
"table). The instance is configured with the data project, so you "
|
||||
"don't need to repeat it in the FROM clause."
|
||||
)
|
||||
|
||||
|
||||
class RegisterTableRequest(BaseModel):
|
||||
name: str
|
||||
folder: Optional[str] = None
|
||||
|
|
@ -1152,6 +1171,17 @@ class RegisterTableRequest(BaseModel):
|
|||
raise ValueError(
|
||||
"source_query is only valid when query_mode='materialized'"
|
||||
)
|
||||
# The materialize path runs the SQL through DuckDB's parser (BigQuery
|
||||
# extension's COPY pushes it through DuckDB first, and the Keboola
|
||||
# path COPYs the raw SQL through a DuckDB session too). DuckDB does
|
||||
# NOT understand BigQuery-native backtick identifiers — those parse-
|
||||
# error or silently match no rows, leaving no parquet at the
|
||||
# canonical path and no operator-visible failure. Reject at register
|
||||
# time with an actionable message so the bad SQL never lands in
|
||||
# `table_registry.source_query`. See `_run_materialized_pass` for
|
||||
# the runtime path that would otherwise eat the error.
|
||||
if sq and "`" in sq:
|
||||
raise ValueError(_BACKTICK_REJECTION_MESSAGE)
|
||||
# Normalise: stash the trimmed-or-None form so the persisted column
|
||||
# never carries surrounding whitespace or empty-string sentinels.
|
||||
self.source_query = sq
|
||||
|
|
@ -1596,9 +1626,37 @@ async def list_registry(
|
|||
user: dict = Depends(require_admin),
|
||||
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
|
||||
):
|
||||
"""Get full table registry."""
|
||||
"""Get full table registry.
|
||||
|
||||
Each table row is enriched with `last_sync_error` from sync_state so
|
||||
operators can see WHY a row isn't materializing without trawling
|
||||
scheduler logs. None for rows that have never errored or have already
|
||||
recovered (status='ok'); the per-row error message string otherwise.
|
||||
"""
|
||||
repo = TableRegistryRepository(conn)
|
||||
tables = repo.list_all()
|
||||
|
||||
# Single batched read of sync_state errors — avoid N+1 GETs against
|
||||
# `sync_state` for large registries. The sync_state row is keyed on
|
||||
# `table_id` which mirrors `table_registry.name` (see comment in
|
||||
# _run_materialized_pass / _build_manifest_for_user about name vs id).
|
||||
error_by_name: Dict[str, Optional[str]] = {}
|
||||
try:
|
||||
rows = conn.execute(
|
||||
"SELECT table_id, error FROM sync_state "
|
||||
"WHERE status = 'error' AND error IS NOT NULL AND error <> ''"
|
||||
).fetchall()
|
||||
error_by_name = {r[0]: r[1] for r in rows}
|
||||
except Exception:
|
||||
# Defensive: if sync_state is unreadable for any reason, the
|
||||
# registry response still serializes — operators just lose the
|
||||
# last_sync_error column on this call.
|
||||
logger.exception("Failed to read sync_state errors for registry")
|
||||
|
||||
for t in tables:
|
||||
# Sync_state.table_id == table_registry.name by convention.
|
||||
t["last_sync_error"] = error_by_name.get(t.get("name"))
|
||||
|
||||
return {"tables": tables, "count": len(tables)}
|
||||
|
||||
|
||||
|
|
@ -2147,6 +2205,16 @@ async def update_table(
|
|||
"automatically."
|
||||
),
|
||||
)
|
||||
# Backtick rejection on the merged record — see
|
||||
# `_BACKTICK_REJECTION_MESSAGE` for the rationale. Catches PATCHes
|
||||
# that flip `source_query` to a backtick form on an already-
|
||||
# materialized row, which the synthetic-RegisterTableRequest below
|
||||
# only re-validates for BQ rows. Apply uniformly so Keboola
|
||||
# materialized rows can't carry one either.
|
||||
if "`" in str(sq):
|
||||
raise HTTPException(
|
||||
status_code=422, detail=_BACKTICK_REJECTION_MESSAGE,
|
||||
)
|
||||
|
||||
if merged.get("source_type") == "bigquery":
|
||||
# Reuse the register-time validator. It mutates the request to
|
||||
|
|
|
|||
|
|
@ -206,10 +206,16 @@ def _run_materialized_pass(conn: duckdb.DuckDBPyConnection, bq) -> dict:
|
|||
"current": e.current,
|
||||
"limit": e.limit,
|
||||
})
|
||||
# Persist the failure so `GET /api/admin/registry` can surface
|
||||
# `last_sync_error` to the admin UI / `da admin status`.
|
||||
# Without this, scheduler stderr was the only place the cap
|
||||
# failure showed up and operators had no API path to it.
|
||||
state.set_error(ref_name, str(e))
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.exception("Materialize failed for %s", ref_name)
|
||||
summary["errors"].append({"table": ref_name, "error": str(e)})
|
||||
state.set_error(ref_name, str(e))
|
||||
continue
|
||||
|
||||
# `materialize_query` returns the parquet's MD5 inline — hashing
|
||||
|
|
@ -223,6 +229,12 @@ def _run_materialized_pass(conn: duckdb.DuckDBPyConnection, bq) -> dict:
|
|||
)
|
||||
parquet_path = Path(output_dir_for_hash) / "data" / f"{ref_name}.parquet"
|
||||
parquet_hash = _file_hash(parquet_path)
|
||||
# `update_sync` resets `status='ok'` / `error=NULL` on the upsert
|
||||
# path (its argument defaults), so a row that previously errored
|
||||
# has the failure cleared by this call. No separate clear_error
|
||||
# needed here — the test invariant is that a successful materialize
|
||||
# leaves status='ok' and error='', which `update_sync` already
|
||||
# establishes.
|
||||
state.update_sync(
|
||||
table_id=ref_name,
|
||||
rows=stats["rows"],
|
||||
|
|
|
|||
|
|
@ -80,3 +80,40 @@ class SyncStateRepository:
|
|||
[table_id, limit],
|
||||
).fetchall()
|
||||
return self._rows_to_dicts(results)
|
||||
|
||||
def set_error(self, table_id: str, error_message: str) -> None:
|
||||
"""Record a per-table sync failure on the existing `error` /`status`
|
||||
columns so admin endpoints can surface it (`GET /api/admin/registry`
|
||||
joins this column into each row's `last_sync_error`).
|
||||
|
||||
Upserts a sync_state row when one doesn't exist yet (a row that
|
||||
errored on its first ever materialize had no prior `update_sync`
|
||||
write). `last_sync` is left NULL on first-ever-error so the manifest
|
||||
doesn't claim a sync happened. Existing rows keep their last
|
||||
successful `last_sync` / `rows` / `hash` fields — only `status` and
|
||||
`error` flip — so analysts who already pulled the prior good
|
||||
parquet via `da sync` keep serving from it while the operator fixes
|
||||
the source.
|
||||
"""
|
||||
self.conn.execute(
|
||||
"""INSERT INTO sync_state (table_id, status, error)
|
||||
VALUES (?, 'error', ?)
|
||||
ON CONFLICT (table_id) DO UPDATE SET
|
||||
status = 'error',
|
||||
error = excluded.error""",
|
||||
[table_id, error_message],
|
||||
)
|
||||
|
||||
def clear_error(self, table_id: str) -> None:
|
||||
"""Clear an `error` / `status='error'` flag without disturbing the
|
||||
rest of the sync_state row. Called after a successful materialize so
|
||||
the registry response stops surfacing stale failure messages.
|
||||
Idempotent — silently no-ops on rows that don't exist or already
|
||||
have status='ok'.
|
||||
"""
|
||||
self.conn.execute(
|
||||
"""UPDATE sync_state
|
||||
SET status = 'ok', error = ''
|
||||
WHERE table_id = ? AND status = 'error'""",
|
||||
[table_id],
|
||||
)
|
||||
|
|
|
|||
|
|
@ -76,7 +76,11 @@ def _materialized_payload(**overrides):
|
|||
"name": "orders_90d",
|
||||
"source_type": "bigquery",
|
||||
"query_mode": "materialized",
|
||||
"source_query": "SELECT date FROM `prj.ds.orders`",
|
||||
# DuckDB-flavor SQL (not BQ-native backticks) — the materialize path
|
||||
# runs the SQL through the DuckDB BQ extension's COPY which uses
|
||||
# double-quoted identifiers. Backticks are now rejected at register
|
||||
# time. See `_BACKTICK_REJECTION_MESSAGE` in app/api/admin.py.
|
||||
"source_query": 'SELECT date FROM bq."ds"."orders"',
|
||||
"sync_schedule": "every 6h",
|
||||
}
|
||||
p.update(overrides)
|
||||
|
|
@ -308,7 +312,7 @@ def test_register_materialized_persists_source_query_in_registry(seeded_app, bq_
|
|||
"/api/admin/register-table",
|
||||
json=_materialized_payload(
|
||||
name="persist_q",
|
||||
source_query="SELECT col FROM `prj.ds.t` WHERE x = 1",
|
||||
source_query='SELECT col FROM bq."ds"."t" WHERE x = 1',
|
||||
),
|
||||
headers=_auth(token),
|
||||
)
|
||||
|
|
@ -320,3 +324,219 @@ def test_register_materialized_persists_source_query_in_registry(seeded_app, bq_
|
|||
assert row is not None
|
||||
assert row["query_mode"] == "materialized"
|
||||
assert "WHERE x = 1" in row["source_query"]
|
||||
|
||||
|
||||
# --- Backtick (BigQuery-native) source_query rejection -----------------------
|
||||
#
|
||||
# DuckDB BQ extension's COPY path interprets the SQL through DuckDB's parser,
|
||||
# which does NOT understand backtick-quoted identifiers (it uses double quotes
|
||||
# for quoted identifiers). A registered backtick-style source_query like
|
||||
# `SELECT * FROM \`prj.ds.t\`` either parse-errors or returns 0 rows at next
|
||||
# materialize tick — silently — and no parquet ends up at the canonical path.
|
||||
# Reject at registration time with an actionable message.
|
||||
|
||||
|
||||
def test_register_materialized_rejects_backtick_source_query(seeded_app, bq_instance):
|
||||
c = seeded_app["client"]
|
||||
token = seeded_app["admin_token"]
|
||||
r = c.post(
|
||||
"/api/admin/register-table",
|
||||
json=_materialized_payload(
|
||||
name="bt_native",
|
||||
source_query="SELECT * FROM `prj-grp.ds.product_inventory`",
|
||||
),
|
||||
headers=_auth(token),
|
||||
)
|
||||
assert r.status_code == 422, r.json()
|
||||
detail = str(r.json().get("detail", "")).lower()
|
||||
assert "backtick" in detail
|
||||
assert 'bq."' in detail or "duckdb" in detail
|
||||
|
||||
|
||||
def test_update_materialized_rejects_backtick_source_query(
|
||||
seeded_app, bq_instance, stub_bq_extractor,
|
||||
):
|
||||
c = seeded_app["client"]
|
||||
token = seeded_app["admin_token"]
|
||||
|
||||
r = c.post(
|
||||
"/api/admin/register-table",
|
||||
json=_materialized_payload(
|
||||
name="bt_update",
|
||||
source_query='SELECT * FROM bq."ds"."t"',
|
||||
),
|
||||
headers=_auth(token),
|
||||
)
|
||||
assert r.status_code == 201, r.json()
|
||||
table_id = r.json()["id"]
|
||||
|
||||
# PATCH the source_query to a backtick form — must be rejected.
|
||||
r2 = c.put(
|
||||
f"/api/admin/registry/{table_id}",
|
||||
json={
|
||||
"query_mode": "materialized",
|
||||
"source_query": "SELECT * FROM `prj.ds.t`",
|
||||
},
|
||||
headers=_auth(token),
|
||||
)
|
||||
assert r2.status_code == 422, r2.json()
|
||||
detail = str(r2.json().get("detail", "")).lower()
|
||||
assert "backtick" in detail
|
||||
|
||||
|
||||
def test_register_materialized_keboola_rejects_backtick_source_query(seeded_app):
|
||||
"""The check is generic, not BQ-only — Keboola materialized rows that
|
||||
include backticks would also be silently skipped at materialize time."""
|
||||
c = seeded_app["client"]
|
||||
token = seeded_app["admin_token"]
|
||||
r = c.post(
|
||||
"/api/admin/register-table",
|
||||
json={
|
||||
"name": "kbc_bt",
|
||||
"source_type": "keboola",
|
||||
"query_mode": "materialized",
|
||||
"source_query": "SELECT * FROM `bucket.table`",
|
||||
},
|
||||
headers=_auth(token),
|
||||
)
|
||||
assert r.status_code == 422, r.json()
|
||||
detail = str(r.json().get("detail", "")).lower()
|
||||
assert "backtick" in detail
|
||||
|
||||
|
||||
# --- Surface materialize errors per-row ---------------------------------------
|
||||
#
|
||||
# Errors that bubble out of `_run_materialized_pass` per-row used to disappear
|
||||
# into scheduler stderr. Operators have no API surface to find out WHY a row
|
||||
# isn't materializing. The trigger pass now writes the failure into
|
||||
# `sync_state.error` (existing column) so `GET /api/admin/registry` can include
|
||||
# `last_sync_error` per row, exposed to `da admin status` / the admin UI.
|
||||
|
||||
|
||||
def test_run_materialized_pass_surfaces_error_in_sync_state(seeded_app, bq_instance):
|
||||
"""When a per-row materialize call raises, `_run_materialized_pass` writes
|
||||
the error to sync_state.error so it can be surfaced via the registry API.
|
||||
"""
|
||||
from app.api.sync import _run_materialized_pass
|
||||
from src.repositories.sync_state import SyncStateRepository
|
||||
from src.repositories.table_registry import TableRegistryRepository
|
||||
from src.db import get_system_db
|
||||
|
||||
sys_conn = get_system_db()
|
||||
try:
|
||||
# Seed a materialized BQ row.
|
||||
TableRegistryRepository(sys_conn).register(
|
||||
id="boom",
|
||||
name="boom",
|
||||
source_type="bigquery",
|
||||
query_mode="materialized",
|
||||
source_query='SELECT * FROM bq."ds"."missing"',
|
||||
sync_schedule="every 1m",
|
||||
)
|
||||
|
||||
# Stub the materialize seam so the per-row branch raises.
|
||||
from unittest.mock import patch
|
||||
with patch(
|
||||
"app.api.sync._materialize_table",
|
||||
side_effect=RuntimeError("boom: missing table"),
|
||||
):
|
||||
summary = _run_materialized_pass(sys_conn, bq=None)
|
||||
|
||||
assert any(e["table"] == "boom" for e in summary["errors"]), summary
|
||||
|
||||
state = SyncStateRepository(sys_conn).get_table_state("boom")
|
||||
assert state is not None, "sync_state row should be created on error"
|
||||
assert (state.get("status") or "") == "error"
|
||||
assert "boom: missing table" in (state.get("error") or "")
|
||||
finally:
|
||||
# Cleanup so the next test starts clean.
|
||||
try:
|
||||
sys_conn.execute("DELETE FROM table_registry WHERE id='boom'")
|
||||
sys_conn.execute("DELETE FROM sync_state WHERE table_id='boom'")
|
||||
except Exception:
|
||||
pass
|
||||
sys_conn.close()
|
||||
|
||||
|
||||
def test_run_materialized_pass_clears_error_on_success(seeded_app, bq_instance):
|
||||
"""When a row that previously errored materializes cleanly, the prior
|
||||
sync_state.error is cleared so the registry response stops surfacing
|
||||
a stale failure message."""
|
||||
from app.api.sync import _run_materialized_pass
|
||||
from src.repositories.sync_state import SyncStateRepository
|
||||
from src.repositories.table_registry import TableRegistryRepository
|
||||
from src.db import get_system_db
|
||||
|
||||
sys_conn = get_system_db()
|
||||
try:
|
||||
TableRegistryRepository(sys_conn).register(
|
||||
id="recover",
|
||||
name="recover",
|
||||
source_type="bigquery",
|
||||
query_mode="materialized",
|
||||
source_query='SELECT * FROM bq."ds"."t"',
|
||||
sync_schedule="every 1m",
|
||||
)
|
||||
|
||||
# Pre-seed sync_state with an error so we can verify it gets cleared.
|
||||
SyncStateRepository(sys_conn).set_error("recover", "previous run failed")
|
||||
state_before = SyncStateRepository(sys_conn).get_table_state("recover")
|
||||
assert (state_before.get("status") or "") == "error"
|
||||
|
||||
from unittest.mock import patch
|
||||
# Successful materialize returns a stats dict.
|
||||
with patch(
|
||||
"app.api.sync._materialize_table",
|
||||
return_value={
|
||||
"rows": 5, "size_bytes": 100, "hash": "abc123",
|
||||
"query_mode": "materialized",
|
||||
},
|
||||
):
|
||||
summary = _run_materialized_pass(sys_conn, bq=None)
|
||||
|
||||
assert "recover" in summary["materialized"], summary
|
||||
state_after = SyncStateRepository(sys_conn).get_table_state("recover")
|
||||
assert (state_after.get("status") or "") == "ok"
|
||||
assert (state_after.get("error") or "") == ""
|
||||
finally:
|
||||
try:
|
||||
sys_conn.execute("DELETE FROM table_registry WHERE id='recover'")
|
||||
sys_conn.execute("DELETE FROM sync_state WHERE table_id='recover'")
|
||||
except Exception:
|
||||
pass
|
||||
sys_conn.close()
|
||||
|
||||
|
||||
def test_get_registry_exposes_last_sync_error_per_table(seeded_app, bq_instance):
|
||||
"""GET /api/admin/registry includes `last_sync_error` populated from
|
||||
sync_state.error so operators have a UI/API surface to see why a
|
||||
materialize is failing without trawling scheduler logs."""
|
||||
from src.repositories.sync_state import SyncStateRepository
|
||||
from src.repositories.table_registry import TableRegistryRepository
|
||||
from src.db import get_system_db
|
||||
|
||||
sys_conn = get_system_db()
|
||||
try:
|
||||
TableRegistryRepository(sys_conn).register(
|
||||
id="failing_row",
|
||||
name="failing_row",
|
||||
source_type="bigquery",
|
||||
query_mode="materialized",
|
||||
source_query='SELECT * FROM bq."ds"."t"',
|
||||
)
|
||||
SyncStateRepository(sys_conn).set_error(
|
||||
"failing_row", "USER_PROJECT_DENIED on project xxx",
|
||||
)
|
||||
finally:
|
||||
sys_conn.close()
|
||||
|
||||
c = seeded_app["client"]
|
||||
token = seeded_app["admin_token"]
|
||||
r = c.get("/api/admin/registry", headers=_auth(token))
|
||||
assert r.status_code == 200, r.json()
|
||||
row = next(
|
||||
(t for t in r.json()["tables"] if t["id"] == "failing_row"), None,
|
||||
)
|
||||
assert row is not None, r.json()
|
||||
assert "last_sync_error" in row, list(row.keys())
|
||||
assert "USER_PROJECT_DENIED" in (row["last_sync_error"] or "")
|
||||
|
|
|
|||
Loading…
Reference in a new issue