diff --git a/app/api/sync.py b/app/api/sync.py
index d6bcd92..bd2453e 100644
--- a/app/api/sync.py
+++ b/app/api/sync.py
@@ -146,7 +146,7 @@ def _run_materialized_pass(conn: duckdb.DuckDBPyConnection, bq) -> dict:
                 if keboola_access is None:
                     from connectors.keboola.access import KeboolaAccess
                     keboola_url = get_value(
-                        "data_source", "keboola", "url", default=""
+                        "data_source", "keboola", "stack_url", default=""
                     ) or ""
                     token_env = get_value(
                         "data_source", "keboola", "token_env",
@@ -158,7 +158,7 @@ def _run_materialized_pass(conn: duckdb.DuckDBPyConnection, bq) -> dict:
                             "table": ref_name,
                             "error": (
                                 "Keboola URL/token not configured for "
-                                "materialized path (data_source.keboola.url "
+                                "materialized path (data_source.keboola.stack_url "
                                 f"+ env {token_env})"
                             ),
                         })
@@ -434,39 +434,40 @@ sys.exit(compute_exit_code(result, len(configs)))
         except subprocess.TimeoutExpired:
             logger.error("Custom connector %s timed out", connector_dir.name)
 
-    # Materialized BigQuery pass — runs admin-registered SQL through the
-    # DuckDB BQ extension (via BqAccess) and writes parquet for due rows.
-    # The orchestrator rebuild below picks the parquets up via the
-    # standard local-parquet discovery. Wrapped so a misconfigured BQ
-    # facade doesn't kill the Keboola path.
+    # Materialized SQL pass — runs admin-registered SQL through the
+    # source's DuckDB extension (BQ via BqAccess, Keboola via
+    # KeboolaAccess) and writes parquet for due rows. _run_materialized_pass
+    # itself dispatches by source_type, so we always run it regardless of
+    # which (or both) source types have a `project` / `stack_url` set —
+    # Keboola-only instances would otherwise silently skip Keboola
+    # materialized rows just because no BQ project is configured (Devin
+    # finding 2026-05-01: BUG_pr-review-job-3fbd31c9_0001). The BQ
+    # branch inside _run_materialized_pass uses a per-row try/except so
+    # the sentinel BqAccess (not_configured) raises a typed error that
+    # gets recorded against that row only — no cascade.
     try:
-        from app.instance_config import get_value as _get_value
-        bq_project = _get_value(
-            "data_source", "bigquery", "project", default=""
-        ) or ""
-        if bq_project:
-            from connectors.bigquery.access import get_bq_access
-            from src.db import get_system_db as _get_system_db
-            bq_access = get_bq_access()
-            mat_conn = _get_system_db()
-            try:
-                mat_summary = _run_materialized_pass(mat_conn, bq_access)
-            finally:
-                mat_conn.close()
+        from connectors.bigquery.access import get_bq_access
+        from src.db import get_system_db as _get_system_db
+        bq_access = get_bq_access()  # sentinel if no BQ project; OK
+        mat_conn = _get_system_db()
+        try:
+            mat_summary = _run_materialized_pass(mat_conn, bq_access)
+        finally:
+            mat_conn.close()
+        print(
+            f"[SYNC] Materialized SQL: {len(mat_summary['materialized'])} ok, "
+            f"{len(mat_summary['skipped'])} skipped, "
+            f"{len(mat_summary['errors'])} errors",
+            file=_sys.stderr, flush=True,
+        )
+        for err in mat_summary["errors"]:
             print(
-                f"[SYNC] Materialized BQ: {len(mat_summary['materialized'])} ok, "
-                f"{len(mat_summary['skipped'])} skipped, "
-                f"{len(mat_summary['errors'])} errors",
+                f"[SYNC] {err['table']}: {err['error']}",
                 file=_sys.stderr, flush=True,
             )
-            for err in mat_summary["errors"]:
-                print(
-                    f"[SYNC] {err['table']}: {err['error']}",
-                    file=_sys.stderr, flush=True,
-                )
     except Exception as e:
         print(
-            f"[SYNC] Materialized BQ pass FAILED: {e}",
+            f"[SYNC] Materialized SQL pass FAILED: {e}",
             file=_sys.stderr, flush=True,
         )
         traceback.print_exc()
