agnes-the-ai-analyst/tests/test_admin_store_submissions.py
Vojtech 4501c9c3dd
fix(store-guardrails): post-#290 review follow-up — purge tuple, filter chip, stale docs, lazy bundle_meta, logger.exception (#295)
Addresses post-merge review findings on #290:

- Admin Rescan is the only post-v30 producer of status='blocked_inline'.
  Re-add it to admin queue 'Needs review' filter chip and to
  TERMINAL_BLOCKED_STATUSES in the bundle-purge job so rescan-produced
  rows surface in the default operator view and bundles get TTL-swept
  instead of lingering indefinitely.

- Update three doc-drift sites still referring to the pre-#290 spam
  counter scope (counted blocked_inline). The counter now narrows to
  blocked_llm + review_error; fix the comment in app/api/store.py,
  the docstring in get_guardrails_blocked_quota_per_day(), and the
  operator-facing hint rendered on /admin/server-config.

- Add positive test for _reject_inline_or_continue validation branch
  (code='validation_failed', checks payload shape, no-DB-write
  contract). Locks the frontend wizard's detail.checks contract.

- Tighten test_quota_disabled_with_zero — assert (200, 201) explicitly
  instead of !=429 so a 500 regression no longer passes.

- _reject_inline_or_continue takes plugin_dir and lazy-computes
  bundle_meta only on the security branch. Validation rejects no
  longer pay for a SHA256 walk on the bundle.

- Surface store.upload.security_blocked audit-log write failures via
  logger.exception instead of swallowing — that audit row is the only
  forensic trace by design.
2026-05-14 08:02:44 +02:00

1908 lines
83 KiB
Python

