feat: add migration scripts for extract.duckdb transition

migrate_registry_to_duckdb.py: imports tables from data_description.md
or table_registry.json into DuckDB table_registry with source columns.
migrate_parquets_to_extracts.py: copies parquets to /data/extracts/
and creates extract.duckdb with _meta + views.
This commit is contained in:
ZdenekSrotyr 2026-03-30 20:21:12 +02:00
parent e058c71777
commit 8bc1fceb52
2 changed files with 320 additions and 0 deletions

View file

@ -0,0 +1,134 @@
"""Move existing parquet files to extract.duckdb directory structure.
One-time script for existing deployments. Moves parquets from
/data/src_data/parquet/ to /data/extracts/{source}/data/ and creates
extract.duckdb with _meta + views.
Usage:
python scripts/migrate_parquets_to_extracts.py [--source keboola] [--dry-run]
"""
import argparse
import logging
import os
import shutil
import sys
from datetime import datetime, timezone
from pathlib import Path
import duckdb
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)
sys.path.insert(0, str(Path(__file__).parent.parent))
def migrate_parquets(source_name: str, dry_run: bool = False) -> dict:
"""Move parquets and create extract.duckdb.
Returns: {moved: int, total_bytes: int, tables: list[str]}
"""
data_dir = Path(os.environ.get("DATA_DIR", "./data"))
old_parquet_dir = data_dir / "src_data" / "parquet"
new_extract_dir = data_dir / "extracts" / source_name
new_data_dir = new_extract_dir / "data"
if not old_parquet_dir.exists():
logger.warning("No parquet directory found at %s", old_parquet_dir)
return {"moved": 0, "total_bytes": 0, "tables": []}
parquet_files = list(old_parquet_dir.rglob("*.parquet"))
if not parquet_files:
logger.warning("No parquet files found in %s", old_parquet_dir)
return {"moved": 0, "total_bytes": 0, "tables": []}
logger.info("Found %d parquet files in %s", len(parquet_files), old_parquet_dir)
if not dry_run:
new_data_dir.mkdir(parents=True, exist_ok=True)
moved = 0
total_bytes = 0
tables = []
for pq_file in parquet_files:
table_name = pq_file.stem
size = pq_file.stat().st_size
dest = new_data_dir / pq_file.name
if dry_run:
logger.info(" [DRY RUN] Would move: %s -> %s (%d bytes)", pq_file, dest, size)
else:
# Copy instead of move to be safe — user can delete originals after verification
shutil.copy2(str(pq_file), str(dest))
logger.info(" Copied: %s -> %s (%d bytes)", pq_file.name, dest, size)
moved += 1
total_bytes += size
if table_name not in tables:
tables.append(table_name)
# Create extract.duckdb
if not dry_run and tables:
db_path = new_extract_dir / "extract.duckdb"
conn = duckdb.connect(str(db_path))
try:
conn.execute("DROP TABLE IF EXISTS _meta")
conn.execute("""CREATE TABLE _meta (
table_name VARCHAR NOT NULL,
description VARCHAR,
rows BIGINT,
size_bytes BIGINT,
extracted_at TIMESTAMP,
query_mode VARCHAR DEFAULT 'local'
)""")
now = datetime.now(timezone.utc)
for table_name in tables:
pq_path = str(new_data_dir / f"{table_name}.parquet")
if not Path(pq_path).exists():
continue
# Create view
conn.execute(
f'CREATE OR REPLACE VIEW "{table_name}" AS SELECT * FROM read_parquet(\'{pq_path}\')'
)
# Count rows
try:
rows = conn.execute(f"SELECT count(*) FROM read_parquet('{pq_path}')").fetchone()[0]
except Exception:
rows = 0
size = Path(pq_path).stat().st_size
conn.execute(
"INSERT INTO _meta VALUES (?, ?, ?, ?, ?, 'local')",
[table_name, "", rows, size, now],
)
logger.info("Created extract.duckdb at %s with %d tables", db_path, len(tables))
finally:
conn.close()
return {"moved": moved, "total_bytes": total_bytes, "tables": tables}
def main():
parser = argparse.ArgumentParser(description="Migrate parquets to extract.duckdb structure")
parser.add_argument("--source", default="keboola", help="Source name (default: keboola)")
parser.add_argument("--dry-run", action="store_true", help="Show what would be done without doing it")
args = parser.parse_args()
result = migrate_parquets(args.source, dry_run=args.dry_run)
logger.info(
"Migration %s: %d files, %d tables, %.1f MB",
"preview" if args.dry_run else "complete",
result["moved"],
len(result["tables"]),
result["total_bytes"] / 1024 / 1024,
)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,186 @@
"""Migrate table registry from data_description.md or JSON to DuckDB.
One-time script for existing deployments transitioning to extract.duckdb architecture.
Idempotent safe to run multiple times (uses INSERT OR REPLACE).
Usage:
python scripts/migrate_registry_to_duckdb.py [--from-md docs/data_description.md] [--from-json path/to/table_registry.json]
"""
import argparse
import json
import logging
import os
import re
import sys
from pathlib import Path
import yaml
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))
def _parse_table_id(table_id: str, default_source: str) -> dict:
"""Infer source_type, bucket, source_table from table ID.
Keboola: 'in.c-crm.company' bucket='in.c-crm', source_table='company'
BigQuery: 'project.dataset.table' bucket='dataset', source_table='table'
"""
parts = table_id.rsplit(".", 1)
if len(parts) == 2:
return {"bucket": parts[0], "source_table": parts[1]}
return {"bucket": "", "source_table": table_id}
def migrate_from_markdown(md_path: Path, source_type: str) -> list[dict]:
"""Parse data_description.md and return table configs."""
content = md_path.read_text()
yaml_blocks = re.findall(r"```yaml\n(.*?)```", content, re.DOTALL)
tables = []
for block in yaml_blocks:
data = yaml.safe_load(block)
if data and "tables" in data:
tables.extend(data["tables"])
configs = []
for t in tables:
parsed = _parse_table_id(t.get("id", t.get("name", "")), source_type)
configs.append({
"id": t.get("id", t.get("name", "")),
"name": t.get("name", ""),
"source_type": source_type,
"bucket": parsed["bucket"],
"source_table": parsed["source_table"],
"sync_strategy": t.get("sync_strategy", "full_refresh"),
"query_mode": t.get("query_mode", "local"),
"sync_schedule": t.get("sync_schedule"),
"profile_after_sync": t.get("profile_after_sync", True),
"primary_key": t.get("primary_key"),
"folder": t.get("folder"),
"description": t.get("description", ""),
})
return configs
def migrate_from_json(json_path: Path, source_type: str) -> list[dict]:
"""Parse table_registry.json and return table configs."""
data = json.loads(json_path.read_text())
tables = data.get("tables", [])
configs = []
for t in tables:
parsed = _parse_table_id(t.get("id", t.get("name", "")), source_type)
configs.append({
"id": t.get("id", t.get("name", "")),
"name": t.get("name", ""),
"source_type": source_type,
"bucket": parsed["bucket"],
"source_table": parsed["source_table"],
"sync_strategy": t.get("sync_strategy", "full_refresh"),
"query_mode": t.get("query_mode", "local"),
"sync_schedule": t.get("sync_schedule"),
"profile_after_sync": t.get("profile_after_sync", True),
"primary_key": t.get("primary_key"),
"folder": t.get("folder"),
"description": t.get("description", ""),
})
return configs
def write_to_duckdb(configs: list[dict]) -> int:
"""Write table configs to DuckDB table_registry. Returns count."""
from src.db import get_system_db
from src.repositories.table_registry import TableRegistryRepository
conn = get_system_db()
try:
repo = TableRegistryRepository(conn)
count = 0
for c in configs:
repo.register(
id=c["id"],
name=c["name"],
source_type=c["source_type"],
bucket=c["bucket"],
source_table=c["source_table"],
sync_strategy=c["sync_strategy"],
query_mode=c["query_mode"],
sync_schedule=c["sync_schedule"],
profile_after_sync=c["profile_after_sync"],
primary_key=c["primary_key"],
folder=c["folder"],
description=c["description"],
registered_by="migration",
)
count += 1
logger.info(" Registered: %s (%s)", c["name"], c["id"])
return count
finally:
conn.close()
def main():
parser = argparse.ArgumentParser(description="Migrate table registry to DuckDB")
parser.add_argument("--from-md", type=Path, help="Path to data_description.md")
parser.add_argument("--from-json", type=Path, help="Path to table_registry.json")
parser.add_argument("--source-type", default=None, help="Override source type (keboola, bigquery)")
args = parser.parse_args()
# Detect source type from instance.yaml if not specified
source_type = args.source_type
if not source_type:
try:
from app.instance_config import get_data_source_type
source_type = get_data_source_type()
except Exception:
source_type = "keboola"
logger.info("Detected source type: %s", source_type)
configs = []
if args.from_md:
if not args.from_md.exists():
logger.error("File not found: %s", args.from_md)
sys.exit(1)
configs = migrate_from_markdown(args.from_md, source_type)
logger.info("Parsed %d tables from %s", len(configs), args.from_md)
elif args.from_json:
if not args.from_json.exists():
logger.error("File not found: %s", args.from_json)
sys.exit(1)
configs = migrate_from_json(args.from_json, source_type)
logger.info("Parsed %d tables from %s", len(configs), args.from_json)
else:
# Auto-detect: try JSON first, then markdown
data_dir = Path(os.environ.get("DATA_DIR", "./data"))
json_path = data_dir / "src_data" / "metadata" / "table_registry.json"
md_path = Path("docs/data_description.md")
if json_path.exists():
configs = migrate_from_json(json_path, source_type)
logger.info("Auto-detected: parsed %d tables from %s", len(configs), json_path)
elif md_path.exists():
configs = migrate_from_markdown(md_path, source_type)
logger.info("Auto-detected: parsed %d tables from %s", len(configs), md_path)
else:
logger.error("No source found. Use --from-md or --from-json")
sys.exit(1)
if not configs:
logger.warning("No tables found to migrate")
sys.exit(0)
count = write_to_duckdb(configs)
logger.info("Migration complete: %d tables written to DuckDB table_registry", count)
if __name__ == "__main__":
main()