From 9c2bd3ff25fd6caf5674d8d4a11c557a2ec5144d Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Sun, 12 Apr 2026 11:13:24 +0200 Subject: [PATCH] test: add 132 API gap tests across 8 endpoint modules Covers upload (sessions, artifacts, local-md), scripts (deploy/run/delete), settings (get/dataset), memory (CRUD, voting, admin governance), access-requests (create, approve, deny), permissions (grant/revoke/list), metadata (get/save/push), and admin configure+registry endpoints. Each file tests happy path, auth required (401), role enforcement (403), and input validation (422) independently using the seeded_app fixture. --- tests/test_access_requests_api.py | 204 ++++++++++++++++++++ tests/test_admin_configure_api.py | 264 +++++++++++++++++++++++++ tests/test_memory_api.py | 308 ++++++++++++++++++++++++++++++ tests/test_metadata_api.py | 161 ++++++++++++++++ tests/test_permissions_api.py | 168 ++++++++++++++++ tests/test_scripts_api.py | 185 ++++++++++++++++++ tests/test_settings_api.py | 126 ++++++++++++ tests/test_upload_api.py | 185 ++++++++++++++++++ 8 files changed, 1601 insertions(+) create mode 100644 tests/test_access_requests_api.py create mode 100644 tests/test_admin_configure_api.py create mode 100644 tests/test_memory_api.py create mode 100644 tests/test_metadata_api.py create mode 100644 tests/test_permissions_api.py create mode 100644 tests/test_scripts_api.py create mode 100644 tests/test_settings_api.py create mode 100644 tests/test_upload_api.py diff --git a/tests/test_access_requests_api.py b/tests/test_access_requests_api.py new file mode 100644 index 0000000..b3007ad --- /dev/null +++ b/tests/test_access_requests_api.py @@ -0,0 +1,204 @@ +"""Tests for access requests API — create, list, approve, deny.""" + +import pytest + + +def _auth(token): + return {"Authorization": f"Bearer {token}"} + + +class TestAccessRequestCreate: + def test_create_request(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post( + "/api/access-requests", + json={"table_id": "orders", "reason": "Need for analysis"}, + headers=_auth(token), + ) + assert resp.status_code == 201 + data = resp.json() + assert "id" in data + assert data["status"] == "pending" + assert data["table_id"] == "orders" + + def test_create_request_without_reason(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post( + "/api/access-requests", + json={"table_id": "customers"}, + headers=_auth(token), + ) + assert resp.status_code == 201 + + def test_create_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.post( + "/api/access-requests", + json={"table_id": "orders", "reason": "Need access"}, + ) + assert resp.status_code == 401 + + def test_create_missing_table_id_returns_422(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post( + "/api/access-requests", + json={"reason": "No table_id"}, + headers=_auth(token), + ) + assert resp.status_code == 422 + + def test_duplicate_pending_request_returns_409(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + + # First request + resp1 = c.post( + "/api/access-requests", + json={"table_id": "invoices", "reason": "First request"}, + headers=_auth(token), + ) + assert resp1.status_code == 201 + + # Duplicate request for the same table + resp2 = c.post( + "/api/access-requests", + json={"table_id": "invoices", "reason": "Duplicate"}, + headers=_auth(token), + ) + assert resp2.status_code == 409 + + +class TestAccessRequestMyRequests: + def test_list_my_requests_empty(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.get("/api/access-requests/my", headers=_auth(token)) + assert resp.status_code == 200 + data = resp.json() + assert "requests" in data + assert isinstance(data["requests"], list) + + def test_list_my_requests_after_create(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + + c.post( + "/api/access-requests", + json={"table_id": "my_table", "reason": "for analysis"}, + headers=_auth(token), + ) + + resp = c.get("/api/access-requests/my", headers=_auth(token)) + assert resp.status_code == 200 + data = resp.json() + assert len(data["requests"]) >= 1 + table_ids = [r["table_id"] for r in data["requests"]] + assert "my_table" in table_ids + + def test_my_requests_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.get("/api/access-requests/my") + assert resp.status_code == 401 + + +class TestAccessRequestPending: + def test_list_pending_as_admin(self, seeded_app): + c = seeded_app["client"] + admin_token = seeded_app["admin_token"] + analyst_token = seeded_app["analyst_token"] + + # Create a request + c.post( + "/api/access-requests", + json={"table_id": "secret_table", "reason": "Need access"}, + headers=_auth(analyst_token), + ) + + resp = c.get("/api/access-requests/pending", headers=_auth(admin_token)) + assert resp.status_code == 200 + data = resp.json() + assert "requests" in data + assert "count" in data + + def test_list_pending_analyst_gets_403(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.get("/api/access-requests/pending", headers=_auth(token)) + assert resp.status_code == 403 + + def test_list_pending_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.get("/api/access-requests/pending") + assert resp.status_code == 401 + + +class TestAccessRequestApproveAndDeny: + def _create_request(self, c, analyst_token, table_id="test_table"): + resp = c.post( + "/api/access-requests", + json={"table_id": table_id, "reason": "Test"}, + headers=_auth(analyst_token), + ) + assert resp.status_code == 201 + return resp.json()["id"] + + def test_approve_request_as_admin(self, seeded_app): + c = seeded_app["client"] + admin_token = seeded_app["admin_token"] + analyst_token = seeded_app["analyst_token"] + + req_id = self._create_request(c, analyst_token, "approve_table") + + resp = c.post(f"/api/access-requests/{req_id}/approve", headers=_auth(admin_token)) + assert resp.status_code == 200 + data = resp.json() + assert data["status"] == "approved" + assert data["id"] == req_id + + def test_approve_analyst_gets_403(self, seeded_app): + c = seeded_app["client"] + admin_token = seeded_app["admin_token"] + analyst_token = seeded_app["analyst_token"] + + req_id = self._create_request(c, analyst_token, "approve_table2") + + resp = c.post(f"/api/access-requests/{req_id}/approve", headers=_auth(analyst_token)) + assert resp.status_code == 403 + + def test_deny_request_as_admin(self, seeded_app): + c = seeded_app["client"] + admin_token = seeded_app["admin_token"] + analyst_token = seeded_app["analyst_token"] + + req_id = self._create_request(c, analyst_token, "deny_table") + + resp = c.post(f"/api/access-requests/{req_id}/deny", headers=_auth(admin_token)) + assert resp.status_code == 200 + data = resp.json() + assert data["status"] == "denied" + assert data["id"] == req_id + + def test_deny_analyst_gets_403(self, seeded_app): + c = seeded_app["client"] + admin_token = seeded_app["admin_token"] + analyst_token = seeded_app["analyst_token"] + + req_id = self._create_request(c, analyst_token, "deny_table2") + + resp = c.post(f"/api/access-requests/{req_id}/deny", headers=_auth(analyst_token)) + assert resp.status_code == 403 + + def test_approve_nonexistent_request_returns_404(self, seeded_app): + c = seeded_app["client"] + admin_token = seeded_app["admin_token"] + resp = c.post("/api/access-requests/nonexistent-id/approve", headers=_auth(admin_token)) + assert resp.status_code == 404 + + def test_deny_nonexistent_request_returns_404(self, seeded_app): + c = seeded_app["client"] + admin_token = seeded_app["admin_token"] + resp = c.post("/api/access-requests/nonexistent-id/deny", headers=_auth(admin_token)) + assert resp.status_code == 404 diff --git a/tests/test_admin_configure_api.py b/tests/test_admin_configure_api.py new file mode 100644 index 0000000..2706760 --- /dev/null +++ b/tests/test_admin_configure_api.py @@ -0,0 +1,264 @@ +"""Tests for admin configure and registry API endpoints.""" + +import pytest + + +def _auth(token): + return {"Authorization": f"Bearer {token}"} + + +class TestAdminConfigure: + def test_configure_local_source(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post( + "/api/admin/configure", + json={"data_source": "local"}, + headers=_auth(token), + ) + assert resp.status_code == 200 + data = resp.json() + assert data["status"] == "ok" + assert data["data_source"] == "local" + + def test_configure_invalid_source_type_returns_400(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post( + "/api/admin/configure", + json={"data_source": "invalid_source"}, + headers=_auth(token), + ) + assert resp.status_code == 400 + assert "data_source" in resp.json()["detail"].lower() or "must be" in resp.json()["detail"] + + def test_configure_requires_admin(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post( + "/api/admin/configure", + json={"data_source": "local"}, + headers=_auth(token), + ) + assert resp.status_code == 403 + + def test_configure_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.post( + "/api/admin/configure", + json={"data_source": "local"}, + ) + assert resp.status_code == 401 + + def test_configure_bigquery_missing_project_returns_400(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post( + "/api/admin/configure", + json={"data_source": "bigquery"}, # missing bigquery_project + headers=_auth(token), + ) + assert resp.status_code == 400 + + def test_configure_bigquery_with_project(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post( + "/api/admin/configure", + json={"data_source": "bigquery", "bigquery_project": "my-project"}, + headers=_auth(token), + ) + assert resp.status_code == 200 + data = resp.json() + assert data["data_source"] == "bigquery" + + def test_configure_missing_data_source_returns_422(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post( + "/api/admin/configure", + json={}, # missing data_source entirely + headers=_auth(token), + ) + assert resp.status_code == 422 + + +class TestAdminRegistry: + def test_list_registry_empty(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.get("/api/admin/registry", headers=_auth(token)) + assert resp.status_code == 200 + data = resp.json() + assert "tables" in data + assert "count" in data + assert data["count"] == 0 + + def test_list_registry_requires_admin(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.get("/api/admin/registry", headers=_auth(token)) + assert resp.status_code == 403 + + def test_list_registry_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.get("/api/admin/registry") + assert resp.status_code == 401 + + +class TestRegisterTable: + def test_register_table_success(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post( + "/api/admin/register-table", + json={"name": "orders", "source_type": "keboola", "bucket": "in.c-crm", + "source_table": "orders", "query_mode": "local"}, + headers=_auth(token), + ) + assert resp.status_code == 201 + data = resp.json() + assert data["id"] == "orders" + assert data["name"] == "orders" + assert data["status"] == "registered" + + def test_register_table_appears_in_registry(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + + c.post( + "/api/admin/register-table", + json={"name": "customers", "source_type": "keboola"}, + headers=_auth(token), + ) + + resp = c.get("/api/admin/registry", headers=_auth(token)) + assert resp.status_code == 200 + names = [t["name"] for t in resp.json()["tables"]] + assert "customers" in names + + def test_register_duplicate_returns_409(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + + # Register once + c.post( + "/api/admin/register-table", + json={"name": "dup_table"}, + headers=_auth(token), + ) + + # Register again + resp = c.post( + "/api/admin/register-table", + json={"name": "dup_table"}, + headers=_auth(token), + ) + assert resp.status_code == 409 + + def test_register_requires_admin(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post( + "/api/admin/register-table", + json={"name": "new_table"}, + headers=_auth(token), + ) + assert resp.status_code == 403 + + def test_register_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.post( + "/api/admin/register-table", + json={"name": "new_table"}, + ) + assert resp.status_code == 401 + + def test_register_table_with_all_fields(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post( + "/api/admin/register-table", + json={ + "name": "full_table", + "source_type": "keboola", + "bucket": "in.c-crm", + "source_table": "full_table", + "query_mode": "local", + "sync_schedule": "0 6 * * *", + "description": "Full configuration table", + "profile_after_sync": True, + }, + headers=_auth(token), + ) + assert resp.status_code == 201 + + +class TestDeleteRegistryTable: + def test_delete_registered_table(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + + # Register + c.post( + "/api/admin/register-table", + json={"name": "to_delete"}, + headers=_auth(token), + ) + + # Delete + resp = c.delete("/api/admin/registry/to_delete", headers=_auth(token)) + assert resp.status_code == 204 + + # Verify gone from registry + list_resp = c.get("/api/admin/registry", headers=_auth(token)) + names = [t["name"] for t in list_resp.json()["tables"]] + assert "to_delete" not in names + + def test_delete_nonexistent_table_returns_404(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.delete("/api/admin/registry/nonexistent_table", headers=_auth(token)) + assert resp.status_code == 404 + + def test_delete_requires_admin(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.delete("/api/admin/registry/some_table", headers=_auth(token)) + assert resp.status_code == 403 + + def test_delete_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.delete("/api/admin/registry/some_table") + assert resp.status_code == 401 + + +class TestDiscoverAndRegister: + def test_discover_and_register_requires_admin(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post("/api/admin/discover-and-register", headers=_auth(token)) + assert resp.status_code == 403 + + def test_discover_and_register_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.post("/api/admin/discover-and-register") + assert resp.status_code == 401 + + def test_discover_and_register_non_keboola_returns_zero(self, seeded_app): + """With no keboola config, discover-and-register returns 0 registered tables.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + + # Configure as local (non-keboola) + c.post( + "/api/admin/configure", + json={"data_source": "local"}, + headers=_auth(token), + ) + + resp = c.post("/api/admin/discover-and-register", headers=_auth(token)) + assert resp.status_code == 200 + data = resp.json() + assert data["registered"] == 0 + assert data["source"] != "keboola" diff --git a/tests/test_memory_api.py b/tests/test_memory_api.py new file mode 100644 index 0000000..ab4988a --- /dev/null +++ b/tests/test_memory_api.py @@ -0,0 +1,308 @@ +"""Tests for corporate memory API — knowledge items, voting, governance.""" + +import pytest + + +def _auth(token): + return {"Authorization": f"Bearer {token}"} + + +class TestMemoryCreate: + def test_create_knowledge_item(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post( + "/api/memory", + json={"title": "Best Practice", "content": "Always document your code.", "category": "engineering"}, + headers=_auth(token), + ) + assert resp.status_code == 201 + data = resp.json() + assert "id" in data + assert data["status"] == "pending" + + def test_create_with_tags(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post( + "/api/memory", + json={ + "title": "Tagged Item", + "content": "Content here", + "category": "process", + "tags": ["tag1", "tag2"], + }, + headers=_auth(token), + ) + assert resp.status_code == 201 + assert "id" in resp.json() + + def test_create_missing_title_returns_422(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post( + "/api/memory", + json={"content": "No title", "category": "engineering"}, + headers=_auth(token), + ) + assert resp.status_code == 422 + + def test_create_missing_content_returns_422(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post( + "/api/memory", + json={"title": "No content", "category": "engineering"}, + headers=_auth(token), + ) + assert resp.status_code == 422 + + def test_create_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.post( + "/api/memory", + json={"title": "Test", "content": "Content", "category": "engineering"}, + ) + assert resp.status_code == 401 + + +class TestMemoryList: + def _create_item(self, c, token, title="Test Item", category="engineering"): + resp = c.post( + "/api/memory", + json={"title": title, "content": f"Content for {title}", "category": category}, + headers=_auth(token), + ) + assert resp.status_code == 201 + return resp.json()["id"] + + def test_list_empty(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.get("/api/memory", headers=_auth(token)) + assert resp.status_code == 200 + data = resp.json() + assert "items" in data + assert "count" in data + + def test_list_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.get("/api/memory") + assert resp.status_code == 401 + + def test_list_pagination(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + + # Create 3 items + for i in range(3): + self._create_item(c, token, title=f"Item {i}") + + # Page 1 with per_page=2 + resp = c.get("/api/memory?page=1&per_page=2", headers=_auth(token)) + assert resp.status_code == 200 + data = resp.json() + assert data["per_page"] == 2 + assert data["page"] == 1 + assert len(data["items"]) <= 2 + + def test_list_search(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + + self._create_item(c, token, title="Unique Keyword SearchTarget") + self._create_item(c, token, title="Another Item") + + resp = c.get("/api/memory?search=SearchTarget", headers=_auth(token)) + assert resp.status_code == 200 + data = resp.json() + assert data["count"] >= 1 + titles = [item["title"] for item in data["items"]] + assert any("SearchTarget" in t for t in titles) + + +class TestMemoryStats: + def test_get_stats(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.get("/api/memory/stats", headers=_auth(token)) + assert resp.status_code == 200 + data = resp.json() + assert "total" in data + assert "by_status" in data + assert "categories" in data + + def test_get_stats_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.get("/api/memory/stats") + assert resp.status_code == 401 + + +class TestMemoryVote: + def _create_item(self, c, token): + resp = c.post( + "/api/memory", + json={"title": "Voteable", "content": "vote me", "category": "process"}, + headers=_auth(token), + ) + return resp.json()["id"] + + def test_vote_upvote(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + item_id = self._create_item(c, token) + + resp = c.post(f"/api/memory/{item_id}/vote", json={"vote": 1}, headers=_auth(token)) + assert resp.status_code == 200 + data = resp.json() + assert data["upvotes"] >= 1 + + def test_vote_downvote(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + item_id = self._create_item(c, token) + + resp = c.post(f"/api/memory/{item_id}/vote", json={"vote": -1}, headers=_auth(token)) + assert resp.status_code == 200 + data = resp.json() + assert data["downvotes"] >= 1 + + def test_vote_invalid_value_returns_400(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + item_id = self._create_item(c, token) + + resp = c.post(f"/api/memory/{item_id}/vote", json={"vote": 5}, headers=_auth(token)) + assert resp.status_code == 400 + + def test_vote_nonexistent_item_returns_404(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post("/api/memory/nonexistent-id/vote", json={"vote": 1}, headers=_auth(token)) + assert resp.status_code == 404 + + def test_vote_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.post("/api/memory/some-id/vote", json={"vote": 1}) + assert resp.status_code == 401 + + +class TestMemoryMyVotes: + def test_get_my_votes_empty(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.get("/api/memory/my-votes", headers=_auth(token)) + assert resp.status_code == 200 + assert isinstance(resp.json(), dict) + + def test_get_my_votes_after_voting(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + + # Create and vote + item_resp = c.post( + "/api/memory", + json={"title": "My Vote Item", "content": "content", "category": "engineering"}, + headers=_auth(token), + ) + item_id = item_resp.json()["id"] + c.post(f"/api/memory/{item_id}/vote", json={"vote": 1}, headers=_auth(token)) + + # Check my-votes + resp = c.get("/api/memory/my-votes", headers=_auth(token)) + assert resp.status_code == 200 + votes = resp.json() + assert item_id in votes + assert votes[item_id] == 1 + + def test_my_votes_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.get("/api/memory/my-votes") + assert resp.status_code == 401 + + +class TestMemoryAdminEndpoints: + def _create_item(self, c, token): + resp = c.post( + "/api/memory", + json={"title": "Admin Test", "content": "content", "category": "policy"}, + headers=_auth(token), + ) + assert resp.status_code == 201 + return resp.json()["id"] + + def test_admin_approve(self, seeded_app): + c = seeded_app["client"] + admin_token = seeded_app["admin_token"] + item_id = self._create_item(c, admin_token) + + resp = c.post(f"/api/memory/admin/approve?item_id={item_id}", headers=_auth(admin_token)) + assert resp.status_code == 200 + assert resp.json()["status"] == "approved" + + def test_admin_reject(self, seeded_app): + c = seeded_app["client"] + admin_token = seeded_app["admin_token"] + item_id = self._create_item(c, admin_token) + + resp = c.post( + f"/api/memory/admin/reject?item_id={item_id}", + json={"reason": "not relevant"}, + headers=_auth(admin_token), + ) + assert resp.status_code == 200 + assert resp.json()["status"] == "rejected" + + def test_admin_mandate(self, seeded_app): + c = seeded_app["client"] + admin_token = seeded_app["admin_token"] + item_id = self._create_item(c, admin_token) + + resp = c.post( + f"/api/memory/admin/mandate?item_id={item_id}", + json={"reason": "company policy", "audience": "all"}, + headers=_auth(admin_token), + ) + assert resp.status_code == 200 + assert resp.json()["status"] == "mandatory" + + def test_admin_approve_analyst_gets_403(self, seeded_app): + """Analyst cannot use admin governance endpoints.""" + c = seeded_app["client"] + admin_token = seeded_app["admin_token"] + analyst_token = seeded_app["analyst_token"] + item_id = self._create_item(c, admin_token) + + resp = c.post(f"/api/memory/admin/approve?item_id={item_id}", headers=_auth(analyst_token)) + assert resp.status_code == 403 + + def test_admin_reject_analyst_gets_403(self, seeded_app): + c = seeded_app["client"] + admin_token = seeded_app["admin_token"] + analyst_token = seeded_app["analyst_token"] + item_id = self._create_item(c, admin_token) + + resp = c.post( + f"/api/memory/admin/reject?item_id={item_id}", + json={"reason": "nope"}, + headers=_auth(analyst_token), + ) + assert resp.status_code == 403 + + def test_admin_mandate_analyst_gets_403(self, seeded_app): + c = seeded_app["client"] + admin_token = seeded_app["admin_token"] + analyst_token = seeded_app["analyst_token"] + item_id = self._create_item(c, admin_token) + + resp = c.post( + f"/api/memory/admin/mandate?item_id={item_id}", + json={"reason": "policy"}, + headers=_auth(analyst_token), + ) + assert resp.status_code == 403 + + def test_admin_approve_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.post("/api/memory/admin/approve?item_id=some-id") + assert resp.status_code == 401 diff --git a/tests/test_metadata_api.py b/tests/test_metadata_api.py new file mode 100644 index 0000000..93093e5 --- /dev/null +++ b/tests/test_metadata_api.py @@ -0,0 +1,161 @@ +"""Tests for admin metadata API — column metadata CRUD and push.""" + +import pytest + + +def _auth(token): + return {"Authorization": f"Bearer {token}"} + + +def _register_table(c, token, table_name="test_table"): + """Helper to register a table in the registry.""" + resp = c.post( + "/api/admin/register-table", + json={"name": table_name, "source_type": "keboola", "bucket": "in.c-test", + "source_table": table_name, "query_mode": "local"}, + headers=_auth(token), + ) + return resp.json().get("id", table_name.lower()) + + +class TestGetMetadata: + def test_get_metadata_empty(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.get("/api/admin/metadata/some_table", headers=_auth(token)) + assert resp.status_code == 200 + data = resp.json() + assert data["table_id"] == "some_table" + assert "columns" in data + assert isinstance(data["columns"], list) + + def test_get_metadata_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.get("/api/admin/metadata/some_table") + assert resp.status_code == 401 + + def test_get_metadata_analyst_allowed(self, seeded_app): + """GET metadata is allowed for authenticated users (not admin-only).""" + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.get("/api/admin/metadata/some_table", headers=_auth(token)) + assert resp.status_code == 200 + + +class TestSaveMetadata: + def test_save_column_metadata_as_admin(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + table_id = _register_table(c, token, "orders_meta") + + resp = c.post( + f"/api/admin/metadata/{table_id}", + json={ + "columns": [ + {"column_name": "id", "basetype": "INTEGER", "description": "Primary key", "confidence": "manual"}, + {"column_name": "name", "basetype": "VARCHAR", "description": "Customer name"}, + ] + }, + headers=_auth(token), + ) + assert resp.status_code == 200 + data = resp.json() + assert data["status"] == "ok" + assert data["count"] == 2 + + def test_save_metadata_analyst_gets_403(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post( + "/api/admin/metadata/some_table", + json={"columns": [{"column_name": "id", "basetype": "INTEGER"}]}, + headers=_auth(token), + ) + assert resp.status_code == 403 + + def test_save_metadata_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.post( + "/api/admin/metadata/some_table", + json={"columns": [{"column_name": "id"}]}, + ) + assert resp.status_code == 401 + + def test_save_metadata_missing_columns_returns_422(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post( + "/api/admin/metadata/some_table", + json={}, # missing 'columns' + headers=_auth(token), + ) + assert resp.status_code == 422 + + def test_save_then_get_metadata(self, seeded_app): + """Save metadata then verify it can be retrieved.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + table_id = _register_table(c, token, "round_trip_table") + + c.post( + f"/api/admin/metadata/{table_id}", + json={ + "columns": [ + {"column_name": "amount", "basetype": "DECIMAL", "description": "Order amount"}, + ] + }, + headers=_auth(token), + ) + + resp = c.get(f"/api/admin/metadata/{table_id}", headers=_auth(token)) + assert resp.status_code == 200 + columns = resp.json()["columns"] + assert len(columns) >= 1 + col_names = [c_["column_name"] for c_ in columns] + assert "amount" in col_names + + +class TestPushMetadata: + def test_push_metadata_table_not_found_returns_404(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post("/api/admin/metadata/nonexistent_table/push", headers=_auth(token)) + assert resp.status_code == 404 + + def test_push_metadata_non_keboola_returns_400(self, seeded_app): + """Push is only supported for keboola tables.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + + # Register a bigquery table (non-keboola) + c.post( + "/api/admin/register-table", + json={"name": "bq_table", "source_type": "bigquery", "query_mode": "remote"}, + headers=_auth(token), + ) + + resp = c.post("/api/admin/metadata/bq_table/push", headers=_auth(token)) + assert resp.status_code == 400 + assert "keboola" in resp.json()["detail"].lower() + + def test_push_metadata_analyst_gets_403(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post("/api/admin/metadata/some_table/push", headers=_auth(token)) + assert resp.status_code == 403 + + def test_push_metadata_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.post("/api/admin/metadata/some_table/push") + assert resp.status_code == 401 + + def test_push_keboola_table_without_env_vars_returns_500(self, seeded_app): + """Keboola table without env vars configured should return 500.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + + table_id = _register_table(c, token, "kbc_push_table") + + # Should fail with 500 because KBC_STACK_URL and KBC_STORAGE_TOKEN are not set + resp = c.post(f"/api/admin/metadata/{table_id}/push", headers=_auth(token)) + assert resp.status_code == 500 diff --git a/tests/test_permissions_api.py b/tests/test_permissions_api.py new file mode 100644 index 0000000..4b79084 --- /dev/null +++ b/tests/test_permissions_api.py @@ -0,0 +1,168 @@ +"""Tests for admin permissions API — grant, revoke, list.""" + +import pytest + + +def _auth(token): + return {"Authorization": f"Bearer {token}"} + + +class TestGrantPermission: + def test_grant_permission_as_admin(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post( + "/api/admin/permissions", + json={"user_id": "analyst1", "dataset": "sales", "access": "read"}, + headers=_auth(token), + ) + assert resp.status_code == 201 + data = resp.json() + assert data["user_id"] == "analyst1" + assert data["dataset"] == "sales" + assert data["access"] == "read" + + def test_grant_analyst_gets_403(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post( + "/api/admin/permissions", + json={"user_id": "analyst1", "dataset": "sales", "access": "read"}, + headers=_auth(token), + ) + assert resp.status_code == 403 + + def test_grant_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.post( + "/api/admin/permissions", + json={"user_id": "analyst1", "dataset": "sales", "access": "read"}, + ) + assert resp.status_code == 401 + + def test_grant_missing_fields_returns_422(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post( + "/api/admin/permissions", + json={"user_id": "analyst1"}, # missing 'dataset' + headers=_auth(token), + ) + assert resp.status_code == 422 + + +class TestRevokePermission: + def _delete_with_json(self, c, url, body, headers=None): + """TestClient.delete() does not support a body — use request() method.""" + import json + h = {"Content-Type": "application/json"} + if headers: + h.update(headers) + return c.request("DELETE", url, content=json.dumps(body), headers=h) + + def test_revoke_permission_as_admin(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + + # Grant first + c.post( + "/api/admin/permissions", + json={"user_id": "analyst1", "dataset": "to_revoke", "access": "read"}, + headers=_auth(token), + ) + + # Then revoke + resp = self._delete_with_json( + c, "/api/admin/permissions", + {"user_id": "analyst1", "dataset": "to_revoke"}, + headers=_auth(token), + ) + assert resp.status_code == 200 + assert resp.json()["revoked"] is True + + def test_revoke_analyst_gets_403(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = self._delete_with_json( + c, "/api/admin/permissions", + {"user_id": "analyst1", "dataset": "sales"}, + headers=_auth(token), + ) + assert resp.status_code == 403 + + def test_revoke_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = self._delete_with_json( + c, "/api/admin/permissions", + {"user_id": "analyst1", "dataset": "sales"}, + ) + assert resp.status_code == 401 + + +class TestListUserPermissions: + def test_list_user_permissions_as_admin(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + + # Grant a permission + c.post( + "/api/admin/permissions", + json={"user_id": "analyst1", "dataset": "finance", "access": "read"}, + headers=_auth(token), + ) + + resp = c.get("/api/admin/permissions/analyst1", headers=_auth(token)) + assert resp.status_code == 200 + data = resp.json() + assert data["user_id"] == "analyst1" + assert "permissions" in data + datasets = [p["dataset"] for p in data["permissions"]] + assert "finance" in datasets + + def test_list_user_permissions_analyst_gets_403(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.get("/api/admin/permissions/analyst1", headers=_auth(token)) + assert resp.status_code == 403 + + def test_list_user_permissions_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.get("/api/admin/permissions/analyst1") + assert resp.status_code == 401 + + +class TestListAllPermissions: + def test_list_all_permissions_empty(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.get("/api/admin/permissions", headers=_auth(token)) + assert resp.status_code == 200 + data = resp.json() + assert "permissions" in data + assert isinstance(data["permissions"], list) + + def test_list_all_permissions_after_grant(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + + c.post( + "/api/admin/permissions", + json={"user_id": "analyst1", "dataset": "all_perms_test", "access": "read"}, + headers=_auth(token), + ) + + resp = c.get("/api/admin/permissions", headers=_auth(token)) + assert resp.status_code == 200 + datasets = [p["dataset"] for p in resp.json()["permissions"]] + assert "all_perms_test" in datasets + + def test_list_all_permissions_analyst_gets_403(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.get("/api/admin/permissions", headers=_auth(token)) + assert resp.status_code == 403 + + def test_list_all_permissions_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.get("/api/admin/permissions") + assert resp.status_code == 401 diff --git a/tests/test_scripts_api.py b/tests/test_scripts_api.py new file mode 100644 index 0000000..79ac002 --- /dev/null +++ b/tests/test_scripts_api.py @@ -0,0 +1,185 @@ +"""Tests for scripts API endpoints — deploy, run, list, undeploy.""" + +import pytest + + +def _auth(token): + return {"Authorization": f"Bearer {token}"} + + +class TestScriptsList: + def test_list_scripts_empty(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.get("/api/scripts", headers=_auth(token)) + assert resp.status_code == 200 + data = resp.json() + assert data["count"] == 0 + assert data["scripts"] == [] + + def test_list_scripts_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.get("/api/scripts") + assert resp.status_code == 401 + + +class TestScriptsDeploy: + def test_deploy_safe_script(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post( + "/api/scripts/deploy", + json={"name": "hello", "source": "print('hello world')"}, + headers=_auth(token), + ) + assert resp.status_code == 201 + data = resp.json() + assert data["name"] == "hello" + assert "id" in data + + def test_deploy_with_schedule(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post( + "/api/scripts/deploy", + json={"name": "scheduled", "source": "print('scheduled')", "schedule": "0 8 * * MON"}, + headers=_auth(token), + ) + assert resp.status_code == 201 + data = resp.json() + assert data["schedule"] == "0 8 * * MON" + + def test_deploy_script_with_blocked_import_deploys_ok_but_run_fails(self, seeded_app): + """Deploy stores scripts as-is; safety validation happens at run time, not deploy time.""" + c = seeded_app["client"] + token = seeded_app["analyst_token"] + # Deploy succeeds (no pre-validation at deploy time) + deploy_resp = c.post( + "/api/scripts/deploy", + json={"name": "bad_import", "source": "import os; print(os.getcwd())"}, + headers=_auth(token), + ) + assert deploy_resp.status_code == 201 + script_id = deploy_resp.json()["id"] + + # Running it should fail with 400 due to blocked import + run_resp = c.post(f"/api/scripts/{script_id}/run", headers=_auth(token)) + assert run_resp.status_code == 400 + assert "Blocked" in run_resp.json()["detail"] or "disallowed" in run_resp.json()["detail"] + + def test_deploy_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.post( + "/api/scripts/deploy", + json={"name": "hello", "source": "print('hello')"}, + ) + assert resp.status_code == 401 + + def test_deploy_appears_in_list(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + c.post( + "/api/scripts/deploy", + json={"name": "listed_script", "source": "x = 1"}, + headers=_auth(token), + ) + resp = c.get("/api/scripts", headers=_auth(token)) + assert resp.json()["count"] >= 1 + names = [s["name"] for s in resp.json()["scripts"]] + assert "listed_script" in names + + +class TestScriptsRun: + def test_run_adhoc_safe_script(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post( + "/api/scripts/run", + json={"source": "print('hello from adhoc')", "name": "adhoc_test"}, + headers=_auth(token), + ) + assert resp.status_code == 200 + data = resp.json() + assert data["exit_code"] == 0 + assert "hello from adhoc" in data["stdout"] + + def test_run_adhoc_blocked_os_module(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post( + "/api/scripts/run", + json={"source": "import sys; print(sys.path)", "name": "bad"}, + headers=_auth(token), + ) + assert resp.status_code == 400 + + def test_run_adhoc_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.post( + "/api/scripts/run", + json={"source": "print('hello')", "name": "test"}, + ) + assert resp.status_code == 401 + + def test_run_adhoc_no_source_returns_400(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post( + "/api/scripts/run", + json={"name": "no_source"}, + headers=_auth(token), + ) + assert resp.status_code == 400 + + def test_run_deployed_script(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + # Deploy first + deploy_resp = c.post( + "/api/scripts/deploy", + json={"name": "calc", "source": "print(2+2)"}, + headers=_auth(token), + ) + assert deploy_resp.status_code == 201 + script_id = deploy_resp.json()["id"] + + # Run deployed script + resp = c.post(f"/api/scripts/{script_id}/run", headers=_auth(token)) + assert resp.status_code == 200 + assert "4" in resp.json()["stdout"] + + def test_run_nonexistent_script_returns_404(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post("/api/scripts/nonexistent-id/run", headers=_auth(token)) + assert resp.status_code == 404 + + +class TestScriptsDelete: + def test_undeploy_requires_admin(self, seeded_app): + """Analyst cannot delete scripts — only admin can.""" + c = seeded_app["client"] + admin_token = seeded_app["admin_token"] + analyst_token = seeded_app["analyst_token"] + + # Deploy as analyst + deploy_resp = c.post( + "/api/scripts/deploy", + json={"name": "to_delete", "source": "print('bye')"}, + headers=_auth(analyst_token), + ) + script_id = deploy_resp.json()["id"] + + # Analyst cannot delete + resp = c.delete(f"/api/scripts/{script_id}", headers=_auth(analyst_token)) + assert resp.status_code == 403 + + # Admin can delete + resp = c.delete(f"/api/scripts/{script_id}", headers=_auth(admin_token)) + assert resp.status_code == 204 + + def test_undeploy_nonexistent_returns_404(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.delete("/api/scripts/does-not-exist", headers=_auth(token)) + assert resp.status_code == 404 diff --git a/tests/test_settings_api.py b/tests/test_settings_api.py new file mode 100644 index 0000000..84def4c --- /dev/null +++ b/tests/test_settings_api.py @@ -0,0 +1,126 @@ +"""Tests for user settings API endpoints.""" + +import pytest + + +def _auth(token): + return {"Authorization": f"Bearer {token}"} + + +class TestSettingsGet: + def test_get_settings_returns_user_id(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.get("/api/settings", headers=_auth(token)) + assert resp.status_code == 200 + data = resp.json() + assert data["user_id"] == "admin1" + assert "sync_settings" in data + assert "permissions" in data + + def test_get_settings_analyst(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.get("/api/settings", headers=_auth(token)) + assert resp.status_code == 200 + data = resp.json() + assert data["user_id"] == "analyst1" + + def test_get_settings_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.get("/api/settings") + assert resp.status_code == 401 + + def test_get_settings_empty_permissions_for_new_user(self, seeded_app): + """New users have no permissions by default.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.get("/api/settings", headers=_auth(token)) + assert resp.status_code == 200 + # Admin sees their own settings — permissions list should exist (may be empty) + assert isinstance(resp.json()["permissions"], list) + + +class TestSettingsDataset: + def test_update_dataset_setting_with_permission(self, seeded_app): + """Admin granting permission first, then analyst can update the dataset setting.""" + c = seeded_app["client"] + admin_token = seeded_app["admin_token"] + analyst_token = seeded_app["analyst_token"] + + # Grant permission to analyst first + c.post( + "/api/admin/permissions", + json={"user_id": "analyst1", "dataset": "sales_data", "access": "read"}, + headers=_auth(admin_token), + ) + + resp = c.put( + "/api/settings/dataset", + json={"dataset": "sales_data", "enabled": True}, + headers=_auth(analyst_token), + ) + assert resp.status_code == 200 + data = resp.json() + assert data["dataset"] == "sales_data" + assert data["enabled"] is True + + def test_update_dataset_setting_without_permission_returns_403(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.put( + "/api/settings/dataset", + json={"dataset": "secret_data", "enabled": True}, + headers=_auth(token), + ) + assert resp.status_code == 403 + + def test_update_dataset_setting_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.put( + "/api/settings/dataset", + json={"dataset": "sales_data", "enabled": True}, + ) + assert resp.status_code == 401 + + def test_update_dataset_missing_fields_returns_422(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.put( + "/api/settings/dataset", + json={"dataset": "sales_data"}, # missing 'enabled' + headers=_auth(token), + ) + assert resp.status_code == 422 + + def test_update_without_explicit_permission_returns_403_even_for_admin(self, seeded_app): + """The dataset settings endpoint checks dataset_permissions table — even admin + needs explicit permission to enable/disable a specific dataset via this endpoint.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.put( + "/api/settings/dataset", + json={"dataset": "any_dataset_no_perm", "enabled": False}, + headers=_auth(token), + ) + # The endpoint checks perm_repo.has_access which doesn't have admin bypass + assert resp.status_code == 403 + + def test_disable_dataset_with_permission(self, seeded_app): + c = seeded_app["client"] + admin_token = seeded_app["admin_token"] + + # Grant explicit permission to admin for the dataset + c.post( + "/api/admin/permissions", + json={"user_id": "admin1", "dataset": "some_table", "access": "read"}, + headers=_auth(admin_token), + ) + + resp = c.put( + "/api/settings/dataset", + json={"dataset": "some_table", "enabled": False}, + headers=_auth(admin_token), + ) + assert resp.status_code == 200 + assert resp.json()["enabled"] is False diff --git a/tests/test_upload_api.py b/tests/test_upload_api.py new file mode 100644 index 0000000..87ac0d0 --- /dev/null +++ b/tests/test_upload_api.py @@ -0,0 +1,185 @@ +"""Tests for upload API endpoints — sessions, artifacts, local-md.""" + +import io +import pytest + + +def _auth(token): + return {"Authorization": f"Bearer {token}"} + + +class TestUploadSessions: + def test_upload_session_success(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + content = b'{"type": "message", "text": "hello"}\n{"type": "message", "text": "world"}\n' + resp = c.post( + "/api/upload/sessions", + files={"file": ("session.jsonl", io.BytesIO(content), "application/jsonl")}, + headers=_auth(token), + ) + assert resp.status_code == 200 + data = resp.json() + assert data["status"] == "ok" + assert data["filename"] == "session.jsonl" + assert data["size"] == len(content) + + def test_upload_session_requires_auth(self, seeded_app): + c = seeded_app["client"] + content = b'{"type": "message"}\n' + resp = c.post( + "/api/upload/sessions", + files={"file": ("session.jsonl", io.BytesIO(content), "application/jsonl")}, + ) + assert resp.status_code == 401 + + def test_upload_session_directory_traversal_rejected(self, seeded_app): + """Filenames with ../ should be sanitized — only the basename is kept.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + content = b'{"type": "message"}\n' + resp = c.post( + "/api/upload/sessions", + files={"file": ("../../etc/passwd", io.BytesIO(content), "application/jsonl")}, + headers=_auth(token), + ) + # The upload should succeed, but the path traversal should be stripped + assert resp.status_code == 200 + data = resp.json() + # filename must be just the basename — no slashes, no traversal + assert "/" not in data["filename"] + assert data["filename"] in ("passwd", "etc") + + def test_upload_session_empty_content(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post( + "/api/upload/sessions", + files={"file": ("empty.jsonl", io.BytesIO(b""), "application/jsonl")}, + headers=_auth(token), + ) + assert resp.status_code == 200 + assert resp.json()["size"] == 0 + + def test_upload_session_analyst_allowed(self, seeded_app): + """Analyst users can also upload sessions.""" + c = seeded_app["client"] + token = seeded_app["analyst_token"] + content = b'{"type": "message"}\n' + resp = c.post( + "/api/upload/sessions", + files={"file": ("analyst_session.jsonl", io.BytesIO(content), "application/jsonl")}, + headers=_auth(token), + ) + assert resp.status_code == 200 + + +class TestUploadArtifacts: + def test_upload_html_artifact(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + content = b"Report" + resp = c.post( + "/api/upload/artifacts", + files={"file": ("report.html", io.BytesIO(content), "text/html")}, + headers=_auth(token), + ) + assert resp.status_code == 200 + data = resp.json() + assert data["status"] == "ok" + assert data["filename"] == "report.html" + assert data["size"] == len(content) + + def test_upload_png_artifact(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + # Minimal valid PNG header + content = b"\x89PNG\r\n\x1a\n" + b"\x00" * 100 + resp = c.post( + "/api/upload/artifacts", + files={"file": ("chart.png", io.BytesIO(content), "image/png")}, + headers=_auth(token), + ) + assert resp.status_code == 200 + assert resp.json()["filename"] == "chart.png" + + def test_upload_artifact_requires_auth(self, seeded_app): + c = seeded_app["client"] + content = b"Report" + resp = c.post( + "/api/upload/artifacts", + files={"file": ("report.html", io.BytesIO(content), "text/html")}, + ) + assert resp.status_code == 401 + + def test_upload_artifact_directory_traversal(self, seeded_app): + """Path traversal in artifact filenames should be sanitized.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + content = b"" + resp = c.post( + "/api/upload/artifacts", + files={"file": ("../../../tmp/evil.html", io.BytesIO(content), "text/html")}, + headers=_auth(token), + ) + assert resp.status_code == 200 + # Must not contain slashes + assert "/" not in resp.json()["filename"] + + def test_upload_artifact_empty(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post( + "/api/upload/artifacts", + files={"file": ("empty.html", io.BytesIO(b""), "text/html")}, + headers=_auth(token), + ) + assert resp.status_code == 200 + assert resp.json()["size"] == 0 + + +class TestUploadLocalMd: + def test_upload_local_md_success(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + content = "# My Notes\n\nSome corporate memory content." + resp = c.post( + "/api/upload/local-md", + json={"content": content}, + headers=_auth(token), + ) + assert resp.status_code == 200 + data = resp.json() + assert data["status"] == "ok" + assert data["user"] == "admin@test.com" + assert data["size"] == len(content) + + def test_upload_local_md_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.post( + "/api/upload/local-md", + json={"content": "some content"}, + ) + assert resp.status_code == 401 + + def test_upload_local_md_missing_content_field(self, seeded_app): + """Missing content field should return 422.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post( + "/api/upload/local-md", + json={}, + headers=_auth(token), + ) + assert resp.status_code == 422 + + def test_upload_local_md_analyst(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post( + "/api/upload/local-md", + json={"content": "analyst notes"}, + headers=_auth(token), + ) + assert resp.status_code == 200 + assert resp.json()["user"] == "analyst@test.com"