feat(schema): v24 — rewrite materialized BQ source_query to BQ-native

Materialize now wraps admin SQL into bigquery_query('<billing>', '<inner>')
which requires the inner SQL to be BigQuery-flavor (backticked
identifiers, native function syntax). v24 migrates existing rows from
DuckDB-flavor (bq."ds"."tbl") to (`<project>.ds.tbl`) using the
configured BQ project. Idempotent on already-converted rows; logs a
warning and skips when the project isn't configured (operator can
configure + restart for retry).
This commit is contained in:
ZdenekSrotyr 2026-05-04 19:15:54 +02:00
parent 77eb3244c2
commit fac10b29e4
4 changed files with 189 additions and 6 deletions

View file

@ -443,7 +443,7 @@ Module sets `lifecycle { ignore_changes = [metadata_startup_script] }` on `googl
## Key Implementation Details
### DuckDB Schema (src/db.py)
- Schema v23 with auto-migration v1→…→v23 (v5 adds `users.active`, v6 adds `personal_access_tokens`, v7 adds `personal_access_tokens.last_used_ip`, v8/v9 added the legacy internal_roles/role-grants tables, v10 added `view_ownership` for cross-connector view-name collision detection (issue #81 Group C), v11 added marketplace_registry + marketplace_plugins + user_groups + plugin_access, v12 added users.groups JSON + user_groups.is_system, **v13 replaces internal_roles/group_mappings/user_role_grants/plugin_access with user_group_members + resource_grants and drops users.groups JSON**, v14 adds FK constraints on user_group_members + resource_grants after orphan cleanup, v15 adds knowledge_items context-engineering columns + contradictions + session_extraction_state, v16 adds verification_evidence, v17 adds knowledge_item_relations, v18 drops stranded non-google memberships from google-managed groups, **v19 drops legacy `dataset_permissions`, `access_requests` tables and `users.role`, `table_registry.is_public` columns — table access is now exclusively per-group via `resource_grants(resource_type='table')`**, **v20 adds `source_query` TEXT to `table_registry` to back `query_mode='materialized'` (BigQuery scheduled-query parquet path)**, **v21 adds `welcome_template` singleton table backing the Agent Setup Prompt admin override (`/admin/agent-prompt`)**, **v22 reserves the `setup_banner` table — feature dropped mid-development; table retained for forward compatibility with already-migrated instances**, **v23 adds `claude_md_template` singleton table backing the Agent Workspace Prompt admin override (`/admin/workspace-prompt`)** — see CHANGELOG and docs/RBAC.md)
- Schema v24 with auto-migration v1→…→v24 (v5 adds `users.active`, v6 adds `personal_access_tokens`, v7 adds `personal_access_tokens.last_used_ip`, v8/v9 added the legacy internal_roles/role-grants tables, v10 added `view_ownership` for cross-connector view-name collision detection (issue #81 Group C), v11 added marketplace_registry + marketplace_plugins + user_groups + plugin_access, v12 added users.groups JSON + user_groups.is_system, **v13 replaces internal_roles/group_mappings/user_role_grants/plugin_access with user_group_members + resource_grants and drops users.groups JSON**, v14 adds FK constraints on user_group_members + resource_grants after orphan cleanup, v15 adds knowledge_items context-engineering columns + contradictions + session_extraction_state, v16 adds verification_evidence, v17 adds knowledge_item_relations, v18 drops stranded non-google memberships from google-managed groups, **v19 drops legacy `dataset_permissions`, `access_requests` tables and `users.role`, `table_registry.is_public` columns — table access is now exclusively per-group via `resource_grants(resource_type='table')`**, **v20 adds `source_query` TEXT to `table_registry` to back `query_mode='materialized'` (BigQuery scheduled-query parquet path)**, **v21 adds `welcome_template` singleton table backing the Agent Setup Prompt admin override (`/admin/agent-prompt`)**, **v22 reserves the `setup_banner` table — feature dropped mid-development; table retained for forward compatibility with already-migrated instances**, **v23 adds `claude_md_template` singleton table backing the Agent Workspace Prompt admin override (`/admin/workspace-prompt`)**, **v24 rewrites materialized BQ `source_query` from DuckDB-flavor `bq."ds"."t"` to BQ-native `` `<project>.ds.t` `` so the new wrapping path accepts them; idempotent + warns when project unconfigured** — see CHANGELOG and docs/RBAC.md)
- `table_registry`: id, name, source_type, bucket, source_table, query_mode, sync_schedule, etc.
- `sync_state`, `sync_history`: track extraction progress
- `users`, `audit_log`: account state + audit trail. RBAC lives in `user_groups` + `user_group_members` + `resource_grants`.

View file

@ -39,7 +39,7 @@ def _maybe_instrument(con, db_tag: str):
_SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$")
SCHEMA_VERSION = 23
SCHEMA_VERSION = 24
_SYSTEM_SCHEMA = """
CREATE TABLE IF NOT EXISTS schema_version (
@ -1682,6 +1682,60 @@ _V22_TO_V23_MIGRATIONS = [
]
# v24: rewrite materialized BQ source_query from DuckDB-flavor
# (bq."<dataset>"."<table>") to BigQuery-native (`<project>.<dataset>.<table>`)
# so the new connectors.bigquery.extractor.materialize_query wrapping
# path (which routes through bigquery_query() / BQ jobs API) accepts
# them. Pre-v24, materialize used Storage Read API for the bq.<ds>.<tbl>
# form, which fails for views — see PR for full motivation.
#
# This migration is implemented in Python (not pure SQL) because the
# rewrite is a regex-and-replace per row: the project_id comes from
# instance_config (file/env), not the DB. SQL alone can't pull the
# project_id and substitute it. If the project isn't configured at
# migration time, log a warning per affected row and leave them — the
# operator must configure data_source.bigquery.project, restart, and
# the migration will fire on next start (idempotent).
def _v23_to_v24_finalize(conn: duckdb.DuckDBPyConnection) -> None:
import re as _re
try:
from app.instance_config import get_value
project_id = get_value("data_source", "bigquery", "project", default="") or ""
except Exception:
project_id = ""
pattern = _re.compile(r'bq\."([^"]+)"\."([^"]+)"')
rows = conn.execute(
"SELECT id, source_query FROM table_registry "
"WHERE query_mode = 'materialized' "
"AND source_query LIKE '%bq.\"%' "
"AND source_type = 'bigquery'"
).fetchall()
for row_id, sq in rows:
if sq is None:
continue
if not project_id:
logger.warning(
"v24 migration: skipping rewrite of source_query for row %r"
"data_source.bigquery.project is not configured. Set it via "
"/admin/server-config and restart the app to retry the "
"migration.", row_id,
)
continue
new_sq = pattern.sub(rf'`{project_id}.\1.\2`', sq)
if new_sq != sq:
conn.execute(
"UPDATE table_registry SET source_query = ? WHERE id = ?",
[new_sq, row_id],
)
logger.info(
"v24 migration: rewrote source_query for row %r", row_id,
)
def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None:
"""Create tables if they don't exist. Apply migrations if schema version changed.
@ -1837,6 +1891,8 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None:
if current < 23:
for sql in _V22_TO_V23_MIGRATIONS:
conn.execute(sql)
if current < 24:
_v23_to_v24_finalize(conn)
conn.execute(
"UPDATE schema_version SET version = ?, applied_at = current_timestamp",
[SCHEMA_VERSION],

View file

@ -13,8 +13,9 @@ import duckdb
from src.db import SCHEMA_VERSION, _ensure_schema, get_schema_version
def test_schema_version_is_23():
assert SCHEMA_VERSION == 23
def test_schema_version_is_24():
# bumped from 23→24 for the materialized BQ source_query rewrite migration
assert SCHEMA_VERSION == 24
def test_v20_adds_source_query(tmp_path):
@ -29,7 +30,7 @@ def test_v20_adds_source_query(tmp_path):
).fetchall()
}
assert "source_query" in cols, f"source_query missing from {cols}"
assert get_schema_version(conn) == 23
assert get_schema_version(conn) == SCHEMA_VERSION
conn.close()
@ -83,7 +84,7 @@ def test_v19_db_migrates_to_v20(tmp_path):
_ensure_schema(conn)
assert get_schema_version(conn) == 23
assert get_schema_version(conn) == SCHEMA_VERSION # bumped 23→24
cols = {
r[0] for r in conn.execute(
"SELECT column_name FROM information_schema.columns "

View file

@ -0,0 +1,126 @@
"""v24: rewrites table_registry.source_query for materialized BQ rows
from DuckDB-flavor (bq.\"ds\".\"tbl\") to BQ-native (`<project>.ds.tbl`).
The wrapping path (connectors.bigquery.extractor.materialize_query) only
accepts BQ-native; pre-v24 rows would fail at materialize time without
this conversion."""
from __future__ import annotations
import os
import tempfile
from pathlib import Path
import duckdb
import pytest
from src.db import _ensure_schema, get_schema_version, SCHEMA_VERSION
def _seed_v23(conn, project_id: str = "prj-data"):
conn.execute(
"CREATE TABLE schema_version (version INTEGER, applied_at TIMESTAMP DEFAULT current_timestamp)"
)
conn.execute("INSERT INTO schema_version (version) VALUES (23)")
conn.execute(
"CREATE TABLE table_registry ("
"id VARCHAR PRIMARY KEY, name VARCHAR, source_type VARCHAR, "
"query_mode VARCHAR, bucket VARCHAR, source_table VARCHAR, source_query VARCHAR)"
)
def test_v24_rewrites_duckdb_flavor_to_bq_native(monkeypatch):
with tempfile.TemporaryDirectory() as tmp:
monkeypatch.setenv("DATA_DIR", tmp)
monkeypatch.setattr(
"app.instance_config.get_value",
lambda *args, **kw: "prj-data" if args == ("data_source", "bigquery", "project") else kw.get("default"),
)
Path(tmp, "state").mkdir(parents=True, exist_ok=True)
db_path = Path(tmp, "state", "system.duckdb")
conn = duckdb.connect(str(db_path))
try:
_seed_v23(conn)
conn.execute(
'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)',
["t1", "t1", "bigquery", "materialized", "ds", "tbl",
'SELECT * FROM bq."ds"."tbl"'],
)
conn.execute(
'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)',
["t2", "t2", "bigquery", "materialized", "analytics", "orders",
'SELECT col1 FROM bq."analytics"."orders" WHERE col2 > 10'],
)
conn.execute(
'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)',
["r1", "r1", "bigquery", "remote", "ds", "tbl", None],
)
_ensure_schema(conn)
assert get_schema_version(conn) == SCHEMA_VERSION
assert SCHEMA_VERSION >= 24
rows = {r[0]: r[1] for r in conn.execute(
"SELECT id, source_query FROM table_registry"
).fetchall()}
assert rows["t1"] == "SELECT * FROM `prj-data.ds.tbl`"
assert rows["t2"] == (
"SELECT col1 FROM `prj-data.analytics.orders` WHERE col2 > 10"
)
assert rows["r1"] is None # remote row untouched
finally:
conn.close()
def test_v24_idempotent_when_already_bq_native(monkeypatch):
with tempfile.TemporaryDirectory() as tmp:
monkeypatch.setenv("DATA_DIR", tmp)
monkeypatch.setattr(
"app.instance_config.get_value",
lambda *args, **kw: "prj-data" if args == ("data_source", "bigquery", "project") else kw.get("default"),
)
Path(tmp, "state").mkdir(parents=True, exist_ok=True)
db_path = Path(tmp, "state", "system.duckdb")
conn = duckdb.connect(str(db_path))
try:
_seed_v23(conn)
conn.execute(
'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)',
["t1", "t1", "bigquery", "materialized", "ds", "tbl",
"SELECT * FROM `prj-data.ds.tbl`"],
)
_ensure_schema(conn)
row = conn.execute(
"SELECT source_query FROM table_registry WHERE id='t1'"
).fetchone()
assert row[0] == "SELECT * FROM `prj-data.ds.tbl`"
finally:
conn.close()
def test_v24_logs_warning_when_project_not_configured(monkeypatch, caplog):
with tempfile.TemporaryDirectory() as tmp:
monkeypatch.setenv("DATA_DIR", tmp)
monkeypatch.setattr(
"app.instance_config.get_value",
lambda *args, **kw: kw.get("default", ""), # no project configured
)
Path(tmp, "state").mkdir(parents=True, exist_ok=True)
db_path = Path(tmp, "state", "system.duckdb")
conn = duckdb.connect(str(db_path))
try:
_seed_v23(conn)
conn.execute(
'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)',
["t1", "t1", "bigquery", "materialized", "ds", "tbl",
'SELECT * FROM bq."ds"."tbl"'],
)
with caplog.at_level("WARNING"):
_ensure_schema(conn)
row = conn.execute(
"SELECT source_query FROM table_registry WHERE id='t1'"
).fetchone()
assert row[0] == 'SELECT * FROM bq."ds"."tbl"'
assert any(
"v24" in r.message.lower() or "project" in r.message.lower()
for r in caplog.records
)
finally:
conn.close()