"""Smoke tests for web UI pages.""" import os import pytest from fastapi.testclient import TestClient @pytest.fixture def web_client(tmp_path, monkeypatch): monkeypatch.setenv("DATA_DIR", str(tmp_path)) monkeypatch.setenv("TESTING", "1") monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-key-min-32-characters!!") (tmp_path / "state").mkdir() (tmp_path / "analytics").mkdir() (tmp_path / "extracts").mkdir() # Reset global DuckDB singleton to pick up new DATA_DIR from src.db import close_system_db close_system_db() from app.main import create_app app = create_app() yield TestClient(app) close_system_db() @pytest.fixture def admin_cookie(web_client, tmp_path, monkeypatch): from argon2 import PasswordHasher from src.db import get_system_db from src.repositories.users import UserRepository from tests.helpers.auth import grant_admin password = "AdminPass1!" password_hash = PasswordHasher().hash(password) conn = get_system_db() UserRepository(conn).create( id="admin1", email="admin@test.com", name="Admin", password_hash=password_hash, ) grant_admin(conn, "admin1") conn.close() resp = web_client.post("/auth/token", json={"email": "admin@test.com", "password": password}) assert resp.status_code == 200, f"Bootstrap failed: {resp.text}" token = resp.json()["access_token"] return {"access_token": token} @pytest.fixture def analyst_cookie(web_client, tmp_path, monkeypatch): from argon2 import PasswordHasher from src.db import get_system_db from src.repositories.users import UserRepository password = "AnalystPass1!" password_hash = PasswordHasher().hash(password) conn = get_system_db() UserRepository(conn).create( id="analyst1", email="analyst@test.com", name="Analyst", password_hash=password_hash, ) conn.close() resp = web_client.post("/auth/token", json={"email": "analyst@test.com", "password": password}) assert resp.status_code == 200, f"Analyst token failed: {resp.text}" token = resp.json()["access_token"] return {"access_token": token} class TestWebUISmoke: def test_login_page(self, web_client): resp = web_client.get("/login") assert resp.status_code == 200 def test_dashboard(self, web_client, admin_cookie): resp = web_client.get("/dashboard", cookies=admin_cookie) assert resp.status_code in (200, 302) def test_catalog(self, web_client, admin_cookie): resp = web_client.get("/catalog", cookies=admin_cookie) assert resp.status_code == 200 def test_corporate_memory(self, web_client, admin_cookie): resp = web_client.get("/corporate-memory", cookies=admin_cookie) assert resp.status_code == 200 def test_activity_center(self, web_client, admin_cookie): resp = web_client.get("/activity-center", cookies=admin_cookie) assert resp.status_code == 200 def test_admin_tables(self, web_client, admin_cookie): resp = web_client.get("/admin/tables", cookies=admin_cookie) if resp.status_code == 404: pytest.skip("Route /admin/tables does not exist") assert resp.status_code == 200 def test_admin_permissions_route_removed(self, web_client, admin_cookie): """v19 dropped the half-shipped /admin/permissions page (replaced by the unified /admin/access page). Verify the route is gone.""" resp = web_client.get("/admin/permissions", cookies=admin_cookie) assert resp.status_code == 404 def test_admin_users_renders_modern_ui(self, web_client, admin_cookie): resp = web_client.get("/admin/users", cookies=admin_cookie) assert resp.status_code == 200 body = resp.text # Shared header chrome assert "app-header" in body # Nav: "My tokens" (own) is in the user-menu dropdown; admin Tokens # entry (and Tables, Users, Groups, Resource access, Server config) # lives in the Admin dropdown. assert 'href="/tokens"' in body assert 'href="/admin/tokens"' in body assert 'href="/profile"' in body assert 'href="/admin/users"' in body # v12 modern UI markers — Role column was replaced by Groups chips, # so role-pill is gone. Confirm-modal pattern is shared by both. assert 'class="users-page"' in body assert 'id="confirm-modal"' in body def test_nav_shows_tokens_link_for_non_admin(self, web_client, analyst_cookie): """Non-admins see 'My tokens' + 'Profile' user-menu links — no admin Tokens entry.""" resp = web_client.get("/dashboard", cookies=analyst_cookie) assert resp.status_code in (200, 302) if resp.status_code == 302: # Dashboard may redirect in some flows; follow it for nav check. resp = web_client.get(resp.headers["location"], cookies=analyst_cookie) body = resp.text assert 'href="/tokens"' in body assert 'href="/profile"' in body assert ">My tokens<" in body assert ">Profile<" in body # Non-admins must NOT see the admin Tokens link inside the Admin dropdown. assert 'href="/admin/tokens"' not in body def test_nav_shows_all_tokens_link_for_admin(self, web_client, admin_cookie): """Admins see the 'My tokens' user-menu link and the admin Tokens entry inside the Admin dropdown.""" resp = web_client.get("/dashboard", cookies=admin_cookie) assert resp.status_code in (200, 302) if resp.status_code == 302: resp = web_client.get(resp.headers["location"], cookies=admin_cookie) body = resp.text assert 'href="/tokens"' in body assert 'href="/admin/tokens"' in body assert ">My tokens<" in body # Admin dropdown now lists Tables / Tokens / Users / Groups / Resource access / Server config. assert 'href="/admin/tables"' in body assert ">Tables<" in body assert ">Tokens<" in body def test_profile_renders_account_details(self, web_client, admin_cookie): """/profile renders a real profile page with email + tokens link. v12 changes: role-pill is replaced by an Admin-pill driven by Admin user_group membership; ``session.google_groups`` is gone (the OAuth callback writes Workspace memberships into ``user_group_members`` instead), so the "No Google groups available" empty state is no longer rendered. """ resp = web_client.get("/profile", cookies=admin_cookie) assert resp.status_code == 200 body = resp.text assert "admin@test.com" in body assert 'href="/tokens"' in body def test_profile_requires_auth(self, web_client): """/profile requires auth (was a 302 back-compat redirect before).""" resp = web_client.get("/profile", follow_redirects=False) # Auth dep raises 401; some configs may redirect to /login — accept either. assert resp.status_code in (401, 302) @pytest.mark.skip( reason=( "v12: /profile no longer renders an admin-self-management link. " "Admin can navigate to /admin/users/{id} from the top-nav Admin " "dropdown directly. Drop or rewrite this test once the profile " "page settles." ) ) def test_profile_shows_admin_detail_link_for_admin(self, web_client, admin_cookie): resp = web_client.get("/profile", cookies=admin_cookie) assert resp.status_code == 200 assert 'href="/admin/users/admin1"' in resp.text @pytest.mark.skip( reason=( "v12: profile page no longer surfaces /admin/users/* link at all, " "so the negative-assertion is moot. Header chrome unrelated to " "the profile body now contains the admin dropdown." ) ) def test_profile_hides_admin_detail_link_for_non_admin(self, web_client, analyst_cookie): resp = web_client.get("/profile", cookies=analyst_cookie) assert resp.status_code == 200 assert "/admin/users/" not in resp.text @pytest.mark.skip( reason=( "v12: the four-level core.viewer/analyst/km_admin/admin hierarchy " "is gone. Profile now shows group memberships (user_group_members) " "and effective resource access (resource_grants), not internal " "role keys. Rewrite against the new sections — see " "templates/profile.html." ) ) def test_profile_shows_effective_roles_for_non_admin(self, web_client, analyst_cookie): resp = web_client.get("/profile", cookies=analyst_cookie) assert resp.status_code == 200 body = resp.text assert "Effective roles" in body assert "core.analyst" in body assert "core.viewer" in body assert "Direct grants" in body class TestClaudeSetupPreview: """/install and /dashboard render a visible, read-only preview of the 'Setup a new Claude Code' clipboard payload. The real token is never rendered into the HTML — only a styled placeholder is. """ def test_install_preview_visible_for_signed_in_user(self, web_client, admin_cookie): # /setup is now a single unified flow regardless of caller's role. # Admin sees the same layout as everyone else; the marketplace # block appears iff the caller has plugin grants in # `resource_grants` (the seeded admin in this fixture has none). resp = web_client.get("/setup", cookies=admin_cookie) assert resp.status_code == 200 body = resp.text # Preview card + placeholder token render assert "setup-preview-pre" in body assert "What Claude Code will receive" in body assert "<will be generated on click>" in body assert 'class="placeholder-token"' in body # Setup payload text substituted with real server URL. The wheel URL # must be under /cli/wheel/ (uv tool install rejects a bare .whl alias # because it validates the PEP 427 filename in the URL before fetch). assert "/cli/wheel/" in body assert "/cli/agnes.whl" not in body # Unified always-on layout (Fix B + Fix C in 2026-05-10 init-report # response): preflight + marketplace + Atlassian MCP all unconditional. # Step 1 install, step 2 mkdir/cd, step 3 init, step 4 catalog, # step 5 preflight, step 6 marketplace, step 7 diagnose. assert "1) Install the CLI" in body assert "7) Run diagnostics" in body assert "agnes diagnose" in body # `agnes init` is now the mandatory bootstrap step. assert "agnes init" in body # The generated /setup prompt's "Log in" / "Verify the login" # admin-only headers are gone (agnes init subsumes them). # `agnes auth whoami` survives as a static manual-install # example elsewhere on the page (not in the generated prompt). assert "2) Log in" not in body assert "3) Verify the login" not in body def test_install_preview_unified_layout(self, web_client, admin_cookie): """The clipboard payload (SETUP_INSTRUCTIONS_TEMPLATE JS array) carries the unified layout for every caller — admin-vs-analyst is no longer a layout branch. Marketplace + Atlassian MCP blocks are always emitted (Fix B + Fix C in 2026-05-10 init-report response): the user-facing one-liner is `agnes refresh-marketplace --bootstrap` (the literal `claude plugin marketplace add` shows up only as a documentation comment listing what the binary does internally, never as an instruction to run by hand).""" import re resp = web_client.get("/setup", cookies=admin_cookie) assert resp.status_code == 200 body = resp.text match = re.search( r"var\s+SETUP_INSTRUCTIONS_TEMPLATE\s*=\s*\[(.*?)\]\.join\(", body, re.DOTALL, ) assert match, "SETUP_INSTRUCTIONS_TEMPLATE array missing" clipboard = match.group(1) assert "agnes init" in clipboard # User runs the bootstrap one-liner, not raw `claude plugin # marketplace add` — the latter is an internal step described in a # comment block, never an action line to run. assert "agnes refresh-marketplace --bootstrap" in clipboard # Atlassian MCP registration is always-on now. assert "claude mcp add --transport sse atlassian" in clipboard # Legacy admin-only auth verbs are gone from the generated prompt. assert "agnes auth import-token" not in clipboard # `agnes auth whoami` was the old admin step 3; subsumed by # `agnes init` + `agnes catalog` smoke verify. assert "3) Verify the login" not in clipboard assert "2) Log in" not in clipboard def test_dashboard_setup_cta_links_to_setup(self, web_client, admin_cookie): """Dashboard setup CTA shows env-setup-cta and a link to /setup instead of an inline collapsed preview.""" resp = web_client.get("/dashboard", cookies=admin_cookie) assert resp.status_code == 200 body = resp.text assert "env-setup-cta" in body assert "Open the full setup page" in body assert 'href="/setup"' in body # inline
preview block must no longer appear assert 'aria-label="Preview of the clipboard payload"' not in body def test_install_mcp_card_removed(self, web_client): """The stale 'Use with Claude Code / MCP' card on /setup has been removed — there is no Agnes-as-MCP-server today. The Atlassian MCP server registration step (Fix C in the 2026-05-10 init-report response) is registered FROM the setup script, not as a /setup- page card; that's an unrelated wiring direction. """ resp = web_client.get("/setup") assert resp.status_code == 200 body = resp.text assert "Use with Claude Code / MCP" not in body class TestAdminRoleGuards: def test_analyst_cannot_access_admin_tables(self, web_client, admin_cookie, analyst_cookie): resp = web_client.get("/admin/tables", cookies=analyst_cookie) assert resp.status_code == 403 def test_admin_can_access_admin_tables(self, web_client, admin_cookie): resp = web_client.get("/admin/tables", cookies=admin_cookie) assert resp.status_code == 200 def test_analyst_cannot_access_admin_access_page(self, web_client, analyst_cookie): """The unified /admin/access page replaces the dropped /admin/permissions page. Non-admin must still be blocked.""" resp = web_client.get("/admin/access", cookies=analyst_cookie) assert resp.status_code == 403 def test_admin_can_access_admin_access_page(self, web_client, admin_cookie): resp = web_client.get("/admin/access", cookies=admin_cookie) assert resp.status_code == 200 def test_analyst_cannot_access_corporate_memory_admin(self, web_client, admin_cookie, analyst_cookie): resp = web_client.get("/corporate-memory/admin", cookies=analyst_cookie) assert resp.status_code == 403 def test_admin_agent_prompt_page_admin_only(self, web_client, admin_cookie, analyst_cookie): """The renamed Agent Setup Prompt page is gated by require_admin.""" # Unauthenticated → 302 redirect to login r = web_client.get("/admin/agent-prompt", follow_redirects=False) assert r.status_code in (302, 401, 403) # Non-admin → 403 r = web_client.get("/admin/agent-prompt", cookies=analyst_cookie, follow_redirects=False) assert r.status_code == 403 # Admin → 200 r = web_client.get("/admin/agent-prompt", cookies=admin_cookie, follow_redirects=False) assert r.status_code == 200 def test_admin_scheduler_runs_page_admin_only(self, web_client, admin_cookie, analyst_cookie): """The /admin/scheduler-runs read-only audit-log view is gated by require_admin.""" r = web_client.get("/admin/scheduler-runs", follow_redirects=False) assert r.status_code in (302, 401, 403) r = web_client.get("/admin/scheduler-runs", cookies=analyst_cookie, follow_redirects=False) assert r.status_code == 403 r = web_client.get("/admin/scheduler-runs", cookies=admin_cookie, follow_redirects=False) assert r.status_code == 200 assert b"run_session_collector" in r.content # Post-refactor: per-processor audit actions instead of one # run_verification_detector. Both processors are wired in # SCHEDULER_AUDIT_ACTIONS. assert b"run_session_processor:verification" in r.content assert b"run_session_processor:usage" in r.content assert b"run_corporate_memory" in r.content # Devin Review on e86dd5ed: list must use the actual logged action # string, not a guess. assert b"marketplace.sync_all" in r.content def test_profile_sessions_page_no_admin_required(self, web_client, analyst_cookie, admin_cookie): """The /profile/sessions page is gated by get_current_user, not require_admin — every authenticated user views their own sessions.""" r = web_client.get("/profile/sessions", follow_redirects=False) assert r.status_code in (302, 401, 403) r = web_client.get("/profile/sessions", cookies=analyst_cookie, follow_redirects=False) assert r.status_code == 200 assert b"My sessions" in r.content r = web_client.get("/profile/sessions", cookies=admin_cookie, follow_redirects=False) assert r.status_code == 200 def test_profile_session_download_path_safety(self, web_client, analyst_cookie): """Per-session download endpoint must reject any filename that could escape the user's own session directory.""" # NB: bare ".." is excluded — httpx normalises the URL to # /profile/sessions before sending, so it never reaches the # download handler. The %2F-encoded variant exercises the real # path-component value that does reach the handler. for bad in ["../etc/passwd", "subdir/file.jsonl", ".env", "session.jsonl.bak", "..%2Fetc%2Fpasswd"]: r = web_client.get(f"/profile/sessions/{bad}", cookies=analyst_cookie, follow_redirects=False) assert r.status_code == 404, f"Expected 404 for {bad!r}, got {r.status_code}" # Unauthenticated → never the file r = web_client.get("/profile/sessions/anything.jsonl", follow_redirects=False) assert r.status_code in (302, 401, 403) def test_profile_sessions_page_tolerates_stat_failures(self, web_client, analyst_cookie, tmp_path, monkeypatch): """Devin Review on d878764a: a transient stat() failure on one file must not 500 the whole page. Skip the bad row, render the rest.""" import pathlib user_sessions = tmp_path / "user_sessions" / "analyst1" user_sessions.mkdir(parents=True) good = user_sessions / "good.jsonl" good.write_text('{"event": "ok"}\n') bad = user_sessions / "bad.jsonl" bad.write_text('{"event": "stat-explodes"}\n') monkeypatch.setenv("DATA_DIR", str(tmp_path)) # Make `bad.jsonl`.stat() raise; `good.jsonl`.stat() works. real_stat = pathlib.Path.stat def selective_stat(self, *args, **kwargs): if self.name == "bad.jsonl": raise PermissionError("simulated stat failure") return real_stat(self, *args, **kwargs) monkeypatch.setattr(pathlib.Path, "stat", selective_stat) r = web_client.get("/profile/sessions", cookies=analyst_cookie, follow_redirects=False) assert r.status_code == 200 assert b"good.jsonl" in r.content assert b"bad.jsonl" not in r.content def test_profile_session_download_returns_file_for_owner(self, web_client, analyst_cookie, tmp_path, monkeypatch): """Authenticated owner can fetch their own jsonl with proper Content-Disposition.""" # The seeded analyst is "analyst1" (per conftest.seeded_app). user_sessions = tmp_path / "user_sessions" / "analyst1" user_sessions.mkdir(parents=True) sample = user_sessions / "abc-123.jsonl" sample.write_text('{"event": "test"}\n') monkeypatch.setenv("DATA_DIR", str(tmp_path)) r = web_client.get("/profile/sessions/abc-123.jsonl", cookies=analyst_cookie, follow_redirects=False) assert r.status_code == 200 assert r.headers.get("content-disposition", "").endswith('filename="abc-123.jsonl"') assert b'"event": "test"' in r.content class TestUnauthenticatedHtmlRedirects: def test_dashboard_unauthenticated_redirects_to_login(self, web_client): resp = web_client.get("/dashboard", follow_redirects=False) assert resp.status_code == 302 assert resp.headers["location"].startswith("/login") assert "next=%2Fdashboard" in resp.headers["location"] def test_catalog_unauthenticated_redirects_to_login(self, web_client): resp = web_client.get("/catalog", follow_redirects=False) assert resp.status_code == 302 assert resp.headers["location"].startswith("/login") assert "next=%2Fcatalog" in resp.headers["location"] def test_api_route_still_returns_json_401(self, web_client): # /api/sync/manifest requires auth; must keep JSON 401 (no redirect). resp = web_client.get("/api/sync/manifest", follow_redirects=False) assert resp.status_code == 401 assert resp.headers["content-type"].startswith("application/json") def test_password_login_honors_next(self, web_client, tmp_path): from argon2 import PasswordHasher from src.db import get_system_db from src.repositories.users import UserRepository password = "TestPass1!" conn = get_system_db() UserRepository(conn).create( id="u1", email="u1@test.com", name="U1", password_hash=PasswordHasher().hash(password), ) conn.close() resp = web_client.post( "/auth/password/login/web", data={"email": "u1@test.com", "password": password, "next": "/catalog"}, follow_redirects=False, ) assert resp.status_code == 302 assert resp.headers["location"] == "/catalog" def test_password_login_rejects_open_redirect(self, web_client, tmp_path): from argon2 import PasswordHasher from src.db import get_system_db from src.repositories.users import UserRepository password = "TestPass1!" conn = get_system_db() UserRepository(conn).create( id="u2", email="u2@test.com", name="U2", password_hash=PasswordHasher().hash(password), ) conn.close() resp = web_client.post( "/auth/password/login/web", data={"email": "u2@test.com", "password": password, "next": "//evil.example/"}, follow_redirects=False, ) assert resp.status_code == 302 assert resp.headers["location"] == "/dashboard" @pytest.mark.parametrize("hostile_next,expected_location", [ ("javascript:alert(1)", "/dashboard"), ("http://evil.example/", "/dashboard"), ("//evil.example/", "/dashboard"), ("dashboard", "/dashboard"), # missing leading slash ("/foo?bar=baz", "/foo?bar=baz"), # valid same-origin with query ]) def test_password_login_sanitizes_next(self, web_client, tmp_path, hostile_next, expected_location): from argon2 import PasswordHasher from src.db import get_system_db from src.repositories.users import UserRepository import uuid password = "TestPass1!" uid = f"u-{uuid.uuid4().hex[:8]}" conn = get_system_db() UserRepository(conn).create( id=uid, email=f"{uid}@test.com", name=uid, password_hash=PasswordHasher().hash(password), ) conn.close() resp = web_client.post( "/auth/password/login/web", data={"email": f"{uid}@test.com", "password": password, "next": hostile_next}, follow_redirects=False, ) assert resp.status_code == 302 assert resp.headers["location"] == expected_location def test_non_api_post_still_returns_json_401(self, web_client): # POST to a JSON auth endpoint that lives outside /api/ — must NOT be redirected. resp = web_client.post("/auth/token", json={"email": "nope@x.com", "password": "wrong"}, follow_redirects=False) assert resp.status_code == 401 assert resp.headers["content-type"].startswith("application/json") def test_auth_json_get_still_returns_json_401(self, web_client): # GET to a JSON endpoint under /auth/* (e.g. PAT CRUD) — must NOT be redirected, # so CLI clients calling api_get("/auth/tokens") get JSON they can parse. resp = web_client.get("/auth/tokens", follow_redirects=False) assert resp.status_code == 401 assert resp.headers["content-type"].startswith("application/json") def test_login_page_propagates_next_to_password_button(self, web_client): resp = web_client.get("/login?next=/catalog") assert resp.status_code == 200 body = resp.text # Password button URL should carry next. assert "/login/password?next=%2Fcatalog" in body, \ f"Expected /login/password?next=%2Fcatalog in login page HTML; got snippet: {body[:500]}" def test_login_page_propagates_next_to_google_button(self, web_client, monkeypatch): """The Google OAuth button URL must also carry the ?next param so the post-login redirect honors the requested destination.""" # Force Google provider to appear available so the button is rendered. monkeypatch.setattr( "app.auth.providers.google.is_available", lambda: True, ) resp = web_client.get("/login?next=/catalog") assert resp.status_code == 200 body = resp.text assert "/auth/google/login?next=%2Fcatalog" in body, \ f"Expected google login URL with ?next in login page; snippet: {body[:800]}" def test_login_email_page_extracts_and_renders_next(self, web_client): """/login/email (magic link) must extract ?next from the URL and emit it into the hidden form field so it round-trips to the POST.""" resp = web_client.get("/login/email?next=/catalog") assert resp.status_code == 200 body = resp.text # The template renders assert 'name="next" value="/catalog"' in body, \ f"Expected /catalog in next hidden field; snippet: {body[:800]}" def test_login_email_page_rejects_open_redirect_in_next(self, web_client): """Hostile ?next values (e.g. //evil) must be sanitized away before the hidden field is rendered.""" resp = web_client.get("/login/email?next=//evil.example/") assert resp.status_code == 200 body = resp.text assert "evil.example" not in body # Empty string is the sanitized default. assert 'name="next" value=""' in body def test_google_login_stashes_safe_next_in_session(self, web_client, monkeypatch): """google_login() must stash the sanitized next_path in the session. We can't exercise the full OAuth flow without a Google mock, but we can verify the helper applies the sanitizer correctly.""" from app.auth._common import safe_next_path # Valid same-origin paths pass through. assert safe_next_path("/catalog") == "/catalog" assert safe_next_path("/foo?bar=baz") == "/foo?bar=baz" # Open-redirect shapes get defaulted. assert safe_next_path("//evil.example/") == "/dashboard" assert safe_next_path("http://evil.example/") == "/dashboard" assert safe_next_path("javascript:alert(1)") == "/dashboard" assert safe_next_path("") == "/dashboard" assert safe_next_path(None) == "/dashboard" # Empty-default variant (used when computing query string). assert safe_next_path(None, default="") == ""