## Summary
Brings the Keboola connector to feature parity with the legacy internal data-analyst's per-table sync strategies. Closes the four documented gaps from the spec branch (`zs/keboola-connector-specs`):
- **Typed parquet** in the legacy SDK extraction path — column types from Keboola Storage metadata (provider cascade `user > ai-metadata-enrichment > keboola.snowflake-transformation`) survive the CSV → parquet roundtrip; invalid date strings (`'0000-00-00'`) and invalid numeric strings (`'Non-Manager'`) become NULL while keeping the column's typed schema. Pre-fix everything was VARCHAR.
- **Incremental sync** via Storage API `changedSince` — opt-in per table; pulls only delta rows, merges into the existing parquet by `primary_key` (drop_duplicates with keep='last'). Cuts daily extraction from O(full table) to O(delta).
- **Partitioned sync** — flat per-partition layout `data/<table>/<key>.parquet` (e.g. `2026_05.parquet`), per-affected-partition merge for daily updates, chunked initial load with 1-day overlap and 2-empty-chunk stop heuristic.
- **`where_filters`**: server-side row filter with date placeholders (`{{today}}`, `{{last_3_months}}`, `{{start_of_3_months_ago}}`, etc.) resolved at sync time. Tables with filters are forced onto the SDK path, and the API layer rejects the `incremental + where_filters` combination (`changedSince` already filters temporally).
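
To make the "merge by `primary_key`, keep the last row" semantics of the incremental strategy concrete, here is a minimal pure-Python sketch. It is not the connector's code (the PR describes a pandas `concat` + `drop_duplicates(keep='last')` implementation); the function name `merge_keep_last` and the row-as-dict shape are assumptions made for illustration only.

```python
from typing import Any

Row = dict[str, Any]


def merge_keep_last(existing: list[Row], delta: list[Row], primary_key: list[str]) -> list[Row]:
    """Concatenate existing + delta, then keep only the last row per primary key."""
    combined = existing + delta
    seen: set[tuple] = set()
    kept_reversed: list[Row] = []
    # Walk backwards so the first row met for a key is its most recent version.
    for row in reversed(combined):
        key = tuple(row[col] for col in primary_key)
        if key not in seen:
            seen.add(key)
            kept_reversed.append(row)
    # Restore the original ordering of the surviving rows.
    kept_reversed.reverse()
    return kept_reversed


existing = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
delta = [{"id": 2, "v": "B"}, {"id": 3, "v": "c"}]
merged = merge_keep_last(existing, delta, ["id"])
```

Here the delta row for `id=2` replaces the existing one, and `id=3` is appended, which is the outcome the roundtrip in the test plan relies on when delta rows overlap existing rows.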
## Architecture
- **Schema migration v25 → v26**: 7 new columns on `table_registry`. Existing `sync_strategy` column reused (pre-v26 it was inert catalog metadata; post-v26 the extractor dispatches off it).
- **Per-table dispatcher** in `extractor.run()` routes to one of `_extract_via_extension` (full_refresh + extension), `_extract_via_legacy` (full_refresh + filters or extension fallback), `extract_incremental`, or `extract_partitioned`.
- **API conflict policy**: `incremental + where_filters` → 422; `partitioned + query_mode='remote'` → 422; `partitioned ⇒ partition_by required`.
- **Admin UI**: third "Direct extract (Storage API)" radio in the Keboola Register / Edit modals, alongside existing "Whole table (extension)" and "Custom SQL". When selected, exposes a v26 sync-strategy panel with conditional fields per strategy.
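
The conflict policy above can be sketched as a small validator. This is an illustrative stand-in, not the actual API validator: the function name `validate_sync_config`, the plain-dict input, and the error strings are assumptions, and a returned message here stands in for the 422 response.

```python
from typing import Any, Optional

VALID_STRATEGIES = {"full_refresh", "incremental", "partitioned"}


def validate_sync_config(cfg: dict[str, Any]) -> Optional[str]:
    """Return an error message for a conflicting config, or None when it is valid."""
    # A missing strategy is treated as full_refresh (the default for existing tables).
    strategy = cfg.get("sync_strategy") or "full_refresh"
    if strategy not in VALID_STRATEGIES:
        return f"unknown sync_strategy: {strategy!r}"
    if strategy == "incremental" and cfg.get("where_filters"):
        return "incremental cannot be combined with where_filters"
    if strategy == "partitioned":
        if cfg.get("query_mode") == "remote":
            return "partitioned cannot be combined with query_mode='remote'"
        if not cfg.get("partition_by"):
            return "partitioned requires partition_by"
    return None
```

Keeping the rules in one function makes it straightforward to apply the same checks on both the register and the edit path.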
## Test plan
- [x] **Unit + module** — 134 v26 tests covering migration, repo, parquet_io, where_filters, incremental (compute_changed_since + merge_parquet + extract_incremental E2E), partitioned (key derivation + merge_partition + chunked windows + extract_partitioned E2E), extractor dispatcher, admin API validators, PUT field clearing, registry-shape → dispatcher bridge
- [x] **HTML form structure** — all v26 inputs + visibility classes + JS payload fields verified in rendered template
- [x] **Real Keboola roundtrip** — registered a small test table as `sync_strategy='incremental'` against a test Storage project, triggered two syncs:
  - Sync 1: `changedSince=None` → full pull → 9 rows typed parquet
  - Sync 2: `changedSince=last_sync - 1d window` → 9 delta rows merged with 9 existing → 9 after dedup on primary_key (PK merge confirmed)
- [x] **Browser UX** — agent-browser session against a local uvicorn: login → admin/tables → register modal → switch radios → verify field visibility per strategy → submit → edit existing row → switch to Direct/Incremental → save → confirm DB persistence
- [x] **Regression** — no regressions in the broader 3252-test suite (3 pre-v26 tests updated for the deprecation-marker removal + schema-version bump; 2 pre-existing environment-sensitive test failures unrelated to this change)
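
As a rough illustration of the partition key derivation covered by the unit tests, the sketch below maps a date to the flat `data/<table>/<key>.parquet` layout. Only the monthly key format (`2026_05`) appears in this description, so only that granularity is shown; the helper names `month_partition_key`, `partition_path`, and `group_by_partition` are hypothetical.

```python
from collections import defaultdict
from datetime import date
from pathlib import PurePosixPath


def month_partition_key(d: date) -> str:
    """Monthly partition key, e.g. date(2026, 5, 7) -> '2026_05'."""
    return f"{d.year:04d}_{d.month:02d}"


def partition_path(table: str, d: date) -> PurePosixPath:
    """Flat per-partition layout: data/<table>/<key>.parquet."""
    return PurePosixPath("data") / table / f"{month_partition_key(d)}.parquet"


def group_by_partition(rows: list[dict], partition_by: str) -> dict[str, list[dict]]:
    """Bucket rows by partition key so only the affected partitions get merged."""
    groups: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        groups[month_partition_key(row[partition_by])].append(row)
    return dict(groups)


rows = [
    {"id": 1, "order_date": date(2026, 4, 30)},
    {"id": 2, "order_date": date(2026, 5, 1)},
    {"id": 3, "order_date": date(2026, 5, 7)},
]
groups = group_by_partition(rows, "order_date")
```

A daily delta typically touches one or two keys in `groups`, which is why a per-partition merge keeps memory bounded compared with rewriting one large file.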
## Bugs caught + fixed during E2E
The browser + real-Keboola roundtrip exposed four bugs the unit tests missed:
1. **JS visibility race** — two competing `forEach` loops set `display=''` then `display='none'` on form elements sharing `kb-strategy-incremental kb-strategy-partitioned` classes (window_days + max_history_days are reused across strategies). Fix: single-pass selector with class-based visibility resolver.
2. **PUT cannot clear field** — pre-v26 `updates = {k: v ... if v is not None}` collapsed "omitted from body" and "sent as null" into the same case, so admin couldn't switch a partitioned row back to full_refresh and have stale `partition_by` clear. Fix: `model_dump(exclude_unset=True)`.
3. **Subprocess DB lock conflict** — `_read_last_sync` reopened `system.duckdb` while the parent server held the write lock (subprocess contract at `app/api/sync.py:_run_sync` line 260). Fix: parent injects `__last_sync__` into table_config before subprocess spawn.
4. **Wrong KBC table_id** — `extract_incremental` / `extract_partitioned` built the Storage API table_id from the registry row's slugified `id` (`circle_inc`) instead of `bucket.source_table` (`in.c-finance.circle`), producing 404s. Fix: prefer `bucket+source_table`; fall back to `id` only when bucket empty.
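
Bug 2 comes down to telling "key omitted from the body" apart from "key sent as null". The sketch below reproduces the difference with plain dicts; it does not use the project's request models. pydantic's `model_dump(exclude_unset=True)` achieves the equivalent result by dumping only the fields the client actually sent. The function names are hypothetical.

```python
import json


def updates_pre_fix(body: dict) -> dict:
    """Pre-v26 behaviour: dropping None values also drops explicit nulls."""
    return {k: v for k, v in body.items() if v is not None}


def updates_post_fix(body: dict) -> dict:
    """Post-fix behaviour: keep every key the client sent, including nulls."""
    return dict(body)


# The edit modal switches a partitioned row back to full_refresh.
body = json.loads('{"sync_strategy": "full_refresh", "partition_by": null}')

before = updates_pre_fix(body)   # partition_by is silently lost, so the stale value survives
after = updates_post_fix(body)   # partition_by=None reaches the UPDATE and clears the column
```

Keys that are absent from `body` stay absent in both versions, so a partial update still leaves untouched columns alone.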
## Operator notes
- Existing tables stay on `full_refresh` after migration; admins opt individual tables in via `agnes admin register-table --sync-strategy ...`, the Keboola Edit modal, or `POST/PUT /api/admin/registry`.
- `merge_parquet` and `merge_partition` use `pd.concat + drop_duplicates`, loading both existing and delta into pandas RAM. For tables in the multi-million-row range this may OOM — switch to `partitioned` strategy for those (per-partition merge keeps memory bounded). Documented in `### Internal` of the changelog entry.
- Date placeholders are resolved at **sync time**, not register time — a typo'd `{{lasst_week}}` is accepted at register and surfaces only when the next sync runs. By design (rolling windows need late-binding).
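
The late-binding behaviour can be illustrated with a small resolver. This is a sketch under stated assumptions, not the connector's `where_filters` module: the function name, the ISO date output format, and the meaning of `{{last_week}}` as "today minus 7 days" are all assumptions; only the `{{name}}` syntax and the sync-time failure mode come from the description above.

```python
import re
from datetime import date, timedelta

PLACEHOLDER = re.compile(r"\{\{(\w+)\}\}")


def resolve_placeholders(value: str, today: date) -> str:
    """Replace {{name}} tokens with dates computed relative to `today`."""
    # Assumed semantics, for illustration only.
    resolvers = {
        "today": lambda: today,
        "last_week": lambda: today - timedelta(days=7),
    }

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in resolvers:
            # An unknown token is only detected here, i.e. when a sync runs.
            raise ValueError(f"unknown date placeholder: {{{{{name}}}}}")
        return resolvers[name]().isoformat()

    return PLACEHOLDER.sub(substitute, value)


resolved = resolve_placeholders("{{last_week}}", date(2026, 5, 7))
```

Because `today` is passed in at call time, the same stored filter yields a rolling window on every sync, and a misspelled token such as `{{lasst_week}}` raises only at that point.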
## Spec source
The four corresponding plans on the `zs/keboola-connector-specs` branch under `docs/superpowers/plans/2026-05-07-0[1-4]-*.md` capture the design rationale and link back to internal repo references for each subsystem.
"""End-to-end tests for v26 Keboola sync-strategy support.

Coverage matrix:
- HTML: admin_tables.html contains the new Direct-extract radio + v26 panel + JS handlers
- API: POST /api/admin/register-table accepts each v26 strategy + persists v26 fields
- API: PUT /api/admin/registry/{id} updates v26 fields, switches strategies, clears stale values
- API: conflict policy → 422 on incremental+filters and partitioned+remote
- Roundtrip: registered v26 table comes back from GET /api/admin/registry with all fields
- Module: registered v26 table_config flows through extractor.run() dispatcher
  to extract_incremental / extract_partitioned (with mocked SDK)
"""
import json
import os
from pathlib import Path
from unittest.mock import patch

import pyarrow as pa
import pyarrow.parquet as pq
import pytest


# ───────────────────────────── HTML form structure ────────────────────────────


HTML = Path("app/web/templates/admin_tables.html").read_text()


def test_html_register_modal_has_direct_extract_radio():
    assert 'value="direct"' in HTML
    assert 'Direct extract (Storage API)' in HTML


def test_html_has_kb_strategy_dropdown():
    assert 'id="kbStrategy"' in HTML
    assert 'id="editKbStrategy"' in HTML
    for option in ("full_refresh", "incremental", "partitioned"):
        assert f'value="{option}"' in HTML


def test_html_has_v26_inputs():
    """Every v26 field must be wired in both register and edit modals."""
    for kid in [
        "kbIncrementalWindowDays", "kbMaxHistoryDays",
        "kbPartitionBy", "kbPartitionGranularity", "kbInitialLoadChunkDays",
        "kbWhereFilters",
        "editKbIncrementalWindowDays", "editKbMaxHistoryDays",
        "editKbPartitionBy", "editKbPartitionGranularity", "editKbInitialLoadChunkDays",
        "editKbWhereFilters",
    ]:
        assert f'id="{kid}"' in HTML, f"v26 input missing: {kid}"


def test_html_visibility_classes_match_js_handlers():
    for cls in [
        "kb-direct-only", "kb-strategy-incremental", "kb-strategy-partitioned",
        "kb-strategy-not-incremental",
        "editkb-direct-only", "editkb-strategy-incremental",
        "editkb-strategy-partitioned", "editkb-strategy-not-incremental",
    ]:
        assert cls in HTML, f"visibility class missing: {cls}"


def test_html_js_payload_builders_send_v26_fields():
    """Spot-check: the JS payload builder emits at least one v26 field name
    so the API receives them."""
    for js_field in [
        "sync_strategy", "incremental_window_days", "max_history_days",
        "partition_by", "partition_granularity", "initial_load_chunk_days",
        "where_filters",
    ]:
        assert js_field in HTML, f"JS payload missing field: {js_field}"


def test_html_placeholders_documented_in_form_hint():
    """The where_filters help text must mention at least 4 placeholders so
    operators don't have to read the source to know what's supported."""
    for token in ("{{today}}", "{{last_3_months}}", "{{last_year}}", "{{start_of_3_months_ago}}"):
        assert token in HTML, f"placeholder hint missing: {token}"


# ───────────────────────────── API roundtrip ──────────────────────────────────


def _auth(token):
    return {"Authorization": f"Bearer {token}"}


def test_api_register_full_refresh_keboola(seeded_app):
    """Baseline: full_refresh registration with no v26 fields persists clean."""
    c = seeded_app["client"]
    r = c.post("/api/admin/register-table", headers=_auth(seeded_app["admin_token"]), json={
        "name": "circle",
        "source_type": "keboola",
        "bucket": "in.c-finance",
        "source_table": "circle",
        "query_mode": "local",
    })
    assert r.status_code == 201, r.text

    g = c.get("/api/admin/registry", headers=_auth(seeded_app["admin_token"]))
    row = next(t for t in g.json()["tables"] if t["id"] == "circle")
    assert row["sync_strategy"] == "full_refresh"
    assert row["incremental_window_days"] is None
    assert row["where_filters"] is None


def test_api_register_incremental_with_full_v26_payload(seeded_app):
    """Mirrors the JS payload from _buildKeboolaPayload(direct + incremental)."""
    c = seeded_app["client"]
    r = c.post("/api/admin/register-table", headers=_auth(seeded_app["admin_token"]), json={
        "name": "kpi_snapshot",
        "source_type": "keboola",
        "bucket": "in.c-finance",
        "source_table": "kpi_leadership_snapshot",
        "query_mode": "local",
        "primary_key": ["kpi_id", "snapshot_date"],
        "sync_strategy": "incremental",
        "incremental_window_days": 1,
        "max_history_days": 180,
    })
    assert r.status_code == 201, r.text

    g = c.get("/api/admin/registry", headers=_auth(seeded_app["admin_token"]))
    row = next(t for t in g.json()["tables"] if t["id"] == "kpi_snapshot")
    assert row["sync_strategy"] == "incremental"
    assert row["incremental_window_days"] == 1
    assert row["max_history_days"] == 180
    assert row["primary_key"] == ["kpi_id", "snapshot_date"]


def test_api_register_partitioned_with_full_v26_payload(seeded_app):
    c = seeded_app["client"]
    r = c.post("/api/admin/register-table", headers=_auth(seeded_app["admin_token"]), json={
        "name": "orders",
        "source_type": "keboola",
        "bucket": "in.c-sales",
        "source_table": "orders",
        "query_mode": "local",
        "primary_key": ["id"],
        "sync_strategy": "partitioned",
        "partition_by": "order_date",
        "partition_granularity": "month",
        "initial_load_chunk_days": 30,
        "max_history_days": 365,
    })
    assert r.status_code == 201, r.text

    g = c.get("/api/admin/registry", headers=_auth(seeded_app["admin_token"]))
    row = next(t for t in g.json()["tables"] if t["id"] == "orders")
    assert row["sync_strategy"] == "partitioned"
    assert row["partition_by"] == "order_date"
    assert row["partition_granularity"] == "month"
    assert row["initial_load_chunk_days"] == 30


def test_api_register_with_where_filters(seeded_app):
    c = seeded_app["client"]
    r = c.post("/api/admin/register-table", headers=_auth(seeded_app["admin_token"]), json={
        "name": "account_balance",
        "source_type": "keboola",
        "bucket": "in.c-finance",
        "source_table": "account_balance",
        "query_mode": "local",
        "sync_strategy": "full_refresh",
        "where_filters": [
            {"column": "date", "operator": "ge", "values": ["{{last_3_months}}"]},
            {"column": "country_code", "operator": "eq", "values": ["CZ", "SK"]},
        ],
    })
    assert r.status_code == 201, r.text

    g = c.get("/api/admin/registry", headers=_auth(seeded_app["admin_token"]))
    row = next(t for t in g.json()["tables"] if t["id"] == "account_balance")
    assert row["where_filters"][0]["column"] == "date"
    # Placeholder must be PRESERVED at register time (resolved at sync time)
    assert row["where_filters"][0]["values"] == ["{{last_3_months}}"]
    assert row["where_filters"][1]["values"] == ["CZ", "SK"]


# ───────────────────────────── conflict policy ────────────────────────────────


def test_api_rejects_incremental_plus_where_filters(seeded_app):
    c = seeded_app["client"]
    r = c.post("/api/admin/register-table", headers=_auth(seeded_app["admin_token"]), json={
        "name": "x", "source_type": "keboola",
        "bucket": "in.c-x", "source_table": "x", "query_mode": "local",
        "sync_strategy": "incremental",
        "where_filters": [{"column": "d", "operator": "ge", "values": ["x"]}],
    })
    assert r.status_code == 422
    assert "incremental" in r.text.lower() or "where_filters" in r.text.lower()


def test_api_rejects_partitioned_remote(seeded_app):
    c = seeded_app["client"]
    r = c.post("/api/admin/register-table", headers=_auth(seeded_app["admin_token"]), json={
        "name": "y", "source_type": "keboola",
        "bucket": "in.c-y", "source_table": "y", "query_mode": "remote",
        "sync_strategy": "partitioned",
        "partition_by": "date",
    })
    assert r.status_code == 422


def test_api_rejects_partitioned_without_partition_by(seeded_app):
    c = seeded_app["client"]
    r = c.post("/api/admin/register-table", headers=_auth(seeded_app["admin_token"]), json={
        "name": "z", "source_type": "keboola",
        "bucket": "in.c-z", "source_table": "z", "query_mode": "local",
        "sync_strategy": "partitioned",
    })
    assert r.status_code == 422
    assert "partition_by" in r.text


def test_api_rejects_invalid_strategy(seeded_app):
    c = seeded_app["client"]
    r = c.post("/api/admin/register-table", headers=_auth(seeded_app["admin_token"]), json={
        "name": "q", "source_type": "keboola",
        "bucket": "in.c-q", "source_table": "q", "query_mode": "local",
        "sync_strategy": "monthly_at_midnight",
    })
    assert r.status_code == 422


# ───────────────────────────── PUT (Edit modal) ───────────────────────────────


def test_api_put_changes_strategy_full_to_incremental(seeded_app):
    c = seeded_app["client"]
    auth = _auth(seeded_app["admin_token"])
    c.post("/api/admin/register-table", headers=auth, json={
        "name": "tab1", "source_type": "keboola",
        "bucket": "in.c-x", "source_table": "tab1", "query_mode": "local",
        "sync_strategy": "full_refresh",
    })

    r = c.put("/api/admin/registry/tab1", headers=auth, json={
        "sync_strategy": "incremental",
        "primary_key": ["id"],
        "incremental_window_days": 7,
    })
    assert r.status_code == 200, r.text

    row = next(t for t in c.get("/api/admin/registry", headers=auth).json()["tables"]
               if t["id"] == "tab1")
    assert row["sync_strategy"] == "incremental"
    assert row["incremental_window_days"] == 7


def test_api_put_clears_v26_fields_on_strategy_switch(seeded_app):
    """JS sends explicit nulls for v26 fields when switching strategies; the
    PUT path must propagate them so stale values don't survive."""
    c = seeded_app["client"]
    auth = _auth(seeded_app["admin_token"])
    c.post("/api/admin/register-table", headers=auth, json={
        "name": "tab2", "source_type": "keboola",
        "bucket": "in.c-x", "source_table": "tab2", "query_mode": "local",
        "primary_key": ["id"],
        "sync_strategy": "partitioned",
        "partition_by": "date",
        "partition_granularity": "month",
        "max_history_days": 180,
    })

    # Switch back to full_refresh; partition_by/granularity should clear
    r = c.put("/api/admin/registry/tab2", headers=auth, json={
        "sync_strategy": "full_refresh",
        "partition_by": None,
        "partition_granularity": None,
        "initial_load_chunk_days": None,
        "max_history_days": None,
        "incremental_window_days": None,
        "where_filters": None,
    })
    assert r.status_code == 200, r.text

    row = next(t for t in c.get("/api/admin/registry", headers=auth).json()["tables"]
               if t["id"] == "tab2")
    assert row["sync_strategy"] == "full_refresh"
    assert row["partition_by"] is None
    assert row["partition_granularity"] is None
    assert row["max_history_days"] is None


def test_api_put_updates_where_filters(seeded_app):
    c = seeded_app["client"]
    auth = _auth(seeded_app["admin_token"])
    c.post("/api/admin/register-table", headers=auth, json={
        "name": "tab3", "source_type": "keboola",
        "bucket": "in.c-x", "source_table": "tab3", "query_mode": "local",
        "sync_strategy": "full_refresh",
        "where_filters": [{"column": "d", "operator": "ge", "values": ["{{last_week}}"]}],
    })

    r = c.put("/api/admin/registry/tab3", headers=auth, json={
        "where_filters": [
            {"column": "d", "operator": "ge", "values": ["{{last_year}}"]},
            {"column": "country", "operator": "eq", "values": ["US"]},
        ],
    })
    assert r.status_code == 200, r.text

    row = next(t for t in c.get("/api/admin/registry", headers=auth).json()["tables"]
               if t["id"] == "tab3")
    assert len(row["where_filters"]) == 2
    assert row["where_filters"][0]["values"] == ["{{last_year}}"]


# ───────────────────────────── module-level dispatch ──────────────────────────


def test_extractor_dispatches_v26_table_from_registry(tmp_path, seeded_app, monkeypatch):
    """A row registered through the API as 'incremental' is correctly
    routed by extractor.run() to extract_incremental.

    Bridges the API+DB persistence layer to the dispatcher logic: proves
    the registered row's table_config dict shape (as returned by
    list_by_source) matches what the dispatcher reads."""
    c = seeded_app["client"]
    auth = _auth(seeded_app["admin_token"])
    c.post("/api/admin/register-table", headers=auth, json={
        "name": "events_local",
        "source_type": "keboola",
        "bucket": "in.c-evt",
        "source_table": "events",
        "query_mode": "local",
        "primary_key": ["id"],
        "sync_strategy": "incremental",
        "incremental_window_days": 2,
    })

    # Round-trip the registered row through the API to get the table_config
    # shape the dispatcher consumes (extractor.run takes table_configs from
    # repo.list_by_source, which produces the same dict shape as the GET
    # response; verify the new v26 columns survive the read path).
    g = c.get("/api/admin/registry", headers=auth)
    target = next(t for t in g.json()["tables"] if t["id"] == "events_local")
    assert target["sync_strategy"] == "incremental"
    assert target["incremental_window_days"] == 2

    from connectors.keboola import extractor

    called = {"incremental": 0, "extension": 0}
    def fake_incremental(**kw):
        called["incremental"] += 1
        pa_t = pa.table({"id": pa.array([1])})
        pq.write_table(pa_t, kw["parquet_path"])
        return {"rows": 1, "delta_rows": 1, "changed_since_used": None}
    def fake_extension(*a, **kw):
        called["extension"] += 1

    monkeypatch.setattr(extractor, "_extract_via_extension", fake_extension)
    monkeypatch.setattr(extractor, "_try_attach_extension", lambda *a, **kw: True)
    monkeypatch.setattr(extractor, "_read_last_sync", lambda tid: None)
    monkeypatch.setattr("connectors.keboola.incremental.extract_incremental", fake_incremental)

    result = extractor.run(str(tmp_path), [target], "https://kbc.example", "tok")
    assert called == {"incremental": 1, "extension": 0}
    assert result["tables_extracted"] == 1