agnes-the-ai-analyst/tests/test_sync_trigger_materialized.py
ZdenekSrotyr c7c42de0f0 feat(sync): treat MaterializeInFlightError as 'skipped, in_flight'
_run_materialized_pass distinguishes due-check skips from in-flight
skips and never calls state.set_error for either. summary['skipped']
becomes a list of {table, reason} dicts; the end-of-pass log line
breaks out the in_flight subcount.

Hoists is_table_due to module-level import so test monkeypatching of
the symbol intercepts the call (the previous local import made
patches a no-op).
2026-05-04 18:11:38 +02:00

485 lines
17 KiB
Python

"""Tests for the materialized-BigQuery sync pass in ``app.api.sync``.

``_run_materialized_pass`` walks table_registry for materialized BQ rows
and runs each one that is due via ``_materialize_table``. Tests inject a
stub BqAccess (its factories are never called by these tests because
``_materialize_table`` is patched) and assert that scheduling, per-row
error aggregation, the sync_state hash, name-keyed sync_state rows, and
``max_bytes_per_materialize`` coercion / the disable-sentinel all behave
correctly.

The ``test_run_sync_*`` regression tests drive ``_run_sync`` with its heavy
collaborators (``_run_materialized_pass``, ``SyncOrchestrator``, the
extractor subprocess, instance config) patched, and check that the
materialized pass and orchestrator rebuild still fire after a Keboola
extractor timeout and on BigQuery-only deployments.
"""
import duckdb
import pytest
from contextlib import contextmanager
from pathlib import Path
from unittest.mock import patch, MagicMock
from src.db import _ensure_schema
from src.repositories.table_registry import TableRegistryRepository
from src.repositories.sync_state import SyncStateRepository
from connectors.bigquery.access import BqAccess, BqProjects
@pytest.fixture
def system_db(tmp_path, monkeypatch):
    """Yield an on-disk system DuckDB connection with the schema applied.

    ``DATA_DIR`` is pointed at ``tmp_path / "data"`` so paths the code under
    test derives from it stay inside the per-test temp directory. The
    connection is closed during fixture teardown.
    """
    connection = duckdb.connect(str(tmp_path / "system.duckdb"))
    _ensure_schema(connection)
    monkeypatch.setenv("DATA_DIR", str(tmp_path / "data"))
    yield connection
    connection.close()
@pytest.fixture
def stub_bq():
    """Build a real ``BqAccess`` wired to inert stand-in factories.

    The tests patch ``_materialize_table``, so neither factory is expected to
    be exercised; the object only has to satisfy the BqAccess type contract.
    """
    @contextmanager
    def _session(_p):
        # Throwaway in-memory DuckDB, always closed when the session ends.
        mem_conn = duckdb.connect(":memory:")
        try:
            yield mem_conn
        finally:
            mem_conn.close()

    def _client(_p):
        return MagicMock()

    projects = BqProjects(billing="t", data="t")
    return BqAccess(
        projects,
        client_factory=_client,
        duckdb_session_factory=_session,
    )
def test_materialized_pass_calls_materialize_for_due_rows(system_db, stub_bq, tmp_path):
    """A due materialized row is handed to ``_materialize_table`` with its
    SQL, the injected BqAccess and the default byte cap, and is reported as
    materialized without errors."""
    TableRegistryRepository(system_db).register(
        id="orders_90d",
        name="orders_90d",
        source_type="bigquery",
        query_mode="materialized",
        source_query="SELECT 1 AS n",
        sync_schedule="every 1m",  # always due in tests (no prior sync)
    )
    # Pre-create the parquet so _file_hash returns non-empty.
    out_dir = tmp_path / "data" / "extracts" / "bigquery" / "data"
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / "orders_90d.parquet").write_bytes(b"PAR1" + b"\x00" * 16 + b"PAR1")

    from app.api import sync as sync_mod

    fake_result = {"rows": 1, "size_bytes": 100, "query_mode": "materialized"}
    with patch("app.api.sync._materialize_table", return_value=fake_result) as mock_mat:
        summary = sync_mod._run_materialized_pass(system_db, stub_bq)

    mock_mat.assert_called_once()
    kwargs = mock_mat.call_args.kwargs
    assert kwargs["table_id"] == "orders_90d"
    assert "SELECT 1 AS n" in kwargs["sql"]
    assert kwargs["bq"] is stub_bq
    # With no instance.yaml override, the default 10 GiB cap flows through.
    assert kwargs["max_bytes"] == 10 * 2**30
    assert "orders_90d" in summary["materialized"]
    assert not summary["errors"]
