diff --git a/app/web/router.py b/app/web/router.py index eea5fdc..f9f609f 100644 --- a/app/web/router.py +++ b/app/web/router.py @@ -16,7 +16,8 @@ import duckdb import jinja2 -from app.auth.dependencies import get_current_user, get_optional_user, _get_db +from app.auth.dependencies import get_current_user, get_optional_user, require_role, _get_db +from src.rbac import Role from app.instance_config import ( get_instance_name, get_instance_subtitle, get_datasets, get_theme, get_corporate_memory_config, @@ -388,7 +389,7 @@ async def corporate_memory( @router.get("/corporate-memory/admin", response_class=HTMLResponse) async def corporate_memory_admin( request: Request, - user: dict = Depends(get_current_user), + user: dict = Depends(require_role(Role.KM_ADMIN)), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): repo = KnowledgeRepository(conn) @@ -431,7 +432,7 @@ async def activity_center( @router.get("/admin/tables", response_class=HTMLResponse) async def admin_tables( request: Request, - user: dict = Depends(get_current_user), + user: dict = Depends(require_role(Role.ADMIN)), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): from src.repositories.table_registry import TableRegistryRepository @@ -444,7 +445,7 @@ async def admin_tables( @router.get("/admin/permissions", response_class=HTMLResponse) async def admin_permissions_page( request: Request, - user: dict = Depends(get_current_user), + user: dict = Depends(require_role(Role.ADMIN)), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): """Admin page for managing permissions and access requests.""" diff --git a/tests/test_web_ui.py b/tests/test_web_ui.py index e4ade43..bcc9f59 100644 --- a/tests/test_web_ui.py +++ b/tests/test_web_ui.py @@ -23,18 +23,42 @@ def web_client(tmp_path, monkeypatch): @pytest.fixture def admin_cookie(web_client, tmp_path, monkeypatch): + from argon2 import PasswordHasher from src.db import get_system_db from src.repositories.users import UserRepository + password = "AdminPass1!" + password_hash = PasswordHasher().hash(password) conn = get_system_db() - UserRepository(conn).create(id="admin1", email="admin@test.com", name="Admin", role="admin") + UserRepository(conn).create( + id="admin1", email="admin@test.com", name="Admin", role="admin", + password_hash=password_hash, + ) conn.close() - # Get token via the API to ensure JWT secret matches the running app - resp = web_client.post("/auth/token", json={"email": "admin@test.com"}) + resp = web_client.post("/auth/token", json={"email": "admin@test.com", "password": password}) assert resp.status_code == 200, f"Bootstrap failed: {resp.text}" token = resp.json()["access_token"] return {"access_token": token} +@pytest.fixture +def analyst_cookie(web_client, tmp_path, monkeypatch): + from argon2 import PasswordHasher + from src.db import get_system_db + from src.repositories.users import UserRepository + password = "AnalystPass1!" + password_hash = PasswordHasher().hash(password) + conn = get_system_db() + UserRepository(conn).create( + id="analyst1", email="analyst@test.com", name="Analyst", role="analyst", + password_hash=password_hash, + ) + conn.close() + resp = web_client.post("/auth/token", json={"email": "analyst@test.com", "password": password}) + assert resp.status_code == 200, f"Analyst token failed: {resp.text}" + token = resp.json()["access_token"] + return {"access_token": token} + + class TestWebUISmoke: def test_login_page(self, web_client): resp = web_client.get("/login") @@ -67,3 +91,21 @@ class TestWebUISmoke: if resp.status_code == 404: pytest.skip("Route /admin/permissions does not exist") assert resp.status_code == 200 + + +class TestAdminRoleGuards: + def test_analyst_cannot_access_admin_tables(self, web_client, admin_cookie, analyst_cookie): + resp = web_client.get("/admin/tables", cookies=analyst_cookie) + assert resp.status_code == 403 + + def test_analyst_cannot_access_admin_permissions(self, web_client, admin_cookie, analyst_cookie): + resp = web_client.get("/admin/permissions", cookies=analyst_cookie) + assert resp.status_code == 403 + + def test_admin_can_access_admin_tables(self, web_client, admin_cookie): + resp = web_client.get("/admin/tables", cookies=admin_cookie) + assert resp.status_code == 200 + + def test_admin_can_access_admin_permissions(self, web_client, admin_cookie): + resp = web_client.get("/admin/permissions", cookies=admin_cookie) + assert resp.status_code == 200