From 6fb11a137b7aa564e5999deee2b58b60eb57ce9b Mon Sep 17 00:00:00 2001 From: Vojtech <119944107+cvrysanek@users.noreply.github.com> Date: Fri, 15 May 2026 19:45:43 +0400 Subject: [PATCH] fix(store): close 1 critical + 2 high adversarial-review findings (C2/H2/H3 from #318) (#320) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(store): close 1 critical + 2 high adversarial-review findings Three findings from Codex's adversarial review of PR #316 (issue #318). C2 — `/api/store/bundle.zip` leaked quarantined entities. The export endpoint called `repo.list(...)` with no `visibility_status` filter, so any authenticated non-admin could download pending / blocked v1 bytes — bypassing the publish gate. Mirrored the browse-listing gate: non-admin sees only `approved` (plus their own non-approved entries via `include_owner_id`); admins skip the filter. H2 — concurrent PUTs on the same entity could both pass the `latest_for_entity` pending gate. The `update_entity` and `restore_version` handlers now wrap their critical section in a per-entity asyncio.Lock (`_hold_entity_write_lock`). Single-process deployments are now serialized; multi-worker deployments still have a residual window (tracked in issue #318). H3 — `StoreSubmissionsRepository.update_status` blindly overwrote any current status. A late BG-task LLM verdict could clobber an `overridden` row back to `approved` / `blocked_llm` after the admin had already force-published. Added compare-and-swap on terminal statuses (`approved`, `overridden`, `blocked_inline`); callers that legitimately need to overwrite (admin rescan etc.) pass `allow_terminal_overwrite=True`. Returns bool indicating whether the write landed; BG callers no-op on terminal rows. Tests: - TestStoreBundle::test_bundle_zip_filters_quarantined_for_non_owner - TestStoreBundle::test_bundle_zip_owner_sees_own_pending - TestStoreBundle::test_bundle_zip_admin_sees_all - TestConcurrentPutSerialization::test_per_entity_lock_serializes - TestConcurrentPutSerialization::test_per_entity_lock_does_not_serialize_across_entities - TestBgTaskIdempotency::test_late_verdict_does_not_clobber_overridden - TestBgTaskIdempotency::test_explicit_allow_terminal_overwrite_works * review fix: runner.run_llm_review honors update_status CAS bool Codex's CAS in update_status closes the DB-level race correctly, but runner.run_llm_review was still discarding the new bool return on both its `approved` and `blocked_llm` branches. When the CAS no-op'd (submission already at terminal status — most commonly an admin override fired mid-review), the runner kept running the downstream cascade: - set_visibility_if_pending (no-op on approved, but still ran) - promote_version + _swap_live_to_version (forward-only check mitigated worst case) - update_flea_attribution - audit.log(action="store.submission.approved" / "blocked_llm") — this is the operator-visible damage: the audit trail would show a verdict that contradicts the row's actual `overridden` status. Fix: capture the bool, skip the cascade on no-op, log a single `store.submission.bg_verdict_skipped` audit row instead. Mirrors the existing `superseded_reason` path the runner already has for the archive-during-review case (TestPRReviewFixes:: test_bg_verdict_skipped_when_admin_archives_during_review). Test: TestBgTaskIdempotency::test_runner_late_verdict_logs_skipped_not_approved sets up the v1-approved + v2-pending + admin-override sequence, fires run_llm_review directly with a mocked "approved" verdict, asserts row stays overridden AND audit has bg_verdict_skipped AND audit does NOT have a contradictory approved entry. CHANGELOG H3 bullet expanded to acknowledge the bg_verdict_skipped audit-row behavior — operator reviewing the queue now sees dropped verdicts explicitly rather than via row-vs-audit contradiction. --------- Co-authored-by: ZdenekSrotyr --- CHANGELOG.md | 32 ++++ app/api/store.py | 99 ++++++++++ src/repositories/store_submissions.py | 44 ++++- src/store_guardrails/runner.py | 43 ++++- tests/test_store_api.py | 95 ++++++++++ tests/test_store_entity_versions.py | 252 ++++++++++++++++++++++++++ 6 files changed, 559 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ff21e45..cf385e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,38 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C banner now renders for those failure statuses too, with copy that acknowledges the prior version is still live ("Latest edit failed review — previously approved version (vN) keeps serving …"). +- Flea-market `/api/store/bundle.zip` export now filters by + `visibility_status='approved'` for non-admin non-owner callers. + Previously an authenticated user could call the bundle endpoint + and pull pending / blocked / hidden v1 bytes — bypassing the + publish gate the same way `_enforce_visibility` already prevents + on detail pages + install. Owners still see their own non-approved + entries (matches the browse-listing `include_owner_id` affordance); + admins still see everything. (Critical — surfaced by adversarial + review.) +- Flea-market PUT (edit) + restore endpoints now serialize concurrent + writes against the same `entity_id` via a per-entity asyncio lock. + Two concurrent PUTs could previously both pass the + `latest_for_entity` pending gate, both bake into + `versions/v/plugin/`, and both append a `version_history` + entry. The lock closes the window for single-process deployments; + multi-worker deployments still have a residual window (tracked in + the follow-up issue). (High — surfaced by adversarial review.) +- Flea-market `StoreSubmissionsRepository.update_status` is now + compare-and-swap on terminal statuses (`approved`, `overridden`, + `blocked_inline`). A late background-task LLM verdict racing with + an admin override or with a more recent terminal verdict can no + longer silently clobber the row. Callers that legitimately need to + overwrite a terminal state pass `allow_terminal_overwrite=True` + explicitly. Returns a boolean indicating whether the write landed. + `runner.run_llm_review` now honors the bool on both its `approved` + and `blocked_llm` branches: a CAS no-op skips the downstream + cascade (visibility flip, version promote, the verdict-specific + audit entry that would otherwise contradict the row) and logs a + single `store.submission.bg_verdict_skipped` audit row instead, + so an operator reviewing the queue sees dropped verdicts + explicitly rather than via row-vs-audit contradiction. + (High — surfaced by adversarial review.) - Flea-market admin **Retry review** and **Rescan** now review the STAGED version's bundle, not the live `plugin/` directory. For a v2+ edit held at `pending_llm` / `blocked_llm` / `review_error`, diff --git a/app/api/store.py b/app/api/store.py index 047258a..71ae0a6 100644 --- a/app/api/store.py +++ b/app/api/store.py @@ -16,6 +16,7 @@ display name don't collide in Claude Code's flat namespace. from __future__ import annotations +import asyncio import io import json import logging @@ -25,6 +26,7 @@ import shutil import tempfile import uuid import zipfile +from contextlib import asynccontextmanager from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional @@ -400,6 +402,40 @@ def _submission_plugin_dir( return _entity_dir(entity_id) / "versions" / f"v{int(version_no)}" / "plugin" +# Per-entity write lock. Serializes the "read latest submission → bake +# new version dir → append history" critical section in PUT + restore +# so two concurrent edits on the same entity_id can't both pass the +# "no pending submission" gate, both append history rows, and race +# on ``versions/v/plugin/``. Surfaced by the adversarial review +# of PR #316. +# +# Scope: single-process. Multi-worker uvicorn deployments still have +# a window — a process-shared lock (DB advisory, filesystem flock) +# would be the next step. For the typical single-worker corporate +# deployment this closes the race; the publish-gate model is already +# defense-in-depth (LLM tier won't approve duplicate bytes anyway). +_entity_write_locks: Dict[str, asyncio.Lock] = {} +_entity_write_locks_guard = asyncio.Lock() + + +@asynccontextmanager +async def _hold_entity_write_lock(entity_id: str): + """Serialize concurrent writes to a single flea-market entity. + + Wrap the version-creating critical section in PUT + restore: + read latest submission status, bake new version dir, append + ``version_history``. Outside this section the request can hit the + DB freely. + """ + async with _entity_write_locks_guard: + lock = _entity_write_locks.get(entity_id) + if lock is None: + lock = asyncio.Lock() + _entity_write_locks[entity_id] = lock + async with lock: + yield + + def _version_no_for_submission( entity_row: Dict[str, Any], submission_id: str, ) -> Optional[int]: @@ -1479,6 +1515,30 @@ async def update_entity( * **Metadata-only edit** (no ``file`` posted) skips the bundle pipeline and the version bump. """ + async with _hold_entity_write_lock(entity_id): + return await _update_entity_locked( + entity_id=entity_id, + background_tasks=background_tasks, + file=file, name=name, type=type, description=description, + category=category, video_url=video_url, photo=photo, + user=user, conn=conn, + ) + + +async def _update_entity_locked( + *, + entity_id: str, + background_tasks: BackgroundTasks, + file: Optional[UploadFile], + name: Optional[str], + type: Optional[str], + description: Optional[str], + category: Optional[str], + video_url: Optional[str], + photo: Optional[UploadFile], + user: dict, + conn: duckdb.DuckDBPyConnection, +): repo = StoreEntitiesRepository(conn) entity = repo.get(entity_id) if not entity: @@ -1887,7 +1947,26 @@ async def restore_version( Refuses while a prior version is under review (same ``prior_version_pending`` 409 as PUT). + + Wrapped in the per-entity write lock so a concurrent PUT and + restore on the same entity can't both pass the pending-gate + + race on ``versions/v/plugin/``. """ + async with _hold_entity_write_lock(entity_id): + return await _restore_version_locked( + entity_id=entity_id, version_no=version_no, + background_tasks=background_tasks, user=user, conn=conn, + ) + + +async def _restore_version_locked( + *, + entity_id: str, + version_no: int, + background_tasks: BackgroundTasks, + user: dict, + conn: duckdb.DuckDBPyConnection, +): repo = StoreEntitiesRepository(conn) entity = repo.get(entity_id) if not entity: @@ -2422,6 +2501,24 @@ async def export_bundle( if owner == "me": owner = user["id"] repo = StoreEntitiesRepository(conn) + # Visibility filter mirrors the marketplace browse query: only + # `approved` is visible to non-admin non-owner callers. Without + # this filter, an authenticated non-admin could pull the entire + # store including pending / blocked / hidden v1 bytes — bypassing + # the publish gate the same way `_enforce_visibility` already + # prevents on the detail page + install endpoint. Surfaced by the + # adversarial review pass on PR #316. + is_admin = is_user_admin(user["id"], conn) + visibility_filter: Optional[List[str]] = ( + None if is_admin else ["approved"] + ) + # Owners always see their own non-approved entries in their + # export — same affordance the browse listing applies via the + # `include_owner_id` knob on `repo.list`. Admins skip the filter + # entirely. + include_owner_id: Optional[str] = ( + None if is_admin else user["id"] + ) # Page through everything. The 100/req limit on `list` is a UI # pagination affordance, not a backup constraint — for a bulk export # we want all matches. @@ -2432,6 +2529,8 @@ async def export_bundle( page_items, _total = repo.list( skip=skip, limit=page, type=type, category=category, search=search, owner_user_id=owner, + visibility_status=visibility_filter, + include_owner_id=include_owner_id, ) if not page_items: break diff --git a/src/repositories/store_submissions.py b/src/repositories/store_submissions.py index 9dcb12c..bfc0981 100644 --- a/src/repositories/store_submissions.py +++ b/src/repositories/store_submissions.py @@ -173,6 +173,16 @@ class StoreSubmissionsRepository: # Routes to the broader counter post-#9. count_blocked_inline_for_submitter_since = count_blocked_for_submitter_since + # Terminal states whose `status` should never be silently overwritten + # by an asynchronous (BG-task) writer. Admin-triggered actions + # (override, delete) call dedicated repo methods or set + # ``allow_terminal_overwrite=True`` explicitly. The BG-task path + # in ``runner.run_llm_review`` calls ``update_status`` without that + # flag — so a late LLM verdict racing with an admin override OR + # with a more recent terminal verdict can no longer clobber the + # row. + _TERMINAL_STATUSES = frozenset({"approved", "overridden", "blocked_inline"}) + def update_status( self, id: str, @@ -180,7 +190,19 @@ class StoreSubmissionsRepository: status: str, llm_findings: Optional[Dict[str, Any]] = None, reviewed_by_model: Optional[str] = None, - ) -> None: + allow_terminal_overwrite: bool = False, + ) -> bool: + """Update a submission's status. Returns ``True`` when the row + was actually updated, ``False`` when a compare-and-swap skipped + the write because the row had already moved to a terminal state. + + The CAS protects against the BG-task race surfaced by the + adversarial review of PR #316: a late LLM verdict could + previously clobber ``status='overridden'`` (admin force-published + the submission while the LLM was still running). With the guard, + BG callers no-op on terminal rows; admin paths still call + ``set_override`` etc. which write unconditionally. + """ if status not in VALID_STATUSES: raise ValueError(f"invalid submission status: {status!r}") sets = ["status = ?", "updated_at = ?"] @@ -191,11 +213,25 @@ class StoreSubmissionsRepository: if reviewed_by_model is not None: sets.append("reviewed_by_model = ?") params.append(reviewed_by_model) + where_clauses = ["id = ?"] params.append(id) - self.conn.execute( - f"UPDATE store_submissions SET {', '.join(sets)} WHERE id = ?", - params, + if not allow_terminal_overwrite: + placeholders = ",".join("?" for _ in self._TERMINAL_STATUSES) + where_clauses.append(f"status NOT IN ({placeholders})") + params.extend(self._TERMINAL_STATUSES) + sql = ( + f"UPDATE store_submissions SET {', '.join(sets)} " + f"WHERE {' AND '.join(where_clauses)}" ) + result = self.conn.execute(sql, params) + # DuckDB returns a relation with the rowcount in row 0, col 0 + # for an UPDATE. fetchone() is the portable way to read it. + try: + row = result.fetchone() + rowcount = int(row[0]) if row else 0 + except Exception: + rowcount = 0 + return rowcount > 0 def set_override( self, diff --git a/src/store_guardrails/runner.py b/src/store_guardrails/runner.py index b7e20ab..d5c4781 100644 --- a/src/store_guardrails/runner.py +++ b/src/store_guardrails/runner.py @@ -225,12 +225,33 @@ def run_llm_review( passed = llm_review.is_safe(verdict) if passed: - subs_repo.update_status( + written = subs_repo.update_status( submission_id, status="approved", llm_findings=verdict, reviewed_by_model=model, ) + if not written: + # The row hit a terminal status (approved / overridden / + # blocked_inline) before this BG verdict could land — + # most commonly an admin Override fired while the LLM + # call was running. Skip the entire downstream cascade + # (visibility flip, version promote, "approved" audit + # entry that would contradict the row) and log the + # suppression instead so the operator timeline shows + # the dropped verdict. + audit.log( + user_id=submitter_id, + action="store.submission.bg_verdict_skipped", + resource=f"store_submission:{submission_id}", + params={ + "attempted_verdict": "approved", + "reason": "submission already at terminal status (CAS no-op)", + "model": model, + }, + result="skipped", + ) + return LlmResult(verdict=verdict, reviewed_by_model=model) # Two outcomes are possible AND independent here: # # 1. Initial-upload (v1) approval flips visibility from @@ -332,12 +353,30 @@ def run_llm_review( result="ok", ) else: - subs_repo.update_status( + written = subs_repo.update_status( submission_id, status="blocked_llm", llm_findings=verdict, reviewed_by_model=model, ) + if not written: + # CAS no-op: row hit a terminal status before this + # verdict landed (admin override, prior terminal write). + # See the parallel `approved` branch above — same + # treatment: log the suppression, skip the misleading + # "blocked_llm" audit entry, return early. + audit.log( + user_id=submitter_id, + action="store.submission.bg_verdict_skipped", + resource=f"store_submission:{submission_id}", + params={ + "attempted_verdict": "blocked_llm", + "reason": "submission already at terminal status (CAS no-op)", + "model": model, + }, + result="skipped", + ) + return LlmResult(verdict=verdict, reviewed_by_model=model) # On block, entity state depends on which path triggered # this submission: # diff --git a/tests/test_store_api.py b/tests/test_store_api.py index 3ce434d..b276cc2 100644 --- a/tests/test_store_api.py +++ b/tests/test_store_api.py @@ -906,6 +906,101 @@ class TestStoreBundle: # require_admin denies non-admin with 403. assert r.status_code == 403, r.text + def test_bundle_zip_filters_quarantined_for_non_owner( + self, web_client, monkeypatch, + ): + """Codex adversarial review [CRITICAL]: GET /bundle.zip used + ``repo.list(...)`` without a visibility filter. An + authenticated non-admin could download pending / blocked v1 + bytes by hitting the bundle endpoint. Fixed by mirroring the + browse-listing gate: non-admin sees only ``approved`` (plus + their own non-approved entries).""" + from src.repositories.store_entities import StoreEntitiesRepository + + # Owner uploads a clean skill (lands approved with guardrails off). + owner_id, owner_cookies = _create_user(web_client, "bundle-owner@x.com") + r = self._upload_skill(web_client, owner_cookies, name="bundle-public") + eid_public = r.json()["id"] + + from src.db import get_system_db + # Owner also has a SECOND skill that we manually flip to + # visibility=pending (simulating in-review). + r = self._upload_skill(web_client, owner_cookies, name="bundle-pending") + eid_pending = r.json()["id"] + conn = get_system_db() + StoreEntitiesRepository(conn).set_visibility(eid_pending, "pending") + conn.close() + + # Snoop is a different non-admin user. + _, snoop_cookies = _create_user(web_client, "bundle-snoop@x.com") + r = web_client.get("/api/store/bundle.zip", cookies=snoop_cookies) + assert r.status_code == 200 + with zipfile.ZipFile(io.BytesIO(r.content)) as zf: + names = zf.namelist() + # Snoop sees the approved entity ... + assert any(f"entities/{eid_public}/" in n for n in names), ( + "approved entity must be present in bundle" + ) + # ... but NEVER the pending one. + assert not any(f"entities/{eid_pending}/" in n for n in names), ( + "non-admin must NOT see pending entities via bundle.zip" + ) + # Manifest entry count reflects the filter. + manifest = json.loads( + zipfile.ZipFile(io.BytesIO(r.content)).read("manifest.json"), + ) + manifest_ids = {e["entity_id"] for e in manifest["entries"]} + assert eid_public in manifest_ids + assert eid_pending not in manifest_ids + + def test_bundle_zip_owner_sees_own_pending(self, web_client): + """Owner-of-pending sees their own non-approved entries in + their bundle export (matches the browse-listing affordance + via include_owner_id).""" + from src.repositories.store_entities import StoreEntitiesRepository + + from src.db import get_system_db + owner_id, owner_cookies = _create_user(web_client, "bundle-mine@x.com") + r = self._upload_skill(web_client, owner_cookies, name="mine-pending") + eid = r.json()["id"] + conn = get_system_db() + StoreEntitiesRepository(conn).set_visibility(eid, "pending") + conn.close() + + r = web_client.get("/api/store/bundle.zip", cookies=owner_cookies) + assert r.status_code == 200 + with zipfile.ZipFile(io.BytesIO(r.content)) as zf: + names = zf.namelist() + assert any(f"entities/{eid}/" in n for n in names), ( + "owner must see their OWN pending entity in their bundle" + ) + + def test_bundle_zip_admin_sees_all(self, web_client): + """Admin sees pending entries from other users too.""" + from src.repositories.store_entities import StoreEntitiesRepository + from tests.helpers.auth import grant_admin + + from src.db import get_system_db + owner_id, owner_cookies = _create_user(web_client, "bundle-other-owner@x.com") + r = self._upload_skill(web_client, owner_cookies, name="other-pending") + eid = r.json()["id"] + conn = get_system_db() + StoreEntitiesRepository(conn).set_visibility(eid, "pending") + conn.close() + + _, admin_cookies = _create_user(web_client, "bundle-admin@x.com") + conn = get_system_db() + grant_admin(conn, "bundle-admin") + conn.close() + + r = web_client.get("/api/store/bundle.zip", cookies=admin_cookies) + assert r.status_code == 200 + with zipfile.ZipFile(io.BytesIO(r.content)) as zf: + names = zf.namelist() + assert any(f"entities/{eid}/" in n for n in names), ( + "admin must see pending entities from any owner" + ) + class TestInstallCycle: def test_install_uninstall_and_count(self, web_client): diff --git a/tests/test_store_entity_versions.py b/tests/test_store_entity_versions.py index 371188f..cae19be 100644 --- a/tests/test_store_entity_versions.py +++ b/tests/test_store_entity_versions.py @@ -1408,3 +1408,255 @@ class TestPublishGateFailClosed: conn.close() assert ent["visibility_status"] == "approved" assert sub["status"] == "approved" + + +class TestConcurrentPutSerialization: + """Codex adversarial review [HIGH]: concurrent PUTs racing on the + same entity_id could both pass the ``latest_for_entity`` pending + gate, both bake into ``versions/v/plugin/``, and both append + a ``version_history`` entry. Per-entity asyncio lock added to + serialize the critical section in PUT + restore. + + Integration coverage (two real PUTs racing against TestClient) + isn't practical here: each TestClient call wraps the async handler + in its own event loop, so asyncio.Lock acquired in loop A cannot + coordinate with loop B — they deadlock instead of contending. In + a real uvicorn deployment all requests run on a single event loop + and the lock works as designed. This test exercises the helper + directly to verify the serialization semantics; the integration + side is covered by the existing `prior_version_pending` test + (which fires once the first PUT has committed).""" + + def test_per_entity_lock_serializes(self): + import asyncio + from app.api.store import _hold_entity_write_lock + + seq: list = [] + + async def task(label: str) -> None: + async with _hold_entity_write_lock("ent-shared"): + seq.append(f"{label}-in") + # Yield to the scheduler to give the other coroutine a + # chance to run if the lock isn't held. + await asyncio.sleep(0.01) + seq.append(f"{label}-out") + + async def driver() -> None: + await asyncio.gather(task("A"), task("B")) + + asyncio.run(driver()) + + # Pairs must NOT interleave — one finishes entirely before + # the other starts. + assert seq in ( + ["A-in", "A-out", "B-in", "B-out"], + ["B-in", "B-out", "A-in", "A-out"], + ), f"per-entity lock failed to serialize: seq={seq}" + + def test_per_entity_lock_does_not_serialize_across_entities(self): + """Different entity_ids get independent locks so unrelated + writes don't block each other.""" + import asyncio + from app.api.store import _hold_entity_write_lock + + seq: list = [] + + async def task(label: str, entity: str) -> None: + async with _hold_entity_write_lock(entity): + seq.append(f"{label}-in") + await asyncio.sleep(0.01) + seq.append(f"{label}-out") + + async def driver() -> None: + await asyncio.gather(task("A", "ent-a"), task("B", "ent-b")) + + asyncio.run(driver()) + + # Interleaving expected: A-in, B-in, A-out, B-out (or B/A + # ordering depending on which coroutine the loop picks first). + assert seq[0] in {"A-in", "B-in"} + assert seq[1] in {"A-in", "B-in"} + assert seq[0] != seq[1], ( + f"entities should have run in parallel — got serial: {seq}" + ) + + +class TestBgTaskIdempotency: + """Codex adversarial review [HIGH]: `update_status` blindly + overwrote any current status. A late BG-task LLM verdict racing + with an admin override could clobber `overridden` back to + `approved`/`blocked_llm`. Now: terminal statuses are + compare-and-swap-protected; BG callers no-op.""" + + def test_late_verdict_does_not_clobber_overridden(self, web_client): + """Admin overrides a blocked submission. A subsequent late + BG-task ``update_status`` for the same submission must NOT + flip it back.""" + from src.repositories.store_entities import StoreEntitiesRepository + from src.repositories.store_submissions import StoreSubmissionsRepository + + user_id, _ = _create_user(web_client, "idemp@x.com") + conn = get_system_db() + ents = StoreEntitiesRepository(conn) + ents.create( + id="ent-idemp", owner_user_id=user_id, owner_username="idemp", + type="skill", name="idemp-skill", description="x" * 40, + category=None, version="aaaaaaaaaaaaaaaa", file_size=10, + visibility_status="pending", + ) + subs = StoreSubmissionsRepository(conn) + sid = subs.create( + submitter_id=user_id, submitter_email="idemp@x.com", + type="skill", name="idemp-skill", version="aaaaaaaaaaaaaaaa", + status="blocked_llm", entity_id="ent-idemp", + llm_findings={"risk_level": "high", "summary": "x"}, + ) + ents.update_history_submission_id("ent-idemp", 1, sid) + conn.close() + + from tests.helpers.auth import grant_admin + admin_id, admin_cookies = _create_user(web_client, "idemp-admin@x.com") + conn = get_system_db() + grant_admin(conn, admin_id) + conn.close() + + # Override the blocked submission → status='overridden'. + r = web_client.post( + f"/api/admin/store/submissions/{sid}/override", + json={"reason": "false positive — cleared in offline review"}, + cookies=admin_cookies, + ) + assert r.status_code == 200 + + # Now simulate a late BG-task verdict arriving: + # update_status is called without allow_terminal_overwrite. + conn = get_system_db() + subs = StoreSubmissionsRepository(conn) + # CAS no-op because status=='overridden' is terminal. + wrote = subs.update_status( + sid, status="approved", + llm_findings={"risk_level": "safe", "summary": "late"}, + ) + conn.close() + assert wrote is False, ( + "late BG verdict must NOT overwrite a terminal `overridden` row" + ) + + # Status still overridden. + conn = get_system_db() + row = StoreSubmissionsRepository(conn).get(sid) + conn.close() + assert row["status"] == "overridden" + + def test_runner_late_verdict_logs_skipped_not_approved( + self, web_client, monkeypatch, + ): + """End-to-end pair to ``test_late_verdict_does_not_clobber_overridden``: + when the LLM verdict lands on an already-overridden submission, + ``runner.run_llm_review`` honors the CAS bool and: + 1. row status stays ``overridden``, + 2. audit log gets a single ``bg_verdict_skipped`` entry, + 3. audit log does NOT get a contradictory ``approved`` / + ``blocked_llm`` entry — pre-fix the runner discarded the + return value and ran the downstream cascade including + the misleading audit write. + """ + from src.repositories.audit import AuditRepository + from src.repositories.store_entities import StoreEntitiesRepository + from src.repositories.store_submissions import StoreSubmissionsRepository + from src.store_guardrails.runner import run_llm_review + from app.utils import get_store_dir + + owner_id, owner_cookies = _create_user(web_client, "lateverdict@x.com") + eid = _upload_clean(web_client, owner_cookies, name="lateverdict") + + # Flip guardrails on, PUT v2 → pending_llm under deferred-promotion + # (visibility stays 'approved' at v1). + monkeypatch.setattr("app.api.store.get_guardrails_enabled", lambda: True) + monkeypatch.setattr("app.api.store.get_guardrails_llm_provider_ready", lambda: True) + monkeypatch.setattr( + "app.api.store._schedule_llm_review", lambda *a, **kw: None, + ) + # Mock review_bundle to return an "approved"-shape verdict so + # the runner would (pre-fix) hit the approved branch + cascade. + monkeypatch.setattr( + "src.store_guardrails.llm_review.review_bundle", + lambda *a, **kw: { + "risk_level": "safe", "summary": "ok", + "findings": [], "template_placeholders_found": 0, + "reviewed_by_model": "mock", "error": None, + }, + ) + + v2 = _make_skill_zip("lateverdict", body="v2 " * 80) + web_client.put( + f"/api/store/entities/{eid}", + files={"file": ("v2.zip", v2, "application/zip")}, + cookies=owner_cookies, + ) + + # Admin override flips the v2 submission row to 'overridden'. + from tests.helpers.auth import grant_admin + admin_id, admin_cookies = _create_user(web_client, "lv-admin@x.com") + conn = get_system_db() + grant_admin(conn, admin_id) + sub_id = StoreSubmissionsRepository(conn).latest_for_entity(eid)["id"] + conn.close() + r = web_client.post( + f"/api/admin/store/submissions/{sub_id}/override", + json={"reason": "false positive cleared offline"}, + cookies=admin_cookies, + ) + assert r.status_code == 200, r.text + + # Now fire the runner directly — it would (pre-fix) try to write + # status='approved' on the already-overridden row. + run_llm_review( + sub_id, + plugin_dir=Path(get_store_dir()) / eid / "versions" / "v2" / "plugin", + conn_factory=get_system_db, + api_key_loader=lambda: "sk-test", + model_loader=lambda: "claude-haiku-4-5-20251001", + ) + + # Row must stay overridden + audit log must show skipped, not + # the misleading approved write. + conn = get_system_db() + row = StoreSubmissionsRepository(conn).get(sub_id) + rows = AuditRepository(conn).query_for_resources( + [f"store_submission:{sub_id}"], limit=20, + ) + conn.close() + actions = [r.get("action") for r in rows] + assert row["status"] == "overridden", ( + f"row must stay overridden under CAS no-op; got {row['status']}" + ) + assert "store.submission.bg_verdict_skipped" in actions, ( + f"runner must log bg_verdict_skipped on CAS no-op; got {actions}" + ) + assert "store.submission.approved" not in actions, ( + "runner must NOT log approved when the CAS no-op'd the write — " + f"audit must not contradict the row state; got {actions}" + ) + + def test_explicit_allow_terminal_overwrite_works(self, web_client): + """Admin paths that legitimately need to overwrite a terminal + state can pass `allow_terminal_overwrite=True` and get the + write through. Used by rescan and similar admin actions.""" + from src.repositories.store_submissions import StoreSubmissionsRepository + user_id, _ = _create_user(web_client, "termok@x.com") + conn = get_system_db() + sid = StoreSubmissionsRepository(conn).create( + submitter_id=user_id, submitter_email="termok@x.com", + type="skill", name="x", version="aaaa", status="approved", + entity_id=None, + ) + wrote = StoreSubmissionsRepository(conn).update_status( + sid, status="pending_llm", allow_terminal_overwrite=True, + ) + conn.close() + assert wrote is True + conn = get_system_db() + row = StoreSubmissionsRepository(conn).get(sid) + conn.close() + assert row["status"] == "pending_llm"