feat(cli): agnes store + agnes my-stack commands

Adds CLI coverage for the new REST surface introduced in this PR:

  agnes store list / show / install / uninstall / upload / delete
  agnes my-stack show / toggle

Covers 11 of the 15 new endpoints — listing, detail, install/uninstall,
upload (multipart), delete, my-stack get + curated toggle. Photo / docs
download endpoints intentionally skipped; analyst-side automation rarely
needs raw bytes back, and the web UI already covers them.

cli/v2_client.py: api_post_multipart + api_put_multipart helpers (httpx
files= passthrough). api_delete + api_put_json fillers were already
needed for non-multipart writes; added together.

Tests: tests/test_cli_store.py — help-text smoke tests + happy-path
mocked tests for list, install, upload, my-stack show, my-stack toggle.
12 new tests, all green.
This commit is contained in:
ZdenekSrotyr 2026-05-05 08:18:12 +02:00
parent fd3c76d21b
commit 16373d6b0b
5 changed files with 482 additions and 0 deletions

68
cli/commands/my_stack.py Normal file
View file

@ -0,0 +1,68 @@
"""`agnes my-stack {show,toggle}` — per-user marketplace composition view.
Reads ``GET /api/my-stack`` and writes
``PUT /api/my-stack/curated/{marketplace_id}/{plugin_name}`` opt-out flips.
"""
from __future__ import annotations
import json
from typing import Optional
import typer
from cli.v2_client import V2ClientError, api_get_json, api_put_json
my_stack_app = typer.Typer(help="Per-user marketplace composition (curated grants + Store installs)")
@my_stack_app.command("show")
def show_stack(
json_out: bool = typer.Option(False, "--json"),
):
"""Show admin-granted plugins (with opt-out state) and your Store installs."""
try:
body = api_get_json("/api/my-stack")
except V2ClientError as e:
typer.echo(str(e), err=True)
raise typer.Exit(1)
if json_out:
typer.echo(json.dumps(body, indent=2))
return
curated = body.get("curated", [])
store = body.get("store", [])
typer.echo(f"Curated (admin-granted): {len(curated)}")
for p in curated:
flag = "" if p["enabled"] else ""
typer.echo(
f" [{flag}] {p['marketplace_id']}/{p['plugin_name']:24s} "
f"manifest={p['manifest_name']} v{p.get('version') or '?'}"
)
typer.echo(f"\nFrom Store: {len(store)}")
for it in store:
typer.echo(
f" [{it['type']:6s}] {it['name']:24s} by {it['owner_username']:20s} "
f"invocation={it['invocation_name']} id={it['entity_id']}"
)
@my_stack_app.command("toggle")
def toggle(
marketplace_id: str = typer.Argument(...),
plugin_name: str = typer.Argument(...),
on: bool = typer.Option(False, "--on", help="Enable (drop opt-out)"),
off: bool = typer.Option(False, "--off", help="Disable (set opt-out)"),
):
"""Toggle a curated plugin on or off (writes a `user_plugin_optouts` row)."""
if on == off:
typer.echo("Pass exactly one of --on / --off.", err=True)
raise typer.Exit(2)
enabled = bool(on)
path = f"/api/my-stack/curated/{marketplace_id}/{plugin_name}"
try:
api_put_json(path, {"enabled": enabled})
except V2ClientError as e:
typer.echo(str(e), err=True)
raise typer.Exit(1)
state = "ENABLED" if enabled else "DISABLED"
typer.echo(f"{marketplace_id}/{plugin_name}: {state}")

161
cli/commands/store.py Normal file
View file

