feat(schema): v24 — rewrite materialized BQ source_query to BQ-native
Materialize now wraps admin SQL into bigquery_query('<billing>', '<inner>')
which requires the inner SQL to be BigQuery-flavor (backticked
identifiers, native function syntax). v24 migrates existing rows from
DuckDB-flavor (bq."ds"."tbl") to (`<project>.ds.tbl`) using the
configured BQ project. Idempotent on already-converted rows; logs a
warning and skips when the project isn't configured (operator can
configure + restart for retry).
This commit is contained in:
parent
77eb3244c2
commit
fac10b29e4
4 changed files with 189 additions and 6 deletions
|
|
@ -443,7 +443,7 @@ Module sets `lifecycle { ignore_changes = [metadata_startup_script] }` on `googl
|
|||
## Key Implementation Details
|
||||
|
||||
### DuckDB Schema (src/db.py)
|
||||
- Schema v23 with auto-migration v1→…→v23 (v5 adds `users.active`, v6 adds `personal_access_tokens`, v7 adds `personal_access_tokens.last_used_ip`, v8/v9 added the legacy internal_roles/role-grants tables, v10 added `view_ownership` for cross-connector view-name collision detection (issue #81 Group C), v11 added marketplace_registry + marketplace_plugins + user_groups + plugin_access, v12 added users.groups JSON + user_groups.is_system, **v13 replaces internal_roles/group_mappings/user_role_grants/plugin_access with user_group_members + resource_grants and drops users.groups JSON**, v14 adds FK constraints on user_group_members + resource_grants after orphan cleanup, v15 adds knowledge_items context-engineering columns + contradictions + session_extraction_state, v16 adds verification_evidence, v17 adds knowledge_item_relations, v18 drops stranded non-google memberships from google-managed groups, **v19 drops legacy `dataset_permissions`, `access_requests` tables and `users.role`, `table_registry.is_public` columns — table access is now exclusively per-group via `resource_grants(resource_type='table')`**, **v20 adds `source_query` TEXT to `table_registry` to back `query_mode='materialized'` (BigQuery scheduled-query parquet path)**, **v21 adds `welcome_template` singleton table backing the Agent Setup Prompt admin override (`/admin/agent-prompt`)**, **v22 reserves the `setup_banner` table — feature dropped mid-development; table retained for forward compatibility with already-migrated instances**, **v23 adds `claude_md_template` singleton table backing the Agent Workspace Prompt admin override (`/admin/workspace-prompt`)** — see CHANGELOG and docs/RBAC.md)
|
||||
- Schema v24 with auto-migration v1→…→v24 (v5 adds `users.active`, v6 adds `personal_access_tokens`, v7 adds `personal_access_tokens.last_used_ip`, v8/v9 added the legacy internal_roles/role-grants tables, v10 added `view_ownership` for cross-connector view-name collision detection (issue #81 Group C), v11 added marketplace_registry + marketplace_plugins + user_groups + plugin_access, v12 added users.groups JSON + user_groups.is_system, **v13 replaces internal_roles/group_mappings/user_role_grants/plugin_access with user_group_members + resource_grants and drops users.groups JSON**, v14 adds FK constraints on user_group_members + resource_grants after orphan cleanup, v15 adds knowledge_items context-engineering columns + contradictions + session_extraction_state, v16 adds verification_evidence, v17 adds knowledge_item_relations, v18 drops stranded non-google memberships from google-managed groups, **v19 drops legacy `dataset_permissions`, `access_requests` tables and `users.role`, `table_registry.is_public` columns — table access is now exclusively per-group via `resource_grants(resource_type='table')`**, **v20 adds `source_query` TEXT to `table_registry` to back `query_mode='materialized'` (BigQuery scheduled-query parquet path)**, **v21 adds `welcome_template` singleton table backing the Agent Setup Prompt admin override (`/admin/agent-prompt`)**, **v22 reserves the `setup_banner` table — feature dropped mid-development; table retained for forward compatibility with already-migrated instances**, **v23 adds `claude_md_template` singleton table backing the Agent Workspace Prompt admin override (`/admin/workspace-prompt`)**, **v24 rewrites materialized BQ `source_query` from DuckDB-flavor `bq."ds"."t"` to BQ-native `` `<project>.ds.t` `` so the new wrapping path accepts them; idempotent + warns when project unconfigured** — see CHANGELOG and docs/RBAC.md)
|
||||
- `table_registry`: id, name, source_type, bucket, source_table, query_mode, sync_schedule, etc.
|
||||
- `sync_state`, `sync_history`: track extraction progress
|
||||
- `users`, `audit_log`: account state + audit trail. RBAC lives in `user_groups` + `user_group_members` + `resource_grants`.
|
||||
|
|
|
|||
58
src/db.py
58
src/db.py
|
|
@ -39,7 +39,7 @@ def _maybe_instrument(con, db_tag: str):
|
|||
|
||||
_SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$")
|
||||
|
||||
SCHEMA_VERSION = 23
|
||||
SCHEMA_VERSION = 24
|
||||
|
||||
_SYSTEM_SCHEMA = """
|
||||
CREATE TABLE IF NOT EXISTS schema_version (
|
||||
|
|
@ -1682,6 +1682,60 @@ _V22_TO_V23_MIGRATIONS = [
|
|||
]
|
||||
|
||||
|
||||
# v24: rewrite materialized BQ source_query from DuckDB-flavor
|
||||
# (bq."<dataset>"."<table>") to BigQuery-native (`<project>.<dataset>.<table>`)
|
||||
# so the new connectors.bigquery.extractor.materialize_query wrapping
|
||||
# path (which routes through bigquery_query() / BQ jobs API) accepts
|
||||
# them. Pre-v24, materialize used Storage Read API for the bq.<ds>.<tbl>
|
||||
# form, which fails for views — see PR for full motivation.
|
||||
#
|
||||
# This migration is implemented in Python (not pure SQL) because the
|
||||
# rewrite is a regex-and-replace per row: the project_id comes from
|
||||
# instance_config (file/env), not the DB. SQL alone can't pull the
|
||||
# project_id and substitute it. If the project isn't configured at
|
||||
# migration time, log a warning per affected row and leave them — the
|
||||
# operator must configure data_source.bigquery.project, restart, and
|
||||
# the migration will fire on next start (idempotent).
|
||||
def _v23_to_v24_finalize(conn: duckdb.DuckDBPyConnection) -> None:
|
||||
import re as _re
|
||||
|
||||
try:
|
||||
from app.instance_config import get_value
|
||||
project_id = get_value("data_source", "bigquery", "project", default="") or ""
|
||||
except Exception:
|
||||
project_id = ""
|
||||
|
||||
pattern = _re.compile(r'bq\."([^"]+)"\."([^"]+)"')
|
||||
|
||||
rows = conn.execute(
|
||||
"SELECT id, source_query FROM table_registry "
|
||||
"WHERE query_mode = 'materialized' "
|
||||
"AND source_query LIKE '%bq.\"%' "
|
||||
"AND source_type = 'bigquery'"
|
||||
).fetchall()
|
||||
|
||||
for row_id, sq in rows:
|
||||
if sq is None:
|
||||
continue
|
||||
if not project_id:
|
||||
logger.warning(
|
||||
"v24 migration: skipping rewrite of source_query for row %r — "
|
||||
"data_source.bigquery.project is not configured. Set it via "
|
||||
"/admin/server-config and restart the app to retry the "
|
||||
"migration.", row_id,
|
||||
)
|
||||
continue
|
||||
new_sq = pattern.sub(rf'`{project_id}.\1.\2`', sq)
|
||||
if new_sq != sq:
|
||||
conn.execute(
|
||||
"UPDATE table_registry SET source_query = ? WHERE id = ?",
|
||||
[new_sq, row_id],
|
||||
)
|
||||
logger.info(
|
||||
"v24 migration: rewrote source_query for row %r", row_id,
|
||||
)
|
||||
|
||||
|
||||
def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None:
|
||||
"""Create tables if they don't exist. Apply migrations if schema version changed.
|
||||
|
||||
|
|
@ -1837,6 +1891,8 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None:
|
|||
if current < 23:
|
||||
for sql in _V22_TO_V23_MIGRATIONS:
|
||||
conn.execute(sql)
|
||||
if current < 24:
|
||||
_v23_to_v24_finalize(conn)
|
||||
conn.execute(
|
||||
"UPDATE schema_version SET version = ?, applied_at = current_timestamp",
|
||||
[SCHEMA_VERSION],
|
||||
|
|
|
|||
|
|
@ -13,8 +13,9 @@ import duckdb
|
|||
from src.db import SCHEMA_VERSION, _ensure_schema, get_schema_version
|
||||
|
||||
|
||||
def test_schema_version_is_23():
|
||||
assert SCHEMA_VERSION == 23
|
||||
def test_schema_version_is_24():
|
||||
# bumped from 23→24 for the materialized BQ source_query rewrite migration
|
||||
assert SCHEMA_VERSION == 24
|
||||
|
||||
|
||||
def test_v20_adds_source_query(tmp_path):
|
||||
|
|
@ -29,7 +30,7 @@ def test_v20_adds_source_query(tmp_path):
|
|||
).fetchall()
|
||||
}
|
||||
assert "source_query" in cols, f"source_query missing from {cols}"
|
||||
assert get_schema_version(conn) == 23
|
||||
assert get_schema_version(conn) == SCHEMA_VERSION
|
||||
conn.close()
|
||||
|
||||
|
||||
|
|
@ -83,7 +84,7 @@ def test_v19_db_migrates_to_v20(tmp_path):
|
|||
|
||||
_ensure_schema(conn)
|
||||
|
||||
assert get_schema_version(conn) == 23
|
||||
assert get_schema_version(conn) == SCHEMA_VERSION # bumped 23→24
|
||||
cols = {
|
||||
r[0] for r in conn.execute(
|
||||
"SELECT column_name FROM information_schema.columns "
|
||||
|
|
|
|||
126
tests/test_schema_v24_source_query_rewrite.py
Normal file
126
tests/test_schema_v24_source_query_rewrite.py
Normal file
|
|
@ -0,0 +1,126 @@
|
|||
"""v24: rewrites table_registry.source_query for materialized BQ rows
|
||||
from DuckDB-flavor (bq.\"ds\".\"tbl\") to BQ-native (`<project>.ds.tbl`).
|
||||
The wrapping path (connectors.bigquery.extractor.materialize_query) only
|
||||
accepts BQ-native; pre-v24 rows would fail at materialize time without
|
||||
this conversion."""
|
||||
from __future__ import annotations
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import duckdb
|
||||
import pytest
|
||||
|
||||
from src.db import _ensure_schema, get_schema_version, SCHEMA_VERSION
|
||||
|
||||
|
||||
def _seed_v23(conn, project_id: str = "prj-data"):
|
||||
conn.execute(
|
||||
"CREATE TABLE schema_version (version INTEGER, applied_at TIMESTAMP DEFAULT current_timestamp)"
|
||||
)
|
||||
conn.execute("INSERT INTO schema_version (version) VALUES (23)")
|
||||
conn.execute(
|
||||
"CREATE TABLE table_registry ("
|
||||
"id VARCHAR PRIMARY KEY, name VARCHAR, source_type VARCHAR, "
|
||||
"query_mode VARCHAR, bucket VARCHAR, source_table VARCHAR, source_query VARCHAR)"
|
||||
)
|
||||
|
||||
|
||||
def test_v24_rewrites_duckdb_flavor_to_bq_native(monkeypatch):
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
monkeypatch.setenv("DATA_DIR", tmp)
|
||||
monkeypatch.setattr(
|
||||
"app.instance_config.get_value",
|
||||
lambda *args, **kw: "prj-data" if args == ("data_source", "bigquery", "project") else kw.get("default"),
|
||||
)
|
||||
Path(tmp, "state").mkdir(parents=True, exist_ok=True)
|
||||
db_path = Path(tmp, "state", "system.duckdb")
|
||||
conn = duckdb.connect(str(db_path))
|
||||
try:
|
||||
_seed_v23(conn)
|
||||
conn.execute(
|
||||
'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)',
|
||||
["t1", "t1", "bigquery", "materialized", "ds", "tbl",
|
||||
'SELECT * FROM bq."ds"."tbl"'],
|
||||
)
|
||||
conn.execute(
|
||||
'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)',
|
||||
["t2", "t2", "bigquery", "materialized", "analytics", "orders",
|
||||
'SELECT col1 FROM bq."analytics"."orders" WHERE col2 > 10'],
|
||||
)
|
||||
conn.execute(
|
||||
'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)',
|
||||
["r1", "r1", "bigquery", "remote", "ds", "tbl", None],
|
||||
)
|
||||
|
||||
_ensure_schema(conn)
|
||||
assert get_schema_version(conn) == SCHEMA_VERSION
|
||||
assert SCHEMA_VERSION >= 24
|
||||
|
||||
rows = {r[0]: r[1] for r in conn.execute(
|
||||
"SELECT id, source_query FROM table_registry"
|
||||
).fetchall()}
|
||||
assert rows["t1"] == "SELECT * FROM `prj-data.ds.tbl`"
|
||||
assert rows["t2"] == (
|
||||
"SELECT col1 FROM `prj-data.analytics.orders` WHERE col2 > 10"
|
||||
)
|
||||
assert rows["r1"] is None # remote row untouched
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_v24_idempotent_when_already_bq_native(monkeypatch):
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
monkeypatch.setenv("DATA_DIR", tmp)
|
||||
monkeypatch.setattr(
|
||||
"app.instance_config.get_value",
|
||||
lambda *args, **kw: "prj-data" if args == ("data_source", "bigquery", "project") else kw.get("default"),
|
||||
)
|
||||
Path(tmp, "state").mkdir(parents=True, exist_ok=True)
|
||||
db_path = Path(tmp, "state", "system.duckdb")
|
||||
conn = duckdb.connect(str(db_path))
|
||||
try:
|
||||
_seed_v23(conn)
|
||||
conn.execute(
|
||||
'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)',
|
||||
["t1", "t1", "bigquery", "materialized", "ds", "tbl",
|
||||
"SELECT * FROM `prj-data.ds.tbl`"],
|
||||
)
|
||||
_ensure_schema(conn)
|
||||
row = conn.execute(
|
||||
"SELECT source_query FROM table_registry WHERE id='t1'"
|
||||
).fetchone()
|
||||
assert row[0] == "SELECT * FROM `prj-data.ds.tbl`"
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_v24_logs_warning_when_project_not_configured(monkeypatch, caplog):
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
monkeypatch.setenv("DATA_DIR", tmp)
|
||||
monkeypatch.setattr(
|
||||
"app.instance_config.get_value",
|
||||
lambda *args, **kw: kw.get("default", ""), # no project configured
|
||||
)
|
||||
Path(tmp, "state").mkdir(parents=True, exist_ok=True)
|
||||
db_path = Path(tmp, "state", "system.duckdb")
|
||||
conn = duckdb.connect(str(db_path))
|
||||
try:
|
||||
_seed_v23(conn)
|
||||
conn.execute(
|
||||
'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)',
|
||||
["t1", "t1", "bigquery", "materialized", "ds", "tbl",
|
||||
'SELECT * FROM bq."ds"."tbl"'],
|
||||
)
|
||||
with caplog.at_level("WARNING"):
|
||||
_ensure_schema(conn)
|
||||
row = conn.execute(
|
||||
"SELECT source_query FROM table_registry WHERE id='t1'"
|
||||
).fetchone()
|
||||
assert row[0] == 'SELECT * FROM bq."ds"."tbl"'
|
||||
assert any(
|
||||
"v24" in r.message.lower() or "project" in r.message.lower()
|
||||
for r in caplog.records
|
||||
)
|
||||
finally:
|
||||
conn.close()
|
||||
Loading…
Reference in a new issue