From a621a415cc873cc93aad36a56ec1084964fd890e Mon Sep 17 00:00:00 2001
From: ZdenekSrotyr
Date: Tue, 5 May 2026 00:04:28 +0200
Subject: [PATCH] fix(health): session-pipeline staleness check (#176)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

GET /api/health/detailed now returns a session_pipeline service entry.

Heuristic:
  max(mtime of /data/user_sessions/**/*.jsonl) <=
    max(processed_at in session_extraction_state) + grace_seconds

grace_seconds = 2 × verification-detector cadence. The cadence defaults
to 15 min, so the default grace is 30 min; the cadence (in seconds) is
configurable via SCHEDULER_VERIFICATION_DETECTOR_INTERVAL.

When the assert fails, status='warning' (never 'error') with an
actionable detail pointing at the verification-detector scheduler job.
A warning bubbles up to the existing overall='degraded' aggregation —
operators querying /api/health/detailed (or /agnes diagnose system) get
a clear breadcrumb instead of a silently-broken pipeline.

Cold-start case (no session files, or files newer than the grace window
with empty state table) is handled explicitly to avoid noise on a fresh
deploy.

Tests: tests/test_health_session_pipeline.py.
---
 app/api/health.py                     | 119 ++++++++++++++++++++++++++
 tests/test_health_session_pipeline.py | 109 ++++++++++++++++++++++++
 2 files changed, 228 insertions(+)
 create mode 100644 tests/test_health_session_pipeline.py

diff --git a/app/api/health.py b/app/api/health.py
index 736542b..9b68f52 100644
--- a/app/api/health.py
+++ b/app/api/health.py
@@ -2,6 +2,7 @@
 
 import os
 from datetime import datetime, timezone
+from pathlib import Path
 
 from fastapi import APIRouter, Depends
 import duckdb
@@ -87,6 +88,117 @@ def _check_bq_billing_project() -> dict | None:
     }
 
 
