* feat(dev): add LOCAL_DEV_MODE for one-command local dev When LOCAL_DEV_MODE=1, every protected route auto-authenticates as a seeded admin user (default dev@localhost) — no login screen, no Google OAuth config, no magic-link roundtrip. Startup logs a loud warning to make misuse obvious. Also fixes two preexisting bugs in the magic-link flow that surfaced while wiring up the dev fallback: - /auth/email/verify only accepted POST, but the URL embedded in emails is a GET link — clicking from any mail client returned 405. Added a GET variant that consumes the token, sets the auth cookie, and redirects to /dashboard. - Token expiry check compared an offset-aware datetime.now(timezone.utc) against an offset-naive value from DuckDB, raising TypeError on every valid link. Normalize the stored timestamp to UTC before subtracting. Dev-only fallback (scoped strictly to LOCAL_DEV_MODE to keep test and production behavior identical): send-link logs the magic link to stderr and returns it as dev_link in the JSON response when no SMTP is configured. Usage: ./scripts/run-local-dev.sh open http://localhost:8000 # lands on /dashboard as admin * fix(dev): URL-encode magic-link email + avoid /login redirect loop Two issues surfaced by Devin review on PR #32. 1. _build_magic_link interpolated email into the URL unescaped. For addresses with '+' (e.g. user+tag@gmail.com) Starlette's query parser decoded '+' as a space on the GET /verify side, so repo.get_by_email returned None and every click yielded 401 "Invalid link". quote(email, safe='') fixes both the email transport and the dev_link fallback. 2. /login in LOCAL_DEV_MODE unconditionally redirected to /dashboard. If dev-user seeding failed at startup (main.py wraps seed in try/except), /dashboard 401'd, the HTML redirect handler bounced to /login, and the loop repeated until the browser aborted. Now /login checks the dev user actually exists before short-circuiting; otherwise it falls through to the normal login form so the missing seed is visible.
230 lines
8.4 KiB
Python
230 lines
8.4 KiB
Python
"""FastAPI auth dependencies — current user, role checking."""
|
|
|
|
import logging
|
|
import os
|
|
from typing import Optional
|
|
|
|
import duckdb
|
|
from fastapi import Depends, HTTPException, Header, Request, status
|
|
|
|
from app.auth.jwt import verify_token
|
|
from src.db import get_system_db
|
|
from src.rbac import Role, ROLE_HIERARCHY
|
|
from src.repositories.users import UserRepository
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Default dev user used when LOCAL_DEV_MODE=1. Seeded at startup by app/main.py.
|
|
LOCAL_DEV_DEFAULT_EMAIL = "dev@localhost"
|
|
|
|
|
|
def is_local_dev_mode() -> bool:
|
|
"""True when LOCAL_DEV_MODE=1 — unsafe for production, bypasses auth."""
|
|
return os.environ.get("LOCAL_DEV_MODE", "").lower() in ("1", "true", "yes")
|
|
|
|
|
|
def get_local_dev_email() -> str:
|
|
"""Email of the auto-logged-in dev user. Configurable via LOCAL_DEV_USER_EMAIL."""
|
|
return os.environ.get("LOCAL_DEV_USER_EMAIL", LOCAL_DEV_DEFAULT_EMAIL)
|
|
|
|
|
|
def _get_db():
|
|
conn = get_system_db()
|
|
try:
|
|
yield conn
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _client_ip(request: Optional[Request]) -> Optional[str]:
|
|
"""Return the request's client IP, preferring the first hop of X-Forwarded-For.
|
|
|
|
Trust model: this deployment runs behind Caddy (see repo Caddyfile), which
|
|
strips incoming X-Forwarded-For and sets its own. The leftmost hop is
|
|
therefore trustworthy. If the app is ever exposed directly to the internet
|
|
without a proxy, this value becomes client-settable and should only be
|
|
relied on for audit/diagnostics, never access control. Value is stored in
|
|
personal_access_tokens.last_used_ip and audit_log entries — informational
|
|
only, never authorization.
|
|
"""
|
|
if request is None:
|
|
return None
|
|
xff = request.headers.get("x-forwarded-for")
|
|
if xff:
|
|
return xff.split(",", 1)[0].strip() or None
|
|
client = getattr(request, "client", None)
|
|
return getattr(client, "host", None) if client else None
|
|
|
|
|
|
def _get_local_dev_user(conn: duckdb.DuckDBPyConnection) -> Optional[dict]:
|
|
"""Return the seeded dev user when LOCAL_DEV_MODE is on, else None."""
|
|
repo = UserRepository(conn)
|
|
user = repo.get_by_email(get_local_dev_email())
|
|
if not user:
|
|
logger.error(
|
|
"LOCAL_DEV_MODE is on but dev user %s is not seeded; expected app startup to seed it",
|
|
get_local_dev_email(),
|
|
)
|
|
return user
|
|
|
|
|
|
async def get_current_user(
|
|
request: Request = None,
|
|
authorization: Optional[str] = Header(None),
|
|
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
|
|
) -> dict:
|
|
"""Extract and validate JWT from Authorization header or cookie. Returns user dict."""
|
|
if is_local_dev_mode():
|
|
user = _get_local_dev_user(conn)
|
|
if user:
|
|
return user
|
|
# Fall through to normal auth if seed missing — surfaces the bug instead of hiding it.
|
|
|
|
token = None
|
|
|
|
# Try Authorization header first
|
|
if authorization and authorization.startswith("Bearer "):
|
|
token = authorization.removeprefix("Bearer ")
|
|
|
|
# Fallback to cookie (for web UI after OAuth redirect)
|
|
if not token and request:
|
|
token = request.cookies.get("access_token")
|
|
|
|
if not token:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Missing or invalid Authorization header",
|
|
)
|
|
payload = verify_token(token)
|
|
if not payload:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid or expired token",
|
|
)
|
|
|
|
repo = UserRepository(conn)
|
|
user = repo.get_by_id(payload.get("sub", ""))
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="User not found",
|
|
)
|
|
if not bool(user.get("active", True)):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Account deactivated",
|
|
)
|
|
|
|
# PAT validation: check it's not revoked / expired / unknown in DB.
|
|
if payload.get("typ") == "pat":
|
|
from datetime import datetime, timezone
|
|
import hashlib
|
|
from src.repositories.access_tokens import AccessTokenRepository
|
|
|
|
def _fail(detail: str) -> None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED, detail=detail
|
|
)
|
|
|
|
tokens_repo = AccessTokenRepository(conn)
|
|
record = tokens_repo.get_by_id(payload.get("jti", ""))
|
|
if not record:
|
|
_fail("Token unknown")
|
|
if record.get("revoked_at") is not None:
|
|
_fail("Token revoked")
|
|
exp_at = record.get("expires_at")
|
|
if exp_at is not None:
|
|
if isinstance(exp_at, str):
|
|
exp_at = datetime.fromisoformat(exp_at)
|
|
if exp_at.tzinfo is None:
|
|
exp_at = exp_at.replace(tzinfo=timezone.utc)
|
|
if datetime.now(timezone.utc) > exp_at:
|
|
_fail("Token expired")
|
|
# Defense-in-depth: stored token_hash must match sha256(bearer JWT).
|
|
# Protects against a forged-but-unrevoked JWT using a stolen key.
|
|
stored_hash = record.get("token_hash")
|
|
if stored_hash:
|
|
actual = hashlib.sha256(token.encode()).hexdigest()
|
|
if actual != stored_hash:
|
|
_fail("Token mismatch")
|
|
|
|
# First-use-from-new-IP audit entry (#12 acceptance criterion).
|
|
# Only emit when the IP changes on a *subsequent* use — the very
|
|
# first use of a token is not surprising and doesn't need an entry.
|
|
current_ip = _client_ip(request)
|
|
previous_ip = record.get("last_used_ip")
|
|
already_used = record.get("last_used_at") is not None
|
|
if already_used and current_ip and current_ip != previous_ip:
|
|
try:
|
|
from src.repositories.audit import AuditRepository
|
|
AuditRepository(conn).log(
|
|
user_id=user["id"],
|
|
action="token.first_use_new_ip",
|
|
resource=f"token:{payload['jti']}",
|
|
params={"ip": current_ip, "previous_ip": previous_ip},
|
|
)
|
|
except Exception:
|
|
pass # audit failure must not block auth
|
|
|
|
# Record last_used_at / last_used_ip synchronously — acceptable cost; can batch later.
|
|
try:
|
|
tokens_repo.mark_used(payload["jti"], ip=current_ip)
|
|
except Exception:
|
|
pass
|
|
|
|
return user
|
|
|
|
|
|
async def get_optional_user(
|
|
request: Request = None,
|
|
authorization: Optional[str] = Header(None),
|
|
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
|
|
) -> Optional[dict]:
|
|
"""Like get_current_user but returns None instead of 401 if no token."""
|
|
try:
|
|
return await get_current_user(request=request, authorization=authorization, conn=conn)
|
|
except HTTPException:
|
|
return None
|
|
|
|
|
|
def require_role(minimum_role: Role):
|
|
"""Dependency factory: require user has at least the given role."""
|
|
async def _check(user: dict = Depends(get_current_user)):
|
|
user_role = Role(user.get("role", "viewer"))
|
|
if ROLE_HIERARCHY.get(user_role, 0) < ROLE_HIERARCHY.get(minimum_role, 0):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Requires role {minimum_role.value} or higher",
|
|
)
|
|
return user
|
|
return _check
|
|
|
|
|
|
async def require_admin(user: dict = Depends(get_current_user)) -> dict:
|
|
"""Dependency: require user is an admin. Raises 403 otherwise."""
|
|
if user.get("role") != "admin":
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Admin access required",
|
|
)
|
|
return user
|
|
|
|
|
|
async def require_session_token(request: Request, user: dict = Depends(get_current_user)) -> dict:
|
|
"""Like get_current_user but rejects PAT — for endpoints that must not
|
|
be callable via a long-lived CI token (e.g. creating new tokens, changing password)."""
|
|
auth = request.headers.get("authorization", "")
|
|
token = None
|
|
if auth.startswith("Bearer "):
|
|
token = auth.removeprefix("Bearer ")
|
|
if not token and request:
|
|
token = request.cookies.get("access_token")
|
|
if token:
|
|
from app.auth.jwt import verify_token
|
|
payload = verify_token(token) or {}
|
|
if payload.get("typ") == "pat":
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="This endpoint requires an interactive session, not a PAT",
|
|
)
|
|
return user
|