* Extract session pipeline framework, refactor verification, add UsageProcessor skeleton Pluggable framework under services/session_pipeline/ (contract + lib + per-processor runner) so multiple processors can read /data/user_sessions/<key>/*.jsonl on their own cadence with full failure isolation. Verification flow becomes the first plugin; a no-op UsageProcessor reserves the second slot pending a separate brainstorm on extraction logic + storage shape. Schema v28→v29: rename session_extraction_state → session_processor_state with composite PK (processor_name, session_file). Existing rows copied over with processor_name='verification'; legacy table dropped. Migration is idempotent and no-ops the copy step on fresh installs that came up at the new schema. Endpoint: /api/admin/run-verification-detector replaced by parametrized /api/admin/run-session-processor?processor=<name>. Audit action format follows. Scheduler JOBS: verification-detector entry split into session-processor:verification + session-processor:usage. SCHEDULER_VERIFICATION_DETECTOR_INTERVAL retained for operator compatibility (drives both cadence and health-check grace window); SCHEDULER_USAGE_PROCESSOR_INTERVAL added. * Address PR #232 review: scan dead branch + per-processor lock - `SessionProcessorStateRepository.scan_unprocessed_for` dead else: both branches surfaced every jsonl, the SELECT was unused, runner MD5-rehashed every stable session per tick. Replaced with an mtime precheck — stable sessions (mtime <= processed_at) are filtered at scan; modified files still surface for the runner's authoritative `file_hash` invalidation. Naive-local comparison matches the existing health-check idiom (DuckDB TIMESTAMP strips tz on storage). - Per-processor advisory lock around `_run_processor` in `/api/admin/run-session-processor`. 
Scheduler tick + manual admin POST could otherwise both run, both call create_evidence on overlapping detections, and accumulate duplicate verification_evidence rows (the dedup short-circuit only covers create+contradiction, not evidence per ADR Decision 3). Non-blocking acquire → 409 Conflict on concurrent invocation; release in finally so a runner exception doesn't wedge the processor. Tests: two new scan unit tests (mtime filter + post-mark mtime bump), 409 endpoint test, lock-released-on-exception test. Two existing tests updated for the new "filtered at scan" stat shape (previously asserted skipped == 1, now scanned == 0). * Address PR #232 review #2: parallel scheduler tick + last_run on terminal state Two pre-existing scaffold bugs in services/scheduler/__main__.py amplified by adding more session-pipeline jobs: 1. Serial for-loop over jobs with synchronous httpx.post(timeout=900) — a 10-minute verification run blocked every other job (data-refresh, health-check, usage, corporate-memory) for the whole window. The PR's stated isolation guarantee held inside the runner but broke at the scheduler dispatch layer. 2. last_run advanced only when _call_api returned True. Permanent-failure jobs hot-looped on every tick (30s) instead of cadence (15min). Fix: ThreadPoolExecutor.submit per due job + per-job in_flight set so a long-running job can't be re-launched on subsequent ticks. last_run advances unconditionally in finally; errors still surface via _call_api logging + audit_log on the receiving side. _run_job extracted to module-level for unit testing. New tests: - TestRunJobBookkeeping: advances on success / failure / unhandled raise - TestRunLoopParallelism: in_flight protection prevents duplicate launches across ticks for a single slow job --------- Co-authored-by: Minas Arustamyan <arustamyan.minas@gmail.com>
615 lines
27 KiB
Python
615 lines
27 KiB
Python
"""Tests for src.scheduler - schedule parsing and sync-due evaluation."""
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional
|
|
|
|
import pytest
|
|
|
|
from src.scheduler import (
|
|
_is_daily_due,
|
|
_parse_daily_times,
|
|
_parse_timestamp,
|
|
is_table_due,
|
|
parse_interval_minutes,
|
|
)
|
|
|
|
# Shared reference instant used as "now" throughout this module:
# 2026-03-15 at noon, UTC (timezone-aware).
NOW = datetime(2026, 3, 15, hour=12, tzinfo=timezone.utc)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# parse_interval_minutes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestParseIntervalMinutes:
    """Tests for parse_interval_minutes().

    The parser is expected to accept exactly ``every <N>m`` or ``every <N>h``
    and return the interval in minutes; every other shape yields ``None``.
    """

    def test_minutes_basic(self) -> None:
        assert parse_interval_minutes("every 15m") == 15

    def test_minutes_single_digit(self) -> None:
        assert parse_interval_minutes("every 5m") == 5

    def test_minutes_large(self) -> None:
        assert parse_interval_minutes("every 120m") == 120

    def test_hours_basic(self) -> None:
        assert parse_interval_minutes("every 2h") == 120

    def test_hours_single(self) -> None:
        assert parse_interval_minutes("every 1h") == 60

    def test_hours_large(self) -> None:
        assert parse_interval_minutes("every 24h") == 1440

    def test_daily_returns_none(self) -> None:
        assert parse_interval_minutes("daily 05:00") is None

    def test_invalid_format_returns_none(self) -> None:
        assert parse_interval_minutes("not a schedule") is None

    def test_empty_string_returns_none(self) -> None:
        assert parse_interval_minutes("") is None

    def test_missing_unit_returns_none(self) -> None:
        assert parse_interval_minutes("every 15") is None

    def test_wrong_unit_returns_none(self) -> None:
        assert parse_interval_minutes("every 15s") is None

    def test_no_space_returns_none(self) -> None:
        assert parse_interval_minutes("every15m") is None

    def test_extra_whitespace_returns_none(self) -> None:
        # Strict parsing: extra whitespace is rejected. The input must carry
        # TWO spaces after "every" -- with a single space this would be the
        # exact string test_minutes_basic expects to parse as 15.
        assert parse_interval_minutes("every  15m") is None

    def test_negative_not_matched(self) -> None:
        # Regex uses \d+ so negative sign won't match
        assert parse_interval_minutes("every -5m") is None

    def test_zero_minutes(self) -> None:
        # "every 0m" matches the pattern, returns 0
        assert parse_interval_minutes("every 0m") == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# is_table_due - interval schedules
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestIsTableDueInterval:
    """Interval-schedule ("every Nm" / "every Nh") behaviour of is_table_due()."""

    @staticmethod
    def _synced_ago(**elapsed) -> str:
        """Return the ISO timestamp of a sync that finished ``elapsed`` before NOW."""
        return (NOW - timedelta(**elapsed)).isoformat()

    def test_never_synced_is_due(self) -> None:
        due = is_table_due("every 15m", last_sync_iso=None, now=NOW)
        assert due is True

    def test_empty_last_sync_is_due(self) -> None:
        due = is_table_due("every 15m", last_sync_iso="", now=NOW)
        assert due is True

    def test_every_0m_is_always_due(self) -> None:
        # ``every 0m`` opts out of rate limiting; it is used to force a
        # resync of a row whose previous attempt errored without recording
        # last_sync. A sync only seconds old must still be reported as due.
        stamp = self._synced_ago(seconds=5)
        assert is_table_due("every 0m", last_sync_iso=stamp, now=NOW) is True
        assert is_table_due("every 0m", last_sync_iso=None, now=NOW) is True

    def test_synced_10min_ago_every_15m_not_due(self) -> None:
        stamp = self._synced_ago(minutes=10)
        due = is_table_due("every 15m", last_sync_iso=stamp, now=NOW)
        assert due is False

    def test_synced_20min_ago_every_15m_is_due(self) -> None:
        stamp = self._synced_ago(minutes=20)
        due = is_table_due("every 15m", last_sync_iso=stamp, now=NOW)
        assert due is True

    def test_synced_exactly_15min_ago_every_15m_is_due(self) -> None:
        # Boundary: elapsed time equal to the interval counts as due.
        stamp = self._synced_ago(minutes=15)
        due = is_table_due("every 15m", last_sync_iso=stamp, now=NOW)
        assert due is True

    def test_synced_30min_ago_every_1h_not_due(self) -> None:
        stamp = self._synced_ago(minutes=30)
        due = is_table_due("every 1h", last_sync_iso=stamp, now=NOW)
        assert due is False

    def test_synced_90min_ago_every_1h_is_due(self) -> None:
        stamp = self._synced_ago(minutes=90)
        due = is_table_due("every 1h", last_sync_iso=stamp, now=NOW)
        assert due is True

    def test_synced_exactly_1h_ago_every_1h_is_due(self) -> None:
        # Boundary: elapsed time equal to the interval counts as due.
        stamp = self._synced_ago(hours=1)
        due = is_table_due("every 1h", last_sync_iso=stamp, now=NOW)
        assert due is True

    def test_synced_59min_ago_every_1h_not_due(self) -> None:
        stamp = self._synced_ago(minutes=59)
        due = is_table_due("every 1h", last_sync_iso=stamp, now=NOW)
        assert due is False

    def test_synced_3h_ago_every_2h_is_due(self) -> None:
        stamp = self._synced_ago(hours=3)
        due = is_table_due("every 2h", last_sync_iso=stamp, now=NOW)
        assert due is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# is_table_due - daily schedules
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestIsTableDueDaily:
    """Daily-schedule ("daily HH:MM") behaviour of is_table_due()."""

    @staticmethod
    def _at(day: int, hour: int, minute: int) -> datetime:
        """Build a UTC-aware instant on the given day of March 2026."""
        return datetime(2026, 3, day, hour, minute, 0, tzinfo=timezone.utc)

    def test_before_target_time_not_due(self) -> None:
        current = self._at(15, 4, 30)
        previous = self._at(14, 6, 0).isoformat()
        due = is_table_due("daily 05:00", last_sync_iso=previous, now=current)
        assert due is False

    def test_past_target_not_synced_today_is_due(self) -> None:
        current = self._at(15, 5, 30)
        previous = self._at(15, 4, 0).isoformat()
        due = is_table_due("daily 05:00", last_sync_iso=previous, now=current)
        assert due is True

    def test_past_target_already_synced_after_target_not_due(self) -> None:
        current = self._at(15, 5, 30)
        previous = self._at(15, 5, 15).isoformat()
        due = is_table_due("daily 05:00", last_sync_iso=previous, now=current)
        assert due is False

    def test_evening_schedule_past_target_last_sync_yesterday_is_due(self) -> None:
        current = self._at(15, 18, 0)
        previous = self._at(14, 17, 30).isoformat()
        due = is_table_due("daily 17:00", last_sync_iso=previous, now=current)
        assert due is True

    def test_daily_never_synced_is_due(self) -> None:
        current = self._at(15, 6, 0)
        due = is_table_due("daily 05:00", last_sync_iso=None, now=current)
        assert due is True

    def test_daily_never_synced_before_target_still_due(self) -> None:
        # A table that has never synced is due even before the target time.
        current = self._at(15, 3, 0)
        due = is_table_due("daily 05:00", last_sync_iso=None, now=current)
        assert due is True

    def test_daily_exactly_at_target_time_is_due(self) -> None:
        current = self._at(15, 5, 0)
        previous = self._at(14, 5, 0).isoformat()
        # now == today's target, so "now < target" is False, and
        # yesterday's last_sync is earlier than today's target => due.
        due = is_table_due("daily 05:00", last_sync_iso=previous, now=current)
        assert due is True

    def test_daily_synced_at_exactly_target_not_due_again(self) -> None:
        current = self._at(15, 5, 30)
        previous = self._at(15, 5, 0).isoformat()
        # last_sync == today's target, i.e. last_sync >= target => not due.
        due = is_table_due("daily 05:00", last_sync_iso=previous, now=current)
        assert due is False

    def test_midnight_schedule(self) -> None:
        current = self._at(15, 0, 30)
        previous = self._at(14, 0, 15).isoformat()
        due = is_table_due("daily 00:00", last_sync_iso=previous, now=current)
        assert due is True

    def test_end_of_day_schedule(self) -> None:
        current = self._at(15, 23, 59)
        previous = self._at(14, 23, 50).isoformat()
        due = is_table_due("daily 23:30", last_sync_iso=previous, now=current)
        assert due is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# is_table_due - edge cases
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestIsTableDueEdgeCases:
    """Edge cases for is_table_due(): bad input, defaults and clock skew."""

    def test_unparseable_last_sync_returns_true(self) -> None:
        # Fail-safe: an unreadable last_sync is treated as "sync needed".
        due = is_table_due("every 15m", last_sync_iso="garbage", now=NOW)
        assert due is True

    def test_unknown_schedule_format_returns_false(self) -> None:
        two_hours_back = (NOW - timedelta(hours=2)).isoformat()
        due = is_table_due("weekly", last_sync_iso=two_hours_back, now=NOW)
        assert due is False

    def test_unknown_schedule_never_synced_returns_true(self) -> None:
        # "Never synced" wins over an unrecognised schedule string.
        due = is_table_due("weekly", last_sync_iso=None, now=NOW)
        assert due is True

    def test_now_defaults_to_current_time(self) -> None:
        # ``now`` is omitted on purpose so the function picks its own
        # default; a never-synced table must be due whatever that default is.
        due = is_table_due("every 15m", last_sync_iso=None)
        assert due is True

    def test_naive_last_sync_treated_as_utc(self) -> None:
        # A timestamp without a timezone is expected to be read as UTC:
        # 11:50 is ten minutes before NOW (12:00), inside the 15m interval.
        tz_less_stamp = "2026-03-15T11:50:00"
        due = is_table_due("every 15m", last_sync_iso=tz_less_stamp, now=NOW)
        assert due is False

    def test_last_sync_in_future_not_due(self) -> None:
        # last_sync ahead of now (clock skew, etc.) must not trigger a sync.
        one_hour_ahead = (NOW + timedelta(hours=1)).isoformat()
        due = is_table_due("every 15m", last_sync_iso=one_hour_ahead, now=NOW)
        assert due is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _is_daily_due (internal function, direct tests)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestIsDailyDue:
    """Direct tests for the internal _is_daily_due(last_sync, now, times)."""

    @staticmethod
    def _at(day: int, hour: int, minute: int) -> datetime:
        """Build a UTC-aware instant on the given day of March 2026."""
        return datetime(2026, 3, day, hour, minute, 0, tzinfo=timezone.utc)

    def test_before_target_not_due(self) -> None:
        current = self._at(15, 4, 0)
        previous = self._at(14, 5, 30)
        assert _is_daily_due(previous, current, [(5, 0)]) is False

    def test_after_target_last_sync_before_target_is_due(self) -> None:
        current = self._at(15, 6, 0)
        previous = self._at(15, 4, 0)
        assert _is_daily_due(previous, current, [(5, 0)]) is True

    def test_after_target_last_sync_after_target_not_due(self) -> None:
        current = self._at(15, 6, 0)
        previous = self._at(15, 5, 30)
        assert _is_daily_due(previous, current, [(5, 0)]) is False

    def test_target_with_minutes(self) -> None:
        current = self._at(15, 17, 45)
        previous = self._at(15, 10, 0)
        assert _is_daily_due(previous, current, [(17, 30)]) is True

    def test_target_with_minutes_not_yet(self) -> None:
        current = self._at(15, 17, 15)
        previous = self._at(15, 10, 0)
        assert _is_daily_due(previous, current, [(17, 30)]) is False
|
|
|
|
|
|
class TestMultipleDailyTimes:
    """Daily schedules that list several target times (07:00, 13:00, 18:00)."""

    # (hour, minute) slots shared by the direct _is_daily_due() tests. Each
    # test hands over a fresh list so no call can see another's argument.
    _SLOTS = ((7, 0), (13, 0), (18, 0))
    _SCHEDULE = "daily 07:00,13:00,18:00"

    @staticmethod
    def _at(day: int, hour: int, minute: int) -> datetime:
        """Build a UTC-aware instant on the given day of March 2026."""
        return datetime(2026, 3, day, hour, minute, 0, tzinfo=timezone.utc)

    def test_multi_time_first_due(self) -> None:
        current = self._at(15, 8, 0)
        previous = self._at(14, 19, 0)
        assert _is_daily_due(previous, current, list(self._SLOTS)) is True

    def test_multi_time_second_due(self) -> None:
        current = self._at(15, 14, 0)
        previous = self._at(15, 7, 30)
        assert _is_daily_due(previous, current, list(self._SLOTS)) is True

    def test_multi_time_third_due(self) -> None:
        current = self._at(15, 19, 0)
        previous = self._at(15, 13, 30)
        assert _is_daily_due(previous, current, list(self._SLOTS)) is True

    def test_multi_time_between_slots_not_due(self) -> None:
        current = self._at(15, 10, 0)
        previous = self._at(15, 7, 30)
        assert _is_daily_due(previous, current, list(self._SLOTS)) is False

    def test_multi_time_all_done_not_due(self) -> None:
        current = self._at(15, 20, 0)
        previous = self._at(15, 18, 30)
        assert _is_daily_due(previous, current, list(self._SLOTS)) is False

    def test_is_table_due_multi_time_format(self) -> None:
        current = self._at(15, 14, 0)
        previous = self._at(15, 7, 30).isoformat()
        due = is_table_due(self._SCHEDULE, last_sync_iso=previous, now=current)
        assert due is True

    def test_is_table_due_multi_time_not_due(self) -> None:
        current = self._at(15, 10, 0)
        previous = self._at(15, 7, 30).isoformat()
        due = is_table_due(self._SCHEDULE, last_sync_iso=previous, now=current)
        assert due is False
|
|
|
|
|
|
class TestParseDailyTimes:
    """Tests for _parse_daily_times(): "HH:MM[,HH:MM...]" -> [(hour, minute), ...]."""

    def test_single_time(self) -> None:
        parsed = _parse_daily_times("05:00")
        assert parsed == [(5, 0)]

    def test_multiple_times(self) -> None:
        parsed = _parse_daily_times("07:00,13:00,18:00")
        assert parsed == [(7, 0), (13, 0), (18, 0)]

    def test_invalid_format(self) -> None:
        # A single-digit hour is expected to yield no slots.
        parsed = _parse_daily_times("7:00")
        assert parsed == []

    def test_invalid_hour(self) -> None:
        parsed = _parse_daily_times("25:00")
        assert parsed == []

    def test_invalid_minute(self) -> None:
        parsed = _parse_daily_times("12:60")
        assert parsed == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _parse_timestamp
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestParseTimestamp:
    """Tests for the internal _parse_timestamp() helper."""

    def test_iso_with_timezone(self) -> None:
        parsed = _parse_timestamp("2026-03-15T12:00:00+00:00")
        assert parsed is not None
        assert parsed.year == 2026
        assert parsed.month == 3
        assert parsed.day == 15
        assert parsed.hour == 12

    def test_iso_with_z_suffix(self) -> None:
        # Python 3.11+ fromisoformat handles the trailing "Z".
        parsed = _parse_timestamp("2026-03-15T12:00:00Z")
        assert parsed is not None
        assert parsed.hour == 12

    def test_iso_without_timezone(self) -> None:
        parsed = _parse_timestamp("2026-03-15T12:00:00")
        assert parsed is not None
        assert parsed.hour == 12
        # The helper itself returns the value naive (no tzinfo attached).
        assert parsed.tzinfo is None

    def test_iso_with_microseconds(self) -> None:
        parsed = _parse_timestamp("2026-03-15T12:00:00.123456")
        assert parsed is not None
        assert parsed.microsecond == 123456

    def test_space_separated(self) -> None:
        parsed = _parse_timestamp("2026-03-15 12:00:00")
        assert parsed is not None
        assert parsed.hour == 12

    def test_invalid_string_returns_none(self) -> None:
        parsed = _parse_timestamp("not-a-date")
        assert parsed is None

    def test_empty_string_returns_none(self) -> None:
        parsed = _parse_timestamp("")
        assert parsed is None

    def test_partial_date_returns_none(self) -> None:
        # NOTE: despite the test name, a date-only string is expected to
        # PARSE (fromisoformat accepts date-only input in 3.11+), giving
        # midnight. The name is kept so existing test IDs stay stable.
        parsed = _parse_timestamp("2026-03-15")
        assert parsed is not None
        assert parsed.hour == 0

    def test_iso_with_positive_offset(self) -> None:
        parsed = _parse_timestamp("2026-03-15T12:00:00+05:30")
        assert parsed is not None
        assert parsed.hour == 12
        assert parsed.utcoffset() is not None

    def test_iso_with_negative_offset(self) -> None:
        parsed = _parse_timestamp("2026-03-15T12:00:00-07:00")
        assert parsed is not None
        assert parsed.utcoffset() is not None

    def test_numeric_garbage_returns_none(self) -> None:
        parsed = _parse_timestamp("12345")
        assert parsed is None

    def test_none_like_string_returns_none(self) -> None:
        parsed = _parse_timestamp("None")
        assert parsed is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# LLM pipeline cadence env vars (#179 review)
#
# Three jobs (session-collector, session-processor:verification -- formerly
# verification-detector -- and corporate-memory) and the health-check
# staleness grace must all derive from a single SCHEDULER_* env var per job,
# so an operator changing the cadence in one place moves both the schedule
# string and the grace window. The env var name was already read in
# app/api/health.py before this change but didn't actually drive the
# scheduler; these tests pin the wired-up behavior.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Every SCHEDULER_* variable that the scheduler tests scrub before calling
# build_jobs(), so a value exported in the developer's or CI shell cannot
# change the schedules under test. SCHEDULER_USAGE_PROCESSOR_INTERVAL is
# included because the session-processor:usage job reads it; leaving it out
# would let an ambient (possibly invalid) value leak into these tests.
_LLM_PIPELINE_ENV = (
    "SCHEDULER_DATA_REFRESH_INTERVAL",
    "SCHEDULER_HEALTH_CHECK_INTERVAL",
    "SCHEDULER_TICK_SECONDS",
    "SCHEDULER_SCRIPT_RUN_INTERVAL",
    "SCHEDULER_SESSION_COLLECTOR_INTERVAL",
    "SCHEDULER_VERIFICATION_DETECTOR_INTERVAL",
    "SCHEDULER_USAGE_PROCESSOR_INTERVAL",
    "SCHEDULER_CORPORATE_MEMORY_INTERVAL",
)
|
|
|
|
|
|
def _clear_scheduler_env(monkeypatch) -> None:
    """Unset every variable named in _LLM_PIPELINE_ENV for the current test.

    ``raising=False`` makes the removal a no-op for variables that are not
    set, so the helper is safe to call in any environment.
    """
    for env_name in _LLM_PIPELINE_ENV:
        monkeypatch.delenv(env_name, raising=False)
|
|
|
|
|
|
class TestLLMPipelineCadenceEnvVars:
    """Three new env vars drive both the scheduler and the health grace window."""

    @staticmethod
    def _schedules() -> dict:
        """Return ``{job name: schedule string}`` from a fresh build_jobs() call.

        The import is deferred to call time, matching the tests' pattern of
        adjusting the environment first and only then touching the scheduler.
        """
        from services.scheduler.__main__ import build_jobs

        return {job_name: cadence for job_name, cadence, *_rest in build_jobs()}

    def test_default_cadences_preserve_coprime_offset(self, monkeypatch) -> None:
        """Defaults are 10m / 15m / 17m so the three jobs don't fire on the same tick."""
        _clear_scheduler_env(monkeypatch)
        schedules = self._schedules()
        assert schedules["session-collector"] == "every 10m"
        assert schedules["session-processor:verification"] == "every 15m"
        assert schedules["corporate-memory"] == "every 17m"

    def test_session_collector_env_override_changes_cadence(self, monkeypatch) -> None:
        _clear_scheduler_env(monkeypatch)
        monkeypatch.setenv("SCHEDULER_SESSION_COLLECTOR_INTERVAL", "300")  # 5m
        schedules = self._schedules()
        assert schedules["session-collector"] == "every 5m"
        # The other LLM jobs keep their defaults.
        assert schedules["session-processor:verification"] == "every 15m"
        assert schedules["corporate-memory"] == "every 17m"

    def test_verification_detector_env_override_changes_cadence(self, monkeypatch) -> None:
        _clear_scheduler_env(monkeypatch)
        monkeypatch.setenv("SCHEDULER_VERIFICATION_DETECTOR_INTERVAL", "600")  # 10m
        schedules = self._schedules()
        assert schedules["session-processor:verification"] == "every 10m"
        assert schedules["session-collector"] == "every 10m"
        assert schedules["corporate-memory"] == "every 17m"

    def test_corporate_memory_env_override_changes_cadence(self, monkeypatch) -> None:
        _clear_scheduler_env(monkeypatch)
        monkeypatch.setenv("SCHEDULER_CORPORATE_MEMORY_INTERVAL", "1800")  # 30m
        schedules = self._schedules()
        assert schedules["corporate-memory"] == "every 30m"
        assert schedules["session-collector"] == "every 10m"
        assert schedules["session-processor:verification"] == "every 15m"

    @pytest.mark.parametrize("var", [
        "SCHEDULER_SESSION_COLLECTOR_INTERVAL",
        "SCHEDULER_VERIFICATION_DETECTOR_INTERVAL",
        "SCHEDULER_CORPORATE_MEMORY_INTERVAL",
    ])
    @pytest.mark.parametrize("bad", ["0", "-5", "abc", ""])
    def test_invalid_llm_env_rejected(self, monkeypatch, var, bad) -> None:
        """Zero, negative, non-numeric and empty values make build_jobs() raise."""
        _clear_scheduler_env(monkeypatch)
        monkeypatch.setenv(var, bad)
        # Import outside the ``raises`` block so only build_jobs() itself is
        # allowed to satisfy the expectation.
        from services.scheduler.__main__ import build_jobs

        with pytest.raises(ValueError):
            build_jobs()
|
|
|
|
|
|
class TestVerificationDetectorGraceFollowsCadence:
    """The health-check grace window is 2x the cadence — same env var drives both."""

    def test_grace_doubles_when_env_overrides_cadence(self, monkeypatch) -> None:
        _clear_scheduler_env(monkeypatch)
        cadence_seconds = 600  # 10m
        monkeypatch.setenv("SCHEDULER_VERIFICATION_DETECTOR_INTERVAL", "600")
        from app.api.health import _verification_detector_grace_seconds
        from services.scheduler.__main__ import build_jobs

        schedules = {
            job_name: cadence for job_name, cadence, *_rest in build_jobs()
        }
        # Cadence and grace MUST come from the same env var: an operator who
        # throttles the detector (rate limits, cost, debugging) then gets a
        # proportionally wider staleness window without a second knob.
        assert schedules["session-processor:verification"] == "every 10m"
        assert _verification_detector_grace_seconds() == 2 * cadence_seconds

    def test_grace_uses_default_cadence_when_env_unset(self, monkeypatch) -> None:
        _clear_scheduler_env(monkeypatch)
        from app.api.health import _verification_detector_grace_seconds

        # Default cadence 900s -> grace 1800s.
        default_cadence_seconds = 900
        assert _verification_detector_grace_seconds() == 2 * default_cadence_seconds
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# services/scheduler/__main__._run_job — terminal-state bookkeeping
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRunJobBookkeeping:
    """Per-job worker that advances last_run + clears in_flight on terminal
    state (success OR failure). Pre-fix: last_run only advanced on success,
    causing permanently failing jobs to retry every tick (30s) instead of
    on cadence (15min). PR #232 review fix."""

    # Leading positional arguments handed to _run_job in every test; the
    # final element is the timestamp each test expects to land in last_run.
    _JOB_ARGS = ("verification", "/api/admin/run-x", "POST", 60, "2026-01-01T00:00:00")

    def _setup(self):
        """Return ``(last_run, in_flight, lock)`` for a job already marked in flight."""
        import threading

        last_run: dict[str, Optional[str]] = {"verification": None}
        in_flight: set[str] = {"verification"}
        return last_run, in_flight, threading.Lock()

    def _invoke(self, sched, last_run, in_flight, lock) -> None:
        """Call ``sched._run_job`` with the canned job plus the given state."""
        sched._run_job(*self._JOB_ARGS, last_run, in_flight, lock)

    @staticmethod
    def _assert_bookkeeping_ran(last_run, in_flight) -> None:
        """last_run carries the passed timestamp and the job left in_flight."""
        assert last_run["verification"] == "2026-01-01T00:00:00"
        assert "verification" not in in_flight

    def test_advances_last_run_on_success(self, monkeypatch):
        from services.scheduler import __main__ as sched

        last_run, in_flight, lock = self._setup()
        monkeypatch.setattr(sched, "_call_api", lambda *a, **kw: True)

        self._invoke(sched, last_run, in_flight, lock)

        self._assert_bookkeeping_ran(last_run, in_flight)

    def test_advances_last_run_on_failure(self, monkeypatch):
        """Permanently-failing jobs must NOT hot-loop every tick — last_run
        advances even when _call_api returns False."""
        from services.scheduler import __main__ as sched

        last_run, in_flight, lock = self._setup()
        monkeypatch.setattr(sched, "_call_api", lambda *a, **kw: False)

        self._invoke(sched, last_run, in_flight, lock)

        self._assert_bookkeeping_ran(last_run, in_flight)

    def test_advances_last_run_when_call_raises(self, monkeypatch):
        """`_call_api` catches its own exceptions and returns False, but a
        synchronous bug above it (e.g. KeyError on jobs tuple unpacking)
        could still bubble. The finally block must release in_flight either
        way, otherwise the processor wedges until container restart."""
        from services.scheduler import __main__ as sched

        last_run, in_flight, lock = self._setup()

        def _boom(*a, **kw):
            raise RuntimeError("simulated unhandled scheduler bug")

        monkeypatch.setattr(sched, "_call_api", _boom)

        with pytest.raises(RuntimeError):
            self._invoke(sched, last_run, in_flight, lock)

        # Even on raise, bookkeeping ran.
        self._assert_bookkeeping_ran(last_run, in_flight)
|
|
|
|
|
|
class TestRunLoopParallelism:
    """The scheduler tick must dispatch jobs in parallel — a 900s verification
    run cannot block the 60s health-check from firing on its own cadence.
    PR #232 review fix replaces the `for-loop + synchronous _call_api` with
    a `ThreadPoolExecutor.submit` per due job."""

    def test_in_flight_skip_prevents_duplicate_launches(self, monkeypatch):
        """When a previous tick's job hasn't returned yet, the next tick
        must NOT submit it again — otherwise a 10-min run during which
        20 ticks fire would queue 20 duplicate POSTs against the same
        processor (the admin endpoint's per-processor lock would 409 most
        of them, but they'd still be wasted requests + audit-log noise)."""
        import threading
        import time as _time
        from services.scheduler import __main__ as sched

        # Single job that takes ~0.3s while the tick interval is patched to
        # 0s below. Without in_flight protection the loop would launch the
        # job many times before the first call returns.
        call_count = {"n": 0}
        call_count_lock = threading.Lock()

        def slow_call(*a, **kw):
            with call_count_lock:
                call_count["n"] += 1
            _time.sleep(0.3)
            return True

        monkeypatch.setattr(sched, "_call_api", slow_call)
        # Force a single short-cadence job + zero-length tick.
        monkeypatch.setattr(
            sched, "build_jobs",
            lambda: [("test-job", "every 1m", "/api/test", "POST", 60)],
        )
        monkeypatch.setattr(sched, "resolved_tick_seconds", lambda: 0)
        # Always-due so the in_flight check is what gates the second launch.
        monkeypatch.setattr(sched, "is_table_due", lambda *a, **kw: True)

        # Arm the run loop through monkeypatch (not a bare assignment) so the
        # module-level flag is restored at teardown; otherwise the ``False``
        # written by the kill thread would leak into every later test that
        # touches the scheduler. raising=False keeps this working even if
        # the attribute is only created inside run().
        monkeypatch.setattr(sched, "_running", True, raising=False)

        # Stop the loop after 0.4s. The first call returns at ~0.3s, so one
        # further launch may legitimately start before the stop; a third
        # presumably cannot, because the second call is still sleeping.
        def _kill():
            _time.sleep(0.4)
            sched._running = False

        killer = threading.Thread(target=_kill, daemon=True)
        killer.start()
        sched.run()
        # Don't leave the helper thread running past the end of the test.
        killer.join(timeout=1.0)

        # Without in_flight: ≥5 launches. With: exactly 1 (or maybe 2 if
        # the first one finished mid-tick — both are correct, the bug is
        # ≥5).
        assert call_count["n"] <= 2, f"in_flight protection failed; {call_count['n']} launches"
|