"""
Email magic link authentication provider.
Users enter their email, receive a magic link, click it and they're logged in.
No passwords needed. Domain restriction ensures only allowed users can access.
Email delivery modes:
1. SMTP relay (recommended) - configure SMTP_HOST, SMTP_PORT, etc. in .env
2. Console mode (development) - link printed to server log, shown in browser
"""
import logging
import smtplib
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from flask import (
Blueprint,
current_app,
flash,
redirect,
render_template,
request,
session,
url_for,
)
from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer
import os
from auth import AuthProvider
from app.instance_config import get_allowed_domains, get_value
_ALLOWED_DOMAINS = get_allowed_domains()
_ALLOWED_EMAILS = [
e.strip().lower()
for e in os.environ.get("ALLOWED_EMAILS", "").split(",")
if e.strip()
]
def validate_email_domain(email: str) -> bool:
if not email:
return False
email_lower = email.lower()
if email_lower in _ALLOWED_EMAILS:
return True
domain = email_lower.split("@")[-1]
return domain in _ALLOWED_DOMAINS
class _Config:
SECRET_KEY = os.environ.get("WEBAPP_SECRET_KEY", "dev-secret-key-change-me")
ALLOWED_DOMAINS = _ALLOWED_DOMAINS
SMTP_HOST = os.environ.get("SMTP_HOST", "")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "587"))
SMTP_USER = os.environ.get("SMTP_USER", "")
SMTP_PASSWORD = os.environ.get("SMTP_PASSWORD", "")
SMTP_FROM = os.environ.get("SMTP_FROM",
os.environ.get("SMTP_USER",
get_value("email", "from_address", default="noreply@example.com")))
SMTP_USE_TLS = os.environ.get("SMTP_USE_TLS", "true").lower() == "true"
INSTANCE_NAME = get_value("instance", "name", default="AI Data Analyst")
Config = _Config
logger = logging.getLogger(__name__)
email_bp = Blueprint("email_auth", __name__)
# SVG envelope icon for the login button
_EMAIL_ICON_HTML = (
'"
)
def _get_serializer() -> URLSafeTimedSerializer:
"""Create token serializer using the app secret key."""
return URLSafeTimedSerializer(Config.SECRET_KEY, salt="email-magic-link")
def _generate_magic_token(email: str) -> str:
"""Generate a signed, time-limited token containing the email."""
s = _get_serializer()
return s.dumps({"email": email.lower(), "t": int(time.time())})
def _verify_magic_token(token: str, max_age_seconds: int = 900) -> str | None:
"""Verify magic link token. Returns email if valid, None otherwise.
Args:
token: The signed token from the magic link URL.
max_age_seconds: Token validity period (default 15 minutes).
Returns:
Email address if token is valid, None otherwise.
"""
s = _get_serializer()
try:
data = s.loads(token, max_age=max_age_seconds)
return data.get("email")
except SignatureExpired:
logger.warning("Magic link token expired")
return None
except BadSignature:
logger.warning("Invalid magic link token")
return None
def _send_magic_email(email: str, magic_url: str) -> bool:
"""Send magic link email via SMTP relay.
Returns True if sent successfully, False otherwise.
"""
smtp_host = Config.SMTP_HOST
if not smtp_host:
return False
msg = MIMEMultipart("alternative")
msg["Subject"] = f"Sign in to {Config.INSTANCE_NAME}"
msg["From"] = Config.SMTP_FROM
msg["To"] = email
text_body = (
f"Sign in to {Config.INSTANCE_NAME}\n\n"
f"Click the link below to sign in:\n{magic_url}\n\n"
f"This link expires in 15 minutes.\n"
f"If you didn't request this, ignore this email."
)
html_body = f"""
Sign in to {Config.INSTANCE_NAME}
Click the button below to sign in:
Sign In
This link expires in 15 minutes.
If you didn't request this, ignore this email.
Or copy and paste this URL into your browser:
{magic_url}
"""
msg.attach(MIMEText(text_body, "plain"))
msg.attach(MIMEText(html_body, "html"))
try:
smtp_port = Config.SMTP_PORT
use_tls = Config.SMTP_USE_TLS
if smtp_port == 465:
server = smtplib.SMTP_SSL(smtp_host, smtp_port, timeout=10)
else:
server = smtplib.SMTP(smtp_host, smtp_port, timeout=10)
if use_tls:
server.starttls()
smtp_user = Config.SMTP_USER
smtp_password = Config.SMTP_PASSWORD
if smtp_user and smtp_password:
server.login(smtp_user, smtp_password)
server.sendmail(Config.SMTP_FROM, [email], msg.as_string())
server.quit()
logger.info("Magic link email sent to %s via SMTP", email)
return True
except Exception as e:
logger.error("Failed to send magic link email to %s: %s", email, e)
return False
# --- Routes ---
@email_bp.route("/login/email")
def login_email_form():
"""Show email input form."""
return render_template(
"login_magic_link.html",
allowed_domains=Config.ALLOWED_DOMAINS,
)
@email_bp.route("/login/email/send", methods=["POST"])
def send_magic_link():
"""Validate email domain and send magic link."""
email = request.form.get("email", "").strip().lower()
if not email:
flash("Please enter your email address.", "error")
return redirect(url_for("email_auth.login_email_form"))
if not validate_email_domain(email):
domains_str = ", ".join(f"@{d}" for d in Config.ALLOWED_DOMAINS)
flash(
f"Only {domains_str} email addresses are allowed.",
"error",
)
return redirect(url_for("email_auth.login_email_form"))
# Generate magic link
token = _generate_magic_token(email)
magic_url = url_for("email_auth.verify_magic_link", token=token, _external=True)
# Try SMTP first, fall back to console mode
smtp_sent = _send_magic_email(email, magic_url)
if smtp_sent:
flash("Check your email for the sign-in link.", "info")
return render_template(
"login_magic_link_sent.html",
email=email,
console_mode=False,
)
else:
# Console/development mode - show link directly
logger.info("MAGIC LINK for %s: %s", email, magic_url)
return render_template(
"login_magic_link_sent.html",
email=email,
magic_url=magic_url,
console_mode=True,
)
@email_bp.route("/login/email/verify/")
def verify_magic_link(token: str):
"""Verify magic link token and log user in."""
email = _verify_magic_token(token)
if not email:
flash("This sign-in link has expired or is invalid. Please try again.", "error")
return redirect(url_for("email_auth.login_email_form"))
# Double-check domain (in case config changed since token was issued)
if not validate_email_domain(email):
flash("Your email is no longer authorized.", "error")
return redirect(url_for("auth.login"))
# Set session (shared contract across all auth providers)
name = email.split("@")[0].replace(".", " ").title()
session["user"] = {
"email": email,
"name": name,
"picture": "",
}
logger.info("User logged in via magic link: %s", email)
return redirect(url_for("dashboard"))
# --- Provider class ---
class EmailAuthProvider(AuthProvider):
"""Email magic link authentication provider."""
def get_name(self) -> str:
return "email"
def get_display_name(self) -> str:
return "Email"
def get_blueprint(self) -> Blueprint:
return email_bp
def get_login_button(self) -> dict:
domains = Config.ALLOWED_DOMAINS
if len(domains) > 1:
domain_str = ", ".join(f"@{d}" for d in domains)
elif domains:
domain_str = f"@{domains[0]}"
else:
domain_str = ""
return {
"text": "Sign in with Email",
"url": "/login/email",
"icon_html": _EMAIL_ICON_HTML,
"subtitle": f'For {domain_str} email addresses.' if domain_str else "",
"order": 20,
"css_class": "btn-email",
"visible": True,
}
def is_available(self) -> bool:
"""Available when at least one allowed domain is configured."""
return len(Config.ALLOWED_DOMAINS) > 0
def init_app(self, app) -> None:
"""No additional initialization needed."""
pass
# Module-level provider instance for auto-discovery
provider = EmailAuthProvider()