diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a0556d..9f0153a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,20 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +## [0.54.13] — 2026-05-14 + +### Security + +- **RBAC filter uses stable `user_id` (UUID) instead of mutable email + local-part (#293).** Non-admin users querying `agnes_sessions` / + `agnes_telemetry` are now filtered by `user_id` (immutable UUID) + rather than `username` (email local-part, which changes on rename). + Schema v45 adds a `user_id` column to `usage_session_summary` and + `usage_events`; the session pipeline's `resolve_user_id()` populates + it on every (re)process run. `USAGE_PROCESSOR_VERSION` bumps 3→4 to + trigger backfill. During the transition period, RBAC queries include + an OR fallback on `username` so pre-backfill rows remain visible. + ## [0.54.12] — 2026-05-14 ### Fixed diff --git a/app/api/admin_user_sessions.py b/app/api/admin_user_sessions.py index 7fdd8b1..2d9c0d4 100644 --- a/app/api/admin_user_sessions.py +++ b/app/api/admin_user_sessions.py @@ -97,6 +97,8 @@ def list_user_sessions( # ------------------------------------------------------------------ # Pull processed rows from usage_session_summary # ------------------------------------------------------------------ + # Match on both user_id (stable, v45+) and username (legacy) so the + # admin view shows sessions from both ingestion paths and pre-v45 rows. try: rows_db = conn.execute( """ @@ -105,19 +107,27 @@ def list_user_sessions( active_seconds, wall_seconds, tool_calls, tool_errors, primary_model FROM usage_session_summary - WHERE username = ? + WHERE user_id = ? OR username = ? ORDER BY started_at DESC NULLS LAST """, - [username], + [user_id, username], ).fetchall() except Exception: rows_db = [] processed_files: dict[str, dict] = {} if rows_db: - cols = ["session_file", "session_id", "started_at", "ended_at", - "active_seconds", "wall_seconds", "tool_calls", "tool_errors", - "primary_model"] + cols = [ + "session_file", + "session_id", + "started_at", + "ended_at", + "active_seconds", + "wall_seconds", + "tool_calls", + "tool_errors", + "primary_model", + ] for r in rows_db: d = dict(zip(cols, r)) # Normalise timestamps to ISO strings @@ -144,18 +154,20 @@ def list_user_sessions( # Try to extract a session_id from the filename: the collector # names files like ".jsonl" or "sess-.jsonl". sid = p.stem - all_rows.append({ - "session_file": fname, - "session_id": sid, - "started_at": mtime.isoformat(), - "ended_at": None, - "active_seconds": None, - "wall_seconds": None, - "tool_calls": 0, - "tool_errors": 0, - "primary_model": None, - "processed": False, - }) + all_rows.append( + { + "session_file": fname, + "session_id": sid, + "started_at": mtime.isoformat(), + "ended_at": None, + "active_seconds": None, + "wall_seconds": None, + "tool_calls": 0, + "tool_errors": 0, + "primary_model": None, + "processed": False, + } + ) # Sort: processed (have started_at) first then unprocessed, both newest-first def _sort_key(r: dict): @@ -165,7 +177,7 @@ def list_user_sessions( all_rows.sort(key=_sort_key, reverse=True) total = len(all_rows) - page = all_rows[offset: offset + limit] + page = all_rows[offset : offset + limit] return { "rows": page, @@ -351,7 +363,7 @@ def list_user_activity( audit_repo = AuditRepository(conn) rows, _ = audit_repo.query(user_id=user_id, limit=limit + offset) # Apply offset via slicing — cursor-based pagination is per-page only - rows = rows[offset: offset + limit] + rows = rows[offset : offset + limit] # Normalise timestamps to ISO strings and decode JSON params for r in rows: @@ -366,9 +378,7 @@ def list_user_activity( except (ValueError, TypeError): pass - total = conn.execute( - "SELECT COUNT(*) FROM audit_log WHERE user_id = ?", [user_id] - ).fetchone()[0] + total = conn.execute("SELECT COUNT(*) FROM audit_log WHERE user_id = ?", [user_id]).fetchone()[0] try: AuditRepository(conn).log( diff --git a/app/api/me.py b/app/api/me.py index 7f5c41d..0cc8200 100644 --- a/app/api/me.py +++ b/app/api/me.py @@ -84,9 +84,7 @@ def _username_for_stats(user: dict) -> str: return email.split("@")[0] if "@" in email else email -def compute_home_stats( - conn: duckdb.DuckDBPyConnection, user: dict, window: str = "24h" -) -> dict: +def compute_home_stats(conn: duckdb.DuckDBPyConnection, user: dict, window: str = "24h") -> dict: """Pure helper that returns the home-stats payload for the given user. Shared by the HTTP endpoint and the /home Jinja handler (server-side @@ -101,9 +99,13 @@ def compute_home_stats( interval = _WINDOW_INTERVALS["24h"] username = _username_for_stats(user) + uid = user.get("id") or "" # f-string interpolates only the validated interval literal above; # all user-controlled input flows through bound parameters. + # Match on both user_id (stable, populated by v45 pipeline) and + # username (legacy rows before v45 backfill) so stats are complete + # during the transition period. sql = f""" WITH win AS ( SELECT current_timestamp - {interval} AS since @@ -117,12 +119,13 @@ def compute_home_stats( COALESCE(SUM(cache_read_tokens), 0) AS cache_read, COALESCE(SUM(cache_creation_tokens), 0) AS cache_creation FROM usage_session_summary, win - WHERE username = ? AND started_at >= win.since + WHERE (user_id = ? OR username = ?) + AND started_at >= win.since ), proj AS ( SELECT COUNT(DISTINCT cwd) AS projects FROM usage_events, win - WHERE username = ? + WHERE (user_id = ? OR username = ?) AND cwd IS NOT NULL AND occurred_at >= win.since ), @@ -137,7 +140,7 @@ def compute_home_stats( proj.projects FROM u, sess, proj """ - row = conn.execute(sql, [username, username, user["id"]]).fetchone() + row = conn.execute(sql, [uid, username, uid, username, uid]).fetchone() if row is None: return { @@ -146,15 +149,16 @@ def compute_home_stats( "sessions": 0, "prompts": 0, "tokens": { - "input": 0, "output": 0, - "cache_read": 0, "cache_creation": 0, + "input": 0, + "output": 0, + "cache_read": 0, + "cache_creation": 0, "total": 0, }, "projects": 0, } - (last_pull_at, sessions, prompts, - input_t, output_t, cache_read, cache_creation, projects) = row + (last_pull_at, sessions, prompts, input_t, output_t, cache_read, cache_creation, projects) = row return { "window": window, "last_pull_at": last_pull_at.isoformat() if last_pull_at else None, @@ -165,8 +169,7 @@ def compute_home_stats( "output": int(output_t or 0), "cache_read": int(cache_read or 0), "cache_creation": int(cache_creation or 0), - "total": int((input_t or 0) + (output_t or 0) - + (cache_read or 0) + (cache_creation or 0)), + "total": int((input_t or 0) + (output_t or 0) + (cache_read or 0) + (cache_creation or 0)), }, "projects": int(projects or 0), } diff --git a/connectors/internal/access.py b/connectors/internal/access.py index 6152626..0358431 100644 --- a/connectors/internal/access.py +++ b/connectors/internal/access.py @@ -27,9 +27,8 @@ from __future__ import annotations import logging import re from dataclasses import dataclass -from typing import Any, Optional +from typing import Any -import duckdb logger = logging.getLogger(__name__) @@ -38,6 +37,7 @@ logger = logging.getLogger(__name__) # Internal-table registry — single source of truth # --------------------------------------------------------------------------- + @dataclass(frozen=True) class InternalTable: """One internal table mapping. @@ -54,30 +54,34 @@ class InternalTable: display_name: human-readable name (also goes into ``table_registry.name``) description: short blurb (catalog UI + ``agnes catalog`` output) """ + registry_id: str source_table: str filter_column: str - filter_kind: str # 'username' | 'user_id' + filter_kind: str # 'username' | 'user_id' display_name: str description: str + legacy_username_column: str | None = None # backward-compat OR fallback INTERNAL_TABLES: tuple[InternalTable, ...] = ( InternalTable( registry_id="agnes_sessions", source_table="usage_session_summary", - filter_column="username", - filter_kind="username", + filter_column="user_id", + filter_kind="user_id", display_name="Agnes sessions", description="Claude Code sessions. Also available locally for analysis.", + legacy_username_column="username", ), InternalTable( registry_id="agnes_telemetry", source_table="usage_events", - filter_column="username", - filter_kind="username", + filter_column="user_id", + filter_kind="user_id", display_name="Agnes telemetry events", description="Tool and skill invocations from Claude Code. Also available locally for analysis.", + legacy_username_column="username", ), InternalTable( registry_id="agnes_audit", @@ -89,9 +93,7 @@ INTERNAL_TABLES: tuple[InternalTable, ...] = ( ), ) -INTERNAL_TABLES_BY_ID: dict[str, InternalTable] = { - t.registry_id: t for t in INTERNAL_TABLES -} +INTERNAL_TABLES_BY_ID: dict[str, InternalTable] = {t.registry_id: t for t in INTERNAL_TABLES} def is_internal_table(table_id: str) -> bool: @@ -106,7 +108,7 @@ def is_internal_table(table_id: str) -> bool: # resolve to filesystem usernames with a `+`, and the session-data-dir # layout already supports the same character class. _USERNAME_RE = re.compile(r"^[A-Za-z0-9._+-]{1,200}$") -_USER_ID_RE = re.compile(r"^[A-Za-z0-9._@:+-]{1,200}$") +_USER_ID_RE = re.compile(r"^[A-Za-z0-9._@:+-]{1,200}$") class InternalAccessError(Exception): @@ -128,16 +130,12 @@ def _filter_value(user: dict[str, Any], kind: str) -> str: email = (user or {}).get("email", "") or "" username = email.split("@")[0] if "@" in email else email if not _USERNAME_RE.match(username): - raise InternalAccessError( - f"user email {email!r} does not yield a safe username for scoping" - ) + raise InternalAccessError(f"user email {email!r} does not yield a safe username for scoping") return username if kind == "user_id": uid = (user or {}).get("id", "") or "" if not _USER_ID_RE.match(uid): - raise InternalAccessError( - f"user_id {uid!r} fails the safe-identifier check" - ) + raise InternalAccessError(f"user_id {uid!r} fails the safe-identifier check") return uid raise InternalAccessError(f"unknown filter_kind: {kind!r}") @@ -145,15 +143,24 @@ def _filter_value(user: dict[str, Any], kind: str) -> str: def build_filter_clause(table: InternalTable, user: dict[str, Any], is_admin: bool) -> str: """Return the WHERE clause for one internal table. - Admins get an empty string (unscoped view). Everyone else gets + Admins get an empty string (unscoped view). Everyone else get ``WHERE = ''`` where value has been regex-validated. + + ``agnes_sessions`` and ``agnes_telemetry`` filter primarily on + ``user_id`` (stable UUID) but include an OR fallback on + ``username`` (email local-part) for rows that pre-date the v45 + backfill. Once all rows carry a non-NULL ``user_id`` the + ``legacy_username_column`` field can be removed. """ if is_admin: return "" value = _filter_value(user, table.filter_kind) - # value is regex-validated to ``[A-Za-z0-9._@:-]`` so single-quote - # escape is unnecessary; double single-quote anyway for defense-in-depth. safe = value.replace("'", "''") + + if table.legacy_username_column: + legacy = _filter_value(user, "username").replace("'", "''") + return f"WHERE ({table.filter_column} = '{safe}' OR {table.legacy_username_column} = '{legacy}')" + return f"WHERE {table.filter_column} = '{safe}'" @@ -162,7 +169,7 @@ def build_filter_clause(table: InternalTable, user: dict[str, Any], is_admin: bo # --------------------------------------------------------------------------- _TABLE_REF_RE = re.compile( - r'\b(' + '|'.join(re.escape(t.registry_id) for t in INTERNAL_TABLES) + r')\b', + r"\b(" + "|".join(re.escape(t.registry_id) for t in INTERNAL_TABLES) + r")\b", re.IGNORECASE, ) @@ -175,7 +182,7 @@ _SQL_STRING_LITERAL_RE = re.compile(r"'(?:''|[^'])*'") # comment-wrapped table name (`/**/users/**/`) can't slip past the # identifier scan downstream. _SQL_BLOCK_COMMENT_RE = re.compile(r"/\*[\s\S]*?\*/") -_SQL_LINE_COMMENT_RE = re.compile(r"--[^\n]*") +_SQL_LINE_COMMENT_RE = re.compile(r"--[^\n]*") def _strip_sql_noise(sql: str) -> str: @@ -190,9 +197,7 @@ def _strip_sql_noise(sql: str) -> str: return s -_INTERNAL_ALIAS_NAMES: frozenset[str] = frozenset( - t.registry_id.lower() for t in INTERNAL_TABLES -) +_INTERNAL_ALIAS_NAMES: frozenset[str] = frozenset(t.registry_id.lower() for t in INTERNAL_TABLES) def _sensitive_table_reference(stripped_sql: str, conn) -> str | None: @@ -211,10 +216,7 @@ def _sensitive_table_reference(stripped_sql: str, conn) -> str | None: double-quoted (`"users"`) forms both match because the bare name still sits between word boundaries. """ - rows = conn.execute( - "SELECT table_name FROM information_schema.tables " - "WHERE table_schema = 'main'" - ).fetchall() + rows = conn.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'").fetchall() for (name,) in rows: if name is None: continue @@ -300,17 +302,13 @@ def execute_internal_query( sensitive = _sensitive_table_reference(stripped, get_system_db()) if sensitive is not None: raise InternalAccessError( - f"non-admin SQL cannot reference table {sensitive!r}; " - "query one of the agnes_* aliases instead" + f"non-admin SQL cannot reference table {sensitive!r}; query one of the agnes_* aliases instead" ) cte_parts = [] for table_id in refs: table = INTERNAL_TABLES_BY_ID[table_id] where_clause = build_filter_clause(table, user, is_admin) - cte_parts.append( - f"{table.registry_id} AS " - f"(SELECT * FROM {table.source_table} {where_clause})" - ) + cte_parts.append(f"{table.registry_id} AS (SELECT * FROM {table.source_table} {where_clause})") cte_prefix = "WITH " + ", ".join(cte_parts) wrapped = f"{cte_prefix} SELECT * FROM ({sql}) AS _agnes_user_query" @@ -332,6 +330,7 @@ def execute_internal_query( # Schema introspection — feeds /api/v2/schema/{id} for internal tables # --------------------------------------------------------------------------- + def get_schema(system_db_path: str, table_id: str) -> list[dict]: """Return the underlying physical schema for an internal table. @@ -349,6 +348,7 @@ def get_schema(system_db_path: str, table_id: str) -> list[dict]: return [] table = INTERNAL_TABLES_BY_ID[table_id] from src.db import get_system_db + cursor = get_system_db().cursor() try: rows = cursor.execute( @@ -357,10 +357,7 @@ def get_schema(system_db_path: str, table_id: str) -> list[dict]: "WHERE table_name = ? ORDER BY ordinal_position", [table.source_table], ).fetchall() - return [ - {"name": r[0], "type": r[1], "nullable": r[2] == "YES"} - for r in rows - ] + return [{"name": r[0], "type": r[1], "nullable": r[2] == "YES"} for r in rows] finally: try: cursor.close() diff --git a/pyproject.toml b/pyproject.toml index 7c4e367..580370b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agnes-the-ai-analyst" -version = "0.54.12" +version = "0.54.13" description = "Agnes — AI Data Analyst platform for AI analytical systems" requires-python = ">=3.11,<3.14" license = "MIT" diff --git a/services/session_pipeline/runner.py b/services/session_pipeline/runner.py index bcef002..6698d10 100644 --- a/services/session_pipeline/runner.py +++ b/services/session_pipeline/runner.py @@ -25,6 +25,43 @@ from src.repositories.session_processor_state import SessionProcessorStateReposi logger = logging.getLogger(__name__) + +def resolve_user_id( + conn: duckdb.DuckDBPyConnection, + username: str, +) -> str | None: + """Map a session-directory name to the stable ``users.id`` UUID. + + Two conventions exist for the directory name under + ``/data/user_sessions/``: + + * **Session collector** writes under the OS username, which in + current deployments equals the email local-part (e.g. ``alice``). + * **Upload API** writes under ``user["id"]`` — a UUID. + + Resolution order: + 1. Exact match on ``users.id`` (covers the UUID path). + 2. Email local-part match: ``users.email LIKE '@%'``. + If multiple users share the same local-part (different domains), + we pick the one most recently updated. + 3. Fallback: return ``None`` (orphaned / deleted user). + """ + row = conn.execute( + "SELECT id FROM users WHERE id = ?", + [username], + ).fetchone() + if row: + return row[0] + escaped = username.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") + row = conn.execute( + "SELECT id FROM users WHERE email LIKE ? || '@%' ESCAPE '\\' ORDER BY updated_at DESC NULLS LAST LIMIT 1", + [escaped], + ).fetchone() + if row: + return row[0] + return None + + DEFAULT_SESSION_DATA_DIR = Path(os.environ.get("SESSION_DATA_DIR", "/data/user_sessions")) @@ -60,6 +97,11 @@ def run_processor( logger.info("No sessions to process for processor=%s", processor.name) return stats + # Pre-resolve user_id per directory name so each processor can + # store the stable identity. Cache avoids repeated DB lookups when + # one user has many sessions. + _uid_cache: dict[str, str | None] = {} + for username, jsonl_path in candidates: session_key = f"{username}/{jsonl_path.name}" try: @@ -67,7 +109,9 @@ def run_processor( except Exception as e: logger.warning( "Cannot hash %s for processor=%s: %s", - session_key, processor.name, e, + session_key, + processor.name, + e, ) stats["errors"] += 1 stats["errors_detail"].append({"session": session_key, "error": str(e)}) @@ -82,12 +126,23 @@ def run_processor( stats["skipped"] += 1 continue + if username not in _uid_cache: + _uid_cache[username] = resolve_user_id(conn, username) + resolved_uid = _uid_cache[username] + try: - result = processor.process_session(jsonl_path, username, session_key, conn) + result = processor.process_session( + jsonl_path, + username, + session_key, + conn, + user_id=resolved_uid, + ) except Exception as e: logger.exception( "Processor %s failed on %s — leaving state unwritten for retry", - processor.name, session_key, + processor.name, + session_key, ) stats["errors"] += 1 stats["errors_detail"].append({"session": session_key, "error": str(e)}) @@ -101,7 +156,8 @@ def run_processor( # cause the same session to be retried forever. logger.warning( "Processor %s returned non-ProcessorResult on %s; coercing to empty result", - processor.name, session_key, + processor.name, + session_key, ) result = ProcessorResult(items_count=0) diff --git a/services/session_processors/usage.py b/services/session_processors/usage.py index 018e899..d0013d5 100644 --- a/services/session_processors/usage.py +++ b/services/session_processors/usage.py @@ -32,6 +32,8 @@ class UsageProcessor: username: str, session_key: str, conn: duckdb.DuckDBPyConnection, + *, + user_id: str | None = None, ) -> ProcessorResult: turns = parse_jsonl(session_path) events = list(iter_events(turns)) @@ -63,6 +65,7 @@ class UsageProcessor: "session_id": session_id, "session_file": session_key, "username": username, + "user_id": user_id, "event_uuid": e.event_uuid, "parent_uuid": e.parent_uuid, "event_type": e.event_type, @@ -83,6 +86,7 @@ class UsageProcessor: summary = compute_summary(turns, rows) summary["session_file"] = session_key summary["username"] = username + summary["user_id"] = user_id # Override session_id with the resolved one if not summary.get("session_id"): summary["session_id"] = session_id diff --git a/services/session_processors/usage_lib.py b/services/session_processors/usage_lib.py index 5265217..818b84e 100644 --- a/services/session_processors/usage_lib.py +++ b/services/session_processors/usage_lib.py @@ -40,13 +40,29 @@ from dataclasses import dataclass from datetime import datetime, timezone, timedelta from typing import Iterator -USAGE_PROCESSOR_VERSION = 3 +# v4: #293 added the user_id column to usage tables — bump forces the +# session-pipeline reprocess loop to backfill user_id on existing rows. +# (v3 was #303's slash extraction.) +USAGE_PROCESSOR_VERSION = 4 -BUILTIN_TOOLS = frozenset({ - "Bash", "Read", "Edit", "Write", "Grep", "Glob", "TodoWrite", - "Task", "Agent", "NotebookEdit", "WebFetch", "WebSearch", "ExitPlanMode", - "LS", # also built-in -}) +BUILTIN_TOOLS = frozenset( + { + "Bash", + "Read", + "Edit", + "Write", + "Grep", + "Glob", + "TodoWrite", + "Task", + "Agent", + "NotebookEdit", + "WebFetch", + "WebSearch", + "ExitPlanMode", + "LS", # also built-in + } +) # Claude Code wraps user-typed slash invocations as # / inside the user message content @@ -57,18 +73,23 @@ BUILTIN_TOOLS = frozenset({ COMMAND_NAME_RE = re.compile(r"/([A-Za-z][\w:-]*)") # Event types to skip entirely -_SKIP_TYPES = frozenset({ - "system", "summary", "file-history-snapshot", - "queue-operation", "progress", -}) +_SKIP_TYPES = frozenset( + { + "system", + "summary", + "file-history-snapshot", + "queue-operation", + "progress", + } +) @dataclass(frozen=True) class ParsedEvent: event_uuid: str | None parent_uuid: str | None - tool_id: str | None # tool_use 'id' (tu_xxx) from message.content item; None for slash_command - event_type: str # 'tool_use' | 'slash_command' | 'subagent' | 'mcp_call' + tool_id: str | None # tool_use 'id' (tu_xxx) from message.content item; None for slash_command + event_type: str # 'tool_use' | 'slash_command' | 'subagent' | 'mcp_call' tool_name: str | None skill_name: str | None subagent_type: str | None @@ -207,9 +228,7 @@ def iter_events(turns: list[dict]) -> Iterator[ParsedEvent]: text_parts = [content] elif isinstance(content, list): text_parts = [ - item.get("text", "") - for item in content - if isinstance(item, dict) and item.get("type") == "text" + item.get("text", "") for item in content if isinstance(item, dict) and item.get("type") == "text" ] else: text_parts = [] @@ -243,7 +262,7 @@ class AttributionLookup: """ def __init__(self, conn): - self._skills: dict[str, tuple[str, str]] = {} # name -> (source, ref_id) + self._skills: dict[str, tuple[str, str]] = {} # name -> (source, ref_id) self._agents: dict[str, tuple[str, str]] = {} self._commands: dict[str, tuple[str, str]] = {} @@ -375,9 +394,7 @@ def compute_summary(turns: list[dict], events: list[dict]) -> dict: started_at = min(timestamps) if timestamps else None ended_at = max(timestamps) if timestamps else None - wall_seconds = ( - int((ended_at - started_at).total_seconds()) if started_at and ended_at else 0 - ) + wall_seconds = int((ended_at - started_at).total_seconds()) if started_at and ended_at else 0 active_seconds = compute_active_seconds(timestamps) # Aggregate counts from events diff --git a/services/session_processors/verification.py b/services/session_processors/verification.py index fe0e12e..354e483 100644 --- a/services/session_processors/verification.py +++ b/services/session_processors/verification.py @@ -42,6 +42,7 @@ class VerificationProcessor: username: str, session_key: str, conn: duckdb.DuckDBPyConnection, + **kwargs: object, ) -> ProcessorResult: repo = KnowledgeRepository(conn) session_id = f"session-{session_path.stem}-{username}" @@ -126,7 +127,8 @@ class VerificationProcessor: except Exception as e: logger.warning( "Duplicate-candidate detection failed for %s: %s", - item_id, e, + item_id, + e, ) # Run contradiction detection inline. Failure of the LLM @@ -140,12 +142,15 @@ class VerificationProcessor: except Exception as e: logger.warning( "Unexpected error during contradiction check for %s: %s", - item_id, e, + item_id, + e, ) logger.info( "Processed %s: %d verifications, %d items created", - session_key, len(verifications), items_created, + session_key, + len(verifications), + items_created, ) return ProcessorResult(items_count=items_created) diff --git a/src/db.py b/src/db.py index d11aff8..a4a67df 100644 --- a/src/db.py +++ b/src/db.py @@ -40,7 +40,7 @@ def _maybe_instrument(con, db_tag: str): _SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$") -SCHEMA_VERSION = 44 +SCHEMA_VERSION = 45 _SYSTEM_SCHEMA = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -723,13 +723,18 @@ CREATE TABLE IF NOT EXISTS usage_events ( occurred_at TIMESTAMP NOT NULL, processor_version INTEGER NOT NULL, extracted_at TIMESTAMP DEFAULT current_timestamp, - friction_tags JSON + friction_tags JSON, + user_id VARCHAR ); CREATE INDEX IF NOT EXISTS idx_usage_events_session ON usage_events(session_id); CREATE INDEX IF NOT EXISTS idx_usage_events_user_time ON usage_events(username, occurred_at); CREATE INDEX IF NOT EXISTS idx_usage_events_tool ON usage_events(tool_name); CREATE INDEX IF NOT EXISTS idx_usage_events_skill ON usage_events(skill_name); CREATE INDEX IF NOT EXISTS idx_usage_events_ref ON usage_events(source, ref_id); +-- idx_usage_events_user_id is created by _v44_to_v45, not here: _SYSTEM_SCHEMA +-- runs before the migration ladder, and CREATE TABLE IF NOT EXISTS won't add +-- user_id to a pre-v45 usage_events, so an index on it would fail to bind. +-- Same pattern as the v41 audit_log indices below. CREATE TABLE IF NOT EXISTS usage_session_summary ( session_file VARCHAR PRIMARY KEY, @@ -760,10 +765,13 @@ CREATE TABLE IF NOT EXISTS usage_session_summary ( input_tokens BIGINT DEFAULT 0, output_tokens BIGINT DEFAULT 0, cache_read_tokens BIGINT DEFAULT 0, - cache_creation_tokens BIGINT DEFAULT 0 + cache_creation_tokens BIGINT DEFAULT 0, + user_id VARCHAR ); CREATE INDEX IF NOT EXISTS idx_usage_session_user ON usage_session_summary(username); CREATE INDEX IF NOT EXISTS idx_usage_session_started ON usage_session_summary(started_at); +-- idx_usage_session_user_id is created by _v44_to_v45, not here — see the +-- note on idx_usage_events_user_id above. CREATE TABLE IF NOT EXISTS usage_tool_daily ( day DATE NOT NULL, @@ -881,15 +889,16 @@ def _try_open_system_db(db_path: str) -> duckdb.DuckDBPyConnection: snapshot = Path(db_path).parent / "system.duckdb.pre-migrate" if not snapshot.exists(): logger.error( - "WAL replay failed and no pre-migrate snapshot at %s — " - "manual recovery required.", snapshot, + "WAL replay failed and no pre-migrate snapshot at %s — manual recovery required.", + snapshot, ) raise wal_path = Path(db_path + ".wal") logger.warning( "WAL replay failed (%s) — auto-restoring from pre-migrate " "snapshot %s. The migration ladder will re-run on this start.", - msg.split("\n", 1)[0][:200], snapshot, + msg.split("\n", 1)[0][:200], + snapshot, ) # Move (not copy) the broken DB aside so an operator can post- # mortem if needed. The pre-migrate snapshot becomes the new @@ -961,9 +970,7 @@ def get_analytics_db() -> duckdb.DuckDBPyConnection: return _maybe_instrument(_analytics_db_conn.cursor(), "analytics") -def _reattach_remote_extensions( - conn: duckdb.DuckDBPyConnection, extracts_dir: Path -) -> None: +def _reattach_remote_extensions(conn: duckdb.DuckDBPyConnection, extracts_dir: Path) -> None: """Re-LOAD DuckDB extensions listed in _remote_attach tables of each extract.duckdb. Called from get_analytics_db_readonly() after ATTACHing extract.duckdb files so @@ -974,9 +981,7 @@ def _reattach_remote_extensions( return try: - attached_dbs = { - r[0] for r in conn.execute("SELECT database_name FROM duckdb_databases()").fetchall() - } + attached_dbs = {r[0] for r in conn.execute("SELECT database_name FROM duckdb_databases()").fetchall()} except Exception: return @@ -1013,9 +1018,7 @@ def _reattach_remote_extensions( # Refresh attached list before processing each source's rows try: - attached_dbs = { - r[0] for r in conn.execute("SELECT database_name FROM duckdb_databases()").fetchall() - } + attached_dbs = {r[0] for r in conn.execute("SELECT database_name FROM duckdb_databases()").fetchall()} except Exception: pass @@ -1042,7 +1045,8 @@ def _reattach_remote_extensions( "query-path remote_attach: extension %r not in allowlist; " "refusing to LOAD/ATTACH for source %s. Override via " "AGNES_REMOTE_ATTACH_EXTENSIONS if intended.", - extension, alias, + extension, + alias, ) continue if token_env and not is_token_env_allowed(token_env): @@ -1050,7 +1054,8 @@ def _reattach_remote_extensions( "query-path remote_attach: token_env %r not in allowlist; " "refusing for source %s. Override via " "AGNES_REMOTE_ATTACH_TOKEN_ENVS if intended.", - token_env, alias, + token_env, + alias, ) continue if alias in attached_dbs: @@ -1081,25 +1086,20 @@ def _reattach_remote_extensions( except BQMetadataAuthError as e: logger.error( "Failed to fetch BQ metadata token for %s: %s — skipping ATTACH", - alias, e, + alias, + e, ) continue escaped = escape_sql_string_literal(bq_token) secret_name = f"bq_secret_{alias}" - conn.execute( - f"CREATE OR REPLACE SECRET {secret_name} " - f"(TYPE bigquery, ACCESS_TOKEN '{escaped}')" - ) + conn.execute(f"CREATE OR REPLACE SECRET {secret_name} (TYPE bigquery, ACCESS_TOKEN '{escaped}')") from connectors.bigquery.access import apply_bq_session_settings + apply_bq_session_settings(conn) - conn.execute( - f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, READ_ONLY)" - ) + conn.execute(f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, READ_ONLY)") elif token: escaped_token = escape_sql_string_literal(token) - conn.execute( - f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, TOKEN '{escaped_token}')" - ) + conn.execute(f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, TOKEN '{escaped_token}')") # Apply BQ session settings on every BQ-extension attach, # not only the metadata-token branch above. Previously the # token-based branch fell through without setting @@ -1107,13 +1107,13 @@ def _reattach_remote_extensions( # in place and causing "remote query timeout" surprises. if extension == "bigquery": from connectors.bigquery.access import apply_bq_session_settings + apply_bq_session_settings(conn) else: - conn.execute( - f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, READ_ONLY)" - ) + conn.execute(f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, READ_ONLY)") if extension == "bigquery": from connectors.bigquery.access import apply_bq_session_settings + apply_bq_session_settings(conn) attached_dbs.add(alias) logger.debug("Re-attached remote source %s via %s extension", alias, extension) @@ -1434,8 +1434,7 @@ _V16_TO_V17_MIGRATIONS = [ PRIMARY KEY (item_a_id, item_b_id, relation_type) ) """, - "CREATE INDEX IF NOT EXISTS idx_knowledge_item_relations_resolved " - "ON knowledge_item_relations(resolved)", + "CREATE INDEX IF NOT EXISTS idx_knowledge_item_relations_resolved ON knowledge_item_relations(resolved)", ] @@ -1458,14 +1457,15 @@ _V19_TO_V20_MIGRATIONS = [ # first so implies references resolve cleanly when expand_implies does BFS. _CORE_ROLES_SEED = [ # (key, display_name, description, implies) - ("core.viewer", "Viewer", - "Read-only access to permitted datasets.", []), - ("core.analyst", "Analyst", - "Default user role; query data, run analyses.", ["core.viewer"]), - ("core.km_admin", "Knowledge-management admin", - "Manages metric definitions and column metadata.", ["core.analyst"]), - ("core.admin", "Administrator", - "Full system access; bypasses dataset_permissions.", ["core.km_admin"]), + ("core.viewer", "Viewer", "Read-only access to permitted datasets.", []), + ("core.analyst", "Analyst", "Default user role; query data, run analyses.", ["core.viewer"]), + ( + "core.km_admin", + "Knowledge-management admin", + "Manages metric definitions and column metadata.", + ["core.analyst"], + ), + ("core.admin", "Administrator", "Full system access; bypasses dataset_permissions.", ["core.km_admin"]), ] # Maps the legacy users.role string values onto core.* keys for the v8→v9 @@ -1486,10 +1486,8 @@ SYSTEM_EVERYONE_GROUP = "Everyone" # app.auth.access (admin short-circuit) and the OAuth callback (default # Everyone membership for new users); changing them is a breaking change. _SYSTEM_GROUPS_SEED = [ - (SYSTEM_ADMIN_GROUP, - "System: full access to all data and admin actions"), - (SYSTEM_EVERYONE_GROUP, - "System: default group every user is implicitly a member of"), + (SYSTEM_ADMIN_GROUP, "System: full access to all data and admin actions"), + (SYSTEM_EVERYONE_GROUP, "System: default group every user is implicitly a member of"), ] @@ -1505,9 +1503,7 @@ def _seed_system_groups(conn: duckdb.DuckDBPyConnection) -> None: import uuid as _uuid for name, description in _SYSTEM_GROUPS_SEED: - existing = conn.execute( - "SELECT id, is_system FROM user_groups WHERE name = ?", [name] - ).fetchone() + existing = conn.execute("SELECT id, is_system FROM user_groups WHERE name = ?", [name]).fetchone() if existing is None: conn.execute( """INSERT INTO user_groups (id, name, description, is_system, created_by) @@ -1549,9 +1545,7 @@ def _v12_to_v13_finalize(conn: duckdb.DuckDBPyConnection) -> None: try: _seed_system_groups(conn) - admin_group_id = conn.execute( - "SELECT id FROM user_groups WHERE name = ?", [SYSTEM_ADMIN_GROUP] - ).fetchone()[0] + admin_group_id = conn.execute("SELECT id FROM user_groups WHERE name = ?", [SYSTEM_ADMIN_GROUP]).fetchone()[0] everyone_group_id = conn.execute( "SELECT id FROM user_groups WHERE name = ?", [SYSTEM_EVERYONE_GROUP] ).fetchone()[0] @@ -1560,16 +1554,14 @@ def _v12_to_v13_finalize(conn: duckdb.DuckDBPyConnection) -> None: # column having been physically dropped already (re-run safety) and of # malformed JSON (caught row-by-row, skipped silently). has_groups_col = conn.execute( - "SELECT 1 FROM information_schema.columns " - "WHERE table_name = 'users' AND column_name = 'groups'" + "SELECT 1 FROM information_schema.columns WHERE table_name = 'users' AND column_name = 'groups'" ).fetchone() if has_groups_col: - rows = conn.execute( - "SELECT id, groups FROM users WHERE groups IS NOT NULL" - ).fetchall() + rows = conn.execute("SELECT id, groups FROM users WHERE groups IS NOT NULL").fetchall() for user_id, groups_json in rows: try: import json as _json + names = _json.loads(groups_json) if isinstance(groups_json, str) else (groups_json or []) except (ValueError, TypeError): names = [] @@ -1579,7 +1571,8 @@ def _v12_to_v13_finalize(conn: duckdb.DuckDBPyConnection) -> None: if not isinstance(name, str) or not name.strip(): continue group_row = conn.execute( - "SELECT id FROM user_groups WHERE name = ?", [name], + "SELECT id FROM user_groups WHERE name = ?", + [name], ).fetchone() if not group_row: continue @@ -1592,9 +1585,9 @@ def _v12_to_v13_finalize(conn: duckdb.DuckDBPyConnection) -> None: ) except duckdb.ConstraintException: logger.debug( - "v13 backfill step 2 (google_sync): skipped " - "insert for user=%s group=%s — already present", - user_id, name, + "v13 backfill step 2 (google_sync): skipped insert for user=%s group=%s — already present", + user_id, + name, ) # 3. core.admin grants → Admin membership. Tolerant of either table being @@ -1670,7 +1663,8 @@ def _v12_to_v13_finalize(conn: duckdb.DuckDBPyConnection) -> None: logger.debug( "v13 backfill step 5 (resource_grants): skipped " "insert for group=%s resource=%s — already migrated", - group_id, resource_id, + group_id, + resource_id, ) # Audit: log any non-core capability grants before dropping the @@ -1695,7 +1689,8 @@ def _v12_to_v13_finalize(conn: duckdb.DuckDBPyConnection) -> None: "If this role was registered via register_internal_role(), " "the affected users need to be re-added to an " "appropriate user_group post-upgrade.", - cnt, role_key, + cnt, + role_key, ) # 6. Drop legacy tables in FK-correct order: dependent tables first. @@ -1754,8 +1749,7 @@ def _v13_to_v14_finalize(conn: duckdb.DuckDBPyConnection) -> None: ).fetchone()[0] if orphan_members: logger.warning( - "v14 migration: dropping %d orphan user_group_members rows " - "(group_id pointed at a deleted user_groups.id)", + "v14 migration: dropping %d orphan user_group_members rows (group_id pointed at a deleted user_groups.id)", orphan_members, ) if orphan_grants: @@ -1778,9 +1772,7 @@ def _v13_to_v14_finalize(conn: duckdb.DuckDBPyConnection) -> None: ) # user_group_members rebuild - conn.execute( - "ALTER TABLE user_group_members RENAME TO user_group_members_v13_pre" - ) + conn.execute("ALTER TABLE user_group_members RENAME TO user_group_members_v13_pre") conn.execute( """CREATE TABLE user_group_members ( user_id VARCHAR NOT NULL, @@ -1800,9 +1792,7 @@ def _v13_to_v14_finalize(conn: duckdb.DuckDBPyConnection) -> None: conn.execute("DROP TABLE user_group_members_v13_pre") # resource_grants rebuild - conn.execute( - "ALTER TABLE resource_grants RENAME TO resource_grants_v13_pre" - ) + conn.execute("ALTER TABLE resource_grants RENAME TO resource_grants_v13_pre") conn.execute( """CREATE TABLE resource_grants ( id VARCHAR PRIMARY KEY, @@ -1913,11 +1903,13 @@ def _v18_to_v19_finalize(conn: duckdb.DuckDBPyConnection) -> None: `primary_key`) still migrate cleanly. Wrapped in BEGIN/COMMIT; on error ROLLBACK and the outer caller skips the schema_version bump. """ + def _existing_cols(table: str) -> set[str]: return { - r[0] for r in conn.execute( - "SELECT column_name FROM information_schema.columns " - "WHERE table_name = ?", [table], + r[0] + for r in conn.execute( + "SELECT column_name FROM information_schema.columns WHERE table_name = ?", + [table], ).fetchall() } @@ -1951,26 +1943,29 @@ def _v18_to_v19_finalize(conn: duckdb.DuckDBPyConnection) -> None: )""" ) users_target_cols = [ - "id", "email", "name", "password_hash", - "setup_token", "setup_token_created", - "reset_token", "reset_token_created", - "active", "deactivated_at", "deactivated_by", - "created_at", "updated_at", + "id", + "email", + "name", + "password_hash", + "setup_token", + "setup_token_created", + "reset_token", + "reset_token_created", + "active", + "deactivated_at", + "deactivated_by", + "created_at", + "updated_at", ] old_users_cols = _existing_cols("users_v18_pre") common = [c for c in users_target_cols if c in old_users_cols] col_list = ", ".join(common) - conn.execute( - f"INSERT INTO users ({col_list}) " - f"SELECT {col_list} FROM users_v18_pre" - ) + conn.execute(f"INSERT INTO users ({col_list}) SELECT {col_list} FROM users_v18_pre") conn.execute("DROP TABLE users_v18_pre") # 4: rebuild table_registry without `is_public` column. if "is_public" in _existing_cols("table_registry"): - conn.execute( - "ALTER TABLE table_registry RENAME TO table_registry_v18_pre" - ) + conn.execute("ALTER TABLE table_registry RENAME TO table_registry_v18_pre") conn.execute( """CREATE TABLE table_registry ( id VARCHAR PRIMARY KEY, @@ -1990,18 +1985,25 @@ def _v18_to_v19_finalize(conn: duckdb.DuckDBPyConnection) -> None: )""" ) registry_target_cols = [ - "id", "name", "source_type", "bucket", "source_table", - "sync_strategy", "query_mode", "sync_schedule", - "profile_after_sync", "primary_key", "folder", - "description", "registered_by", "registered_at", + "id", + "name", + "source_type", + "bucket", + "source_table", + "sync_strategy", + "query_mode", + "sync_schedule", + "profile_after_sync", + "primary_key", + "folder", + "description", + "registered_by", + "registered_at", ] old_registry_cols = _existing_cols("table_registry_v18_pre") common = [c for c in registry_target_cols if c in old_registry_cols] col_list = ", ".join(common) - conn.execute( - f"INSERT INTO table_registry ({col_list}) " - f"SELECT {col_list} FROM table_registry_v18_pre" - ) + conn.execute(f"INSERT INTO table_registry ({col_list}) SELECT {col_list} FROM table_registry_v18_pre") conn.execute("DROP TABLE table_registry_v18_pre") conn.execute("COMMIT") @@ -2025,9 +2027,7 @@ def _seed_core_roles(conn: duckdb.DuckDBPyConnection) -> None: import uuid as _uuid for key, display_name, description, implies in _CORE_ROLES_SEED: - existing = conn.execute( - "SELECT id FROM internal_roles WHERE key = ?", [key] - ).fetchone() + existing = conn.execute("SELECT id FROM internal_roles WHERE key = ?", [key]).fetchone() implies_json = _json.dumps(implies) if existing: conn.execute( @@ -2059,25 +2059,21 @@ def _backfill_users_role_to_grants(conn: duckdb.DuckDBPyConnection) -> None: # Verify users.role column still exists (we may be re-running after a # half-applied migration); skip silently if it's already gone. has_role_col = conn.execute( - "SELECT 1 FROM information_schema.columns " - "WHERE table_name = 'users' AND column_name = 'role'" + "SELECT 1 FROM information_schema.columns WHERE table_name = 'users' AND column_name = 'role'" ).fetchone() if not has_role_col: return - rows = conn.execute( - "SELECT id, role FROM users WHERE role IS NOT NULL" - ).fetchall() + rows = conn.execute("SELECT id, role FROM users WHERE role IS NOT NULL").fetchall() backfilled = 0 for user_id, role_str in rows: role_key = _LEGACY_ROLE_TO_CORE_KEY.get(role_str, "core.viewer") - role_row = conn.execute( - "SELECT id FROM internal_roles WHERE key = ?", [role_key] - ).fetchone() + role_row = conn.execute("SELECT id FROM internal_roles WHERE key = ?", [role_key]).fetchone() if not role_row: logger.warning( "v9 backfill: core role %s missing — skipping user %s", - role_key, user_id, + role_key, + user_id, ) continue try: @@ -2363,8 +2359,7 @@ def _v28_to_v29_finalize(conn) -> None: are gone after the first run and the seed INSERTs use ON CONFLICT. """ has_welcome = conn.execute( - "SELECT 1 FROM information_schema.tables " - "WHERE table_schema = 'main' AND table_name = 'welcome_template'" + "SELECT 1 FROM information_schema.tables WHERE table_schema = 'main' AND table_name = 'welcome_template'" ).fetchone() if has_welcome: conn.execute( @@ -2375,8 +2370,7 @@ def _v28_to_v29_finalize(conn) -> None: conn.execute("DROP TABLE welcome_template") has_claude_md = conn.execute( - "SELECT 1 FROM information_schema.tables " - "WHERE table_schema = 'main' AND table_name = 'claude_md_template'" + "SELECT 1 FROM information_schema.tables WHERE table_schema = 'main' AND table_name = 'claude_md_template'" ).fetchone() if has_claude_md: conn.execute( @@ -2391,8 +2385,7 @@ def _v28_to_v29_finalize(conn) -> None: # operators) or via a prior migration run (idempotent re-execution). for key in ("welcome", "claude_md", "home"): conn.execute( - "INSERT INTO instance_templates (key, content) VALUES (?, NULL) " - "ON CONFLICT (key) DO NOTHING", + "INSERT INTO instance_templates (key, content) VALUES (?, NULL) ON CONFLICT (key) DO NOTHING", [key], ) @@ -2531,15 +2524,12 @@ def _v34_to_v35_migrate(conn: duckdb.DuckDBPyConnection) -> None: The audit columns (``archived_at``, ``archived_by``) ship first behind ``IF NOT EXISTS`` so they're safe in all three states. """ - conn.execute( - "ALTER TABLE store_entities ADD COLUMN IF NOT EXISTS archived_at TIMESTAMP" - ) - conn.execute( - "ALTER TABLE store_entities ADD COLUMN IF NOT EXISTS archived_by VARCHAR" - ) + conn.execute("ALTER TABLE store_entities ADD COLUMN IF NOT EXISTS archived_at TIMESTAMP") + conn.execute("ALTER TABLE store_entities ADD COLUMN IF NOT EXISTS archived_by VARCHAR") cols = { - r[0] for r in conn.execute( + r[0] + for r in conn.execute( "SELECT column_name FROM information_schema.columns " "WHERE table_name = 'store_entities' " " AND column_name IN ('visibility_status', '_vis_v35')" @@ -2553,18 +2543,10 @@ def _v34_to_v35_migrate(conn: duckdb.DuckDBPyConnection) -> None: # in v36 (DuckDB ALTER COLUMN supports SET NOT NULL / SET DEFAULT # but not ADD CHECK on an existing column). Value-list enforcement # is application-side via VALID_VISIBILITY in StoreEntitiesRepository. - conn.execute( - "ALTER TABLE store_entities ADD COLUMN _vis_v35 VARCHAR" - ) - conn.execute( - "UPDATE store_entities SET _vis_v35 = visibility_status" - ) - conn.execute( - "ALTER TABLE store_entities DROP COLUMN visibility_status" - ) - conn.execute( - "ALTER TABLE store_entities RENAME COLUMN _vis_v35 TO visibility_status" - ) + conn.execute("ALTER TABLE store_entities ADD COLUMN _vis_v35 VARCHAR") + conn.execute("UPDATE store_entities SET _vis_v35 = visibility_status") + conn.execute("ALTER TABLE store_entities DROP COLUMN visibility_status") + conn.execute("ALTER TABLE store_entities RENAME COLUMN _vis_v35 TO visibility_status") elif has_temp and not has_vis: # Partial-rebuild recovery — prior attempt dropped visibility_status # but the RENAME never landed. Data is already in _vis_v35 from @@ -2573,19 +2555,14 @@ def _v34_to_v35_migrate(conn: duckdb.DuckDBPyConnection) -> None: "v34→v35 detected partial-rebuild state (visibility_status " "missing, _vis_v35 present); recovering via RENAME" ) - conn.execute( - "ALTER TABLE store_entities RENAME COLUMN _vis_v35 TO visibility_status" - ) + conn.execute("ALTER TABLE store_entities RENAME COLUMN _vis_v35 TO visibility_status") elif has_vis and has_temp: # Both present — earlier rebuild aborted before the DROP. # visibility_status holds the canonical values; drop the temp. logger.warning( - "v34→v35 detected partial-rebuild state (both visibility_status " - "and _vis_v35 present); dropping the temp" - ) - conn.execute( - "ALTER TABLE store_entities DROP COLUMN _vis_v35" + "v34→v35 detected partial-rebuild state (both visibility_status and _vis_v35 present); dropping the temp" ) + conn.execute("ALTER TABLE store_entities DROP COLUMN _vis_v35") # else: neither column is present, which means store_entities itself # is at a shape ahead of v34. _SYSTEM_SCHEMA above already created # the post-v35 shape; nothing to do here. @@ -2894,10 +2871,7 @@ def _v42_to_v43(conn: duckdb.DuckDBPyConnection) -> None: UNIQUE (user_id, name) ) """) - conn.execute( - "CREATE INDEX IF NOT EXISTS idx_obs_views_user " - "ON user_observability_views(user_id, created_at)" - ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_obs_views_user ON user_observability_views(user_id, created_at)") def _v43_to_v44(conn: duckdb.DuckDBPyConnection) -> None: @@ -2915,19 +2889,35 @@ def _v43_to_v44(conn: duckdb.DuckDBPyConnection) -> None: from 1 → 2 in the same release, which the session-pipeline reprocess loop uses to invalidate stale summaries. """ - conn.execute( - "ALTER TABLE users ADD COLUMN IF NOT EXISTS last_pull_at TIMESTAMP" - ) + conn.execute("ALTER TABLE users ADD COLUMN IF NOT EXISTS last_pull_at TIMESTAMP") for col in ( "input_tokens", "output_tokens", "cache_read_tokens", "cache_creation_tokens", ): - conn.execute( - f"ALTER TABLE usage_session_summary " - f"ADD COLUMN IF NOT EXISTS {col} BIGINT DEFAULT 0" - ) + conn.execute(f"ALTER TABLE usage_session_summary ADD COLUMN IF NOT EXISTS {col} BIGINT DEFAULT 0") + + +def _v44_to_v45(conn: duckdb.DuckDBPyConnection) -> None: + """v45: add user_id column to usage tables for stable RBAC filtering. + + The ``username`` column in ``usage_session_summary`` / ``usage_events`` + stores the directory name from the session-data path, which is either + an email local-part (session collector) or a UUID (upload API). Email + local-parts are unstable — they change when users rename. ``user_id`` + is the stable identity and becomes the authoritative RBAC filter + column for the ``agnes_sessions`` / ``agnes_telemetry`` aliases. + + Backfill: the UsageProcessor populates ``user_id`` on every + (re)process run. Existing rows get backfilled when + ``USAGE_PROCESSOR_VERSION`` bumps, which triggers the session-pipeline + reprocess loop. + """ + conn.execute("ALTER TABLE usage_session_summary ADD COLUMN IF NOT EXISTS user_id VARCHAR") + conn.execute("ALTER TABLE usage_events ADD COLUMN IF NOT EXISTS user_id VARCHAR") + conn.execute("CREATE INDEX IF NOT EXISTS idx_usage_session_user_id ON usage_session_summary(user_id)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_usage_events_user_id ON usage_events(user_id)") _V33_TO_V34_MIGRATIONS = [ @@ -3049,8 +3039,10 @@ def _replace_for_v24(project_id: str): function-form replacement is the defensive idiom — it makes the intent explicit and removes the dependency on re.sub's replacement- string escaping rules.""" + def _repl(m): return f"`{project_id}.{m.group(1)}.{m.group(2)}`" + return _repl @@ -3059,6 +3051,7 @@ def _v23_to_v24_finalize(conn: duckdb.DuckDBPyConnection) -> None: try: from app.instance_config import get_value + project_id = get_value("data_source", "bigquery", "project", default="") or "" except Exception: project_id = "" @@ -3092,7 +3085,7 @@ def _v23_to_v24_finalize(conn: duckdb.DuckDBPyConnection) -> None: raise RuntimeError( f"v24 migration cannot complete: {len(rows)} materialized " f"BigQuery row(s) need their source_query rewritten from " - f"DuckDB-flavor `bq.\"ds\".\"tbl\"` to BQ-native " + f'DuckDB-flavor `bq."ds"."tbl"` to BQ-native ' f"`.ds.tbl`, but `data_source.bigquery.project` is " f"not configured. Set it via /admin/server-config (or " f"`instance.yaml: data_source.bigquery.project`) and restart " @@ -3113,7 +3106,8 @@ def _v23_to_v24_finalize(conn: duckdb.DuckDBPyConnection) -> None: [new_sq, row_id], ) logger.info( - "v24 migration: rewrote source_query for row %r", row_id, + "v24 migration: rewrote source_query for row %r", + row_id, ) conn.execute("COMMIT") except Exception: @@ -3179,16 +3173,12 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: [SCHEMA_VERSION], ) # v22 setup_banner row (kept as compat per CLAUDE.md schema notes). - conn.execute( - "INSERT INTO setup_banner (id, content) VALUES (1, NULL) " - "ON CONFLICT (id) DO NOTHING" - ) + conn.execute("INSERT INTO setup_banner (id, content) VALUES (1, NULL) ON CONFLICT (id) DO NOTHING") # v26 instance_templates seed — three canonical keys with NULL # content (operator override absent → render OSS default). for key in ("welcome", "claude_md", "home"): conn.execute( - "INSERT INTO instance_templates (key, content) VALUES (?, NULL) " - "ON CONFLICT (key) DO NOTHING", + "INSERT INTO instance_templates (key, content) VALUES (?, NULL) ON CONFLICT (key) DO NOTHING", [key], ) # v41 audit_log indices: _SYSTEM_SCHEMA omits CREATE INDEX to @@ -3207,6 +3197,10 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: # them on fresh installs (no-op ALTERs); kept here for the # ladder's chronological readability. _v43_to_v44(conn) + # v45 user_id column on usage tables. _SYSTEM_SCHEMA declares + # the columns for fresh installs; migration adds them for + # existing DBs. No-op on fresh. + _v44_to_v45(conn) # Fresh-install seed is handled by the unconditional # _seed_core_roles call at the bottom of _ensure_schema — # left as a no-op branch here so the migration ladder still @@ -3248,8 +3242,7 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: # Skip UPDATE if the column never existed (e.g. test fixtures # starting from v2/v3 with a hand-crafted minimal users table). has_role_col = conn.execute( - "SELECT 1 FROM information_schema.columns " - "WHERE table_name = 'users' AND column_name = 'role'" + "SELECT 1 FROM information_schema.columns WHERE table_name = 'users' AND column_name = 'role'" ).fetchone() if has_role_col: conn.execute("UPDATE users SET role = NULL") @@ -3350,6 +3343,8 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: _v42_to_v43(conn) if current < 44: _v43_to_v44(conn) + if current < 45: + _v44_to_v45(conn) conn.execute( "UPDATE schema_version SET version = ?, applied_at = current_timestamp", [SCHEMA_VERSION], @@ -3383,7 +3378,8 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: "this process before any container restart is the " "safe path; otherwise, the next start may need to " "restore from %s.", - e, _get_state_dir() / "system.duckdb.pre-migrate", + e, + _get_state_dir() / "system.duckdb.pre-migrate", ) # Always run the system-groups seed when the DB is on a version this binary diff --git a/src/repositories/usage.py b/src/repositories/usage.py index d5fc4bf..3c25e91 100644 --- a/src/repositories/usage.py +++ b/src/repositories/usage.py @@ -16,15 +16,31 @@ class UsageRepository: if not rows: return 0 cols = [ - "id", "session_id", "session_file", "username", "event_uuid", "parent_uuid", - "event_type", "tool_name", "skill_name", "subagent_type", "command_name", - "is_error", "source", "ref_id", "model", "cwd", "occurred_at", "processor_version", + "id", + "session_id", + "session_file", + "username", + "event_uuid", + "parent_uuid", + "event_type", + "tool_name", + "skill_name", + "subagent_type", + "command_name", + "is_error", + "source", + "ref_id", + "model", + "cwd", + "occurred_at", + "processor_version", + "user_id", ] placeholders = ",".join("?" for _ in cols) sql = f"INSERT OR IGNORE INTO usage_events ({','.join(cols)}) VALUES ({placeholders})" - self.conn.executemany(sql, [ - [r.get(c) if c != "processor_version" else processor_version for c in cols] for r in rows - ]) + self.conn.executemany( + sql, [[r.get(c) if c != "processor_version" else processor_version for c in cols] for r in rows] + ) return len(rows) def upsert_summary(self, summary: dict, *, processor_version: int) -> None: @@ -37,8 +53,8 @@ class UsageRepository: tool_calls, tool_errors, skill_invocations, subagent_dispatches, mcp_calls, slash_commands, distinct_tools, distinct_skills, primary_model, input_tokens, output_tokens, cache_read_tokens, - cache_creation_tokens, processor_version) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + cache_creation_tokens, processor_version, user_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, [ summary["session_file"], @@ -64,18 +80,15 @@ class UsageRepository: summary.get("cache_read_tokens", 0), summary.get("cache_creation_tokens", 0), processor_version, + summary.get("user_id"), ], ) def purge_for_session(self, session_file: str) -> int: """DELETE events + summary for one session — used on reprocess.""" - r = self.conn.execute( - "DELETE FROM usage_events WHERE session_file = ?", [session_file] - ) + r = self.conn.execute("DELETE FROM usage_events WHERE session_file = ?", [session_file]) events_deleted = r.rowcount if r.rowcount else 0 - self.conn.execute( - "DELETE FROM usage_session_summary WHERE session_file = ?", [session_file] - ) + self.conn.execute("DELETE FROM usage_session_summary WHERE session_file = ?", [session_file]) return events_deleted def delete_older_than(self, days: int) -> int: diff --git a/tests/test_db_schema_version.py b/tests/test_db_schema_version.py index 4592f89..b67d9ad 100644 --- a/tests/test_db_schema_version.py +++ b/tests/test_db_schema_version.py @@ -8,6 +8,7 @@ The v19 step (#150) drops dataset_permissions, access_requests tables and users.role, table_registry.is_public columns; v20 then ALTERs the post-v19 table_registry to add the source_query column. """ + import duckdb from src.db import SCHEMA_VERSION, _ensure_schema, get_schema_version @@ -98,7 +99,7 @@ def test_schema_version_is_44(): # output_tokens, cache_read_tokens, cache_creation_tokens). # USAGE_PROCESSOR_VERSION simultaneously bumps 1→2 so the # reprocess loop backfills tokens on next tick. - assert SCHEMA_VERSION == 44 + assert SCHEMA_VERSION == 45 def test_v37_marketplace_curator_columns(tmp_path): @@ -109,9 +110,9 @@ def test_v37_marketplace_curator_columns(tmp_path): _ensure_schema(conn) registry_cols = { - r[0] for r in conn.execute( - "SELECT column_name FROM information_schema.columns " - "WHERE table_name = 'marketplace_registry'" + r[0] + for r in conn.execute( + "SELECT column_name FROM information_schema.columns WHERE table_name = 'marketplace_registry'" ).fetchall() } assert {"curator_name", "curator_email"} <= registry_cols, ( @@ -119,9 +120,9 @@ def test_v37_marketplace_curator_columns(tmp_path): ) plugin_cols = { - r[0] for r in conn.execute( - "SELECT column_name FROM information_schema.columns " - "WHERE table_name = 'marketplace_plugins'" + r[0] + for r in conn.execute( + "SELECT column_name FROM information_schema.columns WHERE table_name = 'marketplace_plugins'" ).fetchall() } assert {"cover_photo_url", "video_url", "doc_links"} <= plugin_cols, ( @@ -139,10 +140,7 @@ def test_v36_db_migrates_to_current(tmp_path): # Stand up a minimal v36-shape registry + plugin row, plus the # schema_version row that pins us to 36. - conn.execute( - "CREATE TABLE schema_version (version INTEGER, " - "applied_at TIMESTAMP DEFAULT current_timestamp)" - ) + conn.execute("CREATE TABLE schema_version (version INTEGER, applied_at TIMESTAMP DEFAULT current_timestamp)") conn.execute("INSERT INTO schema_version (version) VALUES (36)") conn.execute("""CREATE TABLE marketplace_registry ( id VARCHAR PRIMARY KEY, name VARCHAR NOT NULL, @@ -161,22 +159,15 @@ def test_v36_db_migrates_to_current(tmp_path): PRIMARY KEY (marketplace_id, name) )""") conn.execute( - "INSERT INTO marketplace_registry (id, name, url) " - "VALUES ('legacy', 'Legacy', 'https://example.com/repo.git')" - ) - conn.execute( - "INSERT INTO marketplace_plugins (marketplace_id, name) " - "VALUES ('legacy', 'foo')" + "INSERT INTO marketplace_registry (id, name, url) VALUES ('legacy', 'Legacy', 'https://example.com/repo.git')" ) + conn.execute("INSERT INTO marketplace_plugins (marketplace_id, name) VALUES ('legacy', 'foo')") _ensure_schema(conn) assert get_schema_version(conn) == SCHEMA_VERSION # v37 enrichment columns exist; existing rows preserved with NULL. - row = conn.execute( - "SELECT curator_name, curator_email FROM marketplace_registry " - "WHERE id = 'legacy'" - ).fetchone() + row = conn.execute("SELECT curator_name, curator_email FROM marketplace_registry WHERE id = 'legacy'").fetchone() assert row == (None, None) row = conn.execute( @@ -196,27 +187,18 @@ def test_v39_adds_marketplace_plugins_is_system(tmp_path): _ensure_schema(conn) cols = { - r[0] for r in conn.execute( - "SELECT column_name FROM information_schema.columns " - "WHERE table_name = 'marketplace_plugins'" + r[0] + for r in conn.execute( + "SELECT column_name FROM information_schema.columns WHERE table_name = 'marketplace_plugins'" ).fetchall() } assert "is_system" in cols, f"is_system missing from {cols}" # New rows default to FALSE — required so a freshly-synced plugin # doesn't accidentally land in everyone's stack. - conn.execute( - "INSERT INTO marketplace_registry (id, name, url) " - "VALUES ('m', 'M', 'https://example.com/repo.git')" - ) - conn.execute( - "INSERT INTO marketplace_plugins (marketplace_id, name) " - "VALUES ('m', 'p')" - ) - row = conn.execute( - "SELECT is_system FROM marketplace_plugins " - "WHERE marketplace_id = 'm' AND name = 'p'" - ).fetchone() + conn.execute("INSERT INTO marketplace_registry (id, name, url) VALUES ('m', 'M', 'https://example.com/repo.git')") + conn.execute("INSERT INTO marketplace_plugins (marketplace_id, name) VALUES ('m', 'p')") + row = conn.execute("SELECT is_system FROM marketplace_plugins WHERE marketplace_id = 'm' AND name = 'p'").fetchone() assert row[0] is False, f"new plugin defaulted to {row[0]!r}, expected False" conn.close() @@ -230,10 +212,7 @@ def test_v38_db_migrates_to_v39(tmp_path): # Stand up the v38 minimal shape: schema_version row + the two # marketplace tables + a pre-existing plugin row that must survive # the migration with is_system = FALSE. - conn.execute( - "CREATE TABLE schema_version (version INTEGER, " - "applied_at TIMESTAMP DEFAULT current_timestamp)" - ) + conn.execute("CREATE TABLE schema_version (version INTEGER, applied_at TIMESTAMP DEFAULT current_timestamp)") conn.execute("INSERT INTO schema_version (version) VALUES (38)") conn.execute("""CREATE TABLE marketplace_registry ( id VARCHAR PRIMARY KEY, name VARCHAR NOT NULL, @@ -254,21 +233,17 @@ def test_v38_db_migrates_to_v39(tmp_path): PRIMARY KEY (marketplace_id, name) )""") conn.execute( - "INSERT INTO marketplace_registry (id, name, url) " - "VALUES ('legacy', 'Legacy', 'https://example.com/repo.git')" - ) - conn.execute( - "INSERT INTO marketplace_plugins (marketplace_id, name) " - "VALUES ('legacy', 'foo')" + "INSERT INTO marketplace_registry (id, name, url) VALUES ('legacy', 'Legacy', 'https://example.com/repo.git')" ) + conn.execute("INSERT INTO marketplace_plugins (marketplace_id, name) VALUES ('legacy', 'foo')") _ensure_schema(conn) assert get_schema_version(conn) == SCHEMA_VERSION cols = { - r[0] for r in conn.execute( - "SELECT column_name FROM information_schema.columns " - "WHERE table_name = 'marketplace_plugins'" + r[0] + for r in conn.execute( + "SELECT column_name FROM information_schema.columns WHERE table_name = 'marketplace_plugins'" ).fetchall() } assert "is_system" in cols @@ -276,8 +251,7 @@ def test_v38_db_migrates_to_v39(tmp_path): # Existing pre-v39 row backfilled to FALSE — no plugin lands in # everyone's stack just because we ran the migration. row = conn.execute( - "SELECT is_system FROM marketplace_plugins " - "WHERE marketplace_id = 'legacy' AND name = 'foo'" + "SELECT is_system FROM marketplace_plugins WHERE marketplace_id = 'legacy' AND name = 'foo'" ).fetchone() assert row[0] is False, f"pre-existing row backfilled to {row[0]!r}" conn.close() @@ -289,9 +263,9 @@ def test_v20_adds_source_query(tmp_path): _ensure_schema(conn) cols = { - r[0] for r in conn.execute( - "SELECT column_name FROM information_schema.columns " - "WHERE table_name = 'table_registry'" + r[0] + for r in conn.execute( + "SELECT column_name FROM information_schema.columns WHERE table_name = 'table_registry'" ).fetchall() } assert "source_query" in cols, f"source_query missing from {cols}" @@ -312,19 +286,13 @@ def test_claude_md_template_seeded_in_instance_templates(tmp_path): _ensure_schema(conn) tables = { - r[0] for r in conn.execute( - "SELECT table_name FROM information_schema.tables " - "WHERE table_schema = 'main'" - ).fetchall() + r[0] + for r in conn.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'").fetchall() } assert "instance_templates" in tables - assert "claude_md_template" not in tables, ( - "claude_md_template should be consolidated away post-v28" - ) + assert "claude_md_template" not in tables, "claude_md_template should be consolidated away post-v28" - row = conn.execute( - "SELECT key, content FROM instance_templates WHERE key = 'claude_md'" - ).fetchone() + row = conn.execute("SELECT key, content FROM instance_templates WHERE key = 'claude_md'").fetchone() assert row is not None assert row[0] == "claude_md" assert row[1] is None # default = no override @@ -340,10 +308,7 @@ def test_v19_db_migrates_to_v20(tmp_path): # Simulate a v19 DB at minimal but realistic shape: schema_version row + # a table_registry row in the post-v19 column shape (no is_public column, # since v19 finalize dropped it via the table-rebuild idiom). - conn.execute( - "CREATE TABLE schema_version (version INTEGER, " - "applied_at TIMESTAMP DEFAULT current_timestamp)" - ) + conn.execute("CREATE TABLE schema_version (version INTEGER, applied_at TIMESTAMP DEFAULT current_timestamp)") conn.execute("INSERT INTO schema_version (version) VALUES (19)") conn.execute("""CREATE TABLE table_registry ( id VARCHAR PRIMARY KEY, name VARCHAR NOT NULL, @@ -361,16 +326,14 @@ def test_v19_db_migrates_to_v20(tmp_path): assert get_schema_version(conn) == SCHEMA_VERSION # bumped 19→28 forward cols = { - r[0] for r in conn.execute( - "SELECT column_name FROM information_schema.columns " - "WHERE table_name = 'table_registry'" + r[0] + for r in conn.execute( + "SELECT column_name FROM information_schema.columns WHERE table_name = 'table_registry'" ).fetchall() } assert "source_query" in cols # Existing row preserved, new column NULL - row = conn.execute( - "SELECT id, source_query FROM table_registry WHERE id='foo'" - ).fetchone() + row = conn.execute("SELECT id, source_query FROM table_registry WHERE id='foo'").fetchone() assert row == ("foo", None) conn.close() @@ -389,8 +352,7 @@ def _make_v34_store_entities(conn): ) """) conn.execute( - "INSERT INTO store_entities (id, visibility_status) VALUES " - "('a', 'approved'), ('b', 'pending'), ('c', 'hidden')" + "INSERT INTO store_entities (id, visibility_status) VALUES ('a', 'approved'), ('b', 'pending'), ('c', 'hidden')" ) @@ -409,9 +371,9 @@ def test_v34_to_v35_clean_path_rebuilds_visibility_column(tmp_path): _v34_to_v35_migrate(conn) cols = { - r[0] for r in conn.execute( - "SELECT column_name FROM information_schema.columns " - "WHERE table_name = 'store_entities'" + r[0] + for r in conn.execute( + "SELECT column_name FROM information_schema.columns WHERE table_name = 'store_entities'" ).fetchall() } assert "visibility_status" in cols @@ -419,12 +381,8 @@ def test_v34_to_v35_clean_path_rebuilds_visibility_column(tmp_path): assert "archived_at" in cols assert "archived_by" in cols - rows = dict(conn.execute( - "SELECT id, visibility_status FROM store_entities ORDER BY id" - ).fetchall()) - assert rows == {"a": "approved", "b": "pending", "c": "hidden"}, ( - f"row values must survive the rebuild: {rows}" - ) + rows = dict(conn.execute("SELECT id, visibility_status FROM store_entities ORDER BY id").fetchall()) + assert rows == {"a": "approved", "b": "pending", "c": "hidden"}, f"row values must survive the rebuild: {rows}" conn.close() @@ -452,16 +410,15 @@ def test_v34_to_v35_recovers_from_partial_rebuild_missing_visibility(tmp_path): ) """) conn.execute( - "INSERT INTO store_entities (id, _vis_v35) VALUES " - "('a', 'approved'), ('b', 'pending'), ('c', 'hidden')" + "INSERT INTO store_entities (id, _vis_v35) VALUES ('a', 'approved'), ('b', 'pending'), ('c', 'hidden')" ) _v34_to_v35_migrate(conn) cols = { - r[0] for r in conn.execute( - "SELECT column_name FROM information_schema.columns " - "WHERE table_name = 'store_entities'" + r[0] + for r in conn.execute( + "SELECT column_name FROM information_schema.columns WHERE table_name = 'store_entities'" ).fetchall() } assert "visibility_status" in cols @@ -469,9 +426,7 @@ def test_v34_to_v35_recovers_from_partial_rebuild_missing_visibility(tmp_path): assert "archived_at" in cols assert "archived_by" in cols - rows = dict(conn.execute( - "SELECT id, visibility_status FROM store_entities ORDER BY id" - ).fetchall()) + rows = dict(conn.execute("SELECT id, visibility_status FROM store_entities ORDER BY id").fetchall()) assert rows == {"a": "approved", "b": "pending", "c": "hidden"}, ( f"row values must come back via RENAME, not be lost: {rows}" ) @@ -495,25 +450,20 @@ def test_v34_to_v35_recovers_from_partial_rebuild_both_columns(tmp_path): _vis_v35 VARCHAR ) """) - conn.execute( - "INSERT INTO store_entities (id, visibility_status, _vis_v35) VALUES " - "('a', 'approved', 'approved')" - ) + conn.execute("INSERT INTO store_entities (id, visibility_status, _vis_v35) VALUES ('a', 'approved', 'approved')") _v34_to_v35_migrate(conn) cols = { - r[0] for r in conn.execute( - "SELECT column_name FROM information_schema.columns " - "WHERE table_name = 'store_entities'" + r[0] + for r in conn.execute( + "SELECT column_name FROM information_schema.columns WHERE table_name = 'store_entities'" ).fetchall() } assert "visibility_status" in cols assert "_vis_v35" not in cols, "temp column must be dropped" - row = conn.execute( - "SELECT id, visibility_status FROM store_entities WHERE id = 'a'" - ).fetchone() + row = conn.execute("SELECT id, visibility_status FROM store_entities WHERE id = 'a'").fetchone() assert row == ("a", "approved") conn.close() @@ -533,10 +483,7 @@ def test_v32_db_with_partial_v35_recovers_through_full_ladder(tmp_path): # Stand up the broken state. We only need enough of the schema for the # migration ladder to run — ``_ensure_schema`` will create the rest # via ``_SYSTEM_SCHEMA``'s IF NOT EXISTS guards. - conn.execute( - "CREATE TABLE schema_version (version INTEGER, " - "applied_at TIMESTAMP DEFAULT current_timestamp)" - ) + conn.execute("CREATE TABLE schema_version (version INTEGER, applied_at TIMESTAMP DEFAULT current_timestamp)") conn.execute("INSERT INTO schema_version (version) VALUES (32)") conn.execute(""" CREATE TABLE store_entities ( @@ -550,26 +497,21 @@ def test_v32_db_with_partial_v35_recovers_through_full_ladder(tmp_path): _vis_v35 VARCHAR ) """) - conn.execute( - "INSERT INTO store_entities (id, type, name, _vis_v35) " - "VALUES ('a', 'skill', 'alpha', 'approved')" - ) + conn.execute("INSERT INTO store_entities (id, type, name, _vis_v35) VALUES ('a', 'skill', 'alpha', 'approved')") _ensure_schema(conn) assert get_schema_version(conn) == SCHEMA_VERSION cols = { - r[0] for r in conn.execute( - "SELECT column_name FROM information_schema.columns " - "WHERE table_name = 'store_entities'" + r[0] + for r in conn.execute( + "SELECT column_name FROM information_schema.columns WHERE table_name = 'store_entities'" ).fetchall() } assert "visibility_status" in cols assert "_vis_v35" not in cols # Existing row preserved, value carried over from _vis_v35. - row = conn.execute( - "SELECT id, visibility_status FROM store_entities WHERE id = 'a'" - ).fetchone() + row = conn.execute("SELECT id, visibility_status FROM store_entities WHERE id = 'a'").fetchone() assert row == ("a", "approved") conn.close() @@ -593,13 +535,9 @@ def test_v35_to_v36_reapplies_visibility_constraints(tmp_path): ).fetchall() assert cols, "visibility_status column missing from store_entities" name, is_nullable, default_expr = cols[0] - assert is_nullable == "NO", ( - f"visibility_status must be NOT NULL after v36; got is_nullable={is_nullable!r}" - ) + assert is_nullable == "NO", f"visibility_status must be NOT NULL after v36; got is_nullable={is_nullable!r}" # DuckDB renders the default as a quoted literal — match either form. assert default_expr is not None, "visibility_status DEFAULT must be set" - assert "pending" in str(default_expr).lower(), ( - f"visibility_status DEFAULT must be 'pending'; got {default_expr!r}" - ) + assert "pending" in str(default_expr).lower(), f"visibility_status DEFAULT must be 'pending'; got {default_expr!r}" conn.close() diff --git a/tests/test_home_stats.py b/tests/test_home_stats.py index b1ea1b8..61bddef 100644 --- a/tests/test_home_stats.py +++ b/tests/test_home_stats.py @@ -10,6 +10,7 @@ Covers: - get_home_status_frame_visibility honors the env var + yaml override and defaults true. """ + from __future__ import annotations import asyncio @@ -42,10 +43,7 @@ def test_v44_fresh_install_has_token_columns_and_last_pull(tmp_path): user_cols = {r[1] for r in conn.execute("PRAGMA table_info(users)").fetchall()} assert "last_pull_at" in user_cols - sess_cols = { - r[1] - for r in conn.execute("PRAGMA table_info(usage_session_summary)").fetchall() - } + sess_cols = {r[1] for r in conn.execute("PRAGMA table_info(usage_session_summary)").fetchall()} for col in ( "input_tokens", "output_tokens", @@ -61,10 +59,7 @@ def test_v43_to_v44_upgrade_is_idempotent(tmp_path): db_path = tmp_path / "system.duckdb" conn = duckdb.connect(str(db_path)) # Hand-roll v43-shaped tables (no last_pull_at, no token cols). - conn.execute( - "CREATE TABLE users (id VARCHAR PRIMARY KEY, email VARCHAR, " - "onboarded BOOLEAN DEFAULT FALSE)" - ) + conn.execute("CREATE TABLE users (id VARCHAR PRIMARY KEY, email VARCHAR, onboarded BOOLEAN DEFAULT FALSE)") conn.execute( """ CREATE TABLE usage_session_summary ( @@ -82,14 +77,8 @@ def test_v43_to_v44_upgrade_is_idempotent(tmp_path): _v43_to_v44(conn) _v43_to_v44(conn) # idempotent - assert "last_pull_at" in { - r[1] for r in conn.execute("PRAGMA table_info(users)").fetchall() - } - tok_cols = { - r[1] - for r in conn.execute("PRAGMA table_info(usage_session_summary)").fetchall() - if "token" in r[1] - } + assert "last_pull_at" in {r[1] for r in conn.execute("PRAGMA table_info(users)").fetchall()} + tok_cols = {r[1] for r in conn.execute("PRAGMA table_info(usage_session_summary)").fetchall() if "token" in r[1]} assert tok_cols == { "input_tokens", "output_tokens", @@ -100,7 +89,7 @@ def test_v43_to_v44_upgrade_is_idempotent(tmp_path): def test_schema_version_constant_is_44(): """Belt + suspenders against schema_version regressions.""" - assert SCHEMA_VERSION == 44 + assert SCHEMA_VERSION == 45 # --------------------------------------------------------------------------- @@ -118,15 +107,23 @@ def stats_conn(tmp_path): def _seed_user(conn, *, uid="u1", email="alice@example.com"): conn.execute( - "INSERT INTO users (id, email, active, onboarded, last_pull_at) " - "VALUES (?, ?, TRUE, TRUE, current_timestamp)", + "INSERT INTO users (id, email, active, onboarded, last_pull_at) VALUES (?, ?, TRUE, TRUE, current_timestamp)", [uid, email], ) -def _seed_session(conn, *, session_file, username, started_sql, prompts=0, - input_tokens=0, output_tokens=0, - cache_read=0, cache_creation=0): +def _seed_session( + conn, + *, + session_file, + username, + started_sql, + prompts=0, + input_tokens=0, + output_tokens=0, + cache_read=0, + cache_creation=0, +): conn.execute( f""" INSERT INTO usage_session_summary @@ -136,8 +133,7 @@ def _seed_session(conn, *, session_file, username, started_sql, prompts=0, VALUES (?, ?, ?, {started_sql}, current_timestamp, ?, ?, ?, ?, ?, 2) """, - [session_file, session_file, username, prompts, - input_tokens, output_tokens, cache_read, cache_creation], + [session_file, session_file, username, prompts, input_tokens, output_tokens, cache_read, cache_creation], ) @@ -159,27 +155,60 @@ def test_compute_home_stats_24h_vs_7d_windowing(stats_conn): from app.api.me import compute_home_stats _seed_user(stats_conn) - _seed_session(stats_conn, session_file="a.jsonl", username="alice", - started_sql="current_timestamp - INTERVAL 1 HOUR", - prompts=5, input_tokens=100, output_tokens=50, - cache_read=800, cache_creation=25) - _seed_session(stats_conn, session_file="b.jsonl", username="alice", - started_sql="current_timestamp - INTERVAL 3 DAY", - prompts=5, input_tokens=100, output_tokens=50, - cache_read=800, cache_creation=25) - _seed_session(stats_conn, session_file="c.jsonl", username="alice", - started_sql="current_timestamp - INTERVAL 30 DAY", - prompts=99) + _seed_session( + stats_conn, + session_file="a.jsonl", + username="alice", + started_sql="current_timestamp - INTERVAL 1 HOUR", + prompts=5, + input_tokens=100, + output_tokens=50, + cache_read=800, + cache_creation=25, + ) + _seed_session( + stats_conn, + session_file="b.jsonl", + username="alice", + started_sql="current_timestamp - INTERVAL 3 DAY", + prompts=5, + input_tokens=100, + output_tokens=50, + cache_read=800, + cache_creation=25, + ) + _seed_session( + stats_conn, + session_file="c.jsonl", + username="alice", + started_sql="current_timestamp - INTERVAL 30 DAY", + prompts=99, + ) - _seed_event(stats_conn, ev_id="e1", session_file="a.jsonl", - username="alice", cwd="/proj/alpha", - occurred_sql="current_timestamp - INTERVAL 1 HOUR") - _seed_event(stats_conn, ev_id="e2", session_file="a.jsonl", - username="alice", cwd="/proj/beta", - occurred_sql="current_timestamp - INTERVAL 2 HOUR") - _seed_event(stats_conn, ev_id="e3", session_file="b.jsonl", - username="alice", cwd="/proj/gamma", - occurred_sql="current_timestamp - INTERVAL 3 DAY") + _seed_event( + stats_conn, + ev_id="e1", + session_file="a.jsonl", + username="alice", + cwd="/proj/alpha", + occurred_sql="current_timestamp - INTERVAL 1 HOUR", + ) + _seed_event( + stats_conn, + ev_id="e2", + session_file="a.jsonl", + username="alice", + cwd="/proj/beta", + occurred_sql="current_timestamp - INTERVAL 2 HOUR", + ) + _seed_event( + stats_conn, + ev_id="e3", + session_file="b.jsonl", + username="alice", + cwd="/proj/gamma", + occurred_sql="current_timestamp - INTERVAL 3 DAY", + ) user = {"id": "u1", "email": "alice@example.com"} @@ -243,8 +272,10 @@ def test_compute_home_stats_missing_users_row_returns_zeros(stats_conn): "sessions": 0, "prompts": 0, "tokens": { - "input": 0, "output": 0, - "cache_read": 0, "cache_creation": 0, + "input": 0, + "output": 0, + "cache_read": 0, + "cache_creation": 0, "total": 0, }, "projects": 0, @@ -267,9 +298,7 @@ def test_sync_manifest_bumps_last_pull_at(stats_conn, monkeypatch, tmp_path): _seed_user(stats_conn, uid="u_pull", email="puller@example.com") # Wipe seeded last_pull_at so we can detect the bump. - stats_conn.execute( - "UPDATE users SET last_pull_at = NULL WHERE id = ?", ["u_pull"] - ) + stats_conn.execute("UPDATE users SET last_pull_at = NULL WHERE id = ?", ["u_pull"]) asyncio.run( sync_manifest( @@ -277,9 +306,7 @@ def test_sync_manifest_bumps_last_pull_at(stats_conn, monkeypatch, tmp_path): conn=stats_conn, ) ) - row = stats_conn.execute( - "SELECT last_pull_at FROM users WHERE id = ?", ["u_pull"] - ).fetchone() + row = stats_conn.execute("SELECT last_pull_at FROM users WHERE id = ?", ["u_pull"]).fetchone() # Don't compare against `datetime.now(utc)` — DuckDB's # ``current_timestamp`` returns the session's wall-clock time which # may be naive-local-or-utc depending on the environment, so a @@ -298,6 +325,7 @@ def test_status_frame_default_is_visible(monkeypatch): """Absent both env var and yaml entry, the flag returns True.""" monkeypatch.delenv("AGNES_HOME_SHOW_STATUS_FRAME", raising=False) from app.instance_config import get_home_status_frame_visibility + assert get_home_status_frame_visibility() is True @@ -305,12 +333,14 @@ def test_status_frame_env_var_off(monkeypatch): """AGNES_HOME_SHOW_STATUS_FRAME=0 hides the frame.""" monkeypatch.setenv("AGNES_HOME_SHOW_STATUS_FRAME", "0") from app.instance_config import get_home_status_frame_visibility + assert get_home_status_frame_visibility() is False def test_status_frame_env_var_falsey_values(monkeypatch): """Each of {0, false, no, off, ''} hides the frame; anything else shows.""" from app.instance_config import get_home_status_frame_visibility + for val in ("0", "false", "False", "FALSE", "no", "off", ""): monkeypatch.setenv("AGNES_HOME_SHOW_STATUS_FRAME", val) assert get_home_status_frame_visibility() is False, f"{val!r} should hide" diff --git a/tests/test_internal_data_source.py b/tests/test_internal_data_source.py index e5c6821..e68c855 100644 --- a/tests/test_internal_data_source.py +++ b/tests/test_internal_data_source.py @@ -12,9 +12,7 @@ Coverage: from __future__ import annotations -import os -import duckdb import pytest from connectors.internal.access import ( @@ -46,22 +44,24 @@ def system_db(tmp_path, monkeypatch): # Force close any pre-existing handle held by an earlier test. from src.db import close_system_db + close_system_db() from src.db import get_system_db + conn = get_system_db() _ensure_schema(conn) ensure_internal_tables_registered(conn) # Seed a couple of canonical rows for the RBAC checks. conn.execute( "INSERT INTO usage_session_summary " - "(session_file, session_id, username, tool_calls, tool_errors, processor_version) VALUES " - "('alice/s1.jsonl', 's-a-1', 'alice', 10, 1, 1)" + "(session_file, session_id, username, user_id, tool_calls, tool_errors, processor_version) VALUES " + "('alice/s1.jsonl', 's-a-1', 'alice', 'alice-uuid', 10, 1, 1)" ) conn.execute( "INSERT INTO usage_session_summary " - "(session_file, session_id, username, tool_calls, tool_errors, processor_version) VALUES " - "('bob/s2.jsonl', 's-b-1', 'bob', 20, 0, 1)" + "(session_file, session_id, username, user_id, tool_calls, tool_errors, processor_version) VALUES " + "('bob/s2.jsonl', 's-b-1', 'bob', 'bob-uuid', 20, 0, 1)" ) conn.execute( "INSERT INTO audit_log (id, user_id, action, result) VALUES " @@ -79,6 +79,7 @@ def system_db(tmp_path, monkeypatch): # Static helpers — no DB needed # --------------------------------------------------------------------------- + def test_is_internal_table_recognises_canonical_ids(): assert is_internal_table("agnes_sessions") assert is_internal_table("agnes_telemetry") @@ -108,25 +109,31 @@ def test_filter_clause_admin_is_empty(): assert build_filter_clause(table, {"email": "admin@x", "id": "admin-uuid"}, True) == "" -def test_filter_clause_non_admin_scopes_to_username(): +def test_filter_clause_non_admin_scopes_to_user_id(): + """agnes_sessions filters on user_id with OR username fallback for + pre-backfill rows.""" table = INTERNAL_TABLES_BY_ID["agnes_sessions"] clause = build_filter_clause(table, {"email": "alice@example.com", "id": "alice-uuid"}, False) - assert clause == "WHERE username = 'alice'" + assert clause == "WHERE (user_id = 'alice-uuid' OR username = 'alice')" def test_filter_clause_non_admin_scopes_audit_to_user_id(): table = INTERNAL_TABLES_BY_ID["agnes_audit"] clause = build_filter_clause( - table, {"email": "alice@example.com", "id": "alice-uuid"}, False, + table, + {"email": "alice@example.com", "id": "alice-uuid"}, + False, ) assert clause == "WHERE user_id = 'alice-uuid'" -def test_filter_clause_rejects_unsafe_username(): +def test_filter_clause_rejects_unsafe_user_id(): table = INTERNAL_TABLES_BY_ID["agnes_sessions"] with pytest.raises(InternalAccessError): build_filter_clause( - table, {"email": "alice'; DROP TABLE--@example.com", "id": "x"}, False, + table, + {"email": "alice@example.com", "id": "'; DROP TABLE--"}, + False, ) @@ -134,6 +141,7 @@ def test_filter_clause_rejects_unsafe_username(): # End-to-end RBAC — runs against a real (fresh) system.duckdb # --------------------------------------------------------------------------- + def test_admin_sees_every_user_session(system_db, tmp_path): db_path = str(_get_state_dir() / "system.duckdb") _, rows, _ = execute_internal_query( @@ -156,6 +164,49 @@ def test_non_admin_sees_only_own_sessions(system_db): assert rows == [("alice", "s-a-1")] +def test_non_admin_sees_sessions_uploaded_via_api(system_db): + """Sessions uploaded via POST /api/upload/sessions are stored under + user_id (UUID) by the pipeline. The user_id filter covers both + ingestion paths (collector and upload API) because the pipeline + resolves the stable user_id for every session at processing time.""" + from src.db import get_system_db + + conn = get_system_db() + conn.execute( + "INSERT INTO usage_session_summary " + "(session_file, session_id, username, user_id, tool_calls, tool_errors, processor_version) VALUES " + "('550e8400-uuid/api.jsonl', 's-api-1', '550e8400-uuid', '550e8400-uuid', 5, 0, 1)" + ) + db_path = str(_get_state_dir() / "system.duckdb") + _, rows, _ = execute_internal_query( + db_path, + {"email": "alice@example.com", "id": "550e8400-uuid"}, + is_admin=False, + sql="SELECT username, session_id FROM agnes_sessions ORDER BY session_id", + ) + # alice should see both her collector row (user_id='alice-uuid' from + # fixture — won't match '550e8400-uuid') and her upload-API row + # (user_id='550e8400-uuid'). + assert ("550e8400-uuid", "s-api-1") in rows + # bob's rows must NOT appear + assert all(r[0] != "bob" for r in rows) + + +def test_email_change_does_not_affect_visibility(system_db): + """If alice changes her email from alice@example.com to + alice.new@example.com, her sessions should still be visible + because the filter uses the stable user_id, not the email.""" + db_path = str(_get_state_dir() / "system.duckdb") + # Query with a different email but the same user_id. + _, rows, _ = execute_internal_query( + db_path, + {"email": "alice.new@example.com", "id": "alice-uuid"}, + is_admin=False, + sql="SELECT username, session_id FROM agnes_sessions", + ) + assert rows == [("alice", "s-a-1")] + + def test_non_admin_sees_only_own_audit_rows(system_db): db_path = str(_get_state_dir() / "system.duckdb") _, rows, _ = execute_internal_query( @@ -225,6 +276,7 @@ def test_execute_rejects_sql_without_internal_refs(): # request handler layer where `is_user_admin(user)` was mis-called. # --------------------------------------------------------------------------- + def _seed_internal_via_api(): """``seeded_app`` bypasses the FastAPI lifespan, so the registry seed that puts agnes_sessions / agnes_telemetry / agnes_audit into @@ -232,6 +284,7 @@ def _seed_internal_via_api(): routes (`/api/query`, `/api/v2/sample`) can find them.""" from src.db import get_system_db from connectors.internal.registry import ensure_internal_tables_registered + ensure_internal_tables_registered(get_system_db()) @@ -243,6 +296,7 @@ def test_query_internal_via_api_admin_path(seeded_app, admin_user): """ _seed_internal_via_api() from src.db import get_system_db + conn = get_system_db() conn.execute( "INSERT INTO usage_session_summary " @@ -287,6 +341,7 @@ def test_sample_internal_via_api_admin_path(seeded_app, admin_user): # round-2 review (PR #278 R2 finding #1). # --------------------------------------------------------------------------- + def test_string_literal_alone_does_not_route_internal(system_db): """A SQL with `agnes_sessions` only inside a string literal should not be treated as referencing the internal table — find_internal_refs diff --git a/tests/test_schema_v42_migration.py b/tests/test_schema_v42_migration.py index eb791ae..4aa64be 100644 --- a/tests/test_schema_v42_migration.py +++ b/tests/test_schema_v42_migration.py @@ -1,4 +1,5 @@ """v41 → v42 migration: 7 new usage_* tables for telemetry.""" + import duckdb import pytest from src.db import _ensure_schema as init_database, SCHEMA_VERSION @@ -10,18 +11,25 @@ def test_schema_version_is_42(): # constant. Test name preserved for git-blame continuity; the # version-pinned tests in test_db_schema_version.py and # test_home_stats.py carry the v44 commentary. - assert SCHEMA_VERSION == 44 + assert SCHEMA_VERSION == 45 def test_v42_tables_exist_after_init(tmp_path): db_path = tmp_path / "test.duckdb" conn = duckdb.connect(str(db_path)) init_database(conn) - tables = {row[0] for row in conn.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='main'").fetchall()} + tables = { + row[0] + for row in conn.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='main'").fetchall() + } for tbl in [ - "usage_events", "usage_session_summary", - "usage_tool_daily", "usage_plugin_daily", - "usage_attribution_skills", "usage_attribution_agents", "usage_attribution_commands", + "usage_events", + "usage_session_summary", + "usage_tool_daily", + "usage_plugin_daily", + "usage_attribution_skills", + "usage_attribution_agents", + "usage_attribution_commands", ]: assert tbl in tables, f"missing table {tbl}" conn.close() @@ -31,12 +39,21 @@ def test_v42_indices_exist(tmp_path): db_path = tmp_path / "test.duckdb" conn = duckdb.connect(str(db_path)) init_database(conn) - idx_names = {row[0] for row in conn.execute("SELECT index_name FROM duckdb_indexes WHERE table_name LIKE 'usage_%'").fetchall()} + idx_names = { + row[0] + for row in conn.execute("SELECT index_name FROM duckdb_indexes WHERE table_name LIKE 'usage_%'").fetchall() + } for idx in [ - "idx_usage_events_session", "idx_usage_events_user_time", "idx_usage_events_tool", - "idx_usage_events_skill", "idx_usage_events_ref", - "idx_usage_session_user", "idx_usage_session_started", - "idx_usage_attr_skill_lookup", "idx_usage_attr_agent_lookup", "idx_usage_attr_command_lookup", + "idx_usage_events_session", + "idx_usage_events_user_time", + "idx_usage_events_tool", + "idx_usage_events_skill", + "idx_usage_events_ref", + "idx_usage_session_user", + "idx_usage_session_started", + "idx_usage_attr_skill_lookup", + "idx_usage_attr_agent_lookup", + "idx_usage_attr_command_lookup", ]: assert idx in idx_names, f"missing index {idx}" conn.close() @@ -51,7 +68,7 @@ def test_v41_to_v42_is_idempotent(tmp_path): conn = duckdb.connect(str(db_path)) init_database(conn) v = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0] - assert v == 44 + assert v == 45 conn.close() @@ -72,15 +89,20 @@ def test_v41_db_upgrades_cleanly(tmp_path): conn = duckdb.connect(str(db_path)) init_database(conn) v = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0] - assert v == 44 + assert v == 45 # All 7 new v41 tables exist after the v40→v41 upgrade - tables = {row[0] for row in conn.execute( - "SELECT table_name FROM information_schema.tables WHERE table_schema='main'" - ).fetchall()} + tables = { + row[0] + for row in conn.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='main'").fetchall() + } for tbl in [ - "usage_events", "usage_session_summary", - "usage_tool_daily", "usage_plugin_daily", - "usage_attribution_skills", "usage_attribution_agents", "usage_attribution_commands", + "usage_events", + "usage_session_summary", + "usage_tool_daily", + "usage_plugin_daily", + "usage_attribution_skills", + "usage_attribution_agents", + "usage_attribution_commands", ]: assert tbl in tables, f"missing table {tbl} after v40→v41 upgrade" conn.close() @@ -99,7 +121,7 @@ def test_v30_db_ladders_all_the_way_up(tmp_path): conn = duckdb.connect(str(db_path)) init_database(conn) v = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0] - assert v == 44 + assert v == 45 cnt = conn.execute("SELECT COUNT(*) FROM audit_log WHERE id='vintage'").fetchone()[0] assert cnt == 1 # New v41 table exists diff --git a/tests/test_session_pipeline.py b/tests/test_session_pipeline.py index 1b46256..56c11ec 100644 --- a/tests/test_session_pipeline.py +++ b/tests/test_session_pipeline.py @@ -17,11 +17,10 @@ import json from pathlib import Path import duckdb -import pytest from services.session_pipeline.contract import ProcessorResult from services.session_pipeline.lib import compute_file_hash, parse_jsonl -from services.session_pipeline.runner import run_processor +from services.session_pipeline.runner import resolve_user_id, run_processor from src.repositories.session_processor_state import SessionProcessorStateRepository @@ -29,6 +28,7 @@ def _fresh_db(tmp_path, monkeypatch) -> duckdb.DuckDBPyConnection: """Same idiom as tests/test_corporate_memory_v1.py — fresh schema in tmp_path.""" monkeypatch.setenv("DATA_DIR", str(tmp_path)) import src.db as db_module + db_module._system_db_conn = None db_module._system_db_path = None return db_module.get_system_db() @@ -38,12 +38,15 @@ def _fresh_db(tmp_path, monkeypatch) -> duckdb.DuckDBPyConnection: # parse_jsonl # --------------------------------------------------------------------------- + class TestParseJsonl: def test_parses_well_formed_lines(self, tmp_path): f = tmp_path / "session.jsonl" f.write_text( - json.dumps({"role": "user", "content": "hi"}) + "\n" - + json.dumps({"role": "assistant", "content": "hello"}) + "\n" + json.dumps({"role": "user", "content": "hi"}) + + "\n" + + json.dumps({"role": "assistant", "content": "hello"}) + + "\n" ) turns = parse_jsonl(f) assert len(turns) == 2 @@ -55,9 +58,11 @@ class TestParseJsonl: a single corrupt row mustn't abort processing of the rest.""" f = tmp_path / "session.jsonl" f.write_text( - json.dumps({"role": "user", "content": "ok"}) + "\n" + json.dumps({"role": "user", "content": "ok"}) + + "\n" + "this is not json\n" - + json.dumps({"role": "assistant", "content": "still ok"}) + "\n" + + json.dumps({"role": "assistant", "content": "still ok"}) + + "\n" ) turns = parse_jsonl(f) assert len(turns) == 2 @@ -66,11 +71,7 @@ class TestParseJsonl: def test_skips_blank_lines(self, tmp_path): f = tmp_path / "session.jsonl" - f.write_text( - "\n" - + json.dumps({"role": "user", "content": "x"}) + "\n" - + " \n" - ) + f.write_text("\n" + json.dumps({"role": "user", "content": "x"}) + "\n" + " \n") turns = parse_jsonl(f) assert len(turns) == 1 @@ -79,6 +80,7 @@ class TestParseJsonl: # compute_file_hash # --------------------------------------------------------------------------- + class TestComputeFileHash: def test_deterministic(self, tmp_path): f = tmp_path / "x.jsonl" @@ -94,10 +96,77 @@ class TestComputeFileHash: assert h1 != h2 +# --------------------------------------------------------------------------- +# resolve_user_id +# --------------------------------------------------------------------------- + + +class TestResolveUserId: + """Unit tests for the linchpin identity-resolution function.""" + + @staticmethod + def _seed_users(conn: duckdb.DuckDBPyConnection, rows: list[tuple[str, str, str | None]]) -> None: + for uid, email, updated_at in rows: + conn.execute( + "INSERT INTO users (id, email, updated_at) VALUES (?, ?, ?)", + [uid, email, updated_at], + ) + + def test_exact_uuid_match(self, tmp_path, monkeypatch): + conn = _fresh_db(tmp_path, monkeypatch) + self._seed_users(conn, [("uuid-aaa", "alice@example.com", "2026-01-01")]) + assert resolve_user_id(conn, "uuid-aaa") == "uuid-aaa" + conn.close() + + def test_email_local_part_match(self, tmp_path, monkeypatch): + conn = _fresh_db(tmp_path, monkeypatch) + self._seed_users(conn, [("uuid-bbb", "bob@example.com", "2026-01-01")]) + assert resolve_user_id(conn, "bob") == "uuid-bbb" + conn.close() + + def test_null_fallback_for_unknown(self, tmp_path, monkeypatch): + conn = _fresh_db(tmp_path, monkeypatch) + self._seed_users(conn, [("uuid-aaa", "alice@example.com", "2026-01-01")]) + assert resolve_user_id(conn, "nobody") is None + conn.close() + + def test_tiebreak_picks_most_recently_updated(self, tmp_path, monkeypatch): + conn = _fresh_db(tmp_path, monkeypatch) + self._seed_users( + conn, + [ + ("uuid-old", "zara@old.com", "2025-01-01"), + ("uuid-new", "zara@new.com", "2026-06-01"), + ], + ) + assert resolve_user_id(conn, "zara") == "uuid-new" + conn.close() + + def test_underscore_not_treated_as_wildcard(self, tmp_path, monkeypatch): + conn = _fresh_db(tmp_path, monkeypatch) + self._seed_users( + conn, + [ + ("uuid-alice", "alicexsmith@example.com", "2026-01-01"), + ("uuid-real", "alice_smith@example.com", "2025-01-01"), + ], + ) + # "alice_smith" must match only the literal underscore email + assert resolve_user_id(conn, "alice_smith") == "uuid-real" + conn.close() + + def test_uuid_branch_takes_priority_over_email(self, tmp_path, monkeypatch): + conn = _fresh_db(tmp_path, monkeypatch) + self._seed_users(conn, [("uuid-aaa", "uuid-aaa@example.com", "2026-01-01")]) + assert resolve_user_id(conn, "uuid-aaa") == "uuid-aaa" + conn.close() + + # --------------------------------------------------------------------------- # SessionProcessorStateRepository # --------------------------------------------------------------------------- + class TestSessionProcessorStateRepository: def test_unprocessed_when_empty(self, tmp_path, monkeypatch): conn = _fresh_db(tmp_path, monkeypatch) @@ -175,7 +244,6 @@ class TestSessionProcessorStateRepository: every scheduler tick.""" import os import time - from datetime import datetime, timezone conn = _fresh_db(tmp_path, monkeypatch) sessions = tmp_path / "sessions" @@ -233,6 +301,7 @@ class TestSessionProcessorStateRepository: # run_processor # --------------------------------------------------------------------------- + class _FakeProcessor: """Test double that records its calls and is configurable per behavior.""" @@ -249,7 +318,7 @@ class _FakeProcessor: self.raise_on_session = raise_on_session self.calls: list[str] = [] - def process_session(self, session_path: Path, username: str, session_key: str, conn): + def process_session(self, session_path: Path, username: str, session_key: str, conn, **kwargs: object): self.calls.append(session_key) if self.raise_on_session is not None and session_key == self.raise_on_session: raise RuntimeError("simulated processor failure") @@ -385,6 +454,7 @@ class TestRunProcessor: class _BadReturn: name = "bad" cadence_minutes = 1 + def process_session(self, *a, **kw): return None # type: ignore[return-value] @@ -398,6 +468,7 @@ class TestRunProcessor: # v29 migration — verification rows preserved, old table dropped # --------------------------------------------------------------------------- + class TestV29Migration: """Exercise the v28 → v29 migration directly. Builds a v28 schema (using the pre-v29 idiom inline so the test doesn't depend on _SYSTEM_SCHEMA's @@ -426,6 +497,7 @@ class TestV29Migration: # Run v29 migration steps via the helper (which conditionally copies # from the legacy table when present). from src.db import _v30_to_v31_migrate + _v30_to_v31_migrate(conn) # New table has the row tagged with processor_name='verification'. @@ -437,7 +509,8 @@ class TestV29Migration: # Old table is gone. existing = { - r[0] for r in conn.execute( + r[0] + for r in conn.execute( "SELECT table_name FROM information_schema.tables WHERE table_schema='main'" ).fetchall() } @@ -477,10 +550,12 @@ class TestV29Migration: ) from src.db import _v30_to_v31_migrate + _v30_to_v31_migrate(conn) existing = { - r[0] for r in conn.execute( + r[0] + for r in conn.execute( "SELECT table_name FROM information_schema.tables WHERE table_schema='main'" ).fetchall() }