211 lines
7.3 KiB
Python
211 lines
7.3 KiB
Python
"""Tests for PAT scope + ttl_seconds fields (clean-analyst-bootstrap spec).
|
|
|
|
Behaviors covered:
|
|
- bootstrap-analyst scope force-clamps TTL to <= 1h regardless of request
|
|
- ttl_seconds wins over expires_in_days when both are set
|
|
- ttl_seconds beats expires_in_days even when both passed in same request
|
|
- expires_in_days remains the fallback when ttl_seconds is omitted
|
|
- ttl_seconds upper bound (10y in seconds) rejects with 400
|
|
- ttl_seconds <= 0 rejects with 400
|
|
- scope defaults to "general" when omitted (strict: claim must be present)
|
|
- audit log row for token.create records the scope param
|
|
- create_access_token rejects reserved keys in extra_claims
|
|
|
|
The spec calls for a `web_session` cookie-authenticated fixture sourced from
|
|
`tests/fixtures/analyst_bootstrap.py` (Task 20). Until that lands, these
|
|
tests use a local Bearer-session client built the same way the existing
|
|
`test_pat.py` suite does — same auth surface, same `require_session_token`
|
|
dependency, just a different transport for the session credential.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import tempfile
|
|
import uuid
|
|
|
|
import jwt as _jwt
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
|
|
@pytest.fixture
|
|
def fresh_db(monkeypatch):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
monkeypatch.setenv("DATA_DIR", tmp)
|
|
monkeypatch.setenv("TESTING", "1")
|
|
monkeypatch.setenv("JWT_SECRET_KEY", "test-jwt-secret-key-minimum-32-chars!!")
|
|
yield tmp
|
|
|
|
|
|
@pytest.fixture
|
|
def web_session(fresh_db):
|
|
"""TestClient authenticated as an admin user via a Bearer session JWT.
|
|
|
|
Mirrors the spec's `web_session` fixture surface: returns a TestClient
|
|
that carries authenticated session credentials on every request, so
|
|
test bodies can call `web_session.post("/auth/tokens", json=...)`
|
|
without per-call header plumbing.
|
|
"""
|
|
from app.auth.jwt import create_access_token
|
|
from app.main import app
|
|
from src.db import close_system_db, get_system_db
|
|
from src.repositories.users import UserRepository
|
|
from tests.helpers.auth import grant_admin
|
|
|
|
conn = get_system_db()
|
|
try:
|
|
uid = str(uuid.uuid4())
|
|
UserRepository(conn).create(id=uid, email="admin@example.com", name="Admin")
|
|
grant_admin(conn, uid)
|
|
sess_token = create_access_token(user_id=uid, email="admin@example.com")
|
|
finally:
|
|
conn.close()
|
|
close_system_db()
|
|
|
|
client = TestClient(app)
|
|
client.headers.update({"Authorization": f"Bearer {sess_token}"})
|
|
return client
|
|
|
|
|
|
def _decode(pat: str) -> dict:
|
|
return _jwt.decode(pat, options={"verify_signature": False})
|
|
|
|
|
|
def test_bootstrap_pat_ttl_clamped_to_one_hour(web_session):
|
|
resp = web_session.post(
|
|
"/auth/tokens",
|
|
json={
|
|
"name": "init",
|
|
"scope": "bootstrap-analyst",
|
|
"ttl_seconds": 86400, # 1 day — must be ignored, clamped to 3600
|
|
},
|
|
)
|
|
assert resp.status_code == 201, resp.text
|
|
payload = _decode(resp.json()["token"])
|
|
assert payload.get("scope") == "bootstrap-analyst"
|
|
assert payload["exp"] - payload["iat"] <= 3600 + 5
|
|
|
|
|
|
def test_general_pat_uses_ttl_seconds_when_set(web_session):
|
|
resp = web_session.post(
|
|
"/auth/tokens",
|
|
json={"name": "test", "ttl_seconds": 7200},
|
|
)
|
|
assert resp.status_code == 201, resp.text
|
|
payload = _decode(resp.json()["token"])
|
|
assert payload["exp"] - payload["iat"] <= 7200 + 5
|
|
|
|
|
|
def test_general_pat_falls_back_to_expires_in_days(web_session):
|
|
resp = web_session.post(
|
|
"/auth/tokens",
|
|
json={"name": "test", "expires_in_days": 30},
|
|
)
|
|
assert resp.status_code == 201, resp.text
|
|
payload = _decode(resp.json()["token"])
|
|
assert payload["exp"] - payload["iat"] <= 30 * 86400 + 5
|
|
|
|
|
|
def test_ttl_seconds_upper_bound(web_session):
|
|
# 3650 days * 86400 = 315_360_000 seconds. One past this must reject.
|
|
resp = web_session.post(
|
|
"/auth/tokens",
|
|
json={"name": "test", "ttl_seconds": 315_360_001},
|
|
)
|
|
assert resp.status_code == 400, resp.text
|
|
|
|
|
|
def test_ttl_seconds_must_be_positive(web_session):
|
|
resp = web_session.post(
|
|
"/auth/tokens",
|
|
json={"name": "test", "ttl_seconds": 0},
|
|
)
|
|
assert resp.status_code == 400, resp.text
|
|
|
|
|
|
def test_scope_default_is_general(web_session):
|
|
resp = web_session.post("/auth/tokens", json={"name": "test"})
|
|
assert resp.status_code == 201, resp.text
|
|
payload = _decode(resp.json()["token"])
|
|
# Strict assertion: the claim MUST be present. The previous
|
|
# `payload.get("scope", "general") == "general"` form would silently
|
|
# pass even if the route stopped passing extra_claims={"scope": ...}
|
|
# to create_access_token entirely — which is exactly the regression
|
|
# we want to catch.
|
|
assert payload["scope"] == "general"
|
|
|
|
|
|
def test_ttl_seconds_wins_when_both_set(web_session):
|
|
"""Spec: ttl_seconds wins over expires_in_days when both are present."""
|
|
resp = web_session.post(
|
|
"/auth/tokens",
|
|
json={
|
|
"name": "test",
|
|
"ttl_seconds": 7200, # 2 hours
|
|
"expires_in_days": 30, # 30 days — must be ignored
|
|
},
|
|
)
|
|
assert resp.status_code == 201, resp.text
|
|
payload = _decode(resp.json()["token"])
|
|
delta = payload["exp"] - payload["iat"]
|
|
# ttl_seconds (~7200) wins, NOT expires_in_days (~2_592_000)
|
|
assert delta <= 7200 + 5
|
|
assert delta < 30 * 86400
|
|
|
|
|
|
def test_audit_log_includes_scope(web_session, fresh_db):
|
|
"""Audit log row for token creation must record the scope param."""
|
|
import json
|
|
|
|
from src.db import close_system_db, get_system_db
|
|
|
|
resp = web_session.post(
|
|
"/auth/tokens",
|
|
json={"name": "audit-test", "scope": "bootstrap-analyst"},
|
|
)
|
|
assert resp.status_code == 201, resp.text
|
|
|
|
# Read the most recent token-creation audit row directly — same pattern
|
|
# as tests/test_pat.py (token.first_use_new_ip audit assertions).
|
|
conn = get_system_db()
|
|
try:
|
|
rows = conn.execute(
|
|
"SELECT params FROM audit_log WHERE action = 'token.create' "
|
|
"ORDER BY timestamp DESC LIMIT 1"
|
|
).fetchall()
|
|
finally:
|
|
conn.close()
|
|
close_system_db()
|
|
|
|
assert rows, "no audit row found for token.create"
|
|
raw_params = rows[0][0]
|
|
params = json.loads(raw_params) if isinstance(raw_params, str) else raw_params
|
|
assert params.get("scope") == "bootstrap-analyst"
|
|
|
|
|
|
def test_create_access_token_rejects_reserved_extra_claims():
|
|
"""extra_claims must not override reserved JWT identity claims."""
|
|
import os
|
|
|
|
import jwt as pyjwt
|
|
|
|
from app.auth.jwt import create_access_token
|
|
|
|
# create_access_token reads JWT_SECRET_KEY from env when TESTING=1.
|
|
# Set both so the call works outside the web_session fixture too.
|
|
os.environ.setdefault("TESTING", "1")
|
|
os.environ.setdefault("JWT_SECRET_KEY", "test-jwt-secret-key-minimum-32-chars!!")
|
|
|
|
token = create_access_token(
|
|
user_id="real-user",
|
|
email="real@example.com",
|
|
extra_claims={
|
|
"sub": "evil-user", # reserved — must be ignored
|
|
"email": "evil@example.com", # reserved — must be ignored
|
|
"scope": "custom-scope", # not reserved — must land
|
|
},
|
|
)
|
|
decoded = pyjwt.decode(token, options={"verify_signature": False})
|
|
assert decoded["sub"] == "real-user"
|
|
assert decoded["email"] == "real@example.com"
|
|
assert decoded["scope"] == "custom-scope"
|