Adds whole-Store backup/restore primitives so an external CI/CD job can
mirror the Store to a git repo (and restore back from one).
REST:
- GET /api/store/bundle.zip — deterministic ZIP of all (filtered) Store
entities. Layout: manifest.json + entities/<id>/{plugin,assets}/.
Manifest carries owner_email for cross-instance restore. Auth: any
authenticated user (Store is community-open).
- POST /api/store/import-bundle — admin-only restore. Modes
merge|replace|skip; owner resolution by email with stub-disabled-user
fallback when the email is unknown on the target instance.
CLI:
- agnes store update <id> [--description X] [--zip PATH] ... — in-place
edit (server PUT permits owner OR admin per F4). Closes the missing
edit affordance for analysts who want to fix a typo or push a new
ZIP without losing install_count.
- agnes store pull [-o store.zip] [--unpack DIR] — download the bundle.
--unpack streams + extracts so an external git-backup workflow can
drop the tree straight into a repo and `git add .`.
- agnes store info [--json] — counts + size summary.
- agnes admin store push <zip-or-dir> [--mode ...] — admin-only restore.
Auto-zips a directory client-side so a working-tree → server
round-trip is one command.
cli/v2_client.py gains api_get_stream helper for binary downloads.
Tests: 5 new server tests (bundle shape + filters + round-trip + stub
user creation + skip mode + admin-only gate) + 11 new CLI tests
(update, pull/unpack, info, admin push). 66/66 store-related tests
green locally.
398 lines
13 KiB
Python
398 lines
13 KiB
Python
"""Tests for `agnes store` and `agnes my-stack` Typer wrappers.

Smoke + happy-path. Network calls are mocked so tests don't depend on a
running server.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
|
|
from typer.testing import CliRunner
|
|
|
|
from cli.commands.my_stack import my_stack_app
|
|
from cli.commands.store import store_app
|
|
|
|
# Shared Typer test runner; the tests in this module invoke CLI apps through it.
runner = CliRunner()
|
|
_ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")
|
|
|
|
|
|
def _clean(s: str) -> str:
|
|
return _ANSI_RE.sub("", s)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Help-text smoke tests — guard against accidental command renames.
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_store_help_lists_subcommands():
    """`store --help` must mention every subcommand exercised in this module.

    Acts as a rename guard: besides the original six commands it also covers
    `update`, `pull` and `info`, which other tests in this file invoke on
    `store_app`.
    """
    result = runner.invoke(store_app, ["--help"])
    assert result.exit_code == 0
    help_text = _clean(result.output)
    expected = (
        "list",
        "show",
        "install",
        "uninstall",
        "upload",
        "delete",
        "update",
        "pull",
        "info",
    )
    for cmd in expected:
        assert cmd in help_text, f"missing subcommand {cmd!r} in help"
|
|
|
|
|
|
def test_my_stack_help_lists_subcommands():
    """`my-stack --help` exits cleanly and names both of its subcommands."""
    result = runner.invoke(my_stack_app, ["--help"])
    assert result.exit_code == 0

    help_text = _clean(result.output)
    expected = ("show", "toggle")
    for name in expected:
        assert name in help_text
|
|
|
|
|
|
def test_store_list_default_help():
    """`store list --help` advertises every filter and paging option."""
    result = runner.invoke(store_app, ["list", "--help"])
    assert result.exit_code == 0

    help_text = _clean(result.output)
    expected_options = (
        "--type",
        "--category",
        "--search",
        "--owner",
        "--limit",
        "--skip",
        "--json",
    )
    for flag in expected_options:
        assert flag in help_text
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Happy-path mocked tests.
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_store_list_renders_table(monkeypatch):
    """`store list` renders the mocked listing: count, entity name and owner."""
    import cli.commands.store as store_mod

    entity = {
        "id": "abc123",
        "type": "skill",
        "name": "code-review",
        "owner_username": "alice",
        "install_count": 5,
        "version": "deadbeef00000000",
    }
    listing = {"items": [entity], "total": 1, "skip": 0, "limit": 24}

    def _fake_get(*args, **kwargs):
        # Ignore whatever path/params the CLI passes; always serve the fixture.
        return listing

    monkeypatch.setattr(store_mod, "api_get_json", _fake_get)

    result = runner.invoke(store_app, ["list"])
    assert result.exit_code == 0, result.output

    rendered = _clean(result.output)
    assert "1 entit" in rendered
    assert "code-review" in rendered
    assert "alice" in rendered
|
|
|
|
|
|
def test_store_install_calls_api(monkeypatch):
    """`store install <id>` POSTs to the entity's install endpoint."""
    import cli.commands.store as store_mod

    seen: dict = {}

    def _fake_post(path, payload):
        # Record the call so the assertions below can inspect it.
        seen.update(path=path, payload=payload)
        return {"entity_id": "xyz", "installed": True}

    monkeypatch.setattr(store_mod, "api_post_json", _fake_post)

    result = runner.invoke(store_app, ["install", "xyz"])

    assert result.exit_code == 0, result.output
    assert seen["path"] == "/api/store/entities/xyz/install"
    assert "Installed" in _clean(result.output)
