feat: add _remote_attach to BigQuery extractor, support token-less ATTACH in orchestrator
BigQuery extension handles auth via GOOGLE_APPLICATION_CREDENTIALS env var, so _remote_attach uses empty token_env. Orchestrator now supports both token-based (Keboola) and env-based (BigQuery) authentication modes.
This commit is contained in:
parent
06e1cf0a8d
commit
3ba207a7f8
3 changed files with 41 additions and 6 deletions
|
|
@ -27,6 +27,25 @@ def _create_meta_table(conn: duckdb.DuckDBPyConnection) -> None:
|
|||
)""")
|
||||
|
||||
|
||||
def _create_remote_attach_table(
|
||||
conn: duckdb.DuckDBPyConnection, project_id: str
|
||||
) -> None:
|
||||
"""Write _remote_attach so orchestrator can re-ATTACH the BigQuery extension."""
|
||||
conn.execute("DROP TABLE IF EXISTS _remote_attach")
|
||||
conn.execute("""CREATE TABLE _remote_attach (
|
||||
alias VARCHAR,
|
||||
extension VARCHAR,
|
||||
url VARCHAR,
|
||||
token_env VARCHAR
|
||||
)""")
|
||||
# BigQuery uses GOOGLE_APPLICATION_CREDENTIALS env var for auth automatically.
|
||||
# token_env is empty — orchestrator ATTACHes without TOKEN param.
|
||||
conn.execute(
|
||||
"INSERT INTO _remote_attach VALUES (?, ?, ?, ?)",
|
||||
["bq", "bigquery", f"project={project_id}", ""],
|
||||
)
|
||||
|
||||
|
||||
def init_extract(
|
||||
output_dir: str,
|
||||
project_id: str,
|
||||
|
|
@ -58,6 +77,7 @@ def init_extract(
|
|||
logger.info("Attached BigQuery project: %s", project_id)
|
||||
|
||||
_create_meta_table(conn)
|
||||
_create_remote_attach_table(conn, project_id)
|
||||
|
||||
for tc in table_configs:
|
||||
table_name = tc["name"]
|
||||
|
|
|
|||
|
|
@ -191,8 +191,8 @@ class SyncOrchestrator:
|
|||
).fetchall()
|
||||
|
||||
for alias, extension, url, token_env in rows:
|
||||
token = os.environ.get(token_env, "")
|
||||
if not token:
|
||||
token = os.environ.get(token_env, "") if token_env else ""
|
||||
if token_env and not token:
|
||||
logger.warning(
|
||||
"Remote attach %s: env var %s not set, skipping", alias, token_env
|
||||
)
|
||||
|
|
@ -210,9 +210,15 @@ class SyncOrchestrator:
|
|||
continue
|
||||
|
||||
conn.execute(f"INSTALL {extension} FROM community; LOAD {extension};")
|
||||
conn.execute(
|
||||
f"ATTACH '{url}' AS {alias} (TYPE {extension}, TOKEN '{token}')"
|
||||
)
|
||||
if token:
|
||||
conn.execute(
|
||||
f"ATTACH '{url}' AS {alias} (TYPE {extension}, TOKEN '{token}')"
|
||||
)
|
||||
else:
|
||||
# Extensions like BigQuery handle auth via env (e.g. GOOGLE_APPLICATION_CREDENTIALS)
|
||||
conn.execute(
|
||||
f"ATTACH '{url}' AS {alias} (TYPE {extension}, READ_ONLY)"
|
||||
)
|
||||
logger.info("Attached remote source %s via %s extension", alias, extension)
|
||||
except Exception as e:
|
||||
logger.error("Failed to attach remote source %s: %s", alias, e)
|
||||
|
|
|
|||
|
|
@ -75,7 +75,7 @@ class _DuckDBProxy:
|
|||
|
||||
class TestBigQueryExtractor:
|
||||
def test_creates_extract_duckdb_with_meta(self, output_dir, sample_configs):
|
||||
"""Test that init_extract creates extract.duckdb with _meta table."""
|
||||
"""Test that init_extract creates extract.duckdb with _meta and _remote_attach."""
|
||||
from unittest.mock import patch
|
||||
|
||||
def proxy_connect(path=None, **kwargs):
|
||||
|
|
@ -102,6 +102,15 @@ class TestBigQueryExtractor:
|
|||
assert meta[0][1] == "remote"
|
||||
assert meta[1][0] == "sessions"
|
||||
assert meta[1][1] == "remote"
|
||||
|
||||
# Verify _remote_attach table for orchestrator re-ATTACH
|
||||
ra = conn.execute(
|
||||
"SELECT alias, extension, url, token_env FROM _remote_attach"
|
||||
).fetchone()
|
||||
assert ra[0] == "bq"
|
||||
assert ra[1] == "bigquery"
|
||||
assert ra[2] == "project=my-project"
|
||||
assert ra[3] == "" # BQ handles auth via env automatically
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue