agnes-the-ai-analyst/tests/test_keboola_parquet_io.py
ZdenekSrotyr 506a378c3a
release: 0.47.1 — Keboola connector v27 (incremental, partitioned, where_filters, typed parquet) (#217)
## Summary

Brings the Keboola connector to feature parity with the legacy internal data-analyst's per-table sync strategies. Closes the four documented gaps from the spec branch (`zs/keboola-connector-specs`):

- **Typed parquet** in the legacy SDK extraction path — column types from Keboola Storage metadata (provider cascade `user > ai-metadata-enrichment > keboola.snowflake-transformation`) survive the CSV → parquet roundtrip; invalid date strings (`'0000-00-00'`) and invalid numeric strings (`'Non-Manager'`) become NULL while keeping the column's typed schema. Pre-fix everything was VARCHAR.
- **Incremental sync** via Storage API `changedSince` — opt-in per table; pulls only delta rows, merges into the existing parquet by `primary_key` (drop_duplicates with keep='last'). Cuts daily extraction from O(full table) to O(delta).
- **Partitioned sync** — flat per-partition layout `data/<table>/<key>.parquet` (e.g. `2026_05.parquet`), per-affected-partition merge for daily updates, chunked initial load with 1-day overlap and 2-empty-chunk stop heuristic.
- **`where_filters`** — server-side row filter with date placeholders (`{{today}}`, `{{last_3_months}}`, `{{start_of_3_months_ago}}`, etc.) resolved at sync time. Force the SDK path; reject `incremental + where_filters` combination at API layer (changedSince already filters temporally).
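
The incremental and partitioned bullets above both come down to "merge delta rows into existing rows by primary key, last write wins". Below is a minimal stdlib sketch of that idea, not the connector's code: per this PR the real `merge_parquet` / `merge_partition` use `pd.concat` + `drop_duplicates` on parquet data. The names `merge_by_primary_key`, `partition_key` and `group_by_partition`, the row-as-dict representation, and the monthly `YYYY_MM` granularity (inferred from the `2026_05.parquet` example) are assumptions for illustration.

```python
from datetime import date


def merge_by_primary_key(existing, delta, primary_key):
    """Merge delta rows into existing rows; the last row seen per key wins.

    Hypothetical stand-in for concat + drop_duplicates(keep='last').
    Rows are plain dicts; primary_key is a list of column names.
    """
    merged = {}
    for row in list(existing) + list(delta):
        key = tuple(row[col] for col in primary_key)
        # Drop any earlier row with this key so the newest one takes its
        # place at the end, mirroring keep='last' ordering.
        merged.pop(key, None)
        merged[key] = row
    return list(merged.values())


def partition_key(value):
    """Derive a flat partition key such as '2026_05' (monthly is assumed)."""
    return f"{value.year:04d}_{value.month:02d}"


def group_by_partition(rows, partition_by):
    """Bucket rows by partition key so only affected partitions get merged."""
    groups = {}
    for row in rows:
        groups.setdefault(partition_key(row[partition_by]), []).append(row)
    return groups


existing = [{"id": 1, "amount": 100}, {"id": 2, "amount": 200}]
delta = [{"id": 2, "amount": 250}, {"id": 3, "amount": 300}]
merged = merge_by_primary_key(existing, delta, ["id"])
```

Here `merged` holds three rows, with `id=2` carrying the delta value `250`. A partitioned sync would first call `group_by_partition` on the delta and run the same merge once per affected `data/<table>/<key>.parquet` file.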

## Architecture

- **Schema migration v25 → v26**: 7 new columns on `table_registry`. Existing `sync_strategy` column reused (pre-v26 it was inert catalog metadata; post-v26 the extractor dispatches off it).
- **Per-table dispatcher** in `extractor.run()` routes to one of `_extract_via_extension` (full_refresh + extension), `_extract_via_legacy` (full_refresh + filters or extension fallback), `extract_incremental`, or `extract_partitioned`.
- **API conflict policy**: `incremental + where_filters` → 422; `partitioned + query_mode='remote'` → 422; `partitioned ⇒ partition_by required`.
- **Admin UI**: third "Direct extract (Storage API)" radio in the Keboola Register / Edit modals, alongside existing "Whole table (extension)" and "Custom SQL". When selected, exposes a v26 sync-strategy panel with conditional fields per strategy.
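
The conflict policy and dispatcher routing described above can be sketched as two small pure functions. This is an illustrative sketch under stated assumptions, not the code in `extractor.run()` or the admin API: `validate_sync_config`, `choose_extractor`, the dict-shaped config, and the `extension_available` flag are hypothetical; only the strategy names, the three conflict rules, and the four extractor names come from this PR.

```python
def validate_sync_config(cfg):
    """Return conflict messages; an API layer could map a non-empty list to HTTP 422."""
    errors = []
    strategy = cfg.get("sync_strategy", "full_refresh")
    if strategy == "incremental" and cfg.get("where_filters"):
        errors.append("incremental cannot be combined with where_filters")
    if strategy == "partitioned":
        if cfg.get("query_mode") == "remote":
            errors.append("partitioned cannot be combined with query_mode='remote'")
        if not cfg.get("partition_by"):
            errors.append("partitioned requires partition_by")
    return errors


def choose_extractor(cfg, extension_available=True):
    """Pick the extractor name for one table config (assumed routing order)."""
    strategy = cfg.get("sync_strategy", "full_refresh")
    if strategy == "incremental":
        return "extract_incremental"
    if strategy == "partitioned":
        return "extract_partitioned"
    # full_refresh: filters force the SDK path, as does a missing extension.
    if cfg.get("where_filters") or not extension_available:
        return "_extract_via_legacy"
    return "_extract_via_extension"


bad = {"sync_strategy": "incremental", "where_filters": [{"column": "d"}]}
print(validate_sync_config(bad))
print(choose_extractor({"sync_strategy": "full_refresh"}))
```

Keeping validation separate from dispatch means a config that reaches the extractor has already passed the API-layer checks, so the dispatcher itself never needs to raise on conflicting options.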

## Test plan

- [x] **Unit + module** — 134 v26 tests covering migration, repo, parquet_io, where_filters, incremental (compute_changed_since + merge_parquet + extract_incremental E2E), partitioned (key derivation + merge_partition + chunked windows + extract_partitioned E2E), extractor dispatcher, admin API validators, PUT field clearing, registry-shape → dispatcher bridge
- [x] **HTML form structure** — all v26 inputs + visibility classes + JS payload fields verified in rendered template
- [x] **Real Keboola roundtrip** — registered a small test table as `sync_strategy='incremental'` against a test Storage project, triggered two syncs:
  - Sync 1: `changedSince=None` → full pull → 9 rows typed parquet
  - Sync 2: `changedSince=last_sync - 1d window` → 9 delta rows merged with 9 existing → 9 after dedup on primary_key (PK merge confirmed)
- [x] **Browser UX** — agent-browser session against a local uvicorn: login → admin/tables → register modal → switch radios → verify field visibility per strategy → submit → edit existing row → switch to Direct/Incremental → save → confirm DB persistence
- [x] **Regression** — no regressions in the broader 3252-test suite (3 pre-v26 tests updated for the deprecation-marker removal + schema-version bump; 2 pre-existing environment-sensitive test failures unrelated to this change)

## Bugs caught + fixed during E2E

The browser + real-Keboola roundtrip exposed four bugs the unit tests missed:

1. **JS visibility race** — two competing `forEach` loops set `display=''` then `display='none'` on form elements sharing `kb-strategy-incremental kb-strategy-partitioned` classes (window_days + max_history_days are reused across strategies). Fix: single-pass selector with class-based visibility resolver.
2. **PUT cannot clear field** — pre-v26 `updates = {k: v ... if v is not None}` collapsed "omitted from body" and "sent as null" into the same case, so admin couldn't switch a partitioned row back to full_refresh and have stale `partition_by` clear. Fix: `model_dump(exclude_unset=True)`.
3. **Subprocess DB lock conflict** — `_read_last_sync` reopened `system.duckdb` while the parent server held the write lock (subprocess contract at `app/api/sync.py:_run_sync` line 260). Fix: parent injects `__last_sync__` into table_config before subprocess spawn.
4. **Wrong KBC table_id** — `extract_incremental` / `extract_partitioned` built the Storage API table_id from the registry row's slugified `id` (`circle_inc`) instead of `bucket.source_table` (`in.c-finance.circle`), producing 404s. Fix: prefer `bucket+source_table`; fall back to `id` only when bucket empty.
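
Bugs 2 and 4 are easy to reproduce in isolation. The sketch below uses a parsed JSON body (a plain dict) as a stand-in for the request model, so "omitted" means the key is absent and "sent as null" means the value is `None`; in the real handler the fix is `model_dump(exclude_unset=True)`, as stated above. All function names here are hypothetical, and the registry row is assumed to be a dict with `id`, `bucket` and `source_table` keys.

```python
def build_updates_buggy(body):
    """Pre-fix behaviour: an explicit null is dropped, same as an omitted key."""
    return {k: v for k, v in body.items() if v is not None}


def build_updates_fixed(body):
    """Post-fix behaviour: keep every key the client actually sent, nulls included."""
    return dict(body)


def apply_updates(row, updates):
    """Overlay the updates on the stored registry row."""
    return {**row, **updates}


def resolve_storage_table_id(row):
    """Prefer bucket + source_table; fall back to the slugified id if bucket is empty."""
    bucket = row.get("bucket") or ""
    if bucket:
        return f"{bucket}.{row['source_table']}"
    return row["id"]


stored = {"id": "circle_inc", "sync_strategy": "partitioned", "partition_by": "created_on"}
body = {"sync_strategy": "full_refresh", "partition_by": None}

stale = apply_updates(stored, build_updates_buggy(body))
cleared = apply_updates(stored, build_updates_fixed(body))
```

With the buggy builder, `stale["partition_by"]` is still `"created_on"`; with the fixed one, `cleared["partition_by"]` is `None`. For a row with `bucket="in.c-finance"` and `source_table="circle"`, `resolve_storage_table_id` yields `in.c-finance.circle` rather than `circle_inc`.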

## Operator notes

- Existing tables stay on `full_refresh` after migration; admins opt individual tables in via `agnes admin register-table --sync-strategy ...`, the Keboola Edit modal, or `POST/PUT /api/admin/registry`.
- `merge_parquet` and `merge_partition` use `pd.concat + drop_duplicates`, loading both existing and delta into pandas RAM. For tables in the multi-million-row range this may OOM — switch to `partitioned` strategy for those (per-partition merge keeps memory bounded). Documented in `### Internal` of the changelog entry.
- Date placeholders are resolved at **sync time**, not register time — a typo'd `{{lasst_week}}` is accepted at register and surfaces only when the next sync runs. By design (rolling windows need late-binding).
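
To make the late-binding note concrete, here is a minimal sketch of sync-time placeholder resolution. It is not the connector's resolver: `resolve_placeholders`, the `RESOLVERS` table, and the exact meaning given to `{{start_of_3_months_ago}}` (first day of the month three months back) are assumptions, and only two of the placeholders named in this PR are covered.

```python
import re
from datetime import date

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def _start_of_months_ago(today, months):
    """First day of the month `months` months before today's month."""
    index = today.year * 12 + (today.month - 1) - months
    return date(index // 12, index % 12 + 1, 1)


# Hypothetical resolver table; semantics are assumed, not taken from the repo.
RESOLVERS = {
    "today": lambda today: today,
    "start_of_3_months_ago": lambda today: _start_of_months_ago(today, 3),
}


def resolve_placeholders(value, today):
    """Replace {{name}} tokens with ISO dates; unknown names fail here, at sync time."""

    def substitute(match):
        name = match.group(1)
        if name not in RESOLVERS:
            raise ValueError(f"unknown date placeholder: {{{{{name}}}}}")
        return RESOLVERS[name](today).isoformat()

    return _PLACEHOLDER.sub(substitute, value)


sync_day = date(2026, 5, 7)
print(resolve_placeholders("{{today}}", sync_day))
print(resolve_placeholders("{{start_of_3_months_ago}}", sync_day))
```

Because `today` is passed in at call time, the same registered filter yields a different window on every sync. A typo such as `{{lasst_week}}` raises `ValueError` only when `resolve_placeholders` runs, which matches the behaviour described in the note above.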

## Spec source

The four corresponding plans on the `zs/keboola-connector-specs` branch under `docs/superpowers/plans/2026-05-07-0[1-4]-*.md` capture the design rationale and link back to internal repo references for each subsystem.
2026-05-07 19:01:27 +02:00

275 lines
9.6 KiB
Python

"""Tests for parquet_io helpers — typed parquet schema enforcement.
Ports the typed-schema parts of internal repo's `src/parquet_manager.py`
into the OSS Keboola legacy SDK extraction path. Three pure-function
helpers: convert_date_columns_to_date32, apply_schema_to_table, csv_to_parquet.
"""
import csv as _csv
import logging
from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq
import pytest


# ───────────────────────────── convert_date_columns_to_date32 ─────────────────


def test_string_dates_become_date32():
    from connectors.keboola.parquet_io import convert_date_columns_to_date32

    table = pa.table({
        "id": pa.array([1, 2, 3], type=pa.int64()),
        "created_on": pa.array(["2025-01-15", "2025-02-20", "2025-03-30"]),
    })
    result = convert_date_columns_to_date32(table, ["created_on"])
    assert result.schema.field("created_on").type == pa.date32()
    assert result.schema.field("id").type == pa.int64()


def test_invalid_date_becomes_null_keeping_date32(caplog):
    from connectors.keboola.parquet_io import convert_date_columns_to_date32

    table = pa.table({
        "created_on": pa.array(["2025-01-15", "0000-00-00", "not-a-date"]),
    })
    with caplog.at_level(logging.WARNING):
        result = convert_date_columns_to_date32(table, ["created_on"])
    assert result.schema.field("created_on").type == pa.date32()
    col = result.column("created_on").to_pylist()
    assert col[0].isoformat() == "2025-01-15"
    assert col[1] is None
    assert col[2] is None
    assert "2 invalid date values" in caplog.text
    assert "0000-00-00" in caplog.text


def test_all_null_column_gets_typed_nulls():
    from connectors.keboola.parquet_io import convert_date_columns_to_date32

    table = pa.table({
        "created_on": pa.array([None, None, None], type=pa.string()),
    })
    result = convert_date_columns_to_date32(table, ["created_on"])
    assert result.schema.field("created_on").type == pa.date32()
    assert result.column("created_on").null_count == 3


def test_already_timestamp_column_casts_to_date32():
    import datetime

    from connectors.keboola.parquet_io import convert_date_columns_to_date32

    table = pa.table({
        "created_on": pa.array(
            [datetime.datetime(2025, 1, 15, 12, 30)],
            type=pa.timestamp("us"),
        ),
    })
    result = convert_date_columns_to_date32(table, ["created_on"])
    assert result.schema.field("created_on").type == pa.date32()
    assert result.column("created_on").to_pylist()[0].isoformat() == "2025-01-15"


def test_no_date_columns_listed_returns_unchanged():
    from connectors.keboola.parquet_io import convert_date_columns_to_date32

    table = pa.table({"x": pa.array([1, 2, 3])})
    result = convert_date_columns_to_date32(table, [])
    assert result is table


def test_date_column_not_in_table_silently_ignored():
    from connectors.keboola.parquet_io import convert_date_columns_to_date32

    table = pa.table({"x": pa.array([1, 2])})
    result = convert_date_columns_to_date32(table, ["nonexistent"])
    assert result.schema.field("x").type == pa.int64()


# ───────────────────────────── apply_schema_to_table ──────────────────────────


def test_null_type_column_gets_target_type():
    from connectors.keboola.parquet_io import apply_schema_to_table

    table = pa.table({"x": pa.nulls(3)})
    target = pa.schema([pa.field("x", pa.int64())])
    result = apply_schema_to_table(table, target)
    assert result.schema.field("x").type == pa.int64()
    assert result.column("x").null_count == 3


def test_string_to_timestamp_with_utc_suffix():
    from connectors.keboola.parquet_io import apply_schema_to_table

    table = pa.table({
        "ts": pa.array(["2022-01-12T16:17:35.000Z", "2022-01-13T08:00:00.000Z"]),
    })
    target = pa.schema([pa.field("ts", pa.timestamp("us"))])
    result = apply_schema_to_table(table, target)
    assert result.schema.field("ts").type == pa.timestamp("us")
    out = result.column("ts").to_pylist()
    assert out[0].isoformat() == "2022-01-12T16:17:35"


def test_string_to_int_invalid_becomes_null():
    from connectors.keboola.parquet_io import apply_schema_to_table

    table = pa.table({
        "amount": pa.array(["100", "200", "Non-Manager", "300"]),
    })
    target = pa.schema([pa.field("amount", pa.int64())])
    result = apply_schema_to_table(table, target)
    assert result.schema.field("amount").type == pa.int64()
    out = result.column("amount").to_pylist()
    assert out == [100, 200, None, 300]


def test_matching_type_kept_as_is():
    from connectors.keboola.parquet_io import apply_schema_to_table

    table = pa.table({"x": pa.array([1, 2, 3], type=pa.int64())})
    target = pa.schema([pa.field("x", pa.int64())])
    result = apply_schema_to_table(table, target)
    assert result.column("x").to_pylist() == [1, 2, 3]


def test_column_not_in_target_kept_with_inferred_type():
    from connectors.keboola.parquet_io import apply_schema_to_table

    table = pa.table({
        "x": pa.array([1, 2], type=pa.int64()),
        "extra": pa.array(["a", "b"]),
    })
    target = pa.schema([pa.field("x", pa.int64())])
    result = apply_schema_to_table(table, target)
    assert "extra" in result.column_names
    assert result.schema.field("extra").type == pa.string()


def test_uncastable_keeps_original_with_warning(caplog):
    from connectors.keboola.parquet_io import apply_schema_to_table

    table = pa.table({"x": pa.array(["abc", "def"])})
    target = pa.schema([pa.field("x", pa.bool_())])
    with caplog.at_level(logging.WARNING):
        result = apply_schema_to_table(table, target)
    assert result.schema.field("x").type == pa.string()
    assert "cannot cast" in caplog.text


def test_empty_target_schema_returns_table_unchanged():
    from connectors.keboola.parquet_io import apply_schema_to_table

    table = pa.table({"x": pa.array([1, 2])})
    result = apply_schema_to_table(table, pa.schema([]))
    assert result is table


# ───────────────────────────── csv_to_parquet ─────────────────────────────────


def _write_csv(tmp_path: Path, rows: list[dict]) -> Path:
    csv_path = tmp_path / "in.csv"
    with csv_path.open("w", newline="") as f:
        if not rows:
            return csv_path
        writer = _csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    return csv_path


def test_int_column_typed_via_dtypes(tmp_path):
    from connectors.keboola.parquet_io import csv_to_parquet

    csv_path = _write_csv(tmp_path, [
        {"id": "1", "amount": "100"},
        {"id": "2", "amount": "200"},
        {"id": "3", "amount": ""},
    ])
    pq_path = tmp_path / "out.parquet"
    csv_to_parquet(
        csv_path=csv_path,
        parquet_path=pq_path,
        dtypes={"id": "Int64", "amount": "Int64"},
    )
    table = pq.read_table(pq_path)
    assert table.schema.field("amount").type == pa.int64()
    assert table.column("amount").to_pylist() == [100, 200, None]


def test_date_column_typed_via_date32(tmp_path):
    from connectors.keboola.parquet_io import csv_to_parquet

    csv_path = _write_csv(tmp_path, [
        {"id": "1", "created_on": "2025-01-15"},
        {"id": "2", "created_on": "0000-00-00"},
    ])
    pq_path = tmp_path / "out.parquet"
    csv_to_parquet(
        csv_path=csv_path,
        parquet_path=pq_path,
        dtypes={"id": "Int64"},
        date_columns=["created_on"],
    )
    table = pq.read_table(pq_path)
    assert table.schema.field("created_on").type == pa.date32()
    out = table.column("created_on").to_pylist()
    assert out[0].isoformat() == "2025-01-15"
    assert out[1] is None


def test_pyarrow_schema_overrides_inferred(tmp_path):
    from connectors.keboola.parquet_io import csv_to_parquet

    csv_path = _write_csv(tmp_path, [
        {"flag": "true"},
        {"flag": "false"},
        {"flag": ""},
    ])
    pq_path = tmp_path / "out.parquet"
    schema = pa.schema([pa.field("flag", pa.bool_())])
    csv_to_parquet(
        csv_path=csv_path,
        parquet_path=pq_path,
        dtypes={"flag": "boolean"},
        pyarrow_schema=schema,
    )
    table = pq.read_table(pq_path)
    assert table.schema.field("flag").type == pa.bool_()
    assert table.column("flag").to_pylist() == [True, False, None]


def test_missing_dtype_column_falls_through_as_string(tmp_path):
    from connectors.keboola.parquet_io import csv_to_parquet

    csv_path = _write_csv(tmp_path, [{"x": "abc", "y": "1"}])
    pq_path = tmp_path / "out.parquet"
    csv_to_parquet(
        csv_path=csv_path,
        parquet_path=pq_path,
        dtypes={"y": "Int64"},
    )
    table = pq.read_table(pq_path)
    # pyarrow may use string or large_string for object columns from pandas
    x_type = table.schema.field("x").type
    assert pa.types.is_string(x_type) or pa.types.is_large_string(x_type)
    assert table.schema.field("y").type == pa.int64()


def test_empty_csv_writes_empty_parquet(tmp_path):
    from connectors.keboola.parquet_io import csv_to_parquet

    csv_path = tmp_path / "empty.csv"
    csv_path.write_text("id,amount\n")
    pq_path = tmp_path / "out.parquet"
    csv_to_parquet(
        csv_path=csv_path,
        parquet_path=pq_path,
        dtypes={"id": "Int64", "amount": "Int64"},
    )
    table = pq.read_table(pq_path)
    assert table.num_rows == 0
    assert table.schema.field("id").type == pa.int64()