agnes-the-ai-analyst/services/session_collector/collector.py
ZdenekSrotyr cc1886c97c
release: 0.47.4 — Docker collector skip + FIFO session-pipeline check (#229)
## Summary

Two minimum-viable fixes after today's 0.44.0 → 0.47.3 release train and the production 30-user launch. Devil's advocate review of a 3-PR / 7-item plan cut scope to these 2 — the rest is deferred to a separate "operate-first, instrument-second" backlog item.

### B2 — Docker session_collector log skip

`services/session_collector` was logging `Collection complete: 0 users, 0 files copied` + `WARNING: Group 'data-ops' not found, using default group` every 10 minutes in the Docker layout (where `/home/*/user/sessions/` doesn't exist). New env var `AGNES_SKIP_LEGACY_COLLECTOR=1` set by default in `docker-compose.yml` short-circuits the collector pass.

The bare-VM deployment path (where /home/* IS populated by Claude Code) leaves the env var unset and continues to scan normally — including the data-ops warning, which is load-bearing for catching missing-group mis-deploys.
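
A minimal sketch of the opt-in gate described above. The helper name `should_skip_legacy_collector` and its `env` parameter are hypothetical, added only to make the check easy to exercise in isolation. The accepted values mirror the tuple checked in `collector.py`'s `run()`.

```python
import os

# Values treated as "skip", mirroring the tuple checked in collector.py.
_SKIP_VALUES = ("1", "true", "TRUE")


def should_skip_legacy_collector(env=None) -> bool:
    """Return True when AGNES_SKIP_LEGACY_COLLECTOR requests a skip.

    `env` is a hypothetical injection point for tests; it defaults to
    os.environ. Surrounding whitespace is ignored; matching is exact.
    """
    env = os.environ if env is None else env
    return env.get("AGNES_SKIP_LEGACY_COLLECTOR", "").strip() in _SKIP_VALUES
```

Because the match is exact, mixed-case values such as `True` or words such as `yes` do not trigger the skip, so the bare-VM scan still runs unless one of the listed values is set.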

### O2 — FIFO check in `_check_session_pipeline`

The existing check compares `MAX(processed_at)` to the newest jsonl mtime. That catches "detector hasn't run lately" but is blind to "old file was skipped while newer ones were processed". The new code finds the oldest jsonl on the filesystem that is NOT in `session_extraction_state.session_file` and flags it if its mtime is older than `SESSION_PIPELINE_STUCK_FILE_GRACE_SECONDS` (default: 4× the existing grace, i.e. 2h).
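
The FIFO idea can be sketched as follows. This is an illustration, not the code in `_check_session_pipeline`: the function name `find_stuck_session_file`, the use of full path strings for `session_file` values, and the 30-minute base grace (inferred from "4× = 2h") are all assumptions.

```python
import os
import time
from pathlib import Path
from typing import Iterable, Optional, Set

# Assumption: "4x the existing grace = 2h" implies a 30-minute base grace.
BASE_GRACE_SECONDS = 30 * 60
STUCK_FILE_GRACE_SECONDS = int(
    os.environ.get("SESSION_PIPELINE_STUCK_FILE_GRACE_SECONDS", 4 * BASE_GRACE_SECONDS)
)


def find_stuck_session_file(
    session_files: Iterable[Path],
    processed_files: Set[str],
    grace_seconds: float = STUCK_FILE_GRACE_SECONDS,
    now: Optional[float] = None,
) -> Optional[Path]:
    """Return the oldest unprocessed file if it exceeds the grace, else None.

    `processed_files` stands in for the values stored in
    session_extraction_state.session_file (assumed here to be path strings).
    """
    now = time.time() if now is None else now
    # Pair each unprocessed file with its mtime so each file is stat'ed once.
    unprocessed = [
        (path.stat().st_mtime, path)
        for path in session_files
        if str(path) not in processed_files
    ]
    if not unprocessed:
        return None
    oldest_mtime, oldest_path = min(unprocessed, key=lambda item: item[0])
    return oldest_path if now - oldest_mtime > grace_seconds else None
```

Looking only at the oldest unprocessed file is enough: if that one is within the grace window, every newer unprocessed file is too.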

Severity intentionally starts at `info` so we can collect prod data on false-positive rate before tightening to `warning`. The aggregator already treats `info` as non-promoting (see the severity vocabulary docstring at the top of `app/api/health.py`), so the headline `status` stays at `healthy` even when this fires — the operator sees the entry in the per-check breakdown but no spurious `degraded` overall.
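
A rough sketch of how a non-promoting `info` severity keeps the headline at `healthy`. The real vocabulary lives in `app/api/health.py` and is not reproduced here; the `error`/`unhealthy` pair, the `warning` to `degraded` mapping, and the function name `headline_status` are assumptions made for illustration.

```python
from typing import Dict, Iterable

# Hypothetical mapping: severities listed here promote the headline status.
# "info" is deliberately absent, so it never changes the headline.
_PROMOTES_TO = {"warning": "degraded", "error": "unhealthy"}
_RANK = {"healthy": 0, "degraded": 1, "unhealthy": 2}


def headline_status(checks: Iterable[Dict[str, str]]) -> str:
    """Return the worst status implied by the per-check severities."""
    status = "healthy"
    for check in checks:
        promoted = _PROMOTES_TO.get(check.get("severity", ""), "healthy")
        if _RANK[promoted] > _RANK[status]:
            status = promoted
    return status
```

Under these assumptions, a stuck-file entry reported at `info` still appears in the per-check breakdown while `headline_status` keeps returning `healthy`. Tightening the check later only requires changing the severity it emits.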

## Test plan

- [x] `pytest tests/test_session_collector.py` — 17 tests pass (existing 9 + new 8 covering env-set/unset, truthy variants, falsy non-skip).
- [x] `pytest tests/test_health_session_pipeline.py` — 8 tests pass (existing 4 + new 4 FIFO tests covering stuck-file, under-threshold, all-processed, env-override).
2026-05-08 09:38:21 +02:00

214 lines
7 KiB
Python

#!/usr/bin/env python3
"""Collect Claude Code session transcripts from all user home directories.

This script runs as a systemd service (session-collector.service) triggered by
session-collector.timer. It scans all /home/*/user/sessions/ directories and
copies session transcript files to /data/user_sessions/$user/ for centralized
storage and analysis.

Design principles:
- Must run as root (or user with read access to all /home/*)
- Preserves file metadata (timestamps, permissions)
- Idempotent - safe to run multiple times (skips existing files)
- Atomic operations - uses tempfile + os.replace for safety
- Logs to stdout (captured by journalctl)

TODO(scheduler-v2): In docker-compose.yml this service is a one-shot process
restarted by Docker (`restart: unless-stopped`), which is effectively a tight
boot loop. Replace with proper cadence: either an internal `while True: scan;
sleep(N)` loop, or wire into services/scheduler/__main__.py JOBS list with an
admin endpoint /api/admin/collect-sessions.
"""

import logging
import os
import shutil
import sys
from pathlib import Path
from typing import Iterator

from app.logging_config import setup_logging

# Central storage for session transcripts
TARGET_BASE = Path("/data/user_sessions")

# Directory to scan for sessions in each user home
USER_SESSIONS_DIR = "user/sessions"

setup_logging(__name__)
logger = logging.getLogger(__name__)


def find_user_home_dirs() -> Iterator[Path]:
    """Yield all user home directories from /home/*."""
    home_base = Path("/home")
    if not home_base.exists():
        logger.warning(f"{home_base} does not exist")
        return
    for entry in home_base.iterdir():
        if entry.is_dir() and not entry.name.startswith("."):
            yield entry


def find_session_files(user_home: Path) -> Iterator[Path]:
    """Yield all session JSONL files from user's sessions directory."""
    sessions_dir = user_home / USER_SESSIONS_DIR
    if not sessions_dir.exists():
        return
    try:
        for entry in sessions_dir.iterdir():
            if entry.is_file() and entry.suffix == ".jsonl":
                yield entry
    except PermissionError:
        logger.warning(f"Permission denied reading {sessions_dir}")
    except Exception as e:
        logger.error(f"Error scanning {sessions_dir}: {e}")
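

def copy_session_file(source: Path, target: Path, dry_run: bool = False) -> bool:
    """Copy session file to target location, preserving metadata.

    Returns True if file was copied, False if it was skipped (already
    exists) or the copy failed.
    """
    if target.exists():
        # Already collected, skip
        return False
    if dry_run:
        logger.info(f"[DRY-RUN] Would copy: {source} -> {target}")
        return True
    # Stage under a temporary name so an interrupted copy never leaves a
    # truncated file at `target` that later runs would skip as "collected".
    tmp_path = target.with_name(target.name + ".tmp")
    try:
        # Ensure target directory exists
        target.parent.mkdir(parents=True, exist_ok=True)
        # Copy with metadata preserved, then atomically move into place
        shutil.copy2(source, tmp_path)
        os.replace(tmp_path, target)
        logger.info(f"Collected: {source} -> {target}")
        return True
    except Exception as e:
        logger.error(f"Failed to copy {source} to {target}: {e}")
        try:
            # Best-effort cleanup of a partially written staging file
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
        return False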


def collect_user_sessions(username: str, user_home: Path, dry_run: bool = False) -> tuple[int, int]:
    """Collect all session files for a user.

    Returns tuple (files_copied, files_skipped).
    """
    target_dir = TARGET_BASE / username
    copied = 0
    skipped = 0
    for session_file in find_session_files(user_home):
        target_path = target_dir / session_file.name
        if copy_session_file(session_file, target_path, dry_run=dry_run):
            copied += 1
        else:
            skipped += 1
    return copied, skipped


def run(dry_run: bool = False, verbose: bool = False) -> tuple[int, dict]:
    """Run the session-collector pass. Returns (exit_code, stats).

    Argv-free so callers (FastAPI admin endpoint, scheduler) can invoke
    without inheriting uvicorn's sys.argv; argparse here would
    SystemExit(2) when uvicorn's flags hit it.

    stats keys: users_processed, files_copied, files_skipped, plus
    skipped=True when the pass is short-circuited by
    AGNES_SKIP_LEGACY_COLLECTOR.
    """
    import grp

    if verbose:
        logger.setLevel(logging.DEBUG)
    logger.info("Starting session transcript collection")

    # Skip the legacy /home/*/user/sessions/ scan in deployment layouts that
    # don't populate it (e.g. Docker compose, where Claude Code never lands
    # session jsonls under /home). Without this, the scheduler's 10-min
    # /api/admin/run-session-collector calls log "0 users, 0 files copied"
    # plus a misleading "Group 'data-ops' not found" WARNING per run.
    # Explicit env var only, no auto-detect: the bare-VM path *does*
    # populate /home/*/, and the data-ops warning there is load-bearing
    # for catching missing-group mis-deploys.
    if os.environ.get("AGNES_SKIP_LEGACY_COLLECTOR", "").strip() in ("1", "true", "TRUE"):
        logger.debug(
            "AGNES_SKIP_LEGACY_COLLECTOR set; skipping legacy /home/*/user/sessions/ scan"
        )
        return 0, {
            "users_processed": 0,
            "files_copied": 0,
            "files_skipped": 0,
            "skipped": True,
        }

    try:
        TARGET_BASE.mkdir(parents=True, exist_ok=True)
        os.chmod(TARGET_BASE, 0o2770)
        try:
            dataops_gid = grp.getgrnam("data-ops").gr_gid
            os.chown(TARGET_BASE, -1, dataops_gid)
        except KeyError:
            logger.warning("Group 'data-ops' not found, using default group")
        except Exception as e:
            logger.warning(f"Could not set group ownership: {e}")
    except Exception as e:
        logger.error(f"Failed to create target directory {TARGET_BASE}: {e}")
        return 1, {"users_processed": 0, "files_copied": 0, "files_skipped": 0}

    total_copied = 0
    total_skipped = 0
    users_processed = 0
    for user_home in find_user_home_dirs():
        username = user_home.name
        try:
            # Skip system accounts (UID below 1000)
            uid = user_home.stat().st_uid
            if uid < 1000:
                continue
        except Exception:
            continue
        copied, skipped = collect_user_sessions(username, user_home, dry_run=dry_run)
        if copied > 0 or skipped > 0:
            users_processed += 1
            total_copied += copied
            total_skipped += skipped
            logger.info(f"User {username}: {copied} copied, {skipped} skipped")

    logger.info(
        f"Collection complete: {users_processed} users, {total_copied} files copied, {total_skipped} files skipped"
    )
    return 0, {
        "users_processed": users_processed,
        "files_copied": total_copied,
        "files_skipped": total_skipped,
    }


def main() -> int:
    """CLI entry point. Parses argv, delegates to run()."""
    import argparse

    parser = argparse.ArgumentParser(description="Collect Claude Code session transcripts from all users")
    parser.add_argument("--dry-run", action="store_true", help="Preview what would be copied without actually copying")
    parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose output")
    args = parser.parse_args()
    rc, _ = run(dry_run=args.dry_run, verbose=args.verbose)
    return rc


if __name__ == "__main__":
    sys.exit(main())