diff --git a/CHANGELOG.md b/CHANGELOG.md index 61e7143..1eef9f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,11 +45,32 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C `GET /api/store/categories`, `GET /api/store/owners`, `GET /api/my-stack`, `PUT /api/my-stack/curated/{marketplace_id}/{plugin_name}`. -- **CLI: `agnes store {list,show,install,uninstall,upload,delete}`** and +- **CLI: `agnes store {list,show,install,uninstall,upload,update,delete,pull,info}`** and **`agnes my-stack {show,toggle}`** — full analyst-side coverage of the new Store + composition REST surface. Multipart upload helper added to - `cli/v2_client.py` (`api_post_multipart` / `api_put_multipart`) so - future multipart endpoints don't have to roll their own httpx wiring. + `cli/v2_client.py` (`api_post_multipart` / `api_put_multipart` / + `api_get_stream`) so future multipart and binary-download endpoints + don't have to roll their own httpx wiring. +- **CLI: `agnes admin store push`** — admin-only Store bulk restore. + Wraps `POST /api/store/import-bundle` with mode=merge|replace|skip and + client-side zipping when the source is a directory (so a backup git + repo's working tree can go straight back into Agnes via a single + command). +- **REST: `GET /api/store/bundle.zip`** — deterministic ZIP of all + (filtered) Store entities for whole-Store backup. Layout: + `manifest.json` at the top with per-entity metadata + `owner_email` + for cross-instance restore, then `entities//{plugin,assets}/`. + Auth: any authenticated user (Store is community-open, the same set + is already visible via `GET /api/store/entities`). Filters mirror the + listing endpoint (type / category / owner / search). +- **REST: `POST /api/store/import-bundle`** — admin-only restore of a + bundle ZIP. Modes: `merge` (default — upsert by `entity_id`, replace + when version differs), `replace` (overwrite all matching), `skip` + (only insert new). Owner resolution by `owner_email` against + `users.email`; missing emails get a stub disabled user + (`active=False`, no password, id `imported-`) so the + historical owner stays attached and an admin can later activate or + reassign in `/admin/users`. Audit-logged with the full counts. ### Changed - `/admin/marketplaces` admin nav entry moved from the top-level header into diff --git a/app/api/store.py b/app/api/store.py index aacae2b..837243d 100644 --- a/app/api/store.py +++ b/app/api/store.py @@ -16,6 +16,7 @@ display name don't collide in Claude Code's flat namespace. from __future__ import annotations +import io import json import logging import os @@ -39,15 +40,16 @@ from fastapi import ( Query, UploadFile, ) -from fastapi.responses import FileResponse +from fastapi.responses import FileResponse, Response from pydantic import BaseModel -from app.auth.access import is_user_admin +from app.auth.access import is_user_admin, require_admin from app.auth.dependencies import _get_db, get_current_user from app.utils import get_store_dir from src.repositories.audit import AuditRepository from src.repositories.store_entities import StoreEntitiesRepository from src.repositories.user_store_installs import UserStoreInstallsRepository +from src.repositories.users import UserRepository from src.store_categories import STORE_CATEGORIES, is_valid_category from src.store_naming import ( compute_entity_version, @@ -1061,6 +1063,421 @@ async def uninstall_entity( return InstallResponse(entity_id=entity_id, installed=False) +# --------------------------------------------------------------------------- +# Bundle: GET /api/store/bundle.zip + POST /api/store/import-bundle +# --------------------------------------------------------------------------- +# +# Whole-Store backup/restore primitive. Operationally consumed by the +# `agnes admin store {pull,push}` CLI commands which back up the Store to a +# git repo (or restore from one). Bundle format: +# +# agnes-store-bundle.zip +# ├── manifest.json ← {"format":1,"generated_at":..., "entries":[...]} +# └── entities// +# ├── plugin/... ← canonical Claude Code plugin tree +# └── assets/... ← photo + docs +# +# Each manifest entry carries `owner_email` (resolved at export time from the +# users table) — when `import-bundle` lands on a different Agnes instance, +# the importer matches by email rather than by `owner_user_id` (the latter +# is per-instance and won't match). If the email is unknown on the target, +# we create a stub user (active=False, password_hash=NULL) so the historical +# owner is preserved; an admin can later activate or reassign. +# +# Bundle ordering is deterministic (entries sorted by entity_id, files within +# each entity sorted by relpath, fixed mtime) so that diffs of two +# successive snapshots stay clean when committed to git. + +BUNDLE_FORMAT_VERSION = 1 +BUNDLE_DETERMINISTIC_TIMESTAMP = (1980, 1, 1, 0, 0, 0) + + +class BundleEntry(BaseModel): + entity_id: str + type: str + name: str + description: Optional[str] = None + category: Optional[str] = None + version: str + owner_user_id: str + owner_email: Optional[str] = None + owner_username: str + install_count: int = 0 + file_size: int = 0 + photo_path: Optional[str] = None + video_url: Optional[str] = None + doc_paths: List[str] = [] + created_at: Optional[str] = None + updated_at: Optional[str] = None + + +class BundleManifest(BaseModel): + format: int = BUNDLE_FORMAT_VERSION + generated_at: str + entry_count: int + entries: List[BundleEntry] + + +class ImportBundleResponse(BaseModel): + imported: int + replaced: int + skipped: int + stub_users_created: int + errors: List[dict] = [] + + +def _resolve_owner_emails( + conn: duckdb.DuckDBPyConnection, owner_ids: List[str] +) -> dict: + """Bulk-fetch user_id → email map for the given owners. + + Empty list short-circuits to {} so the caller doesn't need a guard. + Missing rows are simply absent from the returned dict — the caller + falls back to the row's stored ``owner_username`` for diagnostics. + """ + if not owner_ids: + return {} + placeholders = ",".join(["?"] * len(owner_ids)) + rows = conn.execute( + f"SELECT id, email FROM users WHERE id IN ({placeholders})", + list(owner_ids), + ).fetchall() + return {r[0]: r[1] for r in rows} + + +def _walk_entity_files(entity_id: str) -> List[tuple[str, Path]]: + """Return [(arcname, abs_path)] for every file under + ``${DATA_DIR}/store//`` that should land in the bundle. + + Both ``plugin/`` and ``assets/`` subtrees are included. Output is + sorted by arcname so the resulting ZIP is byte-deterministic. + """ + out: list[tuple[str, Path]] = [] + root = _entity_dir(entity_id) + if not root.is_dir(): + return out + for f in sorted(p for p in root.rglob("*") if p.is_file()): + rel = f.relative_to(root).as_posix() + # Only ship plugin/ and assets/ subtrees — anything else under the + # entity dir is internal scratch and shouldn't enter the bundle. + first = rel.split("/", 1)[0] if "/" in rel else rel + if first not in ("plugin", "assets"): + continue + arc = f"entities/{entity_id}/{rel}" + out.append((arc, f)) + return sorted(out, key=lambda t: t[0]) + + +def _build_bundle_zip( + conn: duckdb.DuckDBPyConnection, + entries: List[dict], +) -> bytes: + """Build the deterministic ZIP from a list of store_entities rows. + + Entries arrive already filtered (per the caller's query). We resolve + owner_email in one bulk roundtrip to keep the export path off the + O(N) per-row query path. + """ + owner_emails = _resolve_owner_emails( + conn, list({e["owner_user_id"] for e in entries}) + ) + bundle_entries: List[dict] = [] + for e in sorted(entries, key=lambda r: r["id"]): + bundle_entries.append( + { + "entity_id": e["id"], + "type": e["type"], + "name": e["name"], + "description": e.get("description"), + "category": e.get("category"), + "version": e["version"], + "owner_user_id": e["owner_user_id"], + "owner_email": owner_emails.get(e["owner_user_id"]), + "owner_username": e["owner_username"], + "install_count": int(e.get("install_count") or 0), + "file_size": int(e.get("file_size") or 0), + "photo_path": e.get("photo_path"), + "video_url": e.get("video_url"), + "doc_paths": e.get("doc_paths") or [], + "created_at": _to_iso(e.get("created_at")), + "updated_at": _to_iso(e.get("updated_at")), + } + ) + + manifest = { + "format": BUNDLE_FORMAT_VERSION, + "generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + "entry_count": len(bundle_entries), + "entries": bundle_entries, + } + + members: list[tuple[str, bytes]] = [ + ("manifest.json", json.dumps(manifest, indent=2, sort_keys=False).encode("utf-8")) + ] + for entry in bundle_entries: + for arc, abs_path in _walk_entity_files(entry["entity_id"]): + members.append((arc, abs_path.read_bytes())) + members.sort(key=lambda m: m[0]) + + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf: + for arc, data in members: + info = zipfile.ZipInfo(filename=arc, date_time=BUNDLE_DETERMINISTIC_TIMESTAMP) + info.compress_type = zipfile.ZIP_DEFLATED + info.external_attr = 0o644 << 16 + zf.writestr(info, data) + return buf.getvalue() + + +@router.get("/bundle.zip") +async def export_bundle( + type: Optional[str] = Query(None, description="skill | agent | plugin"), + category: Optional[str] = Query(None), + search: Optional[str] = Query(None), + owner: Optional[str] = Query(None, description="Filter by owner user_id"), + user: dict = Depends(get_current_user), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Stream a ZIP of all (filtered) Store entities. + + Auth: any authenticated user — the Store is community-open, the same + set is already visible via ``GET /api/store/entities``. The bundle is + deterministic so two consecutive pulls without state changes produce + byte-identical ZIPs (modulo the manifest's ``generated_at`` timestamp). + Filters mirror the listing endpoint so a backup workflow can scope by + type/owner if needed. + """ + if type and type not in _VALID_TYPES: + raise HTTPException(status_code=400, detail="invalid_type") + repo = StoreEntitiesRepository(conn) + # Page through everything. The 100/req limit on `list` is a UI + # pagination affordance, not a backup constraint — for a bulk export + # we want all matches. + items: list[dict] = [] + skip = 0 + page = 200 + while True: + page_items, _total = repo.list( + skip=skip, limit=page, type=type, category=category, + search=search, owner_user_id=owner, + ) + if not page_items: + break + items.extend(page_items) + if len(page_items) < page: + break + skip += page + + payload = _build_bundle_zip(conn, items) + return Response( + content=payload, + media_type="application/zip", + headers={ + "Content-Disposition": 'attachment; filename="agnes-store-bundle.zip"', + "X-Bundle-Entry-Count": str(len(items)), + }, + ) + + +def _import_one_entry( + conn: duckdb.DuckDBPyConnection, + entry: dict, + extract_root: Path, + *, + mode: str, + actor_user_id: str, +) -> tuple[str, int]: + """Apply a single manifest entry. Returns ``(outcome, stub_users_created)`` + where outcome is one of ``imported``, ``replaced``, ``skipped``. + + Owner resolution: we match the bundle's ``owner_email`` against + ``users.email``. Missing → create a stub (active=False, no password) + so the historical owner stays attached; an admin can activate or + reassign in /admin/users. The stub gets ``id = "imported-" + + sha256(email)[:12]`` to make it idempotent across repeated imports. + """ + entity_id = entry["entity_id"] + repo = StoreEntitiesRepository(conn) + existing = repo.get(entity_id) + + if existing: + if mode == "skip": + return ("skipped", 0) + if mode == "merge": + # Keep newer version (content-hash). If equal, skip. + if (existing.get("version") or "") == (entry.get("version") or ""): + return ("skipped", 0) + # mode='replace' OR mode='merge' with newer version → fall through. + + # Resolve owner. + user_repo = UserRepository(conn) + owner_email = (entry.get("owner_email") or "").strip().lower() + stub_created = 0 + owner_user_id: Optional[str] = None + if owner_email: + existing_user = user_repo.get_by_email(owner_email) + if existing_user: + owner_user_id = existing_user["id"] + else: + import hashlib as _hl + stub_id = "imported-" + _hl.sha256(owner_email.encode("utf-8")).hexdigest()[:12] + if not user_repo.get_by_id(stub_id): + user_repo.create( + id=stub_id, email=owner_email, name=owner_email, + password_hash=None, + ) + user_repo.update(stub_id, active=False) + stub_created = 1 + owner_user_id = stub_id + if owner_user_id is None: + # Fallback: use the importer (admin) so the row has a valid owner. + owner_user_id = actor_user_id + + # Materialize files. + src_dir = extract_root / "entities" / entity_id + if not src_dir.is_dir(): + raise HTTPException( + status_code=422, + detail=f"manifest entry {entity_id!r} has no entities// directory in the bundle", + ) + target_dir = _entity_dir(entity_id) + if existing and target_dir.exists(): + shutil.rmtree(target_dir) + target_dir.mkdir(parents=True, exist_ok=True) + for f in src_dir.rglob("*"): + if not f.is_file(): + continue + rel = f.relative_to(src_dir) + dest = target_dir / rel + dest.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(f, dest) + + # Upsert DB row. + if existing: + repo.update( + entity_id, + description=entry.get("description"), + category=entry.get("category"), + version=entry["version"], + photo_path=entry.get("photo_path"), + video_url=entry.get("video_url"), + doc_paths=entry.get("doc_paths") or [], + file_size=int(entry.get("file_size") or 0), + ) + return ("replaced", stub_created) + + repo.create( + id=entity_id, + owner_user_id=owner_user_id, + owner_username=entry.get("owner_username") or owner_email.split("@")[0], + type=entry["type"], + name=entry["name"], + description=entry.get("description"), + category=entry.get("category"), + version=entry["version"], + photo_path=entry.get("photo_path"), + video_url=entry.get("video_url"), + doc_paths=entry.get("doc_paths") or [], + file_size=int(entry.get("file_size") or 0), + ) + return ("imported", stub_created) + + +@router.post("/import-bundle", response_model=ImportBundleResponse) +async def import_bundle( + file: UploadFile = File(...), + mode: str = Form("merge"), + user: dict = Depends(require_admin), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Restore a Store bundle ZIP — admin only. + + Modes: + * ``merge`` (default) — upsert by ``entity_id``; existing entities + are replaced when the bundle's ``version`` differs, otherwise + skipped. Safe default for nightly cron round-trips. + * ``replace`` — every entity in the bundle overwrites the existing + row + on-disk tree. Bundle-not-in-target rows are NOT deleted. + * ``skip`` — only entities NOT already present are imported. + + Owner resolution by ``owner_email``; missing emails get a stub + disabled user so the row references an existing ``users.id`` (no + foreign key, but app code joins). + """ + if mode not in {"merge", "replace", "skip"}: + raise HTTPException(status_code=400, detail="invalid_mode") + + tmp, _ = await _stream_to_temp(file, MAX_ZIP_SIZE * 4, suffix=".zip") + tmp.close() + extract_root = Path(tempfile.mkdtemp(prefix="agnes_store_import_")) + try: + try: + with zipfile.ZipFile(tmp.name, "r") as zf: + _safe_zip_extract(zf, extract_root) + except zipfile.BadZipFile: + raise HTTPException(status_code=422, detail="zip_invalid") + finally: + Path(tmp.name).unlink(missing_ok=True) + + manifest_path = extract_root / "manifest.json" + if not manifest_path.is_file(): + raise HTTPException(status_code=422, detail="manifest_missing") + try: + manifest = json.loads(manifest_path.read_text(encoding="utf-8")) + except (OSError, ValueError): + raise HTTPException(status_code=422, detail="manifest_invalid") + if not isinstance(manifest, dict) or manifest.get("format") != BUNDLE_FORMAT_VERSION: + raise HTTPException( + status_code=422, + detail=f"manifest_unsupported_format (expected {BUNDLE_FORMAT_VERSION})", + ) + entries = manifest.get("entries") or [] + if not isinstance(entries, list): + raise HTTPException(status_code=422, detail="manifest_entries_invalid") + + imported = replaced = skipped = stubs = 0 + errors: list[dict] = [] + for entry in entries: + if not isinstance(entry, dict) or not entry.get("entity_id"): + errors.append({"entry": entry, "error": "entry_missing_id"}) + continue + try: + outcome, sc = _import_one_entry( + conn, entry, extract_root, mode=mode, actor_user_id=user["id"], + ) + except HTTPException: + raise + except Exception as exc: + errors.append({"entity_id": entry.get("entity_id"), "error": str(exc)}) + continue + stubs += sc + if outcome == "imported": + imported += 1 + elif outcome == "replaced": + replaced += 1 + elif outcome == "skipped": + skipped += 1 + + _audit( + conn, user["id"], "store.bundle.import", "bundle", + { + "mode": mode, + "imported": imported, + "replaced": replaced, + "skipped": skipped, + "stub_users_created": stubs, + "errors": len(errors), + }, + ) + _invalidate_etag() + return ImportBundleResponse( + imported=imported, replaced=replaced, skipped=skipped, + stub_users_created=stubs, errors=errors, + ) + finally: + shutil.rmtree(extract_root, ignore_errors=True) + + # --------------------------------------------------------------------------- # Photo upload helper # --------------------------------------------------------------------------- diff --git a/cli/commands/admin.py b/cli/commands/admin.py index 7534524..4fed5bc 100644 --- a/cli/commands/admin.py +++ b/cli/commands/admin.py @@ -6,10 +6,12 @@ import typer from cli.client import api_get, api_post, api_delete, api_patch from cli.commands.admin_metrics import admin_metrics_app +from cli.commands.admin_store import admin_store_app from cli.commands.memory_admin import memory_admin_app admin_app = typer.Typer(help="Admin operations (requires admin role)") admin_app.add_typer(admin_metrics_app, name="metrics") +admin_app.add_typer(admin_store_app, name="store") admin_app.add_typer(memory_admin_app, name="memory") diff --git a/cli/commands/admin_store.py b/cli/commands/admin_store.py new file mode 100644 index 0000000..6bb2ddd --- /dev/null +++ b/cli/commands/admin_store.py @@ -0,0 +1,104 @@ +"""`agnes admin store push` — admin-only Store bulk restore. + +Wraps ``POST /api/store/import-bundle`` (admin-gated). Read paths +(``pull`` / ``info``) live under user-namespace ``agnes store`` because the +server endpoint for the export is open to any authenticated user (the +Store is community-readable). +""" + +from __future__ import annotations + +import json +import shutil +import tempfile +import zipfile +from pathlib import Path +from typing import Optional + +import typer + +from cli.v2_client import V2ClientError, api_post_multipart + +admin_store_app = typer.Typer(help="Admin: Store bulk restore (push)") + + +@admin_store_app.command("push") +def push_bundle( + source: Path = typer.Argument( + ..., exists=True, readable=True, + help="Bundle to upload — either a *.zip file or a directory " + "containing manifest.json + entities/. A directory is " + "zipped client-side before upload.", + ), + mode: str = typer.Option( + "merge", "--mode", + help="merge (default — upsert by entity_id; replace when version " + "differs) | replace (overwrite every existing row in the " + "bundle) | skip (insert only entities not already present)", + ), + yes: bool = typer.Option(False, "--yes", "-y", help="Skip confirmation"), +): + """Upload a Store bundle ZIP for bulk restore. Admin only.""" + if mode not in {"merge", "replace", "skip"}: + typer.echo(f"--mode must be merge|replace|skip, got {mode!r}", err=True) + raise typer.Exit(2) + + # If source is a directory, zip it client-side. The expected layout is + # the same as `agnes store pull --unpack` produces: manifest.json at + # the top, entities// subtrees. + cleanup: Optional[Path] = None + try: + if source.is_dir(): + if not (source / "manifest.json").is_file(): + typer.echo( + f"{source} does not contain manifest.json — is this a Store bundle directory?", + err=True, + ) + raise typer.Exit(2) + scratch = Path(tempfile.mkdtemp(prefix="agnes_store_push_")) + cleanup = scratch + zip_path = scratch / "bundle.zip" + with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf: + for f in sorted(p for p in source.rglob("*") if p.is_file()): + rel = f.relative_to(source).as_posix() + zf.write(f, arcname=rel) + zip_to_send = zip_path + else: + zip_to_send = source + + if not yes: + confirm = typer.confirm( + f"Upload bundle from {source} with mode={mode}? " + f"This may modify existing Store entities." + ) + if not confirm: + raise typer.Abort() + + files = { + "file": (zip_to_send.name, zip_to_send.read_bytes(), "application/zip"), + } + try: + body = api_post_multipart( + "/api/store/import-bundle", + files=files, data={"mode": mode}, + ) + except V2ClientError as e: + typer.echo(str(e), err=True) + raise typer.Exit(1) + typer.echo( + f"imported={body.get('imported', 0)} " + f"replaced={body.get('replaced', 0)} " + f"skipped={body.get('skipped', 0)} " + f"stub_users_created={body.get('stub_users_created', 0)}" + ) + errs = body.get("errors") or [] + if errs: + typer.echo(f"\n{len(errs)} entries had errors:", err=True) + for e in errs[:10]: + typer.echo(f" - {json.dumps(e)}", err=True) + if len(errs) > 10: + typer.echo(f" ... and {len(errs) - 10} more", err=True) + raise typer.Exit(1) + finally: + if cleanup is not None: + shutil.rmtree(cleanup, ignore_errors=True) diff --git a/cli/commands/store.py b/cli/commands/store.py index 322d760..95a4654 100644 --- a/cli/commands/store.py +++ b/cli/commands/store.py @@ -19,8 +19,10 @@ from cli.v2_client import ( V2ClientError, api_delete, api_get_json, + api_get_stream, api_post_json, api_post_multipart, + api_put_multipart, ) store_app = typer.Typer(help="Community Store — browse, install, upload skills/agents/plugins") @@ -159,3 +161,182 @@ def delete_entity( typer.echo(str(e), err=True) raise typer.Exit(1) typer.echo(f"Deleted: {entity_id}") + + +@store_app.command("update") +def update_entity( + entity_id: str = typer.Argument(...), + description: Optional[str] = typer.Option(None, "--description"), + category: Optional[str] = typer.Option(None, "--category"), + video_url: Optional[str] = typer.Option(None, "--video-url"), + photo: Optional[Path] = typer.Option( + None, "--photo", exists=True, dir_okay=False, readable=True, + help="Replace the entity's photo with this image file", + ), + zip_path: Optional[Path] = typer.Option( + None, "--zip", exists=True, dir_okay=False, readable=True, + help="Replace the plugin tree with this new ZIP", + ), +): + """In-place edit a Store entity. Owner or admin only. + + Server-side authorization (PUT /api/store/entities/{id}) admits the + owner OR any member of the Admin group; CLI doesn't enforce, the + server does. Pass any combination of --description / --category / + --video-url / --photo / --zip; omitted fields are left untouched + (note: an empty string clears nothing — there's no API affordance to + clear a field back to NULL via PUT today). + """ + files: dict = {} + data: dict = {} + if zip_path: + files["file"] = (zip_path.name, zip_path.read_bytes(), "application/zip") + if photo: + files["photo"] = (photo.name, photo.read_bytes(), f"image/{photo.suffix.lstrip('.')}") + if description is not None: + data["description"] = description + if category is not None: + data["category"] = category + if video_url is not None: + data["video_url"] = video_url + if not files and not data: + typer.echo("Nothing to update — pass at least one of --description / --category / --video-url / --photo / --zip.", err=True) + raise typer.Exit(2) + try: + body = api_put_multipart( + f"/api/store/entities/{entity_id}", + files=files or None, data=data, + ) + except V2ClientError as e: + typer.echo(str(e), err=True) + raise typer.Exit(1) + typer.echo( + f"Updated: id={body['id']} version={body['version']}" + ) + + +# --------------------------------------------------------------------------- +# Bundle: pull + info (read paths, any authenticated user). +# Bulk restore (push) lives under `agnes admin store push` because the +# server-side endpoint is admin-only. +# --------------------------------------------------------------------------- + + +@store_app.command("pull") +def pull_bundle( + type: Optional[str] = typer.Option(None, "--type", help="skill | agent | plugin"), + category: Optional[str] = typer.Option(None, "--category"), + owner: Optional[str] = typer.Option(None, "--owner", help="Filter by owner user_id"), + search: Optional[str] = typer.Option(None, "--search", "-q"), + out: Path = typer.Option( + Path("agnes-store-bundle.zip"), "-o", "--out", + help="Where to save the ZIP (default: ./agnes-store-bundle.zip)", + ), + unpack: Optional[Path] = typer.Option( + None, "--unpack", + help="Instead of saving the ZIP, unpack it into this directory. " + "Useful for committing a snapshot to a backup git repo: " + "`agnes store pull --unpack ./backup/ && cd backup && git add .`", + ), +): + """Download the whole Store as a deterministic ZIP. + + With ``--unpack DIR`` the ZIP is streamed and immediately extracted + into ``DIR`` (the directory is wiped first so re-runs leave a clean + diff). The bundle layout:: + + manifest.json + entities// + ├── plugin/... + └── assets/... + + Every entity matching the given filters is included; no filters = + everything in the Store. + """ + import shutil as _shutil + import tempfile as _tempfile + import zipfile as _zipfile + + params: dict = {} + if type: + params["type"] = type + if category: + params["category"] = category + if owner: + params["owner"] = owner + if search: + params["search"] = search + + if unpack: + # Stream into a temp file, then unpack into `unpack` (wiped first). + scratch = Path(_tempfile.mkdtemp(prefix="agnes_store_pull_")) + zip_path = scratch / "bundle.zip" + try: + try: + api_get_stream("/api/store/bundle.zip", str(zip_path), **params) + except V2ClientError as e: + typer.echo(str(e), err=True) + raise typer.Exit(1) + if unpack.exists(): + _shutil.rmtree(unpack) + unpack.mkdir(parents=True, exist_ok=True) + with _zipfile.ZipFile(zip_path, "r") as zf: + zf.extractall(unpack) + finally: + _shutil.rmtree(scratch, ignore_errors=True) + typer.echo(f"Unpacked Store bundle → {unpack}") + return + + out.parent.mkdir(parents=True, exist_ok=True) + try: + size = api_get_stream("/api/store/bundle.zip", str(out), **params) + except V2ClientError as e: + typer.echo(str(e), err=True) + raise typer.Exit(1) + typer.echo(f"Wrote {size:,} bytes → {out}") + + +@store_app.command("info") +def store_info( + json_out: bool = typer.Option(False, "--json"), +): + """Summary of the Store: total entities, breakdown by type, total size. + + No new endpoint — assembled client-side from a paginated /entities + sweep so it stays in sync with what `pull` would emit. + """ + skip = 0 + page = 100 + by_type: dict = {} + total_entities = 0 + total_size = 0 + while True: + try: + body = api_get_json( + "/api/store/entities", limit=page, skip=skip, + ) + except V2ClientError as e: + typer.echo(str(e), err=True) + raise typer.Exit(1) + items = body.get("items", []) + if not items: + break + for it in items: + total_entities += 1 + total_size += int(it.get("file_size") or 0) + by_type[it["type"]] = by_type.get(it["type"], 0) + 1 + if len(items) < page: + break + skip += page + + summary = { + "total_entities": total_entities, + "total_file_size_bytes": total_size, + "by_type": by_type, + } + if json_out: + typer.echo(json.dumps(summary, indent=2)) + return + typer.echo(f"Store: {total_entities} entit, {total_size:,} bytes total") + for t in sorted(by_type): + typer.echo(f" {t:8s} {by_type[t]}") diff --git a/cli/v2_client.py b/cli/v2_client.py index 0e8a374..9a947be 100644 --- a/cli/v2_client.py +++ b/cli/v2_client.py @@ -119,6 +119,39 @@ def api_put_multipart( return r.json() +def api_get_stream(path: str, dest: "io.IOBase | str", **params) -> int: + """Stream a binary response (e.g. /bundle.zip) into ``dest``. + + ``dest`` is either a writable binary file-like or a filesystem path. + Returns the byte count written. Raises V2ClientError on non-2xx with + the parsed error body. + """ + import io as _io + url = f"{get_server_url().rstrip('/')}{path}" + with httpx.stream( + "GET", url, headers=_headers(), params=params or None, timeout=600, + ) as r: + if r.status_code >= 400: + # Read the (likely small) error body before raising. + body = b"".join(r.iter_bytes()) + try: + parsed = httpx.Response(r.status_code, content=body, headers=r.headers) + raise V2ClientError(status_code=r.status_code, body=_parse_error_body(parsed)) + except V2ClientError: + raise + owns = isinstance(dest, str) + fh = open(dest, "wb") if owns else dest + total = 0 + try: + for chunk in r.iter_bytes(): + fh.write(chunk) + total += len(chunk) + finally: + if owns: + fh.close() + return total + + def api_post_arrow(path: str, payload: dict) -> pa.Table: """Post JSON, expect Arrow IPC stream response.""" url = f"{get_server_url().rstrip('/')}{path}" diff --git a/tests/test_cli_store.py b/tests/test_cli_store.py index 846bc07..d2773d8 100644 --- a/tests/test_cli_store.py +++ b/tests/test_cli_store.py @@ -187,3 +187,212 @@ def test_my_stack_toggle_writes_put(monkeypatch): assert captured["path"] == "/api/my-stack/curated/official/alpha" assert captured["payload"] == {"enabled": False} assert "DISABLED" in _clean(r.output) + + +# --------------------------------------------------------------------------- +# `agnes store update` +# --------------------------------------------------------------------------- + + +def test_store_update_help_lists_options(): + r = runner.invoke(store_app, ["update", "--help"]) + assert r.exit_code == 0 + out = _clean(r.output) + for opt in ("--description", "--category", "--video-url", "--photo", "--zip"): + assert opt in out + + +def test_store_update_no_fields_exit_2(): + r = runner.invoke(store_app, ["update", "abc123"]) + assert r.exit_code == 2 + assert "Nothing to update" in _clean(r.output) + + +def test_store_update_sends_put_multipart(monkeypatch): + captured: dict = {} + + def _put(path, *, files, data): + captured["path"] = path + captured["files"] = files + captured["data"] = data + return {"id": "abc", "version": "newhash01234567"} + + import cli.commands.store as store_mod + monkeypatch.setattr(store_mod, "api_put_multipart", _put) + + r = runner.invoke(store_app, ["update", "abc", "--description", "new desc"]) + assert r.exit_code == 0, r.output + assert captured["path"] == "/api/store/entities/abc" + assert captured["data"] == {"description": "new desc"} + assert captured["files"] is None + assert "Updated" in _clean(r.output) + + +# --------------------------------------------------------------------------- +# `agnes store pull` / `agnes store info` +# --------------------------------------------------------------------------- + + +def test_store_pull_writes_zip(monkeypatch, tmp_path): + captured: dict = {} + + def _stream(path, dest, **params): + captured["path"] = path + captured["params"] = params + captured["dest"] = dest + # Write a placeholder so the size message looks plausible. + with open(dest, "wb") as f: + f.write(b"PK\x03\x04fakezip") + return 9 + + import cli.commands.store as store_mod + monkeypatch.setattr(store_mod, "api_get_stream", _stream) + + out = tmp_path / "store.zip" + r = runner.invoke(store_app, ["pull", "-o", str(out)]) + assert r.exit_code == 0, r.output + assert captured["path"] == "/api/store/bundle.zip" + assert "Wrote 9 bytes" in _clean(r.output) + assert out.exists() + + +def test_store_pull_unpack(monkeypatch, tmp_path): + """`--unpack DIR` streams to a temp ZIP and extracts into DIR.""" + import zipfile + + # Build a fake bundle in-memory and write it as the streamed payload. + fake_zip_path = tmp_path / "_fake.zip" + with zipfile.ZipFile(fake_zip_path, "w") as zf: + zf.writestr("manifest.json", '{"format":1,"entries":[]}') + zf.writestr("entities/abc/plugin/.claude-plugin/plugin.json", '{}') + + def _stream(path, dest, **params): + # Copy fake zip bytes into the streamed dest. + from pathlib import Path as _P + with open(dest, "wb") as fh: + fh.write(_P(fake_zip_path).read_bytes()) + return _P(dest).stat().st_size + + import cli.commands.store as store_mod + monkeypatch.setattr(store_mod, "api_get_stream", _stream) + + target = tmp_path / "unpacked" + r = runner.invoke(store_app, ["pull", "--unpack", str(target)]) + assert r.exit_code == 0, r.output + assert (target / "manifest.json").is_file() + assert (target / "entities/abc/plugin/.claude-plugin/plugin.json").is_file() + + +def test_store_info_summarizes(monkeypatch): + page1 = { + "items": [ + {"type": "skill", "file_size": 1024}, + {"type": "skill", "file_size": 512}, + {"type": "agent", "file_size": 256}, + ], + "total": 3, "skip": 0, "limit": 100, + } + empty = {"items": [], "total": 3, "skip": 100, "limit": 100} + pages = [page1, empty] + + def _get(path, **params): + return pages.pop(0) + + import cli.commands.store as store_mod + monkeypatch.setattr(store_mod, "api_get_json", _get) + + r = runner.invoke(store_app, ["info"]) + assert r.exit_code == 0, r.output + out = _clean(r.output) + assert "3 entit" in out + assert "skill" in out and "2" in out + assert "agent" in out and "1" in out + + +def test_store_info_json(monkeypatch): + one = { + "items": [{"type": "plugin", "file_size": 999}], + "total": 1, "skip": 0, "limit": 100, + } + pages = [one, {"items": [], "total": 1, "skip": 100, "limit": 100}] + import cli.commands.store as store_mod + monkeypatch.setattr(store_mod, "api_get_json", lambda *a, **kw: pages.pop(0)) + + r = runner.invoke(store_app, ["info", "--json"]) + assert r.exit_code == 0, r.output + import json as _json + body = _json.loads(_clean(r.output)) + assert body["total_entities"] == 1 + assert body["by_type"] == {"plugin": 1} + + +# --------------------------------------------------------------------------- +# `agnes admin store push` +# --------------------------------------------------------------------------- + + +def test_admin_store_push_help(): + from cli.commands.admin_store import admin_store_app + r = runner.invoke(admin_store_app, ["--help"]) + assert r.exit_code == 0 + assert "push" in _clean(r.output) + + +def test_admin_store_push_invalid_mode_exit_2(tmp_path): + """Single-command Typer app — invoke via parent so the `push` token + actually routes to the subcommand (otherwise Typer collapses the lone + command and treats `push` as the SOURCE positional).""" + from cli.commands.admin import admin_app + bundle = tmp_path / "x.zip" + bundle.write_bytes(b"PK\x03\x04") + r = runner.invoke(admin_app, ["store", "push", str(bundle), "--mode", "wat"]) + assert r.exit_code == 2 + assert "merge|replace|skip" in _clean(r.output) + + +def test_admin_store_push_zips_directory(monkeypatch, tmp_path): + """When source is a directory, CLI must zip it client-side and POST.""" + import zipfile as _zf + + captured: dict = {} + + def _post(path, *, files, data): + captured["path"] = path + captured["data"] = data + zip_bytes = files["file"][1] + with _zf.ZipFile(__import__("io").BytesIO(zip_bytes)) as zf: + captured["names"] = sorted(zf.namelist()) + return { + "imported": 1, "replaced": 0, "skipped": 0, + "stub_users_created": 0, "errors": [], + } + + from cli.commands import admin_store as admin_store_mod + from cli.commands.admin import admin_app + monkeypatch.setattr(admin_store_mod, "api_post_multipart", _post) + + bundle_dir = tmp_path / "bundle" + (bundle_dir / "entities" / "abc" / "plugin").mkdir(parents=True) + (bundle_dir / "manifest.json").write_text('{"format":1,"entries":[]}') + (bundle_dir / "entities" / "abc" / "plugin" / "marker.txt").write_text("x") + + r = runner.invoke( + admin_app, ["store", "push", str(bundle_dir), "--mode", "merge", "--yes"], + ) + assert r.exit_code == 0, r.output + assert captured["path"] == "/api/store/import-bundle" + assert captured["data"] == {"mode": "merge"} + assert "manifest.json" in captured["names"] + assert "entities/abc/plugin/marker.txt" in captured["names"] + assert "imported=1" in _clean(r.output) + + +def test_admin_store_push_directory_without_manifest_exit_2(tmp_path): + from cli.commands.admin import admin_app + empty_dir = tmp_path / "no_manifest" + empty_dir.mkdir() + r = runner.invoke( + admin_app, ["store", "push", str(empty_dir), "--yes"], + ) + assert r.exit_code == 2 + assert "manifest.json" in _clean(r.output) diff --git a/tests/test_store_api.py b/tests/test_store_api.py index 5ee720b..d8d7ae5 100644 --- a/tests/test_store_api.py +++ b/tests/test_store_api.py @@ -636,6 +636,216 @@ class TestStoreSecurityFixes: assert r.status_code == 201, r.text +class TestStoreBundle: + """GET /api/store/bundle.zip + POST /api/store/import-bundle.""" + + def _upload_skill(self, web_client, cookies, name="bundled-skill"): + return web_client.post( + "/api/store/entities", + files={"file": ("s.zip", _make_skill_zip(name), "application/zip")}, + data={"type": "skill"}, cookies=cookies, + ) + + def test_bundle_zip_contains_manifest_and_entity_tree(self, web_client): + _, cookies = _create_user(web_client, "owner-bundle@x.com") + r1 = self._upload_skill(web_client, cookies, name="bundle-a") + r2 = self._upload_skill(web_client, cookies, name="bundle-b") + eid_a, eid_b = r1.json()["id"], r2.json()["id"] + + bundle = web_client.get("/api/store/bundle.zip", cookies=cookies) + assert bundle.status_code == 200 + assert bundle.headers["content-type"] == "application/zip" + assert bundle.headers["x-bundle-entry-count"] == "2" + + with zipfile.ZipFile(io.BytesIO(bundle.content)) as zf: + names = set(zf.namelist()) + assert "manifest.json" in names + assert f"entities/{eid_a}/plugin/skills/bundle-a-by-owner-bundle/SKILL.md" in names + assert f"entities/{eid_b}/plugin/skills/bundle-b-by-owner-bundle/SKILL.md" in names + + manifest = json.loads(zf.read("manifest.json")) + assert manifest["format"] == 1 + assert manifest["entry_count"] == 2 + entries_by_id = {e["entity_id"]: e for e in manifest["entries"]} + assert entries_by_id[eid_a]["owner_email"] == "owner-bundle@x.com" + assert entries_by_id[eid_a]["name"] == "bundle-a" + + def test_bundle_zip_filters(self, web_client): + _, cookies = _create_user(web_client, "filter@x.com") + self._upload_skill(web_client, cookies, name="keep-this") + web_client.post( + "/api/store/entities", + files={"file": ("p.zip", _make_plugin_zip("filter-out"), "application/zip")}, + data={"type": "plugin"}, cookies=cookies, + ) + + only_skill = web_client.get( + "/api/store/bundle.zip?type=skill", cookies=cookies, + ) + assert only_skill.headers["x-bundle-entry-count"] == "1" + + def test_import_bundle_round_trip_preserves_entity(self, web_client, tmp_path): + from argon2 import PasswordHasher + from src.db import get_system_db + from src.repositories.users import UserRepository + from tests.helpers.auth import grant_admin + + # Source instance: create entity, pull bundle. + _, owner_cookies = _create_user(web_client, "src-owner@x.com") + r = self._upload_skill(web_client, owner_cookies, name="rt-skill") + eid = r.json()["id"] + bundle_bytes = web_client.get( + "/api/store/bundle.zip", cookies=owner_cookies, + ).content + + # Wipe Store DB rows + on-disk dir to simulate empty target. + conn = get_system_db() + conn.execute("DELETE FROM store_entities WHERE id = ?", [eid]) + import shutil as _shutil + _shutil.rmtree(tmp_path / "store" / eid, ignore_errors=True) + + # Promote a different user to admin and import. + ph = PasswordHasher() + UserRepository(conn).create( + id="adm-bundle", email="adm-bundle@x.com", name="adm", + password_hash=ph.hash("AdminPass1!"), + ) + grant_admin(conn, "adm-bundle") + admin_token = web_client.post( + "/auth/token", json={"email": "adm-bundle@x.com", "password": "AdminPass1!"} + ).json()["access_token"] + admin_cookies = {"access_token": admin_token} + + imp = web_client.post( + "/api/store/import-bundle", + files={"file": ("b.zip", bundle_bytes, "application/zip")}, + data={"mode": "merge"}, + cookies=admin_cookies, + ) + assert imp.status_code == 200, imp.text + body = imp.json() + assert body["imported"] == 1 + assert body["replaced"] == 0 + # Owner email matched existing user (src-owner@x.com), no stub needed. + assert body["stub_users_created"] == 0 + + # Entity should be present again. + r2 = web_client.get(f"/api/store/entities/{eid}", cookies=admin_cookies) + assert r2.status_code == 200 + assert r2.json()["name"] == "rt-skill" + assert (tmp_path / "store" / eid / "plugin" / "skills" / "rt-skill-by-src-owner" / "SKILL.md").is_file() + + def test_import_bundle_creates_stub_for_unknown_owner(self, web_client, tmp_path): + """When the bundle's owner_email is not in users table, server + creates a disabled stub so the entity row has a valid owner_user_id. + """ + from argon2 import PasswordHasher + from src.db import get_system_db + from src.repositories.users import UserRepository + from tests.helpers.auth import grant_admin + + _, owner_cookies = _create_user(web_client, "vanishing@x.com") + r = self._upload_skill(web_client, owner_cookies, name="orphan-skill") + eid = r.json()["id"] + bundle_bytes = web_client.get( + "/api/store/bundle.zip", cookies=owner_cookies, + ).content + + # Delete the owner + the entity (simulate fresh target instance). + conn = get_system_db() + conn.execute("DELETE FROM store_entities WHERE id = ?", [eid]) + # We can't easily delete users via repo (no method), so just rename + # so email lookup misses. Brute SQL. + conn.execute("UPDATE users SET email = 'gone@x.com' WHERE email = 'vanishing@x.com'") + import shutil as _shutil + _shutil.rmtree(tmp_path / "store" / eid, ignore_errors=True) + + ph = PasswordHasher() + UserRepository(conn).create( + id="adm-stub", email="adm-stub@x.com", name="adm", + password_hash=ph.hash("AdminPass1!"), + ) + grant_admin(conn, "adm-stub") + admin_token = web_client.post( + "/auth/token", json={"email": "adm-stub@x.com", "password": "AdminPass1!"} + ).json()["access_token"] + admin_cookies = {"access_token": admin_token} + + imp = web_client.post( + "/api/store/import-bundle", + files={"file": ("b.zip", bundle_bytes, "application/zip")}, + data={"mode": "merge"}, + cookies=admin_cookies, + ) + assert imp.status_code == 200, imp.text + body = imp.json() + assert body["imported"] == 1 + assert body["stub_users_created"] == 1 + + stub = conn.execute( + "SELECT id, active FROM users WHERE email = 'vanishing@x.com'" + ).fetchone() + assert stub is not None + assert stub[0].startswith("imported-") + assert stub[1] is False # disabled + + def test_import_bundle_skip_mode_keeps_existing(self, web_client): + from argon2 import PasswordHasher + from src.db import get_system_db + from src.repositories.users import UserRepository + from tests.helpers.auth import grant_admin + + _, owner_cookies = _create_user(web_client, "skip@x.com") + r = self._upload_skill(web_client, owner_cookies, name="skip-existing") + eid = r.json()["id"] + bundle_bytes = web_client.get( + "/api/store/bundle.zip", cookies=owner_cookies, + ).content + + conn = get_system_db() + ph = PasswordHasher() + UserRepository(conn).create( + id="adm-skip", email="adm-skip@x.com", name="adm", + password_hash=ph.hash("AdminPass1!"), + ) + grant_admin(conn, "adm-skip") + admin_token = web_client.post( + "/auth/token", json={"email": "adm-skip@x.com", "password": "AdminPass1!"} + ).json()["access_token"] + admin_cookies = {"access_token": admin_token} + + # Import without wiping → entity already present → mode=skip + # should report 1 skipped, 0 imported, 0 replaced. + imp = web_client.post( + "/api/store/import-bundle", + files={"file": ("b.zip", bundle_bytes, "application/zip")}, + data={"mode": "skip"}, + cookies=admin_cookies, + ) + assert imp.status_code == 200 + assert imp.json() == { + "imported": 0, "replaced": 0, "skipped": 1, + "stub_users_created": 0, "errors": [], + } + + def test_import_bundle_admin_only(self, web_client): + _, cookies = _create_user(web_client, "non-admin@x.com") + # Build the smallest valid bundle: just manifest.json + no entries. + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w") as zf: + zf.writestr("manifest.json", json.dumps({ + "format": 1, "generated_at": "2026-01-01T00:00:00Z", + "entry_count": 0, "entries": [], + })) + r = web_client.post( + "/api/store/import-bundle", + files={"file": ("b.zip", buf.getvalue(), "application/zip")}, + data={"mode": "merge"}, cookies=cookies, + ) + # require_admin denies non-admin with 403. + assert r.status_code == 403, r.text + + class TestInstallCycle: def test_install_uninstall_and_count(self, web_client): # Owner uploads, two other users install, install_count = 2.