diff --git a/app/api/sync.py b/app/api/sync.py index 5121836..67d688d 100644 --- a/app/api/sync.py +++ b/app/api/sync.py @@ -36,55 +36,41 @@ def _get_data_dir() -> Path: def _run_sync(tables: Optional[List[str]] = None): - """Run extractor + orchestrator in background. Called by trigger endpoint.""" + """Run extractor as subprocess + orchestrator rebuild. + + Extractor runs in a separate process to avoid DuckDB lock conflicts + with the main API process. Orchestrator rebuild runs in-process after + the extractor finishes (it only reads extract.duckdb files). + """ + import subprocess + import sys + try: - from app.instance_config import get_data_source_type, get_value - from src.db import get_system_db - from src.repositories.table_registry import TableRegistryRepository + # Run extractor as subprocess — completely separate DuckDB connections + env = {**os.environ} + cmd = [sys.executable, "-m", "connectors.keboola.extractor"] + logger.info("Starting extractor subprocess: %s", " ".join(cmd)) + + result = subprocess.run( + cmd, capture_output=True, text=True, timeout=600, env=env, + cwd=str(Path(__file__).parent.parent.parent), # project root + ) + + if result.stdout: + logger.info("Extractor stdout: %s", result.stdout[-500:]) + if result.stderr: + logger.warning("Extractor stderr: %s", result.stderr[-500:]) + if result.returncode != 0: + logger.error("Extractor failed (exit %d)", result.returncode) + + # Rebuild master views (reads extract.duckdb files, no write conflict) from src.orchestrator import SyncOrchestrator - - source_type = get_data_source_type() - data_dir = _get_data_dir() - - # Get table configs from registry - sys_conn = get_system_db() - try: - repo = TableRegistryRepository(sys_conn) - if tables: - all_configs = [repo.get(t) for t in tables] - table_configs = [c for c in all_configs if c is not None] - else: - table_configs = repo.list_by_source(source_type) if source_type else repo.list_all() - finally: - sys_conn.close() - - # Run appropriate extractor - if source_type == "keboola": - from connectors.keboola.extractor import run as keboola_run - kbc_url = get_value("keboola", "url", default="") - kbc_token = os.environ.get( - get_value("keboola", "token_env", default="KEBOOLA_STORAGE_TOKEN"), "" - ) - output = str(data_dir / "extracts" / "keboola") - result = keboola_run(output, table_configs, kbc_url, kbc_token) - logger.info("Keboola extraction: %s", result) - - elif source_type == "bigquery": - from connectors.bigquery.extractor import init_extract as bq_init - project_id = get_value("bigquery", "project_id", default="") - output = str(data_dir / "extracts" / "bigquery") - result = bq_init(output, project_id, table_configs) - logger.info("BigQuery extract init: %s", result) - - else: - logger.warning("Unknown data source type: %s", source_type) - return - - # Rebuild master views orch = SyncOrchestrator() views = orch.rebuild() logger.info("Orchestrator rebuild: %s", {k: len(v) for k, v in views.items()}) + except subprocess.TimeoutExpired: + logger.error("Extractor timed out after 600s") except Exception as e: logger.error(f"Data sync failed: {e}\n{traceback.format_exc()}") diff --git a/connectors/keboola/extractor.py b/connectors/keboola/extractor.py index b11b742..b5e4d78 100644 --- a/connectors/keboola/extractor.py +++ b/connectors/keboola/extractor.py @@ -168,16 +168,37 @@ def _extract_via_legacy( if __name__ == "__main__": - """Standalone: reads config from instance.yaml + table_registry, runs extraction.""" - from config.loader import load_instance_config + """Standalone: reads config from env + table_registry, runs extraction. + + Used by sync trigger subprocess. Reads KEBOOLA_STORAGE_TOKEN and + KEBOOLA_STACK_URL from environment, table list from DuckDB registry. + """ + import logging as _logging + _logging.basicConfig(level=_logging.INFO, format="%(levelname)s: %(message)s") + + # Read Keboola credentials — env first, then instance.yaml fallback + url = os.environ.get("KEBOOLA_STACK_URL", "") + token = os.environ.get("KEBOOLA_STORAGE_TOKEN", "") + + if not url or not token: + try: + from config.loader import load_instance_config + config = load_instance_config() + kbc_config = config.get("keboola", {}) + url = url or kbc_config.get("url", "") + token_env = kbc_config.get("token_env", "KEBOOLA_STORAGE_TOKEN") + token = token or os.environ.get(token_env, "") + except Exception: + pass + + if not url or not token: + logger.error("Missing KEBOOLA_STACK_URL or KEBOOLA_STORAGE_TOKEN") + exit(1) + + # Read table list from registry from src.db import get_system_db from src.repositories.table_registry import TableRegistryRepository - config = load_instance_config() - kbc_config = config.get("keboola", {}) - url = kbc_config.get("url", "") - token = os.environ.get(kbc_config.get("token_env", "KEBOOLA_STORAGE_TOKEN"), "") - sys_conn = get_system_db() try: repo = TableRegistryRepository(sys_conn) @@ -187,7 +208,12 @@ if __name__ == "__main__": if not tables: logger.warning("No Keboola tables registered in table_registry") - else: - data_dir = Path(os.environ.get("DATA_DIR", "./data")) - result = run(str(data_dir / "extracts" / "keboola"), tables, url, token) - logger.info("Extraction complete: %s", result) + exit(0) + + logger.info("Extracting %d tables from %s", len(tables), url) + data_dir = Path(os.environ.get("DATA_DIR", "./data")) + result = run(str(data_dir / "extracts" / "keboola"), tables, url, token) + logger.info("Extraction complete: %s", result) + + failed = result.get("tables_failed", 0) + exit(1 if failed == len(tables) else 0) # exit 1 only if ALL tables failed diff --git a/src/db.py b/src/db.py index eec41ea..a54bfaa 100644 --- a/src/db.py +++ b/src/db.py @@ -197,6 +197,11 @@ def get_system_db() -> duckdb.DuckDBPyConnection: _system_db_conn = duckdb.connect(db_path) _system_db_path = db_path _ensure_schema(_system_db_conn) + # WAL mode: allows concurrent readers while writing + try: + _system_db_conn.execute("PRAGMA enable_wal") + except Exception: + pass # Older DuckDB versions may not support this return _system_db_conn.cursor() diff --git a/src/orchestrator.py b/src/orchestrator.py index 75477e0..62d903e 100644 --- a/src/orchestrator.py +++ b/src/orchestrator.py @@ -50,7 +50,11 @@ class SyncOrchestrator: return {} result = {} - conn = duckdb.connect(self._db_path) + # Write to temp file then rename — avoids lock conflict with query endpoint + tmp_path = self._db_path + ".tmp" + if Path(tmp_path).exists(): + Path(tmp_path).unlink() + conn = duckdb.connect(tmp_path) try: # Detach any previously attached databases (except main and temp) attached = [ @@ -84,6 +88,11 @@ class SyncOrchestrator: finally: conn.close() + # Atomic swap: replace analytics.duckdb with new version + import shutil + if Path(tmp_path).exists(): + shutil.move(tmp_path, self._db_path) + return result def _do_rebuild_source(self, source_name: str) -> List[str]: @@ -93,17 +102,26 @@ class SyncOrchestrator: logger.warning("No extract.duckdb for source %s", source_name) return [] - conn = duckdb.connect(self._db_path) + tmp_path = self._db_path + ".tmp" + if Path(tmp_path).exists(): + Path(tmp_path).unlink() + conn = duckdb.connect(tmp_path) try: # Detach if already attached try: conn.execute(f"DETACH {source_name}") except Exception: pass - return self._attach_and_create_views(conn, source_name, str(db_file)) + tables = self._attach_and_create_views(conn, source_name, str(db_file)) finally: conn.close() + import shutil + if Path(tmp_path).exists(): + shutil.move(tmp_path, self._db_path) + + return tables + def _attach_and_create_views( self, conn: duckdb.DuckDBPyConnection, source_name: str, db_path: str ) -> List[str]: