Phase 0: extend table_registry schema (v1→v2 migration), add source_type/bucket/source_table/query_mode columns. Phase 1: SyncOrchestrator ATTACHes extract.duckdb files into master analytics.duckdb. Keboola extractor uses DuckDB extension with legacy client fallback. BigQuery extractor is remote-only via DuckDB BQ extension (no data download). 62 tests passing.
12 KiB
Core Refactoring — DuckDB-Centric Extract Architecture
Date: 2026-03-30 Status: Draft v2
1. Problem
Current sync core is 5,900 lines with heavy dependencies (pandas, pyarrow, kbcstorage). Fragile markdown config parser. Adding a connector requires 500-1700 lines of Python. Tightly coupled — connector downloads, type-casts, merges, partitions, and writes to disk all in one place.
2. Core idea
Every data source produces the same thing: a folder with extract.duckdb + data/. The orchestrator doesn't care how the data got there — it just ATTACHes the DuckDB file.
/data/extracts/{source_name}/
├── extract.duckdb ← MUST exist. Contains _meta table + views/tables on data.
└── data/ ← Data files the views point to (parquet, csv, whatever).
That's it. That's the entire contract.
3. extract.duckdb contract
Every extract.duckdb MUST contain:
_meta table — describes what's inside:
CREATE TABLE _meta (
table_name VARCHAR NOT NULL,
description VARCHAR,
rows BIGINT,
size_bytes BIGINT,
extracted_at TIMESTAMP,
query_mode VARCHAR DEFAULT 'local' -- 'local' = data is here, 'remote' = query on demand
);
Views or tables for each entry in _meta — how they store data is their business (parquet, csv, in-memory, remote ATTACH — doesn't matter).
4. Three types of sources
Batch pull (Keboola, Postgres, CSV)
Scheduler or manual trigger runs extractor → rewrites entire output folder.
Scheduler (every 15m)
→ python -m connectors.keboola.extract
→ output: /data/extracts/keboola/extract.duckdb + data/*.parquet
→ orchestrator.rebuild()
One instance typically has one primary batch source (configured in instance.yaml). The extractor reads table_registry for which tables to pull and how (sync_strategy, schedule).
Remote attach (BigQuery)
No data download. DuckDB BigQuery community extension ATTACHes directly to BQ. Queries go to BigQuery on-demand.
/data/extracts/bigquery/
├── extract.duckdb ← ATTACH to BQ + views + _meta (query_mode='remote')
└── (no data/ directory)
INSTALL bigquery FROM community; LOAD bigquery;
ATTACH 'project=my_gcp_project' AS bq (TYPE bigquery, READ_ONLY);
CREATE VIEW orders AS SELECT * FROM bq.dataset.orders;
INSERT INTO _meta VALUES ('orders', 'Order data', 0, 0, now(), 'remote');
Extractor (connectors/bigquery/extractor.py, ~50 lines) runs once at init or when table_registry changes. It creates extract.duckdb with views that delegate to BQ — no parquets, no downloads. Orchestrator ATTACHes it like any other source.
Replaces: adapter.py (665 lines) + client.py (644 lines) + remote_query.py (~300 lines).
Real-time push (Jira webhooks)
External system sends events → webhook handler updates output folder incrementally.
Jira sends webhook → POST /webhooks/jira
→ handler processes event
→ appends/updates parquet in /data/extracts/jira/data/
→ updates extract.duckdb views + _meta
No scheduler needed — data arrives when it arrives. Output folder is updated in-place, not rewritten.
All three produce the same output
The orchestrator doesn't know or care which type produced the folder or whether data is local parquets or remote BQ views. It just ATTACHes extract.duckdb.
5. Orchestrator
class SyncOrchestrator:
def rebuild(self):
"""Scan /data/extracts/*, ATTACH each, create master views."""
master = duckdb.connect("/data/analytics.duckdb")
for ext_dir in sorted(Path("/data/extracts").iterdir()):
db = ext_dir / "extract.duckdb"
if not db.exists():
continue
name = ext_dir.name
master.execute(f"ATTACH '{db}' AS {name} (READ_ONLY)")
# Read _meta to know what's available
meta = master.execute(f"SELECT table_name, rows, query_mode FROM {name}._meta").fetchall()
# Create flat views in master
for table_name, rows, query_mode in meta:
master.execute(f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM {name}.{table_name}")
self.state.update_sync(table_name, rows=rows)
master.close()
~30 lines. Replaces 734-line DataSyncManager.
6. Keboola extractor
# connectors/keboola/extractor.py (~60 lines)
def run(output_dir: str, table_configs: list[dict]):
"""Extract tables from Keboola into output_dir."""
data_dir = Path(output_dir) / "data"
data_dir.mkdir(parents=True, exist_ok=True)
conn = duckdb.connect(f"{output_dir}/extract.duckdb")
conn.execute("INSTALL keboola FROM community; LOAD keboola;")
conn.execute(f"ATTACH '{url}' AS kbc (TYPE keboola, TOKEN '{token}')")
# Create _meta
conn.execute("DROP TABLE IF EXISTS _meta")
conn.execute("""CREATE TABLE _meta (
table_name VARCHAR, description VARCHAR, rows BIGINT,
size_bytes BIGINT, extracted_at TIMESTAMP, query_mode VARCHAR DEFAULT 'local'
)""")
now = datetime.now(timezone.utc)
for tc in table_configs:
if tc["query_mode"] == "remote":
# Register in _meta but don't download
conn.execute(f"INSERT INTO _meta VALUES ('{tc['name']}', '', 0, 0, '{now}', 'remote')")
continue
pq_path = str(data_dir / f"{tc['name']}.parquet")
conn.execute(f"""COPY (SELECT * FROM kbc."{tc['bucket']}".{tc['source_table']})
TO '{pq_path}' (FORMAT PARQUET)""")
rows = conn.execute(f"SELECT count(*) FROM read_parquet('{pq_path}')").fetchone()[0]
size = os.path.getsize(pq_path)
conn.execute(f"CREATE OR REPLACE VIEW {tc['name']} AS SELECT * FROM read_parquet('{pq_path}')")
conn.execute(f"INSERT INTO _meta VALUES ('{tc['name']}', '{tc.get('description','')}', {rows}, {size}, '{now}', 'local')")
conn.execute("DETACH kbc")
conn.close()
if __name__ == "__main__":
# Standalone: reads config from table_registry, runs extraction
configs = load_table_configs()
run("/data/extracts/keboola", configs)
Replaces 1,700 lines (adapter.py + client.py).
7. BigQuery extractor
# connectors/bigquery/extractor.py (~50 lines)
def init_extract(output_dir: str, project_id: str, table_configs: list[dict]):
"""Create extract.duckdb with remote views into BigQuery."""
conn = duckdb.connect(f"{output_dir}/extract.duckdb")
conn.execute("INSTALL bigquery FROM community; LOAD bigquery;")
conn.execute(f"ATTACH 'project={project_id}' AS bq (TYPE bigquery, READ_ONLY)")
# Create _meta
conn.execute("DROP TABLE IF EXISTS _meta")
conn.execute("""CREATE TABLE _meta (
table_name VARCHAR, description VARCHAR, rows BIGINT,
size_bytes BIGINT, extracted_at TIMESTAMP, query_mode VARCHAR DEFAULT 'remote'
)""")
now = datetime.now(timezone.utc)
for tc in table_configs:
dataset = tc['bucket'] # BigQuery dataset
source = tc['source_table']
conn.execute(f'CREATE OR REPLACE VIEW {tc["name"]} AS SELECT * FROM bq."{dataset}"."{source}"')
conn.execute(f"INSERT INTO _meta VALUES ('{tc['name']}', '{tc.get('description','')}', 0, 0, '{now}', 'remote')")
conn.execute("DETACH bq")
conn.close()
if __name__ == "__main__":
configs = load_table_configs(source_type="bigquery")
init_extract("/data/extracts/bigquery", project_id, configs)
No data/ directory. All queries go directly to BigQuery via DuckDB extension. Replaces 1,600 lines (adapter.py + client.py + remote_query.py).
Authentication: DuckDB BigQuery extension uses Application Default Credentials (ADC) or GOOGLE_APPLICATION_CREDENTIALS env var — same as the current google-cloud-bigquery Python client.
8. Config: table_registry
table_registry in system.duckdb (already exists, extend with source columns):
CREATE TABLE IF NOT EXISTS table_registry (
id VARCHAR PRIMARY KEY,
name VARCHAR NOT NULL,
-- Source
source_type VARCHAR NOT NULL, -- 'keboola', 'bigquery', 'jira', 'postgres'
bucket VARCHAR, -- Keboola bucket or schema
source_table VARCHAR, -- table name in source
-- Sync behavior
sync_strategy VARCHAR DEFAULT 'full_refresh',
query_mode VARCHAR DEFAULT 'local',
sync_schedule VARCHAR,
profile_after_sync BOOLEAN DEFAULT true,
-- Metadata
primary_key VARCHAR,
description TEXT,
registered_by VARCHAR,
registered_at TIMESTAMP DEFAULT current_timestamp
);
Instance-level source config stays in instance.yaml:
data_source: keboola
keboola:
url: https://connection.us-east4.gcp.keboola.com
token_env: KEBOOLA_STORAGE_TOKEN
Table list goes in table_registry. Import from existing data_description.md via one-time migration script.
9. How it runs
instance.yaml → which source (keboola)
table_registry → which tables + how (full_refresh, schedule)
Scheduler:
Every 15 min:
1. Read table_registry for tables due to sync
2. Run extractor: python -m connectors.keboola.extract
3. Extractor writes /data/extracts/keboola/
4. orchestrator.rebuild() → ATTACH → master views
API trigger:
POST /api/sync/trigger
→ same as scheduler step 2-4
CLI:
da sync (on analyst machine)
→ calls GET /api/sync/manifest
→ downloads parquets from /api/data/{table}/download
→ creates local analytics.duckdb with views
10. Adding a new source
If DuckDB has extension for it (most cases):
- Add tables to
table_registry(via admin API or CLI) - Write extractor script:
connectors/{name}/extractor.py(~30-60 lines)INSTALL extension; LOAD extension; ATTACH source; COPY TO parquet
- Add to scheduler config
- Done
If no DuckDB extension (REST API, custom):
- Same as above but extractor fetches data via HTTP/SDK
- Writes result to DuckDB via
read_json_autoorconn.register() - Same output format:
extract.duckdb+data/
Jira-style webhook:
- Add webhook endpoint to FastAPI
- Handler updates
/data/extracts/jira/incrementally - Same output format — orchestrator picks it up on next rebuild
11. What gets deleted
| File | Lines | Replaced by |
|---|---|---|
src/config.py |
653 | table_registry in DuckDB |
src/parquet_manager.py |
755 | DuckDB COPY TO |
src/data_sync.py (most) |
~600 | SyncOrchestrator (~30 lines) |
src/remote_query.py |
~300 | DuckDB BigQuery ATTACH (queries go directly via extension) |
connectors/keboola/adapter.py |
820 | extractor.py (~60 lines) |
connectors/bigquery/adapter.py |
665 | extractor.py (~50 lines, remote-only via DuckDB BQ extension) |
connectors/bigquery/client.py |
644 | DuckDB BigQuery extension (ADC auth, direct ATTACH) |
| Total removed | ~4,400 | ~200 new |
Kept as legacy (not deleted):
connectors/keboola/client.py— fallback if DuckDB Keboola extension unavailableconnectors/jira/— webhook pattern, adapted to write extract.duckdbsrc/profiler.py— already DuckDB, unchanged
12. What stays unchanged
src/repositories/— DuckDB-backed, used by APIsrc/db.py— system DB schemasrc/profiler.py— already uses DuckDBconnectors/llm/,connectors/openmetadata/— unrelatedapp/(FastAPI),cli/,webapp/— call orchestrator instead of DataSyncManager
13. Client side (analyst) — no change
da sync → downloads parquets from server API → creates local analytics.duckdb with views
Analyst doesn't know or care about extractors. Same flow as today.
14. Incremental sync (future)
Current: full refresh only. Extractor interface is ready for incremental:
table_registryhassync_strategyfield- Extractor can check last sync time from
_meta.extracted_at - When Keboola DuckDB extension adds
changedSince(issue #10), extractor uses it - Until then: full refresh, which is fast enough for most tables via extension
15. Tested (2026-03-30)
Keboola DuckDB extension with real token:
ATTACH+SELECT *+COPY TO parquet: works (1.5s for 15 rows)- Extension: v0.1.0, requires DuckDB 1.5.1+
- Issues filed: keboola/duckdb-extension#6 through #11