|
|
|
|
|
|
def test_store_upload_sends_multipart(monkeypatch, tmp_path):
    """`store upload` sends a multipart POST with the form fields and one file part."""
    import cli.commands.store as store_mod

    seen: dict = {}

    def _fake_multipart(path, *, files, data):
        seen["path"] = path
        seen["data"] = data
        seen["files_keys"] = list(files.keys())
        response = {
            "id": "new-id",
            "name": data.get("name", "fallback"),
            "invocation_name": "fallback-by-someone",
            "version": "abcd1234",
        }
        return response

    monkeypatch.setattr(store_mod, "api_post_multipart", _fake_multipart)

    # Only the ZIP magic bytes plus filler; the server call is mocked.
    archive = tmp_path / "skill.zip"
    archive.write_bytes(b"PK\x03\x04fake-zip-content")

    argv = ["upload", "skill", str(archive), "--name", "my-skill", "--description", "d"]
    result = runner.invoke(store_app, argv)

    assert result.exit_code == 0, result.output
    assert seen["path"] == "/api/store/entities"
    form = seen["data"]
    assert form["type"] == "skill"
    assert form["name"] == "my-skill"
    assert form["description"] == "d"
    assert seen["files_keys"] == ["file"]
|
|
|
|
|
|
def test_my_stack_show_renders(monkeypatch):
    """`my-stack show` prints both the curated and the Store sections."""
    import cli.commands.my_stack as ms_mod

    curated_entry = {
        "marketplace_id": "official",
        "marketplace_slug": "official",
        "plugin_name": "alpha",
        "manifest_name": "alpha",
        "version": "1.0",
        "enabled": True,
    }
    store_entry = {
        "entity_id": "e1",
        "type": "skill",
        "name": "code-review",
        "owner_username": "alice",
        "version": "abcd",
        "invocation_name": "code-review-by-alice",
        "install_count": 1,
    }
    stack = {"curated": [curated_entry], "store": [store_entry]}

    def _fake_get(*args, **kwargs):
        return stack

    monkeypatch.setattr(ms_mod, "api_get_json", _fake_get)

    result = runner.invoke(my_stack_app, ["show"])
    assert result.exit_code == 0, result.output

    rendered = _clean(result.output)
    assert "Curated" in rendered and "alpha" in rendered
    assert "From Store" in rendered and "code-review-by-alice" in rendered
|
|
|
|
|
|
def test_my_stack_toggle_requires_on_or_off():
    """`my-stack toggle` without an on/off flag exits 2 with an "exactly one" hint.

    The message may land on stdout or stderr, so both streams are checked.
    """
    result = runner.invoke(my_stack_app, ["toggle", "official", "alpha"])
    assert result.exit_code == 2

    try:
        err_text = result.stderr or ""
    except ValueError:
        # Click < 8.2 raises when stderr was not captured separately; treat
        # that as "no stderr" so a missing message fails the assert below
        # instead of surfacing as an unrelated ValueError.
        err_text = ""

    assert "exactly one" in _clean(result.output) or "exactly one" in _clean(err_text)
|
|
|
|
|
|
def test_my_stack_toggle_writes_put(monkeypatch):
    """`my-stack toggle ... --off` PUTs `enabled: False` for the curated plugin."""
    import cli.commands.my_stack as ms_mod

    seen: dict = {}

    def _fake_put(path, payload):
        seen.update(path=path, payload=payload)
        return {"ok": True}

    monkeypatch.setattr(ms_mod, "api_put_json", _fake_put)

    argv = ["toggle", "official", "alpha", "--off"]
    result = runner.invoke(my_stack_app, argv)

    assert result.exit_code == 0, result.output
    assert seen["path"] == "/api/my-stack/curated/official/alpha"
    assert seen["payload"] == {"enabled": False}
    assert "DISABLED" in _clean(result.output)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# `agnes store update`
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_store_update_help_lists_options():
    """`store update --help` documents every editable field option."""
    result = runner.invoke(store_app, ["update", "--help"])
    assert result.exit_code == 0

    help_text = _clean(result.output)
    expected_options = ("--description", "--category", "--video-url", "--photo", "--zip")
    for flag in expected_options:
        assert flag in help_text
