Closes the 'admin pre-stages a curated table/view for analysts' use case end-to-end across both supported source connectors. Backend (BigQuery + Keboola, schema v20): - schema v20 adds source_query TEXT to table_registry (renumbered from v19 after main's #150 RBAC migration also bumped to v19) - connectors/bigquery/extractor.py adds materialize_query(table_id, sql, *, bq, output_dir, max_bytes=...) — BqAccess session, dry-run cost guardrail (default 10 GiB, configurable via data_source.bigquery.max_bytes_per_materialize), idempotent ATTACH, rows/bytes/md5 metadata for sync_state - connectors/keboola/access.py — new KeboolaAccess facade (parallel of BqAccess) wrapping ATTACH 'keboola://...' AS kbc - connectors/keboola/extractor.py adds materialize_query — same shape, no dry-run analog (Keboola Storage API has different cost model); legacy bucket-download path skips query_mode='materialized' rows - app/api/sync.py:_run_materialized_pass dispatches by source_type to the right materialize_query - app/api/admin.py: RegisterTableRequest accepts source_query; model_validator coheres mode↔source_query↔bucket; PUT preserves omitted fields; deprecation marks (Field(deprecated=True)) on sync_strategy + profile_after_sync (no extractor reads them; profile_after_sync becomes inert — bug from earlier work where /api/sync/trigger never honored the flag); _BQ_OPTIONAL_FIELD_DEFAULTS injects defaults into GET /server-config payload Operator + CLI surface: - da admin register-table --query / --query-mode materialized - scripts/smoke-test-materialized-bq.sh — end-to-end smoke for operators Tests (incl. spike + integration + regression): - test_db_migration_v20, test_table_registry_source_query - test_bq_materialize, test_bq_cost_guardrail, test_bq_init_extract_skips - test_keboola_access, test_keboola_extension_query_passthrough (lock-in for the DuckDB extension capability), test_keboola_materialize, test_keboola_init_extract_skips, test_keboola_materialized_e2e (skipped without KBC_TEST_* creds) - test_sync_trigger_materialized, test_sync_trigger_keboola_materialized - test_api_admin_materialized, test_cli_admin_materialized - test_admin_bq_register, test_admin_discover_bigquery, test_admin_keboola_materialized, test_admin_phase_c_deprecation, test_admin_put_preservation, test_materialized_e2e Cost: BQ uses bigquery_query() (jobs API, view-aware) — works on tables, views, materialized views uniformly. Keboola uses ATTACH+COPY parquet through the DuckDB extension.
335 lines
12 KiB
Python
335 lines
12 KiB
Python
"""End-to-end integration coverage for query_mode='materialized'.
|
|
|
|
Unit tests verify each piece in isolation; this file glues them together:
|
|
|
|
1. Admin POST /api/admin/register-table (materialized) → registry row written
|
|
2. _run_materialized_pass writes parquet + sync_state with correct hash
|
|
3. GET /api/sync/manifest (per-user) returns the row with query_mode +
|
|
the parquet hash, filtered by RBAC
|
|
4. Mode-switch transitions (remote → materialized, materialized → SQL edit
|
|
preserves registered_at) maintain registry invariants.
|
|
|
|
Devil's-advocate review found these were the gaps the unit tests left
|
|
open. Each piece passes in isolation; this file proves they compose.
|
|
"""
|
|
import duckdb
|
|
import hashlib
|
|
import pytest
|
|
from contextlib import contextmanager
|
|
from pathlib import Path
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
from connectors.bigquery.access import BqAccess, BqProjects
|
|
from src.repositories.table_registry import TableRegistryRepository
|
|
from src.repositories.sync_state import SyncStateRepository
|
|
|
|
|
|
def _auth(token):
|
|
return {"Authorization": f"Bearer {token}"}
|
|
|
|
|
|
@pytest.fixture
|
|
def bq_instance(monkeypatch):
|
|
"""Force instance.yaml to look like a BigQuery deployment so the BQ
|
|
register validator's project_id check passes."""
|
|
fake_cfg = {
|
|
"data_source": {
|
|
"type": "bigquery",
|
|
"bigquery": {"project": "my-test-project", "location": "us"},
|
|
},
|
|
}
|
|
monkeypatch.setattr(
|
|
"app.instance_config.load_instance_config",
|
|
lambda: fake_cfg,
|
|
raising=False,
|
|
)
|
|
from app.instance_config import reset_cache
|
|
reset_cache()
|
|
yield fake_cfg
|
|
reset_cache()
|
|
|
|
|
|
@pytest.fixture
|
|
def stub_bq_extractor(monkeypatch):
|
|
"""Mirror tests/test_admin_bq_register.py::stub_bq_extractor — replaces
|
|
rebuild_from_registry + SyncOrchestrator so the API's post-register
|
|
materialize doesn't hit real BQ during HTTP-driven tests."""
|
|
rebuild_mock = MagicMock(return_value={
|
|
"project_id": "my-test-project",
|
|
"tables_registered": 1,
|
|
"errors": [],
|
|
"skipped": False,
|
|
})
|
|
monkeypatch.setattr(
|
|
"connectors.bigquery.extractor.rebuild_from_registry",
|
|
rebuild_mock,
|
|
)
|
|
orch_mock = MagicMock()
|
|
monkeypatch.setattr(
|
|
"src.orchestrator.SyncOrchestrator",
|
|
lambda *a, **kw: orch_mock,
|
|
)
|
|
return {"rebuild": rebuild_mock, "orchestrator": orch_mock}
|
|
|
|
|
|
@pytest.fixture
|
|
def stub_bq():
|
|
"""Real-shape BqAccess wired to in-memory DuckDB factories so the
|
|
materialize_query path can run end-to-end without GCP."""
|
|
@contextmanager
|
|
def _session(_p):
|
|
conn = duckdb.connect(":memory:")
|
|
try:
|
|
conn.execute("ATTACH ':memory:' AS bq")
|
|
conn.execute("CREATE SCHEMA bq.test")
|
|
conn.execute(
|
|
"CREATE OR REPLACE TABLE bq.test.orders AS "
|
|
"SELECT 'EU' AS region, 100 AS revenue UNION ALL "
|
|
"SELECT 'US' AS region, 250 AS revenue"
|
|
)
|
|
yield conn
|
|
finally:
|
|
conn.close()
|
|
return BqAccess(
|
|
BqProjects(billing="my-test-project", data="my-test-project"),
|
|
client_factory=lambda _p: MagicMock(),
|
|
duckdb_session_factory=_session,
|
|
)
|
|
|
|
|
|
def test_e2e_register_then_materialize_then_manifest_via_repo(
|
|
bq_instance, stub_bq, tmp_path, monkeypatch,
|
|
):
|
|
"""Glue test: register row at the repository layer (skips HTTP/auth),
|
|
run the materialized pass, verify sync_state, then exercise the
|
|
`_build_manifest_for_user` admin path. Catches integration breakage
|
|
that unit tests miss because each only sees one layer."""
|
|
monkeypatch.setenv("DATA_DIR", str(tmp_path / "data"))
|
|
db_path = tmp_path / "system.duckdb"
|
|
conn = duckdb.connect(str(db_path))
|
|
from src.db import _ensure_schema
|
|
_ensure_schema(conn)
|
|
|
|
table_id = "orders_summary_e2e"
|
|
repo = TableRegistryRepository(conn)
|
|
repo.register(
|
|
id=table_id, name=table_id, source_type="bigquery",
|
|
query_mode="materialized",
|
|
source_query="SELECT region, SUM(revenue) AS revenue "
|
|
"FROM bq.test.orders GROUP BY 1",
|
|
sync_schedule="every 1m",
|
|
)
|
|
|
|
# Run the materialized pass.
|
|
from app.api import sync as sync_mod
|
|
summary = sync_mod._run_materialized_pass(conn, stub_bq)
|
|
assert table_id in summary["materialized"], summary
|
|
assert not summary["errors"]
|
|
|
|
# Parquet on disk.
|
|
parquet_path = (
|
|
tmp_path / "data" / "extracts" / "bigquery" / "data"
|
|
/ f"{table_id}.parquet"
|
|
)
|
|
assert parquet_path.exists(), f"Expected {parquet_path} to exist"
|
|
|
|
# sync_state hash matches the file's MD5.
|
|
expected_hash = hashlib.md5(parquet_path.read_bytes()).hexdigest()
|
|
state = SyncStateRepository(conn)
|
|
row = state.get_table_state(table_id)
|
|
assert row is not None
|
|
assert row["hash"] == expected_hash
|
|
assert row["rows"] == 2
|
|
|
|
# Manifest builder exposes query_mode + hash to admin (no RBAC filter).
|
|
# Post-#150 RBAC: admin shortcut keys on Admin user_group membership,
|
|
# not the legacy `users.role` column. Seed the user + Admin membership
|
|
# so `can_access_table` short-circuits to True.
|
|
conn.execute(
|
|
"INSERT OR IGNORE INTO users (id, email) VALUES ('u-admin', 'admin@test')"
|
|
)
|
|
admin_group_id = conn.execute(
|
|
"SELECT id FROM user_groups WHERE name = 'Admin'"
|
|
).fetchone()[0]
|
|
conn.execute(
|
|
"INSERT OR IGNORE INTO user_group_members (user_id, group_id, source) "
|
|
"VALUES ('u-admin', ?, 'admin')",
|
|
[admin_group_id],
|
|
)
|
|
admin_user = {"id": "u-admin", "email": "admin@test"}
|
|
manifest = sync_mod._build_manifest_for_user(conn, admin_user)
|
|
assert table_id in manifest["tables"]
|
|
entry = manifest["tables"][table_id]
|
|
assert entry["query_mode"] == "materialized"
|
|
assert entry["hash"] == expected_hash
|
|
assert entry["rows"] == 2
|
|
|
|
conn.close()
|
|
|
|
|
|
def test_remote_to_materialized_transition_clears_bucket_table(
|
|
seeded_app, bq_instance, stub_bq_extractor,
|
|
):
|
|
"""Switching a remote BQ row to materialized must accept source_query
|
|
and the merged validator must not trip on the now-irrelevant
|
|
bucket/source_table fields."""
|
|
c = seeded_app["client"]
|
|
token = seeded_app["admin_token"]
|
|
|
|
# Seed a remote row.
|
|
r = c.post("/api/admin/register-table", json={
|
|
"name": "live_to_mat",
|
|
"source_type": "bigquery",
|
|
"bucket": "analytics",
|
|
"source_table": "orders",
|
|
"query_mode": "remote",
|
|
}, headers=_auth(token))
|
|
assert r.status_code in (200, 202), r.json()
|
|
table_id = r.json()["id"]
|
|
|
|
# Switch to materialized — must include source_query for the validator.
|
|
r2 = c.put(f"/api/admin/registry/{table_id}", json={
|
|
"query_mode": "materialized",
|
|
"source_query": "SELECT 1 AS n",
|
|
}, headers=_auth(token))
|
|
assert r2.status_code == 200, r2.json()
|
|
|
|
# Verify the merged record reflects the switch.
|
|
r3 = c.get("/api/admin/registry", headers=_auth(token))
|
|
row = next((t for t in r3.json()["tables"] if t["id"] == table_id), None)
|
|
assert row is not None
|
|
assert row["query_mode"] == "materialized"
|
|
assert row["source_query"] == "SELECT 1 AS n"
|
|
|
|
|
|
def test_materialized_sql_edit_preserves_registered_at(
|
|
seeded_app, bq_instance, stub_bq_extractor, monkeypatch,
|
|
):
|
|
"""Editing source_query on an existing materialized row must not
|
|
reset registered_at — the row's registration history is preserved
|
|
across SQL edits (issue #130 invariant)."""
|
|
c = seeded_app["client"]
|
|
token = seeded_app["admin_token"]
|
|
|
|
# Seed a materialized row.
|
|
r = c.post("/api/admin/register-table", json={
|
|
"name": "sql_edit_target",
|
|
"source_type": "bigquery",
|
|
"query_mode": "materialized",
|
|
"source_query": "SELECT 1 AS n",
|
|
}, headers=_auth(token))
|
|
assert r.status_code == 201, r.json()
|
|
table_id = r.json()["id"]
|
|
|
|
# Capture the original registered_at.
|
|
r2 = c.get("/api/admin/registry", headers=_auth(token))
|
|
row = next((t for t in r2.json()["tables"] if t["id"] == table_id), None)
|
|
original_ts = row["registered_at"]
|
|
assert original_ts is not None
|
|
|
|
# Edit the SQL.
|
|
import time
|
|
time.sleep(0.01) # ensure a clock tick elapses so a fresh stamp would differ
|
|
r3 = c.put(f"/api/admin/registry/{table_id}", json={
|
|
"query_mode": "materialized",
|
|
"source_query": "SELECT 2 AS n",
|
|
}, headers=_auth(token))
|
|
assert r3.status_code == 200, r3.json()
|
|
|
|
r4 = c.get("/api/admin/registry", headers=_auth(token))
|
|
row = next((t for t in r4.json()["tables"] if t["id"] == table_id), None)
|
|
assert row["source_query"] == "SELECT 2 AS n"
|
|
# registered_at preserved across edit
|
|
assert row["registered_at"] == original_ts, (
|
|
f"Expected registered_at preserved (issue #130 contract). "
|
|
f"Original: {original_ts}, after edit: {row['registered_at']}"
|
|
)
|
|
|
|
|
|
def test_materialized_zero_rows_logs_warning(stub_bq, tmp_path, caplog):
|
|
"""Devil's-advocate item: an SQL filter that returns 0 rows is
|
|
indistinguishable from 'SQL is wrong'. Confirm we log a WARNING so
|
|
operators can grep on it."""
|
|
import logging
|
|
from connectors.bigquery.extractor import materialize_query
|
|
|
|
out = tmp_path / "extracts" / "bigquery"
|
|
out.mkdir(parents=True)
|
|
|
|
# Add an empty BQ table to the stub for this test.
|
|
@contextmanager
|
|
def _session_empty(_p):
|
|
conn = duckdb.connect(":memory:")
|
|
try:
|
|
conn.execute("ATTACH ':memory:' AS bq")
|
|
conn.execute("CREATE SCHEMA bq.test")
|
|
conn.execute("CREATE OR REPLACE TABLE bq.test.empty AS "
|
|
"SELECT 1 AS n WHERE FALSE")
|
|
yield conn
|
|
finally:
|
|
conn.close()
|
|
|
|
bq_empty = BqAccess(
|
|
BqProjects(billing="t", data="t"),
|
|
client_factory=lambda _p: MagicMock(),
|
|
duckdb_session_factory=_session_empty,
|
|
)
|
|
|
|
with caplog.at_level(logging.WARNING, logger="connectors.bigquery.extractor"):
|
|
stats = materialize_query(
|
|
table_id="empty_t",
|
|
sql="SELECT * FROM bq.test.empty",
|
|
bq=bq_empty,
|
|
output_dir=str(out),
|
|
)
|
|
|
|
assert stats["rows"] == 0
|
|
assert any("0 rows" in rec.message for rec in caplog.records), (
|
|
f"Expected '0 rows' WARNING; got: {[r.message for r in caplog.records]}"
|
|
)
|
|
|
|
|
|
def test_attach_real_error_propagates(stub_bq, tmp_path):
|
|
"""ATTACH 'project=...' that fails for a real reason (not the
|
|
'already attached' tolerated case) must propagate so callers see
|
|
the actual error instead of a confusing downstream 'bq is not
|
|
attached' message."""
|
|
from connectors.bigquery.extractor import materialize_query
|
|
|
|
out = tmp_path / "extracts" / "bigquery"
|
|
out.mkdir(parents=True)
|
|
|
|
@contextmanager
|
|
def _session_attach_fails(_p):
|
|
conn = duckdb.connect(":memory:")
|
|
try:
|
|
# Force ATTACH 'project=...' to raise something other than
|
|
# "already attached" by intercepting via execute wrapper —
|
|
# since DuckDB's real connection doesn't accept attribute
|
|
# patches, we use a thin proxy for this test.
|
|
class _Proxy:
|
|
def __init__(self, real):
|
|
self._real = real
|
|
def execute(self, sql, *a, **kw):
|
|
if sql.startswith("ATTACH 'project="):
|
|
raise duckdb.Error("fake permission denied: missing serviceusage.services.use")
|
|
return self._real.execute(sql, *a, **kw)
|
|
def __getattr__(self, name):
|
|
return getattr(self._real, name)
|
|
def close(self):
|
|
return self._real.close()
|
|
yield _Proxy(conn)
|
|
finally:
|
|
conn.close()
|
|
|
|
bq_bad = BqAccess(
|
|
BqProjects(billing="t", data="t"),
|
|
client_factory=lambda _p: MagicMock(),
|
|
duckdb_session_factory=_session_attach_fails,
|
|
)
|
|
|
|
with pytest.raises(duckdb.Error, match="permission denied"):
|
|
materialize_query(
|
|
table_id="x", sql="SELECT 1",
|
|
bq=bq_bad, output_dir=str(out),
|
|
)
|