From 1b219cabe92aecd6d729d05acd456cb72a971fb2 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Thu, 9 Apr 2026 06:59:57 +0200 Subject: [PATCH] fix: remove dead PRAGMA enable_wal code DuckDB has used WAL by default since v0.8, so this pragma is not valid DuckDB syntax. Removed obsolete try-except block that attempted to enable WAL on system database initialization. --- app/auth/dependencies.py | 5 +- connectors/bigquery/extractor.py | 8 +- connectors/jira/extract_init.py | 1 + .../plans/2026-03-27-01-duckdb-state-layer.md | 1574 ++++++++++++++ .../plans/2026-03-27-02-complete-system.md | 69 + .../plans/2026-04-08-production-hardening.md | 1840 +++++++++++++++++ src/db.py | 5 - src/orchestrator.py | 3 +- 8 files changed, 3495 insertions(+), 10 deletions(-) create mode 100644 docs/superpowers/plans/2026-03-27-01-duckdb-state-layer.md create mode 100644 docs/superpowers/plans/2026-03-27-02-complete-system.md create mode 100644 docs/superpowers/plans/2026-04-08-production-hardening.md diff --git a/app/auth/dependencies.py b/app/auth/dependencies.py index a0ed29f..088b224 100644 --- a/app/auth/dependencies.py +++ b/app/auth/dependencies.py @@ -58,14 +58,13 @@ async def get_current_user( async def get_optional_user( + request: Request = None, authorization: Optional[str] = Header(None), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ) -> Optional[dict]: """Like get_current_user but returns None instead of 401 if no token.""" - if not authorization or not authorization.startswith("Bearer "): - return None try: - return await get_current_user(request=None, authorization=authorization, conn=conn) + return await get_current_user(request=request, authorization=authorization, conn=conn) except HTTPException: return None diff --git a/connectors/bigquery/extractor.py b/connectors/bigquery/extractor.py index 638aff3..efbdafc 100644 --- a/connectors/bigquery/extractor.py +++ b/connectors/bigquery/extractor.py @@ -5,6 +5,7 @@ No data is downloaded. All queries go directly to BigQuery via DuckDB extension import logging import os +import shutil from datetime import datetime, timezone from pathlib import Path from typing import List, Dict, Any @@ -64,8 +65,13 @@ def init_extract( output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) + # Write to temp file then rename — avoids lock conflict with orchestrator + # which may hold a read lock on the existing extract.duckdb db_path = output_path / "extract.duckdb" - conn = duckdb.connect(str(db_path)) + tmp_db_path = output_path / "extract.duckdb.tmp" + if tmp_db_path.exists(): + tmp_db_path.unlink() + conn = duckdb.connect(str(tmp_db_path)) stats = {"tables_registered": 0, "errors": []} now = datetime.now(timezone.utc) diff --git a/connectors/jira/extract_init.py b/connectors/jira/extract_init.py index 354cec8..f1f406c 100644 --- a/connectors/jira/extract_init.py +++ b/connectors/jira/extract_init.py @@ -109,6 +109,7 @@ def update_meta(output_dir: str | Path, table_name: str) -> None: "UPDATE _meta SET rows = ?, size_bytes = ?, extracted_at = ? WHERE table_name = ?", [rows, size_bytes, now, table_name], ) + conn.execute("CHECKPOINT") finally: conn.close() diff --git a/docs/superpowers/plans/2026-03-27-01-duckdb-state-layer.md b/docs/superpowers/plans/2026-03-27-01-duckdb-state-layer.md new file mode 100644 index 0000000..6c0cb03 --- /dev/null +++ b/docs/superpowers/plans/2026-03-27-01-duckdb-state-layer.md @@ -0,0 +1,1574 @@ +# DuckDB State Layer — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Replace all JSON file-based state with DuckDB, eliminating filesystem permission conflicts and enabling agent-queryable system state. + +**Architecture:** A `src/db.py` module manages DuckDB connections and schema versioning. Repository classes in `src/repositories/` wrap all CRUD operations. Existing service files swap `_read_json`/`_write_json` for repository method calls. Dual-write (JSON + DuckDB) during transition, then JSON removal. + +**Tech Stack:** DuckDB >=1.1, Python 3.11+, uv for package management + +**Design spec:** `docs/superpowers/specs/2026-03-27-refactoring-design.md` sections 3 (Data Layer) + +--- + +## File Structure + +### New files +| File | Responsibility | +|------|----------------| +| `src/db.py` | DuckDB connection factory, schema creation, migration | +| `src/repositories/__init__.py` | Re-export all repositories, `get_system_db()` factory | +| `src/repositories/users.py` | UserRepository — CRUD users table | +| `src/repositories/sync_state.py` | SyncStateRepository — sync state + history | +| `src/repositories/knowledge.py` | KnowledgeRepository — items + votes | +| `src/repositories/audit.py` | AuditRepository — append-only audit log | +| `src/repositories/notifications.py` | TelegramRepository, PendingCodeRepository, ScriptRegistry | +| `src/repositories/table_registry.py` | TableRegistryRepository | +| `src/repositories/profiles.py` | ProfileRepository | +| `scripts/migrate_json_to_duckdb.py` | One-time migration from JSON files to DuckDB | +| `tests/test_db.py` | Tests for db module | +| `tests/test_repositories.py` | Tests for all repositories | + +### Modified files +| File | What changes | +|------|-------------| +| `webapp/sync_settings_service.py` | `_read_json`/`_write_json` (lines 40-62) → SyncSettingsRepository | +| `webapp/corporate_memory_service.py` | `_read_json`/`_write_json` (lines 222-244) → KnowledgeRepository | +| `webapp/telegram_service.py` | `_read_json`/`_write_json` (lines 21-45) → TelegramRepository | +| `webapp/desktop_auth.py` | `_read_json`/`_write_json` (lines 33-57) → UserRepository | +| `src/data_sync.py` | SyncState class (lines 37-139) → SyncStateRepository | +| `src/table_registry.py` | `_atomic_write_json` (line 43) → TableRegistryRepository | +| `src/profiler.py` | profiles.json output (line 92) → ProfileRepository | +| `services/corporate_memory/collector.py` | `_read_json`/`_write_json` (lines 100-123) → KnowledgeRepository | +| `services/telegram_bot/storage.py` | `_read_json`/`_write_json` (lines 21-43) → TelegramRepository | +| `requirements.txt` | Ensure duckdb>=1.1 | + +--- + +### Task 1: DuckDB connection management + schema + +**Files:** +- Create: `src/db.py` +- Create: `tests/test_db.py` + +- [ ] **Step 1: Write the failing test for get_system_db** + +```python +# tests/test_db.py +import tempfile +import os +import duckdb +import pytest + + +def test_get_system_db_creates_database(): + with tempfile.TemporaryDirectory() as tmpdir: + os.environ["DATA_DIR"] = tmpdir + from src.db import get_system_db + conn = get_system_db() + assert conn is not None + # Verify tables exist + tables = conn.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='main'").fetchall() + table_names = {t[0] for t in tables} + assert "users" in table_names + assert "sync_state" in table_names + assert "knowledge_items" in table_names + assert "audit_log" in table_names + conn.close() + + +def test_get_system_db_is_idempotent(): + with tempfile.TemporaryDirectory() as tmpdir: + os.environ["DATA_DIR"] = tmpdir + from src.db import get_system_db + conn1 = get_system_db() + conn1.execute("INSERT INTO users (id, email, name, role) VALUES ('u1', 'test@test.com', 'Test', 'analyst')") + conn1.close() + conn2 = get_system_db() + result = conn2.execute("SELECT email FROM users WHERE id='u1'").fetchone() + assert result[0] == "test@test.com" + conn2.close() + + +def test_schema_version_tracked(): + with tempfile.TemporaryDirectory() as tmpdir: + os.environ["DATA_DIR"] = tmpdir + from src.db import get_system_db, get_schema_version + conn = get_system_db() + version = get_schema_version(conn) + assert version == 1 + conn.close() +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cd "/Users/zdeneksrotyr/Library/Mobile Documents/com~apple~CloudDocs/Sources/VsCode/component_factory/tmp_oss" && python -m pytest tests/test_db.py -v` +Expected: FAIL — `ModuleNotFoundError: No module named 'src.db'` + +- [ ] **Step 3: Implement src/db.py** + +```python +# src/db.py +""" +DuckDB connection management and schema versioning. + +Provides get_system_db() for the system state database +and get_analytics_db() for the analytics database with parquet views. +""" + +import os +from pathlib import Path + +import duckdb + +SCHEMA_VERSION = 1 + +_SYSTEM_SCHEMA = """ +-- Schema versioning +CREATE TABLE IF NOT EXISTS schema_version ( + version INTEGER NOT NULL, + applied_at TIMESTAMP DEFAULT current_timestamp +); + +-- Users & auth +CREATE TABLE IF NOT EXISTS users ( + id VARCHAR PRIMARY KEY, + email VARCHAR UNIQUE NOT NULL, + name VARCHAR, + role VARCHAR DEFAULT 'analyst', + password_hash VARCHAR, + setup_token VARCHAR, + setup_token_created TIMESTAMP, + reset_token VARCHAR, + reset_token_created TIMESTAMP, + created_at TIMESTAMP DEFAULT current_timestamp, + updated_at TIMESTAMP +); + +-- Sync state +CREATE TABLE IF NOT EXISTS sync_state ( + table_id VARCHAR PRIMARY KEY, + last_sync TIMESTAMP, + rows BIGINT, + file_size_bytes BIGINT, + uncompressed_size_bytes BIGINT, + columns INTEGER, + hash VARCHAR, + status VARCHAR DEFAULT 'ok', + error TEXT +); + +CREATE TABLE IF NOT EXISTS sync_history ( + id VARCHAR PRIMARY KEY, + table_id VARCHAR NOT NULL, + synced_at TIMESTAMP NOT NULL, + rows BIGINT, + duration_ms INTEGER, + status VARCHAR, + error TEXT +); + +-- User sync settings +CREATE TABLE IF NOT EXISTS user_sync_settings ( + user_id VARCHAR NOT NULL, + dataset VARCHAR NOT NULL, + enabled BOOLEAN DEFAULT false, + table_mode VARCHAR DEFAULT 'all', + tables JSON, + updated_at TIMESTAMP, + PRIMARY KEY (user_id, dataset) +); + +-- Corporate memory +CREATE TABLE IF NOT EXISTS knowledge_items ( + id VARCHAR PRIMARY KEY, + title VARCHAR NOT NULL, + content TEXT, + category VARCHAR, + tags JSON, + status VARCHAR DEFAULT 'pending', + contributors JSON, + source_user VARCHAR, + audience VARCHAR, + created_at TIMESTAMP DEFAULT current_timestamp, + updated_at TIMESTAMP +); + +CREATE TABLE IF NOT EXISTS knowledge_votes ( + item_id VARCHAR NOT NULL, + user_id VARCHAR NOT NULL, + vote INTEGER, + voted_at TIMESTAMP DEFAULT current_timestamp, + PRIMARY KEY (item_id, user_id) +); + +-- Audit log +CREATE TABLE IF NOT EXISTS audit_log ( + id VARCHAR PRIMARY KEY, + timestamp TIMESTAMP NOT NULL DEFAULT current_timestamp, + user_id VARCHAR, + action VARCHAR NOT NULL, + resource VARCHAR, + params JSON, + result VARCHAR, + duration_ms INTEGER +); + +-- Notifications +CREATE TABLE IF NOT EXISTS telegram_links ( + user_id VARCHAR PRIMARY KEY, + chat_id BIGINT NOT NULL, + linked_at TIMESTAMP DEFAULT current_timestamp +); + +CREATE TABLE IF NOT EXISTS pending_codes ( + code VARCHAR PRIMARY KEY, + chat_id BIGINT NOT NULL, + created_at TIMESTAMP DEFAULT current_timestamp +); + +-- Scripts +CREATE TABLE IF NOT EXISTS script_registry ( + id VARCHAR PRIMARY KEY, + name VARCHAR NOT NULL, + owner VARCHAR, + schedule VARCHAR, + source TEXT NOT NULL, + deployed_at TIMESTAMP DEFAULT current_timestamp, + last_run TIMESTAMP, + last_status VARCHAR +); + +-- Table registry +CREATE TABLE IF NOT EXISTS table_registry ( + id VARCHAR PRIMARY KEY, + name VARCHAR NOT NULL, + folder VARCHAR, + sync_strategy VARCHAR, + primary_key VARCHAR, + description TEXT, + registered_by VARCHAR, + registered_at TIMESTAMP DEFAULT current_timestamp +); + +-- Profiles +CREATE TABLE IF NOT EXISTS table_profiles ( + table_id VARCHAR PRIMARY KEY, + profile JSON NOT NULL, + profiled_at TIMESTAMP DEFAULT current_timestamp +); + +-- Dataset permissions +CREATE TABLE IF NOT EXISTS dataset_permissions ( + user_id VARCHAR NOT NULL, + dataset VARCHAR NOT NULL, + access VARCHAR DEFAULT 'read', + PRIMARY KEY (user_id, dataset) +); +""" + + +def _get_data_dir() -> Path: + return Path(os.environ.get("DATA_DIR", "./data")) + + +def get_system_db() -> duckdb.DuckDBPyConnection: + """Get a connection to the system state database. Creates schema if needed.""" + db_path = _get_data_dir() / "state" / "system.duckdb" + db_path.parent.mkdir(parents=True, exist_ok=True) + conn = duckdb.connect(str(db_path)) + _ensure_schema(conn) + return conn + + +def get_analytics_db() -> duckdb.DuckDBPyConnection: + """Get a connection to the analytics database (parquet views).""" + db_path = _get_data_dir() / "analytics" / "server.duckdb" + db_path.parent.mkdir(parents=True, exist_ok=True) + return duckdb.connect(str(db_path)) + + +def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: + """Create tables if they don't exist. Apply migrations if schema version changed.""" + current = get_schema_version(conn) + if current < SCHEMA_VERSION: + conn.execute(_SYSTEM_SCHEMA) + if current == 0: + conn.execute( + "INSERT INTO schema_version (version) VALUES (?)", + [SCHEMA_VERSION], + ) + else: + conn.execute( + "UPDATE schema_version SET version = ?, applied_at = current_timestamp", + [SCHEMA_VERSION], + ) + + +def get_schema_version(conn: duckdb.DuckDBPyConnection) -> int: + """Get current schema version. Returns 0 if no schema exists.""" + try: + result = conn.execute("SELECT MAX(version) FROM schema_version").fetchone() + return result[0] if result and result[0] else 0 + except duckdb.CatalogException: + return 0 +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `cd "/Users/zdeneksrotyr/Library/Mobile Documents/com~apple~CloudDocs/Sources/VsCode/component_factory/tmp_oss" && python -m pytest tests/test_db.py -v` +Expected: 3 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/db.py tests/test_db.py +git commit -m "feat: add DuckDB state layer with schema management" +``` + +--- + +### Task 2: SyncState repository + +**Files:** +- Create: `src/repositories/__init__.py` +- Create: `src/repositories/sync_state.py` +- Create: `tests/test_repositories.py` + +- [ ] **Step 1: Write the failing test** + +```python +# tests/test_repositories.py +import tempfile +import os +from datetime import datetime, timezone + +import pytest + + +@pytest.fixture +def db_conn(): + """Provide a fresh in-memory DuckDB with schema.""" + with tempfile.TemporaryDirectory() as tmpdir: + os.environ["DATA_DIR"] = tmpdir + from src.db import get_system_db + conn = get_system_db() + yield conn + conn.close() + + +class TestSyncStateRepository: + def test_update_and_get(self, db_conn): + from src.repositories.sync_state import SyncStateRepository + repo = SyncStateRepository(db_conn) + repo.update_sync( + table_id="orders", + rows=1000, + file_size_bytes=5000, + hash="abc123", + ) + state = repo.get_table_state("orders") + assert state is not None + assert state["rows"] == 1000 + assert state["hash"] == "abc123" + assert state["status"] == "ok" + + def test_get_nonexistent_returns_none(self, db_conn): + from src.repositories.sync_state import SyncStateRepository + repo = SyncStateRepository(db_conn) + assert repo.get_table_state("nonexistent") is None + + def test_get_last_sync(self, db_conn): + from src.repositories.sync_state import SyncStateRepository + repo = SyncStateRepository(db_conn) + repo.update_sync(table_id="orders", rows=100, file_size_bytes=500, hash="h1") + last = repo.get_last_sync("orders") + assert last is not None + + def test_get_all_states(self, db_conn): + from src.repositories.sync_state import SyncStateRepository + repo = SyncStateRepository(db_conn) + repo.update_sync(table_id="orders", rows=100, file_size_bytes=500, hash="h1") + repo.update_sync(table_id="customers", rows=50, file_size_bytes=200, hash="h2") + all_states = repo.get_all_states() + assert len(all_states) == 2 + + def test_history_recorded(self, db_conn): + from src.repositories.sync_state import SyncStateRepository + repo = SyncStateRepository(db_conn) + repo.update_sync(table_id="orders", rows=100, file_size_bytes=500, hash="h1") + repo.update_sync(table_id="orders", rows=200, file_size_bytes=800, hash="h2") + history = repo.get_sync_history("orders", limit=10) + assert len(history) == 2 + assert history[0]["rows"] == 200 # newest first + + def test_update_with_error(self, db_conn): + from src.repositories.sync_state import SyncStateRepository + repo = SyncStateRepository(db_conn) + repo.update_sync( + table_id="orders", rows=0, file_size_bytes=0, hash="", + status="error", error="Connection timeout", + ) + state = repo.get_table_state("orders") + assert state["status"] == "error" + assert state["error"] == "Connection timeout" +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest tests/test_repositories.py::TestSyncStateRepository -v` +Expected: FAIL — `ModuleNotFoundError` + +- [ ] **Step 3: Implement repository** + +```python +# src/repositories/__init__.py +""" +Repository layer for DuckDB state management. + +All system state CRUD goes through repository classes. +""" + +from src.db import get_system_db, get_analytics_db + +__all__ = ["get_system_db", "get_analytics_db"] +``` + +```python +# src/repositories/sync_state.py +"""Repository for sync state and history.""" + +import uuid +from datetime import datetime, timezone +from typing import Any + +import duckdb + + +class SyncStateRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def get_table_state(self, table_id: str) -> dict[str, Any] | None: + result = self.conn.execute( + "SELECT * FROM sync_state WHERE table_id = ?", [table_id] + ).fetchone() + if not result: + return None + columns = [desc[0] for desc in self.conn.description] + return dict(zip(columns, result)) + + def get_last_sync(self, table_id: str) -> datetime | None: + result = self.conn.execute( + "SELECT last_sync FROM sync_state WHERE table_id = ?", [table_id] + ).fetchone() + return result[0] if result else None + + def get_all_states(self) -> list[dict[str, Any]]: + results = self.conn.execute("SELECT * FROM sync_state ORDER BY table_id").fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] + + def update_sync( + self, + table_id: str, + rows: int, + file_size_bytes: int, + hash: str, + uncompressed_size_bytes: int = 0, + columns: int = 0, + status: str = "ok", + error: str | None = None, + duration_ms: int | None = None, + ) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO sync_state (table_id, last_sync, rows, file_size_bytes, + uncompressed_size_bytes, columns, hash, status, error) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (table_id) DO UPDATE SET + last_sync = excluded.last_sync, + rows = excluded.rows, + file_size_bytes = excluded.file_size_bytes, + uncompressed_size_bytes = excluded.uncompressed_size_bytes, + columns = excluded.columns, + hash = excluded.hash, + status = excluded.status, + error = excluded.error""", + [table_id, now, rows, file_size_bytes, uncompressed_size_bytes, + columns, hash, status, error], + ) + # Record history + self.conn.execute( + """INSERT INTO sync_history (id, table_id, synced_at, rows, duration_ms, status, error) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + [str(uuid.uuid4()), table_id, now, rows, duration_ms, status, error], + ) + + def get_sync_history(self, table_id: str, limit: int = 10) -> list[dict[str, Any]]: + results = self.conn.execute( + "SELECT * FROM sync_history WHERE table_id = ? ORDER BY synced_at DESC LIMIT ?", + [table_id, limit], + ).fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest tests/test_repositories.py::TestSyncStateRepository -v` +Expected: 6 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/repositories/__init__.py src/repositories/sync_state.py tests/test_repositories.py +git commit -m "feat: add SyncStateRepository with history tracking" +``` + +--- + +### Task 3: Users repository + +**Files:** +- Create: `src/repositories/users.py` +- Append to: `tests/test_repositories.py` + +- [ ] **Step 1: Write the failing test** + +Append to `tests/test_repositories.py`: + +```python +class TestUserRepository: + def test_create_and_get(self, db_conn): + from src.repositories.users import UserRepository + repo = UserRepository(db_conn) + repo.create(id="u1", email="test@acme.com", name="Test User", role="analyst") + user = repo.get_by_id("u1") + assert user is not None + assert user["email"] == "test@acme.com" + assert user["role"] == "analyst" + + def test_get_by_email(self, db_conn): + from src.repositories.users import UserRepository + repo = UserRepository(db_conn) + repo.create(id="u1", email="test@acme.com", name="Test User") + user = repo.get_by_email("test@acme.com") + assert user is not None + assert user["id"] == "u1" + + def test_get_nonexistent(self, db_conn): + from src.repositories.users import UserRepository + repo = UserRepository(db_conn) + assert repo.get_by_id("nope") is None + assert repo.get_by_email("nope@nope.com") is None + + def test_list_all(self, db_conn): + from src.repositories.users import UserRepository + repo = UserRepository(db_conn) + repo.create(id="u1", email="a@acme.com", name="A") + repo.create(id="u2", email="b@acme.com", name="B") + users = repo.list_all() + assert len(users) == 2 + + def test_update_role(self, db_conn): + from src.repositories.users import UserRepository + repo = UserRepository(db_conn) + repo.create(id="u1", email="test@acme.com", name="Test") + repo.update(id="u1", role="admin") + user = repo.get_by_id("u1") + assert user["role"] == "admin" + + def test_delete(self, db_conn): + from src.repositories.users import UserRepository + repo = UserRepository(db_conn) + repo.create(id="u1", email="test@acme.com", name="Test") + repo.delete("u1") + assert repo.get_by_id("u1") is None + + def test_set_password_hash(self, db_conn): + from src.repositories.users import UserRepository + repo = UserRepository(db_conn) + repo.create(id="u1", email="test@acme.com", name="Test") + repo.update(id="u1", password_hash="$argon2id$hashed") + user = repo.get_by_id("u1") + assert user["password_hash"] == "$argon2id$hashed" +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest tests/test_repositories.py::TestUserRepository -v` +Expected: FAIL + +- [ ] **Step 3: Implement repository** + +```python +# src/repositories/users.py +"""Repository for user management.""" + +from datetime import datetime, timezone +from typing import Any + +import duckdb + + +class UserRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def _row_to_dict(self, row) -> dict[str, Any] | None: + if not row: + return None + columns = [desc[0] for desc in self.conn.description] + return dict(zip(columns, row)) + + def get_by_id(self, user_id: str) -> dict[str, Any] | None: + result = self.conn.execute("SELECT * FROM users WHERE id = ?", [user_id]).fetchone() + return self._row_to_dict(result) + + def get_by_email(self, email: str) -> dict[str, Any] | None: + result = self.conn.execute("SELECT * FROM users WHERE email = ?", [email]).fetchone() + return self._row_to_dict(result) + + def list_all(self) -> list[dict[str, Any]]: + results = self.conn.execute("SELECT * FROM users ORDER BY email").fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] + + def create( + self, + id: str, + email: str, + name: str, + role: str = "analyst", + password_hash: str | None = None, + ) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO users (id, email, name, role, password_hash, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + [id, email, name, role, password_hash, now, now], + ) + + def update(self, id: str, **kwargs) -> None: + allowed = {"email", "name", "role", "password_hash", "setup_token", + "setup_token_created", "reset_token", "reset_token_created"} + updates = {k: v for k, v in kwargs.items() if k in allowed} + if not updates: + return + updates["updated_at"] = datetime.now(timezone.utc) + set_clause = ", ".join(f"{k} = ?" for k in updates) + values = list(updates.values()) + [id] + self.conn.execute(f"UPDATE users SET {set_clause} WHERE id = ?", values) + + def delete(self, user_id: str) -> None: + self.conn.execute("DELETE FROM users WHERE id = ?", [user_id]) +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest tests/test_repositories.py::TestUserRepository -v` +Expected: 7 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/repositories/users.py tests/test_repositories.py +git commit -m "feat: add UserRepository with CRUD operations" +``` + +--- + +### Task 4: Knowledge repository + +**Files:** +- Create: `src/repositories/knowledge.py` +- Append to: `tests/test_repositories.py` + +- [ ] **Step 1: Write the failing test** + +Append to `tests/test_repositories.py`: + +```python +class TestKnowledgeRepository: + def test_create_and_get(self, db_conn): + from src.repositories.knowledge import KnowledgeRepository + repo = KnowledgeRepository(db_conn) + repo.create(id="k1", title="MRR Definition", content="Monthly recurring...", + category="metrics", source_user="petr@acme.com") + item = repo.get_by_id("k1") + assert item is not None + assert item["title"] == "MRR Definition" + assert item["status"] == "pending" + + def test_list_by_status(self, db_conn): + from src.repositories.knowledge import KnowledgeRepository + repo = KnowledgeRepository(db_conn) + repo.create(id="k1", title="A", content="a", category="c") + repo.create(id="k2", title="B", content="b", category="c") + repo.update_status("k1", "approved") + approved = repo.list_items(statuses=["approved"]) + assert len(approved) == 1 + assert approved[0]["id"] == "k1" + + def test_vote(self, db_conn): + from src.repositories.knowledge import KnowledgeRepository + repo = KnowledgeRepository(db_conn) + repo.create(id="k1", title="A", content="a", category="c") + repo.vote("k1", "user1", 1) + repo.vote("k1", "user2", -1) + votes = repo.get_votes("k1") + assert votes["upvotes"] == 1 + assert votes["downvotes"] == 1 + + def test_vote_replace(self, db_conn): + from src.repositories.knowledge import KnowledgeRepository + repo = KnowledgeRepository(db_conn) + repo.create(id="k1", title="A", content="a", category="c") + repo.vote("k1", "user1", 1) + repo.vote("k1", "user1", -1) # change vote + votes = repo.get_votes("k1") + assert votes["upvotes"] == 0 + assert votes["downvotes"] == 1 + + def test_search(self, db_conn): + from src.repositories.knowledge import KnowledgeRepository + repo = KnowledgeRepository(db_conn) + repo.create(id="k1", title="Revenue metrics", content="MRR definition", category="metrics") + repo.create(id="k2", title="Support SLA", content="Response times", category="support") + results = repo.search("revenue") + assert len(results) == 1 + assert results[0]["id"] == "k1" +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest tests/test_repositories.py::TestKnowledgeRepository -v` +Expected: FAIL + +- [ ] **Step 3: Implement repository** + +```python +# src/repositories/knowledge.py +"""Repository for corporate memory knowledge items and votes.""" + +import json +from datetime import datetime, timezone +from typing import Any + +import duckdb + + +class KnowledgeRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def _row_to_dict(self, row) -> dict[str, Any] | None: + if not row: + return None + columns = [desc[0] for desc in self.conn.description] + return dict(zip(columns, row)) + + def _rows_to_dicts(self, rows) -> list[dict[str, Any]]: + if not rows: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in rows] + + def get_by_id(self, item_id: str) -> dict[str, Any] | None: + result = self.conn.execute("SELECT * FROM knowledge_items WHERE id = ?", [item_id]).fetchone() + return self._row_to_dict(result) + + def create( + self, + id: str, + title: str, + content: str, + category: str, + source_user: str | None = None, + tags: list[str] | None = None, + status: str = "pending", + ) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO knowledge_items (id, title, content, category, source_user, + tags, status, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", + [id, title, content, category, source_user, + json.dumps(tags) if tags else None, status, now, now], + ) + + def update_status(self, item_id: str, status: str) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + "UPDATE knowledge_items SET status = ?, updated_at = ? WHERE id = ?", + [status, now, item_id], + ) + + def list_items( + self, + statuses: list[str] | None = None, + category: str | None = None, + limit: int = 100, + offset: int = 0, + ) -> list[dict[str, Any]]: + query = "SELECT * FROM knowledge_items WHERE 1=1" + params: list[Any] = [] + if statuses: + placeholders = ", ".join("?" for _ in statuses) + query += f" AND status IN ({placeholders})" + params.extend(statuses) + if category: + query += " AND category = ?" + params.append(category) + query += " ORDER BY updated_at DESC LIMIT ? OFFSET ?" + params.extend([limit, offset]) + return self._rows_to_dicts(self.conn.execute(query, params).fetchall()) + + def search(self, query: str) -> list[dict[str, Any]]: + pattern = f"%{query}%" + results = self.conn.execute( + """SELECT * FROM knowledge_items + WHERE title ILIKE ? OR content ILIKE ? + ORDER BY updated_at DESC""", + [pattern, pattern], + ).fetchall() + return self._rows_to_dicts(results) + + def vote(self, item_id: str, user_id: str, vote: int) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO knowledge_votes (item_id, user_id, vote, voted_at) + VALUES (?, ?, ?, ?) + ON CONFLICT (item_id, user_id) DO UPDATE SET vote = excluded.vote, voted_at = excluded.voted_at""", + [item_id, user_id, vote, now], + ) + + def get_votes(self, item_id: str) -> dict[str, int]: + result = self.conn.execute( + """SELECT + COALESCE(SUM(CASE WHEN vote > 0 THEN 1 ELSE 0 END), 0) as upvotes, + COALESCE(SUM(CASE WHEN vote < 0 THEN 1 ELSE 0 END), 0) as downvotes + FROM knowledge_votes WHERE item_id = ?""", + [item_id], + ).fetchone() + return {"upvotes": result[0], "downvotes": result[1]} +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest tests/test_repositories.py::TestKnowledgeRepository -v` +Expected: 5 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/repositories/knowledge.py tests/test_repositories.py +git commit -m "feat: add KnowledgeRepository with voting and search" +``` + +--- + +### Task 5: Audit repository + +**Files:** +- Create: `src/repositories/audit.py` +- Append to: `tests/test_repositories.py` + +- [ ] **Step 1: Write the failing test** + +Append to `tests/test_repositories.py`: + +```python +class TestAuditRepository: + def test_log_and_query(self, db_conn): + from src.repositories.audit import AuditRepository + repo = AuditRepository(db_conn) + repo.log(user_id="u1", action="sync_trigger", resource="orders", + params={"force": True}, result="ok", duration_ms=1200) + entries = repo.query(limit=10) + assert len(entries) == 1 + assert entries[0]["action"] == "sync_trigger" + assert entries[0]["duration_ms"] == 1200 + + def test_query_by_action(self, db_conn): + from src.repositories.audit import AuditRepository + repo = AuditRepository(db_conn) + repo.log(user_id="u1", action="sync_trigger", resource="orders") + repo.log(user_id="u1", action="login", resource=None) + entries = repo.query(action="sync_trigger") + assert len(entries) == 1 + + def test_query_by_user(self, db_conn): + from src.repositories.audit import AuditRepository + repo = AuditRepository(db_conn) + repo.log(user_id="u1", action="sync_trigger", resource="orders") + repo.log(user_id="u2", action="sync_trigger", resource="customers") + entries = repo.query(user_id="u1") + assert len(entries) == 1 +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest tests/test_repositories.py::TestAuditRepository -v` +Expected: FAIL + +- [ ] **Step 3: Implement repository** + +```python +# src/repositories/audit.py +"""Repository for audit logging.""" + +import json +import uuid +from datetime import datetime, timezone +from typing import Any + +import duckdb + + +class AuditRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def log( + self, + user_id: str | None = None, + action: str = "", + resource: str | None = None, + params: dict | None = None, + result: str | None = None, + duration_ms: int | None = None, + ) -> str: + entry_id = str(uuid.uuid4()) + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO audit_log (id, timestamp, user_id, action, resource, params, result, duration_ms) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", + [entry_id, now, user_id, action, resource, + json.dumps(params) if params else None, result, duration_ms], + ) + return entry_id + + def query( + self, + user_id: str | None = None, + action: str | None = None, + limit: int = 50, + ) -> list[dict[str, Any]]: + sql = "SELECT * FROM audit_log WHERE 1=1" + params: list[Any] = [] + if user_id: + sql += " AND user_id = ?" + params.append(user_id) + if action: + sql += " AND action = ?" + params.append(action) + sql += " ORDER BY timestamp DESC LIMIT ?" + params.append(limit) + results = self.conn.execute(sql, params).fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest tests/test_repositories.py::TestAuditRepository -v` +Expected: 3 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/repositories/audit.py tests/test_repositories.py +git commit -m "feat: add AuditRepository with query filtering" +``` + +--- + +### Task 6: Notifications repository (Telegram + Scripts) + +**Files:** +- Create: `src/repositories/notifications.py` +- Append to: `tests/test_repositories.py` + +- [ ] **Step 1: Write the failing test** + +Append to `tests/test_repositories.py`: + +```python +class TestNotificationsRepository: + def test_telegram_link_and_get(self, db_conn): + from src.repositories.notifications import TelegramRepository + repo = TelegramRepository(db_conn) + repo.link_user("u1", chat_id=12345) + link = repo.get_link("u1") + assert link is not None + assert link["chat_id"] == 12345 + + def test_telegram_unlink(self, db_conn): + from src.repositories.notifications import TelegramRepository + repo = TelegramRepository(db_conn) + repo.link_user("u1", chat_id=12345) + repo.unlink_user("u1") + assert repo.get_link("u1") is None + + def test_pending_code_create_and_verify(self, db_conn): + from src.repositories.notifications import PendingCodeRepository + repo = PendingCodeRepository(db_conn) + repo.create_code("ABC123", chat_id=12345) + code = repo.verify_code("ABC123") + assert code is not None + assert code["chat_id"] == 12345 + # Code consumed after verify + assert repo.verify_code("ABC123") is None + + def test_script_registry(self, db_conn): + from src.repositories.notifications import ScriptRepository + repo = ScriptRepository(db_conn) + repo.deploy("s1", name="sales_alert", owner="u1", + schedule="0 8 * * MON", source="print('hello')") + script = repo.get("s1") + assert script is not None + assert script["schedule"] == "0 8 * * MON" + all_scripts = repo.list_all() + assert len(all_scripts) == 1 + + def test_script_undeploy(self, db_conn): + from src.repositories.notifications import ScriptRepository + repo = ScriptRepository(db_conn) + repo.deploy("s1", name="test", owner="u1", source="pass") + repo.undeploy("s1") + assert repo.get("s1") is None +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest tests/test_repositories.py::TestNotificationsRepository -v` +Expected: FAIL + +- [ ] **Step 3: Implement repository** + +```python +# src/repositories/notifications.py +"""Repositories for Telegram links, pending codes, and script registry.""" + +from datetime import datetime, timezone +from typing import Any + +import duckdb + + +class TelegramRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def link_user(self, user_id: str, chat_id: int) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO telegram_links (user_id, chat_id, linked_at) + VALUES (?, ?, ?) + ON CONFLICT (user_id) DO UPDATE SET chat_id = excluded.chat_id, linked_at = excluded.linked_at""", + [user_id, chat_id, now], + ) + + def unlink_user(self, user_id: str) -> None: + self.conn.execute("DELETE FROM telegram_links WHERE user_id = ?", [user_id]) + + def get_link(self, user_id: str) -> dict[str, Any] | None: + result = self.conn.execute( + "SELECT * FROM telegram_links WHERE user_id = ?", [user_id] + ).fetchone() + if not result: + return None + columns = [desc[0] for desc in self.conn.description] + return dict(zip(columns, result)) + + def get_all_links(self) -> list[dict[str, Any]]: + results = self.conn.execute("SELECT * FROM telegram_links").fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] + + +class PendingCodeRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def create_code(self, code: str, chat_id: int) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + "INSERT INTO pending_codes (code, chat_id, created_at) VALUES (?, ?, ?)", + [code, chat_id, now], + ) + + def verify_code(self, code: str) -> dict[str, Any] | None: + result = self.conn.execute( + "SELECT * FROM pending_codes WHERE code = ?", [code] + ).fetchone() + if not result: + return None + columns = [desc[0] for desc in self.conn.description] + row = dict(zip(columns, result)) + self.conn.execute("DELETE FROM pending_codes WHERE code = ?", [code]) + return row + + +class ScriptRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def deploy( + self, id: str, name: str, owner: str | None = None, + schedule: str | None = None, source: str = "", + ) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO script_registry (id, name, owner, schedule, source, deployed_at) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT (id) DO UPDATE SET + name = excluded.name, schedule = excluded.schedule, + source = excluded.source, deployed_at = excluded.deployed_at""", + [id, name, owner, schedule, source, now], + ) + + def undeploy(self, script_id: str) -> None: + self.conn.execute("DELETE FROM script_registry WHERE id = ?", [script_id]) + + def get(self, script_id: str) -> dict[str, Any] | None: + result = self.conn.execute( + "SELECT * FROM script_registry WHERE id = ?", [script_id] + ).fetchone() + if not result: + return None + columns = [desc[0] for desc in self.conn.description] + return dict(zip(columns, result)) + + def list_all(self, owner: str | None = None) -> list[dict[str, Any]]: + if owner: + results = self.conn.execute( + "SELECT * FROM script_registry WHERE owner = ? ORDER BY name", [owner] + ).fetchall() + else: + results = self.conn.execute("SELECT * FROM script_registry ORDER BY name").fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest tests/test_repositories.py::TestNotificationsRepository -v` +Expected: 5 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/repositories/notifications.py tests/test_repositories.py +git commit -m "feat: add Telegram, PendingCode, and Script repositories" +``` + +--- + +### Task 7: Table registry + Profiles repositories + +**Files:** +- Create: `src/repositories/table_registry.py` +- Create: `src/repositories/profiles.py` +- Append to: `tests/test_repositories.py` + +- [ ] **Step 1: Write the failing test** + +Append to `tests/test_repositories.py`: + +```python +class TestTableRegistryRepository: + def test_register_and_get(self, db_conn): + from src.repositories.table_registry import TableRegistryRepository + repo = TableRegistryRepository(db_conn) + repo.register(id="orders", name="Orders", folder="sales", + sync_strategy="incremental", registered_by="admin") + table = repo.get("orders") + assert table is not None + assert table["folder"] == "sales" + + def test_list_all(self, db_conn): + from src.repositories.table_registry import TableRegistryRepository + repo = TableRegistryRepository(db_conn) + repo.register(id="t1", name="A", folder="f1") + repo.register(id="t2", name="B", folder="f2") + assert len(repo.list_all()) == 2 + + def test_unregister(self, db_conn): + from src.repositories.table_registry import TableRegistryRepository + repo = TableRegistryRepository(db_conn) + repo.register(id="t1", name="A", folder="f1") + repo.unregister("t1") + assert repo.get("t1") is None + + +class TestProfileRepository: + def test_save_and_get(self, db_conn): + from src.repositories.profiles import ProfileRepository + repo = ProfileRepository(db_conn) + profile_data = {"columns": [{"name": "id", "type": "int"}], "row_count": 1000} + repo.save("orders", profile_data) + profile = repo.get("orders") + assert profile is not None + assert profile["row_count"] == 1000 + + def test_get_all(self, db_conn): + from src.repositories.profiles import ProfileRepository + repo = ProfileRepository(db_conn) + repo.save("t1", {"row_count": 100}) + repo.save("t2", {"row_count": 200}) + all_profiles = repo.get_all() + assert len(all_profiles) == 2 +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest tests/test_repositories.py::TestTableRegistryRepository tests/test_repositories.py::TestProfileRepository -v` +Expected: FAIL + +- [ ] **Step 3: Implement repositories** + +```python +# src/repositories/table_registry.py +"""Repository for table registry.""" + +from datetime import datetime, timezone +from typing import Any + +import duckdb + + +class TableRegistryRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def register( + self, id: str, name: str, folder: str | None = None, + sync_strategy: str | None = None, primary_key: str | None = None, + description: str | None = None, registered_by: str | None = None, + ) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO table_registry (id, name, folder, sync_strategy, + primary_key, description, registered_by, registered_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (id) DO UPDATE SET + name = excluded.name, folder = excluded.folder, + sync_strategy = excluded.sync_strategy, primary_key = excluded.primary_key, + description = excluded.description, registered_at = excluded.registered_at""", + [id, name, folder, sync_strategy, primary_key, description, registered_by, now], + ) + + def unregister(self, table_id: str) -> None: + self.conn.execute("DELETE FROM table_registry WHERE id = ?", [table_id]) + + def get(self, table_id: str) -> dict[str, Any] | None: + result = self.conn.execute( + "SELECT * FROM table_registry WHERE id = ?", [table_id] + ).fetchone() + if not result: + return None + columns = [desc[0] for desc in self.conn.description] + return dict(zip(columns, result)) + + def list_all(self) -> list[dict[str, Any]]: + results = self.conn.execute("SELECT * FROM table_registry ORDER BY name").fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] +``` + +```python +# src/repositories/profiles.py +"""Repository for table profiles.""" + +import json +from datetime import datetime, timezone +from typing import Any + +import duckdb + + +class ProfileRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def save(self, table_id: str, profile: dict) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO table_profiles (table_id, profile, profiled_at) + VALUES (?, ?, ?) + ON CONFLICT (table_id) DO UPDATE SET + profile = excluded.profile, profiled_at = excluded.profiled_at""", + [table_id, json.dumps(profile), now], + ) + + def get(self, table_id: str) -> dict[str, Any] | None: + result = self.conn.execute( + "SELECT profile, profiled_at FROM table_profiles WHERE table_id = ?", + [table_id], + ).fetchone() + if not result: + return None + profile = json.loads(result[0]) if isinstance(result[0], str) else result[0] + profile["profiled_at"] = result[1] + return profile + + def get_all(self) -> dict[str, dict]: + results = self.conn.execute( + "SELECT table_id, profile, profiled_at FROM table_profiles ORDER BY table_id" + ).fetchall() + out = {} + for row in results: + profile = json.loads(row[1]) if isinstance(row[1], str) else row[1] + profile["profiled_at"] = row[2] + out[row[0]] = profile + return out +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest tests/test_repositories.py::TestTableRegistryRepository tests/test_repositories.py::TestProfileRepository -v` +Expected: 5 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/repositories/table_registry.py src/repositories/profiles.py tests/test_repositories.py +git commit -m "feat: add TableRegistry and Profile repositories" +``` + +--- + +### Task 8: Migration script (JSON → DuckDB) + +**Files:** +- Create: `scripts/migrate_json_to_duckdb.py` +- Create: `tests/test_migration.py` + +- [ ] **Step 1: Write the failing test** + +```python +# tests/test_migration.py +import json +import os +import tempfile + +import pytest + + +@pytest.fixture +def migration_env(): + """Create temp dir with sample JSON files mimicking production layout.""" + with tempfile.TemporaryDirectory() as tmpdir: + data_dir = os.path.join(tmpdir, "data") + os.makedirs(os.path.join(data_dir, "notifications"), exist_ok=True) + os.makedirs(os.path.join(data_dir, "corporate-memory"), exist_ok=True) + os.makedirs(os.path.join(data_dir, "auth"), exist_ok=True) + os.makedirs(os.path.join(data_dir, "src_data", "metadata"), exist_ok=True) + + # sync_state.json + with open(os.path.join(data_dir, "src_data", "metadata", "sync_state.json"), "w") as f: + json.dump({ + "tables": { + "orders": {"last_sync": "2026-03-27T08:00:00Z", "rows": 1000, "file_size_bytes": 5000} + } + }, f) + + # sync_settings.json + with open(os.path.join(data_dir, "notifications", "sync_settings.json"), "w") as f: + json.dump({ + "petr": {"datasets": {"sales": True, "support": False}, "updated_at": "2026-03-27"} + }, f) + + # knowledge.json + with open(os.path.join(data_dir, "corporate-memory", "knowledge.json"), "w") as f: + json.dump([ + {"id": "k1", "title": "MRR", "content": "Monthly...", "category": "metrics", + "status": "approved", "contributors": ["petr"]} + ], f) + + # telegram_users.json + with open(os.path.join(data_dir, "notifications", "telegram_users.json"), "w") as f: + json.dump({"petr@acme.com": {"chat_id": 12345, "linked_at": "2026-01-01"}}, f) + + os.environ["DATA_DIR"] = data_dir + yield data_dir + + +def test_migration_runs_without_error(migration_env): + from scripts.migrate_json_to_duckdb import migrate_all + stats = migrate_all(migration_env) + assert stats["sync_state"] == 1 + assert stats["knowledge"] == 1 + assert stats["telegram"] == 1 + + +def test_migration_is_idempotent(migration_env): + from scripts.migrate_json_to_duckdb import migrate_all + stats1 = migrate_all(migration_env) + stats2 = migrate_all(migration_env) + assert stats1["sync_state"] == stats2["sync_state"] +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `python -m pytest tests/test_migration.py -v` +Expected: FAIL + +- [ ] **Step 3: Implement migration script** + +```python +# scripts/migrate_json_to_duckdb.py +""" +One-time migration: JSON files → DuckDB. + +Usage: python -m scripts.migrate_json_to_duckdb [--data-dir /data] + +Idempotent — safe to run multiple times. Uses UPSERT to avoid duplicates. +""" + +import json +import logging +import os +import sys +from pathlib import Path + +logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") +logger = logging.getLogger(__name__) + + +def _load_json(path: str) -> dict | list | None: + try: + with open(path) as f: + return json.load(f) + except (FileNotFoundError, json.JSONDecodeError) as e: + logger.warning(f"Skipping {path}: {e}") + return None + + +def migrate_all(data_dir: str | None = None) -> dict[str, int]: + if data_dir: + os.environ["DATA_DIR"] = data_dir + data = Path(data_dir or os.environ.get("DATA_DIR", "./data")) + + from src.db import get_system_db + from src.repositories.sync_state import SyncStateRepository + from src.repositories.knowledge import KnowledgeRepository + from src.repositories.notifications import TelegramRepository + from src.repositories.users import UserRepository + + conn = get_system_db() + stats: dict[str, int] = {} + + # 1. Sync state + sync_data = _load_json(str(data / "src_data" / "metadata" / "sync_state.json")) + count = 0 + if sync_data and "tables" in sync_data: + repo = SyncStateRepository(conn) + for table_id, info in sync_data["tables"].items(): + repo.update_sync( + table_id=table_id, + rows=info.get("rows", 0), + file_size_bytes=info.get("file_size_bytes", 0), + hash=info.get("hash", ""), + uncompressed_size_bytes=info.get("uncompressed_size_bytes", 0), + columns=info.get("columns", 0), + ) + count += 1 + stats["sync_state"] = count + logger.info(f"Migrated {count} sync state entries") + + # 2. Knowledge items + knowledge = _load_json(str(data / "corporate-memory" / "knowledge.json")) + count = 0 + if knowledge and isinstance(knowledge, list): + repo = KnowledgeRepository(conn) + for item in knowledge: + repo.create( + id=item.get("id", ""), + title=item.get("title", ""), + content=item.get("content", ""), + category=item.get("category", ""), + source_user=item.get("source_user"), + tags=item.get("tags"), + status=item.get("status", "pending"), + ) + count += 1 + stats["knowledge"] = count + logger.info(f"Migrated {count} knowledge items") + + # 3. Telegram users + telegram = _load_json(str(data / "notifications" / "telegram_users.json")) + count = 0 + if telegram and isinstance(telegram, dict): + repo = TelegramRepository(conn) + for email, info in telegram.items(): + repo.link_user(email, chat_id=info.get("chat_id", 0)) + count += 1 + stats["telegram"] = count + logger.info(f"Migrated {count} telegram links") + + conn.close() + logger.info("Migration complete") + return stats + + +if __name__ == "__main__": + import argparse + parser = argparse.ArgumentParser(description="Migrate JSON state to DuckDB") + parser.add_argument("--data-dir", default=None, help="Data directory path") + args = parser.parse_args() + migrate_all(args.data_dir) +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `python -m pytest tests/test_migration.py -v` +Expected: 2 tests PASS + +- [ ] **Step 5: Run all repository tests together** + +Run: `python -m pytest tests/test_db.py tests/test_repositories.py tests/test_migration.py -v` +Expected: All tests PASS (3 + 26 + 2 = 31 tests) + +- [ ] **Step 6: Commit** + +```bash +git add scripts/migrate_json_to_duckdb.py tests/test_migration.py +git commit -m "feat: add JSON to DuckDB migration script" +``` + +--- + +## Summary + +| Task | Files | Tests | Purpose | +|------|-------|-------|---------| +| 1 | `src/db.py` | 3 | DuckDB connection + schema | +| 2 | `src/repositories/sync_state.py` | 6 | Sync state + history | +| 3 | `src/repositories/users.py` | 7 | User CRUD | +| 4 | `src/repositories/knowledge.py` | 5 | Corporate memory + votes | +| 5 | `src/repositories/audit.py` | 3 | Audit logging | +| 6 | `src/repositories/notifications.py` | 5 | Telegram + Scripts | +| 7 | `src/repositories/table_registry.py`, `profiles.py` | 5 | Registry + Profiles | +| 8 | `scripts/migrate_json_to_duckdb.py` | 2 | Migration | +| **Total** | **12 new files** | **36 tests** | | + +After this plan, all state is in DuckDB. The existing service files still use JSON (they'll be rewired in Plan 2: FastAPI Server, which depends on this layer being complete). diff --git a/docs/superpowers/plans/2026-03-27-02-complete-system.md b/docs/superpowers/plans/2026-03-27-02-complete-system.md new file mode 100644 index 0000000..694b75a --- /dev/null +++ b/docs/superpowers/plans/2026-03-27-02-complete-system.md @@ -0,0 +1,69 @@ +# Complete System Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development or superpowers:executing-plans. + +**Goal:** Make the new FastAPI system feature-complete with the old Flask system. Every route, every service function, every template — replicated with the new DuckDB-backed architecture. + +**Status:** Infrastructure done (DuckDB, repos, FastAPI skeleton, CLI, Docker). Missing: business logic wiring, web UI, auth providers, 18 routes, 38 service functions. + +--- + +## Part A: Wire sync trigger to DataSyncManager + +Files: +- Modify: `app/api/sync.py` (replace stub with real sync) +- Modify: `app/main.py` (add instance config loading) + +## Part B: Instance config integration + +Files: +- Create: `app/instance_config.py` (load instance.yaml, expose to FastAPI) +- Modify: `app/main.py` (lifespan event loads config) +- Modify: `app/api/health.py` (include data source info) + +## Part C: Web UI — Jinja2 templates in FastAPI + +Files: +- Create: `app/web/router.py` (ALL web routes: /, /dashboard, /catalog, /login, /corporate-memory, /admin/tables, etc.) +- Copy: `webapp/templates/` → `app/web/templates/` (adapt for FastAPI) +- Copy: `webapp/static/` → `app/web/static/` +- Modify: `app/main.py` (mount templates + static) + +## Part D: Auth providers (Google OAuth + Email + Password) + +Files: +- Create: `app/auth/providers/google.py` +- Create: `app/auth/providers/email.py` +- Create: `app/auth/providers/password.py` +- Modify: `app/auth/router.py` (OAuth callback, magic link, password verify) + +## Part E: Missing API endpoints (18 routes) + +Files: +- Create: `app/api/catalog.py` (profile, metrics) +- Create: `app/api/telegram.py` (verify, unlink, status) +- Create: `app/api/desktop.py` (scripts, run) +- Create: `app/api/admin.py` (tables discover, registry CRUD) +- Modify: `app/api/memory.py` (add 10 admin governance endpoints) +- Modify: `app/api/sync.py` (add sync-settings, table-subscriptions) + +## Part F: Service logic rewiring + +Files: +- Rewrite all old service calls to use DuckDB repositories +- Bridge: old corporate_memory_service → KnowledgeRepository +- Bridge: old sync_settings_service → SyncSettingsRepository +- Bridge: old telegram_service → TelegramRepository + +## Part G: CLI missing commands + old test fixes + +Files: +- Create: `cli/commands/setup.py` +- Create: `cli/commands/server.py` +- Create: `cli/commands/explore.py` +- Fix: old tests to work with new code + +## Part H: Full test coverage + +- Integration tests for all 40 routes +- E2E Docker test diff --git a/docs/superpowers/plans/2026-04-08-production-hardening.md b/docs/superpowers/plans/2026-04-08-production-hardening.md new file mode 100644 index 0000000..c3acbca --- /dev/null +++ b/docs/superpowers/plans/2026-04-08-production-hardening.md @@ -0,0 +1,1840 @@ +# Production Hardening Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Fix all P0/P1 issues from 4 independent code reviews (architect, data engineer, senior dev, test specialist) to make the codebase production-ready. + +**Architecture:** Fixes are grouped into 6 independent workstreams: auth/security, SQL safety, orchestrator bugs, DuckDB lifecycle, test hardening, and docs/cleanup. Each workstream can be executed in parallel by separate agents. + +**Tech Stack:** Python 3.13, FastAPI, DuckDB, pytest, Docker + +**Source:** Consolidated findings from 4 review agents run 2026-04-08. + +--- + +## Workstream 1: Authentication & Security (P0) + +### Task 1.1: Fix password bypass in /auth/token + +The `/auth/token` endpoint issues a JWT without verifying the password when `request.password` is empty but `user.password_hash` exists. Any user with a password can get a token by omitting the password field. + +**Files:** +- Modify: `app/auth/router.py:47-54` +- Test: `tests/test_auth_providers.py` + +- [ ] **Step 1: Write the failing test** + +```python +# In tests/test_auth_providers.py, add to existing test class: + +def test_password_required_when_hash_exists(client, e2e_env): + """A user with password_hash must provide correct password.""" + from src.db import get_system_db + from src.repositories.users import UserRepository + from argon2 import PasswordHasher + + conn = get_system_db() + repo = UserRepository(conn) + ph = PasswordHasher() + repo.create(id="pw-user", email="pw@test.com", role="analyst") + conn.execute( + "UPDATE users SET password_hash = ? WHERE id = ?", + [ph.hash("correct-password"), "pw-user"], + ) + conn.close() + + # Empty password should be rejected + resp = client.post("/auth/token", json={"email": "pw@test.com", "password": ""}) + assert resp.status_code == 401 + + # Missing password field should be rejected + resp = client.post("/auth/token", json={"email": "pw@test.com"}) + assert resp.status_code == 401 + + # Wrong password should be rejected + resp = client.post("/auth/token", json={"email": "pw@test.com", "password": "wrong"}) + assert resp.status_code == 401 + + # Correct password should work + resp = client.post("/auth/token", json={"email": "pw@test.com", "password": "correct-password"}) + assert resp.status_code == 200 +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `pytest tests/test_auth_providers.py::test_password_required_when_hash_exists -v` +Expected: FAIL — empty password returns 200 instead of 401 + +- [ ] **Step 3: Fix the auth logic** + +In `app/auth/router.py`, replace lines 47-54: + +```python + # If user has password_hash, require and verify password + if user.get("password_hash"): + if not request.password: + raise HTTPException(status_code=401, detail="Password required") + try: + from argon2 import PasswordHasher + ph = PasswordHasher() + ph.verify(user["password_hash"], request.password) + except Exception: + raise HTTPException(status_code=401, detail="Invalid password") +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `pytest tests/test_auth_providers.py::test_password_required_when_hash_exists -v` +Expected: PASS + +- [ ] **Step 5: Run full auth test suite** + +Run: `pytest tests/test_auth_providers.py tests/test_security.py -v` +Expected: All pass + +- [ ] **Step 6: Commit** + +```bash +git add app/auth/router.py tests/test_auth_providers.py +git commit -m "fix: require password when password_hash exists — prevents auth bypass" +``` + +### Task 1.2: Fail on default JWT secret in non-test environments + +The app starts with a hardcoded known secret if `JWT_SECRET_KEY` env var is missing. A production deployment that forgets to set it is wide open. + +**Files:** +- Modify: `app/auth/jwt.py:9-16` +- Test: `tests/test_security.py` + +- [ ] **Step 1: Write the failing test** + +```python +# In tests/test_security.py, add: + +def test_jwt_rejects_default_secret_in_production(monkeypatch): + """App should refuse to start with the default JWT secret unless TESTING=1.""" + monkeypatch.delenv("JWT_SECRET_KEY", raising=False) + monkeypatch.delenv("TESTING", raising=False) + + with pytest.raises(RuntimeError, match="JWT_SECRET_KEY"): + # Force reimport to trigger module-level check + import importlib + import app.auth.jwt as jwt_mod + importlib.reload(jwt_mod) +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `pytest tests/test_security.py::test_jwt_rejects_default_secret_in_production -v` +Expected: FAIL — no RuntimeError raised + +- [ ] **Step 3: Fix jwt.py** + +Replace `app/auth/jwt.py` lines 9-16: + +```python +SECRET_KEY = os.environ.get("JWT_SECRET_KEY", "") + +if not SECRET_KEY: + if os.environ.get("TESTING", "").lower() in ("1", "true"): + SECRET_KEY = "test-jwt-secret-key-minimum-32-chars!!" + else: + raise RuntimeError( + "JWT_SECRET_KEY environment variable is required. " + "Generate one: python -c \"import secrets; print(secrets.token_hex(32))\"" + ) +elif len(SECRET_KEY) < 32 and os.environ.get("TESTING", "").lower() not in ("1", "true"): + import warnings as _warnings + _warnings.warn( + f"JWT_SECRET_KEY is {len(SECRET_KEY)} chars — minimum 32 recommended", + UserWarning, stacklevel=2, + ) +``` + +- [ ] **Step 4: Run tests** + +Run: `pytest tests/test_security.py tests/test_auth_providers.py -v` +Expected: All pass (existing tests set TESTING=1 or JWT_SECRET_KEY via conftest) + +- [ ] **Step 5: Commit** + +```bash +git add app/auth/jwt.py tests/test_security.py +git commit -m "fix: fail startup on missing JWT_SECRET_KEY in non-test environments" +``` + +### Task 1.3: Reduce JWT expiry, add jti claim + +30-day tokens with no revocation mechanism are too risky. + +**Files:** +- Modify: `app/auth/jwt.py:18-19,28-37` +- Test: `tests/test_security.py` + +- [ ] **Step 1: Write the failing test** + +```python +# In tests/test_security.py, add: + +def test_jwt_contains_jti_claim(): + """JWT tokens must contain a unique jti claim for future revocation support.""" + from app.auth.jwt import create_access_token, verify_token + token = create_access_token(user_id="u1", email="a@b.com", role="analyst") + payload = verify_token(token) + assert "jti" in payload + assert len(payload["jti"]) >= 16 + +def test_jwt_expiry_is_24_hours(): + """JWT tokens should expire in 24 hours, not 30 days.""" + from app.auth.jwt import ACCESS_TOKEN_EXPIRE_HOURS + assert ACCESS_TOKEN_EXPIRE_HOURS == 24 +``` + +- [ ] **Step 2: Run to verify failure** + +Run: `pytest tests/test_security.py::test_jwt_contains_jti_claim tests/test_security.py::test_jwt_expiry_is_24_hours -v` +Expected: FAIL + +- [ ] **Step 3: Fix jwt.py** + +In `app/auth/jwt.py`: + +Change line 19: `ACCESS_TOKEN_EXPIRE_HOURS = 24` + +Add `import uuid` at the top. In `create_access_token`, add `"jti"` to payload: + +```python + payload = { + "sub": user_id, + "email": email, + "role": role, + "exp": expire, + "iat": datetime.now(timezone.utc), + "jti": uuid.uuid4().hex, + } +``` + +- [ ] **Step 4: Run tests** + +Run: `pytest tests/ -q --tb=short` +Expected: All pass + +- [ ] **Step 5: Commit** + +```bash +git add app/auth/jwt.py tests/test_security.py +git commit -m "fix: reduce JWT expiry to 24h, add jti claim for future revocation" +``` + +### Task 1.4: Fix get_optional_user not checking cookies + +`get_optional_user` only checks the Authorization header, not cookies. Web UI users appear as None. + +**Files:** +- Modify: `app/auth/dependencies.py:60-70` +- Test: `tests/test_auth_providers.py` + +- [ ] **Step 1: Write the failing test** + +```python +# In tests/test_auth_providers.py, add: + +def test_optional_user_reads_cookie(client, e2e_env): + """get_optional_user should detect cookie-authenticated users.""" + from src.db import get_system_db + from src.repositories.users import UserRepository + from app.auth.jwt import create_access_token + + conn = get_system_db() + UserRepository(conn).create(id="cookie-user", email="cookie@test.com", role="analyst") + conn.close() + + token = create_access_token(user_id="cookie-user", email="cookie@test.com", role="analyst") + + # Simulate web UI request with cookie but no Authorization header + resp = client.get("/api/catalog", cookies={"access_token": token}) + assert resp.status_code == 200 +``` + +- [ ] **Step 2: Run to verify behavior** (this may or may not fail depending on endpoint requirements) + +- [ ] **Step 3: Fix dependencies.py** + +Replace `get_optional_user` in `app/auth/dependencies.py`: + +```python +async def get_optional_user( + request: Request = None, + authorization: Optional[str] = Header(None), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +) -> Optional[dict]: + """Like get_current_user but returns None instead of 401 if no token.""" + try: + return await get_current_user(request=request, authorization=authorization, conn=conn) + except HTTPException: + return None +``` + +- [ ] **Step 4: Run tests** + +Run: `pytest tests/test_auth_providers.py tests/test_api.py -v` +Expected: All pass + +- [ ] **Step 5: Commit** + +```bash +git add app/auth/dependencies.py tests/test_auth_providers.py +git commit -m "fix: get_optional_user now checks cookies like get_current_user" +``` + +--- + +## Workstream 2: SQL Safety (P0/P1) + +### Task 2.1: Add identifier validation to orchestrator + +`source_name` from directory names and `table_name` from `_meta` are interpolated into SQL without validation. A crafted directory name or _meta entry could inject arbitrary SQL. + +**Files:** +- Modify: `src/orchestrator.py` (add validation helper, apply in `_attach_and_create_views` and `_attach_remote_extensions`) +- Test: `tests/test_orchestrator.py` + +- [ ] **Step 1: Write the failing tests** + +```python +# In tests/test_orchestrator.py, add: + +def test_rejects_malicious_source_name(setup_env): + """Orchestrator must reject directory names with SQL injection characters.""" + from src.orchestrator import SyncOrchestrator + + malicious_dir = setup_env["extracts_dir"] / 'test; DROP TABLE _meta--' + malicious_dir.mkdir() + db_path = malicious_dir / "extract.duckdb" + import duckdb as _duckdb + conn = _duckdb.connect(str(db_path)) + conn.execute("""CREATE TABLE _meta ( + table_name VARCHAR, description VARCHAR, rows BIGINT, + size_bytes BIGINT, extracted_at TIMESTAMP, query_mode VARCHAR DEFAULT 'local' + )""") + conn.execute('CREATE TABLE "safe_table" (id VARCHAR)') + conn.execute("INSERT INTO _meta VALUES ('safe_table', '', 0, 0, current_timestamp, 'local')") + conn.close() + + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + result = orch.rebuild() + + # Malicious source should be skipped, not attached + assert 'test; DROP TABLE _meta--' not in result + + +def test_rejects_malicious_table_name(setup_env): + """Orchestrator must reject table names with SQL injection characters.""" + from src.orchestrator import SyncOrchestrator + + source_dir = setup_env["extracts_dir"] / "keboola" + source_dir.mkdir() + (source_dir / "data").mkdir() + + db_path = source_dir / "extract.duckdb" + import duckdb as _duckdb + conn = _duckdb.connect(str(db_path)) + conn.execute("""CREATE TABLE _meta ( + table_name VARCHAR, description VARCHAR, rows BIGINT, + size_bytes BIGINT, extracted_at TIMESTAMP, query_mode VARCHAR DEFAULT 'local' + )""") + conn.execute('CREATE TABLE "safe" (id VARCHAR)') + conn.execute("INSERT INTO _meta VALUES ('safe', '', 0, 0, current_timestamp, 'local')") + # Malicious table name in _meta + conn.execute("""INSERT INTO _meta VALUES ('x"; DROP TABLE _meta; --', '', 0, 0, current_timestamp, 'local')""") + conn.close() + + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + result = orch.rebuild() + + # Safe table should be there, malicious should be skipped + assert "keboola" in result + assert "safe" in result["keboola"] + assert 'x"; DROP TABLE _meta; --' not in result.get("keboola", []) +``` + +- [ ] **Step 2: Run to verify failure** + +Run: `pytest tests/test_orchestrator.py::test_rejects_malicious_source_name tests/test_orchestrator.py::test_rejects_malicious_table_name -v` +Expected: FAIL or crash from SQL injection + +- [ ] **Step 3: Add validation helper and apply it** + +At the top of `src/orchestrator.py`, add after imports: + +```python +import re + +_SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$") + +def _validate_identifier(name: str, context: str) -> bool: + """Validate a DuckDB identifier. Returns True if safe, False if not.""" + if not _SAFE_IDENTIFIER.match(name): + logger.warning("Rejected unsafe %s identifier: %r", context, name) + return False + return True +``` + +In `_do_rebuild` (line ~92), add check before `_attach_and_create_views`: + +```python + if not _validate_identifier(ext_dir.name, "source_name"): + continue +``` + +In `_attach_and_create_views` (line ~160), add check before CREATE VIEW: + +```python + for table_name, rows, size_bytes, query_mode in meta_rows: + if not _validate_identifier(table_name, "table_name"): + continue +``` + +In `_attach_remote_extensions` (line ~193), add check: + +```python + for alias, extension, url, token_env in rows: + if not _validate_identifier(alias, "alias") or not _validate_identifier(extension, "extension"): + continue +``` + +- [ ] **Step 4: Run all orchestrator tests** + +Run: `pytest tests/test_orchestrator.py -v` +Expected: All pass including new tests + +- [ ] **Step 5: Commit** + +```bash +git add src/orchestrator.py tests/test_orchestrator.py +git commit -m "fix: validate SQL identifiers in orchestrator — prevent injection via directory/table names" +``` + +### Task 2.2: Harden query endpoint SQL blocklist + +The current blocklist misses `parquet_scan`, `read_csv_auto`, `query_table`, and has false positives on semicolons in string literals. Also add `enable_external_access=false` on the analytics connection. + +**Files:** +- Modify: `app/api/query.py:39-51` and `src/db.py` (analytics readonly connection) +- Test: `tests/test_security.py` + +- [ ] **Step 1: Write the failing tests** + +```python +# In tests/test_security.py, add to TestQuerySecurity: + +def test_blocks_parquet_scan(self, client, auth_headers): + resp = client.post("/api/query", json={"sql": "SELECT * FROM parquet_scan('/etc/passwd')"}, headers=auth_headers) + assert resp.status_code == 400 + +def test_blocks_read_csv_auto(self, client, auth_headers): + resp = client.post("/api/query", json={"sql": "SELECT * FROM read_csv_auto('/data/state/system.duckdb')"}, headers=auth_headers) + assert resp.status_code == 400 + +def test_blocks_query_table(self, client, auth_headers): + resp = client.post("/api/query", json={"sql": "SELECT * FROM query_table('secret')"}, headers=auth_headers) + assert resp.status_code == 400 + +def test_blocks_httpfs(self, client, auth_headers): + resp = client.post("/api/query", json={"sql": "SELECT * FROM read_parquet('https://evil.com/data.parquet')"}, headers=auth_headers) + assert resp.status_code == 400 +``` + +- [ ] **Step 2: Run to verify failure** + +Run: `pytest tests/test_security.py::TestQuerySecurity -v` +Expected: Some FAIL (parquet_scan, read_csv_auto, query_table not blocked) + +- [ ] **Step 3: Expand the blocklist and set enable_external_access=false** + +In `app/api/query.py`, replace the `blocked` list (lines 39-49): + +```python + blocked = [ + "drop ", "delete ", "insert ", "update ", "alter ", "create ", + "copy ", "attach ", "detach ", "load ", "install ", + "export ", "import ", "pragma ", "call ", + # File access functions + "read_csv", "read_json", "read_parquet", "read_text", + "write_csv", "write_parquet", "read_blob", "read_ndjson", + "parquet_scan", "parquet_metadata", "parquet_schema", + "json_scan", "csv_scan", + "query_table", "iceberg_scan", "delta_scan", + "glob(", "list_files", + "'/", '"/','http://', 'https://', 's3://', 'gcs://', + # Multiple statements + ";", + ] +``` + +In `src/db.py` `get_analytics_db_readonly()`, after opening the connection, add: + +```python + try: + conn.execute("SET enable_external_access = false") + except Exception: + pass # Older DuckDB versions +``` + +- [ ] **Step 4: Run tests** + +Run: `pytest tests/test_security.py::TestQuerySecurity -v` +Expected: All pass + +- [ ] **Step 5: Commit** + +```bash +git add app/api/query.py src/db.py tests/test_security.py +git commit -m "fix: expand query blocklist, disable external_access on analytics connection" +``` + +--- + +## Workstream 3: Orchestrator Bugs (P0/P1) + +### Task 3.1: Fix rebuild_source destroying all other sources' views + +`_do_rebuild_source()` creates a fresh temp DB with only one source, then replaces the entire analytics DB. Every Jira webhook wipes all Keboola/BigQuery views. + +**Files:** +- Modify: `src/orchestrator.py:116-141` +- Test: `tests/test_orchestrator.py` + +- [ ] **Step 1: Write the failing test** + +```python +# In tests/test_orchestrator.py, add: + +def test_rebuild_source_preserves_other_sources(setup_env): + """rebuild_source('jira') must NOT destroy views from keboola.""" + from src.orchestrator import SyncOrchestrator + + _create_mock_extract( + setup_env["extracts_dir"], "keboola", + [{"name": "orders", "data": [{"id": "1"}]}], + ) + _create_mock_extract( + setup_env["extracts_dir"], "jira", + [{"name": "issues", "data": [{"key": "PROJ-1"}]}], + ) + + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + + # First: full rebuild + result = orch.rebuild() + assert "keboola" in result + assert "jira" in result + + # Second: rebuild only jira + jira_tables = orch.rebuild_source("jira") + assert "issues" in jira_tables + + # Third: full rebuild again — keboola must still be there + result2 = orch.rebuild() + assert "keboola" in result2 + assert "orders" in result2["keboola"] +``` + +- [ ] **Step 2: Run to verify failure** + +Run: `pytest tests/test_orchestrator.py::test_rebuild_source_preserves_other_sources -v` +Expected: FAIL — keboola views gone after rebuild_source("jira") + +- [ ] **Step 3: Fix _do_rebuild_source to delegate to full rebuild** + +In `src/orchestrator.py`, replace `_do_rebuild_source` (lines 116-141): + +```python + def _do_rebuild_source(self, source_name: str) -> List[str]: + """Rebuild views for a single source by doing a full rebuild. + + A full rebuild is necessary because the analytics DB is created fresh + each time (temp file + atomic swap). Rebuilding only one source would + destroy views from all other sources. + """ + extracts_dir = _get_extracts_dir() + db_file = extracts_dir / source_name / "extract.duckdb" + if not db_file.exists(): + logger.warning("No extract.duckdb for source %s", source_name) + return [] + + result = self._do_rebuild() + return result.get(source_name, []) +``` + +- [ ] **Step 4: Run tests** + +Run: `pytest tests/test_orchestrator.py -v` +Expected: All pass + +- [ ] **Step 5: Commit** + +```bash +git add src/orchestrator.py tests/test_orchestrator.py +git commit -m "fix: rebuild_source delegates to full rebuild — preserves other sources' views" +``` + +### Task 3.2: Handle WAL files in atomic swap + +`shutil.move` only moves the `.duckdb` file. The `.wal` file from the old DB can corrupt the new one. + +**Files:** +- Modify: `src/orchestrator.py` (_do_rebuild lines 106-112) +- Modify: `connectors/keboola/extractor.py` (lines 148-155) +- Test: `tests/test_orchestrator.py` + +- [ ] **Step 1: Write the failing test** + +```python +# In tests/test_orchestrator.py, add: + +def test_rebuild_cleans_wal_files(setup_env): + """After rebuild, no .wal files should remain from the temp or old DB.""" + from src.orchestrator import SyncOrchestrator + + _create_mock_extract( + setup_env["extracts_dir"], "keboola", + [{"name": "orders", "data": [{"id": "1"}]}], + ) + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + orch.rebuild() + + from pathlib import Path + db_path = Path(setup_env["analytics_db"]) + assert not (db_path.parent / (db_path.name + ".wal")).exists() + assert not (db_path.parent / (db_path.name + ".tmp.wal")).exists() +``` + +- [ ] **Step 2: Run to verify it passes or fails** + +Run: `pytest tests/test_orchestrator.py::test_rebuild_cleans_wal_files -v` + +- [ ] **Step 3: Add WAL cleanup helper** + +In `src/orchestrator.py`, add a helper after `_validate_identifier`: + +```python +def _atomic_swap_db(tmp_path: str, target_path: str) -> None: + """Atomically replace target DuckDB file, cleaning up WAL files.""" + import shutil + target = Path(target_path) + tmp = Path(tmp_path) + + # Remove old WAL file if it exists + old_wal = Path(str(target) + ".wal") + if old_wal.exists(): + old_wal.unlink() + + # Move temp DB into place + if tmp.exists(): + shutil.move(str(tmp), str(target)) + + # Clean up temp WAL + tmp_wal = Path(str(tmp) + ".wal") + if tmp_wal.exists(): + tmp_wal.unlink() +``` + +Replace `shutil.move` call in `_do_rebuild` (line ~112) with: + +```python + _atomic_swap_db(tmp_path, self._db_path) +``` + +Also add `CHECKPOINT` before `conn.close()` in `_do_rebuild`: + +```python + conn.execute("CHECKPOINT") + finally: + conn.close() +``` + +Apply the same pattern in `connectors/keboola/extractor.py` at the end of `run()`. + +- [ ] **Step 4: Run tests** + +Run: `pytest tests/test_orchestrator.py tests/test_keboola_extractor.py -v` +Expected: All pass + +- [ ] **Step 5: Commit** + +```bash +git add src/orchestrator.py connectors/keboola/extractor.py tests/test_orchestrator.py +git commit -m "fix: clean WAL files during atomic DB swap, add CHECKPOINT before close" +``` + +### Task 3.3: Add temp-file swap to BigQuery extractor + +BigQuery extractor writes directly to `extract.duckdb`, causing lock conflicts with the orchestrator. + +**Files:** +- Modify: `connectors/bigquery/extractor.py:64-68` +- Test: `tests/test_bigquery_extractor.py` + +- [ ] **Step 1: Write the test** + +```python +# In tests/test_bigquery_extractor.py, add: + +def test_uses_temp_file_swap(self, output_dir): + """BigQuery extractor should write to .tmp and rename, not write directly.""" + from connectors.bigquery.extractor import _create_meta_table + db_path = Path(output_dir) / "extract.duckdb" + + # Pre-create the DB to simulate existing file + conn = duckdb.connect(str(db_path)) + _create_meta_table(conn) + conn.close() + + # After init_extract, the file should exist and no .tmp should remain + # (The actual init_extract test already covers this — we just verify no .tmp leak) + assert db_path.exists() + assert not (Path(output_dir) / "extract.duckdb.tmp").exists() +``` + +- [ ] **Step 2: Modify init_extract to use temp-file swap** + +In `connectors/bigquery/extractor.py`, replace lines 64-68: + +```python + output_path = Path(output_dir) + output_path.mkdir(parents=True, exist_ok=True) + + db_path = output_path / "extract.duckdb" + tmp_db_path = output_path / "extract.duckdb.tmp" + if tmp_db_path.exists(): + tmp_db_path.unlink() + conn = duckdb.connect(str(tmp_db_path)) +``` + +And at the end, before `return stats` (after `conn.close()`): + +```python + import shutil + if tmp_db_path.exists(): + shutil.move(str(tmp_db_path), str(db_path)) +``` + +- [ ] **Step 3: Run tests** + +Run: `pytest tests/test_bigquery_extractor.py -v` +Expected: All pass + +- [ ] **Step 4: Commit** + +```bash +git add connectors/bigquery/extractor.py tests/test_bigquery_extractor.py +git commit -m "fix: BigQuery extractor uses temp-file swap to avoid lock conflicts" +``` + +--- + +## Workstream 4: Script Sandbox Hardening (P1) + +### Task 4.1: Strip VIRTUAL_ENV and PYTHONPATH from sandbox subprocess + +The sandbox gives scripts access to all installed packages (httpx, duckdb) via inherited env vars. + +**Files:** +- Modify: `app/api/scripts.py:191-198` +- Test: `tests/test_security.py` + +- [ ] **Step 1: Write the failing test** + +```python +# In tests/test_security.py, add to TestScriptSecurity: + +def test_sandbox_cannot_import_httpx(self, client, admin_headers): + """Sandboxed scripts must not have access to httpx or other installed packages.""" + resp = client.post("/api/scripts/run", json={ + "name": "test", + "source": "import httpx\nprint('pwned')" + }, headers=admin_headers) + data = resp.json() + # httpx should be blocked by pattern OR unavailable due to stripped VIRTUAL_ENV + assert resp.status_code == 400 or data.get("exit_code", 0) != 0 +``` + +- [ ] **Step 2: Run to verify failure** + +Run: `pytest tests/test_security.py::TestScriptSecurity::test_sandbox_cannot_import_httpx -v` +Expected: FAIL — httpx imports successfully + +- [ ] **Step 3: Fix the subprocess env** + +In `app/api/scripts.py`, replace the env dict in `subprocess.run` (lines 191-198): + +```python + env={ + "PATH": "/usr/bin:/usr/local/bin", + "DATA_DIR": data_dir, + "HOME": "/tmp", + # Deliberately exclude VIRTUAL_ENV and PYTHONPATH + # to prevent access to installed packages + }, +``` + +Also add `"httpx"`, `"from httpx"` to `blocked_patterns` list. + +- [ ] **Step 4: Run tests** + +Run: `pytest tests/test_security.py::TestScriptSecurity -v` +Expected: All pass + +- [ ] **Step 5: Commit** + +```bash +git add app/api/scripts.py tests/test_security.py +git commit -m "fix: strip VIRTUAL_ENV/PYTHONPATH from script sandbox, block httpx import" +``` + +--- + +## Workstream 5: Test Hardening (P0-P1) + +### Task 5.1: Fix environment variable leaking in test fixtures + +Most test files set `os.environ["DATA_DIR"]` directly without cleanup. This causes test ordering dependencies. + +**Files:** +- Modify: `tests/test_db.py`, `tests/test_rbac.py`, `tests/test_repositories.py`, `tests/test_api.py`, `tests/test_api_complete.py`, `tests/test_api_scripts.py`, `tests/test_auth_providers.py`, `tests/test_bootstrap.py`, `tests/test_permissions.py`, `tests/test_security.py` + +- [ ] **Step 1: Search and replace pattern** + +In every test file that has `os.environ["DATA_DIR"] =` inside a fixture, replace with `monkeypatch.setenv("DATA_DIR", ...)`. Add `monkeypatch` to the fixture parameters. + +Example — in `tests/test_db.py`, change: + +```python +@pytest.fixture +def db_env(tmp_path): + os.environ["DATA_DIR"] = str(tmp_path) + yield tmp_path +``` + +To: + +```python +@pytest.fixture +def db_env(tmp_path, monkeypatch): + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + yield tmp_path +``` + +Apply to all affected files. Remove manual `os.environ.pop("DATA_DIR", None)` lines since monkeypatch handles cleanup automatically. + +- [ ] **Step 2: Run full test suite** + +Run: `pytest tests/ -v --tb=short` +Expected: 607+ passed + +- [ ] **Step 3: Commit** + +```bash +git add tests/ +git commit -m "fix: use monkeypatch for DATA_DIR in all test fixtures — prevent env leaking" +``` + +### Task 5.2: Add extract.duckdb contract test + +Create a shared validator that verifies any extract.duckdb conforms to the contract. Apply it in all extractor tests. + +**Files:** +- Create: `tests/helpers/contract.py` +- Modify: `tests/test_keboola_extractor.py`, `tests/test_bigquery_extractor.py` + +- [ ] **Step 1: Create contract validator** + +```python +# tests/helpers/__init__.py (empty) +# tests/helpers/contract.py + +"""Shared validator for the extract.duckdb contract.""" + +import duckdb +from pathlib import Path + + +def validate_extract_contract(db_path: str) -> None: + """Verify an extract.duckdb conforms to the contract. + + Raises AssertionError with details if any check fails. + """ + path = Path(db_path) + assert path.exists(), f"extract.duckdb not found at {db_path}" + + conn = duckdb.connect(str(path), read_only=True) + try: + # _meta table must exist with correct schema + cols = conn.execute( + "SELECT column_name FROM information_schema.columns " + "WHERE table_name='_meta' ORDER BY ordinal_position" + ).fetchall() + col_names = [c[0] for c in cols] + assert col_names == ["table_name", "description", "rows", "size_bytes", "extracted_at", "query_mode"], \ + f"_meta schema mismatch: {col_names}" + + # Every _meta entry with query_mode='local' must have a corresponding view or table + local_tables = conn.execute( + "SELECT table_name FROM _meta WHERE query_mode = 'local'" + ).fetchall() + for (name,) in local_tables: + tables = conn.execute( + "SELECT table_name FROM information_schema.tables WHERE table_name = ?", [name] + ).fetchall() + assert len(tables) > 0, f"Local table '{name}' in _meta but no view/table exists" + + # If _remote_attach exists, validate its schema + ra_exists = conn.execute( + "SELECT count(*) FROM information_schema.tables WHERE table_name='_remote_attach'" + ).fetchone()[0] + if ra_exists: + ra_cols = conn.execute( + "SELECT column_name FROM information_schema.columns " + "WHERE table_name='_remote_attach' ORDER BY ordinal_position" + ).fetchall() + ra_col_names = [c[0] for c in ra_cols] + assert ra_col_names == ["alias", "extension", "url", "token_env"], \ + f"_remote_attach schema mismatch: {ra_col_names}" + finally: + conn.close() +``` + +- [ ] **Step 2: Apply in extractor tests** + +In `tests/test_keboola_extractor.py`, add to `test_creates_extract_duckdb`: + +```python + from tests.helpers.contract import validate_extract_contract + validate_extract_contract(str(Path(output_dir) / "extract.duckdb")) +``` + +Similarly in `tests/test_bigquery_extractor.py::test_creates_extract_duckdb_with_meta`. + +- [ ] **Step 3: Run tests** + +Run: `pytest tests/test_keboola_extractor.py tests/test_bigquery_extractor.py -v` +Expected: All pass + +- [ ] **Step 4: Commit** + +```bash +git add tests/helpers/ tests/test_keboola_extractor.py tests/test_bigquery_extractor.py +git commit -m "feat: add extract.duckdb contract validator, apply in all extractor tests" +``` + +### Task 5.3: Add pytest timeout and strict markers + +Prevent CI hangs and catch marker typos. + +**Files:** +- Modify: `pytest.ini` +- Modify: `requirements.txt` (add pytest-timeout) + +- [ ] **Step 1: Update pytest.ini** + +```ini +[pytest] +addopts = -m "not live and not docker" --timeout=60 --strict-markers +markers = + live: tests requiring server access (run with '-m live') + docker: tests requiring Docker (run with '-m docker') +``` + +- [ ] **Step 2: Add pytest-timeout to requirements.txt** + +Add line: `pytest-timeout>=2.0.0` + +- [ ] **Step 3: Install and run** + +Run: `uv pip install --system pytest-timeout && pytest tests/ -q --tb=short` +Expected: All pass within 60s timeout + +- [ ] **Step 4: Commit** + +```bash +git add pytest.ini requirements.txt +git commit -m "chore: add pytest-timeout (60s) and strict-markers to pytest config" +``` + +--- + +## Workstream 6: Docs & Cleanup (P1-P2) + +### Task 6.1: Rewrite README.md from CLAUDE.md + +The current README describes the old Flask/rsync architecture. CLAUDE.md is accurate. + +**Files:** +- Modify: `README.md` + +- [ ] **Step 1: Rewrite README.md** + +Use CLAUDE.md as the source of truth. The README should contain: +- Project description (1-2 paragraphs) +- Architecture diagram (from CLAUDE.md) +- Quick start (Docker compose) +- Development setup (venv, pytest) +- Project structure (from CLAUDE.md) +- Configuration overview +- Supported data sources (Keboola ✅, BigQuery ✅, Jira ✅) +- Links to docs/DEPLOYMENT.md for server setup +- License + +Remove all references to Flask, rsync, SSH, sync_data.sh, Linux groups, server/setup.sh. + +- [ ] **Step 2: Verify no broken references** + +Run: `grep -r "webapp/" README.md; grep -r "sync_data.sh" README.md; grep -r "server/setup" README.md` +Expected: No matches + +- [ ] **Step 3: Commit** + +```bash +git add README.md +git commit -m "docs: rewrite README for v2 architecture (FastAPI, DuckDB, Docker)" +``` + +### Task 6.2: Update .env.template to match actual code + +Template references `WEBAPP_SECRET_KEY` but code uses `JWT_SECRET_KEY`. + +**Files:** +- Modify: `config/.env.template` + +- [ ] **Step 1: Rewrite template** + +```bash +# AI Data Analyst - Environment Variables +# Copy to .env: cp config/.env.template .env +# .env is gitignored - NEVER commit it. + +# Required +JWT_SECRET_KEY= # python -c "import secrets; print(secrets.token_hex(32))" + +# Google OAuth (optional — needed for Google login) +# GOOGLE_CLIENT_ID= +# GOOGLE_CLIENT_SECRET= + +# Keboola adapter (optional — skip if using CSV/sample data) +# KEBOOLA_STORAGE_TOKEN= +# KEBOOLA_STACK_URL=https://connection.keboola.com + +# Bootstrap admin (optional — used on first docker compose up) +# SEED_ADMIN_EMAIL=admin@example.com + +# Optional services +# TELEGRAM_BOT_TOKEN= +# JIRA_WEBHOOK_SECRET= +# ANTHROPIC_API_KEY= +``` + +- [ ] **Step 2: Commit** + +```bash +git add config/.env.template +git commit -m "docs: update .env.template to match actual code (JWT_SECRET_KEY, not WEBAPP_SECRET_KEY)" +``` + +### Task 6.3: Remove dead Flask Blueprint from Jira connector + +`connectors/jira/webhook.py` uses Flask Blueprint but the app uses FastAPI. It's dead code that confuses readers. + +**Files:** +- Check: `connectors/jira/webhook.py` — verify it's not imported anywhere except Jira-internal code +- Modify: add deprecation comment or delete if unused + +- [ ] **Step 1: Check if webhook.py is imported** + +Run: `grep -r "from connectors.jira.webhook" app/ src/ services/` +If no matches: the Flask Blueprint is dead code. + +- [ ] **Step 2: Add deprecation notice or delete** + +If unused by the FastAPI app, delete `connectors/jira/webhook.py` and update any imports. + +- [ ] **Step 3: Commit** + +```bash +git add connectors/jira/ +git commit -m "chore: remove dead Flask Blueprint from Jira connector (replaced by FastAPI)" +``` + +### Task 6.4: Add upload size limit + +`upload_session` and `upload_artifact` read entire files into memory with no limit. + +**Files:** +- Modify: `app/api/upload.py` +- Test: `tests/test_api_complete.py` + +- [ ] **Step 1: Write the test** + +```python +# In tests/test_api_complete.py or a new test file: + +def test_upload_rejects_oversized_file(client, admin_headers): + """Uploads over 50MB should be rejected.""" + # Create a file reference that claims to be too large + import io + large_data = b"x" * (50 * 1024 * 1024 + 1) # 50MB + 1 byte + resp = client.post( + "/api/upload/artifact/test-session", + files={"file": ("big.csv", io.BytesIO(large_data), "text/csv")}, + headers=admin_headers, + ) + assert resp.status_code == 413 or resp.status_code == 400 +``` + +- [ ] **Step 2: Add size check** + +In `app/api/upload.py`, at the start of each upload endpoint: + +```python + MAX_UPLOAD_SIZE = 50 * 1024 * 1024 # 50 MB + contents = await file.read() + if len(contents) > MAX_UPLOAD_SIZE: + raise HTTPException(status_code=413, detail=f"File too large (max {MAX_UPLOAD_SIZE // 1024 // 1024}MB)") +``` + +- [ ] **Step 3: Run tests** + +Run: `pytest tests/test_api_complete.py -v` +Expected: All pass + +- [ ] **Step 4: Commit** + +```bash +git add app/api/upload.py tests/test_api_complete.py +git commit -m "fix: add 50MB upload size limit — prevent memory exhaustion" +``` + +--- + +## Workstream 7: DuckDB Lifecycle & Connection Management (P1) + +### Task 7.1: Fix SQL injection in get_analytics_db_readonly + +Same unquoted `ext_dir.name` issue as the orchestrator, but in the read-only analytics connection used by every query request. + +**Files:** +- Modify: `src/db.py:228-233` +- Test: `tests/test_db.py` + +- [ ] **Step 1: Write the failing test** + +```python +# In tests/test_db.py, add: + +def test_analytics_readonly_rejects_malicious_dir_name(db_env): + """get_analytics_db_readonly must skip directories with unsafe names.""" + extracts = db_env / "extracts" + extracts.mkdir(parents=True) + malicious = extracts / "test; DROP TABLE x--" + malicious.mkdir() + db_file = malicious / "extract.duckdb" + + import duckdb + conn = duckdb.connect(str(db_file)) + conn.execute("""CREATE TABLE _meta ( + table_name VARCHAR, description VARCHAR, rows BIGINT, + size_bytes BIGINT, extracted_at TIMESTAMP, query_mode VARCHAR + )""") + conn.close() + + # Should not crash + from src.db import get_analytics_db_readonly + ro_conn = get_analytics_db_readonly() + ro_conn.close() +``` + +- [ ] **Step 2: Run to verify failure** + +Run: `pytest tests/test_db.py::test_analytics_readonly_rejects_malicious_dir_name -v` + +- [ ] **Step 3: Add identifier validation** + +Import the validator from orchestrator (or extract to shared module). In `src/db.py`, add at top: + +```python +import re +_SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$") +``` + +In `get_analytics_db_readonly()`, replace line 232: + +```python + if db_file.exists() and ext_dir.is_dir(): + if not _SAFE_IDENTIFIER.match(ext_dir.name): + continue + try: + conn.execute(f"ATTACH '{db_file}' AS {ext_dir.name} (READ_ONLY)") +``` + +- [ ] **Step 4: Run tests** + +Run: `pytest tests/test_db.py -v` +Expected: All pass + +- [ ] **Step 5: Commit** + +```bash +git add src/db.py tests/test_db.py +git commit -m "fix: validate identifiers in get_analytics_db_readonly — prevent SQL injection" +``` + +### Task 7.2: Remove dead PRAGMA enable_wal code + +`PRAGMA enable_wal` is not valid DuckDB syntax. DuckDB uses WAL by default since v0.8. This is dead code with a misleading comment. + +**Files:** +- Modify: `src/db.py:200-204` + +- [ ] **Step 1: Remove the dead code** + +In `src/db.py`, delete lines 200-204: + +```python + # WAL mode: allows concurrent readers while writing + try: + _system_db_conn.execute("PRAGMA enable_wal") + except Exception: + pass # Older DuckDB versions may not support this +``` + +- [ ] **Step 2: Run tests** + +Run: `pytest tests/test_db.py -v` +Expected: All pass + +- [ ] **Step 3: Commit** + +```bash +git add src/db.py +git commit -m "chore: remove dead PRAGMA enable_wal — DuckDB uses WAL by default" +``` + +### Task 7.3: Escape token single-quotes in ATTACH SQL + +If a token contains a single quote, the ATTACH SQL breaks. DuckDB doesn't support parameterized ATTACH, so escape manually. + +**Files:** +- Modify: `src/orchestrator.py` (`_attach_remote_extensions`) +- Modify: `connectors/keboola/extractor.py` (`_try_attach_extension`) + +- [ ] **Step 1: Add escaping in orchestrator** + +In `src/orchestrator.py`, in `_attach_remote_extensions`, replace the ATTACH line: + +```python + if token: + escaped_token = token.replace("'", "''") + conn.execute( + f"ATTACH '{url}' AS {alias} (TYPE {extension}, TOKEN '{escaped_token}')" + ) +``` + +- [ ] **Step 2: Add escaping in Keboola extractor** + +In `connectors/keboola/extractor.py`, `_try_attach_extension`: + +```python + escaped_token = keboola_token.replace("'", "''") + conn.execute(f"ATTACH '{keboola_url}' AS kbc (TYPE keboola, TOKEN '{escaped_token}')") +``` + +- [ ] **Step 3: Run tests** + +Run: `pytest tests/test_orchestrator.py tests/test_keboola_extractor.py -v` +Expected: All pass + +- [ ] **Step 4: Commit** + +```bash +git add src/orchestrator.py connectors/keboola/extractor.py +git commit -m "fix: escape single quotes in ATTACH TOKEN to prevent SQL breakage" +``` + +### Task 7.4: Add temp-file swap to Jira extract_init.update_meta + +Jira's `update_meta()` writes directly to live `extract.duckdb` while the orchestrator may have it ATTACHed read-only. + +**Files:** +- Modify: `connectors/jira/extract_init.py:87` +- Test: `tests/test_e2e_extract.py` + +- [ ] **Step 1: Examine current code and fix** + +The `update_meta()` function opens `extract.duckdb` directly. Since it only updates `_meta` rows and recreates views (not bulk writes), the simplest fix is to use a short-lived connection with CHECKPOINT: + +In `connectors/jira/extract_init.py`, after the `conn.execute("UPDATE _meta ...")` block, add before `conn.close()`: + +```python + conn.execute("CHECKPOINT") +``` + +This forces WAL flush and reduces the lock window. A full temp-file swap is not practical here since the Jira connector does incremental updates. + +- [ ] **Step 2: Run tests** + +Run: `pytest tests/test_e2e_extract.py::TestJiraWebhookToQuery -v` +Expected: Pass + +- [ ] **Step 3: Commit** + +```bash +git add connectors/jira/extract_init.py +git commit -m "fix: add CHECKPOINT after Jira meta update — reduce lock window" +``` + +--- + +## Workstream 8: Scalability & Robustness (P1) + +### Task 8.1: Fix table access check false positives in query endpoint + +The query endpoint checks table access with `table.lower() in sql_lower` — a substring match. A table named `id` blocks any query containing the word "id". A table named `orders` triggers on `ordered_items`. + +**Files:** +- Modify: `app/api/query.py:67-71` +- Test: `tests/test_security.py` + +- [ ] **Step 1: Write the failing test** + +```python +# In tests/test_security.py, add to TestQuerySecurity: + +def test_table_access_no_false_positive_on_column_name(self, client, auth_headers): + """A forbidden table named 'id' should not block queries that use 'id' as a column.""" + # This test verifies the table access check doesn't use naive substring matching + resp = client.post("/api/query", json={ + "sql": "SELECT id, name FROM allowed_table" + }, headers=auth_headers) + # Should not get 403 just because 'id' appears in SQL + assert resp.status_code != 403 or "id" not in resp.json().get("detail", "") +``` + +- [ ] **Step 2: Fix with word-boundary matching** + +In `app/api/query.py`, replace the table access check (lines 67-71): + +```python + # Check if query references any forbidden tables (word-boundary match) + import re + forbidden = all_views - set(allowed) + for table in forbidden: + # Use word boundaries to avoid false positives on column names + pattern = r'\b' + re.escape(table.lower()) + r'\b' + if re.search(pattern, sql_lower): + raise HTTPException(status_code=403, detail=f"Access denied to table '{table}'") +``` + +- [ ] **Step 3: Run tests** + +Run: `pytest tests/test_security.py::TestQuerySecurity -v` +Expected: All pass + +- [ ] **Step 4: Commit** + +```bash +git add app/api/query.py tests/test_security.py +git commit -m "fix: use word-boundary matching for table access check — prevent false positives" +``` + +### Task 8.2: Replace Docker healthcheck with curl + +The current healthcheck starts a full Python interpreter + imports httpx every 30 seconds. + +**Files:** +- Modify: `Dockerfile` (add curl) +- Modify: `docker-compose.yml:13` + +- [ ] **Step 1: Add curl to Dockerfile** + +In `Dockerfile`, add after the `FROM` line: + +```dockerfile +RUN apt-get update && apt-get install -y --no-install-recommends curl && rm -rf /var/lib/apt/lists/* +``` + +- [ ] **Step 2: Update docker-compose healthcheck** + +In `docker-compose.yml`, replace line 13: + +```yaml + healthcheck: + test: ["CMD", "curl", "-sf", "http://localhost:8000/api/health"] + interval: 30s + timeout: 5s + retries: 3 +``` + +- [ ] **Step 3: Commit** + +```bash +git add Dockerfile docker-compose.yml +git commit -m "fix: use curl for Docker healthcheck instead of Python+httpx (faster, lighter)" +``` + +### Task 8.3: Add graceful shutdown handler + +No lifespan handler exists to close the shared DuckDB connection on shutdown. + +**Files:** +- Modify: `app/main.py` + +- [ ] **Step 1: Add lifespan handler** + +In `app/main.py`, add a lifespan context manager and use it in `FastAPI()`: + +```python +from contextlib import asynccontextmanager + +@asynccontextmanager +async def lifespan(app): + # Startup + yield + # Shutdown: close shared DuckDB connection + from src.db import close_system_db + close_system_db() +``` + +Change `app = FastAPI(...)` to `app = FastAPI(..., lifespan=lifespan)`. + +Add `close_system_db()` to `src/db.py`: + +```python +def close_system_db() -> None: + """Close the shared system DB connection. Called on app shutdown.""" + global _system_db_conn, _system_db_path + if _system_db_conn: + try: + _system_db_conn.close() + except Exception: + pass + _system_db_conn = None + _system_db_path = None +``` + +- [ ] **Step 2: Run tests** + +Run: `pytest tests/test_api.py -v` +Expected: All pass + +- [ ] **Step 3: Commit** + +```bash +git add app/main.py src/db.py +git commit -m "feat: add graceful shutdown handler — close DuckDB on app exit" +``` + +### Task 8.4: Extract shared _get_data_dir utility + +`_get_data_dir()` is copy-pasted in 4 API files. + +**Files:** +- Create: `app/utils.py` +- Modify: `app/api/sync.py`, `app/api/data.py`, `app/api/upload.py`, `app/api/catalog.py` + +- [ ] **Step 1: Create shared utility** + +```python +# app/utils.py +import os +from pathlib import Path + +def get_data_dir() -> Path: + return Path(os.environ.get("DATA_DIR", "./data")) +``` + +- [ ] **Step 2: Replace in all 4 files** + +In each file, replace: +```python +def _get_data_dir(): + return Path(os.environ.get("DATA_DIR", "./data")) +``` + +With: +```python +from app.utils import get_data_dir as _get_data_dir +``` + +- [ ] **Step 3: Run tests** + +Run: `pytest tests/ -q --tb=short` +Expected: All pass + +- [ ] **Step 4: Commit** + +```bash +git add app/utils.py app/api/sync.py app/api/data.py app/api/upload.py app/api/catalog.py +git commit -m "refactor: extract shared _get_data_dir to app/utils.py — DRY" +``` + +### Task 8.5: Move faker to dev dependencies + +Faker is a production dependency but only used for sample data generation. + +**Files:** +- Modify: `requirements.txt` +- Create: `requirements-dev.txt` + +- [ ] **Step 1: Move faker** + +Remove `faker>=24.0.0` from `requirements.txt`. + +Create `requirements-dev.txt`: + +``` +-r requirements.txt +faker>=24.0.0 +pytest>=9.0.0 +pytest-timeout>=2.0.0 +``` + +- [ ] **Step 2: Verify app starts without faker** + +Run: `python -c "from app.main import create_app; print('OK')"` +Expected: OK (faker not imported at startup) + +- [ ] **Step 3: Commit** + +```bash +git add requirements.txt requirements-dev.txt +git commit -m "chore: move faker to dev dependencies — not needed in production" +``` + +--- + +## Workstream 9: Missing Test Coverage (P0-P1) + +### Task 9.1: Add web UI smoke tests + +`app/web/router.py` has 46 functions with almost no test coverage. A template error would not be caught. + +**Files:** +- Create: `tests/test_web_ui.py` + +- [ ] **Step 1: Create smoke tests for all authenticated pages** + +```python +"""Smoke tests for web UI pages — verify they render without template errors.""" + +import os +import pytest +import duckdb +from fastapi.testclient import TestClient + + +@pytest.fixture +def web_client(tmp_path, monkeypatch): + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + monkeypatch.setenv("TESTING", "1") + (tmp_path / "state").mkdir() + (tmp_path / "analytics").mkdir() + (tmp_path / "extracts").mkdir() + + from app.main import create_app + app = create_app() + return TestClient(app) + + +@pytest.fixture +def admin_cookie(web_client, tmp_path, monkeypatch): + """Create admin user and return cookie dict.""" + from src.db import get_system_db + from src.repositories.users import UserRepository + from app.auth.jwt import create_access_token + + conn = get_system_db() + UserRepository(conn).create(id="admin1", email="admin@test.com", role="admin") + conn.close() + + token = create_access_token(user_id="admin1", email="admin@test.com", role="admin") + return {"access_token": token} + + +class TestWebUISmoke: + """Every page should return 200 without template errors.""" + + def test_login_page(self, web_client): + resp = web_client.get("/login") + assert resp.status_code == 200 + + def test_dashboard(self, web_client, admin_cookie): + resp = web_client.get("/", cookies=admin_cookie) + assert resp.status_code in (200, 302) + + def test_catalog(self, web_client, admin_cookie): + resp = web_client.get("/catalog", cookies=admin_cookie) + assert resp.status_code == 200 + + def test_admin_tables(self, web_client, admin_cookie): + resp = web_client.get("/admin/tables", cookies=admin_cookie) + assert resp.status_code == 200 + + def test_admin_permissions(self, web_client, admin_cookie): + resp = web_client.get("/admin/permissions", cookies=admin_cookie) + assert resp.status_code == 200 + + def test_corporate_memory(self, web_client, admin_cookie): + resp = web_client.get("/corporate-memory", cookies=admin_cookie) + assert resp.status_code == 200 + + def test_activity_center(self, web_client, admin_cookie): + resp = web_client.get("/activity-center", cookies=admin_cookie) + assert resp.status_code == 200 +``` + +- [ ] **Step 2: Run tests** + +Run: `pytest tests/test_web_ui.py -v` +Expected: All pass (or reveal actual template errors) + +- [ ] **Step 3: Commit** + +```bash +git add tests/test_web_ui.py +git commit -m "test: add web UI smoke tests — catch template errors in 7 pages" +``` + +### Task 9.2: Add Jira service integration tests + +`connectors/jira/service.py` (15 functions) orchestrates the entire Jira webhook flow but has no dedicated tests. + +**Files:** +- Create: `tests/test_jira_service.py` + +- [ ] **Step 1: Create integration tests** + +```python +"""Tests for Jira service — webhook event processing pipeline.""" + +import os +from pathlib import Path +from unittest.mock import patch, MagicMock + +import duckdb +import pytest + +from connectors.jira.extract_init import init_extract, update_meta + + +@pytest.fixture +def jira_env(tmp_path, monkeypatch): + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + jira_dir = tmp_path / "extracts" / "jira" + jira_dir.mkdir(parents=True) + return jira_dir + + +class TestJiraExtractInit: + def test_init_creates_extract_db(self, jira_env): + init_extract(jira_env) + assert (jira_env / "extract.duckdb").exists() + + conn = duckdb.connect(str(jira_env / "extract.duckdb")) + meta = conn.execute("SELECT * FROM _meta").fetchall() + conn.close() + assert isinstance(meta, list) + + def test_update_meta_creates_view(self, jira_env): + init_extract(jira_env) + + # Create a parquet file for 'issues' + issues_dir = jira_env / "data" / "issues" + issues_dir.mkdir(parents=True) + pq_path = str(issues_dir / "2026-04.parquet") + tmp = duckdb.connect() + tmp.execute( + f"COPY (SELECT 'PROJ-1' AS issue_key, 'Bug' AS type) " + f"TO '{pq_path}' (FORMAT PARQUET)" + ) + tmp.close() + + update_meta(jira_env, "issues") + + conn = duckdb.connect(str(jira_env / "extract.duckdb")) + rows = conn.execute("SELECT rows FROM _meta WHERE table_name='issues'").fetchone() + assert rows[0] == 1 + + data = conn.execute("SELECT issue_key FROM issues").fetchone() + assert data[0] == "PROJ-1" + conn.close() +``` + +- [ ] **Step 2: Run tests** + +Run: `pytest tests/test_jira_service.py -v` +Expected: All pass + +- [ ] **Step 3: Commit** + +```bash +git add tests/test_jira_service.py +git commit -m "test: add Jira extract_init integration tests" +``` + +### Task 9.3: Add instance_config tests + +`app/instance_config.py` (10 functions) is loaded at startup and affects all web pages. No tests exist. + +**Files:** +- Create: `tests/test_instance_config.py` + +- [ ] **Step 1: Create tests** + +```python +"""Tests for instance_config — YAML loading and accessor functions.""" + +import os +from pathlib import Path + +import pytest + + +@pytest.fixture +def config_env(tmp_path, monkeypatch): + config_dir = tmp_path / "config" + config_dir.mkdir() + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + return config_dir + + +class TestInstanceConfig: + def test_missing_config_file_returns_defaults(self, config_env, monkeypatch): + """Missing instance.yaml should not crash, just return defaults.""" + from app.instance_config import get_instance_name, get_data_source_type + # Should return some default, not crash + name = get_instance_name() + assert isinstance(name, str) + + def test_loads_valid_yaml(self, config_env, tmp_path, monkeypatch): + """Valid instance.yaml should be loaded and accessible.""" + yaml_path = tmp_path / "config" / "instance.yaml" + yaml_path.write_text("instance_name: Test Instance\ndata_source: keboola\n") + + from app.instance_config import load_instance_config, get_instance_name + import importlib + import app.instance_config as mod + importlib.reload(mod) + + name = mod.get_instance_name() + assert "Test" in name or isinstance(name, str) +``` + +- [ ] **Step 2: Run tests** + +Run: `pytest tests/test_instance_config.py -v` +Expected: All pass + +- [ ] **Step 3: Commit** + +```bash +git add tests/test_instance_config.py +git commit -m "test: add instance_config tests — missing file, valid YAML" +``` + +### Task 9.4: Add concurrent rebuild safety test + +Verify the atomic swap pattern works when a read connection is open. + +**Files:** +- Modify: `tests/test_orchestrator.py` + +- [ ] **Step 1: Write the test** + +```python +# In tests/test_orchestrator.py, add: + +def test_rebuild_while_reading(setup_env): + """Rebuild should succeed even while a read-only connection exists.""" + from src.orchestrator import SyncOrchestrator + import duckdb + + _create_mock_extract( + setup_env["extracts_dir"], "keboola", + [{"name": "orders", "data": [{"id": "1"}]}], + ) + + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + orch.rebuild() + + # Open a read-only connection (simulating query endpoint) + reader = duckdb.connect(setup_env["analytics_db"], read_only=True) + + # Rebuild while reader is open — should not crash + result = orch.rebuild() + assert "keboola" in result + + reader.close() +``` + +- [ ] **Step 2: Run test** + +Run: `pytest tests/test_orchestrator.py::test_rebuild_while_reading -v` +Expected: Pass + +- [ ] **Step 3: Commit** + +```bash +git add tests/test_orchestrator.py +git commit -m "test: add concurrent rebuild safety test" +``` + +--- + +## Execution Order + +Workstreams are independent and can run in parallel. Within each workstream, tasks are sequential. + +**Critical path (do first):** +1. Task 1.1 (password bypass) — active auth vulnerability +2. Task 3.1 (rebuild_source) — active data loss bug +3. Task 2.1 (SQL injection) — security hardening + +**Then:** +4. Tasks 1.2, 1.3 (JWT hardening) +5. Tasks 2.2 (query blocklist) +6. Tasks 3.2, 3.3 (WAL + BQ swap) +7. Task 4.1 (sandbox) +8. Tasks 7.1-7.4 (DuckDB lifecycle) +9. Tasks 8.1-8.5 (scalability + cleanup) +10. Tasks 5.1-5.3 (test hardening) +11. Tasks 9.1-9.4 (missing test coverage) +12. Tasks 6.1-6.4 (docs + cleanup) +13. Task 1.4 (cookie auth) + +**Verification after all tasks:** + +```bash +pytest tests/ -v --tb=short # All 620+ tests pass +``` + +Workstreams are independent and can run in parallel. Within each workstream, tasks are sequential. + +**Critical path (do first):** +1. Task 1.1 (password bypass) — active auth vulnerability +2. Task 3.1 (rebuild_source) — active data loss bug +3. Task 2.1 (SQL injection) — security hardening + +**Then:** +4. Tasks 1.2, 1.3 (JWT hardening) +5. Tasks 2.2 (query blocklist) +6. Tasks 3.2, 3.3 (WAL + BQ swap) +7. Task 4.1 (sandbox) +8. Tasks 5.1-5.3 (test hardening) +9. Tasks 6.1-6.4 (docs + cleanup) + +**Verification after all tasks:** + +```bash +pytest tests/ -v --tb=short # All 607+ tests pass +``` diff --git a/src/db.py b/src/db.py index 671c872..b381f03 100644 --- a/src/db.py +++ b/src/db.py @@ -200,11 +200,6 @@ def get_system_db() -> duckdb.DuckDBPyConnection: _system_db_conn = duckdb.connect(db_path) _system_db_path = db_path _ensure_schema(_system_db_conn) - # WAL mode: allows concurrent readers while writing - try: - _system_db_conn.execute("PRAGMA enable_wal") - except Exception: - pass # Older DuckDB versions may not support this return _system_db_conn.cursor() diff --git a/src/orchestrator.py b/src/orchestrator.py index 6bd3efc..37b0d75 100644 --- a/src/orchestrator.py +++ b/src/orchestrator.py @@ -241,8 +241,9 @@ class SyncOrchestrator: conn.execute(f"INSTALL {extension} FROM community; LOAD {extension};") if token: + escaped_token = token.replace("'", "''") conn.execute( - f"ATTACH '{url}' AS {alias} (TYPE {extension}, TOKEN '{token}')" + f"ATTACH '{url}' AS {alias} (TYPE {extension}, TOKEN '{escaped_token}')" ) else: # Extensions like BigQuery handle auth via env (e.g. GOOGLE_APPLICATION_CREDENTIALS)