Closes the 'admin pre-stages a curated table/view for analysts' use case end-to-end across both supported source connectors.

Backend (BigQuery + Keboola, schema v20):
- schema v20 adds source_query TEXT to table_registry (renumbered from v19 after main's #150 RBAC migration also bumped to v19)
- connectors/bigquery/extractor.py adds materialize_query(table_id, sql, *, bq, output_dir, max_bytes=...) — BqAccess session, dry-run cost guardrail (default 10 GiB, configurable via data_source.bigquery.max_bytes_per_materialize), idempotent ATTACH, rows/bytes/md5 metadata for sync_state
- connectors/keboola/access.py — new KeboolaAccess facade (parallel of BqAccess) wrapping ATTACH 'keboola://...' AS kbc
- connectors/keboola/extractor.py adds materialize_query — same shape, no dry-run analog (the Keboola Storage API has a different cost model); legacy bucket-download path skips query_mode='materialized' rows
- app/api/sync.py:_run_materialized_pass dispatches by source_type to the right materialize_query
- app/api/admin.py: RegisterTableRequest accepts source_query; model_validator keeps mode↔source_query↔bucket coherent; PUT preserves omitted fields; deprecation marks (Field(deprecated=True)) on sync_strategy + profile_after_sync (no extractor reads them; profile_after_sync becomes inert — bug from earlier work where /api/sync/trigger never honored the flag); _BQ_OPTIONAL_FIELD_DEFAULTS injects defaults into GET /server-config payload

Operator + CLI surface:
- da admin register-table --query / --query-mode materialized
- scripts/smoke-test-materialized-bq.sh — end-to-end smoke test for operators

Tests (incl. spike + integration + regression):
- test_db_migration_v20, test_table_registry_source_query
- test_bq_materialize, test_bq_cost_guardrail, test_bq_init_extract_skips
- test_keboola_access, test_keboola_extension_query_passthrough (lock-in for the DuckDB extension capability), test_keboola_materialize, test_keboola_init_extract_skips, test_keboola_materialized_e2e (skipped without KBC_TEST_* creds)
- test_sync_trigger_materialized, test_sync_trigger_keboola_materialized
- test_api_admin_materialized, test_cli_admin_materialized
- test_admin_bq_register, test_admin_discover_bigquery, test_admin_keboola_materialized, test_admin_phase_c_deprecation, test_admin_put_preservation, test_materialized_e2e

Cost: BQ uses bigquery_query() (jobs API, view-aware) — works on tables, views, and materialized views uniformly. Keboola uses ATTACH+COPY parquet through the DuckDB extension.
139 lines
4.6 KiB
Python
139 lines
4.6 KiB
Python
"""BigQuery `materialize_query` writes parquet via BqAccess + DuckDB COPY.
|
|
|
|
The function takes a `BqAccess` instance so the BQ extension session and
|
|
SECRET token live in one place across the codebase (cf. `v2_scan` / `v2_sample`
|
|
/ `v2_schema`). Tests inject a stub BqAccess whose `duckdb_session()` yields
|
|
an in-memory connection with a pre-attached `bq` catalog containing fixture
|
|
tables, exercising the COPY path end-to-end without any GCP traffic.
|
|
"""
|
|
import duckdb
|
|
import pytest
|
|
from contextlib import contextmanager
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock
|
|
|
|
from connectors.bigquery.access import BqAccess, BqProjects
|
|
from connectors.bigquery.extractor import materialize_query, MaterializeBudgetError
|
|
|
|
|
|
def _make_stub_bq(tables: dict[str, str] | None = None) -> BqAccess:
    """Build a BqAccess backed entirely by in-process fakes.

    The DuckDB session factory opens an in-memory database, attaches a second
    in-memory database under the alias `bq`, and seeds it from `tables`.
    `tables` maps a three-part DuckDB reference such as `'bq.test.orders'` to
    the SELECT expression used to populate that table. The client factory
    returns a `MagicMock` whose `.query(...)` result reports
    `total_bytes_processed == 0`.
    """
    seed = {} if not tables else tables

    @contextmanager
    def _session(_projects):
        # Everything before the final dot is the schema the table lives in
        # (e.g. 'bq.test' for 'bq.test.orders'); each one is created once.
        schema_names = {name.rsplit(".", 1)[0] for name in seed}
        connection = duckdb.connect(":memory:")
        try:
            connection.execute("ATTACH ':memory:' AS bq")
            for schema_name in schema_names:
                connection.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")
            for name, select_sql in seed.items():
                connection.execute(
                    f"CREATE OR REPLACE TABLE {name} AS {select_sql}"
                )
            yield connection
        finally:
            # Close the connection even if seeding or the caller's body raised.
            connection.close()

    def _client(_projects):
        # Stub for client.query(sql, job_config=...): the returned job reports
        # zero bytes processed (fail-open, per the original author's note).
        fake_job = MagicMock(total_bytes_processed=0)
        fake_client = MagicMock()
        fake_client.query.return_value = fake_job
        return fake_client

    projects = BqProjects(billing="test-billing", data="test-data")
    return BqAccess(
        projects,
        client_factory=_client,
        duckdb_session_factory=_session,
    )
|
|
|
|
|
|
def test_materialize_writes_parquet_and_returns_stats(tmp_path):
    """A materialized query lands as `<output_dir>/data/<table_id>.parquet`.

    Checks the returned stats (`rows`, `size_bytes`, `query_mode`) and then
    reads the parquet back to confirm its contents.
    """
    out = tmp_path / "extracts" / "bigquery"
    out.mkdir(parents=True)

    bq = _make_stub_bq({
        "bq.test.orders": (
            "SELECT 'EU' AS region, 100 AS revenue UNION ALL "
            "SELECT 'US' AS region, 250 AS revenue"
        )
    })

    stats = materialize_query(
        table_id="orders_summary",
        sql="SELECT region, SUM(revenue) AS revenue FROM bq.test.orders GROUP BY 1",
        bq=bq,
        output_dir=str(out),
    )

    parquet_path = out / "data" / "orders_summary.parquet"
    assert parquet_path.exists()
    assert stats["rows"] == 2
    assert stats["size_bytes"] > 0
    assert stats["query_mode"] == "materialized"

    # Parquet readable end-to-end. The verification connection is scoped with
    # a context manager so it is closed even when the query itself fails.
    with duckdb.connect() as conn:
        rows = conn.execute(
            f"SELECT region, revenue FROM read_parquet('{parquet_path}') ORDER BY region"
        ).fetchall()
    assert rows == [("EU", 100), ("US", 250)]
|
|
|
|
|
|
def test_materialize_atomic_on_failure(tmp_path):
    """A failing query must leave neither the parquet nor its temp file."""
    out = tmp_path / "extracts" / "bigquery"
    out.mkdir(parents=True)

    data_dir = out / "data"
    final_file = data_dir / "broken.parquet"
    temp_file = data_dir / "broken.parquet.tmp"

    stub = _make_stub_bq({"bq.test.orders": "SELECT 1 AS n"})

    # NOTE(review): `Exception` is kept broad on purpose — the concrete error
    # type raised for a missing table is not visible from this file.
    with pytest.raises(Exception):
        materialize_query(
            table_id="broken",
            sql="SELECT * FROM bq.test.does_not_exist",
            bq=stub,
            output_dir=str(out),
        )

    # Neither the final artifact nor the temporary one may survive the failure.
    assert not final_file.exists()
    assert not temp_file.exists()
|
|
|
|
|
|
def test_materialize_rejects_unsafe_table_id(tmp_path):
    """A path-traversal `table_id` is refused with a ValueError matching 'unsafe'.

    The table_id becomes the parquet filename, so it has to be rejected up front.
    """
    out = tmp_path / "extracts" / "bigquery"
    out.mkdir(parents=True)

    call_kwargs = {
        "table_id": "../etc/passwd",
        "sql": "SELECT 1",
        "bq": _make_stub_bq(),
        "output_dir": str(out),
    }

    with pytest.raises(ValueError, match="unsafe"):
        materialize_query(**call_kwargs)
|
|
|
|
|
|
def test_materialize_overwrites_existing_parquet(tmp_path):
    """Materializing the same `table_id` twice keeps only the second result."""
    out = tmp_path / "extracts" / "bigquery"
    out.mkdir(parents=True)
    bq = _make_stub_bq({"bq.test.tiny": "SELECT 1 AS n"})

    materialize_query(
        table_id="t1", sql="SELECT 1 AS n",
        bq=bq, output_dir=str(out),
    )
    materialize_query(
        table_id="t1", sql="SELECT 2 AS n",
        bq=bq, output_dir=str(out),
    )

    # Build the path with pathlib rather than string concatenation, and scope
    # the verification connection so it is always closed.
    parquet_path = out / "data" / "t1.parquet"
    with duckdb.connect() as conn:
        rows = conn.execute(
            f"SELECT n FROM read_parquet('{parquet_path}')"
        ).fetchall()
    assert rows == [(2,)]
|