agnes-the-ai-analyst/app/auth/router.py
ZdenekSrotyr 91caefaca9
security(auth): per-IP rate limit + last-admin guard (#165)
* security(auth): per-IP rate limit on auth endpoints + generalize last-admin guard

Closes #45 and #151.

#45 — every auth endpoint was unthrottled (login, magic-link, token,
bootstrap), leaving us open to password brute-force and SMTP
email-bombing. Wires slowapi (new dep) into the middleware chain with
per-route limits: 10/min on login + token, 5/min on send-link, 3/min on
bootstrap. Returns 429 with Retry-After: 60 once exceeded. Per-IP key
respects the leftmost X-Forwarded-For hop (Caddy in front of the app
strips client-supplied XFF). Operator escape hatch:
AGNES_AUTH_RATELIMIT_ENABLED=0. Test suite disables the limiter via
autouse conftest fixture so existing auth tests that hammer endpoints
in tight loops are unaffected.

#151 — DELETE /api/admin/users/{id}/memberships/{group_id} and the
mirror DELETE /api/admin/groups/{group_id}/members/{user_id} only
guarded against self-removal as last admin. Generalizes to refuse
removing anyone from the seeded Admin group when they are the only
remaining active admin (mirrors the existing
count_admins(active_only=True) <= 1 check on delete_user / update_user).
Recovery from zero admins requires direct DB access, so this closes
a path where a scheduler/bootstrap actor that bypasses normal admin
checks could otherwise empty the group.

* security(auth): throttle remaining email-bombing + token-confirm endpoints

Address code-review gap on PR #165 — the first commit covered /send-link
but missed two endpoints with the IDENTICAL email-bombing surface:

- POST /auth/password/reset       — sends reset mail, anti-enum response
- POST /auth/password/setup/request — sends setup mail, anti-enum response

Both now share the 5/min limit with /send-link.

Also add 10/min to the token-confirm surfaces — high-entropy tokens but
partial leaks via logs / referer have surfaced before, and unbounded
guess rate would let an attacker exhaust the keyspace adjacent to a
leaked prefix:

- POST /auth/email/verify
- GET  /auth/email/verify         — closes the click-through bypass
- POST /auth/password/reset/confirm
- POST /auth/password/setup/confirm

Doc fix: rate_limit.py module docstring + CHANGELOG entry no longer
claim "disable without a redeploy" (misleading). The Limiter constructor
freezes `enabled` from env at import time, matching every other Agnes
env knob — operators set the flag and bounce the container.

Tests: 4 new cases in test_auth_rate_limit.py covering
/reset, /setup/request, /reset/confirm, GET /verify. Full suite:
2583 passed, 32 skipped, 0 failed.

* security(auth): throttle JSON /auth/password/setup — closes form-throttle bypass

Second code-review pass on PR #165 caught a fifth gap: POST /auth/password/setup
(JSON variant, kept for backward compat) consumes the same setup_token as
the web form /setup/confirm but was unthrottled — an attacker brute-forcing
the token just switches from the form path to the JSON path and resumes
at unbounded RPS. Apply the same 10/min limit and signature shape used
on /setup/confirm.

Also extend CHANGELOG note about the JSON-variant bypass for future
operators reading the security entry.

Test: 1 new case (test_password_setup_json_rate_limited_after_10_requests),
9 rate-limit tests + 28 password-flow tests + 41 auth-provider tests pass,
no regressions.

* chore(release): cut 0.30.1 — auth security hardening (rate limit + last-admin guard)
2026-05-02 21:08:33 +02:00

181 lines
6 KiB
Python

