diff --git a/services/scheduler/__main__.py b/services/scheduler/__main__.py index 51a8cdc..a915dcb 100644 --- a/services/scheduler/__main__.py +++ b/services/scheduler/__main__.py @@ -73,6 +73,15 @@ _DEFAULTS = { "SCHEDULER_HEALTH_CHECK_INTERVAL": 5 * 60, "SCHEDULER_SCRIPT_RUN_INTERVAL": 1 * 60, "SCHEDULER_TICK_SECONDS": 30, + # LLM pipeline cadences (#176, #179 review). Defaults preserve the + # 10m / 15m / 17m coprime offset so the three jobs don't fire on the + # same tick and stack their API + DB load. The verification-detector + # default (900s) is also the source of truth for the health-check + # staleness grace window in app/api/health.py — single env var drives + # both, so an operator changing the cadence moves both. + "SCHEDULER_SESSION_COLLECTOR_INTERVAL": 10 * 60, + "SCHEDULER_VERIFICATION_DETECTOR_INTERVAL": 15 * 60, + "SCHEDULER_CORPORATE_MEMORY_INTERVAL": 17 * 60, } @@ -128,8 +137,11 @@ def build_jobs() -> list[tuple[str, str, str, str, int]]: refresh = _read_positive_int("SCHEDULER_DATA_REFRESH_INTERVAL") health = _read_positive_int("SCHEDULER_HEALTH_CHECK_INTERVAL") scripts = _read_positive_int("SCHEDULER_SCRIPT_RUN_INTERVAL") + sess = _read_positive_int("SCHEDULER_SESSION_COLLECTOR_INTERVAL") + verify = _read_positive_int("SCHEDULER_VERIFICATION_DETECTOR_INTERVAL") + corpmem = _read_positive_int("SCHEDULER_CORPORATE_MEMORY_INTERVAL") tick = _read_positive_int("SCHEDULER_TICK_SECONDS") - smallest = min(refresh, health, scripts) + smallest = min(refresh, health, scripts, sess, verify, corpmem) if tick > smallest: raise ValueError( f"SCHEDULER_TICK_SECONDS={tick} must be <= the smallest job " @@ -141,12 +153,16 @@ def build_jobs() -> list[tuple[str, str, str, str, int]]: ("health-check", _seconds_to_schedule(health), "/api/health", "GET", 30), ("script-runner", _seconds_to_schedule(scripts), "/api/scripts/run-due", "POST", 600), ("marketplaces", "daily 03:00", "/api/marketplaces/sync-all", "POST", 900), - # LLM pipeline (#176). Cadences are deliberately offset (10m / 15m - # / 17m, all coprime modulo 30s tick) so the three LLM-driven jobs - # don't fire on the same tick and stack their API + DB load. - ("session-collector", "every 10m", "/api/admin/run-session-collector", "POST", 300), - ("verification-detector", "every 15m", "/api/admin/run-verification-detector", "POST", 900), - ("corporate-memory", "every 17m", "/api/admin/run-corporate-memory", "POST", 900), + # LLM pipeline (#176, #179 review). Cadences are deliberately offset + # (10m / 15m / 17m by default — all coprime modulo the 30s tick) so + # the three LLM-driven jobs don't fire on the same tick and stack + # their API + DB load. Driven by env so an operator can throttle + # without a code change; the verification-detector cadence is the + # single source of truth for the health-check staleness grace + # window in app/api/health.py (which uses 2x the cadence). + ("session-collector", _seconds_to_schedule(sess), "/api/admin/run-session-collector", "POST", 300), + ("verification-detector", _seconds_to_schedule(verify), "/api/admin/run-verification-detector", "POST", 900), + ("corporate-memory", _seconds_to_schedule(corpmem), "/api/admin/run-corporate-memory", "POST", 900), ] _running = True diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 7208f33..0bdf916 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -378,3 +378,109 @@ class TestParseTimestamp: def test_none_like_string_returns_none(self) -> None: assert _parse_timestamp("None") is None + + +# --------------------------------------------------------------------------- +# LLM pipeline cadence env vars (#179 review) +# +# Three jobs (session-collector, verification-detector, corporate-memory) and +# the health-check staleness grace must all derive from a single SCHEDULER_* +# env var per job, so an operator changing the cadence in one place moves +# both the schedule string and the grace window. The env var name was already +# read in app/api/health.py before this change but didn't actually drive the +# scheduler — this test pins the wired-up behavior. +# --------------------------------------------------------------------------- + + +_LLM_PIPELINE_ENV = ( + "SCHEDULER_DATA_REFRESH_INTERVAL", + "SCHEDULER_HEALTH_CHECK_INTERVAL", + "SCHEDULER_TICK_SECONDS", + "SCHEDULER_SCRIPT_RUN_INTERVAL", + "SCHEDULER_SESSION_COLLECTOR_INTERVAL", + "SCHEDULER_VERIFICATION_DETECTOR_INTERVAL", + "SCHEDULER_CORPORATE_MEMORY_INTERVAL", +) + + +def _clear_scheduler_env(monkeypatch) -> None: + for v in _LLM_PIPELINE_ENV: + monkeypatch.delenv(v, raising=False) + + +class TestLLMPipelineCadenceEnvVars: + """Three new env vars drive both the scheduler and the health grace window.""" + + def test_default_cadences_preserve_coprime_offset(self, monkeypatch) -> None: + """Defaults are 10m / 15m / 17m so the three jobs don't fire on the same tick.""" + _clear_scheduler_env(monkeypatch) + from services.scheduler.__main__ import build_jobs + jobs = {name: schedule for name, schedule, *_ in build_jobs()} + assert jobs["session-collector"] == "every 10m" + assert jobs["verification-detector"] == "every 15m" + assert jobs["corporate-memory"] == "every 17m" + + def test_session_collector_env_override_changes_cadence(self, monkeypatch) -> None: + _clear_scheduler_env(monkeypatch) + monkeypatch.setenv("SCHEDULER_SESSION_COLLECTOR_INTERVAL", "300") # 5m + from services.scheduler.__main__ import build_jobs + jobs = {name: schedule for name, schedule, *_ in build_jobs()} + assert jobs["session-collector"] == "every 5m" + # Other LLM jobs must be unaffected. + assert jobs["verification-detector"] == "every 15m" + assert jobs["corporate-memory"] == "every 17m" + + def test_verification_detector_env_override_changes_cadence(self, monkeypatch) -> None: + _clear_scheduler_env(monkeypatch) + monkeypatch.setenv("SCHEDULER_VERIFICATION_DETECTOR_INTERVAL", "600") # 10m + from services.scheduler.__main__ import build_jobs + jobs = {name: schedule for name, schedule, *_ in build_jobs()} + assert jobs["verification-detector"] == "every 10m" + assert jobs["session-collector"] == "every 10m" + assert jobs["corporate-memory"] == "every 17m" + + def test_corporate_memory_env_override_changes_cadence(self, monkeypatch) -> None: + _clear_scheduler_env(monkeypatch) + monkeypatch.setenv("SCHEDULER_CORPORATE_MEMORY_INTERVAL", "1800") # 30m + from services.scheduler.__main__ import build_jobs + jobs = {name: schedule for name, schedule, *_ in build_jobs()} + assert jobs["corporate-memory"] == "every 30m" + assert jobs["session-collector"] == "every 10m" + assert jobs["verification-detector"] == "every 15m" + + @pytest.mark.parametrize("var", [ + "SCHEDULER_SESSION_COLLECTOR_INTERVAL", + "SCHEDULER_VERIFICATION_DETECTOR_INTERVAL", + "SCHEDULER_CORPORATE_MEMORY_INTERVAL", + ]) + @pytest.mark.parametrize("bad", ["0", "-5", "abc", ""]) + def test_invalid_llm_env_rejected(self, monkeypatch, var, bad) -> None: + _clear_scheduler_env(monkeypatch) + monkeypatch.setenv(var, bad) + from services.scheduler.__main__ import build_jobs + with pytest.raises(ValueError): + build_jobs() + + +class TestVerificationDetectorGraceFollowsCadence: + """The health-check grace window is 2x the cadence — same env var drives both.""" + + def test_grace_doubles_when_env_overrides_cadence(self, monkeypatch) -> None: + _clear_scheduler_env(monkeypatch) + monkeypatch.setenv("SCHEDULER_VERIFICATION_DETECTOR_INTERVAL", "600") # 10m + from app.api.health import _verification_detector_grace_seconds + from services.scheduler.__main__ import build_jobs + + jobs = {name: schedule for name, schedule, *_ in build_jobs()} + # Cadence and grace MUST be derived from the same env var, so an + # operator who throttles the detector for any reason (rate-limit, + # cost, debugging) gets a proportionally wider staleness window + # automatically — no second knob to forget. + assert jobs["verification-detector"] == "every 10m" + assert _verification_detector_grace_seconds() == 2 * 600 + + def test_grace_uses_default_cadence_when_env_unset(self, monkeypatch) -> None: + _clear_scheduler_env(monkeypatch) + from app.api.health import _verification_detector_grace_seconds + # Default cadence 900s -> grace 1800s. + assert _verification_detector_grace_seconds() == 2 * 900