def test_materialized_pass_skips_undue_rows(system_db, stub_bq):
    """A daily row with a recorded prior sync is not due, so it is reported
    as a due-check skip and never materialized."""
    TableRegistryRepository(system_db).register(
        id="orders_daily",
        name="orders_daily",
        source_type="bigquery",
        query_mode="materialized",
        source_query="SELECT 1",
        sync_schedule="daily 03:00",
    )
    # Record a prior sync so the daily schedule is not yet due.
    SyncStateRepository(system_db).update_sync(
        table_id="orders_daily", rows=1, file_size_bytes=10, hash="x",
    )

    from app.api import sync as sync_mod

    with patch("app.api.sync._materialize_table") as mock_mat:
        summary = sync_mod._run_materialized_pass(system_db, stub_bq)

    mock_mat.assert_not_called()
    # summary["skipped"] is a list of {table, reason} dicts — see PR zs/materialize-sync-fix
    expected_skip = {"table": "orders_daily", "reason": "due_check"}
    assert expected_skip in summary["skipped"]
def test_materialized_pass_skips_non_materialized_rows(system_db, stub_bq):
    """Local Keboola rows and remote BigQuery rows are ignored entirely:
    nothing is materialized, skipped or errored."""
    registry = TableRegistryRepository(system_db)
    for table_id, source, mode in (
        ("t1", "keboola", "local"),
        ("t2", "bigquery", "remote"),
    ):
        registry.register(id=table_id, name=table_id, source_type=source, query_mode=mode)

    from app.api import sync as sync_mod

    with patch("app.api.sync._materialize_table") as mock_mat:
        summary = sync_mod._run_materialized_pass(system_db, stub_bq)

    mock_mat.assert_not_called()
    assert summary == {"materialized": [], "skipped": [], "errors": []}
def test_materialized_pass_collects_errors_per_row(system_db, stub_bq, tmp_path):
    """One row failing must not stop a healthy sibling."""
    registry = TableRegistryRepository(system_db)
    for table_id, sql in (("ok", "SELECT 1"), ("bad", "SELECT broken")):
        registry.register(
            id=table_id, name=table_id, source_type="bigquery",
            query_mode="materialized", source_query=sql,
            sync_schedule="every 1m",
        )
    out_dir = tmp_path / "data" / "extracts" / "bigquery" / "data"
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / "ok.parquet").write_bytes(b"PAR1" + b"\x00" * 16 + b"PAR1")

    from app.api import sync as sync_mod

    def _fake(table_id, sql, bq, output_dir, max_bytes):
        # Only the "bad" row blows up; every other row succeeds.
        if table_id != "bad":
            return {"rows": 1, "size_bytes": 100, "query_mode": "materialized"}
        raise RuntimeError("simulated COPY failure")

    with patch("app.api.sync._materialize_table", side_effect=_fake):
        summary = sync_mod._run_materialized_pass(system_db, stub_bq)

    assert summary["materialized"] == ["ok"]
    errors = summary["errors"]
    assert len(errors) == 1
    assert errors[0]["table"] == "bad"
    assert "simulated" in errors[0]["error"]
def test_materialized_pass_records_parquet_hash(system_db, stub_bq, tmp_path):
    """sync_state.hash must be the MD5 of the parquet file — otherwise the
    manifest reports an empty hash and every da sync re-downloads."""
    import hashlib

    TableRegistryRepository(system_db).register(
        id="hashed", name="hashed",
        source_type="bigquery", query_mode="materialized",
        source_query="SELECT 1",
        sync_schedule="every 1m",
    )
    out_dir = tmp_path / "data" / "extracts" / "bigquery" / "data"
    out_dir.mkdir(parents=True, exist_ok=True)
    parquet_path = out_dir / "hashed.parquet"
    payload = b"PAR1" + b"\x00" * 16 + b"PAR1"

    def _fake(**kwargs):
        # Stand in for the real materialization: write the parquet file
        # whose MD5 is expected in sync_state.
        parquet_path.write_bytes(payload)
        return {"rows": 1, "size_bytes": 24, "query_mode": "materialized"}

    from app.api import sync as sync_mod

    with patch("app.api.sync._materialize_table", side_effect=_fake):
        sync_mod._run_materialized_pass(system_db, stub_bq)

    row = SyncStateRepository(system_db).get_table_state("hashed")
    assert row is not None
    assert row["hash"] == hashlib.md5(payload).hexdigest()
