diff --git a/scripts/migrate_parquets_to_extracts.py b/scripts/migrate_parquets_to_extracts.py new file mode 100644 index 0000000..73dd23b --- /dev/null +++ b/scripts/migrate_parquets_to_extracts.py @@ -0,0 +1,134 @@ +"""Move existing parquet files to extract.duckdb directory structure. + +One-time script for existing deployments. Moves parquets from +/data/src_data/parquet/ to /data/extracts/{source}/data/ and creates +extract.duckdb with _meta + views. + +Usage: + python scripts/migrate_parquets_to_extracts.py [--source keboola] [--dry-run] +""" + +import argparse +import logging +import os +import shutil +import sys +from datetime import datetime, timezone +from pathlib import Path + +import duckdb + +logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") +logger = logging.getLogger(__name__) + +sys.path.insert(0, str(Path(__file__).parent.parent)) + + +def migrate_parquets(source_name: str, dry_run: bool = False) -> dict: + """Move parquets and create extract.duckdb. + + Returns: {moved: int, total_bytes: int, tables: list[str]} + """ + data_dir = Path(os.environ.get("DATA_DIR", "./data")) + old_parquet_dir = data_dir / "src_data" / "parquet" + new_extract_dir = data_dir / "extracts" / source_name + new_data_dir = new_extract_dir / "data" + + if not old_parquet_dir.exists(): + logger.warning("No parquet directory found at %s", old_parquet_dir) + return {"moved": 0, "total_bytes": 0, "tables": []} + + parquet_files = list(old_parquet_dir.rglob("*.parquet")) + if not parquet_files: + logger.warning("No parquet files found in %s", old_parquet_dir) + return {"moved": 0, "total_bytes": 0, "tables": []} + + logger.info("Found %d parquet files in %s", len(parquet_files), old_parquet_dir) + + if not dry_run: + new_data_dir.mkdir(parents=True, exist_ok=True) + + moved = 0 + total_bytes = 0 + tables = [] + + for pq_file in parquet_files: + table_name = pq_file.stem + size = pq_file.stat().st_size + dest = new_data_dir / pq_file.name + + if dry_run: + logger.info(" [DRY RUN] Would move: %s -> %s (%d bytes)", pq_file, dest, size) + else: + # Copy instead of move to be safe — user can delete originals after verification + shutil.copy2(str(pq_file), str(dest)) + logger.info(" Copied: %s -> %s (%d bytes)", pq_file.name, dest, size) + + moved += 1 + total_bytes += size + if table_name not in tables: + tables.append(table_name) + + # Create extract.duckdb + if not dry_run and tables: + db_path = new_extract_dir / "extract.duckdb" + conn = duckdb.connect(str(db_path)) + try: + conn.execute("DROP TABLE IF EXISTS _meta") + conn.execute("""CREATE TABLE _meta ( + table_name VARCHAR NOT NULL, + description VARCHAR, + rows BIGINT, + size_bytes BIGINT, + extracted_at TIMESTAMP, + query_mode VARCHAR DEFAULT 'local' + )""") + + now = datetime.now(timezone.utc) + for table_name in tables: + pq_path = str(new_data_dir / f"{table_name}.parquet") + if not Path(pq_path).exists(): + continue + + # Create view + conn.execute( + f'CREATE OR REPLACE VIEW "{table_name}" AS SELECT * FROM read_parquet(\'{pq_path}\')' + ) + + # Count rows + try: + rows = conn.execute(f"SELECT count(*) FROM read_parquet('{pq_path}')").fetchone()[0] + except Exception: + rows = 0 + + size = Path(pq_path).stat().st_size + conn.execute( + "INSERT INTO _meta VALUES (?, ?, ?, ?, ?, 'local')", + [table_name, "", rows, size, now], + ) + + logger.info("Created extract.duckdb at %s with %d tables", db_path, len(tables)) + finally: + conn.close() + + return {"moved": moved, "total_bytes": total_bytes, "tables": tables} + + +def main(): + parser = argparse.ArgumentParser(description="Migrate parquets to extract.duckdb structure") + parser.add_argument("--source", default="keboola", help="Source name (default: keboola)") + parser.add_argument("--dry-run", action="store_true", help="Show what would be done without doing it") + args = parser.parse_args() + + result = migrate_parquets(args.source, dry_run=args.dry_run) + logger.info( + "Migration %s: %d files, %d tables, %.1f MB", + "preview" if args.dry_run else "complete", + result["moved"], + len(result["tables"]), + result["total_bytes"] / 1024 / 1024, + ) + + +if __name__ == "__main__": + main() diff --git a/scripts/migrate_registry_to_duckdb.py b/scripts/migrate_registry_to_duckdb.py new file mode 100644 index 0000000..c078c26 --- /dev/null +++ b/scripts/migrate_registry_to_duckdb.py @@ -0,0 +1,186 @@ +"""Migrate table registry from data_description.md or JSON to DuckDB. + +One-time script for existing deployments transitioning to extract.duckdb architecture. +Idempotent — safe to run multiple times (uses INSERT OR REPLACE). + +Usage: + python scripts/migrate_registry_to_duckdb.py [--from-md docs/data_description.md] [--from-json path/to/table_registry.json] +""" + +import argparse +import json +import logging +import os +import re +import sys +from pathlib import Path + +import yaml + +logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") +logger = logging.getLogger(__name__) + +# Add project root to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + + +def _parse_table_id(table_id: str, default_source: str) -> dict: + """Infer source_type, bucket, source_table from table ID. + + Keboola: 'in.c-crm.company' → bucket='in.c-crm', source_table='company' + BigQuery: 'project.dataset.table' → bucket='dataset', source_table='table' + """ + parts = table_id.rsplit(".", 1) + if len(parts) == 2: + return {"bucket": parts[0], "source_table": parts[1]} + return {"bucket": "", "source_table": table_id} + + +def migrate_from_markdown(md_path: Path, source_type: str) -> list[dict]: + """Parse data_description.md and return table configs.""" + content = md_path.read_text() + yaml_blocks = re.findall(r"```yaml\n(.*?)```", content, re.DOTALL) + + tables = [] + for block in yaml_blocks: + data = yaml.safe_load(block) + if data and "tables" in data: + tables.extend(data["tables"]) + + configs = [] + for t in tables: + parsed = _parse_table_id(t.get("id", t.get("name", "")), source_type) + configs.append({ + "id": t.get("id", t.get("name", "")), + "name": t.get("name", ""), + "source_type": source_type, + "bucket": parsed["bucket"], + "source_table": parsed["source_table"], + "sync_strategy": t.get("sync_strategy", "full_refresh"), + "query_mode": t.get("query_mode", "local"), + "sync_schedule": t.get("sync_schedule"), + "profile_after_sync": t.get("profile_after_sync", True), + "primary_key": t.get("primary_key"), + "folder": t.get("folder"), + "description": t.get("description", ""), + }) + + return configs + + +def migrate_from_json(json_path: Path, source_type: str) -> list[dict]: + """Parse table_registry.json and return table configs.""" + data = json.loads(json_path.read_text()) + tables = data.get("tables", []) + + configs = [] + for t in tables: + parsed = _parse_table_id(t.get("id", t.get("name", "")), source_type) + configs.append({ + "id": t.get("id", t.get("name", "")), + "name": t.get("name", ""), + "source_type": source_type, + "bucket": parsed["bucket"], + "source_table": parsed["source_table"], + "sync_strategy": t.get("sync_strategy", "full_refresh"), + "query_mode": t.get("query_mode", "local"), + "sync_schedule": t.get("sync_schedule"), + "profile_after_sync": t.get("profile_after_sync", True), + "primary_key": t.get("primary_key"), + "folder": t.get("folder"), + "description": t.get("description", ""), + }) + + return configs + + +def write_to_duckdb(configs: list[dict]) -> int: + """Write table configs to DuckDB table_registry. Returns count.""" + from src.db import get_system_db + from src.repositories.table_registry import TableRegistryRepository + + conn = get_system_db() + try: + repo = TableRegistryRepository(conn) + count = 0 + for c in configs: + repo.register( + id=c["id"], + name=c["name"], + source_type=c["source_type"], + bucket=c["bucket"], + source_table=c["source_table"], + sync_strategy=c["sync_strategy"], + query_mode=c["query_mode"], + sync_schedule=c["sync_schedule"], + profile_after_sync=c["profile_after_sync"], + primary_key=c["primary_key"], + folder=c["folder"], + description=c["description"], + registered_by="migration", + ) + count += 1 + logger.info(" Registered: %s (%s)", c["name"], c["id"]) + return count + finally: + conn.close() + + +def main(): + parser = argparse.ArgumentParser(description="Migrate table registry to DuckDB") + parser.add_argument("--from-md", type=Path, help="Path to data_description.md") + parser.add_argument("--from-json", type=Path, help="Path to table_registry.json") + parser.add_argument("--source-type", default=None, help="Override source type (keboola, bigquery)") + args = parser.parse_args() + + # Detect source type from instance.yaml if not specified + source_type = args.source_type + if not source_type: + try: + from app.instance_config import get_data_source_type + source_type = get_data_source_type() + except Exception: + source_type = "keboola" + logger.info("Detected source type: %s", source_type) + + configs = [] + if args.from_md: + if not args.from_md.exists(): + logger.error("File not found: %s", args.from_md) + sys.exit(1) + configs = migrate_from_markdown(args.from_md, source_type) + logger.info("Parsed %d tables from %s", len(configs), args.from_md) + + elif args.from_json: + if not args.from_json.exists(): + logger.error("File not found: %s", args.from_json) + sys.exit(1) + configs = migrate_from_json(args.from_json, source_type) + logger.info("Parsed %d tables from %s", len(configs), args.from_json) + + else: + # Auto-detect: try JSON first, then markdown + data_dir = Path(os.environ.get("DATA_DIR", "./data")) + json_path = data_dir / "src_data" / "metadata" / "table_registry.json" + md_path = Path("docs/data_description.md") + + if json_path.exists(): + configs = migrate_from_json(json_path, source_type) + logger.info("Auto-detected: parsed %d tables from %s", len(configs), json_path) + elif md_path.exists(): + configs = migrate_from_markdown(md_path, source_type) + logger.info("Auto-detected: parsed %d tables from %s", len(configs), md_path) + else: + logger.error("No source found. Use --from-md or --from-json") + sys.exit(1) + + if not configs: + logger.warning("No tables found to migrate") + sys.exit(0) + + count = write_to_duckdb(configs) + logger.info("Migration complete: %d tables written to DuckDB table_registry", count) + + +if __name__ == "__main__": + main()