+def _check_session_pipeline(conn: duckdb.DuckDBPyConnection) -> dict:
+    """Detect a stuck session pipeline: jsonls land but never get processed.
+
+    Heuristic (#176):
+        max(mtime of /data/user_sessions/**/*.jsonl) <=
+            max(processed_at in session_extraction_state) + grace_seconds
+
+    grace_seconds = 2 × the verification-detector cadence (default 15m → 30m).
+    Operators with a custom SCHEDULER_VERIFICATION_DETECTOR_INTERVAL (in
+    seconds) can extend the grace by setting that env var.
+
+    Returns ``warning`` (never ``error``) — the LLM may be down for
+    maintenance, not a hard failure. Returns ``ok`` when no session
+    files exist (cold-start case) and ``unknown`` when the session
+    directory or the state table cannot be read.
+    """
+    # Resolve user_sessions dir from the same DATA_DIR conftest sets up.
+    data_dir = Path(os.environ.get("DATA_DIR", "/data"))
+    user_sessions = data_dir / "user_sessions"
+
+    try:
+        session_files = list(user_sessions.glob("**/*.jsonl"))
+    except OSError:
+        # Permission / FS error — surface as 'unknown' rather than ok/warning.
+        return {"status": "unknown", "detail": "could not scan user_sessions"}
+
+    if not session_files:
+        return {"status": "ok", "detail": "no session files yet"}
+
+    try:
+        latest_session_mtime = max(f.stat().st_mtime for f in session_files)
+    except OSError:
+        return {"status": "unknown", "detail": "could not stat session files"}
+
+    # Look up the most recent processed_at.
+    try:
+        row = conn.execute(
+            "SELECT MAX(processed_at) FROM session_extraction_state"
+        ).fetchone()
+    except Exception as e:
+        return {"status": "unknown", "detail": f"could not query session_extraction_state: {e}"}
+
+    last_processed = row[0] if row else None
+
+    grace_seconds = _verification_detector_grace_seconds()
+    # One wall-clock reading through the time_now() seam serves both branches.
+    file_age_seconds = time_now() - latest_session_mtime
+
+    if last_processed is None:
+        # Files exist but state table is empty — pipeline never ran here.
+        if file_age_seconds > grace_seconds:
+            return {
+                "status": "warning",
+                "detail": (
+                    "session_extraction_state is empty but jsonl files exist. "
+                    "Check the verification-detector scheduler job."
+                ),
+                "session_files": len(session_files),
+            }
+        return {"status": "ok", "session_files": len(session_files)}
+
+    # Both available — compare. session_extraction_state.processed_at is
+    # stored as DuckDB TIMESTAMP (naive). DuckDB converts tz-aware writes
+    # to local time before storing, so the only safe interpretation is
+    # local-naive on read; the lag is computed against `datetime.now()`
+    # (also local-naive).
+    now_local_naive = datetime.now()
+    if getattr(last_processed, "tzinfo", None) is not None:
+        # A tz-aware value (e.g. if the column is ever TIMESTAMPTZ) must be
+        # converted to local time before tzinfo is dropped; a bare
+        # replace(tzinfo=None) would skew the lag by the value's UTC offset.
+        last_processed = last_processed.astimezone().replace(tzinfo=None)
+    proc_age_seconds = (now_local_naive - last_processed).total_seconds()
+
+    # File is newer than the last processed_at by more than grace_seconds.
+    if proc_age_seconds - file_age_seconds > grace_seconds:
+        lag_seconds = int(proc_age_seconds - file_age_seconds)
+        return {
+            "status": "warning",
+            "detail": (
+                f"session jsonls newer than session_extraction_state by ~{lag_seconds}s "
+                f"(grace={grace_seconds}s). Check the verification-detector scheduler "
+                f"job — uploads are not being processed."
+            ),
+            "lag_seconds": lag_seconds,
+            "session_files": len(session_files),
+        }
+
+    return {"status": "ok", "session_files": len(session_files)}
+
+
+def time_now() -> float:
+    """Wall-clock seconds since epoch — separated out for test seam parity."""
+    import time as _t
+    return _t.time()
+
+
+def _verification_detector_grace_seconds() -> int:
+    """Compute the staleness grace window for the session pipeline check."""
+    cadence_seconds_default = 15 * 60
+    raw = os.environ.get("SCHEDULER_VERIFICATION_DETECTOR_INTERVAL")
+    if raw:
+        try:
+            cadence_seconds = int(raw)
+        except ValueError:
+            cadence_seconds = 0  # malformed value: fall back to the default
+        if cadence_seconds > 0:
+            return 2 * cadence_seconds
+    return 2 * cadence_seconds_default
+
+
 def _check_db_schema() -> dict:
     """Check DB schema version against expected SCHEMA_VERSION.
 
@@ -177,6 +289,13 @@ async def health_check_detailed(
     if bq_cfg is not None:
         checks["bq_config"] = bq_cfg
 
+    # Session pipeline (#176): warn when uploaded jsonls aren't getting
+    # processed by the verification-detector cadence.
+    try:
+        checks["session_pipeline"] = _check_session_pipeline(conn)
+    except Exception as e:
+        checks["session_pipeline"] = {"status": "unknown", "detail": str(e)}
+
     overall = "healthy"
     for check in checks.values():
         if check.get("status") == "error":
diff --git a/tests/test_health_session_pipeline.py b/tests/test_health_session_pipeline.py
new file mode 100644
index 0000000..b3a103d
--- /dev/null
+++ b/tests/test_health_session_pipeline.py
@@ -0,0 +1,109 @@
+"""Health-check coverage for the session pipeline (#176).
+
+GET /api/health/detailed must surface a `session_pipeline` service entry
+that warns when freshly-uploaded session jsonls aren't being processed.
+
+Heuristic:
+    max(mtime of /data/user_sessions/**/*.jsonl) <=
+        max(processed_at in session_extraction_state) + grace
+
+Where grace = 2 * scheduler verification-detector cadence (default 15m).
+
+When the assert fails, return status='warning' with an actionable
+message — never 'error' (the LLM service may be down for maintenance,
+not a hard failure).
+"""
+
+from __future__ import annotations
+
+import os
+import time
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+
+import pytest
+
+
+def _auth(token: str) -> dict:
+    return {"Authorization": f"Bearer {token}"}
+
+
+def _seed_extraction_state(processed_at: datetime, session_file: str = "/data/user_sessions/x/y.jsonl"):
+    """Insert a synthetic row into session_extraction_state."""
+    from src.db import get_system_db
+
+    conn = get_system_db()
+    try:
+        conn.execute(
+            "INSERT OR REPLACE INTO session_extraction_state "
+            "(session_file, username, processed_at, items_extracted, file_hash) "
+            "VALUES (?, ?, ?, ?, ?)",
+            [session_file, "x", processed_at, 0, "deadbeef"],
+        )
+    finally:
+        conn.close()
+
+
+def _make_session_file(env_data_dir: Path, name: str, mtime_ago_seconds: int) -> Path:
+    """Create a fake session jsonl with the requested mtime offset."""
+    sessions_dir = env_data_dir / "user_sessions" / "x"
+    sessions_dir.mkdir(parents=True, exist_ok=True)
+    f = sessions_dir / name
+    f.write_text("{}\n")
+    target = time.time() - mtime_ago_seconds
+    os.utime(f, (target, target))
+    return f
+
+
+class TestSessionPipelineHealthCheck:
+    def test_no_session_files_returns_ok(self, seeded_app):
+        """Empty /data/user_sessions/ is the cold-start case — not a warning."""
+        c = seeded_app["client"]
+        resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"]))
+        assert resp.status_code == 200
+        services = resp.json()["services"]
+        assert "session_pipeline" in services
+        assert services["session_pipeline"]["status"] == "ok"
+
+    def test_session_files_recently_processed_returns_ok(self, seeded_app):
+        env = seeded_app["env"]
+        # Session file mtime: 1 minute ago. Processed: 30 seconds ago.
+        # Within grace window → ok.
+        _make_session_file(env["data_dir"], "ok.jsonl", mtime_ago_seconds=60)
+        _seed_extraction_state(datetime.now(timezone.utc) - timedelta(seconds=30))
+
+        c = seeded_app["client"]
+        resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"]))
+        assert resp.status_code == 200
+        services = resp.json()["services"]
+        assert services["session_pipeline"]["status"] == "ok"
+
+    def test_old_session_files_unprocessed_returns_warning(self, seeded_app):
+        env = seeded_app["env"]
+        # Session file mtime: 2 hours ago. Processed: 3 hours ago.
+        # Way outside the 30-min grace window (2x default 15m cadence) → warning.
+        _make_session_file(env["data_dir"], "old.jsonl", mtime_ago_seconds=7200)
+        _seed_extraction_state(datetime.now(timezone.utc) - timedelta(hours=3))
+
+        c = seeded_app["client"]
+        resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"]))
+        assert resp.status_code == 200
+        body = resp.json()
+        services = body["services"]
+        assert services["session_pipeline"]["status"] == "warning"
+        # Actionable detail must point at the verification-detector job.
+        detail = services["session_pipeline"].get("detail", "")
+        assert "verification-detector" in detail
+        # Warning bubbles up to overall status='degraded' (existing pattern).
+        assert body["status"] == "degraded"
+
+    def test_session_files_never_processed_returns_warning(self, seeded_app):
+        """Files exist but session_extraction_state is empty → warning."""
+        env = seeded_app["env"]
+        _make_session_file(env["data_dir"], "neverprocessed.jsonl", mtime_ago_seconds=7200)
+
+        c = seeded_app["client"]
+        resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"]))
+        assert resp.status_code == 200
+        services = resp.json()["services"]
+        assert services["session_pipeline"]["status"] == "warning"