@ -0,0 +1,161 @@
"""`agnes store {list,show,install,uninstall,upload,delete}` — community
marketplace browse/install over the REST API.
Mirrors the /store web UI for analyst CLI workflows. Listing + filters are
the read paths; install/uninstall/upload/delete are the write paths. All
commands authenticate via the configured PAT (see ``cli auth``); the
endpoints are gated by ``get_current_user`` server-side.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Optional
import typer
from cli.v2_client import (
V2ClientError,
api_delete,
api_get_json,
api_post_json,
api_post_multipart,
)
store_app = typer.Typer(help="Community Store — browse, install, upload skills/agents/plugins")
@store_app.command("list")
def list_entities(
type: Optional[str] = typer.Option(None, "--type", help="skill | agent | plugin"),
category: Optional[str] = typer.Option(None, "--category"),
search: Optional[str] = typer.Option(None, "--search", "-q"),
owner: Optional[str] = typer.Option(None, "--owner", help="Filter by owner user_id"),
limit: int = typer.Option(24, "--limit", min=1, max=100),
skip: int = typer.Option(0, "--skip", min=0),
json_out: bool = typer.Option(False, "--json", help="Emit raw JSON instead of a table"),
):
"""List Store entities with optional filters."""
params: dict = {"limit": limit, "skip": skip}
if type:
params["type"] = type
if category:
params["category"] = category
if search:
params["search"] = search
if owner:
params["owner"] = owner
try:
body = api_get_json("/api/store/entities", **params)
except V2ClientError as e:
typer.echo(str(e), err=True)
raise typer.Exit(1)
if json_out:
typer.echo(json.dumps(body, indent=2))
return
items = body.get("items", [])
total = body.get("total", 0)
typer.echo(f"{total} entit(y) total — showing {len(items)} (skip={skip}):")
for it in items:
typer.echo(
f" [{it['type']:6s}] {it['name']:24s} by {it['owner_username']:20s} "
f"installs={it['install_count']:<4d} v{it['version']} id={it['id']}"
)
@store_app.command("show")
def show_entity(
entity_id: str = typer.Argument(...),
json_out: bool = typer.Option(False, "--json"),
):
"""Show a Store entity's full metadata."""
try:
body = api_get_json(f"/api/store/entities/{entity_id}")
except V2ClientError as e:
typer.echo(str(e), err=True)
raise typer.Exit(1)
if json_out:
typer.echo(json.dumps(body, indent=2))
return
typer.echo(f"{body['name']} ({body['type']}) v{body['version']}")
typer.echo(f" by {body['owner_username']} ({body.get('owner_display_name') or '?'})")
typer.echo(f" invocation: {body['invocation_name']}")
if body.get("description"):
typer.echo(f" description: {body['description']}")
typer.echo(f" installs: {body['install_count']}, size: {body['file_size']} bytes")
if body.get("video_url"):
typer.echo(f" video: {body['video_url']}")
@store_app.command("install")
def install_entity(entity_id: str = typer.Argument(...)):
"""Install a Store entity into your `/marketplace.zip` view."""
try:
body = api_post_json(f"/api/store/entities/{entity_id}/install", {})
except V2ClientError as e:
typer.echo(str(e), err=True)
raise typer.Exit(1)
typer.echo(f"Installed: entity_id={body['entity_id']}")
@store_app.command("uninstall")
def uninstall_entity(entity_id: str = typer.Argument(...)):
"""Uninstall a Store entity from your view."""
try:
body = api_delete(f"/api/store/entities/{entity_id}/install")
except V2ClientError as e:
typer.echo(str(e), err=True)
raise typer.Exit(1)
typer.echo(f"Uninstalled: entity_id={body.get('entity_id', entity_id)}")
@store_app.command("upload")
def upload_entity(
type: str = typer.Argument(..., help="skill | agent | plugin"),
zip_path: Path = typer.Argument(..., exists=True, dir_okay=False, readable=True),
name: Optional[str] = typer.Option(None, "--name"),
description: Optional[str] = typer.Option(None, "--description"),
category: Optional[str] = typer.Option(None, "--category"),
video_url: Optional[str] = typer.Option(None, "--video-url"),
):
"""Upload a Store entity from a local ZIP file."""
files = {
"file": (zip_path.name, zip_path.read_bytes(), "application/zip"),
}
data: dict = {"type": type}
if name:
data["name"] = name
if description:
data["description"] = description
if category:
data["category"] = category
if video_url:
data["video_url"] = video_url
try:
body = api_post_multipart("/api/store/entities", files=files, data=data)
except V2ClientError as e:
typer.echo(str(e), err=True)
raise typer.Exit(1)
typer.echo(
f"Uploaded: id={body['id']} name={body['name']} "
f"invocation={body['invocation_name']} version={body['version']}"
)
@store_app.command("delete")
def delete_entity(
entity_id: str = typer.Argument(...),
yes: bool = typer.Option(False, "--yes", "-y", help="Skip confirmation"),
):
"""Delete a Store entity (owner or admin only)."""
if not yes:
confirm = typer.confirm(f"Delete entity {entity_id}?")
if not confirm:
raise typer.Abort()
try:
api_delete(f"/api/store/entities/{entity_id}")
except V2ClientError as e:
typer.echo(str(e), err=True)
raise typer.Exit(1)
typer.echo(f"Deleted: {entity_id}")

