agnes-the-ai-analyst/tests/test_cli_admin_materialized.py
ZdenekSrotyr 85d3810535 feat(materialized): query_mode='materialized' for BigQuery + Keboola — admin SELECT → parquet → analyst
Closes the 'admin pre-stages a curated table/view for analysts' use case end-to-end across both supported source connectors.

Backend (BigQuery + Keboola, schema v20):
  - schema v20 adds source_query TEXT to table_registry (renumbered from v19 after main's #150 RBAC migration also bumped the schema to v19)
  - connectors/bigquery/extractor.py adds materialize_query(table_id, sql, *, bq, output_dir, max_bytes=...): opens a BqAccess session, enforces a dry-run cost guardrail (default 10 GiB, configurable via data_source.bigquery.max_bytes_per_materialize), ATTACHes idempotently, and records rows/bytes/md5 metadata for sync_state
  - connectors/keboola/access.py: new KeboolaAccess facade (the Keboola counterpart of BqAccess) wrapping ATTACH 'keboola://...' AS kbc
  - connectors/keboola/extractor.py adds materialize_query with the same shape but no dry-run analog (the Keboola Storage API has a different cost model); the legacy bucket-download path skips query_mode='materialized' rows
  - app/api/sync.py:_run_materialized_pass dispatches by source_type to the right materialize_query
  - app/api/admin.py:
    - RegisterTableRequest accepts source_query; a model_validator keeps query_mode, source_query, and bucket consistent
    - PUT preserves fields omitted from the request
    - sync_strategy and profile_after_sync get deprecation marks (Field(deprecated=True)); no extractor reads either one, and profile_after_sync is effectively inert, a leftover bug from earlier work where /api/sync/trigger never honored the flag
    - _BQ_OPTIONAL_FIELD_DEFAULTS injects defaults into the GET /server-config payload
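
The query_mode/source_query coherence rule enforced on RegisterTableRequest can be sketched with the standard library. This is a hypothetical stand-in: the real request model is pydantic with a model_validator, while the dataclass below uses __post_init__ for the same after-construction check. The class name RegisterTableSketch and the VALID_MODES set are illustrative assumptions, and the bucket rule is left out because its exact shape is not spelled out here. The two rules encoded are the ones the CLI tests in this file rely on: materialized requires a query, and local forbids one.

```python
from dataclasses import dataclass
from typing import Optional

# Assumption: these three modes are the full set; they are the ones that
# appear in this commit and its tests.
VALID_MODES = frozenset({"local", "remote", "materialized"})


@dataclass
class RegisterTableSketch:
    """Hypothetical stdlib stand-in for the pydantic RegisterTableRequest."""

    name: str
    source_type: str
    query_mode: str = "local"
    source_query: Optional[str] = None
    bucket: Optional[str] = None
    source_table: Optional[str] = None

    def __post_init__(self) -> None:
        # Runs once all fields are assigned, playing the role that
        # model_validator(mode="after") plays in pydantic.
        if self.query_mode not in VALID_MODES:
            raise ValueError(f"unknown query_mode: {self.query_mode!r}")
        has_query = bool(self.source_query and self.source_query.strip())
        if self.query_mode == "materialized" and not has_query:
            raise ValueError("query_mode='materialized' requires source_query")
        # The tests state that 'local' forbids source_query; rejecting it
        # for 'remote' as well is an assumption.
        if self.query_mode != "materialized" and self.source_query is not None:
            raise ValueError(
                f"source_query is not allowed with query_mode={self.query_mode!r}"
            )
```

Validating in one place after every field is set means the check sees the whole payload at once, which is what lets it reject combinations rather than individual values.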

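The dispatch in _run_materialized_pass, which picks the right materialize_query by source_type, maps naturally onto a lookup table. In this sketch both materializers are hypothetical stubs with a simplified (table_id, sql) signature (the real BigQuery one also takes bq, output_dir, and max_bytes), and the row dict keys are an assumed shape for table_registry rows.

```python
from typing import Callable, Dict, List


def materialize_bigquery(table_id: str, sql: str) -> str:
    # Hypothetical stub for connectors/bigquery/extractor.py:materialize_query.
    return f"bigquery:{table_id}"


def materialize_keboola(table_id: str, sql: str) -> str:
    # Hypothetical stub for connectors/keboola/extractor.py:materialize_query.
    return f"keboola:{table_id}"


# One entry per supported source connector.
MATERIALIZERS: Dict[str, Callable[[str, str], str]] = {
    "bigquery": materialize_bigquery,
    "keboola": materialize_keboola,
}


def run_materialized_pass(rows: List[dict]) -> List[str]:
    """Materialize each registry row whose query_mode is 'materialized'."""
    done = []
    for row in rows:
        if row.get("query_mode") != "materialized":
            continue  # local/remote rows go through the other sync paths
        materialize = MATERIALIZERS.get(row["source_type"])
        if materialize is None:
            # Assumption: an unsupported source_type is an error, not a skip.
            raise ValueError(
                f"no materializer for source_type {row['source_type']!r}"
            )
        done.append(materialize(row["id"], row["source_query"]))
    return done
```

Adding a third connector then means registering one more entry rather than growing an if/elif chain.
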
Operator + CLI surface:
  - da admin register-table --query-mode materialized --query <SQL or @file.sql>
  - scripts/smoke-test-materialized-bq.sh: end-to-end smoke test for operators
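
The client-side handling of --query that the tests below pin down can be sketched as plain functions: inline SQL passes through, @path reads the file and drops the trailing newline, a missing file is an error, and materialized mode without --query fails before any API call. The names resolve_query_arg and compact_payload are hypothetical; the real CLI presumably turns these errors into Typer/Click usage errors, which is not shown here.

```python
from pathlib import Path
from typing import Any, Dict, Optional


def resolve_query_arg(value: Optional[str], query_mode: str) -> Optional[str]:
    """Hypothetical helper: turn the --query option value into SQL text."""
    if value is None:
        # Fail client-side, before any API call is made.
        if query_mode == "materialized":
            raise ValueError("--query is required with --query-mode materialized")
        return None
    if value.startswith("@"):
        path = Path(value[1:])
        if not path.is_file():
            raise ValueError(f"--query file not found: {path}")
        # The tests only require the trailing newline to go; rstrip() also
        # drops other trailing whitespace (an assumption).
        return path.read_text(encoding="utf-8").rstrip()
    return value  # inline SQL is passed through untouched


def compact_payload(**fields: Any) -> Dict[str, Any]:
    """Drop unset options so, e.g., local mode never sends source_query."""
    return {key: val for key, val in fields.items() if val is not None}
```

For example, compact_payload(name="kbc_orders", bucket="in.c-crm", source_query=None, sync_schedule=None) keeps only name and bucket, which matches what test_register_local_mode_does_not_send_source_query checks.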

Tests (incl. spike + integration + regression):
  - test_db_migration_v20, test_table_registry_source_query
  - test_bq_materialize, test_bq_cost_guardrail, test_bq_init_extract_skips
  - test_keboola_access, test_keboola_extension_query_passthrough (lock-in for the DuckDB extension capability), test_keboola_materialize, test_keboola_init_extract_skips, test_keboola_materialized_e2e (skipped without KBC_TEST_* creds)
  - test_sync_trigger_materialized, test_sync_trigger_keboola_materialized
  - test_api_admin_materialized, test_cli_admin_materialized
  - test_admin_bq_register, test_admin_discover_bigquery, test_admin_keboola_materialized, test_admin_phase_c_deprecation, test_admin_put_preservation, test_materialized_e2e

Cost: BQ uses bigquery_query() (jobs API, view-aware), so it works uniformly on tables, views, and materialized views. Keboola uses ATTACH + COPY to parquet through the DuckDB extension.
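
On the BigQuery side, the dry-run guardrail reduces to a threshold check on the bytes-processed estimate that a BigQuery dry run reports; Keboola has no such step. The sketch assumes that estimate has already been obtained. The function and exception names are hypothetical, and only the 10 GiB default and the data_source.bigquery.max_bytes_per_materialize setting come from the description above.

```python
# Hypothetical sketch; only the 10 GiB default comes from the commit message.
DEFAULT_MAX_BYTES = 10 * 1024**3  # 10 GiB = 10_737_418_240 bytes


class MaterializeCostExceeded(RuntimeError):
    """Raised when a dry-run estimate exceeds the byte budget."""


def check_materialize_cost(table_id: str, estimated_bytes: int,
                           max_bytes: int = DEFAULT_MAX_BYTES) -> int:
    """Return estimated_bytes when within budget, otherwise raise.

    estimated_bytes stands for the bytes-processed figure of a BigQuery
    dry run; max_bytes would come from
    data_source.bigquery.max_bytes_per_materialize when configured.
    """
    if max_bytes <= 0:
        raise ValueError("max_bytes must be positive")
    # Assumption: a query estimated at exactly max_bytes is allowed.
    if estimated_bytes > max_bytes:
        raise MaterializeCostExceeded(
            f"{table_id}: dry run estimates {estimated_bytes:,} bytes, "
            f"over the {max_bytes:,}-byte limit"
        )
    return estimated_bytes
```

Checking before the real job runs means an oversized query costs only a dry run, not a full scan.
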
2026-05-01 20:25:56 +02:00

"""`da admin register-table --query-mode materialized --query @file.sql`
sends source_query in the payload; existing local/remote paths still work
unchanged."""
from unittest.mock import MagicMock

from typer.testing import CliRunner

from cli.main import app


def _fake_resp(status_code, body=None):
    resp = MagicMock()
    resp.status_code = status_code
    resp.json = lambda: body or {"id": "x", "name": "x", "status": "registered"}
    return resp


def test_register_materialized_with_inline_query(monkeypatch):
    captured = {}

    def fake_post(path, json):
        captured["path"] = path
        captured["json"] = json
        return _fake_resp(201)

    monkeypatch.setattr("cli.commands.admin.api_post", fake_post)
    runner = CliRunner()
    result = runner.invoke(app, [
        "admin", "register-table", "orders_90d",
        "--source-type", "bigquery",
        "--query-mode", "materialized",
        "--query", "SELECT date FROM `prj.ds.orders`",
        "--sync-schedule", "every 6h",
    ])
    assert result.exit_code == 0, result.stdout
    assert captured["path"] == "/api/admin/register-table"
    assert captured["json"]["query_mode"] == "materialized"
    assert captured["json"]["source_query"] == "SELECT date FROM `prj.ds.orders`"
    assert captured["json"]["sync_schedule"] == "every 6h"


def test_register_materialized_reads_query_from_file(tmp_path, monkeypatch):
    sql_file = tmp_path / "orders.sql"
    sql_file.write_text(
        "SELECT date, SUM(revenue) FROM `prj.ds.orders` GROUP BY 1\n"
    )
    captured = {}

    def fake_post(path, json):
        captured["json"] = json
        return _fake_resp(201)

    monkeypatch.setattr("cli.commands.admin.api_post", fake_post)
    runner = CliRunner()
    result = runner.invoke(app, [
        "admin", "register-table", "orders_90d",
        "--source-type", "bigquery",
        "--query-mode", "materialized",
        "--query", f"@{sql_file}",
        "--sync-schedule", "daily 03:00",
    ])
    assert result.exit_code == 0, result.stdout
    assert "SELECT date, SUM(revenue)" in captured["json"]["source_query"]
    assert not captured["json"]["source_query"].endswith("\n")


def test_register_materialized_without_query_fails(monkeypatch):
    """--query-mode materialized without --query is a client-side error,
    no API call made."""
    called = {"count": 0}

    def fake_post(*args, **kwargs):
        called["count"] += 1
        return _fake_resp(201)

    monkeypatch.setattr("cli.commands.admin.api_post", fake_post)
    runner = CliRunner()
    result = runner.invoke(app, [
        "admin", "register-table", "orders_90d",
        "--source-type", "bigquery",
        "--query-mode", "materialized",
    ])
    assert result.exit_code != 0
    assert called["count"] == 0
    combined = result.stdout + (result.stderr or "")
    assert "--query" in combined


def test_register_local_mode_does_not_send_source_query(monkeypatch):
    """Default local mode shouldn't send source_query — server-side
    validator forbids it on local."""
    captured = {}

    def fake_post(path, json):
        captured["json"] = json
        return _fake_resp(201)

    monkeypatch.setattr("cli.commands.admin.api_post", fake_post)
    runner = CliRunner()
    result = runner.invoke(app, [
        "admin", "register-table", "kbc_orders",
        "--source-type", "keboola",
        "--bucket", "in.c-crm",
    ])
    assert result.exit_code == 0
    assert "source_query" not in captured["json"]
    assert "sync_schedule" not in captured["json"]


def test_register_query_at_path_missing_file_fails(monkeypatch):
    """@file.sql where the file doesn't exist surfaces a clear error."""
    monkeypatch.setattr(
        "cli.commands.admin.api_post", lambda *a, **kw: _fake_resp(201),
    )
    runner = CliRunner()
    result = runner.invoke(app, [
        "admin", "register-table", "x",
        "--source-type", "bigquery",
        "--query-mode", "materialized",
        "--query", "@/tmp/definitely-does-not-exist-9b4f7e2c.sql",
    ])
    assert result.exit_code != 0


def test_register_remote_path_unchanged(monkeypatch):
    """The pre-existing --bucket / --source-table / --query-mode remote
    flow still works without --query."""
    captured = {}

    def fake_post(path, json):
        captured["json"] = json
        return _fake_resp(200)

    monkeypatch.setattr("cli.commands.admin.api_post", fake_post)
    runner = CliRunner()
    result = runner.invoke(app, [
        "admin", "register-table", "live_orders",
        "--source-type", "bigquery",
        "--bucket", "analytics",
        "--source-table", "orders",
        "--query-mode", "remote",
    ])
    assert result.exit_code == 0
    assert captured["json"]["query_mode"] == "remote"
    assert "source_query" not in captured["json"]
    assert captured["json"]["bucket"] == "analytics"
    assert captured["json"]["source_table"] == "orders"