* Extract session pipeline framework, refactor verification, add UsageProcessor skeleton

  Pluggable framework under services/session_pipeline/ (contract + lib + per-processor runner) so multiple processors can read /data/user_sessions/<key>/*.jsonl on their own cadence with full failure isolation. Verification flow becomes the first plugin; a no-op UsageProcessor reserves the second slot pending a separate brainstorm on extraction logic + storage shape.

  Schema v28→v29: rename session_extraction_state → session_processor_state with composite PK (processor_name, session_file). Existing rows copied over with processor_name='verification'; legacy table dropped. Migration is idempotent and no-ops the copy step on fresh installs that came up at the new schema.

  Endpoint: /api/admin/run-verification-detector replaced by parametrized /api/admin/run-session-processor?processor=<name>. Audit action format follows.

  Scheduler JOBS: verification-detector entry split into session-processor:verification + session-processor:usage. SCHEDULER_VERIFICATION_DETECTOR_INTERVAL retained for operator compatibility (drives both cadence and health-check grace window); SCHEDULER_USAGE_PROCESSOR_INTERVAL added.

* Address PR #232 review: scan dead branch + per-processor lock

  - `SessionProcessorStateRepository.scan_unprocessed_for` dead else: both branches surfaced every jsonl, the SELECT was unused, runner MD5-rehashed every stable session per tick. Replaced with an mtime precheck: stable sessions (mtime <= processed_at) are filtered at scan; modified files still surface for the runner's authoritative `file_hash` invalidation. Naive-local comparison matches the existing health-check idiom (DuckDB TIMESTAMP strips tz on storage).
  - Per-processor advisory lock around `_run_processor` in `/api/admin/run-session-processor`. Scheduler tick + manual admin POST could otherwise both run, both call create_evidence on overlapping detections, and accumulate duplicate verification_evidence rows (the dedup short-circuit only covers create+contradiction, not evidence per ADR Decision 3). Non-blocking acquire → 409 Conflict on concurrent invocation; release in finally so a runner exception doesn't wedge the processor.

  Tests: two new scan unit tests (mtime filter + post-mark mtime bump), 409 endpoint test, lock-released-on-exception test. Two existing tests updated for the new "filtered at scan" stat shape (previously asserted skipped == 1, now scanned == 0).

* Address PR #232 review #2: parallel scheduler tick + last_run on terminal state

  Two pre-existing scaffold bugs in services/scheduler/__main__.py amplified by adding more session-pipeline jobs:

  1. Serial for-loop over jobs with synchronous httpx.post(timeout=900): a 10-minute verification run blocked every other job (data-refresh, health-check, usage, corporate-memory) for the whole window. The PR's stated isolation guarantee held inside the runner but broke at the scheduler dispatch layer.
  2. last_run advanced only when _call_api returned True. Permanent-failure jobs hot-looped on every tick (30s) instead of cadence (15min).

  Fix: ThreadPoolExecutor.submit per due job + per-job in_flight set so a long-running job can't be re-launched on subsequent ticks. last_run advances unconditionally in finally; errors still surface via _call_api logging + audit_log on the receiving side. _run_job extracted to module-level for unit testing.

  New tests:
  - TestRunJobBookkeeping: advances on success / failure / unhandled raise
  - TestRunLoopParallelism: in_flight protection prevents duplicate launches across ticks for a single slow job

---------

Co-authored-by: Minas Arustamyan <arustamyan.minas@gmail.com>
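The scheduler fix described in the last commit message (one `ThreadPoolExecutor.submit` per due job, an in_flight set, and last_run advancing in `finally`) can be sketched roughly as below. This is an illustrative sketch, not the code in services/scheduler/__main__.py: the names `run_job`, `tick`, `last_run`, `in_flight`, and `_state_lock` are hypothetical, and the real `_run_job` calls the API over HTTP rather than invoking a local callable.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Set

# Hypothetical module-level bookkeeping; the real scheduler may store this differently.
last_run: Dict[str, float] = {}
in_flight: Set[str] = set()
_state_lock = threading.Lock()


def run_job(name: str, call: Callable[[], bool]) -> None:
    """Run one job; advance last_run and clear in_flight whatever the outcome."""
    try:
        call()
    except Exception:
        # Swallowed here for brevity; a real scheduler would log the failure.
        pass
    finally:
        with _state_lock:
            last_run[name] = time.monotonic()
            in_flight.discard(name)


def tick(
    pool: ThreadPoolExecutor,
    jobs: Dict[str, Callable[[], bool]],
    intervals: Dict[str, float],
) -> List[str]:
    """Submit every due job that is not already running; return the names launched."""
    launched: List[str] = []
    now = time.monotonic()
    for name, call in jobs.items():
        with _state_lock:
            due = now - last_run.get(name, float("-inf")) >= intervals[name]
            if not due or name in in_flight:
                continue
            in_flight.add(name)  # marked before submit so the next tick skips it
        pool.submit(run_job, name, call)
        launched.append(name)
    return launched
```

Because last_run is written in `finally`, a job that always fails waits for its full interval before the next attempt instead of retrying on every tick, and a job that is still running is skipped until it finishes.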
196 lines
8.5 KiB
Python
"""Health-check coverage for the session pipeline (#176).

GET /api/health/detailed must surface a `session_pipeline` service entry
that warns when freshly-uploaded session jsonls aren't being processed
by the verification processor.

Heuristic:
    max(mtime of /data/user_sessions/**/*.jsonl) <=
    max(processed_at in session_processor_state where processor_name='verification') + grace

Where grace = 2 * scheduler verification-detector cadence (default 15m).

When the check fails, return status='warning' with an actionable
message, never 'error' (the LLM service may be down for maintenance,
not a hard failure).
"""

from __future__ import annotations

import os
import time
from datetime import datetime, timezone
from pathlib import Path

import pytest


def _auth(token: str) -> dict:
    return {"Authorization": f"Bearer {token}"}


def _seed_extraction_state(processed_at: datetime, session_file: str = "/data/user_sessions/x/y.jsonl"):
    """Insert a synthetic verification-processor state row."""
    from src.db import get_system_db

    conn = get_system_db()
    conn.execute(
        "INSERT OR REPLACE INTO session_processor_state "
        "(processor_name, session_file, username, processed_at, items_extracted, file_hash) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        ["verification", session_file, "x", processed_at, 0, "deadbeef"],
    )
    conn.close()


def _make_session_file(env_data_dir: Path, name: str, mtime_ago_seconds: int) -> Path:
    """Create a fake session jsonl with the requested mtime offset."""
    sessions_dir = env_data_dir / "user_sessions" / "x"
    sessions_dir.mkdir(parents=True, exist_ok=True)
    f = sessions_dir / name
    f.write_text("{}\n")
    target = time.time() - mtime_ago_seconds
    os.utime(f, (target, target))
    return f


class TestSessionPipelineHealthCheck:
    def test_no_session_files_returns_ok(self, seeded_app):
        """Empty /data/user_sessions/ is the cold-start case, not a warning."""
        c = seeded_app["client"]
        resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"]))
        assert resp.status_code == 200
        services = resp.json()["services"]
        assert "session_pipeline" in services
        assert services["session_pipeline"]["status"] == "ok"

    def test_session_files_recently_processed_returns_ok(self, seeded_app):
        env = seeded_app["env"]
        # Session file mtime: 1 minute ago. Processed: just now.
        # Within grace window → ok.
        _make_session_file(env["data_dir"], "ok.jsonl", mtime_ago_seconds=60)
        _seed_extraction_state(datetime.now(timezone.utc))

        c = seeded_app["client"]
        resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"]))
        assert resp.status_code == 200
        services = resp.json()["services"]
        assert services["session_pipeline"]["status"] == "ok"

    def test_old_session_files_unprocessed_returns_warning(self, seeded_app, monkeypatch):
        env = seeded_app["env"]
        # Session file mtime: 2 hours ago. Processed: 3 hours ago.
        # Way outside the 30-min grace window (2x default 15m cadence) → warning.
        _make_session_file(env["data_dir"], "old.jsonl", mtime_ago_seconds=7200)
        from datetime import timedelta
        _seed_extraction_state(datetime.now(timezone.utc) - timedelta(hours=3))

        c = seeded_app["client"]
        resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"]))
        assert resp.status_code == 200
        body = resp.json()
        services = body["services"]
        assert services["session_pipeline"]["status"] == "warning"
        # Actionable detail must point at the verification-detector job.
        detail = services["session_pipeline"].get("detail", "")
        assert "verification-detector" in detail or "session" in detail.lower()
        # Warning bubbles up to overall status='degraded' (existing pattern).
        assert body["status"] == "degraded"

    def test_session_files_never_processed_returns_warning(self, seeded_app):
        """Files exist but verification has no rows in session_processor_state → warning."""
        env = seeded_app["env"]
        _make_session_file(env["data_dir"], "neverprocessed.jsonl", mtime_ago_seconds=7200)

        c = seeded_app["client"]
        resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"]))
        assert resp.status_code == 200
        services = resp.json()["services"]
        assert services["session_pipeline"]["status"] == "warning"


class TestSessionPipelineFIFOCheck:
    """FIFO check (#0.47.4): MAX-only comparison passes silently when
    the verification-detector skips a particular file but keeps processing
    newer ones. New code finds the oldest unprocessed file and surfaces it
    as `info` once it's older than SESSION_PIPELINE_STUCK_FILE_GRACE_SECONDS
    (default 4× verification-detector grace = 2h).
    """

    def test_fifo_check_warns_on_stuck_old_file_when_newer_was_processed(self, seeded_app):
        """Old jsonl never got processed even though a newer one was → info."""
        env = seeded_app["env"]
        # Old file (5h ago), NOT in session_processor_state.
        _make_session_file(env["data_dir"], "stuck_old.jsonl", mtime_ago_seconds=5 * 3600)
        # Newer file (1min ago), processed_at=now → MAX-comparison says "ok".
        _make_session_file(env["data_dir"], "fresh.jsonl", mtime_ago_seconds=60)
        _seed_extraction_state(
            datetime.now(timezone.utc),
            session_file="x/fresh.jsonl",
        )

        c = seeded_app["client"]
        resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"]))
        assert resp.status_code == 200
        body = resp.json()
        services = body["services"]
        sp = services["session_pipeline"]
        assert sp["status"] == "info", f"expected info, got {sp}"
        assert "skipped" in sp["detail"]
        assert sp["stuck_file_age_seconds"] > 4 * 3600
        assert sp["stuck_file"].endswith("stuck_old.jsonl")
        # Info MUST NOT promote the headline to degraded; that's the whole
        # point of starting at info severity.
        assert body["status"] != "degraded"

    def test_fifo_check_silent_when_old_file_under_threshold(self, seeded_app):
        """Old file is 1h old (< 2h default threshold): should return ok."""
        env = seeded_app["env"]
        _make_session_file(env["data_dir"], "recent_unprocessed.jsonl", mtime_ago_seconds=3600)
        _make_session_file(env["data_dir"], "fresh.jsonl", mtime_ago_seconds=60)
        _seed_extraction_state(
            datetime.now(timezone.utc),
            session_file="x/fresh.jsonl",
        )

        c = seeded_app["client"]
        resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"]))
        assert resp.status_code == 200
        services = resp.json()["services"]
        assert services["session_pipeline"]["status"] == "ok"

    def test_fifo_check_silent_when_no_unprocessed_files(self, seeded_app):
        """All FS jsonls have a row in session_processor_state: should return ok."""
        env = seeded_app["env"]
        _make_session_file(env["data_dir"], "processed.jsonl", mtime_ago_seconds=60)
        _seed_extraction_state(
            datetime.now(timezone.utc),
            session_file="x/processed.jsonl",
        )

        c = seeded_app["client"]
        resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"]))
        assert resp.status_code == 200
        services = resp.json()["services"]
        assert services["session_pipeline"]["status"] == "ok"

    def test_fifo_check_threshold_env_override(self, seeded_app, monkeypatch):
        """SESSION_PIPELINE_STUCK_FILE_GRACE_SECONDS=60 → 5min-old file triggers info."""
        monkeypatch.setenv("SESSION_PIPELINE_STUCK_FILE_GRACE_SECONDS", "60")
        env = seeded_app["env"]
        _make_session_file(env["data_dir"], "five_min_old.jsonl", mtime_ago_seconds=300)
        _make_session_file(env["data_dir"], "fresh.jsonl", mtime_ago_seconds=10)
        _seed_extraction_state(
            datetime.now(timezone.utc),
            session_file="x/fresh.jsonl",
        )

        c = seeded_app["client"]
        resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"]))
        assert resp.status_code == 200
        body = resp.json()
        services = body["services"]
        sp = services["session_pipeline"]
        assert sp["status"] == "info", f"expected info, got {sp}"
        assert sp["stuck_file"].endswith("five_min_old.jsonl")
        assert body["status"] != "degraded"
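For readers who want the module docstring's heuristic in executable form, the MAX-only comparison that `TestSessionPipelineHealthCheck` exercises could look something like the sketch below. It is an assumption-laden illustration, not the health endpoint's actual implementation: the function name `session_pipeline_status` and its parameters are hypothetical, it takes plain naive datetimes instead of reading the filesystem or the database, and it leaves out the FIFO stuck-file check entirely.

```python
from datetime import datetime, timedelta
from typing import Iterable, Optional


def session_pipeline_status(
    file_mtimes: Iterable[datetime],
    latest_processed_at: Optional[datetime],
    cadence: timedelta = timedelta(minutes=15),  # default cadence per the docstring
) -> str:
    """Return 'ok' or 'warning' using the MAX-only comparison from the docstring."""
    mtimes = list(file_mtimes)
    if not mtimes:
        # Cold start: no session files yet, nothing to warn about.
        return "ok"
    if latest_processed_at is None:
        # Files exist but the verification processor has never recorded a row.
        return "warning"
    grace = 2 * cadence
    # Healthy when the newest file is no newer than the last processing time plus grace.
    return "ok" if max(mtimes) <= latest_processed_at + grace else "warning"
```

With the default 15-minute cadence the grace window is 30 minutes, so a file modified two hours ago against a last run three hours ago yields a warning, which is the case `test_old_session_files_unprocessed_returns_warning` covers.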