feat: generic _remote_attach contract for remote DuckDB extension views
Extractors with remote tables now write a _remote_attach table into extract.duckdb so the orchestrator can re-ATTACH external extensions at query time. The mechanism is source-agnostic — any connector can use it.
- Keboola extractor writes _remote_attach + creates views on kbc.*
- Orchestrator reads _remote_attach, installs extension, reads token from env
- Graceful degradation: missing token → warning, local tables still work
This commit is contained in:
parent
ee7d5630ef
commit
06e1cf0a8d
5 changed files with 207 additions and 4 deletions
18
CLAUDE.md
18
CLAUDE.md
|
|
@ -65,6 +65,24 @@ Every data source produces the same output:
|
|||
└── data/ ← parquet files (local sources only)
|
||||
```
|
||||
|
||||
### Remote table support (`_remote_attach`)
|
||||
|
||||
Extractors with remote/passthrough tables (query_mode='remote') include a `_remote_attach` table
|
||||
in extract.duckdb so the orchestrator can re-ATTACH the external DuckDB extension at query time:
|
||||
|
||||
```sql
|
||||
CREATE TABLE _remote_attach (
|
||||
alias VARCHAR, -- DuckDB alias used in views, e.g. 'kbc'
|
||||
extension VARCHAR, -- Extension name, e.g. 'keboola'
|
||||
url VARCHAR, -- Connection URL
|
||||
token_env VARCHAR -- Env-var name holding the auth token (NOT the token itself)
|
||||
);
|
||||
```
|
||||
|
||||
The orchestrator reads this table, installs/loads the extension, reads the token from the
|
||||
environment, and ATTACHes the external source. Views referencing `kbc."bucket"."table"` then
|
||||
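resolve correctly. This mechanism is generic — any connector can use it. If the env var named by `token_env` is not set, the ATTACH is skipped with a warning and local tables keep working.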
|
||||
|
||||
The SyncOrchestrator scans `/data/extracts/*/extract.duckdb`, ATTACHes each into master `analytics.duckdb`, and creates views.
|
||||
|
||||
```
|
||||
|
|
|
|||
|
|
@ -24,6 +24,23 @@ def _create_meta_table(conn: duckdb.DuckDBPyConnection) -> None:
|
|||
)""")
|
||||
|
||||
|
||||
def _create_remote_attach_table(
    conn: duckdb.DuckDBPyConnection, keboola_url: str
) -> None:
    """Record how to re-ATTACH the Keboola extension in ``_remote_attach``.

    The table is dropped and recreated on every call, then filled with a
    single row describing the ``kbc`` alias. Only the *name* of the
    environment variable holding the token is stored, never the token.

    Args:
        conn: Open connection to the extract database being written.
        keboola_url: Connection URL stored in the ``url`` column.
    """
    schema_statements = (
        "DROP TABLE IF EXISTS _remote_attach",
        """CREATE TABLE _remote_attach (
        alias VARCHAR,
        extension VARCHAR,
        url VARCHAR,
        token_env VARCHAR
    )""",
    )
    for statement in schema_statements:
        conn.execute(statement)

    # Column order: alias, extension, url, token_env.
    attach_row = ["kbc", "keboola", keboola_url, "KEBOOLA_STORAGE_TOKEN"]
    conn.execute("INSERT INTO _remote_attach VALUES (?, ?, ?, ?)", attach_row)
|
||||
|
||||
|
||||
def _try_attach_extension(conn: duckdb.DuckDBPyConnection, keboola_url: str, keboola_token: str) -> bool:
|
||||
"""Try to install and attach the Keboola DuckDB extension. Returns True on success."""
|
||||
try:
|
||||
|
|
@ -69,12 +86,23 @@ def run(output_dir: str, table_configs: List[Dict[str, Any]], keboola_url: str,
|
|||
|
||||
_create_meta_table(conn)
|
||||
|
||||
has_remote = any(tc.get("query_mode") == "remote" for tc in table_configs)
|
||||
if has_remote and use_extension:
|
||||
_create_remote_attach_table(conn, keboola_url)
|
||||
|
||||
for tc in table_configs:
|
||||
table_name = tc["name"]
|
||||
query_mode = tc.get("query_mode", "local")
|
||||
|
||||
if query_mode == "remote":
|
||||
# Register in _meta but don't download
|
||||
# Create view pointing to kbc extension (requires re-ATTACH at query time)
|
||||
bucket = tc.get("bucket", "")
|
||||
source_table = tc.get("source_table", table_name)
|
||||
if use_extension and bucket:
|
||||
conn.execute(
|
||||
f'CREATE OR REPLACE VIEW "{table_name}" AS '
|
||||
f'SELECT * FROM kbc."{bucket}"."{source_table}"'
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO _meta VALUES (?, ?, 0, 0, ?, 'remote')",
|
||||
[table_name, tc.get("description", ""), now],
|
||||
|
|
|
|||
|
|
@ -1,4 +1,21 @@
|
|||
"""Sync orchestrator — ATTACHes extract.duckdb files into master analytics.duckdb."""
|
||||
"""Sync orchestrator — ATTACHes extract.duckdb files into master analytics.duckdb.
|
||||
|
||||
Remote table support
|
||||
--------------------
|
||||
Extractors that create views referencing external DuckDB extensions (e.g. Keboola,
|
||||
BigQuery) must include a ``_remote_attach`` table in their extract.duckdb:
|
||||
|
||||
CREATE TABLE _remote_attach (
|
||||
alias VARCHAR, -- DuckDB alias used in views, e.g. 'kbc'
|
||||
extension VARCHAR, -- Extension name, e.g. 'keboola'
|
||||
url VARCHAR, -- Connection URL
|
||||
token_env VARCHAR -- Env-var name holding the auth token (NOT the token itself)
|
||||
);
|
||||
|
||||
At rebuild time the orchestrator reads ``_remote_attach``, installs/loads the
|
||||
extension, reads the token from the environment, and ATTACHes the external source
|
||||
so that remote views resolve correctly.
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
|
|
@ -131,6 +148,9 @@ class SyncOrchestrator:
|
|||
try:
|
||||
conn.execute(f"ATTACH '{db_path}' AS {source_name} (READ_ONLY)")
|
||||
|
||||
# Re-ATTACH external extensions needed by remote views
|
||||
self._attach_remote_extensions(conn, source_name)
|
||||
|
||||
# Read _meta to know what's available
|
||||
meta_rows = conn.execute(
|
||||
f"SELECT table_name, rows, size_bytes, query_mode "
|
||||
|
|
@ -152,6 +172,51 @@ class SyncOrchestrator:
|
|||
|
||||
return tables
|
||||
|
||||
def _attach_remote_extensions(
    self, conn: duckdb.DuckDBPyConnection, source_name: str
) -> None:
    """ATTACH the external sources listed in ``<source_name>._remote_attach``.

    Best-effort: every failure is logged and the affected row is skipped,
    so the local tables of the extract stay usable.

    Args:
        conn: Connection on which the extract database is already
            ATTACHed under the alias ``source_name``.
        source_name: Alias of the attached extract database.
    """

    def _is_plain_identifier(value: object) -> bool:
        # alias/extension are spliced into SQL as bare identifiers and come
        # from a data file, so accept only plain ASCII identifiers.
        return isinstance(value, str) and value.isascii() and value.isidentifier()

    try:
        # An ATTACHed database is exposed as a *catalog*; its tables live in
        # the "main" schema. Filtering on table_schema would never match.
        present = conn.execute(
            "SELECT 1 FROM information_schema.tables "
            "WHERE table_catalog = ? AND table_name = '_remote_attach'",
            [source_name],
        ).fetchall()
        if not present:
            return

        rows = conn.execute(
            f"SELECT alias, extension, url, token_env FROM {source_name}._remote_attach"
        ).fetchall()

        # Looked up once; kept current below as sources get attached.
        attached = {
            r[0] for r in conn.execute(
                "SELECT database_name FROM duckdb_databases()"
            ).fetchall()
        }
    except Exception as e:
        logger.warning(
            "Could not read _remote_attach from %s: %s", source_name, e
        )
        return

    for alias, extension, url, token_env in rows:
        # A NULL token_env would make os.environ.get() raise TypeError.
        if not token_env:
            logger.warning(
                "Remote attach %s: no token env var configured, skipping", alias
            )
            continue

        token = os.environ.get(token_env, "")
        if not token:
            logger.warning(
                "Remote attach %s: env var %s not set, skipping", alias, token_env
            )
            continue

        # Skip if already attached (e.g. multiple sources share same extension)
        if alias in attached:
            logger.debug("Remote source %s already attached", alias)
            continue

        if not (_is_plain_identifier(alias) and _is_plain_identifier(extension)):
            logger.error(
                "Remote attach %r: invalid alias or extension name %r, skipping",
                alias, extension,
            )
            continue

        # Double single quotes so the values stay inside their SQL literals.
        url_sql = str(url).replace("'", "''")
        token_sql = token.replace("'", "''")

        try:
            conn.execute(f"INSTALL {extension} FROM community")
            conn.execute(f"LOAD {extension}")
            conn.execute(
                f"ATTACH '{url_sql}' AS {alias} (TYPE {extension}, TOKEN '{token_sql}')"
            )
            attached.add(alias)
            logger.info("Attached remote source %s via %s extension", alias, extension)
        except Exception as e:
            logger.error("Failed to attach remote source %s: %s", alias, e)
|
||||
|
||||
def _update_sync_state(self, meta_rows: list, source_name: str) -> None:
|
||||
"""Update sync_state table in system.duckdb from _meta entries."""
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -40,7 +40,9 @@ def sample_configs():
|
|||
|
||||
|
||||
def _mock_attach(conn, url, token):
    """Stand in for a successful extension attach.

    ATTACHes an in-memory database under the alias ``kbc`` and returns
    True, so the code under test sees the extension as available.
    ``url`` and ``token`` are accepted for signature compatibility only.
    """
    # Create in-memory DB as kbc so views referencing kbc."bucket"."table" can be created
    conn.execute("ATTACH ':memory:' AS kbc")
    return True
|
||||
|
||||
|
||||
|
|
@ -90,11 +92,20 @@ class TestKeboolaExtractor:
|
|||
|
||||
configs = [{
|
||||
"name": "big_table",
|
||||
"bucket": "in.c-events",
|
||||
"source_table": "big_table",
|
||||
"query_mode": "remote",
|
||||
"description": "Too large to sync",
|
||||
}]
|
||||
|
||||
with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach):
|
||||
def mock_attach_with_schema(conn, url, token):
    """Fake a successful attach with the bucket and table the remote view needs.

    ATTACHes an in-memory database as ``kbc`` and creates
    ``kbc."in.c-events"."big_table"`` inside it, then returns True.
    """
    setup_statements = (
        "ATTACH ':memory:' AS kbc",
        'CREATE SCHEMA kbc."in.c-events"',
        'CREATE TABLE kbc."in.c-events"."big_table" (id VARCHAR)',
    )
    for statement in setup_statements:
        conn.execute(statement)
    return True
|
||||
|
||||
with patch("connectors.keboola.extractor._try_attach_extension", side_effect=mock_attach_with_schema):
|
||||
result = run(output_dir, configs, "https://example.com", "test-token")
|
||||
|
||||
assert result["tables_extracted"] == 1
|
||||
|
|
@ -103,6 +114,13 @@ class TestKeboolaExtractor:
|
|||
try:
|
||||
meta = conn.execute("SELECT query_mode FROM _meta WHERE table_name='big_table'").fetchone()
|
||||
assert meta[0] == "remote"
|
||||
|
||||
# _remote_attach table should exist with Keboola connection info
|
||||
ra = conn.execute("SELECT alias, extension, url, token_env FROM _remote_attach").fetchone()
|
||||
assert ra[0] == "kbc"
|
||||
assert ra[1] == "keboola"
|
||||
assert ra[2] == "https://example.com"
|
||||
assert ra[3] == "KEBOOLA_STORAGE_TOKEN"
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
|
|
|||
|
|
@ -174,6 +174,80 @@ class TestSyncOrchestrator:
|
|||
assert "bigquery" in result
|
||||
assert "page_views" in result["bigquery"]
|
||||
|
||||
def test_rebuild_reads_remote_attach_table(self, setup_env):
    """Rebuild exposes local tables of an extract that carries _remote_attach."""
    from unittest.mock import patch
    from src.orchestrator import SyncOrchestrator

    # Lay out a "keboola" extract directory with its extract.duckdb.
    extract_dir = setup_env["extracts_dir"] / "keboola"
    extract_dir.mkdir()
    (extract_dir / "data").mkdir()

    # _meta, _remote_attach, and one local table that needs no extension.
    setup_statements = (
        """CREATE TABLE _meta (
            table_name VARCHAR, description VARCHAR, rows BIGINT,
            size_bytes BIGINT, extracted_at TIMESTAMP,
            query_mode VARCHAR DEFAULT 'local'
        )""",
        """CREATE TABLE _remote_attach (
            alias VARCHAR, extension VARCHAR, url VARCHAR, token_env VARCHAR
        )""",
        "INSERT INTO _remote_attach VALUES ('kbc', 'keboola', 'https://kbc.example.com', 'KEBOOLA_STORAGE_TOKEN')",
        'CREATE TABLE "orders" (id VARCHAR)',
        "INSERT INTO orders VALUES ('1')",
        "INSERT INTO _meta VALUES ('orders', '', 1, 0, current_timestamp, 'local')",
    )
    extract_db = duckdb.connect(str(extract_dir / "extract.duckdb"))
    for statement in setup_statements:
        extract_db.execute(statement)
    extract_db.close()

    # The token is present, but the keboola extension is not expected to be
    # installable in the test environment. Rebuild must still succeed and
    # keep serving the local table.
    with patch.dict(os.environ, {"KEBOOLA_STORAGE_TOKEN": "test-token"}):
        orchestrator = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"])
        result = orchestrator.rebuild()

    assert "keboola" in result
    assert "orders" in result["keboola"]
|
||||
|
||||
def test_rebuild_remote_attach_skips_missing_token(self, setup_env):
    """Rebuild keeps local tables when the token env var is not set."""
    from src.orchestrator import SyncOrchestrator

    extract_dir = setup_env["extracts_dir"] / "keboola"
    extract_dir.mkdir()
    (extract_dir / "data").mkdir()

    # _remote_attach points at an env var this test never sets.
    setup_statements = (
        """CREATE TABLE _meta (
            table_name VARCHAR, description VARCHAR, rows BIGINT,
            size_bytes BIGINT, extracted_at TIMESTAMP,
            query_mode VARCHAR DEFAULT 'local'
        )""",
        """CREATE TABLE _remote_attach (
            alias VARCHAR, extension VARCHAR, url VARCHAR, token_env VARCHAR
        )""",
        "INSERT INTO _remote_attach VALUES ('kbc', 'keboola', 'https://kbc.example.com', 'NONEXISTENT_TOKEN_VAR')",
        'CREATE TABLE "orders" (id VARCHAR)',
        "INSERT INTO _meta VALUES ('orders', '', 0, 0, current_timestamp, 'local')",
    )
    extract_db = duckdb.connect(str(extract_dir / "extract.duckdb"))
    for statement in setup_statements:
        extract_db.execute(statement)
    extract_db.close()

    # With no token available the remote ATTACH is skipped; the local
    # table must still appear in the rebuild result.
    orchestrator = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"])
    result = orchestrator.rebuild()

    assert "keboola" in result
    assert "orders" in result["keboola"]
|
||||
|
||||
def test_rebuild_idempotent(self, setup_env):
|
||||
from src.orchestrator import SyncOrchestrator
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue