From 1bf97c725c2c819b676c1dc4e3f40e3c190101b3 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 30 Mar 2026 20:16:33 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20wire=20orchestrator=20into=20API=20?= =?UTF-8?q?=E2=80=94=20replace=20DataSyncManager?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit sync.py: _run_sync() now calls extractor + SyncOrchestrator.rebuild() data.py: parquet lookup searches /data/extracts/ first, legacy fallback catalog.py: list tables from DuckDB table_registry instead of src.config admin.py: discover-tables uses KeboolaClient directly, remove old TableRegistry dep --- app/api/admin.py | 29 ++++++++++++----------- app/api/catalog.py | 34 +++++++++++++-------------- app/api/data.py | 15 ++++++++---- app/api/sync.py | 57 ++++++++++++++++++++++++++++++++++++++-------- 4 files changed, 88 insertions(+), 47 deletions(-) diff --git a/app/api/admin.py b/app/api/admin.py index 45199c9..a5968bc 100644 --- a/app/api/admin.py +++ b/app/api/admin.py @@ -48,12 +48,20 @@ async def discover_tables( ): """Discover all available tables from the configured data source.""" try: - from src.data_sync import create_data_source - source = create_data_source() - tables = source.discover_tables() - return {"tables": tables, "count": len(tables), "source": source.get_source_name()} - except ImportError: - return {"tables": [], "count": 0, "error": "Data source not configured"} + from app.instance_config import get_data_source_type + source_type = get_data_source_type() + + if source_type == "keboola": + from connectors.keboola.client import KeboolaClient + import os + from app.instance_config import get_value + url = get_value("keboola", "url", default="") + token = os.environ.get(get_value("keboola", "token_env", default="KEBOOLA_STORAGE_TOKEN"), "") + client = KeboolaClient(token=token, url=url) + tables = client.discover_all_tables() + return {"tables": tables, "count": len(tables), "source": "keboola"} + else: + return {"tables": [], "count": 0, "source": source_type, "error": "Discovery not implemented for this source"} except Exception as e: raise HTTPException(status_code=500, detail=f"Discovery failed: {e}") @@ -98,15 +106,6 @@ async def register_table( profile_after_sync=request.profile_after_sync, ) - # Regenerate data_description.md if table_registry module supports it - try: - from src.table_registry import TableRegistry - tr = TableRegistry() - tr.generate_data_description_md() - logger.info(f"Regenerated data_description.md after registering {table_id}") - except Exception as e: - logger.warning(f"Could not regenerate data_description.md: {e}") - return {"id": table_id, "name": request.name, "status": "registered"} diff --git a/app/api/catalog.py b/app/api/catalog.py index 8b18ee3..1a7945b 100644 --- a/app/api/catalog.py +++ b/app/api/catalog.py @@ -46,24 +46,24 @@ async def get_table_profile( @router.get("/tables") async def list_catalog_tables( user: dict = Depends(get_current_user), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): - """List all available tables from data_description.md.""" - try: - from src.config import get_config - config = get_config() - tables = [] - for tc in config.tables: - tables.append({ - "id": tc.id, - "name": tc.name, - "description": tc.description, - "dataset": getattr(tc, "dataset", None), - "sync_strategy": tc.sync_strategy, - "query_mode": getattr(tc, "query_mode", "local"), - }) - return {"tables": tables, "count": len(tables)} - except Exception as e: - return {"tables": [], "count": 0, "error": str(e)} + """List all available tables from table_registry.""" + from src.repositories.table_registry import TableRegistryRepository + repo = TableRegistryRepository(conn) + all_tables = repo.list_all() + tables = [ + { + "id": t["id"], + "name": t["name"], + "description": t.get("description"), + "source_type": t.get("source_type"), + "sync_strategy": t.get("sync_strategy"), + "query_mode": t.get("query_mode", "local"), + } + for t in all_tables + ] + return {"tables": tables, "count": len(tables)} @router.get("/metrics/{metric_path:path}") diff --git a/app/api/data.py b/app/api/data.py index 692f7e9..2b1b615 100644 --- a/app/api/data.py +++ b/app/api/data.py @@ -23,13 +23,18 @@ async def download_table( ): """Stream a parquet file for download. Supports ETag for caching.""" data_dir = _get_data_dir() - parquet_dir = data_dir / "src_data" / "parquet" - # Find the parquet file (may be in a subfolder) - candidates = list(parquet_dir.rglob(f"{table_id}.parquet")) + # Search in extracts directory (v2 extract.duckdb architecture) + extracts_dir = data_dir / "extracts" + candidates = list(extracts_dir.rglob(f"data/{table_id}.parquet")) if extracts_dir.exists() else [] + + # Fallback to legacy path for backward compatibility if not candidates: - # Try with folder structure: folder/table.parquet - candidates = list(parquet_dir.rglob(f"*/{table_id}.parquet")) + parquet_dir = data_dir / "src_data" / "parquet" + candidates = list(parquet_dir.rglob(f"{table_id}.parquet")) + if not candidates: + candidates = list(parquet_dir.rglob(f"*/{table_id}.parquet")) + if not candidates: raise HTTPException(status_code=404, detail=f"Table '{table_id}' not found") diff --git a/app/api/sync.py b/app/api/sync.py index 9731039..8c8f62f 100644 --- a/app/api/sync.py +++ b/app/api/sync.py @@ -35,18 +35,55 @@ def _get_data_dir() -> Path: def _run_sync(tables: Optional[List[str]] = None): - """Run DataSyncManager in background. Called by trigger endpoint.""" + """Run extractor + orchestrator in background. Called by trigger endpoint.""" try: - from src.data_sync import DataSyncManager - manager = DataSyncManager() - if tables: - for t in tables: - manager.sync_table(t) + from app.instance_config import get_data_source_type, get_value + from src.db import get_system_db + from src.repositories.table_registry import TableRegistryRepository + from src.orchestrator import SyncOrchestrator + + source_type = get_data_source_type() + data_dir = _get_data_dir() + + # Get table configs from registry + sys_conn = get_system_db() + try: + repo = TableRegistryRepository(sys_conn) + if tables: + all_configs = [repo.get(t) for t in tables] + table_configs = [c for c in all_configs if c is not None] + else: + table_configs = repo.list_by_source(source_type) if source_type else repo.list_all() + finally: + sys_conn.close() + + # Run appropriate extractor + if source_type == "keboola": + from connectors.keboola.extractor import run as keboola_run + kbc_url = get_value("keboola", "url", default="") + kbc_token = os.environ.get( + get_value("keboola", "token_env", default="KEBOOLA_STORAGE_TOKEN"), "" + ) + output = str(data_dir / "extracts" / "keboola") + result = keboola_run(output, table_configs, kbc_url, kbc_token) + logger.info("Keboola extraction: %s", result) + + elif source_type == "bigquery": + from connectors.bigquery.extractor import init_extract as bq_init + project_id = get_value("bigquery", "project_id", default="") + output = str(data_dir / "extracts" / "bigquery") + result = bq_init(output, project_id, table_configs) + logger.info("BigQuery extract init: %s", result) + else: - manager.sync_all() - logger.info("Data sync completed successfully") - except ImportError as e: - logger.warning(f"DataSyncManager not available: {e}") + logger.warning("Unknown data source type: %s", source_type) + return + + # Rebuild master views + orch = SyncOrchestrator() + views = orch.rebuild() + logger.info("Orchestrator rebuild: %s", {k: len(v) for k, v in views.items()}) + except Exception as e: logger.error(f"Data sync failed: {e}\n{traceback.format_exc()}")