feat(tokens): add scope + ttl_seconds fields with bootstrap-analyst clamp

This commit is contained in:
ZdenekSrotyr 2026-05-04 17:00:54 +02:00
parent 8fbf4c7873
commit 4ee7323436
3 changed files with 180 additions and 10 deletions

View file

@ -23,6 +23,14 @@ admin_router = APIRouter(prefix="/auth/admin/tokens", tags=["tokens-admin"])
class CreateTokenRequest(BaseModel): class CreateTokenRequest(BaseModel):
name: str name: str
expires_in_days: Optional[int] = 90 # null = no expiry expires_in_days: Optional[int] = 90 # null = no expiry
# Informational tag carried in the JWT (`scope` claim) and the audit log.
# The special value "bootstrap-analyst" force-clamps the resolved TTL to
# at most 1 hour regardless of the requested lifetime, so the bootstrap
# PAT can't be repurposed as a long-lived credential.
scope: str = "general"
# If set, wins over expires_in_days. Mirrors the same 10-year cap as
# expires_in_days (3650 days * 86400 = 315_360_000 seconds).
ttl_seconds: Optional[int] = None
class CreateTokenResponse(BaseModel): class CreateTokenResponse(BaseModel):
@ -99,22 +107,44 @@ async def create_token(
# expires_at on the datetime object). # expires_at on the datetime object).
if payload.expires_in_days is not None and payload.expires_in_days > 3650: if payload.expires_in_days is not None and payload.expires_in_days > 3650:
raise HTTPException(status_code=400, detail="expires_in_days must not exceed 3650 (10 years)") raise HTTPException(status_code=400, detail="expires_in_days must not exceed 3650 (10 years)")
if payload.ttl_seconds is not None and payload.ttl_seconds <= 0:
raise HTTPException(status_code=400, detail="ttl_seconds must be a positive integer")
# Mirror the 10-year cap: 3650 days * 86400 s/day = 315_360_000 seconds.
if payload.ttl_seconds is not None and payload.ttl_seconds > 315_360_000:
raise HTTPException(status_code=400, detail="ttl_seconds must not exceed 315360000 (10 years)")
# Resolve TTL: ttl_seconds wins; fall back to expires_in_days; else "no expiry".
expires_delta: Optional[timedelta] = None
omit_exp = False
if payload.ttl_seconds is not None:
expires_delta = timedelta(seconds=payload.ttl_seconds)
elif payload.expires_in_days is not None:
expires_delta = timedelta(days=payload.expires_in_days)
else:
omit_exp = True # "no expiry" — DB stores expires_at=NULL and the JWT
# carries no `exp` claim. The authoritative expiry check lives in
# app/auth/dependencies.py (via the DB row).
# Force-clamp bootstrap-analyst PATs to <= 1 h regardless of request, so
# an init-time PAT can't be repurposed as a long-lived credential.
if payload.scope == "bootstrap-analyst":
ONE_HOUR = timedelta(hours=1)
if expires_delta is None or expires_delta > ONE_HOUR:
expires_delta = ONE_HOUR
omit_exp = False
expires_at: Optional[datetime] = None
if expires_delta is not None:
expires_at = datetime.now(timezone.utc) + expires_delta
repo = AccessTokenRepository(conn) repo = AccessTokenRepository(conn)
token_id = str(uuid.uuid4()) token_id = str(uuid.uuid4())
expires_at: Optional[datetime] = None
expires_delta: Optional[timedelta] = None
omit_exp = payload.expires_in_days is None
if payload.expires_in_days is not None:
expires_delta = timedelta(days=payload.expires_in_days)
expires_at = datetime.now(timezone.utc) + expires_delta
# else: "no expiry" — DB stores expires_at=NULL and the JWT carries no
# `exp` claim. The authoritative expiry check lives in
# app/auth/dependencies.py (via the DB row).
# Build the JWT that embeds jti=token_id and typ=pat # Build the JWT that embeds jti=token_id and typ=pat
jwt_token = create_access_token( jwt_token = create_access_token(
user_id=user["id"], email=user["email"], user_id=user["id"], email=user["email"],
token_id=token_id, typ="pat", token_id=token_id, typ="pat",
expires_delta=expires_delta, omit_exp=omit_exp, expires_delta=expires_delta, omit_exp=omit_exp,
extra_claims={"scope": payload.scope},
) )
# Prefix: first 8 chars of the jti (UUID) — uniquely identifies the token in UI # Prefix: first 8 chars of the jti (UUID) — uniquely identifies the token in UI
# without exposing JWT headers (which all start with "eyJhbGci…" and are useless # without exposing JWT headers (which all start with "eyJhbGci…" and are useless
@ -126,7 +156,8 @@ async def create_token(
id=token_id, user_id=user["id"], name=payload.name.strip(), id=token_id, user_id=user["id"], name=payload.name.strip(),
token_hash=token_hash, prefix=prefix, expires_at=expires_at, token_hash=token_hash, prefix=prefix, expires_at=expires_at,
) )
_audit(conn, user["id"], "token.create", token_id, {"name": payload.name}) _audit(conn, user["id"], "token.create", token_id,
{"name": payload.name, "scope": payload.scope})
return CreateTokenResponse( return CreateTokenResponse(
id=token_id, name=payload.name.strip(), prefix=prefix, id=token_id, name=payload.name.strip(), prefix=prefix,
token=jwt_token, # returned EXACTLY ONCE; never retrievable again token=jwt_token, # returned EXACTLY ONCE; never retrievable again

View file

@ -50,6 +50,7 @@ def create_access_token(
token_id: Optional[str] = None, token_id: Optional[str] = None,
typ: str = "session", typ: str = "session",
omit_exp: bool = False, omit_exp: bool = False,
extra_claims: Optional[dict] = None,
) -> str: ) -> str:
"""Create a JWT. `typ` is "session" (interactive login) or "pat" (long-lived). """Create a JWT. `typ` is "session" (interactive login) or "pat" (long-lived).
@ -58,6 +59,11 @@ def create_access_token(
`personal_access_tokens.expires_at`, and a claim-less JWT avoids the `personal_access_tokens.expires_at`, and a claim-less JWT avoids the
misleading ~100y horizon that previously pretended to be "never". misleading ~100y horizon that previously pretended to be "never".
`extra_claims` merges arbitrary key/value pairs into the JWT payload
after the reserved identity/metadata claims. Reserved keys (sub, email,
typ, iat, jti, exp) are protected they cannot be overridden by the
caller.
No ``role`` claim authorization is derived from No ``role`` claim authorization is derived from
``user_group_members`` at request time via ``app.auth.access.is_user_admin``. ``user_group_members`` at request time via ``app.auth.access.is_user_admin``.
The JWT carries only identity (``sub``, ``email``) and token metadata. The JWT carries only identity (``sub``, ``email``) and token metadata.
@ -74,6 +80,12 @@ def create_access_token(
expires_delta or timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS) expires_delta or timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
) )
payload["exp"] = expire payload["exp"] = expire
if extra_claims:
_reserved = {"sub", "email", "typ", "iat", "jti", "exp"}
for k, v in extra_claims.items():
if k in _reserved:
continue
payload[k] = v
return jwt.encode(payload, _get_cached_secret_key(), algorithm=ALGORITHM) return jwt.encode(payload, _get_cached_secret_key(), algorithm=ALGORITHM)

