## Summary
Brings the Keboola connector to feature parity with the legacy internal data-analyst's per-table sync strategies. Closes the four documented gaps from the spec branch (`zs/keboola-connector-specs`):
- **Typed parquet** in the legacy SDK extraction path — column types from Keboola Storage metadata (provider cascade `user > ai-metadata-enrichment > keboola.snowflake-transformation`) survive the CSV → parquet roundtrip; invalid date strings (`'0000-00-00'`) and invalid numeric strings (`'Non-Manager'`) become NULL while the column keeps its typed schema. Before this change, every column was VARCHAR.
- **Incremental sync** via Storage API `changedSince` — opt-in per table; pulls only delta rows, merges into the existing parquet by `primary_key` (drop_duplicates with keep='last'). Cuts daily extraction from O(full table) to O(delta).
- **Partitioned sync** — flat per-partition layout `data/<table>/<key>.parquet` (e.g. `2026_05.parquet`); daily updates merge only into the affected partitions; the initial load runs in chunks with a 1-day overlap and a 2-empty-chunk stop heuristic.
- **`where_filters`** — server-side row filter with date placeholders (`{{today}}`, `{{last_3_months}}`, `{{start_of_3_months_ago}}`, etc.) resolved at sync time. Forces the SDK path; the API layer rejects the `incremental + where_filters` combination (changedSince already filters temporally).
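
The "invalid values become NULL, the column stays typed" behavior from the typed-parquet item can be sketched with pandas' coercing converters. This is an illustration, not the connector's code: `COLUMN_TYPES` and `coerce_columns` are hypothetical, the mapping stands in for types resolved through the provider cascade, and the sample CSV is made up.

```python
import io

import pandas as pd

# Hypothetical declared types, standing in for Keboola Storage metadata.
COLUMN_TYPES = {"hired_on": "DATE", "grade": "NUMERIC", "name": "STRING"}


def coerce_columns(df: pd.DataFrame, column_types: dict) -> pd.DataFrame:
    """Cast string columns to their declared types.

    Unparseable values become NULL (NaT / NaN) instead of failing the load
    or forcing the whole column back to a string type.
    """
    out = df.copy()
    for col, kind in column_types.items():
        if kind == "DATE":
            # '0000-00-00' is not a real date, so errors="coerce" yields NaT.
            out[col] = pd.to_datetime(out[col], format="%Y-%m-%d", errors="coerce")
        elif kind == "NUMERIC":
            # 'Non-Manager' cannot be parsed as a number, so it becomes NaN.
            out[col] = pd.to_numeric(out[col], errors="coerce")
    return out


csv_text = "name,hired_on,grade\nAda,2024-03-01,7\nBob,0000-00-00,Non-Manager\n"
raw = pd.read_csv(io.StringIO(csv_text), dtype=str)  # read everything as text first
typed = coerce_columns(raw, COLUMN_TYPES)
print(typed.dtypes)
```

Because the coerced frame carries datetime and float dtypes, writing it to parquet keeps typed columns rather than VARCHAR.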
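
The chunked initial load for partitioned sync might look roughly like the sketch below. Several details are assumptions, not taken from the extractor: chunks walk backwards from today in `window_days` steps capped by `max_history_days` (field names borrowed from the admin form mentioned in the bug list), the 1-day overlap widens each chunk's older edge, and "2 empty chunks" means two consecutive ones. `chunk_windows`, `initial_load`, and the `fetch` callable (returning a row count) are hypothetical.

```python
from datetime import date, timedelta
from typing import Callable, Iterator, List, Tuple

# One chunk: a half-open [start, end) date range.
Window = Tuple[date, date]


def chunk_windows(today: date, window_days: int, max_history_days: int,
                  overlap_days: int = 1) -> Iterator[Window]:
    """Yield windows walking backwards from today, never older than the history cap.

    Each window's older edge is widened by `overlap_days`, so rows near a
    boundary are fetched twice rather than missed; a PK merge drops the repeats.
    """
    if window_days <= 0:
        raise ValueError("window_days must be positive")
    oldest = today - timedelta(days=max_history_days)
    end = today + timedelta(days=1)  # half-open range, so this includes today
    while end > oldest:
        start = max(end - timedelta(days=window_days), oldest)
        yield (max(start - timedelta(days=overlap_days), oldest), end)
        end = start


def initial_load(fetch: Callable[[Window], int], today: date, window_days: int,
                 max_history_days: int, empty_chunks_to_stop: int = 2) -> List[Tuple[Window, int]]:
    """Fetch chunks until `empty_chunks_to_stop` consecutive chunks return no rows."""
    fetched = []
    empty_streak = 0
    for window in chunk_windows(today, window_days, max_history_days):
        rows = fetch(window)
        fetched.append((window, rows))
        empty_streak = empty_streak + 1 if rows == 0 else 0
        if empty_streak >= empty_chunks_to_stop:
            break
    return fetched


# Usage: a fake source whose data only reaches back to early March 2026.
def fake_fetch(window: Window) -> int:
    _, end = window
    return 10 if end > date(2026, 3, 1) else 0


chunks = initial_load(fake_fetch, today=date(2026, 5, 7), window_days=30, max_history_days=365)
```

With this fake source the load fetches three non-empty chunks, then stops after the next two empty ones instead of walking the full 365-day history.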
## Architecture
- **Schema migration v25 → v26**: 7 new columns on `table_registry`. Existing `sync_strategy` column reused (pre-v26 it was inert catalog metadata; post-v26 the extractor dispatches off it).
- **Per-table dispatcher** in `extractor.run()` routes to one of `_extract_via_extension` (full_refresh + extension), `_extract_via_legacy` (full_refresh + filters or extension fallback), `extract_incremental`, or `extract_partitioned`.
- **API conflict policy**: `incremental + where_filters` → 422; `partitioned + query_mode='remote'` → 422; `partitioned ⇒ partition_by required`.
- **Admin UI**: third "Direct extract (Storage API)" radio in the Keboola Register / Edit modals, alongside existing "Whole table (extension)" and "Custom SQL". When selected, exposes a v26 sync-strategy panel with conditional fields per strategy.
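
The per-table routing in `extractor.run()` can be pictured as a small dispatch on `sync_strategy`. This is a sketch, not the real function: the four extractor names come from the list above but are stubbed to return labels, and the `extension_available` flag is a hypothetical stand-in for however the "extension fallback" case is actually detected.

```python
# Hypothetical stand-ins for the four extractor paths; each just returns a label.
def _extract_via_extension(cfg: dict) -> str:
    return "extension"


def _extract_via_legacy(cfg: dict) -> str:
    return "legacy"


def extract_incremental(cfg: dict) -> str:
    return "incremental"


def extract_partitioned(cfg: dict) -> str:
    return "partitioned"


def dispatch(table_config: dict, extension_available: bool = True) -> str:
    """Route one registry row to an extractor based on its sync_strategy."""
    strategy = table_config.get("sync_strategy") or "full_refresh"
    if strategy == "incremental":
        return extract_incremental(table_config)
    if strategy == "partitioned":
        return extract_partitioned(table_config)
    if strategy == "full_refresh":
        # where_filters need the Storage API (SDK) path; so does a missing extension.
        if table_config.get("where_filters") or not extension_available:
            return _extract_via_legacy(table_config)
        return _extract_via_extension(table_config)
    raise ValueError(f"unknown sync_strategy: {strategy!r}")
```

Rows that predate v26 carry `full_refresh`, so they keep landing on the extension path unless they have filters.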
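
The three API conflict rules reduce to a small validator. In this sketch `validate_sync_config` is a hypothetical helper that returns error messages; the real endpoint presumably turns any of them into a 422 response, but how it does so (exception type, error shape) is not shown here.

```python
from typing import List, Optional


def validate_sync_config(sync_strategy: str, where_filters: Optional[list],
                         query_mode: Optional[str], partition_by: Optional[str]) -> List[str]:
    """Return the conflicts that the admin API would reject with HTTP 422."""
    errors = []
    if sync_strategy == "incremental" and where_filters:
        # changedSince already filters temporally, so the combination is refused.
        errors.append("incremental cannot be combined with where_filters")
    if sync_strategy == "partitioned":
        if query_mode == "remote":
            errors.append("partitioned cannot be combined with query_mode='remote'")
        if not partition_by:
            errors.append("partitioned requires partition_by")
    return errors
```

An empty list means the payload passes; collecting every error (rather than stopping at the first) lets the admin UI show all conflicts at once.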
## Test plan
- [x] **Unit + module** — 134 v26 tests covering migration, repo, parquet_io, where_filters, incremental (compute_changed_since + merge_parquet + extract_incremental E2E), partitioned (key derivation + merge_partition + chunked windows + extract_partitioned E2E), extractor dispatcher, admin API validators, PUT field clearing, registry-shape → dispatcher bridge
- [x] **HTML form structure** — all v26 inputs + visibility classes + JS payload fields verified in rendered template
- [x] **Real Keboola roundtrip** — registered a small test table as `sync_strategy='incremental'` against a test Storage project and triggered two syncs:
  - Sync 1: `changedSince=None` → full pull → 9 rows of typed parquet
  - Sync 2: `changedSince=last_sync - 1d window` → 9 delta rows merged with 9 existing → 9 after dedup on primary_key (PK merge confirmed)
- [x] **Browser UX** — agent-browser session against a local uvicorn: login → admin/tables → register modal → switch radios → verify field visibility per strategy → submit → edit existing row → switch to Direct/Incremental → save → confirm DB persistence
- [x] **Regression** — no regressions in the broader 3252-test suite (3 pre-v26 tests updated for the deprecation-marker removal + schema-version bump; 2 pre-existing environment-sensitive test failures unrelated to this change)
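
The incremental roundtrip above (9 existing rows plus 9 delta rows collapsing to 9) can be reproduced in memory. This sketch borrows the name `compute_changed_since` from the test list but its signature is an assumption; `merge_delta` is a hypothetical in-memory analogue of `merge_parquet` that works on DataFrames instead of parquet files.

```python
from datetime import datetime, timedelta
from typing import List, Optional

import pandas as pd


def compute_changed_since(last_sync: Optional[datetime],
                          window_days: int = 1) -> Optional[datetime]:
    """None on the first sync (full pull); otherwise last_sync minus a safety window."""
    if last_sync is None:
        return None
    return last_sync - timedelta(days=window_days)


def merge_delta(existing: pd.DataFrame, delta: pd.DataFrame,
                primary_key: List[str]) -> pd.DataFrame:
    """Upsert delta into existing; on a PK collision the delta row wins."""
    merged = pd.concat([existing, delta], ignore_index=True)
    # keep="last" keeps the delta copy because delta is concatenated second.
    return merged.drop_duplicates(subset=primary_key, keep="last").reset_index(drop=True)


# Same 9 keys in both frames, with updated values in the delta.
existing = pd.DataFrame({"id": range(1, 10), "amount": [10] * 9})
delta = pd.DataFrame({"id": range(1, 10), "amount": [20] * 9})
merged = merge_delta(existing, delta, ["id"])
```

The 1-day safety window means some rows are pulled twice; the PK dedup makes that harmless.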
## Bugs caught + fixed during E2E
The browser + real-Keboola roundtrip exposed four bugs the unit tests missed:
1. **JS visibility race** — two competing `forEach` loops set `display=''` then `display='none'` on form elements sharing `kb-strategy-incremental kb-strategy-partitioned` classes (window_days + max_history_days are reused across strategies). Fix: single-pass selector with class-based visibility resolver.
2. **PUT cannot clear field** — pre-v26 `updates = {k: v ... if v is not None}` collapsed "omitted from body" and "sent as null" into the same case, so an admin couldn't switch a partitioned row back to full_refresh and have the stale `partition_by` cleared. Fix: `model_dump(exclude_unset=True)`.
3. **Subprocess DB lock conflict** — `_read_last_sync` reopened `system.duckdb` while the parent server held the write lock (subprocess contract at `app/api/sync.py:_run_sync` line 260). Fix: parent injects `__last_sync__` into table_config before subprocess spawn.
4. **Wrong KBC table_id** — `extract_incremental` / `extract_partitioned` built the Storage API table_id from the registry row's slugified `id` (`circle_inc`) instead of `bucket.source_table` (`in.c-finance.circle`), producing 404s. Fix: prefer `bucket+source_table`; fall back to `id` only when bucket empty.
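
Bug 2 is easy to reproduce in isolation. The sketch assumes Pydantic v2 (`model_validate` / `model_dump`); `RegistryUpdate` is a hypothetical two-field slice of the real request model.

```python
from typing import Optional

from pydantic import BaseModel


class RegistryUpdate(BaseModel):
    """Hypothetical two-field slice of the registry PUT body."""
    sync_strategy: Optional[str] = None
    partition_by: Optional[str] = None


# Admin switches a partitioned row back to full_refresh and explicitly nulls partition_by.
body = RegistryUpdate.model_validate({"sync_strategy": "full_refresh", "partition_by": None})

# Pre-v26 pattern: every None is dropped, so the explicit null never reaches the DB.
old_updates = {k: v for k, v in body.model_dump().items() if v is not None}

# v26 pattern: only fields present in the request body, explicit nulls included.
new_updates = body.model_dump(exclude_unset=True)
```

Fields the client omits are still absent from `new_updates`, so a partial PUT does not wipe unrelated columns.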
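
The bug 4 fix amounts to preferring the bucket-qualified name. In this sketch `kbc_table_id` is hypothetical; the row keys `bucket`, `source_table`, and `id` follow the registry fields named above, and falling back when `source_table` is empty (not only when `bucket` is) is an added assumption to avoid emitting a dangling `bucket.` id.

```python
def kbc_table_id(row: dict) -> str:
    """Build the Storage API table id, e.g. 'in.c-finance.circle'."""
    bucket = (row.get("bucket") or "").strip()
    source_table = (row.get("source_table") or "").strip()
    if bucket and source_table:
        return f"{bucket}.{source_table}"
    # Fallback: the registry id, only correct when it already holds a KBC table id.
    return row["id"]
```

The slugified registry `id` (`circle_inc`) is a local name, which is why using it directly produced the 404s.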
## Operator notes
- Existing tables stay on `full_refresh` after migration; admins opt individual tables in via `agnes admin register-table --sync-strategy ...`, the Keboola Edit modal, or `POST/PUT /api/admin/registry`.
- `merge_parquet` and `merge_partition` use `pd.concat + drop_duplicates`, loading both the existing data and the delta into pandas RAM. For tables in the multi-million-row range this may OOM; switch those to the `partitioned` strategy (per-partition merge keeps memory bounded). Documented under `### Internal` in the changelog entry.
- Date placeholders are resolved at **sync time**, not register time: a misspelled placeholder such as `{{lasst_week}}` is accepted at registration and surfaces only when the next sync runs. This is by design, since rolling windows need late binding.
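
Late-bound placeholder resolution can be sketched as a regex substitution run at sync time. The placeholder names come from this PR (plus `last_week`, implied by the typo example), but their exact date semantics below are assumptions: `last_3_months` as "same day three months back, clamped to month end" and `start_of_3_months_ago` as "first day of that month". `RESOLVERS` and `resolve_placeholders` are hypothetical.

```python
import re
from datetime import date, timedelta
from typing import Callable, Dict

PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def _shift_months_back(d: date, months: int) -> date:
    """Move d back by `months`, clamping the day to the target month's length."""
    index = d.year * 12 + (d.month - 1) - months
    year, month0 = divmod(index, 12)
    month = month0 + 1
    # Day 1 of the following month minus one day = last day of the target month.
    next_first = date(year + month // 12, month % 12 + 1, 1)
    last_day = (next_first - timedelta(days=1)).day
    return date(year, month, min(d.day, last_day))


RESOLVERS: Dict[str, Callable[[date], date]] = {
    "today": lambda d: d,
    "last_week": lambda d: d - timedelta(days=7),
    "last_3_months": lambda d: _shift_months_back(d, 3),
    "start_of_3_months_ago": lambda d: _shift_months_back(d, 3).replace(day=1),
}


def resolve_placeholders(value: str, today: date) -> str:
    """Replace {{name}} with an ISO date; unknown names fail here, at sync time."""
    def _sub(match: re.Match) -> str:
        name = match.group(1)
        if name not in RESOLVERS:
            raise ValueError(f"unknown date placeholder: {match.group(0)}")
        return RESOLVERS[name](today).isoformat()

    return PLACEHOLDER.sub(_sub, value)


print(resolve_placeholders("{{start_of_3_months_ago}}", date(2026, 5, 7)))
```

Because `today` is passed in on every sync, the same stored filter yields a rolling window; nothing validates the name at registration, which matches the operator note above.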
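
The memory argument for `partitioned` is that a daily delta only touches a few partitions. A minimal sketch, assuming monthly partitions (as the `2026_05` example suggests) and using a dict of DataFrames as a stand-in for the `data/<table>/<key>.parquet` files; `partition_key` and `merge_affected_partitions` are hypothetical, not the real `merge_partition`.

```python
from typing import Dict, List

import pandas as pd


def partition_key(values: pd.Series) -> pd.Series:
    """Monthly key such as '2026_05', matching the <key>.parquet file names."""
    return pd.to_datetime(values).dt.strftime("%Y_%m")


def merge_affected_partitions(partitions: Dict[str, pd.DataFrame], delta: pd.DataFrame,
                              partition_by: str, primary_key: List[str]) -> List[str]:
    """Merge delta into only the partitions it touches; return their keys.

    On disk, only the touched <key>.parquet files would need to be read and rewritten.
    """
    touched = []
    for key, part_delta in delta.groupby(partition_key(delta[partition_by])):
        existing = partitions.get(key)
        if existing is None:
            merged = part_delta
        else:
            merged = pd.concat([existing, part_delta], ignore_index=True)
        partitions[key] = merged.drop_duplicates(subset=primary_key, keep="last").reset_index(drop=True)
        touched.append(key)
    return touched


partitions = {
    "2026_04": pd.DataFrame({"id": [1, 2], "day": ["2026-04-10", "2026-04-20"], "v": [1, 1]}),
    "2026_05": pd.DataFrame({"id": [3], "day": ["2026-05-01"], "v": [1]}),
}
delta = pd.DataFrame({"id": [3, 4], "day": ["2026-05-01", "2026-05-06"], "v": [2, 2]})
touched = merge_affected_partitions(partitions, delta, "day", ["id"])
```

Here only `2026_05` is merged; `2026_04` is never loaded into the concat, so peak memory scales with one partition rather than the whole table.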
## Spec source
The four corresponding plans on the `zs/keboola-connector-specs` branch under `docs/superpowers/plans/2026-05-07-0[1-4]-*.md` capture the design rationale and link back to internal repo references for each subsystem.

```python
"""Query commands — agnes query."""

import csv
import io
import json
import os
import sys
from pathlib import Path
from typing import List, Optional

import typer


def query_command(
    sql: Optional[str] = typer.Argument(None, help="SQL query to execute (positional)"),
    sql_opt: Optional[str] = typer.Option(None, "--sql", help="SQL query to execute (named option)"),
    remote: bool = typer.Option(False, "--remote", help="Execute on server instead of locally"),
    fmt: str = typer.Option("table", "--format", "-f", help="Output format: table, json, csv"),
    limit: int = typer.Option(1000, "--limit", help="Max rows to return"),
    register_bq: Optional[List[str]] = typer.Option(
        None,
        "--register-bq",
        help="Register a BigQuery result as a DuckDB view. Format: alias=BQ_SQL. Can be repeated.",
    ),
    stdin: bool = typer.Option(False, "--stdin", help="Read SQL from stdin as JSON {\"sql\": \"...\"}"),
):
    """Execute SQL query against DuckDB."""
    # Resolve SQL from exactly one of: positional, --sql, or --stdin
    sources_provided = sum([
        sql is not None,
        sql_opt is not None,
        stdin,
    ])
    if sources_provided == 0:
        typer.echo("Error: provide SQL as a positional argument, --sql option, or --stdin flag.", err=True)
        raise typer.Exit(1)
    if sources_provided > 1:
        typer.echo("Error: only one of positional SQL, --sql, or --stdin may be used at a time.", err=True)
        raise typer.Exit(1)

    if stdin:
        raw = sys.stdin.read()
        try:
            payload = json.loads(raw)
            resolved_sql = payload["sql"]
            # Extract register_bq from stdin JSON
            stdin_bq = payload.get("register_bq", {})
            if stdin_bq and isinstance(stdin_bq, dict):
                register_bq = [f"{k}={v}" for k, v in stdin_bq.items()]
        except (json.JSONDecodeError, KeyError, TypeError) as exc:
            # TypeError covers valid JSON that is not an object (e.g. a list or a string).
            typer.echo(f"Error: failed to parse stdin JSON: {exc}", err=True)
            raise typer.Exit(1)
    elif sql_opt is not None:
        resolved_sql = sql_opt
    else:
        resolved_sql = sql

    if register_bq:
        _query_hybrid(resolved_sql, fmt, limit, register_bq)
    elif remote:
        _query_remote(resolved_sql, fmt, limit)
    else:
        _query_local(resolved_sql, fmt, limit)


def _query_local(sql: str, fmt: str, limit: int):
    """Run query against local DuckDB."""
    import duckdb

    local_dir = Path(os.environ.get("AGNES_LOCAL_DIR", "."))
    db_path = local_dir / "user" / "duckdb" / "analytics.duckdb"
    if not db_path.exists():
        typer.echo("Local DuckDB not found. Run: agnes pull", err=True)
        raise typer.Exit(1)

    conn = duckdb.connect(str(db_path), read_only=True)
    try:
        result = conn.execute(sql).fetchmany(limit)
        columns = [desc[0] for desc in conn.description] if conn.description else []
        _output(columns, result, fmt)
    except Exception as e:
        typer.echo(f"Query error: {e}", err=True)
        raise typer.Exit(1)
    finally:
        conn.close()


def _query_remote(sql: str, fmt: str, limit: int):
    """Run query against server DuckDB via API."""
    from cli.client import QUERY_TIMEOUT_S, api_post
    from cli.error_render import render_error

    resp = api_post(
        "/api/query",
        json={"sql": sql, "limit": limit},
        timeout=QUERY_TIMEOUT_S,
    )
    if resp.status_code != 200:
        # Parse JSON body if possible, fall back to text. The shared
        # renderer pretty-prints typed BQ errors (cross_project_forbidden,
        # remote_scan_too_large, bq_path_not_registered) instead of
        # flattening the structured detail to a single truncated line.
        try:
            body = resp.json()
        except Exception:
            body = resp.text
        typer.echo(render_error(resp.status_code, body), err=True)
        raise typer.Exit(1)

    data = resp.json()
    _output(data["columns"], data["rows"], fmt)
    if data.get("truncated"):
        typer.echo(f"(truncated at {limit} rows)", err=True)


def _query_hybrid(sql: str, fmt: str, limit: int, register_bq_specs: List[str]):
    """Run a hybrid query: register BigQuery results as DuckDB views, then execute locally."""
    import duckdb
    from src.remote_query import RemoteQueryEngine, RemoteQueryError, load_config

    local_dir = Path(os.environ.get("AGNES_LOCAL_DIR", "."))
    db_path = local_dir / "user" / "duckdb" / "analytics.duckdb"
    if not db_path.exists():
        typer.echo("Local DuckDB not found. Run: agnes pull", err=True)
        raise typer.Exit(1)

    conn = duckdb.connect(str(db_path), read_only=True)
    try:
        config = load_config()
        engine_kwargs = {k: v for k, v in config.items() if k in (
            "max_bq_registration_rows", "max_memory_mb", "max_result_rows", "timeout_seconds"
        )}
        # CLI --limit flag overrides config max_result_rows
        engine_kwargs["max_result_rows"] = limit
        engine = RemoteQueryEngine(conn, **engine_kwargs)

        for spec in register_bq_specs:
            if "=" not in spec:
                typer.echo(
                    f"Error: --register-bq spec must be 'alias=BQ_SQL', got: {spec!r}",
                    err=True,
                )
                raise typer.Exit(1)
            alias, bq_sql = spec.split("=", 1)
            alias = alias.strip()
            bq_sql = bq_sql.strip()
            try:
                info = engine.register_bq(alias, bq_sql)
                typer.echo(
                    f"Registered BQ alias '{alias}': {info['rows']:,} rows, "
                    f"{info['memory_mb']:.1f} MiB",
                    err=True,
                )
            except RemoteQueryError as exc:
                # Use the shared renderer so typed BqAccessError details
                # (carried via RemoteQueryError.details) surface as a
                # multi-line block with the operator-facing hint.
                from cli.error_render import render_error
                synthetic = {"detail": {
                    "kind": exc.error_type,
                    "alias": alias,
                    "message": str(exc),
                    **(exc.details or {}),
                }}
                typer.echo(render_error(400, synthetic), err=True)
                raise typer.Exit(1)

        try:
            result = engine.execute(sql)
        except RemoteQueryError as exc:
            from cli.error_render import render_error
            synthetic = {"detail": {
                "kind": exc.error_type,
                "message": str(exc),
                **(exc.details or {}),
            }}
            typer.echo(render_error(400, synthetic), err=True)
            raise typer.Exit(1)

        _output(result["columns"], result["rows"], fmt)
        if result.get("truncated"):
            typer.echo(f"(truncated at {result['row_count']} rows)", err=True)
    finally:
        conn.close()


def _output(columns: list, rows: list, fmt: str):
    if fmt == "json":
        output = [dict(zip(columns, row)) for row in rows]
        typer.echo(json.dumps(output, indent=2, default=str))
    elif fmt == "csv":
        # Use the csv module so values containing commas, quotes, or newlines
        # are quoted correctly; csv.writer writes None as an empty field.
        buf = io.StringIO()
        writer = csv.writer(buf, lineterminator="\n")
        writer.writerow(columns)
        writer.writerows(rows)
        typer.echo(buf.getvalue(), nl=False)
    else:
        # Table format using rich
        from rich.console import Console
        from rich.table import Table
        console = Console()
        table = Table()
        for col in columns:
            table.add_column(col)
        for row in rows:
            table.add_row(*(str(v) if v is not None else "" for v in row))
        console.print(table)
```
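
The `--register-bq` spec format is easiest to see in isolation. This standalone restatement mirrors the parsing in `query_command` and `_query_hybrid` but is not imported from the CLI; `parse_register_bq` and the `ds.orders` dataset are hypothetical. It shows that only the first `=` separates alias from SQL, so BigQuery SQL containing `=` survives, and that the `--stdin` form's `register_bq` object is flattened into the same `alias=BQ_SQL` strings.

```python
from typing import Tuple


def parse_register_bq(spec: str) -> Tuple[str, str]:
    """Split an 'alias=BQ_SQL' spec on its first '=' and strip both halves."""
    if "=" not in spec:
        raise ValueError(f"--register-bq spec must be 'alias=BQ_SQL', got: {spec!r}")
    alias, bq_sql = spec.split("=", 1)
    return alias.strip(), bq_sql.strip()


# The --stdin JSON form: register_bq maps alias -> BQ SQL.
stdin_payload = {
    "sql": "SELECT status, COUNT(*) FROM orders GROUP BY status",
    "register_bq": {"orders": "SELECT * FROM ds.orders WHERE status = 'paid'"},
}
specs = [f"{k}={v}" for k, v in stdin_payload["register_bq"].items()]
alias, bq_sql = parse_register_bq(specs[0])
```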