agnes-the-ai-analyst/services/session_pipeline/runner.py
minasarustamyan e26236fdc1
Extract session-pipeline framework + UsageProcessor skeleton (#232)
* Extract session pipeline framework, refactor verification, add UsageProcessor skeleton

Pluggable framework under services/session_pipeline/ (contract + lib + per-processor
runner) so multiple processors can read /data/user_sessions/<key>/*.jsonl on their
own cadence with full failure isolation. Verification flow becomes the first plugin;
a no-op UsageProcessor reserves the second slot pending a separate brainstorm on
extraction logic + storage shape.

Schema v28→v29: rename session_extraction_state → session_processor_state with
composite PK (processor_name, session_file). Existing rows copied over with
processor_name='verification'; legacy table dropped. Migration is idempotent and
no-ops the copy step on fresh installs that came up at the new schema.
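
The migration shape described above can be sketched roughly as follows. This is a minimal illustration, not the project's migration code: it uses the standard-library sqlite3 module as a stand-in for DuckDB, the function name migrate_v28_to_v29 is hypothetical, and the column list is reduced to the three columns needed to show the idea.

```python
import sqlite3


def migrate_v28_to_v29(conn: sqlite3.Connection) -> None:
    """Hypothetical sketch: create the new table, copy legacy rows once, drop legacy."""
    # Safe to repeat: a second run finds the table already present.
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS session_processor_state (
            processor_name TEXT NOT NULL,
            session_file   TEXT NOT NULL,
            file_hash      TEXT,
            PRIMARY KEY (processor_name, session_file)
        )
        """
    )
    legacy_exists = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        ("session_extraction_state",),
    ).fetchone()
    if legacy_exists is None:
        # Fresh install at the new schema, or already migrated: nothing to copy.
        return
    # Existing rows all belonged to the verification flow.
    conn.execute(
        """
        INSERT OR IGNORE INTO session_processor_state
            (processor_name, session_file, file_hash)
        SELECT 'verification', session_file, file_hash
        FROM session_extraction_state
        """
    )
    conn.execute("DROP TABLE session_extraction_state")
    conn.commit()
```

Running the function twice in a row leaves the same rows in place, which is the property the idempotency claim relies on.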

Endpoint: /api/admin/run-verification-detector replaced by parametrized
/api/admin/run-session-processor?processor=<name>. The audit action format changes to match.
Scheduler JOBS: verification-detector entry split into session-processor:verification
+ session-processor:usage. SCHEDULER_VERIFICATION_DETECTOR_INTERVAL retained for
operator compatibility (drives both cadence and health-check grace window);
SCHEDULER_USAGE_PROCESSOR_INTERVAL added.

* Address PR #232 review: scan dead branch + per-processor lock

- `SessionProcessorStateRepository.scan_unprocessed_for` dead else: both
  branches surfaced every jsonl, the SELECT was unused, runner MD5-rehashed
  every stable session per tick. Replaced with an mtime precheck — stable
  sessions (mtime <= processed_at) are filtered at scan; modified files
  still surface for the runner's authoritative `file_hash` invalidation.
  Naive-local comparison matches the existing health-check idiom (DuckDB
  TIMESTAMP strips tz on storage).

- Per-processor advisory lock around `_run_processor` in
  `/api/admin/run-session-processor`. Scheduler tick + manual admin POST
  could otherwise both run, both call create_evidence on overlapping
  detections, and accumulate duplicate verification_evidence rows (the
  dedup short-circuit only covers create+contradiction, not evidence per
  ADR Decision 3). Non-blocking acquire → 409 Conflict on concurrent
  invocation; release in finally so a runner exception doesn't wedge the
  processor.
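
The non-blocking acquire and release-in-finally pattern from the lock bullet can be sketched as below. This is an assumption-laden illustration: it uses an in-process threading.Lock per processor name, whereas the actual advisory lock may be implemented differently, and run_exclusive and the (status, body) return shape are hypothetical names chosen for the example.

```python
import threading
from typing import Any, Callable

# One lock per processor name (hypothetical in-process registry).
_locks: dict = {}
_locks_guard = threading.Lock()


def run_exclusive(processor_name: str, run: Callable[[], Any]) -> tuple:
    """Run `run()` unless the same processor is already running.

    Returns (200, result) on success or (409, detail) when the lock is held.
    """
    with _locks_guard:
        lock = _locks.setdefault(processor_name, threading.Lock())
    # Non-blocking: a concurrent invocation is rejected instead of queued.
    if not lock.acquire(blocking=False):
        return 409, {"detail": f"processor {processor_name} is already running"}
    try:
        return 200, run()
    finally:
        # Released even if run() raises, so a failure cannot wedge the processor.
        lock.release()
```

Different processor names get different locks, so a long verification run would not block a usage run in this sketch.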

Tests: two new scan unit tests (mtime filter + post-mark mtime bump), 409
endpoint test, lock-released-on-exception test. Two existing tests updated
for the new "filtered at scan" stat shape (previously asserted skipped == 1,
now scanned == 0).
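
For illustration, the mtime precheck that these scan tests exercise might look like the sketch below. It is not the repository's scan_unprocessed_for: the function name scan_candidates is hypothetical, and the processed_at mapping stands in for whatever the repository reads from the state table.

```python
from __future__ import annotations

from datetime import datetime
from pathlib import Path


def scan_candidates(
    session_data_dir: Path,
    processed_at: dict[str, datetime],
) -> list[tuple[str, Path]]:
    """Return (username, jsonl_path) pairs that still need a look.

    `processed_at` maps "<username>/<file>.jsonl" to the naive-local time the
    session was last marked processed (hypothetical stand-in for the state table).
    """
    candidates: list[tuple[str, Path]] = []
    if not session_data_dir.is_dir():
        return candidates
    for user_dir in sorted(p for p in session_data_dir.iterdir() if p.is_dir()):
        for jsonl_path in sorted(user_dir.glob("*.jsonl")):
            session_key = f"{user_dir.name}/{jsonl_path.name}"
            done_at = processed_at.get(session_key)
            # Naive local time on both sides of the comparison.
            mtime = datetime.fromtimestamp(jsonl_path.stat().st_mtime)
            if done_at is not None and mtime <= done_at:
                continue  # stable session: filtered at scan, never hashed
            candidates.append((user_dir.name, jsonl_path))
    return candidates
```

A file modified after it was marked still surfaces, which leaves the final decision to the runner's hash comparison.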

* Address PR #232 review #2: parallel scheduler tick + last_run on terminal state

Two pre-existing scaffold bugs in services/scheduler/__main__.py amplified
by adding more session-pipeline jobs:

1. Serial for-loop over jobs with synchronous httpx.post(timeout=900) — a
   10-minute verification run blocked every other job (data-refresh,
   health-check, usage, corporate-memory) for the whole window. The PR's
   stated isolation guarantee held inside the runner but broke at the
   scheduler dispatch layer.

2. last_run advanced only when _call_api returned True. Permanent-failure
   jobs hot-looped on every tick (30s) instead of cadence (15min).

Fix: ThreadPoolExecutor.submit per due job + per-job in_flight set so a
long-running job can't be re-launched on subsequent ticks. last_run
advances unconditionally in finally; errors still surface via _call_api
logging + audit_log on the receiving side.
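
The fix can be sketched as follows. This is a simplified illustration under stated assumptions, not the code in services/scheduler/__main__.py: the names run_job and tick, the jobs mapping of name to interval in seconds, and the injected call_api callable are all hypothetical.

```python
from __future__ import annotations

import logging
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable

logger = logging.getLogger(__name__)


def run_job(name, call_api, last_run, in_flight, lock, now=time.monotonic) -> None:
    """Run one job; bookkeeping happens in finally regardless of outcome."""
    try:
        call_api(name)
    except Exception:
        logger.exception("job %s raised", name)
    finally:
        with lock:
            last_run[name] = now()   # advance on success, failure, or raise
            in_flight.discard(name)  # allow the next scheduled launch


def tick(
    jobs: dict[str, float],
    call_api: Callable[[str], object],
    last_run: dict[str, float],
    in_flight: set[str],
    lock: threading.Lock,
    pool: ThreadPoolExecutor,
    now=time.monotonic,
) -> list[Future]:
    """Submit every due job that is not already running."""
    futures: list[Future] = []
    for name, interval in jobs.items():
        with lock:
            due = now() - last_run.get(name, float("-inf")) >= interval
            if not due or name in in_flight:
                continue
            in_flight.add(name)
        futures.append(
            pool.submit(run_job, name, call_api, last_run, in_flight, lock, now)
        )
    return futures
```

Because last_run moves forward even when the call fails, a permanently failing job waits out its full interval instead of being retried on every tick.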

_run_job extracted to module-level for unit testing. New tests:
- TestRunJobBookkeeping: advances on success / failure / unhandled raise
- TestRunLoopParallelism: in_flight protection prevents duplicate
  launches across ticks for a single slow job

---------

Co-authored-by: Minas Arustamyan <arustamyan.minas@gmail.com>
2026-05-08 19:47:46 +02:00


"""Per-processor runner — drives one SessionProcessor across all unprocessed
sessions in /data/user_sessions/. Each processor is invoked independently
(one call to run_processor per scheduler tick per processor); there is no
cross-processor coupling.

Failure handling mirrors the pre-refactor verification_detector behavior:
per-session try/except, on raise the state row is NOT written → the same
session will be retried on the next tick. There is no max_retries / dead
letter. A permanently malformed session will retry forever; that is a
known limitation we may revisit (out of scope for this refactor).
"""

from __future__ import annotations

import logging
import os
from pathlib import Path
from typing import Any

import duckdb

from services.session_pipeline.contract import ProcessorResult, SessionProcessor
from services.session_pipeline.lib import compute_file_hash
from src.repositories.session_processor_state import SessionProcessorStateRepository

logger = logging.getLogger(__name__)

DEFAULT_SESSION_DATA_DIR = Path(os.environ.get("SESSION_DATA_DIR", "/data/user_sessions"))
def run_processor(
    conn: duckdb.DuckDBPyConnection,
    processor: SessionProcessor,
    session_data_dir: Path | None = None,
) -> dict[str, Any]:
    """Run *processor* against every unprocessed session in
    *session_data_dir* (defaults to $SESSION_DATA_DIR or /data/user_sessions).

    Returns a stats dict with: processor, scanned, processed, skipped, errors,
    items_extracted, errors_detail. Caller (admin endpoint) puts this in the
    audit row and HTTP response body.
    """
    effective_dir = session_data_dir if session_data_dir is not None else DEFAULT_SESSION_DATA_DIR
    stats: dict[str, Any] = {
        "processor": processor.name,
        "scanned": 0,
        "processed": 0,
        "skipped": 0,
        "errors": 0,
        "items_extracted": 0,
        "errors_detail": [],
    }

    repo = SessionProcessorStateRepository(conn)
    candidates = repo.scan_unprocessed_for(processor.name, effective_dir)
    stats["scanned"] = len(candidates)

    if not candidates:
        logger.info("No sessions to process for processor=%s", processor.name)
        return stats

    for username, jsonl_path in candidates:
        session_key = f"{username}/{jsonl_path.name}"

        try:
            file_hash = compute_file_hash(jsonl_path)
        except Exception as e:
            logger.warning(
                "Cannot hash %s for processor=%s: %s",
                session_key, processor.name, e,
            )
            stats["errors"] += 1
            stats["errors_detail"].append({"session": session_key, "error": str(e)})
            continue

        # Hash-aware skip: scan_unprocessed_for only pre-filters on mtime, so
        # modified files still reach this point. The authoritative
        # is_processed check lives here so the runner is the single place
        # that decides "this exact (processor, session, hash) tuple is
        # already done". Cost: one extra SELECT per candidate, but only for
        # files that survived the directory scan.
        if repo.is_processed(processor.name, session_key, file_hash):
            stats["skipped"] += 1
            continue

        try:
            result = processor.process_session(jsonl_path, username, session_key, conn)
        except Exception as e:
            logger.exception(
                "Processor %s failed on %s; leaving state unwritten for retry",
                processor.name, session_key,
            )
            stats["errors"] += 1
            stats["errors_detail"].append({"session": session_key, "error": str(e)})
            continue

        if not isinstance(result, ProcessorResult):
            # Defensive: Protocol can't enforce the return type at runtime,
            # so a misbehaving processor that returns None or an arbitrary
            # dict shouldn't poison the state-write path. Treat it as zero
            # items but still mark processed; the alternative (raise) would
            # cause the same session to be retried forever.
            logger.warning(
                "Processor %s returned non-ProcessorResult on %s; coercing to empty result",
                processor.name, session_key,
            )
            result = ProcessorResult(items_count=0)

        repo.mark_processed(
            processor_name=processor.name,
            session_file=session_key,
            username=username,
            items_count=result.items_count,
            file_hash=file_hash,
        )
        stats["processed"] += 1
        stats["items_extracted"] += result.items_count

    logger.info(
        "Processor %s: scanned=%d processed=%d skipped=%d errors=%d items=%d",
        processor.name,
        stats["scanned"],
        stats["processed"],
        stats["skipped"],
        stats["errors"],
        stats["items_extracted"],
    )
    return stats
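

To make the runner's expectations of a processor concrete, here is a minimal sketch of a plugin that would fit the calls made above. The real definitions live in services/session_pipeline/contract.py and are not shown in this file, so the ProcessorResult and SessionProcessor shapes below are assumptions inferred from how run_processor uses them (a `name` attribute, a `process_session(jsonl_path, username, session_key, conn)` method, and an `items_count` field). LineCountProcessor is a hypothetical example, not one of the shipped processors.

```python
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any, Protocol, runtime_checkable


@dataclass(frozen=True)
class ProcessorResult:
    # Assumed shape: the runner only reads items_count.
    items_count: int = 0


@runtime_checkable
class SessionProcessor(Protocol):
    # Assumed contract, inferred from the runner's call sites.
    name: str

    def process_session(
        self, jsonl_path: Path, username: str, session_key: str, conn: Any
    ) -> ProcessorResult: ...


class LineCountProcessor:
    """Hypothetical processor: counts non-empty lines in a session file."""

    name = "line-count"

    def process_session(
        self, jsonl_path: Path, username: str, session_key: str, conn: Any
    ) -> ProcessorResult:
        with jsonl_path.open(encoding="utf-8") as fh:
            count = sum(1 for line in fh if line.strip())
        return ProcessorResult(items_count=count)
```

Raising from process_session leaves the state row unwritten, so under the runner's rules the session would be retried on the next tick; returning a result of any size marks it processed.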