agnes-the-ai-analyst/services/scheduler/__main__.py
ZdenekSrotyr b3841f5b6c release: 0.50.0 — persistent BQ metadata cache + scheduled refresh; catalog never blocks on BigQuery
Since 0.47.0 GET /api/v2/catalog enriched each remote BigQuery row by
fetching INFORMATION_SCHEMA.TABLE_STORAGE + COLUMNS through the DuckDB
BigQuery extension *inside the request*. On cold caches that fanned out
to O(N) sequential BQ jobs-API roundtrips — easily 90 s+ on partitioned
/ view-backed tables — and reliably blew the CLI's 30 s httpx
ReadTimeout. Reproduced with py-spy: three AnyIO worker threads stuck
inside connectors/bigquery/metadata._fetch_via_legacy_tables.

Refactor: enrichment is read exclusively from a new persistent
bq_metadata_cache DuckDB table (schema v40), populated by a scheduler-
driven refresh job at SCHEDULER_BQ_METADATA_REFRESH_INTERVAL (default
4 h). Cold catalog response on a fresh container is now tens of
milliseconds with metadata_freshness=never_fetched for unwarmed rows.

New surface:
  - POST /api/admin/run-bq-metadata-refresh (scheduler-driven, full)
  - POST /api/v2/metadata-cache/refresh?table=<id> (admin, single)
  - GET  /api/v2/metadata-cache/status (auth, non-admin)
  - metadata_freshness field per catalog row

Removed (internal API): v2_catalog._size_hint_for_row,
_resolve_remote_metadata, _metadata_provider_for,
_build_metadata_request, _materialized_size_hint, in-memory
_metadata_cache. Response shape unchanged for external consumers.

991 tests passing; 2 pre-existing failures (test_db v3→v4 ladder,
test_cli_binary_rename) unrelated to this change.
2026-05-11 20:37:17 +02:00

