agnes-the-ai-analyst/tests/test_keboola_partitioned.py
ZdenekSrotyr 506a378c3a
release: 0.47.1 — Keboola connector v27 (incremental, partitioned, where_filters, typed parquet) (#217)
## Summary

Brings the Keboola connector to feature parity with the legacy internal data-analyst's per-table sync strategies. Closes the four documented gaps from the spec branch (`zs/keboola-connector-specs`):

- **Typed parquet** in the legacy SDK extraction path — column types from Keboola Storage metadata (provider cascade `user > ai-metadata-enrichment > keboola.snowflake-transformation`) survive the CSV → parquet roundtrip; invalid date strings (`'0000-00-00'`) and invalid numeric strings (`'Non-Manager'`) become NULL while keeping the column's typed schema. Pre-fix everything was VARCHAR.
- **Incremental sync** via Storage API `changedSince` — opt-in per table; pulls only delta rows, merges into the existing parquet by `primary_key` (drop_duplicates with keep='last'). Cuts daily extraction from O(full table) to O(delta).
- **Partitioned sync** — flat per-partition layout `data/<table>/<key>.parquet` (e.g. `2026_05.parquet`), per-affected-partition merge for daily updates, chunked initial load with 1-day overlap and 2-empty-chunk stop heuristic.
- **`where_filters`**: server-side row filter with date placeholders (`{{today}}`, `{{last_3_months}}`, `{{start_of_3_months_ago}}`, etc.) resolved at sync time. Setting it forces the SDK path, and the API layer rejects the `incremental + where_filters` combination because `changedSince` already filters temporally.
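
To make the late-bound date placeholders concrete, here is a minimal sketch of sync-time resolution. The helper name `resolve_placeholders`, the ISO date output, and the reading of `{{start_of_3_months_ago}}` as "first day of the month three months back" are assumptions for illustration, not the connector's confirmed semantics.

```python
import re
from datetime import date

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")
_START_OF_N_MONTHS_AGO = re.compile(r"start_of_(\d+)_months_ago")


def _start_of_months_ago(today: date, months: int) -> date:
    # Work in "months since year 0" so the subtraction rolls over year boundaries.
    index = today.year * 12 + (today.month - 1) - months
    return date(index // 12, index % 12 + 1, 1)


def resolve_placeholders(value: str, today: date) -> str:
    """Replace {{name}} tokens with ISO dates (hypothetical helper)."""

    def substitute(match):
        name = match.group(1)
        if name == "today":
            return today.isoformat()
        months = _START_OF_N_MONTHS_AGO.fullmatch(name)
        if months:
            return _start_of_months_ago(today, int(months.group(1))).isoformat()
        # Unknown names fail only here, i.e. when a sync actually runs.
        raise ValueError(f"unknown date placeholder: {name}")

    return _PLACEHOLDER.sub(substitute, value)
```

Because `today` is passed in at call time, the same stored filter yields a rolling window on every sync, which is also why a misspelled placeholder cannot be detected by this kind of resolver until it runs.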

## Architecture

- **Schema migration v25 → v26**: 7 new columns on `table_registry`. Existing `sync_strategy` column reused (pre-v26 it was inert catalog metadata; post-v26 the extractor dispatches off it).
- **Per-table dispatcher** in `extractor.run()` routes to one of `_extract_via_extension` (full_refresh + extension), `_extract_via_legacy` (full_refresh + filters or extension fallback), `extract_incremental`, or `extract_partitioned`.
- **API conflict policy**: `incremental + where_filters` → 422; `partitioned + query_mode='remote'` → 422; `partitioned ⇒ partition_by required`.
- **Admin UI**: third "Direct extract (Storage API)" radio in the Keboola Register / Edit modals, alongside existing "Whole table (extension)" and "Custom SQL". When selected, exposes a v26 sync-strategy panel with conditional fields per strategy.
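
The API conflict policy above can be sketched as a pure validation function. The function name, argument names, and error strings are hypothetical; only the three rules themselves come from this description, and how the real API layer maps them to HTTP responses may differ.

```python
from typing import List, Optional


def validate_sync_config(
    sync_strategy: str,
    where_filters: Optional[list] = None,
    partition_by: Optional[str] = None,
    query_mode: Optional[str] = None,
) -> List[str]:
    """Return a list of conflicts; an API layer could reject the request if any exist."""
    errors = []
    if sync_strategy == "incremental" and where_filters:
        # changedSince already filters temporally, so the combination is rejected.
        errors.append("incremental cannot be combined with where_filters")
    if sync_strategy == "partitioned":
        if query_mode == "remote":
            errors.append("partitioned cannot be combined with query_mode='remote'")
        if not partition_by:
            errors.append("partitioned requires partition_by")
    return errors
```

Returning every conflict at once, rather than raising on the first, is a design choice of this sketch so that a form can show all problems in one round trip.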

## Test plan

- [x] **Unit + module** — 134 v26 tests covering migration, repo, parquet_io, where_filters, incremental (compute_changed_since + merge_parquet + extract_incremental E2E), partitioned (key derivation + merge_partition + chunked windows + extract_partitioned E2E), extractor dispatcher, admin API validators, PUT field clearing, registry-shape → dispatcher bridge
- [x] **HTML form structure** — all v26 inputs + visibility classes + JS payload fields verified in rendered template
- [x] **Real Keboola roundtrip** — registered a small test table as `sync_strategy='incremental'` against a test Storage project, triggered two syncs:
  - Sync 1: `changedSince=None` → full pull → 9 rows typed parquet
  - Sync 2: `changedSince=last_sync - 1d window` → 9 delta rows merged with 9 existing → 9 after dedup on primary_key (PK merge confirmed)
- [x] **Browser UX** — agent-browser session against a local uvicorn: login → admin/tables → register modal → switch radios → verify field visibility per strategy → submit → edit existing row → switch to Direct/Incremental → save → confirm DB persistence
- [x] **Regression** — no regressions in the broader 3252-test suite (3 pre-v26 tests updated for the deprecation-marker removal + schema-version bump; 2 pre-existing environment-sensitive test failures unrelated to this change)
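
The two syncs in the real Keboola roundtrip differ only in the `changedSince` value. A minimal sketch of that computation follows, assuming a one-day overlap window and an ISO 8601 string result; the function name and signature here are illustrative and are not taken from the connector's `compute_changed_since`.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def changed_since_sketch(
    last_sync: Optional[datetime], window_days: int = 1
) -> Optional[str]:
    """Return None for a first sync (full pull), else last_sync minus the overlap."""
    if last_sync is None:
        return None
    # The overlap re-fetches a little history; duplicates are expected to be
    # removed later by the primary-key merge.
    return (last_sync - timedelta(days=window_days)).isoformat()


first = changed_since_sketch(None)
second = changed_since_sketch(datetime(2026, 5, 7, 19, 0, tzinfo=timezone.utc))
```

The overlap explains the roundtrip numbers: the second sync re-pulls rows it already has, and the merge collapses them back to the same row count.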

## Bugs caught + fixed during E2E

The browser + real-Keboola roundtrip exposed four bugs the unit tests missed:

1. **JS visibility race** — two competing `forEach` loops set `display=''` then `display='none'` on form elements sharing `kb-strategy-incremental kb-strategy-partitioned` classes (window_days + max_history_days are reused across strategies). Fix: single-pass selector with class-based visibility resolver.
2. **PUT cannot clear field** — pre-v26 `updates = {k: v ... if v is not None}` collapsed "omitted from body" and "sent as null" into the same case, so admin couldn't switch a partitioned row back to full_refresh and have stale `partition_by` clear. Fix: `model_dump(exclude_unset=True)`.
3. **Subprocess DB lock conflict** — `_read_last_sync` reopened `system.duckdb` while the parent server held the write lock (subprocess contract at `app/api/sync.py:_run_sync` line 260). Fix: parent injects `__last_sync__` into table_config before subprocess spawn.
4. **Wrong KBC table_id** — `extract_incremental` / `extract_partitioned` built the Storage API table_id from the registry row's slugified `id` (`circle_inc`) instead of `bucket.source_table` (`in.c-finance.circle`), producing 404s. Fix: prefer `bucket+source_table`; fall back to `id` only when bucket empty.
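
The fix for bug 4 can be illustrated with a small resolver. The function name and the dict-shaped registry row are assumptions; the sketch also falls back to `id` when `source_table` is empty, which goes slightly beyond what the description states.

```python
def storage_table_id(row: dict) -> str:
    """Prefer '<bucket>.<source_table>'; fall back to the registry id."""
    bucket = (row.get("bucket") or "").strip()
    source_table = (row.get("source_table") or "").strip()
    if bucket and source_table:
        return f"{bucket}.{source_table}"
    # Assumption: without a bucket the registry id is the best remaining guess.
    return row["id"]


row = {"id": "circle_inc", "bucket": "in.c-finance", "source_table": "circle"}
resolved = storage_table_id(row)
```

With the example row from the bug report, the resolver yields `in.c-finance.circle` instead of the slugified `circle_inc` that produced the 404s.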

## Operator notes

- Existing tables stay on `full_refresh` after migration; admins opt individual tables in via `agnes admin register-table --sync-strategy ...`, the Keboola Edit modal, or `POST/PUT /api/admin/registry`.
- `merge_parquet` and `merge_partition` use `pd.concat + drop_duplicates`, loading both existing and delta into pandas RAM. For tables in the multi-million-row range this may OOM — switch to `partitioned` strategy for those (per-partition merge keeps memory bounded). Documented in `### Internal` of the changelog entry.
- Date placeholders are resolved at **sync time**, not register time — a typo'd `{{lasst_week}}` is accepted at register and surfaces only when the next sync runs. By design (rolling windows need late-binding).
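
The memory note on `merge_parquet` and `merge_partition` follows directly from the merge technique. Below is a minimal sketch of a `pd.concat + drop_duplicates` merge on a primary key; the function name is hypothetical and the real helpers also handle parquet I/O and typed schemas, which are omitted here.

```python
import pandas as pd


def merge_by_primary_key(
    existing: pd.DataFrame, delta: pd.DataFrame, primary_key: list
) -> pd.DataFrame:
    # Both frames are fully materialised in RAM, which is the source of the
    # OOM risk on very large tables.
    combined = pd.concat([existing, delta], ignore_index=True)
    # Delta rows come last, so keep="last" lets them win on a key collision.
    merged = combined.drop_duplicates(subset=primary_key, keep="last")
    return merged.reset_index(drop=True)


existing = pd.DataFrame({"id": [1, 2], "v": [10, 20]})
delta = pd.DataFrame({"id": [2, 3], "v": [999, 30]})
merged = merge_by_primary_key(existing, delta, ["id"])
```

Applying the same merge per partition file bounds peak memory by the largest partition rather than by the whole table.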

## Spec source

The four corresponding plans on the `zs/keboola-connector-specs` branch under `docs/superpowers/plans/2026-05-07-0[1-4]-*.md` capture the design rationale and link back to internal repo references for each subsystem.
2026-05-07 19:01:27 +02:00

250 lines
8.7 KiB
Python

"""Unit tests for partitioned sync helpers."""
from datetime import date, datetime, timezone

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pytest


# ───────────────────────────── partition_key_for ──────────────────────────────


def test_partition_key_day():
    from connectors.keboola.partitioned import partition_key_for
    assert partition_key_for(date(2026, 5, 7), "day") == "2026_05_07"


def test_partition_key_month():
    from connectors.keboola.partitioned import partition_key_for
    assert partition_key_for(date(2026, 5, 7), "month") == "2026_05"


def test_partition_key_year():
    from connectors.keboola.partitioned import partition_key_for
    assert partition_key_for(date(2026, 5, 7), "year") == "2026"


def test_partition_key_accepts_datetime():
    from connectors.keboola.partitioned import partition_key_for
    assert partition_key_for(datetime(2026, 5, 7, 12, 30), "day") == "2026_05_07"


def test_partition_key_accepts_pandas_timestamp():
    from connectors.keboola.partitioned import partition_key_for
    assert partition_key_for(pd.Timestamp("2026-05-07"), "month") == "2026_05"


def test_invalid_granularity_raises():
    from connectors.keboola.partitioned import partition_key_for, InvalidPartitionConfigError
    with pytest.raises(InvalidPartitionConfigError, match="granularity"):
        partition_key_for(date(2026, 5, 7), "hour")


# ───────────────────────────── merge_partition ────────────────────────────────


def test_merge_partition_inserts_new_rows(tmp_path):
    from connectors.keboola.partitioned import merge_partition
    schema = pa.schema([
        pa.field("id", pa.int64()),
        pa.field("date", pa.date32()),
        pa.field("v", pa.int64()),
    ])
    pq_path = tmp_path / "2026_05.parquet"
    pq.write_table(
        pa.Table.from_pylist([
            {"id": 1, "date": date(2026, 5, 1), "v": 10},
        ], schema=schema),
        pq_path, compression="snappy",
    )
    delta_df = pd.DataFrame([{"id": 2, "date": "2026-05-15", "v": 20}])
    merge_partition(
        partition_path=pq_path,
        delta_df=delta_df,
        primary_key=["id"],
        pyarrow_schema=schema,
        date_columns=["date"],
    )
    rows = sorted(pq.read_table(pq_path).to_pylist(), key=lambda r: r["id"])
    assert len(rows) == 2
    assert rows[1]["v"] == 20


def test_merge_partition_replaces_by_pk(tmp_path):
    from connectors.keboola.partitioned import merge_partition
    schema = pa.schema([
        pa.field("id", pa.int64()),
        pa.field("date", pa.date32()),
        pa.field("v", pa.int64()),
    ])
    pq_path = tmp_path / "2026_05.parquet"
    pq.write_table(
        pa.Table.from_pylist([
            {"id": 1, "date": date(2026, 5, 1), "v": 10},
        ], schema=schema),
        pq_path, compression="snappy",
    )
    delta_df = pd.DataFrame([{"id": 1, "date": "2026-05-01", "v": 999}])
    merge_partition(
        partition_path=pq_path, delta_df=delta_df,
        primary_key=["id"], pyarrow_schema=schema, date_columns=["date"],
    )
    rows = pq.read_table(pq_path).to_pylist()
    assert len(rows) == 1
    assert rows[0]["v"] == 999


def test_merge_partition_creates_new_file_when_missing(tmp_path):
    from connectors.keboola.partitioned import merge_partition
    schema = pa.schema([
        pa.field("id", pa.int64()),
        pa.field("date", pa.date32()),
    ])
    pq_path = tmp_path / "2026_06.parquet"
    assert not pq_path.exists()
    delta_df = pd.DataFrame([{"id": 1, "date": "2026-06-01"}])
    merge_partition(
        partition_path=pq_path, delta_df=delta_df,
        primary_key=["id"], pyarrow_schema=schema, date_columns=["date"],
    )
    assert pq_path.exists()
    assert pq.read_table(pq_path).num_rows == 1


def test_merge_partition_atomic_on_failure(tmp_path, monkeypatch):
    from connectors.keboola.partitioned import merge_partition
    schema = pa.schema([pa.field("id", pa.int64())])
    pq_path = tmp_path / "2026_05.parquet"
    pq.write_table(
        pa.Table.from_pylist([{"id": 1}], schema=schema), pq_path, compression="snappy"
    )
    original = pq_path.read_bytes()

    def boom(*a, **kw):
        raise RuntimeError("disk full")

    monkeypatch.setattr("pyarrow.parquet.write_table", boom)
    delta_df = pd.DataFrame([{"id": 2}])
    with pytest.raises(RuntimeError):
        merge_partition(
            partition_path=pq_path, delta_df=delta_df,
            primary_key=["id"], pyarrow_schema=schema, date_columns=[],
        )
    assert pq_path.read_bytes() == original


# ───────────────────────────── process_csv_to_partitions ──────────────────────


def test_process_csv_to_partitions_groups_by_month(tmp_path):
    from connectors.keboola.partitioned import process_csv_to_partitions
    csv_path = tmp_path / "delta.csv"
    csv_path.write_text(
        "id,date\n"
        "1,2026-05-01\n2,2026-05-15\n3,2026-06-02\n4,2026-06-20\n"
    )
    groups = process_csv_to_partitions(
        csv_path=csv_path, partition_by="date",
        granularity="month", dtypes={"id": "Int64"},
    )
    assert set(groups.keys()) == {"2026_05", "2026_06"}
    assert len(groups["2026_05"]) == 2
    assert len(groups["2026_06"]) == 2


def test_process_csv_to_partitions_groups_by_day(tmp_path):
    from connectors.keboola.partitioned import process_csv_to_partitions
    csv_path = tmp_path / "delta.csv"
    csv_path.write_text("id,date\n1,2026-05-01\n2,2026-05-01\n3,2026-05-02\n")
    groups = process_csv_to_partitions(
        csv_path=csv_path, partition_by="date",
        granularity="day", dtypes={"id": "Int64"},
    )
    assert set(groups.keys()) == {"2026_05_01", "2026_05_02"}
    assert len(groups["2026_05_01"]) == 2


def test_process_csv_to_partitions_skips_unparseable(tmp_path, caplog):
    from connectors.keboola.partitioned import process_csv_to_partitions
    csv_path = tmp_path / "delta.csv"
    csv_path.write_text("id,date\n1,2026-05-01\n2,not-a-date\n3,0000-00-00\n")
    import logging
    with caplog.at_level(logging.WARNING):
        groups = process_csv_to_partitions(
            csv_path=csv_path, partition_by="date",
            granularity="month", dtypes={"id": "Int64"},
        )
    assert set(groups.keys()) == {"2026_05"}
    assert "2 rows with unparseable" in caplog.text


def test_process_csv_to_partitions_empty_csv(tmp_path):
    from connectors.keboola.partitioned import process_csv_to_partitions
    csv_path = tmp_path / "empty.csv"
    csv_path.write_text("id,date\n")
    groups = process_csv_to_partitions(
        csv_path=csv_path, partition_by="date",
        granularity="month", dtypes={},
    )
    assert groups == {}


def test_process_csv_to_partitions_missing_partition_column_raises(tmp_path):
    from connectors.keboola.partitioned import process_csv_to_partitions, InvalidPartitionConfigError
    csv_path = tmp_path / "delta.csv"
    csv_path.write_text("id\n1\n")
    with pytest.raises(InvalidPartitionConfigError, match="partition_by column"):
        process_csv_to_partitions(
            csv_path=csv_path, partition_by="date",
            granularity="month", dtypes={},
        )


# ───────────────────────────── compute_chunk_windows ──────────────────────────


def test_compute_chunk_windows_with_max_history():
    from connectors.keboola.partitioned import compute_chunk_windows
    now = datetime(2026, 5, 7, tzinfo=timezone.utc)
    windows = compute_chunk_windows(
        now=now, chunk_days=30, max_history_days=90, overlap_days=1,
    )
    # 90 / 30 = 3 chunks, walking backwards from now
    assert len(windows) == 3
    sinces = [w[0] for w in windows]
    assert sinces == sorted(sinces, reverse=True)


def test_compute_chunk_windows_unbounded_caps_at_safety():
    from connectors.keboola.partitioned import (
        compute_chunk_windows, INITIAL_LOAD_MAX_CHUNKS_SAFETY,
    )
    now = datetime(2026, 5, 7, tzinfo=timezone.utc)
    windows = compute_chunk_windows(
        now=now, chunk_days=30, max_history_days=None, overlap_days=1,
    )
    assert len(windows) == INITIAL_LOAD_MAX_CHUNKS_SAFETY


def test_compute_chunk_windows_zero_history_returns_empty():
    from connectors.keboola.partitioned import compute_chunk_windows
    now = datetime(2026, 5, 7, tzinfo=timezone.utc)
    windows = compute_chunk_windows(
        now=now, chunk_days=30, max_history_days=0, overlap_days=1,
    )
    assert windows == []