agnes-the-ai-analyst/services/scheduler/__main__.py
ZdenekSrotyr 995e4cd366
fix(scheduler): HTTP marketplaces job + SCHEDULER_API_TOKEN shared secret (#127)
* fix(scheduler): HTTP marketplaces job + SCHEDULER_API_TOKEN shared secret

Two scheduler-reliability bugs surfaced after the v0.12.1 USER-agnes flip:

1. The marketplaces job called src.marketplace.sync_marketplaces() in-process
   from the scheduler container, racing the app's long-lived system.duckdb
   handle. DuckDB rejects cross-process writers — every cron tick 500-ed on
   "Could not set lock on file ... PID 0".

2. The data-refresh + new marketplaces jobs both 401-ed on the API because
   SCHEDULER_API_TOKEN was never propagated by the Terraform startup script.
   The scheduler had no credential to authenticate with.

Fix:
- New POST /api/marketplaces/sync-all (admin-only) drives the nightly refresh
  through the app process so it inherits the existing DB connection.
- Scheduler swaps fn->http for marketplaces; all jobs are now plain HTTP and
  the scheduler is reduced to a cron clock.
- New app/auth/scheduler_token.py adds a shared-secret auth path. The
  startup script generates a 256-bit secret on first boot, persists it
  across reboots, and writes it to /opt/agnes/.env. Both containers source
  the same .env. The app validates incoming Bearer tokens against the env
  var (constant-time, length-floored) and resolves matches to a synthetic
  scheduler@system.local user that's a member of the Admin system group.
  Audit-log entries from the scheduler are attributed to this user.
- app/main.py seeds the synthetic user at startup so the first cron tick
  has a valid actor; lazy seed in get_scheduler_user covers token rotation
  before the next app restart.

Tests: 5 new in tests/test_auth_scheduler_token.py covering empty/short
secret rejection, exact-match comparison, idempotent user seeding, and
lazy provisioning. 142 marketplace + scheduler tests + 96 auth tests
remain green.

Existing VMs with .env from before this change need a one-time
re-provisioning (re-run startup-script or rotate via openssl rand);
documented in CHANGELOG.

* fix(audit): use '_all' sentinel for bulk marketplace sync — Devin review #127

Avoids the literal string 'marketplace:None' in the audit_log resource
column when the bulk sync endpoint writes its summary row.

* fix(scheduler): unblock event loop + per-job timeouts — Devin review #127

Two findings from Devin re-review on commit 5fbad15:

1. BUG: trigger_sync_all was async def, so FastAPI ran it on the asyncio
   event loop. sync_marketplaces() does blocking I/O (subprocess git
   clones up to GIT_TIMEOUT_SEC=300 each, threading.Lock, DuckDB writes)
   and would freeze every concurrent request for the duration of a bulk
   sync. Switched to plain def so FastAPI auto-routes to the thread pool.

2. ANALYSIS: scheduler used a fixed 120s httpx timeout for every POST.
   Bulk marketplace sync iterates the registry under a single lock with
   up to 300s per repo — easily exceeds 120s on 2-3 slow repos. The
   scheduler then sees a timeout, doesn't update last_run, and re-fires
   on the next 30s tick, queueing redundant work. Per-job timeout
   override added to the JOBS tuple; marketplaces gets 900s (15 min),
   data-refresh keeps 120s, health-check 30s.

* fix(auth): require_session_token rejects scheduler shared secret — Devin review #127

require_session_token gates /auth/tokens (PAT minting). Pre-fix it only
rejected JWTs with typ=pat — but the scheduler shared secret is an opaque
string, so verify_token() returns None, payload becomes {}, and the
PAT-claim check silently passed. A caller bearing SCHEDULER_API_TOKEN
could mint persistent PATs that survive a secret rotation.

Added explicit is_scheduler_token() check before the PAT-claim check;
new regression test in tests/test_auth_scheduler_token.py.

Devin's other note (pre-existing async def trigger_sync at marketplaces.py:392
also calls blocking sync_one) — Devin flagged it as out-of-scope for this PR
and I agree; tracking separately.

* release(0.17.0): cut + clean up CHANGELOG duplicates

Cuts 0.17.0 (minor: scheduler shared-secret auth + sync-all endpoint
plus the deploy-shape fixes that landed since the last release tag).

Bumps pyproject from 0.15.0 — also corrects the missed bump from PR #120
(v0.16.0 was tagged on GitHub and shipped as :stable, but pyproject
stayed at 0.15.0, so /api/version, /cli/latest, and `da --version` had
been under-reporting the running release).

Removes the long-form duplicate entries for 0.13.0 / 0.14.0 / 0.15.0
above [0.16.0] — the canonical short summaries (with GitHub-release
links) already exist below 0.16.0, the long forms were leftover state
from before those versions were cut and have been silently shadowed
ever since.
2026-04-29 11:44:00 +02:00

150 lines
5.5 KiB
Python

