From 4751094e1c601e4ace22b53867ed512e7ee26d5f Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr <139972147+ZdenekSrotyr@users.noreply.github.com> Date: Tue, 5 May 2026 15:47:44 +0200 Subject: [PATCH] fix(keboola): per-table fallback to legacy Storage-API client (#183) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(keboola): per-table fallback to legacy Storage-API client The DuckDB Keboola extension's per-table COPY fails with `Schema '..."in.c-..."' does not exist or not authorized` on projects whose Snowflake backend doesn't expose bucket schemas to the storage-token-derived QueryService role (keboola/duckdb-extension#17). ATTACH itself succeeds, so the existing extension-level fallback in `_try_attach_extension` never triggers — the table is just marked failed. - Promote `kbcstorage>=0.9.0` from optional to core dep so the legacy client import in `_extract_via_legacy` doesn't crash default installs with `ModuleNotFoundError`. - Wrap `_extract_via_extension` in a per-table try/except so a scan failure retries via `_extract_via_legacy` instead of recording `tables_failed` and moving on. Slower than the extension path, but produces correct parquets on affected projects while the upstream extension fix lands. * test(keboola): cover per-table extension→legacy fallback Two existing tests mocked _extract_via_extension to throw and asserted the original message survived in result["errors"]. With per-table fallback, the new flow retries via _extract_via_legacy — which on the mock URLs would throw a different (404 / DNS-fail) error, replacing the asserted message. - Mock _extract_via_legacy alongside _extract_via_extension in test_network_timeout_during_extraction + test_partial_failure_continues + test_all_tables_fail_returns_full_failure_stats so the assertion observes the final propagated error from the fallback chain. - Add test_extension_per_table_failure_falls_back_to_legacy that exercises the new behavior directly: extension scan fails with the QueryService schema-not-authorized message (keboola/duckdb-extension#17), legacy succeeds, parquet ends up queryable. --- CHANGELOG.md | 7 ++++ connectors/keboola/extractor.py | 15 +++++++- pyproject.toml | 16 +++++++- tests/test_keboola_extractor.py | 55 ++++++++++++++++++++++++++-- tests/test_keboola_extractor_full.py | 11 +++++- 5 files changed, 97 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d3a8d8..8af7204 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,13 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +### Fixed + +- Keboola sync now falls back to the legacy Storage-API client when the DuckDB Keboola extension's per-table scan fails, not just when the initial `ATTACH` fails. Two changes: + - `kbcstorage>=0.9.0` is promoted from optional to core dependency. The legacy fallback path in `connectors/keboola/extractor.py:_extract_via_legacy` has been there since the extension landed, but until now the bare `from kbcstorage.client import Client` would crash any default install with `ModuleNotFoundError`. + - `connectors/keboola/extractor.py:run` now wraps `_extract_via_extension` in a per-table try/except — on any per-table scan failure it retries via the legacy client. Previously, when `ATTACH` succeeded but the table-level `COPY (SELECT * FROM kbc.""."")` failed, the table was just marked failed with no retry. + Together these unblock deployments where the extension's bucket-schema scans return `Schema '..."in.c-..."' does not exist or not authorized` (keboola/duckdb-extension#17) while the upstream extension fix is in flight. + ## [0.35.1] — 2026-05-05 ### Fixed diff --git a/connectors/keboola/extractor.py b/connectors/keboola/extractor.py index cadf985..1d47e0c 100644 --- a/connectors/keboola/extractor.py +++ b/connectors/keboola/extractor.py @@ -236,7 +236,20 @@ def run(output_dir: str, table_configs: List[Dict[str, Any]], keboola_url: str, pq_path = str(data_dir / f"{table_name}.parquet") if use_extension: - _extract_via_extension(conn, tc, pq_path) + try: + _extract_via_extension(conn, tc, pq_path) + except Exception as ext_err: + # ATTACH succeeded but the per-table COPY failed — + # most commonly a Keboola QueryService permission error + # (`Schema '..."in.c-..."' does not exist or not + # authorized`, see keboola/duckdb-extension#17). The + # legacy Storage-API client doesn't go through + # QueryService at all, so retry there. + logger.warning( + "Keboola extension scan failed for %s (%s); retrying via legacy Storage-API client", + table_name, ext_err, + ) + _extract_via_legacy(tc, pq_path, keboola_url, keboola_token) else: _extract_via_legacy(tc, pq_path, keboola_url, keboola_token) diff --git a/pyproject.toml b/pyproject.toml index 4359a74..c4c02c3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,11 +62,23 @@ dependencies = [ # ModuleNotFoundError boot loops on default Compose deploys. "anthropic>=0.30.0", "openai>=1.30.0", + # Legacy Keboola Storage API client. The primary Keboola path is the + # DuckDB community extension (`connectors/keboola/access.py`, + # `connectors/keboola/extractor.py`), but it routes scans through + # Keboola QueryService — and on projects whose Snowflake backend + # doesn't expose bucket schemas to the storage-token-derived role + # the extension fails with `Schema '..."in.c-..."' does not exist + # or not authorized` while the same token reads fine via the + # `/v2/storage/tables/{id}/data-preview` REST endpoint + # (keboola/duckdb-extension#17). The extractor has had a `kbcstorage` + # fallback path since the extension landed + # (`connectors/keboola/extractor.py:_extract_via_legacy`); making + # the dep core means that fallback is actually importable in the + # default install instead of crashing with `ModuleNotFoundError`. + "kbcstorage>=0.9.0", ] [project.optional-dependencies] -# keboola-legacy: install kbcstorage>=0.9.0 manually if you need the legacy -# Keboola client fallback (primary path uses DuckDB Keboola extension) dev = [ "pytest>=9.0.0", "pytest-timeout>=2.0.0", diff --git a/tests/test_keboola_extractor.py b/tests/test_keboola_extractor.py index f6c65ee..2158179 100644 --- a/tests/test_keboola_extractor.py +++ b/tests/test_keboola_extractor.py @@ -318,7 +318,17 @@ class TestKeboolaExtractorFailureModes: raise socket.timeout("Connection timed out") _write_parquet(pq_path, "SELECT 1 AS id") - with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), patch("connectors.keboola.extractor._extract_via_extension", side_effect=side_effect): + # When extension scan fails, the per-table flow now retries via + # _extract_via_legacy. Mock it to re-raise the same socket.timeout + # so we observe the final error surface; the contract under test is + # "extension failure doesn't crash, error makes it into stats, other + # tables continue", not which path produced the message. + def legacy_reraise(tc, pq_path, url, token): + raise socket.timeout("Connection timed out") + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \ + patch("connectors.keboola.extractor._extract_via_extension", side_effect=side_effect), \ + patch("connectors.keboola.extractor._extract_via_legacy", side_effect=legacy_reraise): result = run(output_dir, configs, "https://example.com", "test-token") assert result["tables_extracted"] == 1 @@ -348,6 +358,37 @@ class TestKeboolaExtractorFailureModes: assert val[0] == 42 conn.close() + def test_extension_per_table_failure_falls_back_to_legacy(self, output_dir): + """When ATTACH succeeds but the per-table extension scan fails (e.g. + Keboola QueryService schema/role mismatch — keboola/duckdb-extension#17), + the extractor retries that table via the legacy Storage-API client.""" + from connectors.keboola.extractor import run + + configs = [{"name": "t", "id": "in.c-test.t", "query_mode": "local", + "bucket": "in.c-test", "source_table": "t", "description": ""}] + + def extension_scan_fails(conn, tc, pq_path): + raise RuntimeError( + "Keboola scan failed: Schema 'KBC_USE4_NNNN.\"in.c-test\"' " + "does not exist or not authorized." + ) + + def legacy_succeeds(tc, pq_path, url, token): + _write_parquet(pq_path, "SELECT 7 AS value") + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \ + patch("connectors.keboola.extractor._extract_via_extension", side_effect=extension_scan_fails), \ + patch("connectors.keboola.extractor._extract_via_legacy", side_effect=legacy_succeeds): + result = run(output_dir, configs, "https://example.com", "test-token") + + assert result["tables_extracted"] == 1 + assert result["tables_failed"] == 0 + # Verify the legacy-produced data is queryable + conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb")) + val = conn.execute("SELECT value FROM t").fetchone() + assert val[0] == 7 + conn.close() + def test_all_tables_fail_returns_full_failure_stats(self, output_dir): """When every table fails, the extractor returns all failures in stats without crashing.""" @@ -361,7 +402,14 @@ class TestKeboolaExtractorFailureModes: def always_fail(conn, tc, pq_path): raise RuntimeError("Extraction failed") - with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), patch("connectors.keboola.extractor._extract_via_extension", side_effect=always_fail): + # Mock legacy too — otherwise it would attempt a real HTTP call to + # the fake URL on each per-table fallback retry. + def legacy_also_fails(tc, pq_path, url, token): + raise RuntimeError("Extraction failed") + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \ + patch("connectors.keboola.extractor._extract_via_extension", side_effect=always_fail), \ + patch("connectors.keboola.extractor._extract_via_legacy", side_effect=legacy_also_fails): result = run(output_dir, configs, "https://example.com", "test-token") assert result["tables_extracted"] == 0 @@ -381,7 +429,8 @@ class TestKeboolaExtractorFailureModes: def write_pq(conn, tc, pq_path): _write_parquet(pq_path, "SELECT 1 AS id") - with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), patch("connectors.keboola.extractor._extract_via_extension", side_effect=write_pq): + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \ + patch("connectors.keboola.extractor._extract_via_extension", side_effect=write_pq): result = run(output_dir, configs, "https://example.com", "test-token") assert result["tables_extracted"] == 1 diff --git a/tests/test_keboola_extractor_full.py b/tests/test_keboola_extractor_full.py index fe15503..34ad39f 100644 --- a/tests/test_keboola_extractor_full.py +++ b/tests/test_keboola_extractor_full.py @@ -197,8 +197,17 @@ class TestKeboolaExtractorFull: raise RuntimeError("Connection reset") _write_parquet(pq_path, "SELECT 1 AS id") + # When extension scan fails, the per-table flow now retries via + # _extract_via_legacy. Mock it to re-raise so we observe the final + # error surface; the contract under test is "single table failure + # doesn't abort remaining tables", not which path produced the + # message. + def legacy_reraise(tc, pq_path, url, token): + raise RuntimeError("Connection reset") + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \ - patch("connectors.keboola.extractor._extract_via_extension", side_effect=failing_first): + patch("connectors.keboola.extractor._extract_via_extension", side_effect=failing_first), \ + patch("connectors.keboola.extractor._extract_via_legacy", side_effect=legacy_reraise): result = run(output_dir, sample_local_configs, "https://kbc.example.com", "token-abc") assert result["tables_extracted"] == 1