* feat(flea): phase-1 — title, tagline, synthetic_name columns + upload UX
Schema v49 adds three user-facing metadata columns to store_entities:
- title (NOT NULL) — humanized display name shown on marketplace
surfaces in later phases. Acronym-aware humanizer in
src/store_naming.py (27 entries: MCP, API, OAuth, S3, …) shared
with the frontend via Jinja-injected dict so JS pre-fill and
Python backfill produce identical output.
- tagline (NULL, ≤200 chars) — optional short description for card
listings. Long-form `description` stays.
- synthetic_name (NOT NULL) — deterministic `<name>-by-<owner_username>`
stored as a column for indexing and as the single source of truth
for attribution lookups in later phases. Today's bundle bake still
uses suffixed_name() at the same call sites.
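A minimal Python sketch of the two naming rules. It assumes the humanizer splits on `-` and `_`, capitalizes each word, and substitutes a fixed spelling when a word is a known acronym. Only the four acronyms listed above are reproduced, not all 27, and the function bodies are guesses that may differ from `src/store_naming.py`.

```python
# Hypothetical subset of the acronym table; the real table in
# src/store_naming.py has 27 entries.
ACRONYMS: dict[str, str] = {"mcp": "MCP", "api": "API", "oauth": "OAuth", "s3": "S3"}


def humanize_name(name: str) -> str:
    """Turn a kebab- or snake-case slug into a display title (sketch)."""
    words = [w for w in name.replace("_", "-").split("-") if w]
    # Known acronyms keep their canonical spelling; other words are capitalized.
    return " ".join(ACRONYMS.get(w.lower(), w.capitalize()) for w in words)


def suffixed_name(name: str, owner_username: str) -> str:
    """Deterministic `<name>-by-<owner_username>` synthetic name."""
    return f"{name}-by-{owner_username}"


print(humanize_name("mcp-oauth-helper"))  # MCP OAuth Helper
print(suffixed_name("alpha", "alice"))    # alpha-by-alice
```

Because one dict feeds both the Jinja-injected JS pre-fill and the Python backfill, a change to the table shows up identically on both sides.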
Migration (_v48_to_v49_migrate, Python function — humanize has no
SQL equivalent) backfills existing rows: title from
humanize_name(strip_archive_suffix(name)), synthetic from the concat
formula; tagline stays NULL. Idempotent (ADD COLUMN IF NOT EXISTS +
SET NOT NULL no-op on re-run).
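The idempotent shape of that backfill can be sketched against SQLite, used here as a stand-in for DuckDB. SQLite has neither `ADD COLUMN IF NOT EXISTS` nor `ALTER COLUMN ... SET NOT NULL`. The sketch therefore checks `PRAGMA table_info` before adding each column and leaves out the NOT NULL step. It also skips `strip_archive_suffix`, because the archive suffix format is not shown. The name `migrate_v48_to_v49` is hypothetical.

```python
import sqlite3
from typing import Callable

NEW_COLUMNS = ("title", "tagline", "synthetic_name")


def migrate_v48_to_v49(conn: sqlite3.Connection, humanize: Callable[[str], str]) -> None:
    """Add the three metadata columns and backfill title + synthetic_name (sketch)."""
    existing = {row[1] for row in conn.execute("PRAGMA table_info(store_entities)")}
    for col in NEW_COLUMNS:
        if col not in existing:  # stand-in for ADD COLUMN IF NOT EXISTS
            conn.execute(f"ALTER TABLE store_entities ADD COLUMN {col} TEXT")
    # Only rows still missing a value are selected, so a re-run touches nothing.
    rows = conn.execute(
        "SELECT id, name, owner_username FROM store_entities "
        "WHERE title IS NULL OR synthetic_name IS NULL"
    ).fetchall()
    for entity_id, name, owner in rows:
        conn.execute(
            "UPDATE store_entities SET title = COALESCE(title, ?), "
            "synthetic_name = COALESCE(synthetic_name, ?) WHERE id = ?",
            (humanize(name), f"{name}-by-{owner}", entity_id),
        )
    # tagline is deliberately left NULL.
```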
Upload form (store_upload.html step 2) reorders fields: Title
(pre-filled from server-side humanize, JS keeps it in sync until
the user edits manually) → Name + dark synthetic preview on one
row (matches marketplace_item_detail.html dark code styling, no
copy button — preview only) → Short description with character
counter → Description (unchanged). Edit form (store_edit.html)
mirrors the layout with pre-filled values from the entity row.
API:
- POST /api/store/entities/preview returns `title` (humanized
fallback) for upload form pre-fill.
- POST + PUT /api/store/entities accept `title` and `tagline` form
fields with 100/200-char validation; PUT recomputes
synthetic_name when `name` changes (caller responsibility per
repo contract).
- StoreEntityResponse exposes all three new fields.
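A sketch of how the POST handler might treat the two new form fields. It assumes the limits are plain character counts and that a missing or empty title falls back to the humanized name. `resolve_metadata` and `MetadataTooLong` are hypothetical names, and the real endpoint's error response is not reproduced here.

```python
from __future__ import annotations

from typing import Callable

TITLE_MAX_CHARS = 100
TAGLINE_MAX_CHARS = 200


class MetadataTooLong(ValueError):
    """Raised when a title or tagline exceeds its character limit."""


def resolve_metadata(
    name: str,
    title: str | None,
    tagline: str | None,
    humanize: Callable[[str], str],
) -> tuple[str, str | None]:
    """Validate title/tagline and apply the humanized-title fallback (sketch)."""
    if title is not None and len(title) > TITLE_MAX_CHARS:
        raise MetadataTooLong(f"title exceeds {TITLE_MAX_CHARS} characters")
    if tagline is not None and len(tagline) > TAGLINE_MAX_CHARS:
        raise MetadataTooLong(f"tagline exceeds {TAGLINE_MAX_CHARS} characters")
    # An empty or missing title falls back to humanize(name); an empty tagline stays NULL.
    return (title or humanize(name)), (tagline or None)
```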
Repository:
- create() takes title + tagline + synthetic_name as optional
kwargs with derived defaults (humanize_name(name) / concat) so
existing test fixtures don't need to thread them.
- update() supports partial updates on all three; tagline empty
string clears via NULL sentinel.
- archive() recomputes synthetic_name on rename to the archived
slug so the column stays consistent with name.
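The partial-update contract can be sketched with a sentinel default. The sentinel separates "keyword not passed" (leave the column alone) from "passed as empty" (clear it). `_UNSET` and `build_update` are hypothetical; the empty-string-to-NULL rule for `tagline` follows the description above.

```python
from __future__ import annotations

from typing import Any

_UNSET: Any = object()  # hypothetical marker for "keyword not supplied"


def build_update(
    *, title: Any = _UNSET, tagline: Any = _UNSET, synthetic_name: Any = _UNSET
) -> tuple[str, list[Any]]:
    """Return a SET clause and parameters covering only the supplied fields (sketch)."""
    changes: dict[str, Any] = {}
    if title is not _UNSET:
        changes["title"] = title
    if tagline is not _UNSET:
        changes["tagline"] = tagline or None  # "" clears the column to NULL
    if synthetic_name is not _UNSET:
        changes["synthetic_name"] = synthetic_name
    clause = ", ".join(f"{col} = ?" for col in changes)
    return clause, list(changes.values())
```

On a rename, the caller would pass the recomputed `synthetic_name` together with the new `name`. This matches the caller-responsibility contract described for PUT.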
Tests:
- New test_schema_v48_to_v49_migration.py: fresh install,
populated-row backfill (incl. archived row strip), idempotence,
NOT NULL constraint verification.
- test_store_naming.py: 14 parametrized humanize cases + acronym
dict invariants.
- test_store_api.py::TestStoreV49Metadata: preview humanize, POST
with explicit + fallback title, 100/200-char rejects, PUT
partial update + synthetic recompute on rename.
- Schema version assertion bumps (48 → 49) in test_db_schema_version,
test_home_stats, test_schema_v42_migration, test_schema_v46_migration.
Phase 1 only — surface rendering on cards / detail pages and
Claude Code bundle propagation come in later phases.
* feat(flea): phase-2 — wire title/tagline/owner through marketplace cards + detail pages
Phase 1 (7f4cfcbb) populated the three new columns on store_entities;
phase 2 surfaces them across the web presentation layer so the kebab-
case slug + bare username no longer leak into user-facing copy.
API:
- `_flea_to_item` now takes `conn` (both callsites updated) and sets
`display_name=entity.title`, `tagline=entity.tagline`, `owner=
_resolve_owner_display(conn, owner_user_id, owner_username)` —
matches the chain the curated path already uses (users.name →
users.email → fallback). The card JS chain `it.display_name ||
it.name` then renders the friendly form; `name` stays at the
suffixed slug as the technical identifier JS uses for fallbacks.
- `flea_detail` adds `display_name` + `tagline` to PluginDetailResponse
so the standalone skill/agent + plugin detail heroes pick them up
through the existing `d.display_name` / `d.tagline` chains.
- `_flea_inner_parent_fields` swaps `parent_display_name` from
`strip_archive_suffix(name)` to `entity.title or strip_archive_suffix(
name)`. Drives parent-plugin label in four surfaces at once:
breadcrumb 3rd segment, hero "part of <plugin>" meta-row,
helper "This skill is part of <plugin>" panel, and the Details
sidebar's "Parent plugin" row.
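The owner fallback chain that `_resolve_owner_display` applies (users.name → users.email → owner_username) fits in a short function. This sketch takes a user row that was already fetched, not a connection. `UserRow` and `owner_display` are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class UserRow:
    name: Optional[str]
    email: Optional[str]


def owner_display(user: Optional[UserRow], owner_username: str) -> str:
    """users.name -> users.email -> owner_username fallback (sketch)."""
    if user is not None:
        if user.name:
            return user.name
        if user.email:
            return user.email
    return owner_username
```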
Templates — `marketplace_item_detail.html`:
- Pre-render: browser title, hero h1, and hero-window-label read
`(entity.title if entity else None) or inner_name or item_name or
plugin_name` so the SSR shell shows the friendly title before the
JS fetch lands (no flash of kebab-case).
- Breadcrumb last segment for flea standalone drops the `d.manifest_name
|| heroTitle` fallback in favour of just `heroTitle` — manifest_name
is the suffixed slug and users explicitly didn't want it in the path.
- Hero meta-row for flea standalone is now hidden. The prior "by
<author> · N installed · <size>" line duplicated the install count
(already in the hero telemetry chip below) and the owner + bundle size
(already in the Details sidebar).
Templates — `marketplace_plugin_detail.html`:
- Same SSR pre-render swap (title, h1, window-label, crumb-name).
- Hero tagline element starts hidden; JS shows it only when
`d.tagline` is truthy. Pre-fix it fell back to `d.description`
(long-form text), which read awkwardly under the h1 and pulled the
hero too tall. Description still renders in the "What it does"
panel below the hero.
- Initial "Loading…" placeholder removed so entities without a
tagline don't flash that text mid-fetch.
Tests:
- New `TestFleaPhase2Presentation` class in test_marketplace_api.py
(6 cases): card title + tagline + full-name owner, owner fallback
chain when users.name is NULL, flea_detail exposes title + tagline,
tagline null when omitted, inner skill parent_display_name uses
entity.title (explicit + humanize-fallback variants).
- Updated `TestListItems.test_flea_lists_uploads` to assert both
`display_name == "Alpha"` (humanized) and `name ==
"alpha-by-alice"` (suffixed slug compat).
- Updated `TestWebPages.test_marketplace_flea_detail_page_renders`
to look for the humanized title ("Page Skill") in the SSR shell
instead of the kebab-case `page-skill`.
* feat(flea): phase-3 — read synthetic_name from DB, suffixed_name() only on write
Phase 1 added the column + backfill, and the repo write paths keep it in sync.
Phase 3 routes every READ callsite through `store_entities.synthetic_name`
directly instead of recomputing `<name>-by-<owner_username>` on the fly,
and switches the collision query off the inline string concat. The
`suffixed_name()` primitive now lives exclusively in write flows.
Read callsites updated (all read `entity["synthetic_name"]` directly,
no fallback — the column is NOT NULL and a missing value would be a
real bug worth surfacing as KeyError):
- app/api/marketplace.py:_flea_to_item — card MarketplaceItem.name.
- app/api/marketplace.py:flea_detail — PluginDetailResponse.manifest_name.
- app/api/store.py:_entity_to_response — StoreEntityResponse.invocation_name.
- app/api/store.py PUT bundle re-bake — `suffixed` passed to
`_bake_plugin_tree`; entity is loaded pre-rename, so its
synthetic_name is the OLD value `_bake_plugin_tree` expects.
- app/api/store.py PUT rename — `old_suffix` for `_rename_baked_tree`.
- app/api/my_stack.py — StoreInstallEntry.invocation_name.
- src/marketplace_filter.py — manifest_name in served plugin entry.
`suffixed_name` imports removed from marketplace.py, my_stack.py, and
marketplace_filter.py (no remaining callsites). store.py keeps the
import for its write paths:
- POST create (`suffixed = suffixed_name(final_name, username)` →
passed to `_bake_plugin_tree` and `repo.create(synthetic_name=...)`).
- PUT rename collision check (`new_suffixed`).
- PUT rename `new_suffix` for `_rename_baked_tree` (proposed value).
- PUT rename `new_synthetic` for `repo.update(synthetic_name=...)`.
- Archive `old_suffix` + `new_suffix` for `_rename_baked_tree`
(retro-compute pre-archive value after `repo.archive` already
overwrote the DB row with the post-archive synthetic).
Collision SQL — `_suffixed_already_taken`:
WHERE name || '-by-' || owner_username = ? (before)
WHERE synthetic_name = ? (after)
Returns the same matches today (phase 1 backfill + NOT NULL invariant + write
paths in sync), and the column is indexable and a single source of truth going forward.
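The post-change collision check can be illustrated against an in-memory SQLite table, standing in for the project's DuckDB store. `suffixed_already_taken`, the trimmed schema, and the index name are hypothetical. The WHERE clause is the one shown above.

```python
import sqlite3


def suffixed_already_taken(conn: sqlite3.Connection, candidate: str) -> bool:
    """True if any store entity already owns `candidate` as its synthetic_name."""
    row = conn.execute(
        "SELECT 1 FROM store_entities WHERE synthetic_name = ? LIMIT 1", (candidate,)
    ).fetchone()
    return row is not None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE store_entities (id INTEGER PRIMARY KEY, synthetic_name TEXT NOT NULL)")
# An index on the column lets the equality lookup avoid a full table scan.
conn.execute("CREATE INDEX idx_synthetic ON store_entities (synthetic_name)")
conn.execute("INSERT INTO store_entities (synthetic_name) VALUES ('alpha-by-alice')")
print(suffixed_already_taken(conn, "alpha-by-alice"))  # True
print(suffixed_already_taken(conn, "beta-by-alice"))   # False
```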
Repository:
- UserStoreInstallsRepository.list_for_user explicit SELECT extended
with `se.title`, `se.tagline`, `se.synthetic_name` so my_stack and
marketplace_filter callers can read them off the joined row.
Tests:
- test_store_api.py::test_invocation_name_reads_from_synthetic_column —
upload entity, manually override the column with a non-canonical
value, verify GET response returns the override (proves read path
consumes the column, not recomputes).
- test_marketplace_api.py::test_flea_card_and_detail_read_synthetic_name_from_db —
same proof for `MarketplaceItem.name` (card) and
`PluginDetailResponse.manifest_name` (detail).
* feat(flea): phase-4 — rename agnes-store-bundle → flea (synthetic plugin)
The synthetic plugin that wraps loose flea-market skills + agents into
one Claude Code plugin is renamed from `agnes-store-bundle` to `flea`.
Plugin-type flea uploads (their own standalone plugin entry) are
unaffected.
Constants:
- src/marketplace_filter.py:
  - BUNDLE_PLUGIN_NAME: "agnes-store-bundle" → "flea" (Claude Code
    plugin manifest name + .claude-plugin/plugin.json name)
  - BUNDLE_PREFIXED_NAME: "store-bundle" → "flea" (on-disk ZIP /
    git tree path, now plugins/flea/...)
Attribution layer (services/session_processors/usage_lib.py):
- FLEA_BUNDLE_PREFIX: "agnes-store-bundle" → "flea". The JSONL
invocation identifier going forward is `flea:<skill-name>`.
- New `_LEGACY_FLEA_BUNDLE_PREFIXES = ("agnes-store-bundle",)`.
`MarketplaceItemLookup.resolve()` + `_attribute_event()` accept BOTH
the new and the legacy prefix so historic usage_events (~90-day
retention) continue attributing to source='flea'. The tuple becomes
a no-op once the rename has been live past the retention window —
a follow-up commit can drop it then.
- USAGE_PROCESSOR_VERSION bumped 6 → 7 so the session-pipeline reprocess
loop re-runs attribution with the new + legacy prefix branches.
User-facing copy:
- /api/store/bundle.zip Content-Disposition filename: agnes-store-bundle.zip → flea.zip
- `agnes admin store pull` default --out: agnes-store-bundle.zip → flea.zip
- Docstrings + JS comment + welcome template comment updated.
Tests:
- skill_flea.jsonl fixture identifier updated to flea:flea-skill.
- New skill_flea_legacy.jsonl with the legacy prefix for backward-compat
coverage.
- New test `test_legacy_agnes_store_bundle_prefix_resolves` replays the
legacy fixture and asserts source='flea' attribution still lands.
- All other test assertions / mocks substituted mechanically:
test_session_processor_usage.py, test_usage_rollups.py,
test_marketplace_filter_store.py, test_store_api.py,
test_cli_refresh_marketplace.py.
- `_seed_flea_entity` (test_usage_rollups.py) + `_seed_attribution`
(test_session_processor_usage.py) helpers now supply the NOT NULL
`title` + `synthetic_name` columns from phase 1, since they INSERT
directly bypassing the repo's create() fallback.
Client rollover note (CHANGELOG): `agnes refresh-marketplace` will
install the new `flea@agnes` plugin and the local marketplace clone's
`plugins/store-bundle/` source folder is removed via `git reset --hard`.
Whether Claude Code itself auto-prunes the orphan `agnes-store-bundle@agnes`
registry entry is undocumented; this still needs empirical verification on the
dev VM. If the orphan entry lingers, a follow-up will add targeted
cleanup; until then users can manually run
`claude plugin uninstall agnes-store-bundle@agnes`.
Verified locally: 98 passed (session_processor_usage + usage_rollups +
marketplace_filter_store + cli_refresh_marketplace) + 228 passed/2
skipped (store_api + marketplace_api + admin_store_submissions +
store_entity_versions + store_repositories).
* fix(flea): phase-5 — attribution keyspace mismatch (closes #335)
Pre-fix every flea skill/agent invocation silently fell through to
`usage_events.source = 'builtin'`. Root cause: lookup tables in
`services/session_processors/usage_lib.py` keyed `_flea_entities` (and
the derived `_flea_plugins` set) by `store_entities.name`, the
un-suffixed slug. Claude Code writes invocations as
`flea:<synthetic_name>` (e.g. `flea:xlsx-by-c-marustamyan`), so
`dict.get(local)` always missed and the resolver fell through to
builtin. Result: marketplace cards, detail telemetry chips, admin
group-by-source all showed 0 flea invocations even when the raw
JSONL stream was correct.
Phase 1 added the `synthetic_name` column + backfill; phase 4 renamed
the bundle prefix to `flea`; phase 5 finally flips the lookup
keyspace to match what JSONL writes.
usage_lib.py:
- `MarketplaceItemLookup.__init__` preload: `SELECT synthetic_name,
type FROM store_entities` (was `SELECT name, type`). `_flea_plugins`
set derived from those keys, so it now carries synthetic_names
too — matches what Claude Code writes when invoking a skill nested
inside a flea plugin (`<synthetic>:<inner>`).
- `rebuild_rollups` preload: same SELECT change; also derives
`flea_plugins` and threads it through `_aggregate_events` /
`_rebuild_window`.
- `_attribute_event`: signature extended with `flea_plugins`; new
branch `if prefix in flea_plugins: return ("flea", default_type,
prefix, local)` for flea-plugin-nested skills/agents. This branch
was added to `MarketplaceItemLookup.resolve()` in v6 (commit
e076ebbe) but the rollup builder's helper was never updated to
match, so nested skills inside flea plugins silently dropped out
of the daily/window fact tables.
- `USAGE_PROCESSOR_VERSION`: 7 → 8. Forces the session-pipeline
reprocess loop to re-attribute existing usage_events rows with
the corrected lookup so rollup tables fill correctly on the next
tick.
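The corrected keyspace can be sketched as a pure function over the three preloaded lookups. The return order `(source, type, parent_plugin, name)` follows the `_attribute_event` branch quoted above. Two details are assumptions: the order in which the branches are tried, and using the entity's own type for a standalone flea hit. `attribute` is a hypothetical name.

```python
from __future__ import annotations

from typing import Optional

FLEA_BUNDLE_PREFIX = "flea"


def attribute(
    identifier: str,
    default_type: str,
    curated_plugins: set[str],
    flea_entities: dict[str, str],  # synthetic_name -> type
    flea_plugins: set[str],         # synthetic_names of flea plugin entities
) -> tuple[str, str, str, Optional[str]]:
    """Map a JSONL identifier to (source, type, parent_plugin, name) (sketch)."""
    if ":" not in identifier:
        return ("builtin", default_type, "", None)  # e.g. Bash, /exit
    prefix, local = identifier.split(":", 1)
    if prefix == FLEA_BUNDLE_PREFIX and local in flea_entities:
        # Standalone flea skill/agent: the local part is the synthetic_name.
        return ("flea", flea_entities[local], "", local)
    if prefix in flea_plugins:
        # Skill/agent nested inside a flea plugin: `<plugin_synthetic>:<inner>`.
        return ("flea", default_type, prefix, local)
    if prefix in curated_plugins:
        return ("curated", default_type, prefix, local)
    return ("builtin", default_type, "", None)
```

If `flea_entities` were keyed by `name` instead, `flea:xlsx-by-c-marustamyan` would miss the dict and drop through to the final `builtin` return. That is the bug described above.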
marketplace.py — 4 API stats lookup callsites switched from
`entity["name"]` to `entity["synthetic_name"]`:
- `_flea_to_item` (card stats lookup)
- `flea_detail` (`_build_telemetry` + `_load_inner_items_stats_by_parent`)
- `flea_skill_detail` (inner detail `parent_plugin` key)
- `flea_agent_detail` (inner detail `parent_plugin` key)
Tests:
- `skill_flea.jsonl` invocation: `flea:flea-skill` →
`flea:flea-skill-by-alice` (mirrors what Claude Code writes after
phase 1/4 — the suffixed synthetic_name).
- `test_flea_skill_attributed_with_empty_parent` assertion: rollup
`name` column now carries the synthetic_name.
No legacy `agnes-store-bundle` prefix backward compat — clean cut per
user direction (dev phase, no production data worth preserving).
Verified locally: 53 passed targeted (session_processor_usage +
usage_rollups + marketplace_filter_store) + 215 passed/2 skipped
broader (store_api + marketplace_api + admin_store_submissions +
store_entity_versions).
* fix(flea): phase-6 — plugin-level rollup aggregation parity for flea
Flea plugin entity cards + detail pages showed 0 invocations even
though nested skills had correct rollup rows. Root cause: the
plugin-level aggregation pass in `_aggregate_events` was hardcoded
to `source='curated'` only:
    if source != "curated" or not parent:
        continue
    if group_by_day:
        pkey = (day, "curated", "plugin", "", parent)
    else:
        pkey = ("curated", "plugin", "", parent)
So flea plugin entities never got a synthetic
`(source='flea', type='plugin', parent_plugin='', name=<synth>)`
row aggregating nested invocations. `_load_invocation_stats('flea')`
filters `parent_plugin = ''` and returned no row for flea plugin
entity cards, so `stats.get(entity["synthetic_name"])` missed and
the API exposed 0/0.
Triggered by empirical observation on the dev VM —
`codex-second-opinion-by-c-marustamyan` plugin showed 0 calls in
the listing card while its three inner skills (codex-setup ×3,
codex-review ×1, codex-second-opinion ×1) had the expected child
rollup rows.
Fix:
- Extend the guard to `source in ("curated", "flea")`.
- Replace the hardcoded `"curated"` in the `pkey` tuple with the
loop's `source` variable, so flea aggregation lands as `source=
'flea'` and curated aggregation continues landing as
`source='curated'`.
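A simplified, non-daily sketch of the fixed aggregation pass. It assumes events arrive as `(source, type, parent_plugin, name, user_id)` tuples and mirrors the widened guard and the `source`-driven `pkey`. Distinct users are counted as a set union, so one user invoking two skills counts once on the plugin row. `aggregate` is a hypothetical name.

```python
from __future__ import annotations

from collections import defaultdict
from typing import Iterable

Key = tuple[str, str, str, str]  # (source, type, parent_plugin, name)


def aggregate(events: Iterable[tuple[str, str, str, str, str]]) -> dict[Key, tuple[int, int]]:
    """Return {(source, type, parent_plugin, name): (count, distinct_users)} (sketch)."""
    counts: dict[Key, int] = defaultdict(int)
    users: dict[Key, set[str]] = defaultdict(set)
    for source, typ, parent, name, user_id in events:
        key = (source, typ, parent, name)
        counts[key] += 1
        users[key].add(user_id)
        # Widened guard: curated and flea nested events both roll up to their plugin.
        if source not in ("curated", "flea") or not parent:
            continue
        pkey = (source, "plugin", "", parent)  # loop's `source`, not a hardcoded "curated"
        counts[pkey] += 1
        users[pkey].add(user_id)
    return {k: (counts[k], len(users[k])) for k in counts}
```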
API path unchanged: `_load_invocation_stats('flea')`, which already
filters on `parent_plugin = ''`, picks up the new aggregated row
alongside standalone skill/agent rows. Rollup `name` field carries
the synthetic_name keyspace; no collision between standalone entity
synthetic and plugin entity synthetic (global suffix uniqueness
enforced by `_suffixed_already_taken`).
`USAGE_PROCESSOR_VERSION` bumped 8 → 9 to force a reprocess pass so
historic nested-invocation data fills the new plugin-level rows on
the next tick (instead of waiting for the next live invocation).
Tests:
- New `test_flea_plugin_row_aggregates_children` mirrors the existing
`test_curated_plugin_row_aggregates_children`: seeds a flea plugin
entity, three nested events (one user invoking two skills, a
second user invoking one) → asserts the aggregated plugin row
carries count=3, distinct_users=2 (union, not sum), plus the child
rows survive alongside.
Verified locally: 43 passed (session_processor_usage + usage_rollups)
+ 82 passed/2 skipped broader (+ marketplace_filter_store +
marketplace_api).
* refactor(marketplace): phase-7 — unify Details sidebar across detail surfaces
Five marketplace detail surfaces (curated plugin, flea plugin, curated
inner skill/agent, flea inner skill/agent, flea standalone skill/agent)
had drifted on which Details rows they show and what order — the same
field landed in different positions, some fields duplicated hero info,
and the flea plugin Owner row leaked the kebab-case `owner_username`
slug instead of the user's real name. This commit aligns all five
surfaces on a single scan order driven by UX priority:
identity → life-stage → telemetry → debug-tier
Concretely:
1. Curator / Owner (first scan signal — trust)
2. Parent plugin (inner skill/agent only)
3. Released (top-level only — plugins + flea standalone)
4. Last used (recency)
5. Active days (engagement consistency)
6. Version (flea standalone only — content hash)
7. Bundle size (debug-tier)
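The ordering rule comes down to filtering a fixed priority list against the rows a given surface actually has. The real logic lives in the HTML templates. This Python sketch only illustrates the rule, and the row keys are hypothetical.

```python
from __future__ import annotations

# Hypothetical keys, in the scan order listed above.
SCAN_ORDER = (
    "owner",          # 1. Curator / Owner (trust)
    "parent_plugin",  # 2. inner skill/agent only
    "released",       # 3. top-level only
    "last_used",      # 4. recency
    "active_days",    # 5. engagement consistency
    "version",        # 6. flea standalone only
    "bundle_size",    # 7. debug-tier
)


def order_details(rows: dict[str, str]) -> list[tuple[str, str]]:
    """Return the rows a surface provides, in canonical order; unknown keys are dropped."""
    return [(key, rows[key]) for key in SCAN_ORDER if rows.get(key)]
```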
Dropped:
- Slug field on plugin detail surfaces (`marketplace_id` for curated,
`entity_id` for flea). Pure debug info, never user-relevant; URL
already carries it.
- Category + Installs on flea standalone skill/agent detail.
Category is already shown as a hero badge; install count is in
the hero telemetry chip — sidebar duplication added noise.
Owner display:
- Flea plugin Owner row now reads `d.owner_display` (resolved through
`users.name → users.email → owner_username` by `_resolve_owner_display`
in `app/api/marketplace.py:1491`) instead of the raw `d.author_name`
(which is `owner_username`, the kebab-case slug). API field already
populated from phase 2; templates just consume it.
- Curated Curator row continues to read `d.author_name` from
marketplace-metadata.json; `owner_todo` placeholder behavior
preserved.
Files:
- app/web/templates/marketplace_plugin_detail.html — rewrote the
Details render loop (lines 1364-1427 area). Slug row removed,
rows reordered, Owner branch reads `d.owner_display`.
- app/web/templates/marketplace_item_detail.html — both branches of
the Details sidebar (inner skill/agent + flea standalone) re-laid
around the same scan order. Telemetry helper unchanged, just
repositioned. Category + Installs rows removed from the
standalone branch.
No new tests — no existing test asserts the precise order of Details
rows or references the dropped fields in a sidebar context (grep
confirmed). API surface unchanged.
Verified locally: 84 passed / 2 skipped on `test_marketplace_api.py`
+ `test_store_api.py`.
* fix(flea): post-review hardening — N+1, v50 UNIQUE, docs, test cleanup
Addresses 5 critical findings from PR #342 code review:
1. N+1 query in `_flea_to_item` — owner-display resolution previously
ran one `SELECT … FROM users WHERE id = ?` per item in the listing
comprehension. Now batched via `_load_users_display` IN-query
prefetch; 50 items drops 51 user queries to 2. Regression-guarded
by `TestFleaOwnerDisplayBatched` (spies `_resolve_owner_display`
and asserts it's not called inside the list path).
2. Misleading comment in `src/marketplace_filter.py` claimed the
attribution layer accepts both `agnes-store-bundle` and `flea`
prefixes — it doesn't (clean cut per CHANGELOG). Rewrote to match
reality.
3. CHANGELOG `[Unreleased]` had two `### Changed` blocks. Merged into
one (BREAKING bullet first).
4. New v49→v50 migration adds `UNIQUE INDEX
idx_store_entities_synthetic_name`. v49 made `synthetic_name` the
canonical attribution key but uniqueness was only app-enforced;
v50 promotes the invariant to the DB layer. Migration pre-checks
for existing duplicates and raises `RuntimeError` listing them
rather than letting `CREATE UNIQUE INDEX` fail mid-way. v48→v49
migration gained an `is_nullable='YES'` guard on its `SET NOT NULL`
ALTERs so re-runs on a fully-migrated DB don't trip DuckDB's
"cannot alter entry … entries depend on it" block (the new index
counts as such an entry). Index is created by the migration only —
keeping it out of `_SYSTEM_SCHEMA` preserves fresh-install ordering
(CREATE TABLE → v49 ALTERs → v50 CREATE INDEX).
5. Deleted three redundant version-pinned schema asserts whose names
lied about their bodies (`test_schema_version_is_42` asserting
`== 49`, etc.). Canonical assert lives in
`test_db_schema_version.py`, renamed to
`test_schema_version_matches_constant`.
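The batched prefetch behind fix 1 can be sketched as one `IN (...)` query whose result becomes a dictionary that each item consults. SQLite stands in for DuckDB. `load_users_display` and `owner_display` only loosely mirror `_load_users_display`, whose actual signature is not shown here.

```python
import sqlite3
from typing import Iterable, Optional

UserMap = dict[int, tuple[Optional[str], Optional[str]]]


def load_users_display(conn: sqlite3.Connection, user_ids: Iterable[int]) -> UserMap:
    """Fetch (name, email) for every owner in one query instead of one per item."""
    ids = sorted(set(user_ids))
    if not ids:
        return {}
    placeholders = ", ".join("?" for _ in ids)
    rows = conn.execute(
        f"SELECT id, name, email FROM users WHERE id IN ({placeholders})", ids
    ).fetchall()
    return {uid: (name, email) for uid, name, email in rows}


def owner_display(users: UserMap, user_id: int, owner_username: str) -> str:
    """Apply the name -> email -> username fallback from the prefetched map."""
    name, email = users.get(user_id, (None, None))
    return name or email or owner_username
```

For a listing of N flea items this issues a single user query instead of N, which is the behavior `TestFleaOwnerDisplayBatched` guards.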
* fix(db): gate v34→v38 store_entities ALTER COLUMN steps on column state
CI on Linux failed `test_v17_to_v18_drops_*` after the v50 UNIQUE INDEX
landed. Root cause: those tests open a DB at the full target version,
seed fixtures, then reset `schema_version` to 17 and reopen — forcing
the ladder to re-run from 17 → current. With the v50 index now in place,
DuckDB blocks intermediate `ALTER COLUMN` steps on `store_entities`
("Cannot drop this column: an index depends on a column after it!" /
"Cannot alter entry because there are entries that depend on it"),
because `synthetic_name` (the indexed column) sits positionally after
the columns those steps touch.
Fix: convert the three SQL-list migrations that hit store_entities into
defensive Python functions:
- `_v34_to_v35_migrate` short-circuits when `synthetic_name` already
exists (post-v49 shape — the visibility_status rebuild is moot and
the DROP COLUMN would be blocked by the index).
- `_v35_to_v36_migrate` gates the `visibility_status SET NOT NULL` +
`SET DEFAULT` on `is_nullable='YES'` so it's a true no-op when the
column is already constrained.
- `_v37_to_v38_migrate` gates the `version_no SET NOT NULL` step the
same way.
Forward-roll path (real installs that never reset schema_version) is
unchanged: the gates fire `YES` → ALTERs run. The fix only changes
behavior for the "DB is already at v50 shape but version row says 17"
scenario the tests construct.
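The column-state gates can be sketched with a few small helpers. The commit reads DuckDB's `is_nullable` flag. This stand-in reads SQLite's `PRAGMA table_info` instead, and because SQLite cannot `SET NOT NULL`, the ALTER and the rebuild are passed in as callables. All names here are hypothetical.

```python
from __future__ import annotations

import sqlite3
from typing import Callable, Optional


def column_is_nullable(conn: sqlite3.Connection, table: str, column: str) -> Optional[bool]:
    """True/False for an existing column, None if the column does not exist."""
    for _cid, name, _type, notnull, _default, _pk in conn.execute(f"PRAGMA table_info({table})"):
        if name == column:
            return not notnull
    return None


def set_not_null_if_needed(
    conn: sqlite3.Connection, table: str, column: str, alter: Callable[[], None]
) -> bool:
    """Run `alter` only while the column is still nullable; a no-op otherwise."""
    if column_is_nullable(conn, table, column):
        alter()
        return True
    return False


def migrate_v34_to_v35(conn: sqlite3.Connection, rebuild: Callable[[], None]) -> bool:
    """Skip the rebuild entirely once the post-v49 shape (synthetic_name) exists."""
    if column_is_nullable(conn, "store_entities", "synthetic_name") is not None:
        return False
    rebuild()
    return True
```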
---------
Co-authored-by: Minas Arustamyan <arustamyan.minas@gmail.com>
"""Pure helpers for UsageProcessor — event extraction from Claude Code session jsonls.

Session JSONL shape (as documented in dev_docs/session_explore.md and verified
against live samples):

Each line is a top-level event dict with:
    {
        "type": "user" | "assistant" | "progress" | "system" |
                "tool_use_result" | "summary" | "file-history-snapshot" |
                "queue-operation" | ...,
        "uuid": "event-uuid",
        "parentUuid": "parent-event-uuid",
        "sessionId": "session-uuid",
        "timestamp": "2026-05-12T07:30:00.000Z",
        "cwd": "/path/to/cwd",
        "message": {
            "role": "user" | "assistant",
            "model": "claude-...",  # present on assistant turns
            "content": [            # array or plain string on user turns
                {"type": "text", "text": "..."},
                {"type": "tool_use", "id": "tu_123", "name": "Bash", "input": {...}},
                {"type": "tool_result", "tool_use_id": "tu_123", "is_error": false, "content": [...]}
            ]
        }
    }

Tool results appear as:
  - Inline content items of type "tool_result" inside a user-role message, OR
  - As top-level events of type "tool_use_result" (older Claude Code versions)

is_error correlation: build a map of {tool_use_id: True} from tool_result
items on the first pass, then apply to matching tool_use events.
"""

from __future__ import annotations

import re
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import Iterator

# v9: phase-6 plugin-level rollup parity for flea. `_aggregate_events`
# now produces synthetic (source='flea', type='plugin', parent_plugin='',
# name=<plugin_synth>) rows aggregating nested skill/agent invocations,
# mirroring the curated path. Without this, flea plugin entity cards +
# detail telemetry chips read 0 from `_load_invocation_stats` (which
# filters `parent_plugin = ''` for flea) even though nested children
# had correct rollup rows. Bump forces a re-aggregation pass so historic
# nested-invocation data fills the new plugin-level rows.
# (v8: phase-5 attribution keyspace fix. Lookup tables key by
# `store_entities.synthetic_name` instead of `name`; `_attribute_event`
# gained the flea-plugin-nested branch so nested skills inside flea
# plugins flow into rollup tables. v7: phase-4 bundle rename.)
# (v5: v46 marketplace-telemetry refactor swapped AttributionLookup for
# MarketplaceItemLookup. Identifier prefix (`<plugin>:<local>`) now drives
# attribution and usage_events.source / ref_id are populated per-event from
# the live marketplace_plugins + store_entities tables.)
# (v4: #293 user_id column; v3: #303 <command-name> slash extraction.)
USAGE_PROCESSOR_VERSION = 9

# Claude Code wraps user-typed slash invocations as
# <command-name>/<name></command-name> inside the user message content
# (raw "/foo" plain text never reaches the jsonl). Tag may sit anywhere
# in the text — typically after a <command-message> sibling — so we
# search rather than anchor at start. Name pattern matches both flat
# commands (`clear`, `exit`) and plugin-prefixed ones (`plugin:name`).
COMMAND_NAME_RE = re.compile(r"<command-name>/([A-Za-z][\w:-]*)</command-name>")

# Event types to skip entirely
_SKIP_TYPES = frozenset(
    {
        "system",
        "summary",
        "file-history-snapshot",
        "queue-operation",
        "progress",
    }
)


@dataclass(frozen=True)
class ParsedEvent:
    event_uuid: str | None
    parent_uuid: str | None
    tool_id: str | None  # tool_use 'id' (tu_xxx) from message.content item; None for slash_command
    event_type: str  # 'tool_use' | 'slash_command' | 'subagent' | 'mcp_call'
    tool_name: str | None
    skill_name: str | None
    subagent_type: str | None
    command_name: str | None
    is_error: bool
    model: str | None
    cwd: str | None
    occurred_at: datetime


def _parse_ts(ts_str: str | None) -> datetime | None:
    """Parse ISO 8601 timestamp to aware datetime. Returns None on failure."""
    if not ts_str:
        return None
    try:
        ts_str = ts_str.replace("Z", "+00:00")
        return datetime.fromisoformat(ts_str)
    except (ValueError, TypeError):
        return None


def _collect_error_map(turns: list[dict]) -> dict[str, bool]:
    """First-pass: collect tool_use_id → is_error from all tool_result items.

    Tool results appear in two places:
      1. As content items inside user-role messages (type='tool_result')
      2. As top-level events of type='tool_use_result'
    """
    errors: dict[str, bool] = {}
    for turn in turns:
        turn_type = turn.get("type", "")

        # Top-level tool_use_result events (older Claude Code)
        if turn_type == "tool_use_result":
            tu_id = turn.get("tool_use_id") or turn.get("toolUseId")
            if tu_id and turn.get("is_error"):
                errors[tu_id] = True

        # Inline tool_result content blocks inside user messages
        msg = turn.get("message", {}) or {}
        content = msg.get("content", [])
        if isinstance(content, list):
            for item in content:
                if not isinstance(item, dict):
                    continue
                if item.get("type") == "tool_result":
                    tu_id = item.get("tool_use_id")
                    if tu_id and item.get("is_error"):
                        errors[tu_id] = True

    return errors


def iter_events(turns: list[dict]) -> Iterator[ParsedEvent]:
    """Walk parsed JSONL turns and yield ParsedEvent for each observable event.

    Recognises:
      - Assistant tool_use blocks → event_type='tool_use' (or 'subagent'/'mcp_call')
          - Skill tool → also extracts skill_name
          - Task/Agent tools → event_type='subagent'
          - mcp__* tools → event_type='mcp_call'
      - User messages containing a <command-name>/foo</command-name> tag
        (Claude Code's wire format for user-typed slash invocations)
        → event_type='slash_command', command_name='foo'

    Skips: system, summary, file-history-snapshot, queue-operation, progress.
    """
    error_map = _collect_error_map(turns)

    for turn in turns:
        turn_type = turn.get("type", "")
        if turn_type in _SKIP_TYPES:
            continue

        ts = _parse_ts(turn.get("timestamp")) or datetime.now(timezone.utc)
        cwd = turn.get("cwd")
        event_uuid = turn.get("uuid")
        parent_uuid = turn.get("parentUuid")

        msg = turn.get("message", {}) or {}
        content = msg.get("content", [])
        model = msg.get("model")

        if turn_type == "assistant":
            if isinstance(content, list):
                for item in content:
                    if not isinstance(item, dict):
                        continue
                    if item.get("type") != "tool_use":
                        continue

                    tool_id = item.get("id", "")
                    tool_name = item.get("name") or ""
                    inp = item.get("input") or {}
                    is_error = error_map.get(tool_id, False)

                    # Classify event type
                    skill_name: str | None = None
                    subagent_type: str | None = None
                    command_name: str | None = None

                    if tool_name == "Skill":
                        event_type = "tool_use"
                        # Real Skill input shape varies; check both keys
                        skill_name = inp.get("skill") or inp.get("name") or None
                    elif tool_name in ("Task", "Agent"):
                        event_type = "subagent"
                        subagent_type = inp.get("subagent_type") or tool_name
                    elif tool_name.startswith("mcp__"):
                        event_type = "mcp_call"
                    else:
                        event_type = "tool_use"

                    yield ParsedEvent(
                        event_uuid=event_uuid,
                        parent_uuid=parent_uuid,
                        tool_id=tool_id or None,
                        event_type=event_type,
                        tool_name=tool_name or None,
                        skill_name=skill_name,
                        subagent_type=subagent_type,
                        command_name=command_name,
                        is_error=is_error,
                        model=model,
                        cwd=cwd,
                        occurred_at=ts,
                    )

        elif turn_type == "user":
            # Slash-invocation detection: scan user message content for
            # <command-name>/foo</command-name> tags. content is normally a
            # plain string on slash-invocation turns; tolerate the
            # list-of-blocks shape too in case Claude Code's wire format
            # shifts to structured content later.
            if isinstance(content, str):
                text_parts = [content]
            elif isinstance(content, list):
                text_parts = [
                    item.get("text", "") for item in content if isinstance(item, dict) and item.get("type") == "text"
                ]
            else:
                text_parts = []

            for text in text_parts:
                if not text:
                    continue
                for name in COMMAND_NAME_RE.findall(text):
                    yield ParsedEvent(
                        event_uuid=event_uuid,
                        parent_uuid=parent_uuid,
                        tool_id=None,
                        event_type="slash_command",
                        tool_name=None,
                        skill_name=None,
                        subagent_type=None,
                        command_name=name,
                        is_error=False,
                        model=None,
                        cwd=cwd,
                        occurred_at=ts,
                    )


# Synthetic plugin name Agnes uses to bundle flea-market store entities into
# a single Claude Code marketplace surface. Skill/agent/command identifiers
# from flea entities arrive as `flea:<synthetic_name>` in the JSONL.
FLEA_BUNDLE_PREFIX = "flea"


class MarketplaceItemLookup:
    """Preloads marketplace_plugins + store_entities into memory for O(1)
    per-event attribution.

    Claude Code writes plugin-defined skill/agent/command identifiers in the
    JSONL as ``<plugin_name>:<local_name>`` (e.g. ``grpn:design``). The prefix
    is the *plugin* name: a curated plugin name, the synthetic ``flea``
    bundle for standalone flea entities, or the ``synthetic_name`` of a flea
    plugin entity. The local part is the skill/agent/command name relative
    to that plugin. Identifiers without a ``:`` are either built-in tools
    (``Bash``, ``Read``, …) or flat slash commands (``/exit``); neither
    participates in marketplace telemetry.

    ``resolve()`` returns ``(source, parent_plugin, name, type)``:

    - ``source``: ``'curated'`` | ``'flea'`` | ``'builtin'``
    - ``parent_plugin``: the prefix when it matched a curated or flea plugin,
      ``''`` otherwise (including standalone flea entities, which have no
      parent)
    - ``name``: the local part when source is curated/flea, ``None`` otherwise
    - ``type``: for standalone flea entities, the type stored on the entity
      row; otherwise ``'skill'`` or ``'agent'`` depending on which identifier
      matched (slash commands count as skill per product rule); ``None`` for
      builtin.
    """

    def __init__(self, conn):
        self._curated_plugins: set[str] = {
            row[0] for row in conn.execute(
                "SELECT DISTINCT name FROM marketplace_plugins"
            ).fetchall()
        }
        # v49 phase-5: lookup table keyed by `synthetic_name` (the
        # `<name>-by-<owner>` slug baked into the served plugin tree). Claude
        # Code writes the local part of a flea invocation as that synthetic
        # name (`flea:xlsx-by-c-marustamyan`), so matching against `name`
        # (un-suffixed) never landed. Type comes along so the rollup writer
        # knows whether to record the invocation as skill / agent / plugin.
        self._flea_entities: dict[str, str] = {
            row[0]: row[1] for row in conn.execute(
                "SELECT synthetic_name, type FROM store_entities WHERE visibility_status='approved'"
            ).fetchall()
        }
        # Flea PLUGIN entities can be matched as a prefix too: `<plugin>:<inner>`
        # invocations of a skill / agent that lives inside a flea plugin bundle
        # land here, mirroring the curated nested attribution path. Standalone
        # flea entities still flow through the FLEA_BUNDLE_PREFIX branch.
        # Set carries synthetic_names because that's the plugin slug Claude
        # Code resolves at install time (v49 phase-4: `data["name"] = suffixed`
        # in `_bake_plugin_tree` for type='plugin' entities).
        self._flea_plugins: set[str] = {
            synthetic for synthetic, ent_type in self._flea_entities.items()
            if ent_type == "plugin"
        }

    def resolve(self, event: ParsedEvent) -> tuple[str, str, str | None, str | None]:
        """Return ``(source, parent_plugin, name, type)``.

        Priority order: the first identifier with a ``:`` prefix that matches
        a known plugin wins. Identifiers without ``:`` (built-in tools, flat
        slash commands) drop through to the builtin fallback.
        """
        # (identifier, default_type_when_matched)
        candidates = (
            (event.skill_name, "skill"),
            (event.subagent_type, "agent"),
            (event.command_name, "skill"),  # slash commands counted as skill
        )
        for ident, default_type in candidates:
            if not ident or ":" not in ident:
                continue
            prefix, local = ident.split(":", 1)
            if prefix == FLEA_BUNDLE_PREFIX:
                ent_type = self._flea_entities.get(local)
                if ent_type:
                    # For a flea entity bundle, the local part *is* the
                    # entity's synthetic_name and its type comes from the
                    # registry. Standalone flea items have no parent plugin.
                    return ("flea", "", local, ent_type)
                # Bundle prefix but no matching approved entity (likely
                # archived since the event was written). Try the next
                # candidate; if none match, fall through to builtin.
                continue
            if prefix in self._curated_plugins:
                return ("curated", prefix, local, default_type)
            if prefix in self._flea_plugins:
                # Skill / agent nested inside a flea plugin bundle. Same
                # shape as curated: source='flea', parent_plugin=<plugin
                # name>, name=<inner local part>.
                return ("flea", prefix, local, default_type)
        # No candidate matched a known plugin prefix: fall through to builtin
        # (matches the rebuild_rollups filter that excludes unattributed events).
        return ("builtin", "", None, None)


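# Illustrative sketch, not used by the code in this module: the resolution
# order of MarketplaceItemLookup.resolve(), written as a pure function over
# in-memory tables so it can be exercised without a database connection.
# `_example_resolve` and its parameters are hypothetical. The plugin and
# entity names in the tests are made up, apart from the
# `xlsx-by-c-marustamyan` slug quoted in the comments above.
def _example_resolve(
    identifiers: list[tuple[str | None, str]],
    curated_plugins: set[str],
    flea_entities: dict[str, str],
    bundle_prefix: str = "flea",
) -> tuple[str, str, str | None, str | None]:
    """Return (source, parent_plugin, name, type).

    `identifiers` holds (identifier, default_type) pairs in priority order.
    `flea_entities` maps synthetic_name -> entity type.
    """
    flea_plugins = {s for s, t in flea_entities.items() if t == "plugin"}
    for ident, default_type in identifiers:
        if not ident or ":" not in ident:
            continue  # built-in tool or flat slash command
        head, local = ident.split(":", 1)
        if head == bundle_prefix:
            ent_type = flea_entities.get(local)
            if ent_type:
                return ("flea", "", local, ent_type)  # standalone flea entity
            continue  # unknown or archived flea entity: try the next candidate
        if head in curated_plugins:
            return ("curated", head, local, default_type)
        if head in flea_plugins:
            return ("flea", head, local, default_type)  # nested in a flea plugin
    return ("builtin", "", None, None)


# Usage: with curated {"grpn"} and flea {"kit-by-alice": "plugin"},
# [("kit-by-alice:reviewer", "agent")] resolves to
# ("flea", "kit-by-alice", "reviewer", "agent").

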
def compute_active_seconds(timestamps: list[datetime]) -> int:
    """Sum the durations of activity blocks.

    Timestamps are sorted first; a gap of more than 10 minutes between
    consecutive timestamps closes the current block and starts a new one.
    A block made of a single timestamp contributes 0 seconds.
    """
    if not timestamps:
        return 0
    timestamps = sorted(timestamps)
    GAP = 600  # 10 minutes
    blocks = []
    block_start = timestamps[0]
    prev = timestamps[0]
    for ts in timestamps[1:]:
        gap = (ts - prev).total_seconds()
        if gap > GAP:
            blocks.append((block_start, prev))
            block_start = ts
        prev = ts
    blocks.append((block_start, prev))
    return int(sum((end - start).total_seconds() for start, end in blocks))


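# Illustrative sketch, not used by the code in this module: an equivalent
# formulation of compute_active_seconds(). Each block's duration is the sum
# of the gaps inside it, and the >10 minute gaps are exactly the block
# boundaries. Active time is therefore the sum of every gap between
# consecutive sorted timestamps that does not exceed 10 minutes.
# `_example_active_seconds` is a hypothetical name.
from datetime import datetime, timedelta


def _example_active_seconds(timestamps: list[datetime], gap_seconds: int = 600) -> int:
    ordered = sorted(timestamps)
    total = 0.0
    for prev, cur in zip(ordered, ordered[1:]):
        gap = (cur - prev).total_seconds()
        if gap <= gap_seconds:  # same block: the gap counts as active time
            total += gap
    return int(total)


# Worked example: events at 09:00, 09:05, 09:30 and 09:32 form two blocks,
# 09:00-09:05 (300 s) and 09:30-09:32 (120 s), so active time is 420 s while
# wall time (first to last event) is 1920 s.

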
def compute_summary(turns: list[dict], events: list[dict]) -> dict:
    """Build the usage_session_summary row dict from parsed turns and event rows.

    Caller must fill in 'session_file' and 'username' after calling this.
    ``events`` is a list of dicts as produced by UsageProcessor, not
    ParsedEvent instances.
    """
    # session_id: first turn with a sessionId field
    session_id = None
    for t in turns:
        sid = t.get("sessionId")
        if sid:
            session_id = sid
            break

    # Timestamps from all turns that have one
    timestamps: list[datetime] = []
    user_messages = 0
    assistant_messages = 0
    model_counter: Counter = Counter()
    # Running token totals keyed by summary column, plus the mapping from
    # Anthropic API usage-block keys to those columns.
    token_totals = {
        "input_tokens": 0,
        "output_tokens": 0,
        "cache_read_tokens": 0,
        "cache_creation_tokens": 0,
    }
    usage_columns = (
        ("input_tokens", "input_tokens"),
        ("output_tokens", "output_tokens"),
        ("cache_read_input_tokens", "cache_read_tokens"),
        ("cache_creation_input_tokens", "cache_creation_tokens"),
    )

    for t in turns:
        ts = _parse_ts(t.get("timestamp"))
        if ts:
            timestamps.append(ts)
        turn_type = t.get("type", "")
        if turn_type == "user":
            user_messages += 1
        elif turn_type == "assistant":
            assistant_messages += 1
            msg = t.get("message", {}) or {}
            m = msg.get("model")
            if m:
                model_counter[m] += 1
            # Anthropic API usage block on assistant turns. Older sessions
            # may lack `cache_*` keys (pre-prompt-caching); `.get(k, 0)`
            # tolerates that. Non-int values (corrupted JSONL) are skipped
            # to keep one bad turn from poisoning the whole summary.
            usage = msg.get("usage") or {}
            for api_key, column in usage_columns:
                v = usage.get(api_key, 0)
                if isinstance(v, int):
                    token_totals[column] += v

    started_at = min(timestamps) if timestamps else None
    ended_at = max(timestamps) if timestamps else None
    wall_seconds = int((ended_at - started_at).total_seconds()) if started_at and ended_at else 0
    active_seconds = compute_active_seconds(timestamps)

    # Aggregate counts from events
    tool_calls = sum(1 for e in events if e["event_type"] == "tool_use")
    tool_errors = sum(1 for e in events if e.get("is_error"))
    skill_invocations = sum(1 for e in events if e.get("skill_name"))
    subagent_dispatches = sum(1 for e in events if e["event_type"] == "subagent")
    mcp_calls = sum(1 for e in events if e["event_type"] == "mcp_call")
    slash_commands = sum(1 for e in events if e["event_type"] == "slash_command")
    distinct_tools = len({e["tool_name"] for e in events if e.get("tool_name")})
    distinct_skills = len({e["skill_name"] for e in events if e.get("skill_name")})
    primary_model = model_counter.most_common(1)[0][0] if model_counter else None

    return {
        "session_id": session_id or "",
        "started_at": started_at,
        "ended_at": ended_at,
        "active_seconds": active_seconds,
        "wall_seconds": wall_seconds,
        "user_messages": user_messages,
        "assistant_messages": assistant_messages,
        "tool_calls": tool_calls,
        "tool_errors": tool_errors,
        "skill_invocations": skill_invocations,
        "subagent_dispatches": subagent_dispatches,
        "mcp_calls": mcp_calls,
        "slash_commands": slash_commands,
        "distinct_tools": distinct_tools,
        "distinct_skills": distinct_skills,
        "primary_model": primary_model,
        "input_tokens": token_totals["input_tokens"],
        "output_tokens": token_totals["output_tokens"],
        "cache_read_tokens": token_totals["cache_read_tokens"],
        "cache_creation_tokens": token_totals["cache_creation_tokens"],
        "processor_version": USAGE_PROCESSOR_VERSION,
    }


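# Illustrative sketch, not used by the code in this module: the per-turn
# token accumulation from compute_summary(), isolated so the tolerance rules
# are easy to check. compute_summary() only reads `usage` on assistant turns.
# Missing keys count as 0 and non-int values are skipped. The turn dicts in
# the tests are made-up minimal shapes containing only the fields this helper
# reads. `_example_token_totals` is a hypothetical name.
_EXAMPLE_USAGE_COLUMNS = (
    ("input_tokens", "input_tokens"),
    ("output_tokens", "output_tokens"),
    ("cache_read_input_tokens", "cache_read_tokens"),
    ("cache_creation_input_tokens", "cache_creation_tokens"),
)


def _example_token_totals(turns: list[dict]) -> dict[str, int]:
    totals = {column: 0 for _, column in _EXAMPLE_USAGE_COLUMNS}
    for t in turns:
        if t.get("type") != "assistant":
            continue  # usage is only read from assistant turns
        usage = (t.get("message") or {}).get("usage") or {}
        for api_key, column in _EXAMPLE_USAGE_COLUMNS:
            v = usage.get(api_key, 0)
            if isinstance(v, int):  # skip corrupted non-int values
                totals[column] += v
    return totals

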
# Refresh interval for the 30-day window snapshot. The daily fact + 7d window
# refresh on every UsageProcessor tick (~10 min) because they're cheap and
# need to reflect recent activity. The 30d window covers more days, costs more
# to rebuild, and barely shifts between ticks, so it is refreshed hourly
# instead. Tracked in `session_processor_state`
# (processor_name='marketplace_rollup_30d').
WINDOW_30D_REFRESH_SECONDS = 3600
_MARKETPLACE_30D_TRACKER = "marketplace_rollup_30d"


def _identifier_split(skill_name, subagent_type, command_name, event_type):
    """Split the first plugin-prefixed identifier on an event.

    Returns ``(prefix, local, default_type)`` for the first of
    ``skill_name``, ``subagent_type`` and ``command_name`` that contains a
    ``:``, or ``(None, None, None)`` if none does. Candidate order and
    default types match ``MarketplaceItemLookup.resolve()``, so the SQL
    rollup builder applies the same attribution rules even though the loop
    runs in Python. Unlike ``resolve()``, which moves on to the next
    candidate when a prefix matches no known plugin, this stops at the first
    prefixed identifier. ``event_type`` is accepted but not used.
    """
    candidates = (
        (skill_name, "skill"),
        (subagent_type, "agent"),
        (command_name, "skill"),  # slash commands counted as skill (product rule)
    )
    for ident, default_type in candidates:
        if not ident or ":" not in ident:
            continue
        prefix, local = ident.split(":", 1)
        return prefix, local, default_type
    return None, None, None


def _attribute_event(curated_plugins: set[str], flea_entities: dict[str, str],
                     flea_plugins: set[str],
                     skill_name, subagent_type, command_name, event_type):
    """Resolve one event to ``(source, type, parent_plugin, name)``.

    The tuple order differs from ``MarketplaceItemLookup.resolve()``, which
    returns ``(source, parent_plugin, name, type)``.

    Returns None when the event doesn't belong in marketplace rollups
    (built-in tool, flat slash command, unknown plugin prefix).

    Lookup tables (curated_plugins, flea_entities, flea_plugins) are passed
    in so the caller can preload once and reuse across thousands of events.
    Mirrors the four branches `MarketplaceItemLookup.resolve()` walks:

    1. ``flea:<synthetic>``: standalone flea skill/agent/plugin
    2. ``<curated_plugin>:<inner>``: nested skill/agent of a curated plugin
    3. ``<flea_plugin>:<inner>``: nested skill/agent of a flea plugin
    4. anything else: None (filtered out of rollups)
    """
    prefix, local, default_type = _identifier_split(skill_name, subagent_type, command_name, event_type)
    if prefix is None:
        return None
    if prefix == FLEA_BUNDLE_PREFIX:
        ent_type = flea_entities.get(local)
        if ent_type is None:
            return None
        return ("flea", ent_type, "", local)
    if prefix in curated_plugins:
        return ("curated", default_type, prefix, local)
    if prefix in flea_plugins:
        # v49 phase-5: nested skill/agent inside a flea plugin bundle.
        # Same shape as curated nested attribution (source='flea',
        # parent_plugin=<synthetic plugin name>, name=<inner frontmatter
        # name>). Without this branch the rollup builder silently dropped
        # inner-item invocations even though MarketplaceItemLookup.resolve()
        # (used by the live writer) handled them since v6.
        return ("flea", default_type, prefix, local)
    return None


def _aggregate_events(events_rows, curated_plugins, flea_entities,
                      flea_plugins, *, group_by_day: bool):
    """Walk raw event rows and produce aggregated buckets.

    ``events_rows`` shape: (day, user_id, is_error, skill_name, subagent_type,
    command_name, event_type). Returns a dict mapping each key to
    ``{"count": int, "users": set, "errors": int}``. With
    ``group_by_day=True`` keys are (day, source, type, parent_plugin, name),
    for the daily fact table; otherwise the whole window is aggregated under
    (source, type, parent_plugin, name).

    Plugin-level aggregation (type='plugin' rows) is added by walking the
    child results once and grouping by parent.
    """
    # Bucket: key -> dict(count, users:set, errors)
    leaf: dict[tuple, dict] = {}
    for row in events_rows:
        day, uid, is_err, sk, sa, cm, etype = row
        attributed = _attribute_event(curated_plugins, flea_entities, flea_plugins,
                                      sk, sa, cm, etype)
        if attributed is None:
            continue
        source, type_, parent, name = attributed
        if group_by_day:
            key = (day, source, type_, parent, name)
        else:
            key = (source, type_, parent, name)
        b = leaf.setdefault(key, {"count": 0, "users": set(), "errors": 0})
        b["count"] += 1
        if uid:
            b["users"].add(uid)
        if is_err:
            b["errors"] += 1

    # Plugin-level rollup: curated AND flea invocations get a parent row,
    # summing the children. distinct_users at plugin level is recomputed
    # across child users so a user counted in two skills of the same plugin
    # doesn't double-count. v49 phase-6: extended to flea (was curated-only).
    # Without this, flea plugin entities never got an aggregated row, so the
    # parent_plugin='' filter in `_load_invocation_stats` returned no rows
    # for plugin cards / detail telemetry chips even though nested children
    # were attributed correctly.
    plugin_bucket: dict[tuple, dict] = {}
    for key, vals in leaf.items():
        if group_by_day:
            day, source, type_, parent, name = key
        else:
            day = None
            source, type_, parent, name = key
        if source not in ("curated", "flea") or not parent:
            continue
        if group_by_day:
            pkey = (day, source, "plugin", "", parent)
        else:
            pkey = (source, "plugin", "", parent)
        pb = plugin_bucket.setdefault(pkey, {"count": 0, "users": set(), "errors": 0})
        pb["count"] += vals["count"]
        pb["users"] |= vals["users"]
        pb["errors"] += vals["errors"]

    # Merge rather than dict.update(): a `flea:<synthetic_name>` invocation
    # of a flea plugin entity can already have produced a leaf row with the
    # same key as the plugin roll-up, and update() would overwrite it.
    for pkey, pvals in plugin_bucket.items():
        existing = leaf.get(pkey)
        if existing is None:
            leaf[pkey] = pvals
        else:
            existing["count"] += pvals["count"]
            existing["users"] |= pvals["users"]
            existing["errors"] += pvals["errors"]
    return leaf


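# Illustrative sketch, not used by the code in this module: the plugin-level
# step of _aggregate_events(), reduced to window keys
# (source, type, parent_plugin, name). Child rows that have a parent plugin
# are summed into a (source, "plugin", "", parent) row. Distinct users are a
# set union, so a user active in two skills of one plugin is counted once.
# `_example_plugin_rollup` is a hypothetical name and the bucket contents in
# the tests are made up.
def _example_plugin_rollup(leaf: dict[tuple, dict]) -> dict[tuple, dict]:
    # Copy the leaf buckets so the input is left untouched.
    out = {
        k: {"count": v["count"], "users": set(v["users"]), "errors": v["errors"]}
        for k, v in leaf.items()
    }
    for (source, _type, parent, _name), vals in leaf.items():
        if source not in ("curated", "flea") or not parent:
            continue  # builtin, or a standalone item with no parent plugin
        pb = out.setdefault((source, "plugin", "", parent), {"count": 0, "users": set(), "errors": 0})
        pb["count"] += vals["count"]
        pb["users"] |= vals["users"]
        pb["errors"] += vals["errors"]
    return out

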
def _last_30d_due(conn) -> bool:
    """True if the 30d window has never been refreshed, or not within
    ``WINDOW_30D_REFRESH_SECONDS``."""
    row = conn.execute(
        "SELECT processed_at FROM session_processor_state "
        "WHERE processor_name = ? AND session_file = '__rollup__'",
        [_MARKETPLACE_30D_TRACKER],
    ).fetchone()
    if row is None:
        return True
    last = row[0]
    if last is None:
        return True
    # processed_at is a TIMESTAMP and DuckDB returns a datetime; treat a
    # naive value as UTC so it can be compared with the aware `now`.
    now = datetime.now(timezone.utc)
    if last.tzinfo is None:
        last = last.replace(tzinfo=timezone.utc)
    return (now - last).total_seconds() >= WINDOW_30D_REFRESH_SECONDS


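# Illustrative sketch, not used by the code in this module: the staleness
# rule of _last_30d_due() with "now" injected so it can be tested. A missing
# timestamp means "due". A naive timestamp is treated as UTC before the
# comparison, because subtracting a naive datetime from an aware one raises
# TypeError. `_example_is_due` is a hypothetical name.
from datetime import datetime, timezone


def _example_is_due(last: datetime | None, now: datetime | None = None,
                    threshold_seconds: int = 3600) -> bool:
    if now is None:
        now = datetime.now(timezone.utc)
    if last is None:
        return True
    if last.tzinfo is None:
        last = last.replace(tzinfo=timezone.utc)
    return (now - last).total_seconds() >= threshold_seconds

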
def _mark_last_30d_refreshed(conn) -> None:
    # Pass the timestamp explicitly: DuckDB parses bare `current_timestamp`
    # in an ON CONFLICT … DO UPDATE SET clause as a column name on the
    # right-hand side, then can't bind it.
    now = datetime.now(timezone.utc)
    conn.execute(
        """
        INSERT INTO session_processor_state
            (processor_name, session_file, username, processed_at, items_extracted)
        VALUES (?, '__rollup__', 'system', ?, 0)
        ON CONFLICT (processor_name, session_file) DO UPDATE SET
            processed_at = EXCLUDED.processed_at
        """,
        [_MARKETPLACE_30D_TRACKER, now],
    )


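# Illustrative sketch, not used by the code in this module: the same
# INSERT ... ON CONFLICT ... DO UPDATE upsert that _mark_last_30d_refreshed()
# issues, shown against an in-memory SQLite database so it runs without
# DuckDB. It assumes SQLite 3.24+ (UPSERT support). `processor_state` is a
# hypothetical, trimmed-down stand-in for session_processor_state, and the
# timestamp is bound as a parameter just as the DuckDB version does.
# `_example_upsert_refresh` is a hypothetical name.
import sqlite3


def _example_upsert_refresh(conn: sqlite3.Connection, tracker: str, stamp: str) -> None:
    conn.execute(
        """
        INSERT INTO processor_state (processor_name, session_file, processed_at)
        VALUES (?, '__rollup__', ?)
        ON CONFLICT (processor_name, session_file) DO UPDATE SET
            processed_at = excluded.processed_at
        """,
        [tracker, stamp],
    )


# Calling it twice with the same tracker leaves one row holding the second
# timestamp, which is what lets _last_30d_due() read a single processed_at.

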
def rebuild_rollups(conn, *, since_day=None, force_30d: bool = False) -> None:
    """Rebuild marketplace + legacy tool rollups from usage_events.

    Refresh policy (called every UsageProcessor tick):

    - ``usage_marketplace_item_daily``: incremental DELETE+INSERT for days
      on or after ``since_day``. ``since_day=None`` (the default) means the
      last 7 days; a wider rebuild (e.g. on reprocess) must pass an explicit
      earlier date.
    - ``usage_marketplace_item_window`` ``period_label='last_7d'``: full
      DELETE+INSERT every tick.
    - ``usage_marketplace_item_window`` ``period_label='last_30d'``: full
      DELETE+INSERT once an hour (``WINDOW_30D_REFRESH_SECONDS``), or when
      ``force_30d=True``.
    - ``usage_tool_daily`` (legacy): incremental DELETE+INSERT over the same
      ``since_day`` range, unchanged behaviour from v42.

    All updates run in a single transaction so a partial failure never
    leaves the rollup set inconsistent.
    """
    if since_day is None:
        since_day = (datetime.now(timezone.utc) - timedelta(days=7)).date()

    # Preload lookup tables once; they are reused across the daily, 7d and
    # 30d rebuilds. v49 phase-5: dict keyed by `synthetic_name` (matches the
    # JSONL invocation local part) instead of `name`. The `flea_plugins` set
    # drives the `<plugin>:<inner>` nested-attribution branch in
    # `_attribute_event`.
    curated_plugins = {
        r[0] for r in conn.execute("SELECT DISTINCT name FROM marketplace_plugins").fetchall()
    }
    flea_entities = {
        r[0]: r[1] for r in conn.execute(
            "SELECT synthetic_name, type FROM store_entities WHERE visibility_status='approved'"
        ).fetchall()
    }
    flea_plugins = {
        synthetic for synthetic, ent_type in flea_entities.items()
        if ent_type == "plugin"
    }

    do_30d = force_30d or _last_30d_due(conn)

    try:
        conn.execute("BEGIN")

        # ---- Legacy: usage_tool_daily (unchanged) ----
        conn.execute("DELETE FROM usage_tool_daily WHERE day >= ?", [since_day])
        conn.execute(
            """
            INSERT INTO usage_tool_daily
                (day, tool_name, source, invocations, error_count, distinct_users, distinct_sessions)
            SELECT
                CAST(occurred_at AS DATE) AS day,
                tool_name,
                source,
                COUNT(*) AS invocations,
                SUM(CASE WHEN is_error THEN 1 ELSE 0 END) AS error_count,
                COUNT(DISTINCT username) AS distinct_users,
                COUNT(DISTINCT session_id) AS distinct_sessions
            FROM usage_events
            WHERE CAST(occurred_at AS DATE) >= ?
              AND tool_name IS NOT NULL
            GROUP BY day, tool_name, source
            """,
            [since_day],
        )

        # ---- New: usage_marketplace_item_daily (incremental, since_day onward) ----
        daily_events = conn.execute(
            """
            SELECT
                CAST(occurred_at AS DATE) AS day,
                user_id,
                is_error,
                skill_name,
                subagent_type,
                command_name,
                event_type
            FROM usage_events
            WHERE CAST(occurred_at AS DATE) >= ?
            """,
            [since_day],
        ).fetchall()
        daily_buckets = _aggregate_events(
            daily_events, curated_plugins, flea_entities, flea_plugins,
            group_by_day=True,
        )
        conn.execute("DELETE FROM usage_marketplace_item_daily WHERE day >= ?", [since_day])
        if daily_buckets:
            conn.executemany(
                """
                INSERT INTO usage_marketplace_item_daily
                    (day, source, type, parent_plugin, name, count, distinct_users, error_count)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                [
                    (day, source, type_, parent, name, v["count"], len(v["users"]), v["errors"])
                    for (day, source, type_, parent, name), v in daily_buckets.items()
                ],
            )

        # ---- New: usage_marketplace_item_window period_label='last_7d' (full) ----
        cutoff_7d = (datetime.now(timezone.utc) - timedelta(days=7)).date()
        _rebuild_window(
            conn, "last_7d", cutoff_7d, curated_plugins, flea_entities, flea_plugins,
        )

        # ---- New: usage_marketplace_item_window period_label='last_30d' (hourly) ----
        if do_30d:
            cutoff_30d = (datetime.now(timezone.utc) - timedelta(days=30)).date()
            _rebuild_window(
                conn, "last_30d", cutoff_30d, curated_plugins, flea_entities, flea_plugins,
            )
            _mark_last_30d_refreshed(conn)

        conn.execute("COMMIT")
    except Exception:
        try:
            conn.execute("ROLLBACK")
        except Exception:
            pass
        raise


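# Illustrative sketch, not used by the code in this module: the
# all-or-nothing refresh pattern rebuild_rollups() follows (explicit BEGIN,
# DELETE+INSERT, COMMIT, and ROLLBACK on any error), shown with SQLite so it
# runs without DuckDB. The connection must be opened with
# isolation_level=None (autocommit) so the explicit BEGIN is honoured. The
# `rollup` table and `_example_replace_rows` are hypothetical.
import sqlite3


def _example_replace_rows(conn: sqlite3.Connection, rows: list[tuple[str, int]]) -> None:
    try:
        conn.execute("BEGIN")
        conn.execute("DELETE FROM rollup")
        conn.executemany("INSERT INTO rollup (name, invocations) VALUES (?, ?)", rows)
        conn.execute("COMMIT")
    except Exception:
        try:
            conn.execute("ROLLBACK")
        except Exception:
            pass  # ignore rollback failures so the original error propagates
        raise


# If the INSERT fails halfway (e.g. a duplicate key), the DELETE is rolled
# back too, so readers never see an emptied or half-written rollup table.

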
def _rebuild_window(conn, period_label: str, cutoff_day, curated_plugins,
                    flea_entities, flea_plugins) -> None:
    """Full DELETE+INSERT of one period_label in usage_marketplace_item_window.

    Caller wraps the call in a BEGIN/COMMIT transaction along with the
    other rollup writes; this function only does the DML.
    """
    events = conn.execute(
        """
        SELECT
            CAST(occurred_at AS DATE) AS day,
            user_id,
            is_error,
            skill_name,
            subagent_type,
            command_name,
            event_type
        FROM usage_events
        WHERE CAST(occurred_at AS DATE) >= ?
        """,
        [cutoff_day],
    ).fetchall()
    buckets = _aggregate_events(
        events, curated_plugins, flea_entities, flea_plugins,
        group_by_day=False,
    )
    conn.execute(
        "DELETE FROM usage_marketplace_item_window WHERE period_label = ?",
        [period_label],
    )
    if buckets:
        conn.executemany(
            """
            INSERT INTO usage_marketplace_item_window
                (period_label, source, type, parent_plugin, name, invocations, distinct_users)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            [
                (period_label, source, type_, parent, name, v["count"], len(v["users"]))
                for (source, type_, parent, name), v in buckets.items()
            ],
        )
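

# Illustrative sketch, not used by the code in this module: how the window
# cutoffs passed to _rebuild_window() map onto calendar days. The cutoff is
# (now - N days).date() and the filter is CAST(occurred_at AS DATE) >= cutoff.
# The window therefore covers the whole cutoff day plus every day through
# today, N + 1 calendar days in total, with today still in progress.
# `_example_window_days` is a hypothetical name.
from datetime import date, datetime, timedelta, timezone


def _example_window_days(days: int, now: datetime | None = None) -> list[date]:
    if now is None:
        now = datetime.now(timezone.utc)
    cutoff = (now - timedelta(days=days)).date()
    span = (now.date() - cutoff).days + 1  # inclusive of both ends
    return [cutoff + timedelta(days=i) for i in range(span)]


# Usage: with now = 2024-05-10 15:00 UTC, the 'last_7d' window spans
# 2024-05-03 through 2024-05-10 (8 dates) and 'last_30d' spans 31 dates.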