agnes-the-ai-analyst/app/auth/dependencies.py
ZdenekSrotyr 5cf0df77fc feat: add Metrics API endpoints (GET/POST/DELETE) with admin auth
- New app/api/metrics.py: GET /api/metrics, GET /api/metrics/{id:path},
  POST /api/admin/metrics (201), DELETE /api/admin/metrics/{id:path}
- Add require_admin dependency to app/auth/dependencies.py
- Register metrics_router in app/main.py before web_router
- Deprecate GET /api/catalog/metrics/{path} with 301 redirect to new endpoint
- 7 new tests in TestMetricsAPI covering CRUD, 404, RBAC, category filter
2026-04-10 19:32:13 +02:00

92 lines
2.8 KiB
Python

"""FastAPI auth dependencies — current user, role checking."""
from typing import Optional
import duckdb
from fastapi import Depends, HTTPException, Header, Request, status
from app.auth.jwt import verify_token
from src.db import get_system_db
from src.rbac import Role, ROLE_HIERARCHY
from src.repositories.users import UserRepository
def _get_db():
conn = get_system_db()
try:
yield conn
finally:
conn.close()
async def get_current_user(
request: Request = None,
authorization: Optional[str] = Header(None),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
) -> dict:
"""Extract and validate JWT from Authorization header or cookie. Returns user dict."""
token = None
# Try Authorization header first
if authorization and authorization.startswith("Bearer "):
token = authorization.removeprefix("Bearer ")
# Fallback to cookie (for web UI after OAuth redirect)
if not token and request:
token = request.cookies.get("access_token")
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing or invalid Authorization header",
)
payload = verify_token(token)
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
)
repo = UserRepository(conn)
user = repo.get_by_id(payload.get("sub", ""))
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
)
return user
async def get_optional_user(
request: Request = None,
authorization: Optional[str] = Header(None),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
) -> Optional[dict]:
"""Like get_current_user but returns None instead of 401 if no token."""
try:
return await get_current_user(request=request, authorization=authorization, conn=conn)
except HTTPException:
return None
def require_role(minimum_role: Role):
"""Dependency factory: require user has at least the given role."""
async def _check(user: dict = Depends(get_current_user)):
user_role = Role(user.get("role", "viewer"))
if ROLE_HIERARCHY.get(user_role, 0) < ROLE_HIERARCHY.get(minimum_role, 0):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Requires role {minimum_role.value} or higher",
)
return user
return _check
async def require_admin(user: dict = Depends(get_current_user)) -> dict:
"""Dependency: require user is an admin. Raises 403 otherwise."""
if user.get("role") != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required",
)
return user