feat(schema): v24 — rewrite materialized BQ source_query to BQ-native
Materialize now wraps admin SQL into bigquery_query('<billing>', '<inner>')
which requires the inner SQL to be BigQuery-flavor (backticked
identifiers, native function syntax). v24 migrates existing rows from
DuckDB-flavor (bq."ds"."tbl") to (`<project>.ds.tbl`) using the
configured BQ project. Idempotent on already-converted rows; logs a
warning and skips when the project isn't configured (operator can
configure + restart for retry).
This commit is contained in:
parent
77eb3244c2
commit
fac10b29e4
4 changed files with 189 additions and 6 deletions
|
|
@ -443,7 +443,7 @@ Module sets `lifecycle { ignore_changes = [metadata_startup_script] }` on `googl
|
||||||
## Key Implementation Details
|
## Key Implementation Details
|
||||||
|
|
||||||
### DuckDB Schema (src/db.py)
|
### DuckDB Schema (src/db.py)
|
||||||
- Schema v23 with auto-migration v1→…→v23 (v5 adds `users.active`, v6 adds `personal_access_tokens`, v7 adds `personal_access_tokens.last_used_ip`, v8/v9 added the legacy internal_roles/role-grants tables, v10 added `view_ownership` for cross-connector view-name collision detection (issue #81 Group C), v11 added marketplace_registry + marketplace_plugins + user_groups + plugin_access, v12 added users.groups JSON + user_groups.is_system, **v13 replaces internal_roles/group_mappings/user_role_grants/plugin_access with user_group_members + resource_grants and drops users.groups JSON**, v14 adds FK constraints on user_group_members + resource_grants after orphan cleanup, v15 adds knowledge_items context-engineering columns + contradictions + session_extraction_state, v16 adds verification_evidence, v17 adds knowledge_item_relations, v18 drops stranded non-google memberships from google-managed groups, **v19 drops legacy `dataset_permissions`, `access_requests` tables and `users.role`, `table_registry.is_public` columns — table access is now exclusively per-group via `resource_grants(resource_type='table')`**, **v20 adds `source_query` TEXT to `table_registry` to back `query_mode='materialized'` (BigQuery scheduled-query parquet path)**, **v21 adds `welcome_template` singleton table backing the Agent Setup Prompt admin override (`/admin/agent-prompt`)**, **v22 reserves the `setup_banner` table — feature dropped mid-development; table retained for forward compatibility with already-migrated instances**, **v23 adds `claude_md_template` singleton table backing the Agent Workspace Prompt admin override (`/admin/workspace-prompt`)** — see CHANGELOG and docs/RBAC.md)
|
- Schema v24 with auto-migration v1→…→v24 (v5 adds `users.active`, v6 adds `personal_access_tokens`, v7 adds `personal_access_tokens.last_used_ip`, v8/v9 added the legacy internal_roles/role-grants tables, v10 added `view_ownership` for cross-connector view-name collision detection (issue #81 Group C), v11 added marketplace_registry + marketplace_plugins + user_groups + plugin_access, v12 added users.groups JSON + user_groups.is_system, **v13 replaces internal_roles/group_mappings/user_role_grants/plugin_access with user_group_members + resource_grants and drops users.groups JSON**, v14 adds FK constraints on user_group_members + resource_grants after orphan cleanup, v15 adds knowledge_items context-engineering columns + contradictions + session_extraction_state, v16 adds verification_evidence, v17 adds knowledge_item_relations, v18 drops stranded non-google memberships from google-managed groups, **v19 drops legacy `dataset_permissions`, `access_requests` tables and `users.role`, `table_registry.is_public` columns — table access is now exclusively per-group via `resource_grants(resource_type='table')`**, **v20 adds `source_query` TEXT to `table_registry` to back `query_mode='materialized'` (BigQuery scheduled-query parquet path)**, **v21 adds `welcome_template` singleton table backing the Agent Setup Prompt admin override (`/admin/agent-prompt`)**, **v22 reserves the `setup_banner` table — feature dropped mid-development; table retained for forward compatibility with already-migrated instances**, **v23 adds `claude_md_template` singleton table backing the Agent Workspace Prompt admin override (`/admin/workspace-prompt`)**, **v24 rewrites materialized BQ `source_query` from DuckDB-flavor `bq."ds"."t"` to BQ-native `` `<project>.ds.t` `` so the new wrapping path accepts them; idempotent + warns when project unconfigured** — see CHANGELOG and docs/RBAC.md)
|
||||||
- `table_registry`: id, name, source_type, bucket, source_table, query_mode, sync_schedule, etc.
|
- `table_registry`: id, name, source_type, bucket, source_table, query_mode, sync_schedule, etc.
|
||||||
- `sync_state`, `sync_history`: track extraction progress
|
- `sync_state`, `sync_history`: track extraction progress
|
||||||
- `users`, `audit_log`: account state + audit trail. RBAC lives in `user_groups` + `user_group_members` + `resource_grants`.
|
- `users`, `audit_log`: account state + audit trail. RBAC lives in `user_groups` + `user_group_members` + `resource_grants`.
|
||||||
|
|
|
||||||
58
src/db.py
58
src/db.py
|
|
@ -39,7 +39,7 @@ def _maybe_instrument(con, db_tag: str):
|
||||||
|
|
||||||
_SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$")
|
_SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$")
|
||||||
|
|
||||||
SCHEMA_VERSION = 23
|
SCHEMA_VERSION = 24
|
||||||
|
|
||||||
_SYSTEM_SCHEMA = """
|
_SYSTEM_SCHEMA = """
|
||||||
CREATE TABLE IF NOT EXISTS schema_version (
|
CREATE TABLE IF NOT EXISTS schema_version (
|
||||||
|
|
@ -1682,6 +1682,60 @@ _V22_TO_V23_MIGRATIONS = [
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
# v24: rewrite materialized BQ source_query from DuckDB-flavor
|
||||||
|
# (bq."<dataset>"."<table>") to BigQuery-native (`<project>.<dataset>.<table>`)
|
||||||
|
# so the new connectors.bigquery.extractor.materialize_query wrapping
|
||||||
|
# path (which routes through bigquery_query() / BQ jobs API) accepts
|
||||||
|
# them. Pre-v24, materialize used Storage Read API for the bq.<ds>.<tbl>
|
||||||
|
# form, which fails for views — see PR for full motivation.
|
||||||
|
#
|
||||||
|
# This migration is implemented in Python (not pure SQL) because the
|
||||||
|
# rewrite is a regex-and-replace per row: the project_id comes from
|
||||||
|
# instance_config (file/env), not the DB. SQL alone can't pull the
|
||||||
|
# project_id and substitute it. If the project isn't configured at
|
||||||
|
# migration time, log a warning per affected row and leave them — the
|
||||||
|
# operator must configure data_source.bigquery.project, restart, and
|
||||||
|
# the migration will fire on next start (idempotent).
|
||||||
|
def _v23_to_v24_finalize(conn: duckdb.DuckDBPyConnection) -> None:
|
||||||
|
import re as _re
|
||||||
|
|
||||||
|
try:
|
||||||
|
from app.instance_config import get_value
|
||||||
|
project_id = get_value("data_source", "bigquery", "project", default="") or ""
|
||||||
|
except Exception:
|
||||||
|
project_id = ""
|
||||||
|
|
||||||
|
pattern = _re.compile(r'bq\."([^"]+)"\."([^"]+)"')
|
||||||
|
|
||||||
|
rows = conn.execute(
|
||||||
|
"SELECT id, source_query FROM table_registry "
|
||||||
|
"WHERE query_mode = 'materialized' "
|
||||||
|
"AND source_query LIKE '%bq.\"%' "
|
||||||
|
"AND source_type = 'bigquery'"
|
||||||
|
).fetchall()
|
||||||
|
|
||||||
|
for row_id, sq in rows:
|
||||||
|
if sq is None:
|
||||||
|
continue
|
||||||
|
if not project_id:
|
||||||
|
logger.warning(
|
||||||
|
"v24 migration: skipping rewrite of source_query for row %r — "
|
||||||
|
"data_source.bigquery.project is not configured. Set it via "
|
||||||
|
"/admin/server-config and restart the app to retry the "
|
||||||
|
"migration.", row_id,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
new_sq = pattern.sub(rf'`{project_id}.\1.\2`', sq)
|
||||||
|
if new_sq != sq:
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE table_registry SET source_query = ? WHERE id = ?",
|
||||||
|
[new_sq, row_id],
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
"v24 migration: rewrote source_query for row %r", row_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None:
|
def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None:
|
||||||
"""Create tables if they don't exist. Apply migrations if schema version changed.
|
"""Create tables if they don't exist. Apply migrations if schema version changed.
|
||||||
|
|
||||||
|
|
@ -1837,6 +1891,8 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None:
|
||||||
if current < 23:
|
if current < 23:
|
||||||
for sql in _V22_TO_V23_MIGRATIONS:
|
for sql in _V22_TO_V23_MIGRATIONS:
|
||||||
conn.execute(sql)
|
conn.execute(sql)
|
||||||
|
if current < 24:
|
||||||
|
_v23_to_v24_finalize(conn)
|
||||||
conn.execute(
|
conn.execute(
|
||||||
"UPDATE schema_version SET version = ?, applied_at = current_timestamp",
|
"UPDATE schema_version SET version = ?, applied_at = current_timestamp",
|
||||||
[SCHEMA_VERSION],
|
[SCHEMA_VERSION],
|
||||||
|
|
|
||||||
|
|
@ -13,8 +13,9 @@ import duckdb
|
||||||
from src.db import SCHEMA_VERSION, _ensure_schema, get_schema_version
|
from src.db import SCHEMA_VERSION, _ensure_schema, get_schema_version
|
||||||
|
|
||||||
|
|
||||||
def test_schema_version_is_23():
|
def test_schema_version_is_24():
|
||||||
assert SCHEMA_VERSION == 23
|
# bumped from 23→24 for the materialized BQ source_query rewrite migration
|
||||||
|
assert SCHEMA_VERSION == 24
|
||||||
|
|
||||||
|
|
||||||
def test_v20_adds_source_query(tmp_path):
|
def test_v20_adds_source_query(tmp_path):
|
||||||
|
|
@ -29,7 +30,7 @@ def test_v20_adds_source_query(tmp_path):
|
||||||
).fetchall()
|
).fetchall()
|
||||||
}
|
}
|
||||||
assert "source_query" in cols, f"source_query missing from {cols}"
|
assert "source_query" in cols, f"source_query missing from {cols}"
|
||||||
assert get_schema_version(conn) == 23
|
assert get_schema_version(conn) == SCHEMA_VERSION
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -83,7 +84,7 @@ def test_v19_db_migrates_to_v20(tmp_path):
|
||||||
|
|
||||||
_ensure_schema(conn)
|
_ensure_schema(conn)
|
||||||
|
|
||||||
assert get_schema_version(conn) == 23
|
assert get_schema_version(conn) == SCHEMA_VERSION # bumped 23→24
|
||||||
cols = {
|
cols = {
|
||||||
r[0] for r in conn.execute(
|
r[0] for r in conn.execute(
|
||||||
"SELECT column_name FROM information_schema.columns "
|
"SELECT column_name FROM information_schema.columns "
|
||||||
|
|
|
||||||
126
tests/test_schema_v24_source_query_rewrite.py
Normal file
126
tests/test_schema_v24_source_query_rewrite.py
Normal file
|
|
@ -0,0 +1,126 @@
|
||||||
|
"""v24: rewrites table_registry.source_query for materialized BQ rows
|
||||||
|
from DuckDB-flavor (bq.\"ds\".\"tbl\") to BQ-native (`<project>.ds.tbl`).
|
||||||
|
The wrapping path (connectors.bigquery.extractor.materialize_query) only
|
||||||
|
accepts BQ-native; pre-v24 rows would fail at materialize time without
|
||||||
|
this conversion."""
|
||||||
|
from __future__ import annotations
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import duckdb
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from src.db import _ensure_schema, get_schema_version, SCHEMA_VERSION
|
||||||
|
|
||||||
|
|
||||||
|
def _seed_v23(conn, project_id: str = "prj-data"):
|
||||||
|
conn.execute(
|
||||||
|
"CREATE TABLE schema_version (version INTEGER, applied_at TIMESTAMP DEFAULT current_timestamp)"
|
||||||
|
)
|
||||||
|
conn.execute("INSERT INTO schema_version (version) VALUES (23)")
|
||||||
|
conn.execute(
|
||||||
|
"CREATE TABLE table_registry ("
|
||||||
|
"id VARCHAR PRIMARY KEY, name VARCHAR, source_type VARCHAR, "
|
||||||
|
"query_mode VARCHAR, bucket VARCHAR, source_table VARCHAR, source_query VARCHAR)"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_v24_rewrites_duckdb_flavor_to_bq_native(monkeypatch):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
monkeypatch.setenv("DATA_DIR", tmp)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"app.instance_config.get_value",
|
||||||
|
lambda *args, **kw: "prj-data" if args == ("data_source", "bigquery", "project") else kw.get("default"),
|
||||||
|
)
|
||||||
|
Path(tmp, "state").mkdir(parents=True, exist_ok=True)
|
||||||
|
db_path = Path(tmp, "state", "system.duckdb")
|
||||||
|
conn = duckdb.connect(str(db_path))
|
||||||
|
try:
|
||||||
|
_seed_v23(conn)
|
||||||
|
conn.execute(
|
||||||
|
'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)',
|
||||||
|
["t1", "t1", "bigquery", "materialized", "ds", "tbl",
|
||||||
|
'SELECT * FROM bq."ds"."tbl"'],
|
||||||
|
)
|
||||||
|
conn.execute(
|
||||||
|
'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)',
|
||||||
|
["t2", "t2", "bigquery", "materialized", "analytics", "orders",
|
||||||
|
'SELECT col1 FROM bq."analytics"."orders" WHERE col2 > 10'],
|
||||||
|
)
|
||||||
|
conn.execute(
|
||||||
|
'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)',
|
||||||
|
["r1", "r1", "bigquery", "remote", "ds", "tbl", None],
|
||||||
|
)
|
||||||
|
|
||||||
|
_ensure_schema(conn)
|
||||||
|
assert get_schema_version(conn) == SCHEMA_VERSION
|
||||||
|
assert SCHEMA_VERSION >= 24
|
||||||
|
|
||||||
|
rows = {r[0]: r[1] for r in conn.execute(
|
||||||
|
"SELECT id, source_query FROM table_registry"
|
||||||
|
).fetchall()}
|
||||||
|
assert rows["t1"] == "SELECT * FROM `prj-data.ds.tbl`"
|
||||||
|
assert rows["t2"] == (
|
||||||
|
"SELECT col1 FROM `prj-data.analytics.orders` WHERE col2 > 10"
|
||||||
|
)
|
||||||
|
assert rows["r1"] is None # remote row untouched
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def test_v24_idempotent_when_already_bq_native(monkeypatch):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
monkeypatch.setenv("DATA_DIR", tmp)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"app.instance_config.get_value",
|
||||||
|
lambda *args, **kw: "prj-data" if args == ("data_source", "bigquery", "project") else kw.get("default"),
|
||||||
|
)
|
||||||
|
Path(tmp, "state").mkdir(parents=True, exist_ok=True)
|
||||||
|
db_path = Path(tmp, "state", "system.duckdb")
|
||||||
|
conn = duckdb.connect(str(db_path))
|
||||||
|
try:
|
||||||
|
_seed_v23(conn)
|
||||||
|
conn.execute(
|
||||||
|
'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)',
|
||||||
|
["t1", "t1", "bigquery", "materialized", "ds", "tbl",
|
||||||
|
"SELECT * FROM `prj-data.ds.tbl`"],
|
||||||
|
)
|
||||||
|
_ensure_schema(conn)
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT source_query FROM table_registry WHERE id='t1'"
|
||||||
|
).fetchone()
|
||||||
|
assert row[0] == "SELECT * FROM `prj-data.ds.tbl`"
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def test_v24_logs_warning_when_project_not_configured(monkeypatch, caplog):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
monkeypatch.setenv("DATA_DIR", tmp)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"app.instance_config.get_value",
|
||||||
|
lambda *args, **kw: kw.get("default", ""), # no project configured
|
||||||
|
)
|
||||||
|
Path(tmp, "state").mkdir(parents=True, exist_ok=True)
|
||||||
|
db_path = Path(tmp, "state", "system.duckdb")
|
||||||
|
conn = duckdb.connect(str(db_path))
|
||||||
|
try:
|
||||||
|
_seed_v23(conn)
|
||||||
|
conn.execute(
|
||||||
|
'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)',
|
||||||
|
["t1", "t1", "bigquery", "materialized", "ds", "tbl",
|
||||||
|
'SELECT * FROM bq."ds"."tbl"'],
|
||||||
|
)
|
||||||
|
with caplog.at_level("WARNING"):
|
||||||
|
_ensure_schema(conn)
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT source_query FROM table_registry WHERE id='t1'"
|
||||||
|
).fetchone()
|
||||||
|
assert row[0] == 'SELECT * FROM bq."ds"."tbl"'
|
||||||
|
assert any(
|
||||||
|
"v24" in r.message.lower() or "project" in r.message.lower()
|
||||||
|
for r in caplog.records
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
Loading…
Reference in a new issue