agnes-the-ai-analyst/app/auth/dependencies.py
ZdenekSrotyr caa60a507d feat: add centralized RBAC module — replace Linux group auth
New src/rbac.py: Role enum, hierarchy, get_user_role(), has_role(),
is_admin(), is_km_admin(), has_dataset_access(), set_user_role().

webapp/auth.py: admin_required + km_admin_required now use DuckDB
roles instead of Linux groups (pwd.getpwnam + sudo/data-ops check).

app/auth/dependencies.py: imports Role from src/rbac.py (single source).

11 RBAC tests passing.
2026-03-31 08:04:35 +02:00

83 lines
2.5 KiB
Python

"""FastAPI auth dependencies — current user, role checking."""
from typing import Optional
import duckdb
from fastapi import Depends, HTTPException, Header, Request, status
from app.auth.jwt import verify_token
from src.db import get_system_db
from src.rbac import Role, ROLE_HIERARCHY
from src.repositories.users import UserRepository
def _get_db():
conn = get_system_db()
try:
yield conn
finally:
conn.close()
async def get_current_user(
request: Request = None,
authorization: Optional[str] = Header(None),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
) -> dict:
"""Extract and validate JWT from Authorization header or cookie. Returns user dict."""
token = None
# Try Authorization header first
if authorization and authorization.startswith("Bearer "):
token = authorization.removeprefix("Bearer ")
# Fallback to cookie (for web UI after OAuth redirect)
if not token and request:
token = request.cookies.get("access_token")
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing or invalid Authorization header",
)
payload = verify_token(token)
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
)
repo = UserRepository(conn)
user = repo.get_by_id(payload.get("sub", ""))
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
)
return user
async def get_optional_user(
authorization: Optional[str] = Header(None),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
) -> Optional[dict]:
"""Like get_current_user but returns None instead of 401 if no token."""
if not authorization or not authorization.startswith("Bearer "):
return None
try:
return await get_current_user(authorization, conn)
except HTTPException:
return None
def require_role(minimum_role: Role):
"""Dependency factory: require user has at least the given role."""
async def _check(user: dict = Depends(get_current_user)):
user_role = Role(user.get("role", "viewer"))
if ROLE_HIERARCHY.get(user_role, 0) < ROLE_HIERARCHY.get(minimum_role, 0):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Requires role {minimum_role.value} or higher",
)
return user
return _check