diff --git a/app/api/tokens.py b/app/api/tokens.py index 4d7e51f..0cbb63b 100644 --- a/app/api/tokens.py +++ b/app/api/tokens.py @@ -23,6 +23,14 @@ admin_router = APIRouter(prefix="/auth/admin/tokens", tags=["tokens-admin"]) class CreateTokenRequest(BaseModel): name: str expires_in_days: Optional[int] = 90 # null = no expiry + # Informational tag carried in the JWT (`scope` claim) and the audit log. + # The special value "bootstrap-analyst" force-clamps the resolved TTL to + # at most 1 hour regardless of the requested lifetime, so the bootstrap + # PAT can't be repurposed as a long-lived credential. + scope: str = "general" + # If set, wins over expires_in_days. Mirrors the same 10-year cap as + # expires_in_days (3650 days * 86400 = 315_360_000 seconds). + ttl_seconds: Optional[int] = None class CreateTokenResponse(BaseModel): @@ -99,22 +107,44 @@ async def create_token( # expires_at on the datetime object). if payload.expires_in_days is not None and payload.expires_in_days > 3650: raise HTTPException(status_code=400, detail="expires_in_days must not exceed 3650 (10 years)") + if payload.ttl_seconds is not None and payload.ttl_seconds <= 0: + raise HTTPException(status_code=400, detail="ttl_seconds must be a positive integer") + # Mirror the 10-year cap: 3650 days * 86400 s/day = 315_360_000 seconds. + if payload.ttl_seconds is not None and payload.ttl_seconds > 315_360_000: + raise HTTPException(status_code=400, detail="ttl_seconds must not exceed 315360000 (10 years)") + + # Resolve TTL: ttl_seconds wins; fall back to expires_in_days; else "no expiry". + expires_delta: Optional[timedelta] = None + omit_exp = False + if payload.ttl_seconds is not None: + expires_delta = timedelta(seconds=payload.ttl_seconds) + elif payload.expires_in_days is not None: + expires_delta = timedelta(days=payload.expires_in_days) + else: + omit_exp = True # "no expiry" — DB stores expires_at=NULL and the JWT + # carries no `exp` claim. The authoritative expiry check lives in + # app/auth/dependencies.py (via the DB row). + + # Force-clamp bootstrap-analyst PATs to <= 1 h regardless of request, so + # an init-time PAT can't be repurposed as a long-lived credential. + if payload.scope == "bootstrap-analyst": + ONE_HOUR = timedelta(hours=1) + if expires_delta is None or expires_delta > ONE_HOUR: + expires_delta = ONE_HOUR + omit_exp = False + + expires_at: Optional[datetime] = None + if expires_delta is not None: + expires_at = datetime.now(timezone.utc) + expires_delta + repo = AccessTokenRepository(conn) token_id = str(uuid.uuid4()) - expires_at: Optional[datetime] = None - expires_delta: Optional[timedelta] = None - omit_exp = payload.expires_in_days is None - if payload.expires_in_days is not None: - expires_delta = timedelta(days=payload.expires_in_days) - expires_at = datetime.now(timezone.utc) + expires_delta - # else: "no expiry" — DB stores expires_at=NULL and the JWT carries no - # `exp` claim. The authoritative expiry check lives in - # app/auth/dependencies.py (via the DB row). # Build the JWT that embeds jti=token_id and typ=pat jwt_token = create_access_token( user_id=user["id"], email=user["email"], token_id=token_id, typ="pat", expires_delta=expires_delta, omit_exp=omit_exp, + extra_claims={"scope": payload.scope}, ) # Prefix: first 8 chars of the jti (UUID) — uniquely identifies the token in UI # without exposing JWT headers (which all start with "eyJhbGci…" and are useless @@ -126,7 +156,8 @@ async def create_token( id=token_id, user_id=user["id"], name=payload.name.strip(), token_hash=token_hash, prefix=prefix, expires_at=expires_at, ) - _audit(conn, user["id"], "token.create", token_id, {"name": payload.name}) + _audit(conn, user["id"], "token.create", token_id, + {"name": payload.name, "scope": payload.scope}) return CreateTokenResponse( id=token_id, name=payload.name.strip(), prefix=prefix, token=jwt_token, # returned EXACTLY ONCE; never retrievable again diff --git a/app/auth/jwt.py b/app/auth/jwt.py index 152445f..9edf6f4 100644 --- a/app/auth/jwt.py +++ b/app/auth/jwt.py @@ -50,6 +50,7 @@ def create_access_token( token_id: Optional[str] = None, typ: str = "session", omit_exp: bool = False, + extra_claims: Optional[dict] = None, ) -> str: """Create a JWT. `typ` is "session" (interactive login) or "pat" (long-lived). @@ -58,6 +59,11 @@ def create_access_token( `personal_access_tokens.expires_at`, and a claim-less JWT avoids the misleading ~100y horizon that previously pretended to be "never". + `extra_claims` merges arbitrary key/value pairs into the JWT payload + after the reserved identity/metadata claims. Reserved keys (sub, email, + typ, iat, jti, exp) are protected — they cannot be overridden by the + caller. + No ``role`` claim — authorization is derived from ``user_group_members`` at request time via ``app.auth.access.is_user_admin``. The JWT carries only identity (``sub``, ``email``) and token metadata. @@ -74,6 +80,12 @@ def create_access_token( expires_delta or timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS) ) payload["exp"] = expire + if extra_claims: + _reserved = {"sub", "email", "typ", "iat", "jti", "exp"} + for k, v in extra_claims.items(): + if k in _reserved: + continue + payload[k] = v return jwt.encode(payload, _get_cached_secret_key(), algorithm=ALGORITHM) diff --git a/tests/test_tokens_bootstrap_scope.py b/tests/test_tokens_bootstrap_scope.py new file mode 100644 index 0000000..173aad3 --- /dev/null +++ b/tests/test_tokens_bootstrap_scope.py @@ -0,0 +1,127 @@ +"""Tests for PAT scope + ttl_seconds fields (clean-analyst-bootstrap spec). + +Six behaviors covered: +- bootstrap-analyst scope force-clamps TTL to <= 1h regardless of request +- ttl_seconds wins over expires_in_days when both are set +- expires_in_days remains the fallback when ttl_seconds is omitted +- ttl_seconds upper bound (10y in seconds) rejects with 400 +- ttl_seconds <= 0 rejects with 400 +- scope defaults to "general" when omitted + +The spec calls for a `web_session` cookie-authenticated fixture sourced from +`tests/fixtures/analyst_bootstrap.py` (Task 20). Until that lands, these +tests use a local Bearer-session client built the same way the existing +`test_pat.py` suite does — same auth surface, same `require_session_token` +dependency, just a different transport for the session credential. +""" + +from __future__ import annotations + +import tempfile +import uuid + +import jwt as _jwt +import pytest +from fastapi.testclient import TestClient + + +@pytest.fixture +def fresh_db(monkeypatch): + with tempfile.TemporaryDirectory() as tmp: + monkeypatch.setenv("DATA_DIR", tmp) + monkeypatch.setenv("TESTING", "1") + monkeypatch.setenv("JWT_SECRET_KEY", "test-jwt-secret-key-minimum-32-chars!!") + yield tmp + + +@pytest.fixture +def web_session(fresh_db): + """TestClient authenticated as an admin user via a Bearer session JWT. + + Mirrors the spec's `web_session` fixture surface: returns a TestClient + that carries authenticated session credentials on every request, so + test bodies can call `web_session.post("/auth/tokens", json=...)` + without per-call header plumbing. + """ + from app.auth.jwt import create_access_token + from app.main import app + from src.db import close_system_db, get_system_db + from src.repositories.users import UserRepository + from tests.helpers.auth import grant_admin + + conn = get_system_db() + try: + uid = str(uuid.uuid4()) + UserRepository(conn).create(id=uid, email="admin@example.com", name="Admin") + grant_admin(conn, uid) + sess_token = create_access_token(user_id=uid, email="admin@example.com") + finally: + conn.close() + close_system_db() + + client = TestClient(app) + client.headers.update({"Authorization": f"Bearer {sess_token}"}) + return client + + +def _decode(pat: str) -> dict: + return _jwt.decode(pat, options={"verify_signature": False}) + + +def test_bootstrap_pat_ttl_clamped_to_one_hour(web_session): + resp = web_session.post( + "/auth/tokens", + json={ + "name": "init", + "scope": "bootstrap-analyst", + "ttl_seconds": 86400, # 1 day — must be ignored, clamped to 3600 + }, + ) + assert resp.status_code == 201, resp.text + payload = _decode(resp.json()["token"]) + assert payload.get("scope") == "bootstrap-analyst" + assert payload["exp"] - payload["iat"] <= 3600 + 5 + + +def test_general_pat_uses_ttl_seconds_when_set(web_session): + resp = web_session.post( + "/auth/tokens", + json={"name": "test", "ttl_seconds": 7200}, + ) + assert resp.status_code == 201, resp.text + payload = _decode(resp.json()["token"]) + assert payload["exp"] - payload["iat"] <= 7200 + 5 + + +def test_general_pat_falls_back_to_expires_in_days(web_session): + resp = web_session.post( + "/auth/tokens", + json={"name": "test", "expires_in_days": 30}, + ) + assert resp.status_code == 201, resp.text + payload = _decode(resp.json()["token"]) + assert payload["exp"] - payload["iat"] <= 30 * 86400 + 5 + + +def test_ttl_seconds_upper_bound(web_session): + # 3650 days * 86400 = 315_360_000 seconds. One past this must reject. + resp = web_session.post( + "/auth/tokens", + json={"name": "test", "ttl_seconds": 315_360_001}, + ) + assert resp.status_code == 400, resp.text + + +def test_ttl_seconds_must_be_positive(web_session): + resp = web_session.post( + "/auth/tokens", + json={"name": "test", "ttl_seconds": 0}, + ) + assert resp.status_code == 400, resp.text + + +def test_scope_default_is_general(web_session): + resp = web_session.post("/auth/tokens", json={"name": "test"}) + assert resp.status_code == 201, resp.text + payload = _decode(resp.json()["token"]) + assert payload.get("scope", "general") == "general"