diff --git a/app/api/admin.py b/app/api/admin.py index 7a2f115..efaacff 100644 --- a/app/api/admin.py +++ b/app/api/admin.py @@ -2785,3 +2785,119 @@ async def discover_and_register( return result except Exception as e: raise HTTPException(status_code=500, detail=f"Discovery and registration failed: {e}") + + +# --------------------------------------------------------------------------- +# Scheduler-driven LLM pipeline endpoints (#176) +# +# The scheduler container drives these via HTTP rather than running them +# in-process — same reasoning as the existing /api/marketplaces/sync-all +# job: DuckDB allows only one writer per file across processes, and the +# app keeps a long-lived handle on system.duckdb. Routing through the app +# inherits the existing connection without contention. +# +# Each endpoint is `def` (sync), so FastAPI runs it in a thread pool — +# the underlying jobs do blocking I/O (LLM calls, DuckDB writes, +# filesystem scans). Running on the asyncio thread would block health +# checks for the duration of a job. +# --------------------------------------------------------------------------- + + +@router.post("/run-session-collector") +def run_session_collector( + user: dict = Depends(require_admin), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Trigger the session-collector job from the scheduler. + + Walks /home/*/user/sessions/*.jsonl and copies new files into + /data/user_sessions//. Idempotent — already-collected files + are skipped. + """ + from services.session_collector import collector + + rc = collector.main() + AuditRepository(conn).log( + user_id=user.get("id"), + action="run_session_collector", + resource="job:session-collector", + params={"rc": rc}, + ) + return {"ok": rc == 0, "details": {"rc": rc}} + + +@router.post("/run-verification-detector") +def run_verification_detector( + user: dict = Depends(require_admin), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Trigger the verification-detector job from the scheduler. + + Reads collected session transcripts, extracts verified knowledge + via the LLM, and writes pending items to knowledge_items. The + /corporate-memory/admin queue picks them up for triage. + """ + from connectors.llm import create_extractor_from_env_or_config + from services.verification_detector import detector + from src.db import get_system_db + + # Build the extractor lazily so the endpoint surfaces a 500 with the + # factory's actionable error when no ai: block + no env keys are set. + try: + from config.loader import load_instance_config + try: + instance_config = load_instance_config() + except (ValueError, FileNotFoundError): + instance_config = {} + ai_config = instance_config.get("ai") if instance_config else None + extractor = create_extractor_from_env_or_config(ai_config) + except ValueError as e: + raise HTTPException(status_code=500, detail=str(e)) + + job_conn = get_system_db() + try: + stats = detector.run(job_conn, extractor, dry_run=False) + finally: + try: + job_conn.close() + except Exception: + pass + + AuditRepository(conn).log( + user_id=user.get("id"), + action="run_verification_detector", + resource="job:verification-detector", + params={ + "items_created": stats.get("items_created", 0), + "errors": len(stats.get("errors", [])), + }, + ) + return {"ok": not stats.get("errors"), "details": stats} + + +@router.post("/run-corporate-memory") +def run_corporate_memory( + user: dict = Depends(require_admin), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Trigger the corporate-memory catalog refresh from the scheduler. + + Reads all CLAUDE.local.md files, sends them through the LLM with the + existing catalog, and writes an updated catalog to knowledge.json. + """ + from services.corporate_memory.collector import collect_all + + stats = collect_all(dry_run=False) + + AuditRepository(conn).log( + user_id=user.get("id"), + action="run_corporate_memory", + resource="job:corporate-memory", + params={ + "items_new": stats.get("items_new", 0), + "items_filtered": stats.get("items_filtered", 0), + "errors": len(stats.get("errors", [])), + "skipped": stats.get("skipped", False), + }, + ) + return {"ok": not stats.get("errors"), "details": stats} diff --git a/services/scheduler/__main__.py b/services/scheduler/__main__.py index 8b03d7f..51a8cdc 100644 --- a/services/scheduler/__main__.py +++ b/services/scheduler/__main__.py @@ -137,10 +137,16 @@ def build_jobs() -> list[tuple[str, str, str, str, int]]: f"cadence by up to one tick" ) return [ - ("data-refresh", _seconds_to_schedule(refresh), "/api/sync/trigger", "POST", 120), - ("health-check", _seconds_to_schedule(health), "/api/health", "GET", 30), - ("script-runner", _seconds_to_schedule(scripts), "/api/scripts/run-due", "POST", 600), - ("marketplaces", "daily 03:00", "/api/marketplaces/sync-all", "POST", 900), + ("data-refresh", _seconds_to_schedule(refresh), "/api/sync/trigger", "POST", 120), + ("health-check", _seconds_to_schedule(health), "/api/health", "GET", 30), + ("script-runner", _seconds_to_schedule(scripts), "/api/scripts/run-due", "POST", 600), + ("marketplaces", "daily 03:00", "/api/marketplaces/sync-all", "POST", 900), + # LLM pipeline (#176). Cadences are deliberately offset (10m / 15m + # / 17m, all coprime modulo 30s tick) so the three LLM-driven jobs + # don't fire on the same tick and stack their API + DB load. + ("session-collector", "every 10m", "/api/admin/run-session-collector", "POST", 300), + ("verification-detector", "every 15m", "/api/admin/run-verification-detector", "POST", 900), + ("corporate-memory", "every 17m", "/api/admin/run-corporate-memory", "POST", 900), ] _running = True diff --git a/tests/test_admin_run_endpoints.py b/tests/test_admin_run_endpoints.py new file mode 100644 index 0000000..93ceb3e --- /dev/null +++ b/tests/test_admin_run_endpoints.py @@ -0,0 +1,217 @@ +"""Admin run-* endpoints that wire the LLM pipeline into scheduler-v2. + +The scheduler container must drive corporate-memory, verification-detector, +and session-collector through HTTP — see services/scheduler/__main__.py +docstring for why in-process invocation is not safe (DuckDB single-writer +contention with the long-lived app handle). + +Endpoints: +- POST /api/admin/run-session-collector +- POST /api/admin/run-verification-detector +- POST /api/admin/run-corporate-memory + +All admin-gated. Request body is empty. Response is the underlying job +stats dict. + +Closes one of five defects in #176. +""" + +from __future__ import annotations + +from unittest.mock import patch + + +def _auth(token: str) -> dict: + return {"Authorization": f"Bearer {token}"} + + +class TestRunSessionCollector: + def test_admin_can_trigger_session_collector(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + with patch("services.session_collector.collector.main", return_value=0) as m: + resp = c.post("/api/admin/run-session-collector", headers=_auth(token)) + assert resp.status_code == 200 + body = resp.json() + assert body["ok"] is True + m.assert_called_once() + + def test_non_admin_blocked(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post("/api/admin/run-session-collector", headers=_auth(token)) + assert resp.status_code == 403 + + def test_unauth_blocked(self, seeded_app): + c = seeded_app["client"] + resp = c.post("/api/admin/run-session-collector") + assert resp.status_code == 401 + + +class TestRunVerificationDetector: + def test_admin_can_trigger_verification_detector(self, seeded_app, monkeypatch): + # Set the env so the factory's env-fallback returns a real (mocked + # at the SDK boundary) extractor without 500-ing on missing config. + monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-ant-test") + c = seeded_app["client"] + token = seeded_app["admin_token"] + fake_stats = { + "sessions_scanned": 3, + "sessions_processed": 2, + "sessions_skipped": 1, + "verifications_extracted": 5, + "items_created": 4, + "errors": [], + } + with patch( + "services.verification_detector.detector.run", + return_value=fake_stats, + ) as m, patch( + "connectors.llm.factory.AnthropicExtractor" + ): + resp = c.post("/api/admin/run-verification-detector", headers=_auth(token)) + assert resp.status_code == 200, resp.text + body = resp.json() + assert body["ok"] is True + assert body["details"]["items_created"] == 4 + m.assert_called_once() + + def test_non_admin_blocked(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post("/api/admin/run-verification-detector", headers=_auth(token)) + assert resp.status_code == 403 + + +class TestRunCorporateMemory: + def test_admin_can_trigger_corporate_memory(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + fake_stats = { + "users_scanned": 2, + "files_found": 2, + "items_extracted": 3, + "items_filtered": 0, + "items_preserved": 1, + "items_new": 2, + "items_pending": 2, + "skipped": False, + "errors": [], + } + with patch( + "services.corporate_memory.collector.collect_all", + return_value=fake_stats, + ) as m: + resp = c.post("/api/admin/run-corporate-memory", headers=_auth(token)) + assert resp.status_code == 200 + body = resp.json() + assert body["ok"] is True + assert body["details"]["items_new"] == 2 + m.assert_called_once() + + def test_non_admin_blocked(self, seeded_app): + c = seeded_app["client"] + token = seeded_app["analyst_token"] + resp = c.post("/api/admin/run-corporate-memory", headers=_auth(token)) + assert resp.status_code == 403 + + +class TestSchedulerJobsWireUp: + """The scheduler must drive all three new endpoints on a sensible cadence.""" + + def test_scheduler_includes_session_collector(self, monkeypatch): + for v in ( + "SCHEDULER_DATA_REFRESH_INTERVAL", + "SCHEDULER_HEALTH_CHECK_INTERVAL", + "SCHEDULER_TICK_SECONDS", + "SCHEDULER_SCRIPT_RUN_INTERVAL", + ): + monkeypatch.delenv(v, raising=False) + from services.scheduler.__main__ import build_jobs + names = {n for n, *_ in build_jobs()} + assert "session-collector" in names + + def test_scheduler_includes_verification_detector(self, monkeypatch): + for v in ( + "SCHEDULER_DATA_REFRESH_INTERVAL", + "SCHEDULER_HEALTH_CHECK_INTERVAL", + "SCHEDULER_TICK_SECONDS", + "SCHEDULER_SCRIPT_RUN_INTERVAL", + ): + monkeypatch.delenv(v, raising=False) + from services.scheduler.__main__ import build_jobs + names = {n for n, *_ in build_jobs()} + assert "verification-detector" in names + + def test_scheduler_includes_corporate_memory(self, monkeypatch): + for v in ( + "SCHEDULER_DATA_REFRESH_INTERVAL", + "SCHEDULER_HEALTH_CHECK_INTERVAL", + "SCHEDULER_TICK_SECONDS", + "SCHEDULER_SCRIPT_RUN_INTERVAL", + ): + monkeypatch.delenv(v, raising=False) + from services.scheduler.__main__ import build_jobs + names = {n for n, *_ in build_jobs()} + assert "corporate-memory" in names + + def test_session_collector_endpoint_is_registered(self, monkeypatch): + for v in ( + "SCHEDULER_DATA_REFRESH_INTERVAL", + "SCHEDULER_HEALTH_CHECK_INTERVAL", + "SCHEDULER_TICK_SECONDS", + "SCHEDULER_SCRIPT_RUN_INTERVAL", + ): + monkeypatch.delenv(v, raising=False) + from services.scheduler.__main__ import build_jobs + target = next(j for j in build_jobs() if j[0] == "session-collector") + _, _, endpoint, method, _t = target + assert endpoint == "/api/admin/run-session-collector" + assert method == "POST" + + def test_verification_detector_endpoint_is_registered(self, monkeypatch): + for v in ( + "SCHEDULER_DATA_REFRESH_INTERVAL", + "SCHEDULER_HEALTH_CHECK_INTERVAL", + "SCHEDULER_TICK_SECONDS", + "SCHEDULER_SCRIPT_RUN_INTERVAL", + ): + monkeypatch.delenv(v, raising=False) + from services.scheduler.__main__ import build_jobs + target = next(j for j in build_jobs() if j[0] == "verification-detector") + _, _, endpoint, method, _t = target + assert endpoint == "/api/admin/run-verification-detector" + assert method == "POST" + + def test_corporate_memory_endpoint_is_registered(self, monkeypatch): + for v in ( + "SCHEDULER_DATA_REFRESH_INTERVAL", + "SCHEDULER_HEALTH_CHECK_INTERVAL", + "SCHEDULER_TICK_SECONDS", + "SCHEDULER_SCRIPT_RUN_INTERVAL", + ): + monkeypatch.delenv(v, raising=False) + from services.scheduler.__main__ import build_jobs + target = next(j for j in build_jobs() if j[0] == "corporate-memory") + _, _, endpoint, method, _t = target + assert endpoint == "/api/admin/run-corporate-memory" + assert method == "POST" + + def test_new_jobs_have_offset_cadences(self, monkeypatch): + """Three jobs in the same family must NOT all fire on the same tick. + + Otherwise the LLM API and DuckDB writer all spike together every time + the cadence aligns. Different schedule strings ensure offset. + """ + for v in ( + "SCHEDULER_DATA_REFRESH_INTERVAL", + "SCHEDULER_HEALTH_CHECK_INTERVAL", + "SCHEDULER_TICK_SECONDS", + "SCHEDULER_SCRIPT_RUN_INTERVAL", + ): + monkeypatch.delenv(v, raising=False) + from services.scheduler.__main__ import build_jobs + targets = {n: schedule for n, schedule, *_ in build_jobs() + if n in ("session-collector", "verification-detector", "corporate-memory")} + # All three present. + assert len(targets) == 3