From 07b396bfe27833a054906452942ebea441a8a398 Mon Sep 17 00:00:00 2001
From: ZdenekSrotyr
Date: Fri, 27 Mar 2026 15:42:57 +0100
Subject: [PATCH] docs: add refactoring plan, design spec, and gitignore updates

---
 .gitignore                                     |    3 +
 docs/REFACTORING_PLAN.md                       |  576 ++++++
 .../plans/2026-03-27-01-duckdb-state-layer.md  | 1574 +++++++++++++++++
 .../specs/2026-03-27-refactoring-design.md     |  524 ++++++
 4 files changed, 2677 insertions(+)
 create mode 100644 docs/REFACTORING_PLAN.md
 create mode 100644 docs/superpowers/plans/2026-03-27-01-duckdb-state-layer.md
 create mode 100644 docs/superpowers/specs/2026-03-27-refactoring-design.md

diff --git a/.gitignore b/.gitignore
index 913d990..3af42b7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -132,3 +132,6 @@ docs/datasets/*/schema.yml
 # Agent-generated reports (not part of codebase)
 .audit/
 docs/AGENT-REPORTS/
+
+# Internal transcripts and meeting notes
+docs/ZS_PADAK_*
diff --git a/docs/REFACTORING_PLAN.md b/docs/REFACTORING_PLAN.md
new file mode 100644
index 0000000..e31dd7c
--- /dev/null
+++ b/docs/REFACTORING_PLAN.md
@@ -0,0 +1,576 @@
+# AI Data Analyst Refactoring: Final Plan
+
+## Context
+
+The platform grew iteratively for internal use at Keboola and is now meant to become a product for customers (Groupon and others). Key problems from the ZS+Padák transcript: fragile filesystem state (JSON files, permission conflicts), no API (everything goes through SSH and scripts), security based on Linux groups, and a complex installation (10+ steps). The system is designed for AI agents: a human talks to the AI, and the AI handles everything (user, admin, and dev operations).
+
+**UX stays the same.** Tooling: `uv` everywhere instead of pip. Docker + Kamal for the server. The CLI (`da`) is the primary interface for AI agents.
+
+---
+
+## Architecture: target state
+
+```
+SERVER (Docker + Kamal):
+├── webapp          Flask UI (catalog, login, corporate memory)
+├── api             FastAPI (CLI backend, sync manifest, data download)
+├── scheduler       APScheduler (replaces 7 systemd timers)
+├── telegram-bot    Telegram notifications
+├── ws-gateway      WebSocket for the desktop app
+└── script-runner   Sandboxed runner for user scripts
+
+LOCAL (analyst):
+├── da CLI          Python package (uv tool install)
+├── DuckDB          Embedded (analytics.duckdb → views over parquet files)
+└── Parquet files   Downloaded from the server via da sync
+
+TWO DuckDB DATABASES ON THE SERVER:
+├── /data/state/system.duckdb       System state (users, sync_state, knowledge...)
+└── /data/analytics/server.duckdb   Views → /data/parquet/** (profiler, remote query, scripts)
+
+ONE DuckDB DATABASE LOCALLY:
+└── user/duckdb/analytics.duckdb    Views → server/parquet/** + user tables
+```
+
+---
+
+## Phase 0: Foundation (DuckDB state + repository layer)
+
+**Goal:** Replace 10+ JSON files with a DuckDB database. Eliminate the #1 source of outages (file permission conflicts).
+
+**Why DuckDB:** It is already in the stack, an agent can join system state with analytical data, and it is better than SQLite for analytical queries over state.
+
+### Task 0A: DuckDB schema + repository layer [INDEPENDENT]
+
+New files:
+- `src/db.py`: DuckDB connection management, schema creation, migration system
+- `src/repositories/__init__.py`
+- `src/repositories/sync_state.py`: CRUD for sync state
+- `src/repositories/users.py`: CRUD for users + roles
+- `src/repositories/knowledge.py`: CRUD for corporate memory
+- `src/repositories/table_registry.py`: CRUD for the table registry
+- `src/repositories/audit.py`: audit log
+- `src/repositories/notifications.py`: telegram links, pending codes, script registry
+
+Schema tables (mapping from JSON):
+
+| Current JSON | DuckDB table | Source file |
+|---|---|---|
+| `sync_state.json` | `sync_state` | `src/data_sync.py:37-138` |
+| `sync_settings.json` | `user_sync_settings` | `webapp/sync_settings_service.py:20` |
+| `knowledge.json` | `knowledge_items` | `webapp/corporate_memory_service.py` |
+| `votes.json` | `knowledge_votes` | `webapp/corporate_memory_service.py` |
+| `audit.jsonl` | `audit_log` | `webapp/corporate_memory_service.py` |
+| `telegram_users.json` | `telegram_links` | `services/telegram_bot/storage.py` |
+| `pending_codes.json` | `pending_codes` | `services/telegram_bot/storage.py` |
+| `password_users.json` | `users` | `webapp/password_auth.py` |
+| `table_registry.json` | `table_registry` | `src/table_registry.py` |
+| `profiles.json` | `table_profiles` | `src/profiler.py` |
+
+Additional tables: `sync_history` (the last 10 syncs per table, not just the latest) and `script_registry` (deployed scripts).
+
+### Task 0B: Migrate existing service files to repositories [DEPENDS ON 0A]
+
+Files to modify (replace `_read_json`/`_write_json` with repository calls):
+- `webapp/sync_settings_service.py` lines 40-62
+- `webapp/corporate_memory_service.py`: 31 JSON operations
+- `webapp/telegram_service.py` lines 22-45
+- `src/data_sync.py`: `SyncState` class, lines 37-138
+- `src/table_registry.py`: `_load`, `_atomic_write_json`
+- `src/profiler.py`: saving profiles
+- `services/corporate_memory/collector.py`: reading/writing knowledge
+- `services/telegram_bot/storage.py`: 15 JSON operations
+
+Pattern: dual-write (JSON + DuckDB) for a transition period → verify → remove the JSON writes.
+
+### Task 0C: Migration script [DEPENDS ON 0A]
+
+- `scripts/migrate_json_to_duckdb.py`: loads all JSON files and inserts them into DuckDB
+- Idempotent (safe to run multiple times)
+- Validation after migration (count comparison)
+
+### What does NOT change in Phase 0
+- Flask routes in `webapp/app.py`
+- HTML templates
+- Connectors (`connectors/keboola/`, `connectors/bigquery/`, `connectors/jira/`)
+- `src/config.py` (reads `data_description.md`: configuration, not state)
+- `config/loader.py` (reads `instance.yaml`)
+- `src/parquet_manager.py`
+
+---
+
+## Phase 1: API layer (FastAPI)
+
+**Goal:** A REST API for the CLI. It covers every operation that requires SSH today.
+
+### Task 1A: FastAPI foundation + auth [INDEPENDENT of 0B, DEPENDS ON 0A]
+
+New files:
+```
+api/
+  __init__.py
+  app.py            # FastAPI app, middleware, CORS
+  auth.py           # JWT issuing + validation
+  dependencies.py   # DI for DuckDB session, current_user
+```
+
+Auth flow:
+1. `POST /api/auth/login`: accepts an OAuth token from the webapp, issues a JWT
+2. `POST /api/auth/token`: accepts an API key, issues a JWT
+3. The JWT contains: user_id, email, role, expiry
+4. Middleware validates the JWT on all /api/* endpoints
+
+### Task 1B: Sync + Data endpoints [DEPENDS ON 1A, 0A]
+
+```
+api/routers/
+  sync.py           # GET /api/sync/manifest, POST /api/sync/trigger
+  data.py           # GET /api/data/{table}/download (parquet stream)
+```
+
+- `/api/sync/manifest`: returns hashes of all parquet files, docs, rules, and profiles (filtered per user by subscription)
+- `/api/data/{table}/download`: streams the parquet file with ETag/If-None-Match
+- `/api/sync/trigger`: runs DataSyncManager (reuses `src/data_sync.py`)
+
+### Task 1C: Query + Scripts endpoints [DEPENDS ON 1A, 0A]
+
+```
+api/routers/
+  query.py          # POST /api/query (remote query)
+  scripts.py        # POST /api/scripts/run, /deploy, /list
+```
+
+- `/api/query`: reuses `src/remote_query.py`, result as JSON/parquet
+- `/api/scripts/run`: runs a Python script in a sandbox on the server
+- `/api/scripts/deploy`: uploads a script and registers it with the scheduler
+- `/api/scripts/list`: deployed scripts with their schedules
+
+### Task 1D: User management + Corporate memory endpoints [DEPENDS ON 1A, 0A]
+
+```
+api/routers/
+  users.py          # User CRUD, roles, permissions
+  settings.py       # GET/PUT sync settings per user
+  memory.py         # Corporate memory CRUD, voting, governance
+  health.py         # GET /api/health (structured diagnostics)
+  upload.py         # POST sessions, artifacts, CLAUDE.local.md
+```
+
+### Task 1E: Remove SSH/sudo dependencies [DEPENDS ON 1B, 1D]
+
+Delete or rewrite:
+- `webapp/sync_settings_service.py` lines 128-240 (sudo/rsync-filter code)
+- `webapp/user_service.py`: Linux user management (`pwd.getpwnam`, `sudo add-analyst`)
+- SSH key validation workflow
+- `server/sudoers-webapp`, `server/sudoers-deploy`
+- `server/bin/add-analyst`
+
+---
+
+## Phase 2: CLI tool (`da`)
+
+**Goal:** A single interface for AI agents. Replaces SSH and scripts. Installed with `uv tool install`.
+
+### Task 2A: CLI foundation + auth [INDEPENDENT of 1B-1E, DEPENDS ON 1A]
+
+```
+cli/
+  __init__.py
+  main.py           # Typer app, global options (--server, --json)
+  config.py         # ~/.config/da/ management
+  client.py         # HTTP client wrapper (auth, retry, streaming)
+  commands/
+    auth.py         # da login, da logout, da whoami
+```
+
+- `da login` → opens a browser for OAuth → the server issues a JWT → stored in `~/.config/da/token.json`
+- `da --json` flag on all commands for structured output
+- `da --server URL` override (default from config.yaml)
+
+### Task 2B: Sync commands [DEPENDS ON 2A, 1B]
+
+```
+cli/commands/
+  sync.py           # da sync, da sync --table X, da sync --upload-only
+```
+
+Flow:
+1. `GET /api/sync/manifest` → compare with `~/.config/da/sync_state.json`
+2. Download changed parquet files (HTTP streaming with a progress bar)
+3. Download docs, rules, profiles
+4. Upload sessions, artifacts, CLAUDE.local.md
+5. Rebuild DuckDB views (DROP views, CREATE VIEW per table, keep user tables)
+6. Update the local manifest
+
+Replaces the functionality of `scripts/sync_data.sh` (475 lines).
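
Steps 1 and 5 of the flow above can be sketched in a few lines. This is an illustration under assumptions, not the planned implementation: the manifest is assumed to be a flat `{table: hash}` mapping, and the helper names `tables_to_download` and `view_statements` are hypothetical. Only `read_parquet` is an actual DuckDB function; the SQL is generated as strings, so the sketch runs without a database.

```python
from pathlib import Path


def tables_to_download(remote: dict[str, str], local: dict[str, str]) -> list[str]:
    """Step 1: tables whose remote hash is new or differs from the local state."""
    return sorted(name for name, digest in remote.items() if local.get(name) != digest)


def view_statements(parquet_dir: Path, tables: list[str]) -> list[str]:
    """Step 5: DROP + CREATE VIEW per synced table.

    Only views named after synced tables are touched, so tables the user
    created in the same database are left alone.
    """
    statements = []
    for name in tables:
        path = (parquet_dir / f"{name}.parquet").as_posix()
        statements.append(f'DROP VIEW IF EXISTS "{name}"')
        statements.append(
            f'CREATE VIEW "{name}" AS SELECT * FROM read_parquet(\'{path}\')'
        )
    return statements


# Hypothetical manifests: "orders" changed, "products" is new, "customers" is current.
remote = {"orders": "h2", "customers": "h1", "products": "h9"}
local = {"orders": "h1", "customers": "h1"}

changed = tables_to_download(remote, local)
print(changed)
for sql in view_statements(Path("server/parquet"), sorted(remote)):
    print(sql)
```

Comparing hashes rather than timestamps keeps the decision independent of clock skew between the analyst's machine and the server.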
+
+### Task 2C: Query + Scripts commands [DEPENDS ON 2A, 1C]
+
+```
+cli/commands/
+  query.py          # da query "SQL" [--remote] [--json]
+  scripts.py        # da scripts list/run/deploy/undeploy
+  explore.py        # da explore {table}: table profile
+```
+
+- `da query`: local DuckDB by default, `--remote` via the server API
+- `da scripts run X`: local by default, `--remote` via the server
+- `da scripts deploy X --schedule "cron"`: upload + registration on the server
+- `da explore orders`: profile from local data (or `--remote` from the server)
+
+### Task 2D: Admin + Server commands [DEPENDS ON 2A, 1D]
+
+```
+cli/commands/
+  admin.py          # da admin add-user/remove-user/list-users
+  status.py         # da status [--local]: system health
+  server.py         # da server deploy/rollback/logs/status
+  diagnose.py       # da diagnose: AI-friendly diagnostics
+```
+
+- `da status`: structured health report (tables, sync state, services)
+- `da status --local`: offline view of when the last sync ran and how much data is local
+- `da diagnose`: walks through logs, sync state, and connectivity → root cause
+- `da server deploy`: wrapper around `kamal deploy`
+- `da server logs webapp`: wrapper around `kamal app logs`
+
+### Task 2E: PyPI distribution [DEPENDS ON 2A]
+
+- `pyproject.toml` for the CLI package
+- `uv tool install data-analyst` or `uv pip install data-analyst`
+- Entry point: `[project.scripts] da = "cli.main:app"`
+- Minimal dependencies: typer, httpx, duckdb, rich (progress bars)
+
+---
+
+## Phase 3: Docker + Kamal
+
+**Goal:** `docker compose up` for dev, `kamal deploy` for production. Replaces 10+ manual steps.
+
+### Task 3A: Dockerfile + docker-compose.yml [INDEPENDENT]
+
+```
+Dockerfile                  # python:3.13-slim, uv install, single image
+docker-compose.yml          # webapp, api, scheduler, telegram-bot, ws-gateway
+docker-compose.test.yml     # api + test-runner for integration tests
+```
+
+- One image, a different CMD per service
+- `/data` volume shared between containers
+- `profiles: ["full"]` for optional services (telegram, ws-gateway)
+- `uv sync` instead of `pip install` in the Dockerfile
+
+### Task 3B: Scheduler service [DEPENDS ON 0A]
+
+New file: `services/scheduler/__main__.py`
+- APScheduler (or a simple custom scheduler) replaces 7 systemd timers:
+
+| Timer | Schedule | Function |
+|---|---|---|
+| data-refresh | 15 min | `DataSyncManager.sync_scheduled()` |
+| catalog-refresh | 15 min | Catalog refresh |
+| corporate-memory | 30 min | Knowledge collector |
+| session-collector | 6h | Session collection (from uploaded data) |
+| user-scripts | per-script cron | Script runner |
+| profiler | after data-refresh | Auto-profile new data |
+
+### Task 3C: Kamal configuration [DEPENDS ON 3A]
+
+```
+config/
+  deploy.yml            # production Kamal config
+  deploy.staging.yml    # staging override
+```
+
+- Kamal Proxy for auto-SSL (Let's Encrypt)
+- Healthcheck on `/api/health`
+- Zero-downtime deploy
+- Accessories: scheduler, telegram-bot, ws-gateway, script-runner
+- Environment secrets via Kamal env management
+
+### Task 3D: GitHub Actions CI/CD [DEPENDS ON 3A, 3C]
+
+```
+.github/workflows/
+  ci.yml        # test + build on every push
+  deploy.yml    # staging on PR, production on merge to main
+```
+
+Flow: push → pytest → integration tests (docker compose) → build image → push GHCR → kamal deploy staging (PR) / production (merge)
+
+### Task 3E: Delete the old server infra [DEPENDS ON 3A-3D, once the new setup is verified to work]
+
+Delete:
+- `server/setup.sh` (103 lines)
+- `server/webapp-setup.sh` (171 lines)
+- `server/deploy.sh` (395 lines)
+- `server/migrate-to-v2.sh` (146 lines)
+- All systemd unit files (`services/*/systemd/`)
+- `server/sudoers-*`
+- `server/bin/add-analyst` and related scripts
+- `scripts/sync_data.sh` (475 lines)
+- `server/webapp.service`, `server/webapp-nginx.conf`
+
+---
+
+## Phase 4: RBAC + security
+
+**Goal:** Application-level RBAC instead of Linux groups. Audit trail. Script sandboxing.
+
+### Task 4A: Roles + permissions in DuckDB [DEPENDS ON 0A]
+
+New file: `src/rbac.py`
+
+```python
+from enum import Enum
+
+
+class Role(Enum):
+    VIEWER = "viewer"        # Catalog, reading data
+    ANALYST = "analyst"      # Sync, queries, voting, scripts
+    ADMIN = "admin"          # User management, knowledge approval
+    KM_ADMIN = "km_admin"    # Corporate memory governance
+```
+
+- Dataset-level permissions (who has access to which data)
+- Rewrite `webapp/auth.py` lines 37-65 (admin_required/km_admin_required)
+- Rewrite all of `webapp/user_service.py`: DB instead of `pwd.getpwnam()` + `sudo`
+
+### Task 4B: Audit trail [DEPENDS ON 0A]
+
+- Every API call is logged to the `audit_log` table
+- Structure: timestamp, user_id, action, resource, params, result, duration
+- An agent can run: `da query "SELECT * FROM system.audit_log WHERE action='sync_trigger' ORDER BY timestamp DESC LIMIT 10"`
+
+### Task 4C: Script sandboxing [DEPENDS ON 3A]
+
+- Script-runner as an isolated Docker container
+- Read-only access to DuckDB
+- Limited memory (512MB), time (5min), no network (except notification dispatch)
+- Explicit whitelist of Python packages (pandas, duckdb, matplotlib)
+
+### Task 4D: Corporate memory push model [DEPENDS ON 1D]
+
+- Users push CLAUDE.local.md via `da sync --upload-only`
+- The server never reads `/home/*/` as root
+- The corporate memory collector processes uploaded data from the DB
+
+---
+
+## Dependency graph for multi-agent work
+
+```
+Phase 0:
+  0A (DuckDB schema) ─────────────────────────┐
+  0C (migration script) ← depends on 0A       │
+  0B (service migration) ← depends on 0A      │
+                                              │
+Phase 1:                                      │
+  1A (FastAPI foundation) ← depends on 0A ────┤
+  1B (sync/data EP) ← depends on 1A, 0A       │
+  1C (query/scripts EP) ← depends on 1A       │
+  1D (users/memory EP) ← depends on 1A        │
+  1E (remove SSH) ← depends on 1B, 1D         │
+                                              │
+Phase 2:                                      │
+  2A (CLI foundation) ← depends on 1A         │
+  2B (sync cmd) ← depends on 2A, 1B           │
+  2C (query/scripts cmd) ← depends on 2A      │
+  2D (admin/server cmd) ← depends on 2A       │
+  2E (PyPI) ← depends on 2A                   │
+                                              │
+Phase 3:                                      │
+  3A (Dockerfile) ← INDEPENDENT ──────────────┘
+  3B (scheduler) ← depends on 0A
+  3C (Kamal) ← depends on 3A
+  3D (CI/CD) ← depends on 3A, 3C
+  3E (cleanup) ← depends on 3A-3D verified
+
+Phase 4:
+  4A (RBAC) ← depends on 0A
+  4B (audit) ← depends on 0A
+  4C (sandbox) ← depends on 3A
+  4D (push model) ← depends on 1D
+```
+
+### Parallel agents: optimal split
+
+```
+AGENT 1: DuckDB + Repositories     AGENT 2: FastAPI          AGENT 3: Docker + Kamal
+─────────────────────────────      ─────────────────         ──────────────────────
+0A: DuckDB schema                  (waits for 0A)            3A: Dockerfile + compose
+0C: migration script               1A: FastAPI foundation    3B: scheduler service
+0B: service migration              1B: sync/data EP          3C: Kamal configuration
+4A: RBAC                           1C: query/scripts EP      3D: CI/CD workflow
+4B: audit trail                    1D: users/memory EP       4C: script sandbox
+                                   1E: remove SSH deps
+
+AGENT 4: CLI + Skills              AGENT 5: Integration + Cleanup
+─────────────────────              ───────────────────────────
+(waits for 1A)                     (waits for agents 1-4)
+2A: CLI foundation + auth          End-to-end testing
+2B: sync commands                  3E: delete old infra
+2C: query/scripts commands         4D: corporate memory push
+2D: admin/server commands          5A: CLAUDE.md template update
+2E: PyPI distribution              Documentation update
+5B: CLI skills (help/docs)
+5C: da setup (interactive)
+5D: da diagnose
+5E: da infra (multi-customer)
+```
+
+---
+
+## Reused vs. rewritten files
+
+### Unchanged (business logic preserved)
+- `src/config.py`: TableConfig, Config parsing (625 lines)
+- `src/parquet_manager.py`: Parquet conversion engine
+- `connectors/keboola/adapter.py` + `client.py`
+- `connectors/bigquery/adapter.py` + `client.py`
+- `connectors/jira/`: the whole connector
+- `connectors/llm/`: LLM abstraction
+- `connectors/openmetadata/`: catalog enrichment
+- `webapp/config.py`, `config/loader.py`
+- `webapp/templates/`: all HTML templates
+- `src/remote_query.py`: query logic (wrapped by the API)
+- `src/profiler.py`: profiling logic (output to DuckDB)
+
+### Rewired to DuckDB (logic preserved, I/O layer replaced)
+- `webapp/corporate_memory_service.py`
+- `webapp/sync_settings_service.py`
+- `webapp/telegram_service.py`
+- `src/data_sync.py` (SyncState class)
+- `src/table_registry.py`
+- `services/corporate_memory/collector.py`
+- `services/telegram_bot/storage.py`
+
+### Rewritten
+- `webapp/user_service.py`: DB instead of Linux users
+- `webapp/auth.py` lines 37-65: RBAC instead of Linux groups
+
+### New
+- `src/db.py`, `src/repositories/`, `src/rbac.py`
+- `api/`: the whole FastAPI server
+- `cli/`: the whole CLI tool
+- `Dockerfile`, `docker-compose*.yml`, `config/deploy*.yml`
+- `services/scheduler/__main__.py`
+- `.github/workflows/ci.yml`, `.github/workflows/deploy.yml`
+
+### Deleted
+- `server/setup.sh`, `server/webapp-setup.sh`, `server/deploy.sh`
+- `server/migrate-to-v2.sh`
+- `server/sudoers-*`, `server/bin/add-analyst`
+- `scripts/sync_data.sh`
+- All `services/*/systemd/` files
+- `server/webapp.service`, `server/webapp-nginx.conf`
+
+---
+
+## Phase 5: Agent Skills (CLAUDE.md + CLI skills)
+
+**Goal:** The AI agent has built-in knowledge for deployment, administration, diagnostics, and development. It does not need to look anything up: everything is in the skills.
+
+### Task 5A: CLAUDE.md template for analysts [INDEPENDENT]
+
+Update `docs/setup/claude_md_template.md`:
+- Instructions for the `da` CLI instead of SSH/rsync
+- `da sync` as the mandatory start of a session
+- How to work with the local DuckDB
+- How to create and deploy scripts
+- How to use corporate memory
+- Notification patterns (local vs. server-side)
+
+### Task 5B: Admin/Deploy skills in the CLI [DEPENDS ON 2D]
+
+The `da` CLI will ship with built-in skills: long help texts with domain knowledge that an AI agent reads via `da --help` or `da skills `:
+
+```bash
+da skills list              # list all available skills
+da skills setup             # complete guide to setting up a new instance
+da skills troubleshoot      # diagnostic procedures
+da skills connectors        # how to add a new data source
+da skills notifications     # how notifications work
+da skills corporate-memory  # governance, approval flow
+da skills security          # RBAC, permissions, audit
+da skills backup-restore    # disaster recovery
+da skills upgrade           # how to upgrade to a new version
+```
+
+Each skill is a markdown file in `cli/skills/` that is displayed via `da skills `.
+
+### Task 5C: Interactive setup skill [DEPENDS ON 2D, 1A]
+
+```bash
+da setup    # the AI agent starts an interactive setup
+```
+
+Flow (driven by the agent):
+1. `da setup init` → generates `instance.yaml` from the conversation with the user
+2. `da setup test-connection` → verifies credentials (Keboola/BigQuery)
+3. `da setup deploy` → `docker compose up` or `kamal deploy`
+4. `da setup first-sync` → triggers the first data sync
+5. `da setup verify` → healthcheck, table count, sample query
+6. `da setup add-user` → adds the first analyst
+
+Each step returns structured JSON, so the agent knows what to do next.
+
+### Task 5D: Diagnose skill [DEPENDS ON 2D, 1D]
+
+```bash
+da diagnose                                  # full diagnostics
+da diagnose --symptom "data not updating"    # targeted diagnostics
+da diagnose --component scheduler            # diagnostics for a single service
+```
+
+Output (structured for the agent):
+```json
+{
+  "overall": "degraded",
+  "checks": [
+    {"name": "api", "status": "ok", "latency_ms": 12},
+    {"name": "scheduler", "status": "ok", "last_run": "2026-03-27T08:00"},
+    {"name": "data_freshness", "status": "warning",
+     "detail": "table 'orders' last synced 26h ago, expected 15min",
+     "suggested_action": "da server logs scheduler | grep orders"},
+    {"name": "disk", "status": "ok", "usage": "45%"},
+    {"name": "duckdb", "status": "ok", "tables": 47, "total_rows": "12.3M"}
+  ],
+  "suggested_actions": [
+    "Check scheduler logs for 'orders' sync failures",
+    "Run: da server logs scheduler --since 24h | grep -i error"
+  ]
+}
+```
+
+### Task 5E: Operational skills for multi-customer setups [DEPENDS ON 3C]
+
+```bash
+da infra list                  # list customer instances
+da infra provision --customer acme --cloud gcp --region europe-west1
+da infra status acme           # health of a customer instance
+da infra deploy acme           # deploy to the customer's server
+da infra backup acme           # data snapshot
+```
+
+Future extension: Terraform under the hood for provisioning, Kamal for deploys.
+
+---
+
+## Verification
+
+### Per phase
+1. **Phase 0:** `pytest tests/` is green, the webapp behaves identically with the DuckDB backend
+2. **Phase 1:** `curl /api/health` → ok, `curl /api/sync/manifest` → manifest, parquet download works
+3. **Phase 2:** `da login && da sync` creates the same structure as `sync_data.sh`, `da query` works offline
+4. **Phase 3:** `docker compose up` → all services run, `kamal deploy -d staging` → staging works
+5. **Phase 4:** a viewer cannot trigger a sync, an admin can manage users, scripts run in the sandbox
+
+### End-to-end test (whole flow)
+1. `docker compose up -d` (or `kamal deploy`)
+2. Via the webapp: log in, select datasets
+3. `da login && da sync` → parquet files available locally
+4. `da query "SELECT count(*) FROM orders"` → result offline
+5. `da scripts run sales_alert` → local execution
+6. `da scripts deploy sales_alert --schedule "0 8 * * MON"` → server-side execution
+7. `da sync --upload-only` → sessions/artifacts on the server
+8. Corporate memory: knowledge items visible in the webapp
+9. Telegram notifications delivered
+10. `da diagnose` → structured health report
diff --git a/docs/superpowers/plans/2026-03-27-01-duckdb-state-layer.md b/docs/superpowers/plans/2026-03-27-01-duckdb-state-layer.md
new file mode 100644
index 0000000..6c0cb03
--- /dev/null
+++ b/docs/superpowers/plans/2026-03-27-01-duckdb-state-layer.md
@@ -0,0 +1,1574 @@
+# DuckDB State Layer — Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Replace all JSON file-based state with DuckDB, eliminating filesystem permission conflicts and enabling agent-queryable system state.
+
+**Architecture:** A `src/db.py` module manages DuckDB connections and schema versioning. Repository classes in `src/repositories/` wrap all CRUD operations. Existing service files swap `_read_json`/`_write_json` for repository method calls. Dual-write (JSON + DuckDB) during transition, then JSON removal.
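
The dual-write transition mentioned in the architecture summary could look roughly like the sketch below. It is an assumption-laden illustration: `DualWriteStore` is a hypothetical name, and the DuckDB side is represented by two plain callables (`db_write`, `db_read`) standing in for a repository, so the example runs with the standard library alone.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable


class DualWriteStore:
    """Write state to the legacy JSON file and to a second backend, then compare."""

    def __init__(
        self,
        json_path: Path,
        db_write: Callable[[dict], None],
        db_read: Callable[[], dict],
    ) -> None:
        self.json_path = json_path
        self.db_write = db_write
        self.db_read = db_read

    def save(self, state: dict) -> None:
        # Legacy path first, written atomically, so existing readers keep working.
        tmp = self.json_path.with_suffix(".tmp")
        tmp.write_text(json.dumps(state, sort_keys=True))
        tmp.replace(self.json_path)
        # New path second; in the real system this would be a repository call.
        self.db_write(state)

    def verify(self) -> bool:
        """True when both backends hold the same state."""
        legacy = json.loads(self.json_path.read_text())
        return legacy == self.db_read()


# In-memory stand-in for the database side (assumption for the demo).
db_rows: dict = {}
with tempfile.TemporaryDirectory() as tmpdir:
    store = DualWriteStore(
        Path(tmpdir) / "sync_state.json",
        db_write=lambda state: db_rows.update(state),
        db_read=lambda: dict(db_rows),
    )
    store.save({"orders": {"rows": 1000, "hash": "abc123"}})
    in_sync = store.verify()
    db_rows["orders"] = {"rows": 999, "hash": "abc123"}  # simulate drift
    drifted = not store.verify()

print(in_sync, drifted)
```

Running `verify()` on a schedule during the transition gives a concrete signal for when the JSON writes can be removed.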
+
+**Tech Stack:** DuckDB >=1.1, Python 3.11+, uv for package management
+
+**Design spec:** `docs/superpowers/specs/2026-03-27-refactoring-design.md` section 3 (Data Layer)
+
+---
+
+## File Structure
+
+### New files
+| File | Responsibility |
+|------|----------------|
+| `src/db.py` | DuckDB connection factory, schema creation, migration |
+| `src/repositories/__init__.py` | Re-export all repositories, `get_system_db()` factory |
+| `src/repositories/users.py` | UserRepository — CRUD users table |
+| `src/repositories/sync_state.py` | SyncStateRepository — sync state + history |
+| `src/repositories/knowledge.py` | KnowledgeRepository — items + votes |
+| `src/repositories/audit.py` | AuditRepository — append-only audit log |
+| `src/repositories/notifications.py` | TelegramRepository, PendingCodeRepository, ScriptRegistry |
+| `src/repositories/table_registry.py` | TableRegistryRepository |
+| `src/repositories/profiles.py` | ProfileRepository |
+| `scripts/migrate_json_to_duckdb.py` | One-time migration from JSON files to DuckDB |
+| `tests/test_db.py` | Tests for db module |
+| `tests/test_repositories.py` | Tests for all repositories |
+
+### Modified files
+| File | What changes |
+|------|-------------|
+| `webapp/sync_settings_service.py` | `_read_json`/`_write_json` (lines 40-62) → SyncSettingsRepository |
+| `webapp/corporate_memory_service.py` | `_read_json`/`_write_json` (lines 222-244) → KnowledgeRepository |
+| `webapp/telegram_service.py` | `_read_json`/`_write_json` (lines 21-45) → TelegramRepository |
+| `webapp/desktop_auth.py` | `_read_json`/`_write_json` (lines 33-57) → UserRepository |
+| `src/data_sync.py` | SyncState class (lines 37-139) → SyncStateRepository |
+| `src/table_registry.py` | `_atomic_write_json` (line 43) → TableRegistryRepository |
+| `src/profiler.py` | profiles.json output (line 92) → ProfileRepository |
+| `services/corporate_memory/collector.py` | `_read_json`/`_write_json` (lines 100-123) → KnowledgeRepository |
+| `services/telegram_bot/storage.py` | `_read_json`/`_write_json` (lines 21-43) → TelegramRepository |
+| `requirements.txt` | Ensure duckdb>=1.1 |
+
+---
+
+### Task 1: DuckDB connection management + schema
+
+**Files:**
+- Create: `src/db.py`
+- Create: `tests/test_db.py`
+
+- [ ] **Step 1: Write the failing test for get_system_db**
+
+```python
+# tests/test_db.py
+import tempfile
+import os
+import duckdb
+import pytest
+
+
+def test_get_system_db_creates_database():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        os.environ["DATA_DIR"] = tmpdir
+        from src.db import get_system_db
+        conn = get_system_db()
+        assert conn is not None
+        # Verify tables exist
+        tables = conn.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='main'").fetchall()
+        table_names = {t[0] for t in tables}
+        assert "users" in table_names
+        assert "sync_state" in table_names
+        assert "knowledge_items" in table_names
+        assert "audit_log" in table_names
+        conn.close()
+
+
+def test_get_system_db_is_idempotent():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        os.environ["DATA_DIR"] = tmpdir
+        from src.db import get_system_db
+        conn1 = get_system_db()
+        conn1.execute("INSERT INTO users (id, email, name, role) VALUES ('u1', 'test@test.com', 'Test', 'analyst')")
+        conn1.close()
+        conn2 = get_system_db()
+        result = conn2.execute("SELECT email FROM users WHERE id='u1'").fetchone()
+        assert result[0] == "test@test.com"
+        conn2.close()
+
+
+def test_schema_version_tracked():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        os.environ["DATA_DIR"] = tmpdir
+        from src.db import get_system_db, get_schema_version
+        conn = get_system_db()
+        version = get_schema_version(conn)
+        assert version == 1
+        conn.close()
+```
+
+- [ ] **Step 2: Run test to verify it fails**
+
+Run: `cd "/Users/zdeneksrotyr/Library/Mobile Documents/com~apple~CloudDocs/Sources/VsCode/component_factory/tmp_oss" && python -m pytest tests/test_db.py -v`
+Expected: FAIL — `ModuleNotFoundError: No module named 'src.db'`
+
+- [ ] **Step 3: Implement src/db.py**
+
+```python
+# src/db.py
+"""
+DuckDB connection management and schema versioning.
+
+Provides get_system_db() for the system state database
+and get_analytics_db() for the analytics database with parquet views.
+"""
+
+import os
+from pathlib import Path
+
+import duckdb
+
+SCHEMA_VERSION = 1
+
+_SYSTEM_SCHEMA = """
+-- Schema versioning
+CREATE TABLE IF NOT EXISTS schema_version (
+    version INTEGER NOT NULL,
+    applied_at TIMESTAMP DEFAULT current_timestamp
+);
+
+-- Users & auth
+CREATE TABLE IF NOT EXISTS users (
+    id VARCHAR PRIMARY KEY,
+    email VARCHAR UNIQUE NOT NULL,
+    name VARCHAR,
+    role VARCHAR DEFAULT 'analyst',
+    password_hash VARCHAR,
+    setup_token VARCHAR,
+    setup_token_created TIMESTAMP,
+    reset_token VARCHAR,
+    reset_token_created TIMESTAMP,
+    created_at TIMESTAMP DEFAULT current_timestamp,
+    updated_at TIMESTAMP
+);
+
+-- Sync state
+CREATE TABLE IF NOT EXISTS sync_state (
+    table_id VARCHAR PRIMARY KEY,
+    last_sync TIMESTAMP,
+    rows BIGINT,
+    file_size_bytes BIGINT,
+    uncompressed_size_bytes BIGINT,
+    columns INTEGER,
+    hash VARCHAR,
+    status VARCHAR DEFAULT 'ok',
+    error TEXT
+);
+
+CREATE TABLE IF NOT EXISTS sync_history (
+    id VARCHAR PRIMARY KEY,
+    table_id VARCHAR NOT NULL,
+    synced_at TIMESTAMP NOT NULL,
+    rows BIGINT,
+    duration_ms INTEGER,
+    status VARCHAR,
+    error TEXT
+);
+
+-- User sync settings
+CREATE TABLE IF NOT EXISTS user_sync_settings (
+    user_id VARCHAR NOT NULL,
+    dataset VARCHAR NOT NULL,
+    enabled BOOLEAN DEFAULT false,
+    table_mode VARCHAR DEFAULT 'all',
+    tables JSON,
+    updated_at TIMESTAMP,
+    PRIMARY KEY (user_id, dataset)
+);
+
+-- Corporate memory
+CREATE TABLE IF NOT EXISTS knowledge_items (
+    id VARCHAR PRIMARY KEY,
+    title VARCHAR NOT NULL,
+    content TEXT,
+    category VARCHAR,
+    tags JSON,
+    status VARCHAR DEFAULT 'pending',
+    contributors JSON,
+    source_user VARCHAR,
+    audience VARCHAR,
+    created_at TIMESTAMP DEFAULT current_timestamp,
+    updated_at TIMESTAMP
+);
+
+CREATE TABLE IF NOT EXISTS knowledge_votes ( + item_id VARCHAR NOT NULL, + user_id VARCHAR NOT NULL, + vote INTEGER, + voted_at TIMESTAMP DEFAULT current_timestamp, + PRIMARY KEY (item_id, user_id) +); + +-- Audit log +CREATE TABLE IF NOT EXISTS audit_log ( + id VARCHAR PRIMARY KEY, + timestamp TIMESTAMP NOT NULL DEFAULT current_timestamp, + user_id VARCHAR, + action VARCHAR NOT NULL, + resource VARCHAR, + params JSON, + result VARCHAR, + duration_ms INTEGER +); + +-- Notifications +CREATE TABLE IF NOT EXISTS telegram_links ( + user_id VARCHAR PRIMARY KEY, + chat_id BIGINT NOT NULL, + linked_at TIMESTAMP DEFAULT current_timestamp +); + +CREATE TABLE IF NOT EXISTS pending_codes ( + code VARCHAR PRIMARY KEY, + chat_id BIGINT NOT NULL, + created_at TIMESTAMP DEFAULT current_timestamp +); + +-- Scripts +CREATE TABLE IF NOT EXISTS script_registry ( + id VARCHAR PRIMARY KEY, + name VARCHAR NOT NULL, + owner VARCHAR, + schedule VARCHAR, + source TEXT NOT NULL, + deployed_at TIMESTAMP DEFAULT current_timestamp, + last_run TIMESTAMP, + last_status VARCHAR +); + +-- Table registry +CREATE TABLE IF NOT EXISTS table_registry ( + id VARCHAR PRIMARY KEY, + name VARCHAR NOT NULL, + folder VARCHAR, + sync_strategy VARCHAR, + primary_key VARCHAR, + description TEXT, + registered_by VARCHAR, + registered_at TIMESTAMP DEFAULT current_timestamp +); + +-- Profiles +CREATE TABLE IF NOT EXISTS table_profiles ( + table_id VARCHAR PRIMARY KEY, + profile JSON NOT NULL, + profiled_at TIMESTAMP DEFAULT current_timestamp +); + +-- Dataset permissions +CREATE TABLE IF NOT EXISTS dataset_permissions ( + user_id VARCHAR NOT NULL, + dataset VARCHAR NOT NULL, + access VARCHAR DEFAULT 'read', + PRIMARY KEY (user_id, dataset) +); +""" + + +def _get_data_dir() -> Path: + return Path(os.environ.get("DATA_DIR", "./data")) + + +def get_system_db() -> duckdb.DuckDBPyConnection: + """Get a connection to the system state database. 
Creates schema if needed.""" + db_path = _get_data_dir() / "state" / "system.duckdb" + db_path.parent.mkdir(parents=True, exist_ok=True) + conn = duckdb.connect(str(db_path)) + _ensure_schema(conn) + return conn + + +def get_analytics_db() -> duckdb.DuckDBPyConnection: + """Get a connection to the analytics database (parquet views).""" + db_path = _get_data_dir() / "analytics" / "server.duckdb" + db_path.parent.mkdir(parents=True, exist_ok=True) + return duckdb.connect(str(db_path)) + + +def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: + """Create tables if they don't exist. Apply migrations if schema version changed.""" + current = get_schema_version(conn) + if current < SCHEMA_VERSION: + conn.execute(_SYSTEM_SCHEMA) + if current == 0: + conn.execute( + "INSERT INTO schema_version (version) VALUES (?)", + [SCHEMA_VERSION], + ) + else: + conn.execute( + "UPDATE schema_version SET version = ?, applied_at = current_timestamp", + [SCHEMA_VERSION], + ) + + +def get_schema_version(conn: duckdb.DuckDBPyConnection) -> int: + """Get current schema version. 
Returns 0 if no schema exists.""" + try: + result = conn.execute("SELECT MAX(version) FROM schema_version").fetchone() + return result[0] if result and result[0] else 0 + except duckdb.CatalogException: + return 0 +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest tests/test_db.py -v` +Expected: 3 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/db.py tests/test_db.py +git commit -m "feat: add DuckDB state layer with schema management" +``` + +--- + +### Task 2: SyncState repository + +**Files:** +- Create: `src/repositories/__init__.py` +- Create: `src/repositories/sync_state.py` +- Create: `tests/test_repositories.py` + +- [ ] **Step 1: Write the failing test** + +```python +# tests/test_repositories.py +import tempfile +import os +from datetime import datetime, timezone + +import pytest + + +@pytest.fixture +def db_conn(): + """Provide a fresh file-backed DuckDB system database (with schema) in a temporary directory.""" + with tempfile.TemporaryDirectory() as tmpdir: + os.environ["DATA_DIR"] = tmpdir + from src.db import get_system_db + conn = get_system_db() + yield conn + conn.close() + + +class TestSyncStateRepository: + def test_update_and_get(self, db_conn): + from src.repositories.sync_state import SyncStateRepository + repo = SyncStateRepository(db_conn) + repo.update_sync( + table_id="orders", + rows=1000, + file_size_bytes=5000, + hash="abc123", + ) + state = repo.get_table_state("orders") + assert state is not None + assert state["rows"] == 1000 + assert state["hash"] == "abc123" + assert state["status"] == "ok" + + def test_get_nonexistent_returns_none(self, db_conn): + from src.repositories.sync_state import SyncStateRepository + repo = SyncStateRepository(db_conn) + assert repo.get_table_state("nonexistent") is None + + def test_get_last_sync(self, db_conn): + from src.repositories.sync_state import SyncStateRepository + repo = 
SyncStateRepository(db_conn) + repo.update_sync(table_id="orders", rows=100, file_size_bytes=500, hash="h1") + last = repo.get_last_sync("orders") + assert last is not None + + def test_get_all_states(self, db_conn): + from src.repositories.sync_state import SyncStateRepository + repo = SyncStateRepository(db_conn) + repo.update_sync(table_id="orders", rows=100, file_size_bytes=500, hash="h1") + repo.update_sync(table_id="customers", rows=50, file_size_bytes=200, hash="h2") + all_states = repo.get_all_states() + assert len(all_states) == 2 + + def test_history_recorded(self, db_conn): + from src.repositories.sync_state import SyncStateRepository + repo = SyncStateRepository(db_conn) + repo.update_sync(table_id="orders", rows=100, file_size_bytes=500, hash="h1") + repo.update_sync(table_id="orders", rows=200, file_size_bytes=800, hash="h2") + history = repo.get_sync_history("orders", limit=10) + assert len(history) == 2 + assert history[0]["rows"] == 200 # newest first + + def test_update_with_error(self, db_conn): + from src.repositories.sync_state import SyncStateRepository + repo = SyncStateRepository(db_conn) + repo.update_sync( + table_id="orders", rows=0, file_size_bytes=0, hash="", + status="error", error="Connection timeout", + ) + state = repo.get_table_state("orders") + assert state["status"] == "error" + assert state["error"] == "Connection timeout" +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest tests/test_repositories.py::TestSyncStateRepository -v` +Expected: FAIL — `ModuleNotFoundError` + +- [ ] **Step 3: Implement repository** + +```python +# src/repositories/__init__.py +""" +Repository layer for DuckDB state management. + +All system state CRUD goes through repository classes. 
+""" + +from src.db import get_system_db, get_analytics_db + +__all__ = ["get_system_db", "get_analytics_db"] +``` + +```python +# src/repositories/sync_state.py +"""Repository for sync state and history.""" + +import uuid +from datetime import datetime, timezone +from typing import Any + +import duckdb + + +class SyncStateRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def get_table_state(self, table_id: str) -> dict[str, Any] | None: + result = self.conn.execute( + "SELECT * FROM sync_state WHERE table_id = ?", [table_id] + ).fetchone() + if not result: + return None + columns = [desc[0] for desc in self.conn.description] + return dict(zip(columns, result)) + + def get_last_sync(self, table_id: str) -> datetime | None: + result = self.conn.execute( + "SELECT last_sync FROM sync_state WHERE table_id = ?", [table_id] + ).fetchone() + return result[0] if result else None + + def get_all_states(self) -> list[dict[str, Any]]: + results = self.conn.execute("SELECT * FROM sync_state ORDER BY table_id").fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] + + def update_sync( + self, + table_id: str, + rows: int, + file_size_bytes: int, + hash: str, + uncompressed_size_bytes: int = 0, + columns: int = 0, + status: str = "ok", + error: str | None = None, + duration_ms: int | None = None, + ) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO sync_state (table_id, last_sync, rows, file_size_bytes, + uncompressed_size_bytes, columns, hash, status, error) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ ON CONFLICT (table_id) DO UPDATE SET + last_sync = excluded.last_sync, + rows = excluded.rows, + file_size_bytes = excluded.file_size_bytes, + uncompressed_size_bytes = excluded.uncompressed_size_bytes, + columns = excluded.columns, + hash = excluded.hash, + status = excluded.status, + error = excluded.error""", + [table_id, now, rows, file_size_bytes, uncompressed_size_bytes, + columns, hash, status, error], + ) + # Record history + self.conn.execute( + """INSERT INTO sync_history (id, table_id, synced_at, rows, duration_ms, status, error) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + [str(uuid.uuid4()), table_id, now, rows, duration_ms, status, error], + ) + + def get_sync_history(self, table_id: str, limit: int = 10) -> list[dict[str, Any]]: + results = self.conn.execute( + "SELECT * FROM sync_history WHERE table_id = ? ORDER BY synced_at DESC LIMIT ?", + [table_id, limit], + ).fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest tests/test_repositories.py::TestSyncStateRepository -v` +Expected: 6 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/repositories/__init__.py src/repositories/sync_state.py tests/test_repositories.py +git commit -m "feat: add SyncStateRepository with history tracking" +``` + +--- + +### Task 3: Users repository + +**Files:** +- Create: `src/repositories/users.py` +- Append to: `tests/test_repositories.py` + +- [ ] **Step 1: Write the failing test** + +Append to `tests/test_repositories.py`: + +```python +class TestUserRepository: + def test_create_and_get(self, db_conn): + from src.repositories.users import UserRepository + repo = UserRepository(db_conn) + repo.create(id="u1", email="test@acme.com", name="Test User", role="analyst") + user = repo.get_by_id("u1") + assert user is not None + assert user["email"] == "test@acme.com" + assert user["role"] == 
"analyst" + + def test_get_by_email(self, db_conn): + from src.repositories.users import UserRepository + repo = UserRepository(db_conn) + repo.create(id="u1", email="test@acme.com", name="Test User") + user = repo.get_by_email("test@acme.com") + assert user is not None + assert user["id"] == "u1" + + def test_get_nonexistent(self, db_conn): + from src.repositories.users import UserRepository + repo = UserRepository(db_conn) + assert repo.get_by_id("nope") is None + assert repo.get_by_email("nope@nope.com") is None + + def test_list_all(self, db_conn): + from src.repositories.users import UserRepository + repo = UserRepository(db_conn) + repo.create(id="u1", email="a@acme.com", name="A") + repo.create(id="u2", email="b@acme.com", name="B") + users = repo.list_all() + assert len(users) == 2 + + def test_update_role(self, db_conn): + from src.repositories.users import UserRepository + repo = UserRepository(db_conn) + repo.create(id="u1", email="test@acme.com", name="Test") + repo.update(id="u1", role="admin") + user = repo.get_by_id("u1") + assert user["role"] == "admin" + + def test_delete(self, db_conn): + from src.repositories.users import UserRepository + repo = UserRepository(db_conn) + repo.create(id="u1", email="test@acme.com", name="Test") + repo.delete("u1") + assert repo.get_by_id("u1") is None + + def test_set_password_hash(self, db_conn): + from src.repositories.users import UserRepository + repo = UserRepository(db_conn) + repo.create(id="u1", email="test@acme.com", name="Test") + repo.update(id="u1", password_hash="$argon2id$hashed") + user = repo.get_by_id("u1") + assert user["password_hash"] == "$argon2id$hashed" +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest tests/test_repositories.py::TestUserRepository -v` +Expected: FAIL + +- [ ] **Step 3: Implement repository** + +```python +# src/repositories/users.py +"""Repository for user management.""" + +from datetime import datetime, timezone +from typing import Any + 
+import duckdb + + +class UserRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def _row_to_dict(self, row) -> dict[str, Any] | None: + if not row: + return None + columns = [desc[0] for desc in self.conn.description] + return dict(zip(columns, row)) + + def get_by_id(self, user_id: str) -> dict[str, Any] | None: + result = self.conn.execute("SELECT * FROM users WHERE id = ?", [user_id]).fetchone() + return self._row_to_dict(result) + + def get_by_email(self, email: str) -> dict[str, Any] | None: + result = self.conn.execute("SELECT * FROM users WHERE email = ?", [email]).fetchone() + return self._row_to_dict(result) + + def list_all(self) -> list[dict[str, Any]]: + results = self.conn.execute("SELECT * FROM users ORDER BY email").fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] + + def create( + self, + id: str, + email: str, + name: str, + role: str = "analyst", + password_hash: str | None = None, + ) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO users (id, email, name, role, password_hash, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + [id, email, name, role, password_hash, now, now], + ) + + def update(self, id: str, **kwargs) -> None: + allowed = {"email", "name", "role", "password_hash", "setup_token", + "setup_token_created", "reset_token", "reset_token_created"} + updates = {k: v for k, v in kwargs.items() if k in allowed} + if not updates: + return + updates["updated_at"] = datetime.now(timezone.utc) + set_clause = ", ".join(f"{k} = ?" 
for k in updates) + values = list(updates.values()) + [id] + self.conn.execute(f"UPDATE users SET {set_clause} WHERE id = ?", values) + + def delete(self, user_id: str) -> None: + self.conn.execute("DELETE FROM users WHERE id = ?", [user_id]) +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest tests/test_repositories.py::TestUserRepository -v` +Expected: 7 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/repositories/users.py tests/test_repositories.py +git commit -m "feat: add UserRepository with CRUD operations" +``` + +--- + +### Task 4: Knowledge repository + +**Files:** +- Create: `src/repositories/knowledge.py` +- Append to: `tests/test_repositories.py` + +- [ ] **Step 1: Write the failing test** + +Append to `tests/test_repositories.py`: + +```python +class TestKnowledgeRepository: + def test_create_and_get(self, db_conn): + from src.repositories.knowledge import KnowledgeRepository + repo = KnowledgeRepository(db_conn) + repo.create(id="k1", title="MRR Definition", content="Monthly recurring...", + category="metrics", source_user="petr@acme.com") + item = repo.get_by_id("k1") + assert item is not None + assert item["title"] == "MRR Definition" + assert item["status"] == "pending" + + def test_list_by_status(self, db_conn): + from src.repositories.knowledge import KnowledgeRepository + repo = KnowledgeRepository(db_conn) + repo.create(id="k1", title="A", content="a", category="c") + repo.create(id="k2", title="B", content="b", category="c") + repo.update_status("k1", "approved") + approved = repo.list_items(statuses=["approved"]) + assert len(approved) == 1 + assert approved[0]["id"] == "k1" + + def test_vote(self, db_conn): + from src.repositories.knowledge import KnowledgeRepository + repo = KnowledgeRepository(db_conn) + repo.create(id="k1", title="A", content="a", category="c") + repo.vote("k1", "user1", 1) + repo.vote("k1", "user2", -1) + votes = repo.get_votes("k1") + assert votes["upvotes"] == 1 + assert 
votes["downvotes"] == 1 + + def test_vote_replace(self, db_conn): + from src.repositories.knowledge import KnowledgeRepository + repo = KnowledgeRepository(db_conn) + repo.create(id="k1", title="A", content="a", category="c") + repo.vote("k1", "user1", 1) + repo.vote("k1", "user1", -1) # change vote + votes = repo.get_votes("k1") + assert votes["upvotes"] == 0 + assert votes["downvotes"] == 1 + + def test_search(self, db_conn): + from src.repositories.knowledge import KnowledgeRepository + repo = KnowledgeRepository(db_conn) + repo.create(id="k1", title="Revenue metrics", content="MRR definition", category="metrics") + repo.create(id="k2", title="Support SLA", content="Response times", category="support") + results = repo.search("revenue") + assert len(results) == 1 + assert results[0]["id"] == "k1" +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest tests/test_repositories.py::TestKnowledgeRepository -v` +Expected: FAIL + +- [ ] **Step 3: Implement repository** + +```python +# src/repositories/knowledge.py +"""Repository for corporate memory knowledge items and votes.""" + +import json +from datetime import datetime, timezone +from typing import Any + +import duckdb + + +class KnowledgeRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def _row_to_dict(self, row) -> dict[str, Any] | None: + if not row: + return None + columns = [desc[0] for desc in self.conn.description] + return dict(zip(columns, row)) + + def _rows_to_dicts(self, rows) -> list[dict[str, Any]]: + if not rows: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in rows] + + def get_by_id(self, item_id: str) -> dict[str, Any] | None: + result = self.conn.execute("SELECT * FROM knowledge_items WHERE id = ?", [item_id]).fetchone() + return self._row_to_dict(result) + + def create( + self, + id: str, + title: str, + content: str, + category: str, + source_user: str | None = 
None, + tags: list[str] | None = None, + status: str = "pending", + ) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO knowledge_items (id, title, content, category, source_user, + tags, status, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", + [id, title, content, category, source_user, + json.dumps(tags) if tags else None, status, now, now], + ) + + def update_status(self, item_id: str, status: str) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + "UPDATE knowledge_items SET status = ?, updated_at = ? WHERE id = ?", + [status, now, item_id], + ) + + def list_items( + self, + statuses: list[str] | None = None, + category: str | None = None, + limit: int = 100, + offset: int = 0, + ) -> list[dict[str, Any]]: + query = "SELECT * FROM knowledge_items WHERE 1=1" + params: list[Any] = [] + if statuses: + placeholders = ", ".join("?" for _ in statuses) + query += f" AND status IN ({placeholders})" + params.extend(statuses) + if category: + query += " AND category = ?" + params.append(category) + query += " ORDER BY updated_at DESC LIMIT ? OFFSET ?" + params.extend([limit, offset]) + return self._rows_to_dicts(self.conn.execute(query, params).fetchall()) + + def search(self, query: str) -> list[dict[str, Any]]: + pattern = f"%{query}%" + results = self.conn.execute( + """SELECT * FROM knowledge_items + WHERE title ILIKE ? OR content ILIKE ? + ORDER BY updated_at DESC""", + [pattern, pattern], + ).fetchall() + return self._rows_to_dicts(results) + + def vote(self, item_id: str, user_id: str, vote: int) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO knowledge_votes (item_id, user_id, vote, voted_at) + VALUES (?, ?, ?, ?) 
+ ON CONFLICT (item_id, user_id) DO UPDATE SET vote = excluded.vote, voted_at = excluded.voted_at""", + [item_id, user_id, vote, now], + ) + + def get_votes(self, item_id: str) -> dict[str, int]: + result = self.conn.execute( + """SELECT + COALESCE(SUM(CASE WHEN vote > 0 THEN 1 ELSE 0 END), 0) as upvotes, + COALESCE(SUM(CASE WHEN vote < 0 THEN 1 ELSE 0 END), 0) as downvotes + FROM knowledge_votes WHERE item_id = ?""", + [item_id], + ).fetchone() + return {"upvotes": result[0], "downvotes": result[1]} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest tests/test_repositories.py::TestKnowledgeRepository -v` +Expected: 5 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/repositories/knowledge.py tests/test_repositories.py +git commit -m "feat: add KnowledgeRepository with voting and search" +``` + +--- + +### Task 5: Audit repository + +**Files:** +- Create: `src/repositories/audit.py` +- Append to: `tests/test_repositories.py` + +- [ ] **Step 1: Write the failing test** + +Append to `tests/test_repositories.py`: + +```python +class TestAuditRepository: + def test_log_and_query(self, db_conn): + from src.repositories.audit import AuditRepository + repo = AuditRepository(db_conn) + repo.log(user_id="u1", action="sync_trigger", resource="orders", + params={"force": True}, result="ok", duration_ms=1200) + entries = repo.query(limit=10) + assert len(entries) == 1 + assert entries[0]["action"] == "sync_trigger" + assert entries[0]["duration_ms"] == 1200 + + def test_query_by_action(self, db_conn): + from src.repositories.audit import AuditRepository + repo = AuditRepository(db_conn) + repo.log(user_id="u1", action="sync_trigger", resource="orders") + repo.log(user_id="u1", action="login", resource=None) + entries = repo.query(action="sync_trigger") + assert len(entries) == 1 + + def test_query_by_user(self, db_conn): + from src.repositories.audit import AuditRepository + repo = AuditRepository(db_conn) + repo.log(user_id="u1", 
action="sync_trigger", resource="orders") + repo.log(user_id="u2", action="sync_trigger", resource="customers") + entries = repo.query(user_id="u1") + assert len(entries) == 1 +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest tests/test_repositories.py::TestAuditRepository -v` +Expected: FAIL + +- [ ] **Step 3: Implement repository** + +```python +# src/repositories/audit.py +"""Repository for audit logging.""" + +import json +import uuid +from datetime import datetime, timezone +from typing import Any + +import duckdb + + +class AuditRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def log( + self, + user_id: str | None = None, + action: str = "", + resource: str | None = None, + params: dict | None = None, + result: str | None = None, + duration_ms: int | None = None, + ) -> str: + entry_id = str(uuid.uuid4()) + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO audit_log (id, timestamp, user_id, action, resource, params, result, duration_ms) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", + [entry_id, now, user_id, action, resource, + json.dumps(params) if params else None, result, duration_ms], + ) + return entry_id + + def query( + self, + user_id: str | None = None, + action: str | None = None, + limit: int = 50, + ) -> list[dict[str, Any]]: + sql = "SELECT * FROM audit_log WHERE 1=1" + params: list[Any] = [] + if user_id: + sql += " AND user_id = ?" + params.append(user_id) + if action: + sql += " AND action = ?" + params.append(action) + sql += " ORDER BY timestamp DESC LIMIT ?" 
+ params.append(limit) + results = self.conn.execute(sql, params).fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest tests/test_repositories.py::TestAuditRepository -v` +Expected: 3 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/repositories/audit.py tests/test_repositories.py +git commit -m "feat: add AuditRepository with query filtering" +``` + +--- + +### Task 6: Notifications repository (Telegram + Scripts) + +**Files:** +- Create: `src/repositories/notifications.py` +- Append to: `tests/test_repositories.py` + +- [ ] **Step 1: Write the failing test** + +Append to `tests/test_repositories.py`: + +```python +class TestNotificationsRepository: + def test_telegram_link_and_get(self, db_conn): + from src.repositories.notifications import TelegramRepository + repo = TelegramRepository(db_conn) + repo.link_user("u1", chat_id=12345) + link = repo.get_link("u1") + assert link is not None + assert link["chat_id"] == 12345 + + def test_telegram_unlink(self, db_conn): + from src.repositories.notifications import TelegramRepository + repo = TelegramRepository(db_conn) + repo.link_user("u1", chat_id=12345) + repo.unlink_user("u1") + assert repo.get_link("u1") is None + + def test_pending_code_create_and_verify(self, db_conn): + from src.repositories.notifications import PendingCodeRepository + repo = PendingCodeRepository(db_conn) + repo.create_code("ABC123", chat_id=12345) + code = repo.verify_code("ABC123") + assert code is not None + assert code["chat_id"] == 12345 + # Code consumed after verify + assert repo.verify_code("ABC123") is None + + def test_script_registry(self, db_conn): + from src.repositories.notifications import ScriptRepository + repo = ScriptRepository(db_conn) + repo.deploy("s1", name="sales_alert", owner="u1", + schedule="0 8 * * MON", 
source="print('hello')") + script = repo.get("s1") + assert script is not None + assert script["schedule"] == "0 8 * * MON" + all_scripts = repo.list_all() + assert len(all_scripts) == 1 + + def test_script_undeploy(self, db_conn): + from src.repositories.notifications import ScriptRepository + repo = ScriptRepository(db_conn) + repo.deploy("s1", name="test", owner="u1", source="pass") + repo.undeploy("s1") + assert repo.get("s1") is None +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest tests/test_repositories.py::TestNotificationsRepository -v` +Expected: FAIL + +- [ ] **Step 3: Implement repository** + +```python +# src/repositories/notifications.py +"""Repositories for Telegram links, pending codes, and script registry.""" + +from datetime import datetime, timezone +from typing import Any + +import duckdb + + +class TelegramRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def link_user(self, user_id: str, chat_id: int) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO telegram_links (user_id, chat_id, linked_at) + VALUES (?, ?, ?) 
+ ON CONFLICT (user_id) DO UPDATE SET chat_id = excluded.chat_id, linked_at = excluded.linked_at""", + [user_id, chat_id, now], + ) + + def unlink_user(self, user_id: str) -> None: + self.conn.execute("DELETE FROM telegram_links WHERE user_id = ?", [user_id]) + + def get_link(self, user_id: str) -> dict[str, Any] | None: + result = self.conn.execute( + "SELECT * FROM telegram_links WHERE user_id = ?", [user_id] + ).fetchone() + if not result: + return None + columns = [desc[0] for desc in self.conn.description] + return dict(zip(columns, result)) + + def get_all_links(self) -> list[dict[str, Any]]: + results = self.conn.execute("SELECT * FROM telegram_links").fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] + + +class PendingCodeRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def create_code(self, code: str, chat_id: int) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + "INSERT INTO pending_codes (code, chat_id, created_at) VALUES (?, ?, ?)", + [code, chat_id, now], + ) + + def verify_code(self, code: str) -> dict[str, Any] | None: + result = self.conn.execute( + "SELECT * FROM pending_codes WHERE code = ?", [code] + ).fetchone() + if not result: + return None + columns = [desc[0] for desc in self.conn.description] + row = dict(zip(columns, result)) + self.conn.execute("DELETE FROM pending_codes WHERE code = ?", [code]) + return row + + +class ScriptRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def deploy( + self, id: str, name: str, owner: str | None = None, + schedule: str | None = None, source: str = "", + ) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO script_registry (id, name, owner, schedule, source, deployed_at) + VALUES (?, ?, ?, ?, ?, ?) 
+ ON CONFLICT (id) DO UPDATE SET + name = excluded.name, schedule = excluded.schedule, + source = excluded.source, deployed_at = excluded.deployed_at""", + [id, name, owner, schedule, source, now], + ) + + def undeploy(self, script_id: str) -> None: + self.conn.execute("DELETE FROM script_registry WHERE id = ?", [script_id]) + + def get(self, script_id: str) -> dict[str, Any] | None: + result = self.conn.execute( + "SELECT * FROM script_registry WHERE id = ?", [script_id] + ).fetchone() + if not result: + return None + columns = [desc[0] for desc in self.conn.description] + return dict(zip(columns, result)) + + def list_all(self, owner: str | None = None) -> list[dict[str, Any]]: + if owner: + results = self.conn.execute( + "SELECT * FROM script_registry WHERE owner = ? ORDER BY name", [owner] + ).fetchall() + else: + results = self.conn.execute("SELECT * FROM script_registry ORDER BY name").fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest tests/test_repositories.py::TestNotificationsRepository -v` +Expected: 5 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/repositories/notifications.py tests/test_repositories.py +git commit -m "feat: add Telegram, PendingCode, and Script repositories" +``` + +--- + +### Task 7: Table registry + Profiles repositories + +**Files:** +- Create: `src/repositories/table_registry.py` +- Create: `src/repositories/profiles.py` +- Append to: `tests/test_repositories.py` + +- [ ] **Step 1: Write the failing test** + +Append to `tests/test_repositories.py`: + +```python +class TestTableRegistryRepository: + def test_register_and_get(self, db_conn): + from src.repositories.table_registry import TableRegistryRepository + repo = TableRegistryRepository(db_conn) + repo.register(id="orders", name="Orders", folder="sales", + sync_strategy="incremental", 
registered_by="admin") + table = repo.get("orders") + assert table is not None + assert table["folder"] == "sales" + + def test_list_all(self, db_conn): + from src.repositories.table_registry import TableRegistryRepository + repo = TableRegistryRepository(db_conn) + repo.register(id="t1", name="A", folder="f1") + repo.register(id="t2", name="B", folder="f2") + assert len(repo.list_all()) == 2 + + def test_unregister(self, db_conn): + from src.repositories.table_registry import TableRegistryRepository + repo = TableRegistryRepository(db_conn) + repo.register(id="t1", name="A", folder="f1") + repo.unregister("t1") + assert repo.get("t1") is None + + +class TestProfileRepository: + def test_save_and_get(self, db_conn): + from src.repositories.profiles import ProfileRepository + repo = ProfileRepository(db_conn) + profile_data = {"columns": [{"name": "id", "type": "int"}], "row_count": 1000} + repo.save("orders", profile_data) + profile = repo.get("orders") + assert profile is not None + assert profile["row_count"] == 1000 + + def test_get_all(self, db_conn): + from src.repositories.profiles import ProfileRepository + repo = ProfileRepository(db_conn) + repo.save("t1", {"row_count": 100}) + repo.save("t2", {"row_count": 200}) + all_profiles = repo.get_all() + assert len(all_profiles) == 2 +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest tests/test_repositories.py::TestTableRegistryRepository tests/test_repositories.py::TestProfileRepository -v` +Expected: FAIL + +- [ ] **Step 3: Implement repositories** + +```python +# src/repositories/table_registry.py +"""Repository for table registry.""" + +from datetime import datetime, timezone +from typing import Any + +import duckdb + + +class TableRegistryRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def register( + self, id: str, name: str, folder: str | None = None, + sync_strategy: str | None = None, primary_key: str | None = None, + description: str 
| None = None, registered_by: str | None = None, + ) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO table_registry (id, name, folder, sync_strategy, + primary_key, description, registered_by, registered_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (id) DO UPDATE SET + name = excluded.name, folder = excluded.folder, + sync_strategy = excluded.sync_strategy, primary_key = excluded.primary_key, + description = excluded.description, registered_at = excluded.registered_at""", + [id, name, folder, sync_strategy, primary_key, description, registered_by, now], + ) + + def unregister(self, table_id: str) -> None: + self.conn.execute("DELETE FROM table_registry WHERE id = ?", [table_id]) + + def get(self, table_id: str) -> dict[str, Any] | None: + result = self.conn.execute( + "SELECT * FROM table_registry WHERE id = ?", [table_id] + ).fetchone() + if not result: + return None + columns = [desc[0] for desc in self.conn.description] + return dict(zip(columns, result)) + + def list_all(self) -> list[dict[str, Any]]: + results = self.conn.execute("SELECT * FROM table_registry ORDER BY name").fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] +``` + +```python +# src/repositories/profiles.py +"""Repository for table profiles.""" + +import json +from datetime import datetime, timezone +from typing import Any + +import duckdb + + +class ProfileRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def save(self, table_id: str, profile: dict) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO table_profiles (table_id, profile, profiled_at) + VALUES (?, ?, ?) 
+ ON CONFLICT (table_id) DO UPDATE SET + profile = excluded.profile, profiled_at = excluded.profiled_at""", + [table_id, json.dumps(profile), now], + ) + + def get(self, table_id: str) -> dict[str, Any] | None: + result = self.conn.execute( + "SELECT profile, profiled_at FROM table_profiles WHERE table_id = ?", + [table_id], + ).fetchone() + if not result: + return None + profile = json.loads(result[0]) if isinstance(result[0], str) else result[0] + profile["profiled_at"] = result[1] + return profile + + def get_all(self) -> dict[str, dict]: + results = self.conn.execute( + "SELECT table_id, profile, profiled_at FROM table_profiles ORDER BY table_id" + ).fetchall() + out = {} + for row in results: + profile = json.loads(row[1]) if isinstance(row[1], str) else row[1] + profile["profiled_at"] = row[2] + out[row[0]] = profile + return out +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest tests/test_repositories.py::TestTableRegistryRepository tests/test_repositories.py::TestProfileRepository -v` +Expected: 5 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/repositories/table_registry.py src/repositories/profiles.py tests/test_repositories.py +git commit -m "feat: add TableRegistry and Profile repositories" +``` + +--- + +### Task 8: Migration script (JSON → DuckDB) + +**Files:** +- Create: `scripts/migrate_json_to_duckdb.py` +- Create: `tests/test_migration.py` + +- [ ] **Step 1: Write the failing test** + +```python +# tests/test_migration.py +import json +import os +import tempfile + +import pytest + + +@pytest.fixture +def migration_env(): + """Create temp dir with sample JSON files mimicking production layout.""" + with tempfile.TemporaryDirectory() as tmpdir: + data_dir = os.path.join(tmpdir, "data") + os.makedirs(os.path.join(data_dir, "notifications"), exist_ok=True) + os.makedirs(os.path.join(data_dir, "corporate-memory"), exist_ok=True) + os.makedirs(os.path.join(data_dir, "auth"), exist_ok=True) + 
os.makedirs(os.path.join(data_dir, "src_data", "metadata"), exist_ok=True) + + # sync_state.json + with open(os.path.join(data_dir, "src_data", "metadata", "sync_state.json"), "w") as f: + json.dump({ + "tables": { + "orders": {"last_sync": "2026-03-27T08:00:00Z", "rows": 1000, "file_size_bytes": 5000} + } + }, f) + + # sync_settings.json + with open(os.path.join(data_dir, "notifications", "sync_settings.json"), "w") as f: + json.dump({ + "petr": {"datasets": {"sales": True, "support": False}, "updated_at": "2026-03-27"} + }, f) + + # knowledge.json + with open(os.path.join(data_dir, "corporate-memory", "knowledge.json"), "w") as f: + json.dump([ + {"id": "k1", "title": "MRR", "content": "Monthly...", "category": "metrics", + "status": "approved", "contributors": ["petr"]} + ], f) + + # telegram_users.json + with open(os.path.join(data_dir, "notifications", "telegram_users.json"), "w") as f: + json.dump({"petr@acme.com": {"chat_id": 12345, "linked_at": "2026-01-01"}}, f) + + os.environ["DATA_DIR"] = data_dir + yield data_dir + + +def test_migration_runs_without_error(migration_env): + from scripts.migrate_json_to_duckdb import migrate_all + stats = migrate_all(migration_env) + assert stats["sync_state"] == 1 + assert stats["knowledge"] == 1 + assert stats["telegram"] == 1 + + +def test_migration_is_idempotent(migration_env): + from scripts.migrate_json_to_duckdb import migrate_all + stats1 = migrate_all(migration_env) + stats2 = migrate_all(migration_env) + assert stats1["sync_state"] == stats2["sync_state"] +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest tests/test_migration.py -v` +Expected: FAIL + +- [ ] **Step 3: Implement migration script** + +```python +# scripts/migrate_json_to_duckdb.py +""" +One-time migration: JSON files → DuckDB. + +Usage: python -m scripts.migrate_json_to_duckdb [--data-dir /data] + +Idempotent — safe to run multiple times. Uses UPSERT to avoid duplicates. 
+""" + +import json +import logging +import os +import sys +from pathlib import Path + +logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") +logger = logging.getLogger(__name__) + + +def _load_json(path: str) -> dict | list | None: + try: + with open(path) as f: + return json.load(f) + except (FileNotFoundError, json.JSONDecodeError) as e: + logger.warning(f"Skipping {path}: {e}") + return None + + +def migrate_all(data_dir: str | None = None) -> dict[str, int]: + if data_dir: + os.environ["DATA_DIR"] = data_dir + data = Path(data_dir or os.environ.get("DATA_DIR", "./data")) + + from src.db import get_system_db + from src.repositories.sync_state import SyncStateRepository + from src.repositories.knowledge import KnowledgeRepository + from src.repositories.notifications import TelegramRepository + from src.repositories.users import UserRepository + + conn = get_system_db() + stats: dict[str, int] = {} + + # 1. Sync state + sync_data = _load_json(str(data / "src_data" / "metadata" / "sync_state.json")) + count = 0 + if sync_data and "tables" in sync_data: + repo = SyncStateRepository(conn) + for table_id, info in sync_data["tables"].items(): + repo.update_sync( + table_id=table_id, + rows=info.get("rows", 0), + file_size_bytes=info.get("file_size_bytes", 0), + hash=info.get("hash", ""), + uncompressed_size_bytes=info.get("uncompressed_size_bytes", 0), + columns=info.get("columns", 0), + ) + count += 1 + stats["sync_state"] = count + logger.info(f"Migrated {count} sync state entries") + + # 2. 
Knowledge items + knowledge = _load_json(str(data / "corporate-memory" / "knowledge.json")) + count = 0 + if knowledge and isinstance(knowledge, list): + repo = KnowledgeRepository(conn) + for item in knowledge: + repo.create( + id=item.get("id", ""), + title=item.get("title", ""), + content=item.get("content", ""), + category=item.get("category", ""), + source_user=item.get("source_user"), + tags=item.get("tags"), + status=item.get("status", "pending"), + ) + count += 1 + stats["knowledge"] = count + logger.info(f"Migrated {count} knowledge items") + + # 3. Telegram users + telegram = _load_json(str(data / "notifications" / "telegram_users.json")) + count = 0 + if telegram and isinstance(telegram, dict): + repo = TelegramRepository(conn) + for email, info in telegram.items(): + repo.link_user(email, chat_id=info.get("chat_id", 0)) + count += 1 + stats["telegram"] = count + logger.info(f"Migrated {count} telegram links") + + conn.close() + logger.info("Migration complete") + return stats + + +if __name__ == "__main__": + import argparse + parser = argparse.ArgumentParser(description="Migrate JSON state to DuckDB") + parser.add_argument("--data-dir", default=None, help="Data directory path") + args = parser.parse_args() + migrate_all(args.data_dir) +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest tests/test_migration.py -v` +Expected: 2 tests PASS + +- [ ] **Step 5: Run all repository tests together** + +Run: `python -m pytest tests/test_db.py tests/test_repositories.py tests/test_migration.py -v` +Expected: All tests PASS (3 + 31 + 2 = 36 tests) + +- [ ] **Step 6: Commit** + +```bash +git add scripts/migrate_json_to_duckdb.py tests/test_migration.py +git commit -m "feat: add JSON to DuckDB migration script" +``` + +--- + +## Summary + +| Task | Files | Tests | Purpose | +|------|-------|-------|---------| +| 1 | `src/db.py` | 3 | DuckDB connection + schema | +| 2 | `src/repositories/sync_state.py` | 6 | Sync state + history | +| 3 | 
`src/repositories/users.py` | 7 | User CRUD | +| 4 | `src/repositories/knowledge.py` | 5 | Corporate memory + votes | +| 5 | `src/repositories/audit.py` | 3 | Audit logging | +| 6 | `src/repositories/notifications.py` | 5 | Telegram + Scripts | +| 7 | `src/repositories/table_registry.py`, `profiles.py` | 5 | Registry + Profiles | +| 8 | `scripts/migrate_json_to_duckdb.py` | 2 | Migration | +| **Total** | **12 new files** | **36 tests** | | + +After this plan, all state is in DuckDB. The existing service files still use JSON (they'll be rewired in Plan 2: FastAPI Server, which depends on this layer being complete). diff --git a/docs/superpowers/specs/2026-03-27-refactoring-design.md b/docs/superpowers/specs/2026-03-27-refactoring-design.md new file mode 100644 index 0000000..26731d2 --- /dev/null +++ b/docs/superpowers/specs/2026-03-27-refactoring-design.md @@ -0,0 +1,524 @@ +# AI Data Analyst — Refactoring Design Spec + +**Date:** 2026-03-27 +**Status:** Draft +**Target:** Greenfield demo with Keboola internal data + +## 1. Problem Statement + +The platform was built iteratively as an internal tool and needs to become a product for external customers (Groupon, others). Key problems: + +1. **Fragile filesystem state** — 10+ JSON files, permission conflicts between processes (www-data, deploy, root, user) cause outages +2. **No API** — all operations via SSH + bash scripts, no programmatic control +3. **Security via Linux groups** — no real RBAC, SSH keys visible in `ps aux`, root reads user homes +4. **Complex installation** — 10+ manual steps, specific OS requirements, dual-repo pattern with symlinks +5. **Operations nightmare** — scattered scripts, no unified logging/monitoring, creator calls it "duct tape solution" + +The system is designed for AI agents — humans discuss with AI, AI handles everything (user, admin, dev operations). + +**Constraint:** UX must remain identical. 
Web catalog, data sync, offline Claude Code analysis, Telegram notifications, corporate memory — all preserved. + +## 2. Architecture + +### Target State + +``` +SERVER (Docker + Kamal): +┌──────────────────────────────────────────────────┐ +│ FastAPI Main App (1 process) │ +│ ├── Web UI (Jinja2 templates) │ +│ ├── REST API (/api/*) │ +│ ├── WebSocket (/ws/notifications) │ +│ └── Auth (JWT + pluggable providers) │ +└──────────────────────────────────────────────────┘ +┌─────────────────┐ ┌──────────────────────────────┐ +│ Scheduler sidecar│ │ Telegram bot (optional) │ +│ Calls /api/ │ │ Long-running daemon │ +└─────────────────┘ └──────────────────────────────┘ + +/data/state/system.duckdb ← system state (users, sync, knowledge, audit) +/data/analytics/server.duckdb ← views on parquet files +/data/parquet/** ← data files + +LOCAL (analyst): +┌──────────────────────────────────────────────────┐ +│ da CLI (uv tool install data-analyst-cli) │ +│ user/duckdb/analytics.duckdb ← views + user tbls│ +│ server/parquet/** ← downloaded via da sync │ +│ Claude Code ← works offline with DuckDB │ +└──────────────────────────────────────────────────┘ +``` + +### Key Decisions + +| Decision | Choice | Rationale | +|----------|--------|-----------| +| Web framework | FastAPI only (no Flask) | One framework, OpenAPI auto-schema, async native, Jinja2 support | +| State storage | DuckDB | Already in stack, agent can join state with analytics, better than SQLite for analytical queries | +| CLI tool | `da` via `uv tool install` | AI-agent native interface, no Docker dependency locally | +| Server deploy | Docker + Kamal | Zero-downtime deploys, auto-SSL, simple config | +| Architecture | Hybrid (main app + scheduler sidecar + optional telegram) | 3 containers max, WebSocket in main app | +| Auth providers | All 3 (Google OAuth + Email magic link + Password) | Full compatibility with existing users | +| LLM provider | Configurable in instance.yaml | User chooses: local Ollama, 
Anthropic, OpenAI, AI Gateway | +| Python tooling | uv everywhere (no pip) | Faster, deterministic, modern | + +## 3. Data Layer + +### Server DuckDB: system.duckdb + +```sql +-- Users & RBAC +CREATE TABLE users ( + id VARCHAR PRIMARY KEY, + email VARCHAR UNIQUE NOT NULL, + name VARCHAR, + role VARCHAR DEFAULT 'analyst', -- viewer, analyst, admin, km_admin + password_hash VARCHAR, + setup_token VARCHAR, + reset_token VARCHAR, + created_at TIMESTAMP DEFAULT current_timestamp, + updated_at TIMESTAMP +); + +CREATE TABLE user_sync_settings ( + user_id VARCHAR REFERENCES users(id), + dataset VARCHAR NOT NULL, + enabled BOOLEAN DEFAULT false, + table_mode VARCHAR DEFAULT 'all', -- all, explicit + tables JSON, + updated_at TIMESTAMP, + PRIMARY KEY (user_id, dataset) +); + +CREATE TABLE dataset_permissions ( + user_id VARCHAR REFERENCES users(id), + dataset VARCHAR NOT NULL, + access VARCHAR DEFAULT 'read', -- read, none + PRIMARY KEY (user_id, dataset) +); + +-- Sync state + history +CREATE TABLE sync_state ( + table_id VARCHAR PRIMARY KEY, + last_sync TIMESTAMP, + rows BIGINT, + file_size_bytes BIGINT, + uncompressed_size_bytes BIGINT, + columns INTEGER, + hash VARCHAR, + status VARCHAR DEFAULT 'ok', + error TEXT +); + +CREATE TABLE sync_history ( + id VARCHAR PRIMARY KEY, + table_id VARCHAR NOT NULL, + synced_at TIMESTAMP NOT NULL, + rows BIGINT, + duration_ms INTEGER, + status VARCHAR, + error TEXT +); + +-- Corporate memory +CREATE TABLE knowledge_items ( + id VARCHAR PRIMARY KEY, + title VARCHAR NOT NULL, + content TEXT, + category VARCHAR, + tags TEXT[], + status VARCHAR DEFAULT 'pending', -- pending, approved, mandatory, rejected + contributors TEXT[], + source_user VARCHAR, + audience VARCHAR, + created_at TIMESTAMP, + updated_at TIMESTAMP +); + +CREATE TABLE knowledge_votes ( + item_id VARCHAR REFERENCES knowledge_items(id), + user_id VARCHAR REFERENCES users(id), + vote INTEGER, -- 1 or -1 + voted_at TIMESTAMP, + PRIMARY KEY (item_id, user_id) +); + +-- Audit 
+CREATE TABLE audit_log ( + id VARCHAR PRIMARY KEY, + timestamp TIMESTAMP NOT NULL, + user_id VARCHAR, + action VARCHAR NOT NULL, + resource VARCHAR, + params JSON, + result VARCHAR, + duration_ms INTEGER +); + +-- Notifications +CREATE TABLE telegram_links ( + user_id VARCHAR PRIMARY KEY REFERENCES users(id), + chat_id BIGINT NOT NULL, + linked_at TIMESTAMP +); + +CREATE TABLE pending_codes ( + code VARCHAR PRIMARY KEY, + chat_id BIGINT NOT NULL, + created_at TIMESTAMP +); + +CREATE TABLE script_registry ( + id VARCHAR PRIMARY KEY, + name VARCHAR NOT NULL, + owner VARCHAR REFERENCES users(id), + schedule VARCHAR, -- cron expression or null + source TEXT NOT NULL, + deployed_at TIMESTAMP, + last_run TIMESTAMP, + last_status VARCHAR +); + +-- Table registry +CREATE TABLE table_registry ( + id VARCHAR PRIMARY KEY, + name VARCHAR NOT NULL, + folder VARCHAR, + sync_strategy VARCHAR, + primary_key VARCHAR, + description TEXT, + registered_by VARCHAR, + registered_at TIMESTAMP +); + +-- Profiles +CREATE TABLE table_profiles ( + table_id VARCHAR PRIMARY KEY, + profile JSON NOT NULL, + profiled_at TIMESTAMP +); +``` + +### Server DuckDB: server.duckdb + +Auto-generated views on parquet files: +```sql +CREATE VIEW orders AS SELECT * FROM read_parquet('/data/parquet/sales/orders.parquet'); +CREATE VIEW customers AS SELECT * FROM read_parquet('/data/parquet/sales/customers.parquet'); +-- Generated from schema.yml by profiler/sync +``` + +### Local DuckDB: analytics.duckdb + +Views on local parquets (generated by `da sync`): +```sql +CREATE VIEW orders AS SELECT * FROM read_parquet('./server/parquet/sales/orders.parquet'); +-- User-created tables survive da sync (rebuild drops only views, not tables) +``` + +### Repository Pattern + +``` +src/repositories/ + __init__.py # get_system_db(), get_analytics_db() factories + users.py # UserRepository (CRUD + role checks) + sync_state.py # SyncStateRepository (state + history) + knowledge.py # KnowledgeRepository (items + votes + 
governance) + audit.py # AuditRepository (append + query) + scripts.py # ScriptRepository (registry + scheduling) + table_registry.py # TableRegistryRepository + notifications.py # TelegramRepository + PendingCodeRepository +``` + +## 4. API Endpoints + +### FastAPI Router Structure + +``` +app/ + main.py # FastAPI app, lifespan events, middleware + auth/ + router.py # POST /auth/login, /auth/token, /auth/logout + jwt.py # JWT create/verify (PyJWT) + providers/ # Pluggable: google/, email/, password/ + dependencies.py # get_current_user, require_role(Role) + web/ + router.py # Web UI: GET /, /catalog, /memory, /settings... + templates/ # Jinja2 (migrated from webapp/templates/) + static/ # CSS, JS, images + api/ + sync.py # GET /api/sync/manifest, POST /api/sync/trigger + data.py # GET /api/data/{table}/download + query.py # POST /api/query + scripts.py # GET/POST /api/scripts, POST /api/scripts/{id}/run + users.py # CRUD /api/users + settings.py # GET/PUT /api/users/{id}/settings + memory.py # CRUD /api/memory, POST /api/memory/{id}/vote + health.py # GET /api/health + upload.py # POST /api/upload/sessions, /artifacts, /local-md + ws/ + notifications.py # WebSocket /ws/notifications +``` + +### Key Endpoints + +| Endpoint | Method | Auth | Purpose | +|----------|--------|------|---------| +| `/api/sync/manifest` | GET | JWT (analyst+) | Hash-based manifest of all synced data | +| `/api/sync/trigger` | POST | JWT (admin) | Trigger data sync from source | +| `/api/data/{table}/download` | GET | JWT (analyst+) | Stream parquet file (ETag support) | +| `/api/query` | POST | JWT (analyst+) | Execute SQL against server DuckDB | +| `/api/scripts` | GET/POST | JWT (analyst+) | List/deploy user scripts | +| `/api/scripts/{id}/run` | POST | JWT (analyst+) | Execute script in sandbox | +| `/api/users` | GET/POST/DELETE | JWT (admin) | User management | +| `/api/memory` | GET/POST/PUT | JWT (analyst+) | Corporate memory CRUD | +| `/api/health` | GET | none | Structured health 
check | +| `/api/upload/sessions` | POST | JWT (analyst+) | Upload Claude session transcripts | +| `/api/upload/local-md` | POST | JWT (analyst+) | Upload CLAUDE.local.md content | + +### Sync Protocol + +1. CLI calls `GET /api/sync/manifest` → receives hashes per table/asset +2. CLI compares with local `~/.config/da/sync_state.json` +3. For each changed table: `GET /api/data/{table}/download` → streaming to `./server/parquet/` +4. Download changed docs, rules, profiles, scripts +5. Upload new sessions, artifacts, CLAUDE.local.md content +6. Rebuild local DuckDB views (preserve user-created tables) +7. Update local sync manifest + +## 5. CLI Tool (`da`) + +### Structure + +``` +cli/ + main.py # Typer app, --server/--json global options + config.py # ~/.config/da/ management (token, server URL, sync state) + client.py # httpx async client (JWT auth, retry, streaming, progress bars) + duckdb_local.py # Local DuckDB management (create views, query, explore) + commands/ + auth.py # da login/logout/whoami + sync.py # da sync [--table X] [--upload-only] [--docs-only] + query.py # da query "SQL" [--remote] [--json] [--format csv/table/json] + scripts.py # da scripts list/run/deploy/undeploy + explore.py # da explore {table} + admin.py # da admin add-user/remove-user/list-users/set-role + status.py # da status [--local] [--json] + server.py # da server deploy/rollback/logs/status/backup + setup.py # da setup init/test-connection/deploy/first-sync/verify + diagnose.py # da diagnose [--symptom X] [--component Y] + skills.py # da skills list/show + infra.py # da infra provision/status/deploy (future) + skills/ # Markdown knowledge base for AI agents + setup.md + troubleshoot.md + connectors.md + notifications.md + corporate-memory.md + security.md + backup-restore.md + upgrade.md +``` + +### Distribution + +```toml +[project] +name = "data-analyst-cli" +requires-python = ">=3.11" +dependencies = ["typer>=0.12", "httpx>=0.27", "duckdb>=1.1", "rich>=13", "pyjwt>=2.8"] + 
+[project.scripts] +da = "cli.main:app" +``` + +Install: `uv tool install data-analyst-cli` + +### Offline Capability + +After `da sync`, everything works without network: +- `da query` → local DuckDB +- `da scripts run` → local Python execution +- `da explore` → local profile data +- `da status --local` → sync timestamps from local manifest + +## 6. Deploy & Infrastructure + +### Docker + +```dockerfile +FROM python:3.13-slim +COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv +WORKDIR /app +COPY pyproject.toml uv.lock ./ +RUN uv sync --frozen --no-dev +COPY . . +CMD ["uv", "run", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] +``` + +### Docker Compose (dev) + +```yaml +services: + app: + build: . + ports: ["8000:8000"] + volumes: [".:/app", "data:/data"] + env_file: .env + command: uv run uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload + + scheduler: + build: . + volumes: ["data:/data"] + env_file: .env + command: uv run python -m services.scheduler + + telegram-bot: + build: . + volumes: ["data:/data"] + env_file: .env + command: uv run python -m services.telegram_bot + profiles: ["full"] + +volumes: + data: +``` + +### Scheduler Sidecar + +The scheduler is a lightweight process that triggers jobs by calling the main app's API: + +```python +# services/scheduler/__main__.py +import os + +import httpx +from apscheduler.schedulers.blocking import BlockingScheduler + +API_URL = os.environ.get("API_URL", "http://app:8000") +API_TOKEN = os.environ.get("SCHEDULER_API_TOKEN") # internal service token + +scheduler = BlockingScheduler() + +@scheduler.scheduled_job("interval", minutes=15) +def data_refresh(): + httpx.post(f"{API_URL}/api/sync/trigger", headers={"Authorization": f"Bearer {API_TOKEN}"}) + +@scheduler.scheduled_job("interval", minutes=30) +def corporate_memory(): + httpx.post(f"{API_URL}/api/internal/collect-knowledge", headers={"Authorization": f"Bearer {API_TOKEN}"}) + +# ... 
more jobs +scheduler.start() +``` + +This keeps all business logic in the main app. The scheduler is stateless and restartable. + +### Kamal (production) + +- Auto-SSL via Kamal Proxy (Let's Encrypt) +- Zero-downtime deploy +- Healthcheck on `/api/health` +- Staging: `kamal deploy -d staging` +- Production: `kamal deploy` +- Rollback: `kamal rollback` + +### CI/CD (GitHub Actions) + +``` +push → pytest (unit) → docker compose test (integration) → build+push GHCR +PR → kamal deploy staging +merge main → kamal deploy production +``` + +## 7. Security + +### RBAC + +| Role | Permissions | +|------|-------------| +| `viewer` | Read catalog, view profiles, browse corporate memory | +| `analyst` | + sync data, run queries, vote on knowledge, run/deploy scripts | +| `admin` | + manage users, approve knowledge, trigger sync, view audit | +| `km_admin` | + corporate memory governance (approve/reject/mandate) | + +Dataset-level permissions restrict which datasets each user can access. + +### Auth Flow + +1. Web: user logs in via Google OAuth / Email magic link / Password +2. Server issues JWT (contains: user_id, email, role, exp) +3. CLI: `da login` → OAuth browser flow → JWT stored in `~/.config/da/token.json` +4. All API calls include JWT in Authorization header +5. FastAPI dependency validates JWT + checks role permissions + +### Audit Trail + +Every API call logged to `audit_log` table: +- timestamp, user_id, action, resource, params, result, duration_ms +- Queryable by agent: `da query "SELECT * FROM system.audit_log WHERE ..."` + +### Script Sandboxing + +User scripts run in isolated Docker container: +- Read-only DuckDB access +- Memory limit: 512MB, time limit: 5min +- No network (except notification dispatch) +- Whitelisted Python packages: pandas, duckdb, matplotlib, numpy + +## 8. 
Testing Strategy + +``` +tests/ + unit/ # No I/O, mocked dependencies + test_repositories.py # In-memory DuckDB + test_sync_logic.py + test_auth.py + test_rbac.py + integration/ # Docker compose, real DuckDB + sample data + test_api_endpoints.py + test_sync_flow.py + test_cli_commands.py + fixtures/ + sample_data/ # Small parquets for testing + instance.yaml # Test config +``` + +## 9. Migration Path + +1. **Greenfield demo** — build new system from scratch with sample Keboola data +2. **Validate** — end-to-end: setup → sync → query → scripts → notifications +3. **Migrate internal** — point new system at Keboola internal, migrate users +4. **Migrate Groupon** — deploy new system for Groupon with their config +5. **Deprecate old** — remove old server infrastructure + +## 10. Reused Code + +| File | Status | Notes | +|------|--------|-------| +| `src/config.py` | Reused as-is | TableConfig, Config parsing | +| `src/parquet_manager.py` | Reused as-is | Parquet conversion | +| `connectors/keboola/` | Reused as-is | Keboola adapter + client | +| `connectors/bigquery/` | Reused as-is | BigQuery adapter + client | +| `connectors/jira/` | Reused as-is | Jira connector | +| `connectors/llm/` | Reused as-is | LLM abstraction | +| `connectors/openmetadata/` | Reused as-is | Catalog enrichment | +| `src/data_sync.py` | Rewired | SyncState → DuckDB repository | +| `src/remote_query.py` | Wrapped | Query logic wrapped by API endpoint | +| `src/profiler.py` | Rewired | Output to DuckDB instead of JSON | +| `src/table_registry.py` | Rewired | JSON → DuckDB repository | +| `webapp/corporate_memory_service.py` | Rewired | Business logic preserved, I/O swapped | +| `webapp/templates/` | Migrated | Jinja2 templates work in FastAPI | +| `auth/` | Migrated | Provider pattern preserved | + +## 11. 
Deleted Code + +| File | Reason | +|------|--------| +| `server/setup.sh` | Replaced by Docker | +| `server/webapp-setup.sh` | Replaced by Docker + Kamal | +| `server/deploy.sh` | Replaced by Kamal | +| `server/sudoers-*` | No more Linux user management | +| `server/bin/add-analyst` | Replaced by API + CLI | +| `scripts/sync_data.sh` | Replaced by `da sync` | +| `services/*/systemd/` | Replaced by Docker Compose | +| `webapp/user_service.py` | Rewritten for DB-based users | +| `webapp/sync_settings_service.py` (sudo parts) | Replaced by API |
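
Since `scripts/sync_data.sh` is replaced by `da sync`, the manifest comparison from steps 1-3 of the Sync Protocol (Section 4) might look roughly like the sketch below. The spec does not define the manifest payload, so the shape used here (a mapping from table ID to content hash) and the helper names `tables_to_download` and `stale_local_tables` are assumptions for illustration only.

```python
# Hypothetical sketch of the hash-based manifest comparison in `da sync`.
# Assumption: both manifests are plain {table_id: content_hash} mappings.


def tables_to_download(remote: dict[str, str], local: dict[str, str]) -> list[str]:
    """Return table IDs that are new on the server or whose hash changed."""
    return sorted(
        table_id
        for table_id, remote_hash in remote.items()
        if local.get(table_id) != remote_hash
    )


def stale_local_tables(remote: dict[str, str], local: dict[str, str]) -> list[str]:
    """Return table IDs present locally but no longer listed by the server."""
    return sorted(table_id for table_id in local if table_id not in remote)


if __name__ == "__main__":
    remote_manifest = {"orders": "a1", "customers": "b2", "events": "c3"}
    local_state = {"orders": "a1", "customers": "old", "legacy": "zz"}
    print(tables_to_download(remote_manifest, local_state))
    print(stale_local_tables(remote_manifest, local_state))
```

Only the tables returned by the first helper would need a `GET /api/data/{table}/download` call. What to do with stale local tables (drop the view, keep the parquet) is left open by the spec.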