agnes-the-ai-analyst/app/main.py
ZdenekSrotyr 995e4cd366
fix(scheduler): HTTP marketplaces job + SCHEDULER_API_TOKEN shared secret (#127)
* fix(scheduler): HTTP marketplaces job + SCHEDULER_API_TOKEN shared secret

Two scheduler-reliability bugs surfaced after the v0.12.1 USER-agnes flip:

1. The marketplaces job called src.marketplace.sync_marketplaces() in-process
   from the scheduler container, racing the app's long-lived system.duckdb
   handle. DuckDB rejects cross-process writers — every cron tick 500-ed on
   "Could not set lock on file ... PID 0".

2. The data-refresh + new marketplaces jobs both 401-ed on the API because
   SCHEDULER_API_TOKEN was never propagated by the Terraform startup script.
   The scheduler had no credential to authenticate with.

Fix:
- New POST /api/marketplaces/sync-all (admin-only) drives the nightly refresh
  through the app process so it inherits the existing DB connection.
- Scheduler swaps fn->http for marketplaces; all jobs are now plain HTTP and
  the scheduler is reduced to a cron clock.
- New app/auth/scheduler_token.py adds a shared-secret auth path. The
  startup script generates a 256-bit secret on first boot, persists it
  across reboots, and writes it to /opt/agnes/.env. Both containers source
  the same .env. The app validates incoming Bearer tokens against the env
  var (constant-time, length-floored) and resolves matches to a synthetic
  scheduler@system.local user that's a member of the Admin system group.
  Audit-log entries from the scheduler are attributed to this user.
- app/main.py seeds the synthetic user at startup so the first cron tick
  has a valid actor; lazy seed in get_scheduler_user covers token rotation
  before the next app restart.
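
The constant-time, length-floored comparison described above could look roughly like the following. This is a minimal sketch, not the contents of app/auth/scheduler_token.py: the function name `matches_scheduler_secret` and the floor of 32 characters are assumptions for illustration, and the real floor is whatever `SCHEDULER_TOKEN_MIN_LENGTH` defines.

```python
import hmac
import secrets

# Hypothetical floor for this sketch; the real value is SCHEDULER_TOKEN_MIN_LENGTH.
MIN_SECRET_LENGTH = 32


def matches_scheduler_secret(
    presented: str, configured: str, min_length: int = MIN_SECRET_LENGTH
) -> bool:
    """Return True only for an exact match against a sufficiently long secret."""
    # Length floor: an empty or short configured secret disables the auth path
    # entirely instead of accepting a weak credential.
    if not configured or len(configured) < min_length:
        return False
    # hmac.compare_digest takes time independent of where the inputs differ,
    # so response timing does not reveal a matching prefix.
    return hmac.compare_digest(presented.encode("utf-8"), configured.encode("utf-8"))


# A 256-bit secret, comparable to what the startup script is described as
# generating: 32 random bytes rendered as 64 hex characters.
example_secret = secrets.token_hex(32)
```

Encoding both sides to bytes keeps `compare_digest` usable even if a non-ASCII character ends up in the header value.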

Tests: 5 new in tests/test_auth_scheduler_token.py covering empty/short
secret rejection, exact-match comparison, idempotent user seeding, and
lazy provisioning. 142 marketplace + scheduler tests + 96 auth tests
remain green.

Existing VMs with .env from before this change need a one-time
re-provisioning (re-run startup-script or rotate via openssl rand);
documented in CHANGELOG.

* fix(audit): use '_all' sentinel for bulk marketplace sync — Devin review #127

Avoids the literal string 'marketplace:None' in the audit_log resource
column when the bulk sync endpoint writes its summary row.
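
The sentinel substitution can be pictured as below. This is an illustrative sketch only: the helper name `audit_resource` and the exact `marketplace:<id>` layout are assumptions inferred from the 'marketplace:None' string mentioned above.

```python
from typing import Optional

BULK_SENTINEL = "_all"


def audit_resource(marketplace_id: Optional[str]) -> str:
    """Build the audit_log resource value; bulk runs have no single id."""
    # Formatting None directly would produce the literal "marketplace:None".
    target = marketplace_id if marketplace_id is not None else BULK_SENTINEL
    return f"marketplace:{target}"
```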

* fix(scheduler): unblock event loop + per-job timeouts — Devin review #127

Two findings from Devin re-review on commit 5fbad15:

1. BUG: trigger_sync_all was async def, so FastAPI ran it on the asyncio
   event loop. sync_marketplaces() does blocking I/O (subprocess git
   clones up to GIT_TIMEOUT_SEC=300 each, threading.Lock, DuckDB writes)
   and would freeze every concurrent request for the duration of a bulk
   sync. Switched to plain def so FastAPI auto-routes to the thread pool.

2. ANALYSIS: scheduler used a fixed 120s httpx timeout for every POST.
   Bulk marketplace sync iterates the registry under a single lock with
   up to 300s per repo — easily exceeds 120s on 2-3 slow repos. The
   scheduler then sees a timeout, doesn't update last_run, and re-fires
   on the next 30s tick, queueing redundant work. Per-job timeout
   override added to the JOBS tuple; marketplaces gets 900s (15 min),
   data-refresh keeps 120s, health-check 30s.
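
The per-job timeout override from finding 2 might be shaped like this. It is a sketch under stated assumptions: the `Job` type and `timeout_for` helper are hypothetical names, and only the three timeout values come from the description above.

```python
from typing import NamedTuple, Optional

DEFAULT_TIMEOUT_SEC = 120.0


class Job(NamedTuple):
    name: str
    # None means "fall back to the default"; hypothetical field for this sketch.
    timeout_sec: Optional[float] = None


JOBS = (
    Job("data-refresh"),          # keeps the 120s default
    Job("health-check", 30.0),
    Job("marketplaces", 900.0),   # bulk sync: up to 300s per repo under one lock
)


def timeout_for(job: Job) -> float:
    """Resolve the HTTP timeout the scheduler should pass for this job."""
    return job.timeout_sec if job.timeout_sec is not None else DEFAULT_TIMEOUT_SEC
```

With a 300s ceiling per repository, three slow repositories can take up to 900s, which is why a single shared 120s timeout is too short for the marketplaces job.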

* fix(auth): require_session_token rejects scheduler shared secret — Devin review #127

require_session_token gates /auth/tokens (PAT minting). Pre-fix it only
rejected JWTs with typ=pat — but the scheduler shared secret is an opaque
string, so verify_token() returns None, payload becomes {}, and the
PAT-claim check silently passed. A caller bearing SCHEDULER_API_TOKEN
could mint persistent PATs that survive a secret rotation.

Added explicit is_scheduler_token() check before the PAT-claim check;
new regression test in tests/test_auth_scheduler_token.py.
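
The hole and its fix can be reproduced in miniature. Everything below is a stand-in written for illustration: `verify_token` is faked with a dictionary lookup, the secret is a placeholder, and `session_token_allowed` is a hypothetical reduction of `require_session_token` to a boolean.

```python
import hmac
from typing import Optional

SCHEDULER_SECRET = "s" * 32  # placeholder secret for the sketch

# Stand-in for real JWT verification: known tokens map to their claims.
_FAKE_JWTS = {
    "session-jwt": {"typ": "session"},
    "pat-jwt": {"typ": "pat"},
}


def verify_token(token: str) -> Optional[dict]:
    """Return claims for a valid JWT, None for anything opaque."""
    return _FAKE_JWTS.get(token)


def is_scheduler_token(token: str) -> bool:
    return hmac.compare_digest(token.encode("utf-8"), SCHEDULER_SECRET.encode("utf-8"))


def session_token_allowed(token: str, *, check_scheduler: bool = True) -> bool:
    # The fix: reject the shared secret before looking at JWT claims.
    if check_scheduler and is_scheduler_token(token):
        return False
    # An opaque string verifies to None, so the payload collapses to {} and
    # the typ check alone cannot tell it apart from a session token.
    payload = verify_token(token) or {}
    return payload.get("typ") != "pat"
```

Passing `check_scheduler=False` models the pre-fix behavior, where the scheduler secret slips through because it carries no `typ` claim at all.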

Devin's other note (pre-existing async def trigger_sync at marketplaces.py:392
also calls blocking sync_one) — Devin flagged it as out-of-scope for this PR
and I agree; tracking separately.
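
The event-loop concern behind both trigger_sync_all and the still-open trigger_sync can be shown without FastAPI. The sketch below uses only asyncio: a heartbeat task counts how often the loop gets to run while a blocking call executes either directly inside a coroutine or on a worker thread. All names are hypothetical, and `asyncio.to_thread` stands in for the thread pool that FastAPI uses for plain `def` handlers.

```python
import asyncio
import time


def blocking_sync(duration: float) -> str:
    """Stand-in for blocking work such as a git clone or a DuckDB write."""
    time.sleep(duration)
    return "done"


async def on_event_loop() -> None:
    # Mirrors an async def handler that calls blocking code directly.
    blocking_sync(0.2)


async def on_thread_pool() -> None:
    # Mirrors a plain def handler that the framework runs on a worker thread.
    await asyncio.to_thread(blocking_sync, 0.2)


async def count_ticks_while(handler) -> int:
    """Count heartbeat ticks that manage to run while handler() executes."""
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    task = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    await handler()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return ticks


frozen_ticks = asyncio.run(count_ticks_while(on_event_loop))
free_ticks = asyncio.run(count_ticks_while(on_thread_pool))
```

In the first run the heartbeat never advances, which is the same starvation every concurrent request would see during a bulk sync; in the second run it keeps ticking.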

* release(0.17.0): cut + clean up CHANGELOG duplicates

Cuts 0.17.0 (minor: scheduler shared-secret auth + sync-all endpoint
plus the deploy-shape fixes that landed since the last release tag).

Bumps pyproject from 0.15.0 — also corrects the missed bump from PR #120
(v0.16.0 was tagged on GitHub and shipped as :stable, but pyproject
stayed at 0.15.0, so /api/version, /cli/latest, and `da --version` had
been under-reporting the running release).

Removes the long-form duplicate entries for 0.13.0 / 0.14.0 / 0.15.0
above [0.16.0] — the canonical short summaries (with GitHub-release
links) already exist below 0.16.0, the long forms were leftover state
from before those versions were cut and have been silently shadowed
ever since.
2026-04-29 11:44:00 +02:00

432 lines
19 KiB
Python

"""FastAPI main application — unified server for web UI + API."""
# Silence authlib's internal forward-compat note. Authlib emits an
# AuthlibDeprecationWarning from its own _joserfc_helpers when our
# `from authlib.integrations.starlette_client import OAuth` import
# touches `authlib.jose` paths. The warning is upstream-internal — it's
# telling authlib to migrate to joserfc before its 2.0; it's not
# actionable on our side until either authlib ships the fix or we
# rewrite OAuth handling on top of joserfc directly. Filtering here
# (before authlib gets imported transitively) keeps `make local-dev`
# stdout clean without hiding warnings from any other package.
import warnings as _warnings
try:
    from authlib.deprecate import AuthlibDeprecationWarning as _AuthlibDepr
    _warnings.filterwarnings("ignore", category=_AuthlibDepr)
except ImportError:
    # authlib too old / class moved — fall back to message-based match
    # so the filter still keeps startup clean.
    _warnings.filterwarnings(
        "ignore",
        message=r"authlib\.jose module is deprecated.*",
    )
import logging
from contextlib import asynccontextmanager
from importlib.metadata import PackageNotFoundError
from importlib.metadata import version as _pkg_version
from pathlib import Path
from urllib.parse import quote
import os
def _app_version() -> str:
    """Product version for FastAPI title / OpenAPI schema.

    Single source of truth is `pyproject.toml` `[project].version`; we read
    it back via `importlib.metadata` at runtime so `/docs`, `/openapi.json`,
    `/api/version`, `/cli/latest`, and `da --version` can never drift.
    """
    try:
        return _pkg_version("agnes-the-ai-analyst")
    except PackageNotFoundError:
        return "dev"
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from fastapi.staticfiles import StaticFiles
from starlette.exceptions import HTTPException as StarletteHTTPException
from starlette.middleware.gzip import GZipMiddleware
from starlette.middleware.sessions import SessionMiddleware
from starlette.types import ASGIApp, Receive, Scope, Send
class _SelectiveGZipMiddleware:
    """GZipMiddleware wrapper that skips a set of path prefixes.

    Parquet-serving endpoints send responses that are already columnar-
    compressed (parquet's internal codec) and — for /api/data — can reach
    hundreds of MB. Gzipping them on the way out costs CPU and latency with
    no meaningful size reduction. Skip those paths; every other endpoint
    (JSON manifests, HTML previews, install.sh) still gets compressed.
    """

    def __init__(self, app: ASGIApp, minimum_size: int = 1024, skip_prefixes: tuple[str, ...] = ()) -> None:
        self._raw = app
        self._gzip = GZipMiddleware(app, minimum_size=minimum_size)
        self._skip_prefixes = skip_prefixes

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope.get("type") == "http":
            path = scope.get("path", "")
            if any(path.startswith(p) for p in self._skip_prefixes):
                await self._raw(scope, receive, send)
                return
        await self._gzip(scope, receive, send)
from app.auth.router import router as auth_router
from app.api.health import router as health_router
from app.api.sync import router as sync_router
from app.api.data import router as data_router
from app.api.query import router as query_router
from app.api.users import router as users_router
from app.api.memory import router as memory_router
from app.api.upload import router as upload_router
from app.api.scripts import router as scripts_router
from app.api.settings import router as settings_router
from app.api.catalog import router as catalog_router
from app.api.telegram import router as telegram_router
from app.api.access import router as access_router, me_router as me_access_router
from app.api.me_debug import router as me_debug_router
from app.api.admin import router as admin_router
from app.api.permissions import router as permissions_router
from app.api.access_requests import router as access_requests_router
from app.api.jira_webhooks import router as jira_webhooks_router
from app.api.metrics import router as metrics_router
from app.api.metadata import router as metadata_router
from app.api.query_hybrid import router as query_hybrid_router
from app.api.cli_artifacts import router as cli_artifacts_router
from app.api.tokens import router as tokens_router, admin_router as tokens_admin_router
from app.api.v2_catalog import router as v2_catalog_router
from app.api.v2_schema import router as v2_schema_router
from app.api.v2_sample import router as v2_sample_router
from app.api.v2_scan import router as v2_scan_router
from app.api.marketplaces import router as marketplaces_router
from app.marketplace_server.router import router as marketplace_server_router
from app.marketplace_server.git_router import make_git_wsgi_app
from app.web.router import router as web_router
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app):
    # Issue #81 Group A — log the effective remote_attach allowlist at
    # startup so an operator's typo in AGNES_REMOTE_ATTACH_EXTENSIONS
    # (which REPLACES, not extends, the default) is visible.
    try:
        from src.orchestrator_security import log_effective_policy
        log_effective_policy()
    except Exception:
        pass  # never block startup on a logging convenience
    yield
    from src.db import close_system_db
    close_system_db()
def create_app() -> FastAPI:
    app = FastAPI(
        title="AI Data Analyst",
        description="Data distribution platform for AI analytical systems",
        version=_app_version(),
        lifespan=lifespan,
    )

    # Compress JSON / HTML responses on the wire. Parquet downloads are
    # excluded — they're already columnar-compressed and re-gzipping them
    # just burns CPU with no size win. minimum_size=1024 keeps tiny
    # responses uncompressed too (cheaper than the header overhead).
    app.add_middleware(
        _SelectiveGZipMiddleware,
        minimum_size=1024,
        skip_prefixes=(
            "/api/data/",
            "/cli/wheel/",
            "/cli/download",
            "/marketplace.git",  # git smart-HTTP is self-chunked; double-gzip bloats
        ),
    )

    # Session middleware (required for OAuth state)
    from app.secrets import get_session_secret
    session_secret = get_session_secret()
    if len(session_secret) < 32:
        # Same gate JWT applies (app/auth/jwt.py:_get_secret_key) — keeps the
        # two HMAC surfaces consistent. session_internal_roles + google_groups
        # are trusted off the cookie signature; a weak SESSION_SECRET means
        # those gates are weak too.
        import warnings as _warnings
        _warnings.warn(
            f"SESSION_SECRET is {len(session_secret)} chars — minimum 32 recommended",
            UserWarning, stacklevel=2,
        )
    app.add_middleware(SessionMiddleware, secret_key=session_secret)

    # CORS for CLI and external clients
    cors_origins = os.environ.get("CORS_ORIGINS", "http://localhost:3000,http://localhost:8000").split(",")
    app.add_middleware(
        CORSMiddleware,
        allow_origins=[o.strip() for o in cors_origins],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Load .env_overlay (persisted by /api/admin/configure)
    _overlay = Path(os.environ.get("DATA_DIR", "./data")) / "state" / ".env_overlay"
    if _overlay.exists():
        for line in _overlay.read_text().splitlines():
            if "=" in line and not line.startswith("#"):
                k, v = line.split("=", 1)
                os.environ.setdefault(k.strip(), v.strip())
    # Load instance config on startup
    try:
        from app.instance_config import load_instance_config
        load_instance_config()
        logger.info("Instance config loaded")
    except Exception as e:
        logger.warning(f"Could not load instance config: {e}")

    # Configure confidence scoring from instance config (corporate_memory.confidence section)
    try:
        from app.instance_config import get_corporate_memory_config
        from services.corporate_memory.confidence import configure as configure_confidence
        cm_config = get_corporate_memory_config()
        if cm_config and "confidence" in cm_config:
            configure_confidence(cm_config["confidence"])
            logger.info("Corporate memory confidence config applied")
    except Exception as e:
        logger.warning(f"Could not configure corporate memory confidence: {e}")

    # Startup banner
    from src.db import SCHEMA_VERSION
    logger.info(
        "Agnes %s | channel: %s | schema v%s",
        os.environ.get("AGNES_VERSION", "dev"),
        os.environ.get("RELEASE_CHANNEL", "dev"),
        SCHEMA_VERSION,
    )
    # LOCAL_DEV_MODE: bypass authentication for local development. DO NOT enable in prod.
    # When on, every protected route auto-logs in as a seeded admin user (default dev@localhost).
    from app.auth.dependencies import (
        is_local_dev_mode, get_local_dev_email, get_local_dev_groups,
    )
    if is_local_dev_mode():
        logger.warning("=" * 60)
        logger.warning("LOCAL_DEV_MODE is ON — authentication is bypassed.")
        logger.warning("All requests auto-authenticate as: %s", get_local_dev_email())
        # Validate + report LOCAL_DEV_GROUPS at startup so a malformed JSON
        # value gets surfaced loudly here instead of silently warning on the
        # first authenticated request. Empty when unset is fine — just say so.
        raw_groups_env = os.environ.get("LOCAL_DEV_GROUPS", "").strip()
        mocked_groups = get_local_dev_groups()
        if raw_groups_env and not mocked_groups:
            logger.warning(
                "LOCAL_DEV_GROUPS is set but produced no valid groups — "
                "check the WARNING above for the parse error.",
            )
        elif mocked_groups:
            logger.warning(
                "LOCAL_DEV_GROUPS: mocking %d group(s) into session: %s",
                len(mocked_groups),
                ", ".join(g["id"] for g in mocked_groups),
            )
        else:
            logger.warning("LOCAL_DEV_GROUPS is unset — session.google_groups will be empty.")
        logger.warning("NEVER enable this in a deployment reachable from the internet.")
        logger.warning("=" * 60)
    # Seed admin user (SEED_ADMIN_EMAIL) and add them to the Admin user_group.
    # Optional SEED_ADMIN_PASSWORD lets the seeded user sign in immediately
    # without going through bootstrap; never overwritten if already set.
    # The Admin/Everyone user_groups themselves are seeded inside
    # _ensure_schema (src.db._seed_system_groups), so this hook only has to
    # handle membership for the seed admin.
    seed_email = os.environ.get("SEED_ADMIN_EMAIL") or (get_local_dev_email() if is_local_dev_mode() else None)
    if seed_email:
        try:
            from src.db import SYSTEM_ADMIN_GROUP, get_system_db
            from src.repositories.user_group_members import UserGroupMembersRepository
            from src.repositories.users import UserRepository
            conn = get_system_db()
            repo = UserRepository(conn)
            seed_password = os.environ.get("SEED_ADMIN_PASSWORD") or None
            password_hash = None
            if seed_password:
                from argon2 import PasswordHasher
                password_hash = PasswordHasher().hash(seed_password)
            existing = repo.get_by_email(seed_email)
            if not existing:
                import uuid
                user_id = str(uuid.uuid4())
                repo.create(
                    id=user_id,
                    email=seed_email,
                    name="Admin",
                    role="admin",
                    password_hash=password_hash,
                )
                logger.info("Seeded admin user: %s (password=%s)", seed_email, "yes" if password_hash else "no")
            else:
                user_id = existing["id"]
                if password_hash and not existing.get("password_hash"):
                    repo.update(id=user_id, password_hash=password_hash)
                    logger.info("Set password on existing seed admin: %s", seed_email)
            # Make sure the seed admin is actually in the Admin group — this
            # is what gives them admin access in v12. Idempotent.
            admin_group = conn.execute(
                "SELECT id FROM user_groups WHERE name = ?", [SYSTEM_ADMIN_GROUP],
            ).fetchone()
            if admin_group:
                UserGroupMembersRepository(conn).add_member(
                    user_id=user_id,
                    group_id=admin_group[0],
                    source="system_seed",
                    added_by="app.main:seed_admin",
                )
            conn.close()
        except Exception as e:
            logger.warning(f"Could not seed admin: {e}")
    # Seed the synthetic scheduler user when SCHEDULER_API_TOKEN is configured,
    # so the very first cron tick after a fresh deploy already has a valid
    # actor to attribute audit-log entries to. The lazy seed in
    # `app.auth.scheduler_token.get_scheduler_user` covers the case where the
    # secret is rotated mid-life, but doing it here keeps startup observable.
    from app.auth.scheduler_token import get_scheduler_secret
    if get_scheduler_secret():
        try:
            from app.auth.scheduler_token import (
                SCHEDULER_TOKEN_MIN_LENGTH,
                ensure_scheduler_user,
            )
            from src.db import get_system_db
            secret = get_scheduler_secret()
            if len(secret) < SCHEDULER_TOKEN_MIN_LENGTH:
                logger.warning(
                    "SCHEDULER_API_TOKEN is set but only %d chars — auth path"
                    " disabled (minimum %d). Generate a longer secret in .env.",
                    len(secret), SCHEDULER_TOKEN_MIN_LENGTH,
                )
            else:
                conn = get_system_db()
                try:
                    ensure_scheduler_user(conn)
                finally:
                    conn.close()
        except Exception as e:
            logger.warning(f"Could not seed scheduler user: {e}")
    # C8: Warn when no user has a password_hash — bootstrap endpoint is open.
    # This is intentional UX (operator can claim seed admin), but the open
    # window should be visible in startup logs so it's not forgotten.
    if not is_local_dev_mode():
        try:
            from src.db import get_system_db
            from src.repositories.users import UserRepository
            conn = get_system_db()
            repo = UserRepository(conn)
            all_users = repo.list_all()
            has_password = any(u.get("password_hash") for u in all_users)
            if not has_password:
                logger.warning(
                    "No user has a password set — /auth/bootstrap is reachable. "
                    "Claim the seed admin (or set SEED_ADMIN_PASSWORD) to close this window."
                )
            conn.close()
        except Exception:
            pass  # never block startup on a logging convenience
    # Static files
    static_dir = Path(__file__).parent / "web" / "static"
    if static_dir.exists():
        app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")

    # Auth providers (conditional registration)
    from app.auth.providers.google import router as google_auth_router, is_available as google_available
    from app.auth.providers.password import router as password_auth_router
    from app.auth.providers.email import router as email_auth_router, is_available as email_available

    # API routers
    app.include_router(auth_router)
    app.include_router(google_auth_router)
    app.include_router(password_auth_router)
    app.include_router(email_auth_router)  # Always register, check availability per-request
    app.include_router(health_router)
    app.include_router(sync_router)
    app.include_router(data_router)
    app.include_router(query_router)
    app.include_router(users_router)
    app.include_router(memory_router)
    app.include_router(upload_router)
    app.include_router(scripts_router)
    app.include_router(settings_router)
    app.include_router(catalog_router)
    app.include_router(telegram_router)
    app.include_router(admin_router)
    app.include_router(access_router)
    app.include_router(me_access_router)
    app.include_router(me_debug_router)
    app.include_router(permissions_router)
    app.include_router(access_requests_router)
    app.include_router(jira_webhooks_router)
    app.include_router(metrics_router)
    app.include_router(metadata_router)
    app.include_router(query_hybrid_router)
    app.include_router(cli_artifacts_router)
    app.include_router(tokens_router)
    app.include_router(tokens_admin_router)
    app.include_router(v2_catalog_router)
    app.include_router(v2_schema_router)
    app.include_router(v2_sample_router)
    app.include_router(v2_scan_router)
    app.include_router(marketplaces_router)
    app.include_router(marketplace_server_router)

    # Git smart-HTTP endpoint for Claude Code: /marketplace.git/*
    # WSGI → ASGI bridge (dulwich is WSGI-native; FastAPI is ASGI).
    from a2wsgi import WSGIMiddleware
    app.mount("/marketplace.git", WSGIMiddleware(make_git_wsgi_app()))

    # Web UI router (must be last — has catch-all routes)
    app.include_router(web_router)
    # Paths served as API responses (JSON / ZIP / git smart-HTTP) — never
    # redirect a 401 here to the HTML login page; clients expect the raw 401.
    _API_PATH_PREFIXES: tuple[str, ...] = (
        "/api/",
        "/auth/",
        "/marketplace.zip",
        "/marketplace.git",
        "/marketplace/",
    )

    @app.exception_handler(StarletteHTTPException)
    async def _html_auth_redirect_handler(request, exc: StarletteHTTPException):
        """Redirect unauthenticated HTML page loads (GET) to /login.

        Only GET requests outside the API prefixes are redirected — that
        targets browser navigations to HTML pages. POSTs, API prefixes, and
        non-401 errors fall through to Starlette's default JSON response so
        JSON clients (including `/auth/tokens` for PAT CRUD and
        `/marketplace.zip` consumed by Claude Code) keep their existing
        contract.
        """
        if (
            exc.status_code == 401
            and request.method == "GET"
            and not request.url.path.startswith(_API_PATH_PREFIXES)
        ):
            next_param = quote(request.url.path, safe="")
            return RedirectResponse(url=f"/login?next={next_param}", status_code=302)
        from fastapi.exception_handlers import http_exception_handler
        return await http_exception_handler(request, exc)

    return app


app = create_app()