agnes-the-ai-analyst/docs/superpowers/specs/2026-03-30-core-refactoring-design.md

9.3 KiB

Core Refactoring — DuckDB-Centric Extract Architecture

Date: 2026-03-30 Status: Draft v2

1. Problem

Current sync core is 5,900 lines with heavy dependencies (pandas, pyarrow, kbcstorage). Fragile markdown config parser. Adding a connector requires 500-1700 lines of Python. Tightly coupled — connector downloads, type-casts, merges, partitions, and writes to disk all in one place.

2. Core idea

Every data source produces the same thing: a folder with extract.duckdb + data/. The orchestrator doesn't care how the data got there — it just ATTACHes the DuckDB file.

/data/extracts/{source_name}/
├── extract.duckdb          ← MUST exist. Contains _meta table + views/tables on data.
└── data/                   ← Data files the views point to (parquet, csv, whatever).

That's it. That's the entire contract.

3. extract.duckdb contract

Every extract.duckdb MUST contain:

_meta table — describes what's inside:

CREATE TABLE _meta (
    table_name VARCHAR NOT NULL,
    description VARCHAR,
    rows BIGINT,
    size_bytes BIGINT,
    extracted_at TIMESTAMP,
    query_mode VARCHAR DEFAULT 'local'   -- 'local' = data is here, 'remote' = query on demand
);

