agnes-the-ai-analyst/connectors/keboola/partitioned.py
ZdenekSrotyr 506a378c3a
release: 0.47.1 — Keboola connector v27 (incremental, partitioned, where_filters, typed parquet) (#217)
## Summary

Brings the Keboola connector to feature parity with the legacy internal data-analyst's per-table sync strategies. Closes the four documented gaps from the spec branch (`zs/keboola-connector-specs`):

- **Typed parquet** in the legacy SDK extraction path — column types from Keboola Storage metadata (provider cascade `user > ai-metadata-enrichment > keboola.snowflake-transformation`) survive the CSV → parquet roundtrip; invalid date strings (`'0000-00-00'`) and invalid numeric strings (`'Non-Manager'`) become NULL while keeping the column's typed schema. Pre-fix everything was VARCHAR.
- **Incremental sync** via Storage API `changedSince` — opt-in per table; pulls only delta rows, merges into the existing parquet by `primary_key` (drop_duplicates with keep='last'). Cuts daily extraction from O(full table) to O(delta).
- **Partitioned sync** — flat per-partition layout `data/<table>/<key>.parquet` (e.g. `2026_05.parquet`), per-affected-partition merge for daily updates, chunked initial load with 1-day overlap and 2-empty-chunk stop heuristic.
- **`where_filters`** — server-side row filter with date placeholders (`{{today}}`, `{{last_3_months}}`, `{{start_of_3_months_ago}}`, etc.) resolved at sync time. Force the SDK path; reject `incremental + where_filters` combination at API layer (changedSince already filters temporally).

## Architecture

- **Schema migration v25 → v26**: 7 new columns on `table_registry`. Existing `sync_strategy` column reused (pre-v26 it was inert catalog metadata; post-v26 the extractor dispatches off it).
- **Per-table dispatcher** in `extractor.run()` routes to one of `_extract_via_extension` (full_refresh + extension), `_extract_via_legacy` (full_refresh + filters or extension fallback), `extract_incremental`, or `extract_partitioned`.
- **API conflict policy**: `incremental + where_filters` → 422; `partitioned + query_mode='remote'` → 422; `partitioned ⇒ partition_by required`.
- **Admin UI**: third "Direct extract (Storage API)" radio in the Keboola Register / Edit modals, alongside existing "Whole table (extension)" and "Custom SQL". When selected, exposes a v26 sync-strategy panel with conditional fields per strategy.

## Test plan

- [x] **Unit + module** — 134 v26 tests covering migration, repo, parquet_io, where_filters, incremental (compute_changed_since + merge_parquet + extract_incremental E2E), partitioned (key derivation + merge_partition + chunked windows + extract_partitioned E2E), extractor dispatcher, admin API validators, PUT field clearing, registry-shape → dispatcher bridge
- [x] **HTML form structure** — all v26 inputs + visibility classes + JS payload fields verified in rendered template
- [x] **Real Keboola roundtrip** — registered a small test table as `sync_strategy='incremental'` against a test Storage project, triggered two syncs:
  - Sync 1: `changedSince=None` → full pull → 9 rows typed parquet
  - Sync 2: `changedSince=last_sync - 1d window` → 9 delta rows merged with 9 existing → 9 after dedup on primary_key (PK merge confirmed)
- [x] **Browser UX** — agent-browser session against a local uvicorn: login → admin/tables → register modal → switch radios → verify field visibility per strategy → submit → edit existing row → switch to Direct/Incremental → save → confirm DB persistence
- [x] **Regression** — no regressions in the broader 3252-test suite (3 pre-v26 tests updated for the deprecation-marker removal + schema-version bump; 2 pre-existing environment-sensitive test failures unrelated to this change)

## Bugs caught + fixed during E2E

The browser + real-Keboola roundtrip exposed four bugs the unit tests missed:

1. **JS visibility race** — two competing `forEach` loops set `display=''` then `display='none'` on form elements sharing `kb-strategy-incremental kb-strategy-partitioned` classes (window_days + max_history_days are reused across strategies). Fix: single-pass selector with class-based visibility resolver.
2. **PUT cannot clear field** — pre-v26 `updates = {k: v ... if v is not None}` collapsed "omitted from body" and "sent as null" into the same case, so admin couldn't switch a partitioned row back to full_refresh and have stale `partition_by` clear. Fix: `model_dump(exclude_unset=True)`.
3. **Subprocess DB lock conflict** — `_read_last_sync` reopened `system.duckdb` while the parent server held the write lock (subprocess contract at `app/api/sync.py:_run_sync` line 260). Fix: parent injects `__last_sync__` into table_config before subprocess spawn.
4. **Wrong KBC table_id** — `extract_incremental` / `extract_partitioned` built the Storage API table_id from the registry row's slugified `id` (`circle_inc`) instead of `bucket.source_table` (`in.c-finance.circle`), producing 404s. Fix: prefer `bucket+source_table`; fall back to `id` only when bucket empty.

## Operator notes

- Existing tables stay on `full_refresh` after migration; admins opt individual tables in via `agnes admin register-table --sync-strategy ...`, the Keboola Edit modal, or `POST/PUT /api/admin/registry`.
- `merge_parquet` and `merge_partition` use `pd.concat + drop_duplicates`, loading both existing and delta into pandas RAM. For tables in the multi-million-row range this may OOM — switch to `partitioned` strategy for those (per-partition merge keeps memory bounded). Documented in `### Internal` of the changelog entry.
- Date placeholders are resolved at **sync time**, not register time — a typo'd `{{lasst_week}}` is accepted at register and surfaces only when the next sync runs. By design (rolling windows need late-binding).

## Spec source

The four corresponding plans on the `zs/keboola-connector-specs` branch under `docs/superpowers/plans/2026-05-07-0[1-4]-*.md` capture the design rationale and link back to internal repo references for each subsystem.
<!-- devin-review-badge-begin -->

---

<a href="https://app.devin.ai/review/keboola/agnes-the-ai-analyst/pull/217" target="_blank">
  <picture>
    <source media="(prefers-color-scheme: dark)" srcset="https://static.devin.ai/assets/gh-open-in-devin-review-dark.svg?v=1">
    <img src="https://static.devin.ai/assets/gh-open-in-devin-review-light.svg?v=1" alt="Open in Devin Review">
  </picture>
</a>
<!-- devin-review-badge-end -->
2026-05-07 19:01:27 +02:00

489 lines
18 KiB
Python

"""Keboola partitioned sync — per-partition parquet files with per-partition merge.
Mirrors internal repo's `src/data_sync.py:529-1103` (partitioned sync
strategies) and `src/parquet_manager.py:425-600` (merge logic). Output
layout matches the legacy repo: flat directory of single files keyed by
partition string, e.g. `data/sales/2025_11.parquet`. Hive-style
(`year=2025/month=11/`) was rejected as overkill for the partition
counts we see (≤ 365 partitions/day-grain, ≤ 12 partitions/month-grain
per year).
The orchestrator exposes the directory as a single DuckDB view via
`read_parquet('<table>/*.parquet')` — to the analyst, a partitioned
table reads identically to a single-file table.
"""
from __future__ import annotations
import logging
import os
import tempfile
from datetime import date, datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from connectors.keboola.parquet_io import (
apply_schema_to_table,
convert_date_columns_to_date32,
csv_to_parquet,
_convert_column,
)
from connectors.keboola.incremental import compute_changed_since
logger = logging.getLogger(__name__)
SUPPORTED_GRANULARITIES = frozenset({"day", "month", "year"})
DEFAULT_GRANULARITY = "month"
DEFAULT_INITIAL_LOAD_CHUNK_DAYS = 30
INITIAL_LOAD_OVERLAP_DAYS = 1
INITIAL_LOAD_MAX_CHUNKS_SAFETY = 120 # ~10 years at 30-day chunks
INITIAL_LOAD_EMPTY_CHUNKS_TO_STOP = 2
class InvalidPartitionConfigError(ValueError):
"""Partition config (column, granularity) is malformed."""
# ───────────────────────────── partition keying ───────────────────────────────
def partition_key_for(value: Any, granularity: str) -> str:
"""Return the partition-key string for a single date/timestamp value.
Accepts `datetime.date`, `datetime.datetime`, and `pandas.Timestamp`.
"""
if granularity not in SUPPORTED_GRANULARITIES:
raise InvalidPartitionConfigError(
f"granularity must be one of {sorted(SUPPORTED_GRANULARITIES)}, got {granularity!r}"
)
if isinstance(value, pd.Timestamp):
d = value.to_pydatetime()
elif isinstance(value, datetime):
d = value
elif isinstance(value, date):
d = value
else:
raise InvalidPartitionConfigError(f"value must be date/datetime, got {type(value).__name__}")
if granularity == "day":
return d.strftime("%Y_%m_%d")
if granularity == "month":
return d.strftime("%Y_%m")
return d.strftime("%Y")
# ───────────────────────────── per-partition merge ────────────────────────────
def merge_partition(
*,
partition_path: Path,
delta_df: pd.DataFrame,
primary_key: List[str],
pyarrow_schema: Optional[pa.Schema],
date_columns: List[str],
) -> Dict[str, int]:
"""Merge a delta DataFrame into a single partition parquet.
If the partition file does not exist yet, this is the first batch for
that partition — write it directly. Otherwise read existing → concat →
drop_duplicates(PK, keep='last') → atomic write.
Caller has already type-converted `delta_df` (pandas dtypes from
Keboola metadata via `process_csv_to_partitions`); `apply_schema_to_table`
runs at write time to enforce the explicit PyArrow schema.
"""
partition_path = Path(partition_path)
partition_path.parent.mkdir(parents=True, exist_ok=True)
# Pre-normalize date columns in delta_df to pandas datetime so the concat
# with existing_df (which carries datetime64 from the typed parquet) doesn't
# produce a mixed object/str column that pa.Table.from_pandas can't reduce
# back to date32.
delta_df = delta_df.copy()
for col in date_columns or []:
if col in delta_df.columns:
delta_df[col] = pd.to_datetime(delta_df[col], errors="coerce")
if partition_path.exists():
existing_df = pq.read_table(partition_path).to_pandas()
combined = pd.concat([existing_df, delta_df], ignore_index=True)
if primary_key:
# Mirror the dedup-safety coercion in incremental.merge_parquet —
# Devin Review finding 0004. When a PK column is object-dtype
# post-concat (existing typed + delta string from a failed
# _convert_column), drop_duplicates compares int `1` and string
# `'1'` as unequal. Coerce to string for the comparison so the
# dedup is deterministic; pa.Table.from_pandas + apply_schema_to_table
# restore the canonical type after.
for pk in primary_key:
if pk in combined.columns and combined[pk].dtype == object:
combined[pk] = combined[pk].astype(str)
combined = combined.drop_duplicates(subset=primary_key, keep="last")
else:
combined = delta_df
table = pa.Table.from_pandas(combined, preserve_index=False)
if date_columns:
table = convert_date_columns_to_date32(table, date_columns)
if pyarrow_schema is not None:
table = apply_schema_to_table(table, pyarrow_schema)
tmp_path = partition_path.with_suffix(partition_path.suffix + ".tmp")
if tmp_path.exists():
tmp_path.unlink()
try:
pq.write_table(table, tmp_path, compression="snappy")
os.replace(tmp_path, partition_path)
finally:
if tmp_path.exists():
tmp_path.unlink()
return {"rows": len(combined), "delta_rows": len(delta_df)}
# ───────────────────────────── CSV → partition groups ─────────────────────────
def process_csv_to_partitions(
*,
csv_path: Path,
partition_by: str,
granularity: str,
dtypes: Dict[str, str],
) -> Dict[str, pd.DataFrame]:
"""Split a delta CSV into a `{partition_key: DataFrame}` map.
Loads with `dtype=str`, applies dtypes via `_convert_column`, parses
`partition_by` column as date, computes `partition_key_for` per row,
groups. Rows with unparseable partition values are dropped with a
warning (matches legacy: don't drop the whole sync over a few bad
rows; admin sees them in the log).
"""
if granularity not in SUPPORTED_GRANULARITIES:
raise InvalidPartitionConfigError(
f"granularity must be one of {sorted(SUPPORTED_GRANULARITIES)}"
)
df = pd.read_csv(csv_path, dtype=str)
if df.empty:
return {}
if partition_by not in df.columns:
raise InvalidPartitionConfigError(
f"partition_by column {partition_by!r} not present in CSV "
f"(columns: {list(df.columns)})"
)
if dtypes:
for col, dtype in dtypes.items():
if col not in df.columns or "datetime" in dtype:
continue
try:
df[col] = _convert_column(df[col], dtype, col_name=col)
except Exception as e:
logger.warning("partition: failed to apply dtype %s to %r: %s", dtype, col, e)
parsed = pd.to_datetime(df[partition_by], errors="coerce")
invalid_count = int(parsed.isna().sum())
if invalid_count > 0:
logger.warning(
"partition: %d rows with unparseable %r values dropped from this delta",
invalid_count, partition_by,
)
df = df.assign(_partition_dt=parsed)
df = df.dropna(subset=["_partition_dt"])
if df.empty:
return {}
df["_partition_key"] = df["_partition_dt"].apply(
lambda v: partition_key_for(v, granularity)
)
df = df.drop(columns=["_partition_dt"])
groups: Dict[str, pd.DataFrame] = {}
for key, group_df in df.groupby("_partition_key", sort=False):
groups[str(key)] = group_df.drop(columns=["_partition_key"]).reset_index(drop=True)
return groups
# ───────────────────────────── chunked initial load windows ───────────────────
def compute_chunk_windows(
*,
now: datetime,
chunk_days: int,
max_history_days: Optional[int],
overlap_days: int = INITIAL_LOAD_OVERLAP_DAYS,
) -> List[Tuple[str, str]]:
"""Compute the list of (changed_since, changed_until) ISO pairs for a
chunked initial load.
Walks history backwards: chunk[0] = (now-chunk_days, now), chunk[1] =
(now-2*chunk_days-overlap, now-chunk_days+overlap), etc. The
`+overlap` on the upper boundary deliberately re-fetches one day from
the previous chunk so boundary rows aren't lost; the caller dedupes
after.
When `max_history_days` is None, returns up to
`INITIAL_LOAD_MAX_CHUNKS_SAFETY` chunks; the caller stops earlier if
two consecutive chunks return zero rows.
"""
if max_history_days is not None:
if max_history_days <= 0:
return []
num_chunks = (max_history_days + chunk_days - 1) // chunk_days
else:
num_chunks = INITIAL_LOAD_MAX_CHUNKS_SAFETY
windows: List[Tuple[str, str]] = []
for i in range(num_chunks):
chunk_end_offset = i * chunk_days
chunk_start_offset = chunk_end_offset + chunk_days + overlap_days
if i == 0:
chunk_until = now
else:
chunk_until = now - timedelta(days=chunk_end_offset - overlap_days)
chunk_since = now - timedelta(days=chunk_start_offset)
windows.append((chunk_since.isoformat(), chunk_until.isoformat()))
return windows
# ───────────────────────────── orchestrator ───────────────────────────────────
def extract_partitioned(
*,
table_config: Dict[str, Any],
output_dir: Path,
last_sync: Optional[datetime],
keboola_url: str,
keboola_token: str,
now: Optional[datetime] = None,
) -> Dict[str, Any]:
"""Extract one Keboola table into per-partition parquet files.
`output_dir` is `data/<table>/`. The function writes
`<output_dir>/<partition_key>.parquet` files; the orchestrator then
creates a DuckDB view via `read_parquet('<output_dir>/*.parquet')`.
Branches:
- First sync (last_sync=None): chunked initial load via
`compute_chunk_windows`, walking history backwards. Stops after 2
consecutive empty chunks or `max_history_days` reached.
- Subsequent sync: single delta pull with `changedSince`, group by
partition key, per-affected-partition merge.
"""
from connectors.keboola.client import KeboolaClient
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
granularity = table_config.get("partition_granularity") or DEFAULT_GRANULARITY
if granularity not in SUPPORTED_GRANULARITIES:
raise InvalidPartitionConfigError(
f"partition_granularity must be one of {sorted(SUPPORTED_GRANULARITIES)}, got {granularity!r}"
)
partition_by = table_config.get("partition_by")
if not partition_by:
raise InvalidPartitionConfigError("partition_by must be set for partitioned strategy")
# Mirror extract_incremental's table_id resolution — bucket + source_table
# is the Keboola Storage API reference, NOT the agnes registry id (which
# is the slugified view name).
bucket = table_config.get("bucket", "")
source_table = table_config.get("source_table") or table_config.get("name")
if bucket and source_table:
table_id = f"{bucket}.{source_table}"
else:
table_id = table_config.get("id") or table_config.get("name")
primary_key = table_config.get("primary_key") or []
if isinstance(primary_key, str):
primary_key = [primary_key]
now = now or datetime.now(timezone.utc)
client = KeboolaClient(token=keboola_token, url=keboola_url)
try:
pyarrow_schema = client.get_pyarrow_schema(table_id)
except Exception as e:
logger.warning("Schema unavailable for %s: %s", table_id, e)
pyarrow_schema = None
try:
dtypes = client.get_pandas_dtypes(table_id) if pyarrow_schema else {}
except Exception:
dtypes = {}
try:
date_columns = client.get_date_columns(table_id) if pyarrow_schema else []
except Exception:
date_columns = []
if last_sync is None:
return _first_sync_chunked(
client=client,
table_id=table_id,
output_dir=output_dir,
partition_by=partition_by,
granularity=granularity,
primary_key=primary_key,
dtypes=dtypes,
date_columns=date_columns,
pyarrow_schema=pyarrow_schema,
chunk_days=table_config.get("initial_load_chunk_days") or DEFAULT_INITIAL_LOAD_CHUNK_DAYS,
max_history_days=table_config.get("max_history_days"),
now=now,
)
return _incremental_sync(
client=client,
table_id=table_id,
output_dir=output_dir,
partition_by=partition_by,
granularity=granularity,
primary_key=primary_key,
dtypes=dtypes,
date_columns=date_columns,
pyarrow_schema=pyarrow_schema,
last_sync=last_sync,
window_days=table_config.get("incremental_window_days"),
max_history_days=table_config.get("max_history_days"),
now=now,
)
def _first_sync_chunked(
*,
client,
table_id: str,
output_dir: Path,
partition_by: str,
granularity: str,
primary_key: List[str],
dtypes: Dict[str, str],
date_columns: List[str],
pyarrow_schema: Optional[pa.Schema],
chunk_days: int,
max_history_days: Optional[int],
now: datetime,
) -> Dict[str, Any]:
windows = compute_chunk_windows(
now=now, chunk_days=chunk_days, max_history_days=max_history_days,
)
total_rows = 0
consecutive_empty = 0
chunks_run = 0
for since, until in windows:
with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp:
csv_path = Path(tmp.name)
try:
export_info = client.export_table(
table_id, csv_path, changed_since=since, changed_until=until,
)
chunks_run += 1
rows_in_chunk = export_info.get("exported_rows", 0)
if rows_in_chunk == 0:
consecutive_empty += 1
if max_history_days is None and consecutive_empty >= INITIAL_LOAD_EMPTY_CHUNKS_TO_STOP:
logger.info(
"Initial load: %d consecutive empty chunks, stopping (table_id=%s)",
consecutive_empty, table_id,
)
break
continue
consecutive_empty = 0
groups = process_csv_to_partitions(
csv_path=csv_path, partition_by=partition_by,
granularity=granularity, dtypes=dtypes,
)
for partition_key, group_df in groups.items():
merge_partition(
partition_path=output_dir / f"{partition_key}.parquet",
delta_df=group_df, primary_key=primary_key,
pyarrow_schema=pyarrow_schema, date_columns=date_columns,
)
total_rows += len(group_df)
finally:
if csv_path.exists():
csv_path.unlink()
return {
"rows": _count_total_rows(output_dir),
"delta_rows": total_rows,
"chunks_run": chunks_run,
"partitions_written": len(list(output_dir.glob("*.parquet"))),
"changed_since_used": None,
}
def _incremental_sync(
*,
client,
table_id: str,
output_dir: Path,
partition_by: str,
granularity: str,
primary_key: List[str],
dtypes: Dict[str, str],
date_columns: List[str],
pyarrow_schema: Optional[pa.Schema],
last_sync: datetime,
window_days: Optional[int],
max_history_days: Optional[int],
now: datetime,
) -> Dict[str, Any]:
changed_since = compute_changed_since(
last_sync=last_sync, window_days=window_days,
max_history_days=max_history_days, now=now,
)
with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp:
csv_path = Path(tmp.name)
try:
export_info = client.export_table(
table_id, csv_path, changed_since=changed_since,
)
delta_rows = export_info.get("exported_rows", 0)
if delta_rows == 0:
return {
"rows": _count_total_rows(output_dir),
"delta_rows": 0,
"partitions_touched": 0,
"changed_since_used": changed_since,
}
groups = process_csv_to_partitions(
csv_path=csv_path, partition_by=partition_by,
granularity=granularity, dtypes=dtypes,
)
for partition_key, group_df in groups.items():
merge_partition(
partition_path=output_dir / f"{partition_key}.parquet",
delta_df=group_df, primary_key=primary_key,
pyarrow_schema=pyarrow_schema, date_columns=date_columns,
)
return {
"rows": _count_total_rows(output_dir),
"delta_rows": delta_rows,
"partitions_touched": len(groups),
"changed_since_used": changed_since,
}
finally:
if csv_path.exists():
csv_path.unlink()
def _count_total_rows(output_dir: Path) -> int:
total = 0
for p in output_dir.glob("*.parquet"):
try:
total += pq.read_metadata(p).num_rows
except Exception:
continue
return total