|
|
|
|
|
|
def test_store_update_no_fields_exit_2():
    """`store update <id>` with no field options exits 2 and says why."""
    argv = ["update", "abc123"]
    result = runner.invoke(store_app, argv)

    assert result.exit_code == 2
    message = _clean(result.output)
    assert "Nothing to update" in message
|
|
|
|
|
|
def test_store_update_sends_put_multipart(monkeypatch):
    """`store update --description` issues a multipart PUT with only that field."""
    import cli.commands.store as store_mod

    seen: dict = {}

    def _fake_put(path, *, files, data):
        seen.update(path=path, files=files, data=data)
        return {"id": "abc", "version": "newhash01234567"}

    monkeypatch.setattr(store_mod, "api_put_multipart", _fake_put)

    argv = ["update", "abc", "--description", "new desc"]
    result = runner.invoke(store_app, argv)

    assert result.exit_code == 0, result.output
    assert seen["path"] == "/api/store/entities/abc"
    assert seen["data"] == {"description": "new desc"}
    # No --zip / --photo was given, so no file parts are expected.
    assert seen["files"] is None
    assert "Updated" in _clean(result.output)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# `agnes store pull` / `agnes store info`
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_store_pull_writes_zip(monkeypatch, tmp_path):
    """`store pull -o FILE` streams the bundle endpoint into FILE and reports its size.

    The fake stream helper writes a small placeholder payload and returns its
    true length, so the reported byte count matches what is actually on disk.
    """
    import cli.commands.store as store_mod

    payload = b"PK\x03\x04fakezip"
    captured: dict = {}

    def _stream(path, dest, **params):
        captured["path"] = path
        captured["params"] = params
        captured["dest"] = dest
        with open(dest, "wb") as f:
            f.write(payload)
        # Return the real byte count rather than an unrelated constant.
        return len(payload)

    monkeypatch.setattr(store_mod, "api_get_stream", _stream)

    out = tmp_path / "store.zip"
    r = runner.invoke(store_app, ["pull", "-o", str(out)])

    assert r.exit_code == 0, r.output
    assert captured["path"] == "/api/store/bundle.zip"
    assert f"Wrote {len(payload)} bytes" in _clean(r.output)
    assert out.exists()
    assert out.read_bytes() == payload
|
|
|
|
|
|
def test_store_pull_unpack(monkeypatch, tmp_path):
    """`--unpack DIR` extracts the streamed bundle's entries into DIR."""
    import zipfile

    import cli.commands.store as store_mod

    # Build a tiny but valid bundle on disk to serve as the "download".
    fake_zip_path = tmp_path / "_fake.zip"
    with zipfile.ZipFile(fake_zip_path, "w") as archive:
        archive.writestr("manifest.json", '{"format":1,"entries":[]}')
        archive.writestr("entities/abc/plugin/.claude-plugin/plugin.json", '{}')

    def _stream(path, dest, **params):
        # Copy the prepared bundle into whatever destination the CLI chose
        # and report how many bytes were written.
        payload = fake_zip_path.read_bytes()
        with open(dest, "wb") as sink:
            sink.write(payload)
        return len(payload)

    monkeypatch.setattr(store_mod, "api_get_stream", _stream)

    target = tmp_path / "unpacked"
    result = runner.invoke(store_app, ["pull", "--unpack", str(target)])

    assert result.exit_code == 0, result.output
    assert (target / "manifest.json").is_file()
    assert (target / "entities/abc/plugin/.claude-plugin/plugin.json").is_file()
|
|
|
|
|
|
def test_store_info_summarizes(monkeypatch):
    """`store info` pages through the listing and prints per-type counts."""
    import cli.commands.store as store_mod

    first_page = {
        "items": [
            {"type": "skill", "file_size": 1024},
            {"type": "skill", "file_size": 512},
            {"type": "agent", "file_size": 256},
        ],
        "total": 3,
        "skip": 0,
        "limit": 100,
    }
    last_page = {"items": [], "total": 3, "skip": 100, "limit": 100}
    # Served in order, one page per API call.
    queue = [first_page, last_page]

    def _get(path, **params):
        return queue.pop(0)

    monkeypatch.setattr(store_mod, "api_get_json", _get)

    result = runner.invoke(store_app, ["info"])
    assert result.exit_code == 0, result.output

    rendered = _clean(result.output)
    assert "3 entit" in rendered
    # NOTE(review): the digit checks match anywhere in the output, not just
    # next to the type name — kept loose to mirror the existing contract.
    assert "skill" in rendered
    assert "2" in rendered
    assert "agent" in rendered
    assert "1" in rendered
