Merge branch 'worktree-agent-aaa8db4c' into feature/v2-fastapi-duckdb-docker-cli

This commit is contained in:
ZdenekSrotyr 2026-04-12 11:15:34 +02:00
commit 8e22eed669
8 changed files with 1601 additions and 0 deletions

View file

@ -0,0 +1,204 @@
"""Tests for access requests API — create, list, approve, deny."""
import pytest
def _auth(token):
return {"Authorization": f"Bearer {token}"}
class TestAccessRequestCreate:
def test_create_request(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post(
"/api/access-requests",
json={"table_id": "orders", "reason": "Need for analysis"},
headers=_auth(token),
)
assert resp.status_code == 201
data = resp.json()
assert "id" in data
assert data["status"] == "pending"
assert data["table_id"] == "orders"
def test_create_request_without_reason(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post(
"/api/access-requests",
json={"table_id": "customers"},
headers=_auth(token),
)
assert resp.status_code == 201
def test_create_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.post(
"/api/access-requests",
json={"table_id": "orders", "reason": "Need access"},
)
assert resp.status_code == 401
def test_create_missing_table_id_returns_422(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post(
"/api/access-requests",
json={"reason": "No table_id"},
headers=_auth(token),
)
assert resp.status_code == 422
def test_duplicate_pending_request_returns_409(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
# First request
resp1 = c.post(
"/api/access-requests",
json={"table_id": "invoices", "reason": "First request"},
headers=_auth(token),
)
assert resp1.status_code == 201
# Duplicate request for the same table
resp2 = c.post(
"/api/access-requests",
json={"table_id": "invoices", "reason": "Duplicate"},
headers=_auth(token),
)
assert resp2.status_code == 409
class TestAccessRequestMyRequests:
def test_list_my_requests_empty(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.get("/api/access-requests/my", headers=_auth(token))
assert resp.status_code == 200
data = resp.json()
assert "requests" in data
assert isinstance(data["requests"], list)
def test_list_my_requests_after_create(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
c.post(
"/api/access-requests",
json={"table_id": "my_table", "reason": "for analysis"},
headers=_auth(token),
)
resp = c.get("/api/access-requests/my", headers=_auth(token))
assert resp.status_code == 200
data = resp.json()
assert len(data["requests"]) >= 1
table_ids = [r["table_id"] for r in data["requests"]]
assert "my_table" in table_ids
def test_my_requests_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.get("/api/access-requests/my")
assert resp.status_code == 401
class TestAccessRequestPending:
def test_list_pending_as_admin(self, seeded_app):
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
analyst_token = seeded_app["analyst_token"]
# Create a request
c.post(
"/api/access-requests",
json={"table_id": "secret_table", "reason": "Need access"},
headers=_auth(analyst_token),
)
resp = c.get("/api/access-requests/pending", headers=_auth(admin_token))
assert resp.status_code == 200
data = resp.json()
assert "requests" in data
assert "count" in data
def test_list_pending_analyst_gets_403(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.get("/api/access-requests/pending", headers=_auth(token))
assert resp.status_code == 403
def test_list_pending_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.get("/api/access-requests/pending")
assert resp.status_code == 401
class TestAccessRequestApproveAndDeny:
def _create_request(self, c, analyst_token, table_id="test_table"):
resp = c.post(
"/api/access-requests",
json={"table_id": table_id, "reason": "Test"},
headers=_auth(analyst_token),
)
assert resp.status_code == 201
return resp.json()["id"]
def test_approve_request_as_admin(self, seeded_app):
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
analyst_token = seeded_app["analyst_token"]
req_id = self._create_request(c, analyst_token, "approve_table")
resp = c.post(f"/api/access-requests/{req_id}/approve", headers=_auth(admin_token))
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "approved"
assert data["id"] == req_id
def test_approve_analyst_gets_403(self, seeded_app):
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
analyst_token = seeded_app["analyst_token"]
req_id = self._create_request(c, analyst_token, "approve_table2")
resp = c.post(f"/api/access-requests/{req_id}/approve", headers=_auth(analyst_token))
assert resp.status_code == 403
def test_deny_request_as_admin(self, seeded_app):
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
analyst_token = seeded_app["analyst_token"]
req_id = self._create_request(c, analyst_token, "deny_table")
resp = c.post(f"/api/access-requests/{req_id}/deny", headers=_auth(admin_token))
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "denied"
assert data["id"] == req_id
def test_deny_analyst_gets_403(self, seeded_app):
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
analyst_token = seeded_app["analyst_token"]
req_id = self._create_request(c, analyst_token, "deny_table2")
resp = c.post(f"/api/access-requests/{req_id}/deny", headers=_auth(analyst_token))
assert resp.status_code == 403
def test_approve_nonexistent_request_returns_404(self, seeded_app):
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
resp = c.post("/api/access-requests/nonexistent-id/approve", headers=_auth(admin_token))
assert resp.status_code == 404
def test_deny_nonexistent_request_returns_404(self, seeded_app):
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
resp = c.post("/api/access-requests/nonexistent-id/deny", headers=_auth(admin_token))
assert resp.status_code == 404

View file

@ -0,0 +1,264 @@
"""Tests for admin configure and registry API endpoints."""
import pytest
def _auth(token):
return {"Authorization": f"Bearer {token}"}
class TestAdminConfigure:
def test_configure_local_source(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/admin/configure",
json={"data_source": "local"},
headers=_auth(token),
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "ok"
assert data["data_source"] == "local"
def test_configure_invalid_source_type_returns_400(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/admin/configure",
json={"data_source": "invalid_source"},
headers=_auth(token),
)
assert resp.status_code == 400
assert "data_source" in resp.json()["detail"].lower() or "must be" in resp.json()["detail"]
def test_configure_requires_admin(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post(
"/api/admin/configure",
json={"data_source": "local"},
headers=_auth(token),
)
assert resp.status_code == 403
def test_configure_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.post(
"/api/admin/configure",
json={"data_source": "local"},
)
assert resp.status_code == 401
def test_configure_bigquery_missing_project_returns_400(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/admin/configure",
json={"data_source": "bigquery"}, # missing bigquery_project
headers=_auth(token),
)
assert resp.status_code == 400
def test_configure_bigquery_with_project(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/admin/configure",
json={"data_source": "bigquery", "bigquery_project": "my-project"},
headers=_auth(token),
)
assert resp.status_code == 200
data = resp.json()
assert data["data_source"] == "bigquery"
def test_configure_missing_data_source_returns_422(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/admin/configure",
json={}, # missing data_source entirely
headers=_auth(token),
)
assert resp.status_code == 422
class TestAdminRegistry:
def test_list_registry_empty(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.get("/api/admin/registry", headers=_auth(token))
assert resp.status_code == 200
data = resp.json()
assert "tables" in data
assert "count" in data
assert data["count"] == 0
def test_list_registry_requires_admin(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.get("/api/admin/registry", headers=_auth(token))
assert resp.status_code == 403
def test_list_registry_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.get("/api/admin/registry")
assert resp.status_code == 401
class TestRegisterTable:
def test_register_table_success(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/admin/register-table",
json={"name": "orders", "source_type": "keboola", "bucket": "in.c-crm",
"source_table": "orders", "query_mode": "local"},
headers=_auth(token),
)
assert resp.status_code == 201
data = resp.json()
assert data["id"] == "orders"
assert data["name"] == "orders"
assert data["status"] == "registered"
def test_register_table_appears_in_registry(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
c.post(
"/api/admin/register-table",
json={"name": "customers", "source_type": "keboola"},
headers=_auth(token),
)
resp = c.get("/api/admin/registry", headers=_auth(token))
assert resp.status_code == 200
names = [t["name"] for t in resp.json()["tables"]]
assert "customers" in names
def test_register_duplicate_returns_409(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
# Register once
c.post(
"/api/admin/register-table",
json={"name": "dup_table"},
headers=_auth(token),
)
# Register again
resp = c.post(
"/api/admin/register-table",
json={"name": "dup_table"},
headers=_auth(token),
)
assert resp.status_code == 409
def test_register_requires_admin(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post(
"/api/admin/register-table",
json={"name": "new_table"},
headers=_auth(token),
)
assert resp.status_code == 403
def test_register_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.post(
"/api/admin/register-table",
json={"name": "new_table"},
)
assert resp.status_code == 401
def test_register_table_with_all_fields(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/admin/register-table",
json={
"name": "full_table",
"source_type": "keboola",
"bucket": "in.c-crm",
"source_table": "full_table",
"query_mode": "local",
"sync_schedule": "0 6 * * *",
"description": "Full configuration table",
"profile_after_sync": True,
},
headers=_auth(token),
)
assert resp.status_code == 201
class TestDeleteRegistryTable:
def test_delete_registered_table(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
# Register
c.post(
"/api/admin/register-table",
json={"name": "to_delete"},
headers=_auth(token),
)
# Delete
resp = c.delete("/api/admin/registry/to_delete", headers=_auth(token))
assert resp.status_code == 204
# Verify gone from registry
list_resp = c.get("/api/admin/registry", headers=_auth(token))
names = [t["name"] for t in list_resp.json()["tables"]]
assert "to_delete" not in names
def test_delete_nonexistent_table_returns_404(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.delete("/api/admin/registry/nonexistent_table", headers=_auth(token))
assert resp.status_code == 404
def test_delete_requires_admin(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.delete("/api/admin/registry/some_table", headers=_auth(token))
assert resp.status_code == 403
def test_delete_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.delete("/api/admin/registry/some_table")
assert resp.status_code == 401
class TestDiscoverAndRegister:
def test_discover_and_register_requires_admin(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post("/api/admin/discover-and-register", headers=_auth(token))
assert resp.status_code == 403
def test_discover_and_register_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.post("/api/admin/discover-and-register")
assert resp.status_code == 401
def test_discover_and_register_non_keboola_returns_zero(self, seeded_app):
"""With no keboola config, discover-and-register returns 0 registered tables."""
c = seeded_app["client"]
token = seeded_app["admin_token"]
# Configure as local (non-keboola)
c.post(
"/api/admin/configure",
json={"data_source": "local"},
headers=_auth(token),
)
resp = c.post("/api/admin/discover-and-register", headers=_auth(token))
assert resp.status_code == 200
data = resp.json()
assert data["registered"] == 0
assert data["source"] != "keboola"

308
tests/test_memory_api.py Normal file
View file

@ -0,0 +1,308 @@
"""Tests for corporate memory API — knowledge items, voting, governance."""
import pytest
def _auth(token):
return {"Authorization": f"Bearer {token}"}
class TestMemoryCreate:
def test_create_knowledge_item(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/memory",
json={"title": "Best Practice", "content": "Always document your code.", "category": "engineering"},
headers=_auth(token),
)
assert resp.status_code == 201
data = resp.json()
assert "id" in data
assert data["status"] == "pending"
def test_create_with_tags(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post(
"/api/memory",
json={
"title": "Tagged Item",
"content": "Content here",
"category": "process",
"tags": ["tag1", "tag2"],
},
headers=_auth(token),
)
assert resp.status_code == 201
assert "id" in resp.json()
def test_create_missing_title_returns_422(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/memory",
json={"content": "No title", "category": "engineering"},
headers=_auth(token),
)
assert resp.status_code == 422
def test_create_missing_content_returns_422(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/memory",
json={"title": "No content", "category": "engineering"},
headers=_auth(token),
)
assert resp.status_code == 422
def test_create_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.post(
"/api/memory",
json={"title": "Test", "content": "Content", "category": "engineering"},
)
assert resp.status_code == 401
class TestMemoryList:
def _create_item(self, c, token, title="Test Item", category="engineering"):
resp = c.post(
"/api/memory",
json={"title": title, "content": f"Content for {title}", "category": category},
headers=_auth(token),
)
assert resp.status_code == 201
return resp.json()["id"]
def test_list_empty(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.get("/api/memory", headers=_auth(token))
assert resp.status_code == 200
data = resp.json()
assert "items" in data
assert "count" in data
def test_list_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.get("/api/memory")
assert resp.status_code == 401
def test_list_pagination(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
# Create 3 items
for i in range(3):
self._create_item(c, token, title=f"Item {i}")
# Page 1 with per_page=2
resp = c.get("/api/memory?page=1&per_page=2", headers=_auth(token))
assert resp.status_code == 200
data = resp.json()
assert data["per_page"] == 2
assert data["page"] == 1
assert len(data["items"]) <= 2
def test_list_search(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
self._create_item(c, token, title="Unique Keyword SearchTarget")
self._create_item(c, token, title="Another Item")
resp = c.get("/api/memory?search=SearchTarget", headers=_auth(token))
assert resp.status_code == 200
data = resp.json()
assert data["count"] >= 1
titles = [item["title"] for item in data["items"]]
assert any("SearchTarget" in t for t in titles)
class TestMemoryStats:
def test_get_stats(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.get("/api/memory/stats", headers=_auth(token))
assert resp.status_code == 200
data = resp.json()
assert "total" in data
assert "by_status" in data
assert "categories" in data
def test_get_stats_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.get("/api/memory/stats")
assert resp.status_code == 401
class TestMemoryVote:
def _create_item(self, c, token):
resp = c.post(
"/api/memory",
json={"title": "Voteable", "content": "vote me", "category": "process"},
headers=_auth(token),
)
return resp.json()["id"]
def test_vote_upvote(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
item_id = self._create_item(c, token)
resp = c.post(f"/api/memory/{item_id}/vote", json={"vote": 1}, headers=_auth(token))
assert resp.status_code == 200
data = resp.json()
assert data["upvotes"] >= 1
def test_vote_downvote(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
item_id = self._create_item(c, token)
resp = c.post(f"/api/memory/{item_id}/vote", json={"vote": -1}, headers=_auth(token))
assert resp.status_code == 200
data = resp.json()
assert data["downvotes"] >= 1
def test_vote_invalid_value_returns_400(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
item_id = self._create_item(c, token)
resp = c.post(f"/api/memory/{item_id}/vote", json={"vote": 5}, headers=_auth(token))
assert resp.status_code == 400
def test_vote_nonexistent_item_returns_404(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.post("/api/memory/nonexistent-id/vote", json={"vote": 1}, headers=_auth(token))
assert resp.status_code == 404
def test_vote_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.post("/api/memory/some-id/vote", json={"vote": 1})
assert resp.status_code == 401
class TestMemoryMyVotes:
def test_get_my_votes_empty(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.get("/api/memory/my-votes", headers=_auth(token))
assert resp.status_code == 200
assert isinstance(resp.json(), dict)
def test_get_my_votes_after_voting(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
# Create and vote
item_resp = c.post(
"/api/memory",
json={"title": "My Vote Item", "content": "content", "category": "engineering"},
headers=_auth(token),
)
item_id = item_resp.json()["id"]
c.post(f"/api/memory/{item_id}/vote", json={"vote": 1}, headers=_auth(token))
# Check my-votes
resp = c.get("/api/memory/my-votes", headers=_auth(token))
assert resp.status_code == 200
votes = resp.json()
assert item_id in votes
assert votes[item_id] == 1
def test_my_votes_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.get("/api/memory/my-votes")
assert resp.status_code == 401
class TestMemoryAdminEndpoints:
def _create_item(self, c, token):
resp = c.post(
"/api/memory",
json={"title": "Admin Test", "content": "content", "category": "policy"},
headers=_auth(token),
)
assert resp.status_code == 201
return resp.json()["id"]
def test_admin_approve(self, seeded_app):
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
item_id = self._create_item(c, admin_token)
resp = c.post(f"/api/memory/admin/approve?item_id={item_id}", headers=_auth(admin_token))
assert resp.status_code == 200
assert resp.json()["status"] == "approved"
def test_admin_reject(self, seeded_app):
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
item_id = self._create_item(c, admin_token)
resp = c.post(
f"/api/memory/admin/reject?item_id={item_id}",
json={"reason": "not relevant"},
headers=_auth(admin_token),
)
assert resp.status_code == 200
assert resp.json()["status"] == "rejected"
def test_admin_mandate(self, seeded_app):
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
item_id = self._create_item(c, admin_token)
resp = c.post(
f"/api/memory/admin/mandate?item_id={item_id}",
json={"reason": "company policy", "audience": "all"},
headers=_auth(admin_token),
)
assert resp.status_code == 200
assert resp.json()["status"] == "mandatory"
def test_admin_approve_analyst_gets_403(self, seeded_app):
"""Analyst cannot use admin governance endpoints."""
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
analyst_token = seeded_app["analyst_token"]
item_id = self._create_item(c, admin_token)
resp = c.post(f"/api/memory/admin/approve?item_id={item_id}", headers=_auth(analyst_token))
assert resp.status_code == 403
def test_admin_reject_analyst_gets_403(self, seeded_app):
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
analyst_token = seeded_app["analyst_token"]
item_id = self._create_item(c, admin_token)
resp = c.post(
f"/api/memory/admin/reject?item_id={item_id}",
json={"reason": "nope"},
headers=_auth(analyst_token),
)
assert resp.status_code == 403
def test_admin_mandate_analyst_gets_403(self, seeded_app):
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
analyst_token = seeded_app["analyst_token"]
item_id = self._create_item(c, admin_token)
resp = c.post(
f"/api/memory/admin/mandate?item_id={item_id}",
json={"reason": "policy"},
headers=_auth(analyst_token),
)
assert resp.status_code == 403
def test_admin_approve_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.post("/api/memory/admin/approve?item_id=some-id")
assert resp.status_code == 401

161
tests/test_metadata_api.py Normal file
View file

@ -0,0 +1,161 @@
"""Tests for admin metadata API — column metadata CRUD and push."""
import pytest
def _auth(token):
return {"Authorization": f"Bearer {token}"}
def _register_table(c, token, table_name="test_table"):
"""Helper to register a table in the registry."""
resp = c.post(
"/api/admin/register-table",
json={"name": table_name, "source_type": "keboola", "bucket": "in.c-test",
"source_table": table_name, "query_mode": "local"},
headers=_auth(token),
)
return resp.json().get("id", table_name.lower())
class TestGetMetadata:
def test_get_metadata_empty(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.get("/api/admin/metadata/some_table", headers=_auth(token))
assert resp.status_code == 200
data = resp.json()
assert data["table_id"] == "some_table"
assert "columns" in data
assert isinstance(data["columns"], list)
def test_get_metadata_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.get("/api/admin/metadata/some_table")
assert resp.status_code == 401
def test_get_metadata_analyst_allowed(self, seeded_app):
"""GET metadata is allowed for authenticated users (not admin-only)."""
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.get("/api/admin/metadata/some_table", headers=_auth(token))
assert resp.status_code == 200
class TestSaveMetadata:
def test_save_column_metadata_as_admin(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
table_id = _register_table(c, token, "orders_meta")
resp = c.post(
f"/api/admin/metadata/{table_id}",
json={
"columns": [
{"column_name": "id", "basetype": "INTEGER", "description": "Primary key", "confidence": "manual"},
{"column_name": "name", "basetype": "VARCHAR", "description": "Customer name"},
]
},
headers=_auth(token),
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "ok"
assert data["count"] == 2
def test_save_metadata_analyst_gets_403(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post(
"/api/admin/metadata/some_table",
json={"columns": [{"column_name": "id", "basetype": "INTEGER"}]},
headers=_auth(token),
)
assert resp.status_code == 403
def test_save_metadata_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.post(
"/api/admin/metadata/some_table",
json={"columns": [{"column_name": "id"}]},
)
assert resp.status_code == 401
def test_save_metadata_missing_columns_returns_422(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/admin/metadata/some_table",
json={}, # missing 'columns'
headers=_auth(token),
)
assert resp.status_code == 422
def test_save_then_get_metadata(self, seeded_app):
"""Save metadata then verify it can be retrieved."""
c = seeded_app["client"]
token = seeded_app["admin_token"]
table_id = _register_table(c, token, "round_trip_table")
c.post(
f"/api/admin/metadata/{table_id}",
json={
"columns": [
{"column_name": "amount", "basetype": "DECIMAL", "description": "Order amount"},
]
},
headers=_auth(token),
)
resp = c.get(f"/api/admin/metadata/{table_id}", headers=_auth(token))
assert resp.status_code == 200
columns = resp.json()["columns"]
assert len(columns) >= 1
col_names = [c_["column_name"] for c_ in columns]
assert "amount" in col_names
class TestPushMetadata:
def test_push_metadata_table_not_found_returns_404(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.post("/api/admin/metadata/nonexistent_table/push", headers=_auth(token))
assert resp.status_code == 404
def test_push_metadata_non_keboola_returns_400(self, seeded_app):
"""Push is only supported for keboola tables."""
c = seeded_app["client"]
token = seeded_app["admin_token"]
# Register a bigquery table (non-keboola)
c.post(
"/api/admin/register-table",
json={"name": "bq_table", "source_type": "bigquery", "query_mode": "remote"},
headers=_auth(token),
)
resp = c.post("/api/admin/metadata/bq_table/push", headers=_auth(token))
assert resp.status_code == 400
assert "keboola" in resp.json()["detail"].lower()
def test_push_metadata_analyst_gets_403(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post("/api/admin/metadata/some_table/push", headers=_auth(token))
assert resp.status_code == 403
def test_push_metadata_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.post("/api/admin/metadata/some_table/push")
assert resp.status_code == 401
def test_push_keboola_table_without_env_vars_returns_500(self, seeded_app):
"""Keboola table without env vars configured should return 500."""
c = seeded_app["client"]
token = seeded_app["admin_token"]
table_id = _register_table(c, token, "kbc_push_table")
# Should fail with 500 because KBC_STACK_URL and KBC_STORAGE_TOKEN are not set
resp = c.post(f"/api/admin/metadata/{table_id}/push", headers=_auth(token))
assert resp.status_code == 500

View file

@ -0,0 +1,168 @@
"""Tests for admin permissions API — grant, revoke, list."""
import pytest
def _auth(token):
return {"Authorization": f"Bearer {token}"}
class TestGrantPermission:
def test_grant_permission_as_admin(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/admin/permissions",
json={"user_id": "analyst1", "dataset": "sales", "access": "read"},
headers=_auth(token),
)
assert resp.status_code == 201
data = resp.json()
assert data["user_id"] == "analyst1"
assert data["dataset"] == "sales"
assert data["access"] == "read"
def test_grant_analyst_gets_403(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post(
"/api/admin/permissions",
json={"user_id": "analyst1", "dataset": "sales", "access": "read"},
headers=_auth(token),
)
assert resp.status_code == 403
def test_grant_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.post(
"/api/admin/permissions",
json={"user_id": "analyst1", "dataset": "sales", "access": "read"},
)
assert resp.status_code == 401
def test_grant_missing_fields_returns_422(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/admin/permissions",
json={"user_id": "analyst1"}, # missing 'dataset'
headers=_auth(token),
)
assert resp.status_code == 422
class TestRevokePermission:
def _delete_with_json(self, c, url, body, headers=None):
"""TestClient.delete() does not support a body — use request() method."""
import json
h = {"Content-Type": "application/json"}
if headers:
h.update(headers)
return c.request("DELETE", url, content=json.dumps(body), headers=h)
def test_revoke_permission_as_admin(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
# Grant first
c.post(
"/api/admin/permissions",
json={"user_id": "analyst1", "dataset": "to_revoke", "access": "read"},
headers=_auth(token),
)
# Then revoke
resp = self._delete_with_json(
c, "/api/admin/permissions",
{"user_id": "analyst1", "dataset": "to_revoke"},
headers=_auth(token),
)
assert resp.status_code == 200
assert resp.json()["revoked"] is True
def test_revoke_analyst_gets_403(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = self._delete_with_json(
c, "/api/admin/permissions",
{"user_id": "analyst1", "dataset": "sales"},
headers=_auth(token),
)
assert resp.status_code == 403
def test_revoke_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = self._delete_with_json(
c, "/api/admin/permissions",
{"user_id": "analyst1", "dataset": "sales"},
)
assert resp.status_code == 401
class TestListUserPermissions:
def test_list_user_permissions_as_admin(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
# Grant a permission
c.post(
"/api/admin/permissions",
json={"user_id": "analyst1", "dataset": "finance", "access": "read"},
headers=_auth(token),
)
resp = c.get("/api/admin/permissions/analyst1", headers=_auth(token))
assert resp.status_code == 200
data = resp.json()
assert data["user_id"] == "analyst1"
assert "permissions" in data
datasets = [p["dataset"] for p in data["permissions"]]
assert "finance" in datasets
def test_list_user_permissions_analyst_gets_403(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.get("/api/admin/permissions/analyst1", headers=_auth(token))
assert resp.status_code == 403
def test_list_user_permissions_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.get("/api/admin/permissions/analyst1")
assert resp.status_code == 401
class TestListAllPermissions:
def test_list_all_permissions_empty(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.get("/api/admin/permissions", headers=_auth(token))
assert resp.status_code == 200
data = resp.json()
assert "permissions" in data
assert isinstance(data["permissions"], list)
def test_list_all_permissions_after_grant(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
c.post(
"/api/admin/permissions",
json={"user_id": "analyst1", "dataset": "all_perms_test", "access": "read"},
headers=_auth(token),
)
resp = c.get("/api/admin/permissions", headers=_auth(token))
assert resp.status_code == 200
datasets = [p["dataset"] for p in resp.json()["permissions"]]
assert "all_perms_test" in datasets
def test_list_all_permissions_analyst_gets_403(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.get("/api/admin/permissions", headers=_auth(token))
assert resp.status_code == 403
def test_list_all_permissions_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.get("/api/admin/permissions")
assert resp.status_code == 401

185
tests/test_scripts_api.py Normal file
View file

@ -0,0 +1,185 @@
"""Tests for scripts API endpoints — deploy, run, list, undeploy."""
import pytest
def _auth(token):
return {"Authorization": f"Bearer {token}"}
class TestScriptsList:
def test_list_scripts_empty(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.get("/api/scripts", headers=_auth(token))
assert resp.status_code == 200
data = resp.json()
assert data["count"] == 0
assert data["scripts"] == []
def test_list_scripts_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.get("/api/scripts")
assert resp.status_code == 401
class TestScriptsDeploy:
def test_deploy_safe_script(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post(
"/api/scripts/deploy",
json={"name": "hello", "source": "print('hello world')"},
headers=_auth(token),
)
assert resp.status_code == 201
data = resp.json()
assert data["name"] == "hello"
assert "id" in data
def test_deploy_with_schedule(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post(
"/api/scripts/deploy",
json={"name": "scheduled", "source": "print('scheduled')", "schedule": "0 8 * * MON"},
headers=_auth(token),
)
assert resp.status_code == 201
data = resp.json()
assert data["schedule"] == "0 8 * * MON"
def test_deploy_script_with_blocked_import_deploys_ok_but_run_fails(self, seeded_app):
"""Deploy stores scripts as-is; safety validation happens at run time, not deploy time."""
c = seeded_app["client"]
token = seeded_app["analyst_token"]
# Deploy succeeds (no pre-validation at deploy time)
deploy_resp = c.post(
"/api/scripts/deploy",
json={"name": "bad_import", "source": "import os; print(os.getcwd())"},
headers=_auth(token),
)
assert deploy_resp.status_code == 201
script_id = deploy_resp.json()["id"]
# Running it should fail with 400 due to blocked import
run_resp = c.post(f"/api/scripts/{script_id}/run", headers=_auth(token))
assert run_resp.status_code == 400
assert "Blocked" in run_resp.json()["detail"] or "disallowed" in run_resp.json()["detail"]
def test_deploy_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.post(
"/api/scripts/deploy",
json={"name": "hello", "source": "print('hello')"},
)
assert resp.status_code == 401
def test_deploy_appears_in_list(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
c.post(
"/api/scripts/deploy",
json={"name": "listed_script", "source": "x = 1"},
headers=_auth(token),
)
resp = c.get("/api/scripts", headers=_auth(token))
assert resp.json()["count"] >= 1
names = [s["name"] for s in resp.json()["scripts"]]
assert "listed_script" in names
class TestScriptsRun:
def test_run_adhoc_safe_script(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post(
"/api/scripts/run",
json={"source": "print('hello from adhoc')", "name": "adhoc_test"},
headers=_auth(token),
)
assert resp.status_code == 200
data = resp.json()
assert data["exit_code"] == 0
assert "hello from adhoc" in data["stdout"]
def test_run_adhoc_blocked_os_module(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post(
"/api/scripts/run",
json={"source": "import sys; print(sys.path)", "name": "bad"},
headers=_auth(token),
)
assert resp.status_code == 400
def test_run_adhoc_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.post(
"/api/scripts/run",
json={"source": "print('hello')", "name": "test"},
)
assert resp.status_code == 401
def test_run_adhoc_no_source_returns_400(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post(
"/api/scripts/run",
json={"name": "no_source"},
headers=_auth(token),
)
assert resp.status_code == 400
def test_run_deployed_script(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
# Deploy first
deploy_resp = c.post(
"/api/scripts/deploy",
json={"name": "calc", "source": "print(2+2)"},
headers=_auth(token),
)
assert deploy_resp.status_code == 201
script_id = deploy_resp.json()["id"]
# Run deployed script
resp = c.post(f"/api/scripts/{script_id}/run", headers=_auth(token))
assert resp.status_code == 200
assert "4" in resp.json()["stdout"]
def test_run_nonexistent_script_returns_404(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post("/api/scripts/nonexistent-id/run", headers=_auth(token))
assert resp.status_code == 404
class TestScriptsDelete:
def test_undeploy_requires_admin(self, seeded_app):
"""Analyst cannot delete scripts — only admin can."""
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
analyst_token = seeded_app["analyst_token"]
# Deploy as analyst
deploy_resp = c.post(
"/api/scripts/deploy",
json={"name": "to_delete", "source": "print('bye')"},
headers=_auth(analyst_token),
)
script_id = deploy_resp.json()["id"]
# Analyst cannot delete
resp = c.delete(f"/api/scripts/{script_id}", headers=_auth(analyst_token))
assert resp.status_code == 403
# Admin can delete
resp = c.delete(f"/api/scripts/{script_id}", headers=_auth(admin_token))
assert resp.status_code == 204
def test_undeploy_nonexistent_returns_404(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.delete("/api/scripts/does-not-exist", headers=_auth(token))
assert resp.status_code == 404

126
tests/test_settings_api.py Normal file
View file

@ -0,0 +1,126 @@
"""Tests for user settings API endpoints."""
import pytest
def _auth(token):
return {"Authorization": f"Bearer {token}"}
class TestSettingsGet:
def test_get_settings_returns_user_id(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.get("/api/settings", headers=_auth(token))
assert resp.status_code == 200
data = resp.json()
assert data["user_id"] == "admin1"
assert "sync_settings" in data
assert "permissions" in data
def test_get_settings_analyst(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.get("/api/settings", headers=_auth(token))
assert resp.status_code == 200
data = resp.json()
assert data["user_id"] == "analyst1"
def test_get_settings_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.get("/api/settings")
assert resp.status_code == 401
def test_get_settings_empty_permissions_for_new_user(self, seeded_app):
"""New users have no permissions by default."""
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.get("/api/settings", headers=_auth(token))
assert resp.status_code == 200
# Admin sees their own settings — permissions list should exist (may be empty)
assert isinstance(resp.json()["permissions"], list)
class TestSettingsDataset:
def test_update_dataset_setting_with_permission(self, seeded_app):
"""Admin granting permission first, then analyst can update the dataset setting."""
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
analyst_token = seeded_app["analyst_token"]
# Grant permission to analyst first
c.post(
"/api/admin/permissions",
json={"user_id": "analyst1", "dataset": "sales_data", "access": "read"},
headers=_auth(admin_token),
)
resp = c.put(
"/api/settings/dataset",
json={"dataset": "sales_data", "enabled": True},
headers=_auth(analyst_token),
)
assert resp.status_code == 200
data = resp.json()
assert data["dataset"] == "sales_data"
assert data["enabled"] is True
def test_update_dataset_setting_without_permission_returns_403(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.put(
"/api/settings/dataset",
json={"dataset": "secret_data", "enabled": True},
headers=_auth(token),
)
assert resp.status_code == 403
def test_update_dataset_setting_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.put(
"/api/settings/dataset",
json={"dataset": "sales_data", "enabled": True},
)
assert resp.status_code == 401
def test_update_dataset_missing_fields_returns_422(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.put(
"/api/settings/dataset",
json={"dataset": "sales_data"}, # missing 'enabled'
headers=_auth(token),
)
assert resp.status_code == 422
def test_update_without_explicit_permission_returns_403_even_for_admin(self, seeded_app):
"""The dataset settings endpoint checks dataset_permissions table — even admin
needs explicit permission to enable/disable a specific dataset via this endpoint."""
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.put(
"/api/settings/dataset",
json={"dataset": "any_dataset_no_perm", "enabled": False},
headers=_auth(token),
)
# The endpoint checks perm_repo.has_access which doesn't have admin bypass
assert resp.status_code == 403
def test_disable_dataset_with_permission(self, seeded_app):
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
# Grant explicit permission to admin for the dataset
c.post(
"/api/admin/permissions",
json={"user_id": "admin1", "dataset": "some_table", "access": "read"},
headers=_auth(admin_token),
)
resp = c.put(
"/api/settings/dataset",
json={"dataset": "some_table", "enabled": False},
headers=_auth(admin_token),
)
assert resp.status_code == 200
assert resp.json()["enabled"] is False

185
tests/test_upload_api.py Normal file
View file

@ -0,0 +1,185 @@
"""Tests for upload API endpoints — sessions, artifacts, local-md."""
import io
import pytest
def _auth(token):
return {"Authorization": f"Bearer {token}"}
class TestUploadSessions:
def test_upload_session_success(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
content = b'{"type": "message", "text": "hello"}\n{"type": "message", "text": "world"}\n'
resp = c.post(
"/api/upload/sessions",
files={"file": ("session.jsonl", io.BytesIO(content), "application/jsonl")},
headers=_auth(token),
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "ok"
assert data["filename"] == "session.jsonl"
assert data["size"] == len(content)
def test_upload_session_requires_auth(self, seeded_app):
c = seeded_app["client"]
content = b'{"type": "message"}\n'
resp = c.post(
"/api/upload/sessions",
files={"file": ("session.jsonl", io.BytesIO(content), "application/jsonl")},
)
assert resp.status_code == 401
def test_upload_session_directory_traversal_rejected(self, seeded_app):
"""Filenames with ../ should be sanitized — only the basename is kept."""
c = seeded_app["client"]
token = seeded_app["admin_token"]
content = b'{"type": "message"}\n'
resp = c.post(
"/api/upload/sessions",
files={"file": ("../../etc/passwd", io.BytesIO(content), "application/jsonl")},
headers=_auth(token),
)
# The upload should succeed, but the path traversal should be stripped
assert resp.status_code == 200
data = resp.json()
# filename must be just the basename — no slashes, no traversal
assert "/" not in data["filename"]
assert data["filename"] in ("passwd", "etc")
def test_upload_session_empty_content(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/upload/sessions",
files={"file": ("empty.jsonl", io.BytesIO(b""), "application/jsonl")},
headers=_auth(token),
)
assert resp.status_code == 200
assert resp.json()["size"] == 0
def test_upload_session_analyst_allowed(self, seeded_app):
"""Analyst users can also upload sessions."""
c = seeded_app["client"]
token = seeded_app["analyst_token"]
content = b'{"type": "message"}\n'
resp = c.post(
"/api/upload/sessions",
files={"file": ("analyst_session.jsonl", io.BytesIO(content), "application/jsonl")},
headers=_auth(token),
)
assert resp.status_code == 200
class TestUploadArtifacts:
def test_upload_html_artifact(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
content = b"<html><body>Report</body></html>"
resp = c.post(
"/api/upload/artifacts",
files={"file": ("report.html", io.BytesIO(content), "text/html")},
headers=_auth(token),
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "ok"
assert data["filename"] == "report.html"
assert data["size"] == len(content)
def test_upload_png_artifact(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
# Minimal valid PNG header
content = b"\x89PNG\r\n\x1a\n" + b"\x00" * 100
resp = c.post(
"/api/upload/artifacts",
files={"file": ("chart.png", io.BytesIO(content), "image/png")},
headers=_auth(token),
)
assert resp.status_code == 200
assert resp.json()["filename"] == "chart.png"
def test_upload_artifact_requires_auth(self, seeded_app):
c = seeded_app["client"]
content = b"<html><body>Report</body></html>"
resp = c.post(
"/api/upload/artifacts",
files={"file": ("report.html", io.BytesIO(content), "text/html")},
)
assert resp.status_code == 401
def test_upload_artifact_directory_traversal(self, seeded_app):
"""Path traversal in artifact filenames should be sanitized."""
c = seeded_app["client"]
token = seeded_app["admin_token"]
content = b"<html></html>"
resp = c.post(
"/api/upload/artifacts",
files={"file": ("../../../tmp/evil.html", io.BytesIO(content), "text/html")},
headers=_auth(token),
)
assert resp.status_code == 200
# Must not contain slashes
assert "/" not in resp.json()["filename"]
def test_upload_artifact_empty(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post(
"/api/upload/artifacts",
files={"file": ("empty.html", io.BytesIO(b""), "text/html")},
headers=_auth(token),
)
assert resp.status_code == 200
assert resp.json()["size"] == 0
class TestUploadLocalMd:
def test_upload_local_md_success(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
content = "# My Notes\n\nSome corporate memory content."
resp = c.post(
"/api/upload/local-md",
json={"content": content},
headers=_auth(token),
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "ok"
assert data["user"] == "admin@test.com"
assert data["size"] == len(content)
def test_upload_local_md_requires_auth(self, seeded_app):
c = seeded_app["client"]
resp = c.post(
"/api/upload/local-md",
json={"content": "some content"},
)
assert resp.status_code == 401
def test_upload_local_md_missing_content_field(self, seeded_app):
"""Missing content field should return 422."""
c = seeded_app["client"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/upload/local-md",
json={},
headers=_auth(token),
)
assert resp.status_code == 422
def test_upload_local_md_analyst(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
resp = c.post(
"/api/upload/local-md",
json={"content": "analyst notes"},
headers=_auth(token),
)
assert resp.status_code == 200
assert resp.json()["user"] == "analyst@test.com"