"""Admin endpoints for the flea-market guardrail submissions surface.
Covers:
* Listing — non-admin gets 403, admin gets the table
* Override — flips status + entity visibility, writes audit row
* Retry — re-queues a review_error / blocked_llm submission
* Delete — wipes both submission row and entity bundle
* Override edge: inline-blocked submissions without an entity_id
cannot be overridden (refused with 409)
"""
from __future__ import annotations
import io
import json
import zipfile
from pathlib import Path
import pytest
from argon2 import PasswordHasher
from fastapi.testclient import TestClient
from app.utils import get_store_dir
from src.db import close_system_db, get_system_db
from src.repositories.users import UserRepository
@pytest.fixture
def web_client(tmp_path, monkeypatch):
monkeypatch.setenv("DATA_DIR", str(tmp_path))
monkeypatch.setenv("TESTING", "1")
monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-key-min-32-characters!!")
(tmp_path / "state").mkdir()
(tmp_path / "analytics").mkdir()
(tmp_path / "extracts").mkdir()
close_system_db()
from app.main import create_app
app = create_app()
yield TestClient(app)
close_system_db()
def _create_user(client, email, password="UserPass1!"):
ph = PasswordHasher()
conn = get_system_db()
user_id = email.split("@")[0]
UserRepository(conn).create(
id=user_id, email=email, name=user_id, password_hash=ph.hash(password),
)
conn.close()
r = client.post("/auth/token", json={"email": email, "password": password})
assert r.status_code == 200, r.text
return user_id, {"access_token": r.json()["access_token"]}
def _create_admin(client, email="admin@x.com"):
from tests.helpers.auth import grant_admin
user_id, cookies = _create_user(client, email, password="AdminPass1!")
conn = get_system_db()
grant_admin(conn, user_id)
conn.close()
return user_id, cookies
def _make_skill_zip(skill_name: str = "probe") -> bytes:
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr(
f"{skill_name}/SKILL.md",
f"---\nname: {skill_name}\ndescription: Use when staging a clean reference bundle for admin-review pipeline tests\n---\n\n"
+ ("Body that is intentionally long enough to clear quality thresholds. " * 6),
)
return buf.getvalue()
def _make_eval_skill_zip(skill_name: str = "bad") -> bytes:
"""A skill with a bash-eval script — guaranteed to fail static_security."""
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr(
f"{skill_name}/SKILL.md",
f"---\nname: {skill_name}\ndescription: Use when staging a bundle that intentionally trips static-security review checks\n---\n\n"
+ ("Body. " * 50),
)
zf.writestr(f"{skill_name}/run.sh", "#!/bin/sh\neval $1\n")
return buf.getvalue()
def _seed_quarantined_entity(
user_id: str,
user_email: str,
skill_name: str = "quarantined",
description: str = "Description seeded for tests — long enough to pass content checks.",
*,
status: str = "blocked_llm",
static_findings=None,
llm_summary: str = "test stub finding",
):
"""Seed a hidden flea entity + matching submission row + on-disk
bundle, mimicking the post-LLM-review-blocked state.
Inline failures (manifest, static-security, content) are now
hard-rejected upstream and never create DB rows. Tests that
previously triggered the v30 ``submission_blocked`` path by
uploading a bad bundle must seed the quarantined state directly
via this helper. The default status is ``blocked_llm`` — the only
status path that still creates a hidden+pending entity.
Returns ``(entity_id, submission_id)``.
"""
from src.repositories.store_entities import StoreEntitiesRepository
from src.repositories.store_submissions import StoreSubmissionsRepository
from src.store_naming import suffixed_name
import uuid as _uuid
entity_id = _uuid.uuid4().hex
username = user_email.split("@")[0]
store_dir = get_store_dir()
entity_dir = store_dir / entity_id
plugin_root = entity_dir / "plugin"
skill_subdir = plugin_root / "skills" / suffixed_name(skill_name, username)
skill_subdir.mkdir(parents=True, exist_ok=True)
skill_md = skill_subdir / "SKILL.md"
skill_md.write_text(
f"---\nname: {suffixed_name(skill_name, username)}\n"
f"description: {description}\n---\n\n"
+ ("Body content. " * 30),
)
run_sh = skill_subdir / "run.sh"
run_sh.write_text("#!/bin/sh\neval $1\n")
# v1 seed dir so the version download / restore endpoints find it.
v1_plugin = entity_dir / "versions" / "v1" / "plugin"
v1_plugin.parent.mkdir(parents=True, exist_ok=True)
import shutil as _shutil
_shutil.copytree(plugin_root, v1_plugin)
# Mirror the InlineResult.to_response_dict() shape that the runner
# would have produced. Static findings are surfaced verbatim in the
# quarantine banner template (_quarantine_banner.html).
findings = static_findings if static_findings is not None else [
{"file": "run.sh", "line": 2, "category": "code_exec",
"severity": "high",
"reason": "shell eval expanding a variable",
"snippet": "eval $1"},
]
inline_checks = {
"manifest": {"status": "pass", "issues": []},
"static_security": {"status": "fail", "findings": findings},
"content": {"status": "pass", "issues": []},
"quality": {"status": "pass", "issues": []},
}
conn = get_system_db()
StoreEntitiesRepository(conn).create(
id=entity_id,
owner_user_id=user_id,
owner_username=username,
type="skill",
name=skill_name,
description=description,
category=None,
version="1.0.0",
file_size=512,
visibility_status="hidden",
)
sub_id = StoreSubmissionsRepository(conn).create(
submitter_id=user_id,
submitter_email=user_email,
type="skill",
name=skill_name,
version="1.0.0",
status=status,
entity_id=entity_id,
inline_checks=inline_checks,
llm_findings={"risk_level": "high", "summary": llm_summary,
"findings": findings},
file_size=512,
bundle_sha256="0" * 64,
)
conn.close()
return entity_id, sub_id
# ---------------------------------------------------------------------------
# /api/admin/store/submissions — listing
# ---------------------------------------------------------------------------
class TestAdminListing:
def test_non_admin_forbidden(self, web_client):
_, user_cookies = _create_user(web_client, "user@x.com")
r = web_client.get("/api/admin/store/submissions", cookies=user_cookies)
assert r.status_code == 403
def test_security_upload_creates_no_submission_row(self, web_client):
"""Static-security findings are hard-rejected — no submission row,
no entity row, no bundle on disk. Replaces the v30 contract
where inline failures landed in admin's queue at
``blocked_inline``.
"""
from src.repositories.store_entities import StoreEntitiesRepository
from src.repositories.store_submissions import StoreSubmissionsRepository
user_id, user_cookies = _create_user(web_client, "u@x.com")
c = web_client.post(
"/api/store/entities",
files={"file": ("s.zip", _make_eval_skill_zip("bad"), "application/zip")},
data={"type": "skill"}, cookies=user_cookies,
)
assert c.status_code == 422
detail = c.json()["detail"]
assert detail["code"] == "security_blocked"
# Findings are exposed inline so the wizard banner can render them.
assert detail["checks"]["static_security"]["status"] == "fail"
assert detail["checks"]["static_security"]["findings"]
# No DB rows, no quarantined entity for the submitter to inspect.
assert "submission_id" not in detail
assert "entity_id" not in detail
conn = get_system_db()
items, _total = StoreSubmissionsRepository(conn).list_for_admin(
submitter_id=user_id,
)
assert items == []
ent_items, _ = StoreEntitiesRepository(conn).list(owner_user_id=user_id)
assert ent_items == []
conn.close()
# Admin queue is empty: no row was ever created.
_, admin_cookies = _create_admin(web_client)
r = web_client.get(
"/api/admin/store/submissions", cookies=admin_cookies,
)
assert r.status_code == 200
items = r.json()["items"]
assert not any(s["submitter_id"] == user_id for s in items), (
"security_blocked upload must not surface in admin queue"
)
def test_security_upload_emits_audit_log_entry(self, web_client):
"""A static-security rejection writes one ``store.upload.security_blocked``
audit_log row carrying the findings + sha256 + size. That row is
the *only* trace of the attempt; admin can grep audit_log for
repeated offenders.
"""
from src.repositories.audit import AuditRepository
user_id, user_cookies = _create_user(web_client, "spammer@x.com")
c = web_client.post(
"/api/store/entities",
files={"file": ("s.zip", _make_eval_skill_zip("audit"), "application/zip")},
data={"type": "skill"}, cookies=user_cookies,
)
assert c.status_code == 422
assert c.json()["detail"]["code"] == "security_blocked"
conn = get_system_db()
rows, _cursor = AuditRepository(conn).query(
user_id=user_id, action="store.upload.security_blocked",
limit=10,
)
conn.close()
assert len(rows) == 1
params = rows[0].get("params") or {}
if isinstance(params, str):
params = json.loads(params)
assert params.get("finding_count", 0) >= 1
assert params.get("bundle_sha256")
assert params.get("submitter_email") == "spammer@x.com"
def test_inline_validation_returns_validation_failed_code(self, web_client):
"""A bundle that survives pre-bake but fails ``content_check``
(description too short) goes through ``_reject_inline_or_continue``
and is rejected with the new two-tier response: 422,
``detail.code == 'validation_failed'``, populated
``detail.checks`` shape, NO submission row, NO entity row,
and NO audit_log entry (validation-tier failures are
operator-fixable, not forensically interesting).
Distinct from ``test_validation_failure_creates_no_audit_trail``
below which exercises the pre-bake ``zip_missing_skill_md`` path
— that one fails before inline checks ever run.
"""
from src.repositories.audit import AuditRepository
from src.repositories.store_submissions import StoreSubmissionsRepository
# Valid skill layout — pre-bake parses frontmatter, layout
# check passes. But description is < 60 chars → content_check
# fires inside run_inline_checks → _reject_inline_or_continue
# returns validation_failed.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr(
"tiny/SKILL.md",
"---\nname: tiny\ndescription: too short\n---\n\n"
+ ("Body text. " * 50),
)
short_desc_zip = buf.getvalue()
user_id, user_cookies = _create_user(web_client, "shortdesc@x.com")
r = web_client.post(
"/api/store/entities",
files={"file": ("s.zip", short_desc_zip, "application/zip")},
data={"type": "skill"}, cookies=user_cookies,
)
assert r.status_code == 422, r.text
detail = r.json()["detail"]
assert detail["code"] == "validation_failed", detail
# Frontend wizard (humanizeError in store_upload.html) reads
# detail.checks.{manifest,content,quality} — lock the shape.
assert set(detail["checks"].keys()) == {"manifest", "content", "quality"}
assert detail["checks"]["content"]["status"] != "pass"
# Validation-tier failures must not produce DB rows or audit entries.
conn = get_system_db()
items, _total = StoreSubmissionsRepository(conn).list_for_admin(
submitter_id=user_id,
)
assert items == []
rows, _cursor = AuditRepository(conn).query(
user_id=user_id, action_prefix="store.upload.", limit=10,
)
conn.close()
assert rows == [], (
"validation-tier rejection must not write audit_log entries"
)
def test_validation_failure_creates_no_audit_trail(self, web_client):
"""A bundle that fails manifest validation (missing SKILL.md) is
a fixable user error — no submission row, no entity row, and
NO audit_log entry. The submitter just sees the wizard banner.
"""
from src.repositories.audit import AuditRepository
from src.repositories.store_submissions import StoreSubmissionsRepository
# Skill ZIP without the required SKILL.md — manifest_check fails.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr("broken/notes.md", "no manifest here\n")
bad_zip = buf.getvalue()
user_id, user_cookies = _create_user(web_client, "validation@x.com")
c = web_client.post(
"/api/store/entities",
files={"file": ("s.zip", bad_zip, "application/zip")},
data={"type": "skill"}, cookies=user_cookies,
)
assert c.status_code == 422
# zip_missing_skill_md fires at metadata-extract (pre-bake), so
# the response is a plain ``detail: "zip_missing_skill_md"`` —
# but the contract under test is "no DB rows, no audit trail",
# which is what we assert below.
conn = get_system_db()
items, _total = StoreSubmissionsRepository(conn).list_for_admin(
submitter_id=user_id,
)
assert items == []
rows, _cursor = AuditRepository(conn).query(
user_id=user_id, action_prefix="store.upload.", limit=10,
)
conn.close()
assert rows == [], (
"validation-tier rejection must not write audit_log entries"
)
# ---------------------------------------------------------------------------
# Override
# ---------------------------------------------------------------------------
class TestAdminOverride:
def test_override_blocked_llm_publishes_entity(self, web_client):
"""Manually stage a blocked_llm row + entity, then override — the
entity must flip to visibility_status='approved' and the
submission to 'overridden'."""
from src.repositories.store_entities import StoreEntitiesRepository
from src.repositories.store_submissions import StoreSubmissionsRepository
user_id, _ = _create_user(web_client, "submitter@x.com")
conn = get_system_db()
ents = StoreEntitiesRepository(conn)
ents.create(
id="ent-blk", owner_user_id=user_id, owner_username="submitter",
type="skill", name="blocked-thing", description="x" * 30,
category=None, version="1.0.0", file_size=10,
visibility_status="pending",
)
subs = StoreSubmissionsRepository(conn)
sid = subs.create(
submitter_id=user_id, submitter_email="submitter@x.com",
type="skill", name="blocked-thing", version="1.0.0",
status="blocked_llm", entity_id="ent-blk",
llm_findings={"risk_level": "high", "summary": "exfil"},
)
conn.close()
_, admin_cookies = _create_admin(web_client)
r = web_client.post(
f"/api/admin/store/submissions/{sid}/override",
json={"reason": "false positive — internal-only constants"},
cookies=admin_cookies,
)
assert r.status_code == 200, r.text
conn = get_system_db()
ent = StoreEntitiesRepository(conn).get("ent-blk")
assert ent["visibility_status"] == "approved"
sub = StoreSubmissionsRepository(conn).get(sid)
assert sub["status"] == "overridden"
assert sub["override_reason"].startswith("false positive")
conn.close()
def test_override_short_reason_rejected(self, web_client):
from src.repositories.store_entities import StoreEntitiesRepository
from src.repositories.store_submissions import StoreSubmissionsRepository
user_id, _ = _create_user(web_client, "u@x.com")
conn = get_system_db()
StoreEntitiesRepository(conn).create(
id="e1", owner_user_id=user_id, owner_username="u",
type="skill", name="x", description="x" * 30, category=None,
version="1.0.0", file_size=10, visibility_status="pending",
)
sid = StoreSubmissionsRepository(conn).create(
submitter_id=user_id, submitter_email="u@x.com",
type="skill", name="x", version="1.0.0",
status="blocked_llm", entity_id="e1",
)
conn.close()
_, admin_cookies = _create_admin(web_client)
r = web_client.post(
f"/api/admin/store/submissions/{sid}/override",
json={"reason": "ok"}, # 2 chars < min_length=4
cookies=admin_cookies,
)
assert r.status_code == 422
# ---------------------------------------------------------------------------
# Delete
# ---------------------------------------------------------------------------
class TestAdminDelete:
def test_delete_clears_submission_and_entity(self, web_client):
from src.repositories.store_entities import StoreEntitiesRepository
from src.repositories.store_submissions import StoreSubmissionsRepository
user_id, _ = _create_user(web_client, "u@x.com")
conn = get_system_db()
StoreEntitiesRepository(conn).create(
id="e2", owner_user_id=user_id, owner_username="u",
type="skill", name="y", description="x" * 30, category=None,
version="1.0.0", file_size=10, visibility_status="approved",
)
sid = StoreSubmissionsRepository(conn).create(
submitter_id=user_id, submitter_email="u@x.com",
type="skill", name="y", version="1.0.0",
status="approved", entity_id="e2",
)
conn.close()
_, admin_cookies = _create_admin(web_client)
r = web_client.delete(
f"/api/admin/store/submissions/{sid}",
cookies=admin_cookies,
)
assert r.status_code == 200, r.text
conn = get_system_db()
assert StoreEntitiesRepository(conn).get("e2") is None
assert StoreSubmissionsRepository(conn).get(sid) is None
conn.close()
# ---------------------------------------------------------------------------
# List filters + pagination
# ---------------------------------------------------------------------------
def _seed_submissions(submitters):
"""Stage rows directly through the repo. Returns list of submission ids
in insertion order so tests can reference rows."""
from src.repositories.store_submissions import StoreSubmissionsRepository
conn = get_system_db()
ids = []
subs = StoreSubmissionsRepository(conn)
for u_id, u_email, type_, name, version, status in submitters:
ids.append(subs.create(
submitter_id=u_id, submitter_email=u_email,
type=type_, name=name, version=version, status=status,
entity_id=f"ent-{u_id}-{name}",
))
conn.close()
return ids
class TestAdminListFilters:
def test_filter_by_submitter(self, web_client):
u1, _ = _create_user(web_client, "alice@x.com")
u2, _ = _create_user(web_client, "bob@x.com")
_seed_submissions([
(u1, "alice@x.com", "skill", "thing", "1.0", "approved"),
(u1, "alice@x.com", "skill", "other", "1.0", "blocked_llm"),
(u2, "bob@x.com", "skill", "third", "1.0", "approved"),
])
_, admin_cookies = _create_admin(web_client)
r = web_client.get(f"/api/admin/store/submissions?submitter={u1}", cookies=admin_cookies)
assert r.status_code == 200
assert r.json()["total"] == 2
def test_filter_by_type(self, web_client):
u, _ = _create_user(web_client, "u@x.com")
_seed_submissions([
(u, "u@x.com", "skill", "a", "1.0", "approved"),
(u, "u@x.com", "agent", "b", "1.0", "approved"),
(u, "u@x.com", "plugin", "c", "1.0", "approved"),
])
_, admin_cookies = _create_admin(web_client)
r = web_client.get("/api/admin/store/submissions?type=agent", cookies=admin_cookies)
assert r.status_code == 200
assert r.json()["total"] == 1
assert r.json()["items"][0]["type"] == "agent"
def test_invalid_type_400(self, web_client):
_, admin_cookies = _create_admin(web_client)
r = web_client.get("/api/admin/store/submissions?type=bogus", cookies=admin_cookies)
assert r.status_code == 400
def test_filter_by_name_substring(self, web_client):
u, _ = _create_user(web_client, "u@x.com")
_seed_submissions([
(u, "u@x.com", "skill", "summarizer-pro", "1.0", "approved"),
(u, "u@x.com", "skill", "summarizer-alpha", "1.0", "approved"),
(u, "u@x.com", "skill", "totally-other", "1.0", "approved"),
])
_, admin_cookies = _create_admin(web_client)
r = web_client.get("/api/admin/store/submissions?name=summarizer", cookies=admin_cookies)
assert r.status_code == 200
assert r.json()["total"] == 2
def test_filter_by_version_substring(self, web_client):
u, _ = _create_user(web_client, "u@x.com")
_seed_submissions([
(u, "u@x.com", "skill", "a", "1.0.0", "approved"),
(u, "u@x.com", "skill", "b", "1.0.1", "approved"),
(u, "u@x.com", "skill", "c", "2.0.0", "approved"),
])
_, admin_cookies = _create_admin(web_client)
r = web_client.get("/api/admin/store/submissions?version=1.0", cookies=admin_cookies)
assert r.status_code == 200
assert r.json()["total"] == 2
def test_combined_filters(self, web_client):
u1, _ = _create_user(web_client, "alice@x.com")
u2, _ = _create_user(web_client, "bob@x.com")
_seed_submissions([
(u1, "alice@x.com", "skill", "x", "1.0", "blocked_llm"),
(u1, "alice@x.com", "agent", "y", "1.0", "blocked_llm"),
(u2, "bob@x.com", "skill", "z", "1.0", "blocked_llm"),
])
_, admin_cookies = _create_admin(web_client)
r = web_client.get(
f"/api/admin/store/submissions?submitter={u1}&type=skill",
cookies=admin_cookies,
)
assert r.json()["total"] == 1
class TestAdminListPagination:
def test_skip_limit_and_total(self, web_client):
u, _ = _create_user(web_client, "u@x.com")
_seed_submissions([
(u, "u@x.com", "skill", f"thing-{i}", "1.0", "approved")
for i in range(7)
])
_, admin_cookies = _create_admin(web_client)
r = web_client.get("/api/admin/store/submissions?limit=3&skip=0", cookies=admin_cookies)
b = r.json()
assert b["total"] == 7 and len(b["items"]) == 3
r = web_client.get("/api/admin/store/submissions?limit=3&skip=3", cookies=admin_cookies)
b = r.json()
assert b["total"] == 7 and len(b["items"]) == 3
r = web_client.get("/api/admin/store/submissions?limit=3&skip=6", cookies=admin_cookies)
b = r.json()
assert b["total"] == 7 and len(b["items"]) == 1
def test_limit_clamped(self, web_client):
_, admin_cookies = _create_admin(web_client)
r = web_client.get("/api/admin/store/submissions?limit=999999", cookies=admin_cookies)
# Endpoint clamps to 500. No assertion on content; smoke test.
assert r.status_code == 200
assert r.json()["limit"] == 500
# ---------------------------------------------------------------------------
# Detail page
# ---------------------------------------------------------------------------
class TestAdminDetailPage:
def test_detail_renders_for_existing_submission(self, web_client):
u, _ = _create_user(web_client, "u@x.com")
ids = _seed_submissions([
(u, "u@x.com", "skill", "thing", "1.0", "approved"),
])
_, admin_cookies = _create_admin(web_client)
r = web_client.get(f"/admin/store/submissions/{ids[0]}", cookies=admin_cookies)
assert r.status_code == 200
body = r.text
assert ids[0] in body
assert "Back to all submissions" in body
def test_detail_404_on_missing(self, web_client):
_, admin_cookies = _create_admin(web_client)
r = web_client.get("/admin/store/submissions/does-not-exist", cookies=admin_cookies)
assert r.status_code == 404
def test_detail_non_admin_forbidden(self, web_client):
u, user_cookies = _create_user(web_client, "u@x.com")
ids = _seed_submissions([
(u, "u@x.com", "skill", "thing", "1.0", "approved"),
])
r = web_client.get(f"/admin/store/submissions/{ids[0]}", cookies=user_cookies)
assert r.status_code in (302, 401, 403)
# ---------------------------------------------------------------------------
# Rescan
# ---------------------------------------------------------------------------
def _stage_entity_with_bundle(tmp_root, owner_id, name, body=None):
"""Create a real on-disk plugin tree under DATA_DIR/store/<entity_id>/plugin
so the rescan endpoint sees a bundle to scan."""
from pathlib import Path
from src.repositories.store_entities import StoreEntitiesRepository
import uuid
entity_id = uuid.uuid4().hex
plugin_dir = Path(tmp_root) / "store" / entity_id / "plugin" / "skills" / name
plugin_dir.mkdir(parents=True, exist_ok=True)
(plugin_dir / "SKILL.md").write_text(
body or (
"---\nname: " + name
+ "\ndescription: Use when staging a reference clean bundle so the admin rescan flow can re-run inline checks against it\n---\n\n"
+ ("Long body to satisfy quality and content guardrail thresholds. " * 8)
),
encoding="utf-8",
)
conn = get_system_db()
StoreEntitiesRepository(conn).create(
id=entity_id, owner_user_id=owner_id, owner_username=owner_id,
type="skill", name=name,
description="Use when staging an entity row so admin rescan can re-evaluate the on-disk bundle against the inline guardrail tier",
category=None,
version="1.0.0", file_size=10, visibility_status="approved",
)
conn.close()
return entity_id
class TestAdminRescan:
def test_rescan_clean_bundle_pending_llm(self, web_client, tmp_path):
from src.repositories.store_submissions import StoreSubmissionsRepository
u, _ = _create_user(web_client, "u@x.com")
eid = _stage_entity_with_bundle(tmp_path, u, "rescan-clean")
conn = get_system_db()
sid = StoreSubmissionsRepository(conn).create(
submitter_id=u, submitter_email="u@x.com",
type="skill", name="rescan-clean", version="1.0.0",
status="approved", entity_id=eid,
)
conn.close()
_, admin_cookies = _create_admin(web_client)
r = web_client.post(
f"/api/admin/store/submissions/{sid}/rescan", cookies=admin_cookies,
)
assert r.status_code == 200, r.text
# No ANTHROPIC_API_KEY in test env → guardrails disabled → auto-approved.
assert r.json()["status"] in {"pending_llm", "approved"}
conn = get_system_db()
sub = StoreSubmissionsRepository(conn).get(sid)
assert "retry_count" not in sub # v34: column dropped
conn.close()
def test_rescan_dirty_bundle_blocks_inline(self, web_client, tmp_path):
"""A bundle that introduces a static-security violation since the
original review must rescan to blocked_inline."""
from src.repositories.store_entities import StoreEntitiesRepository
from src.repositories.store_submissions import StoreSubmissionsRepository
from pathlib import Path
u, _ = _create_user(web_client, "u@x.com")
eid = _stage_entity_with_bundle(tmp_path, u, "rescan-dirty")
# Inject a bash-eval script — re-rescan must catch it.
bad = Path(tmp_path) / "store" / eid / "plugin" / "skills" / "rescan-dirty" / "run.sh"
bad.write_text("#!/bin/sh\neval $1\n", encoding="utf-8")
conn = get_system_db()
sid = StoreSubmissionsRepository(conn).create(
submitter_id=u, submitter_email="u@x.com",
type="skill", name="rescan-dirty", version="1.0.0",
status="approved", entity_id=eid,
)
conn.close()
_, admin_cookies = _create_admin(web_client)
r = web_client.post(
f"/api/admin/store/submissions/{sid}/rescan", cookies=admin_cookies,
)
assert r.status_code == 200, r.text
assert r.json()["status"] == "blocked_inline"
conn = get_system_db()
ent = StoreEntitiesRepository(conn).get(eid)
assert ent["visibility_status"] == "hidden"
sub = StoreSubmissionsRepository(conn).get(sid)
assert sub["status"] == "blocked_inline"
# Static-security finding from the new bash-eval is captured.
ic = sub["inline_checks"]
assert ic["static_security"]["status"] == "fail"
assert any(f["category"] == "code_exec" for f in ic["static_security"]["findings"])
conn.close()
def test_rescan_without_entity_409(self, web_client):
from src.repositories.store_submissions import StoreSubmissionsRepository
u, _ = _create_user(web_client, "u@x.com")
conn = get_system_db()
sid = StoreSubmissionsRepository(conn).create(
submitter_id=u, submitter_email="u@x.com",
type="skill", name="x", version="1.0.0",
status="blocked_inline", entity_id=None,
)
conn.close()
_, admin_cookies = _create_admin(web_client)
r = web_client.post(
f"/api/admin/store/submissions/{sid}/rescan", cookies=admin_cookies,
)
assert r.status_code == 409
def test_rescan_missing_bundle_410(self, web_client):
from src.repositories.store_submissions import StoreSubmissionsRepository
u, _ = _create_user(web_client, "u@x.com")
conn = get_system_db()
sid = StoreSubmissionsRepository(conn).create(
submitter_id=u, submitter_email="u@x.com",
type="skill", name="x", version="1.0.0",
status="approved", entity_id="missing-eid",
)
conn.close()
_, admin_cookies = _create_admin(web_client)
r = web_client.post(
f"/api/admin/store/submissions/{sid}/rescan", cookies=admin_cookies,
)
assert r.status_code == 410
def test_rescan_non_admin_forbidden(self, web_client):
from src.repositories.store_submissions import StoreSubmissionsRepository
u, user_cookies = _create_user(web_client, "u@x.com")
conn = get_system_db()
sid = StoreSubmissionsRepository(conn).create(
submitter_id=u, submitter_email="u@x.com",
type="skill", name="x", version="1.0.0", status="approved",
)
conn.close()
r = web_client.post(
f"/api/admin/store/submissions/{sid}/rescan", cookies=user_cookies,
)
assert r.status_code == 403
# ---------------------------------------------------------------------------
# v30: Download bundle, Sort by size, Quota
# ---------------------------------------------------------------------------
class TestAdminBundleDownload:
def test_download_returns_zip(self, web_client):
"""Live blocked-LLM bundle is downloadable as a fresh ZIP. Inline
rejections no longer persist bundles, so this exercises the LLM
path via the seed helper."""
user_id, _ = _create_user(web_client, "u@x.com")
_entity_id, sub_id = _seed_quarantined_entity(
user_id, "u@x.com", skill_name="dl",
)
_, admin_cookies = _create_admin(web_client)
r = web_client.get(
f"/api/admin/store/submissions/{sub_id}/bundle.zip",
cookies=admin_cookies,
)
assert r.status_code == 200
assert r.headers["content-type"] == "application/zip"
assert "attachment" in r.headers["content-disposition"]
with zipfile.ZipFile(io.BytesIO(r.content)) as zf:
assert any("SKILL.md" in n for n in zf.namelist())
assert any("run.sh" in n for n in zf.namelist())
def test_download_410_after_purge(self, web_client):
from src.repositories.store_submissions import StoreSubmissionsRepository
u, _ = _create_user(web_client, "u@x.com")
conn = get_system_db()
sub_id = StoreSubmissionsRepository(conn).create(
submitter_id=u, submitter_email="u@x.com",
type="skill", name="x", version="1.0.0",
status="blocked_inline", entity_id=None,
)
conn.close()
_, admin_cookies = _create_admin(web_client)
r = web_client.get(
f"/api/admin/store/submissions/{sub_id}/bundle.zip",
cookies=admin_cookies,
)
assert r.status_code == 410
def test_download_non_admin_forbidden(self, web_client):
from src.repositories.store_submissions import StoreSubmissionsRepository
u, user_cookies = _create_user(web_client, "u@x.com")
conn = get_system_db()
sub_id = StoreSubmissionsRepository(conn).create(
submitter_id=u, submitter_email="u@x.com",
type="skill", name="x", version="1.0.0",
status="approved", entity_id="some-id",
)
conn.close()
r = web_client.get(
f"/api/admin/store/submissions/{sub_id}/bundle.zip",
cookies=user_cookies,
)
assert r.status_code == 403
class TestAdminSortBySize:
def test_sort_file_size_asc_desc(self, web_client):
from src.repositories.store_submissions import StoreSubmissionsRepository
u, _ = _create_user(web_client, "u@x.com")
conn = get_system_db()
StoreSubmissionsRepository(conn).create(
submitter_id=u, submitter_email="u@x.com",
type="skill", name="big", version="1", status="approved",
file_size=10000,
)
StoreSubmissionsRepository(conn).create(
submitter_id=u, submitter_email="u@x.com",
type="skill", name="med", version="1", status="approved",
file_size=5000,
)
StoreSubmissionsRepository(conn).create(
submitter_id=u, submitter_email="u@x.com",
type="skill", name="tiny", version="1", status="approved",
file_size=100,
)
conn.close()
# Admin endpoint passes sort/order through to the repo whitelist
# (#23). Confirm both directions via the API.
_, admin_cookies = _create_admin(web_client)
r = web_client.get(
"/api/admin/store/submissions?sort=file_size&order=asc",
cookies=admin_cookies,
)
assert r.status_code == 200
names = [i["name"] for i in r.json()["items"]]
assert names == ["tiny", "med", "big"], names
r = web_client.get(
"/api/admin/store/submissions?sort=file_size&order=desc",
cookies=admin_cookies,
)
assert r.status_code == 200
names = [i["name"] for i in r.json()["items"]]
assert names == ["big", "med", "tiny"], names
def test_invalid_sort_key_400(self, web_client):
"""#23 — sort whitelist rejects bogus keys at the API edge.
Pre-fix, an unknown key fell through to a substring-replace
chain that could surface 500s; now it's a clean 400."""
_, admin_cookies = _create_admin(web_client)
r = web_client.get(
"/api/admin/store/submissions?sort=injected__column",
cookies=admin_cookies,
)
assert r.status_code == 400, r.text
assert "invalid_sort_key" in r.text
class TestQuota:
def test_quota_blocks_after_threshold(self, web_client, monkeypatch):
"""Quota gate triggers on the LLM-tier reject count. Inline
failures no longer create rows, so the quota is seeded via
the repo (mimicking two prior blocked_llm verdicts in the
last 24h). The third upload is gated upstream by 429."""
from app import instance_config as ic
from src.repositories.store_submissions import StoreSubmissionsRepository
monkeypatch.setattr(ic, "get_guardrails_blocked_quota_per_day", lambda: 2)
user_id, user_cookies = _create_user(web_client, "spammer@x.com")
conn = get_system_db()
repo = StoreSubmissionsRepository(conn)
for i in range(2):
repo.create(
submitter_id=user_id, submitter_email="spammer@x.com",
type="skill", name=f"seed-{i}", version="1.0.0",
status="blocked_llm", entity_id=None,
)
conn.close()
# Third upload — any clean ZIP would do; expect 429 before the
# guardrail pipeline runs.
r = web_client.post(
"/api/store/entities",
files={"file": ("s.zip", _make_skill_zip("clean-after-quota"), "application/zip")},
data={"type": "skill"}, cookies=user_cookies,
)
assert r.status_code == 429, r.text
body = r.json()["detail"]
assert body["code"] == "quota_exceeded"
assert body["limit"] == 2
def test_quota_disabled_with_zero(self, web_client, monkeypatch):
"""quota=0 disables the gate entirely. Seed many blocked_llm
rows; clean uploads still succeed."""
from app import instance_config as ic
from src.repositories.store_submissions import StoreSubmissionsRepository
monkeypatch.setattr(ic, "get_guardrails_blocked_quota_per_day", lambda: 0)
user_id, user_cookies = _create_user(web_client, "trusted@x.com")
conn = get_system_db()
for i in range(5):
StoreSubmissionsRepository(conn).create(
submitter_id=user_id, submitter_email="trusted@x.com",
type="skill", name=f"history-{i}", version="1.0.0",
status="blocked_llm", entity_id=None,
)
conn.close()
r = web_client.post(
"/api/store/entities",
files={"file": ("s.zip", _make_skill_zip("clean-zero-quota"), "application/zip")},
data={"type": "skill"}, cookies=user_cookies,
)
# Clean upload — passes inline guardrails. With ANTHROPIC_API_KEY
# absent in tests the guardrail pipeline auto-disables so the
# entity lands at ``approved`` (201). Assert the success codes
# explicitly so a 500 from an unrelated regression doesn't
# masquerade as quota-disabled.
assert r.status_code in (200, 201), r.text
def test_quota_counter_includes_blocked_llm_and_review_error(self, web_client):
"""The counter narrows to ``blocked_llm`` + ``review_error`` —
inline failures no longer create rows. Legacy ``blocked_inline``
rows from pre-cutover instances are intentionally excluded
(kept in DB as historical audit, not counted toward the live
quota)."""
from datetime import datetime, timezone, timedelta
from src.repositories.store_submissions import StoreSubmissionsRepository
_, user_cookies = _create_user(web_client, "spammer-9@x.com")
conn = get_system_db()
repo = StoreSubmissionsRepository(conn)
for i, status in enumerate(("blocked_inline", "blocked_llm", "review_error")):
repo.create(
submitter_id="spammer-9", submitter_email="spammer-9@x.com",
type="skill", name=f"q9-{i}", version="1.0.0",
status=status, entity_id=None,
inline_checks={"manifest": {"status": "fail"}},
)
since = datetime.now(timezone.utc) - timedelta(hours=24)
count = repo.count_blocked_for_submitter_since("spammer-9", since)
conn.close()
assert count == 2, (
f"counter must skip legacy blocked_inline; got {count}"
)
# ---------------------------------------------------------------------------
# v32+ quarantine semantics
# ---------------------------------------------------------------------------
class TestQuarantineGates:
def test_owner_cannot_delete_quarantined(self, web_client):
"""Owner trying to DELETE their own quarantined (blocked_llm)
entity must be refused — admin investigates first."""
user_id, user_cookies = _create_user(web_client, "u@x.com")
entity_id, _sub_id = _seed_quarantined_entity(user_id, "u@x.com", "q1")
r = web_client.delete(
f"/api/store/entities/{entity_id}", cookies=user_cookies,
)
assert r.status_code == 403
body = r.json()["detail"]
assert body["code"] == "quarantined_owner_cannot_delete"
def test_admin_can_delete_quarantined(self, web_client):
user_id, _ = _create_user(web_client, "u@x.com")
entity_id, _sub_id = _seed_quarantined_entity(user_id, "u@x.com", "q2")
_, admin_cookies = _create_admin(web_client)
r = web_client.delete(
f"/api/store/entities/{entity_id}", cookies=admin_cookies,
)
assert r.status_code == 200, r.text
def test_non_owner_non_admin_cannot_view_quarantined(self, web_client):
"""Random user navigating to ANY per-entity asset endpoint gets
404 — same as if the entity didn't exist (no leak via 403).
Covers every ``_enforce_visibility`` caller in app/api/store.py
+ the marketplace flea detail."""
owner_id, _ = _create_user(web_client, "owner@x.com")
entity_id, _sub_id = _seed_quarantined_entity(owner_id, "owner@x.com", "q3")
_, intruder_cookies = _create_user(web_client, "snoop@x.com")
r = web_client.get(
f"/api/store/entities/{entity_id}", cookies=intruder_cookies,
)
assert r.status_code == 404, "detail must 404 for non-owner"
r = web_client.get(
f"/api/store/entities/{entity_id}/files", cookies=intruder_cookies,
)
assert r.status_code == 404, "files must 404 for non-owner"
r = web_client.get(
f"/api/store/entities/{entity_id}/photo", cookies=intruder_cookies,
)
assert r.status_code == 404, "photo must 404 for non-owner"
r = web_client.get(
f"/api/store/entities/{entity_id}/docs/anything.md",
cookies=intruder_cookies,
)
assert r.status_code == 404, "docs must 404 for non-owner"
def test_quarantined_entity_excluded_from_store_entities_list(self, web_client):
"""Random non-owner non-admin hitting the public flea-listing
(`/api/store/entities`) must NOT see another user's quarantined
entry. Mirrors the marketplace-items coverage but on the
store-namespaced listing."""
owner_id, owner_cookies = _create_user(web_client, "qowner@x.com")
entity_id, _sub_id = _seed_quarantined_entity(
owner_id, "qowner@x.com", "q-list",
)
_, intruder_cookies = _create_user(web_client, "qsnoop@x.com")
r = web_client.get("/api/store/entities", cookies=intruder_cookies)
assert r.status_code == 200
ids = {it["id"] for it in r.json().get("items", [])}
assert entity_id not in ids, (
"non-owner non-admin saw another user's quarantined entity "
"in /api/store/entities listing"
)
r = web_client.get("/api/store/entities", cookies=owner_cookies)
owner_ids = {it["id"] for it in r.json().get("items", [])}
assert entity_id in owner_ids, (
"owner should see own quarantined entity in their listing"
)
def test_owner_can_view_their_quarantined_entity(self, web_client):
owner_id, owner_cookies = _create_user(web_client, "owner@x.com")
entity_id, _sub_id = _seed_quarantined_entity(owner_id, "owner@x.com", "q4")
r = web_client.get(
f"/api/store/entities/{entity_id}", cookies=owner_cookies,
)
assert r.status_code == 200
def test_install_quarantined_refused_for_non_admin(self, web_client):
"""Even owner cannot add their own quarantined item to my-stack."""
owner_id, owner_cookies = _create_user(web_client, "owner@x.com")
entity_id, _sub_id = _seed_quarantined_entity(owner_id, "owner@x.com", "q5")
r = web_client.post(
f"/api/store/entities/{entity_id}/install", cookies=owner_cookies,
)
assert r.status_code == 409
assert r.json()["detail"] == "entity_not_approved"
# ---------------------------------------------------------------------------
# v32+ marketplace consolidation: visibility gates + owner-visible cards
# ---------------------------------------------------------------------------
class TestMarketplaceFleaConsolidation:
def test_marketplace_flea_detail_404_for_non_owner_quarantined(self, web_client):
"""Random non-owner non-admin pasting an entity_id into
/marketplace/flea/{id} gets 404 — same policy as the now-deleted
/store/{id}."""
owner_id, _ = _create_user(web_client, "owner@x.com")
eid, _sub_id = _seed_quarantined_entity(owner_id, "owner@x.com", "c1")
_, intruder_cookies = _create_user(web_client, "snoop@x.com")
r = web_client.get(f"/marketplace/flea/{eid}", cookies=intruder_cookies)
assert r.status_code == 404
r = web_client.get(f"/api/marketplace/flea/{eid}/detail", cookies=intruder_cookies)
assert r.status_code == 404
def test_marketplace_flea_detail_owner_sees_quarantine_banner(self, web_client):
"""Owner landing on /marketplace/flea/{id} sees the quarantine
banner with the failure summary AND the actual finding details
— not just a generic "Quarantined" header."""
owner_id, owner_cookies = _create_user(web_client, "owner@x.com")
eid, _sub_id = _seed_quarantined_entity(
owner_id, "owner@x.com", "c2",
llm_summary="reviewer flagged the bash eval",
static_findings=[
{"file": "run.sh", "line": 2, "severity": "high",
"category": "code_exec",
"reason": "shell eval expanding a variable",
"explanation": "shell eval expanding a variable",
"snippet": "eval $1"},
],
)
r = web_client.get(f"/marketplace/flea/{eid}", cookies=owner_cookies)
assert r.status_code == 200
body = r.text
assert "vis-banner" in body
assert "Quarantined" in body
# blocked_llm path renders the LLM verdict summary + per-finding
# list. Banner must surface BOTH so the submitter knows WHY
# without having to ping an admin.
assert "Security findings" in body, (
"banner missing 'Security findings' section"
)
assert "run.sh" in body, "banner missing path of offending file"
assert "shell eval" in body, "banner missing reviewer summary"
def test_review_error_banner_shows_error_detail(self, web_client):
"""#review_error — banner must surface the underlying error
message + any inline_checks the runner captured before bailing.
Pre-fix the banner only said 'couldn't complete its check' with
no actionable detail."""
from src.repositories.store_entities import StoreEntitiesRepository
from src.repositories.store_submissions import StoreSubmissionsRepository
owner_id, owner_cookies = _create_user(web_client, "rev-err@x.com")
# Stage entity + submission directly so we can land in review_error.
conn = get_system_db()
StoreEntitiesRepository(conn).create(
id="ent-rev-err", owner_user_id=owner_id, owner_username="rev-err",
type="skill", name="rev-err", description="Test review_error banner",
category=None, version="1.0.0", file_size=10,
visibility_status="hidden",
)
StoreSubmissionsRepository(conn).create(
submitter_id=owner_id, submitter_email="rev-err@x.com",
type="skill", name="rev-err", version="1.0.0",
status="review_error", entity_id="ent-rev-err",
inline_checks={
"manifest": {"status": "pass", "issues": []},
"static_security": {"status": "pass", "findings": []},
"quality": {"status": "pass", "issues": [],
"template_placeholders": 0},
},
llm_findings={
"risk_level": None, "summary": None, "findings": [],
"template_placeholders_found": 0,
"reviewed_by_model": None,
"error": "LLMTimeoutError: Anthropic connection error",
},
)
conn.close()
r = web_client.get(
"/marketplace/flea/ent-rev-err", cookies=owner_cookies,
)
assert r.status_code == 200
assert "vis-banner" in r.text
assert "errored" in r.text or "Under review" in r.text
# The actionable error detail must surface.
assert "LLMTimeoutError" in r.text, (
"review_error banner must surface llm_findings.error"
)
def test_legacy_store_detail_url_returns_404(self, web_client):
"""The /store/{id} route was deleted in v32+. Stale bookmarks 404."""
_, owner_cookies = _create_user(web_client, "u@x.com")
c = web_client.post(
"/api/store/entities",
files={"file": ("s.zip", _make_skill_zip("legacy"), "application/zip")},
data={"type": "skill"}, cookies=owner_cookies,
)
eid = c.json()["id"]
r = web_client.get(f"/store/{eid}", cookies=owner_cookies)
assert r.status_code == 404
def test_marketplace_listing_includes_owner_quarantined(self, web_client):
"""Submitter sees their own non-approved entries in the
/api/marketplace/items?tab=flea grid; non-owner does not."""
owner_id, owner_cookies = _create_user(web_client, "owner@x.com")
eid, _sub_id = _seed_quarantined_entity(owner_id, "owner@x.com", "c4")
# Owner — their own quarantined card surfaces with is_viewer_owner=True.
r = web_client.get("/api/marketplace/items?tab=flea", cookies=owner_cookies)
assert r.status_code == 200
items = r.json()["items"]
own = [it for it in items if it["id"] == f"flea-{eid}"]
assert own, f"owner should see own quarantined item; got {[it['id'] for it in items]}"
assert own[0]["visibility_status"] != "approved"
assert own[0]["is_viewer_owner"] is True
# Non-owner — same listing must NOT surface the quarantined entry.
_, snoop_cookies = _create_user(web_client, "snoop@x.com")
r = web_client.get("/api/marketplace/items?tab=flea", cookies=snoop_cookies)
assert r.status_code == 200
items = r.json()["items"]
snoop = [it for it in items if it["id"] == f"flea-{eid}"]
assert not snoop, "non-owner must not see another user's quarantined item"
# ---------------------------------------------------------------------------
# v35 archive (soft delete) semantics
# ---------------------------------------------------------------------------
class TestArchiveSoftDelete:
def _upload_clean(self, web_client, cookies, name="clean1"):
"""Helper: upload a clean skill that lands as approved (no API key
in test env -> auto-approve via guardrails-disabled fallback)."""
c = web_client.post(
"/api/store/entities",
files={"file": ("s.zip", _make_skill_zip(name), "application/zip")},
data={
"type": "skill",
"description": (
"Use when verifying lifecycle and admin flows over a "
"clean reference bundle that passes every guardrail tier"
),
},
cookies=cookies,
)
assert c.status_code == 201, c.text
return c.json()["id"]
def test_archive_frees_name_for_reupload(self, web_client):
"""v36 rename-on-archive: owner uploads `myskill`, archives,
re-uploads `myskill` → 201. Both rows exist; archived row's
name carries the `__archived__<epoch>` marker."""
from src.repositories.store_entities import StoreEntitiesRepository
from src.store_naming import is_archived_name
_, owner_cookies = _create_user(web_client, "rename-archive@x.com")
eid_v1 = self._upload_clean(web_client, owner_cookies, name="myskill")
# Archive v1.
r = web_client.delete(
f"/api/store/entities/{eid_v1}", cookies=owner_cookies,
)
assert r.status_code == 200, r.text
# Re-upload under the original name — must succeed.
eid_v2 = self._upload_clean(web_client, owner_cookies, name="myskill")
assert eid_v2 != eid_v1, "re-upload should produce a new entity id"
conn = get_system_db()
repo = StoreEntitiesRepository(conn)
v1 = repo.get(eid_v1)
v2 = repo.get(eid_v2)
conn.close()
assert v1["visibility_status"] == "archived"
assert is_archived_name(v1["name"]), (
f"archived row name must carry the rename suffix; got {v1['name']!r}"
)
assert v2["visibility_status"] in ("approved", "pending")
assert v2["name"] == "myskill", (
f"new row should keep plain name; got {v2['name']!r}"
)
def test_archive_renames_baked_skill_dir_on_disk(self, web_client):
"""The on-disk `skills/<old_suffix>/` directory is renamed to
`skills/<new_suffix>/` and SKILL.md frontmatter is rewritten
in lockstep so consumers' Claude Code resolves the new slug."""
from app.utils import get_store_dir
from src.repositories.store_entities import StoreEntitiesRepository
from src.store_naming import suffixed_name
owner_id, owner_cookies = _create_user(web_client, "disk-rename@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="diskskill")
old_suffix = suffixed_name("diskskill", "disk-rename")
old_dir = Path(get_store_dir()) / eid / "plugin" / "skills" / old_suffix
assert old_dir.is_dir(), f"pre-archive: missing {old_dir}"
web_client.delete(
f"/api/store/entities/{eid}", cookies=owner_cookies,
)
conn = get_system_db()
new_name = StoreEntitiesRepository(conn).get(eid)["name"]
conn.close()
new_suffix = suffixed_name(new_name, "disk-rename")
new_dir = Path(get_store_dir()) / eid / "plugin" / "skills" / new_suffix
assert new_dir.is_dir(), f"post-archive: missing {new_dir}"
assert not old_dir.exists(), (
f"old slug dir must be gone post-archive; still found {old_dir}"
)
# Frontmatter rewritten to new suffix.
skill_md = (new_dir / "SKILL.md").read_text(encoding="utf-8")
assert f"name: {new_suffix}" in skill_md, (
f"SKILL.md frontmatter not updated; got:\n{skill_md[:200]}"
)
def test_un_archive_strips_suffix_back_to_original(self, web_client):
"""Admin un-archive (set_visibility('approved') from 'archived')
strips the `__archived__\\d+$` suffix and restores the original
name + clears archive metadata."""
from src.repositories.store_entities import StoreEntitiesRepository
from src.store_naming import is_archived_name
owner_id, owner_cookies = _create_user(web_client, "unarch@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="back")
web_client.delete(f"/api/store/entities/{eid}", cookies=owner_cookies)
conn = get_system_db()
repo = StoreEntitiesRepository(conn)
assert is_archived_name(repo.get(eid)["name"])
repo.set_visibility(eid, "approved")
row = repo.get(eid)
conn.close()
assert row["name"] == "back"
assert row["visibility_status"] == "approved"
assert row["archived_at"] is None
assert row["archived_by"] is None
def test_un_archive_when_name_taken_appends_restored_suffix(self, web_client):
"""Owner archives `taken`, re-uploads `taken` (new entity), admin
un-archives the original. Original entity gets
`taken-restored-1` since the slot is occupied."""
from src.repositories.store_entities import StoreEntitiesRepository
owner_id, owner_cookies = _create_user(web_client, "conflict-arch@x.com")
eid_v1 = self._upload_clean(web_client, owner_cookies, name="taken")
web_client.delete(f"/api/store/entities/{eid_v1}", cookies=owner_cookies)
# Re-upload takes the slot.
self._upload_clean(web_client, owner_cookies, name="taken")
conn = get_system_db()
repo = StoreEntitiesRepository(conn)
repo.set_visibility(eid_v1, "approved")
row = repo.get(eid_v1)
conn.close()
assert row["name"] == "taken-restored-1", (
f"un-archive into taken slot must append -restored-1; "
f"got {row['name']!r}"
)
def test_active_same_name_still_409(self, web_client):
"""Regression: when the prior entity is NOT archived, the
same-name 409 still fires."""
_, owner_cookies = _create_user(web_client, "still409@x.com")
self._upload_clean(web_client, owner_cookies, name="active")
c = web_client.post(
"/api/store/entities",
files={"file": ("s.zip", _make_skill_zip("active"), "application/zip")},
data={"type": "skill"}, cookies=owner_cookies,
)
assert c.status_code == 409, c.text
assert c.json()["detail"] == "conflict_owner_name"
def test_admin_queue_strips_archive_suffix_for_display(self, web_client):
"""Admin queue renders the original name (not the suffixed one)
in the row's name cell so admins don't see ugly markers."""
owner_id, owner_cookies = _create_user(web_client, "displaystrip@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="displaytest")
web_client.delete(f"/api/store/entities/{eid}", cookies=owner_cookies)
_, admin_cookies = _create_admin(web_client)
r = web_client.get(
"/admin/store/submissions?status=archived", cookies=admin_cookies,
)
assert r.status_code == 200
body = r.text
assert "displaytest" in body
assert "__archived__" not in body, (
"admin queue must strip the archive-rename suffix for display"
)
def test_owner_can_archive_approved_entity(self, web_client):
"""DELETE without ?hard=true on owner's approved entity = soft archive.
Bundle stays on disk; existing installs preserved."""
from src.repositories.store_entities import StoreEntitiesRepository
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="arch-1")
# Pre-archive sanity.
conn = get_system_db()
ent = StoreEntitiesRepository(conn).get(eid)
assert ent["visibility_status"] == "approved"
conn.close()
r = web_client.delete(f"/api/store/entities/{eid}", cookies=owner_cookies)
assert r.status_code == 200, r.text
conn = get_system_db()
ent = StoreEntitiesRepository(conn).get(eid)
assert ent is not None # row preserved
assert ent["visibility_status"] == "archived"
assert ent["archived_at"] is not None
assert ent["archived_by"] == "owner"
# Bundle dir still on disk.
from app.utils import get_store_dir
assert (get_store_dir() / eid).exists()
conn.close()
def test_owner_hard_delete_refused(self, web_client):
"""Owner cannot pass ?hard=true — admin-only path."""
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="arch-2")
r = web_client.delete(f"/api/store/entities/{eid}?hard=true", cookies=owner_cookies)
assert r.status_code == 403
assert r.json()["detail"]["code"] == "hard_delete_admin_only"
def test_admin_can_hard_delete(self, web_client):
from src.repositories.store_entities import StoreEntitiesRepository
from app.utils import get_store_dir
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="arch-3")
bundle_dir = get_store_dir() / eid
assert bundle_dir.exists()
_, admin_cookies = _create_admin(web_client)
r = web_client.delete(f"/api/store/entities/{eid}?hard=true", cookies=admin_cookies)
assert r.status_code == 200, r.text
conn = get_system_db()
assert StoreEntitiesRepository(conn).get(eid) is None
conn.close()
assert not bundle_dir.exists()
def test_archived_excluded_from_marketplace_listing(self, web_client):
"""Approved → archived: every browse listing hides it (even owner)."""
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="arch-4")
web_client.delete(f"/api/store/entities/{eid}", cookies=owner_cookies)
# Owner — own archived must NOT surface in the public-style listing.
r = web_client.get("/api/marketplace/items?tab=flea", cookies=owner_cookies)
assert r.status_code == 200
ids = {it["id"] for it in r.json()["items"]}
assert f"flea-{eid}" not in ids
def test_install_refused_on_archived(self, web_client):
"""Archived entities can't be added to my stack."""
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="arch-5")
web_client.delete(f"/api/store/entities/{eid}", cookies=owner_cookies)
_, other_cookies = _create_user(web_client, "other@x.com")
r = web_client.post(
f"/api/store/entities/{eid}/install", cookies=other_cookies,
)
assert r.status_code == 409
assert r.json()["detail"] == "entity_not_approved"
def test_archived_still_serves_existing_installs(self, web_client):
"""Pre-existing user_store_installs keep getting the entity in
list_for_user even after archive (drives marketplace.zip serve)."""
from src.repositories.store_entities import StoreEntitiesRepository
from src.repositories.user_store_installs import UserStoreInstallsRepository
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="arch-6")
# Other user installs while approved.
other_id, other_cookies = _create_user(web_client, "other@x.com")
web_client.post(f"/api/store/entities/{eid}/install", cookies=other_cookies)
# Owner archives.
web_client.delete(f"/api/store/entities/{eid}", cookies=owner_cookies)
# Other user's stack still has it.
conn = get_system_db()
installs = UserStoreInstallsRepository(conn).list_for_user(other_id)
ids = {r["id"] for r in installs}
assert eid in ids, "archived entity must still serve to existing installs"
# And carries archived flag for the badge.
row = next(r for r in installs if r["id"] == eid)
assert row["visibility_status"] == "archived"
conn.close()
def test_owner_cannot_archive_quarantined(self, web_client):
"""Owner Delete on quarantined still refused (existing v32 policy)."""
user_id, user_cookies = _create_user(web_client, "u@x.com")
eid, _sub_id = _seed_quarantined_entity(user_id, "u@x.com", "q-arch")
r = web_client.delete(f"/api/store/entities/{eid}", cookies=user_cookies)
assert r.status_code == 403
assert r.json()["detail"]["code"] == "quarantined_owner_cannot_delete"
def test_admin_can_archive_quarantined(self, web_client):
"""Admin can archive a quarantined entity (separate from override
+ hard-delete paths — admin keeps full control)."""
from src.repositories.store_entities import StoreEntitiesRepository
user_id, _ = _create_user(web_client, "u@x.com")
eid, _sub_id = _seed_quarantined_entity(user_id, "u@x.com", "q-arch2")
_, admin_cookies = _create_admin(web_client)
r = web_client.delete(f"/api/store/entities/{eid}", cookies=admin_cookies)
assert r.status_code == 200, r.text
conn = get_system_db()
ent = StoreEntitiesRepository(conn).get(eid)
assert ent["visibility_status"] == "archived"
conn.close()
def test_owners_endpoint_filters_quarantined_for_non_admin(self, web_client):
"""A user with only quarantined uploads must NOT appear in the
public /api/store/owners dropdown."""
user_id, _ = _create_user(web_client, "spammer@x.com")
_seed_quarantined_entity(user_id, "spammer@x.com", "only-bad")
_, other_cookies = _create_user(web_client, "other@x.com")
r = web_client.get("/api/store/owners", cookies=other_cookies)
assert r.status_code == 200
owner_ids = {o["user_id"] for o in r.json()}
assert "spammer" not in owner_ids
def test_categories_endpoint_filters_quarantined_for_non_owner(self, web_client):
"""`/api/marketplace/categories?tab=flea` aggregates per-category
counts. The visibility predicate is duplicated inline in
marketplace.py (drift risk against repo); this test locks the
parity with marketplace items so a future change to the repo
clause that misses the inline copy gets caught."""
owner_id, owner_cookies = _create_user(web_client, "qcat-owner@x.com")
_seed_quarantined_entity(owner_id, "qcat-owner@x.com", "qcat")
_, snoop_cookies = _create_user(web_client, "qcat-snoop@x.com")
r = web_client.get(
"/api/marketplace/categories?tab=flea", cookies=snoop_cookies,
)
assert r.status_code == 200, r.text
body = r.json()
total = sum(c.get("count", 0) for c in body.get("items", []))
assert total == 0, (
"non-owner saw quarantined entry counted in /categories: "
f"{body}"
)
r = web_client.get(
"/api/marketplace/categories?tab=flea", cookies=owner_cookies,
)
body = r.json()
owner_total = sum(c.get("count", 0) for c in body.get("items", []))
assert owner_total >= 1, (
"owner should count own quarantined entry in /categories: "
f"{body}"
)
# ---------------------------------------------------------------------------
# v35 lifecycle marking on submissions (archived / deleted)
# ---------------------------------------------------------------------------
class TestSubmissionLifecycleMarking:
def _upload_clean(self, web_client, cookies, name="lc"):
c = web_client.post(
"/api/store/entities",
files={"file": ("s.zip", _make_skill_zip(name), "application/zip")},
data={"type": "skill"}, cookies=cookies,
)
assert c.status_code == 201, c.text
return c.json()["id"]
def test_archive_surfaces_via_entity_visibility_join(self, web_client):
"""Verdict on submission stays immutable; archived chip surfaces the row
because store_entities.visibility_status flipped to 'archived'.
Locks in the JOIN-based architecture replacing the prior denormalization."""
from src.repositories.store_submissions import StoreSubmissionsRepository
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="lc-arch")
conn = get_system_db()
sub = StoreSubmissionsRepository(conn).latest_for_entity(eid)
sid = sub["id"]
assert sub["status"] == "approved"
conn.close()
web_client.delete(f"/api/store/entities/{eid}", cookies=owner_cookies)
# Verdict UNCHANGED — that's the whole point: submission.status is the
# forensic record of what was decided at review time, not lifecycle.
conn = get_system_db()
sub = StoreSubmissionsRepository(conn).get(sid)
assert sub["status"] == "approved"
conn.close()
# But the JOIN-based archived chip surfaces it because the entity's
# visibility_status flipped.
_, admin_cookies = _create_admin(web_client)
r = web_client.get(
"/api/admin/store/submissions?status=archived", cookies=admin_cookies,
)
assert r.status_code == 200
names = {s["name"] for s in r.json()["items"]}
assert "lc-arch" in names
def test_archived_chip_surfaces_entity_archived_outside_delete_flow(self, web_client):
"""Regression for the user-reported bug: archived entity didn't show up
in ?status=archived because the prior denormalized field never flipped.
Pre-seed a submission with status='approved' and manually flip the
linked entity to visibility_status='archived' (simulating any code path
that bypasses the soft-delete API), then assert the archived chip
surfaces it."""
from src.repositories.store_entities import StoreEntitiesRepository
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="lc-bypass-archive")
# Bypass the API: flip visibility directly at the repo layer.
conn = get_system_db()
StoreEntitiesRepository(conn).set_visibility(eid, "archived")
conn.close()
_, admin_cookies = _create_admin(web_client)
r = web_client.get(
"/api/admin/store/submissions?status=archived", cookies=admin_cookies,
)
assert r.status_code == 200, r.text
names = {s["name"] for s in r.json()["items"]}
assert "lc-bypass-archive" in names
# And the default queue excludes it.
r = web_client.get("/api/admin/store/submissions", cookies=admin_cookies)
names = {s["name"] for s in r.json()["items"]}
assert "lc-bypass-archive" not in names
def test_hard_delete_marks_submissions_deleted(self, web_client):
from src.repositories.store_submissions import StoreSubmissionsRepository
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="lc-del")
conn = get_system_db()
sub_id = StoreSubmissionsRepository(conn).latest_for_entity(eid)["id"]
conn.close()
_, admin_cookies = _create_admin(web_client)
r = web_client.delete(f"/api/store/entities/{eid}?hard=true", cookies=admin_cookies)
assert r.status_code == 200, r.text
conn = get_system_db()
sub = StoreSubmissionsRepository(conn).get(sub_id)
assert sub["status"] == "deleted"
# entity_id is preserved as a tombstone — the live entity row
# is gone, but keeping the pointer lets the detail page
# resolve the activity timeline by querying audit_log for
# `store_entity:{entity_id}` even after the row is dropped.
assert sub["entity_id"] == eid
conn.close()
def test_default_listing_excludes_archived_and_deleted(self, web_client):
"""Admin queue defaults to actionable rows. Archived + deleted
only surface when the user clicks the dedicated chip."""
from src.repositories.store_submissions import StoreSubmissionsRepository
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid_arch = self._upload_clean(web_client, owner_cookies, name="lc-d-arch")
eid_keep = self._upload_clean(web_client, owner_cookies, name="lc-d-keep")
web_client.delete(f"/api/store/entities/{eid_arch}", cookies=owner_cookies)
_, admin_cookies = _create_admin(web_client)
r = web_client.get("/api/admin/store/submissions", cookies=admin_cookies)
assert r.status_code == 200
names = {s["name"] for s in r.json()["items"]}
assert "lc-d-keep" in names
assert "lc-d-arch" not in names
# Explicit chip surfaces it.
r = web_client.get(
"/api/admin/store/submissions?status=archived", cookies=admin_cookies,
)
names = {s["name"] for s in r.json()["items"]}
assert names == {"lc-d-arch"}
def test_deleted_chip_surfaces_hard_deleted(self, web_client):
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="lc-d-hard")
_, admin_cookies = _create_admin(web_client)
web_client.delete(f"/api/store/entities/{eid}?hard=true", cookies=admin_cookies)
r = web_client.get(
"/api/admin/store/submissions?status=deleted", cookies=admin_cookies,
)
names = {s["name"] for s in r.json()["items"]}
assert "lc-d-hard" in names
def test_deleted_submission_detail_renders_timeline(self, web_client):
"""Regression: hard-deleted submissions used to lose their activity
timeline because mark_deleted_for_entity nulled entity_id, severing
the audit_log linkage. Tombstone semantics: entity_id is preserved
post-delete so `store_entity:{entity_id}` audits keep resolving."""
from src.repositories.store_submissions import StoreSubmissionsRepository
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="lc-d-timeline")
conn = get_system_db()
sub_id = StoreSubmissionsRepository(conn).latest_for_entity(eid)["id"]
conn.close()
_, admin_cookies = _create_admin(web_client)
r = web_client.delete(f"/api/store/entities/{eid}?hard=true", cookies=admin_cookies)
assert r.status_code == 200, r.text
# Detail page must render and include at least one audit row
# (creation events scoped to store_entity:{eid} would otherwise
# be invisible after the entity_id link was nulled).
r = web_client.get(
f"/admin/store/submissions/{sub_id}", cookies=admin_cookies,
)
assert r.status_code == 200
# Sanity: the deleted submission's body references the original
# entity_id (the tombstone), proving the linkage survives.
assert eid in r.text
# ---------------------------------------------------------------------------
# Coverage gap fills (post-pre-PR audit)
# ---------------------------------------------------------------------------
class TestFleaDetailSubmissionStatusField:
"""The quarantine banner's auto-refresh JS polls
`/api/marketplace/flea/{id}/detail` and reloads when
`submission_status` flips off the pending verdicts. Visibility
alone is insufficient because `blocked_llm` keeps the entity at
`visibility_status='pending'`. This class locks in the contract
that the field is populated for owner/admin only."""
def _upload_clean(self, web_client, cookies, name="dst"):
c = web_client.post(
"/api/store/entities",
files={"file": ("s.zip", _make_skill_zip(name), "application/zip")},
data={"type": "skill"}, cookies=cookies,
)
assert c.status_code == 201, c.text
return c.json()["id"]
def test_owner_sees_submission_status(self, web_client):
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="dst-own")
r = web_client.get(
f"/api/marketplace/flea/{eid}/detail", cookies=owner_cookies,
)
assert r.status_code == 200
body = r.json()
# Verdict landed by the time create returned 201 (clean upload skips
# LLM since fixtures don't carry a real api key) — value is whatever
# status the runner left; just assert the field is populated.
assert body.get("submission_status") is not None
def test_admin_sees_submission_status_for_any_entity(self, web_client):
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="dst-adm")
_, admin_cookies = _create_admin(web_client)
r = web_client.get(
f"/api/marketplace/flea/{eid}/detail", cookies=admin_cookies,
)
assert r.status_code == 200
assert r.json().get("submission_status") is not None
def test_other_user_does_not_see_submission_status(self, web_client):
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="dst-other")
# Non-owner non-admin can hit the endpoint when entity is approved
# (404 otherwise per the visibility gate). Field must stay null.
_, viewer_cookies = _create_user(web_client, "viewer@x.com")
r = web_client.get(
f"/api/marketplace/flea/{eid}/detail", cookies=viewer_cookies,
)
if r.status_code == 200:
assert r.json().get("submission_status") is None
# If the entity isn't approved (404 for non-owner non-admin),
# the leak isn't reachable either way — also acceptable.
class TestDetailPageEntityLifecycleRow:
"""The submission detail page renders Status (verdict) and Entity
lifecycle side by side so admins see the verdict-vs-lifecycle
distinction at a glance. Locks in the row's presence so a future
template refactor can't silently drop it."""
def _upload_clean(self, web_client, cookies, name="elr"):
c = web_client.post(
"/api/store/entities",
files={"file": ("s.zip", _make_skill_zip(name), "application/zip")},
data={"type": "skill"}, cookies=cookies,
)
assert c.status_code == 201, c.text
return c.json()["id"]
def test_detail_renders_entity_lifecycle_row(self, web_client):
from src.repositories.store_submissions import StoreSubmissionsRepository
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="elr-1")
conn = get_system_db()
sub_id = StoreSubmissionsRepository(conn).latest_for_entity(eid)["id"]
conn.close()
_, admin_cookies = _create_admin(web_client)
r = web_client.get(
f"/admin/store/submissions/{sub_id}", cookies=admin_cookies,
)
assert r.status_code == 200
# Both labels must coexist on the page.
assert "Status (verdict)" in r.text
assert "Entity lifecycle" in r.text
class TestAuditLogResourcePrefix:
"""Locks in the contract that submission-event audits land at
resources the activity-timeline query knows how to find. Two
paths emit them:
* `app/api/store.py:_audit` helper hardcodes `store_entity:`
prefix — submission events written this way live at
`store_entity:{sub_id}`. Timeline query covers it.
* `src/store_guardrails/runner.py` uses
`store_submission:{sub_id}` — the post-fix convention.
Timeline query covers it.
Either format must surface in the rendered detail page so admins
can audit the lifecycle of any submission."""
def _upload_clean(self, web_client, cookies, name="aud"):
c = web_client.post(
"/api/store/entities",
files={"file": ("s.zip", _make_skill_zip(name), "application/zip")},
data={"type": "skill"}, cookies=cookies,
)
assert c.status_code == 201, c.text
return c.json()["id"]
def test_helper_emitted_audits_surface_in_timeline(self, web_client):
"""The `_audit` helper writes resource=`store_entity:{sub_id}`
for submission events. Timeline query must include that
pattern so `store.submission.accepted` (or `.approved`) rows
are visible on the detail page."""
from src.repositories.store_submissions import StoreSubmissionsRepository
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="aud-helper")
conn = get_system_db()
sub_id = StoreSubmissionsRepository(conn).latest_for_entity(eid)["id"]
rows = conn.execute(
"""SELECT resource, action FROM audit_log
WHERE resource = ?
AND action LIKE 'store.submission.%'""",
[f"store_entity:{sub_id}"],
).fetchall()
conn.close()
assert rows, (
"expected at least one store.submission.* audit row at "
"resource=store_entity:{sub_id} — the helper format the "
"timeline query relies on"
)
# And the rendered timeline must include the action.
_, admin_cookies = _create_admin(web_client)
r = web_client.get(
f"/admin/store/submissions/{sub_id}", cookies=admin_cookies,
)
assert r.status_code == 200
# Either accepted (guardrails on) or approved (guardrails off);
# both appear in rows returned by the timeline query when the
# query covers the helper's `store_entity:` resource pattern.
assert any(a in r.text for a in (
"store.submission.accepted",
"store.submission.approved",
))
def test_runner_audit_uses_prefixed_resource(self, monkeypatch, web_client):
"""runner.py's audit calls must use `store_submission:{id}` —
we drive that path by faking a config-loader failure inside
run_llm_review so the runner emits a `review_error` audit
synchronously."""
from src.repositories.store_submissions import StoreSubmissionsRepository
from src.store_guardrails import runner as runner_mod
_, owner_cookies = _create_user(web_client, "owner@x.com")
eid = self._upload_clean(web_client, owner_cookies, name="aud-runner")
conn = get_system_db()
sub_id = StoreSubmissionsRepository(conn).latest_for_entity(eid)["id"]
plugin_dir = Path(get_store_dir()) / eid / "plugin"
conn.close()
def boom() -> str:
raise RuntimeError("missing api key for test")
runner_mod.run_llm_review(
submission_id=sub_id,
plugin_dir=plugin_dir,
conn_factory=get_system_db,
api_key_loader=boom,
model_loader=lambda: "claude-haiku-4-5-20251001",
)
conn = get_system_db()
rows = conn.execute(
"""SELECT resource, action FROM audit_log
WHERE resource = ?
AND action = 'store.submission.review_error'""",
[f"store_submission:{sub_id}"],
).fetchall()
conn.close()
assert rows, (
"runner.py must emit prefixed store_submission:{id} so the "
"timeline query resolves it — bare-id format is legacy only"
)