agnes-the-ai-analyst/services/session_processors/usage_lib.py
minasarustamyan 302cf58ccd
feat(marketplace): telemetry v46 + flea inner parity + listing polish (#329)
* feat(telemetry): marketplace item rollup refactor (schema v46)

Replace the v42 attribution layer with prefix-split + live lookup against
marketplace_plugins / store_entities. The v42 design had a latent bug —
AttributionLookup keyed on bare skill names while Claude Code writes
`<plugin>:<local>` in JSONL, so lookups never matched and
usage_plugin_daily stayed empty in every deployment.

Schema (v46 migration):
- Drop usage_attribution_skills / _agents / _commands (mapping tables,
  derivable from marketplace_plugins + plugin tree).
- Drop usage_plugin_daily (always empty in production due to the bug above).
- Create usage_marketplace_item_daily — per-day fact (count, distinct_users,
  error_count), composite PK on (day, source, type, parent_plugin, name).
- Create usage_marketplace_item_window — sliding-window snapshot with
  true cross-window distinct user counts; period_label='last_7d' refreshes
  every tick, 'last_30d' refreshes hourly (tracked via session_processor_state).
- Mark usage_tool_daily as candidate for removal (no product-UI consumer).

Attribution flow:
- MarketplaceItemLookup replaces AttributionLookup. Preloads
  marketplace_plugins.name + store_entities.name into memory once per
  UsageProcessor tick, then per-event splits identifier on ':',
  matches prefix, writes resolved source / parent_plugin into
  usage_events. agnes-store-bundle prefix routes to flea entities.
  Slash commands with `plugin:` prefix count as type='skill' in rollup.
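
The prefix-split step described above can be sketched roughly as follows. This is a simplified illustration, not the actual `MarketplaceItemLookup.resolve()` body: the `attribute` helper, its three-tuple return shape, and the in-memory `curated_plugins` set are assumptions made for the example.

```python
FLEA_BUNDLE_PREFIX = "agnes-store-bundle"


def attribute(ident, curated_plugins):
    """Hypothetical helper: map an identifier to (source, parent_plugin, name)."""
    # Identifiers without ':' are built-in tools or flat slash commands.
    if not ident or ":" not in ident:
        return ("builtin", "", None)
    prefix, local = ident.split(":", 1)
    # The synthetic bundle prefix routes to standalone flea entities.
    if prefix == FLEA_BUNDLE_PREFIX:
        return ("flea", "", local)
    # A prefix that names a known curated plugin becomes the parent plugin.
    if prefix in curated_plugins:
        return ("curated", prefix, local)
    return ("builtin", "", None)


print(attribute("grpn:design", {"grpn"}))
print(attribute("Bash", {"grpn"}))
```

Keying on the prefix rather than the bare skill name is what avoids the v42 mismatch, where lookups on bare names never matched the `<plugin>:<local>` identifiers in the JSONL.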

API:
- BREAKING: MarketplaceItem.unique_users_30d renamed to distinct_users_30d
  (now a true distinct count from the window snapshot, not sum-of-daily).
- InnerDetailResponse gains a telemetry field — invocations_30d +
  distinct_users_30d surfaced on curated inner skill / agent detail pages.
- Card chip hidden pending UX finalisation; data stays in the response.
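
To illustrate why the rename to distinct_users_30d matters, here is a minimal sketch of the difference between summing per-day distinct counts and counting distinct users across the whole window. The event tuples and both helper names are made up for the example; the real rollup works against the usage tables.

```python
from collections import defaultdict

# Hypothetical (day, user_id) invocation records for a single item.
events = [
    ("2026-05-01", "alice"),
    ("2026-05-01", "bob"),
    ("2026-05-02", "alice"),
    ("2026-05-03", "alice"),
]


def sum_of_daily_distinct(rows):
    """Add up each day's distinct users; repeat users are counted once per day."""
    per_day = defaultdict(set)
    for day, user in rows:
        per_day[day].add(user)
    return sum(len(users) for users in per_day.values())


def window_distinct(rows):
    """Count each user once across the whole window."""
    return len({user for _, user in rows})


print(sum_of_daily_distinct(events), window_distinct(events))
```

With these records the sum-of-daily figure is 4 while only 2 different users were active, which is the overcount the window snapshot removes.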

Backfill: scripts/backfill_marketplace_rollup.py — one-shot rebuild over
historic usage_events after deploy, idempotent.

USAGE_PROCESSOR_VERSION bumped 4 → 5 so the reprocess loop re-attributes
existing events to the new source/ref_id semantics on the next tick.

Tests rewritten: test_session_processor_usage, test_usage_rollups,
test_marketplace_telemetry, test_api_admin_usage_reprocess,
test_db_schema_version, test_home_stats, test_schema_v42_migration.
New: test_backfill_marketplace_rollup.

* fix(marketplace): refresh Most Popular on search + category changes

`loadMostPopular()` early-exits when `state.q` or `state.category` is
set, but the search + category handlers only called `loadItems()` —
so once the section was visible, typing a query or filtering by
category didn't re-run the hide check and the cards stayed on screen
out of scope. Tab + sort handlers already chained the call.

Add the call to runSearch + category pill click handlers (All +
per-category) so the visibility contract holds for every state
mutation that can flip the early-exit condition.

* feat(marketplace): All-plugins section + 7-day Most Popular

Listing layout:
- Always-visible "All plugins" / "All items" / "Your stack" section
  header (label swaps per tab) wrapped in `#mp-all-section` so its
  margin-collapse mirrors the sibling `#mp-popular-section` and the
  spacing from the filter row stays consistent in both layouts.
- Sort dropdown moved from the filter row into the All-* header,
  pinned right via `margin-left: auto`. Anchored to its section so
  the relationship between sort + grid is obvious.
- `.mp-section-header` gets `min-height: 32px` + `align-items: center`
  so the bare-text Most Popular row matches the dropdown-bearing
  All-* row.
- `.mp-section-header` margin tightened 24px → 20px on top.

Most Popular:
- Capacity reduced 8 → 4 cards.
- Now reflects a 7-day window (was 30-day). Backend surfaces
  `invocations_7d` + `distinct_users_7d` on `MarketplaceItem`
  alongside the existing 30d fields; the loader pulls a wider page
  (server still sorts by 30d) and re-sorts + filters client-side
  on `invocations_7d > 0` so the strip stays "hot right now".
- Section label updated to "Last 7 days".
- Section now renders on both `curated` and `flea` tabs (was
  curated-only). Hidden on `my` and whenever search / category
  filter is active. Refresh hooks wired into search + category
  click handlers so visibility flips immediately on state change.

Backend (`_load_invocation_stats`):
- Single SELECT pulls both `last_30d` and `last_7d` rows from
  `usage_marketplace_item_window`; the result dict carries
  invocations + distinct_users for both windows.
- Trend (recent_7 vs prior_7) kept on the daily fact table so it
  stays independent of the window snapshot's freshness.

* feat(marketplace): Most adopted sort + hide Trending when no trend data

Add a fourth sort option to the All-items dropdown — "Most adopted
(30d)", keyed on `MarketplaceItem.distinct_users_30d` (true 30d
distinct user count from `usage_marketplace_item_window`). Protects
the listing from power-user skew that `most_used` is susceptible to:
one user × 100 invokes can't beat 10 different users × 1 invoke
under adoption sort.
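
A small sketch of the skew argument, using two invented items. The field names follow `MarketplaceItem`; the item names and the sort lambdas are illustrative only and are not the `_apply_sort` implementation.

```python
# Two hypothetical items: one heavy user versus ten occasional users.
items = [
    {"name": "power-tool", "invocations_30d": 100, "distinct_users_30d": 1},
    {"name": "team-tool", "invocations_30d": 10, "distinct_users_30d": 10},
]

# most_used: call volume first, name as tie-breaker.
most_used = sorted(items, key=lambda i: (-i["invocations_30d"], i["name"]))

# most_adopted: distinct users first, name as tie-breaker.
most_adopted = sorted(items, key=lambda i: (-i["distinct_users_30d"], i["name"]))

print([i["name"] for i in most_used])
print([i["name"] for i in most_adopted])
```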

Hide Trending option when the response has no trend data. User
reported `sort=trending` returning an empty grid because every
plugin's `trend_pct` was None (prior-week threshold of >= 3
invocations didn't clear anywhere). Empty grids on a user-selected
sort are worse UX than just not offering the sort — surface what
works, hide what doesn't.

Backend (`app/api/marketplace.py`):
- `_apply_sort` gains a `most_adopted` branch (DESC distinct_users_30d,
  ties by name ASC).
- `sort` Literal extended.
- `ItemListResponse.available_sorts` lists the sort keys the UI
  should expose for this response. recent/most_used/most_adopted
  always; trending only when at least one item in the tab's stats
  carries a non-null trend_pct.
- `_available_sorts(stats_dicts)` helper centralises the rule —
  curated and flea branches pass one stats dict, my-tab passes both
  (option is available when either source has trend data).
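
The availability rule can be sketched as below. The shape of each stats dict (item key mapped to a dict with a `trend_pct` entry) is an assumption for the example, and the function is a stand-in for `_available_sorts`, not a copy of it.

```python
def available_sorts(stats_dicts):
    """Return the sort keys the UI should expose for one response."""
    # These three are always offered.
    sorts = ["recent", "most_used", "most_adopted"]
    # Trending is offered only if any item in any source has trend data.
    has_trend = any(
        stats.get("trend_pct") is not None
        for stats_dict in stats_dicts
        for stats in stats_dict.values()
    )
    if has_trend:
        sorts.append("trending")
    return sorts


curated = {"grpn": {"trend_pct": None}}
flea = {"notes": {"trend_pct": 12.5}}
print(available_sorts([curated]))
print(available_sorts([curated, flea]))
```

Passing a list of dicts lets the single-source tabs and the my-tab (two sources) share one code path.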

Frontend (`app/web/templates/marketplace.html`):
- New `<option value="most_adopted">Most adopted (30d)</option>`
  between Most used and Trending.
- URL state allowlist extended so `?sort=most_adopted` round-trips.
- `applyAvailableSorts(available)` runs after each list fetch:
  hides options not in the response's available_sorts; if the user
  is on a now-unavailable sort, resets to 'recent' and re-fetches.
  Search-mode fan-out unions availability across the curated + flea
  responses so a hit on either side keeps the option visible.

* feat(marketplace): funnel chip on cards + deterministic Most Popular sort

Card chip — funnel telemetry between description and footer:

  [stack-icon] N installed · [user-icon] N active · [bolt-icon] N calls · ↑/↓ N%

- stack_count (new MarketplaceItem field): for curated it's COUNT(*)
  on user_plugin_optouts (post-v28 row PRESENCE = subscribed; system
  plugins are fanned out to every user via fanout_system_for_user so
  the count includes them naturally). For flea it reuses the existing
  store_entities.install_count (bumped on install/uninstall).
- distinct_users_30d (existing) — active users in the 30d window.
- invocations_30d (existing) — call volume.
- trend_pct (existing) — week-over-week, both directions: green ↑ /
  red ↓, magnitude only (sign in the arrow). Hidden when null.

Backend additions in app/api/marketplace.py:
- MarketplaceItem.stack_count field.
- _load_curated_stack_counts() — one SELECT per render, GROUP BY
  (marketplace_id, plugin_name). Wired into the curated + my-tab
  branches; flea reads install_count off the entity row directly.

Frontend (app/web/templates/marketplace.html):
- Heroicons solid 24×24 inlined (one helper per icon, all
  fill="currentColor" so per-segment colour tokens apply): rectangle-
  stack (mirrors the My Stack tab icon), user, bolt, arrow-trending-
  up/down.
- Per-segment colour: installed=amber #F59F0A (My Stack accent),
  active=green #0e9b6a, calls=orange #f97316. Text stays neutral so
  the chip still reads as metadata, the leading glyph carries the
  visual cue. Trend pill keeps the full-segment green/red colour.
- Zero state: chip hidden when stack_count == 0 AND invocations_30d
  == 0 — brand-new cards aren't visually penalised by a "0·0·0" row.
- Tooltips on every segment via title="…" so hover explains the
  number's meaning to anyone uncertain about the icon.

Most Popular section — deterministic ordering:

Previously sorted by invocations_7d DESC with no tie-breakers, so
several cards with identical 7d call counts would swap places on
refresh (JS stable sort fell back on backend order, and the backend's
own tie-breaker for `most_used` was just name ASC — six `grpn`
plugins from six test marketplaces collapse to the same name and
became indeterminate via list_with_filters' created_at order).

New cascading hierarchy (chosen primary now matches what "most
popular" really means — wide adoption, not power-user volume):

  1. distinct_users_7d DESC  ← adoption / social proof
  2. invocations_7d   DESC  ← volume at equal adoption
  3. distinct_users_30d DESC ← broader adoption fallback
  4. invocations_30d  DESC  ← broader volume fallback
  5. name              ASC  ← deterministic textual order
  6. marketplace_slug  ASC  ← splits duplicate plugin names across
                              marketplaces

Six levels guarantee any two items end at a different sort key, so
the strip is stable across refreshes.
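
The cascade listed above maps naturally onto a tuple sort key. The real sort runs client-side in the template; this Python sketch only illustrates the ordering, and the sample items are invented.

```python
def popular_sort_key(item):
    """Six-level key: DESC fields are negated, ASC fields are used as-is."""
    return (
        -item["distinct_users_7d"],
        -item["invocations_7d"],
        -item["distinct_users_30d"],
        -item["invocations_30d"],
        item["name"],
        item["marketplace_slug"],
    )


# Two hypothetical plugins with the same name and identical stats,
# differing only in marketplace_slug.
base = {
    "distinct_users_7d": 2,
    "invocations_7d": 5,
    "distinct_users_30d": 4,
    "invocations_30d": 9,
    "name": "grpn",
}
a = {**base, "marketplace_slug": "mp-b"}
b = {**base, "marketplace_slug": "mp-a"}

# Input order no longer affects the result.
print([i["marketplace_slug"] for i in sorted([a, b], key=popular_sort_key)])
print([i["marketplace_slug"] for i in sorted([b, a], key=popular_sort_key)])
```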

* fix(marketplace): unify Most Popular on 30d + right-align installed chip

Most Popular section was sorting on the 7d window while its cards
rendered 30d numbers — header label promised one thing, cards showed
another. Unified everything on 30d so a card means the same data
everywhere on the page.

- Dropped the "Last 7 days" meta from the Most Popular header.
- Sort cascade now starts on distinct_users_30d, then invocations_30d,
  with 7d adoption/volume as recency-aware fallbacks before the name +
  marketplace_slug deterministic tail. Six levels guarantee identical
  sort keys never produce indeterminate order across refreshes.
- Filter switched from invocations_7d > 0 to invocations_30d > 0 to
  match the new horizon.
- Most Popular now only renders on page 1 of the listing. Past initial
  discovery, a top-of-list popularity strip on page 2+ would shadow the
  results the user paged into. Pager click handler refreshes the
  section so navigating back to page 1 re-mounts it.

Chip layout — split engagement vs adoption visually:

  [user] N active · [bolt] N calls · [↑/↓] N%        [stack] N installed
  └────────── LEFT (time-bounded engagement) ────┘   └── RIGHT (all-time) ──┘

- Installed (stack_count) is all-time, decremented on uninstall. Alone
  it says little ("12 people installed it") without the engagement
  context next to it ("…but did anyone actually use it?"). Visually
  separating the two groups makes that distinction obvious — left
  group answers "is it used", right answers "does anyone have it".
- Implemented via flex with margin-left:auto on .seg-installed so
  installed drifts to the trailing edge.
- Installed tooltip now reads "Currently installed by N users" — the
  count is a real-time net (uninstall drops it), and saying "currently"
  makes that explicit. Helps when a card shows 0: signals "nobody has
  this in their stack right now", not "data missing".

* feat(plugin-detail): telemetry chip in hero, derived rows in sidebar

Surface the same telemetry funnel the listing card carries on the
curated plugin detail page, so clicking through from /marketplace
keeps a single mental model — figures match, semantics match. The
detail sidebar drops the two raw numbers that used to live there
(Invocations 30d / Users 30d — duplicated by the chip now) and
replaces them with two *derived* signals only the daily series can
provide: Active days + Last used.

Backend (app/api/marketplace.py):
- PluginDetailResponse.stack_count — curated reads via
  _load_curated_stack_counts(), flea reuses install_count. Frontend
  treats both sources uniformly.
- _build_telemetry() always returns a dict (never None). Frontend
  decides chip visibility from stack_count + invocations_30d the
  same way the listing card does. daily_series is always 30 entries
  (zero-padded) so "Active days" and "Last used" derivations on the
  sidebar are trivial array filters.

Frontend (app/web/templates/marketplace_plugin_detail.html):
- New .hero-telemetry slot at the bottom of the hero meta column,
  between the pills row and the action buttons. Renders the four
  funnel segments — active · calls · trend · installed — joined by
  ` · `. No left/right split: the hero has space, so a single
  coherent metadata strip reads cleaner than the card's split layout.
- Heroicons solid inlined (user / bolt / arrow-trending-up,-down /
  rectangle-stack) recoloured against the dark hero — icons in
  lighter tokens (mint #6ee7b7, peach #fdba74, cream #fde68a), trend
  pill keeps the saturated green/red because direction-coding earns
  its own colour.
- Tooltip on installed reads "Currently installed by N users" — the
  count is a real-time net (drops on uninstall), and "currently"
  makes that explicit when a card shows 0.
- fmtNum helper added so 1.2k / 14M renderings match the card's
  format exactly.
- Sidebar swap: Invocations + Users rows removed, replaced by
    Active days  →  "N of 30"
    Last used    →  fmtRelative of the latest non-zero day
  Both derived from telemetry.daily_series — engagement consistency
  + recency, neither of which the hero chip exposes on its own.
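
A rough sketch of how a zero-padded 30-entry series makes both sidebar derivations simple filters. The entry shape (`day` / `count` keys) and the `build_series` helper are assumptions for the example; the actual `daily_series` format may differ.

```python
from datetime import date, timedelta


def build_series(counts_by_day, end, days=30):
    """Return `days` entries ending at `end`, filling missing days with 0."""
    start = end - timedelta(days=days - 1)
    series = []
    for offset in range(days):
        day = (start + timedelta(days=offset)).isoformat()
        series.append({"day": day, "count": counts_by_day.get(day, 0)})
    return series


def active_days(series):
    """Number of days with at least one invocation."""
    return sum(1 for entry in series if entry["count"] > 0)


def last_used(series):
    """ISO date of the latest non-zero day, or None when never used."""
    used = [entry["day"] for entry in series if entry["count"] > 0]
    return max(used) if used else None


series = build_series({"2026-05-10": 3, "2026-05-14": 1}, end=date(2026, 5, 15))
print(f"{active_days(series)} of {len(series)}", last_used(series))
```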

* feat(item-detail): telemetry chip in hero for curated skill/agent

Bring the funnel chip the plugin detail page got in 4cf38d40 to the
curated inner skill/agent detail page — clicking through from the
listing card now keeps the same metadata strip from grid to plugin
page to inner item page.

Backend (app/api/marketplace.py):
- _load_inner_item_stats() rewritten:
    * always returns a dict (never None) so the frontend can decide
      chip visibility client-side, same contract as _build_telemetry
    * adds trend_pct, computed the same way as plugin level
      (recent_7 vs prior_7 from usage_marketplace_item_daily, ≥3
      prior-week threshold)
    * adds daily_series (30 entries, zero-padded) so the sidebar can
      derive Active days + Last used
- InnerDetailResponse.parent_stack_count — new field. Skills/agents
  don't have a per-item subscription model, so the hero shows the
  *parent plugin's* stack count under a "Plugin:" prefix. The
  funnel: "12 installed plugin → 2 actually use this skill".
- curated_skill_detail + curated_agent_detail handlers load
  _load_curated_stack_counts() once and pass the parent's value.

Frontend (app/web/templates/marketplace_item_detail.html):
- New .item-detail .hero .hero-telemetry slot beneath the badges
  row. CSS mirrors plugin-detail's colour tokens (mint/peach/cream
  Heroicons solid + saturated trend pill) so the two surfaces read
  as one visual family.
- Installed segment uses a "Plugin:" label rendered with reduced
  opacity to signal the metric describes the parent, not the item
  itself. Tooltip: "Parent plugin (<plugin_name>) currently
  installed by N users".
- Sidebar Invocations + Users rows removed (chip carries them).
  Active days + Last used derived from telemetry.daily_series replace
  them; only rendered when activeDays > 0 so a brand-new skill
  doesn't show "0 of 30" / "Last used —".
- "Type" row dropped from the sidebar — duplicates the hero badge.
- fmtNum helper added (matches listing card + plugin detail).

Plugin detail (app/web/templates/marketplace_plugin_detail.html):
- Hero "Curator: …" line removed. The Details sidebar already
  carries that info; duplicating it under the h1 was visual noise.
- Sidebar "Owner" row renamed to "Curator" — for curated plugins
  it's a person who curates inclusion in this Agnes instance, not
  the upstream code owner. "Owner" was a hold-over label.

* feat(item-detail): unify hero with plugin detail — pills + breadcrumb + cleaner sidebar

- Inner skill/agent hero now uses the same `.pills` / `.pill.cat / .curated /
  .flea / .muted` class names + CSS as the plugin detail page; the only
  item-only addition is `.pill.type` (Skill / Agent uppercase, plugin detail
  has no kind axis).
- Hero `Updated` moved out of the meta-row into a muted pill (mirrors the
  plugin detail hero), removed from the Details sidebar to avoid duplication.
- Details sidebar slimmed: dropped Marketplace, Path, Updated rows; Parent
  plugin now shows the curator-friendly display name
  (`parent_display_name || manifest_name || slug`) instead of the slug.
- Breadcrumb extended to full path: Marketplace > <marketplace_name> >
  <plugin display name> > <self>, mirroring the plugin detail breadcrumb.
- Backend: new `InnerDetailResponse.parent_display_name` field, populated via
  `_curated_plugin_enrichment` from marketplace-metadata.json — same source
  plugin detail hero already uses.

* feat(marketplace): flea inner skill/agent detail + breadcrumb polish

- Flea inner skill/agent detail page parity with curated:
  * GET /api/marketplace/flea/{id}/skill/{name} + /agent/{name}
    returning InnerDetailResponse (mirror of curated_skill_detail).
  * /marketplace/flea/{id}/skill|agent/{name} web routes that render
    marketplace_item_detail.html with source='flea' + innerName context.
  * Frontend apiURL grows a third branch for flea-inner; breadcrumb
    grows to 4 segments (Marketplace > Flea Market > <plugin display
    name> > <self>) when innerName is set.
  * Telemetry attribution: MarketplaceItemLookup resolves
    <flea_plugin>:<inner> prefixes to (source='flea',
    parent_plugin=<plugin name>) so nested invocations land in the
    same rollups curated nested skills use. USAGE_PROCESSOR_VERSION
    bumped 5 -> 6 so the reprocess loop re-attributes historic events.
- Breadcrumb 2nd segment is now a generic clickable "Curated
  Marketplace" / "Flea Market" link to /marketplace?tab=... instead
  of the opaque per-instance marketplace_name. Applied on both plugin
  detail and inner item detail.
- Inner item hero telemetry chip works for both sources: installedCount
  branches on parent_stack_count (curated) vs install_count (flea),
  installed segment drops the "Plugin:" prefix for flea standalone /
  inner items.
- Updated row dropped from Details sidebar on item detail — the hero
  pill already carries the value, sidebar row was duplicate.

* feat(item-detail): block stack-install on flea inner items (mirror curated)

Inner skills/agents nested inside a flea plugin can no longer be added
to a user's stack on their own — adoption only happens at the plugin
level, same rule curated nested items have followed since launch.

- Hero action: when innerName is set (curated nested OR flea nested),
  render "Open parent plugin →" link + helper text instead of the
  install/remove buttons. Flea standalone entities (no innerName) keep
  the normal install UX.
- Meta-row: same branch now serves curated + flea inner — "part of
  <parent plugin display name> · by <author>" with the parent link
  pointing at the right detail page per source.

No API gate change needed: POST /api/store/entities/{id}/install only
accepts existing entity ids (plugin-level), inner items have no entity
id of their own so the endpoint cannot target them directly.

* feat(marketplace): telemetry chip on inner cards + fix flea hero chip visibility

Inner skill/agent cards on the plugin detail page now carry the same
four-segment funnel chip the marketplace listing cards show (N active
. N calls . trend . N installed), for both curated nested skills and
flea nested skills. Plus two fixes that were keeping the hero chip
hidden on flea plugin / flea inner detail pages.

- Backend `_load_inner_items_stats_by_parent(conn, source, parent_plugin)`
  bulk loader: one query per plugin against usage_marketplace_item_window
  + one against _daily, returning {(name, type): stats}. Avoids N+1
  per-card lookups.
- `InnerItemSummary` gains invocations_30d / distinct_users_30d /
  trend_pct / parent_stack_count fields. `curated_detail` and
  `flea_detail` (in the entity.type=='plugin' branch) enrich the
  skills / agents lists after the existing cover-photo enrichment loop.
- `marketplace_plugin_detail.html`: new `.plugin-detail .inner-card
  .inv-chip*` CSS lifted from marketplace.html with the listing-card
  rules, new buildInnerCardChip() helper, buildCardSection appends
  the chip to each card body. Same gate as the listing card (hidden
  on parent_stack==0 && calls==0).

- fix(flea): flea_detail forgot to populate PluginDetailResponse.stack_count
  from entity.install_count (listing card does this on line 851; detail
  endpoint didn't). Hero chip gate `stackCount===0 && calls===0` then
  always hid the chip even when the entity had installs. Now mirrors
  listing card semantics: stack_count == install_count for flea.
- fix(flea inner): renderInnerHeroTelemetry was reading `d.install_count`
  for any non-curated source. InnerDetailResponse has no install_count
  field — it has parent_stack_count (populated server-side from the
  parent flea plugin's install_count). Gate + label now read
  parent_stack_count for both curated nested AND flea nested scenarios;
  install_count remains the flea standalone path.

* fix(marketplace): Owner label on flea + parent-centric sidebar for flea inner

- Plugin detail Details sidebar — authorship row label now tracks the
  source: curated bundles get `Curator` (existing behaviour), flea
  bundles get `Owner`. The `owner_todo` reminder placeholder stays on
  the curated branch only; flea falls through silently.
- Inner item detail Details sidebar — flea-inner (skill/agent nested
  inside a flea plugin) now shares the curated nested layout: Parent
  plugin / Bundle size / Active days / Last used / Owner. Drops the
  flea-standalone shape's `Category`, `Version`, `Installs`, `Released`
  rows that didn't apply to a nested item. Active days + Last used were
  already wired (telemetryRows) — they just weren't on the flea-inner
  branch.

* fix(tests): bump SCHEMA_VERSION assertions 47 -> 48 post-rebase

The marketplace telemetry migration was renamed _v46_to_v47 -> _v47_to_v48
during the rebase onto main (collision with #326 FTS BM25 migration that
took the v47 slot). Two test files still asserted the pre-rebase value:

- tests/test_home_stats.py::test_schema_version_constant_is_46 (CI red)
- tests/test_schema_v46_migration.py::test_schema_version_is_46

Renames the helper fn name + bumps the assertion. The other two test
files (test_db_schema_version.py, test_schema_v42_migration.py) were
already updated in the rebase resolution.

* fix(telemetry): _build_telemetry returns None when invocations_30d == 0

The follow-up commit that introduced the always-return-dict shape broke
the test contract from the original v46 PR (commit b603e998):

  tests/test_marketplace_telemetry.py::TestDetailTelemetry::
    test_detail_endpoint_telemetry_absent_when_no_data
    AssertionError: assert {'daily_series': [...], ...} is None

Both `PluginDetailResponse.telemetry` and `InnerDetailResponse.telemetry`
are declared `Optional[Dict] = None`, the frontend renders are None-safe
(`d.telemetry || {}` guard + `if (!d.telemetry || ...)` on daily_series),
so dropping the dict on zero activity is the cleaner default.

* release: 0.54.21 — marketplace telemetry refactor (schema v48) + flea inner detail parity + listing UX polish

---------

Co-authored-by: Minas Arustamyan <arustamyan.minas@gmail.com>
Co-authored-by: ZdenekSrotyr <zdenek.srotyr@keboola.com>
2026-05-15 20:58:03 +02:00

747 lines
30 KiB
Python

"""Pure helpers for UsageProcessor — event extraction from Claude Code session jsonls.

Session JSONL shape (as documented in dev_docs/session_explore.md and verified
against live samples):

Each line is a top-level event dict with:
  {
    "type": "user" | "assistant" | "progress" | "system" |
            "tool_use_result" | "summary" | "file-history-snapshot" |
            "queue-operation" | ...,
    "uuid": "event-uuid",
    "parentUuid": "parent-event-uuid",
    "sessionId": "session-uuid",
    "timestamp": "2026-05-12T07:30:00.000Z",
    "cwd": "/path/to/cwd",
    "message": {
      "role": "user" | "assistant",
      "model": "claude-...",   # present on assistant turns
      "content": [             # array or plain string on user turns
        {"type": "text", "text": "..."},
        {"type": "tool_use", "id": "tu_123", "name": "Bash", "input": {...}},
        {"type": "tool_result", "tool_use_id": "tu_123", "is_error": false, "content": [...]}
      ]
    }
  }

Tool results appear as:
  - Inline content items of type "tool_result" inside a user-role message, OR
  - As top-level events of type "tool_use_result" (older Claude Code versions)

is_error correlation: build a map of {tool_use_id: True} from tool_result
items on the first pass, then apply to matching tool_use events.
"""
from __future__ import annotations

import re
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import Iterator

# v6: MarketplaceItemLookup now resolves <flea_plugin>:<inner> prefixes so
# skills/agents nested inside a flea plugin bundle attribute to source='flea'
# with parent_plugin=<plugin name> (same shape as curated nested attribution).
# Pre-v6 these landed as ('builtin', '', None) and never flowed into the
# rollup tables. Bump forces re-attribution on the next reprocess tick.
# (v5: v46 marketplace-telemetry refactor swapped AttributionLookup for
# MarketplaceItemLookup. Identifier prefix (`<plugin>:<local>`) now drives
# attribution and usage_events.source / ref_id are populated per-event from
# the live marketplace_plugins + store_entities tables.)
# (v4: #293 user_id column; v3: #303 <command-name> slash extraction.)
USAGE_PROCESSOR_VERSION = 6

# Claude Code wraps user-typed slash invocations as
# <command-name>/<name></command-name> inside the user message content
# (raw "/foo" plain text never reaches the jsonl). Tag may sit anywhere
# in the text — typically after a <command-message> sibling — so we
# search rather than anchor at start. Name pattern matches both flat
# commands (`clear`, `exit`) and plugin-prefixed ones (`plugin:name`).
COMMAND_NAME_RE = re.compile(r"<command-name>/([A-Za-z][\w:-]*)</command-name>")

# Event types to skip entirely
_SKIP_TYPES = frozenset(
    {
        "system",
        "summary",
        "file-history-snapshot",
        "queue-operation",
        "progress",
    }
)


@dataclass(frozen=True)
class ParsedEvent:
    event_uuid: str | None
    parent_uuid: str | None
    tool_id: str | None  # tool_use 'id' (tu_xxx) from message.content item; None for slash_command
    event_type: str  # 'tool_use' | 'slash_command' | 'subagent' | 'mcp_call'
    tool_name: str | None
    skill_name: str | None
    subagent_type: str | None
    command_name: str | None
    is_error: bool
    model: str | None
    cwd: str | None
    occurred_at: datetime


def _parse_ts(ts_str: str | None) -> datetime | None:
    """Parse ISO 8601 timestamp to aware datetime. Returns None on failure."""
    if not ts_str:
        return None
    try:
        ts_str = ts_str.replace("Z", "+00:00")
        return datetime.fromisoformat(ts_str)
    except (ValueError, TypeError):
        return None


def _collect_error_map(turns: list[dict]) -> dict[str, bool]:
    """First-pass: collect tool_use_id → is_error from all tool_result items.

    Tool results appear in two places:
    1. As content items inside user-role messages (type='tool_result')
    2. As top-level events of type='tool_use_result'
    """
    errors: dict[str, bool] = {}
    for turn in turns:
        turn_type = turn.get("type", "")

        # Top-level tool_use_result events (older Claude Code)
        if turn_type == "tool_use_result":
            tu_id = turn.get("tool_use_id") or turn.get("toolUseId")
            if tu_id and turn.get("is_error"):
                errors[tu_id] = True

        # Inline tool_result content blocks inside user messages
        msg = turn.get("message", {}) or {}
        content = msg.get("content", [])
        if isinstance(content, list):
            for item in content:
                if not isinstance(item, dict):
                    continue
                if item.get("type") == "tool_result":
                    tu_id = item.get("tool_use_id")
                    if tu_id and item.get("is_error"):
                        errors[tu_id] = True
    return errors


def iter_events(turns: list[dict]) -> Iterator[ParsedEvent]:
    """Walk parsed JSONL turns and yield ParsedEvent for each observable event.

    Recognises:
    - Assistant tool_use blocks → event_type='tool_use' (or 'subagent'/'mcp_call')
      - Skill tool → also extracts skill_name
      - Task/Agent tools → event_type='subagent'
      - mcp__* tools → event_type='mcp_call'
    - User messages containing a <command-name>/foo</command-name> tag
      (Claude Code's wire format for user-typed slash invocations)
      → event_type='slash_command', command_name='foo'

    Skips: system, summary, file-history-snapshot, queue-operation, progress.
    """
    error_map = _collect_error_map(turns)

    for turn in turns:
        turn_type = turn.get("type", "")
        if turn_type in _SKIP_TYPES:
            continue

        ts = _parse_ts(turn.get("timestamp")) or datetime.now(timezone.utc)
        cwd = turn.get("cwd")
        event_uuid = turn.get("uuid")
        parent_uuid = turn.get("parentUuid")
        msg = turn.get("message", {}) or {}
        content = msg.get("content", [])
        model = msg.get("model")

        if turn_type == "assistant":
            if isinstance(content, list):
                for item in content:
                    if not isinstance(item, dict):
                        continue
                    if item.get("type") != "tool_use":
                        continue

                    tool_id = item.get("id", "")
                    tool_name = item.get("name") or ""
                    inp = item.get("input") or {}
                    is_error = error_map.get(tool_id, False)

                    # Classify event type
                    skill_name: str | None = None
                    subagent_type: str | None = None
                    command_name: str | None = None

                    if tool_name == "Skill":
                        event_type = "tool_use"
                        # Real Skill input shape varies; check both keys
                        skill_name = inp.get("skill") or inp.get("name") or None
                    elif tool_name in ("Task", "Agent"):
                        event_type = "subagent"
                        subagent_type = inp.get("subagent_type") or tool_name
                    elif tool_name.startswith("mcp__"):
                        event_type = "mcp_call"
                    else:
                        event_type = "tool_use"

                    yield ParsedEvent(
                        event_uuid=event_uuid,
                        parent_uuid=parent_uuid,
                        tool_id=tool_id or None,
                        event_type=event_type,
                        tool_name=tool_name or None,
                        skill_name=skill_name,
                        subagent_type=subagent_type,
                        command_name=command_name,
                        is_error=is_error,
                        model=model,
                        cwd=cwd,
                        occurred_at=ts,
                    )

        elif turn_type == "user":
            # Slash-invocation detection: scan user message content for
            # <command-name>/foo</command-name> tags. content is normally a
            # plain string on slash-invocation turns; tolerate the
            # list-of-blocks shape too in case Claude Code's wire format
            # shifts to structured content later.
            if isinstance(content, str):
                text_parts = [content]
            elif isinstance(content, list):
                text_parts = [
                    item.get("text", "") for item in content if isinstance(item, dict) and item.get("type") == "text"
                ]
            else:
                text_parts = []

            for text in text_parts:
                if not text:
                    continue
                for name in COMMAND_NAME_RE.findall(text):
                    yield ParsedEvent(
                        event_uuid=event_uuid,
                        parent_uuid=parent_uuid,
                        tool_id=None,
                        event_type="slash_command",
                        tool_name=None,
                        skill_name=None,
                        subagent_type=None,
                        command_name=name,
                        is_error=False,
                        model=None,
                        cwd=cwd,
                        occurred_at=ts,
                    )
# Synthetic plugin name Agnes uses to bundle flea-market store entities into
# a single Claude Code marketplace surface. Skill/agent/command identifiers
# from flea entities arrive as `agnes-store-bundle:<entity-name>` in the JSONL.
FLEA_BUNDLE_PREFIX = "agnes-store-bundle"


class MarketplaceItemLookup:
    """Preloads marketplace_plugins + store_entities into memory for O(1)
    per-event attribution.

    Claude Code writes plugin-defined skill/agent/command identifiers in the
    JSONL as ``<plugin_name>:<local_name>`` (e.g. ``grpn:design``). The prefix
    is the *plugin* name (a curated plugin, a flea plugin entity, or the
    synthetic ``agnes-store-bundle`` for standalone flea entities); the local
    part is the skill/agent/command name relative to that plugin. Identifiers
    without a ``:`` are either built-in tools (``Bash``, ``Read``, …) or flat
    slash commands (``/exit``); neither participates in marketplace telemetry.

    ``resolve()`` returns ``(source, parent_plugin, name, type)``:

    - ``source``: ``'curated'`` | ``'flea'`` | ``'builtin'``
    - ``parent_plugin``: the prefix when it matched a curated or flea plugin,
      ``''`` otherwise (including standalone flea entities, which have no
      parent)
    - ``name``: the local part when source is curated/flea, ``None`` otherwise
    - ``type``: for plugin matches, ``'skill'`` | ``'agent'`` derived from the
      identifier field that matched (slash commands count as skill per
      product rule); for standalone flea entities, the type stored in
      ``store_entities``; ``None`` for builtin
    """

    def __init__(self, conn):
        self._curated_plugins: set[str] = {
            row[0] for row in conn.execute(
                "SELECT DISTINCT name FROM marketplace_plugins"
            ).fetchall()
        }
        # Flea entities are looked up by name; we also need their type to
        # determine whether the invocation should land as skill, agent or
        # plugin in the rollup tables.
        self._flea_entities: dict[str, str] = {
            row[0]: row[1] for row in conn.execute(
                "SELECT name, type FROM store_entities WHERE visibility_status='approved'"
            ).fetchall()
        }
        # Flea PLUGIN entities can be matched as a prefix too: `<plugin>:<inner>`
        # invocations of a skill / agent that lives inside a flea plugin bundle
        # land here, mirroring the curated nested attribution path. Standalone
        # flea entities still flow through the FLEA_BUNDLE_PREFIX branch.
        self._flea_plugins: set[str] = {
            name for name, ent_type in self._flea_entities.items()
            if ent_type == "plugin"
        }

    def resolve(self, event: ParsedEvent) -> tuple[str, str, str | None, str | None]:
        """Return ``(source, parent_plugin, name, type)``.

        Priority order: the first identifier with a ``:`` prefix that matches
        a known plugin wins. Identifiers without ``:`` (built-in tools, flat
        slash commands) drop through to the builtin fallback.
        """
        # (identifier, default_type_when_matched)
        candidates = (
            (event.skill_name, "skill"),
            (event.subagent_type, "agent"),
            (event.command_name, "skill"),  # slash commands counted as skill
        )
        for ident, default_type in candidates:
            if not ident or ":" not in ident:
                continue
            prefix, local = ident.split(":", 1)
            if prefix == FLEA_BUNDLE_PREFIX:
                ent_type = self._flea_entities.get(local)
                if ent_type:
                    # For a flea entity bundle, the local part *is* the
                    # entity name and its type comes from the registry.
                    # Standalone flea items have no parent plugin.
                    return ("flea", "", local, ent_type)
                # Bundle prefix but no matching entity, likely archived
                # since the event was written. Try the next identifier;
                # the builtin fallback applies if none matches.
                continue
            if prefix in self._curated_plugins:
                return ("curated", prefix, local, default_type)
            if prefix in self._flea_plugins:
                # Skill / agent nested inside a flea plugin bundle. Same
                # shape as curated: source='flea', parent_plugin=<plugin
                # name>, name=<inner local part>.
                return ("flea", prefix, local, default_type)
            # Unknown plugin prefix: fall through to builtin unless a later
            # identifier matches (mirrors the rebuild_rollups filter that
            # excludes unattributed events).
        return ("builtin", "", None, None)
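# Illustrative sketch, not part of this module: a standalone re-statement of
# the resolve() priority order that takes plain arguments instead of a
# ParsedEvent and a DB connection. The function name `resolve_sketch` and the
# registry contents (`pr-helper`, `toolkit`, `ghost`) are hypothetical; only
# `grpn:design` and the `agnes-store-bundle` prefix come from the text above.

```python
FLEA_BUNDLE_PREFIX = "agnes-store-bundle"


def resolve_sketch(skill_name, subagent_type, command_name, curated_plugins, flea_entities):
    """Illustrative stand-in for MarketplaceItemLookup.resolve()."""
    flea_plugins = {n for n, t in flea_entities.items() if t == "plugin"}
    candidates = (
        (skill_name, "skill"),
        (subagent_type, "agent"),
        (command_name, "skill"),  # slash commands counted as skill
    )
    for ident, default_type in candidates:
        if not ident or ":" not in ident:
            continue
        prefix, local = ident.split(":", 1)
        if prefix == FLEA_BUNDLE_PREFIX:
            ent_type = flea_entities.get(local)
            if ent_type:
                return ("flea", "", local, ent_type)
            continue
        if prefix in curated_plugins:
            return ("curated", prefix, local, default_type)
        if prefix in flea_plugins:
            return ("flea", prefix, local, default_type)
    return ("builtin", "", None, None)


# Hypothetical registry contents, for illustration only.
curated = {"grpn"}
flea = {"pr-helper": "agent", "toolkit": "plugin"}

curated_hit = resolve_sketch("grpn:design", None, None, curated, flea)
standalone_flea = resolve_sketch(None, "agnes-store-bundle:pr-helper", None, curated, flea)
nested_flea = resolve_sketch(None, "toolkit:lint", None, curated, flea)
later_match = resolve_sketch("ghost:x", None, "grpn:design", curated, flea)
flat_command = resolve_sketch(None, None, "/exit", curated, flea)

print(curated_hit)      # ('curated', 'grpn', 'design', 'skill')
print(standalone_flea)  # ('flea', '', 'pr-helper', 'agent')
print(nested_flea)      # ('flea', 'toolkit', 'lint', 'agent')
print(later_match)      # ('curated', 'grpn', 'design', 'skill')
print(flat_command)     # ('builtin', '', None, None)
```

# The `later_match` case shows that an unknown prefix on an earlier identifier
# does not end the search: the loop moves on to the next candidate.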


def compute_active_seconds(timestamps: list[datetime]) -> int:
    """Sum of intra-block durations. Gap >10 minutes = new block."""
    if not timestamps:
        return 0
    timestamps = sorted(timestamps)
    GAP = 600  # 10 minutes
    blocks = []
    block_start = timestamps[0]
    prev = timestamps[0]
    for ts in timestamps[1:]:
        gap = (ts - prev).total_seconds()
        if gap > GAP:
            blocks.append((block_start, prev))
            block_start = ts
        prev = ts
    blocks.append((block_start, prev))
    return int(sum((end - start).total_seconds() for start, end in blocks))
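# Worked example, not part of this module: how the 10-minute gap rule turns
# four timestamps into two activity blocks. `active_seconds_sketch` is a
# hypothetical standalone re-statement of the rule above so the snippet runs
# on its own; the sample timestamps are made up.

```python
from datetime import datetime, timedelta

GAP_SECONDS = 600  # same 10-minute threshold as compute_active_seconds


def active_seconds_sketch(timestamps):
    """Sum the duration of each block; a gap above GAP_SECONDS starts a new block."""
    if not timestamps:
        return 0
    ordered = sorted(timestamps)
    total = 0.0
    block_start = prev = ordered[0]
    for ts in ordered[1:]:
        if (ts - prev).total_seconds() > GAP_SECONDS:
            # Close the current block at the last timestamp before the gap.
            total += (prev - block_start).total_seconds()
            block_start = ts
        prev = ts
    total += (prev - block_start).total_seconds()
    return int(total)


base = datetime(2025, 1, 1, 10, 0, 0)
sample = [
    base,                          # 10:00  block 1 starts
    base + timedelta(minutes=5),   # 10:05  block 1 ends (300 s)
    base + timedelta(minutes=30),  # 10:30  25-minute gap, block 2 starts
    base + timedelta(minutes=32),  # 10:32  block 2 ends (120 s)
]
active = active_seconds_sketch(sample)
wall = int((max(sample) - min(sample)).total_seconds())
print(active, wall)  # 420 1920
```

# The 25-minute idle gap counts toward wall time but not active time, and a
# session with a single timestamp has zero active seconds.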


def compute_summary(turns: list[dict], events: list[dict]) -> dict:
    """Build the usage_session_summary row dict from parsed turns and event rows.

    Caller must fill in 'session_file' and 'username' after calling this.
    events is a list of dicts (as produced by UsageProcessor, not ParsedEvent).
    """
    # session_id: first turn with a sessionId field
    session_id = None
    for t in turns:
        sid = t.get("sessionId")
        if sid:
            session_id = sid
            break

    # Timestamps from all turns that have one
    timestamps: list[datetime] = []
    user_messages = 0
    assistant_messages = 0
    model_counter: Counter = Counter()
    # Running token totals, keyed by summary column name.
    token_totals = {
        "input_tokens": 0,
        "output_tokens": 0,
        "cache_read_tokens": 0,
        "cache_creation_tokens": 0,
    }
    # (Anthropic usage-block key, summary column)
    usage_key_to_column = (
        ("input_tokens", "input_tokens"),
        ("output_tokens", "output_tokens"),
        ("cache_read_input_tokens", "cache_read_tokens"),
        ("cache_creation_input_tokens", "cache_creation_tokens"),
    )

    for t in turns:
        ts = _parse_ts(t.get("timestamp"))
        if ts:
            timestamps.append(ts)
        turn_type = t.get("type", "")
        if turn_type == "user":
            user_messages += 1
        elif turn_type == "assistant":
            assistant_messages += 1
            msg = t.get("message", {}) or {}
            m = msg.get("model")
            if m:
                model_counter[m] += 1
            # Anthropic API usage block on assistant turns. Older sessions
            # may lack `cache_*` keys (pre-prompt-caching); `.get(k, 0)`
            # tolerates that. Non-int values (corrupted JSONL) are skipped
            # to keep one bad turn from poisoning the whole summary.
            usage = msg.get("usage") or {}
            for key, column in usage_key_to_column:
                v = usage.get(key, 0)
                if isinstance(v, int):
                    token_totals[column] += v

    started_at = min(timestamps) if timestamps else None
    ended_at = max(timestamps) if timestamps else None
    wall_seconds = int((ended_at - started_at).total_seconds()) if started_at and ended_at else 0
    active_seconds = compute_active_seconds(timestamps)

    # Aggregate counts from events
    tool_calls = sum(1 for e in events if e["event_type"] == "tool_use")
    tool_errors = sum(1 for e in events if e.get("is_error"))
    skill_invocations = sum(1 for e in events if e.get("skill_name"))
    subagent_dispatches = sum(1 for e in events if e["event_type"] == "subagent")
    mcp_calls = sum(1 for e in events if e["event_type"] == "mcp_call")
    slash_commands = sum(1 for e in events if e["event_type"] == "slash_command")
    distinct_tools = len({e["tool_name"] for e in events if e.get("tool_name")})
    distinct_skills = len({e["skill_name"] for e in events if e.get("skill_name")})
    primary_model = model_counter.most_common(1)[0][0] if model_counter else None

    return {
        "session_id": session_id or "",
        "started_at": started_at,
        "ended_at": ended_at,
        "active_seconds": active_seconds,
        "wall_seconds": wall_seconds,
        "user_messages": user_messages,
        "assistant_messages": assistant_messages,
        "tool_calls": tool_calls,
        "tool_errors": tool_errors,
        "skill_invocations": skill_invocations,
        "subagent_dispatches": subagent_dispatches,
        "mcp_calls": mcp_calls,
        "slash_commands": slash_commands,
        "distinct_tools": distinct_tools,
        "distinct_skills": distinct_skills,
        "primary_model": primary_model,
        "input_tokens": token_totals["input_tokens"],
        "output_tokens": token_totals["output_tokens"],
        "cache_read_tokens": token_totals["cache_read_tokens"],
        "cache_creation_tokens": token_totals["cache_creation_tokens"],
        "processor_version": USAGE_PROCESSOR_VERSION,
    }
# Refresh interval for the 30-day window snapshot. The daily fact + 7d window
# refresh on every UsageProcessor tick (~10 min) because they're cheap and
# need to reflect recent activity. The 30d window is fuller, costs more to
# rebuild, and barely shifts between ticks — refresh hourly instead. Tracked
# in `session_processor_state` (processor_name='marketplace_rollup_30d').
WINDOW_30D_REFRESH_SECONDS = 3600
_MARKETPLACE_30D_TRACKER = "marketplace_rollup_30d"


def _identifier_split(skill_name, subagent_type, command_name, event_type):
    """Split the first prefixed identifier, in resolve()'s candidate order.

    Returns ``(prefix, local, default_type)`` for the first identifier that
    contains a ``:``, or ``(None, None, None)`` if none does. Unlike
    ``MarketplaceItemLookup.resolve()``, the prefix is not checked against
    known plugins here, so an unknown prefix on an earlier identifier is
    returned as-is and later identifiers are not considered.

    Used by the rollup builder, which aggregates in Python rather than SQL.
    ``event_type`` is accepted for call-site symmetry but is currently unused.
    """
    candidates = (
        (skill_name, "skill"),
        (subagent_type, "agent"),
        (command_name, "skill"),  # slash commands counted as skill (product rule)
    )
    for ident, default_type in candidates:
        if not ident or ":" not in ident:
            continue
        prefix, local = ident.split(":", 1)
        return prefix, local, default_type
    return None, None, None


def _attribute_event(curated_plugins: set[str], flea_entities: dict[str, str],
                     skill_name, subagent_type, command_name, event_type):
    """Resolve one event to ``(source, type, parent_plugin, name)``.

    Note that the tuple order differs from ``MarketplaceItemLookup.resolve()``,
    which returns ``(source, parent_plugin, name, type)``.

    Returns None when the event doesn't belong in marketplace rollups
    (built-in tool, flat slash command, unknown plugin prefix). Prefixes that
    name a flea plugin entity are not matched here, so nested
    ``<flea-plugin>:<inner>`` invocations also return None.

    Lookup tables (curated_plugins, flea_entities) are passed in so the
    caller can preload once and reuse across thousands of events.
    """
    prefix, local, default_type = _identifier_split(skill_name, subagent_type, command_name, event_type)
    if prefix is None:
        return None
    if prefix == FLEA_BUNDLE_PREFIX:
        ent_type = flea_entities.get(local)
        if ent_type is None:
            return None
        return ("flea", ent_type, "", local)
    if prefix in curated_plugins:
        return ("curated", default_type, prefix, local)
    return None


def _aggregate_events(events_rows, curated_plugins, flea_entities, *, group_by_day: bool):
    """Walk raw event rows and produce aggregated buckets.

    ``events_rows`` shape: (day, user_id, is_error, skill_name, subagent_type,
    command_name, event_type). When ``group_by_day=True``, buckets are keyed
    by (day, source, type, parent_plugin, name) for the daily fact table;
    otherwise they are keyed by (source, type, parent_plugin, name) and
    aggregate across the whole window.

    Plugin-level rows (type='plugin') are added for curated plugins by
    walking the leaf buckets once and grouping by parent_plugin.
    """
    # Bucket: key -> dict(count, users:set, errors)
    leaf: dict[tuple, dict] = {}
    for row in events_rows:
        day, uid, is_err, sk, sa, cm, etype = row
        attributed = _attribute_event(curated_plugins, flea_entities, sk, sa, cm, etype)
        if attributed is None:
            continue
        source, type_, parent, name = attributed
        if group_by_day:
            key = (day, source, type_, parent, name)
        else:
            key = (source, type_, parent, name)
        b = leaf.setdefault(key, {"count": 0, "users": set(), "errors": 0})
        b["count"] += 1
        if uid:
            b["users"].add(uid)
        if is_err:
            b["errors"] += 1

    # Plugin-level rollup: curated invocations get a parent row, summing the
    # children. distinct_users at plugin level is recomputed as the union of
    # child user sets, so a user counted in two skills of the same plugin
    # doesn't double-count.
    plugin_bucket: dict[tuple, dict] = {}
    for key, vals in leaf.items():
        if group_by_day:
            day, source, type_, parent, name = key
        else:
            day = None
            source, type_, parent, name = key
        if source != "curated" or not parent:
            continue
        if group_by_day:
            pkey = (day, "curated", "plugin", "", parent)
        else:
            pkey = ("curated", "plugin", "", parent)
        pb = plugin_bucket.setdefault(pkey, {"count": 0, "users": set(), "errors": 0})
        pb["count"] += vals["count"]
        pb["users"] |= vals["users"]
        pb["errors"] += vals["errors"]

    leaf.update(plugin_bucket)
    return leaf
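# Illustrative sketch, not part of this module: why the plugin-level row takes
# the union of child user sets instead of summing the children's distinct
# counts. The child names (`design`, `review`) and user ids are made up.

```python
# Two leaf buckets under the same curated plugin, shaped like the buckets
# built by _aggregate_events with group_by_day=False.
leaf = {
    ("curated", "skill", "grpn", "design"): {"count": 3, "users": {"alice", "bob"}, "errors": 0},
    ("curated", "agent", "grpn", "review"): {"count": 2, "users": {"bob", "carol"}, "errors": 1},
}

plugin_row = {"count": 0, "users": set(), "errors": 0}
for (source, type_, parent, name), vals in leaf.items():
    if source == "curated" and parent == "grpn":
        plugin_row["count"] += vals["count"]
        plugin_row["users"] |= vals["users"]  # set union: "bob" is counted once
        plugin_row["errors"] += vals["errors"]

distinct_via_union = len(plugin_row["users"])
distinct_via_sum = sum(len(v["users"]) for v in leaf.values())

print(plugin_row["count"], distinct_via_union, plugin_row["errors"])  # 5 3 1
print(distinct_via_sum)  # 4
```

# Summing the per-child distinct counts would report 4 users for the plugin;
# the union reports the 3 people who actually used it.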


def _last_30d_due(conn) -> bool:
    """True if the 30d window has not been refreshed within the threshold."""
    row = conn.execute(
        "SELECT processed_at FROM session_processor_state "
        "WHERE processor_name = ? AND session_file = '__rollup__'",
        [_MARKETPLACE_30D_TRACKER],
    ).fetchone()
    if row is None:
        return True
    last = row[0]
    if last is None:
        return True
    # processed_at is a TIMESTAMP; DuckDB returns a datetime. Normalise to UTC.
    now = datetime.now(timezone.utc)
    if last.tzinfo is None:
        last = last.replace(tzinfo=timezone.utc)
    return (now - last).total_seconds() >= WINDOW_30D_REFRESH_SECONDS
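# Illustrative sketch, not part of this module: the due check above with the
# database lookup factored out, so the naive-timestamp normalisation and the
# `>=` boundary can be seen in isolation. `is_due` and its parameters are
# hypothetical names; it assumes naive timestamps were written in UTC, as the
# function above does.

```python
from datetime import datetime, timezone

REFRESH_SECONDS = 3600  # same value as WINDOW_30D_REFRESH_SECONDS


def is_due(last, now, threshold=REFRESH_SECONDS):
    """True when there is no previous refresh or it is at least `threshold` seconds old."""
    if last is None:
        return True
    if last.tzinfo is None:
        # Treat a naive timestamp as UTC so it can be subtracted from an aware `now`.
        last = last.replace(tzinfo=timezone.utc)
    return (now - last).total_seconds() >= threshold


now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
never_refreshed = is_due(None, now)
refreshed_59_min_ago = is_due(datetime(2025, 1, 1, 11, 1), now)
refreshed_60_min_ago = is_due(datetime(2025, 1, 1, 11, 0), now)

print(never_refreshed, refreshed_59_min_ago, refreshed_60_min_ago)  # True False True
```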


def _mark_last_30d_refreshed(conn) -> None:
    # Pass the timestamp explicitly: DuckDB parses bare `current_timestamp`
    # in an ON CONFLICT … DO UPDATE SET clause as a column name on the
    # right-hand side, then can't bind it.
    now = datetime.now(timezone.utc)
    conn.execute(
        """
        INSERT INTO session_processor_state
            (processor_name, session_file, username, processed_at, items_extracted)
        VALUES (?, '__rollup__', 'system', ?, 0)
        ON CONFLICT (processor_name, session_file) DO UPDATE SET
            processed_at = EXCLUDED.processed_at
        """,
        [_MARKETPLACE_30D_TRACKER, now],
    )


def rebuild_rollups(conn, *, since_day=None, force_30d: bool = False) -> None:
    """Rebuild marketplace + legacy tool rollups from usage_events.

    Refresh policy (called every UsageProcessor tick):

    - ``usage_marketplace_item_daily``: DELETE+INSERT of every day from
      ``since_day`` onward. ``since_day=None`` (the default) means the last
      7 days; a caller that needs to rebuild further back, such as
      reprocess, must pass an explicit earlier date.
    - ``usage_marketplace_item_window`` ``period_label='last_7d'``: full
      DELETE+INSERT every tick.
    - ``usage_marketplace_item_window`` ``period_label='last_30d'``: full
      DELETE+INSERT once an hour, or when ``force_30d=True``.
    - ``usage_tool_daily`` (legacy): DELETE+INSERT from ``since_day``
      onward, unchanged behaviour from v42.

    All updates run in a single transaction so a partial failure never
    leaves the rollup set inconsistent.
    """
    if since_day is None:
        since_day = (datetime.now(timezone.utc) - timedelta(days=7)).date()

    # Preload lookup tables once; reused across daily + 7d + 30d rebuilds.
    curated_plugins = {
        r[0] for r in conn.execute("SELECT DISTINCT name FROM marketplace_plugins").fetchall()
    }
    flea_entities = {
        r[0]: r[1] for r in conn.execute(
            "SELECT name, type FROM store_entities WHERE visibility_status='approved'"
        ).fetchall()
    }
    do_30d = force_30d or _last_30d_due(conn)

    try:
        conn.execute("BEGIN")

        # ---- Legacy: usage_tool_daily (unchanged) ----
        conn.execute("DELETE FROM usage_tool_daily WHERE day >= ?", [since_day])
        conn.execute(
            """
            INSERT INTO usage_tool_daily
                (day, tool_name, source, invocations, error_count, distinct_users, distinct_sessions)
            SELECT
                CAST(occurred_at AS DATE) AS day,
                tool_name,
                source,
                COUNT(*) AS invocations,
                SUM(CASE WHEN is_error THEN 1 ELSE 0 END) AS error_count,
                COUNT(DISTINCT username) AS distinct_users,
                COUNT(DISTINCT session_id) AS distinct_sessions
            FROM usage_events
            WHERE CAST(occurred_at AS DATE) >= ?
              AND tool_name IS NOT NULL
            GROUP BY day, tool_name, source
            """,
            [since_day],
        )

        # ---- New: usage_marketplace_item_daily (incremental from since_day) ----
        daily_events = conn.execute(
            """
            SELECT
                CAST(occurred_at AS DATE) AS day,
                user_id,
                is_error,
                skill_name,
                subagent_type,
                command_name,
                event_type
            FROM usage_events
            WHERE CAST(occurred_at AS DATE) >= ?
            """,
            [since_day],
        ).fetchall()
        daily_buckets = _aggregate_events(
            daily_events, curated_plugins, flea_entities, group_by_day=True
        )
        conn.execute("DELETE FROM usage_marketplace_item_daily WHERE day >= ?", [since_day])
        if daily_buckets:
            conn.executemany(
                """
                INSERT INTO usage_marketplace_item_daily
                    (day, source, type, parent_plugin, name, count, distinct_users, error_count)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                [
                    (day, source, type_, parent, name, v["count"], len(v["users"]), v["errors"])
                    for (day, source, type_, parent, name), v in daily_buckets.items()
                ],
            )

        # ---- New: usage_marketplace_item_window period_label='last_7d' (full) ----
        cutoff_7d = (datetime.now(timezone.utc) - timedelta(days=7)).date()
        _rebuild_window(
            conn, "last_7d", cutoff_7d, curated_plugins, flea_entities,
        )

        # ---- New: usage_marketplace_item_window period_label='last_30d' (hourly) ----
        if do_30d:
            cutoff_30d = (datetime.now(timezone.utc) - timedelta(days=30)).date()
            _rebuild_window(
                conn, "last_30d", cutoff_30d, curated_plugins, flea_entities,
            )
            _mark_last_30d_refreshed(conn)

        conn.execute("COMMIT")
    except Exception:
        try:
            conn.execute("ROLLBACK")
        except Exception:
            pass
        raise
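# Illustrative sketch, not part of this module: the DELETE+INSERT-in-one-
# transaction pattern used by rebuild_rollups, shown with the standard-library
# sqlite3 module as a stand-in for DuckDB so it runs without extra
# dependencies. The table `rollup_demo`, the function `rebuild_demo` and the
# `fail` flag are hypothetical.

```python
import sqlite3

# isolation_level=None disables sqlite3's implicit transactions, so the
# BEGIN / COMMIT / ROLLBACK statements below are the only transaction control.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE rollup_demo (day TEXT, name TEXT, invocations INTEGER)")
conn.execute("INSERT INTO rollup_demo VALUES ('2025-01-01', 'design', 1)")


def rebuild_demo(conn, rows, fail=False):
    """Replace the table contents atomically; on any error the old rows survive."""
    try:
        conn.execute("BEGIN")
        conn.execute("DELETE FROM rollup_demo")
        conn.executemany("INSERT INTO rollup_demo VALUES (?, ?, ?)", rows)
        if fail:
            raise RuntimeError("simulated failure after DELETE+INSERT")
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise


new_rows = [("2025-01-02", "review", 5)]

try:
    rebuild_demo(conn, new_rows, fail=True)
except RuntimeError:
    pass
after_failure = conn.execute("SELECT day, name, invocations FROM rollup_demo").fetchall()

rebuild_demo(conn, new_rows)
after_success = conn.execute("SELECT day, name, invocations FROM rollup_demo").fetchall()

print(after_failure)  # [('2025-01-01', 'design', 1)]
print(after_success)  # [('2025-01-02', 'review', 5)]
```

# Because the DELETE and the INSERT share one transaction, a failure midway
# leaves readers with the previous rollup rather than an empty table.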


def _rebuild_window(conn, period_label: str, cutoff_day, curated_plugins, flea_entities) -> None:
    """Full DELETE+INSERT of one period_label in usage_marketplace_item_window.

    Caller wraps the call in a BEGIN/COMMIT transaction along with the
    other rollup writes; this function only does the DML.
    """
    events = conn.execute(
        """
        SELECT
            CAST(occurred_at AS DATE) AS day,
            user_id,
            is_error,
            skill_name,
            subagent_type,
            command_name,
            event_type
        FROM usage_events
        WHERE CAST(occurred_at AS DATE) >= ?
        """,
        [cutoff_day],
    ).fetchall()
    buckets = _aggregate_events(
        events, curated_plugins, flea_entities, group_by_day=False
    )
    conn.execute(
        "DELETE FROM usage_marketplace_item_window WHERE period_label = ?",
        [period_label],
    )
    if buckets:
        conn.executemany(
            """
            INSERT INTO usage_marketplace_item_window
                (period_label, source, type, parent_plugin, name, invocations, distinct_users)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            [
                (period_label, source, type_, parent, name, v["count"], len(v["users"]))
                for (source, type_, parent, name), v in buckets.items()
            ],
        )