agnes-the-ai-analyst/app/auth/dependencies.py
ZdenekSrotyr 1b219cabe9 fix: remove dead PRAGMA enable_wal code
DuckDB has used WAL by default since v0.8, so this pragma is not
valid DuckDB syntax. Removed obsolete try-except block that attempted
to enable WAL on system database initialization.
2026-04-09 06:59:57 +02:00

82 lines
2.5 KiB
Python

"""FastAPI auth dependencies — current user, role checking."""
from typing import Optional
import duckdb
from fastapi import Depends, HTTPException, Header, Request, status
from app.auth.jwt import verify_token
from src.db import get_system_db
from src.rbac import Role, ROLE_HIERARCHY
from src.repositories.users import UserRepository
def _get_db():
conn = get_system_db()
try:
yield conn
finally:
conn.close()
async def get_current_user(
request: Request = None,
authorization: Optional[str] = Header(None),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
) -> dict:
"""Extract and validate JWT from Authorization header or cookie. Returns user dict."""
token = None
# Try Authorization header first
if authorization and authorization.startswith("Bearer "):
token = authorization.removeprefix("Bearer ")
# Fallback to cookie (for web UI after OAuth redirect)
if not token and request:
token = request.cookies.get("access_token")
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing or invalid Authorization header",
)
payload = verify_token(token)
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
)
repo = UserRepository(conn)
user = repo.get_by_id(payload.get("sub", ""))
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
)
return user
async def get_optional_user(
request: Request = None,
authorization: Optional[str] = Header(None),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
) -> Optional[dict]:
"""Like get_current_user but returns None instead of 401 if no token."""
try:
return await get_current_user(request=request, authorization=authorization, conn=conn)
except HTTPException:
return None
def require_role(minimum_role: Role):
"""Dependency factory: require user has at least the given role."""
async def _check(user: dict = Depends(get_current_user)):
user_role = Role(user.get("role", "viewer"))
if ROLE_HIERARCHY.get(user_role, 0) < ROLE_HIERARCHY.get(minimum_role, 0):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Requires role {minimum_role.value} or higher",
)
return user
return _check