agnes-the-ai-analyst/tests/test_admin_run_endpoints.py
minasarustamyan e26236fdc1
Extract session-pipeline framework + UsageProcessor skeleton (#232)
* Extract session pipeline framework, refactor verification, add UsageProcessor skeleton

Pluggable framework under services/session_pipeline/ (contract + lib + per-processor
runner) so multiple processors can read /data/user_sessions/<key>/*.jsonl on their
own cadence with full failure isolation. Verification flow becomes the first plugin;
a no-op UsageProcessor reserves the second slot pending a separate brainstorm on
extraction logic + storage shape.
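
A minimal sketch of what a processor contract plus a runner with per-file failure isolation could look like. Everything here is hypothetical: `SessionProcessor`, `process_file`, `RunStats`, `run_processor_sketch`, and `NoOpUsageProcessor` are illustrative names, not the API in services/session_pipeline/, and the hash-based skip logic is left out. The stat fields mirror keys the admin endpoint tests assert on (`scanned`, `processed`, `errors`, `items_extracted`, `errors_detail`).

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable, Protocol


class SessionProcessor(Protocol):
    """Hypothetical plugin contract: one named processor per pipeline slot."""

    name: str

    def process_file(self, path: Path) -> int:
        """Process one session .jsonl file and return the number of items extracted."""
        ...


@dataclass
class RunStats:
    processor: str
    scanned: int = 0
    processed: int = 0
    errors: int = 0
    items_extracted: int = 0
    errors_detail: list[str] = field(default_factory=list)


def run_processor_sketch(processor: SessionProcessor, files: Iterable[Path]) -> RunStats:
    """Run one processor over candidate files; a failing file is recorded, not raised."""
    stats = RunStats(processor=processor.name)
    for path in files:
        stats.scanned += 1
        try:
            stats.items_extracted += processor.process_file(path)
            stats.processed += 1
        except Exception as exc:  # isolate per-file failures from the rest of the run
            stats.errors += 1
            stats.errors_detail.append(f"{path.name}: {type(exc).__name__}: {exc}")
    return stats


class NoOpUsageProcessor:
    """Skeleton-style processor that extracts nothing (hypothetical)."""

    name = "usage"

    def process_file(self, path: Path) -> int:
        return 0
```

Catching per file keeps one malformed .jsonl from aborting a run; isolation between processors then comes from invoking the runner separately for each processor.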

Schema v28→v29: rename session_extraction_state → session_processor_state with
composite PK (processor_name, session_file). Existing rows copied over with
processor_name='verification'; legacy table dropped. Migration is idempotent and
no-ops the copy step on fresh installs that came up at the new schema.
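
One way to make the v28→v29 copy idempotent, sketched with the stdlib `sqlite3` module standing in for DuckDB. The table names and the composite PK come from the description above; the `file_hash` and `processed_at` columns are assumptions, and `sqlite_master` is SQLite's catalog (DuckDB would query `information_schema.tables` instead).

```python
import sqlite3


def migrate_v28_to_v29(conn: sqlite3.Connection) -> None:
    """Hypothetical sketch: move rows into session_processor_state, then drop the legacy table."""
    # Safe to run repeatedly: the new table is only created when missing.
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS session_processor_state (
            processor_name TEXT NOT NULL,
            session_file   TEXT NOT NULL,
            file_hash      TEXT,
            processed_at   TIMESTAMP,
            PRIMARY KEY (processor_name, session_file)
        )
        """
    )
    legacy = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'session_extraction_state'"
    ).fetchone()
    # Fresh installs that came up at v29 never had the legacy table: skip the copy.
    if legacy:
        conn.execute(
            """
            INSERT OR IGNORE INTO session_processor_state
                (processor_name, session_file, file_hash, processed_at)
            SELECT 'verification', session_file, file_hash, processed_at
            FROM session_extraction_state
            """
        )
        conn.execute("DROP TABLE session_extraction_state")
    conn.commit()
```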

Endpoint: /api/admin/run-verification-detector replaced by parametrized
/api/admin/run-session-processor?processor=<name>. The audit action follows suit
(`run_session_processor:<name>`).
Scheduler JOBS: verification-detector entry split into session-processor:verification
+ session-processor:usage. SCHEDULER_VERIFICATION_DETECTOR_INTERVAL retained for
operator compatibility (drives both cadence and health-check grace window);
SCHEDULER_USAGE_PROCESSOR_INTERVAL added.
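
A hypothetical sketch of how the split scheduler entries could be derived from one table of processors. The 5-tuple shape `(name, schedule, endpoint, method, timeout)` matches what the `build_jobs()` tests unpack; the default intervals, the `every Ns` schedule string, and the helper name `session_processor_jobs` are assumptions.

```python
import os

# (name, schedule, endpoint, method, timeout_s): the shape the build_jobs() tests unpack.
Job = tuple[str, str, str, str, int]

# Hypothetical defaults; the real values live in services/scheduler/__main__.py.
PROCESSOR_INTERVALS = {
    "verification": ("SCHEDULER_VERIFICATION_DETECTOR_INTERVAL", 900),
    "usage": ("SCHEDULER_USAGE_PROCESSOR_INTERVAL", 1800),
}


def session_processor_jobs(timeout_s: int = 900) -> list[Job]:
    """One scheduler entry per registered processor, each on its own cadence."""
    jobs: list[Job] = []
    for processor, (env_var, default_s) in PROCESSOR_INTERVALS.items():
        interval_s = int(os.environ.get(env_var, default_s))
        jobs.append((
            f"session-processor:{processor}",
            f"every {interval_s}s",  # assumed schedule-string format
            f"/api/admin/run-session-processor?processor={processor}",
            "POST",
            timeout_s,
        ))
    return jobs
```

Keeping the legacy variable name as the verification entry's env key is what lets existing operator configs keep working unchanged.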

* Address PR #232 review: scan dead branch + per-processor lock

- `SessionProcessorStateRepository.scan_unprocessed_for` had a dead `else`:
  both branches surfaced every jsonl, so the SELECT was unused and the runner
  MD5-rehashed every stable session on every tick. Replaced with an mtime
  precheck: stable sessions (mtime <= processed_at) are filtered at scan;
  modified files still surface for the runner's authoritative `file_hash`
  invalidation.
  Naive-local comparison matches the existing health-check idiom (DuckDB
  TIMESTAMP strips tz on storage).
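
A minimal sketch of the mtime precheck, assuming state is a plain dict keyed by file name (the real repository keys on `session_file` and reads `processed_at` from DuckDB). `scan_unprocessed` is a hypothetical name. `datetime.fromtimestamp` without a `tz` argument returns naive local time, which keeps both sides of the comparison naive-local.

```python
from datetime import datetime
from pathlib import Path


def scan_unprocessed(session_dir: Path, processed_at: dict[str, datetime]) -> list[Path]:
    """Return .jsonl files that are new or modified since they were last processed."""
    candidates = []
    for path in sorted(session_dir.glob("*.jsonl")):
        last = processed_at.get(path.name)
        mtime = datetime.fromtimestamp(path.stat().st_mtime)  # naive local time
        if last is not None and mtime <= last:
            continue  # stable since last run: filtered at scan, no rehash needed
        candidates.append(path)  # new or modified: the runner's file_hash check decides
    return candidates
```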

- Per-processor advisory lock around `_run_processor` in
  `/api/admin/run-session-processor`. Scheduler tick + manual admin POST
  could otherwise both run, both call create_evidence on overlapping
  detections, and accumulate duplicate verification_evidence rows (the
  dedup short-circuit only covers create+contradiction, not evidence per
  ADR Decision 3). Non-blocking acquire → 409 Conflict on concurrent
  invocation; release in finally so a runner exception doesn't wedge the
  processor.
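
A sketch of the non-blocking per-processor lock, assuming a process-wide dict of `threading.Lock` objects. The tests import a helper named `_get_processor_run_lock` from app.api.admin; `get_run_lock` and `run_with_lock` below are hypothetical stand-ins, with the HTTP layer reduced to a `(status, body)` tuple.

```python
import threading
from typing import Callable

_run_locks: dict[str, threading.Lock] = {}
_run_locks_guard = threading.Lock()


def get_run_lock(processor: str) -> threading.Lock:
    """Return the process-wide lock for one processor, creating it on first use."""
    with _run_locks_guard:
        lock = _run_locks.get(processor)
        if lock is None:
            lock = _run_locks[processor] = threading.Lock()
        return lock


def run_with_lock(processor: str, run: Callable[[], dict]) -> tuple[int, dict]:
    """Hypothetical handler body: returns (HTTP status, JSON body)."""
    lock = get_run_lock(processor)
    if not lock.acquire(blocking=False):  # never wait behind an in-flight run
        return 409, {"detail": f"Processor {processor!r} is already running"}
    try:
        return 200, {"ok": True, "details": run()}
    except Exception as exc:
        return 500, {"detail": f"{type(exc).__name__}: {exc}"}
    finally:
        lock.release()  # runs on success and on error, so the lock cannot leak
```

An in-process lock like this only serializes callers that share one app process; with several worker processes each would hold its own dict.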

Tests: two new scan unit tests (mtime filter + post-mark mtime bump), 409
endpoint test, lock-released-on-exception test. Two existing tests updated
for the new "filtered at scan" stat shape (previously asserted skipped == 1,
now scanned == 0).

* Address PR #232 review #2: parallel scheduler tick + last_run on terminal state

Two pre-existing scaffold bugs in services/scheduler/__main__.py amplified
by adding more session-pipeline jobs:

1. Serial for-loop over jobs with synchronous httpx.post(timeout=900) — a
   10-minute verification run blocked every other job (data-refresh,
   health-check, usage, corporate-memory) for the whole window. The PR's
   stated isolation guarantee held inside the runner but broke at the
   scheduler dispatch layer.

2. last_run advanced only when _call_api returned True, so permanently failing
   jobs hot-looped on every tick (30s) instead of running at their cadence (15min).

Fix: ThreadPoolExecutor.submit per due job + per-job in_flight set so a
long-running job can't be re-launched on subsequent ticks. last_run
advances unconditionally in finally; errors still surface via _call_api
logging + audit_log on the receiving side.
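
A hypothetical sketch of the parallel tick: each due job is submitted to a `ThreadPoolExecutor`, an `in_flight` set blocks re-launch while it runs, and `last_run` advances in `finally` regardless of outcome. `TickDispatcher` and its methods are illustrative; the real `_run_job` is a module-level function in services/scheduler/__main__.py.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


class TickDispatcher:
    """Hypothetical sketch of one scheduler loop dispatching due jobs in parallel."""

    def __init__(self, max_workers: int = 8) -> None:
        self.pool = ThreadPoolExecutor(max_workers=max_workers)
        self.in_flight: set[str] = set()
        self.last_run: dict[str, float] = {}
        self.lock = threading.Lock()

    def _run_job(self, name: str, call: Callable[[], bool]) -> None:
        try:
            call()  # result ignored for bookkeeping; failures surface via logs/audit
        finally:
            with self.lock:
                self.last_run[name] = time.monotonic()  # advance even on failure or raise
                self.in_flight.discard(name)

    def tick(self, jobs: dict[str, tuple[float, Callable[[], bool]]]) -> list[str]:
        """jobs maps name -> (interval_s, call); returns the names launched this tick."""
        launched = []
        now = time.monotonic()
        with self.lock:
            for name, (interval_s, call) in jobs.items():
                if name in self.in_flight:
                    continue  # still running from an earlier tick
                last = self.last_run.get(name)
                if last is not None and now - last < interval_s:
                    continue  # not due yet
                self.in_flight.add(name)
                self.pool.submit(self._run_job, name, call)
                launched.append(name)
        return launched
```

Using `time.monotonic()` keeps the cadence check immune to wall-clock adjustments; whether the real scheduler does the same is not stated here.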

_run_job extracted to module-level for unit testing. New tests:
- TestRunJobBookkeeping: advances on success / failure / unhandled raise
- TestRunLoopParallelism: in_flight protection prevents duplicate
  launches across ticks for a single slow job

---------

Co-authored-by: Minas Arustamyan <arustamyan.minas@gmail.com>
2026-05-08 19:47:46 +02:00


"""Admin run-* endpoints that wire the LLM pipeline into scheduler-v2.

The scheduler container must drive corporate-memory, the session-pipeline
processors, and session-collector through HTTP; see the
services/scheduler/__main__.py docstring for why in-process invocation is not
safe (DuckDB single-writer contention with the long-lived app handle).

Endpoints:
- POST /api/admin/run-session-collector
- POST /api/admin/run-session-processor?processor=<name>
- POST /api/admin/run-corporate-memory

All admin-gated. Request body is empty. On success the response is
``{"ok": True, "details": <stats>}``, where ``details`` is the underlying
job stats dict.
"""
from __future__ import annotations

from unittest.mock import patch


def _auth(token: str) -> dict:
    return {"Authorization": f"Bearer {token}"}


class TestRunSessionCollector:
    def test_admin_can_trigger_session_collector(self, seeded_app):
        c = seeded_app["client"]
        token = seeded_app["admin_token"]
        fake_stats = {"users_processed": 1, "files_copied": 2, "files_skipped": 0}
        with patch("services.session_collector.collector.run", return_value=(0, fake_stats)) as m:
            resp = c.post("/api/admin/run-session-collector", headers=_auth(token))
        assert resp.status_code == 200
        body = resp.json()
        assert body["ok"] is True
        assert body["details"]["files_copied"] == 2
        m.assert_called_once_with(dry_run=False, verbose=False)

    def test_non_admin_blocked(self, seeded_app):
        c = seeded_app["client"]
        token = seeded_app["analyst_token"]
        resp = c.post("/api/admin/run-session-collector", headers=_auth(token))
        assert resp.status_code == 403

    def test_unauth_blocked(self, seeded_app):
        c = seeded_app["client"]
        resp = c.post("/api/admin/run-session-collector")
        assert resp.status_code == 401

    def test_unhandled_exception_still_audits(self, seeded_app):
        """Devin Review on 9ebe991b: run_session_collector must mirror
        run_verification_detector / run_corporate_memory: record the
        failure in audit_log even when collector.run() raises (e.g.
        permission error walking /home/), so /admin/scheduler-runs sees
        the failure instead of only docker logs."""
        from src.db import get_system_db

        c = seeded_app["client"]
        token = seeded_app["admin_token"]
        with patch(
            "services.session_collector.collector.run",
            side_effect=PermissionError("simulated /home permission denied"),
        ):
            resp = c.post("/api/admin/run-session-collector", headers=_auth(token))
        assert resp.status_code == 500
        assert "PermissionError" in resp.json()["detail"]

        conn = get_system_db()
        try:
            rows = conn.execute(
                "SELECT params FROM audit_log WHERE action = 'run_session_collector' ORDER BY timestamp DESC LIMIT 1"
            ).fetchall()
        finally:
            conn.close()
        assert rows, "audit row missing on unhandled exception"
        params_json = rows[0][0]
        assert "unhandled_error" in params_json
        assert "PermissionError" in params_json


class TestRunSessionProcessor:
    """Parametrized session-processor endpoint replaces the per-processor
    /run-* endpoints. The scheduler invokes it once per registered processor
    on its own cadence."""

    def test_admin_can_trigger_verification(self, seeded_app, monkeypatch):
        # Need an LLM key in env so build_verification_processor() doesn't
        # raise during registry construction.
        monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-ant-test")
        # Reset the lazily-built registry so the new env is picked up.
        from services.session_processors import _build_registry

        _build_registry.cache_clear()
        c = seeded_app["client"]
        token = seeded_app["admin_token"]
        fake_stats = {
            "processor": "verification",
            "scanned": 3, "processed": 2, "skipped": 1, "errors": 0,
            "items_extracted": 4, "errors_detail": [],
        }
        with patch(
            "services.session_pipeline.runner.run_processor",
            return_value=fake_stats,
        ) as m, patch("connectors.llm.factory.AnthropicExtractor"):
            resp = c.post(
                "/api/admin/run-session-processor?processor=verification",
                headers=_auth(token),
            )
        assert resp.status_code == 200, resp.text
        body = resp.json()
        assert body["ok"] is True
        assert body["details"]["items_extracted"] == 4
        m.assert_called_once()

    def test_admin_can_trigger_usage_skeleton(self, seeded_app):
        """The usage processor is registered as a no-op skeleton; the endpoint
        should route to it without needing any LLM config."""
        from services.session_processors import _build_registry

        _build_registry.cache_clear()
        c = seeded_app["client"]
        token = seeded_app["admin_token"]
        fake_stats = {
            "processor": "usage",
            "scanned": 0, "processed": 0, "skipped": 0, "errors": 0,
            "items_extracted": 0, "errors_detail": [],
        }
        with patch(
            "services.session_pipeline.runner.run_processor",
            return_value=fake_stats,
        ) as m:
            resp = c.post(
                "/api/admin/run-session-processor?processor=usage",
                headers=_auth(token),
            )
        assert resp.status_code == 200, resp.text
        assert resp.json()["ok"] is True
        m.assert_called_once()

    def test_unknown_processor_returns_400(self, seeded_app):
        from services.session_processors import _build_registry

        _build_registry.cache_clear()
        c = seeded_app["client"]
        token = seeded_app["admin_token"]
        resp = c.post(
            "/api/admin/run-session-processor?processor=bogus",
            headers=_auth(token),
        )
        assert resp.status_code == 400
        assert "Unknown processor" in resp.json()["detail"]

    def test_concurrent_invocation_returns_409(self, seeded_app):
        """Per-processor advisory lock rejects overlapping calls so
        scheduler tick + manual admin POST don't double up on the same
        sessions and pile up duplicate verification_evidence rows
        (PR #232 review)."""
        from app.api.admin import _get_processor_run_lock
        from services.session_processors import _build_registry

        _build_registry.cache_clear()
        c = seeded_app["client"]
        token = seeded_app["admin_token"]
        # Hold the lock externally to simulate an in-flight invocation.
        lock = _get_processor_run_lock("usage")
        lock.acquire()
        try:
            resp = c.post(
                "/api/admin/run-session-processor?processor=usage",
                headers=_auth(token),
            )
        finally:
            lock.release()
        assert resp.status_code == 409
        assert "already running" in resp.json()["detail"]

    def test_lock_released_on_runner_exception(self, seeded_app):
        """Even when the runner raises, the lock must release so the next
        scheduler tick / admin POST can proceed. A leaked lock would wedge
        the processor permanently until process restart."""
        from app.api.admin import _get_processor_run_lock
        from services.session_processors import _build_registry

        _build_registry.cache_clear()
        c = seeded_app["client"]
        token = seeded_app["admin_token"]
        with patch(
            "services.session_pipeline.runner.run_processor",
            side_effect=RuntimeError("simulated"),
        ):
            resp = c.post(
                "/api/admin/run-session-processor?processor=usage",
                headers=_auth(token),
            )
        assert resp.status_code == 500
        # Lock must be free now, so a second invocation can grab it.
        lock = _get_processor_run_lock("usage")
        assert lock.acquire(blocking=False), "lock leaked after runner exception"
        lock.release()

    def test_non_admin_blocked(self, seeded_app):
        c = seeded_app["client"]
        token = seeded_app["analyst_token"]
        resp = c.post(
            "/api/admin/run-session-processor?processor=verification",
            headers=_auth(token),
        )
        assert resp.status_code == 403

    def test_unhandled_exception_still_audits(self, seeded_app, monkeypatch):
        """Mirror the run_session_collector / run_corporate_memory pattern:
        record the failure in audit_log even when the runner raises so
        /admin/scheduler-runs sees the failure instead of only docker logs."""
        from src.db import get_system_db
        from services.session_processors import _build_registry

        monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-ant-test")
        _build_registry.cache_clear()
        c = seeded_app["client"]
        token = seeded_app["admin_token"]
        with patch(
            "services.session_pipeline.runner.run_processor",
            side_effect=RuntimeError("simulated DuckDB lock"),
        ), patch("connectors.llm.factory.AnthropicExtractor"):
            resp = c.post(
                "/api/admin/run-session-processor?processor=verification",
                headers=_auth(token),
            )
        assert resp.status_code == 500
        assert "RuntimeError" in resp.json()["detail"]

        conn = get_system_db()
        try:
            rows = conn.execute(
                "SELECT params FROM audit_log "
                "WHERE action = 'run_session_processor:verification' "
                "ORDER BY timestamp DESC LIMIT 1"
            ).fetchall()
        finally:
            conn.close()
        assert rows, "audit row missing on unhandled exception"
        params_json = rows[0][0]
        assert "unhandled_error" in params_json
        assert "RuntimeError" in params_json


class TestRunCorporateMemory:
    def test_admin_can_trigger_corporate_memory(self, seeded_app):
        c = seeded_app["client"]
        token = seeded_app["admin_token"]
        fake_stats = {
            "users_scanned": 2,
            "files_found": 2,
            "items_extracted": 3,
            "items_filtered": 0,
            "items_preserved": 1,
            "items_new": 2,
            "items_pending": 2,
            "skipped": False,
            "errors": [],
        }
        with patch(
            "services.corporate_memory.collector.collect_all",
            return_value=fake_stats,
        ) as m:
            resp = c.post("/api/admin/run-corporate-memory", headers=_auth(token))
        assert resp.status_code == 200
        body = resp.json()
        assert body["ok"] is True
        assert body["details"]["items_new"] == 2
        m.assert_called_once()

    def test_non_admin_blocked(self, seeded_app):
        c = seeded_app["client"]
        token = seeded_app["analyst_token"]
        resp = c.post("/api/admin/run-corporate-memory", headers=_auth(token))
        assert resp.status_code == 403

    def test_unhandled_exception_still_audits(self, seeded_app):
        """Devin Review on 4c4dfee8: run_corporate_memory must mirror
        run_verification_detector: record the failure in audit_log even
        when collect_all() raises something other than ValueError, so
        the operator sees the failure on /admin/scheduler-runs instead
        of only in docker logs."""
        from src.db import get_system_db

        c = seeded_app["client"]
        token = seeded_app["admin_token"]
        with patch(
            "services.corporate_memory.collector.collect_all",
            side_effect=RuntimeError("simulated DuckDB lock"),
        ):
            resp = c.post("/api/admin/run-corporate-memory", headers=_auth(token))
        assert resp.status_code == 500
        assert "RuntimeError" in resp.json()["detail"]

        # The audit row must exist regardless of the 500.
        conn = get_system_db()
        try:
            rows = conn.execute(
                "SELECT params FROM audit_log WHERE action = 'run_corporate_memory' ORDER BY timestamp DESC LIMIT 1"
            ).fetchall()
        finally:
            conn.close()
        assert rows, "audit row missing on unhandled exception"
        params_json = rows[0][0]
        assert "unhandled_error" in params_json
        assert "RuntimeError" in params_json


class TestSchedulerJobsWireUp:
    """The scheduler must drive all three new endpoints on a sensible cadence."""

    def test_scheduler_includes_session_collector(self, monkeypatch):
        for v in (
            "SCHEDULER_DATA_REFRESH_INTERVAL",
            "SCHEDULER_HEALTH_CHECK_INTERVAL",
            "SCHEDULER_TICK_SECONDS",
            "SCHEDULER_SCRIPT_RUN_INTERVAL",
        ):
            monkeypatch.delenv(v, raising=False)
        from services.scheduler.__main__ import build_jobs

        names = {n for n, *_ in build_jobs()}
        assert "session-collector" in names

    def test_scheduler_includes_session_processors(self, monkeypatch):
        """Post-refactor: the verification-detector + usage processors are
        wired through the parametrized run-session-processor endpoint."""
        for v in (
            "SCHEDULER_DATA_REFRESH_INTERVAL",
            "SCHEDULER_HEALTH_CHECK_INTERVAL",
            "SCHEDULER_TICK_SECONDS",
            "SCHEDULER_SCRIPT_RUN_INTERVAL",
        ):
            monkeypatch.delenv(v, raising=False)
        from services.scheduler.__main__ import build_jobs

        names = {n for n, *_ in build_jobs()}
        assert "session-processor:verification" in names
        assert "session-processor:usage" in names

    def test_scheduler_includes_corporate_memory(self, monkeypatch):
        for v in (
            "SCHEDULER_DATA_REFRESH_INTERVAL",
            "SCHEDULER_HEALTH_CHECK_INTERVAL",
            "SCHEDULER_TICK_SECONDS",
            "SCHEDULER_SCRIPT_RUN_INTERVAL",
        ):
            monkeypatch.delenv(v, raising=False)
        from services.scheduler.__main__ import build_jobs

        names = {n for n, *_ in build_jobs()}
        assert "corporate-memory" in names

    def test_session_collector_endpoint_is_registered(self, monkeypatch):
        for v in (
            "SCHEDULER_DATA_REFRESH_INTERVAL",
            "SCHEDULER_HEALTH_CHECK_INTERVAL",
            "SCHEDULER_TICK_SECONDS",
            "SCHEDULER_SCRIPT_RUN_INTERVAL",
        ):
            monkeypatch.delenv(v, raising=False)
        from services.scheduler.__main__ import build_jobs

        target = next(j for j in build_jobs() if j[0] == "session-collector")
        _, _, endpoint, method, _t = target
        assert endpoint == "/api/admin/run-session-collector"
        assert method == "POST"

    def test_session_processor_endpoints_are_registered(self, monkeypatch):
        for v in (
            "SCHEDULER_DATA_REFRESH_INTERVAL",
            "SCHEDULER_HEALTH_CHECK_INTERVAL",
            "SCHEDULER_TICK_SECONDS",
            "SCHEDULER_SCRIPT_RUN_INTERVAL",
        ):
            monkeypatch.delenv(v, raising=False)
        from services.scheduler.__main__ import build_jobs

        jobs = {n: (endpoint, method) for n, _, endpoint, method, _ in build_jobs()}
        assert jobs["session-processor:verification"] == (
            "/api/admin/run-session-processor?processor=verification", "POST",
        )
        assert jobs["session-processor:usage"] == (
            "/api/admin/run-session-processor?processor=usage", "POST",
        )

    def test_corporate_memory_endpoint_is_registered(self, monkeypatch):
        for v in (
            "SCHEDULER_DATA_REFRESH_INTERVAL",
            "SCHEDULER_HEALTH_CHECK_INTERVAL",
            "SCHEDULER_TICK_SECONDS",
            "SCHEDULER_SCRIPT_RUN_INTERVAL",
        ):
            monkeypatch.delenv(v, raising=False)
        from services.scheduler.__main__ import build_jobs

        target = next(j for j in build_jobs() if j[0] == "corporate-memory")
        _, _, endpoint, method, _t = target
        assert endpoint == "/api/admin/run-corporate-memory"
        assert method == "POST"

    def test_new_jobs_have_offset_cadences(self, monkeypatch):
        """Three jobs in the same family must NOT all fire on the same tick.
        Otherwise the LLM API and DuckDB writer all spike together every time
        the cadence aligns. Different schedule strings ensure offset.
        """
        for v in (
            "SCHEDULER_DATA_REFRESH_INTERVAL",
            "SCHEDULER_HEALTH_CHECK_INTERVAL",
            "SCHEDULER_TICK_SECONDS",
            "SCHEDULER_SCRIPT_RUN_INTERVAL",
        ):
            monkeypatch.delenv(v, raising=False)
        from services.scheduler.__main__ import build_jobs

        targets = {n: schedule for n, schedule, *_ in build_jobs()
                   if n in (
                       "session-collector",
                       "session-processor:verification",
                       "corporate-memory",
                   )}
        # All three present.
        assert len(targets) == 3