agnes-the-ai-analyst/tests/test_materialized_e2e.py
ZdenekSrotyr 85d3810535 feat(materialized): query_mode='materialized' for BigQuery + Keboola — admin SELECT → parquet → analyst
Closes the 'admin pre-stages a curated table/view for analysts' use case end-to-end across both supported source connectors.

Backend (BigQuery + Keboola, schema v20):
  - schema v20 adds source_query TEXT to table_registry (renumbered from v19 after main's #150 RBAC migration also bumped to v19)
  - connectors/bigquery/extractor.py adds materialize_query(table_id, sql, *, bq, output_dir, max_bytes=...) — BqAccess session, dry-run cost guardrail (default 10 GiB, configurable via data_source.bigquery.max_bytes_per_materialize), idempotent ATTACH, rows/bytes/md5 metadata for sync_state
  - connectors/keboola/access.py — new KeboolaAccess facade (parallel of BqAccess) wrapping ATTACH 'keboola://...' AS kbc
  - connectors/keboola/extractor.py adds materialize_query — same shape, no dry-run analog (Keboola Storage API has different cost model); legacy bucket-download path skips query_mode='materialized' rows
  - app/api/sync.py:_run_materialized_pass dispatches by source_type to the right materialize_query
  - app/api/admin.py: RegisterTableRequest accepts source_query; model_validator coheres mode↔source_query↔bucket; PUT preserves omitted fields; deprecation marks (Field(deprecated=True)) on sync_strategy + profile_after_sync (no extractor reads them; profile_after_sync becomes inert — bug from earlier work where /api/sync/trigger never honored the flag); _BQ_OPTIONAL_FIELD_DEFAULTS injects defaults into GET /server-config payload

Operator + CLI surface:
  - da admin register-table --query / --query-mode materialized
  - scripts/smoke-test-materialized-bq.sh — end-to-end smoke for operators

Tests (incl. spike + integration + regression):
  - test_db_migration_v20, test_table_registry_source_query
  - test_bq_materialize, test_bq_cost_guardrail, test_bq_init_extract_skips
  - test_keboola_access, test_keboola_extension_query_passthrough (lock-in for the DuckDB extension capability), test_keboola_materialize, test_keboola_init_extract_skips, test_keboola_materialized_e2e (skipped without KBC_TEST_* creds)
  - test_sync_trigger_materialized, test_sync_trigger_keboola_materialized
  - test_api_admin_materialized, test_cli_admin_materialized
  - test_admin_bq_register, test_admin_discover_bigquery, test_admin_keboola_materialized, test_admin_phase_c_deprecation, test_admin_put_preservation, test_materialized_e2e

Cost: BQ uses bigquery_query() (jobs API, view-aware) — works on tables, views, materialized views uniformly. Keboola uses ATTACH+COPY parquet through the DuckDB extension.
2026-05-01 20:25:56 +02:00


"""End-to-end integration coverage for query_mode='materialized'.

Unit tests verify each piece in isolation; this file glues them together:

1. Admin POST /api/admin/register-table (materialized) → registry row written
2. _run_materialized_pass writes parquet + sync_state with correct hash
3. GET /api/sync/manifest (per-user) returns the row with query_mode +
   the parquet hash, filtered by RBAC
4. Mode-switch transitions (remote → materialized, materialized → SQL edit
   preserves registered_at) maintain registry invariants.

Devil's-advocate review found these were the gaps the unit tests left
open. Each piece passes in isolation; this file proves they compose.
"""
import duckdb
import hashlib
import pytest
from contextlib import contextmanager
from pathlib import Path
from unittest.mock import patch, MagicMock

from connectors.bigquery.access import BqAccess, BqProjects
from src.repositories.table_registry import TableRegistryRepository
from src.repositories.sync_state import SyncStateRepository


def _auth(token):
    return {"Authorization": f"Bearer {token}"}


@pytest.fixture
def bq_instance(monkeypatch):
    """Force instance.yaml to look like a BigQuery deployment so the BQ
    register validator's project_id check passes."""
    fake_cfg = {
        "data_source": {
            "type": "bigquery",
            "bigquery": {"project": "my-test-project", "location": "us"},
        },
    }
    monkeypatch.setattr(
        "app.instance_config.load_instance_config",
        lambda: fake_cfg,
        raising=False,
    )
    from app.instance_config import reset_cache
    reset_cache()
    yield fake_cfg
    reset_cache()


@pytest.fixture
def stub_bq_extractor(monkeypatch):
    """Mirror tests/test_admin_bq_register.py::stub_bq_extractor; replaces
    rebuild_from_registry + SyncOrchestrator so the API's post-register
    materialize doesn't hit real BQ during HTTP-driven tests."""
    rebuild_mock = MagicMock(return_value={
        "project_id": "my-test-project",
        "tables_registered": 1,
        "errors": [],
        "skipped": False,
    })
    monkeypatch.setattr(
        "connectors.bigquery.extractor.rebuild_from_registry",
        rebuild_mock,
    )
    orch_mock = MagicMock()
    monkeypatch.setattr(
        "src.orchestrator.SyncOrchestrator",
        lambda *a, **kw: orch_mock,
    )
    return {"rebuild": rebuild_mock, "orchestrator": orch_mock}


@pytest.fixture
def stub_bq():
    """Real-shape BqAccess wired to in-memory DuckDB factories so the
    materialize_query path can run end-to-end without GCP."""
    @contextmanager
    def _session(_p):
        conn = duckdb.connect(":memory:")
        try:
            conn.execute("ATTACH ':memory:' AS bq")
            conn.execute("CREATE SCHEMA bq.test")
            conn.execute(
                "CREATE OR REPLACE TABLE bq.test.orders AS "
                "SELECT 'EU' AS region, 100 AS revenue UNION ALL "
                "SELECT 'US' AS region, 250 AS revenue"
            )
            yield conn
        finally:
            conn.close()

    return BqAccess(
        BqProjects(billing="my-test-project", data="my-test-project"),
        client_factory=lambda _p: MagicMock(),
        duckdb_session_factory=_session,
    )


def test_e2e_register_then_materialize_then_manifest_via_repo(
    bq_instance, stub_bq, tmp_path, monkeypatch,
):
    """Glue test: register row at the repository layer (skips HTTP/auth),
    run the materialized pass, verify sync_state, then exercise the
    `_build_manifest_for_user` admin path. Catches integration breakage
    that unit tests miss because each only sees one layer."""
    monkeypatch.setenv("DATA_DIR", str(tmp_path / "data"))
    db_path = tmp_path / "system.duckdb"
    conn = duckdb.connect(str(db_path))
    from src.db import _ensure_schema
    _ensure_schema(conn)

    table_id = "orders_summary_e2e"
    repo = TableRegistryRepository(conn)
    repo.register(
        id=table_id, name=table_id, source_type="bigquery",
        query_mode="materialized",
        source_query="SELECT region, SUM(revenue) AS revenue "
                     "FROM bq.test.orders GROUP BY 1",
        sync_schedule="every 1m",
    )

    # Run the materialized pass.
    from app.api import sync as sync_mod
    summary = sync_mod._run_materialized_pass(conn, stub_bq)
    assert table_id in summary["materialized"], summary
    assert not summary["errors"]

    # Parquet on disk.
    parquet_path = (
        tmp_path / "data" / "extracts" / "bigquery" / "data"
        / f"{table_id}.parquet"
    )
    assert parquet_path.exists(), f"Expected {parquet_path} to exist"

    # sync_state hash matches the file's MD5.
    expected_hash = hashlib.md5(parquet_path.read_bytes()).hexdigest()
    state = SyncStateRepository(conn)
    row = state.get_table_state(table_id)
    assert row is not None
    assert row["hash"] == expected_hash
    assert row["rows"] == 2

    # Manifest builder exposes query_mode + hash to admin (no RBAC filter).
    # Post-#150 RBAC: admin shortcut keys on Admin user_group membership,
    # not the legacy `users.role` column. Seed the user + Admin membership
    # so `can_access_table` short-circuits to True.
    conn.execute(
        "INSERT OR IGNORE INTO users (id, email) VALUES ('u-admin', 'admin@test')"
    )
    admin_group_id = conn.execute(
        "SELECT id FROM user_groups WHERE name = 'Admin'"
    ).fetchone()[0]
    conn.execute(
        "INSERT OR IGNORE INTO user_group_members (user_id, group_id, source) "
        "VALUES ('u-admin', ?, 'admin')",
        [admin_group_id],
    )
    admin_user = {"id": "u-admin", "email": "admin@test"}
    manifest = sync_mod._build_manifest_for_user(conn, admin_user)
    assert table_id in manifest["tables"]
    entry = manifest["tables"][table_id]
    assert entry["query_mode"] == "materialized"
    assert entry["hash"] == expected_hash
    assert entry["rows"] == 2
    conn.close()
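

# Illustrative aside, not part of the test suite: the hash assertion above
# reads the whole parquet file into memory before hashing it. For large
# extracts the same MD5 can be computed in chunks. The helper name `file_md5`
# and the sample file are hypothetical; this is a minimal sketch that only
# shows the streamed digest agrees with the whole-file digest.

```python
import hashlib
import tempfile
from pathlib import Path


def file_md5(path, chunk_size=1 << 20):
    """Return the hex MD5 of a file, reading it chunk_size bytes at a time."""
    digest = hashlib.md5()
    with open(path, "rb") as fh:
        # iter() with a sentinel stops once read() returns an empty bytes object.
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    # Stand-in payload; a real extract would be a parquet file.
    sample = Path(tmp) / "sample.parquet"
    sample.write_bytes(b"stand-in bytes, not real parquet" * 1000)
    streamed = file_md5(sample, chunk_size=64)
    whole = hashlib.md5(sample.read_bytes()).hexdigest()
```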


def test_remote_to_materialized_transition_clears_bucket_table(
    seeded_app, bq_instance, stub_bq_extractor,
):
    """Switching a remote BQ row to materialized must accept source_query
    and the merged validator must not trip on the now-irrelevant
    bucket/source_table fields."""
    c = seeded_app["client"]
    token = seeded_app["admin_token"]

    # Seed a remote row.
    r = c.post("/api/admin/register-table", json={
        "name": "live_to_mat",
        "source_type": "bigquery",
        "bucket": "analytics",
        "source_table": "orders",
        "query_mode": "remote",
    }, headers=_auth(token))
    assert r.status_code in (200, 202), r.json()
    table_id = r.json()["id"]

    # Switch to materialized; must include source_query for the validator.
    r2 = c.put(f"/api/admin/registry/{table_id}", json={
        "query_mode": "materialized",
        "source_query": "SELECT 1 AS n",
    }, headers=_auth(token))
    assert r2.status_code == 200, r2.json()

    # Verify the merged record reflects the switch.
    r3 = c.get("/api/admin/registry", headers=_auth(token))
    row = next((t for t in r3.json()["tables"] if t["id"] == table_id), None)
    assert row is not None
    assert row["query_mode"] == "materialized"
    assert row["source_query"] == "SELECT 1 AS n"


def test_materialized_sql_edit_preserves_registered_at(
    seeded_app, bq_instance, stub_bq_extractor, monkeypatch,
):
    """Editing source_query on an existing materialized row must not
    reset registered_at; the row's registration history is preserved
    across SQL edits (issue #130 invariant)."""
    c = seeded_app["client"]
    token = seeded_app["admin_token"]

    # Seed a materialized row.
    r = c.post("/api/admin/register-table", json={
        "name": "sql_edit_target",
        "source_type": "bigquery",
        "query_mode": "materialized",
        "source_query": "SELECT 1 AS n",
    }, headers=_auth(token))
    assert r.status_code == 201, r.json()
    table_id = r.json()["id"]

    # Capture the original registered_at.
    r2 = c.get("/api/admin/registry", headers=_auth(token))
    row = next((t for t in r2.json()["tables"] if t["id"] == table_id), None)
    original_ts = row["registered_at"]
    assert original_ts is not None

    # Edit the SQL.
    import time
    time.sleep(0.01)  # ensure a clock tick elapses so a fresh stamp would differ
    r3 = c.put(f"/api/admin/registry/{table_id}", json={
        "query_mode": "materialized",
        "source_query": "SELECT 2 AS n",
    }, headers=_auth(token))
    assert r3.status_code == 200, r3.json()

    r4 = c.get("/api/admin/registry", headers=_auth(token))
    row = next((t for t in r4.json()["tables"] if t["id"] == table_id), None)
    assert row["source_query"] == "SELECT 2 AS n"
    # registered_at preserved across edit
    assert row["registered_at"] == original_ts, (
        f"Expected registered_at preserved (issue #130 contract). "
        f"Original: {original_ts}, after edit: {row['registered_at']}"
    )


def test_materialized_zero_rows_logs_warning(stub_bq, tmp_path, caplog):
    """Devil's-advocate item: an SQL filter that returns 0 rows is
    indistinguishable from 'SQL is wrong'. Confirm we log a WARNING so
    operators can grep on it."""
    import logging
    from connectors.bigquery.extractor import materialize_query

    out = tmp_path / "extracts" / "bigquery"
    out.mkdir(parents=True)

    # Add an empty BQ table to the stub for this test.
    @contextmanager
    def _session_empty(_p):
        conn = duckdb.connect(":memory:")
        try:
            conn.execute("ATTACH ':memory:' AS bq")
            conn.execute("CREATE SCHEMA bq.test")
            conn.execute("CREATE OR REPLACE TABLE bq.test.empty AS "
                         "SELECT 1 AS n WHERE FALSE")
            yield conn
        finally:
            conn.close()

    bq_empty = BqAccess(
        BqProjects(billing="t", data="t"),
        client_factory=lambda _p: MagicMock(),
        duckdb_session_factory=_session_empty,
    )
    with caplog.at_level(logging.WARNING, logger="connectors.bigquery.extractor"):
        stats = materialize_query(
            table_id="empty_t",
            sql="SELECT * FROM bq.test.empty",
            bq=bq_empty,
            output_dir=str(out),
        )
    assert stats["rows"] == 0
    assert any("0 rows" in rec.message for rec in caplog.records), (
        f"Expected '0 rows' WARNING; got: {[r.message for r in caplog.records]}"
    )
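

# Illustrative aside, not part of the test suite: the behaviour checked above
# amounts to "return the stats as usual, but emit a greppable WARNING when the
# row count is zero". The sketch below shows that pattern with the standard
# logging module only. The logger name, `report_row_count`, and the message
# wording are assumptions made for illustration; the real extractor's message
# is only known to contain "0 rows".

```python
import logging

logger = logging.getLogger("materialize_sketch")  # hypothetical logger name


class ListHandler(logging.Handler):
    """Collect formatted log messages in a list so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def report_row_count(table_id, rows):
    """Return a stats dict; warn when the materialized query produced no rows."""
    if rows == 0:
        # An empty result may be legitimate or may mean the SQL is wrong,
        # so it is surfaced as a WARNING rather than treated as an error.
        logger.warning("materialize %s: query returned 0 rows", table_id)
    return {"rows": rows}


captured = ListHandler()
logger.addHandler(captured)
empty_stats = report_row_count("empty_t", 0)
full_stats = report_row_count("orders", 2)
logger.removeHandler(captured)
```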


def test_attach_real_error_propagates(stub_bq, tmp_path):
    """ATTACH 'project=...' that fails for a real reason (not the
    'already attached' tolerated case) must propagate so callers see
    the actual error instead of a confusing downstream 'bq is not
    attached' message."""
    from connectors.bigquery.extractor import materialize_query

    out = tmp_path / "extracts" / "bigquery"
    out.mkdir(parents=True)

    @contextmanager
    def _session_attach_fails(_p):
        conn = duckdb.connect(":memory:")
        try:
            # Force ATTACH 'project=...' to raise something other than
            # "already attached" by intercepting via an execute wrapper.
            # Since DuckDB's real connection doesn't accept attribute
            # patches, we use a thin proxy for this test.
            class _Proxy:
                def __init__(self, real):
                    self._real = real

                def execute(self, sql, *a, **kw):
                    if sql.startswith("ATTACH 'project="):
                        raise duckdb.Error("fake permission denied: missing serviceusage.services.use")
                    return self._real.execute(sql, *a, **kw)

                def __getattr__(self, name):
                    return getattr(self._real, name)

                def close(self):
                    return self._real.close()

            yield _Proxy(conn)
        finally:
            conn.close()

    bq_bad = BqAccess(
        BqProjects(billing="t", data="t"),
        client_factory=lambda _p: MagicMock(),
        duckdb_session_factory=_session_attach_fails,
    )
    with pytest.raises(duckdb.Error, match="permission denied"):
        materialize_query(
            table_id="x", sql="SELECT 1",
            bq=bq_bad, output_dir=str(out),
        )
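

# Illustrative aside, not part of the test suite: the contract exercised above
# is "tolerate a duplicate ATTACH, re-raise everything else". A minimal sketch
# of that contract follows, using a fake connection so it runs without DuckDB.
# `FakeConnection`, `attach_tolerating_duplicates`, and the "already attached"
# marker string are assumptions for illustration; the real extractor may
# detect the tolerated case differently.

```python
class FakeConnection:
    """Stand-in for a database connection; optionally fails on ATTACH."""

    def __init__(self, error=None):
        self.error = error
        self.statements = []

    def execute(self, sql):
        self.statements.append(sql)
        if self.error is not None and sql.startswith("ATTACH"):
            raise RuntimeError(self.error)


def attach_tolerating_duplicates(conn, attach_sql, marker="already attached"):
    """Run ATTACH; return True if it attached, False if it was a duplicate.

    Any failure whose message does not contain `marker` is re-raised so the
    caller sees the real cause (for example a permission error).
    """
    try:
        conn.execute(attach_sql)
    except Exception as exc:
        if marker in str(exc).lower():
            return False
        raise
    return True


ATTACH_SQL = "ATTACH 'project=t' AS bq"
fresh = attach_tolerating_duplicates(FakeConnection(), ATTACH_SQL)
duplicate = attach_tolerating_duplicates(
    FakeConnection("database is already attached"), ATTACH_SQL
)
try:
    attach_tolerating_duplicates(FakeConnection("permission denied"), ATTACH_SQL)
    propagated = False
except RuntimeError:
    propagated = True
```

# Matching on the message text is brittle if the wording changes between
# library versions, which is one reason a regression test like the one above
# is useful.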