Closes the 'admin pre-stages a curated table/view for analysts' use case end-to-end across both supported source connectors. Backend (BigQuery + Keboola, schema v20): - schema v20 adds source_query TEXT to table_registry (renumbered from v19 after main's #150 RBAC migration also bumped to v19) - connectors/bigquery/extractor.py adds materialize_query(table_id, sql, *, bq, output_dir, max_bytes=...) — BqAccess session, dry-run cost guardrail (default 10 GiB, configurable via data_source.bigquery.max_bytes_per_materialize), idempotent ATTACH, rows/bytes/md5 metadata for sync_state - connectors/keboola/access.py — new KeboolaAccess facade (counterpart of BqAccess) wrapping ATTACH 'keboola://...' AS kbc - connectors/keboola/extractor.py adds materialize_query — same shape, no dry-run analog (the Keboola Storage API has a different cost model); the legacy bucket-download path skips query_mode='materialized' rows - app/api/sync.py:_run_materialized_pass dispatches by source_type to the right materialize_query - app/api/admin.py: RegisterTableRequest accepts source_query; a model_validator enforces consistency across mode↔source_query↔bucket; PUT preserves omitted fields; deprecation marks (Field(deprecated=True)) on sync_strategy + profile_after_sync (no extractor reads them; profile_after_sync becomes inert — a bug from earlier work, where /api/sync/trigger never honored the flag); _BQ_OPTIONAL_FIELD_DEFAULTS injects defaults into the GET /server-config payload. Operator + CLI surface: - da admin register-table --query / --query-mode materialized - scripts/smoke-test-materialized-bq.sh — end-to-end smoke test for operators. Tests (incl. spike + integration + regression):
- test_db_migration_v20, test_table_registry_source_query - test_bq_materialize, test_bq_cost_guardrail, test_bq_init_extract_skips - test_keboola_access, test_keboola_extension_query_passthrough (lock-in for the DuckDB extension capability), test_keboola_materialize, test_keboola_init_extract_skips, test_keboola_materialized_e2e (skipped without KBC_TEST_* creds) - test_sync_trigger_materialized, test_sync_trigger_keboola_materialized - test_api_admin_materialized, test_cli_admin_materialized - test_admin_bq_register, test_admin_discover_bigquery, test_admin_keboola_materialized, test_admin_phase_c_deprecation, test_admin_put_preservation, test_materialized_e2e. Cost: BQ uses bigquery_query() (jobs API, view-aware), which works uniformly on tables, views, and materialized views. Keboola uses ATTACH + COPY to parquet through the DuckDB extension.
98 lines
3.4 KiB
Python
98 lines
3.4 KiB
Python
"""init_extract skips rows with query_mode='materialized'.
|
|
|
|
Materialized rows are written by the sync trigger pass via
|
|
`materialize_query()`; they live as parquets in /data/extracts/bigquery/data/
|
|
and surface via the orchestrator's standard local-parquet discovery.
|
|
Creating a remote view in extract.duckdb for the same name would shadow
|
|
the parquet via cross-source name collision.
|
|
|
|
Pattern matches `tests/test_bigquery_extractor.py::TestViewVsTableTemplates`
|
|
(uses `_CapturingProxy` to wrap a real DuckDB conn and stub BQ-specific calls).
|
|
"""
|
|
import duckdb
|
|
from unittest.mock import MagicMock
|
|
|
|
|
|
class _CapturingProxy:
    """Record the SQL sent to a real DuckDB connection and fake BigQuery-only calls.

    ``DuckDBPyConnection.execute`` is a read-only C-level attribute, so rather
    than monkey-patching it this object wraps the connection. Every statement
    passed to ``execute`` is appended to the caller-supplied ``captured`` list.
    Statements that need the BigQuery extension or GCP access get a
    ``MagicMock`` back, and everything else runs on the wrapped connection.
    The stub rules mirror tests/test_bigquery_extractor.py so the BQ test
    suite behaves the same everywhere.
    """

    def __init__(self, real_conn, captured: list):
        # real_conn: connection that non-stubbed SQL is forwarded to.
        # captured: shared list that receives every SQL string, in call order.
        self._real = real_conn
        self._captured = captured

    def execute(self, sql, *args, **kwargs):
        """Log *sql*, then either stub it or forward it (with args) to the real conn."""
        self._captured.append(sql)
        head = sql.strip().upper()
        # Extension install/load, secrets, BigQuery ATTACH and any DETACH.
        # These keyword checks run on the upper-cased text (case-insensitive).
        is_bq_plumbing = head.startswith(
            ("INSTALL ", "LOAD ", "CREATE SECRET", "DETACH ")
        ) or (head.startswith("ATTACH ") and "BIGQUERY" in head)
        # Reads from the attached `bq` catalog or via bigquery_query(). These
        # are matched case-sensitively on the raw SQL, as in the sibling suite.
        reads_bigquery = "FROM bq." in sql or "FROM bigquery_query" in sql
        if is_bq_plumbing or reads_bigquery:
            return MagicMock()
        return self._real.execute(sql, *args, **kwargs)

    def close(self):
        """Close the wrapped connection and return its result."""
        return self._real.close()

    def __getattr__(self, name):
        # Only reached for attributes the proxy itself lacks; they are served
        # by the real connection. NOTE: implicit special-method lookups (e.g.
        # `with proxy:`) bypass __getattr__, so the proxy is not a context manager.
        return getattr(self._real, name)
|
|
|
|
|
|
def test_init_extract_skips_materialized_rows(tmp_path, monkeypatch):
    """A registry mix of remote + materialized rows: only the remote row
    gets a `_meta` entry; the materialized row is silently skipped."""
    from connectors.bigquery.extractor import init_extract

    # Keep init_extract offline: fixed auth token, and every source reported
    # as a base table so no BigQuery type lookup is attempted.
    monkeypatch.setattr(
        "connectors.bigquery.extractor.get_metadata_token",
        lambda: "test-token",
    )
    monkeypatch.setattr(
        "connectors.bigquery.extractor._detect_table_type",
        lambda *a, **kw: "BASE TABLE",
    )

    captured: list[str] = []
    # Save the unpatched factory. The dotted target below resolves through the
    # extractor's `duckdb` attribute to the duckdb module itself, so the patch
    # also affects duckdb.connect() calls made from this test file.
    real_connect = duckdb.connect

    def spy_connect(*a, **kw):
        return _CapturingProxy(real_connect(*a, **kw), captured)

    monkeypatch.setattr(
        "connectors.bigquery.extractor.duckdb.connect", spy_connect
    )

    configs = [
        {
            "name": "live_orders", "bucket": "dset", "source_table": "live",
            "query_mode": "remote", "description": "",
        },
        {
            "name": "agg_90d", "bucket": "dset", "source_table": "live",
            "query_mode": "materialized",
            "source_query": "SELECT 1",
            "description": "",
        },
    ]
    stats = init_extract(str(tmp_path), "test-project", configs)

    db_path = tmp_path / "extract.duckdb"
    assert db_path.exists(), "extract.duckdb should be written"

    # Read back through the real factory so the verification query is neither
    # stubbed nor recorded in `captured` as if init_extract had issued it.
    # try/finally closes the file even if the query fails.
    db = real_connect(str(db_path))
    try:
        meta = db.execute(
            "SELECT table_name, query_mode FROM _meta ORDER BY table_name"
        ).fetchall()
    finally:
        db.close()

    assert meta == [("live_orders", "remote")]
    assert stats["tables_registered"] == 1
    # No CREATE statement (view or otherwise) may mention the materialized row.
    offending = [s for s in captured if "CREATE" in s.upper() and "agg_90d" in s]
    assert not offending, f"materialized row leaked into DDL: {offending}"
|