feat: add DuckDB state layer with schema management

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
ZdenekSrotyr 2026-03-27 13:55:54 +01:00
parent eb7e5bdf8f
commit f76411c603
2 changed files with 295 additions and 0 deletions

190
src/db.py Normal file
View file

@ -0,0 +1,190 @@
"""DuckDB connection management and schema initialization.
Provides connections to the system state database and analytics database,
with automatic directory creation and schema bootstrapping.
"""
import os
from pathlib import Path
import duckdb
SCHEMA_VERSION = 1
_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER NOT NULL,
applied_at TIMESTAMP DEFAULT current_timestamp
);
CREATE TABLE IF NOT EXISTS audit_log (
id VARCHAR PRIMARY KEY,
timestamp TIMESTAMP DEFAULT current_timestamp,
actor VARCHAR,
action VARCHAR NOT NULL,
entity_type VARCHAR,
entity_id VARCHAR,
details JSON
);
CREATE TABLE IF NOT EXISTS dataset_permissions (
id VARCHAR PRIMARY KEY,
user_email VARCHAR NOT NULL,
dataset VARCHAR NOT NULL,
permission VARCHAR NOT NULL DEFAULT 'read',
granted_by VARCHAR,
granted_at TIMESTAMP DEFAULT current_timestamp
);
CREATE TABLE IF NOT EXISTS knowledge_items (
id VARCHAR PRIMARY KEY,
title VARCHAR NOT NULL,
content VARCHAR,
category VARCHAR,
author VARCHAR,
status VARCHAR DEFAULT 'active',
metadata JSON,
created_at TIMESTAMP DEFAULT current_timestamp,
updated_at TIMESTAMP DEFAULT current_timestamp
);
CREATE TABLE IF NOT EXISTS knowledge_votes (
id VARCHAR PRIMARY KEY,
item_id VARCHAR NOT NULL,
user_email VARCHAR NOT NULL,
vote INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT current_timestamp
);
CREATE TABLE IF NOT EXISTS pending_codes (
code VARCHAR PRIMARY KEY,
user_email VARCHAR NOT NULL,
purpose VARCHAR,
created_at TIMESTAMP DEFAULT current_timestamp,
expires_at TIMESTAMP
);
CREATE TABLE IF NOT EXISTS script_registry (
id VARCHAR PRIMARY KEY,
name VARCHAR NOT NULL,
path VARCHAR NOT NULL,
description VARCHAR,
author VARCHAR,
metadata JSON,
created_at TIMESTAMP DEFAULT current_timestamp,
updated_at TIMESTAMP DEFAULT current_timestamp
);
CREATE TABLE IF NOT EXISTS sync_history (
id VARCHAR PRIMARY KEY,
table_name VARCHAR NOT NULL,
status VARCHAR NOT NULL,
rows_synced INTEGER,
started_at TIMESTAMP DEFAULT current_timestamp,
finished_at TIMESTAMP,
error VARCHAR,
metadata JSON
);
CREATE TABLE IF NOT EXISTS sync_state (
table_name VARCHAR PRIMARY KEY,
last_sync TIMESTAMP,
status VARCHAR DEFAULT 'pending',
row_count INTEGER,
file_hash VARCHAR,
metadata JSON
);
CREATE TABLE IF NOT EXISTS table_profiles (
table_name VARCHAR PRIMARY KEY,
profile JSON,
profiled_at TIMESTAMP DEFAULT current_timestamp
);
CREATE TABLE IF NOT EXISTS table_registry (
table_name VARCHAR PRIMARY KEY,
bucket VARCHAR,
source VARCHAR,
sync_strategy VARCHAR DEFAULT 'full',
primary_key VARCHAR,
description VARCHAR,
metadata JSON,
registered_at TIMESTAMP DEFAULT current_timestamp,
updated_at TIMESTAMP DEFAULT current_timestamp
);
CREATE TABLE IF NOT EXISTS telegram_links (
chat_id VARCHAR PRIMARY KEY,
user_email VARCHAR NOT NULL,
linked_at TIMESTAMP DEFAULT current_timestamp,
active BOOLEAN DEFAULT true
);
CREATE TABLE IF NOT EXISTS user_sync_settings (
user_email VARCHAR PRIMARY KEY,
settings JSON,
updated_at TIMESTAMP DEFAULT current_timestamp
);
CREATE TABLE IF NOT EXISTS users (
email VARCHAR PRIMARY KEY,
name VARCHAR,
picture VARCHAR,
role VARCHAR DEFAULT 'analyst',
is_active BOOLEAN DEFAULT true,
metadata JSON,
created_at TIMESTAMP DEFAULT current_timestamp,
last_login TIMESTAMP
);
"""
def _get_data_dir() -> Path:
"""Return the DATA_DIR path, defaulting to ./data."""
return Path(os.environ.get("DATA_DIR", "data"))
def get_system_db() -> duckdb.DuckDBPyConnection:
"""Open (or create) the system state database and ensure schema exists.
Returns a DuckDB connection to {DATA_DIR}/state/system.duckdb.
Creates directories and all schema tables on first call.
"""
db_dir = _get_data_dir() / "state"
db_dir.mkdir(parents=True, exist_ok=True)
db_path = db_dir / "system.duckdb"
conn = duckdb.connect(str(db_path))
conn.execute(_SCHEMA_SQL)
# Seed schema_version if empty
row = conn.execute("SELECT COUNT(*) FROM schema_version").fetchone()
if row[0] == 0:
conn.execute(
"INSERT INTO schema_version (version) VALUES (?)", [SCHEMA_VERSION]
)
return conn
def get_analytics_db() -> duckdb.DuckDBPyConnection:
"""Open (or create) the analytics database.
Returns a DuckDB connection to {DATA_DIR}/analytics/server.duckdb.
Creates directories if needed.
"""
db_dir = _get_data_dir() / "analytics"
db_dir.mkdir(parents=True, exist_ok=True)
db_path = db_dir / "server.duckdb"
return duckdb.connect(str(db_path))
def get_schema_version(conn: duckdb.DuckDBPyConnection) -> int:
"""Return the current schema version, or 0 if no schema_version table."""
try:
row = conn.execute(
"SELECT version FROM schema_version ORDER BY applied_at DESC LIMIT 1"
).fetchone()
return row[0] if row else 0
except duckdb.CatalogException:
return 0

105
tests/test_db.py Normal file
View file

@ -0,0 +1,105 @@
"""Tests for src.db — DuckDB connection management and schema."""
import os
import tempfile
import duckdb
import pytest
def _setup_data_dir(tmp_path):
"""Set DATA_DIR env var to a temporary directory."""
os.environ["DATA_DIR"] = str(tmp_path)
class TestGetSystemDb:
"""Tests for get_system_db()."""
def test_get_system_db_creates_tables(self, tmp_path):
_setup_data_dir(tmp_path)
from src.db import get_system_db
conn = get_system_db()
try:
tables = [
row[0]
for row in conn.execute(
"SELECT table_name FROM information_schema.tables "
"WHERE table_schema = 'main' ORDER BY table_name"
).fetchall()
]
expected = sorted([
"schema_version",
"users",
"sync_state",
"sync_history",
"user_sync_settings",
"knowledge_items",
"knowledge_votes",
"audit_log",
"telegram_links",
"pending_codes",
"script_registry",
"table_registry",
"table_profiles",
"dataset_permissions",
])
assert tables == expected
finally:
conn.close()
def test_get_system_db_idempotent(self, tmp_path):
_setup_data_dir(tmp_path)
from src.db import get_system_db
conn = get_system_db()
conn.execute(
"INSERT INTO users (email, name) VALUES ('test@example.com', 'Test')"
)
conn.close()
conn2 = get_system_db()
try:
rows = conn2.execute("SELECT email FROM users").fetchall()
assert len(rows) == 1
assert rows[0][0] == "test@example.com"
finally:
conn2.close()
class TestGetSchemaVersion:
"""Tests for get_schema_version()."""
def test_get_schema_version(self, tmp_path):
_setup_data_dir(tmp_path)
from src.db import get_schema_version, get_system_db
conn = get_system_db()
try:
assert get_schema_version(conn) == 1
finally:
conn.close()
def test_get_schema_version_no_table(self, tmp_path):
_setup_data_dir(tmp_path)
from src.db import get_schema_version
db_path = tmp_path / "empty.duckdb"
conn = duckdb.connect(str(db_path))
try:
assert get_schema_version(conn) == 0
finally:
conn.close()
class TestGetAnalyticsDb:
"""Tests for get_analytics_db()."""
def test_get_analytics_db(self, tmp_path):
_setup_data_dir(tmp_path)
from src.db import get_analytics_db
conn = get_analytics_db()
try:
assert (tmp_path / "analytics" / "server.duckdb").exists()
finally:
conn.close()