View file

@ -41,6 +41,8 @@ from cli.commands.schema import schema_app
from cli.commands.describe import describe_app
from cli.commands.snapshot import snapshot_app
from cli.commands.disk_info import disk_info_app
from cli.commands.store import store_app
from cli.commands.my_stack import my_stack_app
def _cli_version() -> str:
@ -121,6 +123,8 @@ app.add_typer(schema_app, name="schema")
app.add_typer(describe_app, name="describe")
app.add_typer(snapshot_app, name="snapshot")
app.add_typer(disk_info_app, name="disk-info")
app.add_typer(store_app, name="store")
app.add_typer(my_stack_app, name="my-stack")
if __name__ == "__main__":

View file

@ -59,6 +59,66 @@ def api_post_json(path: str, payload: dict) -> dict:
return r.json()
def api_delete(path: str) -> dict:
url = f"{get_server_url().rstrip('/')}{path}"
r = httpx.delete(url, headers=_headers(), timeout=30)
if r.status_code >= 400:
raise V2ClientError(status_code=r.status_code, body=_parse_error_body(r))
if not r.content:
return {}
if "json" in r.headers.get("content-type", ""):
return r.json()
return {}
def api_put_json(path: str, payload: dict) -> dict:
url = f"{get_server_url().rstrip('/')}{path}"
r = httpx.put(url, json=payload, headers=_headers(), timeout=30)
if r.status_code >= 400:
raise V2ClientError(status_code=r.status_code, body=_parse_error_body(r))
if not r.content:
return {}
return r.json()
def api_post_multipart(
path: str,
*,
files: dict | None = None,
data: dict | None = None,
) -> dict:
"""POST a multipart/form-data request — used for Store ZIP/photo uploads.
`files` mirrors httpx.post(..., files=...): each value is an
(filename, bytes, content_type) tuple or an open file-like object.
`data` is the form fields. Returns parsed JSON.
"""
url = f"{get_server_url().rstrip('/')}{path}"
r = httpx.post(
url, files=files or None, data=data or None,
headers=_headers(), timeout=600,
)
if r.status_code >= 400:
raise V2ClientError(status_code=r.status_code, body=_parse_error_body(r))
return r.json()
def api_put_multipart(
path: str,
*,
files: dict | None = None,
data: dict | None = None,
) -> dict:
url = f"{get_server_url().rstrip('/')}{path}"
r = httpx.put(
url, files=files or None, data=data or None,
headers=_headers(), timeout=600,
)
if r.status_code >= 400:
raise V2ClientError(status_code=r.status_code, body=_parse_error_body(r))
return r.json()
def api_post_arrow(path: str, payload: dict) -> pa.Table:
"""Post JSON, expect Arrow IPC stream response."""
url = f"{get_server_url().rstrip('/')}{path}"

189
tests/test_cli_store.py Normal file
View file

