* Setup-prompt + bootstrap fixes from David's 2026-05-10 init report Three issues from clean-machine bootstrap evidence: 1. `agnes refresh-marketplace --bootstrap` failed to recover when the local clone existed but Claude Code's marketplace registry had lost the `agnes` entry. Bootstrap path now parses `claude plugin marketplace list`, re-runs `claude plugin marketplace add ~/.agnes/marketplace` when missing, and treats `add` failures as fatal (was warn-and-continue, root cause of the cascade into "Marketplace 'agnes' not found" plugin install errors). 2. Setup prompt now always emits the marketplace-registration block, even when the operator has zero plugin grants. Pre-wires the SessionStart hook so future admin grants land automatically without re-running setup. Block copy adapts: empty list shows "no plugins granted yet", populated list shows "install plugins". 3. Setup prompt registers the Atlassian Remote MCP server unattended (`claude mcp add --transport sse atlassian https://mcp.atlassian.com/v1/sse`). Hosted Remote MCP, OAuth handled automatically by Claude Code on first use. Asana / GWS stay on the /home connector cards (PAT/keychain flows don't fit unattended bootstrap). Confirm step nudges the user toward the /home connector cards for the PAT-flow services. CLAUDE.md template renames the marketplace section to "Agnes Marketplace" and documents that all plugins are addressed as `<plugin>@agnes` regardless of upstream slug. Layout: Confirm shifts from step 6/8 to step 9 across all variants (preflight, marketplace, MCP all unconditional). Tests updated. * Link Claude license options from /home install pane Step-1 Claude install on /home pointed users to OAuth without explaining what to do if they don't have a Pro/Max subscription. Add a one-line follow-up link to the plan-tier section on /setup-advanced (new `#claude-plan` anchor) so first-time users discover the subscription tiers rather than bouncing on the OAuth screen. 
* Add idempotent + no-TLS-bypass guardrails to /home connector prompts

The Asana / Google Workspace / Atlassian connector prompts on /home already shipped a precheck step that short-circuits when the service is already wired, but they didn't carry the same idempotency + surface-errors-verbatim + don't-disable-TLS-verification guardrails the bash bootstrap prompt has. Add a one-paragraph 'Ground rules' block at the top of each prompt so a connector failure doesn't tempt the model into bypass workarounds, matching the same posture David's 2026-05-10 init report flagged for the bash flow.

* Skip `Source:` lines in marketplace registry detector

`claude plugin marketplace list` prints a `Source: <local path>` line under each registered marketplace; the local clone almost always lives under a path containing the marketplace name itself (`~/.agnes/marketplace`). A naive `\bagnes\b` match over the full stdout therefore false-positives whenever ANY unrelated marketplace sits under `~/.agnes-…/` or similar. Filter `Source:` lines out before matching so the recovery path actually re-adds when needed instead of silently falling through to a broken `marketplace update agnes`. Adds regression test covering the substring-only case.

* Drop customer-specific tokens from CHANGELOG entries

Per CLAUDE.md vendor-agnostic OSS rule ("nothing customer-specific ... in changelogs"):
- "agnes-vrysanek.groupondev.com" -> "a private-CA Agnes deployment"
- "Groupon Marketplace / groupon-marketplace" -> "<Org> Marketplace / <org>-marketplace" (placeholder example)
- Removed "David flagged" attribution language; init-report context stays intact, just stripped of the named host + brand

---------

Co-authored-by: ZdenekSrotyr <zdenek.srotyr@keboola.com>
587 lines
28 KiB
Python
587 lines
28 KiB
Python
"""Smoke tests for web UI pages."""
|
|
import os
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
|
|
@pytest.fixture
def web_client(tmp_path, monkeypatch):
    """Yield a ``TestClient`` for a freshly created app rooted at ``tmp_path``.

    The environment is pointed at a throwaway DATA_DIR (with the ``state``,
    ``analytics`` and ``extracts`` sub-directories pre-created) before the
    app is built. The system DB handle is closed before app creation and
    again after the test has finished.
    """
    environment = {
        "DATA_DIR": str(tmp_path),
        "TESTING": "1",
        "JWT_SECRET_KEY": "test-secret-key-min-32-characters!!",
    }
    for variable, value in environment.items():
        monkeypatch.setenv(variable, value)

    for subdirectory in ("state", "analytics", "extracts"):
        (tmp_path / subdirectory).mkdir()

    # Drop the global DuckDB singleton so the new DATA_DIR is picked up.
    from src.db import close_system_db

    close_system_db()

    from app.main import create_app

    client = TestClient(create_app())
    yield client

    # Teardown: release the DB handle opened while the test was running.
    close_system_db()
|
|
|
|
|
|
@pytest.fixture
def admin_cookie(web_client, tmp_path, monkeypatch):
    """Seed an admin user and return cookies holding its access token.

    The user ``admin1`` (admin@test.com) is inserted with an argon2 password
    hash, granted admin via ``grant_admin``, and then logged in through
    ``POST /auth/token``. The result is ``{"access_token": <token>}``,
    suitable for the ``cookies=`` argument of the test client.
    """
    from argon2 import PasswordHasher
    from src.db import get_system_db
    from src.repositories.users import UserRepository
    from tests.helpers.auth import grant_admin

    plaintext = "AdminPass1!"
    hashed = PasswordHasher().hash(plaintext)

    db = get_system_db()
    UserRepository(db).create(
        id="admin1",
        email="admin@test.com",
        name="Admin",
        password_hash=hashed,
    )
    grant_admin(db, "admin1")
    db.close()

    login = web_client.post(
        "/auth/token",
        json={"email": "admin@test.com", "password": plaintext},
    )
    assert login.status_code == 200, f"Bootstrap failed: {login.text}"
    return {"access_token": login.json()["access_token"]}
|
|
|
|
|
|
@pytest.fixture
def analyst_cookie(web_client, tmp_path, monkeypatch):
    """Seed a non-admin user and return cookies holding its access token.

    The user ``analyst1`` (analyst@test.com) is inserted with an argon2
    password hash and receives no admin grant. It is then logged in through
    ``POST /auth/token``; the result is ``{"access_token": <token>}``.
    """
    from argon2 import PasswordHasher
    from src.db import get_system_db
    from src.repositories.users import UserRepository

    plaintext = "AnalystPass1!"
    hashed = PasswordHasher().hash(plaintext)

    db = get_system_db()
    UserRepository(db).create(
        id="analyst1",
        email="analyst@test.com",
        name="Analyst",
        password_hash=hashed,
    )
    db.close()

    login = web_client.post(
        "/auth/token",
        json={"email": "analyst@test.com", "password": plaintext},
    )
    assert login.status_code == 200, f"Analyst token failed: {login.text}"
    return {"access_token": login.json()["access_token"]}
|
|
|
|
|
|
class TestWebUISmoke:
    """Smoke coverage for the main HTML pages and the shared navigation chrome."""

    def test_login_page(self, web_client):
        """/login responds 200 without any cookies."""
        assert web_client.get("/login").status_code == 200

    def test_dashboard(self, web_client, admin_cookie):
        """/dashboard answers an admin with either the page or a redirect."""
        response = web_client.get("/dashboard", cookies=admin_cookie)
        assert response.status_code in (200, 302)

    def test_catalog(self, web_client, admin_cookie):
        """/catalog renders for an admin."""
        response = web_client.get("/catalog", cookies=admin_cookie)
        assert response.status_code == 200

    def test_corporate_memory(self, web_client, admin_cookie):
        """/corporate-memory renders for an admin."""
        response = web_client.get("/corporate-memory", cookies=admin_cookie)
        assert response.status_code == 200

    def test_activity_center(self, web_client, admin_cookie):
        """/activity-center renders for an admin."""
        response = web_client.get("/activity-center", cookies=admin_cookie)
        assert response.status_code == 200

    def test_admin_tables(self, web_client, admin_cookie):
        """/admin/tables renders for an admin; a 404 skips instead of failing."""
        response = web_client.get("/admin/tables", cookies=admin_cookie)
        if response.status_code == 404:
            pytest.skip("Route /admin/tables does not exist")
        assert response.status_code == 200

    def test_admin_permissions_route_removed(self, web_client, admin_cookie):
        """The /admin/permissions route must be gone.

        v19 dropped that half-shipped page in favour of the unified
        /admin/access page.
        """
        response = web_client.get("/admin/permissions", cookies=admin_cookie)
        assert response.status_code == 404

    def test_admin_users_renders_modern_ui(self, web_client, admin_cookie):
        """/admin/users shows the shared header, nav links and v12 UI markers."""
        response = web_client.get("/admin/users", cookies=admin_cookie)
        assert response.status_code == 200
        html = response.text

        # Shared header chrome.
        assert "app-header" in html

        # "My tokens" (own) sits in the user-menu dropdown; the admin Tokens
        # entry (along with Tables, Users, Groups, Resource access and
        # Server config) sits in the Admin dropdown.
        nav_links = (
            'href="/tokens"',
            'href="/admin/tokens"',
            'href="/profile"',
            'href="/admin/users"',
        )
        for link in nav_links:
            assert link in html

        # v12 modern UI markers. The Role column was replaced by Groups
        # chips, so role-pill is gone; the confirm-modal pattern is shared.
        for marker in ('class="users-page"', 'id="confirm-modal"'):
            assert marker in html

    def test_nav_shows_tokens_link_for_non_admin(self, web_client, analyst_cookie):
        """Non-admins get the 'My tokens' and 'Profile' links but no admin Tokens entry."""
        response = web_client.get("/dashboard", cookies=analyst_cookie)
        assert response.status_code in (200, 302)
        if response.status_code == 302:
            # The dashboard may redirect in some flows; follow it so the
            # nav of the final page is what gets checked.
            response = web_client.get(
                response.headers["location"], cookies=analyst_cookie
            )
        html = response.text

        for expected in ('href="/tokens"', 'href="/profile"', ">My tokens<", ">Profile<"):
            assert expected in html

        # The admin Tokens link inside the Admin dropdown must stay hidden.
        assert 'href="/admin/tokens"' not in html

    def test_nav_shows_all_tokens_link_for_admin(self, web_client, admin_cookie):
        """Admins get 'My tokens' plus the admin Tokens entry in the Admin dropdown."""
        response = web_client.get("/dashboard", cookies=admin_cookie)
        assert response.status_code in (200, 302)
        if response.status_code == 302:
            response = web_client.get(
                response.headers["location"], cookies=admin_cookie
            )
        html = response.text

        for expected in ('href="/tokens"', 'href="/admin/tokens"', ">My tokens<"):
            assert expected in html

        # The Admin dropdown lists Tables / Tokens / Users / Groups /
        # Resource access / Server config.
        for expected in ('href="/admin/tables"', ">Tables<", ">Tokens<"):
            assert expected in html

    def test_profile_renders_account_details(self, web_client, admin_cookie):
        """/profile is a real page showing the user's email and a tokens link.

        v12 notes: the role-pill gave way to an Admin-pill driven by Admin
        user_group membership, and ``session.google_groups`` is gone (the
        OAuth callback writes Workspace memberships into
        ``user_group_members`` instead), so the "No Google groups available"
        empty state is no longer rendered.
        """
        response = web_client.get("/profile", cookies=admin_cookie)
        assert response.status_code == 200
        html = response.text
        assert "admin@test.com" in html
        assert 'href="/tokens"' in html

    def test_profile_requires_auth(self, web_client):
        """/profile needs authentication (it used to be a 302 back-compat redirect)."""
        response = web_client.get("/profile", follow_redirects=False)
        # The auth dependency raises 401; some configurations redirect to
        # /login instead, so both outcomes are accepted.
        assert response.status_code in (401, 302)

    @pytest.mark.skip(
        reason=(
            "v12: /profile no longer renders an admin-self-management link. "
            "Admin can navigate to /admin/users/{id} from the top-nav Admin "
            "dropdown directly. Drop or rewrite this test once the profile "
            "page settles."
        )
    )
    def test_profile_shows_admin_detail_link_for_admin(self, web_client, admin_cookie):
        """(Skipped) /profile used to link an admin to their own admin detail page."""
        response = web_client.get("/profile", cookies=admin_cookie)
        assert response.status_code == 200
        assert 'href="/admin/users/admin1"' in response.text

    @pytest.mark.skip(
        reason=(
            "v12: profile page no longer surfaces /admin/users/* link at all, "
            "so the negative-assertion is moot. Header chrome unrelated to "
            "the profile body now contains the admin dropdown."
        )
    )
    def test_profile_hides_admin_detail_link_for_non_admin(self, web_client, analyst_cookie):
        """(Skipped) /profile used to hide any /admin/users/ link from non-admins."""
        response = web_client.get("/profile", cookies=analyst_cookie)
        assert response.status_code == 200
        assert "/admin/users/" not in response.text

    @pytest.mark.skip(
        reason=(
            "v12: the four-level core.viewer/analyst/km_admin/admin hierarchy "
            "is gone. Profile now shows group memberships (user_group_members) "
            "and effective resource access (resource_grants), not internal "
            "role keys. Rewrite against the new sections — see "
            "templates/profile.html."
        )
    )
    def test_profile_shows_effective_roles_for_non_admin(self, web_client, analyst_cookie):
        """(Skipped) /profile used to list effective roles and direct grants."""
        response = web_client.get("/profile", cookies=analyst_cookie)
        assert response.status_code == 200
        html = response.text
        for expected in ("Effective roles", "core.analyst", "core.viewer", "Direct grants"):
            assert expected in html
|
|
|
|
|
|
class TestClaudeSetupPreview:
    """Checks for the setup-prompt preview on /setup and its CTA on /dashboard.

    /setup renders a visible, read-only preview of the 'Setup a new Claude
    Code' clipboard payload with a styled placeholder where the token goes;
    /dashboard only carries a call-to-action that links to /setup.
    """

    def test_install_preview_visible_for_signed_in_user(self, web_client, admin_cookie):
        """The /setup preview card renders with the unified, always-on step layout."""
        # /setup is a single unified flow regardless of the caller's role,
        # so the admin sees the same layout as everyone else. The seeded
        # admin in this fixture has no plugin grants in `resource_grants`.
        response = web_client.get("/setup", cookies=admin_cookie)
        assert response.status_code == 200
        html = response.text

        # Preview card plus the placeholder token.
        preview_markers = (
            "setup-preview-pre",
            "What Claude Code will receive",
            "<will be generated on click>",
            'class="placeholder-token"',
        )
        for marker in preview_markers:
            assert marker in html

        # The setup payload is substituted with the real server URL. The
        # wheel URL must live under /cli/wheel/: uv tool install rejects a
        # bare .whl alias because it validates the PEP 427 filename in the
        # URL before fetching.
        assert "/cli/wheel/" in html
        assert "/cli/agnes.whl" not in html

        # Unified always-on layout (Fix B + Fix C in the 2026-05-10
        # init-report response): preflight, marketplace and Atlassian MCP
        # are all unconditional. Step 1 install, step 4 preflight, step 5
        # marketplace, step 6 MCP, step 7 diagnose.
        for step_text in ("1) Install the CLI", "7) Run diagnostics", "agnes diagnose"):
            assert step_text in html

        # `agnes init` is the mandatory bootstrap step.
        assert "agnes init" in html

        # The generated prompt's admin-only "Log in" / "Verify the login"
        # headers are gone because `agnes init` subsumes them.
        # `agnes auth whoami` survives only as a static manual-install
        # example elsewhere on the page, not in the generated prompt.
        for retired_header in ("2) Log in", "3) Verify the login"):
            assert retired_header not in html

    def test_install_preview_unified_layout(self, web_client, admin_cookie):
        """The clipboard payload carries the same unified layout for every caller.

        The payload is the SETUP_INSTRUCTIONS_TEMPLATE JS array; admin versus
        analyst is no longer a layout branch. The marketplace and Atlassian
        MCP blocks are always emitted (Fix B + Fix C in the 2026-05-10
        init-report response). The user-facing one-liner is
        `agnes refresh-marketplace --bootstrap`; the literal
        `claude plugin marketplace add` appears only in a documentation
        comment describing what the binary does internally, never as an
        instruction to run by hand.
        """
        import re

        response = web_client.get("/setup", cookies=admin_cookie)
        assert response.status_code == 200

        found = re.search(
            r"var\s+SETUP_INSTRUCTIONS_TEMPLATE\s*=\s*\[(.*?)\]\.join\(",
            response.text,
            re.DOTALL,
        )
        assert found, "SETUP_INSTRUCTIONS_TEMPLATE array missing"
        payload = found.group(1)

        assert "agnes init" in payload
        # The user runs the bootstrap one-liner rather than a raw
        # `claude plugin marketplace add`.
        assert "agnes refresh-marketplace --bootstrap" in payload
        # Atlassian MCP registration is always on.
        assert "claude mcp add --transport sse atlassian" in payload
        # Legacy admin-only auth verbs are gone from the generated prompt.
        assert "agnes auth import-token" not in payload
        # `agnes auth whoami` was the old admin step 3; it is subsumed by
        # `agnes init` plus the `agnes catalog` smoke verify.
        assert "3) Verify the login" not in payload
        assert "2) Log in" not in payload

    def test_dashboard_setup_cta_links_to_setup(self, web_client, admin_cookie):
        """The dashboard shows env-setup-cta with a /setup link, not an inline preview."""
        response = web_client.get("/dashboard", cookies=admin_cookie)
        assert response.status_code == 200
        html = response.text

        for expected in ("env-setup-cta", "Open the full setup page", 'href="/setup"'):
            assert expected in html

        # The inline <details> preview block must no longer appear.
        assert 'aria-label="Preview of the clipboard payload"' not in html

    def test_install_mcp_card_removed(self, web_client):
        """The stale 'Use with Claude Code / MCP' card is gone from /setup.

        There is no Agnes-as-MCP-server today. The Atlassian MCP server
        registration step (Fix C in the 2026-05-10 init-report response) is
        registered FROM the setup script, not as a /setup page card; that is
        an unrelated wiring direction.
        """
        response = web_client.get("/setup")
        assert response.status_code == 200
        assert "Use with Claude Code / MCP" not in response.text
|
|
|
|
|
|
class TestAdminRoleGuards:
    """Access-control checks for admin-only pages and per-user session pages."""

    def test_analyst_cannot_access_admin_tables(self, web_client, admin_cookie, analyst_cookie):
        """A non-admin gets 403 on /admin/tables."""
        response = web_client.get("/admin/tables", cookies=analyst_cookie)
        assert response.status_code == 403

    def test_admin_can_access_admin_tables(self, web_client, admin_cookie):
        """An admin gets 200 on /admin/tables."""
        response = web_client.get("/admin/tables", cookies=admin_cookie)
        assert response.status_code == 200

    def test_analyst_cannot_access_admin_access_page(self, web_client, analyst_cookie):
        """A non-admin gets 403 on /admin/access.

        The unified /admin/access page replaces the dropped
        /admin/permissions page; it must still block non-admins.
        """
        response = web_client.get("/admin/access", cookies=analyst_cookie)
        assert response.status_code == 403

    def test_admin_can_access_admin_access_page(self, web_client, admin_cookie):
        """An admin gets 200 on /admin/access."""
        response = web_client.get("/admin/access", cookies=admin_cookie)
        assert response.status_code == 200

    def test_analyst_cannot_access_corporate_memory_admin(self, web_client, admin_cookie, analyst_cookie):
        """A non-admin gets 403 on /corporate-memory/admin."""
        response = web_client.get("/corporate-memory/admin", cookies=analyst_cookie)
        assert response.status_code == 403

    def test_admin_agent_prompt_page_admin_only(self, web_client, admin_cookie, analyst_cookie):
        """The renamed Agent Setup Prompt page is gated by require_admin."""
        url = "/admin/agent-prompt"

        # Unauthenticated: a redirect to login or an auth error.
        anonymous = web_client.get(url, follow_redirects=False)
        assert anonymous.status_code in (302, 401, 403)

        # Non-admin: 403.
        as_analyst = web_client.get(url, cookies=analyst_cookie, follow_redirects=False)
        assert as_analyst.status_code == 403

        # Admin: 200.
        as_admin = web_client.get(url, cookies=admin_cookie, follow_redirects=False)
        assert as_admin.status_code == 200

    def test_admin_scheduler_runs_page_admin_only(self, web_client, admin_cookie, analyst_cookie):
        """The read-only /admin/scheduler-runs audit-log view is gated by require_admin."""
        url = "/admin/scheduler-runs"

        anonymous = web_client.get(url, follow_redirects=False)
        assert anonymous.status_code in (302, 401, 403)

        as_analyst = web_client.get(url, cookies=analyst_cookie, follow_redirects=False)
        assert as_analyst.status_code == 403

        as_admin = web_client.get(url, cookies=admin_cookie, follow_redirects=False)
        assert as_admin.status_code == 200

        expected_actions = (
            b"run_session_collector",
            # Post-refactor there is one audit action per processor instead
            # of a single run_verification_detector; both processors are
            # wired in SCHEDULER_AUDIT_ACTIONS.
            b"run_session_processor:verification",
            b"run_session_processor:usage",
            b"run_corporate_memory",
            # Devin Review on e86dd5ed: the list must use the action string
            # that is actually logged, not a guess.
            b"marketplace.sync_all",
        )
        for action in expected_actions:
            assert action in as_admin.content

    def test_profile_sessions_page_no_admin_required(self, web_client, analyst_cookie, admin_cookie):
        """/profile/sessions is gated by get_current_user, not require_admin.

        Every authenticated user can view their own sessions.
        """
        url = "/profile/sessions"

        anonymous = web_client.get(url, follow_redirects=False)
        assert anonymous.status_code in (302, 401, 403)

        as_analyst = web_client.get(url, cookies=analyst_cookie, follow_redirects=False)
        assert as_analyst.status_code == 200
        assert b"My sessions" in as_analyst.content

        as_admin = web_client.get(url, cookies=admin_cookie, follow_redirects=False)
        assert as_admin.status_code == 200

    def test_profile_session_download_path_safety(self, web_client, analyst_cookie):
        """The per-session download endpoint rejects names that could escape the user's directory."""
        # NB: a bare ".." is left out because httpx normalises the URL to
        # /profile/sessions before sending, so it never reaches the download
        # handler. The %2F-encoded variant exercises the real path-component
        # value that does reach the handler.
        rejected_names = [
            "../etc/passwd",
            "subdir/file.jsonl",
            ".env",
            "session.jsonl.bak",
            "..%2Fetc%2Fpasswd",
        ]
        for name in rejected_names:
            response = web_client.get(
                f"/profile/sessions/{name}",
                cookies=analyst_cookie,
                follow_redirects=False,
            )
            assert response.status_code == 404, (
                f"Expected 404 for {name!r}, got {response.status_code}"
            )

        # Unauthenticated requests never get the file.
        anonymous = web_client.get("/profile/sessions/anything.jsonl", follow_redirects=False)
        assert anonymous.status_code in (302, 401, 403)

    def test_profile_sessions_page_tolerates_stat_failures(self, web_client, analyst_cookie, tmp_path, monkeypatch):
        """A stat() failure on one file must not 500 the whole page.

        Devin Review on d878764a: the bad row is skipped and the rest of the
        listing still renders.
        """
        import pathlib

        session_dir = tmp_path / "user_sessions" / "analyst1"
        session_dir.mkdir(parents=True)
        (session_dir / "good.jsonl").write_text('{"event": "ok"}\n')
        (session_dir / "bad.jsonl").write_text('{"event": "stat-explodes"}\n')

        monkeypatch.setenv("DATA_DIR", str(tmp_path))

        # Patch Path.stat so that only `bad.jsonl` raises; every other path
        # falls through to the real implementation.
        original_stat = pathlib.Path.stat

        def flaky_stat(path, *args, **kwargs):
            if path.name == "bad.jsonl":
                raise PermissionError("simulated stat failure")
            return original_stat(path, *args, **kwargs)

        monkeypatch.setattr(pathlib.Path, "stat", flaky_stat)

        response = web_client.get("/profile/sessions", cookies=analyst_cookie, follow_redirects=False)
        assert response.status_code == 200
        assert b"good.jsonl" in response.content
        assert b"bad.jsonl" not in response.content

    def test_profile_session_download_returns_file_for_owner(self, web_client, analyst_cookie, tmp_path, monkeypatch):
        """The authenticated owner can fetch their own jsonl with a proper Content-Disposition."""
        # The analyst behind analyst_cookie has the id "analyst1", so the
        # sample file is placed under that user's sessions directory.
        session_dir = tmp_path / "user_sessions" / "analyst1"
        session_dir.mkdir(parents=True)
        (session_dir / "abc-123.jsonl").write_text('{"event": "test"}\n')
        monkeypatch.setenv("DATA_DIR", str(tmp_path))

        response = web_client.get(
            "/profile/sessions/abc-123.jsonl",
            cookies=analyst_cookie,
            follow_redirects=False,
        )
        assert response.status_code == 200
        disposition = response.headers.get("content-disposition", "")
        assert disposition.endswith('filename="abc-123.jsonl"')
        assert b'"event": "test"' in response.content
|
|
|
|
|
|
class TestUnauthenticatedHtmlRedirects:
    """Unauthenticated HTML pages redirect to /login; JSON endpoints keep a 401.

    Also covers how the ``next`` parameter is carried and sanitized across
    the password, Google and email (magic link) login entry points.
    """

    @staticmethod
    def _seed_password_user(user_id, name, password):
        """Insert a user with an argon2-hashed password and return its email.

        The email is always ``<user_id>@test.com``.
        """
        from argon2 import PasswordHasher
        from src.db import get_system_db
        from src.repositories.users import UserRepository

        email = f"{user_id}@test.com"
        db = get_system_db()
        UserRepository(db).create(
            id=user_id,
            email=email,
            name=name,
            password_hash=PasswordHasher().hash(password),
        )
        db.close()
        return email

    @staticmethod
    def _password_login(web_client, email, password, next_value):
        """POST the web password-login form without following redirects."""
        return web_client.post(
            "/auth/password/login/web",
            data={"email": email, "password": password, "next": next_value},
            follow_redirects=False,
        )

    def test_dashboard_unauthenticated_redirects_to_login(self, web_client):
        """/dashboard without cookies redirects to /login with next=/dashboard."""
        response = web_client.get("/dashboard", follow_redirects=False)
        assert response.status_code == 302
        target = response.headers["location"]
        assert target.startswith("/login")
        assert "next=%2Fdashboard" in target

    def test_catalog_unauthenticated_redirects_to_login(self, web_client):
        """/catalog without cookies redirects to /login with next=/catalog."""
        response = web_client.get("/catalog", follow_redirects=False)
        assert response.status_code == 302
        target = response.headers["location"]
        assert target.startswith("/login")
        assert "next=%2Fcatalog" in target

    def test_api_route_still_returns_json_401(self, web_client):
        """/api/sync/manifest requires auth and must answer JSON 401, not a redirect."""
        response = web_client.get("/api/sync/manifest", follow_redirects=False)
        assert response.status_code == 401
        assert response.headers["content-type"].startswith("application/json")

    def test_password_login_honors_next(self, web_client, tmp_path):
        """A valid same-origin ``next`` becomes the post-login redirect target."""
        password = "TestPass1!"
        email = self._seed_password_user("u1", "U1", password)
        response = self._password_login(web_client, email, password, "/catalog")
        assert response.status_code == 302
        assert response.headers["location"] == "/catalog"

    def test_password_login_rejects_open_redirect(self, web_client, tmp_path):
        """A protocol-relative ``next`` falls back to /dashboard."""
        password = "TestPass1!"
        email = self._seed_password_user("u2", "U2", password)
        response = self._password_login(web_client, email, password, "//evil.example/")
        assert response.status_code == 302
        assert response.headers["location"] == "/dashboard"

    @pytest.mark.parametrize("hostile_next,expected_location", [
        ("javascript:alert(1)", "/dashboard"),
        ("http://evil.example/", "/dashboard"),
        ("//evil.example/", "/dashboard"),
        ("dashboard", "/dashboard"),  # missing leading slash
        ("/foo?bar=baz", "/foo?bar=baz"),  # valid same-origin with query
    ])
    def test_password_login_sanitizes_next(self, web_client, tmp_path, hostile_next, expected_location):
        """Each ``next`` value is redirected to its expected sanitized location."""
        import uuid

        password = "TestPass1!"
        # A fresh random id per parameter case.
        uid = f"u-{uuid.uuid4().hex[:8]}"
        email = self._seed_password_user(uid, uid, password)
        response = self._password_login(web_client, email, password, hostile_next)
        assert response.status_code == 302
        assert response.headers["location"] == expected_location

    def test_non_api_post_still_returns_json_401(self, web_client):
        """A failed POST to a JSON auth endpoint outside /api/ must NOT be redirected."""
        response = web_client.post(
            "/auth/token",
            json={"email": "nope@x.com", "password": "wrong"},
            follow_redirects=False,
        )
        assert response.status_code == 401
        assert response.headers["content-type"].startswith("application/json")

    def test_auth_json_get_still_returns_json_401(self, web_client):
        """A GET to a JSON endpoint under /auth/* must NOT be redirected.

        This keeps the response parseable for CLI clients calling
        api_get("/auth/tokens") (e.g. PAT CRUD).
        """
        response = web_client.get("/auth/tokens", follow_redirects=False)
        assert response.status_code == 401
        assert response.headers["content-type"].startswith("application/json")

    def test_login_page_propagates_next_to_password_button(self, web_client):
        """The password button URL on /login carries the ``next`` value."""
        response = web_client.get("/login?next=/catalog")
        assert response.status_code == 200
        html = response.text
        assert "/login/password?next=%2Fcatalog" in html, (
            f"Expected /login/password?next=%2Fcatalog in login page HTML; got snippet: {html[:500]}"
        )

    def test_login_page_propagates_next_to_google_button(self, web_client, monkeypatch):
        """The Google OAuth button URL must also carry ``?next``.

        Without it the post-login redirect would not honor the requested
        destination.
        """
        # Force the Google provider to report itself as available so the
        # button is rendered.
        monkeypatch.setattr(
            "app.auth.providers.google.is_available",
            lambda: True,
        )
        response = web_client.get("/login?next=/catalog")
        assert response.status_code == 200
        html = response.text
        assert "/auth/google/login?next=%2Fcatalog" in html, (
            f"Expected google login URL with ?next in login page; snippet: {html[:800]}"
        )

    def test_login_email_page_extracts_and_renders_next(self, web_client):
        """/login/email (magic link) echoes ``?next`` into its hidden form field.

        That is what lets the value round-trip to the POST.
        """
        response = web_client.get("/login/email?next=/catalog")
        assert response.status_code == 200
        html = response.text
        # The template renders <input type="hidden" name="next" value="/catalog">.
        assert 'name="next" value="/catalog"' in html, (
            f"Expected /catalog in next hidden field; snippet: {html[:800]}"
        )

    def test_login_email_page_rejects_open_redirect_in_next(self, web_client):
        """Hostile ``?next`` values (e.g. //evil) are sanitized before the hidden field renders."""
        response = web_client.get("/login/email?next=//evil.example/")
        assert response.status_code == 200
        html = response.text
        assert "evil.example" not in html
        # The sanitized default is the empty string.
        assert 'name="next" value=""' in html

    def test_google_login_stashes_safe_next_in_session(self, web_client, monkeypatch):
        """google_login() must stash the sanitized next_path in the session.

        The full OAuth flow cannot be exercised without a Google mock, so
        this only verifies that the sanitizer helper behaves correctly.
        """
        from app.auth._common import safe_next_path

        # Valid same-origin paths pass through unchanged.
        for path in ("/catalog", "/foo?bar=baz"):
            assert safe_next_path(path) == path

        # Open-redirect shapes and missing values fall back to the default.
        rejected_inputs = (
            "//evil.example/",
            "http://evil.example/",
            "javascript:alert(1)",
            "",
            None,
        )
        for rejected in rejected_inputs:
            assert safe_next_path(rejected) == "/dashboard"

        # Empty-default variant, used when computing the query string.
        assert safe_next_path(None, default="") == ""
|