Closes the 'admin pre-stages a curated table/view for analysts' use case end-to-end across both supported source connectors. Backend (BigQuery + Keboola, schema v20): - schema v20 adds source_query TEXT to table_registry (renumbered from v19 after main's #150 RBAC migration also bumped to v19) - connectors/bigquery/extractor.py adds materialize_query(table_id, sql, *, bq, output_dir, max_bytes=...) — BqAccess session, dry-run cost guardrail (default 10 GiB, configurable via data_source.bigquery.max_bytes_per_materialize), idempotent ATTACH, rows/bytes/md5 metadata for sync_state - connectors/keboola/access.py — new KeboolaAccess facade (parallel of BqAccess) wrapping ATTACH 'keboola://...' AS kbc - connectors/keboola/extractor.py adds materialize_query — same shape, no dry-run analog (Keboola Storage API has different cost model); legacy bucket-download path skips query_mode='materialized' rows - app/api/sync.py:_run_materialized_pass dispatches by source_type to the right materialize_query - app/api/admin.py: RegisterTableRequest accepts source_query; model_validator coheres mode↔source_query↔bucket; PUT preserves omitted fields; deprecation marks (Field(deprecated=True)) on sync_strategy + profile_after_sync (no extractor reads them; profile_after_sync becomes inert — bug from earlier work where /api/sync/trigger never honored the flag); _BQ_OPTIONAL_FIELD_DEFAULTS injects defaults into GET /server-config payload Operator + CLI surface: - da admin register-table --query / --query-mode materialized - scripts/smoke-test-materialized-bq.sh — end-to-end smoke for operators Tests (incl. spike + integration + regression): - test_db_migration_v20, test_table_registry_source_query - test_bq_materialize, test_bq_cost_guardrail, test_bq_init_extract_skips - test_keboola_access, test_keboola_extension_query_passthrough (lock-in for the DuckDB extension capability), test_keboola_materialize, test_keboola_init_extract_skips, test_keboola_materialized_e2e (skipped without KBC_TEST_* creds) - test_sync_trigger_materialized, test_sync_trigger_keboola_materialized - test_api_admin_materialized, test_cli_admin_materialized - test_admin_bq_register, test_admin_discover_bigquery, test_admin_keboola_materialized, test_admin_phase_c_deprecation, test_admin_put_preservation, test_materialized_e2e Cost: BQ uses bigquery_query() (jobs API, view-aware) — works on tables, views, materialized views uniformly. Keboola uses ATTACH+COPY parquet through the DuckDB extension.
77 lines
2.8 KiB
Python
77 lines
2.8 KiB
Python
"""v20 adds source_query column to table_registry.
|
|
|
|
Backs query_mode='materialized' for BigQuery: admin registers a SQL body
|
|
that the scheduler runs through the DuckDB BQ extension and writes as a
|
|
parquet to /data/extracts/bigquery/data/<id>.parquet.
|
|
|
|
The v19 step (#150) drops dataset_permissions, access_requests tables and
|
|
users.role, table_registry.is_public columns; v20 then ALTERs the post-v19
|
|
table_registry to add the source_query column.
|
|
"""
|
|
import duckdb
|
|
|
|
from src.db import SCHEMA_VERSION, _ensure_schema, get_schema_version
|
|
|
|
|
|
def test_schema_version_is_20():
|
|
assert SCHEMA_VERSION == 20
|
|
|
|
|
|
def test_v20_adds_source_query(tmp_path):
|
|
db_path = tmp_path / "system.duckdb"
|
|
conn = duckdb.connect(str(db_path))
|
|
_ensure_schema(conn)
|
|
|
|
cols = {
|
|
r[0] for r in conn.execute(
|
|
"SELECT column_name FROM information_schema.columns "
|
|
"WHERE table_name = 'table_registry'"
|
|
).fetchall()
|
|
}
|
|
assert "source_query" in cols, f"source_query missing from {cols}"
|
|
assert get_schema_version(conn) == 20
|
|
conn.close()
|
|
|
|
|
|
def test_v19_db_migrates_to_v20(tmp_path):
|
|
"""Pre-existing v19 DB (post-RBAC-drop) without source_query upgrades
|
|
cleanly without losing data."""
|
|
db_path = tmp_path / "system.duckdb"
|
|
conn = duckdb.connect(str(db_path))
|
|
|
|
# Simulate a v19 DB at minimal but realistic shape: schema_version row +
|
|
# a table_registry row in the post-v19 column shape (no is_public column,
|
|
# since v19 finalize dropped it via the table-rebuild idiom).
|
|
conn.execute(
|
|
"CREATE TABLE schema_version (version INTEGER, "
|
|
"applied_at TIMESTAMP DEFAULT current_timestamp)"
|
|
)
|
|
conn.execute("INSERT INTO schema_version (version) VALUES (19)")
|
|
conn.execute("""CREATE TABLE table_registry (
|
|
id VARCHAR PRIMARY KEY, name VARCHAR NOT NULL,
|
|
source_type VARCHAR, bucket VARCHAR, source_table VARCHAR,
|
|
sync_strategy VARCHAR DEFAULT 'full_refresh',
|
|
query_mode VARCHAR DEFAULT 'local',
|
|
sync_schedule VARCHAR, profile_after_sync BOOLEAN DEFAULT true,
|
|
primary_key VARCHAR, folder VARCHAR, description TEXT,
|
|
registered_by VARCHAR,
|
|
registered_at TIMESTAMP DEFAULT current_timestamp
|
|
)""")
|
|
conn.execute("INSERT INTO table_registry (id, name) VALUES ('foo', 'foo')")
|
|
|
|
_ensure_schema(conn)
|
|
|
|
assert get_schema_version(conn) == 20
|
|
cols = {
|
|
r[0] for r in conn.execute(
|
|
"SELECT column_name FROM information_schema.columns "
|
|
"WHERE table_name = 'table_registry'"
|
|
).fetchall()
|
|
}
|
|
assert "source_query" in cols
|
|
# Existing row preserved, new column NULL
|
|
row = conn.execute(
|
|
"SELECT id, source_query FROM table_registry WHERE id='foo'"
|
|
).fetchone()
|
|
assert row == ("foo", None)
|
|
conn.close()
|