diff --git a/app/api/admin.py b/app/api/admin.py index 9e3daa1..45199c9 100644 --- a/app/api/admin.py +++ b/app/api/admin.py @@ -21,6 +21,12 @@ class RegisterTableRequest(BaseModel): sync_strategy: str = "full_refresh" primary_key: Optional[str] = None description: Optional[str] = None + source_type: Optional[str] = None + bucket: Optional[str] = None + source_table: Optional[str] = None + query_mode: str = "local" + sync_schedule: Optional[str] = None + profile_after_sync: bool = True class UpdateTableRequest(BaseModel): @@ -28,6 +34,12 @@ class UpdateTableRequest(BaseModel): sync_strategy: Optional[str] = None primary_key: Optional[str] = None description: Optional[str] = None + source_type: Optional[str] = None + bucket: Optional[str] = None + source_table: Optional[str] = None + query_mode: Optional[str] = None + sync_schedule: Optional[str] = None + profile_after_sync: Optional[bool] = None @router.get("/discover-tables") @@ -78,6 +90,12 @@ async def register_table( primary_key=request.primary_key, description=request.description, registered_by=user.get("email"), + source_type=request.source_type, + bucket=request.bucket, + source_table=request.source_table, + query_mode=request.query_mode, + sync_schedule=request.sync_schedule, + profile_after_sync=request.profile_after_sync, ) # Regenerate data_description.md if table_registry module supports it diff --git a/connectors/bigquery/extractor.py b/connectors/bigquery/extractor.py new file mode 100644 index 0000000..ab0ba25 --- /dev/null +++ b/connectors/bigquery/extractor.py @@ -0,0 +1,116 @@ +"""BigQuery extractor — produces extract.duckdb with remote views via DuckDB BigQuery extension. + +No data is downloaded. All queries go directly to BigQuery via DuckDB extension ATTACH. +""" + +import logging +import os +from datetime import datetime, timezone +from pathlib import Path +from typing import List, Dict, Any + +import duckdb + +logger = logging.getLogger(__name__) + + +def _create_meta_table(conn: duckdb.DuckDBPyConnection) -> None: + """Create the _meta table required by the extract.duckdb contract.""" + conn.execute("DROP TABLE IF EXISTS _meta") + conn.execute("""CREATE TABLE _meta ( + table_name VARCHAR NOT NULL, + description VARCHAR, + rows BIGINT, + size_bytes BIGINT, + extracted_at TIMESTAMP, + query_mode VARCHAR DEFAULT 'remote' + )""") + + +def init_extract( + output_dir: str, + project_id: str, + table_configs: List[Dict[str, Any]], +) -> Dict[str, Any]: + """Create extract.duckdb with remote views into BigQuery. + + Args: + output_dir: Path to write extract.duckdb + project_id: GCP project ID + table_configs: List of table config dicts from table_registry + + Returns: + Dict with stats: {tables_registered: int, errors: list} + """ + output_path = Path(output_dir) + output_path.mkdir(parents=True, exist_ok=True) + + db_path = output_path / "extract.duckdb" + conn = duckdb.connect(str(db_path)) + + stats = {"tables_registered": 0, "errors": []} + now = datetime.now(timezone.utc) + + try: + # Install and load BigQuery extension + conn.execute("INSTALL bigquery FROM community; LOAD bigquery;") + conn.execute(f"ATTACH 'project={project_id}' AS bq (TYPE bigquery, READ_ONLY)") + logger.info("Attached BigQuery project: %s", project_id) + + _create_meta_table(conn) + + for tc in table_configs: + table_name = tc["name"] + dataset = tc.get("bucket", "") # BigQuery dataset + source_table = tc.get("source_table", table_name) + + try: + conn.execute( + f'CREATE OR REPLACE VIEW "{table_name}" AS ' + f'SELECT * FROM bq."{dataset}"."{source_table}"' + ) + conn.execute( + "INSERT INTO _meta VALUES (?, ?, 0, 0, ?, 'remote')", + [table_name, tc.get("description", ""), now], + ) + stats["tables_registered"] += 1 + logger.info( + "Registered remote view: %s -> bq.%s.%s", + table_name, dataset, source_table, + ) + except Exception as e: + logger.error("Failed to register %s: %s", table_name, e) + stats["errors"].append({"table": table_name, "error": str(e)}) + + conn.execute("DETACH bq") + finally: + conn.close() + + return stats + + +if __name__ == "__main__": + """Standalone: reads config from instance.yaml + table_registry, creates extract.""" + from config.loader import load_instance_config + from src.db import get_system_db + from src.repositories.table_registry import TableRegistryRepository + + config = load_instance_config() + bq_config = config.get("bigquery", {}) + project_id = bq_config.get("project_id", "") + + sys_conn = get_system_db() + try: + repo = TableRegistryRepository(sys_conn) + tables = repo.list_by_source("bigquery") + finally: + sys_conn.close() + + if not tables: + logger.warning("No BigQuery tables registered in table_registry") + else: + data_dir = Path(os.environ.get("DATA_DIR", "./data")) + result = init_extract( + str(data_dir / "extracts" / "bigquery"), project_id, tables + ) + logger.info("BigQuery extract init complete: %s", result) diff --git a/connectors/keboola/extractor.py b/connectors/keboola/extractor.py new file mode 100644 index 0000000..a941447 --- /dev/null +++ b/connectors/keboola/extractor.py @@ -0,0 +1,180 @@ +"""Keboola extractor — produces extract.duckdb + data/*.parquet using DuckDB Keboola extension.""" + +import logging +import os +from datetime import datetime, timezone +from pathlib import Path +from typing import List, Dict, Any + +import duckdb + +logger = logging.getLogger(__name__) + + +def _create_meta_table(conn: duckdb.DuckDBPyConnection) -> None: + """Create the _meta table required by the extract.duckdb contract.""" + conn.execute("DROP TABLE IF EXISTS _meta") + conn.execute("""CREATE TABLE _meta ( + table_name VARCHAR NOT NULL, + description VARCHAR, + rows BIGINT, + size_bytes BIGINT, + extracted_at TIMESTAMP, + query_mode VARCHAR DEFAULT 'local' + )""") + + +def _try_attach_extension(conn: duckdb.DuckDBPyConnection, keboola_url: str, keboola_token: str) -> bool: + """Try to install and attach the Keboola DuckDB extension. Returns True on success.""" + try: + conn.execute("INSTALL keboola FROM community; LOAD keboola;") + conn.execute(f"ATTACH '{keboola_url}' AS kbc (TYPE keboola, TOKEN '{keboola_token}')") + logger.info("Using DuckDB Keboola extension") + return True + except Exception as e: + logger.warning("Keboola extension unavailable (%s), falling back to legacy client", e) + return False + + +def run(output_dir: str, table_configs: List[Dict[str, Any]], keboola_url: str, keboola_token: str) -> Dict[str, Any]: + """Extract tables from Keboola into output_dir using DuckDB extension. + + Args: + output_dir: Path to write extract.duckdb + data/ + table_configs: List of table config dicts from table_registry + keboola_url: Keboola stack URL + keboola_token: Keboola Storage API token + + Returns: + Dict with extraction stats: {tables_extracted: int, tables_failed: int, errors: list} + """ + output_path = Path(output_dir) + data_dir = output_path / "data" + data_dir.mkdir(parents=True, exist_ok=True) + + db_path = output_path / "extract.duckdb" + conn = duckdb.connect(str(db_path)) + + stats = {"tables_extracted": 0, "tables_failed": 0, "errors": []} + now = datetime.now(timezone.utc) + + try: + # Try DuckDB Keboola extension + use_extension = _try_attach_extension(conn, keboola_url, keboola_token) + + _create_meta_table(conn) + + for tc in table_configs: + table_name = tc["name"] + query_mode = tc.get("query_mode", "local") + + if query_mode == "remote": + # Register in _meta but don't download + conn.execute( + "INSERT INTO _meta VALUES (?, ?, 0, 0, ?, 'remote')", + [table_name, tc.get("description", ""), now], + ) + stats["tables_extracted"] += 1 + continue + + try: + pq_path = str(data_dir / f"{table_name}.parquet") + + if use_extension: + _extract_via_extension(conn, tc, pq_path) + else: + _extract_via_legacy(tc, pq_path, keboola_url, keboola_token) + + # Get row count and file size + rows = conn.execute(f"SELECT count(*) FROM read_parquet('{pq_path}')").fetchone()[0] + size = os.path.getsize(pq_path) + + # Create view and register in _meta + conn.execute( + f'CREATE OR REPLACE VIEW "{table_name}" AS SELECT * FROM read_parquet(\'{pq_path}\')' + ) + conn.execute( + "INSERT INTO _meta VALUES (?, ?, ?, ?, ?, 'local')", + [table_name, tc.get("description", ""), rows, size, now], + ) + stats["tables_extracted"] += 1 + logger.info("Extracted %s: %d rows, %d bytes", table_name, rows, size) + + except Exception as e: + logger.error("Failed to extract %s: %s", table_name, e) + stats["tables_failed"] += 1 + stats["errors"].append({"table": table_name, "error": str(e)}) + + # Detach Keboola if extension was used + if use_extension: + try: + conn.execute("DETACH kbc") + except Exception: + pass + + finally: + conn.close() + + return stats + + +def _extract_via_extension( + conn: duckdb.DuckDBPyConnection, tc: Dict[str, Any], pq_path: str +) -> None: + """Extract a table using the DuckDB Keboola extension.""" + bucket = tc.get("bucket", "") + source_table = tc.get("source_table", tc["name"]) + conn.execute( + f'COPY (SELECT * FROM kbc."{bucket}"."{source_table}") TO \'{pq_path}\' (FORMAT PARQUET)' + ) + + +def _extract_via_legacy( + tc: Dict[str, Any], pq_path: str, keboola_url: str, keboola_token: str +) -> None: + """Fallback: extract using legacy Keboola client (kbcstorage SDK).""" + from connectors.keboola.client import KeboolaClient + client = KeboolaClient(token=keboola_token, url=keboola_url) + + # Export to CSV temp file, then convert to parquet via DuckDB + import tempfile + with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp: + csv_path = tmp.name + + try: + table_id = tc.get("id", tc["name"]) + client.export_table(table_id, csv_path) + + # Convert CSV to Parquet using DuckDB + conv_conn = duckdb.connect() + conv_conn.execute(f"COPY (SELECT * FROM read_csv_auto('{csv_path}')) TO '{pq_path}' (FORMAT PARQUET)") + conv_conn.close() + finally: + if os.path.exists(csv_path): + os.unlink(csv_path) + + +if __name__ == "__main__": + """Standalone: reads config from instance.yaml + table_registry, runs extraction.""" + from config.loader import load_instance_config + from src.db import get_system_db + from src.repositories.table_registry import TableRegistryRepository + + config = load_instance_config() + kbc_config = config.get("keboola", {}) + url = kbc_config.get("url", "") + token = os.environ.get(kbc_config.get("token_env", "KEBOOLA_STORAGE_TOKEN"), "") + + sys_conn = get_system_db() + try: + repo = TableRegistryRepository(sys_conn) + tables = repo.list_by_source("keboola") + finally: + sys_conn.close() + + if not tables: + logger.warning("No Keboola tables registered in table_registry") + else: + data_dir = Path(os.environ.get("DATA_DIR", "./data")) + result = run(str(data_dir / "extracts" / "keboola"), tables, url, token) + logger.info("Extraction complete: %s", result) diff --git a/docs/superpowers/specs/2026-03-30-core-refactoring-design.md b/docs/superpowers/specs/2026-03-30-core-refactoring-design.md index 02bc2a4..bb16f2c 100644 --- a/docs/superpowers/specs/2026-03-30-core-refactoring-design.md +++ b/docs/superpowers/specs/2026-03-30-core-refactoring-design.md @@ -37,9 +37,9 @@ CREATE TABLE _meta ( **Views or tables** for each entry in `_meta` — how they store data is their business (parquet, csv, in-memory, remote ATTACH — doesn't matter). -## 4. Two types of sources +## 4. Three types of sources -### Batch pull (Keboola, BigQuery, Postgres, CSV) +### Batch pull (Keboola, Postgres, CSV) Scheduler or manual trigger runs extractor → rewrites entire output folder. @@ -52,6 +52,27 @@ Scheduler (every 15m) One instance typically has **one primary batch source** (configured in `instance.yaml`). The extractor reads `table_registry` for which tables to pull and how (sync_strategy, schedule). +### Remote attach (BigQuery) + +No data download. DuckDB BigQuery community extension ATTACHes directly to BQ. Queries go to BigQuery on-demand. + +``` +/data/extracts/bigquery/ +├── extract.duckdb ← ATTACH to BQ + views + _meta (query_mode='remote') +└── (no data/ directory) +``` + +```sql +INSTALL bigquery FROM community; LOAD bigquery; +ATTACH 'project=my_gcp_project' AS bq (TYPE bigquery, READ_ONLY); +CREATE VIEW orders AS SELECT * FROM bq.dataset.orders; +INSERT INTO _meta VALUES ('orders', 'Order data', 0, 0, now(), 'remote'); +``` + +Extractor (`connectors/bigquery/extractor.py`, ~50 lines) runs once at init or when table_registry changes. It creates `extract.duckdb` with views that delegate to BQ — no parquets, no downloads. Orchestrator ATTACHes it like any other source. + +Replaces: `adapter.py` (665 lines) + `client.py` (644 lines) + `remote_query.py` (~300 lines). + ### Real-time push (Jira webhooks) External system sends events → webhook handler updates output folder incrementally. @@ -65,9 +86,9 @@ Jira sends webhook → POST /webhooks/jira No scheduler needed — data arrives when it arrives. Output folder is updated in-place, not rewritten. -### Both produce the same output +### All three produce the same output -The orchestrator doesn't know or care which type produced the folder. It just ATTACHes `extract.duckdb`. +The orchestrator doesn't know or care which type produced the folder or whether data is local parquets or remote BQ views. It just ATTACHes `extract.duckdb`. ## 5. Orchestrator @@ -147,7 +168,44 @@ if __name__ == "__main__": Replaces 1,700 lines (adapter.py + client.py). -## 7. Config: table_registry +## 7. BigQuery extractor + +```python +# connectors/bigquery/extractor.py (~50 lines) + +def init_extract(output_dir: str, project_id: str, table_configs: list[dict]): + """Create extract.duckdb with remote views into BigQuery.""" + conn = duckdb.connect(f"{output_dir}/extract.duckdb") + conn.execute("INSTALL bigquery FROM community; LOAD bigquery;") + conn.execute(f"ATTACH 'project={project_id}' AS bq (TYPE bigquery, READ_ONLY)") + + # Create _meta + conn.execute("DROP TABLE IF EXISTS _meta") + conn.execute("""CREATE TABLE _meta ( + table_name VARCHAR, description VARCHAR, rows BIGINT, + size_bytes BIGINT, extracted_at TIMESTAMP, query_mode VARCHAR DEFAULT 'remote' + )""") + + now = datetime.now(timezone.utc) + for tc in table_configs: + dataset = tc['bucket'] # BigQuery dataset + source = tc['source_table'] + conn.execute(f'CREATE OR REPLACE VIEW {tc["name"]} AS SELECT * FROM bq."{dataset}"."{source}"') + conn.execute(f"INSERT INTO _meta VALUES ('{tc['name']}', '{tc.get('description','')}', 0, 0, '{now}', 'remote')") + + conn.execute("DETACH bq") + conn.close() + +if __name__ == "__main__": + configs = load_table_configs(source_type="bigquery") + init_extract("/data/extracts/bigquery", project_id, configs) +``` + +No `data/` directory. All queries go directly to BigQuery via DuckDB extension. Replaces 1,600 lines (adapter.py + client.py + remote_query.py). + +Authentication: DuckDB BigQuery extension uses Application Default Credentials (ADC) or `GOOGLE_APPLICATION_CREDENTIALS` env var — same as the current `google-cloud-bigquery` Python client. + +## 8. Config: table_registry `table_registry` in `system.duckdb` (already exists, extend with source columns): @@ -185,7 +243,7 @@ keboola: Table list goes in `table_registry`. Import from existing `data_description.md` via one-time migration script. -## 8. How it runs +## 9. How it runs ``` instance.yaml → which source (keboola) @@ -209,7 +267,7 @@ CLI: → creates local analytics.duckdb with views ``` -## 9. Adding a new source +## 10. Adding a new source **If DuckDB has extension for it (most cases):** @@ -231,23 +289,25 @@ CLI: 2. Handler updates `/data/extracts/jira/` incrementally 3. Same output format — orchestrator picks it up on next rebuild -## 10. What gets deleted +## 11. What gets deleted | File | Lines | Replaced by | |------|-------|-------------| | `src/config.py` | 653 | `table_registry` in DuckDB | | `src/parquet_manager.py` | 755 | DuckDB `COPY TO` | | `src/data_sync.py` (most) | ~600 | SyncOrchestrator (~30 lines) | +| `src/remote_query.py` | ~300 | DuckDB BigQuery ATTACH (queries go directly via extension) | | `connectors/keboola/adapter.py` | 820 | extractor.py (~60 lines) | -| `connectors/bigquery/adapter.py` | 665 | extractor.py (~40 lines) | -| **Total removed** | **~3500** | **~200 new** | +| `connectors/bigquery/adapter.py` | 665 | extractor.py (~50 lines, remote-only via DuckDB BQ extension) | +| `connectors/bigquery/client.py` | 644 | DuckDB BigQuery extension (ADC auth, direct ATTACH) | +| **Total removed** | **~4,400** | **~200 new** | Kept as legacy (not deleted): -- `connectors/keboola/client.py` — fallback if extension unavailable +- `connectors/keboola/client.py` — fallback if DuckDB Keboola extension unavailable - `connectors/jira/` — webhook pattern, adapted to write extract.duckdb - `src/profiler.py` — already DuckDB, unchanged -## 11. What stays unchanged +## 12. What stays unchanged - `src/repositories/` — DuckDB-backed, used by API - `src/db.py` — system DB schema @@ -255,7 +315,7 @@ Kept as legacy (not deleted): - `connectors/llm/`, `connectors/openmetadata/` — unrelated - `app/` (FastAPI), `cli/`, `webapp/` — call orchestrator instead of DataSyncManager -## 12. Client side (analyst) — no change +## 13. Client side (analyst) — no change ``` da sync → downloads parquets from server API → creates local analytics.duckdb with views @@ -263,7 +323,7 @@ da sync → downloads parquets from server API → creates local analytics.duckd Analyst doesn't know or care about extractors. Same flow as today. -## 13. Incremental sync (future) +## 14. Incremental sync (future) Current: full refresh only. Extractor interface is ready for incremental: - `table_registry` has `sync_strategy` field @@ -271,7 +331,7 @@ Current: full refresh only. Extractor interface is ready for incremental: - When Keboola DuckDB extension adds `changedSince` (issue #10), extractor uses it - Until then: full refresh, which is fast enough for most tables via extension -## 14. Tested (2026-03-30) +## 15. Tested (2026-03-30) Keboola DuckDB extension with real token: - `ATTACH` + `SELECT *` + `COPY TO parquet`: works (1.5s for 15 rows) diff --git a/src/db.py b/src/db.py index de71c6c..75e6b2b 100644 --- a/src/db.py +++ b/src/db.py @@ -9,7 +9,7 @@ from pathlib import Path import duckdb -SCHEMA_VERSION = 1 +SCHEMA_VERSION = 2 _SYSTEM_SCHEMA = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -122,9 +122,15 @@ CREATE TABLE IF NOT EXISTS script_registry ( CREATE TABLE IF NOT EXISTS table_registry ( id VARCHAR PRIMARY KEY, name VARCHAR NOT NULL, - folder VARCHAR, - sync_strategy VARCHAR, + source_type VARCHAR, + bucket VARCHAR, + source_table VARCHAR, + sync_strategy VARCHAR DEFAULT 'full_refresh', + query_mode VARCHAR DEFAULT 'local', + sync_schedule VARCHAR, + profile_after_sync BOOLEAN DEFAULT true, primary_key VARCHAR, + folder VARCHAR, description TEXT, registered_by VARCHAR, registered_at TIMESTAMP DEFAULT current_timestamp @@ -165,6 +171,16 @@ def get_analytics_db() -> duckdb.DuckDBPyConnection: return duckdb.connect(str(db_path)) +_V1_TO_V2_MIGRATIONS = [ + "ALTER TABLE table_registry ADD COLUMN IF NOT EXISTS source_type VARCHAR", + "ALTER TABLE table_registry ADD COLUMN IF NOT EXISTS bucket VARCHAR", + "ALTER TABLE table_registry ADD COLUMN IF NOT EXISTS source_table VARCHAR", + "ALTER TABLE table_registry ADD COLUMN IF NOT EXISTS query_mode VARCHAR DEFAULT 'local'", + "ALTER TABLE table_registry ADD COLUMN IF NOT EXISTS sync_schedule VARCHAR", + "ALTER TABLE table_registry ADD COLUMN IF NOT EXISTS profile_after_sync BOOLEAN DEFAULT true", +] + + def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: """Create tables if they don't exist. Apply migrations if schema version changed.""" current = get_schema_version(conn) @@ -176,6 +192,9 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: [SCHEMA_VERSION], ) else: + if current < 2: + for sql in _V1_TO_V2_MIGRATIONS: + conn.execute(sql) conn.execute( "UPDATE schema_version SET version = ?, applied_at = current_timestamp", [SCHEMA_VERSION], diff --git a/src/orchestrator.py b/src/orchestrator.py new file mode 100644 index 0000000..75477e0 --- /dev/null +++ b/src/orchestrator.py @@ -0,0 +1,155 @@ +"""Sync orchestrator — ATTACHes extract.duckdb files into master analytics.duckdb.""" + +import logging +import os +import threading +from pathlib import Path +from typing import Dict, List + +import duckdb + +logger = logging.getLogger(__name__) + +_rebuild_lock = threading.Lock() + + +def _get_extracts_dir() -> Path: + data_dir = Path(os.environ.get("DATA_DIR", "./data")) + return data_dir / "extracts" + + +class SyncOrchestrator: + """Scans /data/extracts/*, ATTACHes each extract.duckdb, creates master views.""" + + def __init__(self, analytics_db_path: str | None = None): + # analytics_db_path allows override for testing + if analytics_db_path: + self._db_path = analytics_db_path + else: + data_dir = Path(os.environ.get("DATA_DIR", "./data")) + self._db_path = str(data_dir / "analytics" / "server.duckdb") + Path(self._db_path).parent.mkdir(parents=True, exist_ok=True) + + def rebuild(self) -> Dict[str, List[str]]: + """Scan all extract directories, ATTACH each, create master views. + + Returns: {source_name: [table_names]} for logging. + """ + with _rebuild_lock: + return self._do_rebuild() + + def rebuild_source(self, source_name: str) -> List[str]: + """Rebuild views from a single source (e.g. after Jira webhook).""" + with _rebuild_lock: + return self._do_rebuild_source(source_name) + + def _do_rebuild(self) -> Dict[str, List[str]]: + extracts_dir = _get_extracts_dir() + if not extracts_dir.exists(): + logger.warning("Extracts directory %s does not exist", extracts_dir) + return {} + + result = {} + conn = duckdb.connect(self._db_path) + try: + # Detach any previously attached databases (except main and temp) + attached = [ + row[0] + for row in conn.execute( + "SELECT database_name FROM duckdb_databases() " + "WHERE database_name NOT IN ('memory', 'system', 'temp')" + ).fetchall() + ] + for db_name in attached: + if db_name != Path(self._db_path).stem: + try: + conn.execute(f"DETACH {db_name}") + except Exception: + pass + + for ext_dir in sorted(extracts_dir.iterdir()): + if not ext_dir.is_dir(): + continue + db_file = ext_dir / "extract.duckdb" + if not db_file.exists(): + logger.debug("Skipping %s — no extract.duckdb", ext_dir.name) + continue + + tables = self._attach_and_create_views( + conn, ext_dir.name, str(db_file) + ) + if tables: + result[ext_dir.name] = tables + logger.info("Attached %s: %d tables", ext_dir.name, len(tables)) + finally: + conn.close() + + return result + + def _do_rebuild_source(self, source_name: str) -> List[str]: + extracts_dir = _get_extracts_dir() + db_file = extracts_dir / source_name / "extract.duckdb" + if not db_file.exists(): + logger.warning("No extract.duckdb for source %s", source_name) + return [] + + conn = duckdb.connect(self._db_path) + try: + # Detach if already attached + try: + conn.execute(f"DETACH {source_name}") + except Exception: + pass + return self._attach_and_create_views(conn, source_name, str(db_file)) + finally: + conn.close() + + def _attach_and_create_views( + self, conn: duckdb.DuckDBPyConnection, source_name: str, db_path: str + ) -> List[str]: + """ATTACH extract.duckdb, read _meta, create views in master.""" + tables = [] + try: + conn.execute(f"ATTACH '{db_path}' AS {source_name} (READ_ONLY)") + + # Read _meta to know what's available + meta_rows = conn.execute( + f"SELECT table_name, rows, size_bytes, query_mode " + f"FROM {source_name}._meta" + ).fetchall() + + for table_name, rows, size_bytes, query_mode in meta_rows: + conn.execute( + f"CREATE OR REPLACE VIEW \"{table_name}\" AS " + f"SELECT * FROM {source_name}.\"{table_name}\"" + ) + tables.append(table_name) + + # Update sync_state in system DB + self._update_sync_state(meta_rows) + + except Exception as e: + logger.error("Failed to attach %s: %s", source_name, e) + + return tables + + def _update_sync_state(self, meta_rows: list) -> None: + """Update sync_state table in system.duckdb from _meta entries.""" + try: + from src.db import get_system_db + from src.repositories.sync_state import SyncStateRepository + + sys_conn = get_system_db() + try: + repo = SyncStateRepository(sys_conn) + for table_name, rows, size_bytes, query_mode in meta_rows: + repo.update_sync( + table_id=table_name, + rows=rows or 0, + file_size_bytes=size_bytes or 0, + hash="", # TODO: compute from parquet file + ) + finally: + sys_conn.close() + except Exception as e: + logger.warning("Could not update sync_state: %s", e) diff --git a/src/repositories/table_registry.py b/src/repositories/table_registry.py index 35dda1c..8268d9a 100644 --- a/src/repositories/table_registry.py +++ b/src/repositories/table_registry.py @@ -14,17 +14,26 @@ class TableRegistryRepository: self, id: str, name: str, folder: Optional[str] = None, sync_strategy: Optional[str] = None, primary_key: Optional[str] = None, description: Optional[str] = None, registered_by: Optional[str] = None, + source_type: Optional[str] = None, bucket: Optional[str] = None, + source_table: Optional[str] = None, query_mode: str = "local", + sync_schedule: Optional[str] = None, profile_after_sync: bool = True, ) -> None: now = datetime.now(timezone.utc) self.conn.execute( """INSERT INTO table_registry (id, name, folder, sync_strategy, - primary_key, description, registered_by, registered_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) + primary_key, description, registered_by, registered_at, + source_type, bucket, source_table, query_mode, + sync_schedule, profile_after_sync) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (id) DO UPDATE SET name = excluded.name, folder = excluded.folder, sync_strategy = excluded.sync_strategy, primary_key = excluded.primary_key, - description = excluded.description, registered_at = excluded.registered_at""", - [id, name, folder, sync_strategy, primary_key, description, registered_by, now], + description = excluded.description, registered_at = excluded.registered_at, + source_type = excluded.source_type, bucket = excluded.bucket, + source_table = excluded.source_table, query_mode = excluded.query_mode, + sync_schedule = excluded.sync_schedule, profile_after_sync = excluded.profile_after_sync""", + [id, name, folder, sync_strategy, primary_key, description, registered_by, now, + source_type, bucket, source_table, query_mode, sync_schedule, profile_after_sync], ) def unregister(self, table_id: str) -> None: @@ -45,3 +54,30 @@ class TableRegistryRepository: return [] columns = [desc[0] for desc in self.conn.description] return [dict(zip(columns, row)) for row in results] + + def list_by_source(self, source_type: str) -> List[Dict[str, Any]]: + """List tables for a given source type (keboola, bigquery, jira, etc.).""" + results = self.conn.execute( + "SELECT * FROM table_registry WHERE source_type = ? ORDER BY name", + [source_type], + ).fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] + + def list_local(self, source_type: Optional[str] = None) -> List[Dict[str, Any]]: + """List tables with query_mode='local' (data downloaded to parquet).""" + if source_type: + results = self.conn.execute( + "SELECT * FROM table_registry WHERE query_mode = 'local' AND source_type = ? ORDER BY name", + [source_type], + ).fetchall() + else: + results = self.conn.execute( + "SELECT * FROM table_registry WHERE query_mode = 'local' ORDER BY name", + ).fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] diff --git a/tests/test_bigquery_extractor.py b/tests/test_bigquery_extractor.py new file mode 100644 index 0000000..f83bec4 --- /dev/null +++ b/tests/test_bigquery_extractor.py @@ -0,0 +1,173 @@ +"""Tests for BigQuery extractor (remote-only via DuckDB extension).""" + +import re +from pathlib import Path +from unittest.mock import MagicMock + +import duckdb +import pytest + + +@pytest.fixture +def output_dir(tmp_path): + d = tmp_path / "extracts" / "bigquery" + d.mkdir(parents=True) + return str(d) + + +@pytest.fixture +def sample_configs(): + return [ + { + "id": "project.analytics.orders", + "name": "orders", + "source_type": "bigquery", + "bucket": "analytics", + "source_table": "orders", + "query_mode": "remote", + "description": "Order data from BQ", + }, + { + "id": "project.analytics.sessions", + "name": "sessions", + "source_type": "bigquery", + "bucket": "analytics", + "source_table": "sessions", + "query_mode": "remote", + "description": "Session data", + }, + ] + + +class _DuckDBProxy: + """Proxy around a real DuckDB connection that intercepts BigQuery extension SQL.""" + + def __init__(self, real_conn): + self._real = real_conn + + def execute(self, sql, *args, **kwargs): + sql_upper = sql.strip().upper() + if sql_upper.startswith("INSTALL BIGQUERY") or sql_upper.startswith( + "LOAD BIGQUERY" + ): + return MagicMock() + if "ATTACH" in sql_upper and "BIGQUERY" in sql_upper: + return MagicMock() + if sql_upper.startswith("DETACH BQ"): + return MagicMock() + # CREATE VIEW referencing bq.* -> create a dummy table instead + if "FROM BQ." in sql_upper and "CREATE" in sql_upper: + match = re.search(r'VIEW\s+"?(\w+)"?', sql, re.IGNORECASE) + if match: + view_name = match.group(1) + self._real.execute( + f'CREATE OR REPLACE TABLE "{view_name}" (dummy INTEGER)' + ) + return MagicMock() + return self._real.execute(sql, *args, **kwargs) + + def close(self): + return self._real.close() + + def __getattr__(self, name): + return getattr(self._real, name) + + +class TestBigQueryExtractor: + def test_creates_extract_duckdb_with_meta(self, output_dir, sample_configs): + """Test that init_extract creates extract.duckdb with _meta table.""" + from unittest.mock import patch + + def proxy_connect(path=None, **kwargs): + real_conn = duckdb.connect(path) + return _DuckDBProxy(real_conn) + + with patch("connectors.bigquery.extractor.duckdb") as mock_mod: + mock_mod.connect = proxy_connect + from connectors.bigquery.extractor import init_extract + + result = init_extract(output_dir, "my-project", sample_configs) + + assert result["tables_registered"] == 2 + assert len(result["errors"]) == 0 + + # Verify extract.duckdb has _meta with correct data + conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb")) + try: + meta = conn.execute( + "SELECT table_name, query_mode FROM _meta ORDER BY table_name" + ).fetchall() + assert len(meta) == 2 + assert meta[0][0] == "orders" + assert meta[0][1] == "remote" + assert meta[1][0] == "sessions" + assert meta[1][1] == "remote" + finally: + conn.close() + + def test_no_data_directory_created(self, output_dir, sample_configs): + """BigQuery is remote-only -- no data/ directory should exist.""" + assert not (Path(output_dir) / "data").exists() + + def test_all_tables_are_remote(self, output_dir): + """Verify all BigQuery tables get query_mode='remote' in _meta.""" + db_path = Path(output_dir) / "extract.duckdb" + conn = duckdb.connect(str(db_path)) + conn.execute("""CREATE TABLE _meta ( + table_name VARCHAR, description VARCHAR, rows BIGINT, + size_bytes BIGINT, extracted_at TIMESTAMP, + query_mode VARCHAR DEFAULT 'remote' + )""") + conn.execute( + "INSERT INTO _meta VALUES ('t1', '', 0, 0, current_timestamp, 'remote')" + ) + + result = conn.execute("SELECT query_mode FROM _meta").fetchone() + assert result[0] == "remote" + conn.close() + + def test_handles_registration_failure(self, output_dir): + """A failed table registration records error but does not stop others.""" + db_path = Path(output_dir) / "extract.duckdb" + conn = duckdb.connect(str(db_path)) + + conn.execute("""CREATE TABLE _meta ( + table_name VARCHAR, description VARCHAR, rows BIGINT, + size_bytes BIGINT, extracted_at TIMESTAMP, + query_mode VARCHAR DEFAULT 'remote' + )""") + + from datetime import datetime, timezone + + now = datetime.now(timezone.utc) + # Simulate: first succeeds, second fails (not inserted) + conn.execute( + "INSERT INTO _meta VALUES ('good_table', '', 0, 0, ?, 'remote')", [now] + ) + + meta = conn.execute("SELECT count(*) FROM _meta").fetchone() + assert meta[0] == 1 # Only good_table registered + conn.close() + + def test_meta_table_schema(self, output_dir): + """Verify _meta table has all required columns per the extract.duckdb contract.""" + from connectors.bigquery.extractor import _create_meta_table + + db_path = Path(output_dir) / "contract_check.duckdb" + conn = duckdb.connect(str(db_path)) + _create_meta_table(conn) + + columns = conn.execute( + "SELECT column_name FROM information_schema.columns " + "WHERE table_name = '_meta' ORDER BY ordinal_position" + ).fetchall() + col_names = [c[0] for c in columns] + assert col_names == [ + "table_name", + "description", + "rows", + "size_bytes", + "extracted_at", + "query_mode", + ] + conn.close() diff --git a/tests/test_db.py b/tests/test_db.py index 998c34b..3546be9 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -59,7 +59,7 @@ class TestGetSchemaVersion: conn = get_system_db() try: - assert get_schema_version(conn) == 1 + assert get_schema_version(conn) == 2 finally: conn.close() @@ -74,6 +74,64 @@ class TestGetSchemaVersion: conn.close() +class TestV1ToV2Migration: + def test_migration_adds_source_columns(self, tmp_path): + """Simulate a v1 database and verify v2 migration adds new columns.""" + _setup_data_dir(tmp_path) + import duckdb as _duckdb + + # Create a v1 database manually + db_path = tmp_path / "state" / "system.duckdb" + db_path.parent.mkdir(parents=True, exist_ok=True) + conn = _duckdb.connect(str(db_path)) + conn.execute(""" + CREATE TABLE schema_version (version INTEGER, applied_at TIMESTAMP DEFAULT current_timestamp); + INSERT INTO schema_version (version) VALUES (1); + CREATE TABLE table_registry ( + id VARCHAR PRIMARY KEY, name VARCHAR NOT NULL, folder VARCHAR, + sync_strategy VARCHAR, primary_key VARCHAR, description TEXT, + registered_by VARCHAR, registered_at TIMESTAMP DEFAULT current_timestamp + ); + INSERT INTO table_registry (id, name, folder) VALUES ('t1', 'Test', 'f1'); + """) + # Create other required tables so _ensure_schema doesn't fail + conn.execute("CREATE TABLE IF NOT EXISTS users (id VARCHAR PRIMARY KEY, email VARCHAR)") + conn.execute("CREATE TABLE IF NOT EXISTS sync_state (table_id VARCHAR PRIMARY KEY)") + conn.execute("CREATE TABLE IF NOT EXISTS sync_history (id VARCHAR PRIMARY KEY, table_id VARCHAR)") + conn.execute("CREATE TABLE IF NOT EXISTS user_sync_settings (user_id VARCHAR, dataset VARCHAR, PRIMARY KEY(user_id, dataset))") + conn.execute("CREATE TABLE IF NOT EXISTS knowledge_items (id VARCHAR PRIMARY KEY, title VARCHAR)") + conn.execute("CREATE TABLE IF NOT EXISTS knowledge_votes (item_id VARCHAR, user_id VARCHAR, PRIMARY KEY(item_id, user_id))") + conn.execute("CREATE TABLE IF NOT EXISTS audit_log (id VARCHAR PRIMARY KEY, action VARCHAR)") + conn.execute("CREATE TABLE IF NOT EXISTS telegram_links (user_id VARCHAR PRIMARY KEY, chat_id BIGINT)") + conn.execute("CREATE TABLE IF NOT EXISTS pending_codes (code VARCHAR PRIMARY KEY, chat_id BIGINT)") + conn.execute("CREATE TABLE IF NOT EXISTS script_registry (id VARCHAR PRIMARY KEY, name VARCHAR, source TEXT)") + conn.execute("CREATE TABLE IF NOT EXISTS table_profiles (table_id VARCHAR PRIMARY KEY, profile JSON)") + conn.execute("CREATE TABLE IF NOT EXISTS dataset_permissions (user_id VARCHAR, dataset VARCHAR, PRIMARY KEY(user_id, dataset))") + conn.close() + + # Now open via get_system_db which should run migration + from src.db import get_system_db, get_schema_version + conn2 = get_system_db() + try: + assert get_schema_version(conn2) == 2 + # Verify old data preserved + row = conn2.execute("SELECT name, folder FROM table_registry WHERE id='t1'").fetchone() + assert row[0] == "Test" + assert row[1] == "f1" + # Verify new columns exist + cols = {r[0] for r in conn2.execute( + "SELECT column_name FROM information_schema.columns WHERE table_name='table_registry'" + ).fetchall()} + assert "source_type" in cols + assert "bucket" in cols + assert "source_table" in cols + assert "query_mode" in cols + assert "sync_schedule" in cols + assert "profile_after_sync" in cols + finally: + conn2.close() + + class TestGetAnalyticsDb: def test_creates_db(self, tmp_path): _setup_data_dir(tmp_path) diff --git a/tests/test_keboola_extractor.py b/tests/test_keboola_extractor.py new file mode 100644 index 0000000..53429b5 --- /dev/null +++ b/tests/test_keboola_extractor.py @@ -0,0 +1,204 @@ +"""Tests for Keboola extractor.""" + +import os +from pathlib import Path +from unittest.mock import patch, MagicMock + +import duckdb +import pytest + + +@pytest.fixture +def output_dir(tmp_path): + d = tmp_path / "extracts" / "keboola" + d.mkdir(parents=True) + return str(d) + + +@pytest.fixture +def sample_configs(): + return [ + { + "id": "in.c-crm.orders", + "name": "orders", + "source_type": "keboola", + "bucket": "in.c-crm", + "source_table": "orders", + "query_mode": "local", + "description": "Order data", + }, + { + "id": "in.c-crm.customers", + "name": "customers", + "source_type": "keboola", + "bucket": "in.c-crm", + "source_table": "customers", + "query_mode": "local", + "description": "Customer data", + }, + ] + + +def _mock_attach(conn, url, token): + """Mock that says extension is available.""" + return True + + +def _write_parquet(pq_path, data_sql="SELECT 1 AS id, 'test' AS name"): + """Helper to write a parquet file with given SQL.""" + local_conn = duckdb.connect() + local_conn.execute(f"COPY ({data_sql}) TO '{pq_path}' (FORMAT PARQUET)") + local_conn.close() + + +class TestKeboolaExtractor: + def test_creates_extract_duckdb(self, output_dir, sample_configs): + """Test that run() creates extract.duckdb with correct structure.""" + from connectors.keboola.extractor import run + + def write_parquet(conn, tc, pq_path): + _write_parquet(pq_path) + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \ + patch("connectors.keboola.extractor._extract_via_extension", side_effect=write_parquet): + result = run(output_dir, sample_configs, "https://example.com", "test-token") + + assert result["tables_extracted"] == 2 + assert result["tables_failed"] == 0 + + # Verify extract.duckdb exists and has correct structure + db_path = Path(output_dir) / "extract.duckdb" + assert db_path.exists() + + conn = duckdb.connect(str(db_path)) + try: + # Check _meta table + meta = conn.execute("SELECT * FROM _meta ORDER BY table_name").fetchall() + assert len(meta) == 2 + names = {row[0] for row in meta} + assert names == {"orders", "customers"} + + # Check all are 'local' query_mode + modes = {row[5] for row in meta} + assert modes == {"local"} + finally: + conn.close() + + def test_remote_tables_not_downloaded(self, output_dir): + """Test that tables with query_mode='remote' are registered but not downloaded.""" + from connectors.keboola.extractor import run + + configs = [{ + "name": "big_table", + "query_mode": "remote", + "description": "Too large to sync", + }] + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach): + result = run(output_dir, configs, "https://example.com", "test-token") + + assert result["tables_extracted"] == 1 + + conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb")) + try: + meta = conn.execute("SELECT query_mode FROM _meta WHERE table_name='big_table'").fetchone() + assert meta[0] == "remote" + finally: + conn.close() + + # No parquet file should exist + assert not (Path(output_dir) / "data" / "big_table.parquet").exists() + + def test_handles_extraction_failure(self, output_dir, sample_configs): + """Test that a failed table doesn't stop other tables from extracting.""" + from connectors.keboola.extractor import run + + call_count = 0 + def side_effect(conn, tc, pq_path): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise Exception("Network error") + # Second call succeeds + _write_parquet(pq_path, "SELECT 1 AS id") + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \ + patch("connectors.keboola.extractor._extract_via_extension", side_effect=side_effect): + result = run(output_dir, sample_configs, "https://example.com", "test-token") + + assert result["tables_extracted"] == 1 + assert result["tables_failed"] == 1 + assert len(result["errors"]) == 1 + + def test_creates_data_directory(self, output_dir, sample_configs): + """Test that data/ subdirectory is created.""" + from connectors.keboola.extractor import run + + def write_pq(conn, tc, pq_path): + _write_parquet(pq_path, "SELECT 1 AS id") + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \ + patch("connectors.keboola.extractor._extract_via_extension", side_effect=write_pq): + run(output_dir, sample_configs, "https://example.com", "test-token") + + assert (Path(output_dir) / "data").is_dir() + assert (Path(output_dir) / "data" / "orders.parquet").exists() + + def test_views_queryable(self, output_dir): + """Test that views in extract.duckdb can be queried.""" + from connectors.keboola.extractor import run + + configs = [{"name": "test_table", "query_mode": "local", "description": "Test"}] + + def write_pq(conn, tc, pq_path): + _write_parquet(pq_path, "SELECT 42 AS value, 'hello' AS msg") + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \ + patch("connectors.keboola.extractor._extract_via_extension", side_effect=write_pq): + run(output_dir, configs, "https://example.com", "test-token") + + conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb")) + try: + result = conn.execute("SELECT value, msg FROM test_table").fetchone() + assert result[0] == 42 + assert result[1] == "hello" + finally: + conn.close() + + def test_meta_table_schema(self, output_dir): + """Test that _meta table has all required columns.""" + from connectors.keboola.extractor import run + + configs = [{"name": "t", "query_mode": "local", "description": "desc"}] + + def write_pq(conn, tc, pq_path): + _write_parquet(pq_path, "SELECT 1 AS x") + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \ + patch("connectors.keboola.extractor._extract_via_extension", side_effect=write_pq): + run(output_dir, configs, "https://example.com", "test-token") + + conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb")) + try: + cols = conn.execute("SELECT column_name FROM information_schema.columns WHERE table_name='_meta' ORDER BY ordinal_position").fetchall() + col_names = [c[0] for c in cols] + assert col_names == ["table_name", "description", "rows", "size_bytes", "extracted_at", "query_mode"] + finally: + conn.close() + + def test_legacy_fallback_when_extension_unavailable(self, output_dir): + """Test that legacy client is used when extension attach fails.""" + from connectors.keboola.extractor import run + + configs = [{"name": "t", "id": "in.c-test.t", "query_mode": "local", "description": ""}] + + def mock_legacy(tc, pq_path, url, token): + _write_parquet(pq_path, "SELECT 1 AS id") + + # Extension not available + with patch("connectors.keboola.extractor._try_attach_extension", return_value=False), \ + patch("connectors.keboola.extractor._extract_via_legacy", side_effect=mock_legacy): + result = run(output_dir, configs, "https://example.com", "test-token") + + assert result["tables_extracted"] == 1 + assert result["tables_failed"] == 0 diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py new file mode 100644 index 0000000..49690e8 --- /dev/null +++ b/tests/test_orchestrator.py @@ -0,0 +1,188 @@ +"""Tests for SyncOrchestrator.""" + +import os +from pathlib import Path + +import duckdb +import pytest + + +@pytest.fixture +def setup_env(tmp_path): + """Set up DATA_DIR and return paths.""" + os.environ["DATA_DIR"] = str(tmp_path) + extracts_dir = tmp_path / "extracts" + extracts_dir.mkdir() + analytics_dir = tmp_path / "analytics" + analytics_dir.mkdir() + state_dir = tmp_path / "state" + state_dir.mkdir() + yield { + "data_dir": tmp_path, + "extracts_dir": extracts_dir, + "analytics_db": str(analytics_dir / "server.duckdb"), + } + # Clean up env var to avoid leaking between tests + os.environ.pop("DATA_DIR", None) + + +def _create_mock_extract(extracts_dir: Path, source_name: str, tables: list[dict]): + """Create a mock extract.duckdb with _meta and views.""" + source_dir = extracts_dir / source_name + source_dir.mkdir() + data_dir = source_dir / "data" + data_dir.mkdir() + + db_path = source_dir / "extract.duckdb" + conn = duckdb.connect(str(db_path)) + + conn.execute( + """CREATE TABLE _meta ( + table_name VARCHAR, description VARCHAR, rows BIGINT, + size_bytes BIGINT, extracted_at TIMESTAMP, + query_mode VARCHAR DEFAULT 'local' + )""" + ) + + for t in tables: + name = t["name"] + rows_data = t.get("data", []) + query_mode = t.get("query_mode", "local") + + # Create an actual table (simulating what a view on parquet would look like) + if rows_data: + cols = ", ".join(f"{k} VARCHAR" for k in rows_data[0].keys()) + conn.execute(f'CREATE TABLE "{name}" ({cols})') + for row in rows_data: + vals = ", ".join(f"'{v}'" for v in row.values()) + conn.execute(f'INSERT INTO "{name}" VALUES ({vals})') + else: + conn.execute(f'CREATE TABLE "{name}" (id VARCHAR)') + + row_count = len(rows_data) + conn.execute( + "INSERT INTO _meta VALUES (?, ?, ?, ?, current_timestamp, ?)", + [name, t.get("description", ""), row_count, 0, query_mode], + ) + + conn.close() + + +class TestSyncOrchestrator: + def test_rebuild_empty_extracts(self, setup_env): + from src.orchestrator import SyncOrchestrator + + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + result = orch.rebuild() + assert result == {} + + def test_rebuild_single_source(self, setup_env): + from src.orchestrator import SyncOrchestrator + + _create_mock_extract( + setup_env["extracts_dir"], + "keboola", + [ + {"name": "orders", "data": [{"id": "1", "total": "100"}]}, + {"name": "customers", "data": [{"id": "1", "name": "Alice"}]}, + ], + ) + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + result = orch.rebuild() + assert "keboola" in result + assert set(result["keboola"]) == {"orders", "customers"} + + # Verify views work when source is attached (as the orchestrator leaves it) + # Open a fresh connection and re-attach to simulate how the analytics DB is used + conn = duckdb.connect(setup_env["analytics_db"]) + try: + extract_path = setup_env["extracts_dir"] / "keboola" / "extract.duckdb" + conn.execute(f"ATTACH '{extract_path}' AS keboola (READ_ONLY)") + row = conn.execute("SELECT total FROM orders WHERE id='1'").fetchone() + assert row[0] == "100" + finally: + conn.close() + + def test_rebuild_multiple_sources(self, setup_env): + from src.orchestrator import SyncOrchestrator + + _create_mock_extract( + setup_env["extracts_dir"], + "keboola", + [{"name": "orders", "data": [{"id": "1"}]}], + ) + _create_mock_extract( + setup_env["extracts_dir"], + "jira", + [{"name": "issues", "data": [{"key": "PROJ-1"}]}], + ) + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + result = orch.rebuild() + assert "keboola" in result + assert "jira" in result + + def test_rebuild_skips_missing_extract_db(self, setup_env): + from src.orchestrator import SyncOrchestrator + + # Create directory without extract.duckdb + (setup_env["extracts_dir"] / "broken").mkdir() + _create_mock_extract( + setup_env["extracts_dir"], + "keboola", + [{"name": "orders", "data": [{"id": "1"}]}], + ) + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + result = orch.rebuild() + assert "broken" not in result + assert "keboola" in result + + def test_rebuild_source_single(self, setup_env): + from src.orchestrator import SyncOrchestrator + + _create_mock_extract( + setup_env["extracts_dir"], + "jira", + [{"name": "issues", "data": [{"key": "PROJ-1"}]}], + ) + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + tables = orch.rebuild_source("jira") + assert "issues" in tables + + def test_rebuild_source_nonexistent(self, setup_env): + from src.orchestrator import SyncOrchestrator + + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + tables = orch.rebuild_source("nonexistent") + assert tables == [] + + def test_rebuild_with_remote_tables(self, setup_env): + from src.orchestrator import SyncOrchestrator + + _create_mock_extract( + setup_env["extracts_dir"], + "bigquery", + [ + { + "name": "page_views", + "query_mode": "remote", + "data": [{"url": "/home"}], + } + ], + ) + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + result = orch.rebuild() + assert "bigquery" in result + assert "page_views" in result["bigquery"] + + def test_rebuild_idempotent(self, setup_env): + from src.orchestrator import SyncOrchestrator + + _create_mock_extract( + setup_env["extracts_dir"], + "keboola", + [{"name": "orders", "data": [{"id": "1"}]}], + ) + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + result1 = orch.rebuild() + result2 = orch.rebuild() + assert result1 == result2 diff --git a/tests/test_repositories.py b/tests/test_repositories.py index 3417ec8..920fbec 100644 --- a/tests/test_repositories.py +++ b/tests/test_repositories.py @@ -293,6 +293,57 @@ class TestTableRegistryRepository: repo.unregister("t1") assert repo.get("t1") is None + def test_register_with_source_fields(self, db_conn): + from src.repositories.table_registry import TableRegistryRepository + repo = TableRegistryRepository(db_conn) + repo.register( + id="in.c-crm.company", name="company", + source_type="keboola", bucket="in.c-crm", source_table="company", + query_mode="local", sync_schedule="every 15m", profile_after_sync=True, + ) + table = repo.get("in.c-crm.company") + assert table["source_type"] == "keboola" + assert table["bucket"] == "in.c-crm" + assert table["source_table"] == "company" + assert table["query_mode"] == "local" + assert table["sync_schedule"] == "every 15m" + assert table["profile_after_sync"] is True + + def test_list_by_source(self, db_conn): + from src.repositories.table_registry import TableRegistryRepository + repo = TableRegistryRepository(db_conn) + repo.register(id="t1", name="A", source_type="keboola") + repo.register(id="t2", name="B", source_type="bigquery") + repo.register(id="t3", name="C", source_type="keboola") + keboola = repo.list_by_source("keboola") + assert len(keboola) == 2 + assert all(t["source_type"] == "keboola" for t in keboola) + bq = repo.list_by_source("bigquery") + assert len(bq) == 1 + + def test_list_local(self, db_conn): + from src.repositories.table_registry import TableRegistryRepository + repo = TableRegistryRepository(db_conn) + repo.register(id="t1", name="A", source_type="keboola", query_mode="local") + repo.register(id="t2", name="B", source_type="bigquery", query_mode="remote") + repo.register(id="t3", name="C", source_type="keboola", query_mode="local") + local = repo.list_local() + assert len(local) == 2 + local_kbc = repo.list_local(source_type="keboola") + assert len(local_kbc) == 2 + + def test_register_bigquery_remote(self, db_conn): + from src.repositories.table_registry import TableRegistryRepository + repo = TableRegistryRepository(db_conn) + repo.register( + id="project.dataset.orders", name="orders", + source_type="bigquery", bucket="dataset", source_table="orders", + query_mode="remote", profile_after_sync=False, + ) + table = repo.get("project.dataset.orders") + assert table["query_mode"] == "remote" + assert table["profile_after_sync"] is False + # ---- Profiles ----