agnes-the-ai-analyst/tests/test_pat.py
ZdenekSrotyr e9d7af3cce feat(rbac+marketplace): RBAC v13 + Claude Code marketplace + #81/#83/#44 hardening
This squashes 13 commits from ma/staging plus a small docstring translation
into a single coherent unit. Three workstreams.

== RBAC v13 redesign ==
- Drops core.viewer/analyst/km_admin/admin hierarchy and the
  internal_roles / group_mappings / user_role_grants / plugin_access tables.
- Replaced by user_group_members + resource_grants. Atomic v12→v13 backfill
  wrapped in BEGIN/COMMIT; ROLLBACK leaves schema_version at 12 for retry.
- Two authorization primitives in app.auth.access:
    require_admin                        — Admin-group god-mode
    require_resource_access(rt, "{path}") — entity-scoped grants
  Single DB lookup per request; no session cache; no implies BFS.
- /admin/access UI (single page) replaces /admin/role-mapping +
  /admin/plugin-access. CLI `da admin group/grant *` replaces
  `da admin role/mapping/grant-role/revoke-role/effective-roles`.
- ResourceType.TABLE is listing-only — admins can record table grants, but
  runtime enforcement still flows through legacy dataset_permissions
  (migration plan in docs/TODO-rbac-data-enforcement.md).

