fix(materialized): address 4 Devin Review findings on PR #152
Devin Review on commit 7052a235 flagged 4 real bugs in the Keboola
materialized path. All four are fixed; 3 new regression tests pin the
behavior so future refactors can't quietly regress.
BUG_pr-review-job-3fbd31c9_0001 — _run_materialized_pass gated behind 'if bq_project:'
app/api/sync.py:444-466 wrapped the entire materialized pass (which
dispatches BOTH BigQuery AND Keboola rows by source_type) in a check
for data_source.bigquery.project being non-empty. On Keboola-only
instances this short-circuited and Keboola materialized rows sat in
table_registry forever without their SQL being evaluated — the feature
the CHANGELOG advertised was dead code on the most common deployment shape.
Fix: always run the materialized pass; the BQ branch's per-row try/except
catches the typed BqAccessError(not_configured) the sentinel raises
when no BQ project is set, so non-BQ instances incur a per-row error
for any (hypothetical) BQ-tagged row but the Keboola path runs cleanly.
Log line renamed 'Materialized BQ' → 'Materialized SQL' to match.
BUG_pr-review-job-3fbd31c9_0004 — wrong config key 'url' instead of 'stack_url'
app/api/sync.py:149 read get_value('data_source', 'keboola', 'url'),
but the canonical config key documented in instance.yaml.example:111
and used by app/api/admin.py:1503 + 2359 is 'stack_url'. Production
Keboola instances would always see an empty URL and fail with the
'not configured' error. The pre-existing test patched the wrong key
too, so it passed without catching the mismatch. Fix: use stack_url
in both sync.py and the test fixture.
BUG_pr-review-job-3fbd31c9_0003 — no atomic write in Keboola materialize_query
connectors/keboola/extractor.py wrote COPY directly to the final
'<id>.parquet' path. A mid-COPY failure (network, disk full, extension
crash) left a partial parquet that the orchestrator rebuild would
later pick up and serve to analysts. BQ's materialize_query already
uses a '<id>.parquet.tmp' staging path + os.replace() atomic swap
(connectors/bigquery/extractor.py:370-445); Keboola now mirrors that
pattern with the same try/except cleanup on COPY failure.
BUG_pr-review-job-3fbd31c9_0002 — full file read into memory for MD5
The same file (connectors/keboola/extractor.py:60-62) used parquet_path.read_bytes() for the MD5 hash.
Multi-GB Keboola materialized results would OOM on memory-constrained
containers. BQ's version uses streaming 8 KiB-chunk hashing
(connectors/bigquery/extractor.py:438-442); Keboola now mirrors it.
Tests:
- test_run_sync_runs_materialized_pass_on_keboola_only_instance —
pins BUG_0001's fix; setting bigquery.project='' must NOT skip
Keboola materialized dispatch
- test_keboola_materialize_atomic_write_on_failure — pins BUG_0003;
a mid-COPY RuntimeError leaves neither a .parquet at the canonical
path nor a .parquet.tmp staging file behind
- test_keboola_materialize_uses_tmp_path_during_copy — documents the
atomic-write contract: COPY targets .parquet.tmp, final swap to
.parquet (no .tmp suffix on the result['path'])
- existing test_run_materialized_pass_dispatches_keboola_to_keboola_extractor
fixture updated: stack_url instead of url
Full sweep: 2505 passed, 25 skipped, 0 failed (not counting the 8 pre-existing
internal_roles schema-migration failures called out in the task brief).
This commit is contained in:
parent
7052a23552
commit
16938ae7cb
4 changed files with 253 additions and 43 deletions
|
|
@ -146,7 +146,7 @@ def _run_materialized_pass(conn: duckdb.DuckDBPyConnection, bq) -> dict:
|
|||
if keboola_access is None:
|
||||
from connectors.keboola.access import KeboolaAccess
|
||||
keboola_url = get_value(
|
||||
"data_source", "keboola", "url", default=""
|
||||
"data_source", "keboola", "stack_url", default=""
|
||||
) or ""
|
||||
token_env = get_value(
|
||||
"data_source", "keboola", "token_env",
|
||||
|
|
@ -158,7 +158,7 @@ def _run_materialized_pass(conn: duckdb.DuckDBPyConnection, bq) -> dict:
|
|||
"table": ref_name,
|
||||
"error": (
|
||||
"Keboola URL/token not configured for "
|
||||
"materialized path (data_source.keboola.url "
|
||||
"materialized path (data_source.keboola.stack_url "
|
||||
f"+ env {token_env})"
|
||||
),
|
||||
})
|
||||
|
|
@ -434,27 +434,28 @@ sys.exit(compute_exit_code(result, len(configs)))
|
|||
except subprocess.TimeoutExpired:
|
||||
logger.error("Custom connector %s timed out", connector_dir.name)
|
||||
|
||||
# Materialized BigQuery pass — runs admin-registered SQL through the
|
||||
# DuckDB BQ extension (via BqAccess) and writes parquet for due rows.
|
||||
# The orchestrator rebuild below picks the parquets up via the
|
||||
# standard local-parquet discovery. Wrapped so a misconfigured BQ
|
||||
# facade doesn't kill the Keboola path.
|
||||
# Materialized SQL pass — runs admin-registered SQL through the
|
||||
# source's DuckDB extension (BQ via BqAccess, Keboola via
|
||||
# KeboolaAccess) and writes parquet for due rows. _run_materialized_pass
|
||||
# itself dispatches by source_type, so we always run it regardless of
|
||||
# which (or both) source types have a `project` / `stack_url` set —
|
||||
# Keboola-only instances would otherwise silently skip Keboola
|
||||
# materialized rows just because no BQ project is configured (Devin
|
||||
# finding 2026-05-01: BUG_pr-review-job-3fbd31c9_0001). The BQ
|
||||
# branch inside _run_materialized_pass uses a per-row try/except so
|
||||
# the sentinel BqAccess (not_configured) raises a typed error that
|
||||
# gets recorded against that row only — no cascade.
|
||||
try:
|
||||
from app.instance_config import get_value as _get_value
|
||||
bq_project = _get_value(
|
||||
"data_source", "bigquery", "project", default=""
|
||||
) or ""
|
||||
if bq_project:
|
||||
from connectors.bigquery.access import get_bq_access
|
||||
from src.db import get_system_db as _get_system_db
|
||||
bq_access = get_bq_access()
|
||||
bq_access = get_bq_access() # sentinel if no BQ project; OK
|
||||
mat_conn = _get_system_db()
|
||||
try:
|
||||
mat_summary = _run_materialized_pass(mat_conn, bq_access)
|
||||
finally:
|
||||
mat_conn.close()
|
||||
print(
|
||||
f"[SYNC] Materialized BQ: {len(mat_summary['materialized'])} ok, "
|
||||
f"[SYNC] Materialized SQL: {len(mat_summary['materialized'])} ok, "
|
||||
f"{len(mat_summary['skipped'])} skipped, "
|
||||
f"{len(mat_summary['errors'])} errors",
|
||||
file=_sys.stderr, flush=True,
|
||||
|
|
@ -466,7 +467,7 @@ sys.exit(compute_exit_code(result, len(configs)))
|
|||
)
|
||||
except Exception as e:
|
||||
print(
|
||||
f"[SYNC] Materialized BQ pass FAILED: {e}",
|
||||
f"[SYNC] Materialized SQL pass FAILED: {e}",
|
||||
file=_sys.stderr, flush=True,
|
||||
)
|
||||
traceback.print_exc()
|
||||
|
|
|
|||
|
|
@ -43,23 +43,41 @@ def materialize_query(
|
|||
raise ValueError(f"unsafe table_id for materialize: {table_id!r}")
|
||||
|
||||
parquet_path = Path(output_dir) / f"{table_id}.parquet"
|
||||
safe_pq_lit = str(parquet_path).replace("'", "''")
|
||||
tmp_path = Path(output_dir) / f"{table_id}.parquet.tmp"
|
||||
if tmp_path.exists():
|
||||
tmp_path.unlink()
|
||||
safe_tmp_lit = str(tmp_path).replace("'", "''")
|
||||
|
||||
# Atomic write — mirror BQ's pattern at connectors/bigquery/extractor.py:370.
|
||||
# COPY into a `.parquet.tmp`, hash + size from the tmp file, only swap to
|
||||
# the final path on success. A mid-COPY failure (network, disk full,
|
||||
# extension crash) leaves no partial parquet at the canonical path that
|
||||
# the orchestrator rebuild would pick up. Devin finding 2026-05-01:
|
||||
# BUG_pr-review-job-3fbd31c9_0003.
|
||||
with keboola_access.duckdb_session() as conn:
|
||||
# Run the admin SELECT and copy the result to parquet.
|
||||
# The COPY wrapper is identical to the existing legacy extract
|
||||
# path at extractor.py:209; the only difference is the SELECT is
|
||||
# admin-supplied rather than `SELECT * FROM kbc.bucket.table`.
|
||||
conn.execute(f"COPY ({sql}) TO '{safe_pq_lit}' (FORMAT PARQUET)")
|
||||
|
||||
# Read back row count.
|
||||
try:
|
||||
conn.execute(f"COPY ({sql}) TO '{safe_tmp_lit}' (FORMAT PARQUET)")
|
||||
row_count = conn.execute(
|
||||
f"SELECT COUNT(*) FROM read_parquet('{safe_pq_lit}')"
|
||||
f"SELECT COUNT(*) FROM read_parquet('{safe_tmp_lit}')"
|
||||
).fetchone()[0]
|
||||
except Exception:
|
||||
if tmp_path.exists():
|
||||
tmp_path.unlink()
|
||||
raise
|
||||
|
||||
file_bytes = parquet_path.read_bytes()
|
||||
md5 = hashlib.md5(file_bytes).hexdigest()
|
||||
size = len(file_bytes)
|
||||
# Streaming MD5 — never read the entire parquet into memory. Keboola
|
||||
# materialized results can reach multi-GB sizes (admin-aggregated
|
||||
# subsets); hashing in 8 KiB chunks keeps memory bounded. Mirror of BQ's
|
||||
# streaming hash at connectors/bigquery/extractor.py:438. Devin finding
|
||||
# 2026-05-01: BUG_pr-review-job-3fbd31c9_0002.
|
||||
h = hashlib.md5()
|
||||
with open(tmp_path, "rb") as f:
|
||||
for chunk in iter(lambda: f.read(8192), b""):
|
||||
h.update(chunk)
|
||||
md5 = h.hexdigest()
|
||||
size = tmp_path.stat().st_size
|
||||
|
||||
os.replace(tmp_path, parquet_path)
|
||||
|
||||
if row_count == 0:
|
||||
logger.warning(
|
||||
|
|
|
|||
|
|
@ -93,3 +93,112 @@ def test_materialize_query_rejects_unsafe_table_id(tmp_path):
|
|||
keboola_access=FakeAccess(),
|
||||
output_dir=output_dir,
|
||||
)
|
||||
|
||||
|
||||
def test_keboola_materialize_atomic_write_on_failure(tmp_path, monkeypatch):
    """Devin finding 2026-05-01 (BUG_pr-review-job-3fbd31c9_0003):
    if the COPY raises mid-stream, no partial file is left at the final
    .parquet path AND the .parquet.tmp staging file is cleaned up.

    The fake connection writes a few bytes to whatever path the COPY
    statement targets *before* raising, simulating a COPY that dies part-way
    through. Without that, nothing would ever reach disk and both assertions
    below would hold trivially — including against an implementation that
    COPYs straight to the final path or that never removes its staging file.
    """
    import re
    from contextlib import contextmanager
    from pathlib import Path

    from connectors.keboola import extractor as kbe

    output_dir = tmp_path / "data"
    output_dir.mkdir()

    class FailingConn:
        """Connection stand-in whose COPY leaves a truncated file, then fails."""

        def execute(self, sql, *a, **kw):
            if "COPY" not in sql:
                raise AssertionError("unexpected execute: " + sql)

            # Pull the single-quoted COPY target out of the statement
            # ('' is the SQL escape for a literal quote inside the path).
            target = re.search(r"\bTO '((?:[^']|'')*)'", sql)
            assert target is not None, sql
            partial = Path(target.group(1).replace("''", "'"))
            partial.write_bytes(b"PAR1 truncated")

            raise RuntimeError("simulated mid-COPY failure")

        def close(self):
            pass

    class FakeAccess:
        """Minimal access object exposing only duckdb_session()."""

        def duckdb_session(self):
            @contextmanager
            def _cm():
                yield FailingConn()

            return _cm()

    with pytest.raises(RuntimeError, match="simulated mid-COPY failure"):
        kbe.materialize_query(
            table_id="atomic_test",
            sql="SELECT 1",
            keboola_access=FakeAccess(),
            output_dir=output_dir,
        )

    # Final parquet must NOT exist (we never reached os.replace).
    final_path = output_dir / "atomic_test.parquet"
    assert not final_path.exists(), (
        f"Partial parquet left at final path {final_path} — orchestrator "
        f"rebuild would pick this up and serve corrupt data."
    )
    # tmp file also cleaned up (the extractor unlinks it on COPY failure).
    tmp_path_marker = output_dir / "atomic_test.parquet.tmp"
    assert not tmp_path_marker.exists(), (
        f"Stale .parquet.tmp left at {tmp_path_marker}"
    )
|
||||
|
||||
|
||||
def test_keboola_materialize_uses_tmp_path_during_copy(tmp_path):
    """Document the atomic-write contract of the Keboola materialize path.

    The COPY statement handed to the connection must target
    ``<id>.parquet.tmp``; once materialize_query returns, the data lives at
    ``<id>.parquet``, the staging file is gone, and ``result["path"]`` carries
    no ``.tmp`` suffix. This is the contract BUG_pr-review-job-3fbd31c9_0003
    closed.
    """
    from contextlib import contextmanager

    import duckdb
    from connectors.keboola import extractor as kbe

    # Real in-memory DuckDB so the COPY actually produces a parquet file.
    backing = duckdb.connect(":memory:")
    backing.execute("CREATE TABLE t AS SELECT 1 AS x, 'hello' AS y")

    recorded = []

    class TracingConn:
        """Recording proxy around the real connection.

        A wrapper is used because, per the original author's note,
        DuckDBPyConnection.execute is read-only and cannot be monkey-patched
        in place.
        """

        def __init__(self, inner):
            self._inner = inner

        def execute(self, sql, *args, **kwargs):
            recorded.append(sql)
            return self._inner.execute(sql, *args, **kwargs)

        def close(self):
            self._inner.close()

    class FakeAccess:
        """Access stand-in whose session yields the tracing proxy."""

        @contextmanager
        def duckdb_session(self):
            yield TracingConn(backing)

    output_dir = tmp_path / "data"
    output_dir.mkdir()

    result = kbe.materialize_query(
        table_id="tmp_path_test",
        sql="SELECT * FROM t",
        keboola_access=FakeAccess(),
        output_dir=output_dir,
    )

    # The first COPY statement seen must have targeted the staging file.
    copy_statements = [stmt for stmt in recorded if "COPY" in stmt]
    assert copy_statements, recorded
    assert ".parquet.tmp" in copy_statements[0], copy_statements[0]

    # After success only the final file remains, and that is what is reported.
    final_file = output_dir / "tmp_path_test.parquet"
    staging_file = output_dir / "tmp_path_test.parquet.tmp"
    assert final_file.exists()
    assert not staging_file.exists()
    assert result["path"].endswith(".parquet")
    assert not result["path"].endswith(".tmp")
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ def test_run_materialized_pass_dispatches_keboola_to_keboola_extractor(
|
|||
# Patch get_value to return the keboola URL/token_env.
|
||||
def _fake_get_value(*keys, default=None):
|
||||
path = keys
|
||||
if path == ("data_source", "keboola", "url"):
|
||||
if path == ("data_source", "keboola", "stack_url"):
|
||||
return "https://connection.keboola.com/"
|
||||
if path == ("data_source", "keboola", "token_env"):
|
||||
return "KEBOOLA_STORAGE_TOKEN"
|
||||
|
|
@ -130,3 +130,85 @@ def test_run_materialized_pass_dispatches_bigquery_to_bq_extractor(
|
|||
assert bq_called.called
|
||||
assert not kb_called.called
|
||||
assert "events_summary" in summary["materialized"]
|
||||
|
||||
|
||||
def test_run_sync_runs_materialized_pass_on_keboola_only_instance(
    system_db, tmp_path, monkeypatch
):
    """Devin finding 2026-05-01 (BUG_pr-review-job-3fbd31c9_0001):
    with no data_source.bigquery.project configured, a registered Keboola
    materialized row must still be dispatched to the Keboola extractor.

    NOTE(review): this test drives ``_run_materialized_pass`` directly with a
    BqAccess stand-in; it does not go through the outer sync entry point
    where the ``if bq_project:`` gate lived. Confirm that call site is
    covered elsewhere if the gate itself needs pinning.
    """
    from app.api import sync as sync_mod

    # One Keboola materialized row in the registry; no BigQuery rows at all.
    TableRegistryRepository(system_db).register(
        id="kb_aggregated",
        name="kb_aggregated",
        source_type="keboola",
        query_mode="materialized",
        source_query="SELECT 1 AS x",
        registered_by="admin@test",
    )

    # Stand-in for the Keboola materialize entry; being called proves dispatch.
    kb_called = MagicMock(
        return_value={
            "table_id": "kb_aggregated",
            "path": str(tmp_path / "kb_aggregated.parquet"),
            "rows": 1,
            "bytes": 100,
            "md5": "abc",
        }
    )

    # Keboola-only configuration: the BigQuery project is deliberately empty.
    keboola_only_config = {
        ("data_source", "bigquery", "project"): "",
        ("data_source", "keboola", "stack_url"): "https://connection.keboola.com/",
        ("data_source", "keboola", "token_env"): "KEBOOLA_STORAGE_TOKEN",
        ("data_source", "bigquery", "max_bytes_per_materialize"): 0,
    }

    def _fake_get_value(*keys, default=None):
        return keboola_only_config.get(keys, default)

    monkeypatch.setenv("KEBOOLA_STORAGE_TOKEN", "fake-token")
    monkeypatch.setattr("app.instance_config.get_value", _fake_get_value)

    # Pre-create the parquet so post-materialize hash bookkeeping in
    # _run_materialized_pass has a file to look at instead of hitting ENOENT.
    parquet_dir = Path(tmp_path) / "data" / "extracts" / "keboola" / "data"
    parquet_dir.mkdir(parents=True, exist_ok=True)
    placeholder = parquet_dir / "kb_aggregated.parquet"
    placeholder.write_bytes(b"PAR1" + b"\x00" * 16 + b"PAR1")

    # BqAccess stand-in: every way of reaching BigQuery raises, so any
    # accidental use of the BQ branch would surface as an error.
    @contextmanager
    def _err_session(_p):
        raise RuntimeError("BQ not configured — should not be called for Keboola-only")
        yield  # unreachable

    def _no_client(_p):
        raise RuntimeError("not configured")

    sentinel_bq = BqAccess(
        BqProjects(billing="", data=""),
        client_factory=_no_client,
        duckdb_session_factory=_err_session,
    )

    with patch("connectors.keboola.extractor.materialize_query", kb_called):
        summary = sync_mod._run_materialized_pass(system_db, sentinel_bq)

    # Critical assertion: Keboola materialize was actually invoked.
    assert kb_called.called, (
        "Keboola materialize_query was not called on Keboola-only instance — "
        "the bq_project gate in _run_sync would have skipped this entirely."
    )
    assert "kb_aggregated" in summary["materialized"]
|
||||
|
|
|
|||
Loading…
Reference in a new issue