diff --git a/CLAUDE.md b/CLAUDE.md index 5c18bd2..9bec317 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -65,6 +65,24 @@ Every data source produces the same output: └── data/ ← parquet files (local sources only) ``` +### Remote table support (`_remote_attach`) + +Extractors with remote/passthrough tables (query_mode='remote') include a `_remote_attach` table +in extract.duckdb so the orchestrator can re-ATTACH the external DuckDB extension at query time: + +```sql +CREATE TABLE _remote_attach ( + alias VARCHAR, -- DuckDB alias used in views, e.g. 'kbc' + extension VARCHAR, -- Extension name, e.g. 'keboola' + url VARCHAR, -- Connection URL + token_env VARCHAR -- Env-var name holding the auth token (NOT the token itself) +); +``` + +The orchestrator reads this table, installs/loads the extension, reads the token from the +environment, and ATTACHes the external source. Views referencing `kbc."bucket"."table"` then +resolve correctly. This mechanism is generic — any connector can use it. + The SyncOrchestrator scans `/data/extracts/*/extract.duckdb`, ATTACHes each into master `analytics.duckdb`, and creates views. 
def _create_remote_attach_table(
    conn: duckdb.DuckDBPyConnection, keboola_url: str
) -> None:
    """Rebuild the ``_remote_attach`` table with the single Keboola entry.

    The table is dropped (if present) and recreated, then one row is
    inserted: alias ``kbc``, extension ``keboola``, the given URL, and the
    *name* of the environment variable that holds the storage token. The
    token value itself is never written.

    Args:
        conn: Open connection to the extract database being written.
        keboola_url: Connection URL stored in the ``url`` column.
    """
    # Drop first so a re-run never accumulates rows from earlier extractions.
    schema_statements = (
        "DROP TABLE IF EXISTS _remote_attach",
        """CREATE TABLE _remote_attach (
        alias VARCHAR,
        extension VARCHAR,
        url VARCHAR,
        token_env VARCHAR
    )""",
    )
    for statement in schema_statements:
        conn.execute(statement)

    # Column order: alias, extension, url, token_env.
    attach_row = ["kbc", "keboola", keboola_url, "KEBOOLA_STORAGE_TOKEN"]
    conn.execute("INSERT INTO _remote_attach VALUES (?, ?, ?, ?)", attach_row)
def _attach_remote_extensions(
    self, conn: duckdb.DuckDBPyConnection, source_name: str
) -> None:
    """ATTACH the external sources listed in ``<source_name>._remote_attach``.

    For every row ``(alias, extension, url, token_env)`` the method reads the
    token from the environment variable named by ``token_env``, installs and
    loads ``extension`` from the community repository, and ATTACHes ``url``
    under ``alias``. The whole operation is best-effort: any problem is
    logged and the affected row (or the whole table) is skipped, so local
    tables of the source keep working.

    Args:
        conn: Connection on which the extract database is already attached
            under ``source_name``.
        source_name: Catalog alias of the attached extract database.

    Returns:
        None. Nothing is raised to the caller.
    """
    try:
        # An ATTACHed database shows up as the *catalog* in
        # information_schema (its schema is ``main``), so the lookup must
        # filter on table_catalog rather than table_schema.
        present = conn.execute(
            "SELECT 1 FROM information_schema.tables "
            "WHERE table_catalog = ? AND table_name = '_remote_attach'",
            [source_name],
        ).fetchall()
        if not present:
            return

        rows = conn.execute(
            f"SELECT alias, extension, url, token_env FROM {source_name}._remote_attach"
        ).fetchall()

        # Several sources may share one alias/extension; fetch the set once
        # and keep it current as we attach.
        attached = {
            r[0] for r in conn.execute(
                "SELECT database_name FROM duckdb_databases()"
            ).fetchall()
        }
    except Exception as e:
        logger.warning(
            "Could not read _remote_attach from %s, skipping remote attach: %s",
            source_name, e,
        )
        return

    for alias, extension, url, token_env in rows:
        # NULL or empty columns would otherwise crash os.environ.get or
        # produce malformed SQL.
        fields = (alias, extension, url, token_env)
        if not all(isinstance(value, str) and value for value in fields):
            logger.warning(
                "Remote attach row in %s is incomplete, skipping (alias=%r)",
                source_name, alias,
            )
            continue

        # alias and extension are spliced into SQL as bare identifiers and
        # come from a file on disk, so accept plain ASCII identifiers only.
        if not all(
            name.isascii() and name.isidentifier() for name in (alias, extension)
        ):
            logger.error(
                "Remote attach in %s: invalid alias/extension %r/%r, skipping",
                source_name, alias, extension,
            )
            continue

        token = os.environ.get(token_env, "")
        if not token:
            logger.warning(
                "Remote attach %s: env var %s not set, skipping", alias, token_env
            )
            continue

        if alias in attached:
            logger.debug("Remote source %s already attached", alias)
            continue

        try:
            conn.execute(f"INSTALL {extension} FROM community; LOAD {extension};")
            # Double embedded single quotes so the URL and token cannot
            # terminate their SQL string literals.
            safe_url = url.replace("'", "''")
            safe_token = token.replace("'", "''")
            conn.execute(
                f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, TOKEN '{safe_token}')"
            )
            attached.add(alias)
            logger.info("Attached remote source %s via %s extension", alias, extension)
        except Exception as e:
            # The failing statement contains the token; make sure an error
            # message echoing the SQL does not leak it into the logs.
            logger.error(
                "Failed to attach remote source %s: %s",
                alias, str(e).replace(token, "***"),
            )
def test_rebuild_reads_remote_attach_table(self, setup_env):
    """Rebuild keeps local tables when the extract carries ``_remote_attach``.

    The extract holds one ``_remote_attach`` row for the Keboola extension
    plus a local ``orders`` table. The token env var is set for the duration
    of the rebuild; the test asserts only that ``orders`` is still reported
    under the ``keboola`` source.
    """
    from unittest.mock import patch
    from src.orchestrator import SyncOrchestrator

    extract_dir = setup_env["extracts_dir"] / "keboola"
    extract_dir.mkdir()
    (extract_dir / "data").mkdir()

    # Schema, remote-attach row, and one populated local table.
    setup_statements = (
        """CREATE TABLE _meta (
        table_name VARCHAR, description VARCHAR, rows BIGINT,
        size_bytes BIGINT, extracted_at TIMESTAMP,
        query_mode VARCHAR DEFAULT 'local'
    )""",
        """CREATE TABLE _remote_attach (
        alias VARCHAR, extension VARCHAR, url VARCHAR, token_env VARCHAR
    )""",
        "INSERT INTO _remote_attach VALUES ('kbc', 'keboola', 'https://kbc.example.com', 'KEBOOLA_STORAGE_TOKEN')",
        'CREATE TABLE "orders" (id VARCHAR)',
        "INSERT INTO orders VALUES ('1')",
        "INSERT INTO _meta VALUES ('orders', '', 1, 0, current_timestamp, 'local')",
    )
    extract_conn = duckdb.connect(str(extract_dir / "extract.duckdb"))
    for statement in setup_statements:
        extract_conn.execute(statement)
    extract_conn.close()

    # The token is available, but a failed remote attach must not take the
    # local tables down with it.
    token_env = {"KEBOOLA_STORAGE_TOKEN": "test-token"}
    with patch.dict(os.environ, token_env):
        orchestrator = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"])
        rebuilt = orchestrator.rebuild()

    assert "keboola" in rebuilt
    assert "orders" in rebuilt["keboola"]


def test_rebuild_remote_attach_skips_missing_token(self, setup_env):
    """Rebuild keeps local tables when the token env var is not set.

    The ``_remote_attach`` row points at ``NONEXISTENT_TOKEN_VAR``; the test
    asserts that the local ``orders`` table is still reported under the
    ``keboola`` source.
    """
    from src.orchestrator import SyncOrchestrator

    extract_dir = setup_env["extracts_dir"] / "keboola"
    extract_dir.mkdir()
    (extract_dir / "data").mkdir()

    # Same layout as a real extract, with an empty local table.
    setup_statements = (
        """CREATE TABLE _meta (
        table_name VARCHAR, description VARCHAR, rows BIGINT,
        size_bytes BIGINT, extracted_at TIMESTAMP,
        query_mode VARCHAR DEFAULT 'local'
    )""",
        """CREATE TABLE _remote_attach (
        alias VARCHAR, extension VARCHAR, url VARCHAR, token_env VARCHAR
    )""",
        "INSERT INTO _remote_attach VALUES ('kbc', 'keboola', 'https://kbc.example.com', 'NONEXISTENT_TOKEN_VAR')",
        'CREATE TABLE "orders" (id VARCHAR)',
        "INSERT INTO _meta VALUES ('orders', '', 0, 0, current_timestamp, 'local')",
    )
    extract_conn = duckdb.connect(str(extract_dir / "extract.duckdb"))
    for statement in setup_statements:
        extract_conn.execute(statement)
    extract_conn.close()

    # No patching here: the referenced variable is expected to be absent.
    orchestrator = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"])
    rebuilt = orchestrator.rebuild()

    assert "keboola" in rebuilt
    assert "orders" in rebuilt["keboola"]