== Claude Code marketplace ==
- Aggregated /marketplace.zip + /marketplace.git/* (PAT-gated,
  RBAC-filtered, content-addressed cache via dulwich).
- Admin god-mode dropped on the marketplace surface — admins curate
  their own view via grants like everyone else.
- Bare-repo cache materializes per RBAC-filtered ETag; stale entries
  not pruned in this iteration (disclaimed in git_backend.py docstring).

== #81 #83 #44 security/ops hardening ==
- #81 Group A — orchestrator ATTACH allow-listing (extension/url/alias).
- #81 Group B — Keboola extractor 3-state exit codes:
    0 success / 1 total fail / 2 PARTIAL fail
  Sync API logs PARTIAL FAILURE alert on exit 2. Operators with binary
  alerting must teach it the new partial signal.
- #81 Group C — schema v10 view_ownership; rejects silent overwrite
  of a prior connector's view name on collision.
- #81 Group D — extractor-side identifier validation.
- #83 — Jira webhook fail-closed when JIRA_WEBHOOK_SECRET unset
  + path-traversal fix.
- #44 — entire /api/scripts/* surface is admin-only (planted-script +
  sandbox-bypass risk closed).

== Web UI polish + deploy fix ==
- /admin/access: live grant-count badges (no stale snapshot revert),
  shared-header CSS link added to /catalog and /admin/{tables,permissions},
  per-resource-type colored stripes.
- docker-compose.host-mount.yml: bind,rbind so dual-disk hosts don't
  silently shadow sub-mounts and write state to the wrong disk.

== OSS vendor-neutralization (waves 1+2) ==
- scripts/grpn/ → scripts/ops/. Customer-specific identifiers
  (project IDs, internal hostnames, dev/prod VM IPs, brand names)
  replaced with placeholders across code, docs, Terraform, Caddyfile,
  OAuth probe, and planning docs. Downstream infra repos that copied
  scripts/grpn/agnes-tls-rotate.sh or agnes-auto-upgrade.sh must
  update the path.

== Translation ==
- src/repositories/user_groups.py::ensure_system docstring translated
  from Czech to English for codebase consistency.

Co-authored-by: Mina Rustamyan <mina@keboola.com>
2026-04-28 14:25:04 +02:00

701 lines
25 KiB
Python

"""Tests for #12 — personal access tokens (PAT)."""
import os
import tempfile
import pytest
@pytest.fixture
def fresh_db(monkeypatch):
    """Yield a throwaway data directory with the test environment configured.

    Points DATA_DIR at a new temporary directory and sets TESTING and
    JWT_SECRET_KEY via ``monkeypatch``. The yielded value is the directory
    path; the directory is cleaned up when the fixture finishes.
    """
    fixed_env = {
        "TESTING": "1",
        "JWT_SECRET_KEY": "test-jwt-secret-key-minimum-32-chars!!",
    }
    with tempfile.TemporaryDirectory() as data_dir:
        monkeypatch.setenv("DATA_DIR", data_dir)
        for key, value in fixed_env.items():
            monkeypatch.setenv(key, value)
        yield data_dir
def test_schema_v6_creates_pat_table(fresh_db):
    """Schema v6+: personal_access_tokens exists with every expected column."""
    from src.db import get_system_db, get_schema_version, close_system_db

    expected_columns = (
        "id", "user_id", "name", "token_hash", "prefix",
        "scopes", "created_at", "expires_at", "last_used_at", "revoked_at",
    )
    db = get_system_db()
    try:
        table_info = db.execute(
            "PRAGMA table_info(personal_access_tokens)"
        ).fetchall()
        # The column name is the second field of each table_info row.
        present = [info[1] for info in table_info]
        for column in expected_columns:
            assert column in present
        assert get_schema_version(db) >= 6
    finally:
        db.close()
        close_system_db()
def test_schema_v7_adds_last_used_ip_column(fresh_db):
    """Schema v7+: personal_access_tokens carries a last_used_ip column."""
    from src.db import get_system_db, get_schema_version, close_system_db

    db = get_system_db()
    try:
        table_info = db.execute(
            "PRAGMA table_info(personal_access_tokens)"
        ).fetchall()
        # The column name is the second field of each table_info row.
        present = [info[1] for info in table_info]
        assert "last_used_ip" in present
        assert get_schema_version(db) >= 7
    finally:
        db.close()
        close_system_db()
def test_access_token_repo_create_and_lookup(fresh_db):
    """Repository round-trip: create, fetch by id, list per user, then revoke."""
    import hashlib
    import uuid
    from datetime import datetime, timezone, timedelta

    from src.db import get_system_db, close_system_db
    from src.repositories.access_tokens import AccessTokenRepository

    db = get_system_db()
    try:
        tokens = AccessTokenRepository(db)
        new_id = str(uuid.uuid4())
        raw_token = "abcdefgh" + "x" * 32
        tokens.create(
            id=new_id,
            user_id="u1",
            name="laptop",
            token_hash=hashlib.sha256(raw_token.encode()).hexdigest(),
            prefix=raw_token[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )

        # A freshly created token is retrievable and not revoked.
        stored = tokens.get_by_id(new_id)
        assert stored is not None
        assert stored["name"] == "laptop"
        assert stored["prefix"] == "abcdefgh"
        assert stored["revoked_at"] is None

        owned = tokens.list_for_user("u1")
        assert len(owned) == 1

        # Revoking stamps revoked_at on the stored row.
        tokens.revoke(new_id)
        assert tokens.get_by_id(new_id)["revoked_at"] is not None
    finally:
        db.close()
        close_system_db()
def test_access_token_repo_mark_used(fresh_db):
    """mark_used() populates last_used_at on a token that was never used."""
    import hashlib
    import uuid

    from src.db import get_system_db, close_system_db
    from src.repositories.access_tokens import AccessTokenRepository

    conn = get_system_db()
    try:
        repo = AccessTokenRepository(conn)
        tid = str(uuid.uuid4())
        repo.create(
            id=tid,
            user_id="u1",
            name="x",
            token_hash=hashlib.sha256(b"r").hexdigest(),
            prefix="rrrrrrrr",
        )
        # Before any use the timestamp is unset ...
        assert repo.get_by_id(tid)["last_used_at"] is None
        repo.mark_used(tid)
        # ... and it is set once the token has been marked as used.
        assert repo.get_by_id(tid)["last_used_at"] is not None
    finally:
        conn.close()
        close_system_db()
def test_pat_token_carries_typ_claim(fresh_db):
    """A token minted with typ="pat" exposes typ and jti after verification."""
    from app.auth.jwt import create_access_token, verify_token

    encoded = create_access_token(
        user_id="u1",
        email="u@test",
        role="analyst",
        token_id="deadbeef-1234",
        typ="pat",
    )
    claims = verify_token(encoded)
    assert claims["typ"] == "pat"
    # The token_id passed at creation comes back as the jti claim.
    assert claims["jti"] == "deadbeef-1234"
def test_session_token_defaults_typ(fresh_db):
    """A token minted without an explicit typ is a "session" token."""
    from app.auth.jwt import create_access_token, verify_token

    encoded = create_access_token(user_id="u1", email="u@test", role="analyst")
    claims = verify_token(encoded)
    # Default typ is "session".
    assert claims.get("typ") == "session"
def test_revoked_pat_is_rejected(fresh_db, monkeypatch):
    """A revoked PAT gets 401 even though its JWT is otherwise valid.

    The stored ``token_hash`` is the sha256 of the JWT actually presented, so
    the DB row matches the bearer token and the 401 is attributable to the
    revocation rather than to a row/token hash mismatch.
    """
    import hashlib
    import uuid
    from datetime import datetime, timezone, timedelta

    from fastapi.testclient import TestClient

    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app

    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
        from tests.helpers.auth import grant_admin
        grant_admin(conn, uid)

        token_id = str(uuid.uuid4())
        # Mint the JWT first so its hash can be stored on the row.
        jwt_token = create_access_token(
            user_id=uid, email="u@t", role="admin", token_id=token_id, typ="pat",
        )
        # NOTE(review): the auth dependency is understood to also reject PATs
        # whose stored hash differs from the presented JWT — confirm in
        # get_current_user. Matching the hash keeps this test focused on
        # revocation only.
        repo = AccessTokenRepository(conn)
        repo.create(
            id=token_id, user_id=uid, name="ci",
            token_hash=hashlib.sha256(jwt_token.encode()).hexdigest(),
            prefix="secretXX",
            expires_at=datetime.now(timezone.utc) + timedelta(days=30),
        )
        # Revoke
        repo.revoke(token_id)
    finally:
        conn.close()
        close_system_db()

    client = TestClient(app)
    resp = client.get(
        "/api/users",
        headers={"Authorization": f"Bearer {jwt_token}", "Accept": "application/json"},
    )
    assert resp.status_code == 401
def test_expired_pat_is_rejected_from_db(fresh_db):
    """A PAT with a past expires_at in DB is rejected even if JWT exp is in future.

    The stored ``token_hash`` is the sha256 of the JWT actually presented, so
    the 401 is attributable to the DB-level expiry rather than to a row/token
    hash mismatch.
    """
    import hashlib
    import uuid
    from datetime import datetime, timezone, timedelta

    from fastapi.testclient import TestClient

    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app

    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
        from tests.helpers.auth import grant_admin
        grant_admin(conn, uid)

        tid = str(uuid.uuid4())
        # JWT with much longer TTL so signature-level `exp` would pass
        pat = create_access_token(
            user_id=uid, email="u@t", role="admin",
            token_id=tid, typ="pat",
            expires_delta=timedelta(days=365),
        )
        # Past-dated expiry in DB; hash matches the JWT so only the expiry
        # can be the reason for rejection.
        # NOTE(review): assumes the auth dependency also rejects hash
        # mismatches with 401 — confirm in get_current_user.
        AccessTokenRepository(conn).create(
            id=tid, user_id=uid, name="stale",
            token_hash=hashlib.sha256(pat.encode()).hexdigest(),
            prefix=tid.replace("-", "")[:8],
            expires_at=datetime.now(timezone.utc) - timedelta(days=1),
        )
    finally:
        conn.close()
        close_system_db()

    client = TestClient(app)
    resp = client.get(
        "/api/users",
        headers={"Authorization": f"Bearer {pat}", "Accept": "application/json"},
    )
    assert resp.status_code == 401
def test_create_pat_returns_raw_once(fresh_db):
    """POST /auth/tokens returns the raw token; the listing shows only a prefix."""
    import uuid

    from fastapi.testclient import TestClient

    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token
    from app.main import app

    db = get_system_db()
    try:
        owner_id = str(uuid.uuid4())
        UserRepository(db).create(id=owner_id, email="u@t", name="U", role="admin")
        from tests.helpers.auth import grant_admin
        grant_admin(db, owner_id)
        # typ=session
        session_jwt = create_access_token(user_id=owner_id, email="u@t", role="admin")
    finally:
        db.close()
        close_system_db()

    api = TestClient(app)
    created = api.post(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {session_jwt}"},
        json={"name": "laptop", "expires_in_days": 30},
    )
    assert created.status_code == 201
    created_body = created.json()
    assert created_body["name"] == "laptop"
    # raw token returned exactly once
    assert "token" in created_body and created_body["token"]

    # Listing returns prefix, never raw.
    # Prefix is derived from the token id (jti), not the JWT string, to avoid
    # all tokens having the useless "eyJhbGci" JWT-header prefix.
    listing = api.get(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {session_jwt}"},
    )
    assert listing.status_code == 200
    listed = listing.json()
    assert len(listed) == 1
    only_row = listed[0]
    assert "token" not in only_row
    assert only_row["prefix"] == created_body["prefix"]
    assert len(only_row["prefix"]) == 8
    # regression: not the JWT header
    assert not created_body["prefix"].startswith("eyJ")
def test_pat_cannot_create_pat(fresh_db):
    """Authenticating with a PAT must not allow minting another PAT (403)."""
    import hashlib
    import uuid
    from datetime import datetime, timezone, timedelta

    from fastapi.testclient import TestClient

    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app

    db = get_system_db()
    try:
        owner_id = str(uuid.uuid4())
        UserRepository(db).create(id=owner_id, email="u@t", name="U", role="admin")
        from tests.helpers.auth import grant_admin
        grant_admin(db, owner_id)

        pat_id = str(uuid.uuid4())
        # Create the JWT first so we can store its sha256 as token_hash (otherwise
        # the defense-in-depth check in get_current_user would reject it with 401
        # before require_session_token ever runs).
        pat_jwt = create_access_token(
            user_id=owner_id, email="u@t", role="admin", token_id=pat_id, typ="pat",
        )
        pat_digest = hashlib.sha256(pat_jwt.encode()).hexdigest()
        AccessTokenRepository(db).create(
            id=pat_id,
            user_id=owner_id,
            name="x",
            token_hash=pat_digest,
            prefix=pat_id.replace("-", "")[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )
    finally:
        db.close()
        close_system_db()

    api = TestClient(app)
    reply = api.post(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {pat_jwt}"},
        json={"name": "bad", "expires_in_days": 30},
    )
    assert reply.status_code == 403
# NOTE: test_profile_page_redirects_to_tokens removed — /profile no longer
# redirects to /tokens; it renders a real profile page including Google
# Workspace groups (cherry-pick of Zdeněk's 4f7e4cd). The /tokens render
# checks (My tokens title, new-token-btn) survive in the test_admin_tokens_ui
# suite.
def test_pat_first_use_from_new_ip_audits(fresh_db):
    """A PAT last used from one IP and now used from another writes one audit row."""
    import hashlib
    import uuid
    from datetime import datetime, timezone, timedelta

    from fastapi.testclient import TestClient

    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app

    db = get_system_db()
    try:
        owner_id = str(uuid.uuid4())
        UserRepository(db).create(id=owner_id, email="u@t", name="U", role="admin")
        from tests.helpers.auth import grant_admin
        grant_admin(db, owner_id)

        pat_id = str(uuid.uuid4())
        pat_jwt = create_access_token(
            user_id=owner_id, email="u@t", role="admin", token_id=pat_id, typ="pat",
            expires_delta=timedelta(days=90),
        )
        tokens = AccessTokenRepository(db)
        tokens.create(
            id=pat_id,
            user_id=owner_id,
            name="ci",
            token_hash=hashlib.sha256(pat_jwt.encode()).hexdigest(),
            prefix=pat_id.replace("-", "")[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )
        # Simulate a prior use from 1.1.1.1 so the upcoming call is a "new IP".
        tokens.mark_used(pat_id, ip="1.1.1.1")
    finally:
        db.close()
        close_system_db()

    request_headers = {
        "Authorization": f"Bearer {pat_jwt}",
        "Accept": "application/json",
        "X-Forwarded-For": "2.2.2.2",
    }
    reply = TestClient(app).get("/api/users", headers=request_headers)
    assert reply.status_code == 200, reply.text

    # Re-open the system DB to inspect what the request wrote.
    db = get_system_db()
    try:
        audit_rows = db.execute(
            "SELECT params FROM audit_log WHERE action = 'token.first_use_new_ip' AND user_id = ?",
            [owner_id],
        ).fetchall()
        assert len(audit_rows) == 1, f"expected 1 audit row, got {len(audit_rows)}"
        recorded_params = audit_rows[0][0]
        # params is stored as JSON text; check the IP appears
        assert "2.2.2.2" in str(recorded_params)
    finally:
        db.close()
        close_system_db()
def test_pat_same_ip_does_not_audit(fresh_db):
    """Re-using a PAT from the IP it was last used from writes no audit row."""
    import hashlib
    import uuid
    from datetime import datetime, timezone, timedelta

    from fastapi.testclient import TestClient

    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app

    db = get_system_db()
    try:
        owner_id = str(uuid.uuid4())
        UserRepository(db).create(id=owner_id, email="u@t", name="U", role="admin")
        from tests.helpers.auth import grant_admin
        grant_admin(db, owner_id)

        pat_id = str(uuid.uuid4())
        pat_jwt = create_access_token(
            user_id=owner_id, email="u@t", role="admin", token_id=pat_id, typ="pat",
            expires_delta=timedelta(days=90),
        )
        tokens = AccessTokenRepository(db)
        tokens.create(
            id=pat_id,
            user_id=owner_id,
            name="ci",
            token_hash=hashlib.sha256(pat_jwt.encode()).hexdigest(),
            prefix=pat_id.replace("-", "")[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )
        # Record a prior use from the same address the request will carry.
        tokens.mark_used(pat_id, ip="3.3.3.3")
    finally:
        db.close()
        close_system_db()

    request_headers = {
        "Authorization": f"Bearer {pat_jwt}",
        "Accept": "application/json",
        "X-Forwarded-For": "3.3.3.3",
    }
    reply = TestClient(app).get("/api/users", headers=request_headers)
    assert reply.status_code == 200, reply.text

    db = get_system_db()
    try:
        (audit_count,) = db.execute(
            "SELECT COUNT(*) FROM audit_log WHERE action = 'token.first_use_new_ip' AND user_id = ?",
            [owner_id],
        ).fetchone()[:1]
        assert audit_count == 0
    finally:
        db.close()
        close_system_db()
def test_pat_can_list_own_tokens(fresh_db):
    """A PAT may list its owner's tokens (the `da auth token list` CLI flow).

    Regression guard: this used to return 403 because require_session_token
    blocked all PATs uniformly.
    """
    import hashlib
    import uuid
    from datetime import datetime, timezone, timedelta

    from fastapi.testclient import TestClient

    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app

    db = get_system_db()
    try:
        owner_id = str(uuid.uuid4())
        UserRepository(db).create(id=owner_id, email="u@t", name="U", role="analyst")

        pat_id = str(uuid.uuid4())
        pat_jwt = create_access_token(
            user_id=owner_id, email="u@t", role="analyst", token_id=pat_id, typ="pat",
            expires_delta=timedelta(days=90),
        )
        AccessTokenRepository(db).create(
            id=pat_id,
            user_id=owner_id,
            name="laptop",
            token_hash=hashlib.sha256(pat_jwt.encode()).hexdigest(),
            prefix=pat_id.replace("-", "")[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )
    finally:
        db.close()
        close_system_db()

    reply = TestClient(app).get(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {pat_jwt}"},
    )
    assert reply.status_code == 200, reply.text
    # The PAT used for the call must itself appear in the listing.
    listed_ids = [entry["id"] for entry in reply.json()]
    assert pat_id in listed_ids
def test_pat_can_revoke_own_token(fresh_db):
    """A PAT may revoke another token of the same owner
    (the `da auth token revoke` CLI flow)."""
    import hashlib
    import uuid
    from datetime import datetime, timezone, timedelta

    from fastapi.testclient import TestClient

    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app

    db = get_system_db()
    try:
        owner_id = str(uuid.uuid4())
        UserRepository(db).create(id=owner_id, email="u@t", name="U", role="analyst")

        # Token A — the PAT used to authenticate this call.
        caller_id = str(uuid.uuid4())
        caller_jwt = create_access_token(
            user_id=owner_id, email="u@t", role="analyst", token_id=caller_id, typ="pat",
            expires_delta=timedelta(days=90),
        )
        AccessTokenRepository(db).create(
            id=caller_id,
            user_id=owner_id,
            name="primary",
            token_hash=hashlib.sha256(caller_jwt.encode()).hexdigest(),
            prefix=caller_id.replace("-", "")[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )

        # Token B — the one we'll revoke with A.
        target_id = str(uuid.uuid4())
        AccessTokenRepository(db).create(
            id=target_id,
            user_id=owner_id,
            name="old-ci",
            token_hash=hashlib.sha256(b"whatever").hexdigest(),
            prefix=target_id.replace("-", "")[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )
    finally:
        db.close()
        close_system_db()

    reply = TestClient(app).delete(
        f"/auth/tokens/{target_id}",
        headers={"Authorization": f"Bearer {caller_jwt}"},
    )
    assert reply.status_code == 204, reply.text

    # Confirm B is now revoked.
    db = get_system_db()
    try:
        target_row = AccessTokenRepository(db).get_by_id(target_id)
        assert target_row["revoked_at"] is not None
    finally:
        db.close()
        close_system_db()
def test_create_token_rejects_expires_in_days_above_cap(fresh_db):
    """expires_in_days above 3650 yields 400, never a 500 from datetime overflow."""
    import uuid

    from fastapi.testclient import TestClient

    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token
    from app.main import app

    db = get_system_db()
    try:
        owner_id = str(uuid.uuid4())
        UserRepository(db).create(id=owner_id, email="u@t", name="U", role="admin")
        from tests.helpers.auth import grant_admin
        grant_admin(db, owner_id)
        session_jwt = create_access_token(user_id=owner_id, email="u@t", role="admin")
    finally:
        db.close()
        close_system_db()

    api = TestClient(app)

    # Just above the cap — must be 400, not 500.
    over_cap = api.post(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {session_jwt}"},
        json={"name": "laptop", "expires_in_days": 3651},
    )
    assert over_cap.status_code == 400, over_cap.text
    # The error text is expected to mention the cap itself.
    assert "3650" in over_cap.text

    # Huge value that would previously overflow datetime.max — still 400.
    far_over_cap = api.post(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {session_jwt}"},
        json={"name": "laptop", "expires_in_days": 10_000_000_000},
    )
    assert far_over_cap.status_code == 400, far_over_cap.text
def test_pat_first_ever_use_does_not_audit(fresh_db):
    """A PAT with no recorded prior use writes no new-IP audit row on first use."""
    import hashlib
    import uuid
    from datetime import datetime, timezone, timedelta

    from fastapi.testclient import TestClient

    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app

    db = get_system_db()
    try:
        owner_id = str(uuid.uuid4())
        UserRepository(db).create(id=owner_id, email="u@t", name="U", role="admin")
        from tests.helpers.auth import grant_admin
        grant_admin(db, owner_id)

        pat_id = str(uuid.uuid4())
        pat_jwt = create_access_token(
            user_id=owner_id, email="u@t", role="admin", token_id=pat_id, typ="pat",
            expires_delta=timedelta(days=90),
        )
        AccessTokenRepository(db).create(
            id=pat_id,
            user_id=owner_id,
            name="ci",
            token_hash=hashlib.sha256(pat_jwt.encode()).hexdigest(),
            prefix=pat_id.replace("-", "")[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )
        # No mark_used call → first-ever use
    finally:
        db.close()
        close_system_db()

    request_headers = {
        "Authorization": f"Bearer {pat_jwt}",
        "Accept": "application/json",
        "X-Forwarded-For": "4.4.4.4",
    }
    reply = TestClient(app).get("/api/users", headers=request_headers)
    assert reply.status_code == 200, reply.text

    db = get_system_db()
    try:
        count_row = db.execute(
            "SELECT COUNT(*) FROM audit_log WHERE action = 'token.first_use_new_ip' AND user_id = ?",
            [owner_id],
        ).fetchone()
        assert count_row[0] == 0
    finally:
        db.close()
        close_system_db()
def test_pat_null_expiry_jwt_has_no_exp_claim(fresh_db):
    """A PAT created with `expires_in_days=null` ("never") carries no `exp` claim.

    The DB `expires_at=NULL` is the source of truth; the earlier ~100y `exp`
    claim was a misleading silent expiry.
    """
    import uuid

    from fastapi.testclient import TestClient
    import jwt as pyjwt

    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token
    from app.main import app

    db = get_system_db()
    try:
        owner_id = str(uuid.uuid4())
        UserRepository(db).create(id=owner_id, email="u@t", name="U", role="admin")
        from tests.helpers.auth import grant_admin
        grant_admin(db, owner_id)
        session_jwt = create_access_token(user_id=owner_id, email="u@t", role="admin")
    finally:
        db.close()
        close_system_db()

    created = TestClient(app).post(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {session_jwt}"},
        json={"name": "forever", "expires_in_days": None},
    )
    assert created.status_code == 201, created.text
    created_body = created.json()

    # Decode without signature verification — we're inspecting claims only.
    claims = pyjwt.decode(created_body["token"], options={"verify_signature": False})
    assert "exp" not in claims, f"expected no exp claim, got: {claims.get('exp')}"

    # But the other PAT claims are still present.
    assert claims.get("typ") == "pat"
    assert claims.get("sub") == owner_id
    assert "jti" in claims

    # DB row mirrors this: expires_at is NULL.
    assert created_body["expires_at"] is None
def test_pat_with_null_expiry_is_accepted_by_verify_token(fresh_db):
    """A JWT minted without `exp` round-trips through verify_token.

    It must not raise ExpiredSignatureError nor fall back to a wall-clock
    cap; the DB-level expiry check in dependencies.py remains authoritative.
    """
    from app.auth.jwt import create_access_token, verify_token

    encoded = create_access_token(
        user_id="u-1",
        email="u@t",
        role="admin",
        token_id="tid-1",
        typ="pat",
        omit_exp=True,
    )
    claims = verify_token(encoded)
    assert claims is not None
    assert "exp" not in claims
    assert claims["typ"] == "pat"
    assert claims["jti"] == "tid-1"
def test_pat_null_expiry_end_to_end_allows_authenticated_request(fresh_db):
    """A never-expiring PAT can be created and then used to authenticate.

    Creates a PAT with `expires_in_days=null` and calls an authenticated
    endpoint with it. This used to rely on the 36500-day `exp`; it now relies
    on the DB row. Regression guard for the switch.
    """
    import uuid

    from fastapi.testclient import TestClient

    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token
    from app.main import app

    db = get_system_db()
    try:
        owner_id = str(uuid.uuid4())
        UserRepository(db).create(id=owner_id, email="u@t", name="U", role="admin")
        from tests.helpers.auth import grant_admin
        grant_admin(db, owner_id)
        session_jwt = create_access_token(user_id=owner_id, email="u@t", role="admin")
    finally:
        db.close()
        close_system_db()

    api = TestClient(app)
    creation = api.post(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {session_jwt}"},
        json={"name": "forever", "expires_in_days": None},
    )
    assert creation.status_code == 201, creation.text
    pat_jwt = creation.json()["token"]

    # Use the PAT to list tokens (any authenticated endpoint).
    listing = api.get("/auth/tokens", headers={"Authorization": f"Bearer {pat_jwt}"})
    assert listing.status_code == 200, listing.text
    listed_names = [entry["name"] for entry in listing.json()]
    assert "forever" in listed_names