From 4b4c071959c11c8c736f3dedd26a0d9c2707f292 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Sat, 11 Apr 2026 08:33:10 +0200 Subject: [PATCH] fix: async httpx in metadata push, guard access_token, add push test - Replace synchronous httpx.post() with async httpx.AsyncClient in push_metadata_to_source endpoint to avoid blocking the event loop - Guard data["access_token"] in CLI analyst setup with .get() and a clear error message on missing key - Add test_push_non_keboola_table_fails and test_push_keboola_table to TestMetadataAPI, covering 400/404 path and the happy path with mocked async httpx --- app/api/metadata.py | 47 ++++++++++++------------- cli/commands/analyst.py | 5 ++- tests/test_api.py | 76 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 104 insertions(+), 24 deletions(-) diff --git a/app/api/metadata.py b/app/api/metadata.py index c75a3dc..bcec9b5 100644 --- a/app/api/metadata.py +++ b/app/api/metadata.py @@ -97,32 +97,33 @@ async def push_metadata_to_source( pushed = 0 errors = [] - for col in columns: - column_name = col["column_name"] - metadata_payload = [] + async with httpx.AsyncClient() as client: + for col in columns: + column_name = col["column_name"] + metadata_payload = [] - if col.get("basetype"): - metadata_payload.append({"key": "KBC.datatype.basetype", "value": col["basetype"]}) - if col.get("description"): - metadata_payload.append({"key": "KBC.description", "value": col["description"]}) + if col.get("basetype"): + metadata_payload.append({"key": "KBC.datatype.basetype", "value": col["basetype"]}) + if col.get("description"): + metadata_payload.append({"key": "KBC.description", "value": col["description"]}) - if not metadata_payload: - continue + if not metadata_payload: + continue - endpoint = f"{stack_url}/v2/storage/tables/{source_table}/columns/{column_name}/metadata" - try: - resp = httpx.post( - endpoint, - headers={"X-StorageApi-Token": token}, - json={"provider": "ai-metadata-enrichment", "metadata": metadata_payload}, - timeout=30, - ) - if resp.status_code in (200, 201): - pushed += 1 - else: - errors.append(f"{column_name}: {resp.status_code} {resp.text[:200]}") - except httpx.RequestError as e: - errors.append(f"{column_name}: request error — {e}") + endpoint = f"{stack_url}/v2/storage/tables/{source_table}/columns/{column_name}/metadata" + try: + resp = await client.post( + endpoint, + headers={"X-StorageApi-Token": token}, + json={"provider": "ai-metadata-enrichment", "metadata": metadata_payload}, + timeout=30, + ) + if resp.status_code in (200, 201): + pushed += 1 + else: + errors.append(f"{column_name}: {resp.status_code} {resp.text[:200]}") + except httpx.RequestError as e: + errors.append(f"{column_name}: request error — {e}") result = {"status": "ok", "pushed": pushed} if errors: diff --git a/cli/commands/analyst.py b/cli/commands/analyst.py index f62ce28..4534148 100644 --- a/cli/commands/analyst.py +++ b/cli/commands/analyst.py @@ -77,7 +77,10 @@ def _connect_to_instance(server_url: str) -> str: typer.echo(f"Authentication failed: {e}", err=True) raise typer.Exit(1) - token = data["access_token"] + token = data.get("access_token") + if not token: + typer.echo("Authentication failed: server response missing access_token", err=True) + raise typer.Exit(1) role = data.get("role", "analyst") save_config({"server": server_url}) diff --git a/tests/test_api.py b/tests/test_api.py index df016c6..77894f3 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -375,3 +375,79 @@ class TestMetadataAPI: headers={"Authorization": f"Bearer {analyst_token}"}, ) assert resp.status_code == 403 + + def test_push_non_keboola_table_fails(self, seeded_client): + client, admin_token, _ = seeded_client + resp = client.post( + "/api/admin/metadata/orders/push", + headers={"Authorization": f"Bearer {admin_token}"}, + ) + # 'orders' is not in table_registry — expect 404 or 400 + assert resp.status_code in (400, 404) + + def test_push_keboola_table(self, seeded_client, monkeypatch): + client, admin_token, _ = seeded_client + + # 1. Register a keboola table + from src.db import get_system_db + from src.repositories.table_registry import TableRegistryRepository + + conn = get_system_db() + TableRegistryRepository(conn).register( + id="kbc_orders", + name="Orders", + source_type="keboola", + source_table="in.c-main.orders", + ) + conn.close() + + # 2. Save column metadata + client.post( + "/api/admin/metadata/kbc_orders", + json={"columns": [ + {"column_name": "id", "basetype": "STRING", "description": "Order ID"}, + ]}, + headers={"Authorization": f"Bearer {admin_token}"}, + ) + + # 3. Set required env vars + monkeypatch.setenv("KBC_STACK_URL", "https://connection.keboola.com") + monkeypatch.setenv("KBC_STORAGE_TOKEN", "test-token") + + # 4. Mock httpx.AsyncClient so no real HTTP call is made + import httpx as _httpx + from unittest.mock import AsyncMock, MagicMock, patch + + mock_response = _httpx.Response(200, json={"ok": True}) + mock_post = AsyncMock(return_value=mock_response) + + # AsyncClient is used as "async with httpx.AsyncClient() as client: await client.post(...)" + # We patch the class so the context-manager instance has our mock_post. + mock_async_client_instance = MagicMock() + mock_async_client_instance.post = mock_post + mock_async_client_instance.__aenter__ = AsyncMock(return_value=mock_async_client_instance) + mock_async_client_instance.__aexit__ = AsyncMock(return_value=False) + + with patch("httpx.AsyncClient", return_value=mock_async_client_instance): + resp = client.post( + "/api/admin/metadata/kbc_orders/push", + headers={"Authorization": f"Bearer {admin_token}"}, + ) + + # 5. Assertions + assert resp.status_code == 200 + data = resp.json() + assert data.get("pushed") == 1 + + mock_post.assert_called_once() + call_args = mock_post.call_args + # Verify URL contains the source table name + called_url = call_args[0][0] if call_args[0] else call_args.kwargs.get("url", "") + assert "in.c-main.orders" in called_url + # Verify auth header + called_headers = call_args.kwargs.get("headers", {}) + assert called_headers.get("X-StorageApi-Token") == "test-token" + # Verify payload structure + called_json = call_args.kwargs.get("json", {}) + assert called_json.get("provider") == "ai-metadata-enrichment" + assert isinstance(called_json.get("metadata"), list)