agnes-the-ai-analyst/tests/test_bq_cost_guardrail.py
ZdenekSrotyr 85d3810535 feat(materialized): query_mode='materialized' for BigQuery + Keboola — admin SELECT → parquet → analyst
Closes the 'admin pre-stages a curated table/view for analysts' use case end-to-end across both supported source connectors.

Backend (BigQuery + Keboola, schema v20):
  - schema v20 adds source_query TEXT to table_registry (renumbered from v19 after main's #150 RBAC migration also bumped to v19)
  - connectors/bigquery/extractor.py adds materialize_query(table_id, sql, *, bq, output_dir, max_bytes=...) — BqAccess session, dry-run cost guardrail (default 10 GiB, configurable via data_source.bigquery.max_bytes_per_materialize), idempotent ATTACH, rows/bytes/md5 metadata for sync_state
  - connectors/keboola/access.py — new KeboolaAccess facade (parallel of BqAccess) wrapping ATTACH 'keboola://...' AS kbc
  - connectors/keboola/extractor.py adds materialize_query — same shape, no dry-run analog (Keboola Storage API has different cost model); legacy bucket-download path skips query_mode='materialized' rows
  - app/api/sync.py:_run_materialized_pass dispatches by source_type to the right materialize_query
  - app/api/admin.py: RegisterTableRequest accepts source_query; model_validator coheres mode↔source_query↔bucket; PUT preserves omitted fields; deprecation marks (Field(deprecated=True)) on sync_strategy + profile_after_sync (no extractor reads them; profile_after_sync becomes inert — bug from earlier work where /api/sync/trigger never honored the flag); _BQ_OPTIONAL_FIELD_DEFAULTS injects defaults into GET /server-config payload

Operator + CLI surface:
  - da admin register-table --query / --query-mode materialized
  - scripts/smoke-test-materialized-bq.sh — end-to-end smoke for operators

Tests (incl. spike + integration + regression):
  - test_db_migration_v20, test_table_registry_source_query
  - test_bq_materialize, test_bq_cost_guardrail, test_bq_init_extract_skips
  - test_keboola_access, test_keboola_extension_query_passthrough (lock-in for the DuckDB extension capability), test_keboola_materialize, test_keboola_init_extract_skips, test_keboola_materialized_e2e (skipped without KBC_TEST_* creds)
  - test_sync_trigger_materialized, test_sync_trigger_keboola_materialized
  - test_api_admin_materialized, test_cli_admin_materialized
  - test_admin_bq_register, test_admin_discover_bigquery, test_admin_keboola_materialized, test_admin_phase_c_deprecation, test_admin_put_preservation, test_materialized_e2e

Cost: BQ uses bigquery_query() (jobs API, view-aware) — works on tables, views, materialized views uniformly. Keboola uses ATTACH+COPY parquet through the DuckDB extension.
2026-05-01 20:25:56 +02:00


"""materialize_query refuses to run when dry-run estimate exceeds the cap.
The cap is wired through `data_source.bigquery.max_bytes_per_materialize`
(read by the trigger pass; default 10 GiB; set 0 to disable). The dry-run
itself reuses `app.api.v2_scan._bq_dry_run_bytes` so cost-estimate logic
lives in exactly one place. Fail-open behaviour (DuckDB-syntax SQL the
native BQ client can't parse → estimate=0 → COPY proceeds with a warning)
is documented and exercised here too.
"""

from contextlib import contextmanager
from unittest.mock import MagicMock, patch

import duckdb
import pytest

from connectors.bigquery.access import BqAccess, BqProjects
from connectors.bigquery.extractor import materialize_query, MaterializeBudgetError


def _bq_with_seed(tables: dict[str, str] | None = None) -> BqAccess:
    """Stub BqAccess seeded with in-memory tables (same recipe as
    test_bq_materialize)."""
    tables = tables or {}

    @contextmanager
    def _session(_projects):
        conn = duckdb.connect(":memory:")
        try:
            conn.execute("ATTACH ':memory:' AS bq")
            for s in {ref.rsplit(".", 1)[0] for ref in tables}:
                conn.execute(f"CREATE SCHEMA IF NOT EXISTS {s}")
            for ref, body in tables.items():
                conn.execute(f"CREATE OR REPLACE TABLE {ref} AS {body}")
            yield conn
        finally:
            conn.close()

    return BqAccess(
        BqProjects(billing="test-billing", data="test-data"),
        client_factory=lambda _p: MagicMock(),
        duckdb_session_factory=_session,
    )


def test_refuses_when_estimate_exceeds_cap(tmp_path):
    out = tmp_path / "extracts" / "bigquery"
    out.mkdir(parents=True)
    bq = _bq_with_seed({"bq.test.tiny": "SELECT 1 AS n"})
    with patch(
        "app.api.v2_scan._bq_dry_run_bytes", return_value=100 * 2**30
    ):
        with pytest.raises(MaterializeBudgetError) as exc:
            materialize_query(
                table_id="huge",
                sql="SELECT * FROM bq.test.tiny",
                bq=bq,
                output_dir=str(out),
                max_bytes=10 * 2**30,
            )
    err = exc.value
    assert err.table_id == "huge"
    assert err.current == 100 * 2**30
    assert err.limit == 10 * 2**30


def test_proceeds_when_estimate_under_cap(tmp_path):
    out = tmp_path / "extracts" / "bigquery"
    out.mkdir(parents=True)
    bq = _bq_with_seed({"bq.test.tiny": "SELECT 1 AS n"})
    with patch("app.api.v2_scan._bq_dry_run_bytes", return_value=1024):
        stats = materialize_query(
            table_id="tiny",
            sql="SELECT * FROM bq.test.tiny",
            bq=bq,
            output_dir=str(out),
            max_bytes=10 * 2**30,
        )
    assert stats["rows"] == 1


def test_no_cap_skips_dry_run(tmp_path):
    """When max_bytes=None (default), no dry-run is performed."""
    out = tmp_path / "extracts" / "bigquery"
    out.mkdir(parents=True)
    bq = _bq_with_seed({"bq.test.tiny": "SELECT 1 AS n"})
    with patch("app.api.v2_scan._bq_dry_run_bytes") as mock_dry:
        stats = materialize_query(
            table_id="t1",
            sql="SELECT * FROM bq.test.tiny",
            bq=bq,
            output_dir=str(out),
        )
    mock_dry.assert_not_called()
    assert stats["rows"] == 1


def test_zero_max_bytes_skips_dry_run(tmp_path):
    """Sentinel: max_bytes=0 disables the guardrail (config docs)."""
    out = tmp_path / "extracts" / "bigquery"
    out.mkdir(parents=True)
    bq = _bq_with_seed({"bq.test.tiny": "SELECT 1 AS n"})
    with patch("app.api.v2_scan._bq_dry_run_bytes") as mock_dry:
        stats = materialize_query(
            table_id="t1",
            sql="SELECT * FROM bq.test.tiny",
            bq=bq,
            output_dir=str(out),
            max_bytes=0,
        )
    mock_dry.assert_not_called()
    assert stats["rows"] == 1


def test_dry_run_failure_is_fail_open(tmp_path):
    """If the dry-run errors (DuckDB syntax, missing google lib, transient
    upstream failure) we don't block — log + proceed with COPY. Operators
    who need hard-fail watch logs for the warning."""
    out = tmp_path / "extracts" / "bigquery"
    out.mkdir(parents=True)
    bq = _bq_with_seed({"bq.test.tiny": "SELECT 1 AS n"})
    with patch(
        "app.api.v2_scan._bq_dry_run_bytes", side_effect=RuntimeError("boom")
    ):
        stats = materialize_query(
            table_id="t1",
            sql="SELECT * FROM bq.test.tiny",
            bq=bq,
            output_dir=str(out),
            max_bytes=10 * 2**30,
        )
    assert stats["rows"] == 1
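
The five tests above pin down one small decision policy: skip the dry-run when the cap is `None` or `0`, refuse when the estimate exceeds the cap, and fail open when the estimate cannot be obtained. The sketch below models that policy in isolation, using only the standard library. It is not the code in `connectors/bigquery/extractor.py`; `BudgetExceeded`, `check_budget`, and the injected `dry_run` callable are hypothetical names chosen for illustration, and the real `materialize_query` may structure this differently.

```python
import logging

log = logging.getLogger("guardrail_sketch")


class BudgetExceeded(Exception):
    """Hypothetical stand-in for MaterializeBudgetError (same three fields)."""

    def __init__(self, table_id, current, limit):
        super().__init__(
            f"{table_id}: estimated {current} bytes exceeds cap of {limit} bytes"
        )
        self.table_id = table_id
        self.current = current
        self.limit = limit


def check_budget(table_id, sql, max_bytes, dry_run):
    """Return the byte estimate, or None when no usable estimate was made.

    `dry_run` is any callable that maps a SQL string to an estimated byte
    count (an assumption of this sketch; the tests patch a module function).
    """
    # None (no cap configured) and 0 (documented sentinel) both disable the
    # guardrail, so the dry-run is never invoked.
    if not max_bytes:
        return None
    try:
        estimate = dry_run(sql)
    except Exception as exc:
        # Fail-open: a broken estimate is logged and does not block the COPY.
        log.warning("dry-run failed for %s (%s); proceeding", table_id, exc)
        return None
    if estimate > max_bytes:
        raise BudgetExceeded(table_id, estimate, max_bytes)
    return estimate
```

Under these assumptions, a caller would run `check_budget(...)` immediately before the COPY and let `BudgetExceeded` propagate. Operators who need a hard failure when the dry-run itself breaks would have to replace the `except` branch, since the fail-open path only emits a warning.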