agnes-the-ai-analyst/webapp/auth.py
Petr 1318b74ff1 Add Corporate Memory governance — Phase 1 (data model + admin API)
Add admin curation layer between AI extraction and knowledge distribution.
Admins (km_admin flag in instance.yaml) can approve, reject, mandate, and
revoke knowledge items. Mandatory items distribute to all targeted users
automatically.

Three governance modes (configurable per instance):
- mandatory_only: admin controls everything, no user voting
- admin_curated: admin controls, users vote as feedback signal
- hybrid: mandatory from admin + optional from user voting

Three approval workflows:
- review_queue: nothing published without admin approval
- auto_publish: items go live immediately, admin intervenes retroactively
- threshold: confidence-based auto-publish (Phase 5)

Includes:
- 9 admin action functions (approve/reject/mandate/revoke/edit/batch/...)
- 11 new admin API endpoints under /api/corporate-memory/admin/
- Immutable audit log (audit.jsonl)
- Audience targeting via groups
- Automatic migration of existing items to "approved" status
- km_admin_required auth decorator
- 69 tests covering all governance logic
- Backward compatible: no config = legacy wiki behavior
2026-03-23 19:15:33 +01:00

140 lines
4.2 KiB
Python

"""
Core authentication module - shared infrastructure.
Provides:
- login_required decorator (used by all auth methods)
- admin_required decorator (admin-only routes)
- km_admin_required decorator (Corporate Memory admin-only routes)
- validate_email_domain() (used by all auth providers)
- /login route (dynamically renders available auth providers)
- /logout route
Auth provider-specific logic lives in auth/<provider>/provider.py.
"""
import functools
import logging
from flask import Blueprint, flash, jsonify, redirect, render_template, request, session, url_for
from .config import Config
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Flask blueprint named "auth"; the /login and /logout views in this module
# are registered on it, so their endpoints are "auth.login" and "auth.logout".
auth_bp = Blueprint("auth", __name__)
def login_required(f):
    """Decorator that restricts a route to signed-in users.

    A request whose session has no "user" key is redirected to the
    "auth.login" endpoint. Otherwise the wrapped view is called with its
    original arguments and its result is returned unchanged.
    """

    @functools.wraps(f)
    def guarded_view(*args, **kwargs):
        if "user" in session:
            return f(*args, **kwargs)
        return redirect(url_for("auth.login"))

    return guarded_view
def admin_required(f):
    """Decorator that restricts a route to admin users.

    The admin flag is looked up again on every request (via
    user_service.check_user_exists) instead of being read from the session.

    Responses when access is refused:
    - no "user" in the session: 401 JSON for paths starting with "/api/",
      otherwise a redirect to the login page;
    - user is not an admin: 403 JSON for paths starting with "/api/",
      otherwise a flashed error and a redirect to the dashboard.
    """

    @functools.wraps(f)
    def guarded_view(*args, **kwargs):
        def is_api_call():
            # API clients get JSON errors; browser routes get redirects.
            return request.path.startswith("/api/")

        if "user" not in session:
            if not is_api_call():
                return redirect(url_for("auth.login"))
            return jsonify({"error": "Authentication required"}), 401

        # Imported at call time, not at module import time.
        from .user_service import check_user_exists, get_webapp_username

        # Resolve the session email to a webapp username, then fetch the
        # user record that carries the is_admin flag.
        account = check_user_exists(
            get_webapp_username(session.get("user", {}).get("email", ""))
        )
        if account.is_admin:
            return f(*args, **kwargs)

        if is_api_call():
            return jsonify({"error": "Admin access required"}), 403
        flash("Admin access required.", "error")
        return redirect(url_for("dashboard"))

    return guarded_view
def km_admin_required(f):
    """Decorator that restricts a route to Corporate Memory admins.

    The decision is delegated to corporate_memory_service.is_km_admin(),
    called with the email stored in the session (empty string if absent).

    Responses when access is refused:
    - no "user" in the session: 401 JSON for paths starting with "/api/",
      otherwise a redirect to the login page;
    - user is not a Corporate Memory admin: 403 JSON for paths starting
      with "/api/", otherwise a flashed error and a redirect to the dashboard.
    """

    @functools.wraps(f)
    def guarded_view(*args, **kwargs):
        if "user" not in session:
            return (
                (jsonify({"error": "Authentication required"}), 401)
                if request.path.startswith("/api/")
                else redirect(url_for("auth.login"))
            )

        # Imported at call time, not at module import time.
        from .corporate_memory_service import is_km_admin

        current_email = session.get("user", {}).get("email", "")
        if is_km_admin(current_email):
            return f(*args, **kwargs)

        if request.path.startswith("/api/"):
            return jsonify({"error": "Corporate Memory admin access required"}), 403
        flash("Corporate Memory admin access required.", "error")
        return redirect(url_for("dashboard"))

    return guarded_view
def validate_email_domain(email: str) -> bool:
    """Check whether an email address is allowed to sign in.

    Access is granted when either:

    1. the full address appears in ``Config.ALLOWED_EMAILS`` (individually
       approved external users), or
    2. the part after the last ``@`` appears in ``Config.ALLOWED_DOMAINS``.

    The address is lower-cased before both lookups.

    Args:
        email: Address to validate.

    Returns:
        True if the address is whitelisted or belongs to an allowed domain.
        False otherwise, including for empty input and for strings that are
        not of the form ``local@domain``.
    """
    if not email:
        return False

    email_lower = email.lower()
    # NOTE(review): assumes Config.ALLOWED_EMAILS / ALLOWED_DOMAINS hold
    # lower-case entries -- confirm against the config loader.

    # Whitelist first: individually approved addresses skip the domain check.
    if email_lower in Config.ALLOWED_EMAILS:
        return True

    # Split on the last "@". A string with no "@", nothing before it, or
    # nothing after it is not an address and must not be matched against the
    # domain list -- otherwise the bare string "example.com" would be accepted
    # whenever that domain is allowed.
    local_part, separator, domain = email_lower.rpartition("@")
    if not separator or not local_part or not domain:
        return False

    return domain in Config.ALLOWED_DOMAINS
@auth_bp.route("/login")
def login():
    """Show the login page with dynamically discovered auth providers.

    A request that already has a "user" in the session is redirected to the
    dashboard. Otherwise every provider returned by discover_providers() is
    asked for its login-button description, and each button whose "visible"
    entry is truthy or missing is handed to the login.html template.

    Returns:
        A redirect to the "dashboard" endpoint, or the rendered login page.
    """
    if "user" in session:
        return redirect(url_for("dashboard"))

    # Imported at call time, not at module import time.
    from auth import discover_providers

    # Ask each provider for its button exactly once, so the visibility test
    # and the rendered button come from the same object and providers are not
    # made to build their button twice.
    buttons = (provider.get_login_button() for provider in discover_providers())
    login_buttons = [button for button in buttons if button.get("visible", True)]

    return render_template("login.html", login_buttons=login_buttons)
@auth_bp.route("/logout")
def logout():
    """End the current session and send the user back to the login page."""
    # Read the address before the session is wiped so it can still be logged.
    user_record = session.get("user", {})
    email = user_record.get("email", "unknown")

    session.clear()
    logger.info(f"User logged out: {email}")

    # Flash after clearing, so the message is not wiped along with the session.
    flash("You have been logged out.", "info")
    return redirect(url_for("auth.login"))