Add admin curation layer between AI extraction and knowledge distribution.

Admins (km_admin flag in instance.yaml) can approve, reject, mandate, and revoke knowledge items. Mandatory items distribute to all targeted users automatically.

Three governance modes (configurable per instance):
- mandatory_only: admin controls everything, no user voting
- admin_curated: admin controls, users vote as feedback signal
- hybrid: mandatory from admin + optional from user voting

Three approval workflows:
- review_queue: nothing published without admin approval
- auto_publish: items go live immediately, admin intervenes retroactively
- threshold: confidence-based auto-publish (Phase 5)

Includes:
- 9 admin action functions (approve/reject/mandate/revoke/edit/batch/...)
- 11 new admin API endpoints under /api/corporate-memory/admin/
- Immutable audit log (audit.jsonl)
- Audience targeting via groups
- Automatic migration of existing items to "approved" status
- km_admin_required auth decorator
- 69 tests covering all governance logic
- Backward compatible: no config = legacy wiki behavior
140 lines
4.2 KiB
Python
140 lines
4.2 KiB
Python
"""
|
|
Core authentication module - shared infrastructure.
|
|
|
|
Provides:
|
|
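- login_required decorator (used by all auth methods)
- admin_required decorator (routes restricted to admins)
- km_admin_required decorator (routes restricted to Corporate Memory admins)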
|
|
- validate_email_domain() (used by all auth providers)
|
|
- /login route (dynamically renders available auth providers)
|
|
- /logout route
|
|
|
|
Auth provider-specific logic lives in auth/<provider>/provider.py.
|
|
"""
|
|
|
|
import functools
|
|
import logging
|
|
|
|
from flask import Blueprint, flash, jsonify, redirect, render_template, request, session, url_for
|
|
|
|
from .config import Config
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
# Blueprint named "auth"; the /login and /logout routes in this module are
# registered on it, so their endpoints are "auth.login" and "auth.logout".
auth_bp = Blueprint("auth", __name__)
|
|
|
|
|
|
def login_required(f):
    """Wrap a view so that it only runs for a logged-in session.

    When the session carries a "user" entry the wrapped view is called with
    the original arguments and its result is returned unchanged. Otherwise
    the request is redirected to the ``auth.login`` endpoint.
    """

    @functools.wraps(f)
    def guarded_view(*args, **kwargs):
        if "user" in session:
            return f(*args, **kwargs)
        return redirect(url_for("auth.login"))

    return guarded_view
|
|
|
|
|
|
def admin_required(f):
    """Decorator to require admin privileges for a route.

    Admin status is looked up again on every request through
    ``user_service.check_user_exists`` rather than being read from the
    session.

    Outcomes when the check fails:
    - No "user" in the session: 401 JSON for paths under ``/api/``,
      otherwise a redirect to ``auth.login``.
    - User is not an admin: 403 JSON for paths under ``/api/``, otherwise a
      flashed error message and a redirect to ``dashboard``.

    When both checks pass, the wrapped view is called and its result is
    returned unchanged.
    """

    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        # API callers get JSON status codes; browser routes get redirects.
        is_api_request = request.path.startswith("/api/")

        if "user" not in session:
            if is_api_request:
                return jsonify({"error": "Authentication required"}), 401
            return redirect(url_for("auth.login"))

        # Imported at call time, as in the original code (presumably to avoid
        # a circular import at module load -- confirm before hoisting).
        from .user_service import check_user_exists, get_webapp_username

        email = session.get("user", {}).get("email", "")
        username = get_webapp_username(email)
        user_info = check_user_exists(username)

        # Fail closed: the return shape of check_user_exists is not visible
        # from this module, so a missing record (e.g. None) or a record
        # without an is_admin attribute is treated as "not an admin" instead
        # of raising AttributeError and surfacing as a 500.
        if not getattr(user_info, "is_admin", False):
            if is_api_request:
                return jsonify({"error": "Admin access required"}), 403
            flash("Admin access required.", "error")
            return redirect(url_for("dashboard"))

        return f(*args, **kwargs)

    return decorated_function
|
|
|
|
|
|
def km_admin_required(f):
    """Restrict a route to Corporate Memory admins.

    The session user's email is passed to
    ``corporate_memory_service.is_km_admin`` on each call.

    Outcomes when the check fails:
    - No "user" in the session: 401 JSON for paths under ``/api/``,
      otherwise a redirect to ``auth.login``.
    - ``is_km_admin`` is falsy: 403 JSON for paths under ``/api/``,
      otherwise a flashed error message and a redirect to ``dashboard``.
    """

    @functools.wraps(f)
    def km_guarded_view(*args, **kwargs):
        def is_api_call():
            # Evaluated only on a failure path, as in the original flow.
            return request.path.startswith("/api/")

        if "user" not in session:
            if not is_api_call():
                return redirect(url_for("auth.login"))
            return jsonify({"error": "Authentication required"}), 401

        # Kept as a call-time import, matching the original placement.
        from .corporate_memory_service import is_km_admin

        user_email = session.get("user", {}).get("email", "")
        if is_km_admin(user_email):
            return f(*args, **kwargs)

        if is_api_call():
            return jsonify({"error": "Corporate Memory admin access required"}), 403
        flash("Corporate Memory admin access required.", "error")
        return redirect(url_for("dashboard"))

    return km_guarded_view
|
|
|
|
|
|
def validate_email_domain(email: str) -> bool:
    """Check if email belongs to an allowed domain or whitelist.

    Allows access for:
    1. Whitelisted emails (``Config.ALLOWED_EMAILS``), for individually
       approved external users.
    2. Any address whose domain is in ``Config.ALLOWED_DOMAINS``.

    The comparison is case-insensitive on the address being checked (it is
    lower-cased first); the configured values are used as-is.

    Args:
        email: Address to check. Empty or ``None`` is rejected.

    Returns:
        True if the address is well-formed (non-empty local part and domain
        around the last "@") and matches the whitelist or an allowed
        domain; False otherwise.
    """
    if not email:
        return False
    email_lower = email.lower()

    # Require the shape local@domain before consulting any allow-list.
    # Without this, a bare string such as "example.com" would be treated as
    # its own domain and accepted, and "user@" would be compared as the
    # empty domain "".
    local_part, separator, domain = email_lower.rpartition("@")
    if not (separator and local_part and domain):
        return False

    # Check whitelist first (individually approved emails)
    if email_lower in Config.ALLOWED_EMAILS:
        return True

    # Check domain against all allowed domains
    return domain in Config.ALLOWED_DOMAINS
|
|
|
|
|
|
@auth_bp.route("/login")
def login():
    """Show login page with dynamically discovered auth providers.

    A session that already has a "user" entry is redirected to
    ``dashboard``. Otherwise each discovered provider is asked for its login
    button, and buttons whose "visible" entry is falsy are left out
    (a missing "visible" key counts as visible).

    Returns:
        A redirect response, or the rendered ``login.html`` template with
        the ``login_buttons`` list.
    """
    if "user" in session:
        return redirect(url_for("dashboard"))

    # Imported at call time, matching the original placement.
    from auth import discover_providers

    login_buttons = []
    for provider in discover_providers():
        # Ask each provider once and reuse the result for both the
        # visibility check and the rendered list.
        button = provider.get_login_button()
        if button.get("visible", True):
            login_buttons.append(button)

    return render_template("login.html", login_buttons=login_buttons)
|
|
|
|
|
|
@auth_bp.route("/logout")
def logout():
    """End the current session and send the browser back to the login page.

    Logs the email of the user being logged out ("unknown" when the session
    holds none), flashes a confirmation message, and redirects to
    ``auth.login``.
    """
    # Read the email before clearing; afterwards it is no longer in the session.
    current_user = session.get("user", {})
    email = current_user.get("email", "unknown")

    session.clear()
    logger.info(f"User logged out: {email}")

    # Flash after clear(): Flask keeps flashed messages in the session, so
    # flashing first would wipe the message.
    flash("You have been logged out.", "info")
    return redirect(url_for("auth.login"))
|