agnes-the-ai-analyst/webapp/auth.py
Petr b99ec576ca Add self-service data onboarding system
Table Registry as central source of truth (JSON) with atomic writes,
optimistic locking, audit logging, and data_description.md generation.
Existing readers (config.py, profiler.py) need zero changes.

Phase 1 - Discovery API:
  - discover_tables() on DataSource ABC + Keboola implementation
  - admin_required decorator with server-side recomputation
  - GET /api/admin/discover-tables endpoint

Phase 2 - Table Registry:
  - src/table_registry.py with CRUD, validation, migration from MD
  - Admin API: register/update/unregister with version locking
  - DELETE cascade cleans up per-user subscriptions

Phase 3 - Auto-Profiling:
  - profile_changed_tables() for incremental profiling
  - Non-fatal hook in sync_all() after successful sync

Phase 4 - Per-Table Subscriptions:
  - table_mode (all/explicit) with per-table toggles
  - GET/POST /api/table-subscriptions endpoints
  - Subscription status in catalog and dashboard views

Phase 5 - Smart Sync:
  - Python-generated rsync filter files (not shell YAML parsing)
  - sync_data.sh uses --filter="merge ..." for explicit mode

Phase 6 - Admin UI:
  - /admin/tables with discovery, registration modal, registry mgmt
  - Vanilla JS, matching existing design system
2026-03-09 14:25:37 +01:00

112 lines
3.2 KiB
Python

"""
Core authentication module - shared infrastructure.
Provides:
- login_required decorator (used by all auth methods)
- validate_email_domain() (used by all auth providers)
- /login route (dynamically renders available auth providers)
- /logout route
Auth provider-specific logic lives in auth/<provider>/provider.py.
"""
import functools
import logging
from flask import Blueprint, flash, jsonify, redirect, render_template, request, session, url_for
from .config import Config
logger = logging.getLogger(__name__)
auth_bp = Blueprint("auth", __name__)
def login_required(f):
"""Decorator to require authentication for a route."""
@functools.wraps(f)
def decorated_function(*args, **kwargs):
if "user" not in session:
return redirect(url_for("auth.login"))
return f(*args, **kwargs)
return decorated_function
def admin_required(f):
"""Decorator to require admin privileges for a route.
Recomputes admin status server-side on every request.
Returns 403 JSON for API routes, redirect for HTML routes.
"""
@functools.wraps(f)
def decorated_function(*args, **kwargs):
if "user" not in session:
if request.path.startswith("/api/"):
return jsonify({"error": "Authentication required"}), 401
return redirect(url_for("auth.login"))
from .user_service import check_user_exists, get_username_from_email
email = session.get("user", {}).get("email", "")
username = get_username_from_email(email)
user_info = check_user_exists(username)
if not user_info.is_admin:
if request.path.startswith("/api/"):
return jsonify({"error": "Admin access required"}), 403
flash("Admin access required.", "error")
return redirect(url_for("dashboard"))
return f(*args, **kwargs)
return decorated_function
def validate_email_domain(email: str) -> bool:
"""Check if email belongs to allowed domain or whitelist.
Allows access for:
1. Configured allowed domain (for Google OAuth users)
2. Whitelisted emails (for password auth external users)
"""
if not email:
return False
email_lower = email.lower()
# Check whitelist first (for password auth users)
if email_lower in Config.ALLOWED_EMAILS:
return True
# Check domain (for Google OAuth users)
domain = email_lower.split("@")[-1]
return domain == Config.ALLOWED_DOMAIN.lower()
@auth_bp.route("/login")
def login():
"""Show login page with dynamically discovered auth providers."""
if "user" in session:
return redirect(url_for("dashboard"))
from auth import discover_providers
providers = discover_providers()
login_buttons = [
p.get_login_button()
for p in providers
if p.get_login_button().get("visible", True)
]
return render_template("login.html", login_buttons=login_buttons)
@auth_bp.route("/logout")
def logout():
"""Clear session and redirect to login."""
email = session.get("user", {}).get("email", "unknown")
session.clear()
logger.info(f"User logged out: {email}")
flash("You have been logged out.", "info")
return redirect(url_for("auth.login"))