"""Build a deterministic ZIP + per-request info for the aggregated marketplace. The ZIP is the delivery artifact for the non-git channel. Its layout: .claude-plugin/marketplace.json ← merged, prefixed-name manifest plugins//... ← copy of ${DATA_DIR}/marketplaces// plugins//... .agnes/version.json ← per-request diagnostics Determinism requirements: - Members sorted by arcname - Fixed DOS timestamp (1980-01-01) - ZIP_DEFLATED - UNIX mode 0o644 Two users with the same allowed plugin set therefore produce byte-identical ZIPs (modulo `.agnes/version.json`, which carries `generated_at`; this is why the git channel strips that file — see git_backend). """ from __future__ import annotations import io import json import os import threading import zipfile from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Tuple import duckdb from cachetools import TTLCache from src import marketplace_filter MARKETPLACE_NAME = "agnes" # In-process TTL cache for compute_etag() results. The expensive part of # compute_etag is a SHA256 over every plugin file on disk; for a stable # marketplace this hash doesn't change between requests. We key on the # resolved plugin set (prefixed_name + version + plugin_dir path) so two # users with the same allowed view share the same cache entry. # # TTL bounds drift between cache and on-disk content. Marketplace sync runs # nightly; the default 120s TTL means the first session-start in a cold # minute pays the SHA cost and the next ~120s of session-starts (across all # users with the same view) hit the cache. Override with # AGNES_MARKETPLACE_ETAG_TTL= for tests / tighter staleness bounds; # set 0 to disable. 
_ETAG_CACHE_TTL = int(os.environ.get("AGNES_MARKETPLACE_ETAG_TTL", "120"))
# None means caching is disabled (TTL configured as 0 or a negative number).
_ETAG_CACHE: Optional[TTLCache] = (
    TTLCache(maxsize=512, ttl=_ETAG_CACHE_TTL) if _ETAG_CACHE_TTL > 0 else None
)
# Guards every read/write of _ETAG_CACHE.
_ETAG_CACHE_LOCK = threading.Lock()

MARKETPLACE_OWNER = {"name": "Agnes AI Analyst"}
MARKETPLACE_DESCRIPTION = (
    "Aggregated per-user Claude Code marketplace — served by agnes-the-ai-analyst"
)

DETERMINISTIC_TIMESTAMP = (1980, 1, 1, 0, 0, 0)

# "Version made by" host system written into every ZIP entry. 3 == UNIX.
# zipfile.ZipInfo otherwise defaults to 0 on win32 and 3 elsewhere, which would
# make the archive bytes depend on the build host and would stop readers from
# interpreting the upper 16 bits of external_attr as a UNIX mode.
_ZIP_CREATE_SYSTEM_UNIX = 3


def _merged_manifest(plugins: List[dict], etag: str) -> Dict[str, Any]:
    """Synthesize .claude-plugin/marketplace.json over the filtered plugin set.

    Each entry copies the plugin's cached `raw` manifest, then overrides:
      - `name`   = manifest_name (from the plugin's own plugin.json — must
                   match the loaded plugin's identity, or the `/plugin` UI
                   Components panel can't link the loaded plugin back to its
                   catalog entry; see src.marketplace_filter)
      - `source` = "./plugins/<prefixed_name>" (slug-prefixed dir avoids
                   cross-marketplace file collisions in the flat ZIP / git
                   tree layout)

    All other fields (version, description, author, homepage, keywords, ...)
    are preserved so Claude Code UI looks the same as if the user pulled from
    the upstream marketplace directly.

    Args:
        plugins: Resolved plugin dicts; each must carry `raw`,
            `manifest_name` and `prefixed_name`, and may carry `version`.
        etag: Value published as `metadata.version` of the manifest.

    Returns:
        The manifest as a JSON-serializable dict.
    """
    entries: List[dict] = []
    for plugin in plugins:
        entry = dict(plugin["raw"])  # shallow copy — only top-level keys change
        entry["name"] = plugin["manifest_name"]
        entry["source"] = f"./plugins/{plugin['prefixed_name']}"
        # Backfill the cached version only when the raw manifest entry has no
        # `version` key; a version already present in `raw` is left untouched.
        # NOTE(review): an earlier comment here said the cached version is
        # "always" honored as the authoritative record — confirm whether an
        # unconditional override was intended.
        if plugin.get("version") and "version" not in entry:
            entry["version"] = plugin["version"]
        entries.append(entry)

    return {
        "name": MARKETPLACE_NAME,
        "owner": MARKETPLACE_OWNER,
        "metadata": {
            "description": MARKETPLACE_DESCRIPTION,
            "version": etag,
        },
        "plugins": entries,
    }