View file

@ -0,0 +1,127 @@
"""Tests for PAT scope + ttl_seconds fields (clean-analyst-bootstrap spec).
Six behaviors covered:
- bootstrap-analyst scope force-clamps TTL to <= 1h regardless of request
- ttl_seconds wins over expires_in_days when both are set
- expires_in_days remains the fallback when ttl_seconds is omitted
- ttl_seconds upper bound (10y in seconds) rejects with 400
- ttl_seconds <= 0 rejects with 400
- scope defaults to "general" when omitted
The spec calls for a `web_session` cookie-authenticated fixture sourced from
`tests/fixtures/analyst_bootstrap.py` (Task 20). Until that lands, these
tests use a local Bearer-session client built the same way the existing
`test_pat.py` suite does same auth surface, same `require_session_token`
dependency, just a different transport for the session credential.
"""
from __future__ import annotations
import tempfile
import uuid
import jwt as _jwt
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def fresh_db(monkeypatch):
with tempfile.TemporaryDirectory() as tmp:
monkeypatch.setenv("DATA_DIR", tmp)
monkeypatch.setenv("TESTING", "1")
monkeypatch.setenv("JWT_SECRET_KEY", "test-jwt-secret-key-minimum-32-chars!!")
yield tmp
@pytest.fixture
def web_session(fresh_db):
"""TestClient authenticated as an admin user via a Bearer session JWT.
Mirrors the spec's `web_session` fixture surface: returns a TestClient
that carries authenticated session credentials on every request, so
test bodies can call `web_session.post("/auth/tokens", json=...)`
without per-call header plumbing.
"""
from app.auth.jwt import create_access_token
from app.main import app
from src.db import close_system_db, get_system_db
from src.repositories.users import UserRepository
from tests.helpers.auth import grant_admin
conn = get_system_db()
try:
uid = str(uuid.uuid4())
UserRepository(conn).create(id=uid, email="admin@example.com", name="Admin")
grant_admin(conn, uid)
sess_token = create_access_token(user_id=uid, email="admin@example.com")
finally:
conn.close()
close_system_db()
client = TestClient(app)
client.headers.update({"Authorization": f"Bearer {sess_token}"})
return client
def _decode(pat: str) -> dict:
return _jwt.decode(pat, options={"verify_signature": False})
def test_bootstrap_pat_ttl_clamped_to_one_hour(web_session):
resp = web_session.post(
"/auth/tokens",
json={
"name": "init",
"scope": "bootstrap-analyst",
"ttl_seconds": 86400, # 1 day — must be ignored, clamped to 3600
},
)
assert resp.status_code == 201, resp.text
payload = _decode(resp.json()["token"])
assert payload.get("scope") == "bootstrap-analyst"
assert payload["exp"] - payload["iat"] <= 3600 + 5
def test_general_pat_uses_ttl_seconds_when_set(web_session):
resp = web_session.post(
"/auth/tokens",
json={"name": "test", "ttl_seconds": 7200},
)
assert resp.status_code == 201, resp.text
payload = _decode(resp.json()["token"])
assert payload["exp"] - payload["iat"] <= 7200 + 5
def test_general_pat_falls_back_to_expires_in_days(web_session):
resp = web_session.post(
"/auth/tokens",
json={"name": "test", "expires_in_days": 30},
)
assert resp.status_code == 201, resp.text
payload = _decode(resp.json()["token"])
assert payload["exp"] - payload["iat"] <= 30 * 86400 + 5
def test_ttl_seconds_upper_bound(web_session):
# 3650 days * 86400 = 315_360_000 seconds. One past this must reject.
resp = web_session.post(
"/auth/tokens",
json={"name": "test", "ttl_seconds": 315_360_001},
)
assert resp.status_code == 400, resp.text
def test_ttl_seconds_must_be_positive(web_session):
resp = web_session.post(
"/auth/tokens",
json={"name": "test", "ttl_seconds": 0},
)
assert resp.status_code == 400, resp.text
def test_scope_default_is_general(web_session):
resp = web_session.post("/auth/tokens", json={"name": "test"})
assert resp.status_code == 201, resp.text
payload = _decode(resp.json()["token"])
assert payload.get("scope", "general") == "general"