From f76411c603f60ce74676f062163b76fde6ad7409 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Fri, 27 Mar 2026 13:55:54 +0100 Subject: [PATCH] feat: add DuckDB state layer with schema management Co-Authored-By: Claude Opus 4.6 (1M context) --- src/db.py | 190 +++++++++++++++++++++++++++++++++++++++++++++++ tests/test_db.py | 105 ++++++++++++++++++++++++++ 2 files changed, 295 insertions(+) create mode 100644 src/db.py create mode 100644 tests/test_db.py diff --git a/src/db.py b/src/db.py new file mode 100644 index 0000000..66052a2 --- /dev/null +++ b/src/db.py @@ -0,0 +1,190 @@ +"""DuckDB connection management and schema initialization. + +Provides connections to the system state database and analytics database, +with automatic directory creation and schema bootstrapping. +""" +import os +from pathlib import Path + +import duckdb + +SCHEMA_VERSION = 1 + +_SCHEMA_SQL = """ +CREATE TABLE IF NOT EXISTS schema_version ( + version INTEGER NOT NULL, + applied_at TIMESTAMP DEFAULT current_timestamp +); + +CREATE TABLE IF NOT EXISTS audit_log ( + id VARCHAR PRIMARY KEY, + timestamp TIMESTAMP DEFAULT current_timestamp, + actor VARCHAR, + action VARCHAR NOT NULL, + entity_type VARCHAR, + entity_id VARCHAR, + details JSON +); + +CREATE TABLE IF NOT EXISTS dataset_permissions ( + id VARCHAR PRIMARY KEY, + user_email VARCHAR NOT NULL, + dataset VARCHAR NOT NULL, + permission VARCHAR NOT NULL DEFAULT 'read', + granted_by VARCHAR, + granted_at TIMESTAMP DEFAULT current_timestamp +); + +CREATE TABLE IF NOT EXISTS knowledge_items ( + id VARCHAR PRIMARY KEY, + title VARCHAR NOT NULL, + content VARCHAR, + category VARCHAR, + author VARCHAR, + status VARCHAR DEFAULT 'active', + metadata JSON, + created_at TIMESTAMP DEFAULT current_timestamp, + updated_at TIMESTAMP DEFAULT current_timestamp +); + +CREATE TABLE IF NOT EXISTS knowledge_votes ( + id VARCHAR PRIMARY KEY, + item_id VARCHAR NOT NULL, + user_email VARCHAR NOT NULL, + vote INTEGER NOT NULL, + created_at TIMESTAMP DEFAULT current_timestamp +); + +CREATE TABLE IF NOT EXISTS pending_codes ( + code VARCHAR PRIMARY KEY, + user_email VARCHAR NOT NULL, + purpose VARCHAR, + created_at TIMESTAMP DEFAULT current_timestamp, + expires_at TIMESTAMP +); + +CREATE TABLE IF NOT EXISTS script_registry ( + id VARCHAR PRIMARY KEY, + name VARCHAR NOT NULL, + path VARCHAR NOT NULL, + description VARCHAR, + author VARCHAR, + metadata JSON, + created_at TIMESTAMP DEFAULT current_timestamp, + updated_at TIMESTAMP DEFAULT current_timestamp +); + +CREATE TABLE IF NOT EXISTS sync_history ( + id VARCHAR PRIMARY KEY, + table_name VARCHAR NOT NULL, + status VARCHAR NOT NULL, + rows_synced INTEGER, + started_at TIMESTAMP DEFAULT current_timestamp, + finished_at TIMESTAMP, + error VARCHAR, + metadata JSON +); + +CREATE TABLE IF NOT EXISTS sync_state ( + table_name VARCHAR PRIMARY KEY, + last_sync TIMESTAMP, + status VARCHAR DEFAULT 'pending', + row_count INTEGER, + file_hash VARCHAR, + metadata JSON +); + +CREATE TABLE IF NOT EXISTS table_profiles ( + table_name VARCHAR PRIMARY KEY, + profile JSON, + profiled_at TIMESTAMP DEFAULT current_timestamp +); + +CREATE TABLE IF NOT EXISTS table_registry ( + table_name VARCHAR PRIMARY KEY, + bucket VARCHAR, + source VARCHAR, + sync_strategy VARCHAR DEFAULT 'full', + primary_key VARCHAR, + description VARCHAR, + metadata JSON, + registered_at TIMESTAMP DEFAULT current_timestamp, + updated_at TIMESTAMP DEFAULT current_timestamp +); + +CREATE TABLE IF NOT EXISTS telegram_links ( + chat_id VARCHAR PRIMARY KEY, + user_email VARCHAR NOT NULL, + linked_at TIMESTAMP DEFAULT current_timestamp, + active BOOLEAN DEFAULT true +); + +CREATE TABLE IF NOT EXISTS user_sync_settings ( + user_email VARCHAR PRIMARY KEY, + settings JSON, + updated_at TIMESTAMP DEFAULT current_timestamp +); + +CREATE TABLE IF NOT EXISTS users ( + email VARCHAR PRIMARY KEY, + name VARCHAR, + picture VARCHAR, + role VARCHAR DEFAULT 'analyst', + is_active BOOLEAN DEFAULT true, + metadata JSON, + created_at TIMESTAMP DEFAULT current_timestamp, + last_login TIMESTAMP +); +""" + + +def _get_data_dir() -> Path: + """Return the DATA_DIR path, defaulting to ./data.""" + return Path(os.environ.get("DATA_DIR", "data")) + + +def get_system_db() -> duckdb.DuckDBPyConnection: + """Open (or create) the system state database and ensure schema exists. + + Returns a DuckDB connection to {DATA_DIR}/state/system.duckdb. + Creates directories and all schema tables on first call. + """ + db_dir = _get_data_dir() / "state" + db_dir.mkdir(parents=True, exist_ok=True) + db_path = db_dir / "system.duckdb" + + conn = duckdb.connect(str(db_path)) + conn.execute(_SCHEMA_SQL) + + # Seed schema_version if empty + row = conn.execute("SELECT COUNT(*) FROM schema_version").fetchone() + if row[0] == 0: + conn.execute( + "INSERT INTO schema_version (version) VALUES (?)", [SCHEMA_VERSION] + ) + + return conn + + +def get_analytics_db() -> duckdb.DuckDBPyConnection: + """Open (or create) the analytics database. + + Returns a DuckDB connection to {DATA_DIR}/analytics/server.duckdb. + Creates directories if needed. + """ + db_dir = _get_data_dir() / "analytics" + db_dir.mkdir(parents=True, exist_ok=True) + db_path = db_dir / "server.duckdb" + + return duckdb.connect(str(db_path)) + + +def get_schema_version(conn: duckdb.DuckDBPyConnection) -> int: + """Return the current schema version, or 0 if no schema_version table.""" + try: + row = conn.execute( + "SELECT version FROM schema_version ORDER BY applied_at DESC LIMIT 1" + ).fetchone() + return row[0] if row else 0 + except duckdb.CatalogException: + return 0 diff --git a/tests/test_db.py b/tests/test_db.py new file mode 100644 index 0000000..b571bc4 --- /dev/null +++ b/tests/test_db.py @@ -0,0 +1,105 @@ +"""Tests for src.db — DuckDB connection management and schema.""" +import os +import tempfile + +import duckdb +import pytest + + +def _setup_data_dir(tmp_path): + """Set DATA_DIR env var to a temporary directory.""" + os.environ["DATA_DIR"] = str(tmp_path) + + +class TestGetSystemDb: + """Tests for get_system_db().""" + + def test_get_system_db_creates_tables(self, tmp_path): + _setup_data_dir(tmp_path) + from src.db import get_system_db + + conn = get_system_db() + try: + tables = [ + row[0] + for row in conn.execute( + "SELECT table_name FROM information_schema.tables " + "WHERE table_schema = 'main' ORDER BY table_name" + ).fetchall() + ] + expected = sorted([ + "schema_version", + "users", + "sync_state", + "sync_history", + "user_sync_settings", + "knowledge_items", + "knowledge_votes", + "audit_log", + "telegram_links", + "pending_codes", + "script_registry", + "table_registry", + "table_profiles", + "dataset_permissions", + ]) + assert tables == expected + finally: + conn.close() + + def test_get_system_db_idempotent(self, tmp_path): + _setup_data_dir(tmp_path) + from src.db import get_system_db + + conn = get_system_db() + conn.execute( + "INSERT INTO users (email, name) VALUES ('test@example.com', 'Test')" + ) + conn.close() + + conn2 = get_system_db() + try: + rows = conn2.execute("SELECT email FROM users").fetchall() + assert len(rows) == 1 + assert rows[0][0] == "test@example.com" + finally: + conn2.close() + + +class TestGetSchemaVersion: + """Tests for get_schema_version().""" + + def test_get_schema_version(self, tmp_path): + _setup_data_dir(tmp_path) + from src.db import get_schema_version, get_system_db + + conn = get_system_db() + try: + assert get_schema_version(conn) == 1 + finally: + conn.close() + + def test_get_schema_version_no_table(self, tmp_path): + _setup_data_dir(tmp_path) + from src.db import get_schema_version + + db_path = tmp_path / "empty.duckdb" + conn = duckdb.connect(str(db_path)) + try: + assert get_schema_version(conn) == 0 + finally: + conn.close() + + +class TestGetAnalyticsDb: + """Tests for get_analytics_db().""" + + def test_get_analytics_db(self, tmp_path): + _setup_data_dir(tmp_path) + from src.db import get_analytics_db + + conn = get_analytics_db() + try: + assert (tmp_path / "analytics" / "server.duckdb").exists() + finally: + conn.close()