"""Auth endpoints — login, token generation, bootstrap."""
import logging
import uuid
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel
import duckdb
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from app.auth.jwt import create_access_token
from app.auth.access import is_user_admin
from app.auth.dependencies import _get_db
from app.auth.rate_limit import limiter as _rate_limiter
from src.db import SYSTEM_ADMIN_GROUP
from src.repositories.users import UserRepository
from src.repositories.user_group_members import UserGroupMembersRepository
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/auth", tags=["auth"])
class TokenRequest(BaseModel):
email: str
password: str = ""
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
user_id: str
email: str
role: str
class BootstrapRequest(BaseModel):
email: str
name: str = ""
password: str = ""
def _audit(user_id: str, action: str, result: str | None = None) -> None:
"""Fire-and-forget audit log entry. Swallows all errors."""
try:
from src.db import get_system_db
from src.repositories.audit import AuditRepository
audit_conn = get_system_db()
AuditRepository(audit_conn).log(
user_id=user_id,
action=action,
resource="auth",
result=result,
)
audit_conn.close()
except Exception:
pass # Audit failure must not block auth
@router.post("/token", response_model=TokenResponse)
@_rate_limiter.limit("10/minute")
async def create_token(
request: Request,
body: TokenRequest,
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Issue a JWT token. Requires password authentication."""
repo = UserRepository(conn)
user = repo.get_by_email(body.email)
if not user:
raise HTTPException(status_code=401, detail="User not found")
if not bool(user.get("active", True)):
_audit(user["id"], "login_failed", result="deactivated")
raise HTTPException(status_code=401, detail="Account deactivated")
# If user has password_hash, require and verify it
if user.get("password_hash"):
if not body.password:
raise HTTPException(status_code=401, detail="Password required")
try:
ph = PasswordHasher()
ph.verify(user["password_hash"], body.password)
except VerifyMismatchError:
_audit(user["id"], "login_failed", result="invalid_password")
raise HTTPException(status_code=401, detail="Invalid password")
except Exception:
logger.exception("Unexpected error during password verification")
raise HTTPException(status_code=500, detail="Internal server error")
else:
# No password set — must use their auth provider (Google OAuth, magic link)
raise HTTPException(
status_code=401,
detail="This account uses external authentication. Please log in via your configured provider.",
)
role_label = "admin" if is_user_admin(user["id"], conn) else "user"
token = create_access_token(
user_id=user["id"],
email=user["email"],
)
_audit(user["id"], "token_created")
return TokenResponse(
access_token=token,
user_id=user["id"],
email=user["email"],
role=role_label,
)
@router.post("/bootstrap", response_model=TokenResponse)
@_rate_limiter.limit("3/minute")
async def bootstrap(
request: Request,
body: BootstrapRequest,
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Bootstrap the first admin account.
Allowed when no user has a password_hash yet. This covers:
(a) No users exist at all.
(b) Only seed users (created by SEED_ADMIN_EMAIL at startup) exist, which
have no password and cannot log in — bootstrap lets the operator
activate them with a password.
If a user with the given email already exists (e.g. as a seed), this
endpoint sets its password_hash (or clears it, if no password was supplied —
useful for OAuth-only flows) and promotes it to admin.
Deactivates as soon as any user has a password_hash.
"""
repo = UserRepository(conn)
existing = repo.list_all()
# Bootstrap is locked once anyone has a password set.
users_with_password = [u for u in existing if u.get("password_hash")]
if users_with_password:
raise HTTPException(
status_code=403,
detail="Bootstrap disabled — a user with a password already exists. Use /auth/password/login.",
)
password_hash = PasswordHasher().hash(body.password) if body.password else None
# If a matching user already exists (e.g. seed), update it; else create fresh.
existing_user = next((u for u in existing if u.get("email") == body.email), None)
if existing_user:
user_id = existing_user["id"]
repo.update(id=user_id, password_hash=password_hash)
_audit(user_id, "bootstrap_activated_seed")
else:
user_id = str(uuid.uuid4())
repo.create(
id=user_id,
email=body.email,
name=body.name or body.email.split("@")[0],
password_hash=password_hash,
)
_audit(user_id, "bootstrap_completed")
# Promote the bootstrap user to the Admin system group — replaces the v9
# ``user_role_grants`` write that the old bootstrap path relied on.
admin_group = conn.execute(
"SELECT id FROM user_groups WHERE name = ?", [SYSTEM_ADMIN_GROUP],
).fetchone()
if admin_group:
UserGroupMembersRepository(conn).add_member(
user_id=user_id,
group_id=admin_group[0],
source="system_seed",
added_by="auth.bootstrap",
)
token = create_access_token(user_id=user_id, email=body.email)
return TokenResponse(
access_token=token,
user_id=user_id,
email=body.email,
role="admin",
)