333 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Scheduler service — replaces systemd timers.
Lightweight sidecar that fires scheduled jobs over HTTP against the main
app. Authenticates with ``SCHEDULER_API_TOKEN`` (shared-secret synthetic
admin — see ``app.auth.scheduler_token``); falls back to no-auth in
LOCAL_DEV_MODE.
Schedules are strings parsed by ``src.scheduler.is_table_due`` — accepts
"every 15m", "every 1h", "daily 03:00", "daily 07:00,13:00".
Why every job is HTTP and nothing runs in-process: the scheduler container
shares ``/data/state/system.duckdb`` with the app container, but DuckDB
permits only one writer per file across processes. An in-process call
from the scheduler raced the app's long-lived handle and 500-ed on
``Could not set lock on file``. Going through HTTP makes the app the sole
writer; the scheduler is reduced to a pure cron clock.
Usage: python -m services.scheduler
"""
import logging
import os
import signal
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
import httpx
from app.logging_config import setup_logging
from src.scheduler import is_table_due
setup_logging(__name__)
logger = logging.getLogger(__name__)
API_URL = os.environ.get("API_URL", "http://localhost:8000")
SCHEDULER_API_TOKEN = os.environ.get("SCHEDULER_API_TOKEN", "")
_token_warning_emitted = False
def _get_auth_token() -> str:
"""Return the bearer token for API calls.
Production: ``SCHEDULER_API_TOKEN`` is a shared secret generated by the
Terraform startup script and written to ``/opt/agnes/.env``. Both the
``app`` and ``scheduler`` containers source the same .env via Docker
Compose ``env_file:``, so the secret is symmetric. The app validates
incoming Bearer tokens against this env var (constant-time compare in
``app.auth.scheduler_token``) and resolves matches to a synthetic
``scheduler@system.local`` user that is a member of the Admin group.
Dev / LOCAL_DEV_MODE: leave it unset. The scheduler returns the empty
string and calls the API without an ``Authorization`` header — the
API's dev-bypass auto-authenticates the request as the dev user.
"""
global _token_warning_emitted
if SCHEDULER_API_TOKEN:
return SCHEDULER_API_TOKEN
if not _token_warning_emitted:
logger.warning(
"SCHEDULER_API_TOKEN is not set — calling the API without "
"Authorization. Required in production; in LOCAL_DEV_MODE "
"the dev-bypass auto-authenticates and this is fine."
)
_token_warning_emitted = True
return ""
# --- Env parsing ------------------------------------------------------------
_DEFAULTS = {
"SCHEDULER_DATA_REFRESH_INTERVAL": 15 * 60, # seconds
"SCHEDULER_HEALTH_CHECK_INTERVAL": 5 * 60,
"SCHEDULER_SCRIPT_RUN_INTERVAL": 1 * 60,
"SCHEDULER_TICK_SECONDS": 30,
# LLM pipeline cadences (#176, #179 review). Defaults preserve the
# 10m / 15m / 17m coprime offset so the three jobs don't fire on the
# same tick and stack their API + DB load. The verification-detector
# default (900s) is also the source of truth for the health-check
# staleness grace window in app/api/health.py — single env var drives
# both, so an operator changing the cadence moves both.
"SCHEDULER_SESSION_COLLECTOR_INTERVAL": 10 * 60,
# Drives the verification session-processor cadence AND the
# health-check staleness grace window in app/api/health.py
# (single env var → both, so an operator changing the cadence moves
# both). Name retained post session-pipeline refactor for operator
# compatibility — existing docker-compose env files keep working.
"SCHEDULER_VERIFICATION_DETECTOR_INTERVAL": 15 * 60,
"SCHEDULER_USAGE_PROCESSOR_INTERVAL": 10 * 60,
"SCHEDULER_CORPORATE_MEMORY_INTERVAL": 17 * 60,
# BigQuery metadata refresh: walks remote registry rows and updates the
# persistent ``bq_metadata_cache``. Default 4 h — long enough that the
# cumulative BQ jobs API cost stays negligible on a typical 1050-table
# registry, short enough that operator-edited tables show real numbers
# within an analyst's working day. Hot reads of ``/api/v2/catalog`` go
# to DuckDB, never to BQ, so this can be tuned freely without touching
# request-path latency.
"SCHEDULER_BQ_METADATA_REFRESH_INTERVAL": 4 * 60 * 60,
}
def _read_positive_int(name: str) -> int:
"""Read an env var as a positive integer or fall back to the default.
Treats unset env (``None``) as "use default". Treats explicitly empty
string (``""``) as an operator typo and raises — silently defaulting
on a literal ``FOO=`` in the env_file would mask configuration bugs.
"""
raw = os.environ.get(name)
if raw is None:
if name not in _DEFAULTS:
raise ValueError(f"Unknown scheduler env var: {name}")
return _DEFAULTS[name]
if raw == "":
raise ValueError(f"{name}='' must be a positive integer (seconds)")
try:
value = int(raw)
except (TypeError, ValueError):
raise ValueError(f"{name}={raw!r} must be a positive integer (seconds)")
if value <= 0:
raise ValueError(f"{name}={value} must be > 0 (seconds)")
return value
def _seconds_to_schedule(seconds: int) -> str:
"""Convert a seconds value to the closest 'every Nm' / 'every Nh' string.
Uses ceiling division so a non-multiple-of-60 input never produces a
schedule that fires MORE often than the operator configured (90s →
'every 2m', not 'every 1m'). Sub-minute inputs clamp to 'every 1m'
because the schedule grammar has minute-level resolution.
"""
if seconds % 3600 == 0 and seconds >= 3600:
return f"every {seconds // 3600}h"
# Ceiling division: -(-x // y) is the standard trick.
minutes = max(1, -(-seconds // 60))
return f"every {minutes}m"
def resolved_tick_seconds() -> int:
"""Read + validate SCHEDULER_TICK_SECONDS in isolation (test helper)."""
return _read_positive_int("SCHEDULER_TICK_SECONDS")
def build_jobs() -> list[tuple[str, str, str, str, int]]:
"""Build the JOBS list from env, applying defaults and validation.
Tuple shape: (name, schedule_string, endpoint, method, http_timeout_sec).
Marketplaces stays hardcoded — promoting it to env is out of #77 scope.
"""
refresh = _read_positive_int("SCHEDULER_DATA_REFRESH_INTERVAL")
health = _read_positive_int("SCHEDULER_HEALTH_CHECK_INTERVAL")
scripts = _read_positive_int("SCHEDULER_SCRIPT_RUN_INTERVAL")
sess = _read_positive_int("SCHEDULER_SESSION_COLLECTOR_INTERVAL")
verify = _read_positive_int("SCHEDULER_VERIFICATION_DETECTOR_INTERVAL")
usage = _read_positive_int("SCHEDULER_USAGE_PROCESSOR_INTERVAL")
corpmem = _read_positive_int("SCHEDULER_CORPORATE_MEMORY_INTERVAL")
bqmeta = _read_positive_int("SCHEDULER_BQ_METADATA_REFRESH_INTERVAL")
tick = _read_positive_int("SCHEDULER_TICK_SECONDS")
smallest = min(refresh, health, scripts, sess, verify, usage, corpmem, bqmeta)
if tick > smallest:
raise ValueError(
f"SCHEDULER_TICK_SECONDS={tick} must be <= the smallest job "
f"interval ({smallest}s) so jobs don't consistently miss their "
f"cadence by up to one tick"
)
return [
("data-refresh", _seconds_to_schedule(refresh), "/api/sync/trigger", "POST", 120),
("health-check", _seconds_to_schedule(health), "/api/health", "GET", 30),
("script-runner", _seconds_to_schedule(scripts), "/api/scripts/run-due", "POST", 600),
("marketplaces", "daily 03:00", "/api/marketplaces/sync-all", "POST", 900),
# LLM pipeline (#176, #179 review). Cadences are deliberately offset
# (10m / 15m / 17m by default — all coprime modulo the 30s tick) so
# the three LLM-driven jobs don't fire on the same tick and stack
# their API + DB load. Driven by env so an operator can throttle
# without a code change; the verification-detector cadence is the
# single source of truth for the health-check staleness grace
# window in app/api/health.py (which uses 2x the cadence).
("session-collector", _seconds_to_schedule(sess), "/api/admin/run-session-collector", "POST", 300),
# session-pipeline processors — independent loops, each invoked on
# its own cadence via the parametrized run-session-processor endpoint.
# Adding a third processor in the future is one line here + one entry
# in services/session_processors/__init__.py registry.
("session-processor:verification", _seconds_to_schedule(verify),
"/api/admin/run-session-processor?processor=verification", "POST", 900),
("session-processor:usage", _seconds_to_schedule(usage),
"/api/admin/run-session-processor?processor=usage", "POST", 300),
("corporate-memory", _seconds_to_schedule(corpmem), "/api/admin/run-corporate-memory", "POST", 900),
# v30: TTL purge of blocked-bundle bytes. Cheap (just rmtree
# + UPDATE), runs once daily at 04:00 UTC so the spike is
# visible in audit_log without competing with the marketplaces
# job at 03:00. Endpoint reads guardrails.blocked_bundle_ttl_days
# from instance.yaml and short-circuits when set to 0.
("store-blocked-purge", "daily 04:00", "/api/admin/run-blocked-purge", "POST", 600),
# Stuck-review reaper (#7). A submission stays at
# status='pending_llm' until the BackgroundTasks worker writes
# a verdict. If the worker crashes, the row sits forever. Run
# every 15 minutes; reap_stuck_llm_reviews flips rows older
# than guardrails.stuck_review_grace_seconds (default 1800)
# to review_error so admin can retry. Cheap (one indexed
# SELECT + N small UPDATEs); short timeout sufficient.
("store-reap-stuck-reviews", "every 15m", "/api/admin/run-reap-stuck-reviews", "POST", 60),
# BigQuery metadata refresh — keeps ``bq_metadata_cache`` warm so
# ``GET /api/v2/catalog`` never has to call BQ at request time.
# 30-min timeout is generous; on a 10-table dev registry the
# observed full refresh ran in ~7 min when two view-backed rows
# took 7 min each. Bounded concurrency
# (``AGNES_BQ_METADATA_REFRESH_CONCURRENCY``, default 4) caps the
# tail.
("bq-metadata-refresh", _seconds_to_schedule(bqmeta), "/api/admin/run-bq-metadata-refresh", "POST", 1800),
]
_running = True
def _signal_handler(sig, frame):
global _running
logger.info(f"Received signal {sig}, shutting down...")
_running = False
def _call_api(endpoint: str, method: str, timeout_sec: int) -> bool:
"""Call the main app API. Returns True on success."""
url = f"{API_URL}{endpoint}"
headers = {}
token = _get_auth_token()
if token:
headers["Authorization"] = f"Bearer {token}"
try:
if method == "POST":
resp = httpx.post(url, headers=headers, timeout=timeout_sec)
else:
resp = httpx.get(url, headers=headers, timeout=timeout_sec)
if resp.status_code < 400:
logger.info(f"Job {endpoint}: {resp.status_code}")
return True
else:
logger.warning(f"Job {endpoint}: HTTP {resp.status_code} - {resp.text[:200]}")
return False
except Exception as e:
logger.error(f"Job {endpoint} failed: {e}")
try:
from src.observability import get_posthog
get_posthog().capture_exception(
e,
distinct_id="system",
properties={"job": endpoint, "method": method, "component": "scheduler"},
)
except Exception:
logger.exception("PostHog capture_exception failed in scheduler")
return False
def run():
signal.signal(signal.SIGTERM, _signal_handler)
signal.signal(signal.SIGINT, _signal_handler)
jobs = build_jobs()
tick = resolved_tick_seconds()
logger.info(
"Scheduler started. API_URL=%s, %d jobs, tick=%ds. Schedules: %s",
API_URL, len(jobs), tick,
{name: schedule for name, schedule, *_ in jobs},
)
last_run: dict[str, str | None] = {name: None for name, *_ in jobs}
# Per-tick concurrency: one thread per job slot, so a 900s verification
# run can't block the 60s health-check or the 30s data-refresh from
# firing on their own cadences (PR #232 review fix). Pure I/O workload
# (httpx) — GIL is irrelevant. `in_flight` prevents the same job being
# re-launched on a subsequent tick while the previous invocation is
# still running; otherwise a 10-min run during which 20 ticks fire
# would queue 20 duplicate POSTs against the same processor (the
# admin endpoint's per-processor lock would 409 most of them, but
# they'd still be wasted requests + audit-log noise).
in_flight: set[str] = set()
in_flight_lock = threading.Lock()
executor = ThreadPoolExecutor(max_workers=max(4, len(jobs)))
while _running:
now_iso = datetime.now(timezone.utc).isoformat()
for name, schedule, endpoint, method, timeout_sec in jobs:
if not is_table_due(schedule, last_run[name]):
continue
with in_flight_lock:
if name in in_flight:
# Previous tick's invocation hasn't returned yet; skip.
continue
in_flight.add(name)
logger.info("Running job: %s (%s)", name, schedule)
executor.submit(
_run_job, name, endpoint, method, timeout_sec, now_iso,
last_run, in_flight, in_flight_lock,
)
time.sleep(tick)
logger.info("Scheduler stopping; waiting for in-flight jobs.")
executor.shutdown(wait=True)
logger.info("Scheduler stopped.")
def _run_job(
name: str,
endpoint: str,
method: str,
timeout: int,
now_iso: str,
last_run: dict[str, "str | None"],
in_flight: set[str],
in_flight_lock: threading.Lock,
) -> None:
"""Execute one scheduled job + bookkeeping. Lifted out of run() so it's
unit-testable.
Advances last_run on terminal state (success OR failure) so a permanently
failing job retries on its cadence (e.g. 15 min), not on every scheduler
tick (default 30s). Pre-fix behavior caused a hot-loop on persistent 5xx —
30× more requests + LLM tokens than the operator configured. Errors still
surface via _call_api's logging + audit_log on the receiving side.
"""
try:
_call_api(endpoint, method, timeout)
finally:
last_run[name] = now_iso
with in_flight_lock:
in_flight.discard(name)
if __name__ == "__main__":
run()