From ef74ec010c6331c19a927d4b4416c0cd273b8da4 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr <139972147+ZdenekSrotyr@users.noreply.github.com> Date: Mon, 27 Apr 2026 21:52:46 +0200 Subject: [PATCH] =?UTF-8?q?fix(ops):=20#81=20Group=20B=20=E2=80=94=20Keboo?= =?UTF-8?q?la=20partial-failure=20exit=20code=202=20(squashed)=20(#99)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes M14 from issue #81. Keboola extractor exits 0/1/2 (success/full-fail/partial). sync.py interprets exit 2 as PARTIAL FAILURE (data-quality alert, distinct from exit 1). Tests: tests/test_keboola_extractor_exit_codes.py — 14 cases including runtime mock subprocess (rc=0/1/2/124). Refs #81 Group B. --- CHANGELOG.md | 12 ++ app/api/sync.py | 23 ++- connectors/keboola/extractor.py | 34 ++++- tests/test_keboola_extractor_exit_codes.py | 159 +++++++++++++++++++++ 4 files changed, 222 insertions(+), 6 deletions(-) create mode 100644 tests/test_keboola_extractor_exit_codes.py diff --git a/CHANGELOG.md b/CHANGELOG.md index f1560e7..1817704 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,18 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ### Changed +- **BREAKING (ops)**: Keboola extractor now exits with three distinct + codes instead of two (issue #81 Group B / M14): `0` = full success, + `1` = full failure, `2` = **partial** failure (some tables succeeded, + some failed). Previously `exit(0)` fired even when 9 of 10 tables + failed, masking partial failures from the sync API and any operator + alerting hooked to non-zero exit codes. The sync API + (`POST /api/sync/trigger`) now logs `PARTIAL FAILURE (exit 2)` as a + data-quality alert (distinct from `FAILED (exit 1)`) and continues to + the orchestrator rebuild step — successful tables from this run plus + unchanged tables from previous runs stay queryable. Operators whose + alerting treated any non-zero exit as a hard error must teach it that + exit 2 is a partial-failure signal, not a deploy failure. - **BREAKING (security)**: The entire Script API is now **admin-only** (issue #44). `GET /api/scripts`, `POST /api/scripts/deploy`, `POST /api/scripts/run`, and `POST /api/scripts/{id}/run` all require the admin role; previously the list diff --git a/app/api/sync.py b/app/api/sync.py index 5586ecd..8e32d7b 100644 --- a/app/api/sync.py +++ b/app/api/sync.py @@ -110,10 +110,13 @@ if not url or not token: print("ERROR: Missing KEBOOLA_STACK_URL or KEBOOLA_STORAGE_TOKEN", file=sys.stderr) sys.exit(1) -from connectors.keboola.extractor import run +from connectors.keboola.extractor import run, compute_exit_code data_dir = Path(os.environ.get("DATA_DIR", "./data")) result = run(str(data_dir / "extracts" / "keboola"), configs, url, token) print(json.dumps(result)) +# Issue #81 Group B: surface partial-failure as exit 2 so the API +# caller can distinguish "every table failed" from "9/10 succeeded". +sys.exit(compute_exit_code(result, len(configs))) """] import sys as _sys @@ -129,10 +132,22 @@ print(json.dumps(result)) print(f"[SYNC] Extractor stdout: {result.stdout.strip()[-500:]}", file=_sys.stderr, flush=True) if result.stderr: print(f"[SYNC] Extractor stderr: {result.stderr[-500:]}", file=_sys.stderr, flush=True) - if result.returncode != 0: - print(f"[SYNC] Extractor FAILED (exit {result.returncode})", file=_sys.stderr, flush=True) - else: + # Issue #81 Group B: three exit codes. 0 = full success, + # 1 = full failure, 2 = partial. Partial is a data-quality + # alert, not a crash — the orchestrator's per-table _meta + # machinery already captured which tables succeeded; we just + # need to log loudly so operator alerting can pick it up. + if result.returncode == 0: print(f"[SYNC] Extractor OK", file=_sys.stderr, flush=True) + elif result.returncode == 2: + print( + f"[SYNC] Extractor PARTIAL FAILURE (exit 2) — some tables " + f"succeeded, some failed; see stderr for per-table errors. " + f"Successful tables will still be published by the orchestrator.", + file=_sys.stderr, flush=True, + ) + else: + print(f"[SYNC] Extractor FAILED (exit {result.returncode})", file=_sys.stderr, flush=True) # Run custom connectors (Tier A: local mount) connectors_dir = Path(os.environ.get("CONNECTORS_DIR", str(Path(__file__).parent.parent.parent / "connectors" / "custom"))) diff --git a/connectors/keboola/extractor.py b/connectors/keboola/extractor.py index 5ddcc00..65e462c 100644 --- a/connectors/keboola/extractor.py +++ b/connectors/keboola/extractor.py @@ -255,6 +255,30 @@ def _extract_via_legacy( os.unlink(csv_path) +def compute_exit_code(stats: Dict[str, Any], total: int) -> int: + """Map an extraction `stats` dict to a process exit code. + + Issue #81 Group B: distinguish full success from partial failure so + the sync API and CLI consumers can alert on partial vs. full failure + rather than treating any non-zero as one bucket. + + - ``0`` — every table succeeded (or no tables registered). + - ``1`` — every table failed (full failure). + - ``2`` — at least one succeeded and at least one failed (partial). + + `total` is the count of tables the extractor was asked to process. + `stats["tables_failed"]` is the count it actually failed. + """ + failed = stats.get("tables_failed", 0) + if total == 0: + return 0 + if failed == 0: + return 0 + if failed >= total: + return 1 + return 2 + + if __name__ == "__main__": """Standalone: reads config from env + table_registry, runs extraction. @@ -303,5 +327,11 @@ if __name__ == "__main__": result = run(str(data_dir / "extracts" / "keboola"), tables, url, token) logger.info("Extraction complete: %s", result) - failed = result.get("tables_failed", 0) - exit(1 if failed == len(tables) else 0) # exit 1 only if ALL tables failed + code = compute_exit_code(result, len(tables)) + if code == 2: + logger.error( + "Partial failure: %d of %d tables failed", result.get("tables_failed", 0), len(tables) + ) + elif code == 1: + logger.error("All %d tables failed", len(tables)) + exit(code) diff --git a/tests/test_keboola_extractor_exit_codes.py b/tests/test_keboola_extractor_exit_codes.py new file mode 100644 index 0000000..07a785e --- /dev/null +++ b/tests/test_keboola_extractor_exit_codes.py @@ -0,0 +1,159 @@ +"""Issue #81 Group B — Keboola extractor exit codes. + +Three contracts: +- 0 = full success (every table OK) +- 1 = full failure (every table failed) +- 2 = partial (at least one OK + at least one failed) + +Plus the sync.py interpretation: exit 2 must NOT be treated as a crash; +it logs a PARTIAL FAILURE notice and continues to the orchestrator +rebuild step (the orchestrator's per-table _meta machinery already +captures which tables succeeded). +""" + +import subprocess as subprocess_real + +import pytest + +from connectors.keboola.extractor import compute_exit_code + + +class TestComputeExitCode: + @pytest.mark.parametrize( + "stats,total,expected", + [ + # Full success + ({"tables_extracted": 10, "tables_failed": 0}, 10, 0), + # Single-table full success + ({"tables_extracted": 1, "tables_failed": 0}, 1, 0), + # No tables registered → 0 (vacuous success) + ({"tables_extracted": 0, "tables_failed": 0}, 0, 0), + # Full failure + ({"tables_extracted": 0, "tables_failed": 10}, 10, 1), + # Single-table full failure + ({"tables_extracted": 0, "tables_failed": 1}, 1, 1), + # Partial — single failure in 10 + ({"tables_extracted": 9, "tables_failed": 1}, 10, 2), + # Partial — half-and-half + ({"tables_extracted": 5, "tables_failed": 5}, 10, 2), + # Partial — only one succeeded + ({"tables_extracted": 1, "tables_failed": 9}, 10, 2), + ], + ) + def test_exit_code_matrix(self, stats, total, expected): + assert compute_exit_code(stats, total) == expected + + def test_missing_tables_failed_key_treated_as_zero(self): + """Defensive — older stats dicts without `tables_failed` should + be treated as full success.""" + assert compute_exit_code({"tables_extracted": 5}, 5) == 0 + + def test_failed_exceeds_total_still_full_failure(self): + """If somehow `tables_failed > total` (counting bug, retries), + exit 1 — not 2 — so partial-failure alerting only fires on a + legitimate mixed outcome.""" + assert compute_exit_code({"tables_failed": 11}, 10) == 1 + + +class TestSyncApiPartialFailureHandling: + """Runtime test: exit code from the extractor subprocess maps to the + correct [SYNC] log branch. Drives `_run_sync` with a mocked + `subprocess.run` and asserts the print() calls into stderr. This + catches inverted-comparison regressions (e.g. `if returncode == 1` + used for the partial branch) that a source-substring grep would + miss. + """ + + def _drive_run_sync(self, monkeypatch, capsys, returncode): + """Invoke `_run_sync` with the extractor subprocess returning + ``returncode``, return the captured stderr as a single string. + + sync.py does several `import` inside `_run_sync` (subprocess, + SyncOrchestrator, get_system_db). Stubs must target either the + global module (so the local import-from-cache picks them up) + or the runtime call sites via ``patch.object`` on the imported + names after the function has resolved them. + """ + from unittest.mock import MagicMock, patch + from app.api import sync as sync_mod + + def fake_run(*args, **kwargs): + return MagicMock( + returncode=returncode, stdout="{}", stderr="", + ) + # subprocess is imported locally inside _run_sync; patching the + # real module's run() works because Python's module cache means + # both call sites resolve to the same object. + monkeypatch.setattr(subprocess_real, "run", fake_run) + + # SyncOrchestrator is imported as `from src.orchestrator import + # SyncOrchestrator` inside _run_sync, so patching sync_mod + # doesn't reach it. Patch the source module instead. + from src import orchestrator as orch_mod + monkeypatch.setattr( + orch_mod, "SyncOrchestrator", + lambda *a, **kw: MagicMock(rebuild=MagicMock(return_value={})), + raising=False, + ) + + # Pretend a Keboola token is configured so the inline subprocess + # cmd is built (don't enter the missing-credentials early-exit). + monkeypatch.setenv("KEBOOLA_STORAGE_TOKEN", "test-token") + monkeypatch.setenv("KEBOOLA_STACK_URL", "https://test.example") + + # _run_sync calls TableRegistryRepository.list_local on a real + # system DB connection. Stub the registry method directly so we + # don't need a populated DB; also stub get_system_db / + # get_data_source_type to avoid filesystem-dependency on a + # configured instance.yaml in CI. + from src.repositories.table_registry import TableRegistryRepository + monkeypatch.setattr( + TableRegistryRepository, "list_local", + lambda self, *a, **kw: [ + {"id": "x", "name": "x", "source_type": "keboola", + "bucket": "in.c-x", "source_table": "y", + "query_mode": "local"} + ], + ) + + # Stub system DB + data-source-type. sync.py does + # `from src.db import get_system_db` and + # `from app.instance_config import get_data_source_type` + # **inside** _run_sync (not module top-level), so we must patch + # on the SOURCE modules — patching sync_mod is silently + # ineffective because the local imports re-bind the names. + fake_conn = MagicMock() + fake_conn.close = MagicMock() + from src import db as db_mod + from app import instance_config as ic_mod + monkeypatch.setattr(db_mod, "get_system_db", lambda: fake_conn) + monkeypatch.setattr(ic_mod, "get_data_source_type", lambda: "keboola") + monkeypatch.setattr(ic_mod, "get_value", lambda *a, **kw: "") + + sync_mod._run_sync() + return capsys.readouterr().err + + def test_exit_0_is_logged_as_ok(self, monkeypatch, capsys): + stderr = self._drive_run_sync(monkeypatch, capsys, returncode=0) + assert "[SYNC] Extractor OK" in stderr + assert "PARTIAL FAILURE" not in stderr + assert "Extractor FAILED" not in stderr + + def test_exit_1_is_logged_as_failed(self, monkeypatch, capsys): + stderr = self._drive_run_sync(monkeypatch, capsys, returncode=1) + assert "[SYNC] Extractor FAILED (exit 1)" in stderr + assert "PARTIAL FAILURE" not in stderr + assert "Extractor OK" not in stderr + + def test_exit_2_is_logged_as_partial(self, monkeypatch, capsys): + stderr = self._drive_run_sync(monkeypatch, capsys, returncode=2) + assert "[SYNC] Extractor PARTIAL FAILURE (exit 2)" in stderr + # The partial branch must NOT also log OK or FAILED. + assert "Extractor OK" not in stderr + assert "Extractor FAILED (exit" not in stderr + + def test_exit_124_falls_through_to_failed(self, monkeypatch, capsys): + """Timeouts (124), signal kills (-N), and other non-zero codes + all hit the catchall else branch and log FAILED.""" + stderr = self._drive_run_sync(monkeypatch, capsys, returncode=124) + assert "[SYNC] Extractor FAILED (exit 124)" in stderr