From c7c42de0f03d99a5fb2d0185272d5a6dad406608 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 4 May 2026 18:11:38 +0200 Subject: [PATCH] feat(sync): treat MaterializeInFlightError as 'skipped, in_flight' _run_materialized_pass distinguishes due-check skips from in-flight skips and never calls state.set_error for either. summary['skipped'] becomes a list of {table, reason} dicts; the end-of-pass log line breaks out the in_flight subcount. Hoists is_table_due to module-level import so test monkeypatching of the symbol intercepts the call (the previous local import made patches a no-op). --- app/api/sync.py | 20 ++++-- ...st_run_materialized_pass_in_flight_skip.py | 66 +++++++++++++++++++ tests/test_sync_trigger_materialized.py | 3 +- 3 files changed, 83 insertions(+), 6 deletions(-) create mode 100644 tests/test_run_materialized_pass_in_flight_skip.py diff --git a/app/api/sync.py b/app/api/sync.py index 0d6a533..6a84b5c 100644 --- a/app/api/sync.py +++ b/app/api/sync.py @@ -20,7 +20,7 @@ from src.repositories.sync_state import SyncStateRepository from src.repositories.sync_settings import SyncSettingsRepository from src.repositories.table_registry import TableRegistryRepository from src.rbac import can_access_table -from src.scheduler import filter_due_tables +from src.scheduler import filter_due_tables, is_table_due logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/sync", tags=["sync"]) @@ -74,9 +74,8 @@ def _run_materialized_pass(conn: duckdb.DuckDBPyConnection, bq) -> dict: its structured fields so operator alerting can pick out the cap-vs-actual bytes from the log line. """ - from src.scheduler import is_table_due from app.instance_config import get_value - from connectors.bigquery.extractor import MaterializeBudgetError + from connectors.bigquery.extractor import MaterializeBudgetError, MaterializeInFlightError bq_output_dir = str(Path(_get_data_dir()) / "extracts" / "bigquery") kb_output_dir = Path(_get_data_dir()) / "extracts" / "keboola" / "data" @@ -125,7 +124,7 @@ def _run_materialized_pass(conn: duckdb.DuckDBPyConnection, bq) -> dict: last_iso = last.isoformat() if last else None schedule = row.get("sync_schedule") or "every 1h" if not is_table_due(schedule, last_iso): - summary["skipped"].append(ref_name) + summary["skipped"].append({"table": ref_name, "reason": "due_check"}) continue source_type = row.get("source_type") or "bigquery" # legacy default @@ -195,6 +194,13 @@ def _run_materialized_pass(conn: duckdb.DuckDBPyConnection, bq) -> dict: ), }) continue + except MaterializeInFlightError: + # In-flight on a sibling worker / scheduler tick — treat as + # 'skipped, in-flight'. Do NOT call state.set_error: that + # would flip status='error' on a healthy concurrent run and + # the registry UI would surface a false-positive failure. + summary["skipped"].append({"table": ref_name, "reason": "in_flight"}) + continue except MaterializeBudgetError as e: logger.warning( "Materialize cap exceeded for %s: %s bytes > %s bytes", @@ -466,9 +472,13 @@ sys.exit(compute_exit_code(result, len(configs))) mat_summary = _run_materialized_pass(mat_conn, bq_access) finally: mat_conn.close() + skipped_count = len(mat_summary["skipped"]) + in_flight_count = sum( + 1 for s in mat_summary["skipped"] if s.get("reason") == "in_flight" + ) print( f"[SYNC] Materialized SQL: {len(mat_summary['materialized'])} ok, " - f"{len(mat_summary['skipped'])} skipped, " + f"{skipped_count} skipped (in_flight={in_flight_count}), " f"{len(mat_summary['errors'])} errors", file=_sys.stderr, flush=True, ) diff --git a/tests/test_run_materialized_pass_in_flight_skip.py b/tests/test_run_materialized_pass_in_flight_skip.py new file mode 100644 index 0000000..0dae36b --- /dev/null +++ b/tests/test_run_materialized_pass_in_flight_skip.py @@ -0,0 +1,66 @@ +"""When materialize_query raises MaterializeInFlightError, _run_materialized_pass +must record it as a 'skipped, in_flight' outcome and NOT call state.set_error +(otherwise sync_state surfaces a false-positive 'failure' for a healthy +in-progress run).""" +from __future__ import annotations +from unittest.mock import MagicMock, patch + +import pytest + +from app.api.sync import _run_materialized_pass +from connectors.bigquery.extractor import MaterializeInFlightError + + +@pytest.fixture +def fake_registry_with_one_materialized(monkeypatch, tmp_path): + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + rows = [{ + "id": "in_flight_t", + "name": "in_flight_t", + "query_mode": "materialized", + "source_type": "bigquery", + "source_query": "SELECT * FROM `ds.t`", + "sync_schedule": None, + }] + + class _Repo: + def __init__(self, conn): pass + def list_all(self): return rows + + class _State: + def __init__(self, conn): + self.set_error_calls = [] + self.update_sync_calls = [] + def get_last_sync(self, _id): return None + def set_error(self, table_id, msg): self.set_error_calls.append((table_id, msg)) + def update_sync(self, **kw): self.update_sync_calls.append(kw) + + state = _State(None) + monkeypatch.setattr("app.api.sync.TableRegistryRepository", _Repo) + monkeypatch.setattr("app.api.sync.SyncStateRepository", lambda c: state) + return state + + +def test_in_flight_recorded_as_skipped_not_error(fake_registry_with_one_materialized): + state = fake_registry_with_one_materialized + + with patch( + "app.api.sync._materialize_table", + side_effect=MaterializeInFlightError("in_flight_t", layer="process"), + ): + summary = _run_materialized_pass(MagicMock(), MagicMock()) + + assert summary["materialized"] == [] + assert summary["errors"] == [] + assert len(summary["skipped"]) == 1 + skipped = summary["skipped"][0] + assert skipped == {"table": "in_flight_t", "reason": "in_flight"} + assert state.set_error_calls == [] + assert state.update_sync_calls == [] + + +def test_due_check_skipped_uses_due_check_reason(fake_registry_with_one_materialized, monkeypatch): + monkeypatch.setattr("app.api.sync.is_table_due", lambda *a, **k: False) + + summary = _run_materialized_pass(MagicMock(), MagicMock()) + assert summary["skipped"] == [{"table": "in_flight_t", "reason": "due_check"}] diff --git a/tests/test_sync_trigger_materialized.py b/tests/test_sync_trigger_materialized.py index 6f94e91..7667a5b 100644 --- a/tests/test_sync_trigger_materialized.py +++ b/tests/test_sync_trigger_materialized.py @@ -102,7 +102,8 @@ def test_materialized_pass_skips_undue_rows(system_db, stub_bq): summary = sync_mod._run_materialized_pass(system_db, stub_bq) mock_mat.assert_not_called() - assert "orders_daily" in summary["skipped"] + # summary["skipped"] is now list[dict] — see PR zs/materialize-sync-fix + assert {"table": "orders_daily", "reason": "due_check"} in summary["skipped"] def test_materialized_pass_skips_non_materialized_rows(system_db, stub_bq):