agnes-the-ai-analyst/tests/test_pat.py
Petr Simecek c25fd41bf7
feat(auth): Google Workspace groups on /profile + tag-triggered Keboola deploy workflow (#56)
* feat(auth): display Google Workspace groups on /profile

- Request cloud-identity.groups.readonly scope in Google OAuth
- Fetch groups via Cloud Identity API after callback; tolerate 4xx
  (non-Workspace tenants) and network errors — never break login
- Store result in Starlette session as google_groups
- Replace /profile redirect with a real profile page rendering
  account details (email, name, role) and the group list; show a
  friendly empty state when no groups are available
- Tests: helper parsing + 403 + exception paths; profile page
  smoke test; updated the old redirect test

* test: remove stale /profile redirect tests

Cherry-pick of Zdeněk's 4f7e4cd ("display Google Workspace groups on
/profile") replaces the /profile redirect with a real profile page —
but only updated one of three tests that expected the old behaviour.

The other two, in test_admin_tokens_ui.py and test_pat.py, were left
asserting `/profile → 302 /tokens`, but /profile now returns
`302 /login?next=%2Fprofile` for unauth users (the standard
auth guard) or `200 HTML` for authenticated users.

Removed both rather than patched — coverage for the new behaviour
already exists in tests/test_auth_providers.py (added by the same
commit). The /tokens render assertions in the deleted test_pat.py case
are redundant with test_admin_tokens_ui.py's own /tokens UI tests.

* fix(auth): Google groups search query needs parent + labels predicates

Cloud Identity Groups Search API returns 400 INVALID_ARGUMENT when the
CEL query lacks the required `parent == 'customers/<id>'` predicate AND
a `'<label>' in labels` membership predicate. Zdeněk's original 4f7e4cd
query had only `member_key_id == '<email>'` — every fetch silently
returned [] and the /profile groups list was always empty.

Fix: build the query with all three required pieces:
  parent == 'customers/my_customer'   (alias = caller's own Workspace
                                       org; no need to look up customer ID)
  member_key_id == '<email>'           (filter to this user's memberships)
  'cloudidentity.googleapis.com/groups.discussion_forum' in labels
                                       (Workspace mailing-list groups —
                                       the common case; security-group
                                       coverage is a follow-up)
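
A minimal sketch of how the three predicates above could be assembled into
one query string. The function name `build_groups_query` and the constant
name are hypothetical, and joining the predicates with `&&` is an assumption
based on CEL syntax; the email is interpolated without escaping, which a real
implementation would likely want to guard against.

```python
DISCUSSION_FORUM_LABEL = "cloudidentity.googleapis.com/groups.discussion_forum"


def build_groups_query(email: str, label: str = DISCUSSION_FORUM_LABEL) -> str:
    """Join the three required predicates into a single CEL query string."""
    predicates = [
        "parent == 'customers/my_customer'",  # alias for the caller's own org
        f"member_key_id == '{email}'",        # this user's memberships only
        f"'{label}' in labels",               # restrict to one group label
    ]
    # Assumption: predicates are combined with CEL's logical AND.
    return " && ".join(predicates)


print(build_groups_query("user@example.com"))
```

Keeping the label as a parameter makes it cheap to try a different label
without touching the other two predicates.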

Also: log the full error body (not truncated to 200 chars) and the
query string so the next time Google rejects something we can diagnose
in one log line instead of a re-deploy.

Caught when first agnes-dev login completed normally (HTTP 302) but app
log showed `Google groups fetch returned 400 for petr@keboola.com:
{"error":{"code":400,"message":"Request contains an invalid argument."}}`
on the same VM (kids-ai-data-analysis / agnes-dev.keboola.com).

Reference: https://cloud.google.com/identity/docs/reference/rest/v1/groups/search

* feat(web): add Profile link to user dropdown menu

The /profile page (Zdeněk's 4f7e4cd cherry-pick) renders a real profile
view including Google Workspace groups, but had no entry point in the
UI — users could only reach it by typing the URL manually. Add a
"Profile" menu item between the user header (email + role) and
"My tokens" so the page is discoverable.

Side effect: cleaned up the leftover `or _path.startswith('/profile')`
condition on the "My tokens" active class, which dated from the old
/profile → /tokens redirect (removed in c789617). Now each menu item
owns its own active state.

* fix: profile-link tests + .env quoting for CADDY_TLS

Two issues caught by Keboola's first agnes-dev deploy + agnes-auto-upgrade
cron run:

1. tests/test_web_ui.py — two negative assertions ("href=/profile" NOT in
   body) date from when /profile was a redirect-only stub. Now /profile
   is a real page (groups display) AND has a dropdown menu link, so the
   negative assertions flip to positive. Same for ">Profile<" text in
   the non-admin nav test.

2. startup-script.sh.tpl — CADDY_TLS line must be QUOTED in .env, because
   agnes-auto-upgrade.sh sources .env via `set -a; . .env; set +a` and
   bash parses `KEY=value with spaces` as a one-off `KEY=value` assignment
   for the command `with spaces`, which it then tries to run. Symptom: cron log spam
   `/opt/agnes/.env: line 14: petr@keboola.com: command not found`,
   the cron exits non-zero, and no auto-upgrade ever happens. Caddy
   itself reads the value fine because docker-compose env_file=.env
   parses key=value properly without shell-evaluating the rest.

   Fix: emit `CADDY_TLS="tls <email>"` instead of `CADDY_TLS=tls <email>`.
   Both the cron source and docker-compose env_file accept the quoted
   form; cron stops failing.
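
The quoting difference in item 2 can be seen without a shell by splitting
both forms with Python's `shlex`, which follows POSIX shell word-splitting
rules. This is only an illustration: the helper name `shell_words` and the
example address are made up, and `shlex` approximates rather than reproduces
bash.

```python
import shlex


def shell_words(line: str) -> list[str]:
    """Split one .env line into words using POSIX shell quoting rules."""
    return shlex.split(line)


unquoted = shell_words("CADDY_TLS=tls admin@example.com")
quoted = shell_words('CADDY_TLS="tls admin@example.com"')

# Unquoted: two words, so a shell sees an assignment plus a command to run.
print(unquoted)
# Quoted: one word, so the whole value stays inside the assignment.
print(quoted)
```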

* fix(auth): use searchTransitiveGroups + security label for non-admin user

Three bugs in the original cherry-pick + my prior fix attempt, all caught
by a stdlib probe script (scripts/debug/probe_google_groups.py) run
locally with a Playground-issued OAuth token:

1. Wrong endpoint. `groups:search` is the admin "find groups in org"
   endpoint and 400s for non-admin users regardless of query. Switched
   to `groups/-/memberships:searchTransitiveGroups` which is the
   user-perspective "what groups am I in" endpoint.

2. Wrong label. Querying with `cloudidentity.googleapis.com/groups.discussion_forum`
   returns 403 "Insufficient permissions to retrieve memberships" even
   on the new endpoint — Workspace policy denies non-admin reads of
   discussion-forum groups. Switching to `groups.security` returns 200
   with the actual membership list. Empirically every Workspace group
   at Keboola carries BOTH labels, so the security filter sees the full
   set anyway. Confirmed with the probe script.

3. Wrong response shape. `searchTransitiveGroups` returns
   {"memberships": [...]}, not {"groups": [...]}. Parser updated
   accordingly.

Also adds scripts/debug/probe_google_groups.py — stdlib-only standalone
probe that hits 6 candidate endpoints with a user OAuth token. Saved a
deploy cycle (~10 min) per query iteration; future API-syntax debugging
should start there.

Verified end-to-end: petr@keboola.com login on agnes-dev returns 5
groups (LIC-1PASSWORD, ROLE_ATLASSIAN_*, etc.) via the probe; once
deployed, the same will populate session["google_groups"] and render
on /profile.

* test(auth): update Google groups parser fixture to match searchTransitiveGroups shape

Mock payload was `{"groups": [...]}` (the shape `groups:search` returns).
After switching to `groups/-/memberships:searchTransitiveGroups` in the
prior commit, the actual response is `{"memberships": [...]}` and the
parser iterates that key. Test now mirrors the real shape.

The per-item structure (groupKey.id + displayName) is unchanged, so the
expected output dict stays the same: [{"id": "...", "name": "..."}].
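
For illustration, a parser along these lines would map the
`{"memberships": [...]}` payload to the `[{"id": ..., "name": ...}]` output
described above. The function name `parse_transitive_groups`, the skipping of
items without a `groupKey.id`, and the fallback of `name` to the id are
assumptions, not the project's actual code.

```python
from typing import Any


def parse_transitive_groups(payload: dict[str, Any]) -> list[dict[str, str]]:
    """Extract id/name pairs from a searchTransitiveGroups-style payload."""
    groups = []
    for item in payload.get("memberships", []):
        group_id = (item.get("groupKey") or {}).get("id")
        if not group_id:
            # Assumption: entries without a group key are skipped.
            continue
        # Assumption: fall back to the id when displayName is missing.
        groups.append({"id": group_id, "name": item.get("displayName", group_id)})
    return groups


sample = {
    "memberships": [
        {"groupKey": {"id": "eng@example.com"}, "displayName": "Engineering"},
    ]
}
print(parse_transitive_groups(sample))
```

A payload that still uses the old `groups` key yields an empty list here,
which is the silent-failure mode the fixture update guards against.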

* docs(auth): add docs/auth-groups.md — Google Workspace groups runbook

Captures the non-obvious bits: the GCP-side setup checklist (Cloud
Identity API + scope on consent screen + Internal user type), the
`security` vs `discussion_forum` label trap (the latter 403s for
non-admins, the former 200s; working that out took a 4-iteration debug
session that shouldn't have to be repeated), where groups are stored
(session, not DB) and how to refresh (re-login), plus how to use the
probe script for future API-syntax issues.

Deliberately stops short of explaining "what is Cloud Identity" or
"what is OAuth scope" — those belong in Google's own docs, not ours.

* docs(claude): document release workflows + module versioning + recreate trick

New "Release & deploy workflows" section in CLAUDE.md covers what didn't
exist anywhere in the repo before:

- Distinction between release.yml (auto-build per push) vs the new
  keboola-deploy.yml (tag-triggered, explicit deploy only) — plus when
  to use which (per-developer convenience vs shared dev VM safety)
- Module versioning (infra-vX.Y.Z) and the bump-after-merge dance
- The lifecycle.ignore_changes [metadata_startup_script] gotcha and how
  to force a recreate via workflow_dispatch's recreate_targets input

All generic — no customer hostnames, project IDs, IPs. Customer-specific
deploy steps belong in the consuming infra repo's README.

Also: cross-reference docs/auth-groups.md from the Authentication
section so future Claude sessions find the Workspace-groups runbook
without grepping.

---------

Co-authored-by: ZdenekSrotyr <zdenek.srotyr@keboola.com>
2026-04-26 00:56:44 +02:00


"""Tests for #12 — personal access tokens (PAT)."""
import os
import tempfile

import pytest


@pytest.fixture
def fresh_db(monkeypatch):
    with tempfile.TemporaryDirectory() as tmp:
        monkeypatch.setenv("DATA_DIR", tmp)
        monkeypatch.setenv("TESTING", "1")
        monkeypatch.setenv("JWT_SECRET_KEY", "test-jwt-secret-key-minimum-32-chars!!")
        yield tmp


def test_schema_v6_creates_pat_table(fresh_db):
    from src.db import get_system_db, get_schema_version, close_system_db
    conn = get_system_db()
    try:
        cols = conn.execute("PRAGMA table_info(personal_access_tokens)").fetchall()
        col_names = [c[1] for c in cols]
        for expected in ("id", "user_id", "name", "token_hash", "prefix",
                         "scopes", "created_at", "expires_at", "last_used_at", "revoked_at"):
            assert expected in col_names
        assert get_schema_version(conn) >= 6
    finally:
        conn.close()
        close_system_db()


def test_schema_v7_adds_last_used_ip_column(fresh_db):
    """Schema v7: personal_access_tokens has last_used_ip column."""
    from src.db import get_system_db, get_schema_version, close_system_db
    conn = get_system_db()
    try:
        cols = conn.execute("PRAGMA table_info(personal_access_tokens)").fetchall()
        col_names = [c[1] for c in cols]
        assert "last_used_ip" in col_names
        assert get_schema_version(conn) >= 7
    finally:
        conn.close()
        close_system_db()


def test_access_token_repo_create_and_lookup(fresh_db):
    import hashlib, uuid
    from datetime import datetime, timezone, timedelta
    from src.db import get_system_db, close_system_db
    from src.repositories.access_tokens import AccessTokenRepository
    conn = get_system_db()
    try:
        repo = AccessTokenRepository(conn)
        token_id = str(uuid.uuid4())
        raw = "abcdefgh" + "x" * 32
        repo.create(
            id=token_id,
            user_id="u1",
            name="laptop",
            token_hash=hashlib.sha256(raw.encode()).hexdigest(),
            prefix=raw[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )
        row = repo.get_by_id(token_id)
        assert row is not None
        assert row["name"] == "laptop"
        assert row["prefix"] == "abcdefgh"
        assert row["revoked_at"] is None
        rows = repo.list_for_user("u1")
        assert len(rows) == 1
        repo.revoke(token_id)
        assert repo.get_by_id(token_id)["revoked_at"] is not None
    finally:
        conn.close()
        close_system_db()


def test_access_token_repo_mark_used(fresh_db):
    import hashlib, uuid
    from datetime import datetime, timezone
    from src.db import get_system_db, close_system_db
    from src.repositories.access_tokens import AccessTokenRepository
    conn = get_system_db()
    try:
        repo = AccessTokenRepository(conn)
        tid = str(uuid.uuid4())
        repo.create(id=tid, user_id="u1", name="x",
                    token_hash=hashlib.sha256(b"r").hexdigest(), prefix="rrrrrrrr")
        assert repo.get_by_id(tid)["last_used_at"] is None
        repo.mark_used(tid)
        assert repo.get_by_id(tid)["last_used_at"] is not None
    finally:
        conn.close()
        close_system_db()


def test_pat_token_carries_typ_claim(fresh_db):
    from app.auth.jwt import create_access_token, verify_token
    token = create_access_token(
        user_id="u1", email="u@test", role="analyst",
        token_id="deadbeef-1234", typ="pat",
    )
    payload = verify_token(token)
    assert payload["typ"] == "pat"
    assert payload["jti"] == "deadbeef-1234"


def test_session_token_defaults_typ(fresh_db):
    from app.auth.jwt import create_access_token, verify_token
    token = create_access_token(user_id="u1", email="u@test", role="analyst")
    payload = verify_token(token)
    # Default typ is "session".
    assert payload.get("typ") == "session"


def test_revoked_pat_is_rejected(fresh_db, monkeypatch):
    from fastapi.testclient import TestClient
    import hashlib, uuid
    from datetime import datetime, timezone, timedelta
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app
    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
        token_id = str(uuid.uuid4())
        raw = "secretXX" + "a" * 32
        AccessTokenRepository(conn).create(
            id=token_id, user_id=uid, name="ci",
            token_hash=hashlib.sha256(raw.encode()).hexdigest(),
            prefix=raw[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=30),
        )
        jwt_token = create_access_token(
            user_id=uid, email="u@t", role="admin", token_id=token_id, typ="pat",
        )
        # Revoke
        AccessTokenRepository(conn).revoke(token_id)
    finally:
        conn.close()
        close_system_db()
    client = TestClient(app)
    resp = client.get(
        "/api/users",
        headers={"Authorization": f"Bearer {jwt_token}", "Accept": "application/json"},
    )
    assert resp.status_code == 401


def test_expired_pat_is_rejected_from_db(fresh_db):
    """A PAT with a past expires_at in DB is rejected even if JWT exp is in future."""
    from fastapi.testclient import TestClient
    import hashlib, uuid
    from datetime import datetime, timezone, timedelta
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app
    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
        tid = str(uuid.uuid4())
        # Past-dated expiry in DB
        AccessTokenRepository(conn).create(
            id=tid, user_id=uid, name="stale",
            token_hash=hashlib.sha256(b"whatever").hexdigest(), prefix=tid.replace("-","")[:8],
            expires_at=datetime.now(timezone.utc) - timedelta(days=1),
        )
        # JWT with much longer TTL so signature-level `exp` would pass
        pat = create_access_token(
            user_id=uid, email="u@t", role="admin",
            token_id=tid, typ="pat",
            expires_delta=timedelta(days=365),
        )
    finally:
        conn.close()
        close_system_db()
    client = TestClient(app)
    resp = client.get(
        "/api/users",
        headers={"Authorization": f"Bearer {pat}", "Accept": "application/json"},
    )
    assert resp.status_code == 401


def test_create_pat_returns_raw_once(fresh_db):
    from fastapi.testclient import TestClient
    import uuid
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token
    from app.main import app
    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
        sess_token = create_access_token(user_id=uid, email="u@t", role="admin")  # typ=session
    finally:
        conn.close()
        close_system_db()
    client = TestClient(app)
    resp = client.post(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {sess_token}"},
        json={"name": "laptop", "expires_in_days": 30},
    )
    assert resp.status_code == 201
    data = resp.json()
    assert data["name"] == "laptop"
    assert "token" in data and data["token"]  # raw token returned exactly once
    # Listing returns prefix, never raw.
    # Prefix is derived from the token id (jti), not the JWT string, to avoid
    # all tokens having the useless "eyJhbGci" JWT-header prefix.
    list_resp = client.get(
        "/auth/tokens", headers={"Authorization": f"Bearer {sess_token}"},
    )
    assert list_resp.status_code == 200
    rows = list_resp.json()
    assert len(rows) == 1
    assert "token" not in rows[0]
    assert rows[0]["prefix"] == data["prefix"]
    assert len(rows[0]["prefix"]) == 8
    assert not data["prefix"].startswith("eyJ")  # regression: not the JWT header


def test_pat_cannot_create_pat(fresh_db):
    from fastapi.testclient import TestClient
    import hashlib, uuid
    from datetime import datetime, timezone, timedelta
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app
    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
        tid = str(uuid.uuid4())
        # Create the JWT first so we can store its sha256 as token_hash (otherwise
        # the defense-in-depth check in get_current_user would reject it with 401
        # before require_session_token ever runs).
        pat = create_access_token(user_id=uid, email="u@t", role="admin", token_id=tid, typ="pat")
        AccessTokenRepository(conn).create(
            id=tid, user_id=uid, name="x",
            token_hash=hashlib.sha256(pat.encode()).hexdigest(),
            prefix=tid.replace("-", "")[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )
    finally:
        conn.close()
        close_system_db()
    client = TestClient(app)
    resp = client.post(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {pat}"},
        json={"name": "bad", "expires_in_days": 30},
    )
    assert resp.status_code == 403


# NOTE: test_profile_page_redirects_to_tokens removed — /profile no longer
# redirects to /tokens; it renders a real profile page including Google
# Workspace groups (cherry-pick of Zdeněk's 4f7e4cd). The /tokens render
# checks (My tokens title, new-token-btn) survive in the test_admin_tokens_ui
# suite.


def test_pat_first_use_from_new_ip_audits(fresh_db):
    """Using a PAT from a different IP than last time emits an audit entry."""
    from fastapi.testclient import TestClient
    import hashlib, uuid
    from datetime import datetime, timezone, timedelta
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app
    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
        tid = str(uuid.uuid4())
        pat = create_access_token(
            user_id=uid, email="u@t", role="admin", token_id=tid, typ="pat",
            expires_delta=timedelta(days=90),
        )
        repo = AccessTokenRepository(conn)
        repo.create(
            id=tid, user_id=uid, name="ci",
            token_hash=hashlib.sha256(pat.encode()).hexdigest(),
            prefix=tid.replace("-", "")[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )
        # Simulate a prior use from 1.1.1.1 so the upcoming call is a "new IP".
        repo.mark_used(tid, ip="1.1.1.1")
    finally:
        conn.close()
        close_system_db()
    client = TestClient(app)
    resp = client.get(
        "/api/users",
        headers={
            "Authorization": f"Bearer {pat}",
            "Accept": "application/json",
            "X-Forwarded-For": "2.2.2.2",
        },
    )
    assert resp.status_code == 200, resp.text
    conn = get_system_db()
    try:
        rows = conn.execute(
            "SELECT params FROM audit_log WHERE action = 'token.first_use_new_ip' AND user_id = ?",
            [uid],
        ).fetchall()
        assert len(rows) == 1, f"expected 1 audit row, got {len(rows)}"
        params = rows[0][0]
        # params is stored as JSON text; check the IP appears
        assert "2.2.2.2" in str(params)
    finally:
        conn.close()
        close_system_db()


def test_pat_same_ip_does_not_audit(fresh_db):
    """Using a PAT from the same IP as last time does NOT emit an audit entry."""
    from fastapi.testclient import TestClient
    import hashlib, uuid
    from datetime import datetime, timezone, timedelta
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app
    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
        tid = str(uuid.uuid4())
        pat = create_access_token(
            user_id=uid, email="u@t", role="admin", token_id=tid, typ="pat",
            expires_delta=timedelta(days=90),
        )
        repo = AccessTokenRepository(conn)
        repo.create(
            id=tid, user_id=uid, name="ci",
            token_hash=hashlib.sha256(pat.encode()).hexdigest(),
            prefix=tid.replace("-", "")[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )
        repo.mark_used(tid, ip="3.3.3.3")
    finally:
        conn.close()
        close_system_db()
    client = TestClient(app)
    resp = client.get(
        "/api/users",
        headers={
            "Authorization": f"Bearer {pat}",
            "Accept": "application/json",
            "X-Forwarded-For": "3.3.3.3",
        },
    )
    assert resp.status_code == 200, resp.text
    conn = get_system_db()
    try:
        count = conn.execute(
            "SELECT COUNT(*) FROM audit_log WHERE action = 'token.first_use_new_ip' AND user_id = ?",
            [uid],
        ).fetchone()[0]
        assert count == 0
    finally:
        conn.close()
        close_system_db()


def test_pat_can_list_own_tokens(fresh_db):
    """A PAT must be allowed to list its owner's tokens — `da auth token list`
    CLI flow. Previously this returned 403 because require_session_token
    blocked all PATs uniformly."""
    from fastapi.testclient import TestClient
    import hashlib, uuid
    from datetime import datetime, timezone, timedelta
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app
    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="analyst")
        tid = str(uuid.uuid4())
        pat = create_access_token(
            user_id=uid, email="u@t", role="analyst", token_id=tid, typ="pat",
            expires_delta=timedelta(days=90),
        )
        AccessTokenRepository(conn).create(
            id=tid, user_id=uid, name="laptop",
            token_hash=hashlib.sha256(pat.encode()).hexdigest(),
            prefix=tid.replace("-", "")[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )
    finally:
        conn.close()
        close_system_db()
    client = TestClient(app)
    resp = client.get(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {pat}"},
    )
    assert resp.status_code == 200, resp.text
    rows = resp.json()
    assert any(r["id"] == tid for r in rows)


def test_pat_can_revoke_own_token(fresh_db):
    """A PAT must be allowed to revoke its owner's own tokens —
    `da auth token revoke` CLI flow."""
    from fastapi.testclient import TestClient
    import hashlib, uuid
    from datetime import datetime, timezone, timedelta
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app
    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="analyst")
        # Token A — the PAT used to authenticate this call.
        tid_a = str(uuid.uuid4())
        pat_a = create_access_token(
            user_id=uid, email="u@t", role="analyst", token_id=tid_a, typ="pat",
            expires_delta=timedelta(days=90),
        )
        AccessTokenRepository(conn).create(
            id=tid_a, user_id=uid, name="primary",
            token_hash=hashlib.sha256(pat_a.encode()).hexdigest(),
            prefix=tid_a.replace("-", "")[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )
        # Token B — the one we'll revoke with A.
        tid_b = str(uuid.uuid4())
        AccessTokenRepository(conn).create(
            id=tid_b, user_id=uid, name="old-ci",
            token_hash=hashlib.sha256(b"whatever").hexdigest(),
            prefix=tid_b.replace("-", "")[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )
    finally:
        conn.close()
        close_system_db()
    client = TestClient(app)
    resp = client.delete(
        f"/auth/tokens/{tid_b}",
        headers={"Authorization": f"Bearer {pat_a}"},
    )
    assert resp.status_code == 204, resp.text
    # Confirm B is now revoked.
    conn = get_system_db()
    try:
        row = AccessTokenRepository(conn).get_by_id(tid_b)
        assert row["revoked_at"] is not None
    finally:
        conn.close()
        close_system_db()


def test_create_token_rejects_expires_in_days_above_cap(fresh_db):
    """expires_in_days > 3650 must return 400 (not 500 via datetime overflow)."""
    from fastapi.testclient import TestClient
    import uuid
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token
    from app.main import app
    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
        sess_token = create_access_token(user_id=uid, email="u@t", role="admin")
    finally:
        conn.close()
        close_system_db()
    client = TestClient(app)
    # Just above the cap — must be 400, not 500.
    resp = client.post(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {sess_token}"},
        json={"name": "laptop", "expires_in_days": 3651},
    )
    assert resp.status_code == 400, resp.text
    assert "3650" in resp.text
    # Huge value that would previously overflow datetime.max — still 400.
    resp = client.post(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {sess_token}"},
        json={"name": "laptop", "expires_in_days": 10_000_000_000},
    )
    assert resp.status_code == 400, resp.text


def test_pat_first_ever_use_does_not_audit(fresh_db):
    """The first-ever use of a PAT (no prior last_used_at) does NOT emit an audit entry."""
    from fastapi.testclient import TestClient
    import hashlib, uuid
    from datetime import datetime, timezone, timedelta
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app
    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
        tid = str(uuid.uuid4())
        pat = create_access_token(
            user_id=uid, email="u@t", role="admin", token_id=tid, typ="pat",
            expires_delta=timedelta(days=90),
        )
        AccessTokenRepository(conn).create(
            id=tid, user_id=uid, name="ci",
            token_hash=hashlib.sha256(pat.encode()).hexdigest(),
            prefix=tid.replace("-", "")[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )
        # No mark_used call → first-ever use
    finally:
        conn.close()
        close_system_db()
    client = TestClient(app)
    resp = client.get(
        "/api/users",
        headers={
            "Authorization": f"Bearer {pat}",
            "Accept": "application/json",
            "X-Forwarded-For": "4.4.4.4",
        },
    )
    assert resp.status_code == 200, resp.text
    conn = get_system_db()
    try:
        count = conn.execute(
            "SELECT COUNT(*) FROM audit_log WHERE action = 'token.first_use_new_ip' AND user_id = ?",
            [uid],
        ).fetchone()[0]
        assert count == 0
    finally:
        conn.close()
        close_system_db()


def test_pat_null_expiry_jwt_has_no_exp_claim(fresh_db):
    """PAT with `expires_in_days=null` (user-requested "never") must not
    carry an `exp` claim at all — the DB `expires_at=NULL` is the source
    of truth. The previous ~100y `exp` claim was a misleading silent expiry."""
    from fastapi.testclient import TestClient
    import uuid
    import jwt as pyjwt
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token
    from app.main import app
    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
        sess_token = create_access_token(user_id=uid, email="u@t", role="admin")
    finally:
        conn.close()
        close_system_db()
    client = TestClient(app)
    resp = client.post(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {sess_token}"},
        json={"name": "forever", "expires_in_days": None},
    )
    assert resp.status_code == 201, resp.text
    raw_pat = resp.json()["token"]
    # Decode without signature verification — we're inspecting claims only.
    claims = pyjwt.decode(raw_pat, options={"verify_signature": False})
    assert "exp" not in claims, f"expected no exp claim, got: {claims.get('exp')}"
    # But the other PAT claims are still present.
    assert claims.get("typ") == "pat"
    assert claims.get("sub") == uid
    assert "jti" in claims
    # DB row mirrors this: expires_at is NULL.
    assert resp.json()["expires_at"] is None


def test_pat_with_null_expiry_is_accepted_by_verify_token(fresh_db):
    """A claim-less JWT (no `exp`) must round-trip through verify_token without
    raising ExpiredSignatureError and without falling back to a wall-clock
    cap. The DB-level expiry check in dependencies.py remains authoritative."""
    from app.auth.jwt import create_access_token, verify_token
    raw = create_access_token(
        user_id="u-1", email="u@t", role="admin",
        token_id="tid-1", typ="pat", omit_exp=True,
    )
    payload = verify_token(raw)
    assert payload is not None
    assert "exp" not in payload
    assert payload["typ"] == "pat"
    assert payload["jti"] == "tid-1"


def test_pat_null_expiry_end_to_end_allows_authenticated_request(fresh_db):
    """Create a PAT with `expires_in_days=null`, then use it to call an
    authenticated endpoint. Previously relied on the 36500-day `exp`;
    now relies on the DB row. Regression guard for the switch."""
    from fastapi.testclient import TestClient
    import uuid
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token
    from app.main import app
    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
        sess_token = create_access_token(user_id=uid, email="u@t", role="admin")
    finally:
        conn.close()
        close_system_db()
    client = TestClient(app)
    created = client.post(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {sess_token}"},
        json={"name": "forever", "expires_in_days": None},
    )
    assert created.status_code == 201, created.text
    pat = created.json()["token"]
    # Use the PAT to list tokens (any authenticated endpoint).
    listed = client.get("/auth/tokens", headers={"Authorization": f"Bearer {pat}"})
    assert listed.status_code == 200, listed.text
    assert any(row["name"] == "forever" for row in listed.json())