agnes-the-ai-analyst/services/session_processors/usage_lib.py
minasarustamyan c6c72b9c00
feat(flea): marketplace refactor — data model, attribution, UI unification (#342)
* feat(flea): phase-1 — title, tagline, synthetic_name columns + upload UX

Schema v49 adds three user-facing metadata columns to store_entities:

- title (NOT NULL) — humanized display name shown on marketplace
  surfaces in later phases. Acronym-aware humanizer in
  src/store_naming.py (27 entries: MCP, API, OAuth, S3, …) shared
  with the frontend via Jinja-injected dict so JS pre-fill and
  Python backfill produce identical output.
- tagline (NULL, ≤200 chars) — optional short description for card
  listings. Long-form `description` stays.
- synthetic_name (NOT NULL) — deterministic `<name>-by-<owner_username>`
  stored as a column for indexing and as the single source of truth
  for attribution lookups in later phases. Today's bundle bake still
  uses suffixed_name() at the same call sites.
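
A minimal sketch of the two derivations above. The real humanizer lives in `src/store_naming.py` and is not reproduced here. The 4-entry `ACRONYMS` dict is a hypothetical subset of its 27 entries, and splitting words on hyphens/underscores is an assumption.

```python
import re

# Hypothetical 4-entry subset; the real dict in src/store_naming.py has 27.
ACRONYMS = {"mcp": "MCP", "api": "API", "oauth": "OAuth", "s3": "S3"}


def humanize_name(name: str) -> str:
    """Turn a kebab/snake-case slug into a display title (sketch)."""
    words = [w for w in re.split(r"[-_]+", name) if w]
    # Known acronyms keep their canonical casing; other words are capitalized.
    return " ".join(ACRONYMS.get(w.lower(), w.capitalize()) for w in words)


def synthetic_name(name: str, owner_username: str) -> str:
    """Deterministic `<name>-by-<owner_username>` attribution slug."""
    return f"{name}-by-{owner_username}"


print(humanize_name("page-skill"))              # Page Skill
print(humanize_name("mcp-oauth-helper"))        # MCP OAuth Helper
print(synthetic_name("xlsx", "c-marustamyan"))  # xlsx-by-c-marustamyan
```

Because the helper is pure, sharing the same table with the frontend is enough to keep the JS pre-fill and the Python backfill in agreement.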

Migration (_v48_to_v49_migrate, Python function — humanize has no
SQL equivalent) backfills existing rows: title from
humanize_name(strip_archive_suffix(name)), synthetic from the concat
formula; tagline stays NULL. Idempotent (ADD COLUMN IF NOT EXISTS +
SET NOT NULL no-op on re-run).

Upload form (store_upload.html step 2) reorders fields: Title
(pre-filled from server-side humanize, JS keeps it in sync until
the user edits manually) → Name + dark synthetic preview on one
row (matches marketplace_item_detail.html dark code styling, no
copy button — preview only) → Short description with character
counter → Description (unchanged). Edit form (store_edit.html)
mirrors the layout with pre-filled values from the entity row.

API:

- POST /api/store/entities/preview returns `title` (humanized
  fallback) for upload form pre-fill.
- POST + PUT /api/store/entities accept `title` and `tagline` form
  fields with 100/200-char validation; PUT recomputes
  synthetic_name when `name` changes (caller responsibility per
  repo contract).
- StoreEntityResponse exposes all three new fields.
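
The title/tagline checks could look roughly like this. `validate_metadata`, the `ValueError` contract, and stripping whitespace before measuring length are assumptions for illustration. Falling back to a humanized title mirrors the "explicit + fallback title" behavior listed under Tests. Storing a blank tagline as `None` mirrors the repository's empty-string-clears rule.

```python
from __future__ import annotations

TITLE_MAX = 100
TAGLINE_MAX = 200


def validate_metadata(
    title: str | None, tagline: str | None, fallback_title: str
) -> tuple[str, str | None]:
    """Return (title, tagline) or raise ValueError (hypothetical contract)."""
    # A missing or blank title falls back to the humanized name.
    title = (title or "").strip() or fallback_title
    if len(title) > TITLE_MAX:
        raise ValueError(f"title exceeds {TITLE_MAX} characters")
    # An empty tagline is stored as NULL rather than "".
    tagline = (tagline or "").strip() or None
    if tagline is not None and len(tagline) > TAGLINE_MAX:
        raise ValueError(f"tagline exceeds {TAGLINE_MAX} characters")
    return title, tagline
```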

Repository:

- create() takes title + tagline + synthetic_name as optional
  kwargs with derived defaults (humanize_name(name) / concat) so
  existing test fixtures don't need to thread them.
- update() supports partial updates on all three; tagline empty
  string clears via NULL sentinel.
- archive() recomputes synthetic_name on rename to the archived
  slug so the column stays consistent with name.
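
The partial-update rule above (omitted fields stay untouched, an empty tagline becomes NULL) is commonly implemented with a module-level sentinel. The sentinel keeps "not passed" distinguishable from "explicitly cleared". `_UNSET` and `build_update` are hypothetical. The repository's real `update()` signature is not shown in this commit.

```python
from __future__ import annotations

_UNSET = object()  # distinguishes "argument not passed" from an explicit value


def build_update(*, title=_UNSET, tagline=_UNSET, synthetic_name=_UNSET) -> tuple[str, list]:
    """Build a SET clause that touches only the fields the caller passed."""
    assignments: list[str] = []
    params: list = []
    if title is not _UNSET:
        assignments.append("title = ?")
        params.append(title)
    if tagline is not _UNSET:
        assignments.append("tagline = ?")
        params.append(tagline or None)  # "" clears the column to NULL
    if synthetic_name is not _UNSET:
        # Caller recomputes this on rename; the repo just stores it.
        assignments.append("synthetic_name = ?")
        params.append(synthetic_name)
    return ", ".join(assignments), params


print(build_update(tagline=""))  # ('tagline = ?', [None])
```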

Tests:

- New test_schema_v48_to_v49_migration.py: fresh install,
  populated-row backfill (incl. archived row strip), idempotence,
  NOT NULL constraint verification.
- test_store_naming.py: 14 humanize parametrize cases + acronym
  dict invariants.
- test_store_api.py::TestStoreV49Metadata: preview humanize, POST
  with explicit + fallback title, 100/200-char rejects, PUT
  partial update + synthetic recompute on rename.
- Schema version assertion bumps (48 → 49) in test_db_schema_version,
  test_home_stats, test_schema_v42_migration, test_schema_v46_migration.

Phase 1 only — surface rendering on cards / detail pages and
Claude Code bundle propagation come in later phases.

* feat(flea): phase-2 — wire title/tagline/owner through marketplace cards + detail pages

Phase 1 (7f4cfcbb) populated the three new columns on store_entities;
phase 2 surfaces them across the web presentation layer so the kebab-
case slug + bare username no longer leak into user-facing copy.

API:

- `_flea_to_item` now takes `conn` (both callsites updated) and sets
  `display_name=entity.title`, `tagline=entity.tagline`, `owner=
  _resolve_owner_display(conn, owner_user_id, owner_username)` —
  matches the chain the curated path already uses (users.name →
  users.email → fallback). The card JS chain `it.display_name ||
  it.name` then renders the friendly form; `name` stays at the
  suffixed slug as the technical identifier JS uses for fallbacks.
- `flea_detail` adds `display_name` + `tagline` to PluginDetailResponse
  so the standalone skill/agent + plugin detail heroes pick them up
  through the existing `d.display_name` / `d.tagline` chains.
- `_flea_inner_parent_fields` swaps `parent_display_name` from
  `strip_archive_suffix(name)` to `entity.title or strip_archive_suffix(
  name)`. Drives parent-plugin label in four surfaces at once:
  breadcrumb 3rd segment, hero "part of <plugin>" meta-row,
  helper "This skill is part of <plugin>" panel, and the Details
  sidebar's "Parent plugin" row.
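
The owner fallback chain reduces to a short coalesce over already-fetched values. `resolve_owner_display` here is a hypothetical standalone function. The real `_resolve_owner_display` takes a connection and an owner id. Treating blank strings like NULL is an assumption.

```python
from __future__ import annotations


def resolve_owner_display(name: str | None, email: str | None, owner_username: str) -> str:
    """users.name → users.email → owner_username (sketch)."""
    for candidate in (name, email):
        if candidate and candidate.strip():
            return candidate.strip()
    return owner_username
```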

Templates — `marketplace_item_detail.html`:

- Pre-render: browser title, hero h1, and hero-window-label read
  `(entity.title if entity else None) or inner_name or item_name or
  plugin_name` so the SSR shell shows the friendly title before the
  JS fetch lands (no flash of kebab-case).
- Breadcrumb last segment for flea standalone drops the `d.manifest_name
  || heroTitle` fallback in favour of just `heroTitle` — manifest_name
  is the suffixed slug and users explicitly didn't want it in the path.
- Hero meta-row for flea standalone is now hidden. The prior "by
  <author> · N installed · <size>" line duplicated the install count
  (already in the hero telemetry chip below) and the owner + bundle
  size (already in the Details sidebar).

Templates — `marketplace_plugin_detail.html`:

- Same SSR pre-render swap (title, h1, window-label, crumb-name).
- Hero tagline element starts hidden; JS shows it only when
  `d.tagline` is truthy. Pre-fix it fell back to `d.description`
  (long-form text), which read awkwardly under the h1 and pulled the
  hero too tall. Description still renders in the "What it does"
  panel below the hero.
- Initial "Loading…" placeholder removed so entities without a
  tagline don't flash that text mid-fetch.

Tests:

- New `TestFleaPhase2Presentation` class in test_marketplace_api.py
  (6 cases): card title + tagline + full-name owner, owner fallback
  chain when users.name is NULL, flea_detail exposes title + tagline,
  tagline null when omitted, inner skill parent_display_name uses
  entity.title (explicit + humanize-fallback variants).
- Updated `TestListItems.test_flea_lists_uploads` to assert both
  `display_name == "Alpha"` (humanized) and `name ==
  "alpha-by-alice"` (suffixed slug compat).
- Updated `TestWebPages.test_marketplace_flea_detail_page_renders`
  to look for the humanized title ("Page Skill") in the SSR shell
  instead of the kebab-case `page-skill`.

* feat(flea): phase-3 — read synthetic_name from DB, suffixed_name() only on write

Phase 1 added the column + backfill, repo write paths keep it in sync.
Phase 3 routes every READ callsite through `store_entities.synthetic_name`
directly instead of recomputing `<name>-by-<owner_username>` on the fly,
and switches the collision query off the inline string concat. The
`suffixed_name()` primitive now lives exclusively in write flows.

Read callsites updated (all read `entity["synthetic_name"]` directly,
no fallback — the column is NOT NULL and a missing value would be a
real bug worth surfacing as KeyError):

- app/api/marketplace.py:_flea_to_item — card MarketplaceItem.name.
- app/api/marketplace.py:flea_detail — PluginDetailResponse.manifest_name.
- app/api/store.py:_entity_to_response — StoreEntityResponse.invocation_name.
- app/api/store.py PUT bundle re-bake — `suffixed` passed to
  `_bake_plugin_tree`; entity is loaded pre-rename, so its
  synthetic_name is the OLD value `_bake_plugin_tree` expects.
- app/api/store.py PUT rename — `old_suffix` for `_rename_baked_tree`.
- app/api/my_stack.py — StoreInstallEntry.invocation_name.
- src/marketplace_filter.py — manifest_name in served plugin entry.

`suffixed_name` imports removed from marketplace.py, my_stack.py, and
marketplace_filter.py (no remaining callsites). store.py keeps the
import for its write paths:

- POST create (`suffixed = suffixed_name(final_name, username)` →
  passed to `_bake_plugin_tree` and `repo.create(synthetic_name=...)`).
- PUT rename collision check (`new_suffixed`).
- PUT rename `new_suffix` for `_rename_baked_tree` (proposed value).
- PUT rename `new_synthetic` for `repo.update(synthetic_name=...)`.
- Archive `old_suffix` + `new_suffix` for `_rename_baked_tree`
  (retro-compute pre-archive value after `repo.archive` already
  overwrote the DB row with the post-archive synthetic).

Collision SQL — `_suffixed_already_taken`:

  WHERE name || '-by-' || owner_username = ?   (before)
  WHERE synthetic_name = ?                     (after)

Both forms match the same rows today (phase-1 backfill + NOT NULL
invariant + write paths kept in sync); the new form is indexable and
is the single source of truth going forward.
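
To see why the two predicates agree while the column is in sync with `name` and `owner_username`, the sketch below runs both against an in-memory SQLite table. SQLite stands in for the real store. `is_taken` is a hypothetical helper, not the actual `_suffixed_already_taken` signature. Only the second form can be served by an index on `synthetic_name`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE store_entities (name TEXT, owner_username TEXT, synthetic_name TEXT NOT NULL)"
)
conn.execute(
    "INSERT INTO store_entities VALUES ('xlsx', 'c-marustamyan', 'xlsx-by-c-marustamyan')"
)

# Before: recompute the slug per row (not indexable).
BEFORE = "SELECT 1 FROM store_entities WHERE name || '-by-' || owner_username = ?"
# After: compare against the stored column directly.
AFTER = "SELECT 1 FROM store_entities WHERE synthetic_name = ?"


def is_taken(conn: sqlite3.Connection, candidate: str, query: str = AFTER) -> bool:
    return conn.execute(query, (candidate,)).fetchone() is not None


for query in (BEFORE, AFTER):
    print(is_taken(conn, "xlsx-by-c-marustamyan", query))  # True
    print(is_taken(conn, "pdf-by-c-marustamyan", query))   # False
```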

Repository:

- UserStoreInstallsRepository.list_for_user explicit SELECT extended
  with `se.title`, `se.tagline`, `se.synthetic_name` so my_stack and
  marketplace_filter callers can read them off the joined row.

Tests:

- test_store_api.py::test_invocation_name_reads_from_synthetic_column —
  upload entity, manually override the column with a non-canonical
  value, verify GET response returns the override (proves read path
  consumes the column, not recomputes).
- test_marketplace_api.py::test_flea_card_and_detail_read_synthetic_name_from_db —
  same proof for `MarketplaceItem.name` (card) and
  `PluginDetailResponse.manifest_name` (detail).

* feat(flea): phase-4 — rename agnes-store-bundle → flea (synthetic plugin)

The synthetic plugin that wraps loose flea-market skills + agents into
one Claude Code plugin is renamed from `agnes-store-bundle` to `flea`.
Plugin-type flea uploads (their own standalone plugin entry) are
unaffected.

Constants:
- src/marketplace_filter.py:
  - BUNDLE_PLUGIN_NAME: "agnes-store-bundle" → "flea"  (Claude Code
    plugin manifest name + .claude-plugin/plugin.json name)
  - BUNDLE_PREFIXED_NAME: "store-bundle" → "flea"      (on-disk ZIP /
    git tree path, now plugins/flea/...)

Attribution layer (services/session_processors/usage_lib.py):
- FLEA_BUNDLE_PREFIX: "agnes-store-bundle" → "flea". The JSONL
  invocation identifier going forward is `flea:<skill-name>`.
- New `_LEGACY_FLEA_BUNDLE_PREFIXES = ("agnes-store-bundle",)`.
  `MarketplaceItemLookup.resolve()` + `_attribute_event()` accept BOTH
  the new and the legacy prefix so historic usage_events (~90-day
  retention) continue attributing to source='flea'. The tuple becomes
  a no-op once the rename has been live past the retention window —
  a follow-up commit can drop it then.
- USAGE_PROCESSOR_VERSION bumped 6 → 7 so the session-pipeline reprocess
  loop re-runs attribution with the new + legacy prefix branches.

User-facing copy:
- /api/store/bundle.zip Content-Disposition filename: agnes-store-bundle.zip → flea.zip
- `agnes admin store pull` default --out: agnes-store-bundle.zip → flea.zip
- Docstrings + JS comment + welcome template comment updated.

Tests:
- skill_flea.jsonl fixture identifier updated to flea:flea-skill.
- New skill_flea_legacy.jsonl with the legacy prefix for backward-compat
  coverage.
- New test `test_legacy_agnes_store_bundle_prefix_resolves` replays the
  legacy fixture and asserts source='flea' attribution still lands.
- All other test assertions / mocks substituted mechanically:
  test_session_processor_usage.py, test_usage_rollups.py,
  test_marketplace_filter_store.py, test_store_api.py,
  test_cli_refresh_marketplace.py.
- `_seed_flea_entity` (test_usage_rollups.py) + `_seed_attribution`
  (test_session_processor_usage.py) helpers now supply the NOT NULL
  `title` + `synthetic_name` columns from phase 1, since they INSERT
  directly bypassing the repo's create() fallback.

Client rollover note (CHANGELOG): `agnes refresh-marketplace` will
install the new `flea@agnes` plugin and the local marketplace clone's
`plugins/store-bundle/` source folder is removed via `git reset --hard`.
Whether Claude Code itself auto-prunes the orphan
`agnes-store-bundle@agnes` registry entry is undocumented; to verify
empirically on the
dev VM. If the orphan entry lingers, a follow-up will add targeted
cleanup; until then users can manually run
`claude plugin uninstall agnes-store-bundle@agnes`.

Verified locally: 98 passed (session_processor_usage + usage_rollups +
marketplace_filter_store + cli_refresh_marketplace) + 228 passed/2
skipped (store_api + marketplace_api + admin_store_submissions +
store_entity_versions + store_repositories).

* fix(flea): phase-5 — attribution keyspace mismatch (closes #335)

Pre-fix every flea skill/agent invocation silently fell through to
`usage_events.source = 'builtin'`. Root cause: lookup tables in
`services/session_processors/usage_lib.py` keyed `_flea_entities` (and
the derived `_flea_plugins` set) by `store_entities.name` — the
un-suffixed display name. Claude Code writes invocations as
`flea:<synthetic_name>` (e.g. `flea:xlsx-by-c-marustamyan`), so
`dict.get(local)` always missed and the resolver fell through to
builtin. Result: marketplace cards, detail telemetry chips, admin
group-by-source all showed 0 flea invocations even when the raw
JSONL stream was correct.

Phase 1 added the `synthetic_name` column + backfill; phase 4 renamed
the bundle prefix to `flea`; phase 5 finally flips the lookup
keyspace to match what JSONL writes.

usage_lib.py:
- `MarketplaceItemLookup.__init__` preload: `SELECT synthetic_name,
  type FROM store_entities` (was `SELECT name, type`). `_flea_plugins`
  set derived from those keys, so it now carries synthetic_names
  too — matches what Claude Code writes when invoking a skill nested
  inside a flea plugin (`<synthetic>:<inner>`).
- `rebuild_rollups` preload: same SELECT change; also derives
  `flea_plugins` and threads it through `_aggregate_events` /
  `_rebuild_window`.
- `_attribute_event`: signature extended with `flea_plugins`; new
  branch `if prefix in flea_plugins: return ("flea", default_type,
  prefix, local)` for flea-plugin-nested skills/agents. This branch
  was added to `MarketplaceItemLookup.resolve()` in v6 (commit
  e076ebbe) but the rollup builder's helper was never updated to
  match, so nested skills inside flea plugins silently dropped out
  of the daily/window fact tables.
- `USAGE_PROCESSOR_VERSION`: 7 → 8. Forces the session-pipeline
  reprocess loop to re-attribute existing usage_events rows with
  the corrected lookup so rollup tables fill correctly on the next
  tick.
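
A reduced sketch of prefix-based attribution after the keyspace flip. It assumes three things: the identifier splits on the first `:`, the entity map is keyed by `synthetic_name` with the entity type as value, and flea plugins are the entries typed `plugin`. `attribute` is hypothetical and returns only `(source, parent_plugin, name)`. The real `_attribute_event` also resolves the skill/agent type. Keying the map by bare `name` instead reproduces the pre-fix fall-through to `builtin`.

```python
from __future__ import annotations

FLEA_BUNDLE_PREFIX = "flea"


def attribute(
    identifier: str, curated_plugins: set[str], flea_entities: dict[str, str]
) -> tuple[str, str, str | None]:
    """Return (source, parent_plugin, name) for one JSONL identifier (sketch)."""
    if ":" not in identifier:
        return ("builtin", "", None)  # Bash, Read, flat slash commands, ...
    prefix, local = identifier.split(":", 1)
    flea_plugins = {syn for syn, typ in flea_entities.items() if typ == "plugin"}
    if prefix in curated_plugins:
        return ("curated", prefix, local)
    if prefix == FLEA_BUNDLE_PREFIX and local in flea_entities:
        return ("flea", "", local)      # standalone flea skill/agent
    if prefix in flea_plugins:
        return ("flea", prefix, local)  # skill/agent nested in a flea plugin
    return ("builtin", "", None)


entities = {
    "xlsx-by-c-marustamyan": "skill",
    "codex-second-opinion-by-c-marustamyan": "plugin",
}
print(attribute("flea:xlsx-by-c-marustamyan", set(), entities))
# ('flea', '', 'xlsx-by-c-marustamyan')
print(attribute("flea:xlsx-by-c-marustamyan", set(), {"xlsx": "skill"}))
# ('builtin', '', None), the pre-fix miss
```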

marketplace.py — 4 API stats lookup callsites switched from
`entity["name"]` to `entity["synthetic_name"]`:
- `_flea_to_item` (card stats lookup)
- `flea_detail` (`_build_telemetry` + `_load_inner_items_stats_by_parent`)
- `flea_skill_detail` (inner detail `parent_plugin` key)
- `flea_agent_detail` (inner detail `parent_plugin` key)

Tests:
- `skill_flea.jsonl` invocation: `flea:flea-skill` →
  `flea:flea-skill-by-alice` (mirrors what Claude Code writes after
  phase 1/4 — the suffixed synthetic_name).
- `test_flea_skill_attributed_with_empty_parent` assertion: rollup
  `name` column now carries the synthetic_name.

No legacy `agnes-store-bundle` prefix backward compat — clean cut per
user direction (dev phase, no production data worth preserving).

Verified locally: 53 passed targeted (session_processor_usage +
usage_rollups + marketplace_filter_store) + 215 passed/2 skipped
broader (store_api + marketplace_api + admin_store_submissions +
store_entity_versions).

* fix(flea): phase-6 — plugin-level rollup aggregation parity for flea

Flea plugin entity cards + detail pages showed 0 invocations even
though nested skills had correct rollup rows. Root cause: the
plugin-level aggregation pass in `_aggregate_events` was hardcoded
to `source='curated'` only:

    if source != "curated" or not parent:
        continue
    if group_by_day:
        pkey = (day, "curated", "plugin", "", parent)
    else:
        pkey = ("curated", "plugin", "", parent)

So flea plugin entities never got a synthetic
`(source='flea', type='plugin', parent_plugin='', name=<synth>)`
row aggregating nested invocations. `_load_invocation_stats('flea')`
filters `parent_plugin = ''` and returned no row for flea plugin
entity cards, so `stats.get(entity["synthetic_name"])` missed and
the API exposed 0/0.

Triggered by empirical observation on the dev VM —
`codex-second-opinion-by-c-marustamyan` plugin showed 0 calls in
the listing card while its three inner skills (codex-setup ×3,
codex-review ×1, codex-second-opinion ×1) had the expected child
rollup rows.

Fix:

- Extend the guard to `source in ("curated", "flea")`.
- Replace the hardcoded `"curated"` in the `pkey` tuple with the
  loop's `source` variable, so flea aggregation lands as `source=
  'flea'` and curated aggregation continues landing as
  `source='curated'`.
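
A reduced sketch of the plugin-level pass after the fix. It uses the non-daily key shape `(source, "plugin", "", parent)` quoted above. Events are assumed to be `(source, parent, name, user)` tuples. The real `_aggregate_events` also emits child rows and a per-day variant. `aggregate_plugin_rows` is hypothetical. The sketch shows why `distinct_users` is a set union rather than a sum of the children's counts.

```python
from __future__ import annotations

from collections import defaultdict


def aggregate_plugin_rows(events: list[tuple[str, str, str, str]]) -> dict[tuple, dict]:
    """Roll nested invocations up to one synthetic plugin row per parent."""
    counts: dict[tuple, int] = defaultdict(int)
    users: dict[tuple, set[str]] = defaultdict(set)
    for source, parent, _name, user in events:
        if source not in ("curated", "flea") or not parent:
            continue
        pkey = (source, "plugin", "", parent)  # source taken from the event
        counts[pkey] += 1
        users[pkey].add(user)  # union across children, not a sum
    return {k: {"count": counts[k], "distinct_users": len(users[k])} for k in counts}


P = "codex-second-opinion-by-c-marustamyan"
events = [
    ("flea", P, "codex-setup", "alice"),
    ("flea", P, "codex-review", "alice"),   # same user, second skill
    ("flea", P, "codex-setup", "bob"),
    ("flea", "", "xlsx-by-c-marustamyan", "carol"),  # standalone: no plugin row
]
print(aggregate_plugin_rows(events))  # one row: count=3, distinct_users=2
```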

API path unchanged: `_load_invocation_stats('flea')` already filters
on `parent_plugin = ''`, so it picks up the new aggregated row
alongside standalone skill/agent rows. The rollup `name` field carries
the synthetic_name keyspace, so a standalone entity synthetic cannot
collide with a plugin entity synthetic (global suffix uniqueness is
enforced by `_suffixed_already_taken`).

`USAGE_PROCESSOR_VERSION` bumped 8 → 9 to force a reprocess pass so
historic nested-invocation data fills the new plugin-level rows on
the next tick (instead of waiting for the next live invocation).

Tests:

- New `test_flea_plugin_row_aggregates_children` mirrors the existing
  `test_curated_plugin_row_aggregates_children`: seeds a flea plugin
  entity, three nested events (one user invoking two skills, a
  second user invoking one) → asserts the aggregated plugin row
  carries count=3, distinct_users=2 (union, not sum), plus the child
  rows survive alongside.

Verified locally: 43 passed (session_processor_usage + usage_rollups)
+ 82 passed/2 skipped broader (+ marketplace_filter_store +
marketplace_api).

* refactor(marketplace): phase-7 — unify Details sidebar across detail surfaces

Five marketplace detail surfaces (curated plugin, flea plugin, curated
inner skill/agent, flea inner skill/agent, flea standalone skill/agent)
had drifted on which Details rows they show and what order — the same
field landed in different positions, some fields duplicated hero info,
and the flea plugin Owner row leaked the kebab-case `owner_username`
slug instead of the user's real name. This commit aligns all five
surfaces on a single scan order driven by UX priority:

  identity → life-stage → telemetry → debug-tier

Concretely:

  1. Curator / Owner          (first scan signal — trust)
  2. Parent plugin            (inner skill/agent only)
  3. Released                 (top-level only — plugins + flea standalone)
  4. Last used                (recency)
  5. Active days              (engagement consistency)
  6. Version                  (flea standalone only — content hash)
  7. Bundle size              (debug-tier)

Dropped:

  - Slug field on plugin detail surfaces (`marketplace_id` for curated,
    `entity_id` for flea). Pure debug info, never user-relevant; URL
    already carries it.
  - Category + Installs on flea standalone skill/agent detail.
    Category is already shown as a hero badge; install count is in
    the hero telemetry chip — sidebar duplication added noise.

Owner display:

  - Flea plugin Owner row now reads `d.owner_display` (resolved through
    `users.name → users.email → owner_username` by `_resolve_owner_display`
    in `app/api/marketplace.py:1491`) instead of the raw `d.author_name`
    (which is `owner_username`, the kebab-case slug). API field already
    populated from phase 2; templates just consume it.
  - Curated Curator row continues to read `d.author_name` from
    marketplace-metadata.json; `owner_todo` placeholder behavior
    preserved.

Files:

  - app/web/templates/marketplace_plugin_detail.html — rewrote the
    Details render loop (lines 1364-1427 area). Slug row removed,
    rows reordered, Owner branch reads `d.owner_display`.
  - app/web/templates/marketplace_item_detail.html — both branches of
    the Details sidebar (inner skill/agent + flea standalone) re-laid
    around the same scan order. Telemetry helper unchanged, just
    repositioned. Category + Installs rows removed from the
    standalone branch.

No new tests — no existing test asserts the precise order of Details
rows or references the dropped fields in a sidebar context (grep
confirmed). API surface unchanged.

Verified locally: 84 passed / 2 skipped on `test_marketplace_api.py`
+ `test_store_api.py`.

* fix(flea): post-review hardening — N+1, v50 UNIQUE, docs, test cleanup

Addresses 5 critical findings from PR #342 code review:

1. N+1 query in `_flea_to_item` — owner-display resolution previously
   ran one `SELECT … FROM users WHERE id = ?` per item in the listing
   comprehension. Now batched via `_load_users_display` IN-query
   prefetch, so a 50-item listing drops from 51 queries to 2. Regression-guarded
   by `TestFleaOwnerDisplayBatched` (spies `_resolve_owner_display`
   and asserts it's not called inside the list path).

2. Misleading comment in `src/marketplace_filter.py` claimed the
   attribution layer accepts both `agnes-store-bundle` and `flea`
   prefixes — it doesn't (clean cut per CHANGELOG). Rewrote to match
   reality.

3. CHANGELOG `[Unreleased]` had two `### Changed` blocks. Merged into
   one (BREAKING bullet first).

4. New v49→v50 migration adds `UNIQUE INDEX
   idx_store_entities_synthetic_name`. v49 made `synthetic_name` the
   canonical attribution key but uniqueness was only app-enforced;
   v50 promotes the invariant to the DB layer. Migration pre-checks
   for existing duplicates and raises `RuntimeError` listing them
   rather than letting `CREATE UNIQUE INDEX` fail mid-way. v48→v49
   migration gained an `is_nullable='YES'` guard on its `SET NOT NULL`
   ALTERs so re-runs on a fully-migrated DB don't trip DuckDB's
   "cannot alter entry … entries depend on it" block (the new index
   counts as such an entry). Index is created by the migration only —
   keeping it out of `_SYSTEM_SCHEMA` preserves fresh-install ordering
   (CREATE TABLE → v49 ALTERs → v50 CREATE INDEX).

5. Deleted three redundant version-pinned schema asserts whose names
   lied about their bodies (`test_schema_version_is_42` asserting
   `== 49`, etc.). Canonical assert lives in
   `test_db_schema_version.py`, renamed to
   `test_schema_version_matches_constant`.
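
The v50 pre-check (fail loudly with the offending values before attempting the index) can be sketched against SQLite, standing in for the real database. The function name and error text are assumptions. Only the index name comes from the description above.

```python
import sqlite3


def migrate_v49_to_v50(conn: sqlite3.Connection) -> None:
    # List every duplicated value up front instead of letting the
    # CREATE UNIQUE INDEX fail part-way with a less useful message.
    dupes = conn.execute(
        "SELECT synthetic_name, COUNT(*) FROM store_entities "
        "GROUP BY synthetic_name HAVING COUNT(*) > 1 ORDER BY synthetic_name"
    ).fetchall()
    if dupes:
        listing = ", ".join(f"{name} (x{n})" for name, n in dupes)
        raise RuntimeError(f"duplicate synthetic_name values block v50: {listing}")
    conn.execute(
        "CREATE UNIQUE INDEX IF NOT EXISTS idx_store_entities_synthetic_name "
        "ON store_entities (synthetic_name)"
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE store_entities (id INTEGER PRIMARY KEY, synthetic_name TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO store_entities (synthetic_name) VALUES (?)",
    [("xlsx-by-alice",), ("xlsx-by-alice",), ("pdf-by-bob",)],
)
try:
    migrate_v49_to_v50(conn)
except RuntimeError as exc:
    print(exc)  # duplicate synthetic_name values block v50: xlsx-by-alice (x2)
```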

* fix(db): gate v34→v38 store_entities ALTER COLUMN steps on column state

CI on Linux failed `test_v17_to_v18_drops_*` after the v50 UNIQUE INDEX
landed. Root cause: those tests open a DB at the full target version,
seed fixtures, then reset `schema_version` to 17 and reopen — forcing
the ladder to re-run from 17 → current. With the v50 index now in place,
DuckDB blocks intermediate `ALTER COLUMN` steps on `store_entities`
("Cannot drop this column: an index depends on a column after it!" /
"Cannot alter entry because there are entries that depend on it"),
because `synthetic_name` (the indexed column) sits positionally after
the columns those steps touch.

Fix: convert the three SQL-list migrations that hit store_entities into
defensive Python functions:

- `_v34_to_v35_migrate` short-circuits when `synthetic_name` already
  exists (post-v49 shape — the visibility_status rebuild is moot and
  the DROP COLUMN would be blocked by the index).
- `_v35_to_v36_migrate` gates the `visibility_status SET NOT NULL` +
  `SET DEFAULT` on `is_nullable='YES'` so it's a true no-op when the
  column is already constrained.
- `_v37_to_v38_migrate` gates the `version_no SET NOT NULL` step the
  same way.

Forward-roll path (real installs that never reset schema_version) is
unchanged: the gates see `YES`, so the ALTERs run. The fix only changes
behavior for the "DB is already at v50 shape but version row says 17"
scenario the tests construct.
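
The gating pattern shared by these migrations can be reduced to a pure function: emit the ALTERs only while the column still reports `is_nullable = 'YES'`. The sketch takes that flag as input rather than running the metadata query. This keeps it database-agnostic. `set_not_null_statements` is a hypothetical helper and the SQL strings are illustrative.

```python
from __future__ import annotations


def set_not_null_statements(
    table: str, column: str, is_nullable: str, default: str | None = None
) -> list[str]:
    """ALTERs to run for a SET NOT NULL step; empty when already constrained."""
    if is_nullable != "YES":
        # Column is already NOT NULL (DB already at target shape): emit
        # nothing, so no ALTER reaches a table an index depends on.
        return []
    stmts = [f"ALTER TABLE {table} ALTER COLUMN {column} SET NOT NULL"]
    if default is not None:
        stmts.append(f"ALTER TABLE {table} ALTER COLUMN {column} SET DEFAULT {default}")
    return stmts
```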

---------

Co-authored-by: Minas Arustamyan <arustamyan.minas@gmail.com>
2026-05-19 02:32:41 +02:00

792 lines
32 KiB
Python

"""Pure helpers for UsageProcessor — event extraction from Claude Code session jsonls.

Session JSONL shape (as documented in dev_docs/session_explore.md and verified
against live samples):

Each line is a top-level event dict with:
    {
      "type": "user" | "assistant" | "progress" | "system" |
              "tool_use_result" | "summary" | "file-history-snapshot" |
              "queue-operation" | ...,
      "uuid": "event-uuid",
      "parentUuid": "parent-event-uuid",
      "sessionId": "session-uuid",
      "timestamp": "2026-05-12T07:30:00.000Z",
      "cwd": "/path/to/cwd",
      "message": {
        "role": "user" | "assistant",
        "model": "claude-...",          # present on assistant turns
        "content": [                    # array or plain string on user turns
          {"type": "text", "text": "..."},
          {"type": "tool_use", "id": "tu_123", "name": "Bash", "input": {...}},
          {"type": "tool_result", "tool_use_id": "tu_123", "is_error": false, "content": [...]}
        ]
      }
    }

Tool results appear as:
  - Inline content items of type "tool_result" inside a user-role message, OR
  - As top-level events of type "tool_use_result" (older Claude Code versions)

is_error correlation: build a map of {tool_use_id: True} from tool_result
items on the first pass, then apply to matching tool_use events.
"""

from __future__ import annotations

import re
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import Iterator

# v9: phase-6 plugin-level rollup parity for flea. `_aggregate_events`
# now produces synthetic (source='flea', type='plugin', parent_plugin='',
# name=<plugin_synth>) rows aggregating nested skill/agent invocations,
# mirroring the curated path. Without this, flea plugin entity cards +
# detail telemetry chips read 0 from `_load_invocation_stats` (which
# filters `parent_plugin = ''` for flea) even though nested children
# had correct rollup rows. Bump forces a re-aggregation pass so historic
# nested-invocation data fills the new plugin-level rows.
# (v8: phase-5 attribution keyspace fix + phase-4 bundle rename. Lookup
# tables key by `store_entities.synthetic_name` instead of `name`;
# `_attribute_event` gained the flea-plugin-nested branch so nested
# skills inside flea plugins flow into rollup tables.)
# (v5: v46 marketplace-telemetry refactor swapped AttributionLookup for
# MarketplaceItemLookup. Identifier prefix (`<plugin>:<local>`) now drives
# attribution and usage_events.source / ref_id are populated per-event from
# the live marketplace_plugins + store_entities tables.)
# (v4: #293 user_id column; v3: #303 <command-name> slash extraction.)
USAGE_PROCESSOR_VERSION = 9

# Claude Code wraps user-typed slash invocations as
# <command-name>/<name></command-name> inside the user message content
# (raw "/foo" plain text never reaches the jsonl). Tag may sit anywhere
# in the text — typically after a <command-message> sibling — so we
# search rather than anchor at start. Name pattern matches both flat
# commands (`clear`, `exit`) and plugin-prefixed ones (`plugin:name`).
COMMAND_NAME_RE = re.compile(r"<command-name>/([A-Za-z][\w:-]*)</command-name>")

# Event types to skip entirely
_SKIP_TYPES = frozenset(
    {
        "system",
        "summary",
        "file-history-snapshot",
        "queue-operation",
        "progress",
    }
)


@dataclass(frozen=True)
class ParsedEvent:
    event_uuid: str | None
    parent_uuid: str | None
    tool_id: str | None  # tool_use 'id' (tu_xxx) from message.content item; None for slash_command
    event_type: str  # 'tool_use' | 'slash_command' | 'subagent' | 'mcp_call'
    tool_name: str | None
    skill_name: str | None
    subagent_type: str | None
    command_name: str | None
    is_error: bool
    model: str | None
    cwd: str | None
    occurred_at: datetime


def _parse_ts(ts_str: str | None) -> datetime | None:
    """Parse ISO 8601 timestamp to aware datetime. Returns None on failure."""
    if not ts_str:
        return None
    try:
        ts_str = ts_str.replace("Z", "+00:00")
        return datetime.fromisoformat(ts_str)
    except (ValueError, TypeError):
        return None


def _collect_error_map(turns: list[dict]) -> dict[str, bool]:
    """First-pass: collect tool_use_id → is_error from all tool_result items.

    Tool results appear in two places:
      1. As content items inside user-role messages (type='tool_result')
      2. As top-level events of type='tool_use_result'
    """
    errors: dict[str, bool] = {}
    for turn in turns:
        turn_type = turn.get("type", "")

        # Top-level tool_use_result events (older Claude Code)
        if turn_type == "tool_use_result":
            tu_id = turn.get("tool_use_id") or turn.get("toolUseId")
            if tu_id and turn.get("is_error"):
                errors[tu_id] = True

        # Inline tool_result content blocks inside user messages
        msg = turn.get("message", {}) or {}
        content = msg.get("content", [])
        if isinstance(content, list):
            for item in content:
                if not isinstance(item, dict):
                    continue
                if item.get("type") == "tool_result":
                    tu_id = item.get("tool_use_id")
                    if tu_id and item.get("is_error"):
                        errors[tu_id] = True
    return errors


def iter_events(turns: list[dict]) -> Iterator[ParsedEvent]:
    """Walk parsed JSONL turns and yield ParsedEvent for each observable event.

    Recognises:
      - Assistant tool_use blocks → event_type='tool_use' (or 'subagent'/'mcp_call')
        - Skill tool → also extracts skill_name
        - Task/Agent tools → event_type='subagent'
        - mcp__* tools → event_type='mcp_call'
      - User messages containing a <command-name>/foo</command-name> tag
        (Claude Code's wire format for user-typed slash invocations)
        → event_type='slash_command', command_name='foo'

    Skips: system, summary, file-history-snapshot, queue-operation, progress.
    """
    error_map = _collect_error_map(turns)

    for turn in turns:
        turn_type = turn.get("type", "")
        if turn_type in _SKIP_TYPES:
            continue

        ts = _parse_ts(turn.get("timestamp")) or datetime.now(timezone.utc)
        cwd = turn.get("cwd")
        event_uuid = turn.get("uuid")
        parent_uuid = turn.get("parentUuid")
        msg = turn.get("message", {}) or {}
        content = msg.get("content", [])
        model = msg.get("model")

        if turn_type == "assistant":
            if isinstance(content, list):
                for item in content:
                    if not isinstance(item, dict):
                        continue
                    if item.get("type") != "tool_use":
                        continue
                    tool_id = item.get("id", "")
                    tool_name = item.get("name") or ""
                    inp = item.get("input") or {}
                    is_error = error_map.get(tool_id, False)

                    # Classify event type
                    skill_name: str | None = None
                    subagent_type: str | None = None
                    command_name: str | None = None
                    if tool_name == "Skill":
                        event_type = "tool_use"
                        # Real Skill input shape varies; check both keys
                        skill_name = inp.get("skill") or inp.get("name") or None
                    elif tool_name in ("Task", "Agent"):
                        event_type = "subagent"
                        subagent_type = inp.get("subagent_type") or tool_name
                    elif tool_name.startswith("mcp__"):
                        event_type = "mcp_call"
                    else:
                        event_type = "tool_use"

                    yield ParsedEvent(
                        event_uuid=event_uuid,
                        parent_uuid=parent_uuid,
                        tool_id=tool_id or None,
                        event_type=event_type,
                        tool_name=tool_name or None,
                        skill_name=skill_name,
                        subagent_type=subagent_type,
                        command_name=command_name,
                        is_error=is_error,
                        model=model,
                        cwd=cwd,
                        occurred_at=ts,
                    )

        elif turn_type == "user":
            # Slash-invocation detection: scan user message content for
            # <command-name>/foo</command-name> tags. content is normally a
            # plain string on slash-invocation turns; tolerate the
            # list-of-blocks shape too in case Claude Code's wire format
            # shifts to structured content later.
            if isinstance(content, str):
                text_parts = [content]
            elif isinstance(content, list):
                text_parts = [
                    item.get("text", "")
                    for item in content
                    if isinstance(item, dict) and item.get("type") == "text"
                ]
            else:
                text_parts = []

            for text in text_parts:
                if not text:
                    continue
                for name in COMMAND_NAME_RE.findall(text):
                    yield ParsedEvent(
                        event_uuid=event_uuid,
                        parent_uuid=parent_uuid,
                        tool_id=None,
                        event_type="slash_command",
                        tool_name=None,
                        skill_name=None,
                        subagent_type=None,
                        command_name=name,
                        is_error=False,
                        model=None,
                        cwd=cwd,
                        occurred_at=ts,
                    )
# Synthetic plugin name Agnes uses to bundle flea-market store entities into
# a single Claude Code marketplace surface. Skill/agent/command identifiers
# from flea entities arrive as `flea:<synthetic_name>` in the JSONL.
FLEA_BUNDLE_PREFIX = "flea"


class MarketplaceItemLookup:
    """Preloads marketplace_plugins + store_entities into memory for O(1)
    per-event attribution.

    Claude Code writes plugin-defined skill/agent/command identifiers in the
    JSONL as ``<plugin_name>:<local_name>`` (e.g. ``grpn:design``). The prefix
    is the *plugin* name: a curated plugin name, the synthetic ``flea`` bundle
    name for standalone flea entities, or a flea plugin's synthetic name. The
    local part is the skill/agent/command name relative to that plugin.
    Identifiers without a ``:`` are either built-in tools (``Bash``,
    ``Read``, …) or flat slash commands (``/exit``); neither participates in
    marketplace telemetry.

    ``resolve()`` returns ``(source, parent_plugin, name, type)``:

    - ``source``: ``'curated'`` | ``'flea'`` | ``'builtin'``
    - ``parent_plugin``: the prefix when it matched a curated or flea plugin,
      ``''`` otherwise (also ``''`` for standalone flea entities, which have
      no parent)
    - ``name``: the local part when source is curated/flea, ``None`` otherwise
    - ``type``: ``'skill'`` | ``'agent'``, derived from which identifier field
      matched (slash commands count as skill per product rule). Standalone
      flea entities take their type from ``store_entities`` instead (which
      may be ``'plugin'``), and builtin events get ``None``.
    """

    def __init__(self, conn):
        self._curated_plugins: set[str] = {
            row[0] for row in conn.execute(
                "SELECT DISTINCT name FROM marketplace_plugins"
            ).fetchall()
        }
        # v49 phase-5: lookup table keyed by `synthetic_name` (the
        # `<name>-by-<owner>` slug baked into the served plugin tree). Claude
        # Code writes the local part of a flea invocation as that synthetic
        # name (`flea:xlsx-by-c-marustamyan`), so matching against `name`
        # (un-suffixed) never landed. Type comes along so the rollup writer
        # knows whether to record the invocation as skill / agent / plugin.
        self._flea_entities: dict[str, str] = {
            row[0]: row[1] for row in conn.execute(
                "SELECT synthetic_name, type FROM store_entities WHERE visibility_status='approved'"
            ).fetchall()
        }
        # Flea PLUGIN entities can be matched as a prefix too: `<plugin>:<inner>`
        # invocations of a skill / agent that lives inside a flea plugin bundle
        # land here, mirroring the curated nested attribution path. Standalone
        # flea entities still flow through the FLEA_BUNDLE_PREFIX branch.
        # The set carries synthetic_names because that's the plugin slug Claude
        # Code resolves at install time (v49 phase-4: `data["name"] = suffixed`
        # in `_bake_plugin_tree` for type='plugin' entities).
        self._flea_plugins: set[str] = {
            synthetic for synthetic, ent_type in self._flea_entities.items()
            if ent_type == "plugin"
        }

    def resolve(self, event: ParsedEvent) -> tuple[str, str, str | None, str | None]:
        """Return ``(source, parent_plugin, name, type)``.

        Priority order: the first identifier with a ``:`` prefix that matches
        a known plugin wins. Identifiers without ``:`` (built-in tools, flat
        slash commands) drop through to the builtin fallback.
        """
        # (identifier, default_type_when_matched)
        candidates = (
            (event.skill_name, "skill"),
            (event.subagent_type, "agent"),
            (event.command_name, "skill"),  # slash commands counted as skill
        )
        for ident, default_type in candidates:
            if not ident or ":" not in ident:
                continue
            prefix, local = ident.split(":", 1)
            if prefix == FLEA_BUNDLE_PREFIX:
                ent_type = self._flea_entities.get(local)
                if ent_type:
                    # For a standalone flea entity, the local part *is* the
                    # entity's synthetic_name and its type comes from the
                    # registry. Standalone flea items have no parent plugin.
                    return ("flea", "", local, ent_type)
                # Bundle prefix but no matching approved entity (likely
                # archived since the event was written). Try the next
                # candidate; if none match, the builtin fallback applies.
                continue
            if prefix in self._curated_plugins:
                return ("curated", prefix, local, default_type)
            if prefix in self._flea_plugins:
                # Skill / agent nested inside a flea plugin bundle. Same
                # shape as curated: source='flea', parent_plugin=<plugin
                # name>, name=<inner local-part>.
                return ("flea", prefix, local, default_type)
            # Unknown plugin prefix: try the next candidate, ending in the
            # builtin fallback (matches the rebuild_rollups filter that
            # excludes unattributed events).
        return ("builtin", "", None, None)
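

# Illustrative sketch (hypothetical helper, not called by the processor): how a
# JSONL identifier splits into ``(prefix, local)`` under the
# ``<plugin_name>:<local_name>`` convention described above. Only the first
# ``:`` separates the plugin prefix, so any later colons stay in the local
# part, and an identifier without ``:`` yields ``None`` (built-in tool or flat
# slash command). The sample identifiers in the usage note are made up.
def _example_split_identifier(ident):
    if not ident or ":" not in ident:
        # No plugin prefix: never attributed to the marketplace.
        return None
    prefix, local = ident.split(":", 1)
    return prefix, local

# Usage (hypothetical values):
#   _example_split_identifier("grpn:design")     -> ("grpn", "design")
#   _example_split_identifier("grpn:design:v2")  -> ("grpn", "design:v2")
#   _example_split_identifier("Bash")            -> None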
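

def compute_active_seconds(timestamps: list[datetime]) -> int:
    """Return active time in seconds for a list of turn timestamps.

    Sorted timestamps are grouped into blocks: a gap of more than 10 minutes
    starts a new block. The result is the sum of each block's first-to-last
    span, so a block containing a single timestamp contributes 0.
    """
    if not timestamps:
        return 0
    timestamps = sorted(timestamps)
    GAP = 600  # 10 minutes
    blocks = []
    block_start = timestamps[0]
    prev = timestamps[0]
    for ts in timestamps[1:]:
        gap = (ts - prev).total_seconds()
        if gap > GAP:
            # Close the current block at the last timestamp before the gap.
            blocks.append((block_start, prev))
            block_start = ts
        prev = ts
    blocks.append((block_start, prev))
    return int(sum((end - start).total_seconds() for start, end in blocks))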
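

# Illustrative sketch (hypothetical helper, not called by the processor): the
# same 10-minute gap rule as compute_active_seconds, but returning the blocks
# themselves so the arithmetic is visible. With turns at 12:00, 12:05, 12:20
# and 12:22, the 15-minute gap after 12:05 opens a new block, giving
# (12:00 to 12:05) + (12:20 to 12:22) = 300 + 120 = 420 active seconds, while
# the wall-clock span is 1320 seconds. A gap of exactly 10 minutes still
# extends the current block, because only gaps strictly above it split.
def _example_active_blocks(timestamps, gap_seconds=600):
    blocks = []  # each entry is a mutable [start, end] pair
    for ts in sorted(timestamps):
        if blocks and (ts - blocks[-1][1]).total_seconds() <= gap_seconds:
            # Gap within the threshold: extend the current block.
            blocks[-1][1] = ts
        else:
            # First timestamp, or gap above the threshold: open a new block.
            blocks.append([ts, ts])
    active = int(sum((end - start).total_seconds() for start, end in blocks))
    return [tuple(block) for block in blocks], active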


def compute_summary(turns: list[dict], events: list[dict]) -> dict:
    """Build the usage_session_summary row dict from parsed turns and event rows.

    Caller must fill in 'session_file' and 'username' after calling this.
    ``events`` is a list of dicts (as produced by UsageProcessor), not
    ParsedEvent instances.
    """
    # session_id: first turn with a sessionId field
    session_id = None
    for t in turns:
        sid = t.get("sessionId")
        if sid:
            session_id = sid
            break

    # Timestamps from all turns that have one
    timestamps: list[datetime] = []
    user_messages = 0
    assistant_messages = 0
    model_counter: Counter = Counter()
    # Anthropic API usage key -> summary column it accumulates into.
    usage_columns = {
        "input_tokens": "input_tokens",
        "output_tokens": "output_tokens",
        "cache_read_input_tokens": "cache_read_tokens",
        "cache_creation_input_tokens": "cache_creation_tokens",
    }
    token_totals = dict.fromkeys(usage_columns.values(), 0)
    for t in turns:
        ts = _parse_ts(t.get("timestamp"))
        if ts:
            timestamps.append(ts)
        turn_type = t.get("type", "")
        if turn_type == "user":
            user_messages += 1
        elif turn_type == "assistant":
            assistant_messages += 1
            msg = t.get("message", {}) or {}
            m = msg.get("model")
            if m:
                model_counter[m] += 1
            # Anthropic API usage block on assistant turns. Older sessions
            # may lack `cache_*` keys (pre-prompt-caching); `.get(k, 0)`
            # tolerates that. Non-int values (corrupted JSONL) are skipped
            # to keep one bad turn from poisoning the whole summary; bool is
            # excluded too, since it is a subclass of int.
            usage = msg.get("usage") or {}
            for key, column in usage_columns.items():
                v = usage.get(key, 0)
                if isinstance(v, int) and not isinstance(v, bool):
                    token_totals[column] += v

    started_at = min(timestamps) if timestamps else None
    ended_at = max(timestamps) if timestamps else None
    wall_seconds = int((ended_at - started_at).total_seconds()) if started_at and ended_at else 0
    active_seconds = compute_active_seconds(timestamps)

    # Aggregate counts from events
    tool_calls = sum(1 for e in events if e["event_type"] == "tool_use")
    tool_errors = sum(1 for e in events if e.get("is_error"))
    skill_invocations = sum(1 for e in events if e.get("skill_name"))
    subagent_dispatches = sum(1 for e in events if e["event_type"] == "subagent")
    mcp_calls = sum(1 for e in events if e["event_type"] == "mcp_call")
    slash_commands = sum(1 for e in events if e["event_type"] == "slash_command")
    distinct_tools = len({e["tool_name"] for e in events if e.get("tool_name")})
    distinct_skills = len({e["skill_name"] for e in events if e.get("skill_name")})
    primary_model = model_counter.most_common(1)[0][0] if model_counter else None

    return {
        "session_id": session_id or "",
        "started_at": started_at,
        "ended_at": ended_at,
        "active_seconds": active_seconds,
        "wall_seconds": wall_seconds,
        "user_messages": user_messages,
        "assistant_messages": assistant_messages,
        "tool_calls": tool_calls,
        "tool_errors": tool_errors,
        "skill_invocations": skill_invocations,
        "subagent_dispatches": subagent_dispatches,
        "mcp_calls": mcp_calls,
        "slash_commands": slash_commands,
        "distinct_tools": distinct_tools,
        "distinct_skills": distinct_skills,
        "primary_model": primary_model,
        "input_tokens": token_totals["input_tokens"],
        "output_tokens": token_totals["output_tokens"],
        "cache_read_tokens": token_totals["cache_read_tokens"],
        "cache_creation_tokens": token_totals["cache_creation_tokens"],
        "processor_version": USAGE_PROCESSOR_VERSION,
    }
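

# Illustrative sketch (hypothetical helper and sample data, not called by the
# processor): the "skip non-int values" rule from compute_summary's usage
# loop, applied to a few made-up assistant usage blocks. A missing key counts
# as 0 (older sessions without prompt caching), a missing usage block is
# treated as empty, and a non-int value from a corrupted turn is ignored
# instead of raising. This sketch also rejects bool, which is a subclass of
# int in Python.
def _example_sum_usage(usage_blocks, key):
    total = 0
    for usage in usage_blocks:
        v = (usage or {}).get(key, 0)
        if isinstance(v, int) and not isinstance(v, bool):
            total += v
    return total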
# Refresh interval for the 30-day window snapshot. The daily fact + 7d window
# refresh on every UsageProcessor tick (~10 min) because they're cheap and
# need to reflect recent activity. The 30d window is fuller, costs more to
# rebuild, and barely shifts between ticks — refresh hourly instead. Tracked
# in `session_processor_state` (processor_name='marketplace_rollup_30d').
WINDOW_30D_REFRESH_SECONDS = 3600
_MARKETPLACE_30D_TRACKER = "marketplace_rollup_30d"
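

def _identifier_split(skill_name, subagent_type, command_name, event_type):
    """Replicate MarketplaceItemLookup.resolve()'s prefix-split logic.

    Returns ``(prefix, local, default_type)`` for the first identifier that
    carries a plugin prefix, or ``(None, None, None)`` if none does. Used by
    ``_attribute_event``, which attributes raw ``usage_events`` rows (plain
    tuples) in Python rather than ParsedEvent objects.

    Unlike ``resolve()``, this stops at the first prefixed identifier instead
    of moving on when the prefix is unknown. The two agree as long as an
    event populates at most one of the three identifier fields, which matches
    the parser branches that set skill_name, subagent_type and command_name
    separately. ``event_type`` is accepted for signature parity but is
    currently unused.
    """
    candidates = (
        (skill_name, "skill"),
        (subagent_type, "agent"),
        (command_name, "skill"),  # slash commands counted as skill (product rule)
    )
    for ident, default_type in candidates:
        if not ident or ":" not in ident:
            continue
        prefix, local = ident.split(":", 1)
        return prefix, local, default_type
    return None, None, None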


def _attribute_event(curated_plugins: set[str], flea_entities: dict[str, str],
                     flea_plugins: set[str],
                     skill_name, subagent_type, command_name, event_type):
    """Resolve one event to ``(source, type, parent_plugin, name)``.

    Note the tuple order differs from ``MarketplaceItemLookup.resolve()``,
    which returns ``(source, parent_plugin, name, type)``.

    Returns None when the event doesn't belong in marketplace rollups
    (built-in tool, flat slash command, unknown plugin prefix).

    Lookup tables (curated_plugins, flea_entities, flea_plugins) are passed
    in so the caller can preload once and reuse across thousands of events.

    Mirrors the four branches `MarketplaceItemLookup.resolve()` walks:

    1. ``flea:<synthetic>``: standalone flea skill/agent/plugin
    2. ``<curated_plugin>:<inner>``: nested skill/agent of a curated plugin
    3. ``<flea_plugin>:<inner>``: nested skill/agent of a flea plugin
    4. anything else: None (filtered out of rollups)
    """
    prefix, local, default_type = _identifier_split(skill_name, subagent_type, command_name, event_type)
    if prefix is None:
        return None
    if prefix == FLEA_BUNDLE_PREFIX:
        ent_type = flea_entities.get(local)
        if ent_type is None:
            return None
        return ("flea", ent_type, "", local)
    if prefix in curated_plugins:
        return ("curated", default_type, prefix, local)
    if prefix in flea_plugins:
        # v49 phase-5: nested skill/agent inside a flea plugin bundle.
        # Same shape as curated nested attribution (source='flea',
        # parent_plugin=<synthetic plugin name>, name=<inner frontmatter
        # name>). Without this branch the rollup builder silently dropped
        # inner-item invocations even though MarketplaceItemLookup.resolve()
        # (used by the live writer) handled them since v6.
        return ("flea", default_type, prefix, local)
    return None


def _aggregate_events(events_rows, curated_plugins, flea_entities,
                      flea_plugins, *, group_by_day: bool):
    """Walk raw event rows and produce aggregated buckets.

    ``events_rows`` shape: (day, user_id, is_error, skill_name, subagent_type,
    command_name, event_type). Returns a dict mapping each bucket key to
    ``{"count": int, "users": set, "errors": int}``. With
    ``group_by_day=True`` the key is (day, source, type, parent_plugin, name),
    for the daily fact table; otherwise it is (source, type, parent_plugin,
    name), aggregated across the whole window.

    Plugin-level aggregation (type='plugin' rows) is added by walking the
    child results once and grouping by parent.
    """
    # Bucket: key -> dict(count, users:set, errors)
    leaf: dict[tuple, dict] = {}
    for row in events_rows:
        day, uid, is_err, sk, sa, cm, etype = row
        attributed = _attribute_event(curated_plugins, flea_entities, flea_plugins,
                                      sk, sa, cm, etype)
        if attributed is None:
            continue
        source, type_, parent, name = attributed
        if group_by_day:
            key = (day, source, type_, parent, name)
        else:
            key = (source, type_, parent, name)
        b = leaf.setdefault(key, {"count": 0, "users": set(), "errors": 0})
        b["count"] += 1
        if uid:
            b["users"].add(uid)
        if is_err:
            b["errors"] += 1

    # Plugin-level rollup: curated AND flea invocations get a parent row,
    # summing the children. distinct_users at plugin level is recomputed from
    # the union of child user sets, so a user counted in two skills of the
    # same plugin isn't double-counted. v49 phase-6: extended to flea (was
    # curated-only); without this, flea plugin entities never got an
    # aggregated row, so the parent_plugin='' filter in
    # `_load_invocation_stats` returned no rows for plugin cards / detail
    # telemetry chips even though nested children were attributed correctly.
    plugin_bucket: dict[tuple, dict] = {}
    for key, vals in leaf.items():
        if group_by_day:
            day, source, type_, parent, name = key
        else:
            day = None
            source, type_, parent, name = key
        if source not in ("curated", "flea") or not parent:
            continue
        if group_by_day:
            pkey = (day, source, "plugin", "", parent)
        else:
            pkey = (source, "plugin", "", parent)
        pb = plugin_bucket.setdefault(pkey, {"count": 0, "users": set(), "errors": 0})
        pb["count"] += vals["count"]
        pb["users"] |= vals["users"]
        pb["errors"] += vals["errors"]
    # Merge rather than overwrite: a direct `flea:<plugin synthetic_name>`
    # invocation could already have a leaf row under the same plugin key.
    for pkey, pvals in plugin_bucket.items():
        existing = leaf.get(pkey)
        if existing is None:
            leaf[pkey] = pvals
        else:
            existing["count"] += pvals["count"]
            existing["users"] |= pvals["users"]
            existing["errors"] += pvals["errors"]
    return leaf
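

# Illustrative sketch (hypothetical helper and sample data, not called by the
# processor): why the plugin-level row unions child user sets instead of
# adding up per-child distinct counts. If user u2 invokes two skills of the
# same plugin, summing the children's distinct_users would report 3 users;
# the union reports 2. Invocation and error counts, by contrast, are additive.
def _example_plugin_row(children):
    count, errors, users = 0, 0, set()
    for child in children:
        count += child["count"]
        errors += child["errors"]
        users |= child["users"]  # set union; the child's own set is untouched
    return {"count": count, "distinct_users": len(users), "errors": errors}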


def _last_30d_due(conn) -> bool:
    """True if the 30d window has never been refreshed, or was last refreshed
    at least WINDOW_30D_REFRESH_SECONDS ago."""
    row = conn.execute(
        "SELECT processed_at FROM session_processor_state "
        "WHERE processor_name = ? AND session_file = '__rollup__'",
        [_MARKETPLACE_30D_TRACKER],
    ).fetchone()
    if row is None:
        return True
    last = row[0]
    if last is None:
        return True
    # processed_at is a TIMESTAMP, which DuckDB returns as a datetime;
    # treat a naive value as UTC so it can be compared with an aware `now`.
    now = datetime.now(timezone.utc)
    if last.tzinfo is None:
        last = last.replace(tzinfo=timezone.utc)
    return (now - last).total_seconds() >= WINDOW_30D_REFRESH_SECONDS
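

# Illustrative sketch (hypothetical helper, not called by the processor): the
# throttle check from _last_30d_due without the database read. Subtracting a
# naive datetime from an aware one raises TypeError in Python, which is why a
# naive processed_at is first tagged as UTC. The comparison is inclusive, so
# exactly one hour after the last refresh already counts as due.
def _example_refresh_due(last, now, threshold_seconds=3600):
    from datetime import timezone

    if last is None:
        # Never refreshed: always due.
        return True
    if last.tzinfo is None:
        last = last.replace(tzinfo=timezone.utc)
    return (now - last).total_seconds() >= threshold_seconds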


def _mark_last_30d_refreshed(conn) -> None:
    """Upsert the 30d tracker row with the current UTC time as processed_at."""
    # Pass the timestamp explicitly: DuckDB parses a bare `current_timestamp`
    # in an ON CONFLICT … DO UPDATE SET clause as a column name on the
    # right-hand side, then can't bind it.
    now = datetime.now(timezone.utc)
    conn.execute(
        """
        INSERT INTO session_processor_state
            (processor_name, session_file, username, processed_at, items_extracted)
        VALUES (?, '__rollup__', 'system', ?, 0)
        ON CONFLICT (processor_name, session_file) DO UPDATE SET
            processed_at = EXCLUDED.processed_at
        """,
        [_MARKETPLACE_30D_TRACKER, now],
    )
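

# Illustrative sketch (uses SQLite as a stand-in for DuckDB, with the table
# trimmed to the columns the upsert touches; not called by the processor):
# the single-row tracker pattern behind _mark_last_30d_refreshed. The first
# call inserts the '__rollup__' row; later calls hit the primary key and
# update processed_at in place through ON CONFLICT ... DO UPDATE, with the
# timestamp bound as a parameter. Timestamps are passed as ISO strings here
# purely to keep the example free of driver-specific datetime adapters.
def _example_tracker_upsert(timestamps):
    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE session_processor_state ("
        " processor_name TEXT, session_file TEXT, processed_at TEXT,"
        " PRIMARY KEY (processor_name, session_file))"
    )
    for ts in timestamps:
        conn.execute(
            "INSERT INTO session_processor_state (processor_name, session_file, processed_at) "
            "VALUES (?, '__rollup__', ?) "
            "ON CONFLICT (processor_name, session_file) DO UPDATE SET "
            "processed_at = excluded.processed_at",
            ["marketplace_rollup_30d", ts],
        )
    rows = conn.execute(
        "SELECT processor_name, session_file, processed_at FROM session_processor_state"
    ).fetchall()
    conn.close()
    return rows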


def rebuild_rollups(conn, *, since_day=None, force_30d: bool = False) -> None:
    """Rebuild marketplace + legacy tool rollups from usage_events.

    Refresh policy (called every UsageProcessor tick):

    - ``usage_marketplace_item_daily``: incremental DELETE+INSERT from
      ``since_day`` onward. ``since_day=None`` (the default) means the last
      7 days; pass an earlier date to rebuild further back.
    - ``usage_marketplace_item_window`` ``period_label='last_7d'``: full
      DELETE+INSERT every tick.
    - ``usage_marketplace_item_window`` ``period_label='last_30d'``: full
      DELETE+INSERT once an hour, or when ``force_30d=True``.
    - ``usage_tool_daily`` (legacy): incremental DELETE+INSERT from the same
      ``since_day``, unchanged behaviour from v42.

    All updates run in a single transaction so a partial failure never
    leaves the rollup set inconsistent.
    """
    if since_day is None:
        since_day = (datetime.now(timezone.utc) - timedelta(days=7)).date()

    # Preload lookup tables once: reused across daily + 7d + 30d rebuilds.
    # v49 phase-5: dict keyed by `synthetic_name` (matches the JSONL invocation
    # local-part) instead of `name`. `flea_plugins` set drives the
    # `<plugin>:<inner>` nested-attribution branch in `_attribute_event`.
    curated_plugins = {
        r[0] for r in conn.execute("SELECT DISTINCT name FROM marketplace_plugins").fetchall()
    }
    flea_entities = {
        r[0]: r[1] for r in conn.execute(
            "SELECT synthetic_name, type FROM store_entities WHERE visibility_status='approved'"
        ).fetchall()
    }
    flea_plugins = {
        synthetic for synthetic, ent_type in flea_entities.items()
        if ent_type == "plugin"
    }
    do_30d = force_30d or _last_30d_due(conn)

    try:
        conn.execute("BEGIN")

        # ---- Legacy: usage_tool_daily (unchanged) ----
        conn.execute("DELETE FROM usage_tool_daily WHERE day >= ?", [since_day])
        conn.execute(
            """
            INSERT INTO usage_tool_daily
                (day, tool_name, source, invocations, error_count, distinct_users, distinct_sessions)
            SELECT
                CAST(occurred_at AS DATE) AS day,
                tool_name,
                source,
                COUNT(*) AS invocations,
                SUM(CASE WHEN is_error THEN 1 ELSE 0 END) AS error_count,
                COUNT(DISTINCT username) AS distinct_users,
                COUNT(DISTINCT session_id) AS distinct_sessions
            FROM usage_events
            WHERE CAST(occurred_at AS DATE) >= ?
              AND tool_name IS NOT NULL
            GROUP BY day, tool_name, source
            """,
            [since_day],
        )

        # ---- New: usage_marketplace_item_daily (incremental from since_day) ----
        daily_events = conn.execute(
            """
            SELECT
                CAST(occurred_at AS DATE) AS day,
                user_id,
                is_error,
                skill_name,
                subagent_type,
                command_name,
                event_type
            FROM usage_events
            WHERE CAST(occurred_at AS DATE) >= ?
            """,
            [since_day],
        ).fetchall()
        daily_buckets = _aggregate_events(
            daily_events, curated_plugins, flea_entities, flea_plugins,
            group_by_day=True,
        )
        conn.execute("DELETE FROM usage_marketplace_item_daily WHERE day >= ?", [since_day])
        if daily_buckets:
            conn.executemany(
                """
                INSERT INTO usage_marketplace_item_daily
                    (day, source, type, parent_plugin, name, count, distinct_users, error_count)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                [
                    (day, source, type_, parent, name, v["count"], len(v["users"]), v["errors"])
                    for (day, source, type_, parent, name), v in daily_buckets.items()
                ],
            )

        # ---- New: usage_marketplace_item_window period_label='last_7d' (full) ----
        cutoff_7d = (datetime.now(timezone.utc) - timedelta(days=7)).date()
        _rebuild_window(
            conn, "last_7d", cutoff_7d, curated_plugins, flea_entities, flea_plugins,
        )

        # ---- New: usage_marketplace_item_window period_label='last_30d' (hourly) ----
        if do_30d:
            cutoff_30d = (datetime.now(timezone.utc) - timedelta(days=30)).date()
            _rebuild_window(
                conn, "last_30d", cutoff_30d, curated_plugins, flea_entities, flea_plugins,
            )
            _mark_last_30d_refreshed(conn)

        conn.execute("COMMIT")
    except Exception:
        try:
            conn.execute("ROLLBACK")
        except Exception:
            pass
        raise
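

# Illustrative sketch (uses SQLite as a stand-in for DuckDB; the tables
# rollup_a / rollup_b are hypothetical; not called by the processor): the
# all-or-nothing guarantee rebuild_rollups relies on. Two rollups are
# rewritten inside one explicit BEGIN/COMMIT; a failure between the writes
# triggers ROLLBACK, so neither table keeps the half-applied refresh.
# rebuild_rollups re-raises after rolling back; this sketch returns the
# surviving state instead so the outcome is easy to inspect.
def _example_all_or_nothing(fail_midway):
    import sqlite3

    # isolation_level=None puts sqlite3 in autocommit mode, so the explicit
    # BEGIN below is the only transaction boundary.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE rollup_a (n INTEGER)")
    conn.execute("CREATE TABLE rollup_b (n INTEGER)")
    conn.execute("INSERT INTO rollup_a VALUES (1)")
    conn.execute("INSERT INTO rollup_b VALUES (1)")
    try:
        conn.execute("BEGIN")
        conn.execute("DELETE FROM rollup_a")
        conn.execute("INSERT INTO rollup_a VALUES (2)")
        if fail_midway:
            raise RuntimeError("simulated failure between rollup writes")
        conn.execute("DELETE FROM rollup_b")
        conn.execute("INSERT INTO rollup_b VALUES (2)")
        conn.execute("COMMIT")
    except RuntimeError:
        conn.execute("ROLLBACK")
    state = (
        conn.execute("SELECT n FROM rollup_a").fetchone()[0],
        conn.execute("SELECT n FROM rollup_b").fetchone()[0],
    )
    conn.close()
    return state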


def _rebuild_window(conn, period_label: str, cutoff_day, curated_plugins,
                    flea_entities, flea_plugins) -> None:
    """Full DELETE+INSERT of one period_label in usage_marketplace_item_window.

    Caller wraps the call in a BEGIN/COMMIT transaction along with the
    other rollup writes; this function only does the DML.
    """
    events = conn.execute(
        """
        SELECT
            CAST(occurred_at AS DATE) AS day,
            user_id,
            is_error,
            skill_name,
            subagent_type,
            command_name,
            event_type
        FROM usage_events
        WHERE CAST(occurred_at AS DATE) >= ?
        """,
        [cutoff_day],
    ).fetchall()
    buckets = _aggregate_events(
        events, curated_plugins, flea_entities, flea_plugins,
        group_by_day=False,
    )
    conn.execute(
        "DELETE FROM usage_marketplace_item_window WHERE period_label = ?",
        [period_label],
    )
    if buckets:
        conn.executemany(
            """
            INSERT INTO usage_marketplace_item_window
                (period_label, source, type, parent_plugin, name, invocations, distinct_users)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            [
                (period_label, source, type_, parent, name, v["count"], len(v["users"]))
                for (source, type_, parent, name), v in buckets.items()
            ],
        )