agnes-the-ai-analyst/app/auth/providers/google.py
ZdenekSrotyr 7d036760f5 fix: wrap Google OAuth DB connection in try/finally to ensure it is always closed
The system DB connection opened in google_callback is now closed in a
finally block, so it is released even when an exception occurs between
open and close.
2026-04-09 18:42:56 +02:00

105 lines
3.2 KiB
Python

"""Google OAuth provider for FastAPI."""
import os
import logging
from authlib.integrations.starlette_client import OAuth
from fastapi import APIRouter, Request
from fastapi.responses import RedirectResponse
from starlette.config import Config as StarletteConfig
from app.auth.jwt import create_access_token
from app.instance_config import get_allowed_domains
# Logger named after this module.
logger = logging.getLogger(__name__)

# Every route declared in this module lives under /auth/google.
router = APIRouter(prefix="/auth/google", tags=["auth"])

# Authlib client registry; the "google" client is added by _setup_oauth().
oauth = OAuth()

# Google OAuth credentials, read once at import time. An unset variable
# yields "", which is_available() treats as "not configured".
GOOGLE_CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID", "")
GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET", "")


def is_available() -> bool:
    """Report whether both Google OAuth credentials are configured.

    Returns:
        True only when GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET are both
        non-empty.
    """
    if not GOOGLE_CLIENT_ID:
        return False
    return bool(GOOGLE_CLIENT_SECRET)


def _setup_oauth():
    """Register the ``google`` client on the module-level OAuth registry.

    Does nothing when the Google credentials are not configured.
    """
    if is_available():
        google_settings = {
            "name": "google",
            "client_id": GOOGLE_CLIENT_ID,
            "client_secret": GOOGLE_CLIENT_SECRET,
            "server_metadata_url": (
                "https://accounts.google.com/.well-known/openid-configuration"
            ),
            "client_kwargs": {"scope": "openid email profile"},
        }
        oauth.register(**google_settings)


# Executed at import time so the client is registered before any route runs.
_setup_oauth()


@router.get("/login")
async def google_login(request: Request):
    """Start the Google sign-in flow.

    Sends the browser to Google's authorization endpoint, using this app's
    ``google_callback`` route as the redirect URI. When Google OAuth is not
    configured, redirects back to the login page with an error code instead.
    """
    if is_available():
        callback_url = str(request.url_for("google_callback"))
        return await oauth.google.authorize_redirect(request, callback_url)
    return RedirectResponse(url="/login?error=google_not_configured")


def _find_or_create_user(email: str, name: str):
    """Return the user record for *email*, creating it first when absent.

    A newly created user gets a fresh UUID4 id and the ``"analyst"`` role.
    The system DB connection is closed in all cases, including when a
    repository call raises.

    Raises:
        RuntimeError: if the user still cannot be read back after creation.
    """
    # Kept as function-scope imports, exactly as in the original handler.
    import uuid

    from src.db import get_system_db
    from src.repositories.users import UserRepository

    conn = get_system_db()
    try:
        repo = UserRepository(conn)
        user = repo.get_by_email(email)
        if not user:
            repo.create(
                id=str(uuid.uuid4()), email=email, name=name, role="analyst"
            )
            # Re-read so the caller receives the record as the repository
            # returns it, rather than a hand-built copy.
            user = repo.get_by_email(email)
            if not user:
                raise RuntimeError("user record missing right after creation")
        return user
    finally:
        conn.close()


@router.get("/callback")
async def google_callback(request: Request):
    """Handle the Google OAuth callback and sign the user in.

    Exchanges the authorization response for a token, validates the email
    against the allowed domains (when any are configured), finds or creates
    the user, and redirects to ``/dashboard`` with a JWT in the
    ``access_token`` cookie.

    Every failure redirects to ``/login`` with an ``error`` query parameter:
    ``google_not_configured``, ``no_email``, ``domain_not_allowed`` or
    ``oauth_failed`` (any unexpected exception, which is logged with its
    traceback).
    """
    if not is_available():
        return RedirectResponse(url="/login?error=google_not_configured")

    try:
        token = await oauth.google.authorize_access_token(request)

        # "userinfo" may be absent or explicitly None; treat both as empty
        # instead of failing with an AttributeError.
        user_info = token.get("userinfo") or {}
        email = user_info.get("email") or ""
        name = user_info.get("name") or ""
        if not email:
            return RedirectResponse(url="/login?error=no_email")
        # NOTE(review): the "email_verified" claim is not checked here —
        # confirm whether unverified addresses should be accepted.

        # Domain check (skipped when no allowed domains are configured).
        allowed = get_allowed_domains()
        if allowed:
            domain = email.rsplit("@", 1)[-1]
            # Domain names are case-insensitive, so also try the lowercased
            # form before rejecting.
            if domain not in allowed and domain.lower() not in allowed:
                return RedirectResponse(url="/login?error=domain_not_allowed")

        user = _find_or_create_user(email, name)

        # Issue JWT
        jwt_token = create_access_token(user["id"], user["email"], user["role"])

        # The cookie is marked Secure unless TESTING is set to "1" or "true".
        secure_cookie = os.environ.get("TESTING", "").lower() not in ("1", "true")
        response = RedirectResponse(url="/dashboard", status_code=302)
        response.set_cookie(
            key="access_token",
            value=jwt_token,
            httponly=True,
            max_age=86400,
            samesite="lax",
            secure=secure_cookie,
        )
        return response
    except Exception as e:
        # Top-level boundary for the OAuth flow: keep the traceback in the
        # log, but show the user only a generic error.
        logger.exception("Google OAuth error: %s", e)
        return RedirectResponse(url="/login?error=oauth_failed")