fix(scheduler): single env var drives cadence + grace (#179 review)

Devin NOTABLE: SCHEDULER_VERIFICATION_DETECTOR_INTERVAL was already
read by app/api/health.py to compute the staleness grace window, but
the actual scheduler cadence was hardcoded to 'every 15m'. The env
var name implied it controlled the cadence — it didn't. An operator
throttling the detector via the env was silently ignored by the
scheduler while the health grace silently widened.

Wired the env var into both ends. Same pattern applied to the other
two LLM-pipeline jobs:
- SCHEDULER_SESSION_COLLECTOR_INTERVAL     (default 600s = 10m)
- SCHEDULER_VERIFICATION_DETECTOR_INTERVAL (default 900s = 15m)
- SCHEDULER_CORPORATE_MEMORY_INTERVAL      (default 1020s = 17m)

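Defaults preserve the existing 10m / 15m / 17m staggered offset so the
three jobs seldom line up on the same tick (strictly, only 17m is coprime
to the other two; 10m and 15m share a common multiple every 30m).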

build_jobs() now reads all three through _read_positive_int (matching
the existing pattern for data-refresh / health-check / script-runner)
and feeds them to _seconds_to_schedule. The smallest-interval check
includes the new variables so an operator can't accidentally set a
tick larger than any LLM cadence.

New tests in tests/test_scheduler.py:
- TestLLMPipelineCadenceEnvVars: env override changes the schedule
  string at scheduler-init time, with parametrized invalid-value
  rejection.
- TestVerificationDetectorGraceFollowsCadence: pinning the
  single-source-of-truth contract — same env var moves both the
  scheduler cadence and the health-check grace.
This commit is contained in:
ZdenekSrotyr 2026-05-05 05:59:18 +02:00
parent 9f33e24bf9
commit fa3a76a528
2 changed files with 129 additions and 7 deletions

View file

@ -73,6 +73,15 @@ _DEFAULTS = {
"SCHEDULER_HEALTH_CHECK_INTERVAL": 5 * 60, "SCHEDULER_HEALTH_CHECK_INTERVAL": 5 * 60,
"SCHEDULER_SCRIPT_RUN_INTERVAL": 1 * 60, "SCHEDULER_SCRIPT_RUN_INTERVAL": 1 * 60,
"SCHEDULER_TICK_SECONDS": 30, "SCHEDULER_TICK_SECONDS": 30,
# LLM pipeline cadences (#176, #179 review). Defaults preserve the
# 10m / 15m / 17m coprime offset so the three jobs don't fire on the
# same tick and stack their API + DB load. The verification-detector
# default (900s) is also the source of truth for the health-check
# staleness grace window in app/api/health.py — single env var drives
# both, so an operator changing the cadence moves both.
"SCHEDULER_SESSION_COLLECTOR_INTERVAL": 10 * 60,
"SCHEDULER_VERIFICATION_DETECTOR_INTERVAL": 15 * 60,
"SCHEDULER_CORPORATE_MEMORY_INTERVAL": 17 * 60,
} }
@ -128,8 +137,11 @@ def build_jobs() -> list[tuple[str, str, str, str, int]]:
refresh = _read_positive_int("SCHEDULER_DATA_REFRESH_INTERVAL") refresh = _read_positive_int("SCHEDULER_DATA_REFRESH_INTERVAL")
health = _read_positive_int("SCHEDULER_HEALTH_CHECK_INTERVAL") health = _read_positive_int("SCHEDULER_HEALTH_CHECK_INTERVAL")
scripts = _read_positive_int("SCHEDULER_SCRIPT_RUN_INTERVAL") scripts = _read_positive_int("SCHEDULER_SCRIPT_RUN_INTERVAL")
sess = _read_positive_int("SCHEDULER_SESSION_COLLECTOR_INTERVAL")
verify = _read_positive_int("SCHEDULER_VERIFICATION_DETECTOR_INTERVAL")
corpmem = _read_positive_int("SCHEDULER_CORPORATE_MEMORY_INTERVAL")
tick = _read_positive_int("SCHEDULER_TICK_SECONDS") tick = _read_positive_int("SCHEDULER_TICK_SECONDS")
smallest = min(refresh, health, scripts) smallest = min(refresh, health, scripts, sess, verify, corpmem)
if tick > smallest: if tick > smallest:
raise ValueError( raise ValueError(
f"SCHEDULER_TICK_SECONDS={tick} must be <= the smallest job " f"SCHEDULER_TICK_SECONDS={tick} must be <= the smallest job "
@ -141,12 +153,16 @@ def build_jobs() -> list[tuple[str, str, str, str, int]]:
("health-check", _seconds_to_schedule(health), "/api/health", "GET", 30), ("health-check", _seconds_to_schedule(health), "/api/health", "GET", 30),
("script-runner", _seconds_to_schedule(scripts), "/api/scripts/run-due", "POST", 600), ("script-runner", _seconds_to_schedule(scripts), "/api/scripts/run-due", "POST", 600),
("marketplaces", "daily 03:00", "/api/marketplaces/sync-all", "POST", 900), ("marketplaces", "daily 03:00", "/api/marketplaces/sync-all", "POST", 900),
# LLM pipeline (#176). Cadences are deliberately offset (10m / 15m # LLM pipeline (#176, #179 review). Cadences are deliberately offset
# / 17m, all coprime modulo 30s tick) so the three LLM-driven jobs # (10m / 15m / 17m by default — all coprime modulo the 30s tick) so
# don't fire on the same tick and stack their API + DB load. # the three LLM-driven jobs don't fire on the same tick and stack
("session-collector", "every 10m", "/api/admin/run-session-collector", "POST", 300), # their API + DB load. Driven by env so an operator can throttle
("verification-detector", "every 15m", "/api/admin/run-verification-detector", "POST", 900), # without a code change; the verification-detector cadence is the
("corporate-memory", "every 17m", "/api/admin/run-corporate-memory", "POST", 900), # single source of truth for the health-check staleness grace
# window in app/api/health.py (which uses 2x the cadence).
("session-collector", _seconds_to_schedule(sess), "/api/admin/run-session-collector", "POST", 300),
("verification-detector", _seconds_to_schedule(verify), "/api/admin/run-verification-detector", "POST", 900),
("corporate-memory", _seconds_to_schedule(corpmem), "/api/admin/run-corporate-memory", "POST", 900),
] ]
_running = True _running = True

View file

@ -378,3 +378,109 @@ class TestParseTimestamp:
def test_none_like_string_returns_none(self) -> None: def test_none_like_string_returns_none(self) -> None:
assert _parse_timestamp("None") is None assert _parse_timestamp("None") is None
# ---------------------------------------------------------------------------
# LLM pipeline cadence env vars (#179 review)
#
# Three jobs (session-collector, verification-detector, corporate-memory) and
# the health-check staleness grace must all derive from a single SCHEDULER_*
# env var per job, so an operator changing the cadence in one place moves
# both the schedule string and the grace window. The env var name was already
# read in app/api/health.py before this change but didn't actually drive the
# scheduler — this test pins the wired-up behavior.
# ---------------------------------------------------------------------------
_LLM_PIPELINE_ENV = (
"SCHEDULER_DATA_REFRESH_INTERVAL",
"SCHEDULER_HEALTH_CHECK_INTERVAL",
"SCHEDULER_TICK_SECONDS",
"SCHEDULER_SCRIPT_RUN_INTERVAL",
"SCHEDULER_SESSION_COLLECTOR_INTERVAL",
"SCHEDULER_VERIFICATION_DETECTOR_INTERVAL",
"SCHEDULER_CORPORATE_MEMORY_INTERVAL",
)
def _clear_scheduler_env(monkeypatch) -> None:
    """Remove every variable named in ``_LLM_PIPELINE_ENV`` from the environment.

    Uses ``monkeypatch.delenv(..., raising=False)``, so variables that are
    already unset are skipped silently.
    """
    for env_name in _LLM_PIPELINE_ENV:
        monkeypatch.delenv(env_name, raising=False)
class TestLLMPipelineCadenceEnvVars:
    """Each LLM-pipeline job's schedule string follows its own SCHEDULER_* env var."""

    @staticmethod
    def _job_schedules() -> dict:
        """Return ``{job name: schedule string}`` from a fresh ``build_jobs()`` call."""
        # Function-local import, mirroring how every test in this class
        # obtains build_jobs only after the environment has been arranged.
        from services.scheduler.__main__ import build_jobs

        return {entry[0]: entry[1] for entry in build_jobs()}

    def test_default_cadences_preserve_coprime_offset(self, monkeypatch) -> None:
        """With no overrides the three jobs run every 10m / 15m / 17m."""
        _clear_scheduler_env(monkeypatch)

        schedules = self._job_schedules()

        assert schedules["session-collector"] == "every 10m"
        assert schedules["verification-detector"] == "every 15m"
        assert schedules["corporate-memory"] == "every 17m"

    def test_session_collector_env_override_changes_cadence(self, monkeypatch) -> None:
        """Overriding the session-collector interval moves only that job."""
        _clear_scheduler_env(monkeypatch)
        monkeypatch.setenv("SCHEDULER_SESSION_COLLECTOR_INTERVAL", "300")  # 5m

        schedules = self._job_schedules()

        assert schedules["session-collector"] == "every 5m"
        # The two sibling LLM jobs keep their defaults.
        assert schedules["verification-detector"] == "every 15m"
        assert schedules["corporate-memory"] == "every 17m"

    def test_verification_detector_env_override_changes_cadence(self, monkeypatch) -> None:
        """Overriding the verification-detector interval moves only that job."""
        _clear_scheduler_env(monkeypatch)
        monkeypatch.setenv("SCHEDULER_VERIFICATION_DETECTOR_INTERVAL", "600")  # 10m

        schedules = self._job_schedules()

        assert schedules["verification-detector"] == "every 10m"
        assert schedules["session-collector"] == "every 10m"
        assert schedules["corporate-memory"] == "every 17m"

    def test_corporate_memory_env_override_changes_cadence(self, monkeypatch) -> None:
        """Overriding the corporate-memory interval moves only that job."""
        _clear_scheduler_env(monkeypatch)
        monkeypatch.setenv("SCHEDULER_CORPORATE_MEMORY_INTERVAL", "1800")  # 30m

        schedules = self._job_schedules()

        assert schedules["corporate-memory"] == "every 30m"
        assert schedules["session-collector"] == "every 10m"
        assert schedules["verification-detector"] == "every 15m"

    @pytest.mark.parametrize("var", [
        "SCHEDULER_SESSION_COLLECTOR_INTERVAL",
        "SCHEDULER_VERIFICATION_DETECTOR_INTERVAL",
        "SCHEDULER_CORPORATE_MEMORY_INTERVAL",
    ])
    @pytest.mark.parametrize("bad", ["0", "-5", "abc", ""])
    def test_invalid_llm_env_rejected(self, monkeypatch, var, bad) -> None:
        """Zero, negative, non-numeric and empty values make build_jobs() raise ValueError."""
        _clear_scheduler_env(monkeypatch)
        monkeypatch.setenv(var, bad)

        # Import stays outside the ``raises`` context so only build_jobs()
        # itself is allowed to satisfy the expected ValueError.
        from services.scheduler.__main__ import build_jobs

        with pytest.raises(ValueError):
            build_jobs()
class TestVerificationDetectorGraceFollowsCadence:
    """Health-check grace is twice the detector cadence; one env var sets both."""

    def test_grace_doubles_when_env_overrides_cadence(self, monkeypatch) -> None:
        """A 600s override yields an 'every 10m' schedule and a 1200s grace."""
        _clear_scheduler_env(monkeypatch)
        monkeypatch.setenv("SCHEDULER_VERIFICATION_DETECTOR_INTERVAL", "600")  # 10m

        from app.api.health import _verification_detector_grace_seconds
        from services.scheduler.__main__ import build_jobs

        schedule_by_job = dict((entry[0], entry[1]) for entry in build_jobs())
        expected_grace = 2 * 600

        # Both values have to come from the one env var: an operator who
        # slows the detector down (rate limits, cost, debugging) then gets
        # a proportionally wider staleness window without touching a
        # second setting.
        assert schedule_by_job["verification-detector"] == "every 10m"
        assert _verification_detector_grace_seconds() == expected_grace

    def test_grace_uses_default_cadence_when_env_unset(self, monkeypatch) -> None:
        """With the env var unset, grace is twice the 900s default cadence."""
        _clear_scheduler_env(monkeypatch)

        from app.api.health import _verification_detector_grace_seconds

        # 900s default cadence -> 1800s grace.
        expected_grace = 2 * 900
        assert _verification_detector_grace_seconds() == expected_grace