fix(keboola): per-table fallback to legacy Storage-API client (#183)

* fix(keboola): per-table fallback to legacy Storage-API client

  The DuckDB Keboola extension's per-table COPY fails with `Schema '..."in.c-..."' does not exist or not authorized` on projects whose Snowflake backend doesn't expose bucket schemas to the storage-token-derived QueryService role (keboola/duckdb-extension#17). ATTACH itself succeeds, so the existing extension-level fallback in `_try_attach_extension` never triggers — the table is just marked failed.

  - Promote `kbcstorage>=0.9.0` from optional to core dep so the legacy client import in `_extract_via_legacy` doesn't crash default installs with `ModuleNotFoundError`.
  - Wrap `_extract_via_extension` in a per-table try/except so a scan failure retries via `_extract_via_legacy` instead of recording `tables_failed` and moving on.

  Slower than the extension path, but produces correct parquets on affected projects while the upstream extension fix lands.

* test(keboola): cover per-table extension→legacy fallback

  Two existing tests mocked `_extract_via_extension` to throw and asserted the original message survived in `result["errors"]`. With per-table fallback, the new flow retries via `_extract_via_legacy` — which on the mock URLs would throw a different (404 / DNS-fail) error, replacing the asserted message.

  - Mock `_extract_via_legacy` alongside `_extract_via_extension` in `test_network_timeout_during_extraction` + `test_partial_failure_continues` + `test_all_tables_fail_returns_full_failure_stats` so the assertion observes the final propagated error from the fallback chain.
  - Add `test_extension_per_table_failure_falls_back_to_legacy` that exercises the new behavior directly: extension scan fails with the QueryService schema-not-authorized message (keboola/duckdb-extension#17), legacy succeeds, parquet ends up queryable.
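
The per-table retry described in the commit message can be sketched in isolation. This is a minimal illustration, not the project's `run` function: `extract_with_fallback`, `primary`, and `fallback` are hypothetical names, and the shape of each `errors` entry is an assumption. Only the `tables_extracted`, `tables_failed`, and `errors` keys come from the commit itself.

```python
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)

# Hypothetical signature: an extractor takes one table config and raises on failure.
Extractor = Callable[[Dict[str, Any]], None]


def extract_with_fallback(
    table_configs: List[Dict[str, Any]],
    primary: Extractor,
    fallback: Extractor,
) -> Dict[str, Any]:
    """Try `primary` for each table; on any failure retry that table via `fallback`."""
    stats: Dict[str, Any] = {"tables_extracted": 0, "tables_failed": 0, "errors": []}
    for tc in table_configs:
        name = tc["name"]
        try:
            try:
                primary(tc)
            except Exception as primary_err:
                logger.warning(
                    "primary path failed for %s (%s); retrying via fallback",
                    name, primary_err,
                )
                fallback(tc)
            stats["tables_extracted"] += 1
        except Exception as final_err:
            # Only the error from the last attempted path is recorded,
            # and the loop moves on to the next table.
            stats["tables_failed"] += 1
            stats["errors"].append({"table": name, "error": str(final_err)})
    return stats
```

One consequence of this shape is that when both paths fail, the recorded error is the fallback's, not the primary's. That is why the tests in this commit mock both paths.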
parent 91f2605865
commit 4751094e1c
5 changed files with 97 additions and 7 deletions
@@ -10,6 +10,13 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
 
 ## [Unreleased]
 
+### Fixed
+
+- Keboola sync now falls back to the legacy Storage-API client when the DuckDB Keboola extension's per-table scan fails, not just when the initial `ATTACH` fails. Two changes:
+  - `kbcstorage>=0.9.0` is promoted from optional to core dependency. The legacy fallback path in `connectors/keboola/extractor.py:_extract_via_legacy` has been there since the extension landed, but until now the bare `from kbcstorage.client import Client` would crash any default install with `ModuleNotFoundError`.
+  - `connectors/keboola/extractor.py:run` now wraps `_extract_via_extension` in a per-table try/except — on any per-table scan failure it retries via the legacy client. Previously, when `ATTACH` succeeded but the table-level `COPY (SELECT * FROM kbc."<bucket>"."<table>")` failed, the table was just marked failed with no retry.
+  Together these unblock deployments where the extension's bucket-schema scans return `Schema '..."in.c-..."' does not exist or not authorized` (keboola/duckdb-extension#17) while the upstream extension fix is in flight.
+
 ## [0.35.1] — 2026-05-05
 
 ### Fixed

@@ -236,7 +236,20 @@ def run(output_dir: str, table_configs: List[Dict[str, Any]], keboola_url: str,
             pq_path = str(data_dir / f"{table_name}.parquet")
 
             if use_extension:
-                _extract_via_extension(conn, tc, pq_path)
+                try:
+                    _extract_via_extension(conn, tc, pq_path)
+                except Exception as ext_err:
+                    # ATTACH succeeded but the per-table COPY failed —
+                    # most commonly a Keboola QueryService permission error
+                    # (`Schema '..."in.c-..."' does not exist or not
+                    # authorized`, see keboola/duckdb-extension#17). The
+                    # legacy Storage-API client doesn't go through
+                    # QueryService at all, so retry there.
+                    logger.warning(
+                        "Keboola extension scan failed for %s (%s); retrying via legacy Storage-API client",
+                        table_name, ext_err,
+                    )
+                    _extract_via_legacy(tc, pq_path, keboola_url, keboola_token)
             else:
                 _extract_via_legacy(tc, pq_path, keboola_url, keboola_token)
 

@@ -62,11 +62,23 @@ dependencies = [
     # ModuleNotFoundError boot loops on default Compose deploys.
     "anthropic>=0.30.0",
     "openai>=1.30.0",
+    # Legacy Keboola Storage API client. The primary Keboola path is the
+    # DuckDB community extension (`connectors/keboola/access.py`,
+    # `connectors/keboola/extractor.py`), but it routes scans through
+    # Keboola QueryService — and on projects whose Snowflake backend
+    # doesn't expose bucket schemas to the storage-token-derived role
+    # the extension fails with `Schema '..."in.c-..."' does not exist
+    # or not authorized` while the same token reads fine via the
+    # `/v2/storage/tables/{id}/data-preview` REST endpoint
+    # (keboola/duckdb-extension#17). The extractor has had a `kbcstorage`
+    # fallback path since the extension landed
+    # (`connectors/keboola/extractor.py:_extract_via_legacy`); making
+    # the dep core means that fallback is actually importable in the
+    # default install instead of crashing with `ModuleNotFoundError`.
+    "kbcstorage>=0.9.0",
 ]
 
 [project.optional-dependencies]
-# keboola-legacy: install kbcstorage>=0.9.0 manually if you need the legacy
-# Keboola client fallback (primary path uses DuckDB Keboola extension)
 dev = [
     "pytest>=9.0.0",
     "pytest-timeout>=2.0.0",

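
The failure mode this dependency change guards against is a fallback that exists in code but cannot be imported at runtime. A small, hedged sketch of how one might check for that ahead of time: `is_importable` is a hypothetical helper and not part of this repository; it relies only on the standard library's `importlib.util.find_spec`, and is meant for top-level module names.

```python
import importlib.util


def is_importable(module_name: str) -> bool:
    """Return True if a top-level module can be located without importing it.

    Hypothetical helper for illustration. For dotted names, find_spec
    imports the parent package first, so this sketch is intended for
    top-level names such as "kbcstorage".
    """
    return importlib.util.find_spec(module_name) is not None
```

For example, `is_importable("kbcstorage")` would be expected to return `True` on an install that includes the core dependencies after this change, and `False` on an environment where the package is absent.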
@@ -318,7 +318,17 @@ class TestKeboolaExtractorFailureModes:
                 raise socket.timeout("Connection timed out")
             _write_parquet(pq_path, "SELECT 1 AS id")
 
-        with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), patch("connectors.keboola.extractor._extract_via_extension", side_effect=side_effect):
+        # When extension scan fails, the per-table flow now retries via
+        # _extract_via_legacy. Mock it to re-raise the same socket.timeout
+        # so we observe the final error surface; the contract under test is
+        # "extension failure doesn't crash, error makes it into stats, other
+        # tables continue", not which path produced the message.
+        def legacy_reraise(tc, pq_path, url, token):
+            raise socket.timeout("Connection timed out")
+
+        with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \
+             patch("connectors.keboola.extractor._extract_via_extension", side_effect=side_effect), \
+             patch("connectors.keboola.extractor._extract_via_legacy", side_effect=legacy_reraise):
             result = run(output_dir, configs, "https://example.com", "test-token")
 
         assert result["tables_extracted"] == 1
@@ -348,6 +358,37 @@ class TestKeboolaExtractorFailureModes:
         assert val[0] == 42
         conn.close()
 
+    def test_extension_per_table_failure_falls_back_to_legacy(self, output_dir):
+        """When ATTACH succeeds but the per-table extension scan fails (e.g.
+        Keboola QueryService schema/role mismatch — keboola/duckdb-extension#17),
+        the extractor retries that table via the legacy Storage-API client."""
+        from connectors.keboola.extractor import run
+
+        configs = [{"name": "t", "id": "in.c-test.t", "query_mode": "local",
+                    "bucket": "in.c-test", "source_table": "t", "description": ""}]
+
+        def extension_scan_fails(conn, tc, pq_path):
+            raise RuntimeError(
+                "Keboola scan failed: Schema 'KBC_USE4_NNNN.\"in.c-test\"' "
+                "does not exist or not authorized."
+            )
+
+        def legacy_succeeds(tc, pq_path, url, token):
+            _write_parquet(pq_path, "SELECT 7 AS value")
+
+        with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \
+             patch("connectors.keboola.extractor._extract_via_extension", side_effect=extension_scan_fails), \
+             patch("connectors.keboola.extractor._extract_via_legacy", side_effect=legacy_succeeds):
+            result = run(output_dir, configs, "https://example.com", "test-token")
+
+        assert result["tables_extracted"] == 1
+        assert result["tables_failed"] == 0
+        # Verify the legacy-produced data is queryable
+        conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb"))
+        val = conn.execute("SELECT value FROM t").fetchone()
+        assert val[0] == 7
+        conn.close()
+
     def test_all_tables_fail_returns_full_failure_stats(self, output_dir):
         """When every table fails, the extractor returns all failures in stats
         without crashing."""
@@ -361,7 +402,14 @@ class TestKeboolaExtractorFailureModes:
         def always_fail(conn, tc, pq_path):
             raise RuntimeError("Extraction failed")
 
-        with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), patch("connectors.keboola.extractor._extract_via_extension", side_effect=always_fail):
+        # Mock legacy too — otherwise it would attempt a real HTTP call to
+        # the fake URL on each per-table fallback retry.
+        def legacy_also_fails(tc, pq_path, url, token):
+            raise RuntimeError("Extraction failed")
+
+        with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \
+             patch("connectors.keboola.extractor._extract_via_extension", side_effect=always_fail), \
+             patch("connectors.keboola.extractor._extract_via_legacy", side_effect=legacy_also_fails):
             result = run(output_dir, configs, "https://example.com", "test-token")
 
         assert result["tables_extracted"] == 0
@@ -381,7 +429,8 @@ class TestKeboolaExtractorFailureModes:
         def write_pq(conn, tc, pq_path):
             _write_parquet(pq_path, "SELECT 1 AS id")
 
-        with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), patch("connectors.keboola.extractor._extract_via_extension", side_effect=write_pq):
+        with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \
+             patch("connectors.keboola.extractor._extract_via_extension", side_effect=write_pq):
             result = run(output_dir, configs, "https://example.com", "test-token")
 
         assert result["tables_extracted"] == 1

@@ -197,8 +197,17 @@ class TestKeboolaExtractorFull:
                 raise RuntimeError("Connection reset")
             _write_parquet(pq_path, "SELECT 1 AS id")
 
+        # When extension scan fails, the per-table flow now retries via
+        # _extract_via_legacy. Mock it to re-raise so we observe the final
+        # error surface; the contract under test is "single table failure
+        # doesn't abort remaining tables", not which path produced the
+        # message.
+        def legacy_reraise(tc, pq_path, url, token):
+            raise RuntimeError("Connection reset")
+
         with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \
-             patch("connectors.keboola.extractor._extract_via_extension", side_effect=failing_first):
+             patch("connectors.keboola.extractor._extract_via_extension", side_effect=failing_first), \
+             patch("connectors.keboola.extractor._extract_via_legacy", side_effect=legacy_reraise):
             result = run(output_dir, sample_local_configs, "https://kbc.example.com", "token-abc")
 
         assert result["tables_extracted"] == 1
