agnes-the-ai-analyst/connectors/keboola/extractor.py
ZdenekSrotyr 28430ced09
Keboola cutover: native parquet path + sync correctness + auto-discover protection (#190)
* fix: cutover regressions + parallel Keboola legacy fallback

Bundled fixes from a fresh-deploy run on a Keboola Storage backend with
the block-shared-snowflake-access feature flag. On such a backend the
DuckDB Keboola extension's per-table scan can't access bucket schemas,
so the legacy kbcstorage Storage API client is the only working path.

CUTOVER REGRESSIONS

- agnes pull hash mismatch on every Keboola local-mode table —
  src/orchestrator.py:_update_sync_state stored md5(mtime+size)[:12]
  while the CLI compares against full 32-char content MD5. Now stores
  the same content MD5 the materialized SQL path already used.

- Trailing-slash sanitization in connectors/keboola/access.py and
  extractor.py: the DuckDB Keboola extension's ATTACH fails when the
  URL ends in / (the canonical form of a stack URL).

- src/profiler.py:TableInfo.description is now optional; two call
  sites instantiated TableInfo without it, crashing the profiler pass.

- scripts/ops/agnes-auto-upgrade.sh: chown on UID change. Older images
  ran as root; the current image runs as agnes (uid 999). The script reads
  the target uid:gid from /etc/passwd inside the new image and chowns
  ${STATE_DIR}, /data/extracts and /data/analytics when the digest moves.

- POST /api/sync/trigger is now singleton per process — two
  near-simultaneous trigger calls each forked an extractor subprocess,
  fought for extract.duckdb's file lock, starved uvicorn, flipped the
  container to unhealthy. Trigger now returns 409
  (sync_already_in_progress) when held; _run_sync acquires non-blocking.

PARALLEL LEGACY FALLBACK

- Process pool fan-out for the _extract_via_legacy queue (default 8
  workers, override via AGNES_KEBOOLA_PARALLELISM). Process pool, not
  thread pool, because connectors/keboola/client.py:export_table does
  os.chdir(temp_dir) — process-global, so threads raced and slice files
  landed in the wrong directory ("[Errno 2] No such file or directory:
  '<job_id>.csv_X_Y_Z.csv'").

- Extractor subprocess timeout 1800s -> 3600s (configurable via
  AGNES_EXTRACTOR_TIMEOUT_SEC). 28+ tables × multi-minute Keboola export
  jobs need the headroom on telemetry-class projects.

- Process group cleanup on timeout: Popen(start_new_session=True) puts
  the extractor in its own process group. On timeout the parent SIGTERMs
  the group (10s grace), then SIGKILLs stragglers. Without this, the pool
  workers were reparented to PID 1 and kept holding open Keboola Storage
  export jobs. The inline extractor script also installs a SIGTERM ->
  sys.exit(143) handler so the with ProcessPoolExecutor(...) block's
  __exit__ runs cleanly.
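A POSIX-only sketch of the timeout and process-group cleanup described above. The helper names (`extractor_timeout_sec`, `run_with_group_timeout`) are hypothetical; only `AGNES_EXTRACTOR_TIMEOUT_SEC`, the 3600s default, `start_new_session=True` and the 10s grace come from the text, and the real code in src/ may be structured differently.

```python
import os
import signal
import subprocess
from typing import List


def extractor_timeout_sec(default: float = 3600.0) -> float:
    # AGNES_EXTRACTOR_TIMEOUT_SEC overrides the default (parsing is assumed).
    return float(os.environ.get("AGNES_EXTRACTOR_TIMEOUT_SEC", default))


def run_with_group_timeout(cmd: List[str], timeout: float, grace: float = 10.0) -> int:
    """Run cmd in a new session; on timeout, SIGTERM then SIGKILL its group.

    Returns the child's exit code (negative when it died from a signal).
    """
    # start_new_session=True makes the child call setsid(), so it leads a
    # new process group (pgid == pid) that its own children, e.g.
    # ProcessPoolExecutor workers, inherit.
    proc = subprocess.Popen(cmd, start_new_session=True)
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        pgid = proc.pid
        try:
            os.killpg(pgid, signal.SIGTERM)  # polite stop for the whole group
        except ProcessLookupError:
            pass
        try:
            proc.wait(timeout=grace)
        except subprocess.TimeoutExpired:
            pass
        try:
            # Kill whatever is still in the group (the leader too, if it
            # ignored SIGTERM); ProcessLookupError means the group is empty.
            os.killpg(pgid, signal.SIGKILL)
        except ProcessLookupError:
            pass
        return proc.wait()
```

Signalling the group rather than only `proc` is the point: pool workers are not the parent's direct children, so `proc.terminate()` alone would leave them running.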

Tests: existing tests that patched subprocess.run updated to patch
subprocess.Popen with a _FakePopen stand-in (same exit-code-injection
contract). Two tests that exercised the parallel path forced
AGNES_KEBOOLA_PARALLELISM=1 to keep mocks alive (mocks don't ride into
ProcessPoolExecutor subprocesses).

Squashed onto current main (was 7 commits + multi-commit CHANGELOG +
agnes-auto-upgrade.sh conflicts; squash avoids per-commit conflict
resolution against main's flat-mount STATE_DIR refactor and 0.38.0
release cut).

* feat(keboola): Storage API direct extract path; drop extension data path

The DuckDB Keboola extension's COPY routes through Keboola QueryService,
which is unreliable on linked-bucket projects (extension v0.1.6 fixes
that case but isn't yet in the community CDN, and pre-fix any project
with the block-shared-snowflake-access feature flag couldn't see bucket
schemas at all). Move the extract path off the extension entirely and
talk to the Storage API directly via signed-URL download — works on any
project, regardless of extension state.

connectors/keboola/storage_api.py (NEW)
  Lightweight client built on requests.Session. Three Storage API
  endpoints plus the signed-URL download:
  - POST /v2/storage/tables/{id}/export-async        (kicks off job)
  - GET  /v2/storage/jobs/{id}                        (poll until done)
  - GET  /v2/storage/files/{id}?federationToken=1     (signed URL detail)
  - GET  <signed_url>                                 (download bytes)
  Supports sliced exports (manifest + per-slice signed URLs) and gzipped
  payloads. ExportFilter dataclass mirrors the Keboola filter spec
  (whereFilters / columns / changedSince / limit) and handles JSON
  round-trip with the registry's source_query column. Token redaction
  in error messages. Bounded exponential backoff on job polling.
  No cloud-SDK dependency on the data path; thread-safe.
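A sketch of bounded exponential backoff on job polling plus token redaction, under stated assumptions: `fetch_job` is a hypothetical stand-in for `GET /v2/storage/jobs/{id}`, and treating "waiting" / "processing" as the non-terminal job states (with "success" as the only good terminal state) is an assumption about the Storage API job payload, not taken from storage_api.py.

```python
import time
from typing import Any, Callable, Dict

PENDING_STATES = ("waiting", "processing")  # assumed non-terminal job states


def redact(message: str, token: str) -> str:
    """Replace the Storage API token in a message before it is surfaced."""
    return message.replace(token, "<redacted>") if token else message


def poll_job(
    fetch_job: Callable[[], Dict[str, Any]],
    *,
    initial_delay: float = 1.0,
    max_delay: float = 30.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll until the job leaves PENDING_STATES, backing off exponentially.

    fetch_job stands in for GET /v2/storage/jobs/{id} and returns decoded JSON.
    """
    deadline = time.monotonic() + timeout
    delay = initial_delay
    while True:
        job = fetch_job()
        status = job.get("status")
        if status == "success":
            return job
        if status not in PENDING_STATES:
            raise RuntimeError(f"export job ended with status {status!r}")
        if time.monotonic() + delay > deadline:
            raise TimeoutError("export job still pending at the deadline")
        sleep(delay)
        # Bounded exponential backoff: 1s, 2s, 4s, ... capped at max_delay.
        delay = min(delay * 2, max_delay)
```

Injecting `sleep` keeps the backoff schedule testable without real waits.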

connectors/keboola/extractor.py
  - materialize_query() rewritten: takes bucket/source_table/source_query
    (JSON filter spec), exports via KeboolaStorageClient, converts CSV
    to parquet via DuckDB, atomic os.replace. Same return shape so
    sync.py downstream code stays uniform with the BQ branch.
  - _extract_via_legacy() also moved to Storage API direct (kept the
    name for caller compatibility with _legacy_worker / the parallel
    batch extractor). Per-call temp directories — no os.chdir, threads
    don't race.

app/api/sync.py
  _run_materialized_pass for source_type='keboola' rows now constructs a
  KeboolaStorageClient (replaces KeboolaAccess) and passes
  bucket/source_table/source_query to materialize_query. Reuses one
  client across rows for HTTP keep-alive. Falls back to the
  KEBOOLA_STACK_URL env var for the Keboola URL when instance.yaml has
  no stack_url configured.

cli/commands/admin.py
  discover-and-register defaults Keboola rows to query_mode='materialized'
  (NULL source_query = full table), matching the v26 migration's
  unification of the local/materialized split for Keboola. BigQuery and
  Jira keep their per-source defaults.

src/db.py
  Schema bump 25 → 26. Migration: UPDATE table_registry SET
  query_mode='materialized' WHERE source_type='keboola' AND
  query_mode='local'. NULL source_query on those rows means "full table
  export" — same effective behavior the local mode provided, but now
  via Storage API instead of the extension.
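The v26 migration statement can be exercised in isolation. This sketch uses an in-memory stdlib `sqlite3` database as a stand-in for the project's real database, and a minimal hypothetical `table_registry` schema (the real table has more columns); only the UPDATE itself comes from the text.

```python
import sqlite3


def migrate_25_to_26(conn: sqlite3.Connection) -> int:
    """Apply the v26 Keboola query_mode unification; return rows changed."""
    cur = conn.execute(
        "UPDATE table_registry SET query_mode = 'materialized' "
        "WHERE source_type = 'keboola' AND query_mode = 'local'"
    )
    return cur.rowcount


# Minimal stand-in schema (assumption): just the columns the migration touches.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE table_registry ("
    "id TEXT PRIMARY KEY, source_type TEXT, query_mode TEXT, source_query TEXT)"
)
conn.executemany(
    "INSERT INTO table_registry VALUES (?, ?, ?, ?)",
    [
        ("orders", "keboola", "local", None),   # migrated; NULL = full table
        ("events", "keboola", "remote", None),  # other Keboola modes untouched
        ("sales", "bigquery", "local", None),   # other sources untouched
    ],
)
changed = migrate_25_to_26(conn)
```

Because the WHERE clause only matches `local` Keboola rows, re-running the migration is a no-op, and `source_query` is left NULL, which the materialized path reads as "export the full table".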

pyproject.toml
  kbcstorage dep stays (admin-side bucket/table list still uses the
  SDK in app/api/admin.py / connectors/keboola/client.py); only the
  data path is migrated off the SDK. Comment updated to reflect the
  new boundary.

tests
  - test_keboola_storage_api.py (NEW, 19 tests): ExportFilter parsing,
    HTTP client (token redaction, retry logic, polling), download_file
    (single, gzipped, sliced), end-to-end export_table_to_csv.
  - test_keboola_materialize.py rewritten: mocks KeboolaStorageClient
    instead of FakeAccess; same atomic-write + zero-rows + unsafe-id
    contracts.
  - test_sync_trigger_keboola_materialized.py: registry rows now carry
    bucket+source_table+JSON-shape source_query.

114+ Keboola-impacted tests green locally.

* test: schema version assertion bumped to 26 alongside the keboola query_mode migration

* fix(keboola): cutover hot-patches surfaced on agnes-dev

Five small fixes that were applied as in-container hot-patches during
agnes-dev cutover and need to be on the source-of-truth image so a fresh
upgrade does not undo them.

- app/api/sync.py: auto-discover gate considers the WHOLE registry (any
  source, any mode), not just rows where source matches and query_mode
  is local. After the v25→v26 keboola materialized migration an
  instance can have 30 materialized rows and zero local rows; the
  previous gate kept re-firing _discover_and_register_tables every
  scheduler tick, creating duplicate auto-discovered rows with the
  wrong bucket prefix every time.

- app/api/admin.py: _discover_and_register_tables reassembles the
  bucket as <stage>.<bucket-id> (e.g. in.c-finance) instead of
  dropping the stage prefix; default query_mode for keboola is now
  materialized (the v26 contract); validator allows NULL source_query
  for keboola materialized rows (full-table export via Storage API
  export-async, no SQL needed).

- cli/commands/admin.py: register-table mirrors the server validator
  (NULL source_query allowed for source_type=keboola); --bucket help
  text generalized to cover both BQ dataset and Keboola bucket id.

- connectors/keboola/extractor.py: max_line_size=64 MiB on
  read_csv_auto so embedded JSON / SQL cells (kbc_component_configuration
  in particular) do not trip the default 2 MiB ceiling.

- connectors/keboola/storage_api.py: GCP backend support — when the
  Storage API returns a manifest whose slice URLs are gs://
  references with a gcsCredentials block, rewrite to the JSON REST
  download endpoint and authenticate with the issued OAuth bearer
  token; redact tokens in any surfaced error string.
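The gs:// rewrite can be sketched as a pure function that maps a `gs://bucket/object` reference to a Google Cloud Storage JSON API media download request. The function name is hypothetical, and how the OAuth token is pulled out of the gcsCredentials block is deliberately left out (it is passed in as `access_token`).

```python
from typing import Dict, Tuple
from urllib.parse import quote, urlparse


def gcs_media_request(gs_url: str, access_token: str) -> Tuple[str, Dict[str, str]]:
    """Map gs://bucket/object to a GCS JSON API media download request."""
    parsed = urlparse(gs_url)
    bucket, obj = parsed.netloc, parsed.path.lstrip("/")
    if parsed.scheme != "gs" or not bucket or not obj:
        raise ValueError(f"not a gs:// object URL: {gs_url!r}")
    # In the JSON API the object name is a single path segment, so any "/"
    # inside it must be percent-encoded (%2F); alt=media returns the bytes.
    url = (
        "https://storage.googleapis.com/storage/v1/b/"
        f"{quote(bucket, safe='')}/o/{quote(obj, safe='')}?alt=media"
    )
    return url, {"Authorization": f"Bearer {access_token}"}
```

The returned URL and headers can be handed to any HTTP client; any error message built from them should be passed through token redaction before it is logged.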

* test: align with new keboola materialized + auto-discover-gate contracts

- test_admin_keboola_materialized: rename
  test_register_keboola_materialized_rejects_missing_source_query →
  test_register_keboola_materialized_accepts_missing_source_query.
  v25→v26 introduced 'keboola materialized with NULL source_query
  means full-table export via Storage API export-async' as the
  default registration shape; the rejection case is no longer the
  contract.

- test_sync_filter: add list_all() to _StubRegistry. The auto-discover
  gate in _run_sync now keys off the WHOLE registry (not just local
  rows) so materialized-only Keboola instances do not re-trigger
  discovery on every tick.

* feat(keboola): native parquet export — skip CSV roundtrip

Storage API export-async accepts fileType={csv,parquet}. Switching the
materialized sync to parquet eliminates the CSV → DuckDB COPY → parquet
roundtrip that pushed a single uvicorn worker past 4 GiB of memory on
multi-GB tables (read_csv with all_varchar + max_line_size=64MB has to
materialize the whole CSV in memory before COPY can stream out a
parquet). Snowflake UNLOAD on Keboola's side already produces typed,
self-contained parquet files; the extractor downloads them and renames
them into place.

Two cases:

- **Single-file** export (small table): file_info.url points at one
  signed URL; download_file streams chunks straight to .parquet.tmp
  and we're done. No DuckDB.

- **Sliced** export (Snowflake UNLOAD respects MAX_FILE_SIZE — 16 MiB
  default — so anything larger arrives as N parquet slices): each
  slice is a complete parquet file with its own footer; naive concat
  would corrupt them. download_file_slices keeps the slices as
  separate files in a tempdir, then DuckDB COPY (SELECT * FROM
  read_parquet([slice0, slice1, ...])) merges them into one
  consolidated parquet. DuckDB streams row groups during this — peak
  memory bounded to one row group (~1 MiB) regardless of source size.
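The sliced-merge step comes down to one DuckDB statement over the list of slice files. This sketch (helper names hypothetical) builds the same `COPY (SELECT * FROM read_parquet([...])) TO ... (FORMAT PARQUET)` statement that `materialize_query` assembles, with single quotes doubled so paths are safe inside SQL string literals.

```python
from pathlib import Path
from typing import Sequence, Union


def _sql_literal(value: str) -> str:
    # Single-quoted SQL string literal with embedded quotes doubled.
    return "'" + value.replace("'", "''") + "'"


def merge_slices_sql(slice_paths: Sequence[Union[str, Path]], target: Union[str, Path]) -> str:
    """Build a DuckDB COPY that merges parquet slices into one parquet file."""
    if not slice_paths:
        raise ValueError("sliced export yielded no slices")
    files = ", ".join(_sql_literal(str(p)) for p in slice_paths)
    return (
        f"COPY (SELECT * FROM read_parquet([{files}])) "
        f"TO {_sql_literal(str(target))} (FORMAT PARQUET)"
    )
```

With DuckDB installed, `duckdb.connect().execute(merge_slices_sql(paths, tmp_parquet))` runs the merge. Each slice keeps its own footer and DuckDB rewrites the rows into a fresh file, which is why byte-level concatenation is not an option for parquet.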

The legacy CSV path stays as the explicit opt-in via source_query=
'{"file_type":"csv"}' for projects whose backend can't UNLOAD
parquet (none known today; cheap escape hatch). Backward-compat alias
KeboolaStorageClient.export_table_to_csv kept.

Also fixes a latent bug in download_file's gzip detection: the previous
heuristic flagged any unencrypted file as gzipped, which would have
corrupted parquet downloads at gunzip time. Detection now relies on the
file-name suffix only.
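A sketch of suffix-only gzip detection with streaming copy. Assuming gzipped payloads are named with a `.gz` suffix (the exact naming used by Keboola is an assumption here), everything else, parquet included, is copied byte-for-byte; the function names are hypothetical.

```python
import gzip
import shutil
from typing import BinaryIO


def is_gzipped(name: str) -> bool:
    # Decide from the file name only; a broader heuristic can misclassify
    # plain payloads such as parquet and corrupt them on gunzip.
    return name.lower().endswith(".gz")


def copy_payload(name: str, src: BinaryIO, dst: BinaryIO) -> None:
    """Stream src into dst, gunzipping only when the name says so."""
    if is_gzipped(name):
        # GzipFile decompresses incrementally; closing it leaves src open.
        with gzip.GzipFile(fileobj=src) as gz:
            shutil.copyfileobj(gz, dst)
    else:
        shutil.copyfileobj(src, dst)
```

Both branches stream in chunks, so memory use stays flat regardless of payload size.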

* fix: tempdir leak cleanup, every 0m schedule, /sync/trigger body shapes

Three small self-contained fixes uncovered during agnes-dev cutover.

- connectors/keboola/extractor.py: tempfile.TemporaryDirectory now uses
  ignore_cleanup_errors=True so a worker death mid-write doesn't leave
  multi-GiB stale slice trees on the boot disk. (12 GiB seen after a
  disk-full crash where TemporaryDirectory's own cleanup also raised
  and got swallowed.)

- src/scheduler.py: is_valid_schedule accepts 'every 0m' (interval=0
  = always due). Force-resync of an errored row no longer requires
  waiting out the default 'every 1h' interval — admin can flip the
  schedule, trigger, then flip back.

- app/api/sync.py: POST /api/sync/trigger accepts both ['table_id']
  (legacy bare-array body) and {'tables': ['table_id']} (matches the
  response payload shape, more discoverable for clients building
  requests by hand). Malformed bodies return 422 with a structured
  detail; null/missing means 'sync everything' as before.
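A sketch of normalizing the two accepted body shapes. The validation rules beyond what the text states (non-list values and non-string or empty items are rejected) are assumptions, and `BodyError` is a hypothetical exception the endpoint would translate into a 422 response.

```python
from typing import Any, List, Optional


class BodyError(ValueError):
    """Malformed trigger body; the endpoint would map this to HTTP 422."""


def normalize_trigger_body(body: Any) -> Optional[List[str]]:
    """Return the requested table ids, or None meaning 'sync everything'."""
    if body is None:
        return None
    if isinstance(body, dict):
        # {'tables': [...]} mirrors the response payload shape; a missing or
        # null 'tables' key keeps the sync-everything default.
        body = body.get("tables")
        if body is None:
            return None
    # Legacy bare-array body, or the unwrapped 'tables' value.
    if not isinstance(body, list) or not all(isinstance(t, str) and t for t in body):
        raise BodyError("expected a list of table id strings")
    return body
```

Normalizing both shapes into one `Optional[List[str]]` early keeps the rest of the sync path unaware of which form the client sent.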

Tests cover: tempdir cleanup on raise (sliced parquet path),
is_valid_schedule + is_table_due 'every 0m' acceptance, and trigger
body parametrized matrix (8 valid shapes + 6 rejection cases).

* fix: targeted-trigger filter in materialized pass + auto-upgrade defer

Two operational gaps observed during agnes-dev cutover, in the same
sync-routing area.

- _run_materialized_pass now takes a 'tables' arg and skips rows not in
  the target set with reason='not_in_target'. POST /api/sync/trigger
  with a body of tables previously only scoped the legacy extractor
  subprocess — the materialized pass kept iterating every due
  materialized row, so an admin asking to re-sync kbc_job re-ran
  every other due materialized row alongside it. Match on registry id
  OR name (admins commonly pass either form). tables=None preserves
  the no-filter behavior.

- New GET /api/sync/status (public, no auth) returns {locked: bool}
  off _sync_lock.locked(). agnes-auto-upgrade.sh probes this before
  docker compose up -d and exits 0 with a 'deferred recreate' log
  line if a sync is in flight — the next 5-min cron tick retries.
  Pre-fix, an auto-upgrade triggered mid-sync would recreate the
  uvicorn worker and kill the in-flight extractor / Snowflake-UNLOAD
  download (observed when kbc_job's first 7-day retry got SIGKILLed).
  Connection failures in the probe fall through to the upgrade —
  being stuck on a wedged image is worse than interrupting a
  hypothetical sync.
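The targeted-trigger filter can be sketched as a split of registry rows into the rows to sync and the rows skipped with `reason='not_in_target'`, matching on either registry id or name as described. The function name and the shape of the skip records are hypothetical.

```python
from typing import Iterable, List, Optional, Tuple


def split_by_target(
    rows: Iterable[dict], tables: Optional[List[str]]
) -> Tuple[List[dict], List[dict]]:
    """Return (rows to sync, skip records) for a targeted trigger."""
    if tables is None:
        # No body: keep the no-filter behavior.
        return list(rows), []
    wanted = set(tables)
    selected: List[dict] = []
    skipped: List[dict] = []
    for row in rows:
        # Admins pass either the registry id or the table name.
        if row.get("id") in wanted or row.get("name") in wanted:
            selected.append(row)
        else:
            skipped.append({"table": row.get("id"), "reason": "not_in_target"})
    return selected, skipped
```

Returning the skip records, rather than silently dropping rows, keeps the per-row result list complete for the trigger response.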

* fix: auto-discover protects admin overrides + surfaces drift

Two real-world incidents on agnes-dev drove this:

1. kbc_job was registered manually with the correct
   (in.c-kbc_telemetry, kbc_job) coordinates. A naive auto-discover
   re-run would have inserted a SECOND kbc_job row at the slugified
   id 'in_c-keboola-storage_kbc_job' (where Keboola's discovery
   places it) — and that row's Storage API export-async 404s.

2. An earlier auto-discover bug stripped the stage prefix from
   bucket ids ('c-finance' instead of 'in.c-finance'), inserting
   137 rows whose syncs all failed.

Fix:

- _discover_and_register_tables now builds a plan first
  (_build_keboola_discovery_plan) classifying each discovered table
  into one of new / existing_match / existing_drift / invalid, then
  executes only the 'new' bucket. Drift rows are reported with both
  sides of the disagreement plus drift_kind:
  - same_id_diff_coords: registry has the same id but different
    bucket / source_table (admin migrated coords inline).
  - name_collision: discovery's slugified id differs from any
    registry id, but the discovered .name matches an existing row's
    .name (case-insensitive). Catches the kbc_job case.

- Bucket detection now prefers the API's authoritative bucket_id
  field (separate field on the Keboola tables.list response,
  normalised by KeboolaClient.discover_all_tables). Falls back to
  id-string parsing only when bucket_id is missing (older fallback
  path inside discover_all_tables).

- Endpoint POST /api/admin/discover-and-register?dry_run=true
  returns the plan without writing — would_register, drift,
  invalid lists. Lets an operator audit before merging discovery
  with a registry that has admin overrides.
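A sketch of the plan-first classification described above, under loud assumptions: the slug rule in `slug_id` is hypothetical (it was chosen so that `in.c-keboola-storage` + `kbc_job` reproduces the `in_c-keboola-storage_kbc_job` id quoted above), and discovered tables are assumed to be dicts with `id`, `name` and an optional `bucket_id`. The real `_build_keboola_discovery_plan` may differ in detail.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class DiscoveryPlan:
    new: List[dict] = field(default_factory=list)
    existing_match: List[dict] = field(default_factory=list)
    existing_drift: List[dict] = field(default_factory=list)
    invalid: List[dict] = field(default_factory=list)


def discovered_bucket(table: dict) -> Optional[str]:
    # Prefer the authoritative bucket_id; else parse "<stage>.<bucket>.<table>".
    if table.get("bucket_id"):
        return table["bucket_id"]
    parts = table.get("id", "").split(".")
    return ".".join(parts[:2]) if len(parts) >= 3 else None


def slug_id(bucket: str, name: str) -> str:
    # Hypothetical slug rule: ("in.c-finance", "orders") -> "in_c-finance_orders".
    return f"{bucket}_{name}".replace(".", "_")


def build_plan(discovered: List[dict], registry: List[dict]) -> DiscoveryPlan:
    plan = DiscoveryPlan()
    by_id: Dict[str, dict] = {r["id"]: r for r in registry}
    by_name: Dict[str, dict] = {r["name"].lower(): r for r in registry}
    for table in discovered:
        bucket, name = discovered_bucket(table), table.get("name")
        if not bucket or not name:
            plan.invalid.append(table)
            continue
        candidate = {"id": slug_id(bucket, name), "bucket": bucket, "source_table": name}
        existing = by_id.get(candidate["id"])
        if existing is not None:
            if (existing["bucket"], existing["source_table"]) == (bucket, name):
                plan.existing_match.append(candidate)
            else:
                plan.existing_drift.append({"discovered": candidate, "registry": existing,
                                            "drift_kind": "same_id_diff_coords"})
        elif name.lower() in by_name:
            plan.existing_drift.append({"discovered": candidate, "registry": by_name[name.lower()],
                                        "drift_kind": "name_collision"})
        else:
            plan.new.append(candidate)
    return plan
```

Only `plan.new` would be written; a dry run would return all four lists unchanged for the operator to audit.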

Removed 'every 0m' from test_register_request_rejects_malformed_sync_schedule
— the runtime started accepting it in the previous commit (force-resync
override) and the validator follows suit.

* feat(keboola): AGNES_TEMP_DIR routes tempfiles off overlayfs /tmp

The container's /tmp lives on the boot disk's overlayfs (29 GiB on
agnes-dev, shared with /var). A Snowflake UNLOAD of a wide table writes
its slices into per-call /tmp tempdirs, and multi-GiB, many-slice
exports fill the boot disk long before the dedicated data disk fills.
agnes-dev hit 100% boot-disk usage while the 20 GiB data disk had
15 GiB free.

connectors.keboola.storage_api.get_temp_root() reads AGNES_TEMP_DIR and
mkdirs the target on first use; unset / empty / unwritable falls back to
None (system tempdir, the OSS pre-fix behaviour). materialize_query
(parquet path), _extract_via_legacy (CSV fallback), and the sliced-CSV
concat path in storage_api all use the helper now.
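A sketch of the temp-root helper matching the behaviour described above (read AGNES_TEMP_DIR, create it on first use, return None when unset, empty or unwritable). It is an illustration, not the storage_api implementation; the writability check via `os.access` is an assumption about how "unwritable" is detected.

```python
import os
import tempfile
from typing import Optional


def get_temp_root() -> Optional[str]:
    """Return AGNES_TEMP_DIR if usable, else None (system tempdir)."""
    root = os.environ.get("AGNES_TEMP_DIR", "").strip()
    if not root:
        return None
    try:
        os.makedirs(root, exist_ok=True)  # mkdir on first use
    except OSError:
        return None
    # Need write + search permission to create per-call subdirectories.
    if not os.access(root, os.W_OK | os.X_OK):
        return None
    return root


def per_call_tempdir(table_id: str) -> tempfile.TemporaryDirectory:
    # dir=None makes tempfile fall back to the system default (/tmp).
    return tempfile.TemporaryDirectory(
        prefix=f"kbc-export-{table_id}-", dir=get_temp_root()
    )
```

Returning None instead of raising keeps a misconfigured AGNES_TEMP_DIR from breaking syncs: `tempfile` simply falls back to its usual location.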

docker-compose.yml defaults AGNES_TEMP_DIR=/data/tmp on app, scheduler,
and extract services. The data volume is the dedicated disk in
production layouts and a plain docker volume in single-disk
dev/laptop setups — same blast radius as the previous /tmp default
on the latter, no regression.
2026-05-07 12:12:14 +02:00


"""Keboola extractor — produces extract.duckdb + data/*.parquet using DuckDB Keboola extension."""
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Any, Optional

import duckdb

from src.identifier_validation import (
    is_safe_quoted_identifier,
    validate_identifier,
    validate_quoted_identifier,
)

logger = logging.getLogger(__name__)


def materialize_query(
    table_id: str,
    *,
    bucket: str,
    source_table: str,
    source_query: Optional[str] = None,
    storage_client=None,  # KeboolaStorageClient (avoid circular import)
    keboola_url: Optional[str] = None,
    keboola_token: Optional[str] = None,
    output_dir: Path,
) -> dict:
    """Materialize a Keboola Storage table to a local parquet file via the Storage API.

    Replaces the previous DuckDB-extension path. The extension's QueryService
    scan is unreliable on linked-bucket projects (keboola/duckdb-extension#17;
    the fix shipped upstream as v0.1.6 but is not yet in the community CDN, and
    on flag-restricted projects the pre-fix workspace role wouldn't have GRANTs
    on the bucket schema anyway). The Storage API export-async path always
    works regardless of project flags.

    Parallel of `connectors/bigquery/extractor.py:materialize_query` in
    surface (same return shape, same atomic write, same MD5 contract), but
    the inputs differ because Keboola's structured filter spec replaces
    BQ's free-form SQL.

    Args:
        table_id: parquet filename + sync_state key (must be a safe identifier).
        bucket: Keboola bucket id, e.g. ``in.c-crm``.
        source_table: table id within the bucket, e.g. ``orders``.
        source_query: optional JSON string with a Storage API filter spec
            (see `storage_api.ExportFilter`). Empty / NULL = full table.
        storage_client: pre-built `KeboolaStorageClient` (preferred: lets
            sync.py share one across rows). When omitted, ``keboola_url``
            and ``keboola_token`` are used to construct a one-shot client.
        keboola_url, keboola_token: alternative to ``storage_client`` for
            single-call usage (tests, ad-hoc).
        output_dir: directory to write `<table_id>.parquet`.

    Returns:
        ``{"table_id", "path", "rows", "bytes", "md5"}``, the same shape the
        BQ branch returns, so ``app/api/sync.py:_run_materialized_pass``
        downstream code stays uniform.
    """
    import re
    import hashlib
    import json

    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", table_id):
        raise ValueError(f"unsafe table_id for materialize: {table_id!r}")

    # Lazy import to avoid pulling `requests` at module import time when only
    # the sync trigger imports `extractor` for `run()`.
    from connectors.keboola.storage_api import (
        FILE_TYPE_CSV, FILE_TYPE_PARQUET, ExportFilter, KeboolaStorageClient,
    )

    if storage_client is None:
        if not (keboola_url and keboola_token):
            raise ValueError(
                "materialize_query requires either storage_client or "
                "(keboola_url + keboola_token)"
            )
        storage_client = KeboolaStorageClient(url=keboola_url, token=keboola_token)

    # Filter spec is optional. Admin can register a row with no
    # source_query at all (= full-table export), or with a JSON object
    # describing whereFilters / columns / changedSince / file_type.
    payload: dict = {}
    if source_query:
        try:
            payload = json.loads(source_query)
        except json.JSONDecodeError as e:
            raise ValueError(
                f"source_query for {table_id} is not valid JSON: {e}"
            ) from e
    export_filter = ExportFilter.from_dict(payload)

    # Default the materialized path to parquet: Storage API serves it
    # via native Snowflake UNLOAD, so there is no CSV intermediate and,
    # for single-file exports, no DuckDB COPY and no peak-memory load.
    # Admin can pin `{"file_type":"csv"}` in source_query to fall back
    # (legacy debugging, or projects whose backend can't UNLOAD parquet;
    # none known today, but the escape hatch costs nothing). Only override
    # when the admin spec didn't *explicitly* set a file_type.
    if "file_type" not in payload and "fileType" not in payload:
        export_filter.file_type = FILE_TYPE_PARQUET

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    parquet_path = output_dir / f"{table_id}.parquet"
    tmp_parquet = output_dir / f"{table_id}.parquet.tmp"
    if tmp_parquet.exists():
        tmp_parquet.unlink()

    # Per-call temp dir for the intermediate file (CSV or parquet). It
    # separates concurrent exports cleanly without the os.chdir() race
    # the kbcstorage SDK has. ``ignore_cleanup_errors=True`` keeps
    # disk-full / permission errors from masking the original
    # exception, and prevents a half-cleaned dir from sitting around
    # forever (a 12 GiB stale slice tree was seen after a worker died
    # mid-write on a saturated boot disk). ``dir=get_temp_root()``
    # routes to ``AGNES_TEMP_DIR`` when the operator has steered
    # tempfiles off the overlayfs (e.g. onto the data disk); see
    # storage_api.get_temp_root for the rationale.
    import tempfile
    from connectors.keboola.storage_api import get_temp_root

    with tempfile.TemporaryDirectory(
        prefix=f"kbc-export-{table_id}-",
        dir=get_temp_root(),
        ignore_cleanup_errors=True,
    ) as tmpdir:
        full_table_id = f"{bucket}.{source_table}"
        if export_filter.file_type == FILE_TYPE_PARQUET:
            # Native parquet path. Storage API serves Snowflake UNLOAD
            # output directly. Two shapes to handle:
            #
            # 1. **Single file** (small exports): file_info.url points at
            #    one signed URL; download to tmp_parquet and we're done.
            # 2. **Sliced** (large exports: Snowflake UNLOAD respects
            #    MAX_FILE_SIZE, default 16 MiB, so anything past that
            #    arrives as a manifest of N parquet slices). Each slice
            #    is itself a complete parquet file with its own footer;
            #    naively concatenating them like CSV would be invalid.
            #    We download all slices into the per-call tempdir, then
            #    DuckDB-COPY across `read_parquet([slice1, slice2, ...])`
            #    into one consolidated tmp_parquet. DuckDB streams row
            #    groups during this consolidation, so peak memory scales
            #    with a few row groups rather than the full table.
            stats = storage_client.prepare_export(
                full_table_id, export_filter=export_filter,
            )
            file_info = stats["file_info"]
            if file_info.get("isSliced"):
                slice_dir = Path(tmpdir) / "slices"
                slice_paths = storage_client.download_file_slices(
                    file_info, slice_dir
                )
                if not slice_paths:
                    raise RuntimeError(
                        f"sliced parquet export for {full_table_id} "
                        f"yielded no slices"
                    )
                quoted = ", ".join(
                    "'" + str(p).replace("'", "''") + "'" for p in slice_paths
                )
                safe_tmp = str(tmp_parquet).replace("'", "''")
                conv = duckdb.connect()
                try:
                    conv.execute(
                        f"COPY (SELECT * FROM read_parquet([{quoted}])) "
                        f"TO '{safe_tmp}' (FORMAT PARQUET)"
                    )
                finally:
                    conv.close()
            else:
                storage_client.download_file(file_info, tmp_parquet)
            stats["bytes"] = (
                tmp_parquet.stat().st_size if tmp_parquet.exists() else 0
            )
            if not tmp_parquet.exists() or tmp_parquet.stat().st_size == 0:
                logger.warning(
                    "Storage API parquet export for %s returned no data "
                    "(filter may be too restrictive)",
                    full_table_id,
                )
                # Empty placeholder parquet so the orchestrator doesn't
                # choke on a missing file. Quote-escape the path like the
                # other COPY statements and always close the connection.
                safe_tmp = str(tmp_parquet).replace("'", "''")
                conv = duckdb.connect()
                try:
                    conv.execute(
                        "COPY (SELECT 1 AS _empty WHERE FALSE) "
                        f"TO '{safe_tmp}' (FORMAT PARQUET)"
                    )
                finally:
                    conv.close()
        else:
            # Legacy CSV path. Kept for the explicit `{"file_type":"csv"}`
            # opt-in. Slower (CSV parse + parquet rewrite) and
            # memory-heavier (DuckDB pulls the CSV into a buffer with
            # max_line_size headroom), but doesn't depend on Storage
            # API parquet support if a future project backend lacks it.
            csv_path = Path(tmpdir) / f"{table_id}.csv"
            stats = storage_client.export_table(
                full_table_id, csv_path, export_filter=export_filter,
            )
            safe_tmp = str(tmp_parquet).replace("'", "''")
            if not csv_path.exists() or csv_path.stat().st_size == 0:
                logger.warning(
                    "Storage API CSV export for %s returned no data "
                    "(filter may be too restrictive)",
                    full_table_id,
                )
                # Empty placeholder parquet (path quote-escaped).
                conv = duckdb.connect()
                try:
                    conv.execute(
                        "COPY (SELECT 1 AS _empty WHERE FALSE) "
                        f"TO '{safe_tmp}' (FORMAT PARQUET)"
                    )
                finally:
                    conv.close()
            else:
                # CSV → parquet via DuckDB. `all_varchar=True` matches the
                # legacy client's behavior: it preserves the source's exact
                # character data without DuckDB's type inference rewriting
                # numeric-looking strings (e.g. "Non-Manager") as NULL.
                #
                # `max_line_size=64MB` overrides DuckDB's default 2 MB cap
                # on any single CSV line. Keboola tables that store
                # embedded JSON / SQL transformation bodies routinely
                # have multi-MB cells (e.g. `kbc_component_configuration`
                # rows ship full Snowflake transformation SQL inline as
                # a JSON column value); the default 2 MB ceiling rejects
                # them with `Maximum line size of 2000000 bytes
                # exceeded`. 64 MB is generous enough to absorb any
                # reasonable embedded blob; DuckDB allocates a single
                # buffer of this size per worker thread.
                safe_csv = str(csv_path).replace("'", "''")
                conv = duckdb.connect()
                try:
                    conv.execute(
                        f"COPY (SELECT * FROM read_csv('{safe_csv}', "
                        f"all_varchar=true, max_line_size=67108864)) "
                        f"TO '{safe_tmp}' (FORMAT PARQUET)"
                    )
                except Exception:
                    # Don't leave a half-written parquet behind.
                    if tmp_parquet.exists():
                        tmp_parquet.unlink()
                    raise
                finally:
                    # Close the connection on both success and failure.
                    conv.close()

    # Row count from the parquet, not from `stats["rows"]`: Storage API
    # sometimes omits totalRowsCount on small results, and the parquet is
    # the authoritative count we'll be serving downstream anyway.
    safe_tmp = str(tmp_parquet).replace("'", "''")
    cnt_conn = duckdb.connect()
    try:
        row_count = cnt_conn.execute(
            f"SELECT COUNT(*) FROM read_parquet('{safe_tmp}')"
        ).fetchone()[0]
    finally:
        cnt_conn.close()

    # Streaming MD5: bounded memory regardless of parquet size.
    h = hashlib.md5()
    with open(tmp_parquet, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    md5 = h.hexdigest()
    size = tmp_parquet.stat().st_size

    os.replace(tmp_parquet, parquet_path)

    if row_count == 0:
        logger.warning(
            "Materialized Keboola export for %s wrote 0 rows — verify the "
            "filter and that the source bucket has data.",
            table_id,
        )

    return {
        "table_id": table_id,
        "path": str(parquet_path),
        "rows": row_count,
        "bytes": size,
        "md5": md5,
    }


def _create_meta_table(conn: duckdb.DuckDBPyConnection) -> None:
    """Create the _meta table required by the extract.duckdb contract."""
    conn.execute("DROP TABLE IF EXISTS _meta")
    conn.execute("""CREATE TABLE _meta (
        table_name VARCHAR NOT NULL,
        description VARCHAR,
        rows BIGINT,
        size_bytes BIGINT,
        extracted_at TIMESTAMP,
        query_mode VARCHAR DEFAULT 'local'
    )""")


def _create_remote_attach_table(conn: duckdb.DuckDBPyConnection, keboola_url: str) -> None:
    """Write _remote_attach so the orchestrator can re-ATTACH the Keboola extension."""
    conn.execute("DROP TABLE IF EXISTS _remote_attach")
    conn.execute("""CREATE TABLE _remote_attach (
        alias VARCHAR,
        extension VARCHAR,
        url VARCHAR,
        token_env VARCHAR
    )""")
    conn.execute(
        "INSERT INTO _remote_attach VALUES (?, ?, ?, ?)",
        ["kbc", "keboola", keboola_url, "KEBOOLA_STORAGE_TOKEN"],
    )


def _try_attach_extension(conn: duckdb.DuckDBPyConnection, keboola_url: str, keboola_token: str) -> bool:
    """Try to install and attach the Keboola DuckDB extension. Returns True on success."""
    try:
        conn.execute("INSTALL keboola FROM community; LOAD keboola;")
        escaped_token = keboola_token.replace("'", "''")
        # Strip the trailing slash: the Keboola DuckDB extension's ATTACH
        # fails with a network error when the URL ends in `/` (e.g. the
        # canonical `https://connection.us-east4.gcp.keboola.com/` form).
        # The bare host works.
        attach_url = keboola_url.rstrip("/")
        conn.execute(f"ATTACH '{attach_url}' AS kbc (TYPE keboola, TOKEN '{escaped_token}')")
        logger.info("Using DuckDB Keboola extension")
        return True
    except Exception as e:
        logger.warning("Keboola extension unavailable (%s), falling back to legacy client", e)
        return False


def run(output_dir: str, table_configs: List[Dict[str, Any]], keboola_url: str, keboola_token: str) -> Dict[str, Any]:
    """Extract tables from Keboola into output_dir.

    Uses the DuckDB Keboola extension when it attaches; tables it cannot
    scan (or all local tables, if it doesn't attach) are queued for the
    legacy Storage API fallback. Materialized rows are skipped here.

    Args:
        output_dir: Path to write extract.duckdb + data/
        table_configs: List of table config dicts from table_registry
        keboola_url: Keboola stack URL
        keboola_token: Keboola Storage API token

    Returns:
        Dict with extraction stats: {tables_extracted: int, tables_failed: int, errors: list}
    """
    output_path = Path(output_dir)
    data_dir = output_path / "data"
    data_dir.mkdir(parents=True, exist_ok=True)

    # Write to a temp file, then rename: avoids a lock conflict with the
    # orchestrator, which may hold a read lock on the existing extract.duckdb.
    db_path = output_path / "extract.duckdb"
    tmp_db_path = output_path / "extract.duckdb.tmp"
    if tmp_db_path.exists():
        tmp_db_path.unlink()

    conn = duckdb.connect(str(tmp_db_path))
    stats = {"tables_extracted": 0, "tables_failed": 0, "errors": []}
    now = datetime.now(timezone.utc)

    # Per-table work items that need the legacy Storage-API fallback
    # (extension unavailable, or its per-table scan failed). Drained in a
    # parallel pool after the per-table serial loop. Items are
    # `(tc, pq_path)` tuples.
    legacy_queue: List[tuple] = []

    try:
        # Try DuckDB Keboola extension
        use_extension = _try_attach_extension(conn, keboola_url, keboola_token)
        _create_meta_table(conn)

        has_remote = any(tc.get("query_mode") == "remote" for tc in table_configs)
        if has_remote and use_extension:
            _create_remote_attach_table(conn, keboola_url)

        for tc in table_configs:
            table_name = tc["name"]
            query_mode = tc.get("query_mode", "local")

            # Materialized rows are written by the sync trigger pass via
            # `materialize_query()`: they live as parquets in
            # /data/extracts/keboola/data/, picked up by the orchestrator's
            # standard local-parquet discovery. Don't extract here (would
            # double-write data via the source bucket reference and confuse
            # sync_state bookkeeping). Mirror of the BQ extractor's skip at
            # connectors/bigquery/extractor.py:190.
            if query_mode == "materialized":
                logger.info(
                    "Skipping legacy extract for %s — query_mode='materialized', "
                    "handled by _run_materialized_pass instead",
                    tc.get("id") or tc.get("name"),
                )
                continue

            # #81 Group D: refuse rows whose identifiers don't pass the
            # whitelist. The registry is admin-controlled, but anyone with
            # write access could otherwise inject SQL via the CREATE VIEW /
            # COPY / SELECT interpolation below. Skip-and-continue rather
            # than crashing the whole extraction; valid rows still process.
            #
            # `table_name` is the DuckDB view name in the master
            # analytics DB. The orchestrator uses the STRICT validator
            # (`^[a-zA-Z_][a-zA-Z0-9_]{0,63}$`) when re-creating views,
            # so any name with `-` or `.` would pass extraction here
            # but be silently dropped at orchestrator-rebuild time.
            # Use the strict validator here too so the failure is
            # caught early and visible in tables_failed.
            if not validate_identifier(table_name, "Keboola table_name"):
                stats["tables_failed"] += 1
                stats["errors"].append({"table": table_name, "error": "unsafe identifier"})
                continue

            if query_mode == "remote":
                # Create view pointing to kbc extension (requires re-ATTACH at query time)
                bucket = tc.get("bucket", "")
                source_table = tc.get("source_table", table_name)
                if not (
                    validate_quoted_identifier(bucket, "Keboola bucket")
                    and validate_quoted_identifier(source_table, "Keboola source_table")
                ):
                    stats["tables_failed"] += 1
                    stats["errors"].append({"table": table_name, "error": "unsafe bucket/source_table"})
                    continue
                if use_extension and bucket:
                    conn.execute(
                        f'CREATE OR REPLACE VIEW "{table_name}" AS SELECT * FROM kbc."{bucket}"."{source_table}"'
                    )
                    conn.execute(
                        "INSERT INTO _meta VALUES (?, ?, 0, 0, ?, 'remote')",
                        [table_name, tc.get("description", ""), now],
                    )
                    stats["tables_extracted"] += 1
                continue

            try:
                pq_path = str(data_dir / f"{table_name}.parquet")
                if use_extension:
                    try:
                        _extract_via_extension(conn, tc, pq_path)
                    except Exception as ext_err:
                        # ATTACH succeeded but the per-table COPY failed,
                        # most commonly a Keboola QueryService permission error
                        # (`Schema '..."in.c-..."' does not exist or not
                        # authorized`, see keboola/duckdb-extension#17). The
                        # legacy Storage-API client doesn't go through
                        # QueryService at all, so queue for the parallel
                        # legacy fallback below.
                        logger.warning(
                            "Keboola extension scan failed for %s (%s); queued for legacy Storage-API fallback",
                            table_name, ext_err,
                        )
                        legacy_queue.append((tc, pq_path))
                        continue
                else:
                    legacy_queue.append((tc, pq_path))
                    continue

                # Extension path succeeded: register _meta synchronously.
                _register_local_meta(conn, tc, pq_path, now)
                stats["tables_extracted"] += 1
                rows_log = conn.execute(
                    f"SELECT count(*) FROM read_parquet('{pq_path.replace(chr(39), chr(39)*2)}')"
                ).fetchone()[0]
                logger.info("Extracted %s via extension: %d rows", table_name, rows_log)
            except Exception as e:
                logger.error("Failed to extract %s: %s", table_name, e)
                stats["tables_failed"] += 1
                stats["errors"].append({"table": table_name, "error": str(e)})
# Detach Keboola if extension was used
if use_extension:
try:
conn.execute("DETACH kbc")
except Exception:
pass
# Phase 2: legacy fallback in parallel. Keboola Storage API export
# jobs are independent per table, so a pool of N workers fans out
# the per-table HTTP round trips (export job submit + poll + CSV
# download) instead of running them sequentially. Project-level
# concurrency is bounded by the storage.jobsParallelism limit
# (typically 10); the default of 8 workers leaves some headroom for
# other clients. Override via the AGNES_KEBOOLA_PARALLELISM env var.
#
# Workers are PROCESSES, not threads. This was originally required
# because the kbcstorage-based `connectors/keboola/client.py:
# export_table` does `os.chdir(temp_dir)` to redirect slice-file
# downloads into a per-call temp directory, and `os.chdir` is
# process-global: with threads, two parallel exports raced on CWD,
# slice files landed in the wrong directory, and the merge step
# failed with `[Errno 2] No such file or directory:
# '<job_id>.csv_X_Y_Z.csv'`. ProcessPoolExecutor gives each worker
# its own process and therefore its own CWD. `_extract_via_legacy`
# now calls `connectors/keboola/storage_api.py` directly with
# per-call temp directories, so the CWD race no longer applies, but
# the process pool is kept.
if legacy_queue:
parallelism = max(1, int(os.environ.get("AGNES_KEBOOLA_PARALLELISM", "8")))
workers = min(parallelism, len(legacy_queue))
logger.info(
"Running legacy Storage-API fallback for %d tables across %d worker processes",
len(legacy_queue), workers,
)
if workers == 1:
legacy_results = [_legacy_worker(item, keboola_url, keboola_token) for item in legacy_queue]
else:
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=workers) as ex:
futures = [ex.submit(_legacy_worker, item, keboola_url, keboola_token) for item in legacy_queue]
legacy_results = [f.result() for f in futures]
# Phase 3: serial _meta insert for legacy results. The DuckDB `conn`
# isn't thread-safe and can't be shared with the worker processes, so
# we collect the parallel results and only touch `conn` (and `stats`)
# here, in the parent process.
for tc_, pq_, err in legacy_results:
tn = tc_["name"]
if err is not None:
logger.error("Failed to extract %s via legacy: %s", tn, err)
stats["tables_failed"] += 1
stats["errors"].append({"table": tn, "error": err})
continue
try:
_register_local_meta(conn, tc_, pq_, now)
stats["tables_extracted"] += 1
rows_log = conn.execute(
f"SELECT count(*) FROM read_parquet('{pq_.replace(chr(39), chr(39)*2)}')"
).fetchone()[0]
logger.info("Extracted %s via legacy: %d rows", tn, rows_log)
except Exception as e:
logger.error("Failed to register _meta for %s: %s", tn, e)
stats["tables_failed"] += 1
stats["errors"].append({"table": tn, "error": str(e)})
finally:
conn.execute("CHECKPOINT")
conn.close()
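# Swap the temp DB into place, cleaning up any WAL files. shutil.move
# uses os.rename when both paths are on the same filesystem (atomic on
# POSIX); across filesystems it falls back to copy + delete, which is not.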
import shutil
old_wal = Path(str(db_path) + ".wal")
if old_wal.exists():
old_wal.unlink()
if tmp_db_path.exists():
shutil.move(str(tmp_db_path), str(db_path))
tmp_wal = Path(str(tmp_db_path) + ".wal")
if tmp_wal.exists():
tmp_wal.unlink()
return stats
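# Illustrative sketch (hypothetical helper, not called by run()): the same
# "build in a temp file, then swap it into place" step as above, written
# with os.replace() instead of shutil.move(). os.replace() overwrites the
# destination atomically when both paths are on the same filesystem (a
# POSIX requirement) and can fail instead of silently falling back to
# copy + delete when they are not. The ".wal" sidecar naming mirrors the
# code above; the function name and signature are assumptions.
def _sketch_swap_db_into_place(tmp_db: str, db: str) -> None:
    import os
    from pathlib import Path

    tmp_path, db_path = Path(tmp_db), Path(db)
    # Drop the previous DB's WAL so it is not paired with the new file.
    Path(str(db_path) + ".wal").unlink(missing_ok=True)
    if tmp_path.exists():
        os.replace(tmp_path, db_path)
    # After CHECKPOINT + close the temp WAL should be redundant; remove it.
    Path(str(tmp_path) + ".wal").unlink(missing_ok=True)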
def _register_local_meta(
conn: duckdb.DuckDBPyConnection,
tc: Dict[str, Any],
pq_path: str,
extracted_at: datetime,
) -> None:
"""After a parquet has been written for a local-mode table, create the
DuckDB view and register the row in `_meta`. Hoisted out of the run()
body so both the serial extension-success path and the parallel
legacy-result path share one implementation."""
table_name = tc["name"]
safe_pq_lit = pq_path.replace("'", "''")
rows = conn.execute(f"SELECT count(*) FROM read_parquet('{safe_pq_lit}')").fetchone()[0]
size = os.path.getsize(pq_path)
conn.execute(
f'CREATE OR REPLACE VIEW "{table_name}" AS SELECT * FROM read_parquet(\'{safe_pq_lit}\')'
)
conn.execute(
"INSERT INTO _meta VALUES (?, ?, ?, ?, ?, 'local')",
[table_name, tc.get("description", ""), rows, size, extracted_at],
)
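# Illustrative sketch (hypothetical helper, not called by this module): the
# `path.replace("'", "''")` idiom used in _register_local_meta,
# _extract_via_extension and _extract_via_legacy, factored into one
# function. Doubling single quotes is the standard SQL way to embed a quote
# inside a '...' string literal, which read_parquet('...') and
# COPY ... TO '...' need when a path contains an apostrophe. It does NOT
# make text safe as an identifier; view names are still whitelisted.
# Usage: f"SELECT count(*) FROM read_parquet({_sketch_sql_string_literal(pq_path)})"
def _sketch_sql_string_literal(value: str) -> str:
    """Return `value` wrapped as a single-quoted SQL string literal."""
    return "'" + value.replace("'", "''") + "'"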
def _extract_via_extension(conn: duckdb.DuckDBPyConnection, tc: Dict[str, Any], pq_path: str) -> None:
"""Extract a table using the DuckDB Keboola extension."""
bucket = tc.get("bucket", "")
source_table = tc.get("source_table", tc["name"])
# #81 Group D — defense-in-depth. The caller already validates these;
# refuse here too in case a future caller forgets. Use the relaxed
# quoted-identifier check that accepts Keboola's `in.c-foo` form.
if not (is_safe_quoted_identifier(bucket) and is_safe_quoted_identifier(source_table)):
raise ValueError(f"unsafe bucket/source_table: {bucket!r}/{source_table!r}")
safe_pq_lit = pq_path.replace("'", "''")
conn.execute(f'COPY (SELECT * FROM kbc."{bucket}"."{source_table}") TO \'{safe_pq_lit}\' (FORMAT PARQUET)')
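# Illustrative sketch (hypothetical helpers, NOT the project's
# validate_identifier / is_safe_quoted_identifier): the two-tier whitelist
# used by run() and _extract_via_extension. The strict pattern is the one
# quoted in run() for DuckDB view names. The relaxed pattern for Keboola
# bucket / table IDs such as `in.c-foo` is an assumption for illustration;
# it admits `.` and `-` but never `"`, so a value cannot close the
# double-quoted identifier it is interpolated into.
def _sketch_is_strict_identifier(name: str) -> bool:
    import re

    # fullmatch(), not match() with `$`: `$` also matches just before a
    # trailing newline, so re.match(r"^x$", "x\n") succeeds.
    return re.fullmatch(r"[a-zA-Z_][a-zA-Z0-9_]{0,63}", name) is not None


def _sketch_is_relaxed_quoted_identifier(name: str) -> bool:
    import re

    # Assumed character set (letters, digits, `_`, `.`, `-`) and 256-char cap.
    return re.fullmatch(r"[A-Za-z0-9_][A-Za-z0-9_.\-]{0,255}", name) is not None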
def _legacy_worker(tc_pq, keboola_url: str, keboola_token: str):
"""Module-level wrapper for ProcessPoolExecutor — must be picklable.
Returns `(tc, pq_path, error_str_or_None)` so the main process can
aggregate results and update _meta serially on its DuckDB connection.
"""
tc_, pq_ = tc_pq
try:
_extract_via_legacy(tc_, pq_, keboola_url, keboola_token)
return (tc_, pq_, None)
except Exception as exc:
return (tc_, pq_, str(exc))
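# Illustrative sketch (hypothetical helper, not called by run()): the
# Phase 2 / Phase 3 shape of run() in isolation. `work` follows the same
# contract as _legacy_worker (never raise; return (item, error_or_None)),
# so one bad table cannot abort the batch, and only the caller mutates the
# stats dict (standing in for `stats` + the DuckDB connection). The
# executor class is a parameter so the code can run with
# ProcessPoolExecutor (as run() does, which requires `work` and the items
# to be picklable, e.g. a module-level function) or ThreadPoolExecutor.
def _sketch_fan_out_and_aggregate(items, work, parallelism, executor_cls=None):
    """Run `work(item) -> (item, error_or_None)` over `items`, then tally."""
    from concurrent.futures import ProcessPoolExecutor

    # Same clamping as run(): at least 1 worker, never more than the queue.
    workers = min(max(1, parallelism), len(items))
    if workers <= 1:
        # Inline path: no pool start-up cost for 0 or 1 queued items.
        results = [work(item) for item in items]
    else:
        pool_cls = executor_cls or ProcessPoolExecutor
        with pool_cls(max_workers=workers) as pool:
            # map() yields results in input order, like run()'s futures list.
            results = list(pool.map(work, items))
    # Serial aggregation: only the calling process touches `stats`.
    stats = {"tables_extracted": 0, "tables_failed": 0, "errors": []}
    for item, err in results:
        if err is None:
            stats["tables_extracted"] += 1
        else:
            stats["tables_failed"] += 1
            stats["errors"].append({"table": item, "error": err})
    return stats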
def _extract_via_legacy(tc: Dict[str, Any], pq_path: str, keboola_url: str, keboola_token: str) -> None:
"""Per-table extract via the Storage API export-async path.
Despite the name (kept for caller compatibility with `_legacy_worker`),
this no longer goes through the `kbcstorage` SDK — it talks to the
Storage API directly via `connectors/keboola/storage_api.py`. The old
SDK path had a thread-unsafe `os.chdir(temp_dir)` that broke parallel
execution; the direct path uses per-call temp directories and signed-URL
downloads, so threads / processes don't trip on each other.
Same surface as before — `(tc, pq_path, url, token) → writes parquet at
pq_path` — so callers (including the parallel `_legacy_worker`) don't
need to change.
"""
import tempfile
from connectors.keboola.storage_api import KeboolaStorageClient, get_temp_root
bucket = tc.get("bucket", "")
source_table = tc.get("source_table", tc["name"])
table_id = f"{bucket}.{source_table}" if bucket else tc.get("id", tc["name"])
with tempfile.TemporaryDirectory(
prefix=f"kbc-export-{tc['name']}-",
dir=get_temp_root(),
ignore_cleanup_errors=True,
) as tmpdir:
csv_path = Path(tmpdir) / f"{tc['name']}.csv"
client = KeboolaStorageClient(url=keboola_url, token=keboola_token)
client.export_table_to_csv(table_id, csv_path)
if not csv_path.exists() or csv_path.stat().st_size == 0:
# Storage API succeeded but produced no rows. Emit an empty
# parquet rather than crashing — same defensive behavior as
# `materialize_query`.
duckdb.connect().execute(
f"COPY (SELECT 1 AS _empty WHERE FALSE) TO '{pq_path.replace(chr(39), chr(39) * 2)}' (FORMAT PARQUET)"
).close()
return
# all_varchar=true preserves the source's exact character data —
# matches what the kbcstorage path used to do, prevents DuckDB
# type inference from rewriting numeric-looking strings as NULL.
# max_line_size=64MB overrides DuckDB's 2MB default; matches the
# materialize_query path. See comment there for rationale.
safe_csv = str(csv_path).replace("'", "''")
safe_pq = pq_path.replace("'", "''")
conv = duckdb.connect()
try:
conv.execute(
f"COPY (SELECT * FROM read_csv('{safe_csv}', "
f"all_varchar=true, max_line_size=67108864)) "
f"TO '{safe_pq}' (FORMAT PARQUET)"
)
finally:
conv.close()
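# Illustrative sketch (hypothetical helper, not called by this module): the
# CSV -> parquet COPY statement from _extract_via_legacy built as a pure
# function, so the escaping and reader options can be checked without a
# DuckDB connection. all_varchar and max_line_size are DuckDB read_csv
# options; 64 MiB = 64 * 1024 * 1024 = 67108864 bytes, versus DuckDB's
# 2 MiB (2097152-byte) default.
def _sketch_csv_to_parquet_sql(
    csv_path: str, pq_path: str, max_line_size: int = 64 * 1024 * 1024
) -> str:
    safe_csv = csv_path.replace("'", "''")
    safe_pq = pq_path.replace("'", "''")
    return (
        f"COPY (SELECT * FROM read_csv('{safe_csv}', "
        f"all_varchar=true, max_line_size={max_line_size})) "
        f"TO '{safe_pq}' (FORMAT PARQUET)"
    )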
def compute_exit_code(stats: Dict[str, Any], total: int) -> int:
"""Map an extraction `stats` dict to a process exit code.
Issue #81 Group B: distinguish full success from partial failure so
the sync API and CLI consumers can alert on partial vs. full failure
rather than treating any non-zero as one bucket.
- ``0`` — every table succeeded (or no tables registered).
- ``1`` — every table failed (full failure).
- ``2`` — at least one succeeded and at least one failed (partial).
`total` is the count of tables the extractor was asked to process.
`stats["tables_failed"]` is the count it actually failed.
"""
failed = stats.get("tables_failed", 0)
if total == 0:
return 0
if failed == 0:
return 0
if failed >= total:
return 1
return 2
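# Illustrative sketch (hypothetical, not used by __main__): the same
# 0 / 1 / 2 mapping as compute_exit_code(), taking the two counts directly
# instead of a stats dict, with a worked table:
#
#   total  failed  -> code
#   0      0       -> 0   (no tables registered)
#   5      0       -> 0   (full success)
#   5      5       -> 1   (full failure)
#   5      2       -> 2   (partial failure)
def _sketch_exit_code(failed: int, total: int) -> int:
    if total == 0 or failed == 0:
        return 0
    return 1 if failed >= total else 2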
if __name__ == "__main__":
# Standalone entry point, used by the sync-trigger subprocess. Reads
# KEBOOLA_STACK_URL and KEBOOLA_STORAGE_TOKEN from the environment
# (falling back to instance.yaml) and the table list from the DuckDB
# table_registry, then runs the extraction.
from app.logging_config import setup_logging
setup_logging(__name__)
# Read Keboola credentials — env first, then instance.yaml fallback
url = os.environ.get("KEBOOLA_STACK_URL", "")
token = os.environ.get("KEBOOLA_STORAGE_TOKEN", "")
if not url or not token:
try:
from config.loader import load_instance_config
config = load_instance_config()
kbc_config = config.get("keboola", {})
url = url or kbc_config.get("url", "")
token_env = kbc_config.get("token_env", "KEBOOLA_STORAGE_TOKEN")
token = token or os.environ.get(token_env, "")
except Exception:
pass
if not url or not token:
logger.error("Missing KEBOOLA_STACK_URL or KEBOOLA_STORAGE_TOKEN")
raise SystemExit(1)
# Read table list from registry
from src.db import get_system_db
from src.repositories.table_registry import TableRegistryRepository
sys_conn = get_system_db()
try:
repo = TableRegistryRepository(sys_conn)
tables = repo.list_by_source("keboola")
finally:
sys_conn.close()
if not tables:
logger.warning("No Keboola tables registered in table_registry")
raise SystemExit(0)
logger.info("Extracting %d tables from %s", len(tables), url)
data_dir = Path(os.environ.get("DATA_DIR", "./data"))
result = run(str(data_dir / "extracts" / "keboola"), tables, url, token)
logger.info("Extraction complete: %s", result)
code = compute_exit_code(result, len(tables))
if code == 2:
logger.error("Partial failure: %d of %d tables failed", result.get("tables_failed", 0), len(tables))
elif code == 1:
logger.error("All %d tables failed", len(tables))
raise SystemExit(code)
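# Illustrative sketch (hypothetical helper, not called by the __main__
# block): the credential lookup order above as a pure function over an env
# mapping and the `keboola` section of instance.yaml, which makes the
# precedence easy to test. The name and signature are assumptions.
#   url:   KEBOOLA_STACK_URL, else keboola.url
#   token: KEBOOLA_STORAGE_TOKEN, else the env var named by keboola.token_env
def _sketch_resolve_keboola_credentials(env, kbc_config):
    url = env.get("KEBOOLA_STACK_URL", "") or kbc_config.get("url", "")
    token_env = kbc_config.get("token_env", "KEBOOLA_STORAGE_TOKEN")
    token = env.get("KEBOOLA_STORAGE_TOKEN", "") or env.get(token_env, "")
    return url, token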