diff --git a/CHANGELOG.md b/CHANGELOG.md index fdbf8d8..b165502 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,13 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +## [0.47.4] — 2026-05-08 + +### Fixed + +- `services/session_collector` no longer logs "Collection complete: 0 users, 0 files copied" + "Group 'data-ops' not found" every 10 minutes in the Docker layout where `/home/*/user/sessions/` doesn't exist. New env var `AGNES_SKIP_LEGACY_COLLECTOR=1` (set by default in `docker-compose.yml`) short-circuits the collector pass. The bare-VM deployment path (where /home/* IS populated by Claude Code) leaves this unset and continues to scan + log normally — including the data-ops warning, which is load-bearing for catching missing-group mis-deploys. +- `agnes diagnose` `session_pipeline` check gains a FIFO-aware lookup: in addition to the existing MAX(processed_at) comparison (catches "detector hasn't run lately"), it now flags the case where an OLD jsonl never got processed even though newer ones did (= verification-detector skipped a file). Threshold defaults to 4× the verification-detector grace (= 2h with default 30min grace) and is configurable via `SESSION_PIPELINE_STUCK_FILE_GRACE_SECONDS`. Severity intentionally starts at `info` — operators can tighten to `warning` once they have prod data on false-positive rate. + ## [0.47.3] — 2026-05-07 ### Fixed diff --git a/app/api/health.py b/app/api/health.py index e937bbd..ebed4df 100644 --- a/app/api/health.py +++ b/app/api/health.py @@ -19,10 +19,13 @@ check function. The aggregator at the bottom of `health_check_detailed` treats `info` as non-promoting. """ +import logging import os from datetime import datetime, timezone from pathlib import Path +logger = logging.getLogger(__name__) + from fastapi import APIRouter, Depends, Query import duckdb @@ -110,6 +113,27 @@ def _check_bq_billing_project() -> dict | None: } +def _stuck_file_grace_seconds() -> int: + """How long (seconds) an unprocessed jsonl must sit before triggering + the FIFO check warning. Defaults to 4× the verification-detector grace + (= 2h with default 30min grace = 8 × 15min cadence). Configurable via + SESSION_PIPELINE_STUCK_FILE_GRACE_SECONDS env var. + + Started conservatively at 4× to avoid false positives on routine LLM + API hiccups. Operators can tighten with the env var once they have + prod data on extraction throughput. + """ + explicit = os.environ.get("SESSION_PIPELINE_STUCK_FILE_GRACE_SECONDS") + if explicit: + try: + v = int(explicit) + if v > 0: + return v + except ValueError: + pass + return 4 * _verification_detector_grace_seconds() + + def _check_session_pipeline(conn: duckdb.DuckDBPyConnection) -> dict: """Detect a stuck session pipeline: jsonls land but never get processed. @@ -194,6 +218,64 @@ def _check_session_pipeline(conn: duckdb.DuckDBPyConnection) -> dict: "session_files": len(session_files), } + # FIFO check (#0.47.4): the MAX-only comparison above can pass silently + # when the verification-detector skips a particular file but keeps + # processing newer ones. Detect that case by finding the oldest FS + # jsonl whose path is NOT in session_extraction_state.session_file + # and surfacing it once it's older than _stuck_file_grace_seconds. + try: + processed = { + row[0] + for row in conn.execute( + "SELECT session_file FROM session_extraction_state" + ).fetchall() + } + except Exception as e: + # Don't fail the health check on this enrichment. + logger.debug("FIFO check: could not read session_extraction_state: %s", e) + return {"status": "ok", "session_files": len(session_files)} + + # session_extraction_state.session_file is stored as the path the + # extractor saw. Older rows store an absolute path (e.g. + # "/data/user_sessions/x/y.jsonl"); newer code stores a relative path + # ("x/y.jsonl"). Match on either form so the FIFO check is robust to + # both — a row stored under either spelling counts as processed. + user_sessions_root = data_dir / "user_sessions" + oldest_unprocessed: tuple[float, str] | None = None + for f in session_files: + try: + rel = str(f.relative_to(user_sessions_root)) + except ValueError: + continue # not under user_sessions_root, skip + absolute = str(f) + if rel in processed or absolute in processed: + continue + try: + mtime = f.stat().st_mtime + except OSError: + continue + if oldest_unprocessed is None or mtime < oldest_unprocessed[0]: + oldest_unprocessed = (mtime, rel) + + if oldest_unprocessed is not None: + stuck_grace = _stuck_file_grace_seconds() + age_s = time_now() - oldest_unprocessed[0] + if age_s > stuck_grace: + return { + "status": "info", + "detail": ( + f"verification-detector skipped a file: oldest unprocessed " + f"jsonl is ~{int(age_s)}s old " + f"(stuck_grace={stuck_grace}s, file={oldest_unprocessed[1]}). " + f"Newer files ARE being processed (this is FIFO-stuck, not " + f"a backlog). Check the verification-detector logs for " + f"this file's processing attempts." + ), + "stuck_file_age_seconds": int(age_s), + "stuck_file": oldest_unprocessed[1], + "session_files": len(session_files), + } + return {"status": "ok", "session_files": len(session_files)} diff --git a/docker-compose.yml b/docker-compose.yml index 97d6399..4ea3a49 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,6 +24,11 @@ services: # storage_api.py:get_temp_root. Operators can override per # deployment via .env (or unset to fall back to system /tmp). - AGNES_TEMP_DIR=${AGNES_TEMP_DIR:-/data/tmp} + # /home/*/user/sessions/ doesn't exist in the Docker layout — skip + # the legacy session-collector to silence per-10-min "0 users, 0 files" + # + "Group 'data-ops' not found" log noise. The bare-VM deployment + # path leaves this unset and continues to scan + log normally. + - AGNES_SKIP_LEGACY_COLLECTOR=1 healthcheck: test: ["CMD", "curl", "-sf", "http://localhost:8000/api/health"] interval: 30s @@ -62,6 +67,11 @@ services: - AGNES_TEMP_DIR=${AGNES_TEMP_DIR:-/data/tmp} - API_URL=http://app:8000 - SEED_ADMIN_EMAIL=${SEED_ADMIN_EMAIL:-} + # Mirror the app service: the scheduler calls /api/admin/run-session-collector + # over HTTP rather than running the collector in-process, but if anything + # ever invokes the collector module from this container directly, we want + # the same skip behavior. Bare-VM path leaves this unset. + - AGNES_SKIP_LEGACY_COLLECTOR=1 depends_on: app: condition: service_healthy diff --git a/pyproject.toml b/pyproject.toml index 6cd859e..b182468 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agnes-the-ai-analyst" -version = "0.47.3" +version = "0.47.4" description = "Agnes — AI Data Analyst platform for AI analytical systems" requires-python = ">=3.11,<3.14" license = "MIT" diff --git a/services/session_collector/collector.py b/services/session_collector/collector.py index add086b..1779eac 100644 --- a/services/session_collector/collector.py +++ b/services/session_collector/collector.py @@ -129,6 +129,25 @@ def run(dry_run: bool = False, verbose: bool = False) -> tuple[int, dict]: logger.info("Starting session transcript collection") + # Skip the legacy /home/*/user/sessions/ scan in deployment layouts that + # don't populate it (e.g. Docker compose, where Claude Code never lands + # session jsonls under /home). Without this, the scheduler's 10-min + # /api/admin/run-session-collector calls log "0 users, 0 files copied" + # plus a misleading "Group 'data-ops' not found" WARNING per run. + # Explicit env var only — no auto-detect: the bare-VM path *does* + # populate /home/*/, and the data-ops warning there is load-bearing + # for catching missing-group mis-deploys. + if os.environ.get("AGNES_SKIP_LEGACY_COLLECTOR", "").strip() in ("1", "true", "TRUE"): + logger.debug( + "AGNES_SKIP_LEGACY_COLLECTOR set; skipping legacy /home/*/user/sessions/ scan" + ) + return 0, { + "users_processed": 0, + "files_copied": 0, + "files_skipped": 0, + "skipped": True, + } + try: TARGET_BASE.mkdir(parents=True, exist_ok=True) os.chmod(TARGET_BASE, 0o2770) diff --git a/tests/test_health_session_pipeline.py b/tests/test_health_session_pipeline.py index b3a103d..053d746 100644 --- a/tests/test_health_session_pipeline.py +++ b/tests/test_health_session_pipeline.py @@ -106,3 +106,90 @@ class TestSessionPipelineHealthCheck: assert resp.status_code == 200 services = resp.json()["services"] assert services["session_pipeline"]["status"] == "warning" + + +class TestSessionPipelineFIFOCheck: + """FIFO check (#0.47.4): MAX-only comparison passes silently when + the verification-detector skips a particular file but keeps processing + newer ones. New code finds the oldest unprocessed file and surfaces it + as `info` once it's older than SESSION_PIPELINE_STUCK_FILE_GRACE_SECONDS + (default 4× verification-detector grace = 2h). + """ + + def test_fifo_check_warns_on_stuck_old_file_when_newer_was_processed(self, seeded_app): + """Old jsonl never got processed even though a newer one was — info.""" + env = seeded_app["env"] + # Old file (5h ago), NOT in extraction_state. + _make_session_file(env["data_dir"], "stuck_old.jsonl", mtime_ago_seconds=5 * 3600) + # Newer file (1min ago), processed_at=now → MAX-comparison says "ok". + _make_session_file(env["data_dir"], "fresh.jsonl", mtime_ago_seconds=60) + _seed_extraction_state( + datetime.now(timezone.utc), + session_file="x/fresh.jsonl", + ) + + c = seeded_app["client"] + resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"])) + assert resp.status_code == 200 + body = resp.json() + services = body["services"] + sp = services["session_pipeline"] + assert sp["status"] == "info", f"expected info, got {sp}" + assert "skipped" in sp["detail"] + assert sp["stuck_file_age_seconds"] > 4 * 3600 + assert sp["stuck_file"].endswith("stuck_old.jsonl") + # Info MUST NOT promote the headline to degraded — that's the whole + # point of starting at info severity. + assert body["status"] != "degraded" + + def test_fifo_check_silent_when_old_file_under_threshold(self, seeded_app): + """Old file is 1h old (< 2h default threshold) — should return ok.""" + env = seeded_app["env"] + _make_session_file(env["data_dir"], "recent_unprocessed.jsonl", mtime_ago_seconds=3600) + _make_session_file(env["data_dir"], "fresh.jsonl", mtime_ago_seconds=60) + _seed_extraction_state( + datetime.now(timezone.utc), + session_file="x/fresh.jsonl", + ) + + c = seeded_app["client"] + resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"])) + assert resp.status_code == 200 + services = resp.json()["services"] + assert services["session_pipeline"]["status"] == "ok" + + def test_fifo_check_silent_when_no_unprocessed_files(self, seeded_app): + """All FS jsonls are in extraction_state — should return ok.""" + env = seeded_app["env"] + _make_session_file(env["data_dir"], "processed.jsonl", mtime_ago_seconds=60) + _seed_extraction_state( + datetime.now(timezone.utc), + session_file="x/processed.jsonl", + ) + + c = seeded_app["client"] + resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"])) + assert resp.status_code == 200 + services = resp.json()["services"] + assert services["session_pipeline"]["status"] == "ok" + + def test_fifo_check_threshold_env_override(self, seeded_app, monkeypatch): + """SESSION_PIPELINE_STUCK_FILE_GRACE_SECONDS=60 → 5min-old file triggers info.""" + monkeypatch.setenv("SESSION_PIPELINE_STUCK_FILE_GRACE_SECONDS", "60") + env = seeded_app["env"] + _make_session_file(env["data_dir"], "five_min_old.jsonl", mtime_ago_seconds=300) + _make_session_file(env["data_dir"], "fresh.jsonl", mtime_ago_seconds=10) + _seed_extraction_state( + datetime.now(timezone.utc), + session_file="x/fresh.jsonl", + ) + + c = seeded_app["client"] + resp = c.get("/api/health/detailed", headers=_auth(seeded_app["admin_token"])) + assert resp.status_code == 200 + body = resp.json() + services = body["services"] + sp = services["session_pipeline"] + assert sp["status"] == "info", f"expected info, got {sp}" + assert sp["stuck_file"].endswith("five_min_old.jsonl") + assert body["status"] != "degraded" diff --git a/tests/test_session_collector.py b/tests/test_session_collector.py index 989107e..c396639 100644 --- a/tests/test_session_collector.py +++ b/tests/test_session_collector.py @@ -142,3 +142,85 @@ class TestRunHelper: rc = collector.main() assert rc == 0 + + +class TestRunSkipEnvVar: + """AGNES_SKIP_LEGACY_COLLECTOR=1 short-circuits the run before any FS or + grp lookups. Used in the Docker layout where /home/*/user/sessions/ is + empty by design — keeps logs quiet without auto-detect logic that would + mask real bare-VM mis-deploys. + """ + + def test_collector_run_skips_when_env_set(self, monkeypatch, tmp_path): + """AGNES_SKIP_LEGACY_COLLECTOR=1 → return early with skipped=True.""" + from services.session_collector import collector + + monkeypatch.setenv("AGNES_SKIP_LEGACY_COLLECTOR", "1") + # Point TARGET_BASE at tmp_path so even if the skip didn't fire we + # wouldn't touch /data — but the assertion below is that mkdir + # was NOT called on it. + target = tmp_path / "user_sessions" + monkeypatch.setattr(collector, "TARGET_BASE", target) + + # If the skip didn't fire, find_user_home_dirs would be called. + called = [] + + def _spy(): + called.append(True) + return iter([]) + + monkeypatch.setattr(collector, "find_user_home_dirs", _spy) + + rc, stats = collector.run() + assert rc == 0 + assert stats.get("skipped") is True + assert stats["files_copied"] == 0 + assert stats["users_processed"] == 0 + assert stats["files_skipped"] == 0 + # Skip path must NOT touch the target directory or call into the + # /home scanner — those are exactly the operations we're avoiding. + assert not target.exists(), "TARGET_BASE.mkdir should not have run" + assert called == [], "find_user_home_dirs should not have been called" + + @pytest.mark.parametrize("val", ["1", "true", "TRUE"]) + def test_collector_run_skips_for_truthy_values(self, monkeypatch, tmp_path, val): + """The accepted truthy spellings are 1 / true / TRUE. Anything else + (including '0', 'false', 'yes') falls through to the normal pass.""" + from services.session_collector import collector + + monkeypatch.setenv("AGNES_SKIP_LEGACY_COLLECTOR", val) + monkeypatch.setattr(collector, "TARGET_BASE", tmp_path / "user_sessions") + monkeypatch.setattr(collector, "find_user_home_dirs", lambda: iter([])) + + rc, stats = collector.run() + assert rc == 0 + assert stats.get("skipped") is True + + def test_collector_run_full_pass_when_env_unset(self, monkeypatch, tmp_path): + """No env var → existing scan path runs (returns stats without 'skipped').""" + from services.session_collector import collector + + monkeypatch.delenv("AGNES_SKIP_LEGACY_COLLECTOR", raising=False) + target = tmp_path / "user_sessions" + monkeypatch.setattr(collector, "TARGET_BASE", target) + monkeypatch.setattr(collector, "find_user_home_dirs", lambda: iter([])) + + rc, stats = collector.run() + assert rc == 0 + # Bare-VM path: we ran, even if no users were scanned. + assert "skipped" not in stats + # mkdir should have happened. + assert target.exists() + + def test_collector_run_full_pass_for_falsy_values(self, monkeypatch, tmp_path): + """AGNES_SKIP_LEGACY_COLLECTOR='0' should NOT skip — only the explicit + truthy spellings short-circuit.""" + from services.session_collector import collector + + monkeypatch.setenv("AGNES_SKIP_LEGACY_COLLECTOR", "0") + monkeypatch.setattr(collector, "TARGET_BASE", tmp_path / "user_sessions") + monkeypatch.setattr(collector, "find_user_home_dirs", lambda: iter([])) + + rc, stats = collector.run() + assert rc == 0 + assert "skipped" not in stats