diff --git a/CHANGELOG.md b/CHANGELOG.md index b92d65d..06692d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ### Added +- **Session pipeline framework** under `services/session_pipeline/` — pluggable processors for the centralized `/data/user_sessions//*.jsonl` tree. Each processor implements a `SessionProcessor` Protocol (`name`, `cadence_minutes`, `process_session(...)`) and runs through its own per-processor scheduler tick + scan loop. No cross-processor coupling: a slow or failing processor cannot block any other. Pure-utility lib (`parse_jsonl`, `compute_file_hash`) is shared; orchestration is per-processor in `runner.run_processor()`. Adding a new processor is one file in `services/session_processors/.py`, one entry in the registry list, one entry in the scheduler `JOBS` list. See `services/session_pipeline/contract.py` for the protocol and `services/session_processors/__init__.py` for the registry pattern. +- `services/session_processors/usage.py` — `UsageProcessor` skeleton (no-op, `cadence_minutes=10`). Reserves the registry slot + scheduler entry so the framework end-to-end exercises two processors. Extraction logic (skill / agent invocation events) and storage shape (DuckDB table vs. append-only parquet event log) are deferred to a separate brainstorm. +- `POST /api/admin/run-session-processor?processor=` — parametrized admin endpoint that drives one session-pipeline processor end-to-end. Admin-gated; same audit pattern as the other `/api/admin/run-*` endpoints (one row per call with action `run_session_processor:`); 400 when `processor` is unknown. +- `SessionProcessorStateRepository` in `src/repositories/session_processor_state.py` — backs the new state table. 
- **PostHog snippet middleware preserves `Response.background`** on every return path so any `BackgroundTask` / `BackgroundTasks` attached to an HTML route still fires once the integration is enabled (PR #231 review by minasarustamyan). `BaseHTTPMiddleware` materialises the body and asks subclasses to return a fresh `Response`; the previous implementation dropped `background` on three paths, silently cancelling deferred audit logging / async webhooks / email sends with no log line. Also adds a `_MAX_BUFFER_BYTES` (4 MB) cap so a streamed-HTML response can't balloon RSS — bigger bodies short-circuit through with a warning instead of being buffered. Regression tests in `tests/test_posthog_inject_middleware.py` exercise the four return paths plus the streaming guard. - **`POSTHOG_LLM_PAYLOAD_MAX_CHARS` (default 30000) clips `$ai_input` / `$ai_output_choices`** before they hit PostHog so oversized prompts don't get silently dropped at ingest. PostHog's per-event ceiling is ~32 KB and the SDK does not chunk; Agnes prompts routinely include sample rows / table schemas / analyst SQL that exceed it, and unbounded payloads landed *exactly* the calls operators wanted to inspect on the floor (PR #231 review by minasarustamyan). Truncated payloads carry an explicit `…[truncated N chars]` marker so a reader doesn't mistake them for a complete capture; metadata (provider, model, tokens, latency, error) flows regardless. Override the cap via the env var. - **PostHog event-level user attributes** so a reviewer reading an event in PostHog sees who the user was inline, without clicking through to the person profile. Backend `capture_exception` merges `user_id` / `user_email` / `user_name` (per `POSTHOG_IDENTIFY_PII`) into the event properties; browser snippet registers the same keys as super-properties via `posthog.register({...})` so every client-side event including `posthog.captureException()` carries them. 
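Reviewer sketch of the clipping behaviour described for `POSTHOG_LLM_PAYLOAD_MAX_CHARS` in the entries above. This is a minimal illustration, not the code from the PR: the function name `clip_payload` is hypothetical, and it assumes that `N` in the marker counts the dropped characters and that the marker is appended after the cap, so the clipped string is slightly longer than the cap.

```python
def clip_payload(text: str, max_chars: int = 30000) -> str:
    """Clip text to max_chars characters and append an explicit marker.

    Hypothetical helper: the real implementation may count or place the
    marker differently. Here N is the number of characters dropped.
    """
    if len(text) <= max_chars:
        # Short payloads pass through untouched, with no marker.
        return text
    dropped = len(text) - max_chars
    return f"{text[:max_chars]}…[truncated {dropped} chars]"
```

A reader who sees the marker knows the capture is incomplete and by how much, which is the property the changelog entry calls out.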
@@ -30,6 +34,12 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ### Changed +- **BREAKING**: Schema bump v30 → v31 renames `session_extraction_state` → `session_processor_state` with composite PK `(processor_name, session_file)` so multiple processors can track their own processed-set independently. Existing rows are copied across with `processor_name='verification'` and the old table is dropped. The `KnowledgeRepository.is_session_processed` / `mark_session_processed` helpers are removed — sessions bookkeeping now lives in `SessionProcessorStateRepository`. The session-state-aware `is_processed` check now compares `file_hash` so a session jsonl that grows (live append from an active Claude Code session) gets reprocessed on the next tick — previously the file_hash was stored but never read back. +- **BREAKING**: `POST /api/admin/run-verification-detector` is dropped in favor of `POST /api/admin/run-session-processor?processor=verification`. Audit action renames `run_verification_detector` → `run_session_processor:verification`. The scheduler `JOBS` list reflects the new endpoint; no operator action required if the only caller is the in-tree scheduler. The legacy `dry_run` flag (no real callers outside the dropped CLI shim) is gone. +- `services/scheduler/__main__.py` JOBS — `verification-detector` entry replaced by two new entries: `session-processor:verification` and `session-processor:usage`. New env var `SCHEDULER_USAGE_PROCESSOR_INTERVAL` (default 600s); `SCHEDULER_VERIFICATION_DETECTOR_INTERVAL` is retained (still drives the verification cadence AND the health-check grace window in `app/api/health.py`) for operator compatibility with existing docker-compose env files. +- `services/verification_detector/detector.py` is reduced to LLM-side helpers (`_generate_id`, `_format_turns`, `extract_verifications`); the orchestration loop moves into `VerificationProcessor` in `services/session_processors/verification.py`. 
The CLI (`python -m services.verification_detector`) still works — it now constructs the processor and runs the shared `run_processor` runner. +- `app/api/health.py` `_check_session_pipeline` now queries `session_processor_state WHERE processor_name='verification'` instead of `session_extraction_state` (same heuristic, scoped explicitly to the verification processor). +- `app/web/router.py` `/profile/sessions` join target updated to `session_processor_state` (verification rows). `SCHEDULER_AUDIT_ACTIONS` updated to include the new per-processor audit actions. - Marketplace UI rebrand: `+ Install` → `+ Add to my stack`, `✓ Installed` → `✓ In your stack`, card "Installed" badge → "In stack" (amber pill), `My Subscriptions` tab → `My Stack`. Bridges the conceptual gap between "saved on the server" (what the click does) and "installed on my laptop" (what users assumed). Same vocabulary now consistent across `/marketplace`, `/store/` detail, navbar link, and the post-add hint panel. - Plugin and skill/agent detail pages now show an inline post-add hint panel after a successful "Add to my stack" click: green-bordered block under the description with a 2-step recipe ("open new Claude Code session" or run `agnes refresh-marketplace` + `/reload-plugins`), Copy button on the command, "Don't show again" dismiss persisted in `localStorage`. Removes the dead-end where users clicked Install, saw "Installed", opened Claude Code, and found nothing. - Action-row CTA on `/marketplace`: curated tab `[How to add new content]` → `[Submit a plugin]`, flea tab `[How to add new content]` removed (the `+ Upload` button next to it already covers self-service publishing — second CTA was redundant). Empty-state CTAs aligned: curated empty state links to `Submit a plugin →`, flea empty state shows only `+ Upload`. Guide page titles updated to `Submit a plugin to Curated Marketplace` / `Upload to Flea Market`. 
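Reviewer sketch of the hash-aware `is_processed` check described in the v30 → v31 entry above. It uses a plain dict as a stand-in for the `session_processor_state` table, keyed by the composite `(processor_name, session_file)` primary key. The helper names and the in-memory state are assumptions for illustration only; the real check lives in `SessionProcessorStateRepository` and queries DuckDB.

```python
import hashlib

# Hypothetical stand-in for session_processor_state:
# (processor_name, session_file) -> stored file_hash.
StateKey = tuple[str, str]


def content_hash(data: bytes) -> str:
    """MD5 of the content, the same digest family compute_file_hash uses."""
    return hashlib.md5(data).hexdigest()


def is_processed(
    state: dict[StateKey, str],
    processor_name: str,
    session_file: str,
    current_hash: str,
) -> bool:
    """True only when a row exists for this processor AND its hash matches."""
    stored = state.get((processor_name, session_file))
    return stored is not None and stored == current_hash


state: dict[StateKey, str] = {}
original = b'{"type": "user"}\n'
state[("verification", "alice/s1.jsonl")] = content_hash(original)

# A live session appends a line, so the hash no longer matches.
grown = original + b'{"type": "assistant"}\n'
```

Because the key includes the processor name, a row written by `verification` says nothing about `usage`: each processor keeps its own processed set, and a grown file is picked up again by whichever processor had already seen the shorter version.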
@@ -47,8 +57,15 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C - `GET /api/marketplace/curated///{skill,agent}/` now containment-checks the resolved file path against `plugin_root` via a new `_safe_join` helper (`resolve(strict=True)` + `relative_to`). The direct URL exploit was already blocked by Starlette's `[^/]+` path-param regex, but a curator-planted symlink inside a curated marketplace's git mirror could previously dereference outside the plugin tree on read. Now centralized so `_read_inner`, the skill `files` walk, and the agent `stat` call all share the same boundary. +### Fixed (PR #232 review) + +- `services/scheduler/__main__.py` tick loop is now parallel + advances `last_run` on terminal state. Pre-fix it was a synchronous `for-loop + httpx.post(timeout=900)` — a 10-minute verification run blocked every other job (`data-refresh`, `health-check`, `usage`, `corporate-memory`) for the entire window. The PR's stated isolation guarantee ("slow / failing processor cannot block any other") only held inside `services/session_pipeline/runner.py`; the scheduler dispatch layer broke it. Pre-fix `last_run` also only advanced on success, so a permanently failing job was retried every 30s tick instead of on its 15-min cadence (30× the configured request rate + LLM tokens). Replaced with `ThreadPoolExecutor.submit` per due job + per-job in-flight set so a long-running job can't be re-launched on subsequent ticks. `_run_job` extracted to a module-level helper so the bookkeeping is unit-testable. +- `SessionProcessorStateRepository.scan_unprocessed_for` had a dead `if/else` where both branches surfaced every jsonl, making the `SELECT session_file FROM session_processor_state` round-trip pointless and forcing the runner to MD5-rehash every stable session on every scheduler tick. Replaced with an mtime precheck: stable sessions (mtime <= processed_at) are filtered at scan and the runner never reads or hashes them. 
Files modified since the last run still surface for the runner's authoritative `file_hash` invalidation. +- `POST /api/admin/run-session-processor` now takes a per-processor advisory lock (`threading.Lock` keyed by name) before invoking the runner. Two trigger paths exist for the same processor (scheduler tick + manual admin POST); without serialization, overlapping runs would re-process the same `/data/user_sessions/*` set, double-call the LLM, and pile up duplicate `verification_evidence` rows (the dedup short-circuit only catches the create+contradiction branches, not `create_evidence`, per ADR Decision 3). Concurrent invocation returns HTTP 409 Conflict so the operator sees what happened instead of stacking behind a long-running tick. Lock releases unconditionally in `finally:` so a runner exception can't wedge the processor permanently. + ### Internal +- `services/session_processors/verification.py:build_verification_processor` factory mirrors the lazy LLM-extractor construction previously inlined in `app/api/admin.run_verification_detector` and `services/verification_detector/__main__`. Single source of truth for processor instantiation. - Schema bumped v27 → v28 (`DELETE FROM user_plugin_optouts` for the semantic flip + `marketplace_plugins.created_at` with `registered_at` backfill). - New tests `tests/test_marketplace_api.py` (browse, categories, install/uninstall, RBAC 403, `_safe_join` containment). Existing `tests/test_marketplace_filter_store.py`, `tests/test_marketplace_server_zip.py`, `tests/test_marketplace_server_git.py`, `tests/test_store_api.py`, `tests/test_store_repositories.py` updated for Model B (explicit subscribe in fixtures). 
diff --git a/app/api/admin.py b/app/api/admin.py index a33930f..022c2b4 100644 --- a/app/api/admin.py +++ b/app/api/admin.py @@ -11,7 +11,7 @@ import threading import uuid from pathlib import Path -from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query from pydantic import BaseModel, Field, field_validator, model_validator from typing import Optional, List, Dict, Any import duckdb @@ -38,6 +38,30 @@ router = APIRouter(prefix="/api/admin", tags=["admin"]) # would need an OS-level file lock — documented limitation. _overlay_write_lock = threading.Lock() +# Per-processor advisory locks for /api/admin/run-session-processor. +# Two trigger paths exist for the same processor (scheduler tick + manual +# admin POST). Without serialization, overlapping runs would re-process the +# same /data/user_sessions/* set, double-call the LLM, and pile up duplicate +# `verification_evidence` rows — the dedup short-circuit in +# VerificationProcessor only catches the create+contradiction branches, not +# create_evidence (per ADR Decision 3, which expects evidence to accumulate +# per distinct verification event). Lock is non-blocking → second caller +# gets 409 Conflict so the operator sees what happened instead of stacking +# behind a long-running tick. 
+_processor_run_locks: dict[str, threading.Lock] = {} +_processor_run_locks_mutex = threading.Lock() + + +def _get_processor_run_lock(name: str) -> threading.Lock: + """Per-name lock factory; the registry mutex guards dict insertion so + two threads simultaneously asking for a never-seen processor don't + each install their own lock instance.""" + with _processor_run_locks_mutex: + if name not in _processor_run_locks: + _processor_run_locks[name] = threading.Lock() + return _processor_run_locks[name] + + # SSRF protection: reject private/internal URLs for keboola_url import ipaddress as _ipaddress import socket as _socket @@ -3336,44 +3360,55 @@ def run_session_collector( return {"ok": rc == 0, "details": {"rc": rc, **stats}} -@router.post("/run-verification-detector") -def run_verification_detector( +@router.post("/run-session-processor") +def run_session_processor( + processor: str = Query(..., description="Processor name (e.g. 'verification', 'usage')"), user: dict = Depends(require_admin), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): - """Trigger the verification-detector job from the scheduler. + """Trigger one session-pipeline processor against /data/user_sessions/*. - Reads collected session transcripts, extracts verified knowledge - via the LLM, and writes pending items to knowledge_items. The - /corporate-memory/admin queue picks them up for triage. + Replaces the per-processor /run-* endpoints with a single parametrized + entry. The scheduler invokes this once per registered processor on its + own cadence; processors are independent (one slow / failing processor + can't block any other). + + Returns 400 if `processor` is unknown. The verification processor + requires an LLM extractor — if the instance has no ai: config and no + ANTHROPIC_API_KEY / LLM_API_KEY, it won't appear in the registry and + the call returns 400 the same as a misspelled name. 
""" - from connectors.llm import create_extractor_from_env_or_config - from services.verification_detector import detector + from services.session_pipeline.runner import run_processor as _run_processor + from services.session_processors import get_processor, list_processor_names from src.db import get_system_db - # Build the extractor lazily so the endpoint surfaces a 500 with the - # factory's actionable error when no ai: block + no env keys are set. - # Use the overlay-aware loader (#179 review fix) so an ai: block written - # by /api/admin/configure to DATA_DIR/state/instance.yaml actually flows - # through to the factory. - try: - from app.instance_config import load_instance_config - try: - instance_config = load_instance_config() - except (ValueError, FileNotFoundError): - instance_config = {} - ai_config = instance_config.get("ai") if instance_config else None - extractor = create_extractor_from_env_or_config(ai_config) - except ValueError as e: - raise HTTPException(status_code=500, detail=str(e)) + proc = get_processor(processor) + if proc is None: + raise HTTPException( + status_code=400, + detail=( + f"Unknown processor '{processor}'. " + f"Known: {', '.join(list_processor_names())}" + ), + ) + + # Reject overlapping invocations of the same processor (PR #232 review). + # See `_get_processor_run_lock` docstring for why this matters + # (verification_evidence row duplication on race). 
+ proc_lock = _get_processor_run_lock(processor) + if not proc_lock.acquire(blocking=False): + raise HTTPException( + status_code=409, + detail=f"Processor '{processor}' is already running", + ) job_conn = get_system_db() stats: dict = {} job_error: Optional[Exception] = None try: - stats = detector.run(job_conn, extractor, dry_run=False) + stats = _run_processor(job_conn, proc) except Exception as e: - # Capture and re-raise after audit so an unhandled detector error + # Capture and re-raise after audit so an unhandled runner error # (DuckDB lock, network blip, unexpected SDK type) still leaves a # row in audit_log — the /admin/scheduler-runs page is the # operator's only signal beyond docker logs. @@ -3383,25 +3418,32 @@ def run_verification_detector( job_conn.close() except Exception: pass + # Always release, even if the runner raised. A leaked lock would + # wedge the processor permanently until process restart. + proc_lock.release() audit_params: dict = { - "items_created": stats.get("items_created", 0), - "errors": len(stats.get("errors", [])), + "processor": processor, + "scanned": stats.get("scanned", 0), + "processed": stats.get("processed", 0), + "skipped": stats.get("skipped", 0), + "errors": stats.get("errors", 0), + "items_extracted": stats.get("items_extracted", 0), } if job_error is not None: audit_params["unhandled_error"] = f"{type(job_error).__name__}: {job_error}" AuditRepository(conn).log( user_id=user.get("id"), - action="run_verification_detector", - resource="job:verification-detector", + action=f"run_session_processor:{processor}", + resource=f"job:session-processor:{processor}", params=audit_params, ) if job_error is not None: raise HTTPException(status_code=500, detail=audit_params["unhandled_error"]) - return {"ok": not stats.get("errors"), "details": stats} + return {"ok": stats.get("errors", 0) == 0, "details": stats} @router.post("/run-corporate-memory") diff --git a/app/api/health.py b/app/api/health.py index 8cfbcf9..653dc5a 100644 
--- a/app/api/health.py +++ b/app/api/health.py @@ -139,12 +139,17 @@ def _check_session_pipeline(conn: duckdb.DuckDBPyConnection) -> dict: Heuristic (#176): max(mtime of /data/user_sessions/**/*.jsonl) <= - max(processed_at in session_extraction_state) + grace_seconds + max(processed_at in session_processor_state where processor_name='verification') + grace_seconds grace_seconds = 2 × the verification-detector cadence (default 15m → 30m). Operators with a custom SCHEDULER_VERIFICATION_DETECTOR_INTERVAL can extend the grace by setting that env var. + The check is scoped to the verification processor specifically — that's + the LLM-gated pipeline an operator most needs to know is stuck. Other + processors in the framework (e.g. usage) might lag for benign reasons + (no LLM, lighter scan cadence) and shouldn't trip a warning. + Returns ``warning`` (never ``error``) — the LLM may be down for maintenance, not a hard failure. Returns ``ok`` when no session files exist (cold-start case). @@ -167,32 +172,33 @@ def _check_session_pipeline(conn: duckdb.DuckDBPyConnection) -> dict: except OSError: return {"status": "unknown", "detail": "could not stat session files"} - # Look up the most recent processed_at. + # Look up the most recent processed_at for the verification processor. try: row = conn.execute( - "SELECT MAX(processed_at) FROM session_extraction_state" + "SELECT MAX(processed_at) FROM session_processor_state WHERE processor_name = ?", + ["verification"], ).fetchone() except Exception as e: - return {"status": "unknown", "detail": f"could not query session_extraction_state: {e}"} + return {"status": "unknown", "detail": f"could not query session_processor_state: {e}"} last_processed = row[0] if row else None grace_seconds = _verification_detector_grace_seconds() if last_processed is None: - # Files exist but state table is empty — pipeline never ran here. + # Files exist but verification has no state rows — pipeline never ran here. 
if (datetime.now(timezone.utc).timestamp() - latest_session_mtime) > grace_seconds: return { "status": "warning", "detail": ( - "session_extraction_state is empty but jsonl files exist. " + "session_processor_state has no verification rows but jsonl files exist. " "Check the verification-detector scheduler job." ), "session_files": len(session_files), } return {"status": "ok", "session_files": len(session_files)} - # Both available — compare. session_extraction_state.processed_at is + # Both available — compare. session_processor_state.processed_at is # stored as DuckDB TIMESTAMP (naive). DuckDB converts tz-aware writes # to local time before storing, so the only safe interpretation is # local-naive on read. Compute the lag against `datetime.now()` (also @@ -210,7 +216,7 @@ def _check_session_pipeline(conn: duckdb.DuckDBPyConnection) -> dict: return { "status": "warning", "detail": ( - f"session jsonls newer than session_extraction_state by ~{lag_seconds}s " + f"session jsonls newer than verification's session_processor_state rows by ~{lag_seconds}s " f"(grace={grace_seconds}s). Check the verification-detector scheduler " f"job — uploads are not being processed." ), @@ -221,22 +227,24 @@ def _check_session_pipeline(conn: duckdb.DuckDBPyConnection) -> dict: # FIFO check (#0.47.4): the MAX-only comparison above can pass silently # when the verification-detector skips a particular file but keeps # processing newer ones. Detect that case by finding the oldest FS - # jsonl whose path is NOT in session_extraction_state.session_file - # and surfacing it once it's older than _stuck_file_grace_seconds. + # jsonl whose path is NOT in session_processor_state.session_file + # (for processor_name='verification') and surfacing it once it's older + # than _stuck_file_grace_seconds. 
try: processed = { row[0] for row in conn.execute( - "SELECT session_file FROM session_extraction_state" + "SELECT session_file FROM session_processor_state WHERE processor_name = ?", + ["verification"], ).fetchall() } except Exception as e: # Don't fail the health check on this enrichment. - logger.debug("FIFO check: could not read session_extraction_state: %s", e) + logger.debug("FIFO check: could not read session_processor_state: %s", e) return {"status": "ok", "session_files": len(session_files)} - # session_extraction_state.session_file is stored as the path the - # extractor saw. Older rows store an absolute path (e.g. + # session_processor_state.session_file is stored as the path the + # processor saw. Older rows store an absolute path (e.g. # "/data/user_sessions/x/y.jsonl"); newer code stores a relative path # ("x/y.jsonl"). Match on either form so the FIFO check is robust to # both — a row stored under either spelling counts as processed. diff --git a/app/web/router.py b/app/web/router.py index 1100397..9e93a40 100644 --- a/app/web/router.py +++ b/app/web/router.py @@ -1449,7 +1449,8 @@ async def admin_marketplaces_page( # those endpoints, add the matching action strings to this list. SCHEDULER_AUDIT_ACTIONS = [ "run_session_collector", - "run_verification_detector", + "run_session_processor:verification", + "run_session_processor:usage", "run_corporate_memory", "marketplace.sync_all", ] @@ -1615,11 +1616,11 @@ async def profile_sessions_page( """User-self-view of own uploaded sessions and their extraction state. Walks `${DATA_DIR}/user_sessions//*.jsonl` for the caller's - own user_id, joins each file against `session_extraction_state` to - surface processed_at + items_extracted, and renders a table. - Items_extracted = 0 means the verification_detector ran but the LLM - found no claims worth tracking — that's the documented "no items" - outcome; it does NOT mean the pipeline is broken. 
+ own user_id, joins each file against the verification processor's + rows in `session_processor_state` to surface processed_at + items_extracted, + and renders a table. Items_extracted = 0 means the verification processor + ran but the LLM found no claims worth tracking — that's the documented + "no items" outcome; it does NOT mean the pipeline is broken. """ import pathlib user_id = user["id"] @@ -1653,8 +1654,9 @@ async def profile_sessions_page( placeholders = ",".join("?" for _ in keys) rows = conn.execute( f"""SELECT session_file, processed_at, items_extracted, file_hash - FROM session_extraction_state - WHERE session_file IN ({placeholders})""", + FROM session_processor_state + WHERE processor_name = 'verification' + AND session_file IN ({placeholders})""", keys, ).fetchall() cols = [d[0] for d in conn.description] diff --git a/app/web/templates/profile_sessions.html b/app/web/templates/profile_sessions.html index 4165d84..6e4bc4d 100644 --- a/app/web/templates/profile_sessions.html +++ b/app/web/templates/profile_sessions.html @@ -65,7 +65,7 @@

My sessions

Sessions you uploaded via agnes push from your Claude Code workspace, with - extraction status from session_extraction_state. + extraction status from the verification processor's rows in session_processor_state.
Items extracted = 0 means the verification detector ran successfully but the LLM didn't find anything worth tracking in that session — that's expected for diff --git a/services/scheduler/__main__.py b/services/scheduler/__main__.py index 24f2c4f..1322d00 100644 --- a/services/scheduler/__main__.py +++ b/services/scheduler/__main__.py @@ -21,7 +21,9 @@ Usage: python -m services.scheduler import logging import os import signal +import threading import time +from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timezone import httpx @@ -80,7 +82,13 @@ _DEFAULTS = { # staleness grace window in app/api/health.py — single env var drives # both, so an operator changing the cadence moves both. "SCHEDULER_SESSION_COLLECTOR_INTERVAL": 10 * 60, + # Drives the verification session-processor cadence AND the + # health-check staleness grace window in app/api/health.py + # (single env var → both, so an operator changing the cadence moves + # both). Name retained post session-pipeline refactor for operator + # compatibility — existing docker-compose env files keep working. 
"SCHEDULER_VERIFICATION_DETECTOR_INTERVAL": 15 * 60, + "SCHEDULER_USAGE_PROCESSOR_INTERVAL": 10 * 60, "SCHEDULER_CORPORATE_MEMORY_INTERVAL": 17 * 60, } @@ -139,9 +147,10 @@ def build_jobs() -> list[tuple[str, str, str, str, int]]: scripts = _read_positive_int("SCHEDULER_SCRIPT_RUN_INTERVAL") sess = _read_positive_int("SCHEDULER_SESSION_COLLECTOR_INTERVAL") verify = _read_positive_int("SCHEDULER_VERIFICATION_DETECTOR_INTERVAL") + usage = _read_positive_int("SCHEDULER_USAGE_PROCESSOR_INTERVAL") corpmem = _read_positive_int("SCHEDULER_CORPORATE_MEMORY_INTERVAL") tick = _read_positive_int("SCHEDULER_TICK_SECONDS") - smallest = min(refresh, health, scripts, sess, verify, corpmem) + smallest = min(refresh, health, scripts, sess, verify, usage, corpmem) if tick > smallest: raise ValueError( f"SCHEDULER_TICK_SECONDS={tick} must be <= the smallest job " @@ -161,7 +170,14 @@ def build_jobs() -> list[tuple[str, str, str, str, int]]: # single source of truth for the health-check staleness grace # window in app/api/health.py (which uses 2x the cadence). ("session-collector", _seconds_to_schedule(sess), "/api/admin/run-session-collector", "POST", 300), - ("verification-detector", _seconds_to_schedule(verify), "/api/admin/run-verification-detector", "POST", 900), + # session-pipeline processors — independent loops, each invoked on + # its own cadence via the parametrized run-session-processor endpoint. + # Adding a third processor in the future is one line here + one entry + # in services/session_processors/__init__.py registry. 
+ ("session-processor:verification", _seconds_to_schedule(verify), + "/api/admin/run-session-processor?processor=verification", "POST", 900), + ("session-processor:usage", _seconds_to_schedule(usage), + "/api/admin/run-session-processor?processor=usage", "POST", 300), ("corporate-memory", _seconds_to_schedule(corpmem), "/api/admin/run-corporate-memory", "POST", 900), ] @@ -220,19 +236,67 @@ def run(): last_run: dict[str, str | None] = {name: None for name, *_ in jobs} + # Per-tick concurrency: one thread per job slot, so a 900s verification + # run can't block the 60s health-check or the 30s data-refresh from + # firing on their own cadences (PR #232 review fix). Pure I/O workload + # (httpx) — GIL is irrelevant. `in_flight` prevents the same job being + # re-launched on a subsequent tick while the previous invocation is + # still running; otherwise a 10-min run during which 20 ticks fire + # would queue 20 duplicate POSTs against the same processor (the + # admin endpoint's per-processor lock would 409 most of them, but + # they'd still be wasted requests + audit-log noise). + in_flight: set[str] = set() + in_flight_lock = threading.Lock() + executor = ThreadPoolExecutor(max_workers=max(4, len(jobs))) + while _running: now_iso = datetime.now(timezone.utc).isoformat() for name, schedule, endpoint, method, timeout_sec in jobs: if not is_table_due(schedule, last_run[name]): continue + with in_flight_lock: + if name in in_flight: + # Previous tick's invocation hasn't returned yet; skip. 
+ continue + in_flight.add(name) logger.info("Running job: %s (%s)", name, schedule) - ok = _call_api(endpoint, method, timeout_sec) - if ok: - last_run[name] = now_iso + executor.submit( + _run_job, name, endpoint, method, timeout_sec, now_iso, + last_run, in_flight, in_flight_lock, + ) time.sleep(tick) + logger.info("Scheduler stopping; waiting for in-flight jobs.") + executor.shutdown(wait=True) logger.info("Scheduler stopped.") +def _run_job( + name: str, + endpoint: str, + method: str, + timeout: int, + now_iso: str, + last_run: dict[str, "str | None"], + in_flight: set[str], + in_flight_lock: threading.Lock, +) -> None: + """Execute one scheduled job + bookkeeping. Lifted out of run() so it's + unit-testable. + + Advances last_run on terminal state (success OR failure) so a permanently + failing job retries on its cadence (e.g. 15 min), not on every scheduler + tick (default 30s). Pre-fix behavior caused a hot-loop on persistent 5xx — + 30× more requests + LLM tokens than the operator configured. Errors still + surface via _call_api's logging + audit_log on the receiving side. + """ + try: + _call_api(endpoint, method, timeout) + finally: + last_run[name] = now_iso + with in_flight_lock: + in_flight.discard(name) + + if __name__ == "__main__": run() diff --git a/services/session_pipeline/__init__.py b/services/session_pipeline/__init__.py new file mode 100644 index 0000000..d730a8b --- /dev/null +++ b/services/session_pipeline/__init__.py @@ -0,0 +1,9 @@ +"""Session pipeline framework — shared utilities, contract, and per-processor +runner for any service that wants to extract data from Claude Code session +transcripts in /data/user_sessions/. + +Processors live in services/session_processors/. Each one declares its own +cadence and its own state row keyed by (processor_name, session_file), so +adding a new processor today retroactively reprocesses all historical sessions +for that processor only, and a slow or failing processor cannot block any other. 
+""" diff --git a/services/session_pipeline/contract.py b/services/session_pipeline/contract.py new file mode 100644 index 0000000..0fd2cf7 --- /dev/null +++ b/services/session_pipeline/contract.py @@ -0,0 +1,55 @@ +"""Contract for session-pipeline processors. + +A processor is anything that, given a parsed Claude Code session jsonl file, +emits some side effect — knowledge extraction, usage events, error metrics, +security findings, etc. The runner (`services/session_pipeline/runner.py`) +calls process_session() once per unprocessed file and persists state on success. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Protocol, runtime_checkable + +import duckdb + + +@dataclass(frozen=True) +class ProcessorResult: + """Per-session outcome surfaced to the runner. items_count is the number + of records the processor produced (knowledge items, events, etc.) and + is stored in session_processor_state.items_extracted for observability — + not load-bearing for the framework's correctness.""" + items_count: int = 0 + + +@runtime_checkable +class SessionProcessor(Protocol): + """Implementations live in services/session_processors/.py and + are listed in services/session_processors/__init__.py:PROCESSORS.""" + + name: str + """Unique processor key. Used in session_processor_state.processor_name + and as the URL query param for /api/admin/run-session-processor.""" + + cadence_minutes: int + """How often the scheduler should invoke this processor. The actual + schedule entry is built in services/scheduler/__main__.py from this value + (env-overridable per processor).""" + + def process_session( + self, + session_path: Path, + username: str, + session_key: str, + conn: duckdb.DuckDBPyConnection, + ) -> ProcessorResult: + """Process exactly one session jsonl. Idempotent per + (name, session_key, file_hash). 
+ + Raise = the runner will NOT mark this session as processed for this + processor → it will be retried on the next scheduler tick. Return = + the runner marks it processed and skips it next time (until its + file_hash changes).""" + ... diff --git a/services/session_pipeline/lib.py b/services/session_pipeline/lib.py new file mode 100644 index 0000000..eaf0d22 --- /dev/null +++ b/services/session_pipeline/lib.py @@ -0,0 +1,40 @@ +"""Pure utilities used by the runner and individual processors. No DB, no +side effects beyond logging.""" + +from __future__ import annotations + +import hashlib +import json +import logging +from pathlib import Path + +logger = logging.getLogger(__name__) + + +def parse_jsonl(path: Path) -> list[dict]: + """Parse a Claude Code session jsonl into a list of event dicts. + + Malformed lines are logged and skipped — a single corrupt row mustn't + abort processing of the rest of the session. Lifted verbatim from the + pre-refactor verification_detector.detector.parse_session so the + behavior is identical.""" + turns: list[dict] = [] + with open(path) as f: + for line in f: + line = line.strip() + if line: + try: + turns.append(json.loads(line)) + except json.JSONDecodeError: + logger.warning("Skipping malformed JSONL line in %s", path) + return turns + + +def compute_file_hash(path: Path) -> str: + """MD5 of the file content. Used to invalidate session_processor_state + rows when a jsonl grows (Claude Code appending to an active session).""" + h = hashlib.md5() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + h.update(chunk) + return h.hexdigest() diff --git a/services/session_pipeline/runner.py b/services/session_pipeline/runner.py new file mode 100644 index 0000000..bcef002 --- /dev/null +++ b/services/session_pipeline/runner.py @@ -0,0 +1,127 @@ +"""Per-processor runner — drives one SessionProcessor across all unprocessed +sessions in /data/user_sessions/. 
Each processor is invoked independently +(one call to run_processor per scheduler tick per processor); there is no +cross-processor coupling. + +Failure handling mirrors the pre-refactor verification_detector behavior: +per-session try/except, on raise the state row is NOT written → the same +session will be retried on the next tick. There is no max_retries / dead +letter. A permanently malformed session will retry forever; that is a +known limitation we may revisit (out of scope for this refactor). +""" + +from __future__ import annotations + +import logging +import os +from pathlib import Path +from typing import Any + +import duckdb + +from services.session_pipeline.contract import ProcessorResult, SessionProcessor +from services.session_pipeline.lib import compute_file_hash +from src.repositories.session_processor_state import SessionProcessorStateRepository + +logger = logging.getLogger(__name__) + +DEFAULT_SESSION_DATA_DIR = Path(os.environ.get("SESSION_DATA_DIR", "/data/user_sessions")) + + +def run_processor( + conn: duckdb.DuckDBPyConnection, + processor: SessionProcessor, + session_data_dir: Path | None = None, +) -> dict[str, Any]: + """Run *processor* against every unprocessed session in + *session_data_dir* (defaults to $SESSION_DATA_DIR or /data/user_sessions). + + Returns a stats dict with: scanned, processed, skipped, errors, + items_extracted, errors_detail. Caller (admin endpoint) puts this in the + audit row and HTTP response body. 
+ """ + effective_dir = session_data_dir if session_data_dir is not None else DEFAULT_SESSION_DATA_DIR + + stats: dict[str, Any] = { + "processor": processor.name, + "scanned": 0, + "processed": 0, + "skipped": 0, + "errors": 0, + "items_extracted": 0, + "errors_detail": [], + } + + repo = SessionProcessorStateRepository(conn) + candidates = repo.scan_unprocessed_for(processor.name, effective_dir) + stats["scanned"] = len(candidates) + + if not candidates: + logger.info("No sessions to process for processor=%s", processor.name) + return stats + + for username, jsonl_path in candidates: + session_key = f"{username}/{jsonl_path.name}" + try: + file_hash = compute_file_hash(jsonl_path) + except Exception as e: + logger.warning( + "Cannot hash %s for processor=%s: %s", + session_key, processor.name, e, + ) + stats["errors"] += 1 + stats["errors_detail"].append({"session": session_key, "error": str(e)}) + continue + + # Hash-aware skip: scan_unprocessed_for returns every candidate; we + # do the authoritative is_processed check here so the runner is the + # single place that decides "this exact (processor, session, hash) + # tuple is already done". Cost: one extra SELECT per candidate, but + # only for files that survived directory scan. + if repo.is_processed(processor.name, session_key, file_hash): + stats["skipped"] += 1 + continue + + try: + result = processor.process_session(jsonl_path, username, session_key, conn) + except Exception as e: + logger.exception( + "Processor %s failed on %s — leaving state unwritten for retry", + processor.name, session_key, + ) + stats["errors"] += 1 + stats["errors_detail"].append({"session": session_key, "error": str(e)}) + continue + + if not isinstance(result, ProcessorResult): + # Defensive: Protocol can't enforce the return type at runtime, + # so a misbehaving processor that returns None or an arbitrary + # dict shouldn't poison the state-write path. 
Treat it as zero + # items but still mark processed — the alternative (raise) would + # cause the same session to be retried forever. + logger.warning( + "Processor %s returned non-ProcessorResult on %s; coercing to empty result", + processor.name, session_key, + ) + result = ProcessorResult(items_count=0) + + repo.mark_processed( + processor_name=processor.name, + session_file=session_key, + username=username, + items_count=result.items_count, + file_hash=file_hash, + ) + stats["processed"] += 1 + stats["items_extracted"] += result.items_count + + logger.info( + "Processor %s: scanned=%d processed=%d skipped=%d errors=%d items=%d", + processor.name, + stats["scanned"], + stats["processed"], + stats["skipped"], + stats["errors"], + stats["items_extracted"], + ) + return stats diff --git a/services/session_processors/__init__.py b/services/session_processors/__init__.py new file mode 100644 index 0000000..31cddba --- /dev/null +++ b/services/session_processors/__init__.py @@ -0,0 +1,46 @@ +"""Pluggable session processors for the session-pipeline framework. + +Each processor implements the SessionProcessor protocol from +services.session_pipeline.contract and lives in its own module here. + +The processor registry (see _build_registry) is populated lazily so that +processors needing runtime config (LLM extractor, instance config, etc.) +don't fail at import time when those aren't available — relevant for tests +and for instances where the LLM is intentionally unconfigured. +""" + +from __future__ import annotations + +from functools import lru_cache + +from services.session_pipeline.contract import SessionProcessor +from services.session_processors.usage import UsageProcessor +from services.session_processors.verification import build_verification_processor + + +@lru_cache(maxsize=1) +def _build_registry() -> dict[str, SessionProcessor]: + """Construct the registry once per process. 
Verification needs an LLM + extractor which is built from instance config + env, so we delay until + something actually asks for the registry — meaning admin endpoint or + scheduler call, not test imports.""" + registry: dict[str, SessionProcessor] = { + "usage": UsageProcessor(), + } + try: + registry["verification"] = build_verification_processor() + except Exception: + # Verification needs an LLM; if construction fails (no API key, + # bad config), the endpoint will report a clean 400 "unknown + # processor" rather than a 500 at import time. The exception is + # swallowed here and, as written, build_verification_processor + # does not log it. + pass + return registry + + +def get_processor(name: str) -> SessionProcessor | None: + return _build_registry().get(name) + + +def list_processor_names() -> list[str]: + return sorted(_build_registry().keys()) diff --git a/services/session_processors/usage.py b/services/session_processors/usage.py new file mode 100644 index 0000000..99cabd7 --- /dev/null +++ b/services/session_processors/usage.py @@ -0,0 +1,46 @@ +"""UsageProcessor — extracts skill / agent invocation events from Claude Code +session jsonls. + +NOTE: extraction logic is intentionally not implemented yet. Storage shape +(DuckDB events table vs. append-only parquet event log), granularity +(per-invocation row vs. per-session aggregate), and signal sources +(tool_use blocks only vs. also slash-command markers in user messages) are +pending a separate brainstorm — see plan +~/.claude/plans/abundant-leaping-charm.md "Out of scope" section. + +The class exists at this stage so that: + - The session-pipeline framework can be exercised end-to-end with two + registered processors, not one (catches single-processor assumptions). + - The scheduler entry + admin endpoint routing are wired now and won't + need a follow-up PR to add the second processor's plumbing. + +process_session is a no-op that always reports 0 items extracted. The +runner still calls mark_processed so the same session is skipped on later +ticks until its file_hash changes. 
+""" + +from __future__ import annotations + +from pathlib import Path + +import duckdb + +from services.session_pipeline.contract import ProcessorResult + + +class UsageProcessor: + name: str = "usage" + cadence_minutes: int = 10 + + def process_session( + self, + session_path: Path, + username: str, + session_key: str, + conn: duckdb.DuckDBPyConnection, + ) -> ProcessorResult: + # TODO: extraction logic — pending brainstorm on signal sources + # (tool_use.name in {"Skill", "Task"}? slash-command markers? + # subagent invocations?) and storage (events table? parquet log? + # aggregates?). For now, return zero so the runner marks the + # session processed and we don't re-scan it every tick. + return ProcessorResult(items_count=0) diff --git a/services/session_processors/verification.py b/services/session_processors/verification.py new file mode 100644 index 0000000..fe0e12e --- /dev/null +++ b/services/session_processors/verification.py @@ -0,0 +1,173 @@ +"""VerificationProcessor — first plugin of the session-pipeline framework. + +Wraps the body of the pre-refactor `verification_detector.detector.run()` +inner loop so the LLM extraction + persist behavior is unchanged after the +framework refactor. Tests in `tests/test_corporate_memory_v1.py` are the +regression contract. 
+""" + +from __future__ import annotations + +import logging +from pathlib import Path + +import duckdb + +from connectors.llm import StructuredExtractor +from connectors.llm.exceptions import LLMError +from services.corporate_memory import contradiction as contradiction_module +from services.corporate_memory.confidence import compute_confidence +from services.session_pipeline.contract import ProcessorResult +from services.session_pipeline.lib import parse_jsonl +from services.verification_detector.duplicates import _record_duplicate_candidates +from services.verification_detector.detector import ( + _generate_id, + extract_verifications, +) +from src.repositories.knowledge import KnowledgeRepository + +logger = logging.getLogger(__name__) + + +class VerificationProcessor: + name: str = "verification" + cadence_minutes: int = 15 + + def __init__(self, extractor: StructuredExtractor): + self.extractor = extractor + + def process_session( + self, + session_path: Path, + username: str, + session_key: str, + conn: duckdb.DuckDBPyConnection, + ) -> ProcessorResult: + repo = KnowledgeRepository(conn) + session_id = f"session-{session_path.stem}-{username}" + + turns = parse_jsonl(session_path) + if not turns: + logger.info("Empty session: %s", session_key) + return ProcessorResult(items_count=0) + + verifications = extract_verifications(self.extractor, username, session_id, turns) + + items_created = 0 + for v in verifications: + item_id = _generate_id(v["title"], v["content"]) + existing = repo.get_by_id(item_id) + if existing: + # Hash collision on (title, content) → another analyst + # produced the same fact. ADR Decision 3 expects multiple + # evidence rows to accumulate (one per distinct + # verification event), so we still persist the new + # evidence row even though we skip the create+contradiction + # path. Without this, the second analyst's user_quote and + # detection_type are silently dropped and the + # "additional verifiers" boost cannot accumulate. 
+ logger.info( + "Duplicate item — recording evidence on existing: %s", + item_id, + ) + repo.create_evidence( + item_id=item_id, + source_user=username, + source_ref=session_id, + detection_type=v.get("detection_type"), + user_quote=v.get("user_quote"), + ) + continue + + # Confidence is computed in code from (source_type, detection_type). + # The LLM is not trusted to set its own credibility — see Q3 in + # docs/pd-ps-comments.md and the ADR. + detection_type = v.get("detection_type") + try: + confidence_value = compute_confidence("user_verification", detection_type) + except ValueError: + # Unknown detection_type from the LLM; fall back to a + # lookup-keyed default rather than the LLM-supplied value. + confidence_value = compute_confidence("user_verification", "confirmation") + repo.create( + id=item_id, + title=v["title"], + content=v["content"], + category="business_logic", + source_user=username, + tags=v.get("entities", []), + status="pending", + confidence=confidence_value, + domain=v.get("domain"), + entities=v.get("entities"), + source_type="user_verification", + source_ref=session_id, + sensitivity="internal", + ) + # Persist the verification evidence row — user_quote and + # detection_type are the raw signal Bayesian re-calibration + # will need later (Q3). + repo.create_evidence( + item_id=item_id, + source_user=username, + source_ref=session_id, + detection_type=detection_type, + user_quote=v.get("user_quote"), + ) + items_created += 1 + + # Record duplicate-candidate hints inline. Heuristic-only (no + # LLM call) so it stays cheap; failures must never abort + # session processing — log and continue. Issue #62. + try: + new_item = repo.get_by_id(item_id) + if new_item is not None: + _record_duplicate_candidates(repo, new_item) + except Exception as e: + logger.warning( + "Duplicate-candidate detection failed for %s: %s", + item_id, e, + ) + + # Run contradiction detection inline. 
Failure of the LLM + # judge must not abort session processing — log and move on. + try: + new_item = repo.get_by_id(item_id) + if new_item is not None: + contradiction_module.detect_and_record(self.extractor, new_item, repo) + except LLMError as e: + logger.warning("Contradiction check failed for %s: %s", item_id, e) + except Exception as e: + logger.warning( + "Unexpected error during contradiction check for %s: %s", + item_id, e, + ) + + logger.info( + "Processed %s: %d verifications, %d items created", + session_key, len(verifications), items_created, + ) + return ProcessorResult(items_count=items_created) + + +def build_verification_processor() -> VerificationProcessor: + """Factory that constructs the LLM extractor from instance config + env. + + Mirrors the pattern in services/verification_detector/__main__.py and + app/api/admin.py:run_verification_detector — both built the extractor + lazily at call time. Raises if the LLM isn't configured.""" + from connectors.llm import create_extractor_from_env_or_config + + try: + from app.instance_config import load_instance_config + + try: + config = load_instance_config() + except (ValueError, FileNotFoundError): + config = {} + ai_config = config.get("ai") if config else None + except Exception: + ai_config = None + + extractor = create_extractor_from_env_or_config(ai_config) + return VerificationProcessor(extractor=extractor) diff --git a/services/verification_detector/__main__.py b/services/verification_detector/__main__.py index 3937c71..27f51be 100644 --- a/services/verification_detector/__main__.py +++ b/services/verification_detector/__main__.py @@ -1,17 +1,14 @@ -"""CLI entry point for the verification detector service. +"""CLI entry point for ad-hoc local runs of the verification processor. 
Usage: - python -m services.verification_detector [--dry-run] [--verbose] [--reset] + python -m services.verification_detector [--verbose] [--reset] -TODO(scheduler-v2): Trigger is manual-only today (CLI) but detect_and_record is -also called inline per new knowledge item submission. Wire into -services/scheduler/__main__.py JOBS list (e.g. hourly) and expose an admin -endpoint /api/admin/run-verification that calls detector.run() so the -scheduler stays the single source of truth for cadence. - -TODO(notifications): When new pending items land in knowledge_items via -detector.run(), there is no admin notification. Hook into services/telegram_bot -or email so km_admins are pinged with a digest of pending items to triage. +After the session-pipeline refactor the canonical execution path is the +admin endpoint POST /api/admin/run-session-processor?processor=verification +driven by the scheduler. This CLI shim is kept as a developer convenience +for running the verification flow against a local instance without going +through HTTP — it constructs the VerificationProcessor and runs it through +the shared runner. """ import argparse @@ -19,10 +16,10 @@ import logging import sys from app.logging_config import setup_logging +from services.session_pipeline.runner import run_processor +from services.session_processors.verification import build_verification_processor from src.db import get_system_db -from . import detector - logger = logging.getLogger(__name__) @@ -30,11 +27,6 @@ def main() -> None: parser = argparse.ArgumentParser( description="Extract verified organizational knowledge from analyst session transcripts." 
) - parser.add_argument( - "--dry-run", - action="store_true", - help="Analyze sessions but do not write results to the database.", - ) parser.add_argument( "--verbose", action="store_true", @@ -43,29 +35,17 @@ def main() -> None: parser.add_argument( "--reset", action="store_true", - help="Reset session processing state before running.", + help="Reset the verification processor's session-processed state before running.", ) args = parser.parse_args() setup_logging(__name__, level="DEBUG" if args.verbose else "INFO") - # Load AI config; fail fast on missing config + env (#176). - # Use the overlay-aware loader (#179 review fix) so an ai: block written - # by /api/admin/configure to DATA_DIR/state/instance.yaml actually flows - # through to the factory. - from connectors.llm import create_extractor_from_env_or_config try: - from app.instance_config import load_instance_config - - try: - config = load_instance_config() - except (ValueError, FileNotFoundError): - config = {} - ai_config = config.get("ai") if config else None - extractor = create_extractor_from_env_or_config(ai_config) + processor = build_verification_processor() except (ValueError, FileNotFoundError) as e: logger.error( - "Failed to initialize verification detector: %s. " + "Failed to initialize verification processor: %s. 
" "Configure ai: in instance.yaml or set ANTHROPIC_API_KEY / LLM_API_KEY.", e, ) @@ -74,24 +54,23 @@ def main() -> None: conn = get_system_db() if args.reset: - logger.info("Resetting session extraction state...") - conn.execute("DELETE FROM session_extraction_state") - logger.info("Session extraction state cleared.") + logger.info("Resetting verification processor state...") + conn.execute( + "DELETE FROM session_processor_state WHERE processor_name = ?", + [processor.name], + ) - stats = detector.run(conn, extractor, dry_run=args.dry_run) + stats = run_processor(conn, processor) - print("\n--- Verification Detector Summary ---") - print(f"Sessions scanned: {stats['sessions_scanned']}") - print(f"Sessions processed: {stats['sessions_processed']}") - print(f"Sessions skipped: {stats['sessions_skipped']}") - print(f"Verifications extracted: {stats['verifications_extracted']}") - print(f"Items created: {stats['items_created']}") + print("\n--- Verification Processor Summary ---") + print(f"Sessions scanned: {stats['scanned']}") + print(f"Sessions processed: {stats['processed']}") + print(f"Sessions skipped: {stats['skipped']}") + print(f"Items created: {stats['items_extracted']}") if stats["errors"]: - print(f"Errors: {len(stats['errors'])}") - for err in stats["errors"]: + print(f"Errors: {stats['errors']}") + for err in stats["errors_detail"]: print(f" - {err}") - if args.dry_run: - print("\n(dry-run mode -- no changes were written)") if stats["errors"]: sys.exit(1) diff --git a/services/verification_detector/detector.py b/services/verification_detector/detector.py index 465a4f2..948e2b1 100644 --- a/services/verification_detector/detector.py +++ b/services/verification_detector/detector.py @@ -1,29 +1,25 @@ -"""Main pipeline for the verification detector service. +"""LLM-side helpers for the verification detector. -Scans unprocessed analyst session transcripts, sends them to an LLM for -verification extraction, and stores the results in the knowledge repository. 
+After the session-pipeline refactor, the orchestration loop (scan unprocessed +→ parse jsonl → mark processed) lives in services/session_pipeline/, and the +per-session persistence flow lives in services/session_processors/verification.py +(VerificationProcessor). This module retains only the pieces specific to LLM +extraction — prompt formatting, the structured-output call, and the +deterministic-id helper — which the new VerificationProcessor still imports. """ import hashlib -import json import logging -import os -from pathlib import Path -from typing import Any from connectors.llm import StructuredExtractor from connectors.llm.exceptions import LLMError -from services.corporate_memory import contradiction as contradiction_module -from services.corporate_memory.confidence import compute_confidence -from src.repositories.knowledge import KnowledgeRepository -from .duplicates import _record_duplicate_candidates from .prompts import VERIFICATION_EXTRACT_PROMPT from .schemas import VERIFICATION_SCHEMA logger = logging.getLogger(__name__) -SESSION_DATA_DIR = Path(os.environ.get("SESSION_DATA_DIR", "/data/user_sessions")) MAX_TURNS_PER_SESSION = 100 @@ -33,38 +29,6 @@ def _generate_id(title: str, content: str) -> str: return "kv_" + hashlib.sha256(raw.encode()).hexdigest()[:12] -def scan_unprocessed_sessions(conn, session_dir: Path | None = None) -> list[tuple[str, Path]]: - """Find JSONL files not yet in session_extraction_state table.""" - repo = KnowledgeRepository(conn) - results: list[tuple[str, Path]] = [] - effective_dir = session_dir if session_dir is not None else SESSION_DATA_DIR - if not effective_dir.exists(): - return results - for user_dir in effective_dir.iterdir(): - if not user_dir.is_dir(): - continue - username = user_dir.name - for jsonl_file in sorted(user_dir.glob("*.jsonl")): - key = f"{username}/{jsonl_file.name}" - if not repo.is_session_processed(key): - results.append((username, jsonl_file)) - return results - - 
-def parse_session(jsonl_path: Path) -> list[dict]: - """Parse JSONL session file into conversation turns.""" - turns: list[dict] = [] - with open(jsonl_path) as f: - for line in f: - line = line.strip() - if line: - try: - turns.append(json.loads(line)) - except json.JSONDecodeError: - logger.warning("Skipping malformed JSONL line in %s", jsonl_path) - return turns - - def _format_turns(turns: list[dict]) -> str: """Format conversation turns as a parseable, prompt-injection-hardened block. @@ -82,15 +46,6 @@ def _format_turns(turns: list[dict]) -> str: return "\n".join(lines) -def _compute_file_hash(path: Path) -> str: - """Compute MD5 hash of a file.""" - h = hashlib.md5() - with open(path, "rb") as f: - for chunk in iter(lambda: f.read(8192), b""): - h.update(chunk) - return h.hexdigest() - - def extract_verifications( extractor: StructuredExtractor, username: str, @@ -124,167 +79,3 @@ def extract_verifications( except LLMError as e: logger.error("LLM extraction failed for session %s: %s", session_id, e) return [] - - -def run( - conn, - extractor: StructuredExtractor, - dry_run: bool = False, - session_data_dir: Path | None = None, -) -> dict[str, Any]: - """Run the full verification detection pipeline. - - Returns stats dict with counts. 
- """ - effective_session_dir = session_data_dir if session_data_dir is not None else SESSION_DATA_DIR - - stats: dict[str, Any] = { - "sessions_scanned": 0, - "sessions_processed": 0, - "sessions_skipped": 0, - "verifications_extracted": 0, - "items_created": 0, - "contradictions_recorded": 0, - "duplicate_candidates_recorded": 0, - "errors": [], - } - - unprocessed = scan_unprocessed_sessions(conn, session_dir=effective_session_dir) - stats["sessions_scanned"] = len(unprocessed) - - if not unprocessed: - logger.info("No unprocessed sessions found") - return stats - - repo = KnowledgeRepository(conn) - - for username, jsonl_path in unprocessed: - session_key = f"{username}/{jsonl_path.name}" - session_id = f"session-{jsonl_path.stem}-{username}" - - try: - turns = parse_session(jsonl_path) - if not turns: - logger.info("Empty session: %s", session_key) - if not dry_run: - repo.mark_session_processed(session_key, username, 0, _compute_file_hash(jsonl_path)) - stats["sessions_skipped"] += 1 - continue - - verifications = extract_verifications(extractor, username, session_id, turns) - stats["verifications_extracted"] += len(verifications) - - items_created = 0 - for v in verifications: - item_id = _generate_id(v["title"], v["content"]) - # Check if item already exists (deduplication) - existing = repo.get_by_id(item_id) - if existing: - # Hash collision on (title, content) → another analyst - # produced the same fact. ADR Decision 3 expects multiple - # evidence rows to accumulate (one per distinct - # verification event), so we still persist the new - # evidence row even though we skip the create+contradiction - # path. Without this, the second analyst's user_quote and - # detection_type are silently dropped and the - # "additional verifiers" boost cannot accumulate. 
- logger.info( - "Duplicate item — recording evidence on existing: %s", - item_id, - ) - if not dry_run: - repo.create_evidence( - item_id=item_id, - source_user=username, - source_ref=session_id, - detection_type=v.get("detection_type"), - user_quote=v.get("user_quote"), - ) - continue - - if not dry_run: - # Confidence is computed in code from (source_type, detection_type). - # The LLM is not trusted to set its own credibility — see Q3 in - # docs/pd-ps-comments.md and the ADR. - detection_type = v.get("detection_type") - try: - confidence_value = compute_confidence("user_verification", detection_type) - except ValueError: - # Unknown detection_type from the LLM; fall back to a - # lookup-keyed default rather than the LLM-supplied value. - confidence_value = compute_confidence("user_verification", "confirmation") - repo.create( - id=item_id, - title=v["title"], - content=v["content"], - category="business_logic", - source_user=username, - tags=v.get("entities", []), - status="pending", - confidence=confidence_value, - domain=v.get("domain"), - entities=v.get("entities"), - source_type="user_verification", - source_ref=session_id, - sensitivity="internal", - ) - # Persist the verification evidence row — user_quote and - # detection_type are the raw signal Bayesian re-calibration - # will need later (Q3). - repo.create_evidence( - item_id=item_id, - source_user=username, - source_ref=session_id, - detection_type=detection_type, - user_quote=v.get("user_quote"), - ) - items_created += 1 - # Record duplicate-candidate hints inline. Heuristic-only - # (no LLM call) so it stays cheap; failures must never - # abort session processing — log and continue. Issue #62. 
- try: - new_item = repo.get_by_id(item_id) - if new_item is not None: - recorded_dup = _record_duplicate_candidates( - repo, new_item - ) - stats["duplicate_candidates_recorded"] += recorded_dup - except Exception as e: - logger.warning( - "Duplicate-candidate detection failed for %s: %s", - item_id, e, - ) - - # Run contradiction detection inline. Failure of the LLM - # judge must not abort session processing — log and move on. - try: - new_item = repo.get_by_id(item_id) - if new_item is not None: - recorded = contradiction_module.detect_and_record(extractor, new_item, repo) - stats["contradictions_recorded"] += len(recorded) - except LLMError as e: - logger.warning("Contradiction check failed for %s: %s", item_id, e) - except Exception as e: - logger.warning( - "Unexpected error during contradiction check for %s: %s", - item_id, - e, - ) - - if not dry_run: - repo.mark_session_processed(session_key, username, items_created, _compute_file_hash(jsonl_path)) - - stats["sessions_processed"] += 1 - stats["items_created"] += items_created - logger.info( - "Processed %s: %d verifications, %d items created", - session_key, - len(verifications), - items_created, - ) - - except Exception as e: - logger.error("Error processing %s: %s", session_key, e) - stats["errors"].append(f"{session_key}: {e}") - - return stats diff --git a/src/db.py b/src/db.py index c6b9a1f..9d8ab10 100644 --- a/src/db.py +++ b/src/db.py @@ -40,7 +40,7 @@ def _maybe_instrument(con, db_tag: str): _SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$") -SCHEMA_VERSION = 30 +SCHEMA_VERSION = 31 _SYSTEM_SCHEMA = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -187,15 +187,19 @@ CREATE TABLE IF NOT EXISTS knowledge_item_relations ( CREATE INDEX IF NOT EXISTS idx_knowledge_item_relations_resolved ON knowledge_item_relations(resolved); --- v15: track which session JSONL files the verification detector has already --- processed so re-runs over the same session dir are idempotent and the --- 
detector can resume mid-batch on crash. -CREATE TABLE IF NOT EXISTS session_extraction_state ( - session_file VARCHAR PRIMARY KEY, +-- v15→v31: state tracking for any session-pipeline processor (verification, +-- usage, future extractors). Composite PK (processor_name, session_file) so +-- each processor has its own independent processed-set keyed by jsonl path. +-- file_hash invalidates state when a session jsonl grows (live append from +-- an active Claude Code session) so processors reprocess the new content. +CREATE TABLE IF NOT EXISTS session_processor_state ( + processor_name VARCHAR NOT NULL, + session_file VARCHAR NOT NULL, username VARCHAR NOT NULL, processed_at TIMESTAMP DEFAULT current_timestamp, items_extracted INTEGER DEFAULT 0, - file_hash VARCHAR + file_hash VARCHAR, + PRIMARY KEY (processor_name, session_file) ); -- v16: per-detection evidence rows — one knowledge_item can accumulate @@ -2098,6 +2102,68 @@ _V29_TO_V30_MIGRATIONS = [ ] +# v31: rename session_extraction_state → session_processor_state with composite +# PK (processor_name, session_file). The session pipeline framework +# (services/session_pipeline/) lets multiple processors track their own +# processed-set independently; each gets its own row keyed by name. Existing +# rows belong to the verification detector, so they're copied across with +# processor_name='verification'. The old single-PK table is dropped — its only +# caller (services/verification_detector/detector.py) is rewritten in the same +# PR to use the new repository. +# +# (Originally drafted as v29 but renumbered to v31 after rebase onto upstream's +# v29 instance_templates + v30 news_template work.) +# +# Implemented as a function rather than a SQL list because the INSERT-from-old +# step depends on whether `session_extraction_state` actually exists. 
Fresh +# installs at a pre-v31 schema_version (test fixtures hand-rolling a v19/v20 +# DB) come through `_SYSTEM_SCHEMA` which already creates +# `session_processor_state` at the new shape — but does NOT create the old +# `session_extraction_state` (we removed that). So the migration must skip +# the copy + drop when the old table is missing rather than 500 on +# CatalogException. +_V30_TO_V31_CREATE_NEW_TABLE = """ + CREATE TABLE IF NOT EXISTS session_processor_state ( + processor_name VARCHAR NOT NULL, + session_file VARCHAR NOT NULL, + username VARCHAR NOT NULL, + processed_at TIMESTAMP DEFAULT current_timestamp, + items_extracted INTEGER DEFAULT 0, + file_hash VARCHAR, + PRIMARY KEY (processor_name, session_file) + ) +""" + + +def _v30_to_v31_migrate(conn: duckdb.DuckDBPyConnection) -> None: + """Run the v31 migration steps with conditional copy from the legacy table.""" + conn.execute(_V30_TO_V31_CREATE_NEW_TABLE) + + # Skip the copy + drop when the legacy table doesn't exist (fresh + # install or upgrade path that started at >= v31). Otherwise migrate + # rows over with processor_name='verification' (the only writer of the + # legacy table). + has_legacy = conn.execute( + "SELECT 1 FROM information_schema.tables " + "WHERE table_schema = 'main' AND table_name = 'session_extraction_state'" + ).fetchone() + if not has_legacy: + return + + # INSERT OR IGNORE on the (processor_name, session_file) PK so a + # re-run idempotently no-ops if a verification row was already + # written at the new shape. + conn.execute( + """ + INSERT OR IGNORE INTO session_processor_state + (processor_name, session_file, username, processed_at, items_extracted, file_hash) + SELECT 'verification', session_file, username, processed_at, items_extracted, file_hash + FROM session_extraction_state + """ + ) + conn.execute("DROP TABLE session_extraction_state") + + # v24: rewrite materialized BQ source_query from DuckDB-flavor # (bq.""."") to BigQuery-native (`..
`) # so the new connectors.bigquery.extractor.materialize_query wrapping @@ -2368,6 +2434,8 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: if current < 30: for sql in _V29_TO_V30_MIGRATIONS: conn.execute(sql) + if current < 31: + _v30_to_v31_migrate(conn) conn.execute( "UPDATE schema_version SET version = ?, applied_at = current_timestamp", [SCHEMA_VERSION], diff --git a/src/repositories/knowledge.py b/src/repositories/knowledge.py index f7a1f5b..37ce35d 100644 --- a/src/repositories/knowledge.py +++ b/src/repositories/knowledge.py @@ -420,33 +420,6 @@ class KnowledgeRepository: ).fetchall() return self._rows_to_dicts(results) - # --- Session Extraction State --- - - def mark_session_processed( - self, - session_file: str, - username: str, - items_extracted: int = 0, - file_hash: Optional[str] = None, - ) -> None: - now = datetime.now(timezone.utc) - self.conn.execute( - """INSERT INTO session_extraction_state (session_file, username, processed_at, items_extracted, file_hash) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT (session_file) DO UPDATE - SET processed_at = excluded.processed_at, - items_extracted = excluded.items_extracted, - file_hash = excluded.file_hash""", - [session_file, username, now, items_extracted, file_hash], - ) - - def is_session_processed(self, session_file: str) -> bool: - result = self.conn.execute( - "SELECT 1 FROM session_extraction_state WHERE session_file = ?", - [session_file], - ).fetchone() - return result is not None - # --- Item relations (duplicate-candidate hints, etc.) --- @staticmethod diff --git a/src/repositories/session_processor_state.py b/src/repositories/session_processor_state.py new file mode 100644 index 0000000..1ae2a57 --- /dev/null +++ b/src/repositories/session_processor_state.py @@ -0,0 +1,138 @@ +"""Repository for session_processor_state — per-(processor, session) bookkeeping +for the session pipeline framework (services/session_pipeline/). 
+ +Composite PK (processor_name, session_file) lets each processor track its own +processed-set independently. file_hash invalidates the row when a session jsonl +grows (Claude Code appending live to an active session) so processors reprocess +the new content rather than treating the first hash as final. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +import duckdb + + +class SessionProcessorStateRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def is_processed( + self, + processor_name: str, + session_file: str, + file_hash: str, + ) -> bool: + """True iff a state row exists for (processor_name, session_file) AND + the stored file_hash matches the supplied current hash. Hash mismatch + (e.g. session jsonl grew since last run) is treated as unprocessed + so the processor reprocesses on the next tick.""" + result = self.conn.execute( + """SELECT file_hash FROM session_processor_state + WHERE processor_name = ? AND session_file = ?""", + [processor_name, session_file], + ).fetchone() + if result is None: + return False + return result[0] == file_hash + + def mark_processed( + self, + processor_name: str, + session_file: str, + username: str, + items_count: int, + file_hash: str, + ) -> None: + """UPSERT — overwrites previous state row for (processor, session).""" + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO session_processor_state + (processor_name, session_file, username, processed_at, items_extracted, file_hash) + VALUES (?, ?, ?, ?, ?, ?) 
+ ON CONFLICT (processor_name, session_file) DO UPDATE + SET processed_at = excluded.processed_at, + items_extracted = excluded.items_extracted, + file_hash = excluded.file_hash, + username = excluded.username""", + [processor_name, session_file, username, now, items_count, file_hash], + ) + + def scan_unprocessed_for( + self, + processor_name: str, + session_dir: Path, + ) -> list[tuple[str, Path]]: + """Return (username, jsonl_path) pairs in *session_dir* that this + processor needs to (re)process: no state row, OR state row with + an mtime newer than the stored processed_at (file modified since + last run — likely a live-append from an active Claude Code session). + + The mtime precheck is a cheap stat-only optimization: for stable + sessions (mtime <= processed_at) we skip without reading the file. + Files that survive the precheck still go through the runner's + per-file ``is_processed(file_hash)`` check for authoritative + hash-based invalidation. Without this filter, the runner would + MD5-rehash every stable session on every scheduler tick. + """ + results: list[tuple[str, Path]] = [] + if not session_dir.exists(): + return results + + # One query per scan, not per file. Storing processed_at (not file_hash) + # because mtime is the cheap precheck — file_hash compare lives in the + # runner where it's already paying the IO cost to hash. + known: dict[str, Optional[datetime]] = {} + rows = self.conn.execute( + """SELECT session_file, processed_at FROM session_processor_state + WHERE processor_name = ?""", + [processor_name], + ).fetchall() + for sf, pa in rows: + known[sf] = pa + + for user_dir in session_dir.iterdir(): + if not user_dir.is_dir(): + continue + username = user_dir.name + for jsonl_file in sorted(user_dir.glob("*.jsonl")): + key = f"{username}/{jsonl_file.name}" + if key not in known: + # No state row → definitely needs processing. 
+ results.append((username, jsonl_file)) + continue + processed_at = known[key] + if processed_at is None: + # Defensive: row without processed_at shouldn't happen + # (mark_processed always sets it), but if it does, + # surface for the runner. + results.append((username, jsonl_file)) + continue + try: + mtime_epoch = jsonl_file.stat().st_mtime + except OSError: + # Stat failure: surface for the runner — it'll fail the + # hash compute next and report a clean error in stats + # rather than us silently dropping the file here. + results.append((username, jsonl_file)) + continue + # Compare in naive-local: DuckDB TIMESTAMP strips tz on + # storage and converts tz-aware writes to local time before + # storing (see app/api/health.py:_check_session_pipeline for + # the same idiom). `datetime.fromtimestamp(epoch)` without + # `tz=` returns naive-local, matching processed_at after + # the optional tz strip below. + mtime = datetime.fromtimestamp(mtime_epoch) + if processed_at.tzinfo is not None: + processed_at = processed_at.replace(tzinfo=None) + if mtime > processed_at: + # File touched since last run — could be a live-append + # (Claude Code writing to an active session). Surface + # for the runner; its hash compare will skip if content + # is identical (some editors rewrite-without-change). + results.append((username, jsonl_file)) + # else: stable session, skip without hashing. + return results diff --git a/tests/test_admin_run_endpoints.py b/tests/test_admin_run_endpoints.py index e5b2008..3f593b2 100644 --- a/tests/test_admin_run_endpoints.py +++ b/tests/test_admin_run_endpoints.py @@ -1,19 +1,17 @@ """Admin run-* endpoints that wire the LLM pipeline into scheduler-v2. 
-The scheduler container must drive corporate-memory, verification-detector, -and session-collector through HTTP — see services/scheduler/__main__.py +The scheduler container must drive corporate-memory, the session-pipeline +processors, and session-collector through HTTP — see services/scheduler/__main__.py docstring for why in-process invocation is not safe (DuckDB single-writer contention with the long-lived app handle). Endpoints: - POST /api/admin/run-session-collector -- POST /api/admin/run-verification-detector +- POST /api/admin/run-session-processor?processor= - POST /api/admin/run-corporate-memory All admin-gated. Request body is empty. Response is the underlying job stats dict. - -Closes one of five defects in #176. """ from __future__ import annotations @@ -79,40 +77,175 @@ class TestRunSessionCollector: assert "PermissionError" in params_json -class TestRunVerificationDetector: - def test_admin_can_trigger_verification_detector(self, seeded_app, monkeypatch): - # Set the env so the factory's env-fallback returns a real (mocked - # at the SDK boundary) extractor without 500-ing on missing config. +class TestRunSessionProcessor: + """Parametrized session-processor endpoint replaces the per-processor + /run-* endpoints. The scheduler invokes it once per registered processor + on its own cadence.""" + + def test_admin_can_trigger_verification(self, seeded_app, monkeypatch): + # Need an LLM key in env so build_verification_processor() doesn't + # raise during registry construction. monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-ant-test") + # Reset the lazily-built registry so the new env is picked up. 
+ from services.session_processors import _build_registry + _build_registry.cache_clear() + c = seeded_app["client"] token = seeded_app["admin_token"] fake_stats = { - "sessions_scanned": 3, - "sessions_processed": 2, - "sessions_skipped": 1, - "verifications_extracted": 5, - "items_created": 4, - "errors": [], + "processor": "verification", + "scanned": 3, "processed": 2, "skipped": 1, "errors": 0, + "items_extracted": 4, "errors_detail": [], } with patch( - "services.verification_detector.detector.run", + "services.session_pipeline.runner.run_processor", return_value=fake_stats, - ) as m, patch( - "connectors.llm.factory.AnthropicExtractor" - ): - resp = c.post("/api/admin/run-verification-detector", headers=_auth(token)) + ) as m, patch("connectors.llm.factory.AnthropicExtractor"): + resp = c.post( + "/api/admin/run-session-processor?processor=verification", + headers=_auth(token), + ) assert resp.status_code == 200, resp.text body = resp.json() assert body["ok"] is True - assert body["details"]["items_created"] == 4 + assert body["details"]["items_extracted"] == 4 m.assert_called_once() + def test_admin_can_trigger_usage_skeleton(self, seeded_app): + """The usage processor is registered as a no-op skeleton — endpoint + should route to it without needing any LLM config.""" + from services.session_processors import _build_registry + _build_registry.cache_clear() + + c = seeded_app["client"] + token = seeded_app["admin_token"] + fake_stats = { + "processor": "usage", + "scanned": 0, "processed": 0, "skipped": 0, "errors": 0, + "items_extracted": 0, "errors_detail": [], + } + with patch( + "services.session_pipeline.runner.run_processor", + return_value=fake_stats, + ) as m: + resp = c.post( + "/api/admin/run-session-processor?processor=usage", + headers=_auth(token), + ) + assert resp.status_code == 200, resp.text + assert resp.json()["ok"] is True + m.assert_called_once() + + def test_unknown_processor_returns_400(self, seeded_app): + from 
services.session_processors import _build_registry + _build_registry.cache_clear() + + c = seeded_app["client"] + token = seeded_app["admin_token"] + resp = c.post( + "/api/admin/run-session-processor?processor=bogus", + headers=_auth(token), + ) + assert resp.status_code == 400 + assert "Unknown processor" in resp.json()["detail"] + + def test_concurrent_invocation_returns_409(self, seeded_app): + """Per-processor advisory lock rejects overlapping calls so + scheduler tick + manual admin POST don't double up on the same + sessions and pile up duplicate verification_evidence rows + (PR #232 review).""" + from app.api.admin import _get_processor_run_lock + from services.session_processors import _build_registry + _build_registry.cache_clear() + + c = seeded_app["client"] + token = seeded_app["admin_token"] + + # Hold the lock externally to simulate an in-flight invocation. + lock = _get_processor_run_lock("usage") + lock.acquire() + try: + resp = c.post( + "/api/admin/run-session-processor?processor=usage", + headers=_auth(token), + ) + finally: + lock.release() + + assert resp.status_code == 409 + assert "already running" in resp.json()["detail"] + + def test_lock_released_on_runner_exception(self, seeded_app): + """Even when the runner raises, the lock must release so the next + scheduler tick / admin POST can proceed. A leaked lock would wedge + the processor permanently until process restart.""" + from app.api.admin import _get_processor_run_lock + from services.session_processors import _build_registry + _build_registry.cache_clear() + + c = seeded_app["client"] + token = seeded_app["admin_token"] + + with patch( + "services.session_pipeline.runner.run_processor", + side_effect=RuntimeError("simulated"), + ): + resp = c.post( + "/api/admin/run-session-processor?processor=usage", + headers=_auth(token), + ) + assert resp.status_code == 500 + + # Lock must be free now — second invocation can grab it. 
+ lock = _get_processor_run_lock("usage") + assert lock.acquire(blocking=False), "lock leaked after runner exception" + lock.release() + def test_non_admin_blocked(self, seeded_app): c = seeded_app["client"] token = seeded_app["analyst_token"] - resp = c.post("/api/admin/run-verification-detector", headers=_auth(token)) + resp = c.post( + "/api/admin/run-session-processor?processor=verification", + headers=_auth(token), + ) assert resp.status_code == 403 + def test_unhandled_exception_still_audits(self, seeded_app, monkeypatch): + """Mirror the run_session_collector / run_corporate_memory pattern — + record the failure in audit_log even when the runner raises so + /admin/scheduler-runs sees the failure instead of only docker logs.""" + from src.db import get_system_db + from services.session_processors import _build_registry + monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-ant-test") + _build_registry.cache_clear() + + c = seeded_app["client"] + token = seeded_app["admin_token"] + with patch( + "services.session_pipeline.runner.run_processor", + side_effect=RuntimeError("simulated DuckDB lock"), + ), patch("connectors.llm.factory.AnthropicExtractor"): + resp = c.post( + "/api/admin/run-session-processor?processor=verification", + headers=_auth(token), + ) + assert resp.status_code == 500 + assert "RuntimeError" in resp.json()["detail"] + + conn = get_system_db() + try: + rows = conn.execute( + "SELECT params FROM audit_log " + "WHERE action = 'run_session_processor:verification' " + "ORDER BY timestamp DESC LIMIT 1" + ).fetchall() + finally: + conn.close() + assert rows, "audit row missing on unhandled exception" + params_json = rows[0][0] + assert "unhandled_error" in params_json + assert "RuntimeError" in params_json + class TestRunCorporateMemory: def test_admin_can_trigger_corporate_memory(self, seeded_app): @@ -192,7 +325,9 @@ class TestSchedulerJobsWireUp: names = {n for n, *_ in build_jobs()} assert "session-collector" in names - def 
test_scheduler_includes_verification_detector(self, monkeypatch): + def test_scheduler_includes_session_processors(self, monkeypatch): + """Post-refactor: the verification-detector + usage processors are + wired through the parametrized run-session-processor endpoint.""" for v in ( "SCHEDULER_DATA_REFRESH_INTERVAL", "SCHEDULER_HEALTH_CHECK_INTERVAL", @@ -202,7 +337,8 @@ class TestSchedulerJobsWireUp: monkeypatch.delenv(v, raising=False) from services.scheduler.__main__ import build_jobs names = {n for n, *_ in build_jobs()} - assert "verification-detector" in names + assert "session-processor:verification" in names + assert "session-processor:usage" in names def test_scheduler_includes_corporate_memory(self, monkeypatch): for v in ( @@ -230,7 +366,7 @@ class TestSchedulerJobsWireUp: assert endpoint == "/api/admin/run-session-collector" assert method == "POST" - def test_verification_detector_endpoint_is_registered(self, monkeypatch): + def test_session_processor_endpoints_are_registered(self, monkeypatch): for v in ( "SCHEDULER_DATA_REFRESH_INTERVAL", "SCHEDULER_HEALTH_CHECK_INTERVAL", @@ -239,10 +375,13 @@ class TestSchedulerJobsWireUp: ): monkeypatch.delenv(v, raising=False) from services.scheduler.__main__ import build_jobs - target = next(j for j in build_jobs() if j[0] == "verification-detector") - _, _, endpoint, method, _t = target - assert endpoint == "/api/admin/run-verification-detector" - assert method == "POST" + jobs = {n: (endpoint, method) for n, _, endpoint, method, _ in build_jobs()} + assert jobs["session-processor:verification"] == ( + "/api/admin/run-session-processor?processor=verification", "POST", + ) + assert jobs["session-processor:usage"] == ( + "/api/admin/run-session-processor?processor=usage", "POST", + ) def test_corporate_memory_endpoint_is_registered(self, monkeypatch): for v in ( @@ -273,6 +412,10 @@ class TestSchedulerJobsWireUp: monkeypatch.delenv(v, raising=False) from services.scheduler.__main__ import build_jobs targets = {n: 
schedule for n, schedule, *_ in build_jobs() - if n in ("session-collector", "verification-detector", "corporate-memory")} + if n in ( + "session-collector", + "session-processor:verification", + "corporate-memory", + )} # All three present. assert len(targets) == 3 diff --git a/tests/test_corporate_memory_relations.py b/tests/test_corporate_memory_relations.py index 9805a94..7f67e12 100644 --- a/tests/test_corporate_memory_relations.py +++ b/tests/test_corporate_memory_relations.py @@ -282,7 +282,8 @@ class TestRunPopulatesDuplicateStats: def test_run_records_duplicates_when_two_items_share_entities( self, tmp_path, monkeypatch ): - from services.verification_detector.detector import run + from services.session_pipeline.runner import run_processor + from services.session_processors.verification import VerificationProcessor conn = _fresh_db(tmp_path, monkeypatch) # Mocked golden: two items in same domain sharing 2 entities @@ -316,11 +317,19 @@ class TestRunPopulatesDuplicateStats: # Minimal valid JSONL transcript with at least one turn (session_dir / "s1.jsonl").write_text('{"role":"user","content":"hi"}\n') - stats = run(conn, extractor, session_data_dir=tmp_path / "user_sessions") - assert stats["items_created"] >= 1 + stats = run_processor( + conn, VerificationProcessor(extractor), + session_data_dir=tmp_path / "user_sessions", + ) + assert stats["items_extracted"] >= 1 # The second item's duplicate-candidate hook should fire against the - # first one (same entities, same domain). - assert stats["duplicate_candidates_recorded"] >= 1 + # first one (same entities, same domain). Post-refactor the runner + # doesn't surface a duplicate counter in stats; query the table that + # _record_duplicate_candidates writes into instead. 
+ dup_rows = conn.execute( + "SELECT COUNT(*) FROM knowledge_item_relations WHERE relation_type = 'likely_duplicate'" + ).fetchone()[0] + assert dup_rows >= 1 conn.close() diff --git a/tests/test_corporate_memory_v1.py b/tests/test_corporate_memory_v1.py index e06d2cb..94a56da 100644 --- a/tests/test_corporate_memory_v1.py +++ b/tests/test_corporate_memory_v1.py @@ -36,6 +36,38 @@ def _fresh_db(tmp_path, monkeypatch): return conn +def _run_verification_processor(conn, extractor, session_data_dir=None): + """Run the verification processor through the new framework. + + Returns a stats dict with both new keys (scanned/processed/skipped/ + items_extracted) AND legacy aliases (sessions_scanned/sessions_processed/ + sessions_skipped/verifications_extracted/items_created/contradictions_recorded) + derived from pre/post row counts so existing assertions keep working + after the session-pipeline refactor. + """ + from services.session_pipeline.runner import run_processor + from services.session_processors.verification import VerificationProcessor + + pre_evidence = conn.execute("SELECT COUNT(*) FROM verification_evidence").fetchone()[0] + pre_contradictions = conn.execute("SELECT COUNT(*) FROM knowledge_contradictions").fetchone()[0] + + processor = VerificationProcessor(extractor) + stats = run_processor(conn, processor, session_data_dir=session_data_dir) + + post_evidence = conn.execute("SELECT COUNT(*) FROM verification_evidence").fetchone()[0] + post_contradictions = conn.execute("SELECT COUNT(*) FROM knowledge_contradictions").fetchone()[0] + + return { + **stats, + "sessions_scanned": stats["scanned"], + "sessions_processed": stats["processed"], + "sessions_skipped": stats["skipped"], + "verifications_extracted": post_evidence - pre_evidence, + "items_created": stats["items_extracted"], + "contradictions_recorded": post_contradictions - pre_contradictions, + } + + def _load_golden(name: str) -> dict: """Load a golden verification output file.""" with 
open(VERIFICATIONS_DIR / f"{name}.json") as f: @@ -65,7 +97,11 @@ class TestSchemaV8Migration: ).fetchall() } assert "knowledge_contradictions" in tables - assert "session_extraction_state" in tables + # v31 renamed session_extraction_state → session_processor_state with + # composite (processor_name, session_file) PK so multiple processors + # can track their own processed-set independently. + assert "session_processor_state" in tables + assert "session_extraction_state" not in tables conn.close() def test_knowledge_items_has_new_columns(self, tmp_path, monkeypatch): @@ -214,16 +250,25 @@ class TestKnowledgeRepositoryV1: assert resolved[0]["resolution"] == "kept_a" conn.close() - def test_session_extraction_state(self, tmp_path, monkeypatch): + def test_session_processor_state(self, tmp_path, monkeypatch): + """Post-v31: session-processed bookkeeping moved out of + KnowledgeRepository into SessionProcessorStateRepository, keyed by + (processor_name, session_file). Each processor tracks its own + processed-set independently.""" conn = _fresh_db(tmp_path, monkeypatch) - from src.repositories.knowledge import KnowledgeRepository - repo = KnowledgeRepository(conn) + from src.repositories.session_processor_state import SessionProcessorStateRepository + repo = SessionProcessorStateRepository(conn) - assert repo.is_session_processed("alice/session1.jsonl") is False + assert repo.is_processed("verification", "alice/session1.jsonl", "abc123") is False - repo.mark_session_processed("alice/session1.jsonl", "alice", 3, "abc123") - assert repo.is_session_processed("alice/session1.jsonl") is True - assert repo.is_session_processed("alice/session2.jsonl") is False + repo.mark_processed("verification", "alice/session1.jsonl", "alice", 3, "abc123") + assert repo.is_processed("verification", "alice/session1.jsonl", "abc123") is True + # Different hash → treated as unprocessed (live append invalidation).
+ assert repo.is_processed("verification", "alice/session1.jsonl", "different") is False + # Another session not seen at all. + assert repo.is_processed("verification", "alice/session2.jsonl", "any") is False + # Different processor → independent state. + assert repo.is_processed("usage", "alice/session1.jsonl", "abc123") is False conn.close() def test_find_contradiction_candidates(self, tmp_path, monkeypatch): @@ -401,7 +446,7 @@ class TestSessionParsing: """Test JSONL session file parsing (no LLM).""" def test_parse_correction_session(self): - from services.verification_detector.detector import parse_session + from services.session_pipeline.lib import parse_jsonl as parse_session turns = parse_session(SESSIONS_DIR / "correction_churn_metric.jsonl") assert len(turns) == 4 assert turns[0]["role"] == "assistant" @@ -409,14 +454,14 @@ class TestSessionParsing: assert "wrong" in turns[1]["content"].lower() def test_parse_empty_file(self, tmp_path): - from services.verification_detector.detector import parse_session + from services.session_pipeline.lib import parse_jsonl as parse_session empty_file = tmp_path / "empty.jsonl" empty_file.write_text("") turns = parse_session(empty_file) assert turns == [] def test_parse_malformed_line_skipped(self, tmp_path): - from services.verification_detector.detector import parse_session + from services.session_pipeline.lib import parse_jsonl as parse_session bad_file = tmp_path / "bad.jsonl" bad_file.write_text('{"role": "user", "content": "ok"}\nNOT_JSON\n{"role": "assistant", "content": "sure"}\n') turns = parse_session(bad_file) @@ -472,7 +517,7 @@ class TestVerificationDetectorIntegration: def test_correction_pipeline(self, tmp_path, monkeypatch): conn = _fresh_db(tmp_path, monkeypatch) from src.repositories.knowledge import KnowledgeRepository - from services.verification_detector.detector import run + run = _run_verification_processor golden = _load_golden("correction_churn_metric") extractor = _mock_extractor(golden) @@ 
-499,7 +544,7 @@ class TestVerificationDetectorIntegration: def test_empty_session_skipped(self, tmp_path, monkeypatch): conn = _fresh_db(tmp_path, monkeypatch) - from services.verification_detector.detector import run + run = _run_verification_processor golden = _load_golden("no_verifications") extractor = _mock_extractor(golden) @@ -519,7 +564,7 @@ class TestVerificationDetectorIntegration: def test_idempotency(self, tmp_path, monkeypatch): """Running twice on same session should not create duplicate items.""" conn = _fresh_db(tmp_path, monkeypatch) - from services.verification_detector.detector import run + run = _run_verification_processor golden = _load_golden("correction_churn_metric") extractor = _mock_extractor(golden) @@ -534,13 +579,19 @@ class TestVerificationDetectorIntegration: stats2 = run(conn, extractor, session_data_dir=tmp_path / "user_sessions") assert stats1["items_created"] == 1 - assert stats2["sessions_scanned"] == 0 # Already processed + # Post-refactor: stable sessions (mtime <= processed_at) are filtered + # at scan via the mtime precheck so the runner never sees them → + # `scanned == 0`, not `skipped == 1`. PR #232 review fix avoided an + # MD5-rehash storm per scheduler tick. 
+ assert stats2["sessions_processed"] == 0 + assert stats2["scanned"] == 0 + assert stats2["items_created"] == 0 conn.close() def test_mixed_session_multiple_items(self, tmp_path, monkeypatch): conn = _fresh_db(tmp_path, monkeypatch) from src.repositories.knowledge import KnowledgeRepository - from services.verification_detector.detector import run + run = _run_verification_processor golden = _load_golden("mixed_session") extractor = _mock_extractor(golden) @@ -560,28 +611,12 @@ class TestVerificationDetectorIntegration: assert len(items) == 2 conn.close() - def test_dry_run_no_writes(self, tmp_path, monkeypatch): - conn = _fresh_db(tmp_path, monkeypatch) - from src.repositories.knowledge import KnowledgeRepository - from services.verification_detector.detector import run - - golden = _load_golden("correction_churn_metric") - extractor = _mock_extractor(golden) - - session_dir = tmp_path / "user_sessions" / "alice" - session_dir.mkdir(parents=True) - import shutil - shutil.copy(SESSIONS_DIR / "correction_churn_metric.jsonl", session_dir / "s1.jsonl") - - stats = run(conn, extractor, dry_run=True, session_data_dir=tmp_path / "user_sessions") - - assert stats["verifications_extracted"] == 1 - assert stats["items_created"] == 0 # dry run - - repo = KnowledgeRepository(conn) - items = repo.list_items(source_type="user_verification") - assert len(items) == 0 - conn.close() + # The legacy `dry_run` flag was dropped in the session-pipeline refactor — + # there is no equivalent in the new framework. The runner always persists + # state on success; the only way to observe a "what would happen" output + # is to wrap the processor in a transaction-rolling-back fixture, which + # is more trouble than the test was worth (it only validated a flag that + # had one in-tree caller — the dropped CLI shim). 
class TestContradictionDetectionIntegration: @@ -860,7 +895,7 @@ class TestDetectorIgnoresLLMConfidence: def test_llm_returned_base_confidence_is_overridden(self, tmp_path, monkeypatch): conn = _fresh_db(tmp_path, monkeypatch) from src.repositories.knowledge import KnowledgeRepository - from services.verification_detector.detector import run + run = _run_verification_processor # Hostile golden: LLM tries to claim confidence=0.99 on a confirmation # (which should be 0.60 in code). @@ -904,7 +939,7 @@ class TestDetectorIgnoresLLMConfidence: accepting an LLM-supplied number.""" conn = _fresh_db(tmp_path, monkeypatch) from src.repositories.knowledge import KnowledgeRepository - from services.verification_detector.detector import run + run = _run_verification_processor hallucinated = { "verifications": [{ @@ -939,7 +974,7 @@ class TestDetectorPersistsEvidence: def test_evidence_row_created_per_verification(self, tmp_path, monkeypatch): conn = _fresh_db(tmp_path, monkeypatch) from src.repositories.knowledge import KnowledgeRepository - from services.verification_detector.detector import run + run = _run_verification_processor golden = _load_golden("correction_churn_metric") extractor = _mock_extractor(golden) @@ -970,7 +1005,7 @@ class TestDetectorPersistsEvidence: their respective items. 
Each row carries its own user_quote.""" conn = _fresh_db(tmp_path, monkeypatch) from src.repositories.knowledge import KnowledgeRepository - from services.verification_detector.detector import run + run = _run_verification_processor golden = _load_golden("mixed_session") extractor = _mock_extractor(golden) @@ -1007,7 +1042,7 @@ class TestDetectorPersistsEvidence: import shutil conn = _fresh_db(tmp_path, monkeypatch) from src.repositories.knowledge import KnowledgeRepository - from services.verification_detector.detector import run + run = _run_verification_processor repo = KnowledgeRepository(conn) golden = _load_golden("correction_churn_metric") @@ -1051,7 +1086,7 @@ class TestDetectorWiresContradictionDetection: def test_contradiction_recorded_when_judge_says_yes(self, tmp_path, monkeypatch): conn = _fresh_db(tmp_path, monkeypatch) from src.repositories.knowledge import KnowledgeRepository - from services.verification_detector.detector import run + run = _run_verification_processor from unittest.mock import MagicMock repo = KnowledgeRepository(conn) @@ -1108,7 +1143,7 @@ class TestDetectorWiresContradictionDetection: """Judge returns contradicts=false → item still created, contradictions_recorded=0.""" conn = _fresh_db(tmp_path, monkeypatch) from src.repositories.knowledge import KnowledgeRepository - from services.verification_detector.detector import run + run = _run_verification_processor from unittest.mock import MagicMock repo = KnowledgeRepository(conn) @@ -1161,7 +1196,7 @@ class TestDetectorWiresContradictionDetection: judge is degraded mode, not a fatal error.""" conn = _fresh_db(tmp_path, monkeypatch) from src.repositories.knowledge import KnowledgeRepository - from services.verification_detector.detector import run + run = _run_verification_processor from connectors.llm.exceptions import LLMError from unittest.mock import MagicMock @@ -1200,7 +1235,11 @@ class TestDetectorWiresContradictionDetection: assert stats["contradictions_recorded"] == 0 assert 
stats["sessions_processed"] == 1 # Session is marked processed so we don't re-run on next sweep. - assert repo.is_session_processed("alice/s.jsonl") is True + from services.session_pipeline.lib import compute_file_hash + from src.repositories.session_processor_state import SessionProcessorStateRepository + state_repo = SessionProcessorStateRepository(conn) + h = compute_file_hash(session_dir / "s.jsonl") + assert state_repo.is_processed("verification", "alice/s.jsonl", h) is True conn.close() diff --git a/tests/test_db_schema_version.py b/tests/test_db_schema_version.py index db9c73c..7feddd6 100644 --- a/tests/test_db_schema_version.py +++ b/tests/test_db_schema_version.py @@ -13,7 +13,7 @@ import duckdb from src.db import SCHEMA_VERSION, _ensure_schema, get_schema_version -def test_schema_version_is_30(): +def test_schema_version_is_31(): # v27 → v28: explicit-install (Model B) for curated marketplace plugins. # user_plugin_optouts row presence flips meaning from "excluded" to # "subscribed"; migration wipes existing rows so the inverted reading @@ -27,7 +27,12 @@ def test_schema_version_is_30(): # v29 → v30: news_template — single versioned table for the /home # news perex + /news permalink page. See # tests/test_news_template_repository.py. - assert SCHEMA_VERSION == 30 + # v30 → v31: session-pipeline framework. Renames session_extraction_state + # → session_processor_state with composite PK (processor_name, + # session_file) so multiple processors can track their own + # processed-set independently. Existing rows are copied across with + # processor_name='verification'; the old table is dropped. + assert SCHEMA_VERSION == 31 def test_v20_adds_source_query(tmp_path): diff --git a/tests/test_health_session_pipeline.py b/tests/test_health_session_pipeline.py index 053d746..9c5bcfd 100644 --- a/tests/test_health_session_pipeline.py +++ b/tests/test_health_session_pipeline.py @@ -1,11 +1,12 @@ """Health-check coverage for the session pipeline (#176). 
GET /api/health/detailed must surface a `session_pipeline` service entry -that warns when freshly-uploaded session jsonls aren't being processed. +that warns when freshly-uploaded session jsonls aren't being processed +by the verification processor. Heuristic: max(mtime of /data/user_sessions/**/*.jsonl) <= - max(processed_at in session_extraction_state) + grace + max(processed_at in session_processor_state where processor='verification') + grace Where grace = 2 * scheduler verification-detector cadence (default 15m). @@ -29,15 +30,15 @@ def _auth(token: str) -> dict: def _seed_extraction_state(processed_at: datetime, session_file: str = "/data/user_sessions/x/y.jsonl"): - """Insert a synthetic row into session_extraction_state.""" + """Insert a synthetic verification-processor state row.""" from src.db import get_system_db conn = get_system_db() conn.execute( - "INSERT OR REPLACE INTO session_extraction_state " - "(session_file, username, processed_at, items_extracted, file_hash) " - "VALUES (?, ?, ?, ?, ?)", - [session_file, "x", processed_at, 0, "deadbeef"], + "INSERT OR REPLACE INTO session_processor_state " + "(processor_name, session_file, username, processed_at, items_extracted, file_hash) " + "VALUES (?, ?, ?, ?, ?, ?)", + ["verification", session_file, "x", processed_at, 0, "deadbeef"], ) conn.close() @@ -97,7 +98,7 @@ class TestSessionPipelineHealthCheck: assert body["status"] == "degraded" def test_session_files_never_processed_returns_warning(self, seeded_app): - """Files exist but session_extraction_state is empty → warning.""" + """Files exist but verification has no rows in session_processor_state → warning.""" env = seeded_app["env"] _make_session_file(env["data_dir"], "neverprocessed.jsonl", mtime_ago_seconds=7200) diff --git a/tests/test_instance_config_overlay.py b/tests/test_instance_config_overlay.py index 91120e2..66e3f90 100644 --- a/tests/test_instance_config_overlay.py +++ b/tests/test_instance_config_overlay.py @@ -10,8 +10,8 @@ Two paths 
to a working LLM pipeline must both function: Path 2 used to be dead code: the three LLM consumers (``services.corporate_memory.collector.collect_all``, -``app.api.admin.run_verification_detector`` and -``services.verification_detector.__main__``) imported from +``services.session_processors.verification.build_verification_processor`` +and ``services.verification_detector.__main__``) imported from ``config.loader.load_instance_config`` (overlay-blind), and even if they hadn't, ``app.instance_config.load_instance_config`` deep-merged the overlay through raw ``yaml.safe_load`` without resolving ``${ENV_VAR}`` @@ -135,26 +135,37 @@ class TestConsumersUseOverlayAwareLoader: assert "from app.instance_config import load_instance_config" in src assert "from config.loader import load_instance_config" not in src - def test_admin_run_verification_detector_uses_overlay_loader(self): - """``run_verification_detector`` imports the overlay-aware loader.""" + def test_verification_processor_factory_uses_overlay_loader(self): + """``build_verification_processor`` imports the overlay-aware loader. + + Post session-pipeline refactor the LLM extractor is constructed by + services.session_processors.verification.build_verification_processor + rather than inline in the admin endpoint.""" import inspect - from app.api.admin import run_verification_detector + from services.session_processors.verification import build_verification_processor - src = inspect.getsource(run_verification_detector) + src = inspect.getsource(build_verification_processor) assert "from app.instance_config import load_instance_config" in src assert "from config.loader import load_instance_config" not in src - def test_verification_detector_main_uses_overlay_loader(self): - """The verification-detector CLI main reads through the overlay.""" + def test_verification_detector_main_delegates_to_overlay_factory(self): + """The verification-detector CLI main reads through the overlay. 
+ + Post session-pipeline refactor it does so by delegating to + ``build_verification_processor`` (which is itself overlay-aware, + verified by ``test_verification_processor_factory_uses_overlay_loader``) + rather than calling the loader inline. Pin the delegation so a + future "simplify" refactor doesn't accidentally bypass the factory + and re-introduce direct ``config.loader`` usage.""" import inspect from services.verification_detector import __main__ as vd_main src = inspect.getsource(vd_main) - assert "from app.instance_config import load_instance_config" in src - # config.loader may legitimately appear in other contexts in this - # module someday; keep the assertion narrow to the same statement. + assert "build_verification_processor" in src + # Whichever loader the CLI ends up calling, it must NOT be the + # overlay-blind one from config.loader. assert "from config.loader import load_instance_config" not in src diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index dcb792c..b8c4beb 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -425,7 +425,7 @@ class TestLLMPipelineCadenceEnvVars: from services.scheduler.__main__ import build_jobs jobs = {name: schedule for name, schedule, *_ in build_jobs()} assert jobs["session-collector"] == "every 10m" - assert jobs["verification-detector"] == "every 15m" + assert jobs["session-processor:verification"] == "every 15m" assert jobs["corporate-memory"] == "every 17m" def test_session_collector_env_override_changes_cadence(self, monkeypatch) -> None: @@ -435,7 +435,7 @@ class TestLLMPipelineCadenceEnvVars: jobs = {name: schedule for name, schedule, *_ in build_jobs()} assert jobs["session-collector"] == "every 5m" # Other LLM jobs must be unaffected. 
- assert jobs["verification-detector"] == "every 15m" + assert jobs["session-processor:verification"] == "every 15m" assert jobs["corporate-memory"] == "every 17m" def test_verification_detector_env_override_changes_cadence(self, monkeypatch) -> None: @@ -443,7 +443,7 @@ class TestLLMPipelineCadenceEnvVars: monkeypatch.setenv("SCHEDULER_VERIFICATION_DETECTOR_INTERVAL", "600") # 10m from services.scheduler.__main__ import build_jobs jobs = {name: schedule for name, schedule, *_ in build_jobs()} - assert jobs["verification-detector"] == "every 10m" + assert jobs["session-processor:verification"] == "every 10m" assert jobs["session-collector"] == "every 10m" assert jobs["corporate-memory"] == "every 17m" @@ -454,7 +454,7 @@ class TestLLMPipelineCadenceEnvVars: jobs = {name: schedule for name, schedule, *_ in build_jobs()} assert jobs["corporate-memory"] == "every 30m" assert jobs["session-collector"] == "every 10m" - assert jobs["verification-detector"] == "every 15m" + assert jobs["session-processor:verification"] == "every 15m" @pytest.mark.parametrize("var", [ "SCHEDULER_SESSION_COLLECTOR_INTERVAL", @@ -484,7 +484,7 @@ class TestVerificationDetectorGraceFollowsCadence: # operator who throttles the detector for any reason (rate-limit, # cost, debugging) gets a proportionally wider staleness window # automatically — no second knob to forget. - assert jobs["verification-detector"] == "every 10m" + assert jobs["session-processor:verification"] == "every 10m" assert _verification_detector_grace_seconds() == 2 * 600 def test_grace_uses_default_cadence_when_env_unset(self, monkeypatch) -> None: @@ -492,3 +492,124 @@ class TestVerificationDetectorGraceFollowsCadence: from app.api.health import _verification_detector_grace_seconds # Default cadence 900s -> grace 1800s. 
assert _verification_detector_grace_seconds() == 2 * 900 + + +# --------------------------------------------------------------------------- +# services/scheduler/__main__._run_job — terminal-state bookkeeping +# --------------------------------------------------------------------------- + + +class TestRunJobBookkeeping: + """Per-job worker that advances last_run + clears in_flight on terminal + state (success OR failure). Pre-fix: last_run only advanced on success, + causing permanently failing jobs to retry every tick (30s) instead of + on cadence (15min). PR #232 review fix.""" + + def _setup(self): + import threading + last_run: dict[str, str | None] = {"verification": None} + in_flight: set[str] = {"verification"} + return last_run, in_flight, threading.Lock() + + def test_advances_last_run_on_success(self, monkeypatch): + from services.scheduler import __main__ as sched + last_run, in_flight, lock = self._setup() + monkeypatch.setattr(sched, "_call_api", lambda *a, **kw: True) + + sched._run_job( + "verification", "/api/admin/run-x", "POST", 60, "2026-01-01T00:00:00", + last_run, in_flight, lock, + ) + assert last_run["verification"] == "2026-01-01T00:00:00" + assert "verification" not in in_flight + + def test_advances_last_run_on_failure(self, monkeypatch): + """Permanently-failing jobs must NOT hot-loop every tick — last_run + advances even when _call_api returns False.""" + from services.scheduler import __main__ as sched + last_run, in_flight, lock = self._setup() + monkeypatch.setattr(sched, "_call_api", lambda *a, **kw: False) + + sched._run_job( + "verification", "/api/admin/run-x", "POST", 60, "2026-01-01T00:00:00", + last_run, in_flight, lock, + ) + assert last_run["verification"] == "2026-01-01T00:00:00" + assert "verification" not in in_flight + + def test_advances_last_run_when_call_raises(self, monkeypatch): + """`_call_api` catches its own exceptions and returns False, but a + synchronous bug above it (e.g. 
KeyError on jobs tuple unpacking) + could still bubble. The finally block must release in_flight either + way, otherwise the processor wedges until container restart.""" + from services.scheduler import __main__ as sched + last_run, in_flight, lock = self._setup() + + def _boom(*a, **kw): + raise RuntimeError("simulated unhandled scheduler bug") + + monkeypatch.setattr(sched, "_call_api", _boom) + + with pytest.raises(RuntimeError): + sched._run_job( + "verification", "/api/admin/run-x", "POST", 60, "2026-01-01T00:00:00", + last_run, in_flight, lock, + ) + # Even on raise, bookkeeping ran. + assert last_run["verification"] == "2026-01-01T00:00:00" + assert "verification" not in in_flight + + +class TestRunLoopParallelism: + """The scheduler tick must dispatch jobs in parallel — a 900s verification + run cannot block the 60s health-check from firing on its own cadence. + PR #232 review fix replaces the `for-loop + synchronous _call_api` with + a `ThreadPoolExecutor.submit` per due job.""" + + def test_in_flight_skip_prevents_duplicate_launches(self, monkeypatch): + """When a previous tick's job hasn't returned yet, the next tick + must NOT submit it again — otherwise a 10-min run during which + 20 ticks fire would queue 20 duplicate POSTs against the same + processor (the admin endpoint's per-processor lock would 409 most + of them, but they'd still be wasted requests + audit-log noise).""" + import threading + import time as _time + from services.scheduler import __main__ as sched + + # Single job that takes ~0.3s. Tick is 0.05s. Without in_flight + # protection we'd see >5 launches per the run loop's tick budget. + call_count = {"n": 0} + call_count_lock = threading.Lock() + + def slow_call(*a, **kw): + with call_count_lock: + call_count["n"] += 1 + _time.sleep(0.3) + return True + + monkeypatch.setattr(sched, "_call_api", slow_call) + # Force a single short-cadence job + short tick. 
+ monkeypatch.setattr( + sched, "build_jobs", + lambda: [("test-job", "every 1m", "/api/test", "POST", 60)], + ) + monkeypatch.setattr(sched, "resolved_tick_seconds", lambda: 0) + # Always-due so the in_flight check is what gates the second launch. + monkeypatch.setattr(sched, "is_table_due", lambda *a, **kw: True) + + # Kill the run loop after 0.4s — long enough for ≥5 ticks under + # the 0s tick budget, short enough that the job (0.3s) hasn't + # finished its first invocation yet. + sched._running = True + + def _kill(): + _time.sleep(0.4) + sched._running = False + + threading.Thread(target=_kill, daemon=True).start() + sched.run() + + # Without in_flight: ≥5 launches. With: exactly 1 (or maybe 2 if + # the first one finished mid-tick — both are correct, the bug is + # ≥5). + assert call_count["n"] <= 2, f"in_flight protection failed; {call_count['n']} launches" diff --git a/tests/test_session_pipeline.py b/tests/test_session_pipeline.py new file mode 100644 index 0000000..1b46256 --- /dev/null +++ b/tests/test_session_pipeline.py @@ -0,0 +1,489 @@ +"""Tests for the session-pipeline framework (services/session_pipeline/). + +Covers: +- Pure utility functions (parse_jsonl, compute_file_hash) and their behavior on + edge cases (malformed lines, file changes). +- SessionProcessorStateRepository CRUD on a fresh in-memory schema. +- run_processor end-to-end with fake processors covering success, raise, + empty-result, and file-hash-invalidation paths. +- v29 migration: existing session_extraction_state rows are copied to + session_processor_state with processor_name='verification' and the old + table is dropped. 
+""" + +from __future__ import annotations + +import json +from pathlib import Path + +import duckdb +import pytest + +from services.session_pipeline.contract import ProcessorResult +from services.session_pipeline.lib import compute_file_hash, parse_jsonl +from services.session_pipeline.runner import run_processor +from src.repositories.session_processor_state import SessionProcessorStateRepository + + +def _fresh_db(tmp_path, monkeypatch) -> duckdb.DuckDBPyConnection: + """Same idiom as tests/test_corporate_memory_v1.py — fresh schema in tmp_path.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + import src.db as db_module + db_module._system_db_conn = None + db_module._system_db_path = None + return db_module.get_system_db() + + +# --------------------------------------------------------------------------- +# parse_jsonl +# --------------------------------------------------------------------------- + +class TestParseJsonl: + def test_parses_well_formed_lines(self, tmp_path): + f = tmp_path / "session.jsonl" + f.write_text( + json.dumps({"role": "user", "content": "hi"}) + "\n" + + json.dumps({"role": "assistant", "content": "hello"}) + "\n" + ) + turns = parse_jsonl(f) + assert len(turns) == 2 + assert turns[0]["role"] == "user" + assert turns[1]["content"] == "hello" + + def test_skips_malformed_lines(self, tmp_path): + """Same behavior as pre-refactor verification_detector.parse_session — + a single corrupt row mustn't abort processing of the rest.""" + f = tmp_path / "session.jsonl" + f.write_text( + json.dumps({"role": "user", "content": "ok"}) + "\n" + + "this is not json\n" + + json.dumps({"role": "assistant", "content": "still ok"}) + "\n" + ) + turns = parse_jsonl(f) + assert len(turns) == 2 + assert turns[0]["content"] == "ok" + assert turns[1]["content"] == "still ok" + + def test_skips_blank_lines(self, tmp_path): + f = tmp_path / "session.jsonl" + f.write_text( + "\n" + + json.dumps({"role": "user", "content": "x"}) + "\n" + + " \n" + ) + turns = 
parse_jsonl(f) + assert len(turns) == 1 + + +# --------------------------------------------------------------------------- +# compute_file_hash +# --------------------------------------------------------------------------- + +class TestComputeFileHash: + def test_deterministic(self, tmp_path): + f = tmp_path / "x.jsonl" + f.write_text("hello world") + assert compute_file_hash(f) == compute_file_hash(f) + + def test_changes_with_content(self, tmp_path): + f = tmp_path / "x.jsonl" + f.write_text("v1") + h1 = compute_file_hash(f) + f.write_text("v2") + h2 = compute_file_hash(f) + assert h1 != h2 + + +# --------------------------------------------------------------------------- +# SessionProcessorStateRepository +# --------------------------------------------------------------------------- + +class TestSessionProcessorStateRepository: + def test_unprocessed_when_empty(self, tmp_path, monkeypatch): + conn = _fresh_db(tmp_path, monkeypatch) + repo = SessionProcessorStateRepository(conn) + assert repo.is_processed("verification", "alice/s.jsonl", "abc") is False + conn.close() + + def test_mark_then_is_processed(self, tmp_path, monkeypatch): + conn = _fresh_db(tmp_path, monkeypatch) + repo = SessionProcessorStateRepository(conn) + repo.mark_processed("verification", "alice/s.jsonl", "alice", 3, "abc") + assert repo.is_processed("verification", "alice/s.jsonl", "abc") is True + conn.close() + + def test_independent_per_processor(self, tmp_path, monkeypatch): + """Two processors track the same session independently — usage might be + done while verification still has work.""" + conn = _fresh_db(tmp_path, monkeypatch) + repo = SessionProcessorStateRepository(conn) + repo.mark_processed("usage", "alice/s.jsonl", "alice", 0, "abc") + assert repo.is_processed("usage", "alice/s.jsonl", "abc") is True + assert repo.is_processed("verification", "alice/s.jsonl", "abc") is False + conn.close() + + def test_hash_mismatch_treated_as_unprocessed(self, tmp_path, monkeypatch): + """When 
a session jsonl grows (live append from active Claude Code), + the stored file_hash no longer matches → processor gets to reprocess.""" + conn = _fresh_db(tmp_path, monkeypatch) + repo = SessionProcessorStateRepository(conn) + repo.mark_processed("verification", "alice/s.jsonl", "alice", 1, "old_hash") + assert repo.is_processed("verification", "alice/s.jsonl", "new_hash") is False + conn.close() + + def test_mark_upserts_on_re_run(self, tmp_path, monkeypatch): + conn = _fresh_db(tmp_path, monkeypatch) + repo = SessionProcessorStateRepository(conn) + repo.mark_processed("verification", "alice/s.jsonl", "alice", 1, "h1") + repo.mark_processed("verification", "alice/s.jsonl", "alice", 5, "h2") + row = conn.execute( + "SELECT items_extracted, file_hash FROM session_processor_state WHERE processor_name=? AND session_file=?", + ["verification", "alice/s.jsonl"], + ).fetchone() + assert row == (5, "h2") + conn.close() + + def test_scan_unprocessed_returns_all_when_empty_state(self, tmp_path, monkeypatch): + conn = _fresh_db(tmp_path, monkeypatch) + sessions = tmp_path / "sessions" + (sessions / "alice").mkdir(parents=True) + (sessions / "alice" / "s1.jsonl").write_text("{}") + (sessions / "alice" / "s2.jsonl").write_text("{}") + repo = SessionProcessorStateRepository(conn) + results = repo.scan_unprocessed_for("verification", sessions) + keys = sorted([f"{u}/{p.name}" for u, p in results]) + assert keys == ["alice/s1.jsonl", "alice/s2.jsonl"] + conn.close() + + def test_scan_skips_non_directory_entries(self, tmp_path, monkeypatch): + conn = _fresh_db(tmp_path, monkeypatch) + sessions = tmp_path / "sessions" + sessions.mkdir() + (sessions / "stray.txt").write_text("not a user dir") + (sessions / "alice").mkdir() + (sessions / "alice" / "s.jsonl").write_text("{}") + repo = SessionProcessorStateRepository(conn) + results = repo.scan_unprocessed_for("verification", sessions) + assert len(results) == 1 + assert results[0][0] == "alice" + conn.close() + + def 
test_scan_filters_stable_sessions_via_mtime(self, tmp_path, monkeypatch): + """Files with mtime <= processed_at are filtered at scan — the + runner never sees them and never hashes them. PR #232 review fix: + before the mtime precheck, every stable session was rehashed on + every scheduler tick.""" + import os + import time + from datetime import datetime, timezone + + conn = _fresh_db(tmp_path, monkeypatch) + sessions = tmp_path / "sessions" + (sessions / "alice").mkdir(parents=True) + stable = sessions / "alice" / "stable.jsonl" + stable.write_text("{}\n") + # Force mtime well in the past so we can set processed_at to "now" + # and have the precheck reliably skip. + old = time.time() - 3600 + os.utime(stable, (old, old)) + + repo = SessionProcessorStateRepository(conn) + repo.mark_processed("verification", "alice/stable.jsonl", "alice", 1, "h1") + + results = repo.scan_unprocessed_for("verification", sessions) + assert results == [], "stable session must be filtered at scan" + + # New file alongside it surfaces — not in state at all. + new_file = sessions / "alice" / "new.jsonl" + new_file.write_text("{}\n") + results = repo.scan_unprocessed_for("verification", sessions) + assert [str(p.name) for _, p in results] == ["new.jsonl"] + conn.close() + + def test_scan_surfaces_session_modified_after_processing(self, tmp_path, monkeypatch): + """File touched after processed_at — likely a Claude Code live append — + must come back through scan so the runner can hash + decide.""" + import os + import time + from datetime import datetime, timezone + + conn = _fresh_db(tmp_path, monkeypatch) + sessions = tmp_path / "sessions" + (sessions / "alice").mkdir(parents=True) + f = sessions / "alice" / "live.jsonl" + f.write_text("{}\n") + + repo = SessionProcessorStateRepository(conn) + # Mark processed at past time, then bump the file mtime to "now" + # to simulate a post-processing append. 
+ past = datetime.now(timezone.utc).replace(microsecond=0) + conn.execute( + "INSERT INTO session_processor_state VALUES (?, ?, ?, ?, ?, ?)", + ["verification", "alice/live.jsonl", "alice", past, 0, "h1"], + ) + future = time.time() + 60 + os.utime(f, (future, future)) + + results = repo.scan_unprocessed_for("verification", sessions) + assert [str(p.name) for _, p in results] == ["live.jsonl"] + conn.close() + + +# --------------------------------------------------------------------------- +# run_processor +# --------------------------------------------------------------------------- + +class _FakeProcessor: + """Test double that records its calls and is configurable per behavior.""" + + def __init__( + self, + name: str = "fake", + cadence_minutes: int = 10, + return_value: ProcessorResult | None = None, + raise_on_session: str | None = None, + ): + self.name = name + self.cadence_minutes = cadence_minutes + self.return_value = return_value if return_value is not None else ProcessorResult(items_count=0) + self.raise_on_session = raise_on_session + self.calls: list[str] = [] + + def process_session(self, session_path: Path, username: str, session_key: str, conn): + self.calls.append(session_key) + if self.raise_on_session is not None and session_key == self.raise_on_session: + raise RuntimeError("simulated processor failure") + return self.return_value + + +def _seed_session(sessions_dir: Path, username: str, name: str, content: str = "{}\n") -> Path: + user_dir = sessions_dir / username + user_dir.mkdir(parents=True, exist_ok=True) + path = user_dir / name + path.write_text(content) + return path + + +class TestRunProcessor: + def test_processed_then_skipped_on_second_call(self, tmp_path, monkeypatch): + conn = _fresh_db(tmp_path, monkeypatch) + sessions = tmp_path / "sessions" + _seed_session(sessions, "alice", "s.jsonl") + + proc = _FakeProcessor(return_value=ProcessorResult(items_count=2)) + + stats1 = run_processor(conn, proc, session_data_dir=sessions) + 
assert stats1["processed"] == 1 + assert stats1["items_extracted"] == 2 + assert proc.calls == ["alice/s.jsonl"] + + stats2 = run_processor(conn, proc, session_data_dir=sessions) + # Stable session (mtime <= processed_at) is filtered at scan, so the + # runner never sees it — `scanned == 0`, not `skipped == 1`. The + # earlier shape (return-everything-then-runner-skips) caused an + # MD5-rehash storm per tick (PR #232 review fix). + assert stats2["processed"] == 0 + assert stats2["scanned"] == 0 + assert proc.calls == ["alice/s.jsonl"] # not invoked again + conn.close() + + def test_raise_leaves_state_unwritten(self, tmp_path, monkeypatch): + """A processor that raises must not be marked as processed — the runner + retries the same session on the next tick.""" + conn = _fresh_db(tmp_path, monkeypatch) + sessions = tmp_path / "sessions" + _seed_session(sessions, "alice", "s.jsonl") + + proc = _FakeProcessor(raise_on_session="alice/s.jsonl") + + stats = run_processor(conn, proc, session_data_dir=sessions) + assert stats["errors"] == 1 + assert stats["processed"] == 0 + + # State row absent: next call sees the session again. + repo = SessionProcessorStateRepository(conn) + assert repo.is_processed(proc.name, "alice/s.jsonl", "anything") is False + + # Second call retries. 
+ proc.raise_on_session = None # this time succeed + stats2 = run_processor(conn, proc, session_data_dir=sessions) + assert stats2["processed"] == 1 + conn.close() + + def test_empty_result_marks_processed(self, tmp_path, monkeypatch): + """0 items extracted is a valid outcome — UsageProcessor skeleton + relies on this so its no-op runs aren't re-scanned every tick.""" + conn = _fresh_db(tmp_path, monkeypatch) + sessions = tmp_path / "sessions" + _seed_session(sessions, "bob", "s.jsonl") + + proc = _FakeProcessor(return_value=ProcessorResult(items_count=0)) + + stats1 = run_processor(conn, proc, session_data_dir=sessions) + assert stats1["processed"] == 1 + assert stats1["items_extracted"] == 0 + + stats2 = run_processor(conn, proc, session_data_dir=sessions) + # Filtered at scan via mtime precheck — see test_processed_then_skipped_on_second_call. + assert stats2["processed"] == 0 + assert stats2["scanned"] == 0 + conn.close() + + def test_file_hash_invalidates_state(self, tmp_path, monkeypatch): + """When a session jsonl grows (Claude Code live-appends to an active + session), the stored hash no longer matches → reprocessed.""" + conn = _fresh_db(tmp_path, monkeypatch) + sessions = tmp_path / "sessions" + path = _seed_session(sessions, "alice", "s.jsonl", content="line1\n") + + proc = _FakeProcessor(return_value=ProcessorResult(items_count=1)) + + stats1 = run_processor(conn, proc, session_data_dir=sessions) + assert stats1["processed"] == 1 + + # Mutate the file → new hash → reprocessed on next call. 
+ path.write_text("line1\nline2\n") + stats2 = run_processor(conn, proc, session_data_dir=sessions) + assert stats2["processed"] == 1 + assert proc.calls == ["alice/s.jsonl", "alice/s.jsonl"] + conn.close() + + def test_processors_isolated(self, tmp_path, monkeypatch): + """Two processors on the same session work independently — what one + marked, the other still has to do.""" + conn = _fresh_db(tmp_path, monkeypatch) + sessions = tmp_path / "sessions" + _seed_session(sessions, "alice", "s.jsonl") + + proc_a = _FakeProcessor(name="a") + proc_b = _FakeProcessor(name="b") + + run_processor(conn, proc_a, session_data_dir=sessions) + run_processor(conn, proc_b, session_data_dir=sessions) + + assert proc_a.calls == ["alice/s.jsonl"] + assert proc_b.calls == ["alice/s.jsonl"] + conn.close() + + def test_no_sessions_dir_returns_clean_stats(self, tmp_path, monkeypatch): + conn = _fresh_db(tmp_path, monkeypatch) + proc = _FakeProcessor() + stats = run_processor(conn, proc, session_data_dir=tmp_path / "does_not_exist") + assert stats["scanned"] == 0 + assert stats["processed"] == 0 + assert stats["errors"] == 0 + conn.close() + + def test_non_processor_result_return_coerced(self, tmp_path, monkeypatch): + """A processor that returns the wrong type must not poison the state + write — the runner coerces it to an empty result and still marks the + session processed (alternative: retry forever).""" + conn = _fresh_db(tmp_path, monkeypatch) + sessions = tmp_path / "sessions" + _seed_session(sessions, "alice", "s.jsonl") + + class _BadReturn: + name = "bad" + cadence_minutes = 1 + def process_session(self, *a, **kw): + return None # type: ignore[return-value] + + stats = run_processor(conn, _BadReturn(), session_data_dir=sessions) + assert stats["processed"] == 1 + assert stats["items_extracted"] == 0 + conn.close() + + +# --------------------------------------------------------------------------- +# v29 migration — verification rows preserved, old table dropped +# 
--------------------------------------------------------------------------- + +class TestV29Migration: + """Exercise the v28 → v29 migration directly. Builds a v28 schema (using + the pre-v29 idiom inline so the test doesn't depend on _SYSTEM_SCHEMA's + current shape), seeds data, runs the v29 migrations, asserts the result. + """ + + def test_existing_rows_become_verification_processor_rows(self, tmp_path): + conn = duckdb.connect(":memory:") + # Recreate the pre-v29 table shape — single-key session_file PK. + conn.execute( + """ + CREATE TABLE session_extraction_state ( + session_file VARCHAR PRIMARY KEY, + username VARCHAR NOT NULL, + processed_at TIMESTAMP DEFAULT current_timestamp, + items_extracted INTEGER DEFAULT 0, + file_hash VARCHAR + ) + """ + ) + conn.execute( + "INSERT INTO session_extraction_state VALUES (?, ?, ?, ?, ?)", + ["alice/s1.jsonl", "alice", "2026-01-01 00:00:00", 3, "abc"], + ) + + # Run v29 migration steps via the helper (which conditionally copies + # from the legacy table when present). + from src.db import _v30_to_v31_migrate + _v30_to_v31_migrate(conn) + + # New table has the row tagged with processor_name='verification'. + rows = conn.execute( + "SELECT processor_name, session_file, username, items_extracted, file_hash " + "FROM session_processor_state ORDER BY session_file" + ).fetchall() + assert rows == [("verification", "alice/s1.jsonl", "alice", 3, "abc")] + + # Old table is gone. + existing = { + r[0] for r in conn.execute( + "SELECT table_name FROM information_schema.tables WHERE table_schema='main'" + ).fetchall() + } + assert "session_extraction_state" not in existing + assert "session_processor_state" in existing + conn.close() + + def test_migration_idempotent_when_new_table_exists(self, tmp_path): + """Fresh installs run _SYSTEM_SCHEMA (which already has session_processor_state) + AND the migration ladder. 
The v29 migration must not crash if the new + table already exists empty.""" + conn = duckdb.connect(":memory:") + # Pre-create both tables (simulating fresh install + ladder rerun). + conn.execute( + """ + CREATE TABLE session_extraction_state ( + session_file VARCHAR PRIMARY KEY, + username VARCHAR NOT NULL, + processed_at TIMESTAMP, + items_extracted INTEGER, + file_hash VARCHAR + ) + """ + ) + conn.execute( + """ + CREATE TABLE session_processor_state ( + processor_name VARCHAR NOT NULL, + session_file VARCHAR NOT NULL, + username VARCHAR NOT NULL, + processed_at TIMESTAMP, + items_extracted INTEGER, + file_hash VARCHAR, + PRIMARY KEY (processor_name, session_file) + ) + """ + ) + + from src.db import _v30_to_v31_migrate + _v30_to_v31_migrate(conn) + + existing = { + r[0] for r in conn.execute( + "SELECT table_name FROM information_schema.tables WHERE table_schema='main'" + ).fetchall() + } + assert "session_extraction_state" not in existing + assert "session_processor_state" in existing + conn.close() diff --git a/tests/test_web_ui.py b/tests/test_web_ui.py index e1521f9..33c0072 100644 --- a/tests/test_web_ui.py +++ b/tests/test_web_ui.py @@ -336,7 +336,11 @@ class TestAdminRoleGuards: r = web_client.get("/admin/scheduler-runs", cookies=admin_cookie, follow_redirects=False) assert r.status_code == 200 assert b"run_session_collector" in r.content - assert b"run_verification_detector" in r.content + # Post-refactor: per-processor audit actions instead of one + # run_verification_detector. Both processors are wired in + # SCHEDULER_AUDIT_ACTIONS. + assert b"run_session_processor:verification" in r.content + assert b"run_session_processor:usage" in r.content assert b"run_corporate_memory" in r.content # Devin Review on e86dd5ed: list must use the actual logged action # string, not a guess.