agnes-the-ai-analyst/services/scheduler/__main__.py
Vojtech d6ad08f107
Flea-market upload guardrails + soft delete + JOIN-based admin queue (#233)
* feat(store): flea-market upload guardrails + soft delete + JOIN-based admin queue

Adds an end-to-end guardrails pipeline for store uploads (manifest +
static-security + LLM review), persists blocked bundles for forensics,
introduces soft-delete (Archive) semantics, consolidates the legacy
/store/{id} surface into /marketplace/flea/{id}, and reworks the admin
queue so lifecycle filters read live entity visibility via LEFT JOIN
rather than a denormalized submission column.

Schema v29 → v35:
  * v29 store_submissions table + store_entities.visibility_status
  * v30 file_size, bundle_sha256, bundle_purged_at on submissions
  * v31 reshape store_submissions (drop legacy unique on entity_id)
  * v32 store_entities.archived_at/by + 'archived' visibility value
  * v33 drop store_submissions.retry_count (unused)
  * v34 ensure idx_store_submissions_entity exists post column-drop
  * v35 broaden visibility_status enum + JOIN architecture cutover

Pipeline (src/store_guardrails/):
  * Inline checks: manifest_check, static_scan, quality_check
  * LLM review configurable haiku|sonnet|opus (default haiku)
  * BackgroundTasks-driven async path with structured-output JSON
  * Per-submitter daily quota (default 50)
  * 30-day TTL purge job (POST /api/admin/run-blocked-purge)
  * Bundle SHA256 + size persisted; sha256 survives purge for forensics

Visibility model:
  * pending | approved | hidden | archived
  * _enforce_visibility returns 404 (no leak) for non-owner non-admin
  * Owner sees own non-approved entries via include_owner_id widening
  * Install refused with 409 entity_not_approved when not approved
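
The visibility rules above can be sketched as two small pure functions.
This is a minimal illustration, not the project's `_enforce_visibility`:
the `Viewer` type and both function names are hypothetical, and only the
404 / 409 outcomes come from the commit message.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Viewer:
    """Hypothetical stand-in for the authenticated user."""
    user_id: str
    is_admin: bool = False


def detail_http_status(visibility_status: str, owner_id: str, viewer: Viewer) -> int:
    """Status code for viewing an entity's detail page."""
    if visibility_status == "approved":
        return 200
    # Owners and admins may see pending / hidden / archived entries.
    if viewer.is_admin or viewer.user_id == owner_id:
        return 200
    # Everyone else gets 404 rather than 403, so existence is not leaked.
    return 404


def install_http_status(visibility_status: str) -> int:
    """Installs are refused (409 entity_not_approved) unless approved."""
    return 200 if visibility_status == "approved" else 409
```

Returning 404 for strangers keeps a pending or archived entity
indistinguishable from one that never existed.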

Soft-delete (DELETE /api/store/entities/{id}):
  * Default = soft (visibility_status='archived'); existing installs
    keep getting served the bundle so users don't lose the plugin
  * ?hard=true admin-only: drops bundle + cascades user_store_installs
  * Hard-delete preserves entity_id on submission as tombstone so
    audit_log linkage survives for the activity timeline

Admin queue lifecycle (the JOIN refactor):
  * Verdict (store_submissions.status) is immutable forensic record
  * Lifecycle (store_entities.visibility_status) is live state
  * /admin/store/submissions Archived chip translates to
    `e.visibility_status='archived'` via LEFT JOIN — any path that
    flips visibility surfaces in the queue immediately
  * Detail page renders Status (verdict) and Entity lifecycle side by
    side so admins see "approved at review, now archived" at a glance

URL consolidation:
  * /store/{id} deleted (no redirect, stale bookmarks 404)
  * /marketplace/flea/{id} is the canonical detail surface
  * Three in-tree callers (upload-success, my-stack card, store
    listing card) updated to point at the new URL
  * Quarantine banner extracted to _quarantine_banner.html partial,
    self-guarded, included from both flea detail templates
  * Banner JS auto-refreshes when the verdict lands by polling
    /api/marketplace/flea/{id}/detail (visibility_status +
    submission_status — the latter is needed because blocked_llm
    keeps the entity at visibility_status='pending')

Audit log resource format:
  * runner.py emits prefixed `store_submission:{id}` (post-fix)
  * Detail-page timeline query handles three patterns: prefixed
    submission, helper-emitted `store_entity:{sub_id}`, and bare-id
    legacy rows — all surface in the activity timeline

UX fixes:
  * Owner sees Under review / Quarantined / Hidden banner with status
  * Install button gray-disabled (not blue) when non-approved
  * Owner cannot delete quarantined entries (403); admin can
  * Admin queue: filter chips, sortable columns, paging, page-size
  * Auto-refresh queue every 5s while pending rows are visible
  * Store upload page file picker no longer opens twice (label →
    input default action collided with explicit JS handler)

Tests: 168 passed across the guardrails suites (admin submissions,
store API, inline / LLM / purge guardrails, store repositories,
marketplace filter, schema version). New regression coverage
includes: archive surfaces via JOIN even when API path is bypassed;
deleted submission renders activity timeline (tombstone); flea
detail surfaces submission_status only for owner/admin; detail page
renders Entity lifecycle row; audit log resource format covers both
helper and runner paths.

* fix(store-guardrails): PR #233 follow-up — prompt injection, atomic PUT, BG race, schema, reaper, sort whitelist

Addresses 11 of the 23 findings from the PR #233 review (spec at
docs/superpowers/specs/2026-05-09-pr233-guardrails-fixes-spec.md):
merge-gate items #1-#4 and #6 (parts 1-2), plus high-value mediums
#7, #9-#12, and #23. Merge-gate item #5 (quota race), architectural
items (#8 enum split, #14 factory), #13, and pure maintainability
(#15-#22) are deferred to follow-ups.

Security:
* #1 prompt injection — SYSTEM_PROMPT now passed via the SDK's
  dedicated system= parameter; bundle wrapped in <bundle>...</bundle>
  sentinels declared data-only by the system prompt; literal
  sentinel strings in user content are escaped so an adversarial
  README can't forge a close tag.
* #6 static scan honesty — module docstring + admin copy + docs
  declare static scan as signal not gate; .md/.txt/.rst/.html/.json/
  .yaml/.yml/.toml skipped to avoid false positives on prose.
  AST mode for Python deferred (separate flag, FP comparison work).
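
The sentinel escaping described in #1 could look roughly like the sketch
below. The function name, the regex, and the `&lt;` escape are
assumptions made for illustration; the commit message only states that
literal sentinel strings in user content are escaped before wrapping.

```python
import re

OPEN_TAG = "<bundle>"
CLOSE_TAG = "</bundle>"

# Matches a literal open or close sentinel, ignoring case and inner spaces.
_SENTINEL_RE = re.compile(r"<(/?\s*bundle\s*>)", re.IGNORECASE)


def wrap_bundle(untrusted: str) -> str:
    """Wrap untrusted bundle text in sentinels, neutralizing forged tags."""
    # Replace the leading "<" of any sentinel-looking text so it can no
    # longer be read as a tag by the model or by downstream parsing.
    escaped = _SENTINEL_RE.sub(r"&lt;\1", untrusted)
    return f"{OPEN_TAG}\n{escaped}\n{CLOSE_TAG}"
```

With this shape, the wrapped text always contains exactly one real open
tag and one real close tag, whatever the README tries to inject.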

Correctness:
* #2 PUT atomicity — bundles bake into plugin.staging-<rand>/
  alongside live, atomic-rename on success; failed checks leave
  live tree byte-for-byte intact.
* #3 BG-task race — set_visibility_if_pending guards verdict flips
  to the (pending, hidden) review window; admin archives during
  review survive; skipped flips audit-logged.
* #4 v35 NOT NULL/DEFAULT — schema v35→v36 re-applies them on
  store_entities.visibility_status. CHECK constraint enforced
  application-side (DuckDB ADD CHECK on existing column unsupported).
* #7 stuck-review reaper — reap_stuck_llm_reviews flips pending_llm
  rows older than guardrails.stuck_review_grace_seconds (default
  1800) to review_error. Scheduler runs every 15 min via new
  /api/admin/run-reap-stuck-reviews. Set knob to 0 to disable.
* #9 quota counter — count_blocked_for_submitter_since now counts
  blocked_inline + blocked_llm + review_error so a submitter
  triggering only LLM-blocked verdicts is bounded.
* #10 missing risk_level — surfaces as review_error with
  error='missing_risk_level' instead of silently defaulting to
  'medium' (which looked like a model-decided block).
* #11 archived_at clear — set_visibility nulls archived_at +
  archived_by when transitioning out of 'archived' so a future
  read doesn't show stale archive forensics on an approved row.
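
The guard in #3 is essentially a conditional UPDATE. The sketch below
uses the standard-library `sqlite3` module as a stand-in for DuckDB; the
function name, the `id` column, and the two-column table are assumptions,
and only the (pending, hidden) review window comes from the text above.

```python
import sqlite3

REVIEW_WINDOW = ("pending", "hidden")


def flip_if_in_review_window(conn, entity_id, new_status):
    """Apply a verdict only if the entity is still pending or hidden.

    Returns True when the row was updated, False when the flip was
    skipped (for example because an admin archived it mid-review).
    """
    cur = conn.execute(
        "UPDATE store_entities SET visibility_status = ? "
        "WHERE id = ? AND visibility_status IN (?, ?)",
        (new_status, entity_id, *REVIEW_WINDOW),
    )
    conn.commit()
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE store_entities "
    "(id TEXT PRIMARY KEY, visibility_status TEXT NOT NULL)"
)
conn.executemany(
    "INSERT INTO store_entities VALUES (?, ?)",
    [("a", "pending"), ("b", "archived")],
)
applied_a = flip_if_in_review_window(conn, "a", "approved")  # still pending
applied_b = flip_if_in_review_window(conn, "b", "approved")  # archived mid-review
```

A caller would audit-log the skipped case (`applied_b` here) instead of
overwriting the admin's archive.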

Maintainability:
* #12 FSM doc comment — accurate insert/transition/lifecycle
  description in src/db.py near store_submissions schema.
* #23 sort-key whitelist — admin queue rejects unknown sort keys
  with 400 invalid_sort_key; substring-replace footgun removed.
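
A sort-key whitelist like the one in #23 can be as small as a dict
lookup. The keys, column names, and exception class below are
hypothetical; the source only says unknown keys are rejected with
400 invalid_sort_key.

```python
# Hypothetical mapping from public sort keys to trusted SQL columns.
SORT_COLUMNS = {
    "created_at": "s.created_at",
    "status": "s.status",
    "file_size": "s.file_size",
}


class InvalidSortKey(ValueError):
    """Raised for unknown sort keys; a handler would map this to HTTP 400."""


def resolve_order_by(sort_key: str, descending: bool = False) -> str:
    """Return a safe ORDER BY fragment, never echoing user input into SQL."""
    try:
        column = SORT_COLUMNS[sort_key]
    except KeyError:
        raise InvalidSortKey("invalid_sort_key") from None
    direction = "DESC" if descending else "ASC"
    return f"{column} {direction}"
```

Because the SQL fragment is built only from dict values, no
user-supplied string ever reaches the query text.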

Deferred (separate PRs):
* #5 quota race — proper fix requires asyncio.Lock spanning the
  full pipeline; threading.Lock blocks event loop, DuckDB MVCC
  doesn't help. API-level slowapi bounds worst case for now.
* #6 part 3 (AST static scan), #8 (enum split), #13 (import
  bundle docs), #14 (factory consolidation), #15-#22 (maint).

Tests:
* New: tests/test_store_guardrails_prompt_injection.py (corpus +
  trust-boundary invariants), tests/test_store_put_atomic.py,
  tests/test_store_guardrails_reaper.py.
* Extended: test_store_guardrails_llm.py (system param, missing
  risk_level, BG race), test_admin_store_submissions.py (quota
  counter widening, sort whitelist 400), test_store_repositories.py
  (un-archive metadata clear), test_db_schema_version.py (v36).
* Full suite: 3738 passed; 17 pre-existing baseline failures
  unchanged (db migration tests, cli binary rename, catalog export,
  user mgmt v5 backfill — confirmed by stash + rerun on clean tree).
2026-05-09 17:32:53 +04:00


"""Scheduler service — replaces systemd timers.

Lightweight sidecar that fires scheduled jobs over HTTP against the main
app. Authenticates with ``SCHEDULER_API_TOKEN`` (shared-secret synthetic
admin — see ``app.auth.scheduler_token``); falls back to no-auth in
LOCAL_DEV_MODE.

Schedules are strings parsed by ``src.scheduler.is_table_due`` — accepts
"every 15m", "every 1h", "daily 03:00", "daily 07:00,13:00".

Why every job is HTTP and nothing runs in-process: the scheduler container
shares ``/data/state/system.duckdb`` with the app container, but DuckDB
permits only one writer per file across processes. An in-process call
from the scheduler raced the app's long-lived handle and 500-ed on
``Could not set lock on file``. Going through HTTP makes the app the sole
writer; the scheduler is reduced to a pure cron clock.

Usage: python -m services.scheduler
"""

import logging
import os
import signal
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone

import httpx

from app.logging_config import setup_logging
from src.scheduler import is_table_due

setup_logging(__name__)
logger = logging.getLogger(__name__)

API_URL = os.environ.get("API_URL", "http://localhost:8000")
SCHEDULER_API_TOKEN = os.environ.get("SCHEDULER_API_TOKEN", "")
_token_warning_emitted = False


def _get_auth_token() -> str:
    """Return the bearer token for API calls.

    Production: ``SCHEDULER_API_TOKEN`` is a shared secret generated by the
    Terraform startup script and written to ``/opt/agnes/.env``. Both the
    ``app`` and ``scheduler`` containers source the same .env via Docker
    Compose ``env_file:``, so the secret is symmetric. The app validates
    incoming Bearer tokens against this env var (constant-time compare in
    ``app.auth.scheduler_token``) and resolves matches to a synthetic
    ``scheduler@system.local`` user that is a member of the Admin group.

    Dev / LOCAL_DEV_MODE: leave it unset. The scheduler returns the empty
    string and calls the API without an ``Authorization`` header — the
    API's dev-bypass auto-authenticates the request as the dev user.
    """
    global _token_warning_emitted
    if SCHEDULER_API_TOKEN:
        return SCHEDULER_API_TOKEN
    if not _token_warning_emitted:
        logger.warning(
            "SCHEDULER_API_TOKEN is not set — calling the API without "
            "Authorization. Required in production; in LOCAL_DEV_MODE "
            "the dev-bypass auto-authenticates and this is fine."
        )
        _token_warning_emitted = True
    return ""


# --- Env parsing ------------------------------------------------------------

_DEFAULTS = {
    "SCHEDULER_DATA_REFRESH_INTERVAL": 15 * 60,  # seconds
    "SCHEDULER_HEALTH_CHECK_INTERVAL": 5 * 60,
    "SCHEDULER_SCRIPT_RUN_INTERVAL": 1 * 60,
    "SCHEDULER_TICK_SECONDS": 30,
    # LLM pipeline cadences (#176, #179 review). Defaults preserve the
    # 10m / 15m / 17m stagger so the LLM-driven jobs rarely fire on the
    # same tick and stack their API + DB load. Only 17 is coprime to the
    # other two; the 10m and 15m jobs still coincide every 30 minutes.
    "SCHEDULER_SESSION_COLLECTOR_INTERVAL": 10 * 60,
    # Drives the verification session-processor cadence AND the
    # health-check staleness grace window in app/api/health.py
    # (single env var → both, so an operator changing the cadence moves
    # both). Name retained post session-pipeline refactor for operator
    # compatibility — existing docker-compose env files keep working.
    "SCHEDULER_VERIFICATION_DETECTOR_INTERVAL": 15 * 60,
    "SCHEDULER_USAGE_PROCESSOR_INTERVAL": 10 * 60,
    "SCHEDULER_CORPORATE_MEMORY_INTERVAL": 17 * 60,
}


def _read_positive_int(name: str) -> int:
    """Read an env var as a positive integer or fall back to the default.

    Treats unset env (``None``) as "use default". Treats explicitly empty
    string (``""``) as an operator typo and raises — silently defaulting
    on a literal ``FOO=`` in the env_file would mask configuration bugs.
    """
    raw = os.environ.get(name)
    if raw is None:
        if name not in _DEFAULTS:
            raise ValueError(f"Unknown scheduler env var: {name}")
        return _DEFAULTS[name]
    if raw == "":
        raise ValueError(f"{name}='' must be a positive integer (seconds)")
    try:
        value = int(raw)
    except (TypeError, ValueError):
        raise ValueError(f"{name}={raw!r} must be a positive integer (seconds)")
    if value <= 0:
        raise ValueError(f"{name}={value} must be > 0 (seconds)")
    return value


def _seconds_to_schedule(seconds: int) -> str:
    """Convert a seconds value to the closest 'every Nm' / 'every Nh' string.

    Uses ceiling division so a non-multiple-of-60 input never produces a
    schedule that fires MORE often than the operator configured (90s →
    'every 2m', not 'every 1m'). Sub-minute inputs clamp to 'every 1m'
    because the schedule grammar has minute-level resolution.
    """
    if seconds % 3600 == 0 and seconds >= 3600:
        return f"every {seconds // 3600}h"
    # Ceiling division: -(-x // y) is the standard trick.
    minutes = max(1, -(-seconds // 60))
    return f"every {minutes}m"


def resolved_tick_seconds() -> int:
    """Read + validate SCHEDULER_TICK_SECONDS in isolation (test helper)."""
    return _read_positive_int("SCHEDULER_TICK_SECONDS")


def build_jobs() -> list[tuple[str, str, str, str, int]]:
    """Build the JOBS list from env, applying defaults and validation.

    Tuple shape: (name, schedule_string, endpoint, method, http_timeout_sec).
    Marketplaces stays hardcoded — promoting it to env is out of #77 scope.
    """
    refresh = _read_positive_int("SCHEDULER_DATA_REFRESH_INTERVAL")
    health = _read_positive_int("SCHEDULER_HEALTH_CHECK_INTERVAL")
    scripts = _read_positive_int("SCHEDULER_SCRIPT_RUN_INTERVAL")
    sess = _read_positive_int("SCHEDULER_SESSION_COLLECTOR_INTERVAL")
    verify = _read_positive_int("SCHEDULER_VERIFICATION_DETECTOR_INTERVAL")
    usage = _read_positive_int("SCHEDULER_USAGE_PROCESSOR_INTERVAL")
    corpmem = _read_positive_int("SCHEDULER_CORPORATE_MEMORY_INTERVAL")
    tick = _read_positive_int("SCHEDULER_TICK_SECONDS")

    smallest = min(refresh, health, scripts, sess, verify, usage, corpmem)
    if tick > smallest:
        raise ValueError(
            f"SCHEDULER_TICK_SECONDS={tick} must be <= the smallest job "
            f"interval ({smallest}s) so jobs don't consistently miss their "
            f"cadence by up to one tick"
        )

    return [
        ("data-refresh", _seconds_to_schedule(refresh), "/api/sync/trigger", "POST", 120),
        ("health-check", _seconds_to_schedule(health), "/api/health", "GET", 30),
        ("script-runner", _seconds_to_schedule(scripts), "/api/scripts/run-due", "POST", 600),
        ("marketplaces", "daily 03:00", "/api/marketplaces/sync-all", "POST", 900),
        # LLM pipeline (#176, #179 review). Cadences are deliberately
        # staggered (10m / 15m / 17m by default) so the LLM-driven jobs
        # rarely fire on the same tick and stack their API + DB load.
        # Driven by env so an operator can throttle without a code
        # change; the verification-detector cadence is the single source
        # of truth for the health-check staleness grace window in
        # app/api/health.py (which uses 2x the cadence).
        ("session-collector", _seconds_to_schedule(sess), "/api/admin/run-session-collector", "POST", 300),
        # session-pipeline processors — independent loops, each invoked on
        # its own cadence via the parametrized run-session-processor endpoint.
        # Adding a third processor in the future is one line here + one entry
        # in services/session_processors/__init__.py registry.
        ("session-processor:verification", _seconds_to_schedule(verify),
         "/api/admin/run-session-processor?processor=verification", "POST", 900),
        ("session-processor:usage", _seconds_to_schedule(usage),
         "/api/admin/run-session-processor?processor=usage", "POST", 300),
        ("corporate-memory", _seconds_to_schedule(corpmem), "/api/admin/run-corporate-memory", "POST", 900),
        # v30: TTL purge of blocked-bundle bytes. Cheap (just rmtree
        # + UPDATE), runs once daily at 04:00 UTC so the spike is
        # visible in audit_log without competing with the marketplaces
        # job at 03:00. Endpoint reads guardrails.blocked_bundle_ttl_days
        # from instance.yaml and short-circuits when set to 0.
        ("store-blocked-purge", "daily 04:00", "/api/admin/run-blocked-purge", "POST", 600),
        # Stuck-review reaper (#7). A submission stays at
        # status='pending_llm' until the BackgroundTasks worker writes
        # a verdict. If the worker crashes, the row sits forever. Run
        # every 15 minutes; reap_stuck_llm_reviews flips rows older
        # than guardrails.stuck_review_grace_seconds (default 1800)
        # to review_error so admin can retry. Cheap (one indexed
        # SELECT + N small UPDATEs); short timeout sufficient.
        ("store-reap-stuck-reviews", "every 15m", "/api/admin/run-reap-stuck-reviews", "POST", 60),
    ]


_running = True


def _signal_handler(sig, frame):
    global _running
    logger.info(f"Received signal {sig}, shutting down...")
    _running = False


def _call_api(endpoint: str, method: str, timeout_sec: int) -> bool:
    """Call the main app API. Returns True on success."""
    url = f"{API_URL}{endpoint}"
    headers = {}
    token = _get_auth_token()
    if token:
        headers["Authorization"] = f"Bearer {token}"
    try:
        if method == "POST":
            resp = httpx.post(url, headers=headers, timeout=timeout_sec)
        else:
            resp = httpx.get(url, headers=headers, timeout=timeout_sec)
        if resp.status_code < 400:
            logger.info(f"Job {endpoint}: {resp.status_code}")
            return True
        else:
            logger.warning(f"Job {endpoint}: HTTP {resp.status_code} - {resp.text[:200]}")
            return False
    except Exception as e:
        logger.error(f"Job {endpoint} failed: {e}")
        try:
            from src.observability import get_posthog
            get_posthog().capture_exception(
                e,
                distinct_id="system",
                properties={"job": endpoint, "method": method, "component": "scheduler"},
            )
        except Exception:
            logger.exception("PostHog capture_exception failed in scheduler")
        return False


def run():
    signal.signal(signal.SIGTERM, _signal_handler)
    signal.signal(signal.SIGINT, _signal_handler)

    jobs = build_jobs()
    tick = resolved_tick_seconds()
    logger.info(
        "Scheduler started. API_URL=%s, %d jobs, tick=%ds. Schedules: %s",
        API_URL, len(jobs), tick,
        {name: schedule for name, schedule, *_ in jobs},
    )
    last_run: dict[str, str | None] = {name: None for name, *_ in jobs}

    # Per-tick concurrency: one thread per job slot, so a 900s verification
    # run can't block the 5-minute health-check or the 60s script-runner
    # from firing on their own cadences (PR #232 review fix). Pure I/O
    # workload (httpx), so the GIL is irrelevant. `in_flight` prevents the
    # same job being re-launched on a subsequent tick while the previous
    # invocation is still running; otherwise a 10-min run during which 20
    # ticks fire would queue 20 duplicate POSTs against the same processor
    # (the admin endpoint's per-processor lock would 409 most of them, but
    # they'd still be wasted requests + audit-log noise).
    in_flight: set[str] = set()
    in_flight_lock = threading.Lock()
    executor = ThreadPoolExecutor(max_workers=max(4, len(jobs)))

    while _running:
        now_iso = datetime.now(timezone.utc).isoformat()
        for name, schedule, endpoint, method, timeout_sec in jobs:
            if not is_table_due(schedule, last_run[name]):
                continue
            with in_flight_lock:
                if name in in_flight:
                    # Previous tick's invocation hasn't returned yet; skip.
                    continue
                in_flight.add(name)
            logger.info("Running job: %s (%s)", name, schedule)
            executor.submit(
                _run_job, name, endpoint, method, timeout_sec, now_iso,
                last_run, in_flight, in_flight_lock,
            )
        time.sleep(tick)

    logger.info("Scheduler stopping; waiting for in-flight jobs.")
    executor.shutdown(wait=True)
    logger.info("Scheduler stopped.")


def _run_job(
    name: str,
    endpoint: str,
    method: str,
    timeout: int,
    now_iso: str,
    last_run: dict[str, "str | None"],
    in_flight: set[str],
    in_flight_lock: threading.Lock,
) -> None:
    """Execute one scheduled job + bookkeeping. Lifted out of run() so it's
    unit-testable.

    Advances last_run on terminal state (success OR failure) so a permanently
    failing job retries on its cadence (e.g. 15 min), not on every scheduler
    tick (default 30s). Pre-fix behavior caused a hot-loop on persistent 5xx —
    30× more requests + LLM tokens than the operator configured. Errors still
    surface via _call_api's logging + audit_log on the receiving side.
    """
    try:
        _call_api(endpoint, method, timeout)
    finally:
        last_run[name] = now_iso
        with in_flight_lock:
            in_flight.discard(name)


if __name__ == "__main__":
    run()
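
As a quick check of the rounding rules documented on
`_seconds_to_schedule`, the standalone copy below (renamed
`seconds_to_schedule` so it runs without the rest of the module) maps a
few sample intervals to schedule strings. The sample inputs are chosen
for illustration only.

```python
def seconds_to_schedule(seconds: int) -> str:
    """Standalone copy of the module's conversion logic."""
    if seconds % 3600 == 0 and seconds >= 3600:
        return f"every {seconds // 3600}h"
    # Ceiling division, so the job never fires more often than configured.
    minutes = max(1, -(-seconds // 60))
    return f"every {minutes}m"


samples = {s: seconds_to_schedule(s) for s in (45, 90, 900, 3600, 5400, 7200)}
for seconds, schedule in samples.items():
    print(f"{seconds:>5}s -> {schedule}")
```

Whole hours collapse to `every Nh`, everything else rounds up to whole
minutes, and sub-minute values clamp to `every 1m`.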