"""Scheduler service — replaces systemd timers.
Lightweight sidecar that fires scheduled jobs over HTTP against the main
app. Authenticates with ``SCHEDULER_API_TOKEN`` (shared-secret synthetic
admin — see ``app.auth.scheduler_token``); falls back to no-auth in
LOCAL_DEV_MODE.
Schedules are strings parsed by ``src.scheduler.is_table_due`` — accepts
"every 15m", "every 1h", "daily 03:00", "daily 07:00,13:00".
Why every job is HTTP and nothing runs in-process: the scheduler container
shares ``/data/state/system.duckdb`` with the app container, but DuckDB
permits only one writer per file across processes. An in-process call
from the scheduler raced the app's long-lived handle and 500-ed on
``Could not set lock on file``. Going through HTTP makes the app the sole
writer; the scheduler is reduced to a pure cron clock.
Usage: python -m services.scheduler
"""
import logging
import os
import signal
import time
from datetime import datetime, timezone
import httpx
from src.scheduler import is_table_due
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", "INFO").upper(),
format="%(asctime)s %(levelname)s [scheduler] %(message)s",
)
logger = logging.getLogger(__name__)
API_URL = os.environ.get("API_URL", "http://localhost:8000")
SCHEDULER_API_TOKEN = os.environ.get("SCHEDULER_API_TOKEN", "")
_token_warning_emitted = False
def _get_auth_token() -> str:
"""Return the bearer token for API calls.
Production: ``SCHEDULER_API_TOKEN`` is a shared secret generated by the
Terraform startup script and written to ``/opt/agnes/.env``. Both the
``app`` and ``scheduler`` containers source the same .env via Docker
Compose ``env_file:``, so the secret is symmetric. The app validates
incoming Bearer tokens against this env var (constant-time compare in
``app.auth.scheduler_token``) and resolves matches to a synthetic
``scheduler@system.local`` user that is a member of the Admin group.
Dev / LOCAL_DEV_MODE: leave it unset. The scheduler returns the empty
string and calls the API without an ``Authorization`` header — the
API's dev-bypass auto-authenticates the request as the dev user.
"""
global _token_warning_emitted
if SCHEDULER_API_TOKEN:
return SCHEDULER_API_TOKEN
if not _token_warning_emitted:
logger.warning(
"SCHEDULER_API_TOKEN is not set — calling the API without "
"Authorization. Required in production; in LOCAL_DEV_MODE "
"the dev-bypass auto-authenticates and this is fine."
)
_token_warning_emitted = True
return ""
# Schedule definitions: (name, schedule_string, endpoint, method, timeout_sec).
# All jobs are HTTP — see the module docstring for why nothing runs
# in-process anymore. ``daily 03:00`` for marketplaces matches the cadence
# the previous in-process job used; the endpoint is admin-only and
# idempotent (it iterates the registry and per-marketplace errors do not
# abort the run).
#
# timeout_sec: per-job override for the httpx call. Marketplaces gets a
# generous 15 min because the app handler iterates every registered
# marketplace under a single lock with up to 300s of git timeout per
# entry — at 120s (the default that data-refresh uses) a real-world
# registry of more than 2-3 slow repos times out the scheduler call,
# which then re-fires on the next 30s tick and queues a redundant sync.
JOBS = [
("data-refresh", "every 15m", "/api/sync/trigger", "POST", 120),
("health-check", "every 5m", "/api/health", "GET", 30),
("marketplaces", "daily 03:00", "/api/marketplaces/sync-all", "POST", 900),
]
_running = True
def _signal_handler(sig, frame):
global _running
logger.info(f"Received signal {sig}, shutting down...")
_running = False
def _call_api(endpoint: str, method: str, timeout_sec: int) -> bool:
"""Call the main app API. Returns True on success."""
url = f"{API_URL}{endpoint}"
headers = {}
token = _get_auth_token()
if token:
headers["Authorization"] = f"Bearer {token}"
try:
if method == "POST":
resp = httpx.post(url, headers=headers, timeout=timeout_sec)
else:
resp = httpx.get(url, headers=headers, timeout=timeout_sec)
if resp.status_code < 400:
logger.info(f"Job {endpoint}: {resp.status_code}")
return True
else:
logger.warning(f"Job {endpoint}: HTTP {resp.status_code} - {resp.text[:200]}")
return False
except Exception as e:
logger.error(f"Job {endpoint} failed: {e}")
return False
def run():
signal.signal(signal.SIGTERM, _signal_handler)
signal.signal(signal.SIGINT, _signal_handler)
logger.info(f"Scheduler started. API_URL={API_URL}, {len(JOBS)} jobs configured.")
# Track last successful run per job as ISO string — matches what
# src.scheduler.is_table_due expects.
last_run: dict[str, str | None] = {name: None for name, *_ in JOBS}
while _running:
now_iso = datetime.now(timezone.utc).isoformat()
for name, schedule, endpoint, method, timeout_sec in JOBS:
if not is_table_due(schedule, last_run[name]):
continue
logger.info("Running job: %s (%s)", name, schedule)
ok = _call_api(endpoint, method, timeout_sec)
if ok:
last_run[name] = now_iso
# 30s tick is plenty: interval jobs have minute-level resolution,
# daily jobs have a ~24 h retry window.
time.sleep(30)
logger.info("Scheduler stopped.")
if __name__ == "__main__":
run()