def build_info(conn: duckdb.DuckDBPyConnection, user: dict) -> Dict[str, Any]:
    """Return a JSON-serializable summary for diagnostic / admin endpoints.

    Mirrors the PoC's /marketplace/info contract; v24 splits the plugin list
    by ``source`` so operators can tell at a glance whether a user's
    marketplace view is admin-curated, Store-installed, or both.

    The ETag is computed directly via ``marketplace_filter.compute_etag`` and
    does not go through the in-process ETag cache.

    Args:
        conn: DuckDB connection handed to the marketplace_filter resolvers.
        user: User dict; ``id`` and ``email`` are read with ``.get``.

    Returns:
        Dict with user identity, groups, etag, ``plugin_count`` (all plugins),
        ``plugins`` (entries whose ``source`` is not ``"store"``),
        ``store_plugins`` (entries whose ``source`` is ``"store"``) and a UTC
        ``generated_at`` timestamp.
    """
    plugins = marketplace_filter.resolve_user_marketplace(conn, user)
    etag = marketplace_filter.compute_etag(plugins)

    def _entry(p: dict) -> Dict[str, Any]:
        return {
            "name": p["manifest_name"],
            "original_name": p["original_name"],
            "prefixed_name": p["prefixed_name"],
            "marketplace_slug": p["marketplace_slug"],
            "version": p.get("version"),
            "description": p["raw"].get("description"),
            "source": p.get("source", "marketplace"),
        }

    return {
        "user_id": user.get("id"),
        "email": user.get("email"),
        "groups": marketplace_filter.resolve_user_groups(conn, user),
        "marketplace_name": MARKETPLACE_NAME,
        "etag": etag,
        "plugin_count": len(plugins),
        "plugins": [_entry(p) for p in plugins if p.get("source") != "store"],
        "store_plugins": [_entry(p) for p in plugins if p.get("source") == "store"],
        "generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }


def _collect_members(plugins: List[dict], etag: str) -> List[Tuple[str, bytes]]:
    """Collect (arcname, bytes) pairs for everything that goes into the ZIP.

    Intentionally returns unsorted — caller sorts for deterministic order.

    Bundle entries (``bundle_dirs`` set, ``plugin_dir`` is None) get a synth
    ``.claude-plugin/plugin.json`` and content merged from every source dir
    minus each source's own ``.claude-plugin/`` (the bundle ships its own).

    Args:
        plugins: Resolved plugin dicts (``prefixed_name``, ``plugin_dir`` and
            optionally ``bundle_dirs`` are read here).
        etag: Forwarded to the merged marketplace manifest.

    Returns:
        The merged manifest followed by every served plugin file. Plugins whose
        ``plugin_dir`` is None or not a directory contribute no files.
    """
    members: List[Tuple[str, bytes]] = []

    manifest = _merged_manifest(plugins, etag)
    members.append(
        (
            ".claude-plugin/marketplace.json",
            json.dumps(manifest, indent=2, sort_keys=False).encode("utf-8"),
        )
    )

    for plugin in plugins:
        prefix = plugin["prefixed_name"]

        if plugin.get("bundle_dirs"):
            members.append(
                (
                    f"plugins/{prefix}/.claude-plugin/plugin.json",
                    _bundle_plugin_json_bytes(plugin),
                )
            )
            # Same helper the original imported inside this loop; reached via
            # the module that is already imported at file level.
            bundle_files = marketplace_filter._bundle_files(plugin["bundle_dirs"])
            for rel, abs_path in bundle_files:
                members.append((f"plugins/{prefix}/{rel}", abs_path.read_bytes()))
            continue

        plugin_dir = plugin["plugin_dir"]
        if plugin_dir is None or not plugin_dir.is_dir():
            continue

        for f in sorted(p for p in plugin_dir.rglob("*") if p.is_file()):
            rel_path = f.relative_to(plugin_dir)
            # v32: strip Agnes-only files (`.agnes/**` and `agnes-metadata.json`)
            # from the synth Claude Code marketplace so user instances never
            # see enrichment metadata they don't need. ETag is computed from
            # the same filtered set (compute_etag in marketplace_filter), so
            # adding/removing these files never busts user-side caches.
            if marketplace_filter.is_agnes_only_path(rel_path.parts):
                continue
            arc = f"plugins/{prefix}/{rel_path.as_posix()}"
            members.append((arc, f.read_bytes()))

    return members


def _bundle_plugin_json_bytes(plugin: dict) -> bytes:
    """Synth plugin.json for a bundle entry — uses the same fields as the
    served marketplace.json plugin entry so Claude Code's catalog lookup
    matches the loaded plugin's identity.

    Missing or falsy ``version`` / ``description`` values are written as empty
    strings. Returns the UTF-8 encoded, 2-space-indented JSON document.
    """
    payload = {
        "name": plugin["manifest_name"],
        "version": plugin.get("version") or "",
        "description": plugin["raw"].get("description") or "",
    }
    return json.dumps(payload, indent=2).encode("utf-8")


def _write_zip_entry(zf: zipfile.ZipFile, arcname: str, data: bytes) -> None:
    """Write one member with fixed metadata so the archive bytes are reproducible.

    Every entry gets the fixed DOS timestamp, DEFLATE compression, a UNIX host
    marker and mode 0o644, independent of the source file's own metadata.
    """
    info = zipfile.ZipInfo(filename=arcname, date_time=DETERMINISTIC_TIMESTAMP)
    info.compress_type = zipfile.ZIP_DEFLATED
    # Pin the host system: the ZipInfo default varies by platform, and the mode
    # bits below are only meaningful when the entry is marked as UNIX-made.
    info.create_system = _ZIP_CREATE_SYSTEM_UNIX
    info.external_attr = 0o644 << 16
    zf.writestr(info, data)


def _etag_cache_key(plugins: List[dict]) -> tuple:
    """Build an order-independent, hashable cache key for a resolved plugin set.

    Each plugin contributes (prefixed_name, version, plugin_dir, bundle_dirs).
    Bundle entries have ``plugin_dir`` None, so without the bundle dirs every
    bundle with the same name and version would map to the same key regardless
    of which source dirs it is assembled from.
    """
    return tuple(
        sorted(
            (
                p["prefixed_name"],
                p.get("version") or "",
                str(p["plugin_dir"]),
                # presumably an ordered collection of source dir paths; each
                # element is stringified so the key stays hashable — verify
                # against marketplace_filter.
                tuple(str(d) for d in (p.get("bundle_dirs") or ())),
            )
            for p in plugins
        )
    )


def compute_etag_for_user(
    conn: duckdb.DuckDBPyConnection, user: dict
) -> Tuple[str, List[dict]]:
    """Resolve the user's served plugin set (admin grants minus opt-outs,
    plus Store installs) and compute its content-addressed ETag.

    Returns (etag, plugins) so callers that proceed to build_zip can reuse
    the resolved plugin set and skip the second DB query.

    When the ETag cache is enabled, the result is looked up and stored under
    the key derived from the resolved plugin set. The hash itself is computed
    outside the lock, so concurrent misses for the same key may each compute
    it once.
    """
    plugins = marketplace_filter.resolve_user_marketplace(conn, user)
    if _ETAG_CACHE is None:
        return marketplace_filter.compute_etag(plugins), plugins

    cache_key = _etag_cache_key(plugins)
    with _ETAG_CACHE_LOCK:
        cached = _ETAG_CACHE.get(cache_key)
    if cached is not None:
        return cached, plugins

    etag = marketplace_filter.compute_etag(plugins)
    with _ETAG_CACHE_LOCK:
        _ETAG_CACHE[cache_key] = etag
    return etag, plugins


def invalidate_etag_cache() -> None:
    """Drop all cached etags.

    Called by marketplace sync after refresh so the next request re-hashes
    against the new on-disk content instead of waiting for TTL expiry.
    No-op when the cache is disabled.
    """
    if _ETAG_CACHE is None:
        return
    with _ETAG_CACHE_LOCK:
        _ETAG_CACHE.clear()


def build_zip(
    conn: duckdb.DuckDBPyConnection,
    user: dict,
    *,
    plugins: Optional[List[dict]] = None,
    etag: Optional[str] = None,
) -> Tuple[bytes, str]:
    """Build the deterministic ZIP for this user. Returns (bytes, etag).

    The `.agnes/version.json` entry carries `generated_at` for diagnostics and
    therefore makes the ZIP non-byte-identical on every request. That's fine
    for the ZIP channel (the ETag gate is computed from content hashes *before*
    that file is added). The git channel uses file_set_for_user() instead,
    which deliberately omits this diagnostic file.

    Callers that already resolved plugins + etag (e.g. the router after an
    If-None-Match miss) pass them as kwargs so we don't redo the work. If
    either one is missing, both are recomputed together.
    """
    if plugins is None or etag is None:
        etag, plugins = compute_etag_for_user(conn, user)

    members = _collect_members(plugins, etag)

    version_payload = {
        "user_id": user.get("id"),
        "email": user.get("email"),
        "groups": marketplace_filter.resolve_user_groups(conn, user),
        "marketplace_name": MARKETPLACE_NAME,
        "etag": etag,
        "plugin_count": len(plugins),
        "generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    members.append(
        (
            ".agnes/version.json",
            json.dumps(version_payload, indent=2, sort_keys=True).encode("utf-8"),
        )
    )

    # Sorting by arcname is what makes the member order deterministic.
    members.sort(key=lambda m: m[0])

    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for arc, data in members:
            _write_zip_entry(zf, arc, data)
    return buf.getvalue(), etag