agnes-the-ai-analyst/services/scheduler/__main__.py
ZdenekSrotyr b6cdd68e8d feat(catalog): entity_type + validated where_examples + view-aware cost-guard + scheduler hygiene
Three behavioural improvements driven by the sub-agent end-to-end test
findings, plus scheduler tweaks to prevent the post-deploy contention
burst we measured.

CATALOG (catalog-side bugs the test agents tripped on):
  - new entity_type field per remote row (BASE TABLE / VIEW /
    MATERIALIZED VIEW). For views, rows + size_bytes return null
    instead of the misleading 0 that __TABLES__ reports.
  - where_examples now validates against the table's actual schema
    (cached known_columns from refresh). The pre-fix behavior
    blindly advertised `country_code = 'CZ'` on tables with no
    country_code column — the sub-agent tests reliably hit this on
    unit_economics.
  - new known_columns + entity_type columns on bq_metadata_cache;
    populated by bq_metadata_refresh.refresh_one from the same
    fetch_bq_columns_full call (no extra BQ roundtrip) plus a
    cheap INFORMATION_SCHEMA.TABLES lookup for table_type.
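The two catalog rules above (null stats for views, schema-checked `where_examples`) can be sketched as follows. This is a minimal illustration, not the code in this commit. The helper names `catalog_size_fields` and `validated_where_examples` are hypothetical, as are the `(column, predicate)` shape of candidate examples and the sample column names. Columns are compared case-insensitively because BigQuery column names are case-insensitive. An empty or missing `known_columns` list yields no examples, in line with the "cold rows have empty examples" test.

```python
from typing import Optional

# entity_type values (INFORMATION_SCHEMA.TABLES.table_type) for which
# __TABLES__ reports a misleading 0 for rows / size.
VIEW_TYPES = frozenset({"VIEW", "MATERIALIZED VIEW"})


def catalog_size_fields(
    entity_type: str, rows: Optional[int], size_bytes: Optional[int]
) -> dict[str, Optional[int]]:
    """Hypothetical helper: null out rows/size_bytes for views."""
    if entity_type in VIEW_TYPES:
        return {"rows": None, "size_bytes": None}
    return {"rows": rows, "size_bytes": size_bytes}


def validated_where_examples(
    candidates: list[tuple[str, str]], known_columns: Optional[list[str]]
) -> list[str]:
    """Hypothetical helper: keep predicates whose column really exists.

    `candidates` pairs a column name with an example predicate. A cold
    cache row (no known_columns yet) advertises nothing instead of guessing.
    """
    if not known_columns:
        return []
    # BigQuery column names are case-insensitive.
    known = {col.lower() for col in known_columns}
    return [pred for col, pred in candidates if col.lower() in known]


candidates = [
    ("country_code", "country_code = 'CZ'"),
    ("order_date", "order_date >= '2026-01-01'"),
]
print(validated_where_examples(candidates, ["order_date", "revenue"]))
# → ["order_date >= '2026-01-01'"]
print(catalog_size_fields("VIEW", 0, 0))
# → {'rows': None, 'size_bytes': None}
```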

QUERY COST-GUARD:
  - remote_scan_too_large suggestion now names views explicitly:
    `Target(s) <ids> are VIEW or MATERIALIZED VIEW. BigQuery does
    not push LIMIT into the view body — SELECT * FROM <view>
    LIMIT 1 still runs the full underlying scan.` Programmatic
    consumers get a new view_targets field on the error detail.
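A sketch of how the view-aware error detail could be assembled, assuming the guard already knows each target's `entity_type`. Only `view_targets` and the `remote_scan_too_large` code come from this commit. The function name, the `error` and `suggestion` keys, and the table ids are hypothetical, and the suggestion wording approximates the text quoted above.

```python
# The two entity_type values the commit treats as views.
VIEW_TYPES = frozenset({"VIEW", "MATERIALIZED VIEW"})


def scan_too_large_detail(target_types: dict[str, str]) -> dict:
    """Hypothetical builder for a remote_scan_too_large error detail.

    `target_types` maps every table id the query references to its
    entity_type (BASE TABLE / VIEW / MATERIALIZED VIEW).
    """
    view_targets = sorted(
        table_id for table_id, kind in target_types.items() if kind in VIEW_TYPES
    )
    detail: dict = {"error": "remote_scan_too_large", "view_targets": view_targets}
    if view_targets:
        # Wording approximates the suggestion quoted in the commit message.
        detail["suggestion"] = (
            f"Target(s) {', '.join(view_targets)} are VIEW or MATERIALIZED VIEW. "
            "BigQuery does not push LIMIT into the view body: "
            "SELECT * FROM <view> LIMIT 1 still runs the full underlying scan."
        )
    return detail


detail = scan_too_large_detail(
    {"proj.sales.orders": "BASE TABLE", "proj.finance.unit_economics": "VIEW"}
)
print(detail["view_targets"])  # → ['proj.finance.unit_economics']
```

Returning `view_targets` as a sorted list keeps the field stable for programmatic consumers, whatever order the query referenced the tables in.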

SCHEDULER HYGIENE (the post-deploy 1-minute window where
concurrent parquet downloads dropped to ~1 MB/s):
  - SCHEDULER_STARTUP_GRACE_SECONDS (default 60) holds the first
    tick so the burst doesn't overlap cache_warmup writes.
  - SCHEDULER_BQ_METADATA_INITIAL_OFFSET_MAX_SECONDS (default 900)
    randomises bq-metadata-refresh's first-fire offset.
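The startup timing these two knobs produce for bq-metadata-refresh can be worked through with a small helper. The sketch is hypothetical and mirrors the startup logic in `run()`. `last_run` is back-dated by the jitter only when the jitter is > 0; otherwise it stays `None` and the job is due on the first tick. The sketch assumes `is_table_due` treats an `every Nh` job as due once N hours have passed since `last_run`. It also ignores tick granularity, so the real fire can land up to one tick later.

```python
import random


def first_fire_delay(grace: int, interval: int, jitter: int) -> int:
    """Seconds from container start to the first bq-metadata-refresh fire.

    Hypothetical helper mirroring the scheduler's startup arithmetic.
    """
    if jitter <= 0:
        # last_run stays None, so the job is due on the first tick,
        # which runs as soon as the startup grace ends.
        return grace
    # After the grace, last_run = now - jitter, so the job becomes due
    # again once the remaining (interval - jitter) seconds have passed.
    return grace + interval - jitter


GRACE, INTERVAL, CAP = 60, 4 * 3600, 900  # the documented defaults

print(first_fire_delay(GRACE, INTERVAL, CAP))  # → 13560 (earliest)
print(first_fire_delay(GRACE, INTERVAL, 1))    # → 14459 (latest non-zero draw)
print(first_fire_delay(GRACE, INTERVAL, 0))    # → 60 (no suppression)

# A seeded RNG, like the injectable `rng`, makes the draw reproducible.
jitter = random.Random(0).randint(0, CAP)
print(jitter, first_fire_delay(GRACE, INTERVAL, jitter))
```

With the defaults, any non-zero jitter puts the first refresh between 13 560 s and 14 459 s after container start, roughly 3 h 46 min to just over 4 h.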

TESTS:
  - test_bq_metadata_cache_repo: entity_type + known_columns round-trip
  - test_v2_catalog_remote_metadata: where_examples validation, views
    return null rows/size_bytes, cold rows have empty examples
  - test_api_query_guardrail: VIEW-aware suggestion text + view_targets
  - test_connectors_bigquery_metadata: entity_type lookup mock + new
    fields in TableMetadata expectations
  - test_scheduler_sidecar: grace + jitter env-var resolution
2026-05-12 10:37:35 +02:00


"""Scheduler service — replaces systemd timers.

Lightweight sidecar that fires scheduled jobs over HTTP against the main
app. Authenticates with ``SCHEDULER_API_TOKEN`` (shared-secret synthetic
admin — see ``app.auth.scheduler_token``); falls back to no-auth in
LOCAL_DEV_MODE.

Schedules are strings parsed by ``src.scheduler.is_table_due`` — accepts
"every 15m", "every 1h", "daily 03:00", "daily 07:00,13:00".

Why every job is HTTP and nothing runs in-process: the scheduler container
shares ``/data/state/system.duckdb`` with the app container, but DuckDB
permits only one writer per file across processes. An in-process call
from the scheduler raced the app's long-lived handle and 500-ed on
``Could not set lock on file``. Going through HTTP makes the app the sole
writer; the scheduler is reduced to a pure cron clock.

Usage: python -m services.scheduler
"""

import logging
import os
import signal
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone

import httpx

from app.logging_config import setup_logging
from src.scheduler import is_table_due

setup_logging(__name__)
logger = logging.getLogger(__name__)

API_URL = os.environ.get("API_URL", "http://localhost:8000")
SCHEDULER_API_TOKEN = os.environ.get("SCHEDULER_API_TOKEN", "")
_token_warning_emitted = False


def _get_auth_token() -> str:
    """Return the bearer token for API calls.

    Production: ``SCHEDULER_API_TOKEN`` is a shared secret generated by the
    Terraform startup script and written to ``/opt/agnes/.env``. Both the
    ``app`` and ``scheduler`` containers source the same .env via Docker
    Compose ``env_file:``, so the secret is symmetric. The app validates
    incoming Bearer tokens against this env var (constant-time compare in
    ``app.auth.scheduler_token``) and resolves matches to a synthetic
    ``scheduler@system.local`` user that is a member of the Admin group.

    Dev / LOCAL_DEV_MODE: leave it unset. The scheduler returns the empty
    string and calls the API without an ``Authorization`` header — the
    API's dev-bypass auto-authenticates the request as the dev user.
    """
    global _token_warning_emitted
    if SCHEDULER_API_TOKEN:
        return SCHEDULER_API_TOKEN
    if not _token_warning_emitted:
        logger.warning(
            "SCHEDULER_API_TOKEN is not set — calling the API without "
            "Authorization. Required in production; in LOCAL_DEV_MODE "
            "the dev-bypass auto-authenticates and this is fine."
        )
        _token_warning_emitted = True
    return ""


# --- Env parsing ------------------------------------------------------------

_DEFAULTS = {
    "SCHEDULER_DATA_REFRESH_INTERVAL": 15 * 60,  # seconds
    "SCHEDULER_HEALTH_CHECK_INTERVAL": 5 * 60,
    "SCHEDULER_SCRIPT_RUN_INTERVAL": 1 * 60,
    "SCHEDULER_TICK_SECONDS": 30,
    # LLM pipeline cadences (#176, #179 review). The defaults are staggered
    # (10m / 15m / 17m for session collector / verification / corporate
    # memory) so the three jobs seldom fire on the same tick and stack
    # their API + DB load.
    "SCHEDULER_SESSION_COLLECTOR_INTERVAL": 10 * 60,
    # Drives the verification session-processor cadence AND the
    # health-check staleness grace window in app/api/health.py
    # (single env var → both, so an operator changing the cadence moves
    # both). Name retained post session-pipeline refactor for operator
    # compatibility — existing docker-compose env files keep working.
    "SCHEDULER_VERIFICATION_DETECTOR_INTERVAL": 15 * 60,
    "SCHEDULER_USAGE_PROCESSOR_INTERVAL": 10 * 60,
    "SCHEDULER_CORPORATE_MEMORY_INTERVAL": 17 * 60,
    # BigQuery metadata refresh: walks remote registry rows and updates the
    # persistent ``bq_metadata_cache``. Default 4 h — long enough that the
    # cumulative BQ jobs API cost stays negligible on a typical 1050-table
    # registry, short enough that operator-edited tables show real numbers
    # within an analyst's working day. Hot reads of ``/api/v2/catalog`` go
    # to DuckDB, never to BQ, so this can be tuned freely without touching
    # request-path latency.
    "SCHEDULER_BQ_METADATA_REFRESH_INTERVAL": 4 * 60 * 60,
    # Pause between scheduler startup and the first tick. Keeps the
    # scheduler from synchronising its "Table never synced, marking as
    # due" burst with the app's own startup cache_warmup (which writes
    # heavily to the system DB for several seconds). Set to 0 to disable
    # — useful in tests that need deterministic-fast first-tick.
    "SCHEDULER_STARTUP_GRACE_SECONDS": 60,
}


def _read_positive_int(name: str) -> int:
    """Read an env var as a positive integer or fall back to the default.

    Treats unset env (``None``) as "use default". Treats explicitly empty
    string (``""``) as an operator typo and raises — silently defaulting
    on a literal ``FOO=`` in the env_file would mask configuration bugs.
    """
    raw = os.environ.get(name)
    if raw is None:
        if name not in _DEFAULTS:
            raise ValueError(f"Unknown scheduler env var: {name}")
        return _DEFAULTS[name]
    if raw == "":
        raise ValueError(f"{name}='' must be a positive integer (seconds)")
    try:
        value = int(raw)
    except (TypeError, ValueError):
        raise ValueError(f"{name}={raw!r} must be a positive integer (seconds)")
    if value <= 0:
        raise ValueError(f"{name}={value} must be > 0 (seconds)")
    return value


def _read_non_negative_int(name: str) -> int:
    """Like ``_read_positive_int`` but also accepts ``0`` as a valid value.

    Used for knobs where ``0`` means "disable" rather than "operator typo"
    — e.g. ``SCHEDULER_STARTUP_GRACE_SECONDS=0`` legitimately skips the
    startup pause in unit tests / fast-iteration dev setups.
    """
    raw = os.environ.get(name)
    if raw is None:
        if name not in _DEFAULTS:
            raise ValueError(f"Unknown scheduler env var: {name}")
        return _DEFAULTS[name]
    if raw == "":
        raise ValueError(f"{name}='' must be a non-negative integer (seconds)")
    try:
        value = int(raw)
    except (TypeError, ValueError):
        raise ValueError(f"{name}={raw!r} must be a non-negative integer (seconds)")
    if value < 0:
        raise ValueError(f"{name}={value} must be >= 0 (seconds)")
    return value


def _seconds_to_schedule(seconds: int) -> str:
    """Convert a seconds value to an 'every Nh' or 'every Nm' schedule string.

    Whole multiples of an hour map to 'every Nh'; everything else rounds
    UP to whole minutes, so a non-multiple-of-60 input never produces a
    schedule that fires MORE often than the operator configured (90s →
    'every 2m', not 'every 1m'). Sub-minute inputs clamp to 'every 1m'
    because the schedule grammar has minute-level resolution.
    """
    if seconds % 3600 == 0 and seconds >= 3600:
        return f"every {seconds // 3600}h"
    # Ceiling division: -(-x // y) is the standard trick.
    minutes = max(1, -(-seconds // 60))
    return f"every {minutes}m"


def resolved_tick_seconds() -> int:
    """Read + validate SCHEDULER_TICK_SECONDS in isolation (test helper)."""
    return _read_positive_int("SCHEDULER_TICK_SECONDS")


def resolved_startup_grace_seconds() -> int:
    """Read SCHEDULER_STARTUP_GRACE_SECONDS (default 60).

    Sleep duration between scheduler startup and the first tick. Mitigates
    the "post-deploy contention burst" where the scheduler's "everything
    is due" first tick (5+ parallel HTTP POSTs against the just-restarted
    app) overlaps the app's own startup ``cache_warmup`` job, doubling
    disk I/O on the host's boot disk and dropping concurrent parquet
    downloads from ~3 MB/s to ~1 MB/s for the duration of the burst.
    """
    return _read_non_negative_int("SCHEDULER_STARTUP_GRACE_SECONDS")


def resolved_bq_metadata_initial_offset_seconds(rng=None) -> int:
    """Random startup-jitter for ``bq-metadata-refresh``.

    Returns a value in
    ``[0, SCHEDULER_BQ_METADATA_INITIAL_OFFSET_MAX_SECONDS]`` (cap
    default 900) that the run loop uses to fake a recent ``last_run`` for
    the ``bq-metadata-refresh`` job at startup. With
    ``last_run = now - jitter`` and the default 4 h interval, the first
    refresh fires ``interval - jitter`` seconds after the startup grace
    finishes (≈ 3 h 45 m to 4 h). This intentionally suppresses an
    immediate refresh on every container start — the app's own
    ``cache_warmup`` already populates the persistent cache at startup, so
    a duplicate refresh from the scheduler would just compete for disk I/O
    while adding nothing.

    The suppression only applies when the returned value is > 0. For a 0,
    the run loop leaves ``last_run`` unset and the job fires on the first
    tick. That happens whenever the cap is <= 0 (jitter disabled), and
    also, with probability ``1 / (cap + 1)``, on a random draw of exactly 0.

    ``rng`` injectable for deterministic tests.
    """
    import random as _random

    cap = int(os.environ.get(
        "SCHEDULER_BQ_METADATA_INITIAL_OFFSET_MAX_SECONDS", "900",
    ))
    if cap <= 0:
        return 0
    r = rng or _random.Random()
    return r.randint(0, cap)


def build_jobs() -> list[tuple[str, str, str, str, int]]:
    """Build the JOBS list from env, applying defaults and validation.

    Tuple shape: (name, schedule_string, endpoint, method, http_timeout_sec).
    Marketplaces stays hardcoded — promoting it to env is out of #77 scope.
    """
    refresh = _read_positive_int("SCHEDULER_DATA_REFRESH_INTERVAL")
    health = _read_positive_int("SCHEDULER_HEALTH_CHECK_INTERVAL")
    scripts = _read_positive_int("SCHEDULER_SCRIPT_RUN_INTERVAL")
    sess = _read_positive_int("SCHEDULER_SESSION_COLLECTOR_INTERVAL")
    verify = _read_positive_int("SCHEDULER_VERIFICATION_DETECTOR_INTERVAL")
    usage = _read_positive_int("SCHEDULER_USAGE_PROCESSOR_INTERVAL")
    corpmem = _read_positive_int("SCHEDULER_CORPORATE_MEMORY_INTERVAL")
    bqmeta = _read_positive_int("SCHEDULER_BQ_METADATA_REFRESH_INTERVAL")
    tick = _read_positive_int("SCHEDULER_TICK_SECONDS")
    smallest = min(refresh, health, scripts, sess, verify, usage, corpmem, bqmeta)
    if tick > smallest:
        raise ValueError(
            f"SCHEDULER_TICK_SECONDS={tick} must be <= the smallest job "
            f"interval ({smallest}s) so jobs don't consistently miss their "
            f"cadence by up to one tick"
        )
    return [
        ("data-refresh", _seconds_to_schedule(refresh), "/api/sync/trigger", "POST", 120),
        ("health-check", _seconds_to_schedule(health), "/api/health", "GET", 30),
        ("script-runner", _seconds_to_schedule(scripts), "/api/scripts/run-due", "POST", 600),
        ("marketplaces", "daily 03:00", "/api/marketplaces/sync-all", "POST", 900),
        # LLM pipeline (#176, #179 review). Cadences are deliberately
        # staggered (10m / 15m / 17m by default) so the three LLM-driven
        # jobs seldom fire on the same tick and stack their API + DB load.
        # Driven by env so an operator can throttle without a code change;
        # the verification-detector cadence is the single source of truth
        # for the health-check staleness grace window in app/api/health.py
        # (which uses 2x the cadence).
        ("session-collector", _seconds_to_schedule(sess), "/api/admin/run-session-collector", "POST", 300),
        # session-pipeline processors — independent loops, each invoked on
        # its own cadence via the parametrized run-session-processor endpoint.
        # Adding a third processor in the future is one line here + one entry
        # in services/session_processors/__init__.py registry.
        ("session-processor:verification", _seconds_to_schedule(verify),
         "/api/admin/run-session-processor?processor=verification", "POST", 900),
        ("session-processor:usage", _seconds_to_schedule(usage),
         "/api/admin/run-session-processor?processor=usage", "POST", 300),
        ("corporate-memory", _seconds_to_schedule(corpmem), "/api/admin/run-corporate-memory", "POST", 900),
        # v30: TTL purge of blocked-bundle bytes. Cheap (just rmtree
        # + UPDATE), runs once daily at 04:00 UTC so the spike is
        # visible in audit_log without competing with the marketplaces
        # job at 03:00. Endpoint reads guardrails.blocked_bundle_ttl_days
        # from instance.yaml and short-circuits when set to 0.
        ("store-blocked-purge", "daily 04:00", "/api/admin/run-blocked-purge", "POST", 600),
        # Stuck-review reaper (#7). A submission stays at
        # status='pending_llm' until the BackgroundTasks worker writes
        # a verdict. If the worker crashes, the row sits forever. Run
        # every 15 minutes; reap_stuck_llm_reviews flips rows older
        # than guardrails.stuck_review_grace_seconds (default 1800)
        # to review_error so admin can retry. Cheap (one indexed
        # SELECT + N small UPDATEs); short timeout sufficient.
        ("store-reap-stuck-reviews", "every 15m", "/api/admin/run-reap-stuck-reviews", "POST", 60),
        # BigQuery metadata refresh — keeps ``bq_metadata_cache`` warm so
        # ``GET /api/v2/catalog`` never has to call BQ at request time.
        # The 30-min timeout is generous: on a 10-table dev registry a full
        # refresh took ~7 min, dominated by two view-backed rows that each
        # took ~7 min while running concurrently. Bounded concurrency
        # (``AGNES_BQ_METADATA_REFRESH_CONCURRENCY``, default 4) caps the
        # tail.
        ("bq-metadata-refresh", _seconds_to_schedule(bqmeta), "/api/admin/run-bq-metadata-refresh", "POST", 1800),
    ]


_running = True


def _signal_handler(sig, frame):
    global _running
    logger.info(f"Received signal {sig}, shutting down...")
    _running = False


def _call_api(endpoint: str, method: str, timeout_sec: int) -> bool:
    """Call the main app API.

    Returns True on any HTTP status below 400; False on a 4xx/5xx status
    or on a raised exception (timeout, connection error, ...).
    """
    url = f"{API_URL}{endpoint}"
    headers = {}
    token = _get_auth_token()
    if token:
        headers["Authorization"] = f"Bearer {token}"
    try:
        if method == "POST":
            resp = httpx.post(url, headers=headers, timeout=timeout_sec)
        else:
            resp = httpx.get(url, headers=headers, timeout=timeout_sec)
        if resp.status_code < 400:
            logger.info(f"Job {endpoint}: {resp.status_code}")
            return True
        else:
            logger.warning(f"Job {endpoint}: HTTP {resp.status_code} - {resp.text[:200]}")
            return False
    except Exception as e:
        logger.error(f"Job {endpoint} failed: {e}")
        try:
            from src.observability import get_posthog
            get_posthog().capture_exception(
                e,
                distinct_id="system",
                properties={"job": endpoint, "method": method, "component": "scheduler"},
            )
        except Exception:
            logger.exception("PostHog capture_exception failed in scheduler")
        return False


def run():
    signal.signal(signal.SIGTERM, _signal_handler)
    signal.signal(signal.SIGINT, _signal_handler)
    jobs = build_jobs()
    tick = resolved_tick_seconds()
    grace = resolved_startup_grace_seconds()
    bqmeta_offset = resolved_bq_metadata_initial_offset_seconds()
    logger.info(
        "Scheduler started. API_URL=%s, %d jobs, tick=%ds, "
        "startup_grace=%ds, bq_metadata_initial_offset=%ds. Schedules: %s",
        API_URL, len(jobs), tick, grace, bqmeta_offset,
        {name: schedule for name, schedule, *_ in jobs},
    )

    # Startup grace — see ``resolved_startup_grace_seconds`` for why.
    # Honors SIGTERM by polling _running in short slices, so an operator
    # `docker compose stop` during grace doesn't hang for ~60s.
    grace_remaining = grace
    while grace_remaining > 0 and _running:
        time.sleep(min(grace_remaining, 5))
        grace_remaining -= 5
    if not _running:
        logger.info("Scheduler shutdown during startup grace; exiting.")
        return

    last_run: dict[str, str | None] = {name: None for name, *_ in jobs}
    # Suppress the first ``bq-metadata-refresh`` fire by pretending the
    # job ran ``bqmeta_offset`` seconds ago at startup. ``is_table_due``
    # will then wait the remainder of the configured interval before
    # firing for the first time. An offset of 0 leaves the entry at None,
    # so the job fires on the first tick. Two scheduler containers that
    # came up within seconds of each other will almost certainly pick
    # different offsets and stop synchronising their refresh ticks
    # against one another.
    if bqmeta_offset > 0:
        from datetime import timedelta as _td
        offset_ago = (
            datetime.now(timezone.utc) - _td(seconds=bqmeta_offset)
        ).isoformat()
        last_run["bq-metadata-refresh"] = offset_ago

    # Per-tick concurrency: one thread per job slot, so a verification run
    # that uses its full 900s timeout can't block the health-check (every
    # 5 min by default) or the data-refresh (every 15 min by default) from
    # firing on their own cadences (PR #232 review fix). Pure I/O workload
    # (httpx), so the GIL is irrelevant. `in_flight` prevents the same job
    # being re-launched on a subsequent tick while the previous invocation
    # is still running; otherwise a 10-min run during which 20 ticks fire
    # would queue 20 duplicate POSTs against the same processor (the
    # admin endpoint's per-processor lock would 409 most of them, but
    # they'd still be wasted requests + audit-log noise).
    in_flight: set[str] = set()
    in_flight_lock = threading.Lock()
    executor = ThreadPoolExecutor(max_workers=max(4, len(jobs)))
    while _running:
        now_iso = datetime.now(timezone.utc).isoformat()
        for name, schedule, endpoint, method, timeout_sec in jobs:
            if not is_table_due(schedule, last_run[name]):
                continue
            with in_flight_lock:
                if name in in_flight:
                    # Previous tick's invocation hasn't returned yet; skip.
                    continue
                in_flight.add(name)
            logger.info("Running job: %s (%s)", name, schedule)
            executor.submit(
                _run_job, name, endpoint, method, timeout_sec, now_iso,
                last_run, in_flight, in_flight_lock,
            )
        time.sleep(tick)
    logger.info("Scheduler stopping; waiting for in-flight jobs.")
    executor.shutdown(wait=True)
    logger.info("Scheduler stopped.")


def _run_job(
    name: str,
    endpoint: str,
    method: str,
    timeout: int,
    now_iso: str,
    last_run: dict[str, "str | None"],
    in_flight: set[str],
    in_flight_lock: threading.Lock,
) -> None:
    """Execute one scheduled job + bookkeeping. Lifted out of run() so it's
    unit-testable.

    Advances last_run on terminal state (success OR failure) so a permanently
    failing job retries on its cadence (e.g. 15 min), not on every scheduler
    tick (default 30s). Pre-fix behavior caused a hot-loop on persistent 5xx —
    30× more requests + LLM tokens than the operator configured. Errors still
    surface via _call_api's logging + audit_log on the receiving side.
    """
    try:
        _call_api(endpoint, method, timeout)
    finally:
        last_run[name] = now_iso
        with in_flight_lock:
            in_flight.discard(name)


if __name__ == "__main__":
    run()