def test_run_sync_keboola_timeout_does_not_skip_materialized(tmp_path, monkeypatch):
    """REGRESSION (Devin BUG_0001 on 2219255): when the Keboola extractor
    subprocess raises TimeoutExpired, the materialized BQ pass and the
    orchestrator rebuild must still fire. Pre-fix the timeout propagated
    to the outer except handler and skipped the rest of _run_sync — on
    a dual-source deployment a slow Keboola extractor would silently
    block all materialized parquets and the master-view rebuild until
    the next trigger."""
    import subprocess as _sp

    # Seed a file-backed system DB (duckdb / _ensure_schema come from the
    # module-level imports). The connection is released even if seeding
    # fails, so the DuckDB file handle is not leaked.
    db_path = tmp_path / "system.duckdb"
    conn = duckdb.connect(str(db_path))
    try:
        _ensure_schema(conn)
        repo = TableRegistryRepository(conn)
        # One Keboola row (drives the extractor subprocess) + one materialized
        # BQ row (must still run after the Keboola timeout).
        repo.register(
            id="kbc_a", name="kbc_a", source_type="keboola",
            query_mode="local", bucket="in.c-foo", source_table="a",
        )
        repo.register(
            id="m1", name="m1", source_type="bigquery",
            query_mode="materialized",
            source_query="SELECT 1",
            sync_schedule="every 1m",
        )
    finally:
        conn.close()

    monkeypatch.setenv("DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("KEBOOLA_STACK_URL", "https://example.invalid")
    monkeypatch.setenv("KEBOOLA_STORAGE_TOKEN", "fake")
    # NOTE(review): the DB above lives at tmp_path/"system.duckdb" while
    # DATA_DIR points at tmp_path/"data", and nothing below asserts that the
    # patched subprocess.run was actually reached. Confirm _run_sync sees
    # these rows; otherwise the timeout path may not be exercised at all.

    from app.api import sync as sync_mod

    # Subprocess raises TimeoutExpired the moment _run_sync calls it.
    def _timeout(*a, **kw):
        raise _sp.TimeoutExpired(cmd=["fake"], timeout=1)

    monkeypatch.setattr(sync_mod.subprocess, "run", _timeout)

    materialized_called = {"count": 0}
    orchestrator_called = {"count": 0}

    def _spy_materialized(_conn, _bq):
        materialized_called["count"] += 1
        return {"materialized": ["m1"], "skipped": [], "errors": []}

    class _OrchStub:
        def rebuild(self):
            orchestrator_called["count"] += 1
            return {}

    monkeypatch.setattr("app.api.sync._run_materialized_pass", _spy_materialized)
    monkeypatch.setattr(
        "src.orchestrator.SyncOrchestrator", lambda *a, **kw: _OrchStub(),
    )
    monkeypatch.setattr(
        "app.instance_config.get_data_source_type", lambda: "keboola",
    )
    # bq_project must be truthy so the materialized pass branch fires.
    monkeypatch.setattr(
        "app.instance_config.get_value",
        lambda *args, **kw: (
            "my-bq-proj" if (args and args[-1] == "project")
            else kw.get("default", "")
        ),
    )

    sync_mod._run_sync()

    assert materialized_called["count"] == 1, (
        "materialized pass must run even when Keboola subprocess timed out"
    )
    assert orchestrator_called["count"] == 1, (
        "orchestrator rebuild must run after Keboola timeout to publish "
        "any partial / materialized parquets that did land"
    )
def test_run_sync_runs_materialized_pass_on_bq_only_deployment(
    tmp_path, monkeypatch,
):
    """REGRESSION (Devin BUG_0002 on 2fa44f2): on BigQuery-only deployments
    `list_local('bigquery')` is always empty (BQ rows are remote or
    materialized, never local). The pre-fix _run_sync early-returned in
    that case → materialized pass + orchestrator rebuild were dead code.
    Post-fix: run_extractor_subprocess flag skips just the Keboola
    subprocess, and the materialized pass still fires."""
    # Seed a file-backed system DB (duckdb / _ensure_schema come from the
    # module-level imports); release the connection even if seeding fails.
    db_path = tmp_path / "system.duckdb"
    conn = duckdb.connect(str(db_path))
    try:
        _ensure_schema(conn)
        # Materialized BQ row — would be invisible to list_local('bigquery').
        TableRegistryRepository(conn).register(
            id="m1", name="m1", source_type="bigquery",
            query_mode="materialized",
            source_query="SELECT 1",
            sync_schedule="every 1m",
        )
    finally:
        conn.close()

    monkeypatch.setenv("DATA_DIR", str(tmp_path / "data"))

    # Patch the heavy collaborators so we observe what _run_sync invoked
    # without actually running BQ / orchestrator.
    from app.api import sync as sync_mod

    materialized_called = {"count": 0}
    orchestrator_called = {"count": 0}

    def _spy_materialized_pass(_conn, _bq):
        materialized_called["count"] += 1
        return {"materialized": ["m1"], "skipped": [], "errors": []}

    class _OrchStub:
        def rebuild(self):
            orchestrator_called["count"] += 1
            return {}

    monkeypatch.setattr(
        "app.api.sync._run_materialized_pass",
        _spy_materialized_pass,
    )
    monkeypatch.setattr(
        "src.orchestrator.SyncOrchestrator",
        lambda *a, **kw: _OrchStub(),
    )
    # Pretend instance.yaml says data_source.type=bigquery
    monkeypatch.setattr(
        "app.instance_config.get_data_source_type",
        lambda: "bigquery",
    )
    # bq_project must be truthy so the materialized pass branch fires.
    monkeypatch.setattr(
        "app.instance_config.get_value",
        lambda *args, **kw: (
            "my-bq-proj" if (args and args[-1] == "project")
            else kw.get("default", "")
        ),
    )

    sync_mod._run_sync()

    assert materialized_called["count"] == 1, (
        "materialized pass must run on BQ-only deployment (no local rows)"
    )
    assert orchestrator_called["count"] == 1, (
        "orchestrator rebuild must run so materialized parquets are picked up"
    )
@pytest.mark.parametrize("yaml_value, expected_max", [
    (10737418240, 10737418240),     # int — canonical
    (10737418240.0, 10737418240),   # float — YAML often parses as float
    (1e10, 10000000000),            # scientific notation
    ("10737418240", 10737418240),   # string — coerced
    (0, None),                      # explicit disable sentinel
    (None, None),                   # missing key
    ("not-a-number", None),         # malformed → fail-open + warn
])
def test_materialized_pass_max_bytes_yaml_coercion(
    system_db, stub_bq, tmp_path, monkeypatch, yaml_value, expected_max,
):
    """`max_bytes_per_materialize` YAML value is coerced to int regardless of
    the YAML scalar type (int / float / scientific / string). Devin found
    that an `isinstance(raw, int)` guard silently disabled the guardrail
    on float values."""
    repo = TableRegistryRepository(system_db)
    repo.register(
        id="t", name="t", source_type="bigquery",
        query_mode="materialized", source_query="SELECT 1",
        sync_schedule="every 1m",
    )
    parquet_dir = tmp_path / "data" / "extracts" / "bigquery" / "data"
    parquet_dir.mkdir(parents=True, exist_ok=True)
    (parquet_dir / "t.parquet").write_bytes(b"PAR1" + b"\x00" * 16 + b"PAR1")

    captured = {}

    def _spy(table_id, sql, bq, output_dir, max_bytes):
        captured["max_bytes"] = max_bytes
        return {"rows": 1, "size_bytes": 100, "query_mode": "materialized"}

    def _fake_get_value(*args, **kw):
        # Answer only the cap lookup; every other key falls back to the
        # caller's default. The `args and` guard keeps a keyword-only call
        # from raising IndexError (same guard as the _run_sync tests).
        if args and args[-1] == "max_bytes_per_materialize":
            return yaml_value
        return kw.get("default", "")

    from app.api import sync as sync_mod

    with patch(
        "app.instance_config.get_value", side_effect=_fake_get_value,
    ), patch("app.api.sync._materialize_table", side_effect=_spy):
        sync_mod._run_materialized_pass(system_db, stub_bq)

    assert "max_bytes" in captured, "_materialize_table was never called"
    assert captured["max_bytes"] == expected_max
def test_materialized_pass_keys_sync_state_by_name_not_id(
    system_db, stub_bq, tmp_path,
):
    """Devin review: a mixed-case name (e.g. "Orders_90d") differs from its
    slug-derived id ("orders_90d"). The pass must materialize under, and key
    sync_state by, the `name`, so the manifest's `registry_by_name` lookup
    resolves and `query_mode='materialized'` reaches the client. Otherwise
    the CLI sees `query_mode='local'` and downloads the wrong file or skips
    the row."""
    # At the repository level id and name are set independently; the API
    # path would slugify the mixed-case name into the lowercase id.
    TableRegistryRepository(system_db).register(
        id="orders_90d", name="Orders_90d",
        source_type="bigquery", query_mode="materialized",
        source_query="SELECT 1",
        sync_schedule="every 1m",
    )
    # The parquet lives at the NAME-keyed path.
    out_dir = tmp_path / "data" / "extracts" / "bigquery" / "data"
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / "Orders_90d.parquet").write_bytes(b"PAR1" + b"\x00" * 16 + b"PAR1")

    from app.api import sync as sync_mod

    seen = {}

    def _spy(table_id, sql, bq, output_dir, max_bytes):
        seen["table_id"] = table_id
        return {"rows": 1, "size_bytes": 100, "query_mode": "materialized"}

    with patch("app.api.sync._materialize_table", side_effect=_spy):
        sync_mod._run_materialized_pass(system_db, stub_bq)

    # _materialize_table received the NAME, not the id.
    assert seen["table_id"] == "Orders_90d"
    # The sync_state row exists under the name and not under the id.
    state = SyncStateRepository(system_db)
    by_name = state.get_table_state("Orders_90d")
    by_id = state.get_table_state("orders_90d")
    assert by_name is not None, "sync_state should be keyed by name"
    assert by_id is None, "sync_state should NOT be keyed by id"