|
|
|
|
|
|
def test_store_info_json(monkeypatch):
    """`store info --json` emits a JSON summary with totals and per-type counts."""
    import json

    import cli.commands.store as store_mod

    only_page = {
        "items": [{"type": "plugin", "file_size": 999}],
        "total": 1,
        "skip": 0,
        "limit": 100,
    }
    terminator = {"items": [], "total": 1, "skip": 100, "limit": 100}
    queue = [only_page, terminator]

    def _fake_get(*args, **kwargs):
        # Serve the queued pages in order, one per call.
        return queue.pop(0)

    monkeypatch.setattr(store_mod, "api_get_json", _fake_get)

    result = runner.invoke(store_app, ["info", "--json"])
    assert result.exit_code == 0, result.output

    summary = json.loads(_clean(result.output))
    assert summary["total_entities"] == 1
    assert summary["by_type"] == {"plugin": 1}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# `agnes admin store push`
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_admin_store_push_help():
    """The admin-store app's `--help` output mentions `push`."""
    from cli.commands.admin_store import admin_store_app

    result = runner.invoke(admin_store_app, ["--help"])

    assert result.exit_code == 0
    help_text = _clean(result.output)
    assert "push" in help_text
|
|
|
|
|
|
def test_admin_store_push_invalid_mode_exit_2(tmp_path):
    """An unknown `--mode` value exits 2 and lists the accepted modes.

    The command is invoked through the parent admin app: the admin-store app
    has a single command, so Typer would collapse it and read the `push`
    token as the SOURCE positional instead of routing to the subcommand.
    """
    from cli.commands.admin import admin_app

    archive = tmp_path / "x.zip"
    archive.write_bytes(b"PK\x03\x04")

    argv = ["store", "push", str(archive), "--mode", "wat"]
    result = runner.invoke(admin_app, argv)

    assert result.exit_code == 2
    assert "merge|replace|skip" in _clean(result.output)
|
|
|
|
|
|
def test_admin_store_push_zips_directory(monkeypatch, tmp_path):
    """When source is a directory, CLI must zip it client-side and POST.

    The fake multipart helper opens the uploaded archive and records its
    entry names, so the test can check that the directory tree was packed
    with bundle-relative paths.
    """
    import io
    import zipfile as _zf

    from cli.commands import admin_store as admin_store_mod
    from cli.commands.admin import admin_app

    captured: dict = {}

    def _post(path, *, files, data):
        captured["path"] = path
        captured["data"] = data
        # Index 1 of the "file" part is read as the raw ZIP bytes.
        zip_bytes = files["file"][1]
        with _zf.ZipFile(io.BytesIO(zip_bytes)) as zf:
            captured["names"] = sorted(zf.namelist())
        return {
            "imported": 1,
            "replaced": 0,
            "skipped": 0,
            "stub_users_created": 0,
            "errors": [],
        }

    monkeypatch.setattr(admin_store_mod, "api_post_multipart", _post)

    # Minimal bundle tree: a manifest plus one file under an entity's plugin dir.
    bundle_dir = tmp_path / "bundle"
    plugin_dir = bundle_dir / "entities" / "abc" / "plugin"
    plugin_dir.mkdir(parents=True)
    (bundle_dir / "manifest.json").write_text('{"format":1,"entries":[]}')
    (plugin_dir / "marker.txt").write_text("x")

    r = runner.invoke(
        admin_app,
        ["store", "push", str(bundle_dir), "--mode", "merge", "--yes"],
    )

    assert r.exit_code == 0, r.output
    assert captured["path"] == "/api/store/import-bundle"
    assert captured["data"] == {"mode": "merge"}
    assert "manifest.json" in captured["names"]
    assert "entities/abc/plugin/marker.txt" in captured["names"]
    assert "imported=1" in _clean(r.output)
|
|
|
|
|
|
def test_admin_store_push_directory_without_manifest_exit_2(tmp_path):
    """Pushing a directory that has no manifest.json exits 2 and names the file."""
    from cli.commands.admin import admin_app

    source_dir = tmp_path / "no_manifest"
    source_dir.mkdir()

    argv = ["store", "push", str(source_dir), "--yes"]
    result = runner.invoke(admin_app, argv)

    assert result.exit_code == 2
    assert "manifest.json" in _clean(result.output)
|