diff --git a/CHANGELOG.md b/CHANGELOG.md index c067806..ec1da65 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,29 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ### Added +- **Per-analyst Stats dashboard at `/me/stats`.** Four-tab page showing + the calling user's own data, lazy-loaded per tab: + - **Sessions** — paginated `usage_session_summary` rows + filesystem + scan of un-processed JSONL (matches the admin `list_user_sessions` + shape). Includes the v44 token columns aggregated per row. + - **Tokens** — daily series (default last 30 days), by-model + breakdown (lifetime), top-10 biggest sessions, lifetime totals. + - **Data access** — `audit_log` rows where `action LIKE 'query.%'` + for the caller (covers `query.local`, `query.hybrid`, `query.remote`, + `query.internal`). Cursor-paginated on `(timestamp, id)`. + - **Sync activity** — `audit_log` rows where action is `sync.trigger` or + `manifest.fetch` (an explicit allow-list, not a prefix match) for the caller, plus the user's `last_pull_at` for the + header. Per-pull history now persists thanks to the new + `manifest.fetch` audit row. + Backed by `GET /api/me/stats/{sessions,tokens,queries,sync}`, + authenticated-only and scoped to the caller server-side. New "Stats" link added to the + primary nav between "Data Packages" and the Admin dropdown. +- **`manifest.fetch` audit_log row** written from + `GET /api/sync/manifest` alongside the `users.last_pull_at` bump. + Surfaces per-pull history (the column UPDATE only retains the most + recent timestamp) so the Sync activity tab and any other + audit-log-driven view can render a timeline. 
+ - **Homepage status frame.** The `/home` page now opens with a 5-card status row above the install-hero / offboard-strip: **Last sync** (your last `agnes pull`), **Sessions**, **Prompts**, **Tokens used**, diff --git a/app/api/me_stats.py b/app/api/me_stats.py new file mode 100644 index 0000000..c531346 --- /dev/null +++ b/app/api/me_stats.py @@ -0,0 +1,454 @@ +"""Self-scoped Stats endpoints for /me/stats — the analyst's own +analytics dashboard. + +Four tabs, four endpoints, all depending on ``get_current_user`` and +filtering by the caller's id / username so a caller can only see their own data: + +- ``GET /api/me/stats/sessions?limit=&offset=`` — paginated session + list merging ``usage_session_summary`` rows (post-processor) with a + filesystem scan of un-processed JSONL (matches the admin + ``list_user_sessions`` shape). +- ``GET /api/me/stats/tokens?days=30`` — daily token series + by-model + breakdown + top-10 biggest sessions. Powers the Tokens tab chart. +- ``GET /api/me/stats/queries?cursor_ts=&cursor_id=&limit=`` — + ``audit_log`` rows where ``action LIKE 'query.%'`` (BQ + local + DuckDB queries) for this user. Cursor-paginated (keyset on + ``(timestamp, id)``). +- ``GET /api/me/stats/sync?cursor_ts=&cursor_id=&limit=`` — + ``audit_log`` rows whose action is in ``_sync_action_list()`` (``sync.trigger``, ``manifest.fetch``), + plus the user's ``last_pull_at`` for prominent header rendering. The cursor params are accepted but currently unused; ``next_cursor`` is always ``None``. + +Username derivation: ``_username_for_stats(user)`` reuses the +email-local-part rule from ``app.api.me`` so the joins on +``usage_*`` rows (filesystem-derived OS username) align with what +the session collector writes. 
+""" +from __future__ import annotations + +import logging +import os +from datetime import datetime +from pathlib import Path +from typing import Any, Optional + +import duckdb +from fastapi import APIRouter, Depends, Query + +from app.auth.dependencies import _get_db, get_current_user +from src.repositories.audit import AuditRepository + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/me/stats", tags=["me"]) + + +def _username_for_stats(user: dict) -> str: + """Email local-part → filesystem username, mirroring the rule in + ``app.api.me._username_for_stats``. Kept inline so this module + has no cross-import dependency on ``me.py``; if the mapping + evolves both copies update. + """ + email: str = user.get("email", "") or "" + return email.split("@")[0] if "@" in email else email + + +def _session_data_dir() -> Path: + """Match ``app.api.admin_user_sessions._session_data_dir``.""" + return Path( + os.environ.get("SESSION_DATA_DIR") + or os.environ.get("AGNES_SESSION_DATA_DIR") + or "/data/sessions" + ) + + +# --------------------------------------------------------------------------- +# Sessions tab +# --------------------------------------------------------------------------- + + +@router.get("/sessions") +def list_self_sessions( + limit: int = Query(50, ge=1, le=200), + offset: int = Query(0, ge=0), + user: dict = Depends(get_current_user), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +) -> dict: + """Paginated session list for the calling user. + + Joins ``usage_session_summary`` (processed=true) with a filesystem + scan of un-processed JSONL so a session appears immediately even + before the UsageProcessor runs. Mirrors the admin + ``list_user_sessions`` projection plus the v44 token columns — + the Stats tab renders one row per session with totals on the + right. 
+ """ + username = _username_for_stats(user) + user_dir = _session_data_dir() / username + + try: + rows_db = conn.execute( + """ + SELECT + session_file, session_id, started_at, ended_at, + active_seconds, wall_seconds, + user_messages, tool_calls, tool_errors, + input_tokens, output_tokens, + cache_read_tokens, cache_creation_tokens, + primary_model + FROM usage_session_summary + WHERE username = ? + ORDER BY started_at DESC NULLS LAST + """, + [username], + ).fetchall() + except Exception: + # usage_session_summary may not exist on a partially-migrated DB. + # Fall back to filesystem-only listing rather than 500. + rows_db = [] + + cols = [ + "session_file", "session_id", "started_at", "ended_at", + "active_seconds", "wall_seconds", + "user_messages", "tool_calls", "tool_errors", + "input_tokens", "output_tokens", + "cache_read_tokens", "cache_creation_tokens", + "primary_model", + ] + processed: dict[str, dict] = {} + for r in rows_db: + d = dict(zip(cols, r)) + for k in ("started_at", "ended_at"): + v = d.get(k) + if v is not None and hasattr(v, "isoformat"): + d[k] = v.isoformat() + d["tokens_total"] = ( + int(d.get("input_tokens") or 0) + + int(d.get("output_tokens") or 0) + + int(d.get("cache_read_tokens") or 0) + + int(d.get("cache_creation_tokens") or 0) + ) + d["processed"] = True + processed[d["session_file"]] = d + + all_rows: list[dict] = list(processed.values()) + if user_dir.is_dir(): + for p in sorted( + user_dir.glob("*.jsonl"), + key=lambda x: x.stat().st_mtime, + reverse=True, + ): + if p.name in processed: + continue + mtime = datetime.fromtimestamp(p.stat().st_mtime).isoformat() + all_rows.append({ + "session_file": p.name, + "session_id": p.stem, + "started_at": mtime, + "ended_at": None, + "active_seconds": 0, + "wall_seconds": 0, + "user_messages": 0, + "tool_calls": 0, + "tool_errors": 0, + "input_tokens": 0, + "output_tokens": 0, + "cache_read_tokens": 0, + "cache_creation_tokens": 0, + "tokens_total": 0, + "primary_model": None, + 
"processed": False, + }) + + all_rows.sort( + key=lambda r: r.get("started_at") or "", + reverse=True, + ) + total = len(all_rows) + page = all_rows[offset : offset + limit] + return { + "total": total, + "offset": offset, + "limit": limit, + "rows": page, + } + + +# --------------------------------------------------------------------------- +# Tokens tab +# --------------------------------------------------------------------------- + + +@router.get("/tokens") +def get_tokens( + days: int = Query(30, ge=1, le=365), + user: dict = Depends(get_current_user), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +) -> dict: + """Token breakdown for the Tokens tab. + + Returns a daily series (last *days* days), by-model breakdown + (lifetime), top-10 biggest sessions (by total tokens, lifetime), + and the lifetime grand total. Single round-trip via three + sub-queries — each scans the same per-user partition of + ``usage_session_summary`` which the + ``idx_usage_session_user`` index supports. + """ + username = _username_for_stats(user) + + # Daily series — interval literal interpolated from validated `days`. + daily = conn.execute( + f""" + SELECT + CAST(started_at AS DATE) AS day, + COALESCE(SUM(input_tokens), 0) AS input_tokens, + COALESCE(SUM(output_tokens), 0) AS output_tokens, + COALESCE(SUM(cache_read_tokens), 0) AS cache_read, + COALESCE(SUM(cache_creation_tokens), 0) AS cache_creation, + COUNT(*) AS sessions + FROM usage_session_summary + WHERE username = ? 
+ AND started_at >= current_timestamp - INTERVAL {int(days)} DAY + GROUP BY 1 + ORDER BY 1 + """, + [username], + ).fetchall() + daily_series = [ + { + "day": d.isoformat() if hasattr(d, "isoformat") else str(d), + "input": int(i or 0), + "output": int(o or 0), + "cache_read": int(cr or 0), + "cache_creation": int(cc or 0), + "sessions": int(s or 0), + "total": int((i or 0) + (o or 0) + (cr or 0) + (cc or 0)), + } + for (d, i, o, cr, cc, s) in daily + ] + + by_model = conn.execute( + """ + SELECT + COALESCE(primary_model, '(unknown)') AS model, + COALESCE(SUM(input_tokens), 0) AS input_tokens, + COALESCE(SUM(output_tokens), 0) AS output_tokens, + COALESCE(SUM(cache_read_tokens), 0) AS cache_read, + COALESCE(SUM(cache_creation_tokens), 0) AS cache_creation, + COUNT(*) AS sessions + FROM usage_session_summary + WHERE username = ? + GROUP BY 1 + ORDER BY ( + COALESCE(SUM(input_tokens), 0) + + COALESCE(SUM(output_tokens), 0) + + COALESCE(SUM(cache_read_tokens), 0) + + COALESCE(SUM(cache_creation_tokens), 0) + ) DESC + """, + [username], + ).fetchall() + model_breakdown = [ + { + "model": m, "input": int(i or 0), "output": int(o or 0), + "cache_read": int(cr or 0), "cache_creation": int(cc or 0), + "sessions": int(s or 0), + "total": int((i or 0) + (o or 0) + (cr or 0) + (cc or 0)), + } + for (m, i, o, cr, cc, s) in by_model + ] + + top_sessions = conn.execute( + """ + SELECT + session_file, session_id, started_at, primary_model, + input_tokens, output_tokens, + cache_read_tokens, cache_creation_tokens, + (COALESCE(input_tokens, 0) + COALESCE(output_tokens, 0) + + COALESCE(cache_read_tokens, 0) + COALESCE(cache_creation_tokens, 0)) + AS tokens_total + FROM usage_session_summary + WHERE username = ? 
+ ORDER BY tokens_total DESC + LIMIT 10 + """, + [username], + ).fetchall() + top = [ + { + "session_file": sf, + "session_id": sid, + "started_at": st.isoformat() if hasattr(st, "isoformat") else st, + "primary_model": pm, + "input": int(i or 0), "output": int(o or 0), + "cache_read": int(cr or 0), "cache_creation": int(cc or 0), + "total": int(tt or 0), + } + for (sf, sid, st, pm, i, o, cr, cc, tt) in top_sessions + ] + + totals_row = conn.execute( + """ + SELECT + COALESCE(SUM(input_tokens), 0), + COALESCE(SUM(output_tokens), 0), + COALESCE(SUM(cache_read_tokens), 0), + COALESCE(SUM(cache_creation_tokens), 0), + COUNT(*) + FROM usage_session_summary + WHERE username = ? + """, + [username], + ).fetchone() + ti, to, tcr, tcc, tses = totals_row or (0, 0, 0, 0, 0) + totals = { + "input": int(ti or 0), + "output": int(to or 0), + "cache_read": int(tcr or 0), + "cache_creation": int(tcc or 0), + "total": int((ti or 0) + (to or 0) + (tcr or 0) + (tcc or 0)), + "sessions": int(tses or 0), + } + + return { + "days": days, + "daily": daily_series, + "by_model": model_breakdown, + "top_sessions": top, + "totals": totals, + } + + +# --------------------------------------------------------------------------- +# Data access tab (BQ + DuckDB queries) +# --------------------------------------------------------------------------- + + +@router.get("/queries") +def list_self_queries( + limit: int = Query(50, ge=1, le=200), + cursor_ts: Optional[datetime] = None, + cursor_id: Optional[str] = None, + user: dict = Depends(get_current_user), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +) -> dict: + """Audit-log rows where ``action LIKE 'query.%'`` for the caller. + + Covers query.local (DuckDB on parquet), query.hybrid (BQ + local + join), query.remote (BQ direct), and query.internal (admin + internal queries that get attributed to the actor). Cursor + pagination on (timestamp, id) so streams under concurrent + writes don't double-render rows. 
+ """ + cursor = (cursor_ts, cursor_id) if cursor_ts and cursor_id else None + rows, next_cursor = AuditRepository(conn).query( + user_id=user["id"], + action_prefix="query.", + cursor=cursor, + limit=limit, + ) + return _audit_response(rows, next_cursor, limit) + + +# --------------------------------------------------------------------------- +# Sync activity tab +# --------------------------------------------------------------------------- + + +@router.get("/sync") +def list_self_sync_activity( + limit: int = Query(50, ge=1, le=200), + cursor_ts: Optional[datetime] = None, + cursor_id: Optional[str] = None, + user: dict = Depends(get_current_user), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +) -> dict: + """Audit-log rows where action is ``sync.*`` or ``manifest.*`` + for the caller, plus ``users.last_pull_at`` for the header card. + + Two action prefixes are merged with a UNION-ish IN filter via + ``AuditRepository.query(action_in=[...])`` — but the repo helper + doesn't take both prefix and IN, so we call twice and merge. + Cheaper alternative is two SELECTs in the repo; for now we fetch + two pages and interleave because cursor merging across two + independent streams is fiddly without a unified ORDER. To keep + the code obvious, we use ``query_actions(...)`` and accept that + the cursor is single-stream (start over to page back; first page + is what matters for the dashboard). + """ + actions_seen = AuditRepository(conn).query_actions( + actions=_sync_action_list(), + limit=limit, + ) + # Filter to this user — query_actions doesn't take user_id. 
+ user_rows = [r for r in actions_seen if r.get("user_id") == user["id"]] + + last_pull_row = conn.execute( + "SELECT last_pull_at FROM users WHERE id = ?", [user["id"]] + ).fetchone() + last_pull_at = last_pull_row[0] if last_pull_row else None + + return { + "last_pull_at": last_pull_at.isoformat() + if last_pull_at and hasattr(last_pull_at, "isoformat") + else last_pull_at, + "rows": [_audit_row_to_payload(r) for r in user_rows[:limit]], + # No next_cursor in this branch — fetched a single newest-window + # page. Pagination beyond the first page is rarely needed for + # personal sync history; the timeline tab in /admin/activity is + # the place for deeper dives. + "next_cursor": None, + } + + +def _sync_action_list() -> list[str]: + """The set of audit actions that surface on the Sync activity tab. + + Concrete known actions today: + - ``sync.trigger`` — admin manually kicks a sync. + - ``manifest.fetch`` — added in this PR; bumped on every + ``GET /api/sync/manifest``. + + Listed explicitly (vs. a prefix LIKE) so accidental future + ``sync.*`` actions (e.g. an admin-only ``sync.config_change``) + don't leak into the analyst-facing view without review. + """ + return ["sync.trigger", "manifest.fetch"] + + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + + +def _audit_row_to_payload(row: dict) -> dict: + """Convert a raw audit_log row dict to the JSON shape the Stats + tabs render. 
Drops `correlation_id` (not useful per-user) and + iso-stringifies the timestamp.""" + ts = row.get("timestamp") + return { + "id": row.get("id"), + "timestamp": ts.isoformat() if ts and hasattr(ts, "isoformat") else ts, + "action": row.get("action"), + "resource": row.get("resource"), + "result": row.get("result"), + "duration_ms": row.get("duration_ms"), + "params": row.get("params"), + "client_kind": row.get("client_kind"), + } + + +def _audit_response(rows: list[dict], next_cursor, limit: int) -> dict: + """Shared shape for the queries and (alternate path) sync endpoints.""" + if next_cursor is not None: + ts, cid = next_cursor + nc = { + "timestamp": ts.isoformat() if hasattr(ts, "isoformat") else ts, + "id": cid, + } + else: + nc = None + return { + "limit": limit, + "rows": [_audit_row_to_payload(r) for r in rows], + "next_cursor": nc, + } diff --git a/app/api/sync.py b/app/api/sync.py index 1b1cf94..828ad01 100644 --- a/app/api/sync.py +++ b/app/api/sync.py @@ -812,10 +812,22 @@ async def sync_manifest( "UPDATE users SET last_pull_at = current_timestamp WHERE id = ?", [user["id"]], ) + # Also emit an audit_log row so /me/stats Sync activity has a + # timeline of pulls (the column UPDATE only retains the most + # recent one). Action `manifest.fetch` covers both `agnes pull` + # via PAT and browser-driven manifest peeks; clients can + # disambiguate via client_kind. + AuditRepository(conn).log( + user_id=user["id"], + action="manifest.fetch", + resource="manifest", + result="ok", + client_kind="api", + ) except Exception: - # Never block a pull because the stamp UPDATE hit a transient - # issue (locked WAL, partial migration window). The manifest - # itself is the load-bearing payload. + # Never block a pull because the stamp UPDATE / audit row hit a + # transient issue (locked WAL, partial migration window). The + # manifest itself is the load-bearing payload. 
pass return _build_manifest_for_user(conn, user) diff --git a/app/main.py b/app/main.py index 11088be..48a4417 100644 --- a/app/main.py +++ b/app/main.py @@ -97,6 +97,7 @@ from app.api.telegram import router as telegram_router from app.api.access import router as access_router, me_router as me_access_router from app.api.me_debug import router as me_debug_router from app.api.me import router as me_router +from app.api.me_stats import router as me_stats_router from app.api.admin import router as admin_router from app.api.admin_bigquery_test import router as admin_bigquery_test_router from app.api.jira_webhooks import router as jira_webhooks_router @@ -612,6 +613,7 @@ def create_app() -> FastAPI: app.include_router(me_access_router) app.include_router(me_debug_router) app.include_router(me_router) + app.include_router(me_stats_router) app.include_router(jira_webhooks_router) app.include_router(metrics_router) app.include_router(metadata_router) diff --git a/app/web/router.py b/app/web/router.py index fd089f2..0cd8ac6 100644 --- a/app/web/router.py +++ b/app/web/router.py @@ -726,6 +726,26 @@ async def home_page( return templates.TemplateResponse(request, "home_not_onboarded.html", ctx) +@router.get("/me/stats", response_class=HTMLResponse) +async def me_stats_page( + request: Request, + user: dict = Depends(get_current_user), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Per-analyst stats dashboard. Four tabs (Sessions / Tokens / + Data access / Sync activity) backed by /api/me/stats/* endpoints. + Authed-only; each endpoint enforces user_id = caller scoping + server-side so this route just renders the shell. 
+ """ + ctx = _build_context( + request, + user=user, + conn=conn, + is_admin=is_user_admin(user["id"], conn), + ) + return templates.TemplateResponse(request, "me_stats.html", ctx) + + @router.get("/news", response_class=HTMLResponse) async def news_page( request: Request, diff --git a/app/web/templates/_app_header.html b/app/web/templates/_app_header.html index 4f1220f..f83d6b8 100644 --- a/app/web/templates/_app_header.html +++ b/app/web/templates/_app_header.html @@ -24,6 +24,7 @@ Home Marketplace Data Packages + Stats {# Memory + Admin menu: both admin-only. Backend gates the routes themselves via require_admin (see app/web/router.py for /corporate-memory + /corporate-memory/admin + /admin/*), so diff --git a/app/web/templates/me_stats.html b/app/web/templates/me_stats.html new file mode 100644 index 0000000..d00a3a0 --- /dev/null +++ b/app/web/templates/me_stats.html @@ -0,0 +1,463 @@ +{% extends "base.html" %} + +{% block title %}Stats — {{ instance_brand }}{% endblock %} + +{# Override block layout (same trick dashboard.html uses) to escape the + narrow .container wrap from base.html. Stats tables look cramped + inside 800px; 1280px matches the top-nav header width + (_app_header.html uses the same value) so the Stats page reads as + "same chrome, full content area" rather than a separate visual + identity. #} +{% block layout %} + {% with messages = get_flashed_messages(with_categories=true) %} + {% if messages %} +
+ {% for category, message in messages %} +
{{ message }}
+ {% endfor %} +
+ {% endif %} + {% endwith %} +
{{ self.content() }}
+{% endblock %} + +{% block content %} + + +
+

Your Stats

+

+ Sessions, tokens, data access, and sync activity for + {{ user.email }}. Data covers the + sessions {{ instance_brand }} has processed from your Claude + Code transcripts. +

+ + + +
+
Loading sessions…
+ +
+ +
+
Loading tokens…
+ +
+ +
+
Loading queries…
+ +
+ +
+
Loading sync activity…
+ +
+
+ + +{% endblock %} diff --git a/tests/test_me_stats.py b/tests/test_me_stats.py new file mode 100644 index 0000000..6002079 --- /dev/null +++ b/tests/test_me_stats.py @@ -0,0 +1,269 @@ +"""/api/me/stats/* — per-user dashboard endpoints. + +Coverage: +- All four endpoints scope rows to ``user["id"]`` / username so user A + cannot read user B's data (gates are server-side; the page renders a + shell with no caller-scope params). +- Empty user returns zero-counts / empty arrays, not 500. +- Tokens aggregates daily series, by-model, top-N, and lifetime totals + from a seeded sample. +- Sync activity surfaces both ``manifest.fetch`` and ``sync.trigger``, + filtered to caller. +- GET /api/sync/manifest writes the ``manifest.fetch`` audit row. +""" +from __future__ import annotations + +import asyncio +from datetime import datetime, timezone + +import duckdb +import pytest + +from src.db import _SYSTEM_SCHEMA +from src.repositories.audit import AuditRepository + + +@pytest.fixture +def stats_conn(tmp_path): + db = tmp_path / "system.duckdb" + conn = duckdb.connect(str(db)) + conn.execute(_SYSTEM_SCHEMA) + return conn + + +def _seed_user(conn, *, uid, email): + conn.execute( + "INSERT INTO users (id, email, active, onboarded) VALUES (?, ?, TRUE, TRUE)", + [uid, email], + ) + + +def _seed_session(conn, *, sf, username, started_sql, model="claude-opus-4-7", + user_messages=0, tool_calls=0, + input_tokens=0, output_tokens=0, + cache_read=0, cache_creation=0): + conn.execute( + f""" + INSERT INTO usage_session_summary + (session_file, session_id, username, started_at, ended_at, + active_seconds, wall_seconds, user_messages, assistant_messages, + tool_calls, tool_errors, skill_invocations, subagent_dispatches, + mcp_calls, slash_commands, distinct_tools, distinct_skills, + primary_model, input_tokens, output_tokens, cache_read_tokens, + cache_creation_tokens, processor_version) + VALUES (?, ?, ?, {started_sql}, current_timestamp, + 10, 30, ?, ?, ?, 0, 0, 0, 0, 0, 0, 0, ?, ?, 
?, ?, ?, 2) + """, + [sf, sf, username, user_messages, user_messages, + tool_calls, model, input_tokens, output_tokens, + cache_read, cache_creation], + ) + + +# --------------------------------------------------------------------------- +# Sessions tab +# --------------------------------------------------------------------------- + + +def test_sessions_endpoint_scopes_to_caller(stats_conn, tmp_path, monkeypatch): + """User A's sessions endpoint must not return user B's rows.""" + # Point session-fs scan at an empty dir so unprocessed-jsonl path is no-op. + monkeypatch.setenv("AGNES_SESSION_DATA_DIR", str(tmp_path / "noop")) + + _seed_user(stats_conn, uid="ua", email="alice@example.com") + _seed_user(stats_conn, uid="ub", email="bob@example.com") + _seed_session(stats_conn, sf="a1.jsonl", username="alice", + started_sql="current_timestamp - INTERVAL 1 HOUR", + user_messages=4, input_tokens=100, output_tokens=50) + _seed_session(stats_conn, sf="b1.jsonl", username="bob", + started_sql="current_timestamp - INTERVAL 1 HOUR", + user_messages=9, input_tokens=999, output_tokens=999) + + from app.api.me_stats import list_self_sessions + res_a = list_self_sessions( + limit=50, offset=0, + user={"id": "ua", "email": "alice@example.com"}, + conn=stats_conn, + ) + assert res_a["total"] == 1 + assert res_a["rows"][0]["session_file"] == "a1.jsonl" + assert res_a["rows"][0]["user_messages"] == 4 + assert res_a["rows"][0]["tokens_total"] == 150 + + +def test_sessions_endpoint_pagination(stats_conn, tmp_path, monkeypatch): + monkeypatch.setenv("AGNES_SESSION_DATA_DIR", str(tmp_path / "noop")) + _seed_user(stats_conn, uid="ua", email="alice@example.com") + for i in range(5): + _seed_session(stats_conn, sf=f"s{i}.jsonl", username="alice", + started_sql=f"current_timestamp - INTERVAL {i} HOUR", + user_messages=i) + + from app.api.me_stats import list_self_sessions + page1 = list_self_sessions( + limit=2, offset=0, + user={"id": "ua", "email": "alice@example.com"}, conn=stats_conn, + 
) + page2 = list_self_sessions( + limit=2, offset=2, + user={"id": "ua", "email": "alice@example.com"}, conn=stats_conn, + ) + assert page1["total"] == 5 + assert len(page1["rows"]) == 2 + assert len(page2["rows"]) == 2 + assert page1["rows"][0]["session_file"] != page2["rows"][0]["session_file"] + + +# --------------------------------------------------------------------------- +# Tokens tab +# --------------------------------------------------------------------------- + + +def test_tokens_endpoint_empty_user(stats_conn): + _seed_user(stats_conn, uid="ua", email="alice@example.com") + from app.api.me_stats import get_tokens + res = get_tokens( + days=30, + user={"id": "ua", "email": "alice@example.com"}, + conn=stats_conn, + ) + assert res["totals"]["total"] == 0 + assert res["daily"] == [] + assert res["by_model"] == [] + assert res["top_sessions"] == [] + + +def test_tokens_endpoint_aggregates(stats_conn): + _seed_user(stats_conn, uid="ua", email="alice@example.com") + _seed_session(stats_conn, sf="x.jsonl", username="alice", + started_sql="current_timestamp - INTERVAL 1 HOUR", + model="claude-opus-4-7", + input_tokens=100, output_tokens=50, + cache_read=800, cache_creation=25) + _seed_session(stats_conn, sf="y.jsonl", username="alice", + started_sql="current_timestamp - INTERVAL 2 DAY", + model="claude-sonnet-4-6", + input_tokens=200, output_tokens=100, + cache_read=400, cache_creation=10) + # Far-past row excluded by `days=7` window for the daily series, but + # still counted in lifetime totals + by_model + top. 
+ _seed_session(stats_conn, sf="z.jsonl", username="alice", + started_sql="current_timestamp - INTERVAL 60 DAY", + model="claude-opus-4-7", + input_tokens=1, output_tokens=1) + + from app.api.me_stats import get_tokens + res = get_tokens( + days=7, + user={"id": "ua", "email": "alice@example.com"}, + conn=stats_conn, + ) + # Lifetime totals include all three sessions + assert res["totals"]["sessions"] == 3 + assert res["totals"]["input"] == 301 + assert res["totals"]["output"] == 151 + assert res["totals"]["cache_read"] == 1200 + assert res["totals"]["cache_creation"] == 35 + assert res["totals"]["total"] == 1687 + + # Daily series window excludes the 60-day-old row + daily_sessions = sum(d["sessions"] for d in res["daily"]) + assert daily_sessions == 2 + + # By-model: opus has 2 sessions (x + z), sonnet has 1 (y) + models = {m["model"]: m for m in res["by_model"]} + assert models["claude-opus-4-7"]["sessions"] == 2 + assert models["claude-sonnet-4-6"]["sessions"] == 1 + + # Top sessions: largest first + assert res["top_sessions"][0]["session_file"] in ("x.jsonl", "y.jsonl") + assert res["top_sessions"][0]["total"] >= res["top_sessions"][1]["total"] + + +# --------------------------------------------------------------------------- +# Data access tab +# --------------------------------------------------------------------------- + + +def test_queries_endpoint_filters_to_query_actions(stats_conn): + _seed_user(stats_conn, uid="ua", email="alice@example.com") + repo = AuditRepository(stats_conn) + repo.log(user_id="ua", action="query.local", resource="orders", + result="ok", duration_ms=42) + repo.log(user_id="ua", action="query.remote", resource="web_sessions", + result="ok", duration_ms=1500) + # Non-query action — must not appear + repo.log(user_id="ua", action="manifest.fetch", result="ok") + # Same query action but different user — must not appear + repo.log(user_id="ub", action="query.local", result="ok") + + from app.api.me_stats import list_self_queries + res = 
list_self_queries( + limit=50, cursor_ts=None, cursor_id=None, + user={"id": "ua", "email": "alice@example.com"}, conn=stats_conn, + ) + assert len(res["rows"]) == 2 + actions = {r["action"] for r in res["rows"]} + assert actions == {"query.local", "query.remote"} + + +# --------------------------------------------------------------------------- +# Sync activity tab +# --------------------------------------------------------------------------- + + +def test_sync_endpoint_returns_manifest_fetch_rows(stats_conn): + _seed_user(stats_conn, uid="ua", email="alice@example.com") + repo = AuditRepository(stats_conn) + repo.log(user_id="ua", action="manifest.fetch", resource="manifest", + result="ok", client_kind="api") + repo.log(user_id="ua", action="sync.trigger", result="ok") + # Other user — must not leak + repo.log(user_id="ub", action="manifest.fetch", result="ok") + # Unrelated action — must not surface + repo.log(user_id="ua", action="query.local", result="ok") + # Stamp last_pull_at so the header card has a value + stats_conn.execute( + "UPDATE users SET last_pull_at = current_timestamp WHERE id = ?", + ["ua"], + ) + + from app.api.me_stats import list_self_sync_activity + res = list_self_sync_activity( + limit=50, cursor_ts=None, cursor_id=None, + user={"id": "ua", "email": "alice@example.com"}, conn=stats_conn, + ) + actions = {r["action"] for r in res["rows"]} + assert actions == {"manifest.fetch", "sync.trigger"} + assert res["last_pull_at"] is not None + + +# --------------------------------------------------------------------------- +# Manifest endpoint writes the audit_log row +# --------------------------------------------------------------------------- + + +def test_sync_manifest_writes_audit_row(stats_conn, monkeypatch, tmp_path): + """GET /api/sync/manifest must emit a manifest.fetch audit_log row + so the Sync activity tab can list per-pull history.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + _seed_user(stats_conn, uid="ua", 
email="alice@example.com") + + from app.api.sync import sync_manifest + asyncio.run( + sync_manifest( + user={"id": "ua", "email": "alice@example.com"}, + conn=stats_conn, + ) + ) + rows = stats_conn.execute( + "SELECT action, resource, result, client_kind FROM audit_log " + "WHERE user_id = ? ORDER BY timestamp DESC", + ["ua"], + ).fetchall() + assert len(rows) == 1 + action, resource, result, client_kind = rows[0] + assert action == "manifest.fetch" + assert resource == "manifest" + assert result == "ok" + assert client_kind == "api"