From bc3ba0d43d5ef509d036e67444cc8d4ad9060ca3 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Fri, 1 May 2026 23:04:51 +0200 Subject: [PATCH] feat(admin-api): reject register-table for source_type not configured on instance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit E2E sub-agent finding: instance configured with `data_source.type='bigquery'` and no `data_source.keboola.*` block. Admin POSTs `{source_type: 'keboola'}` to /api/admin/register-table → returns 201, row lands in the registry, but never syncs because the scheduler has no Keboola URL/token to ATTACH against. Operator only notices the gap when `da catalog` keeps showing nothing. The new `_validate_source_type_configured` helper runs immediately after the id/view-name collision checks in `register_table`. A source_type is considered configured when: - it matches `get_data_source_type()` (the instance's primary), OR - a non-empty `data_source.` block exists in the effective `instance.yaml` (multi-source instance), OR - it's in `_SOURCE_TYPES_INDEPENDENT_OF_DATA_SOURCE` (Jira / local — both get data through paths that don't involve `data_source.*`). Returns 422 with a message that names the configured primary source and points at `/admin/server-config` for enabling a secondary one. None / empty source_type is still tolerated for backward compat with legacy CLI scripts that don't set the field — the route resolves it later. 5 new tests cover: keboola-on-bq rejected, bq-on-keboola rejected, matching source_type still works, jira allowed regardless, omitted source_type passes through. Existing tests that registered Keboola rows on the unconfigured default test instance now opt into a `keboola_instance` fixture to satisfy the new validator (tests/test_admin_bq_register.py + .keboola_materialized + .unregister_cleanup; the multi-source PUT test in test_admin_bq_register adds a `keboola` block to its synthetic config). Pre-existing test_missing_project_returns_error failure in TestRebuildFromRegistry is unrelated (config-cache leakage from a previous test in the same class) — confirmed pre-existing on the prior commit via `git stash` reproduction. --- CHANGELOG.md | 9 + app/api/admin.py | 69 +++++++ tests/test_admin_bq_register.py | 46 ++++- tests/test_admin_keboola_materialized.py | 26 +++ ...t_admin_register_source_type_validation.py | 188 ++++++++++++++++++ tests/test_admin_unregister_cleanup.py | 29 ++- 6 files changed, 359 insertions(+), 8 deletions(-) create mode 100644 tests/test_admin_register_source_type_validation.py diff --git a/CHANGELOG.md b/CHANGELOG.md index d4d2e2a..7ece257 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,15 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C into scheduler stderr. A row that recovers on the next tick clears the error automatically (the success path of `update_sync` resets `status='ok'` / `error=NULL` on the upsert). +- **admin API**: `POST /api/admin/register-table` now refuses requests whose + `source_type` isn't actually configured on the instance — pre-fix, an + admin could register `source_type='keboola'` on a BQ-only instance and + the row would land in the registry but never sync (no Keboola URL/token + to ATTACH against). Returns 422 with a message naming the configured + primary source and pointing at `/admin/server-config` for enabling a + secondary source. `jira` / `local` are exempt — they don't sit under + `data_source.*`. Omitted source_type still tolerated for legacy CLI + callers. ## [0.30.0] — 2026-05-01 diff --git a/app/api/admin.py b/app/api/admin.py index 537ff8f..0ac8e41 100644 --- a/app/api/admin.py +++ b/app/api/admin.py @@ -1378,6 +1378,68 @@ def _validate_bigquery_register_payload(req: "RegisterTableRequest") -> None: req.query_mode = "remote" +# Source types that don't depend on a `data_source..*` block — they +# get their data through a different ingestion path (e.g. Jira via +# webhooks). Registrations against these types are allowed regardless of +# the configured primary `data_source.type`. +_SOURCE_TYPES_INDEPENDENT_OF_DATA_SOURCE: frozenset[str] = frozenset({ + "jira", + "local", +}) + + +def _validate_source_type_configured(source_type: Optional[str]) -> None: + """Refuse register-table requests whose ``source_type`` isn't actually + configured on this instance. + + Pre-fix the route happily persisted e.g. ``source_type='keboola'`` on a + BQ-only instance — the row landed in the registry but the scheduler had + no Keboola URL/token to ATTACH against, so it silently never synced. + No upfront error, no operator-visible signal until they noticed the + table was missing from `da catalog`. + + A source_type is considered configured when: + + - it matches the instance's primary ``data_source.type``, OR + - a non-empty ``data_source.`` block exists in the + effective `instance.yaml` (multi-source instances),OR + - it's in the small allowlist of types that don't sit under + `data_source.*` at all (Jira, local — see + ``_SOURCE_TYPES_INDEPENDENT_OF_DATA_SOURCE``). + + A bare/None source_type is tolerated for backward compat with legacy + CLI scripts; the route resolves it later against + ``get_data_source_type()``. + """ + if not source_type: + return + if source_type in _SOURCE_TYPES_INDEPENDENT_OF_DATA_SOURCE: + return + + from app.instance_config import get_data_source_type, get_value + + configured_primary = get_data_source_type() + if source_type == configured_primary: + return + + # Multi-source: accept if a non-empty `data_source.` block + # exists. Empty dict / None / "" all count as "not configured". + secondary_block = get_value("data_source", source_type, default=None) + if secondary_block: + # Truthy non-empty dict / mapping / scalar — treat as configured. + return + + raise HTTPException( + status_code=422, + detail=( + f"source_type={source_type!r} is not configured on this instance. " + f"The configured data source is {configured_primary!r}. To enable " + f"a secondary source, set data_source.{source_type}.* fields in " + "instance.yaml or via /admin/server-config." + ), + ) + + class UpdateTableRequest(BaseModel): name: Optional[str] = None sync_strategy: Optional[str] = Field( @@ -1886,6 +1948,13 @@ def register_table( detail=f"View name '{request.name}' is already in use by table id '{existing_by_name.get('id')}'", ) + # Refuse rows whose source_type isn't actually configured — pre-fix the + # row landed in the registry but never synced because there was no + # Keboola URL/token (or BQ project) to ATTACH against. Surfaces the + # misconfig at registration time so the operator sees the gap before + # they wonder why `da catalog` is missing the table. + _validate_source_type_configured(request.source_type) + # BQ rows go through the extra validation + post-insert materialization # contract from issue #108. Other source types keep the legacy insert-only # flow — Keboola materialization happens via the scheduled sync, Jira via diff --git a/tests/test_admin_bq_register.py b/tests/test_admin_bq_register.py index 87528c7..c862f31 100644 --- a/tests/test_admin_bq_register.py +++ b/tests/test_admin_bq_register.py @@ -69,6 +69,32 @@ def bq_instance(monkeypatch): reset_cache() +@pytest.fixture +def keboola_instance(monkeypatch): + """Mirror of bq_instance for Keboola — required by tests that POST + `source_type='keboola'` payloads. The new register-table source-type + availability validator (see _validate_source_type_configured) refuses + Keboola registrations on the default unconfigured test instance, + which `get_data_source_type()` resolves to 'local'.""" + fake_cfg = { + "data_source": { + "type": "keboola", + "keboola": { + "stack_url": "https://connection.keboola.com", + "project_id": "1234", + "token_env": "KEBOOLA_STORAGE_TOKEN", + }, + }, + } + monkeypatch.setattr( + "app.instance_config.load_instance_config", lambda: fake_cfg, raising=False, + ) + from app.instance_config import reset_cache + reset_cache() + yield fake_cfg + reset_cache() + + @pytest.fixture def stub_bq_extractor(monkeypatch): """Replace rebuild_from_registry + SyncOrchestrator.rebuild with mocks @@ -508,7 +534,7 @@ class TestRegistryAuditLog: from src.repositories.audit import AuditRepository return AuditRepository(conn).query(action=action, limit=10) - def test_register_keboola_writes_audit_entry(self, seeded_app): + def test_register_keboola_writes_audit_entry(self, seeded_app, keboola_instance): c = seeded_app["client"] token = seeded_app["admin_token"] resp = c.post( @@ -566,7 +592,7 @@ class TestRegistryAuditLog: assert out["primary_key"] == ["id"] # whitelisted assert out["description"] == "raw description stays raw" - def test_update_writes_audit_entry(self, seeded_app): + def test_update_writes_audit_entry(self, seeded_app, keboola_instance): c = seeded_app["client"] token = seeded_app["admin_token"] c.post( @@ -589,7 +615,7 @@ class TestRegistryAuditLog: conn.close() assert any(r["resource"] == "kb_upd" for r in rows) - def test_unregister_writes_audit_entry(self, seeded_app): + def test_unregister_writes_audit_entry(self, seeded_app, keboola_instance): c = seeded_app["client"] token = seeded_app["admin_token"] c.post( @@ -906,7 +932,7 @@ class TestKeboolaRegisterStatusCode: its decorator — each branch returns its own. Keboola (non-BQ) must still explicitly return 201 with the registered-row body.""" - def test_keboola_register_returns_201(self, seeded_app): + def test_keboola_register_returns_201(self, seeded_app, keboola_instance): c = seeded_app["client"] token = seeded_app["admin_token"] resp = c.post( @@ -936,13 +962,21 @@ class TestUpdateTableBigQueryValidation: ): from app.instance_config import reset_cache # Set a malformed project_id in instance.yaml so the BQ validator - # rejects the merged row at PUT time. + # rejects the merged row at PUT time. Configure both `bigquery` + # AND `keboola` blocks so the test's initial Keboola register + # passes the source_type-availability validator (multi-source + # instance shape — see _validate_source_type_configured). monkeypatch.setattr( "app.instance_config.load_instance_config", lambda: { "data_source": { "type": "bigquery", "bigquery": {"project": "Bad Project With Spaces"}, + "keboola": { + "stack_url": "https://connection.keboola.com", + "project_id": "1234", + "token_env": "KEBOOLA_STORAGE_TOKEN", + }, } }, raising=False, @@ -1003,7 +1037,7 @@ class TestUpdateTableBigQueryValidation: ) assert resp.status_code == 400, resp.text - def test_put_preserves_registered_at_across_edits(self, seeded_app): + def test_put_preserves_registered_at_across_edits(self, seeded_app, keboola_instance): """Issue #130 — PUT /api/admin/registry/{id} must NOT reset registered_at on every edit. The original timestamp from the initial register call must survive subsequent PUTs.""" diff --git a/tests/test_admin_keboola_materialized.py b/tests/test_admin_keboola_materialized.py index 62fab26..5dd3a09 100644 --- a/tests/test_admin_keboola_materialized.py +++ b/tests/test_admin_keboola_materialized.py @@ -2,6 +2,32 @@ import pytest +@pytest.fixture(autouse=True) +def _keboola_instance(monkeypatch): + """Configure the test instance with a Keboola data source so the new + register-table source_type-availability validator (introduced in this + PR) accepts `source_type='keboola'` payloads. Pre-validator the test + suite passed without any data_source config because the route blindly + persisted whatever source_type the caller sent.""" + fake_cfg = { + "data_source": { + "type": "keboola", + "keboola": { + "stack_url": "https://connection.keboola.com", + "project_id": "1234", + "token_env": "KEBOOLA_STORAGE_TOKEN", + }, + }, + } + monkeypatch.setattr( + "app.instance_config.load_instance_config", lambda: fake_cfg, raising=False, + ) + from app.instance_config import reset_cache + reset_cache() + yield + reset_cache() + + def test_register_keboola_materialized_accepts_source_query(seeded_app): c = seeded_app["client"] token = seeded_app["admin_token"] diff --git a/tests/test_admin_register_source_type_validation.py b/tests/test_admin_register_source_type_validation.py new file mode 100644 index 0000000..e6328dd --- /dev/null +++ b/tests/test_admin_register_source_type_validation.py @@ -0,0 +1,188 @@ +"""POST /api/admin/register-table validates that the requested source_type +is actually configured on this instance — otherwise the row would never +sync (no Keboola URL/token to ATTACH against, or no BigQuery project). + +E2E sub-agent finding 2026-05-01: instance configured with +`data_source.type='bigquery'` and no `data_source.keboola.*` block. Admin +POSTs `{source_type: 'keboola'}` → returns 201, row lands in registry but +never syncs. No upfront validation surfaces the misconfig. +""" +from __future__ import annotations + +import pytest + + +def _auth(token: str) -> dict: + return {"Authorization": f"Bearer {token}"} + + +@pytest.fixture +def bq_only_instance(monkeypatch): + """Instance configured ONLY for BigQuery — no keboola block.""" + fake_cfg = { + "data_source": { + "type": "bigquery", + "bigquery": {"project": "my-test-project", "location": "us"}, + }, + } + monkeypatch.setattr( + "app.instance_config.load_instance_config", lambda: fake_cfg, raising=False, + ) + monkeypatch.setattr( + "config.loader.load_instance_config", lambda: fake_cfg, raising=False, + ) + from app.instance_config import reset_cache + reset_cache() + yield fake_cfg + reset_cache() + + +@pytest.fixture +def keboola_only_instance(monkeypatch): + """Instance configured ONLY for Keboola — no bigquery block.""" + fake_cfg = { + "data_source": { + "type": "keboola", + "keboola": { + "stack_url": "https://connection.keboola.com", + "project_id": "1234", + "token_env": "KEBOOLA_STORAGE_TOKEN", + }, + }, + } + monkeypatch.setattr( + "app.instance_config.load_instance_config", lambda: fake_cfg, raising=False, + ) + monkeypatch.setattr( + "config.loader.load_instance_config", lambda: fake_cfg, raising=False, + ) + from app.instance_config import reset_cache + reset_cache() + yield fake_cfg + reset_cache() + + +def test_register_keboola_on_bq_only_instance_rejected(seeded_app, bq_only_instance): + """source_type='keboola' against a BQ-only instance must 422 with a + message pointing the operator at /admin/server-config to enable the + secondary source. Without this, the row lands in the registry and + never syncs because there's no Keboola URL/token to ATTACH.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + + r = c.post( + "/api/admin/register-table", + json={ + "name": "should_reject", + "source_type": "keboola", + "bucket": "in.c-main", + "source_table": "events", + "query_mode": "local", + }, + headers=_auth(token), + ) + assert r.status_code == 422, r.json() + detail = str(r.json().get("detail", "")).lower() + assert "not configured" in detail or "not enabled" in detail + assert "keboola" in detail + assert "bigquery" in detail # message names the actually-configured source + + +def test_register_bq_on_keboola_only_instance_rejected(seeded_app, keboola_only_instance): + """Symmetric: source_type='bigquery' on a Keboola-only instance + rejects.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + + r = c.post( + "/api/admin/register-table", + json={ + "name": "should_reject_bq", + "source_type": "bigquery", + "bucket": "analytics", + "source_table": "orders", + "query_mode": "remote", + }, + headers=_auth(token), + ) + assert r.status_code == 422, r.json() + detail = str(r.json().get("detail", "")).lower() + assert "not configured" in detail + assert "bigquery" in detail + + +def test_register_matching_source_type_succeeds(seeded_app, bq_only_instance): + """Sanity: BQ row on a BQ instance still works — the new validation + only refuses MISmatches.""" + from unittest.mock import MagicMock + import pytest as _pt + + # Stub the BQ rebuild to keep test offline. + from connectors.bigquery import extractor as _bq + _orig = _bq.rebuild_from_registry + _bq.rebuild_from_registry = MagicMock(return_value={ + "project_id": "my-test-project", "tables_registered": 1, + "errors": [], "skipped": False, + }) + from src import orchestrator as _orch + _orig_orch = _orch.SyncOrchestrator + _orch.SyncOrchestrator = lambda *a, **kw: MagicMock() + try: + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/admin/register-table", + json={ + "name": "matching_bq", + "source_type": "bigquery", + "bucket": "analytics", + "source_table": "orders", + "query_mode": "remote", + }, + headers=_auth(token), + ) + # 200 (sync materialize) or 202 (timeout). Both are success codes here. + assert r.status_code in (200, 202), r.json() + finally: + _bq.rebuild_from_registry = _orig + _orch.SyncOrchestrator = _orig_orch + + +def test_register_jira_does_not_require_data_source_match(seeded_app, bq_only_instance): + """Jira rows are ingested via webhooks, not via `data_source.*` config. + They should be allowed regardless of the configured `data_source.type` + so a BQ-primary instance can still receive Jira webhooks.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + + r = c.post( + "/api/admin/register-table", + json={ + "name": "jira_issues", + "source_type": "jira", + "query_mode": "local", + }, + headers=_auth(token), + ) + # Jira goes through the insert-only branch and returns 201. + assert r.status_code == 201, r.json() + + +def test_register_omitted_source_type_passes_through(seeded_app, bq_only_instance): + """Backwards compat: callers that don't set source_type (legacy CLI + scripts) must still succeed — the route resolves source_type later + against `get_data_source_type()`. The new validator only refuses + EXPLICIT mismatches.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + + r = c.post( + "/api/admin/register-table", + json={ + "name": "legacy_caller", + # source_type omitted entirely + "query_mode": "local", + }, + headers=_auth(token), + ) + assert r.status_code == 201, r.json() diff --git a/tests/test_admin_unregister_cleanup.py b/tests/test_admin_unregister_cleanup.py index 1d44a42..4b5ffd4 100644 --- a/tests/test_admin_unregister_cleanup.py +++ b/tests/test_admin_unregister_cleanup.py @@ -60,6 +60,31 @@ def stub_bq_extractor(monkeypatch): return rebuild_mock +@pytest.fixture +def keboola_instance(monkeypatch): + """Keboola instance fixture — required by tests that POST + `source_type='keboola'` payloads. The register-table source-type + availability validator refuses Keboola registrations on the default + unconfigured test instance.""" + fake_cfg = { + "data_source": { + "type": "keboola", + "keboola": { + "stack_url": "https://connection.keboola.com", + "project_id": "1234", + "token_env": "KEBOOLA_STORAGE_TOKEN", + }, + }, + } + monkeypatch.setattr( + "app.instance_config.load_instance_config", lambda: fake_cfg, raising=False, + ) + from app.instance_config import reset_cache + reset_cache() + yield fake_cfg + reset_cache() + + def test_delete_materialized_bq_row_removes_parquet( seeded_app, bq_instance, stub_bq_extractor, ): @@ -101,7 +126,7 @@ def test_delete_materialized_bq_row_removes_parquet( assert not tmp_path.exists(), "DELETE should also clean up stale .tmp file" -def test_delete_materialized_keboola_row_removes_parquet(seeded_app): +def test_delete_materialized_keboola_row_removes_parquet(seeded_app, keboola_instance): """Same contract for Keboola materialized rows — the canonical path is /data/extracts/keboola/data/.parquet.""" c = seeded_app["client"] @@ -157,7 +182,7 @@ def test_delete_remote_bq_row_does_not_touch_data_dir( assert r2.status_code == 204 -def test_delete_clears_sync_state_for_materialized_row(seeded_app): +def test_delete_clears_sync_state_for_materialized_row(seeded_app, keboola_instance): """DELETE must also clear the sync_state row so the manifest stops advertising the dropped table to `da sync`.""" c = seeded_app["client"]