diff --git a/scripts/migrate_json_to_duckdb.py b/scripts/migrate_json_to_duckdb.py new file mode 100644 index 0000000..d364ba2 --- /dev/null +++ b/scripts/migrate_json_to_duckdb.py @@ -0,0 +1,182 @@ +""" +One-time migration: JSON files -> DuckDB. + +Usage: python -m scripts.migrate_json_to_duckdb [--data-dir /data] + +Idempotent -- safe to run multiple times. Uses UPSERT to avoid duplicates. +""" + +import json +import logging +import os +import sys +from pathlib import Path + +logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") +logger = logging.getLogger(__name__) + + +def _load_json(path: str): + try: + with open(path) as f: + return json.load(f) + except (FileNotFoundError, json.JSONDecodeError) as e: + logger.warning(f"Skipping {path}: {e}") + return None + + +def migrate_all(data_dir: str = None) -> dict: + if data_dir: + os.environ["DATA_DIR"] = data_dir + data = Path(data_dir or os.environ.get("DATA_DIR", "./data")) + + from src.db import get_system_db + from src.repositories.sync_state import SyncStateRepository + from src.repositories.knowledge import KnowledgeRepository + from src.repositories.notifications import TelegramRepository, PendingCodeRepository + from src.repositories.users import UserRepository + from src.repositories.table_registry import TableRegistryRepository + from src.repositories.profiles import ProfileRepository + + conn = get_system_db() + stats = {} + + # 1. Sync state + sync_data = _load_json(str(data / "src_data" / "metadata" / "sync_state.json")) + count = 0 + if sync_data and isinstance(sync_data, dict): + repo = SyncStateRepository(conn) + tables = sync_data.get("tables", sync_data) + if not isinstance(tables, dict): + tables = {} + for table_id, info in tables.items(): + if isinstance(info, dict): + repo.update_sync( + table_id=table_id, + rows=info.get("rows", 0), + file_size_bytes=info.get("file_size_bytes", 0), + hash=info.get("hash", ""), + uncompressed_size_bytes=info.get("uncompressed_size_bytes", 0), + columns=info.get("columns", 0), + ) + count += 1 + stats["sync_state"] = count + logger.info(f"Migrated {count} sync state entries") + + # 2. Knowledge items + knowledge = _load_json(str(data / "corporate-memory" / "knowledge.json")) + count = 0 + if knowledge and isinstance(knowledge, list): + repo = KnowledgeRepository(conn) + for item in knowledge: + if not isinstance(item, dict): + continue + item_id = item.get("id", "") + if not item_id: + continue + # Check if exists (idempotent) + existing = repo.get_by_id(item_id) + if existing: + continue + repo.create( + id=item_id, + title=item.get("title", ""), + content=item.get("content", ""), + category=item.get("category", ""), + source_user=item.get("source_user"), + tags=item.get("tags"), + status=item.get("status", "pending"), + ) + count += 1 + stats["knowledge"] = count + logger.info(f"Migrated {count} knowledge items") + + # 3. Telegram users + telegram = _load_json(str(data / "notifications" / "telegram_users.json")) + count = 0 + if telegram and isinstance(telegram, dict): + repo = TelegramRepository(conn) + for user_id, info in telegram.items(): + if isinstance(info, dict) and "chat_id" in info: + repo.link_user(user_id, chat_id=info["chat_id"]) + count += 1 + stats["telegram"] = count + logger.info(f"Migrated {count} telegram links") + + # 4. Password users + password_users = _load_json(str(data / "auth" / "password_users.json")) + count = 0 + if password_users and isinstance(password_users, dict): + user_repo = UserRepository(conn) + for email, info in password_users.items(): + if not isinstance(info, dict): + continue + existing = user_repo.get_by_email(email) + if existing: + continue + import uuid + user_repo.create( + id=str(uuid.uuid4()), + email=email, + name=info.get("name", email.split("@")[0]), + role="analyst", + password_hash=info.get("password_hash"), + ) + count += 1 + stats["users"] = count + logger.info(f"Migrated {count} password users") + + # 5. Table registry + registry = _load_json(str(data / "src_data" / "metadata" / "table_registry.json")) + count = 0 + if registry and isinstance(registry, dict): + repo = TableRegistryRepository(conn) + tables_list = registry.get("tables", []) + if isinstance(tables_list, list): + for t in tables_list: + if not isinstance(t, dict): + continue + tid = t.get("id", "") + if not tid: + continue + existing = repo.get(tid) + if existing: + continue + repo.register( + id=tid, + name=t.get("name", tid), + folder=t.get("folder"), + sync_strategy=t.get("sync_strategy"), + primary_key=t.get("primary_key"), + description=t.get("description"), + registered_by=t.get("registered_by"), + ) + count += 1 + stats["table_registry"] = count + logger.info(f"Migrated {count} table registry entries") + + # 6. Profiles + profiles = _load_json(str(data / "src_data" / "metadata" / "profiles.json")) + count = 0 + if profiles and isinstance(profiles, dict): + repo = ProfileRepository(conn) + tables_data = profiles.get("tables", profiles) + if isinstance(tables_data, dict): + for table_id, profile in tables_data.items(): + if isinstance(profile, dict): + repo.save(table_id, profile) + count += 1 + stats["profiles"] = count + logger.info(f"Migrated {count} table profiles") + + conn.close() + logger.info(f"Migration complete: {stats}") + return stats + + +if __name__ == "__main__": + import argparse + parser = argparse.ArgumentParser(description="Migrate JSON state to DuckDB") + parser.add_argument("--data-dir", default=None) + args = parser.parse_args() + migrate_all(args.data_dir) diff --git a/tests/test_migration.py b/tests/test_migration.py new file mode 100644 index 0000000..7802b72 --- /dev/null +++ b/tests/test_migration.py @@ -0,0 +1,91 @@ +"""Tests for JSON -> DuckDB migration script.""" + +import json +import os +import pytest + + +@pytest.fixture +def migration_env(tmp_path): + """Create temp dir with sample JSON files mimicking production layout.""" + data_dir = tmp_path / "data" + (data_dir / "notifications").mkdir(parents=True) + (data_dir / "corporate-memory").mkdir(parents=True) + (data_dir / "auth").mkdir(parents=True) + (data_dir / "src_data" / "metadata").mkdir(parents=True) + + # sync_state.json + (data_dir / "src_data" / "metadata" / "sync_state.json").write_text(json.dumps({ + "tables": { + "orders": {"rows": 1000, "file_size_bytes": 5000, "hash": "abc"}, + "customers": {"rows": 500, "file_size_bytes": 2000, "hash": "def"}, + } + })) + + # knowledge.json + (data_dir / "corporate-memory" / "knowledge.json").write_text(json.dumps([ + {"id": "k1", "title": "MRR", "content": "Monthly...", "category": "metrics", "status": "approved"}, + {"id": "k2", "title": "Churn", "content": "Rate of...", "category": "metrics", "status": "pending"}, + ])) + + # telegram_users.json + (data_dir / "notifications" / "telegram_users.json").write_text(json.dumps({ + "petr@acme.com": {"chat_id": 12345, "linked_at": "2026-01-01"}, + })) + + # password_users.json + (data_dir / "auth" / "password_users.json").write_text(json.dumps({ + "ext@partner.com": {"name": "External User", "password_hash": "$argon2id$hash123"}, + })) + + # table_registry.json + (data_dir / "src_data" / "metadata" / "table_registry.json").write_text(json.dumps({ + "tables": [ + {"id": "orders", "name": "Orders", "folder": "sales", "sync_strategy": "incremental"}, + ] + })) + + # profiles.json + (data_dir / "src_data" / "metadata" / "profiles.json").write_text(json.dumps({ + "tables": { + "orders": {"row_count": 1000, "columns": [{"name": "id"}]}, + } + })) + + os.environ["DATA_DIR"] = str(data_dir) + return str(data_dir) + + +def test_migration_runs(migration_env): + from scripts.migrate_json_to_duckdb import migrate_all + stats = migrate_all(migration_env) + assert stats["sync_state"] == 2 + assert stats["knowledge"] == 2 + assert stats["telegram"] == 1 + assert stats["users"] == 1 + assert stats["table_registry"] == 1 + assert stats["profiles"] == 1 + + +def test_migration_idempotent(migration_env): + from scripts.migrate_json_to_duckdb import migrate_all + stats1 = migrate_all(migration_env) + stats2 = migrate_all(migration_env) + # Second run should find existing items and skip them + assert stats2["knowledge"] == 0 # already existed + assert stats2["users"] == 0 + assert stats2["table_registry"] == 0 + # sync_state uses UPSERT so count stays same + assert stats2["sync_state"] == 2 + + +def test_migration_with_missing_files(tmp_path): + """Migration should handle missing JSON files gracefully.""" + data_dir = tmp_path / "empty_data" + data_dir.mkdir() + os.environ["DATA_DIR"] = str(data_dir) + from scripts.migrate_json_to_duckdb import migrate_all + stats = migrate_all(str(data_dir)) + assert stats["sync_state"] == 0 + assert stats["knowledge"] == 0 + assert stats["telegram"] == 0