* fix: cutover regressions + parallel Keboola legacy fallback
Bundled fixes from a fresh-deploy run on a Keboola Storage backend with
the block-shared-snowflake-access feature flag. On that backend the DuckDB
Keboola extension's per-table scan can't access bucket schemas, so the
legacy kbcstorage Storage-API client is the only working path.
CUTOVER REGRESSIONS
- agnes pull hash mismatch on every Keboola local-mode table —
src/orchestrator.py:_update_sync_state stored md5(mtime+size)[:12]
while the CLI compares against full 32-char content MD5. Now stores
the same content MD5 the materialized SQL path already used.
- Trailing-slash sanitization in connectors/keboola/access.py and
extractor.py: the DuckDB Keboola extension's ATTACH fails when the URL
ends in / (the canonical form), so the trailing slash is now stripped.
- src/profiler.py:TableInfo.description is now optional; two call
sites instantiated TableInfo without it, crashing the profiler pass.
- scripts/ops/agnes-auto-upgrade.sh: chown on UID change. Older images
ran as root; the current image runs as agnes (uid 999). Reads target uid:gid
from /etc/passwd inside the new image and chowns ${STATE_DIR},
/data/extracts, /data/analytics when the digest moves.
- POST /api/sync/trigger is now a per-process singleton. Previously two
near-simultaneous trigger calls each forked an extractor subprocess,
fought for extract.duckdb's file lock, starved uvicorn, and flipped the
container to unhealthy. Trigger now returns 409
(sync_already_in_progress) while the sync lock is held; _run_sync
acquires the lock non-blocking.
PARALLEL LEGACY FALLBACK
- Process pool fan-out for the _extract_via_legacy queue (default 8
workers, override via AGNES_KEBOOLA_PARALLELISM). Process pool, not
thread pool, because connectors/keboola/client.py:export_table does
os.chdir(temp_dir) — process-global, so threads raced and slice files
landed in the wrong directory ("[Errno 2] No such file or directory:
'<job_id>.csv_X_Y_Z.csv'").
- Extractor subprocess timeout 1800s -> 3600s (configurable via
AGNES_EXTRACTOR_TIMEOUT_SEC). 28+ tables × multi-minute Keboola export
jobs need the headroom on telemetry-class projects.
- Process group cleanup on timeout — Popen(start_new_session=True) puts
the extractor in its own group. On timeout the parent SIGTERMs the
group (10s grace) then SIGKILLs stragglers. Without this, the pool
workers were reparented to PID 1 and continued holding open Keboola
Storage export jobs. The inline extractor script also installs a SIGTERM
-> sys.exit(143) handler so the __exit__ of its
with ProcessPoolExecutor(...) block runs cleanly.
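The timeout and process-group handling above can be sketched with the standard library alone. The sketch assumes a POSIX host, because it relies on os.killpg and on start_new_session=True calling setsid() in the child. The helper names are hypothetical. The 3600s default, the 10s grace period, the env var name, and exit code 143 come from the text above.

```python
import os
import signal
import subprocess
import sys


def extractor_timeout(default: int = 3600) -> int:
    """AGNES_EXTRACTOR_TIMEOUT_SEC if it parses as an int, else the default."""
    raw = os.environ.get("AGNES_EXTRACTOR_TIMEOUT_SEC", "").strip()
    try:
        return int(raw) if raw else default
    except ValueError:
        return default


def install_sigterm_exit() -> None:
    """Child side: turn SIGTERM into SystemExit(143) so `with` blocks unwind."""
    signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(143))


def run_in_own_group(cmd, timeout: float, grace: float = 10.0):
    """Run cmd in a new session; on timeout SIGTERM the group, then SIGKILL."""
    # The child becomes a session/group leader, so its pid is also the
    # process-group id inherited by everything it spawns (e.g. pool workers).
    proc = subprocess.Popen(cmd, start_new_session=True)
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        pass
    try:
        os.killpg(proc.pid, signal.SIGTERM)
    except ProcessLookupError:
        pass
    try:
        rc = proc.wait(timeout=grace)
    except subprocess.TimeoutExpired:
        rc = None
    # Sweep stragglers that outlived the leader; an empty group raises ESRCH.
    try:
        os.killpg(proc.pid, signal.SIGKILL)
    except ProcessLookupError:
        pass
    return rc if rc is not None else proc.wait()
```

Signalling the group instead of only the leader is what keeps pool workers from being reparented to PID 1 and holding export jobs open.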
Tests: existing tests that patched subprocess.run updated to patch
subprocess.Popen with a _FakePopen stand-in (same exit-code-injection
contract). Two tests that exercised the parallel path forced
AGNES_KEBOOLA_PARALLELISM=1 to keep mocks alive (mocks patched in the
parent don't carry over into ProcessPoolExecutor worker processes).
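Below is a sketch of the fan-out. It assumes parallelism is read from AGNES_KEBOOLA_PARALLELISM with a default of 8. Three details are assumptions: the helper names, the clamp to at least one worker, and running inline when parallelism is 1. The inline path is one way to get the behaviour the tests rely on, since mocks patched in the parent never reach pool workers.

```python
import os
from concurrent.futures import ProcessPoolExecutor


def keboola_parallelism(default: int = 8) -> int:
    """AGNES_KEBOOLA_PARALLELISM override, clamped to at least one worker."""
    raw = os.environ.get("AGNES_KEBOOLA_PARALLELISM", "").strip()
    try:
        value = int(raw) if raw else default
    except ValueError:
        value = default
    return max(1, value)


def fan_out(worker, table_ids, parallelism=None):
    """Apply worker to each table id, returning results in input order."""
    n = keboola_parallelism() if parallelism is None else max(1, parallelism)
    if n == 1:
        # Inline path: runs in this process, so mocks patched here still apply.
        return [worker(t) for t in table_ids]
    # Separate processes each have their own cwd, so an os.chdir inside one
    # export cannot redirect another export's slice files.
    with ProcessPoolExecutor(max_workers=n) as pool:
        # The worker must be picklable (a module-level function).
        return list(pool.map(worker, table_ids))
```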
Squashed onto current main (was 7 commits + multi-commit CHANGELOG +
agnes-auto-upgrade.sh conflicts; squash avoids per-commit conflict
resolution against main's flat-mount STATE_DIR refactor and 0.38.0
release cut).
* feat(keboola): Storage API direct extract path; drop extension data path
The DuckDB Keboola extension's COPY routes through Keboola QueryService,
which is unreliable on linked-bucket projects (extension v0.1.6 fixes
that case but isn't yet in the community CDN, and pre-fix any project
with the block-shared-snowflake-access feature flag couldn't see bucket
schemas at all). Move the extract path off the extension entirely and
talk to the Storage API directly via signed-URL download — works on any
project, regardless of extension state.
connectors/keboola/storage_api.py (NEW)
Lightweight client built on requests.Session. Three Storage API endpoints, plus the signed-URL download:
- POST /v2/storage/tables/{id}/export-async (kicks off job)
- GET /v2/storage/jobs/{id} (poll until done)
- GET /v2/storage/files/{id}?federationToken=1 (signed URL detail)
- GET <signed_url> (download bytes)
Supports sliced exports (manifest + per-slice signed URLs) and gzipped
payloads. ExportFilter dataclass mirrors the Keboola filter spec
(whereFilters / columns / changedSince / limit) and handles JSON
round-trip with the registry's source_query column. Token redaction
in error messages. Bounded exponential backoff on job polling.
No cloud-SDK dependency on the data path; thread-safe.
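Bounded exponential backoff and token redaction can be sketched as below. get_job stands in for a call to GET /v2/storage/jobs/{id}. Two things are assumptions, not values taken from storage_api.py: treating 'success' and 'error' as the terminal statuses, and all of the default delays. Injecting sleep keeps the loop testable.

```python
import time


def redact(message: str, token: str) -> str:
    """Mask the Storage API token before a message reaches logs or errors."""
    return message.replace(token, "***") if token else message


def wait_for_job(get_job, job_id, *, initial=1.0, factor=2.0, max_delay=30.0,
                 timeout=3600.0, sleep=time.sleep):
    """Poll get_job(job_id) until it succeeds, fails, or the deadline passes."""
    deadline = time.monotonic() + timeout
    delay = initial
    while True:
        job = get_job(job_id)
        status = job.get("status")
        if status == "success":
            return job
        if status == "error":
            raise RuntimeError(f"export job {job_id} failed")
        if time.monotonic() + delay > deadline:
            raise TimeoutError(f"export job {job_id} still {status!r} after {timeout}s")
        sleep(delay)
        delay = min(delay * factor, max_delay)  # exponential, but capped
```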
connectors/keboola/extractor.py
- materialize_query() rewritten: takes bucket/source_table/source_query
(JSON filter spec), exports via KeboolaStorageClient, converts CSV
to parquet via DuckDB, atomic os.replace. Same return shape so
sync.py downstream code stays uniform with the BQ branch.
- _extract_via_legacy() also moved to Storage API direct (kept the
name for caller compatibility with _legacy_worker / the parallel
batch extractor). Per-call temp directories — no os.chdir, threads
don't race.
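Here is a minimal sketch of the atomic write step. It assumes the staging name is the final name plus .tmp, the same <id>.parquet.tmp convention the tests further down check. atomic_write and its produce callback are hypothetical names.

```python
import os
from pathlib import Path


def atomic_write(final_path, produce) -> Path:
    """Let produce(tmp_path) write <final>.tmp, then os.replace it into place.

    On any exception the .tmp file is removed and the final path is left
    untouched, so readers never observe a half-written file.
    """
    final_path = Path(final_path)
    tmp_path = final_path.with_name(final_path.name + ".tmp")
    try:
        produce(tmp_path)
        os.replace(tmp_path, final_path)
    except BaseException:
        tmp_path.unlink(missing_ok=True)
        raise
    return final_path
```

os.replace is only atomic when both paths are on the same filesystem. That is why the staging file sits next to the final path rather than in a scratch tempdir.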
app/api/sync.py
_run_materialized_pass for source_type='keboola' rows now constructs a
KeboolaStorageClient (replaces KeboolaAccess) and passes
bucket/source_table/source_query to materialize_query. Reuses one
client across rows for HTTP keep-alive. Falls back to the
KEBOOLA_STACK_URL env var for the Keboola URL when instance.yaml
doesn't have stack_url configured.
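A sketch of the URL resolution order follows. It assumes instance.yaml is loaded into a dict with a keboola.stack_url key, which is a hypothetical layout. Stripping the trailing slash mirrors the sanitization described in the first commit. Whether the Storage API client needs it as well is an assumption.

```python
import os
from typing import Optional


def resolve_stack_url(instance_config: dict) -> Optional[str]:
    """instance.yaml's stack_url wins; KEBOOLA_STACK_URL is the fallback."""
    keboola_cfg = instance_config.get("keboola") or {}
    url = keboola_cfg.get("stack_url") or os.environ.get("KEBOOLA_STACK_URL")
    if not url:
        return None
    # Drop the trailing '/' so f"{url}/v2/storage/..." joins cleanly.
    return url.strip().rstrip("/")
```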
cli/commands/admin.py
discover-and-register defaults Keboola rows to query_mode='materialized'
(NULL source_query = full table), matching the v26 migration's
unification of the local/materialized split for Keboola. BigQuery and
Jira keep their per-source defaults.
src/db.py
Schema bump 25 → 26. Migration: UPDATE table_registry SET
query_mode='materialized' WHERE source_type='keboola' AND
query_mode='local'. NULL source_query on those rows means "full table
export" — same effective behavior the local mode provided, but now
via Storage API instead of the extension.
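The migration can be sketched against sqlite3 as a stand-in, since the statement itself is plain SQL. The schema_version table and the table_registry column layout shown here are assumptions. src/db.py may track versions differently and run on DuckDB.

```python
import sqlite3

MIGRATION_V26 = """
UPDATE table_registry
   SET query_mode = 'materialized'
 WHERE source_type = 'keboola' AND query_mode = 'local'
"""


def migrate_to_v26(conn: sqlite3.Connection) -> int:
    """Apply the 25 -> 26 step once; return rows updated (0 if already applied)."""
    (version,) = conn.execute("SELECT version FROM schema_version").fetchone()
    if version >= 26:
        return 0
    # One transaction: the data change and the version bump commit together.
    with conn:
        cur = conn.execute(MIGRATION_V26)
        conn.execute("UPDATE schema_version SET version = 26")
    return cur.rowcount
```

source_query is left alone, so migrated rows keep NULL and therefore mean "full table export".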
pyproject.toml
kbcstorage dep stays (admin-side bucket/table list still uses the
SDK in app/api/admin.py / connectors/keboola/client.py); only the
data path is migrated off the SDK. Comment updated to reflect the
new boundary.
tests
- test_keboola_storage_api.py (NEW, 19 tests): ExportFilter parsing,
HTTP client (token redaction, retry logic, polling), download_file
(single, gzipped, sliced), end-to-end export_table_to_csv.
- test_keboola_materialize.py rewritten: mocks KeboolaStorageClient
instead of FakeAccess; same atomic-write + zero-rows + unsafe-id
contracts.
- test_sync_trigger_keboola_materialized.py: registry rows now carry
bucket+source_table+JSON-shape source_query.
114+ Keboola-impacted tests green locally.
* test: schema version assertion bumped to 26 alongside the keboola query_mode migration
* fix(keboola): cutover hot-patches surfaced on agnes-dev
Five small fixes that were applied as in-container hot-patches during
agnes-dev cutover and need to be on the source-of-truth image so a fresh
upgrade does not undo them.
- app/api/sync.py: auto-discover gate considers the WHOLE registry (any
source, any mode), not just rows where source matches and query_mode
is local. After the v25→v26 keboola materialized migration an
instance can have 30 materialized rows and zero local rows; the
previous gate kept re-firing _discover_and_register_tables every
scheduler tick, creating duplicate auto-discovered rows with the
wrong bucket prefix every time.
- app/api/admin.py: _discover_and_register_tables reassembles the
bucket as <stage>.<bucket-id> (e.g. in.c-finance) instead of
dropping the stage prefix; default query_mode for keboola is now
materialized (the v26 contract); validator allows NULL source_query
for keboola materialized rows (full-table export via Storage API
export-async, no SQL needed).
- cli/commands/admin.py: register-table mirrors the server validator
(NULL source_query allowed for source_type=keboola); --bucket help
text generalized to cover both BQ dataset and Keboola bucket id.
- connectors/keboola/extractor.py: max_line_size=64 MiB on
read_csv_auto so embedded JSON / SQL cells (kbc_component_configuration
in particular) do not trip the default 2 MiB ceiling.
- connectors/keboola/storage_api.py: GCP backend support — when the
Storage API returns a manifest whose slice URLs are gs://
references with a gcsCredentials block, rewrite to the JSON REST
download endpoint and authenticate with the issued OAuth bearer
token; redact tokens in any surfaced error string.
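The gs:// rewrite can be sketched as a pure function. The media-download form used here is the standard Google Cloud Storage JSON API: storage/v1/b/<bucket>/o/<object>?alt=media with a Bearer token. How the token is extracted from the gcsCredentials block is not shown, and the function name is hypothetical.

```python
from typing import Dict, Tuple
from urllib.parse import quote


def gcs_download_request(gs_url: str, access_token: str) -> Tuple[str, Dict[str, str]]:
    """Map gs://bucket/object to a GCS JSON API media download request."""
    if not gs_url.startswith("gs://"):
        raise ValueError(f"not a gs:// URL: {gs_url!r}")
    bucket, _, obj = gs_url[len("gs://"):].partition("/")
    if not bucket or not obj:
        raise ValueError(f"expected gs://<bucket>/<object>, got {gs_url!r}")
    # The object name is a single path segment in the JSON API, so '/' and
    # every other reserved character must be percent-encoded (safe="").
    url = (f"https://storage.googleapis.com/storage/v1/b/{quote(bucket, safe='')}"
           f"/o/{quote(obj, safe='')}?alt=media")
    return url, {"Authorization": f"Bearer {access_token}"}
```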
* test: align with new keboola materialized + auto-discover-gate contracts
- test_admin_keboola_materialized: rename
test_register_keboola_materialized_rejects_missing_source_query →
test_register_keboola_materialized_accepts_missing_source_query.
v25→v26 introduced 'keboola materialized with NULL source_query
means full-table export via Storage API export-async' as the
default registration shape; the rejection case is no longer the
contract.
- test_sync_filter: add list_all() to _StubRegistry. The auto-discover
gate in _run_sync now keys off the WHOLE registry (not just local
rows) so materialized-only Keboola instances do not re-trigger
discovery on every tick.
* feat(keboola): native parquet export — skip CSV roundtrip
Storage API export-async accepts fileType={csv,parquet}. Switching the
materialized sync to parquet eliminates the CSV → DuckDB COPY → parquet
roundtrip that pushed a single uvicorn worker's memory past 4 GiB on multi-GB
tables (read_csv with all_varchar + max_line_size=64 MiB has to
materialize the whole CSV in memory before COPY can stream out a
parquet). Snowflake UNLOAD on Keboola's side already produces typed,
self-contained parquet files; the extractor downloads them and renames
into place.
Two cases:
- **Single-file** export (small table): file_info.url points at one
signed URL; download_file streams chunks straight to .parquet.tmp
and we're done. No DuckDB.
- **Sliced** export (Snowflake UNLOAD respects MAX_FILE_SIZE — 16 MiB
default — so anything larger arrives as N parquet slices): each
slice is a complete parquet file with its own footer; naive concat
would corrupt them. download_file_slices keeps the slices as
separate files in a tempdir, then DuckDB COPY (SELECT * FROM
read_parquet([slice0, slice1, ...])) merges them into one
consolidated parquet. DuckDB streams row groups during this — peak
memory bounded to one row group (~1 MiB) regardless of source size.
The legacy CSV path stays as the explicit opt-in via source_query=
'{"file_type":"csv"}' for projects whose backend can't UNLOAD
parquet (none known today; cheap escape hatch). Backward-compat alias
KeboolaStorageClient.export_table_to_csv kept.
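Below is a sketch of how source_query JSON could map onto ExportFilter. It assumes snake_case JSON keys, as in the tests' '{"where_filters": ..., "columns": ...}' and '{"file_type": "csv"}' examples. It also assumes the camelCase request names listed earlier: whereFilters, columns, changedSince, limit, fileType. Rejecting unknown keys and the exact error strings are assumptions too.

```python
import json
from dataclasses import dataclass, field, fields
from typing import Optional


@dataclass
class ExportFilter:
    """Hypothetical filter mirror; JSON keys match the dataclass field names."""
    where_filters: list = field(default_factory=list)
    columns: list = field(default_factory=list)
    changed_since: Optional[str] = None
    limit: Optional[int] = None
    file_type: str = "parquet"  # "csv" is the explicit opt-in

    @classmethod
    def from_source_query(cls, source_query: Optional[str]) -> "ExportFilter":
        if not source_query:  # NULL source_query: full-table export
            return cls()
        try:
            spec = json.loads(source_query)
        except json.JSONDecodeError as exc:
            raise ValueError(f"source_query is not valid JSON: {exc}") from None
        if not isinstance(spec, dict):
            raise ValueError("source_query must be a JSON object")
        unknown = set(spec) - {f.name for f in fields(cls)}
        if unknown:
            raise ValueError(f"unknown filter keys: {sorted(unknown)}")
        result = cls(**spec)
        if result.file_type not in ("parquet", "csv"):
            raise ValueError(f"unsupported file_type {result.file_type!r}")
        return result

    def to_params(self) -> dict:
        """Request parameters for export-async, omitting unset filters."""
        params = {"fileType": self.file_type}
        if self.where_filters:
            params["whereFilters"] = self.where_filters
        if self.columns:
            params["columns"] = self.columns
        if self.changed_since:
            params["changedSince"] = self.changed_since
        if self.limit is not None:
            params["limit"] = self.limit
        return params
```

The sketch does not cover how nested whereFilters are encoded on the wire.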
Also fixes a latent bug in download_file's gzip detection: previous
heuristic flagged any unencrypted file as gzipped, which would have
corrupted parquet downloads at gunzip time. Name-suffix-only now.
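A sketch of suffix-only gzip detection during download follows. The name argument is assumed to be the file name reported by the Storage API file detail. write_download is a hypothetical helper that copies a local file, so the example runs without network access.

```python
import gzip
import shutil
from pathlib import Path


def is_gzipped(name: str) -> bool:
    """Suffix-only check: a missing encryption flag says nothing about gzip."""
    return name.lower().endswith(".gz")


def write_download(src_path: Path, name: str, dest_path: Path) -> None:
    """Stream src to dest, gunzipping only when the reported name ends in .gz."""
    opener = gzip.open if is_gzipped(name) else open
    with opener(src_path, "rb") as src, open(dest_path, "wb") as dst:
        shutil.copyfileobj(src, dst)
```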
* fix: tempdir leak cleanup, every 0m schedule, /sync/trigger body shapes
Three small self-contained fixes uncovered during agnes-dev cutover.
- connectors/keboola/extractor.py: tempfile.TemporaryDirectory now uses
ignore_cleanup_errors=True so a worker death mid-write doesn't leave
multi-GiB stale slice trees on the boot disk. (12 GiB seen after a
disk-full crash where TemporaryDirectory's own cleanup also raised
and got swallowed.)
- src/scheduler.py: is_valid_schedule accepts 'every 0m' (a zero interval
means always due). Force-resync of an errored row no longer requires
waiting out the default 'every 1h' interval — admin can flip the
schedule, trigger, then flip back.
- app/api/sync.py: POST /api/sync/trigger accepts both ['table_id']
(legacy bare-array body) and {'tables': ['table_id']} (matches the
response payload shape, more discoverable for clients building
requests by hand). Malformed bodies return 422 with a structured
detail; null/missing means 'sync everything' as before.
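A sketch of the two accepted body shapes, assuming the endpoint maps TriggerBodyError to a 422. Two behaviours are assumptions: treating {} and {'tables': null} like a missing body, and rejecting empty strings. This is not the exact 8-valid / 6-rejected matrix the tests use.

```python
from typing import Any, List, Optional


class TriggerBodyError(ValueError):
    """Hypothetical error the endpoint would turn into HTTP 422."""


def parse_trigger_body(body: Any) -> Optional[List[str]]:
    """Return the requested table ids, or None meaning 'sync everything'."""
    if body is None:
        return None
    if isinstance(body, dict):
        body = body.get("tables")  # object form mirrors the response payload
        if body is None:
            return None
    if not isinstance(body, list):
        raise TriggerBodyError("expected a list of table ids or {'tables': [...]}")
    if not all(isinstance(t, str) and t for t in body):
        raise TriggerBodyError("every table id must be a non-empty string")
    return body
```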
Tests cover: tempdir cleanup on raise (sliced parquet path),
is_valid_schedule + is_table_due 'every 0m' acceptance, and trigger
body parametrized matrix (8 valid shapes + 6 rejection cases).
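A sketch of the 'every 0m' semantics, assuming a hypothetical 'every <N><m|h|d>' grammar for schedules. With a zero interval the elapsed-time comparison is always true, so no special case is needed.

```python
import re
from datetime import datetime, timedelta
from typing import Optional

# Hypothetical grammar: "every <N><m|h|d>", e.g. "every 1h" or "every 0m".
_SCHEDULE = re.compile(r"every (\d+)([mhd])")
_UNITS = {"m": "minutes", "h": "hours", "d": "days"}


def parse_interval(schedule: str) -> Optional[timedelta]:
    match = _SCHEDULE.fullmatch(schedule.strip())
    if match is None:
        return None
    return timedelta(**{_UNITS[match.group(2)]: int(match.group(1))})


def is_valid_schedule(schedule: str) -> bool:
    return parse_interval(schedule) is not None  # N == 0 is accepted


def is_table_due(schedule: str, last_sync: Optional[datetime], now: datetime) -> bool:
    interval = parse_interval(schedule)
    if interval is None:
        return False
    # With a zero interval, now - last_sync >= 0 always holds: always due.
    return last_sync is None or now - last_sync >= interval
```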
* fix: targeted-trigger filter in materialized pass + auto-upgrade defer
Two operational gaps observed during agnes-dev cutover, in the same
sync-routing area.
- _run_materialized_pass now takes a 'tables' arg and skips rows not in
the target set with reason='not_in_target'. POST /api/sync/trigger
with a body of tables previously only scoped the legacy extractor
subprocess — the materialized pass kept iterating every due
materialized row, so an admin asking to re-sync kbc_job re-ran
every other due materialized row alongside it. Match on registry id
OR name (admins commonly pass either form). tables=None preserves
the no-filter behavior.
- New GET /api/sync/status (public, no auth) returns {locked: bool}
off _sync_lock.locked(). agnes-auto-upgrade.sh probes this before
docker compose up -d and exits 0 with a 'deferred recreate' log
line if a sync is in flight — the next 5-min cron tick retries.
Pre-fix, an auto-upgrade triggered mid-sync would recreate the
uvicorn worker and kill the in-flight extractor / Snowflake-UNLOAD
download (observed when kbc_job's first 7-day retry got SIGKILLed).
Connection failures in the probe fall through to the upgrade —
being stuck on a wedged image is worse than interrupting a
hypothetical sync.
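A sketch of the targeted-trigger filter, assuming registry rows are dicts with id and name keys (hypothetical). Matching is exact on either field. Only the not_in_target reason string comes from the text above; the example ids in the test are illustrative.

```python
from typing import Iterable, Optional


def filter_targets(rows, tables: Optional[Iterable[str]]):
    """Split rows into (selected, skipped); a row matches on id OR name."""
    if tables is None:  # no target set: keep the no-filter behaviour
        return list(rows), []
    wanted = set(tables)
    selected, skipped = [], []
    for row in rows:
        if row["id"] in wanted or row["name"] in wanted:
            selected.append(row)
        else:
            skipped.append({"table_id": row["id"], "reason": "not_in_target"})
    return selected, skipped
```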
* fix: auto-discover protects admin overrides + surfaces drift
Two real-world incidents on agnes-dev drove this:
1. kbc_job was registered manually with the correct
(in.c-kbc_telemetry, kbc_job) coordinates. A naive auto-discover
re-run would have inserted a SECOND kbc_job row at the slugified
id 'in_c-keboola-storage_kbc_job' (where Keboola's discovery
places it) — and that row's Storage API export-async 404s.
2. An earlier auto-discover bug stripped the stage prefix from
bucket ids ('c-finance' instead of 'in.c-finance'), inserting
137 rows whose syncs all failed.
Fix:
- _discover_and_register_tables now builds a plan first
(_build_keboola_discovery_plan) classifying each discovered table
into one of new / existing_match / existing_drift / invalid, then
executes only the 'new' bucket. Drift rows are reported with both
sides of the disagreement plus drift_kind:
- same_id_diff_coords: registry has the same id but different
bucket / source_table (admin migrated coords inline).
- name_collision: discovery's slugified id differs from any
registry id, but the discovered .name matches an existing row's
.name (case-insensitive). Catches the kbc_job case.
- Bucket detection now prefers the API's authoritative bucket_id
field (separate field on the Keboola tables.list response,
normalised by KeboolaClient.discover_all_tables). Falls back to
id-string parsing only when bucket_id is missing (older fallback
path inside discover_all_tables).
- Endpoint POST /api/admin/discover-and-register?dry_run=true
returns the plan without writing — would_register, drift,
invalid lists. Lets an operator audit before merging discovery
with a registry that has admin overrides.
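The plan-then-execute flow can be sketched as a pure classifier. The dict keys (slug_id, bucket_id, name, bucket, source_table) are hypothetical stand-ins for the registry and discovery records. The four plan buckets and both drift_kind values come from the text above. The three-part stage.bucket.table split mirrors the in.c-finance example.

```python
from typing import Dict, List, Optional


def bucket_of(table: dict) -> Optional[str]:
    """Prefer the API's bucket_id; else derive '<stage>.<bucket>' from the id."""
    if table.get("bucket_id"):
        return table["bucket_id"]
    # Table ids look like 'in.c-finance.orders': stage, bucket, table.
    parts = table.get("id", "").split(".")
    return ".".join(parts[:2]) if len(parts) == 3 else None


def build_plan(discovered: List[dict], registry: List[dict]) -> Dict[str, list]:
    by_id = {row["id"]: row for row in registry}
    by_name = {row["name"].lower(): row for row in registry}
    plan: Dict[str, list] = {"new": [], "existing_match": [], "existing_drift": [], "invalid": []}
    for table in discovered:
        bucket = bucket_of(table)
        if not bucket or not table.get("name"):
            plan["invalid"].append(table)
            continue
        row = by_id.get(table["slug_id"])
        if row is not None:
            if (row["bucket"], row["source_table"]) == (bucket, table["name"]):
                plan["existing_match"].append(table)
            else:
                plan["existing_drift"].append(
                    {"discovered": table, "registry": row, "drift_kind": "same_id_diff_coords"})
        elif table["name"].lower() in by_name:
            plan["existing_drift"].append(
                {"discovered": table, "registry": by_name[table["name"].lower()],
                 "drift_kind": "name_collision"})
        else:
            plan["new"].append({**table, "bucket": bucket})
    return plan
```

An endpoint built on this would return the plan unchanged when dry_run=true. Otherwise it would insert only plan["new"], leaving drift rows for an operator to resolve.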
Removed 'every 0m' from test_register_request_rejects_malformed_sync_schedule
— the runtime started accepting it in the previous commit (force-resync
override) and the validator follows suit.
* feat(keboola): AGNES_TEMP_DIR routes tempfiles off overlayfs /tmp
The container's /tmp lives on the boot disk's overlayfs (29 GiB on
agnes-dev, shared with /var). Snowflake UNLOAD of a wide table writes
slices into per-call /tmp tempdirs, so multi-GiB / many-slice exports fill
the boot disk long before the dedicated data disk fills. agnes-dev hit
100% boot-disk while the 20 GiB data disk had 15 GiB free.
connectors.keboola.storage_api.get_temp_root() reads AGNES_TEMP_DIR;
mkdirs the target on first use; an unset, empty, or unwritable value
falls back to None (system tempdir, the OSS pre-fix behaviour).
materialize_query (parquet path), _extract_via_legacy (CSV fallback),
and the sliced-CSV concat path in storage_api all use the helper now.
docker-compose.yml defaults AGNES_TEMP_DIR=/data/tmp on app, scheduler,
and extract services. The data volume is the dedicated disk in
production layouts and a plain docker volume in single-disk
dev/laptop setups — same blast radius as the previous /tmp default
on the latter, no regression.
498 lines
19 KiB
Python
"""Tests for the Keboola materialize_query path.

Surface contract: takes ``bucket`` + ``source_table`` (+ optional
``source_query`` JSON filter spec), exports via Storage API, writes a
parquet, returns the same {table_id, path, rows, bytes, md5} shape the
BQ branch returns. We mock `KeboolaStorageClient` so tests don't hit
the network; the real Storage API client is exercised in
tests/test_keboola_storage_api.py.

The default code path is now **parquet** (Storage API serves Snowflake
UNLOAD output directly; the extractor renames into place, with no CSV
intermediate and no DuckDB COPY of the full file). Tests cover both the
default parquet path and the legacy CSV opt-in (via
``source_query='{"file_type":"csv"}'``).
"""

import hashlib
import os
from pathlib import Path
from unittest.mock import MagicMock, patch

import duckdb
import pytest

from connectors.keboola import extractor as kbe


def _write_parquet(dest: Path, n_rows: int = 2) -> None:
    """Drop a tiny real parquet at ``dest`` so the materialize path can
    read it back to compute row_count + MD5 (same shape Snowflake
    UNLOAD would produce)."""
    dest.parent.mkdir(parents=True, exist_ok=True)
    safe = str(dest).replace("'", "''")
    conn = duckdb.connect()
    try:
        conn.execute(
            f"COPY (SELECT * FROM (VALUES {','.join('(' + str(i) + ')' for i in range(n_rows))}) AS t(id)) "
            f"TO '{safe}' (FORMAT PARQUET)"
        )
    finally:
        conn.close()


def _seed_csv(dest: Path, header: str, rows: list[str]) -> None:
    """Write a tiny CSV the legacy CSV materialize path will convert to parquet."""
    dest.parent.mkdir(parents=True, exist_ok=True)
    dest.write_text("\n".join([header, *rows]) + "\n", encoding="utf-8")


@pytest.fixture
def fake_storage_client_parquet():
    """Mock for the **default** parquet path. ``prepare_export`` returns a
    file_info marking a single (non-sliced) file. ``download_file``
    writes a real 2-row parquet at the requested dest."""
    def fake_prepare(table_id, *, export_filter=None, export_timeout=None):
        return {
            "job_id": 100,
            "file_id": 200,
            "rows": 2,
            "file_info": {"id": 200, "url": "https://fake/x", "isSliced": False},
            "file_type": "parquet",
        }

    def fake_download(file_info, dest_path):
        _write_parquet(Path(dest_path), n_rows=2)
        return Path(dest_path)

    client = MagicMock()
    client.prepare_export.side_effect = fake_prepare
    client.download_file.side_effect = fake_download
    return client


@pytest.fixture
def fake_storage_client_csv():
    """Mock for the legacy CSV opt-in path. ``export_table`` writes a
    small CSV at dest. Used for tests that pin
    ``source_query='{"file_type":"csv"}'``."""
    def fake_export(table_id, dest, *, export_filter=None, export_timeout=None):
        _seed_csv(Path(dest), "id,name", ["1,alpha", "2,beta"])
        return {"job_id": 100, "file_id": 200, "rows": 2,
                "bytes": Path(dest).stat().st_size, "file_type": "csv"}

    client = MagicMock()
    client.export_table.side_effect = fake_export
    return client


# ---- default parquet path --------------------------------------------------

def test_materialize_query_writes_parquet_and_returns_metadata(
    tmp_path, fake_storage_client_parquet
):
    """Default path: no source_query → file_type=parquet, single file."""
    output_dir = tmp_path / "out"
    output_dir.mkdir()

    result = kbe.materialize_query(
        table_id="example_subset",
        bucket="in.c-sales",
        source_table="orders",
        source_query=None,
        storage_client=fake_storage_client_parquet,
        output_dir=output_dir,
    )

    parquet_path = output_dir / "example_subset.parquet"
    assert parquet_path.exists()
    assert result["table_id"] == "example_subset"
    assert result["path"] == str(parquet_path)
    assert result["rows"] == 2
    assert result["bytes"] > 0
    expected_md5 = hashlib.md5(parquet_path.read_bytes()).hexdigest()
    assert result["md5"] == expected_md5

    # Default file_type should be parquet; verify by inspecting the
    # ExportFilter passed to prepare_export.
    call_args = fake_storage_client_parquet.prepare_export.call_args
    assert call_args.args[0] == "in.c-sales.orders"
    assert call_args.kwargs["export_filter"].file_type == "parquet"


def test_materialize_query_parquet_sliced_merges_via_duckdb(tmp_path):
    """Sliced parquet output: each slice is itself a complete parquet file
    (Snowflake UNLOAD MAX_FILE_SIZE behavior). The extractor must use
    ``download_file_slices`` to keep them as separate files, then
    DuckDB-COPY across ``read_parquet([slice1, slice2])`` to merge;
    naive concat would corrupt the per-slice footer."""
    def fake_prepare(table_id, *, export_filter=None, export_timeout=None):
        return {
            "job_id": 100, "file_id": 200, "rows": 4,
            "file_info": {"id": 200, "url": "https://fake/manifest", "isSliced": True},
            "file_type": "parquet",
        }

    def fake_download_slices(file_info, dest_dir):
        dest_dir = Path(dest_dir)
        dest_dir.mkdir(parents=True, exist_ok=True)
        s1, s2 = dest_dir / "slice-00000", dest_dir / "slice-00001"
        _write_parquet(s1, n_rows=2)
        _write_parquet(s2, n_rows=2)
        return [s1, s2]

    client = MagicMock()
    client.prepare_export.side_effect = fake_prepare
    client.download_file_slices.side_effect = fake_download_slices

    output_dir = tmp_path / "out"
    output_dir.mkdir()

    result = kbe.materialize_query(
        table_id="big_table",
        bucket="in.c-x", source_table="t",
        source_query=None,
        storage_client=client,
        output_dir=output_dir,
    )

    # Final parquet contains all 4 rows from both slices.
    final = output_dir / "big_table.parquet"
    assert final.exists()
    n = duckdb.connect().execute(
        f"SELECT COUNT(*) FROM read_parquet('{str(final).replace(chr(39), chr(39)*2)}')"
    ).fetchone()[0]
    assert n == 4
    assert result["rows"] == 4

    # Slices were not concatenated raw (would leave 2 footers in one file
    # and break DuckDB on read).
    client.download_file_slices.assert_called_once()


def test_materialize_query_parquet_zero_rows_emits_empty_parquet(tmp_path, caplog):
    """Storage API parquet succeeded but the filter matched 0 rows (file
    is empty/missing). We log a warning and emit an empty placeholder."""
    def fake_prepare(table_id, *, export_filter=None, export_timeout=None):
        return {
            "job_id": 1, "file_id": 2, "rows": 0,
            "file_info": {"id": 2, "url": "https://fake/x", "isSliced": False},
            "file_type": "parquet",
        }

    def fake_download(file_info, dest_path):
        # Don't create the file; this simulates a no-rows result.
        return Path(dest_path)

    client = MagicMock()
    client.prepare_export.side_effect = fake_prepare
    client.download_file.side_effect = fake_download

    output_dir = tmp_path / "out"
    output_dir.mkdir()

    with caplog.at_level("WARNING"):
        result = kbe.materialize_query(
            table_id="empty_subset",
            bucket="in.c-test", source_table="empty",
            source_query=None,
            storage_client=client,
            output_dir=output_dir,
        )

    assert result["rows"] == 0
    assert (output_dir / "empty_subset.parquet").exists()
    assert "no data" in caplog.text.lower() or "0 rows" in caplog.text


def test_materialize_query_admin_can_pin_file_type_csv(tmp_path, fake_storage_client_csv):
    """Admin can opt out of parquet via ``source_query='{"file_type":"csv"}'``;
    the extractor then falls back to CSV → DuckDB-COPY → parquet."""
    output_dir = tmp_path / "out"
    output_dir.mkdir()

    result = kbe.materialize_query(
        table_id="legacy_csv",
        bucket="in.c-x", source_table="t",
        source_query='{"file_type": "csv"}',
        storage_client=fake_storage_client_csv,
        output_dir=output_dir,
    )

    assert (output_dir / "legacy_csv.parquet").exists()
    assert result["rows"] == 2

    # Storage client called with file_type=csv on the ExportFilter.
    call = fake_storage_client_csv.export_table.call_args
    assert call.args[0] == "in.c-x.t"
    assert call.kwargs["export_filter"].file_type == "csv"


# ---- tempdir cleanup on failure --------------------------------------------

def test_materialize_query_sliced_parquet_tempdir_cleaned_on_exception(tmp_path):
    """When a sliced parquet download raises mid-flight (e.g. OSError 28
    'No space left'), the per-call tempdir at /tmp/kbc-export-<id>-*
    that was already populated with downloaded slices must not survive.

    Regression: an earlier worker death mid-write left a 12 GiB stale
    slice tree on the boot disk because TemporaryDirectory's default
    cleanup path itself raised under disk-full state, masking the
    original exception AND leaving the dir behind. The fix uses
    ``ignore_cleanup_errors=True`` so cleanup is best-effort but always
    fires; the tempdir is gone by the time the function raises."""
    captured_tmpdir: dict[str, Path] = {}

    def fake_prepare(table_id, *, export_filter=None, export_timeout=None):
        return {
            "job_id": 1, "file_id": 2, "rows": 1,
            "file_info": {"id": 2, "url": "https://fake/manifest", "isSliced": True},
            "file_type": "parquet",
        }

    def boom_download_slices(file_info, dest_dir):
        # Capture the tempdir the extractor created (parent of dest_dir).
        captured_tmpdir["path"] = Path(dest_dir).parent
        # Simulate a real download writing partial state, then disk full.
        Path(dest_dir).mkdir(parents=True, exist_ok=True)
        (Path(dest_dir) / "slice-00000").write_bytes(b"PAR1...partial")
        raise OSError(28, "No space left on device")

    client = MagicMock()
    client.prepare_export.side_effect = fake_prepare
    client.download_file_slices.side_effect = boom_download_slices

    output_dir = tmp_path / "out"
    output_dir.mkdir()

    with pytest.raises(OSError, match="No space left"):
        kbe.materialize_query(
            table_id="will_fail_sliced",
            bucket="in.c-test", source_table="t",
            source_query=None,
            storage_client=client,
            output_dir=output_dir,
        )

    # The tempdir that held the partial slice must be gone.
    assert "path" in captured_tmpdir, "download_file_slices was not invoked"
    leftover = captured_tmpdir["path"]
    assert not leftover.exists(), (
        f"tempdir {leftover} must be cleaned on exception "
        f"(otherwise leaks under disk-full conditions)"
    )
    # Final parquet must NOT exist.
    assert not (output_dir / "will_fail_sliced.parquet").exists()


# ---- AGNES_TEMP_DIR routing -------------------------------------------------

def test_materialize_query_uses_AGNES_TEMP_DIR_when_set(
    monkeypatch, tmp_path, fake_storage_client_parquet,
):
    """The per-call tempdir lands under ``AGNES_TEMP_DIR`` when set,
    routing Snowflake-UNLOAD slice staging off the container's overlayfs
    /tmp onto the data disk. The tempdir itself is anonymous and already
    removed when materialize_query returns, so this test checks the
    routing indirectly via get_temp_root() (see below).

    Regression context: agnes-dev's boot disk filled to 100% during a
    180-day kbc_job sync because slices accumulated in /tmp; the data
    disk had 15 GiB free at the time."""
    custom_root = tmp_path / "agnes-tmp"
    custom_root.mkdir()
    monkeypatch.setenv("AGNES_TEMP_DIR", str(custom_root))

    output_dir = tmp_path / "out"
    output_dir.mkdir()

    kbe.materialize_query(
        table_id="anywhere",
        bucket="in.c-x", source_table="t",
        source_query=None,
        storage_client=fake_storage_client_parquet,
        output_dir=output_dir,
    )

    # `tempfile.TemporaryDirectory(dir=root, ...)` always places its dir as
    # a direct child of `root`, and that dir is cleaned up before
    # materialize_query returns, so there is nothing left to inspect.
    # Instead, assert that get_temp_root() resolves to the configured root
    # under the same env, i.e. AGNES_TEMP_DIR is honored as the parent.
    from connectors.keboola.storage_api import get_temp_root
    assert get_temp_root() == str(custom_root)

    # Cleanup removes only the per-call tempdir; the operator's chosen
    # root itself must survive the run.
    assert custom_root.is_dir()


def test_materialize_query_falls_back_to_system_tmp_when_unset(
    monkeypatch, tmp_path, fake_storage_client_parquet,
):
    """No AGNES_TEMP_DIR → no behavioural change vs. pre-fix code.
    The function still returns successfully; we don't peek inside
    /tmp itself (CI-unfriendly), just assert the run completed and
    the parquet exists at output_dir as expected."""
    monkeypatch.delenv("AGNES_TEMP_DIR", raising=False)

    output_dir = tmp_path / "out"
    output_dir.mkdir()

    result = kbe.materialize_query(
        table_id="default_tmp",
        bucket="in.c-x", source_table="t",
        source_query=None,
        storage_client=fake_storage_client_parquet,
        output_dir=output_dir,
    )

    assert (output_dir / "default_tmp.parquet").exists()
    assert result["rows"] == 2


# ---- generic guards (file_type-agnostic) -----------------------------------

def test_materialize_query_rejects_unsafe_table_id(tmp_path, fake_storage_client_parquet):
    """Defense: table_id is interpolated into the parquet filename. SQL/
    path-traversal-unsafe values must be rejected up-front."""
    output_dir = tmp_path / "out"
    output_dir.mkdir()
    with pytest.raises(ValueError, match="table_id"):
        kbe.materialize_query(
            table_id="../../etc/passwd",
            bucket="in.c-test", source_table="t",
            source_query=None,
            storage_client=fake_storage_client_parquet,
            output_dir=output_dir,
        )


def test_materialize_query_invalid_source_query_json_raises(tmp_path, fake_storage_client_parquet):
    output_dir = tmp_path / "out"
    output_dir.mkdir()
    with pytest.raises(ValueError, match="not valid JSON"):
        kbe.materialize_query(
            table_id="bad_filter",
            bucket="in.c-test", source_table="t",
            source_query="this is not json",
            storage_client=fake_storage_client_parquet,
            output_dir=output_dir,
        )


def test_materialize_query_passes_filter_spec_to_export(tmp_path, fake_storage_client_parquet):
    """source_query JSON is parsed into ExportFilter and forwarded to the
    Storage API client. Verifies the dispatch shape; the actual
    filter→params conversion is covered in test_keboola_storage_api.py."""
    output_dir = tmp_path / "out"
    output_dir.mkdir()

    kbe.materialize_query(
        table_id="filtered",
        bucket="in.c-sales", source_table="orders",
        source_query=(
            '{"where_filters": [{"column": "status", "operator": "eq", '
            '"values": ["open"]}], "columns": ["id"]}'
        ),
        storage_client=fake_storage_client_parquet,
        output_dir=output_dir,
    )

    f = fake_storage_client_parquet.prepare_export.call_args.kwargs["export_filter"]
    assert f.where_filters == [
        {"column": "status", "operator": "eq", "values": ["open"]}
    ]
    assert f.columns == ["id"]
    # No explicit file_type → defaults to parquet.
    assert f.file_type == "parquet"


# ---- atomic write contract -------------------------------------------------

def test_keboola_materialize_atomic_write_on_failure(tmp_path):
    """If the CSV→parquet conversion fails (legacy CSV opt-in), no
    partial file is left at the final .parquet path AND the .parquet.tmp
    staging file is cleaned up."""
    def fake_export(table_id, dest, *, export_filter=None, export_timeout=None):
        _seed_csv(Path(dest), "id,name", ["1,alpha"])
        return {"job_id": 1, "file_id": 2, "rows": 1,
                "bytes": Path(dest).stat().st_size, "file_type": "csv"}

    client = MagicMock()
    client.export_table.side_effect = fake_export

    output_dir = tmp_path / "data"
    output_dir.mkdir()

    real_connect = duckdb.connect

    class FailingConn:
        def __init__(self, inner):
            self._inner = inner

        def execute(self, sql, *a, **kw):
            if "FORMAT PARQUET" in sql:
                raise RuntimeError("simulated mid-COPY failure")
            return self._inner.execute(sql, *a, **kw)

        def close(self):
            self._inner.close()

    def patched_connect(*args, **kwargs):
        return FailingConn(real_connect(*args, **kwargs))

    with patch("connectors.keboola.extractor.duckdb.connect", side_effect=patched_connect):
        with pytest.raises(RuntimeError, match="simulated mid-COPY failure"):
            kbe.materialize_query(
                table_id="atomic_test",
                bucket="in.c-test", source_table="t",
                source_query='{"file_type": "csv"}',
                storage_client=client,
                output_dir=output_dir,
            )

    final_path = output_dir / "atomic_test.parquet"
    assert not final_path.exists(), (
        f"Partial parquet left at final path {final_path}; orchestrator "
        f"rebuild would pick this up and serve corrupt data."
    )
    tmp_marker = output_dir / "atomic_test.parquet.tmp"
    assert not tmp_marker.exists(), f"Stale .parquet.tmp left at {tmp_marker}"


def test_keboola_materialize_uses_tmp_path_during_copy(tmp_path, fake_storage_client_parquet):
    """Atomic-write contract: parquet first lands at <id>.parquet.tmp, then
    is os.replaced into <id>.parquet on success. Verified by patching
    os.replace to capture the (src, dst) pair."""
    output_dir = tmp_path / "data"
    output_dir.mkdir()

    captured = {}
    real_replace = os.replace

    def trace_replace(src, dst):
        captured["src"] = str(src)
        captured["dst"] = str(dst)
        real_replace(src, dst)

    with patch.object(kbe.os, "replace", side_effect=trace_replace):
        result = kbe.materialize_query(
            table_id="tmp_path_test",
            bucket="in.c-test", source_table="t",
            source_query=None,
            storage_client=fake_storage_client_parquet,
            output_dir=output_dir,
        )

    assert captured["src"].endswith(".parquet.tmp"), captured
    assert captured["dst"].endswith(".parquet") and not captured["dst"].endswith(".tmp")

    assert (output_dir / "tmp_path_test.parquet").exists()
    assert not (output_dir / "tmp_path_test.parquet.tmp").exists()
    assert result["path"].endswith(".parquet")
    assert not result["path"].endswith(".tmp")