fix(health): session-pipeline staleness check (#176)

GET /api/health/detailed now returns a session_pipeline service entry.
Heuristic:
  max(mtime of /data/user_sessions/**/*.jsonl) <=
  max(processed_at in session_extraction_state) + grace_seconds

grace_seconds = 2 × verification-detector cadence (default 30 min;
configurable via SCHEDULER_VERIFICATION_DETECTOR_INTERVAL).

When the assert fails, status='warning' (never 'error') with an
actionable detail pointing at the verification-detector scheduler job.
A warning bubbles up to the existing overall='degraded' aggregation —
operators querying /api/health/detailed (or /agnes diagnose system)
get a clear breadcrumb instead of a silently-broken pipeline.

Cold-start case (no session files, or files newer than the grace
window with empty state table) is handled explicitly to avoid noise
on a fresh deploy.

Tests: tests/test_health_session_pipeline.py.
This commit is contained in:
ZdenekSrotyr 2026-05-05 00:04:28 +02:00
parent c53c1e1572
commit a621a415cc
2 changed files with 223 additions and 0 deletions

View file

@ -2,6 +2,7 @@
import os
from datetime import datetime, timezone
from pathlib import Path
from fastapi import APIRouter, Depends
import duckdb
@ -87,6 +88,113 @@ def _check_bq_billing_project() -> dict | None:
}
def _check_session_pipeline(conn: duckdb.DuckDBPyConnection) -> dict:
"""Detect a stuck session pipeline: jsonls land but never get processed.
Heuristic (#176):
max(mtime of /data/user_sessions/**/*.jsonl) <=
max(processed_at in session_extraction_state) + grace_seconds
grace_seconds = 2 × the verification-detector cadence (default 15m 30m).
Operators with a custom SCHEDULER_VERIFICATION_DETECTOR_INTERVAL can
extend the grace by setting that env var.
Returns ``warning`` (never ``error``) the LLM may be down for
maintenance, not a hard failure. Returns ``ok`` when no session
files exist (cold-start case).
"""
# Resolve user_sessions dir from the same DATA_DIR conftest sets up.
data_dir = Path(os.environ.get("DATA_DIR", "/data"))
user_sessions = data_dir / "user_sessions"
try:
session_files = list(user_sessions.glob("**/*.jsonl"))
except OSError:
# Permission / FS error — surface as 'unknown' rather than ok/warning.
return {"status": "unknown", "detail": "could not scan user_sessions"}
if not session_files:
return {"status": "ok", "detail": "no session files yet"}
try:
latest_session_mtime = max(f.stat().st_mtime for f in session_files)
except OSError:
return {"status": "unknown", "detail": "could not stat session files"}
# Look up the most recent processed_at.
try:
row = conn.execute(
"SELECT MAX(processed_at) FROM session_extraction_state"
).fetchone()
except Exception as e:
return {"status": "unknown", "detail": f"could not query session_extraction_state: {e}"}
last_processed = row[0] if row else None
grace_seconds = _verification_detector_grace_seconds()
if last_processed is None:
# Files exist but state table is empty — pipeline never ran here.
if (datetime.now(timezone.utc).timestamp() - latest_session_mtime) > grace_seconds:
return {
"status": "warning",
"detail": (
"session_extraction_state is empty but jsonl files exist. "
"Check the verification-detector scheduler job."
),
"session_files": len(session_files),
}
return {"status": "ok", "session_files": len(session_files)}
# Both available — compare. session_extraction_state.processed_at is
# stored as DuckDB TIMESTAMP (naive). DuckDB converts tz-aware writes
# to local time before storing, so the only safe interpretation is
# local-naive on read. Compute the lag against `datetime.now()` (also
# local-naive) and only convert to epoch via the OS's local timezone
# mapping at the comparison boundary.
now_local_naive = datetime.now()
if hasattr(last_processed, "tzinfo") and last_processed.tzinfo is not None:
last_processed = last_processed.replace(tzinfo=None)
proc_age_seconds = (now_local_naive - last_processed).total_seconds()
file_age_seconds = time_now() - latest_session_mtime
# File is newer than the last processed_at by more than grace_seconds.
if proc_age_seconds - file_age_seconds > grace_seconds:
lag_seconds = int(proc_age_seconds - file_age_seconds)
return {
"status": "warning",
"detail": (
f"session jsonls newer than session_extraction_state by ~{lag_seconds}s "
f"(grace={grace_seconds}s). Check the verification-detector scheduler "
f"job — uploads are not being processed."
),
"lag_seconds": lag_seconds,
"session_files": len(session_files),
}
return {"status": "ok", "session_files": len(session_files)}
def time_now() -> float:
"""Wall-clock seconds since epoch — separated out for test seam parity."""
import time as _t
return _t.time()
def _verification_detector_grace_seconds() -> int:
"""Compute the staleness grace window for the session pipeline check."""
cadence_seconds_default = 15 * 60
raw = os.environ.get("SCHEDULER_VERIFICATION_DETECTOR_INTERVAL")
if raw:
try:
cadence_seconds = int(raw)
if cadence_seconds > 0:
return 2 * cadence_seconds
except ValueError:
pass
return 2 * cadence_seconds_default
def _check_db_schema() -> dict:
"""Check DB schema version against expected SCHEMA_VERSION.
@ -177,6 +285,13 @@ async def health_check_detailed(
if bq_cfg is not None:
checks["bq_config"] = bq_cfg
# Session pipeline (#176): warn when uploaded jsonls aren't getting
# processed by the verification-detector cadence.
try:
checks["session_pipeline"] = _check_session_pipeline(conn)
except Exception as e:
checks["session_pipeline"] = {"status": "unknown", "detail": str(e)}
overall = "healthy"
for check in checks.values():
if check.get("status") == "error":

View file

@ -0,0 +1,108 @@
"""Health-check coverage for the session pipeline (#176).
GET /api/health/detailed must surface a `session_pipeline` service entry
that warns when freshly-uploaded session jsonls aren't being processed.
Heuristic:
max(mtime of /data/user_sessions/**/*.jsonl) <=
max(processed_at in session_extraction_state) + grace
Where grace = 2 * scheduler verification-detector cadence (default 15m).
When the assert fails, return status='warning' with an actionable
message never 'error' (the LLM service may be down for maintenance,
not a hard failure).
"""
from __future__ import annotations
import os
import time
from datetime import datetime, timezone
from pathlib import Path
import pytest
def _auth(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
def _seed_extraction_state(processed_at: datetime, session_file: str = "/data/user_sessions/x/y.jsonl"):
"""Insert a synthetic row into session_extraction_state."""
from src.db import get_system_db
conn = get_system_db()
conn.execute(
"INSERT OR REPLACE INTO session_extraction_state "
"(session_file, username, processed_at, items_extracted, file_hash) "
"VALUES (?, ?, ?, ?, ?)",
[session_file, "x", processed_at, 0, "deadbeef"],
)
conn.close()
def _make_session_file(env_data_dir: Path, name: str, mtime_ago_seconds: int) -> Path:
"""Create a fake session jsonl with the requested mtime offset."""
sessions_dir = env_data_dir / "user_sessions" / "x"
sessions_dir.mkdir(parents=True, exist_ok=True)
f = sessions_dir / name
f.write_text("{}\n")
target = time.time() - mtime_ago_seconds
os.utime(f, (target, target))
return f
class TestSessionPipelineHealthCheck:
def test_no_session_files_returns_ok(self, seeded_app):
"""Empty /data/user_sessions/ is the cold-start case — not a warning."""
c = seeded_app["client"]
resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"]))
assert resp.status_code == 200
services = resp.json()["services"]
assert "session_pipeline" in services
assert services["session_pipeline"]["status"] == "ok"
def test_session_files_recently_processed_returns_ok(self, seeded_app):
env = seeded_app["env"]
# Session file mtime: 1 minute ago. Processed: 30 seconds ago.
# Within grace window → ok.
_make_session_file(env["data_dir"], "ok.jsonl", mtime_ago_seconds=60)
_seed_extraction_state(datetime.now(timezone.utc))
c = seeded_app["client"]
resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"]))
assert resp.status_code == 200
services = resp.json()["services"]
assert services["session_pipeline"]["status"] == "ok"
def test_old_session_files_unprocessed_returns_warning(self, seeded_app, monkeypatch):
env = seeded_app["env"]
# Session file mtime: 2 hours ago. Processed: 3 hours ago.
# Way outside the 30-min grace window (2x default 15m cadence) → warning.
_make_session_file(env["data_dir"], "old.jsonl", mtime_ago_seconds=7200)
from datetime import timedelta
_seed_extraction_state(datetime.now(timezone.utc) - timedelta(hours=3))
c = seeded_app["client"]
resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"]))
assert resp.status_code == 200
body = resp.json()
services = body["services"]
assert services["session_pipeline"]["status"] == "warning"
# Actionable detail must point at the verification-detector job.
detail = services["session_pipeline"].get("detail", "")
assert "verification-detector" in detail or "session" in detail.lower()
# Warning bubbles up to overall status='degraded' (existing pattern).
assert body["status"] == "degraded"
def test_session_files_never_processed_returns_warning(self, seeded_app):
"""Files exist but session_extraction_state is empty → warning."""
env = seeded_app["env"]
_make_session_file(env["data_dir"], "neverprocessed.jsonl", mtime_ago_seconds=7200)
c = seeded_app["client"]
resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"]))
assert resp.status_code == 200
services = resp.json()["services"]
assert services["session_pipeline"]["status"] == "warning"