Views or tables for each entry in _meta — how they store data is their business (parquet, csv, in-memory, remote ATTACH — doesn't matter).

4. Two types of sources

Batch pull (Keboola, BigQuery, Postgres, CSV)

Scheduler or manual trigger runs extractor → rewrites entire output folder.

Scheduler (every 15m)
  → python -m connectors.keboola.extract
  → output: /data/extracts/keboola/extract.duckdb + data/*.parquet
  → orchestrator.rebuild()

One instance typically has one primary batch source (configured in instance.yaml). The extractor reads table_registry for which tables to pull and how (sync_strategy, schedule).

Real-time push (Jira webhooks)

External system sends events → webhook handler updates output folder incrementally.

Jira sends webhook → POST /webhooks/jira
  → handler processes event
  → appends/updates parquet in /data/extracts/jira/data/
  → updates extract.duckdb views + _meta

No scheduler needed — data arrives when it arrives. Output folder is updated in-place, not rewritten.

Both produce the same output

The orchestrator doesn't know or care which type produced the folder. It just ATTACHes extract.duckdb.

5. Orchestrator

class SyncOrchestrator:
    def rebuild(self):
        """Scan /data/extracts/*, ATTACH each, create master views."""
        master = duckdb.connect("/data/analytics.duckdb")

        for ext_dir in sorted(Path("/data/extracts").iterdir()):
            db = ext_dir / "extract.duckdb"
            if not db.exists():
                continue

            name = ext_dir.name
            master.execute(f"ATTACH '{db}' AS {name} (READ_ONLY)")

            # Read _meta to know what's available
            meta = master.execute(f"SELECT table_name, rows, query_mode FROM {name}._meta").fetchall()

            # Create flat views in master
            for table_name, rows, query_mode in meta:
                master.execute(f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM {name}.{table_name}")
                self.state.update_sync(table_name, rows=rows)

        master.close()

~30 lines. Replaces 734-line DataSyncManager.

6. Keboola extractor

# connectors/keboola/extractor.py (~60 lines)

def run(output_dir: str, table_configs: list[dict]):
    """Extract tables from Keboola into output_dir."""
    data_dir = Path(output_dir) / "data"
    data_dir.mkdir(parents=True, exist_ok=True)

    conn = duckdb.connect(f"{output_dir}/extract.duckdb")
    conn.execute("INSTALL keboola FROM community; LOAD keboola;")
    conn.execute(f"ATTACH '{url}' AS kbc (TYPE keboola, TOKEN '{token}')")

    # Create _meta
    conn.execute("DROP TABLE IF EXISTS _meta")
    conn.execute("""CREATE TABLE _meta (
        table_name VARCHAR, description VARCHAR, rows BIGINT,
        size_bytes BIGINT, extracted_at TIMESTAMP, query_mode VARCHAR DEFAULT 'local'
    )""")

    now = datetime.now(timezone.utc)
    for tc in table_configs:
        if tc["query_mode"] == "remote":
            # Register in _meta but don't download
            conn.execute(f"INSERT INTO _meta VALUES ('{tc['name']}', '', 0, 0, '{now}', 'remote')")
            continue

        pq_path = str(data_dir / f"{tc['name']}.parquet")
        conn.execute(f"""COPY (SELECT * FROM kbc."{tc['bucket']}".{tc['source_table']})
                        TO '{pq_path}' (FORMAT PARQUET)""")

        rows = conn.execute(f"SELECT count(*) FROM read_parquet('{pq_path}')").fetchone()[0]
        size = os.path.getsize(pq_path)

        conn.execute(f"CREATE OR REPLACE VIEW {tc['name']} AS SELECT * FROM read_parquet('{pq_path}')")
        conn.execute(f"INSERT INTO _meta VALUES ('{tc['name']}', '{tc.get('description','')}', {rows}, {size}, '{now}', 'local')")

    conn.execute("DETACH kbc")
    conn.close()

if __name__ == "__main__":
    # Standalone: reads config from table_registry, runs extraction
    configs = load_table_configs()
    run("/data/extracts/keboola", configs)

Replaces 1,700 lines (adapter.py + client.py).

7. Config: table_registry

table_registry in system.duckdb (already exists, extend with source columns):

CREATE TABLE IF NOT EXISTS table_registry (
    id VARCHAR PRIMARY KEY,
    name VARCHAR NOT NULL,

    -- Source
    source_type VARCHAR NOT NULL,     -- 'keboola', 'bigquery', 'jira', 'postgres'
    bucket VARCHAR,                   -- Keboola bucket or schema
    source_table VARCHAR,             -- table name in source

    -- Sync behavior
    sync_strategy VARCHAR DEFAULT 'full_refresh',
    query_mode VARCHAR DEFAULT 'local',
    sync_schedule VARCHAR,
    profile_after_sync BOOLEAN DEFAULT true,

    -- Metadata
    primary_key VARCHAR,
    description TEXT,
    registered_by VARCHAR,
    registered_at TIMESTAMP DEFAULT current_timestamp
);

Instance-level source config stays in instance.yaml:

data_source: keboola
keboola:
  url: https://connection.us-east4.gcp.keboola.com
  token_env: KEBOOLA_STORAGE_TOKEN

Table list goes in table_registry. Import from existing data_description.md via one-time migration script.

8. How it runs

instance.yaml → which source (keboola)
table_registry → which tables + how (full_refresh, schedule)

Scheduler:
  Every 15 min:
    1. Read table_registry for tables due to sync
    2. Run extractor: python -m connectors.keboola.extract
    3. Extractor writes /data/extracts/keboola/
    4. orchestrator.rebuild() → ATTACH → master views

API trigger:
  POST /api/sync/trigger
    → same as scheduler step 2-4

CLI:
  da sync (on analyst machine)
    → calls GET /api/sync/manifest
    → downloads parquets from /api/data/{table}/download
    → creates local analytics.duckdb with views

9. Adding a new source

If DuckDB has extension for it (most cases):

  1. Add tables to table_registry (via admin API or CLI)
  2. Write extractor script: connectors/{name}/extractor.py (~30-60 lines)
    • INSTALL extension; LOAD extension; ATTACH source; COPY TO parquet
  3. Add to scheduler config
  4. Done

If no DuckDB extension (REST API, custom):

  1. Same as above but extractor fetches data via HTTP/SDK
  2. Writes result to DuckDB via read_json_auto or conn.register()
  3. Same output format: extract.duckdb + data/

Jira-style webhook:

  1. Add webhook endpoint to FastAPI
  2. Handler updates /data/extracts/jira/ incrementally
  3. Same output format — orchestrator picks it up on next rebuild

10. What gets deleted

File Lines Replaced by
src/config.py 653 table_registry in DuckDB
src/parquet_manager.py 755 DuckDB COPY TO
src/data_sync.py (most) ~600 SyncOrchestrator (~30 lines)
connectors/keboola/adapter.py 820 extractor.py (~60 lines)
connectors/bigquery/adapter.py 665 extractor.py (~40 lines)
Total removed ~3500 ~200 new

Kept as legacy (not deleted):

  • connectors/keboola/client.py — fallback if extension unavailable
  • connectors/jira/ — webhook pattern, adapted to write extract.duckdb
  • src/profiler.py — already DuckDB, unchanged

11. What stays unchanged

  • src/repositories/ — DuckDB-backed, used by API
  • src/db.py — system DB schema
  • src/profiler.py — already uses DuckDB
  • connectors/llm/, connectors/openmetadata/ — unrelated
  • app/ (FastAPI), cli/, webapp/ — call orchestrator instead of DataSyncManager

12. Client side (analyst) — no change

da sync → downloads parquets from server API → creates local analytics.duckdb with views

Analyst doesn't know or care about extractors. Same flow as today.

13. Incremental sync (future)

Current: full refresh only. Extractor interface is ready for incremental:

  • table_registry has sync_strategy field
  • Extractor can check last sync time from _meta.extracted_at
  • When Keboola DuckDB extension adds changedSince (issue #10), extractor uses it
  • Until then: full refresh, which is fast enough for most tables via extension

14. Tested (2026-03-30)

Keboola DuckDB extension with real token:

  • ATTACH + SELECT * + COPY TO parquet: works (1.5s for 15 rows)
  • Extension: v0.1.0, requires DuckDB 1.5.1+
  • Issues filed: keboola/duckdb-extension#6 through #11