agnes-the-ai-analyst/tests/test_bq_materialize.py
ZdenekSrotyr 85d3810535 feat(materialized): query_mode='materialized' for BigQuery + Keboola — admin SELECT → parquet → analyst
Closes the 'admin pre-stages a curated table/view for analysts' use case end-to-end across both supported source connectors.

Backend (BigQuery + Keboola, schema v20):
  - schema v20 adds source_query TEXT to table_registry (renumbered from v19 after main's #150 RBAC migration also bumped to v19)
  - connectors/bigquery/extractor.py adds materialize_query(table_id, sql, *, bq, output_dir, max_bytes=...) — BqAccess session, dry-run cost guardrail (default 10 GiB, configurable via data_source.bigquery.max_bytes_per_materialize), idempotent ATTACH, rows/bytes/md5 metadata for sync_state
  - connectors/keboola/access.py — new KeboolaAccess facade (parallel of BqAccess) wrapping ATTACH 'keboola://...' AS kbc
  - connectors/keboola/extractor.py adds materialize_query — same shape, no dry-run analog (Keboola Storage API has different cost model); legacy bucket-download path skips query_mode='materialized' rows
  - app/api/sync.py:_run_materialized_pass dispatches by source_type to the right materialize_query
  - app/api/admin.py: RegisterTableRequest accepts source_query; a model_validator checks that mode, source_query, and bucket are mutually consistent; PUT preserves omitted fields; sync_strategy and profile_after_sync are marked deprecated via Field(deprecated=True) because no extractor reads them (profile_after_sync becomes inert; an earlier bug meant /api/sync/trigger never honored the flag); _BQ_OPTIONAL_FIELD_DEFAULTS injects defaults into the GET /server-config payload
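
The dry-run cost guardrail listed above can be sketched roughly as follows. This is a minimal illustration and not the code in connectors/bigquery/extractor.py: `check_budget`, `BudgetExceeded`, `make_client`, and the plain-dict job config are hypothetical stand-ins. It assumes only what the test stub in this file shows, namely a client whose `.query(sql, job_config=...)` returns a job exposing `total_bytes_processed`, and that a missing or zero estimate is treated as fail-open.

```python
from unittest.mock import MagicMock

# 10 GiB, the default named in the commit message.
DEFAULT_MAX_BYTES = 10 * 1024**3


class BudgetExceeded(RuntimeError):
    """Hypothetical stand-in for the extractor's MaterializeBudgetError."""


def check_budget(client, sql: str, max_bytes: int = DEFAULT_MAX_BYTES) -> int:
    """Dry-run `sql` and raise if the estimated scan exceeds `max_bytes`.

    `client` is assumed to expose .query(sql, job_config=...) returning a job
    with .total_bytes_processed. A real client would receive a dry-run job
    config object; a plain dict stands in for it here.
    """
    job = client.query(sql, job_config={"dry_run": True})
    # A missing estimate is treated as 0, so the guardrail fails open.
    estimated = job.total_bytes_processed or 0
    if estimated > max_bytes:
        raise BudgetExceeded(
            f"query would scan {estimated} bytes, limit is {max_bytes}"
        )
    return estimated


def make_client(estimated_bytes):
    """Build a stub client whose dry-run reports `estimated_bytes`."""
    client = MagicMock()
    client.query.return_value.total_bytes_processed = estimated_bytes
    return client
```

With this shape, `check_budget(make_client(0), "SELECT 1")` passes, while a stub reporting more than `max_bytes` raises before any data would be copied.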

Operator + CLI surface:
  - da admin register-table --query / --query-mode materialized
  - scripts/smoke-test-materialized-bq.sh — end-to-end smoke for operators

Tests (incl. spike + integration + regression):
  - test_db_migration_v20, test_table_registry_source_query
  - test_bq_materialize, test_bq_cost_guardrail, test_bq_init_extract_skips
  - test_keboola_access, test_keboola_extension_query_passthrough (lock-in for the DuckDB extension capability), test_keboola_materialize, test_keboola_init_extract_skips, test_keboola_materialized_e2e (skipped without KBC_TEST_* creds)
  - test_sync_trigger_materialized, test_sync_trigger_keboola_materialized
  - test_api_admin_materialized, test_cli_admin_materialized
  - test_admin_bq_register, test_admin_discover_bigquery, test_admin_keboola_materialized, test_admin_phase_c_deprecation, test_admin_put_preservation, test_materialized_e2e

Cost: BQ uses bigquery_query() (jobs API, view-aware) — works on tables, views, materialized views uniformly. Keboola uses ATTACH+COPY parquet through the DuckDB extension.
2026-05-01 20:25:56 +02:00


"""BigQuery `materialize_query` writes parquet via BqAccess + DuckDB COPY.
The function takes a `BqAccess` instance so the BQ extension session and
SECRET token live in one place across the codebase (cf. `v2_scan` / `v2_sample`
/ `v2_schema`). Tests inject a stub BqAccess whose `duckdb_session()` yields
an in-memory connection with a pre-attached `bq` catalog containing fixture
tables, exercising the COPY path end-to-end without any GCP traffic.
"""
from contextlib import contextmanager
from pathlib import Path
from unittest.mock import MagicMock

import duckdb
import pytest

from connectors.bigquery.access import BqAccess, BqProjects
from connectors.bigquery.extractor import materialize_query, MaterializeBudgetError


def _make_stub_bq(tables: dict[str, str] | None = None) -> BqAccess:
    """Return a BqAccess wired to factories that yield an in-memory DuckDB
    with a pretend `bq` catalog containing test tables. `tables` maps
    DuckDB-three-part references like `'bq.test.orders'` to a SELECT
    expression to seed them with.
    """
    tables = tables or {}

    @contextmanager
    def _session(_projects):
        conn = duckdb.connect(":memory:")
        try:
            conn.execute("ATTACH ':memory:' AS bq")
            schemas = {ref.rsplit(".", 1)[0] for ref in tables}
            for s in schemas:
                conn.execute(f"CREATE SCHEMA IF NOT EXISTS {s}")
            for ref, body in tables.items():
                conn.execute(f"CREATE OR REPLACE TABLE {ref} AS {body}")
            yield conn
        finally:
            conn.close()

    # client_factory returns a stub whose .query(sql, job_config=...) yields
    # a job whose .total_bytes_processed defaults to 0 (fail-open).
    def _client(_projects):
        client = MagicMock()
        job = MagicMock()
        job.total_bytes_processed = 0
        client.query.return_value = job
        return client

    return BqAccess(
        BqProjects(billing="test-billing", data="test-data"),
        client_factory=_client,
        duckdb_session_factory=_session,
    )


def test_materialize_writes_parquet_and_returns_stats(tmp_path):
    out = tmp_path / "extracts" / "bigquery"
    out.mkdir(parents=True)
    bq = _make_stub_bq({
        "bq.test.orders": (
            "SELECT 'EU' AS region, 100 AS revenue UNION ALL "
            "SELECT 'US' AS region, 250 AS revenue"
        )
    })

    stats = materialize_query(
        table_id="orders_summary",
        sql="SELECT region, SUM(revenue) AS revenue FROM bq.test.orders GROUP BY 1",
        bq=bq,
        output_dir=str(out),
    )

    parquet_path = out / "data" / "orders_summary.parquet"
    assert parquet_path.exists()
    assert stats["rows"] == 2
    assert stats["size_bytes"] > 0
    assert stats["query_mode"] == "materialized"

    # Parquet readable end-to-end
    rows = duckdb.connect().execute(
        f"SELECT region, revenue FROM read_parquet('{parquet_path}') ORDER BY region"
    ).fetchall()
    assert rows == [("EU", 100), ("US", 250)]


def test_materialize_atomic_on_failure(tmp_path):
    """Bad SQL must not leave a half-written parquet behind."""
    out = tmp_path / "extracts" / "bigquery"
    out.mkdir(parents=True)
    parquet_path = out / "data" / "broken.parquet"
    bq = _make_stub_bq({"bq.test.orders": "SELECT 1 AS n"})

    with pytest.raises(Exception):
        materialize_query(
            table_id="broken",
            sql="SELECT * FROM bq.test.does_not_exist",
            bq=bq,
            output_dir=str(out),
        )

    assert not parquet_path.exists()
    # Tmp also cleaned
    assert not (out / "data" / "broken.parquet.tmp").exists()


def test_materialize_rejects_unsafe_table_id(tmp_path):
    """table_id becomes the parquet filename — block path traversal up front."""
    out = tmp_path / "extracts" / "bigquery"
    out.mkdir(parents=True)
    bq = _make_stub_bq()

    with pytest.raises(ValueError, match="unsafe"):
        materialize_query(
            table_id="../etc/passwd",
            sql="SELECT 1",
            bq=bq,
            output_dir=str(out),
        )


def test_materialize_overwrites_existing_parquet(tmp_path):
    out = tmp_path / "extracts" / "bigquery"
    out.mkdir(parents=True)
    bq = _make_stub_bq({"bq.test.tiny": "SELECT 1 AS n"})

    materialize_query(
        table_id="t1", sql="SELECT 1 AS n",
        bq=bq, output_dir=str(out),
    )
    materialize_query(
        table_id="t1", sql="SELECT 2 AS n",
        bq=bq, output_dir=str(out),
    )

    rows = duckdb.connect().execute(
        f"SELECT n FROM read_parquet('{out}/data/t1.parquet')"
    ).fetchall()
    assert rows == [(2,)]
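
The tests above pin down three behaviours: unsafe `table_id` values are rejected with a `ValueError` mentioning "unsafe", a failed run leaves neither the final parquet nor a `.tmp` file behind, and a second run overwrites the first. One way to get those properties is to validate the id, write to a sibling temp file, and rename it into place. The sketch below is an assumption about how that could look, not the extractor's actual code: `safe_parquet_path`, `write_atomically`, and the `_SAFE_ID` pattern are hypothetical, and the writer callback stands in for the DuckDB COPY step.

```python
import os
import re
from pathlib import Path
from typing import Callable

# Hypothetical allow-list; the real extractor may use a different rule.
_SAFE_ID = re.compile(r"[A-Za-z0-9_][A-Za-z0-9_.-]*")


def safe_parquet_path(output_dir: str, table_id: str) -> Path:
    """Map table_id to <output_dir>/data/<table_id>.parquet, rejecting
    anything that could escape the data directory."""
    if not _SAFE_ID.fullmatch(table_id) or ".." in table_id:
        raise ValueError(f"unsafe table_id: {table_id!r}")
    return Path(output_dir) / "data" / f"{table_id}.parquet"


def write_atomically(final_path: Path, write: Callable[[Path], None]) -> None:
    """Call write(tmp_path), then rename tmp_path over final_path.

    If write() raises, the temp file is removed and final_path is untouched.
    """
    final_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = final_path.with_name(final_path.name + ".tmp")
    try:
        write(tmp_path)
        # os.replace overwrites an existing destination on the same filesystem.
        os.replace(tmp_path, final_path)
    except BaseException:
        tmp_path.unlink(missing_ok=True)
        raise
```

In a materialize-style function, the callback would run the COPY into `tmp_path`; keeping the temp file in the same directory as the target lets the rename stay on one filesystem.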