From 7612385ed6ec0c290c665d7965d4872708da4b9b Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Tue, 31 Mar 2026 13:57:02 +0200 Subject: [PATCH] fix: extractor subprocess reads table configs via stdin, not DuckDB Subprocess cannot open system.duckdb (main process holds lock). Now main process reads table_registry and passes configs as JSON via stdin to subprocess. Subprocess never touches system.duckdb. --- app/api/sync.py | 70 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 60 insertions(+), 10 deletions(-) diff --git a/app/api/sync.py b/app/api/sync.py index 790d489..6bf9213 100644 --- a/app/api/sync.py +++ b/app/api/sync.py @@ -38,26 +38,76 @@ def _get_data_dir() -> Path: def _run_sync(tables: Optional[List[str]] = None): """Run extractor as subprocess + orchestrator rebuild. - Extractor runs in a separate process to avoid DuckDB lock conflicts - with the main API process. Orchestrator rebuild runs in-process after - the extractor finishes (it only reads extract.duckdb files). + Reads table configs from DuckDB (in main process which has the shared + connection), passes them as JSON via stdin to the extractor subprocess. + This avoids DuckDB lock conflicts — subprocess never opens system.duckdb. """ + import json as _json import subprocess import sys try: - # Run extractor as subprocess — completely separate DuckDB connections + from app.instance_config import get_data_source_type, get_value + from src.db import get_system_db + from src.repositories.table_registry import TableRegistryRepository + + source_type = get_data_source_type() + data_dir = _get_data_dir() + + # Read table configs in main process (has shared DuckDB connection) + sys_conn = get_system_db() + try: + repo = TableRegistryRepository(sys_conn) + if tables: + all_configs = [repo.get(t) for t in tables] + table_configs = [c for c in all_configs if c is not None] + else: + table_configs = repo.list_by_source(source_type) if source_type else repo.list_all() + finally: + sys_conn.close() + + if not table_configs: + logger.warning("No tables to sync for source_type=%s", source_type) + return + + # Serialize configs — strip non-serializable fields + serializable = [] + for tc in table_configs: + serializable.append({k: (v.isoformat() if hasattr(v, 'isoformat') else v) + for k, v in tc.items() if v is not None}) + + # Run extractor subprocess with table configs via stdin + # Subprocess does NOT open system.duckdb — no lock conflict env = {**os.environ} - cmd = [sys.executable, "-m", "connectors.keboola.extractor"] - logger.info("Starting extractor subprocess: %s", " ".join(cmd)) + cmd = [sys.executable, "-c", """ +import json, sys, os, logging +from pathlib import Path +logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") + +configs = json.load(sys.stdin) +url = os.environ.get("KEBOOLA_STACK_URL", "") +token = os.environ.get("KEBOOLA_STORAGE_TOKEN", "") + +if not url or not token: + print("ERROR: Missing KEBOOLA_STACK_URL or KEBOOLA_STORAGE_TOKEN", file=sys.stderr) + sys.exit(1) + +from connectors.keboola.extractor import run +data_dir = Path(os.environ.get("DATA_DIR", "./data")) +result = run(str(data_dir / "extracts" / "keboola"), configs, url, token) +print(json.dumps(result)) +"""] + + logger.info("Starting extractor subprocess for %d tables", len(table_configs)) result = subprocess.run( - cmd, capture_output=True, text=True, timeout=1800, env=env, - cwd=str(Path(__file__).parent.parent.parent), # project root + cmd, input=_json.dumps(serializable), capture_output=True, text=True, + timeout=1800, env=env, + cwd=str(Path(__file__).parent.parent.parent), ) if result.stdout: - logger.info("Extractor stdout: %s", result.stdout[-500:]) + logger.info("Extractor result: %s", result.stdout.strip()[-500:]) if result.stderr: logger.warning("Extractor stderr: %s", result.stderr[-500:]) if result.returncode != 0: @@ -70,7 +120,7 @@ def _run_sync(tables: Optional[List[str]] = None): logger.info("Orchestrator rebuild: %s", {k: len(v) for k, v in views.items()}) except subprocess.TimeoutExpired: - logger.error("Extractor timed out after 600s") + logger.error("Extractor timed out after 1800s") except Exception as e: logger.error(f"Data sync failed: {e}\n{traceback.format_exc()}")