agnes-the-ai-analyst/app/api/admin_user_sessions.py
ZdenekSrotyr 3e19caa975
fix(security): RBAC filter uses stable user_id instead of mutable email local-part (#293) (#299)
* fix(security): RBAC filter for agnes_sessions matches both email local-part and user_id

The upload API (POST /api/upload/sessions) stores session files under
user_sessions/{user_id}/ (UUID), while the session collector uses the
OS username (email local-part). The session pipeline writes the directory
name verbatim into usage_session_summary.username, so the column can
contain either value depending on the ingestion path.

The RBAC filter in build_filter_clause previously only matched the email
local-part, missing sessions uploaded via the API. The fix adds an OR
condition so non-admin users see rows where username matches either their
email local-part or their user_id.
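A minimal sketch of the dual-match predicate described above. It uses Python's built-in sqlite3 as a stand-in for DuckDB. The table and column names follow the commit text, while the sample rows, the UUID and the `visible_sessions` helper are invented for illustration.

```python
import sqlite3

# Hypothetical sample data: one user appears under both ingestion paths.
USER_ID = "3f2b9c1e-0000-4000-8000-000000000001"  # invented UUID
LOCAL_PART = "alice"                               # email local-part

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE usage_session_summary (username TEXT, session_file TEXT)")
conn.executemany(
    "INSERT INTO usage_session_summary VALUES (?, ?)",
    [
        (LOCAL_PART, "collector.jsonl"),  # directory named after the OS username
        (USER_ID, "uploaded.jsonl"),      # directory named after users.id (upload API)
        ("bob", "other.jsonl"),           # another user's row
    ],
)


def visible_sessions(local_part: str, user_id: str) -> list[str]:
    """Rows a non-admin may see: username equals either identifier."""
    rows = conn.execute(
        "SELECT session_file FROM usage_session_summary "
        "WHERE username = ? OR username = ? ORDER BY session_file",
        [local_part, user_id],
    ).fetchall()
    return [r[0] for r in rows]


print(visible_sessions(LOCAL_PART, USER_ID))  # ['collector.jsonl', 'uploaded.jsonl']
```

Because the local-part is derived from the email, an email change silently drops the collector rows from this result. The follow-up commit addresses that weakness.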

Closes #293

Co-Authored-By: zdenek.srotyr <zdenek.srotyr@keboola.com>

* fix(security): RBAC filter uses stable user_id instead of mutable email local-part

Closes #293

Previous fix used OR condition matching both email local-part and user_id
in the username column. This was fragile: email changes would break
filtering. This commit introduces a dedicated user_id column populated
by the session pipeline via resolve_user_id(), and switches the RBAC
filter to use it exclusively.
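A minimal sketch of the simplified filter. It assumes the clause is built as a SQL string with the UUID inlined, which is what the `WHERE user_id = '<uuid>'` form listed for build_filter_clause suggests. The helper name `build_user_filter` is hypothetical, and the sketch also assumes users.id values are stored in canonical lowercase UUID form.

```python
import uuid


def build_user_filter(user_id: str) -> str:
    """Return a WHERE fragment restricting rows to one user (hypothetical helper)."""
    # uuid.UUID() raises ValueError for anything that is not a UUID, so only
    # hex digits and hyphens can reach the SQL text after str() canonicalises it.
    canonical = str(uuid.UUID(user_id))
    return f"WHERE user_id = '{canonical}'"


print(build_user_filter("3f2b9c1e-0000-4000-8000-000000000001"))
```

Because the predicate never mentions email, an email change cannot alter which rows match.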

Changes:
- Schema v45: add user_id column to usage_session_summary and usage_events
- UsageProcessor: accept and store user_id in both tables
- runner.py: resolve_user_id() maps directory name to users.id UUID
  (exact match for UUID dirs, email LIKE for local-part dirs)
- INTERNAL_TABLES: agnes_sessions/agnes_telemetry filter on user_id column
- build_filter_clause: simplified to WHERE user_id = '<uuid>' (no OR)
- me.py/admin_user_sessions.py: query by user_id OR username for
  backward compatibility during transition
- USAGE_PROCESSOR_VERSION bumped 2→3 to trigger reprocessing/backfill
- Tests updated: 27 pass including new email-change resilience test
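A minimal sketch of the mapping that resolve_user_id() is described as performing. UUID-shaped directory names get an exact id match. Other names get an escaped `email LIKE '<local-part>@%'` match, which relates to the LIKE escaping mentioned in the review fixes. The real function lives in runner.py and is not shown in this file. This sketch makes several assumptions: the signature, the `escape_like` helper, the rule that an ambiguous local-part resolves to None, and the use of sqlite3 instead of DuckDB.

```python
import sqlite3
import uuid
from typing import Optional


def escape_like(value: str, escape: str = "\\") -> str:
    """Escape LIKE wildcards so a local-part such as 'a_b' matches literally."""
    # Escape the escape character first so later replacements are not doubled.
    return (
        value.replace(escape, escape + escape)
        .replace("%", escape + "%")
        .replace("_", escape + "_")
    )


def resolve_user_id(conn: sqlite3.Connection, dir_name: str) -> Optional[str]:
    """Map a session directory name to users.id (hypothetical sketch)."""
    try:
        uuid.UUID(dir_name)
    except ValueError:
        pass
    else:
        # UUID-shaped directory (upload API path): require an exact id match.
        row = conn.execute("SELECT id FROM users WHERE id = ?", [dir_name]).fetchone()
        return row[0] if row else None
    # Local-part directory (collector path): match '<local-part>@%' literally.
    pattern = escape_like(dir_name) + "@%"
    rows = conn.execute(
        "SELECT id FROM users WHERE email LIKE ? ESCAPE '\\'", [pattern]
    ).fetchall()
    # Assumption: several matching users means the directory stays unresolved.
    return rows[0][0] if len(rows) == 1 else None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, email TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [
        ("3f2b9c1e-0000-4000-8000-000000000001", "a_b@example.com"),
        ("3f2b9c1e-0000-4000-8000-000000000002", "axb@example.com"),
    ],
)
# Unescaped, 'a_b@%' would also match 'axb@...'; escaped, only the first user matches.
print(resolve_user_id(conn, "a_b"))  # 3f2b9c1e-0000-4000-8000-000000000001
```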

Co-Authored-By: zdenek.srotyr <zdenek.srotyr@keboola.com>

* fix(tests): bump schema version assertions 44→45

Co-Authored-By: zdenek.srotyr <zdenek.srotyr@keboola.com>

* fix(docs): correct resolve_user_id docstring, add TypeError comment

Co-Authored-By: zdenek.srotyr <zdenek.srotyr@keboola.com>

* fix(security): address review — backward-compat OR, LIKE escaping, narrower TypeError

Co-Authored-By: zdenek.srotyr <zdenek.srotyr@keboola.com>

* fix(security): address code review — eliminate TypeError hack, add resolve_user_id tests

Co-Authored-By: zdenek.srotyr <zdenek.srotyr@keboola.com>

* fix(db): create user_id indexes in _v44_to_v45, not _SYSTEM_SCHEMA

_SYSTEM_SCHEMA runs before the migration ladder. On an upgrade from
v42/v43/v44, usage_events / usage_session_summary already exist without
the user_id column (CREATE TABLE IF NOT EXISTS is a no-op), so the
CREATE INDEX ... (user_id) lines in _SYSTEM_SCHEMA failed to bind and
aborted _ensure_schema — the app would not start post-upgrade. Move the
index creation to _v44_to_v45, which ADDs the column first. Same pattern
as the v41 audit_log indices.
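The failure mode and the fix can be reproduced with Python's built-in sqlite3 as a stand-in for DuckDB. The two-column usage_events table, the index name and the `migrate_v44_to_v45` function are invented for the demo. On an existing pre-v45 table, CREATE TABLE IF NOT EXISTS does nothing, so an index on user_id fails until ALTER TABLE has added the column.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# State after upgrading from v44: the table exists without user_id.
conn.execute("CREATE TABLE usage_events (username TEXT, ts TEXT)")

# Bootstrap-schema style statements (assumed shape, not the real _SYSTEM_SCHEMA).
conn.execute(
    "CREATE TABLE IF NOT EXISTS usage_events (username TEXT, ts TEXT, user_id TEXT)"
)  # no-op: the table already exists, so user_id is NOT added
try:
    conn.execute("CREATE INDEX IF NOT EXISTS idx_usage_events_user_id ON usage_events (user_id)")
    early_index_failed = False
except sqlite3.Error:
    # sqlite reports "no such column: user_id"; the commit describes DuckDB's
    # equivalent as a bind failure.
    early_index_failed = True


def migrate_v44_to_v45(c: sqlite3.Connection) -> None:
    """Hypothetical migration step: add the column first, then index it."""
    cols = {row[1] for row in c.execute("PRAGMA table_info(usage_events)")}
    if "user_id" not in cols:
        c.execute("ALTER TABLE usage_events ADD COLUMN user_id TEXT")
    c.execute("CREATE INDEX IF NOT EXISTS idx_usage_events_user_id ON usage_events (user_id)")


migrate_v44_to_v45(conn)
print(early_index_failed)  # True
```

Running the migration a second time is harmless, because both steps are guarded.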

* fix(usage): bump USAGE_PROCESSOR_VERSION 3→4 for user_id backfill

#303 shipped USAGE_PROCESSOR_VERSION=3 (release 0.54.12) for its
<command-name> slash extraction. This PR's 2→3 bump collided with it
on rebase, so the reprocess loop would not re-trigger to backfill the
new user_id column on deployments already running v3. Bump to 4.
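A minimal sketch of why the collision mattered. It assumes, without confirmation from the commit, that the reprocess loop compares the processor version stored with each session against the code's USAGE_PROCESSOR_VERSION and reprocesses only older rows. The `sessions_to_reprocess` helper is hypothetical.

```python
def sessions_to_reprocess(stored_versions: dict[str, int], current_version: int) -> list[str]:
    """Return session files processed by an older UsageProcessor version (hypothetical)."""
    return sorted(f for f, v in stored_versions.items() if v < current_version)


# A deployment on release 0.54.12 has already processed everything at v3.
deployed = {"a.jsonl": 3, "b.jsonl": 3}
print(sessions_to_reprocess(deployed, 3))  # [] (a 2->3 bump backfills nothing)
print(sessions_to_reprocess(deployed, 4))  # ['a.jsonl', 'b.jsonl']
```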

* release: 0.54.13 — RBAC filter uses stable user_id (#293)

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
2026-05-14 14:12:54 +00:00


"""Admin endpoints for per-user session files and activity.

Endpoints:
- GET /api/admin/users/{user_id}/sessions: paginated session list
- GET /api/admin/users/{user_id}/sessions/download-all: bulk ZIP download
- GET /api/admin/users/{user_id}/sessions/{session_file:path}/download: single JSONL
- GET /api/admin/users/{user_id}/activity: paginated audit_log rows for the user

All admin-gated. Both download endpoints write audit_log rows, and the
activity endpoint logs its own read.
"""
from __future__ import annotations
import io
import json
import logging
import os
import re
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import duckdb
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from app.auth.access import require_admin
from app.auth.dependencies import _get_db
from src.repositories.audit import AuditRepository
from src.repositories.users import UserRepository
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/admin", tags=["admin"])
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_SESSION_FILE_RE = re.compile(r"^[A-Za-z0-9._-]+\.jsonl$")
def _session_data_dir() -> Path:
    return Path(os.environ.get("SESSION_DATA_DIR", "/data/user_sessions"))
def _resolve_user(user_id: str, conn: duckdb.DuckDBPyConnection) -> dict[str, Any]:
    repo = UserRepository(conn)
    target = repo.get_by_id(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")
    return target
def _username_from_user(user: dict[str, Any]) -> str:
    """Derive a filesystem username from the users row.

    The session collector places files under the OS username of the agent
    process, which for most deployments is the email local-part. The
    ``users`` row stores the email address; we use the local-part (before
    '@') as the best available approximation. Sessions uploaded through
    POST /api/upload/sessions are stored under a directory named after the
    user's UUID instead, which this mapping does not cover. If the server
    uses a different SESSION_DATA_DIR layout, operators can monkey-patch
    or replace this helper, since it is the single mapping point.
    """
    email: str = user.get("email", "") or ""
    return email.split("@")[0] if "@" in email else email
# ---------------------------------------------------------------------------
# GET /api/admin/users/{user_id}/sessions
# ---------------------------------------------------------------------------
@router.get("/users/{user_id}/sessions")
def list_user_sessions(
    user_id: str,
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    user: dict = Depends(require_admin),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Return a paginated session list for *user_id*.

    Merges rows from ``usage_session_summary`` (``processed=true``, preferred
    when both sources know a file) with a filesystem scan of
    ``${SESSION_DATA_DIR}/<username>/*.jsonl`` so the response surfaces
    sessions even when the UsageProcessor hasn't run yet (``processed=false``
    for those rows).

    ``processed=false`` rows carry only ``session_file``, ``session_id`` (the
    filename stem), ``started_at`` (file mtime), and zeroed ``tool_calls`` /
    ``tool_errors`` counters; the duration fields and ``primary_model`` are
    ``None``.
    """
    target = _resolve_user(user_id, conn)
    username = _username_from_user(target)
    user_dir = _session_data_dir() / username
    # ------------------------------------------------------------------
    # Pull processed rows from usage_session_summary
    # ------------------------------------------------------------------
    # Match on both user_id (stable, v45+) and username (legacy) so the
    # admin view shows sessions from both ingestion paths and pre-v45 rows.
    try:
        rows_db = conn.execute(
            """
            SELECT
                session_file, session_id, started_at, ended_at,
                active_seconds, wall_seconds,
                tool_calls, tool_errors, primary_model
            FROM usage_session_summary
            WHERE user_id = ? OR username = ?
            ORDER BY started_at DESC NULLS LAST
            """,
            [user_id, username],
        ).fetchall()
    except Exception:
        # The table (or its user_id column) may be missing on a database that
        # has not been migrated yet; fall back to the filesystem scan alone.
        logger.warning("usage_session_summary query failed; listing session files only", exc_info=True)
        rows_db = []

    processed_files: dict[str, dict] = {}
    # Basenames of processed files, used to de-duplicate the filesystem scan
    # whether session_file was stored as "<username>/<filename>" or just
    # "<filename>".
    processed_names: set[str] = set()
    if rows_db:
        cols = [
            "session_file",
            "session_id",
            "started_at",
            "ended_at",
            "active_seconds",
            "wall_seconds",
            "tool_calls",
            "tool_errors",
            "primary_model",
        ]
        for r in rows_db:
            d = dict(zip(cols, r))
            # Normalise timestamps to ISO strings
            for k in ("started_at", "ended_at"):
                v = d.get(k)
                if v is not None and hasattr(v, "isoformat"):
                    d[k] = v.isoformat()
            d["processed"] = True
            processed_files[d["session_file"]] = d
            if d["session_file"]:
                processed_names.add(Path(d["session_file"]).name)

    # ------------------------------------------------------------------
    # Merge with filesystem scan: unindexed files become processed=false
    # ------------------------------------------------------------------
    all_rows: list[dict] = list(processed_files.values())
    if user_dir.is_dir():
        for p in sorted(user_dir.glob("*.jsonl"), key=lambda x: x.stat().st_mtime, reverse=True):
            fname = p.name
            # Compare on basename only, so a file the processor recorded as
            # "<username>/<filename>" or "<filename>" is not listed twice.
            if fname not in processed_names:
                mtime = datetime.fromtimestamp(p.stat().st_mtime, tz=timezone.utc)
                # The collector names files "<session_id>.jsonl" or
                # "sess-<id>.jsonl"; the stem is used as-is, so the latter
                # keeps its "sess-" prefix.
                sid = p.stem
                all_rows.append(
                    {
                        "session_file": fname,
                        "session_id": sid,
                        "started_at": mtime.isoformat(),
                        "ended_at": None,
                        "active_seconds": None,
                        "wall_seconds": None,
                        "tool_calls": 0,
                        "tool_errors": 0,
                        "primary_model": None,
                        "processed": False,
                    }
                )

    # Sort processed rows first, then unprocessed; within each group the
    # newest started_at comes first and rows without started_at come last.
    def _sort_key(r: dict):
        return (1 if r["processed"] else 0, r.get("started_at") or "")

    all_rows.sort(key=_sort_key, reverse=True)
    total = len(all_rows)
    page = all_rows[offset : offset + limit]
    return {
        "rows": page,
        "pagination": {"limit": limit, "offset": offset, "total": total},
    }
# ---------------------------------------------------------------------------
# GET /api/admin/users/{user_id}/sessions/download-all
# NOTE: Starlette tries routes in declaration order (there is no "exact
# segment wins" rule), so this route is declared BEFORE the
# /{session_file:path}/download route to guarantee it is matched first.
# ---------------------------------------------------------------------------
@router.get("/users/{user_id}/sessions/download-all")
def download_all_sessions(
    user_id: str,
    user: dict = Depends(require_admin),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Stream a ZIP of every *.jsonl under the user's session directory.

    Returns 404 when the directory doesn't exist.
    Returns 200 + empty ZIP when the directory exists but has no JSONL files.
    """
    target = _resolve_user(user_id, conn)
    username = _username_from_user(target)
    user_dir = _session_data_dir() / username
    if not user_dir.is_dir():
        raise HTTPException(status_code=404, detail="No session directory for this user")
    jsonl_files = sorted(user_dir.glob("*.jsonl"))
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    zip_filename = f"{username}-sessions-{today}.zip"
    total_bytes = 0
    file_count = 0
    # We need total_bytes and file_count for the audit row, but we also need
    # to stream. For session files (typically < a few MB each) we build the
    # ZIP in memory first so we can measure the totals, then yield it.
    # If the corpus grows into GB territory, revisit with SpooledTemporaryFile.
    user_dir_resolved = user_dir.resolve()
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
        for p in jsonl_files:
            # Guard against symlinks pointing outside the user's session directory.
            try:
                p.resolve().relative_to(user_dir_resolved)
            except ValueError:
                logger.warning(
                    "download_all_sessions: skipping symlink escape: %s -> %s",
                    p,
                    p.resolve(),
                )
                continue
            data = p.read_bytes()
            zf.writestr(p.name, data)
            total_bytes += len(data)
            file_count += 1
    # Read the buffer only after the ZipFile is closed (central directory written).
    zip_bytes = buf.getvalue()
    AuditRepository(conn).log(
        user_id=user.get("id"),
        action="session_bulk_download",
        resource=f"users/{user_id}/sessions",
        params={"file_count": file_count, "total_bytes": total_bytes, "username": username},
    )
    return StreamingResponse(
        iter([zip_bytes]),
        media_type="application/zip",
        headers={
            "Content-Disposition": f'attachment; filename="{zip_filename}"',
            "Content-Length": str(len(zip_bytes)),
        },
    )
# ---------------------------------------------------------------------------
# GET /api/admin/users/{user_id}/sessions/{session_file:path}/download
# ---------------------------------------------------------------------------
@router.get("/users/{user_id}/sessions/{session_file:path}/download")
def download_session(
    user_id: str,
    session_file: str,
    user: dict = Depends(require_admin),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Stream the raw JSONL for a single session.

    Path traversal is guarded by three layers:
    1. ``Path(session_file).name`` must equal ``session_file``, so any value
       containing a path separator (``../`` etc.) is rejected with 400.
    2. The name must fully match ``^[A-Za-z0-9._-]+\\.jsonl$``.
    3. ``path.resolve()`` must still be under the session directory.
    """
    # --- guard 1: reject anything that is not a bare basename
    safe_name = Path(session_file).name
    if safe_name != session_file:
        raise HTTPException(
            status_code=400,
            detail="session_file must be a plain basename (no path separators)",
        )
    # --- guard 2: character allowlist (fullmatch, so "$" cannot accept a
    # trailing newline)
    if not _SESSION_FILE_RE.fullmatch(safe_name):
        raise HTTPException(
            status_code=400,
            detail="session_file must match ^[A-Za-z0-9._-]+\\.jsonl$",
        )
    target = _resolve_user(user_id, conn)
    username = _username_from_user(target)
    user_dir = _session_data_dir() / username
    path = user_dir / safe_name
    # is_file() also rejects a directory that happens to be named *.jsonl.
    if not path.is_file():
        raise HTTPException(status_code=404, detail="Session file not found")
    # --- guard 3: resolved path still within session dir
    try:
        resolved = path.resolve()
        base_resolved = user_dir.resolve()
        resolved.relative_to(base_resolved)
    except ValueError:
        raise HTTPException(status_code=400, detail="Resolved path escapes session directory") from None
    size = path.stat().st_size
    AuditRepository(conn).log(
        user_id=user.get("id"),
        action="session_download",
        resource=f"users/{user_id}/sessions/{safe_name}",
        params={"bytes": size, "session_file": safe_name, "username": username},
    )

    def _iter_file():
        # Stream in 64 KiB chunks instead of loading the whole file.
        with open(path, "rb") as f:
            while True:
                chunk = f.read(65536)
                if not chunk:
                    break
                yield chunk

    return StreamingResponse(
        _iter_file(),
        media_type="application/x-ndjson",
        headers={
            "Content-Disposition": f'attachment; filename="{safe_name}"',
            "Content-Length": str(size),
        },
    )
# ---------------------------------------------------------------------------
# GET /api/admin/users/{user_id}/activity
# ---------------------------------------------------------------------------
@router.get("/users/{user_id}/activity")
def list_user_activity(
    user_id: str,
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    user: dict = Depends(require_admin),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """List audit_log rows for a specific user.

    Resolves user_id to the user record (404 if not found), filters audit_log
    on the user_id field, and returns paginated rows, newest first.
    """
    row = conn.execute("SELECT id, email FROM users WHERE id = ?", [user_id]).fetchone()
    if row is None:
        raise HTTPException(status_code=404, detail="user not found")
    audit_repo = AuditRepository(conn)
    # AuditRepository.query() only supports cursor-based paging, so fetch the
    # first limit + offset rows and apply the offset by slicing.
    rows, _ = audit_repo.query(user_id=user_id, limit=limit + offset)
    rows = rows[offset : offset + limit]
    # Normalise timestamps to ISO strings and decode JSON params
    for r in rows:
        v = r.get("timestamp")
        if v is not None and hasattr(v, "isoformat"):
            r["timestamp"] = v.isoformat()
        params_val = r.get("params")
        if isinstance(params_val, str):
            try:
                r["params"] = json.loads(params_val) if params_val else None
            except (ValueError, TypeError):
                pass
    total = conn.execute("SELECT COUNT(*) FROM audit_log WHERE user_id = ?", [user_id]).fetchone()[0]
    try:
        AuditRepository(conn).log(
            user_id=user.get("id"),
            action="admin.user_activity_read",
            resource=f"users/{user_id}/activity"[:256],
            params={"target_user_id": user_id, "limit": limit, "offset": offset, "row_count": len(rows)},
            result="success",
            client_kind="web",
        )
    except Exception:
        logger.exception("audit_log write failed for admin.user_activity_read; continuing")
    return {
        "rows": rows,
        "pagination": {"limit": limit, "offset": offset, "total": int(total)},
    }