agnes-the-ai-analyst/services/session_pipeline/runner.py
ZdenekSrotyr 3e19caa975
fix(security): RBAC filter uses stable user_id instead of mutable email local-part (#293) (#299)
* fix(security): RBAC filter for agnes_sessions matches both email local-part and user_id

The upload API (POST /api/upload/sessions) stores session files under
user_sessions/{user_id}/ (UUID), while the session collector uses the
OS username (email local-part). The session pipeline writes the directory
name verbatim into usage_session_summary.username, so the column can
contain either value depending on the ingestion path.

The RBAC filter in build_filter_clause previously only matched the email
local-part, missing sessions uploaded via the API. The fix adds an OR
condition so non-admin users see rows where username matches either their
email local-part or their user_id.

Closes #293

Co-Authored-By: zdenek.srotyr <zdenek.srotyr@keboola.com>

* fix(security): RBAC filter uses stable user_id instead of mutable email local-part

Closes #293

The previous fix used an OR condition matching the username column against
both the email local-part and the user_id. This was fragile: an email change
can alter the local-part, so sessions stored under the old local-part would
drop out of the filter. This commit introduces a dedicated user_id column
populated by the session pipeline via resolve_user_id(), and switches the
RBAC filter to use it exclusively.
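A minimal sketch of that failure mode, using Python's built-in sqlite3 as a stand-in for DuckDB. The two-row table, the made-up UUID and the helper names are illustrative assumptions, not the project's schema or its build_filter_clause. The point is only that renaming alice@ to alice.smith@ hides the collector row under the OR rule, while a filter on a stored user_id keeps both rows.

```python
import sqlite3

# sqlite3 stands in for DuckDB; the table keeps only the columns the
# filters need. ALICE_ID is a made-up UUID.
ALICE_ID = "3f0c2a9e-0000-4000-8000-000000000001"
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE usage_session_summary (session_file TEXT, username TEXT, user_id TEXT)"
)
conn.executemany(
    "INSERT INTO usage_session_summary VALUES (?, ?, ?)",
    [
        ("alice/a.jsonl", "alice", ALICE_ID),  # session collector: local-part dir
        (f"{ALICE_ID}/b.jsonl", ALICE_ID, ALICE_ID),  # upload API: UUID dir
    ],
)


def visible_by_local_part(email: str, user_id: str) -> int:
    """Interim rule: username equals the current local-part OR the user_id."""
    local_part = email.split("@", 1)[0]
    return conn.execute(
        "SELECT COUNT(*) FROM usage_session_summary WHERE username = ? OR username = ?",
        [local_part, user_id],
    ).fetchone()[0]


def visible_by_user_id(user_id: str) -> int:
    """Final rule: filter only on the stable user_id column."""
    return conn.execute(
        "SELECT COUNT(*) FROM usage_session_summary WHERE user_id = ?",
        [user_id],
    ).fetchone()[0]


print(visible_by_local_part("alice@example.com", ALICE_ID))  # 2
# After Alice's email changes, the collector row no longer matches.
print(visible_by_local_part("alice.smith@example.com", ALICE_ID))  # 1
print(visible_by_user_id(ALICE_ID))  # 2
```

Values are bound as parameters in both queries; the user_id rule needs no knowledge of the current email at all.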

Changes:
- Schema v45: add user_id column to usage_session_summary and usage_events
- UsageProcessor: accept and store user_id in both tables
- runner.py: resolve_user_id() maps directory name to users.id UUID
  (exact match for UUID dirs, email LIKE for local-part dirs)
- INTERNAL_TABLES: agnes_sessions/agnes_telemetry filter on user_id column
- build_filter_clause: simplified to WHERE user_id = '<uuid>' (no OR)
- me.py/admin_user_sessions.py: query by user_id OR username for
  backward compatibility during transition
- USAGE_PROCESSOR_VERSION bumped 2→3 to trigger reprocessing/backfill
- Tests updated: 27 pass including new email-change resilience test
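The resolve_user_id() lookup order (exact id match, then escaped local-part LIKE, then None) can be sketched against sqlite3. This is an illustrative port, not the pipeline's code: resolve_user_id_sketch and the users rows are made up, DuckDB's NULLS LAST is spelled as `updated_at IS NULL, updated_at DESC` for portability, and SQLite's LIKE is case-insensitive for ASCII whereas DuckDB's is case-sensitive.

```python
from __future__ import annotations

import sqlite3


def resolve_user_id_sketch(conn: sqlite3.Connection, username: str) -> str | None:
    """Illustrative sqlite3 port of the resolve_user_id() lookup order."""
    # 1. Exact match on users.id covers upload-API (UUID) directories.
    row = conn.execute("SELECT id FROM users WHERE id = ?", [username]).fetchone()
    if row:
        return row[0]
    # 2. Local-part match; escape \, % and _ so they match literally.
    escaped = username.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    # "updated_at IS NULL, updated_at DESC" is a portable NULLS LAST.
    row = conn.execute(
        "SELECT id FROM users WHERE email LIKE ? || '@%' ESCAPE '\\' "
        "ORDER BY updated_at IS NULL, updated_at DESC LIMIT 1",
        [escaped],
    ).fetchone()
    # 3. Fallback: None for an orphaned or deleted user.
    return row[0] if row else None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, email TEXT, updated_at TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [
        ("uuid-alice-old", "alice@old.example", "2025-01-01"),
        ("uuid-alice-new", "alice@new.example", "2026-01-01"),
        ("uuid-axb", "axb@example.com", None),
    ],
)

print(resolve_user_id_sketch(conn, "uuid-alice-old"))  # uuid-alice-old (step 1)
print(resolve_user_id_sketch(conn, "alice"))  # uuid-alice-new (most recently updated)
print(resolve_user_id_sketch(conn, "a_b"))  # None: "_" is escaped, so axb@ does not match
print(resolve_user_id_sketch(conn, "ghost"))  # None (step 3)
```

Without the escaping step, the pattern `a_b@%` would treat `_` as a single-character wildcard and resolve the directory `a_b` to the unrelated user `axb@example.com`.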

Co-Authored-By: zdenek.srotyr <zdenek.srotyr@keboola.com>

* fix(tests): bump schema version assertions 44→45

Co-Authored-By: zdenek.srotyr <zdenek.srotyr@keboola.com>

* fix(docs): correct resolve_user_id docstring, add TypeError comment

Co-Authored-By: zdenek.srotyr <zdenek.srotyr@keboola.com>

* fix(security): address review — backward-compat OR, LIKE escaping, narrower TypeError

Co-Authored-By: zdenek.srotyr <zdenek.srotyr@keboola.com>

* fix(security): address code review — eliminate TypeError hack, add resolve_user_id tests

Co-Authored-By: zdenek.srotyr <zdenek.srotyr@keboola.com>

* fix(db): create user_id indexes in _v44_to_v45, not _SYSTEM_SCHEMA

_SYSTEM_SCHEMA runs before the migration ladder. On an upgrade from
v42/v43/v44, usage_events / usage_session_summary already exist without
the user_id column (CREATE TABLE IF NOT EXISTS is a no-op), so the
CREATE INDEX ... (user_id) lines in _SYSTEM_SCHEMA failed to bind and
aborted _ensure_schema — the app would not start post-upgrade. Move the
index creation to _v44_to_v45, which ADDs the column first. Same pattern
as the v41 audit_log indices.
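The ordering bug can be reproduced in miniature with sqlite3 standing in for DuckDB (where the same mistake surfaces as a bind failure). The statement lists, the index name idx_usage_events_user_id and the start_app helper are hypothetical simplifications of _SYSTEM_SCHEMA and the migration ladder, not the project's code:

```python
from __future__ import annotations

import sqlite3

INDEX_SQL = "CREATE INDEX IF NOT EXISTS idx_usage_events_user_id ON usage_events (user_id)"

# Hypothetical, simplified bootstrap. The v45 table definition includes
# user_id, but CREATE TABLE IF NOT EXISTS is a no-op when the v44 table
# already exists, so the old column set stays in place.
SYSTEM_SCHEMA_BROKEN = [
    "CREATE TABLE IF NOT EXISTS usage_events (event_id TEXT, username TEXT, user_id TEXT)",
    INDEX_SQL,  # fails: the existing table has no user_id column yet
]
# The fix: keep only the table DDL here and move the index into the migration.
SYSTEM_SCHEMA_FIXED = SYSTEM_SCHEMA_BROKEN[:1]


def v44_to_v45(conn: sqlite3.Connection) -> None:
    # Migration step: ADD the column first, then index it.
    conn.execute("ALTER TABLE usage_events ADD COLUMN user_id TEXT")
    conn.execute(INDEX_SQL)


def start_app(schema: list[str]) -> sqlite3.Connection:
    # Simulate an upgrade: the database was created at v44, without user_id.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE usage_events (event_id TEXT, username TEXT)")
    for stmt in schema:  # the schema bootstrap runs before the migration ladder
        conn.execute(stmt)
    v44_to_v45(conn)
    return conn


try:
    start_app(SYSTEM_SCHEMA_BROKEN)
except sqlite3.OperationalError as exc:
    print("startup aborted:", exc)

conn = start_app(SYSTEM_SCHEMA_FIXED)
print(conn.execute("SELECT name FROM sqlite_master WHERE type = 'index'").fetchall())
# prints [('idx_usage_events_user_id',)]
```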

* fix(usage): bump USAGE_PROCESSOR_VERSION 3→4 for user_id backfill

#303 shipped USAGE_PROCESSOR_VERSION=3 (release 0.54.12) for its
<command-name> slash extraction. This PR's 2→3 bump collided with it
on rebase, so the reprocess loop would not re-trigger to backfill the
new user_id column on deployments already running v3. Bump to 4.
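A toy model of why the colliding bump was a no-op. It assumes, without confirmation from the commit, that the reprocess loop selects sessions whose recorded processor version is lower than the current USAGE_PROCESSOR_VERSION; sessions_to_reprocess is a hypothetical helper.

```python
from __future__ import annotations

# Toy model only. Assumption (not stated in the commit): the reprocess loop
# picks sessions whose recorded processor version is below the current one.


def sessions_to_reprocess(stored_versions: dict[str, int], current_version: int) -> list[str]:
    return sorted(k for k, v in stored_versions.items() if v < current_version)


# A deployment on release 0.54.12: every session was last processed at v3 (#303).
stored = {"alice/a.jsonl": 3, "bob/b.jsonl": 3}

print(sessions_to_reprocess(stored, current_version=3))  # []: a 2→3 bump backfills nothing
print(sessions_to_reprocess(stored, current_version=4))  # ['alice/a.jsonl', 'bob/b.jsonl']
```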

* release: 0.54.13 — RBAC filter uses stable user_id (#293)

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
2026-05-14 14:12:54 +00:00


"""Per-processor runner — drives one SessionProcessor across all unprocessed
sessions in /data/user_sessions/. Each processor is invoked independently
(one call to run_processor per scheduler tick per processor); there is no
cross-processor coupling.

Failure handling mirrors the pre-refactor verification_detector behavior:
per-session try/except; on raise, the state row is NOT written, so the same
session will be retried on the next tick. There is no max_retries / dead
letter. A permanently malformed session will retry forever; that is a
known limitation we may revisit (out of scope for this refactor).
"""

from __future__ import annotations

import logging
import os
from pathlib import Path
from typing import Any

import duckdb

from services.session_pipeline.contract import ProcessorResult, SessionProcessor
from services.session_pipeline.lib import compute_file_hash
from src.repositories.session_processor_state import SessionProcessorStateRepository

logger = logging.getLogger(__name__)


def resolve_user_id(
    conn: duckdb.DuckDBPyConnection,
    username: str,
) -> str | None:
    """Map a session-directory name to the stable ``users.id`` UUID.

    Two conventions exist for the directory name under
    ``/data/user_sessions/``:

    * **Session collector** writes under the OS username, which in
      current deployments equals the email local-part (e.g. ``alice``).
    * **Upload API** writes under ``user["id"]``, a UUID.

    Resolution order:

    1. Exact match on ``users.id`` (covers the UUID path).
    2. Email local-part match: ``users.email LIKE '<username>@%'``, with
       ``\\``, ``%`` and ``_`` in *username* escaped so they match literally.
       If multiple users share the same local-part (different domains),
       we pick the one most recently updated.
    3. Fallback: return ``None`` (orphaned / deleted user).
    """
    row = conn.execute(
        "SELECT id FROM users WHERE id = ?",
        [username],
    ).fetchone()
    if row:
        return row[0]

    # Escape LIKE metacharacters so a local-part such as "a_b" cannot
    # match "axb@...". Backslash is escaped first so the escapes added
    # for % and _ are not themselves doubled.
    escaped = username.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    row = conn.execute(
        "SELECT id FROM users WHERE email LIKE ? || '@%' ESCAPE '\\' "
        "ORDER BY updated_at DESC NULLS LAST LIMIT 1",
        [escaped],
    ).fetchone()
    if row:
        return row[0]

    return None


DEFAULT_SESSION_DATA_DIR = Path(os.environ.get("SESSION_DATA_DIR", "/data/user_sessions"))


def run_processor(
    conn: duckdb.DuckDBPyConnection,
    processor: SessionProcessor,
    session_data_dir: Path | None = None,
) -> dict[str, Any]:
    """Run *processor* against every unprocessed session in
    *session_data_dir* (defaults to $SESSION_DATA_DIR or /data/user_sessions).

    Returns a stats dict with: scanned, processed, skipped, errors,
    items_extracted, errors_detail. The caller (admin endpoint) puts this in
    the audit row and HTTP response body.
    """
    effective_dir = session_data_dir if session_data_dir is not None else DEFAULT_SESSION_DATA_DIR
    stats: dict[str, Any] = {
        "processor": processor.name,
        "scanned": 0,
        "processed": 0,
        "skipped": 0,
        "errors": 0,
        "items_extracted": 0,
        "errors_detail": [],
    }
    repo = SessionProcessorStateRepository(conn)
    candidates = repo.scan_unprocessed_for(processor.name, effective_dir)
    stats["scanned"] = len(candidates)
    if not candidates:
        logger.info("No sessions to process for processor=%s", processor.name)
        return stats

    # Resolve user_id lazily, once per directory name, so each processor can
    # store the stable identity. The cache avoids repeated DB lookups when
    # one user has many sessions; skipped sessions never trigger a lookup.
    _uid_cache: dict[str, str | None] = {}

    for username, jsonl_path in candidates:
        session_key = f"{username}/{jsonl_path.name}"
        try:
            file_hash = compute_file_hash(jsonl_path)
        except Exception as e:
            logger.warning(
                "Cannot hash %s for processor=%s: %s",
                session_key,
                processor.name,
                e,
            )
            stats["errors"] += 1
            stats["errors_detail"].append({"session": session_key, "error": str(e)})
            continue

        # Hash-aware skip: scan_unprocessed_for returns every candidate; we
        # do the authoritative is_processed check here so the runner is the
        # single place that decides "this exact (processor, session, hash)
        # tuple is already done". Cost: one extra SELECT per candidate, but
        # only for files that survived the directory scan.
        if repo.is_processed(processor.name, session_key, file_hash):
            stats["skipped"] += 1
            continue

        if username not in _uid_cache:
            _uid_cache[username] = resolve_user_id(conn, username)
        resolved_uid = _uid_cache[username]

        try:
            result = processor.process_session(
                jsonl_path,
                username,
                session_key,
                conn,
                user_id=resolved_uid,
            )
        except Exception as e:
            logger.exception(
                "Processor %s failed on %s — leaving state unwritten for retry",
                processor.name,
                session_key,
            )
            stats["errors"] += 1
            stats["errors_detail"].append({"session": session_key, "error": str(e)})
            continue

        if not isinstance(result, ProcessorResult):
            # Defensive: a Protocol can't enforce the return type at runtime,
            # so a misbehaving processor that returns None or an arbitrary
            # dict shouldn't poison the state-write path. Treat it as zero
            # items but still mark it processed; the alternative (raise) would
            # cause the same session to be retried forever.
            logger.warning(
                "Processor %s returned non-ProcessorResult on %s; coercing to empty result",
                processor.name,
                session_key,
            )
            result = ProcessorResult(items_count=0)

        repo.mark_processed(
            processor_name=processor.name,
            session_file=session_key,
            username=username,
            items_count=result.items_count,
            file_hash=file_hash,
        )
        stats["processed"] += 1
        stats["items_extracted"] += result.items_count

    logger.info(
        "Processor %s: scanned=%d processed=%d skipped=%d errors=%d items=%d",
        processor.name,
        stats["scanned"],
        stats["processed"],
        stats["skipped"],
        stats["errors"],
        stats["items_extracted"],
    )
    return stats
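The runner's per-session rules (hash-aware skip, no state row on failure, retry on the next tick) can be modelled in a few lines. This is a simplified sketch, not the module's behavior verbatim: ToyState is a hypothetical in-memory stand-in for SessionProcessorStateRepository, tick() omits user_id resolution and result coercion, and SHA-256 is an assumption about what compute_file_hash uses.

```python
from __future__ import annotations

import hashlib
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class ToyState:
    """Hypothetical in-memory stand-in for SessionProcessorStateRepository."""

    done: set[tuple[str, str, str]] = field(default_factory=set)

    def is_processed(self, processor: str, session: str, file_hash: str) -> bool:
        return (processor, session, file_hash) in self.done

    def mark_processed(self, processor: str, session: str, file_hash: str) -> None:
        self.done.add((processor, session, file_hash))


def tick(
    state: ToyState,
    processor: str,
    sessions: dict[str, bytes],
    process: Callable[[str, bytes], None],
) -> dict[str, int]:
    """One scheduler tick with the runner's skip / retry rules."""
    stats = {"processed": 0, "skipped": 0, "errors": 0}
    for key, content in sessions.items():
        file_hash = hashlib.sha256(content).hexdigest()  # assumed hash algorithm
        if state.is_processed(processor, key, file_hash):
            stats["skipped"] += 1
            continue
        try:
            process(key, content)
        except Exception:
            stats["errors"] += 1  # no state row written, so retried next tick
            continue
        state.mark_processed(processor, key, file_hash)
        stats["processed"] += 1
    return stats


def flaky(key: str, content: bytes) -> None:
    if b"bad" in content:
        raise ValueError(f"malformed session {key}")


state = ToyState()
sessions = {"alice/a.jsonl": b"ok", "alice/b.jsonl": b"bad"}
print(tick(state, "usage", sessions, flaky))  # {'processed': 1, 'skipped': 0, 'errors': 1}
print(tick(state, "usage", sessions, flaky))  # {'processed': 0, 'skipped': 1, 'errors': 1}
sessions["alice/a.jsonl"] = b"ok, more lines"  # appended content, new hash
print(tick(state, "usage", sessions, flaky))  # {'processed': 1, 'skipped': 0, 'errors': 1}
```

The third tick shows why the state key includes the file hash: appending to a session file changes its hash, so it is processed again, while the permanently malformed session keeps failing on every tick, which is the known limitation noted in the module docstring.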