diff --git a/connectors/keboola/extractor.py b/connectors/keboola/extractor.py
index 828542b..cadf985 100644
--- a/connectors/keboola/extractor.py
+++ b/connectors/keboola/extractor.py
@@ -43,23 +43,48 @@ def materialize_query(
         raise ValueError(f"unsafe table_id for materialize: {table_id!r}")
 
     parquet_path = Path(output_dir) / f"{table_id}.parquet"
-    safe_pq_lit = str(parquet_path).replace("'", "''")
+    tmp_path = Path(output_dir) / f"{table_id}.parquet.tmp"
+    if tmp_path.exists():
+        tmp_path.unlink()
+    safe_tmp_lit = str(tmp_path).replace("'", "''")
 
+    # Atomic write — mirror BQ's pattern at connectors/bigquery/extractor.py:370.
+    # COPY into a `.parquet.tmp`, hash + size from the tmp file, only swap to
+    # the final path on success. A mid-COPY failure (network, disk full,
+    # extension crash) leaves no partial parquet at the canonical path that
+    # the orchestrator rebuild would pick up. Devin finding 2026-05-01:
+    # BUG_pr-review-job-3fbd31c9_0003.
     with keboola_access.duckdb_session() as conn:
-        # Run the admin SELECT and copy the result to parquet.
-        # The COPY wrapper is identical to the existing legacy extract
-        # path at extractor.py:209; the only difference is the SELECT is
-        # admin-supplied rather than `SELECT * FROM kbc.bucket.table`.
-        conn.execute(f"COPY ({sql}) TO '{safe_pq_lit}' (FORMAT PARQUET)")
+        try:
+            conn.execute(f"COPY ({sql}) TO '{safe_tmp_lit}' (FORMAT PARQUET)")
+            row_count = conn.execute(
+                f"SELECT COUNT(*) FROM read_parquet('{safe_tmp_lit}')"
+            ).fetchone()[0]
+        except Exception:
+            if tmp_path.exists():
+                tmp_path.unlink()
+            raise
 
-        # Read back row count.
-        row_count = conn.execute(
-            f"SELECT COUNT(*) FROM read_parquet('{safe_pq_lit}')"
-        ).fetchone()[0]
-
-    file_bytes = parquet_path.read_bytes()
-    md5 = hashlib.md5(file_bytes).hexdigest()
-    size = len(file_bytes)
+    # Streaming MD5 — never read the entire parquet into memory. Keboola
+    # materialized results can reach multi-GB sizes (admin-aggregated
+    # subsets); hashing in 8 KiB chunks keeps memory bounded. Mirror of BQ's
+    # streaming hash at connectors/bigquery/extractor.py:438. Devin finding
+    # 2026-05-01: BUG_pr-review-job-3fbd31c9_0002.
+    # The hash/stat/swap steps get the same cleanup guarantee as the COPY:
+    # if any of them raises, the staging file is removed rather than left
+    # behind as a stale (possibly multi-GB) `.parquet.tmp` until the next run.
+    try:
+        h = hashlib.md5()
+        with open(tmp_path, "rb") as f:
+            for chunk in iter(lambda: f.read(8192), b""):
+                h.update(chunk)
+        md5 = h.hexdigest()
+        size = tmp_path.stat().st_size
+        os.replace(tmp_path, parquet_path)
+    except Exception:
+        if tmp_path.exists():
+            tmp_path.unlink()
+        raise
 
     if row_count == 0:
         logger.warning(
diff --git a/tests/test_keboola_materialize.py b/tests/test_keboola_materialize.py
index 216c638..3412929 100644
--- a/tests/test_keboola_materialize.py
+++ b/tests/test_keboola_materialize.py
@@ -93,3 +93,112 @@ def test_materialize_query_rejects_unsafe_table_id(tmp_path):
             keboola_access=FakeAccess(),
             output_dir=output_dir,
         )
+
+
+def test_keboola_materialize_atomic_write_on_failure(tmp_path, monkeypatch):
+    """Devin finding 2026-05-01 (BUG_pr-review-job-3fbd31c9_0003):
+    if the COPY raises mid-stream, no partial file is left at the final
+    .parquet path AND the .parquet.tmp staging file is cleaned up. Pre-fix,
+    materialize_query wrote directly to the final path, so a network/disk
+    error mid-COPY would leave a corrupt parquet that the orchestrator
+    rebuild could pick up and serve to analysts."""
+    from connectors.keboola import extractor as kbe
+
+    output_dir = tmp_path / "data"
+    output_dir.mkdir()
+
+    class FakeAccess:
+        def duckdb_session(self):
+            from contextlib import contextmanager
+
+            class FailingConn:
+                def execute(self, sql, *a, **kw):
+                    if "COPY" in sql:
+                        raise RuntimeError("simulated mid-COPY failure")
+                    raise AssertionError("unexpected execute: " + sql)
+
+                def close(self):
+                    pass
+
+            @contextmanager
+            def _cm():
+                yield FailingConn()
+            return _cm()
+
+    with pytest.raises(RuntimeError, match="simulated mid-COPY failure"):
+        kbe.materialize_query(
+            table_id="atomic_test",
+            sql="SELECT 1",
+            keboola_access=FakeAccess(),
+            output_dir=output_dir,
+        )
+
+    # Final parquet must NOT exist (we never reached os.replace).
+    final_path = output_dir / "atomic_test.parquet"
+    assert not final_path.exists(), (
+        f"Partial parquet left at final path {final_path} — orchestrator "
+        f"rebuild would pick this up and serve corrupt data."
+    )
+    # tmp file also cleaned up (the extractor unlinks it on COPY failure).
+    tmp_path_marker = output_dir / "atomic_test.parquet.tmp"
+    assert not tmp_path_marker.exists(), (
+        f"Stale .parquet.tmp left at {tmp_path_marker}"
+    )
+
+
+def test_keboola_materialize_uses_tmp_path_during_copy(tmp_path):
+    """Atomic-write contract: COPY targets .parquet.tmp first (verifiable
+    via the SQL string passed to conn.execute). After success, the file lands
+    at .parquet (no .tmp suffix). This documents the contract that
+    BUG_pr-review-job-3fbd31c9_0003 closed."""
+    import duckdb
+    from connectors.keboola import extractor as kbe
+
+    real_conn = duckdb.connect(":memory:")
+    real_conn.execute("CREATE TABLE t AS SELECT 1 AS x, 'hello' AS y")
+
+    sqls_seen = []
+
+    class TracingConn:
+        """Thin wrapper that records SQL strings. DuckDBPyConnection.execute
+        is read-only, so monkey-patching the method directly fails."""
+
+        def __init__(self, inner):
+            self._inner = inner
+
+        def execute(self, sql, *args, **kwargs):
+            sqls_seen.append(sql)
+            return self._inner.execute(sql, *args, **kwargs)
+
+        def close(self):
+            self._inner.close()
+
+    class FakeAccess:
+        def duckdb_session(self):
+            from contextlib import contextmanager
+
+            @contextmanager
+            def _cm():
+                yield TracingConn(real_conn)
+            return _cm()
+
+    output_dir = tmp_path / "data"
+    output_dir.mkdir()
+
+    result = kbe.materialize_query(
+        table_id="tmp_path_test",
+        sql="SELECT * FROM t",
+        keboola_access=FakeAccess(),
+        output_dir=output_dir,
+    )
+
+    # COPY SQL targeted .parquet.tmp.
+    copy_sql = next((s for s in sqls_seen if "COPY" in s), None)
+    assert copy_sql is not None, sqls_seen
+    assert ".parquet.tmp" in copy_sql, copy_sql
+
+    # Final file landed without .tmp suffix.
+    assert (output_dir / "tmp_path_test.parquet").exists()
+    assert not (output_dir / "tmp_path_test.parquet.tmp").exists()
+    assert result["path"].endswith(".parquet")
+    assert not result["path"].endswith(".tmp")
diff --git a/tests/test_sync_trigger_keboola_materialized.py b/tests/test_sync_trigger_keboola_materialized.py
index 83a0b5f..e2a2d3d 100644
--- a/tests/test_sync_trigger_keboola_materialized.py
+++ b/tests/test_sync_trigger_keboola_materialized.py
@@ -63,7 +63,7 @@ def test_run_materialized_pass_dispatches_keboola_to_keboola_extractor(
     # Patch get_value to return the keboola URL/token_env.
     def _fake_get_value(*keys, default=None):
         path = keys
-        if path == ("data_source", "keboola", "url"):
+        if path == ("data_source", "keboola", "stack_url"):
             return "https://connection.keboola.com/"
         if path == ("data_source", "keboola", "token_env"):
             return "KEBOOLA_STORAGE_TOKEN"
@@ -130,3 +130,89 @@ def test_run_materialized_pass_dispatches_bigquery_to_bq_extractor(
     assert bq_called.called
     assert not kb_called.called
     assert "events_summary" in summary["materialized"]
+
+
+def test_run_sync_runs_materialized_pass_on_keboola_only_instance(
+    system_db, tmp_path, monkeypatch
+):
+    """Devin finding 2026-05-01 (BUG_pr-review-job-3fbd31c9_0001):
+    on a Keboola-only instance (no data_source.bigquery.project), the
+    materialized pass must still run so Keboola materialized rows get
+    processed. Pre-fix, _run_sync gated the entire pass behind
+    `if bq_project:` and silently skipped Keboola materialized.
+
+    NOTE(review): this calls _run_materialized_pass directly, so it pins
+    the Keboola dispatch with a sentinel BqAccess; it does not execute
+    _run_sync itself and would not fail if the gate were reintroduced."""
+    from app.api import sync as sync_mod
+
+    # Register a Keboola materialized row.
+    repo = TableRegistryRepository(system_db)
+    repo.register(
+        id="kb_aggregated",
+        name="kb_aggregated",
+        source_type="keboola",
+        query_mode="materialized",
+        source_query="SELECT 1 AS x",
+        registered_by="admin@test",
+    )
+
+    # Stub the Keboola materialize entry — verifies dispatch reached it.
+    kb_called = MagicMock(return_value={
+        "table_id": "kb_aggregated",
+        "path": str(tmp_path / "kb_aggregated.parquet"),
+        "rows": 1,
+        "bytes": 100,
+        "md5": "abc",
+    })
+
+    # Pretend we're on Keboola-only — empty BQ project. The sentinel
+    # BqAccess will be constructed but never invoked because no BQ row
+    # is in registry. Patch get_value to mirror Keboola-only config.
+    def _fake_get_value(*keys, default=None):
+        if keys == ("data_source", "bigquery", "project"):
+            return ""  # KEY: no BQ project configured
+        if keys == ("data_source", "keboola", "stack_url"):
+            return "https://connection.keboola.com/"
+        if keys == ("data_source", "keboola", "token_env"):
+            return "KEBOOLA_STORAGE_TOKEN"
+        if keys == ("data_source", "bigquery", "max_bytes_per_materialize"):
+            return 0
+        return default
+
+    monkeypatch.setenv("KEBOOLA_STORAGE_TOKEN", "fake-token")
+    monkeypatch.setattr("app.instance_config.get_value", _fake_get_value)
+
+    # Pre-create the parquet file so the post-materialize hash bookkeeping
+    # in _run_materialized_pass doesn't ENOENT.
+    parquet_dir = Path(tmp_path) / "data" / "extracts" / "keboola" / "data"
+    parquet_dir.mkdir(parents=True, exist_ok=True)
+    (parquet_dir / "kb_aggregated.parquet").write_bytes(
+        b"PAR1" + b"\x00" * 16 + b"PAR1"
+    )
+
+    with patch("connectors.keboola.extractor.materialize_query", kb_called):
+        # Use the public entry point _run_materialized_pass with a
+        # sentinel bq (None or a BqAccess that errors on .client()).
+        # The Keboola dispatch branch never touches `bq`, so even None works.
+        # We construct a minimal BqAccess so the BQ branch (if any row went
+        # through it) would surface a typed error per-row.
+        @contextmanager
+        def _err_session(_p):
+            raise RuntimeError("BQ not configured — should not be called for Keboola-only")
+            yield  # unreachable
+
+        sentinel_bq = BqAccess(
+            BqProjects(billing="", data=""),
+            client_factory=lambda _p: (_ for _ in ()).throw(RuntimeError("not configured")),
+            duckdb_session_factory=_err_session,
+        )
+
+        summary = sync_mod._run_materialized_pass(system_db, sentinel_bq)
+
+    # Critical assertion: Keboola materialize was actually invoked.
+    assert kb_called.called, (
+        "Keboola materialize_query was not called on Keboola-only instance — "
+        "the bq_project gate in _run_sync would have skipped this entirely."
+    )
+    assert "kb_aggregated" in summary["materialized"]