@ -0,0 +1,189 @@
"""Tests for `agnes store` and `agnes my-stack` Typer wrappers.
Smoke + happy-path. Network calls are mocked so tests don't depend on a
running server.
"""
from __future__ import annotations
import re
from typer.testing import CliRunner
from cli.commands.my_stack import my_stack_app
from cli.commands.store import store_app
runner = CliRunner()
_ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")
def _clean(s: str) -> str:
return _ANSI_RE.sub("", s)
# ---------------------------------------------------------------------------
# Help-text smoke tests — guard against accidental command renames.
# ---------------------------------------------------------------------------
def test_store_help_lists_subcommands():
r = runner.invoke(store_app, ["--help"])
assert r.exit_code == 0
out = _clean(r.output)
for cmd in ("list", "show", "install", "uninstall", "upload", "delete"):
assert cmd in out, f"missing subcommand {cmd!r} in help"
def test_my_stack_help_lists_subcommands():
r = runner.invoke(my_stack_app, ["--help"])
assert r.exit_code == 0
out = _clean(r.output)
for cmd in ("show", "toggle"):
assert cmd in out
def test_store_list_default_help():
r = runner.invoke(store_app, ["list", "--help"])
assert r.exit_code == 0
out = _clean(r.output)
for opt in ("--type", "--category", "--search", "--owner", "--limit", "--skip", "--json"):
assert opt in out
# ---------------------------------------------------------------------------
# Happy-path mocked tests.
# ---------------------------------------------------------------------------
def test_store_list_renders_table(monkeypatch):
sample = {
"items": [
{
"id": "abc123",
"type": "skill",
"name": "code-review",
"owner_username": "alice",
"install_count": 5,
"version": "deadbeef00000000",
},
],
"total": 1,
"skip": 0,
"limit": 24,
}
import cli.commands.store as store_mod
monkeypatch.setattr(store_mod, "api_get_json", lambda *a, **kw: sample)
r = runner.invoke(store_app, ["list"])
assert r.exit_code == 0, r.output
out = _clean(r.output)
assert "1 entit" in out
assert "code-review" in out
assert "alice" in out
def test_store_install_calls_api(monkeypatch):
captured: dict = {}
def _post(path, payload):
captured["path"] = path
captured["payload"] = payload
return {"entity_id": "xyz", "installed": True}
import cli.commands.store as store_mod
monkeypatch.setattr(store_mod, "api_post_json", _post)
r = runner.invoke(store_app, ["install", "xyz"])
assert r.exit_code == 0, r.output
assert captured["path"] == "/api/store/entities/xyz/install"
assert "Installed" in _clean(r.output)
def test_store_upload_sends_multipart(monkeypatch, tmp_path):
captured: dict = {}
def _multipart(path, *, files, data):
captured["path"] = path
captured["data"] = data
captured["files_keys"] = list(files.keys())
return {
"id": "new-id",
"name": data.get("name", "fallback"),
"invocation_name": "fallback-by-someone",
"version": "abcd1234",
}
import cli.commands.store as store_mod
monkeypatch.setattr(store_mod, "api_post_multipart", _multipart)
zip_path = tmp_path / "skill.zip"
zip_path.write_bytes(b"PK\x03\x04fake-zip-content")
r = runner.invoke(
store_app,
["upload", "skill", str(zip_path), "--name", "my-skill", "--description", "d"],
)
assert r.exit_code == 0, r.output
assert captured["path"] == "/api/store/entities"
assert captured["data"]["type"] == "skill"
assert captured["data"]["name"] == "my-skill"
assert captured["data"]["description"] == "d"
assert captured["files_keys"] == ["file"]
def test_my_stack_show_renders(monkeypatch):
sample = {
"curated": [
{
"marketplace_id": "official",
"marketplace_slug": "official",
"plugin_name": "alpha",
"manifest_name": "alpha",
"version": "1.0",
"enabled": True,
},
],
"store": [
{
"entity_id": "e1",
"type": "skill",
"name": "code-review",
"owner_username": "alice",
"version": "abcd",
"invocation_name": "code-review-by-alice",
"install_count": 1,
},
],
}
import cli.commands.my_stack as ms_mod
monkeypatch.setattr(ms_mod, "api_get_json", lambda *a, **kw: sample)
r = runner.invoke(my_stack_app, ["show"])
assert r.exit_code == 0, r.output
out = _clean(r.output)
assert "Curated" in out and "alpha" in out
assert "From Store" in out and "code-review-by-alice" in out
def test_my_stack_toggle_requires_on_or_off():
r = runner.invoke(my_stack_app, ["toggle", "official", "alpha"])
assert r.exit_code == 2
assert "exactly one" in _clean(r.output) or "exactly one" in _clean(r.stderr or "")
def test_my_stack_toggle_writes_put(monkeypatch):
captured: dict = {}
def _put(path, payload):
captured["path"] = path
captured["payload"] = payload
return {"ok": True}
import cli.commands.my_stack as ms_mod
monkeypatch.setattr(ms_mod, "api_put_json", _put)
r = runner.invoke(my_stack_app, ["toggle", "official", "alpha", "--off"])
assert r.exit_code == 0, r.output
assert captured["path"] == "/api/my-stack/curated/official/alpha"
assert captured["payload"] == {"enabled": False}
assert "DISABLED" in _clean(r.output)