def test_materialized_pass_zero_max_bytes_disables_guardrail(
    system_db, stub_bq, tmp_path, monkeypatch
):
    """`max_bytes_per_materialize: 0` in instance.yaml → None passed downstream
    so materialize_query skips the dry-run entirely.

    This overlaps the ``(0, None)`` case of the YAML-coercion parametrization;
    it is kept as a standalone, explicitly named check of the disable sentinel.
    """
    repo = TableRegistryRepository(system_db)
    repo.register(
        id="big", name="big", source_type="bigquery",
        query_mode="materialized", source_query="SELECT 1",
        sync_schedule="every 1m",
    )
    parquet_dir = tmp_path / "data" / "extracts" / "bigquery" / "data"
    parquet_dir.mkdir(parents=True, exist_ok=True)
    (parquet_dir / "big.parquet").write_bytes(
        b"PAR1" + b"\x00" * 16 + b"PAR1"
    )

    from app.api import sync as sync_mod

    captured = {}

    def _spy(**kwargs):
        captured.update(kwargs)
        return {"rows": 1, "size_bytes": 100, "query_mode": "materialized"}

    def _fake_get_value(*args, **kw):
        # Report the disable sentinel for the cap; defer to the caller's
        # default for any other key. `args and` guards keyword-only calls.
        if args and args[-1] == "max_bytes_per_materialize":
            return 0
        return kw.get("default", "")

    # The function reads `get_value` via a local import in the body, so only
    # the import target (app.instance_config) needs patching — a patch on
    # app.api.sync.get_value would never be consulted.
    with patch(
        "app.instance_config.get_value", side_effect=_fake_get_value,
    ), patch("app.api.sync._materialize_table", side_effect=_spy):
        sync_mod._run_materialized_pass(system_db, stub_bq)

    assert "max_bytes" in captured, "_materialize_table was never called"
    assert captured["max_bytes"] is None