From fac10b29e42915137bc73e599ee3ec59d9f8d7bf Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 4 May 2026 19:15:54 +0200 Subject: [PATCH] =?UTF-8?q?feat(schema):=20v24=20=E2=80=94=20rewrite=20mat?= =?UTF-8?q?erialized=20BQ=20source=5Fquery=20to=20BQ-native?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Materialize now wraps admin SQL into bigquery_query('', '') which requires the inner SQL to be BigQuery-flavor (backticked identifiers, native function syntax). v24 migrates existing rows from DuckDB-flavor (bq."ds"."tbl") to (`.ds.tbl`) using the configured BQ project. Idempotent on already-converted rows; logs a warning and skips when the project isn't configured (operator can configure + restart for retry). --- CLAUDE.md | 2 +- src/db.py | 58 +++++++- tests/test_db_schema_version.py | 9 +- tests/test_schema_v24_source_query_rewrite.py | 126 ++++++++++++++++++ 4 files changed, 189 insertions(+), 6 deletions(-) create mode 100644 tests/test_schema_v24_source_query_rewrite.py diff --git a/CLAUDE.md b/CLAUDE.md index fe9a0e0..b274ffd 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -443,7 +443,7 @@ Module sets `lifecycle { ignore_changes = [metadata_startup_script] }` on `googl ## Key Implementation Details ### DuckDB Schema (src/db.py) -- Schema v23 with auto-migration v1→…→v23 (v5 adds `users.active`, v6 adds `personal_access_tokens`, v7 adds `personal_access_tokens.last_used_ip`, v8/v9 added the legacy internal_roles/role-grants tables, v10 added `view_ownership` for cross-connector view-name collision detection (issue #81 Group C), v11 added marketplace_registry + marketplace_plugins + user_groups + plugin_access, v12 added users.groups JSON + user_groups.is_system, **v13 replaces internal_roles/group_mappings/user_role_grants/plugin_access with user_group_members + resource_grants and drops users.groups JSON**, v14 adds FK constraints on user_group_members + resource_grants after orphan cleanup, v15 adds knowledge_items 
context-engineering columns + contradictions + session_extraction_state, v16 adds verification_evidence, v17 adds knowledge_item_relations, v18 drops stranded non-google memberships from google-managed groups, **v19 drops legacy `dataset_permissions`, `access_requests` tables and `users.role`, `table_registry.is_public` columns — table access is now exclusively per-group via `resource_grants(resource_type='table')`**, **v20 adds `source_query` TEXT to `table_registry` to back `query_mode='materialized'` (BigQuery scheduled-query parquet path)**, **v21 adds `welcome_template` singleton table backing the Agent Setup Prompt admin override (`/admin/agent-prompt`)**, **v22 reserves the `setup_banner` table — feature dropped mid-development; table retained for forward compatibility with already-migrated instances**, **v23 adds `claude_md_template` singleton table backing the Agent Workspace Prompt admin override (`/admin/workspace-prompt`)** — see CHANGELOG and docs/RBAC.md) +- Schema v24 with auto-migration v1→…→v24 (v5 adds `users.active`, v6 adds `personal_access_tokens`, v7 adds `personal_access_tokens.last_used_ip`, v8/v9 added the legacy internal_roles/role-grants tables, v10 added `view_ownership` for cross-connector view-name collision detection (issue #81 Group C), v11 added marketplace_registry + marketplace_plugins + user_groups + plugin_access, v12 added users.groups JSON + user_groups.is_system, **v13 replaces internal_roles/group_mappings/user_role_grants/plugin_access with user_group_members + resource_grants and drops users.groups JSON**, v14 adds FK constraints on user_group_members + resource_grants after orphan cleanup, v15 adds knowledge_items context-engineering columns + contradictions + session_extraction_state, v16 adds verification_evidence, v17 adds knowledge_item_relations, v18 drops stranded non-google memberships from google-managed groups, **v19 drops legacy `dataset_permissions`, `access_requests` tables and `users.role`, 
`table_registry.is_public` columns — table access is now exclusively per-group via `resource_grants(resource_type='table')`**, **v20 adds `source_query` TEXT to `table_registry` to back `query_mode='materialized'` (BigQuery scheduled-query parquet path)**, **v21 adds `welcome_template` singleton table backing the Agent Setup Prompt admin override (`/admin/agent-prompt`)**, **v22 reserves the `setup_banner` table — feature dropped mid-development; table retained for forward compatibility with already-migrated instances**, **v23 adds `claude_md_template` singleton table backing the Agent Workspace Prompt admin override (`/admin/workspace-prompt`)**, **v24 rewrites materialized BQ `source_query` from DuckDB-flavor `bq."ds"."t"` to BQ-native `` `.ds.t` `` so the new wrapping path accepts them; idempotent + warns when project unconfigured** — see CHANGELOG and docs/RBAC.md) - `table_registry`: id, name, source_type, bucket, source_table, query_mode, sync_schedule, etc. - `sync_state`, `sync_history`: track extraction progress - `users`, `audit_log`: account state + audit trail. RBAC lives in `user_groups` + `user_group_members` + `resource_grants`. diff --git a/src/db.py b/src/db.py index 5c3d8a3..f4afe12 100644 --- a/src/db.py +++ b/src/db.py @@ -39,7 +39,7 @@ def _maybe_instrument(con, db_tag: str): _SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$") -SCHEMA_VERSION = 23 +SCHEMA_VERSION = 24 _SYSTEM_SCHEMA = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -1682,6 +1682,60 @@ _V22_TO_V23_MIGRATIONS = [ ] +# v24: rewrite materialized BQ source_query from DuckDB-flavor +# (bq.""."") to BigQuery-native (`..
# v24 (continuation of the comment opened above): the rewrite lets the
# new connectors.bigquery.extractor.materialize_query wrapping path
# (which routes through bigquery_query() / BQ jobs API) accept existing
# rows. Pre-v24, materialize used Storage Read API for the
# bq."<dataset>"."<table>" form, which fails for views — see PR for
# full motivation.
#
# This migration is implemented in Python (not pure SQL) because the
# rewrite is a regex-and-replace per row: the project_id comes from
# instance_config (file/env), not the DB. SQL alone can't pull the
# project_id and substitute it.

# Matches a DuckDB-flavor reference bq."<dataset>"."<table>". The
# lookbehind stops identifiers that merely END in "bq" (an alias such as
# subq."a"."b") and dotted paths (x.bq."a"."b") from being mistaken for
# the attached `bq` catalog and rewritten into broken SQL.
_V24_DUCKDB_BQ_REF = re.compile(r'(?<![\w.])bq\."([^"]+)"\."([^"]+)"')


def _v24_rewrite_source_query(source_query: str, project_id: str) -> str:
    """Return *source_query* with each ``bq."ds"."tbl"`` turned into `` `project.ds.tbl` ``.

    Text that contains no DuckDB-flavor reference (including queries that
    are already BigQuery-native) is returned unchanged.
    """
    def _to_native(match):
        # A callable replacement inserts project_id literally; a template
        # string would interpret backslash sequences inside it.
        return f"`{project_id}.{match.group(1)}.{match.group(2)}`"

    return _V24_DUCKDB_BQ_REF.sub(_to_native, source_query)


def _v23_to_v24_finalize(conn: duckdb.DuckDBPyConnection) -> None:
    """Rewrite materialized BigQuery ``source_query`` rows to BQ-native SQL.

    Selects ``table_registry`` rows with ``query_mode = 'materialized'``,
    ``source_type = 'bigquery'`` and a ``source_query`` containing ``bq."``,
    then replaces every ``bq."<dataset>"."<table>"`` reference with
    `` `<project>.<dataset>.<table>` `` using the project read from
    ``data_source.bigquery.project`` in instance_config.

    Running it again is harmless: rows whose text no longer changes are not
    updated.

    If the project cannot be resolved, a warning is logged per affected row
    and the row is left untouched.

    NOTE(review): the caller visible in this patch (``_ensure_schema``) sets
    ``schema_version`` to ``SCHEMA_VERSION`` right after this function
    returns, whether or not rows were skipped. As written, a later restart
    therefore does NOT re-run this migration, contrary to what the warning
    text tells the operator. Fixing that needs a change in the caller (or a
    follow-up migration); confirm before relying on "configure + restart".

    Args:
        conn: Open connection to the database holding ``table_registry``;
            rows are read and updated through it.
    """
    try:
        from app.instance_config import get_value
        project_id = get_value("data_source", "bigquery", "project", default="") or ""
    except Exception:
        # Best effort: an unreadable config must not abort schema setup, but
        # leave a trace instead of failing silently.
        logger.warning(
            "v24 migration: could not read data_source.bigquery.project",
            exc_info=True,
        )
        project_id = ""

    rows = conn.execute(
        "SELECT id, source_query FROM table_registry "
        "WHERE query_mode = 'materialized' "
        "AND source_query LIKE '%bq.\"%' "
        "AND source_type = 'bigquery'"
    ).fetchall()

    for row_id, source_query in rows:
        if source_query is None:
            continue
        if not project_id:
            logger.warning(
                "v24 migration: skipping rewrite of source_query for row %r — "
                "data_source.bigquery.project is not configured. Set it via "
                "/admin/server-config and restart the app to retry the "
                "migration.", row_id,
            )
            continue
        rewritten = _v24_rewrite_source_query(source_query, project_id)
        if rewritten != source_query:
            conn.execute(
                "UPDATE table_registry SET source_query = ? WHERE id = ?",
                [rewritten, row_id],
            )
            logger.info(
                "v24 migration: rewrote source_query for row %r", row_id,
            )
@@ -1837,6 +1891,8 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: if current < 23: for sql in _V22_TO_V23_MIGRATIONS: conn.execute(sql) + if current < 24: + _v23_to_v24_finalize(conn) conn.execute( "UPDATE schema_version SET version = ?, applied_at = current_timestamp", [SCHEMA_VERSION], diff --git a/tests/test_db_schema_version.py b/tests/test_db_schema_version.py index b2bfc1a..b46f7fc 100644 --- a/tests/test_db_schema_version.py +++ b/tests/test_db_schema_version.py @@ -13,8 +13,9 @@ import duckdb from src.db import SCHEMA_VERSION, _ensure_schema, get_schema_version -def test_schema_version_is_23(): - assert SCHEMA_VERSION == 23 +def test_schema_version_is_24(): + # bumped from 23→24 for the materialized BQ source_query rewrite migration + assert SCHEMA_VERSION == 24 def test_v20_adds_source_query(tmp_path): @@ -29,7 +30,7 @@ def test_v20_adds_source_query(tmp_path): ).fetchall() } assert "source_query" in cols, f"source_query missing from {cols}" - assert get_schema_version(conn) == 23 + assert get_schema_version(conn) == SCHEMA_VERSION conn.close() @@ -83,7 +84,7 @@ def test_v19_db_migrates_to_v20(tmp_path): _ensure_schema(conn) - assert get_schema_version(conn) == 23 + assert get_schema_version(conn) == SCHEMA_VERSION # bumped 23→24 cols = { r[0] for r in conn.execute( "SELECT column_name FROM information_schema.columns " diff --git a/tests/test_schema_v24_source_query_rewrite.py b/tests/test_schema_v24_source_query_rewrite.py new file mode 100644 index 0000000..9a56130 --- /dev/null +++ b/tests/test_schema_v24_source_query_rewrite.py @@ -0,0 +1,126 @@ +"""v24: rewrites table_registry.source_query for materialized BQ rows +from DuckDB-flavor (bq.\"ds\".\"tbl\") to BQ-native (`.ds.tbl`). 
+The wrapping path (connectors.bigquery.extractor.materialize_query) only +accepts BQ-native; pre-v24 rows would fail at materialize time without +this conversion.""" +from __future__ import annotations +import os +import tempfile +from pathlib import Path + +import duckdb +import pytest + +from src.db import _ensure_schema, get_schema_version, SCHEMA_VERSION + + +def _seed_v23(conn, project_id: str = "prj-data"): + conn.execute( + "CREATE TABLE schema_version (version INTEGER, applied_at TIMESTAMP DEFAULT current_timestamp)" + ) + conn.execute("INSERT INTO schema_version (version) VALUES (23)") + conn.execute( + "CREATE TABLE table_registry (" + "id VARCHAR PRIMARY KEY, name VARCHAR, source_type VARCHAR, " + "query_mode VARCHAR, bucket VARCHAR, source_table VARCHAR, source_query VARCHAR)" + ) + + +def test_v24_rewrites_duckdb_flavor_to_bq_native(monkeypatch): + with tempfile.TemporaryDirectory() as tmp: + monkeypatch.setenv("DATA_DIR", tmp) + monkeypatch.setattr( + "app.instance_config.get_value", + lambda *args, **kw: "prj-data" if args == ("data_source", "bigquery", "project") else kw.get("default"), + ) + Path(tmp, "state").mkdir(parents=True, exist_ok=True) + db_path = Path(tmp, "state", "system.duckdb") + conn = duckdb.connect(str(db_path)) + try: + _seed_v23(conn) + conn.execute( + 'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)', + ["t1", "t1", "bigquery", "materialized", "ds", "tbl", + 'SELECT * FROM bq."ds"."tbl"'], + ) + conn.execute( + 'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)', + ["t2", "t2", "bigquery", "materialized", "analytics", "orders", + 'SELECT col1 FROM bq."analytics"."orders" WHERE col2 > 10'], + ) + conn.execute( + 'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)', + ["r1", "r1", "bigquery", "remote", "ds", "tbl", None], + ) + + _ensure_schema(conn) + assert get_schema_version(conn) == SCHEMA_VERSION + assert SCHEMA_VERSION >= 24 + + rows = {r[0]: r[1] for r in conn.execute( + "SELECT id, source_query FROM 
table_registry" + ).fetchall()} + assert rows["t1"] == "SELECT * FROM `prj-data.ds.tbl`" + assert rows["t2"] == ( + "SELECT col1 FROM `prj-data.analytics.orders` WHERE col2 > 10" + ) + assert rows["r1"] is None # remote row untouched + finally: + conn.close() + + +def test_v24_idempotent_when_already_bq_native(monkeypatch): + with tempfile.TemporaryDirectory() as tmp: + monkeypatch.setenv("DATA_DIR", tmp) + monkeypatch.setattr( + "app.instance_config.get_value", + lambda *args, **kw: "prj-data" if args == ("data_source", "bigquery", "project") else kw.get("default"), + ) + Path(tmp, "state").mkdir(parents=True, exist_ok=True) + db_path = Path(tmp, "state", "system.duckdb") + conn = duckdb.connect(str(db_path)) + try: + _seed_v23(conn) + conn.execute( + 'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)', + ["t1", "t1", "bigquery", "materialized", "ds", "tbl", + "SELECT * FROM `prj-data.ds.tbl`"], + ) + _ensure_schema(conn) + row = conn.execute( + "SELECT source_query FROM table_registry WHERE id='t1'" + ).fetchone() + assert row[0] == "SELECT * FROM `prj-data.ds.tbl`" + finally: + conn.close() + + +def test_v24_logs_warning_when_project_not_configured(monkeypatch, caplog): + with tempfile.TemporaryDirectory() as tmp: + monkeypatch.setenv("DATA_DIR", tmp) + monkeypatch.setattr( + "app.instance_config.get_value", + lambda *args, **kw: kw.get("default", ""), # no project configured + ) + Path(tmp, "state").mkdir(parents=True, exist_ok=True) + db_path = Path(tmp, "state", "system.duckdb") + conn = duckdb.connect(str(db_path)) + try: + _seed_v23(conn) + conn.execute( + 'INSERT INTO table_registry VALUES (?, ?, ?, ?, ?, ?, ?)', + ["t1", "t1", "bigquery", "materialized", "ds", "tbl", + 'SELECT * FROM bq."ds"."tbl"'], + ) + with caplog.at_level("WARNING"): + _ensure_schema(conn) + row = conn.execute( + "SELECT source_query FROM table_registry WHERE id='t1'" + ).fetchone() + assert row[0] == 'SELECT * FROM bq."ds"."tbl"' + assert any( + "v24" in r.message.lower() or 
"project" in r.message.lower() + for r in caplog.records + ) + finally: + conn.close()