fix(security): gate Script-API /run on admin role (#44) (#92)

* fix(security): gate Script-API /run on admin role (#44)

The AST + string-blocklist sandbox in `_execute_script` is defense-in-depth,
not a primary trust boundary. It does not block `vars()`, `type()`, or
`__class__.__bases__` introspection chains, and the string blocklist is
trivially evadable via concatenation/dunder encoding. Treat the role gate
as the actual barrier: only admin can run scripts.

- `POST /api/scripts/run` and `POST /api/scripts/{id}/run` now require admin.
- `POST /api/scripts/deploy` stays analyst-accessible (storing != executing).
- Existing /run tests retargeted to admin_token; added regression tests
  asserting analyst → 403 on both endpoints.
- CHANGELOG: BREAKING (security) bullet under Unreleased/Changed.

Closes #44.

* fix(security): admin-gate /deploy + harden sandbox blocklist (review #92)

Reviewer of PR #92 flagged three MUST-FIXes that #44 wasn't fully closed:

1. /api/scripts/deploy still accepted analyst → planted-script attack
   path (analyst plants malicious source, waits for admin to /run).
   Now: /deploy also requires admin; the entire Script API is admin-only.

2. The "Minimum (same-day)" blocklist mitigations from issue #44 weren't
   applied. Added the introspection-chain dunders that the issue PoC
   pivots through: __subclasses__, __globals__, __class__, __base__,
   __bases__, __mro__, __dict__, __code__, __builtins__. Plus `vars`
   in BLOCKED_FUNCTIONS. Deliberately NOT adding __init__ /
   __getattribute__ (substring match would flag every legit `def __init__`)
   nor `type`/`dir` (frequent in legitimate admin scripts). Documented
   the trade-off inline.

3. Tests didn't cover the actual PoC payload nor non-analyst non-admin
   roles. Added test_run_pwn_payload_blocked parametrized over the issue's
   own PoC + two equivalent variants (lambda+__globals__, __mro__
   traversal); these stay green only as long as the dunder list does.
   test_*_requires_admin tests now parametrize over (analyst, viewer,
   km_admin) so all three non-admin core roles are pinned at 403.

Conftest extension: seeded_app now exposes viewer_token and
km_admin_token as siblings to admin_token / analyst_token.

CHANGELOG bullet updated to reflect /deploy gate change and new
internal regression tests. 35/35 scripts tests pass locally.

Refs review of #92.

* fix(tests): test_security TestScriptSandbox needs admin token after #44 hardening

CI failure on PR #92 caught a missed test file. tests/test_security.py
seeded only an analyst user and used the analyst token to drive sandbox
tests. After the #44 admin-gate (deploy + run both admin-only), every
sandbox test got 403 from the role gate before the AST/string check
could run, so 'blocks os.system' / 'blocks eval' / etc. all failed.

Fix: extend the fixture to also seed an admin user and return the admin
token. Sandbox tests now reach the sandbox layer; access-control tests
further down in the module continue to use the analyst that was kept
around. 41/41 test_security.py tests pass locally.

* fix(security): #92 round-3 — gate GET /api/scripts on admin role

Devin Review caught: GET /api/scripts (app/api/scripts.py:44-51) was
left on Depends(get_current_user) when the rest of the API moved to
admin-only. ScriptRepository.list_all() does SELECT * FROM script_registry
which returns ALL columns including 'source' (the full script body).
So any authenticated user (viewer / analyst / km_admin) could read
admin-deployed scripts — leak of code that may contain credentials,
business logic, or admin-only operational details.

CHANGELOG already says 'The entire Script API is now admin-only',
which was true for /deploy, /run, /{id}/run, DELETE — just not for
GET. Now consistent: every Script endpoint requires admin.

Tests:
- New parametrized test_list_scripts_requires_admin over (analyst,
  viewer, km_admin) tokens — all assert 403.
- Updated test_list_scripts_empty in both test_scripts_api.py and
  test_api_scripts.py to use admin_token.

79 tests pass.

Refs Devin Review of #92.

* fix: cleanup unused imports, stale docstrings, and incomplete CHANGELOG

- Remove unused imports: Path, List, get_current_user (ruff F401)
- Trim docstrings to describe current behavior, not change history
- CHANGELOG now lists GET /api/scripts among admin-gated endpoints
- Remove diff-commenting inline comments from tests

Co-Authored-By: zdenek.srotyr <zdenek.srotyr@keboola.com>

* fix: merge duplicate Changed sections into one per CLAUDE.md convention

Co-Authored-By: zdenek.srotyr <zdenek.srotyr@keboola.com>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
This commit is contained in:
ZdenekSrotyr 2026-04-27 21:13:56 +02:00 committed by GitHub
parent 4e4d2a39e6
commit 24e81fb671
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 224 additions and 97 deletions

View file

@ -13,6 +13,65 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
<!-- Add bullets here. Group: Added / Changed / Fixed / Removed / Internal.
Mark breaking changes with **BREAKING** at the start of the bullet. -->
### Changed
- **BREAKING (security)**: The entire Script API is now **admin-only** (issue #44).
`GET /api/scripts`, `POST /api/scripts/deploy`, `POST /api/scripts/run`, and
`POST /api/scripts/{id}/run` all require the admin role; previously the list
endpoint was open to any authenticated user and deploy/run were analyst-accessible.
Two reasons: (1) the AST + string-blocklist sandbox in `_execute_script` is
defense-in-depth and known to be bypassable through introspection chains
(`__class__.__base__.__subclasses__()`, `__globals__['__builtins__']`,
`__mro__` traversal — the dunder pattern list was tightened in this PR but
the policy is "the role gate is the trust boundary, not the blocklist");
(2) gating only `/run` left a planted-script attack open — an analyst could
deploy a malicious script and wait for an admin to run it. Operators who
need scripted workflows for non-admin users should run them on the user's
behalf or expose the relevant data via the read-only `/api/data` surface
instead.
- **BREAKING (ops)**: Generic ops scripts moved out of the customer-named
`scripts/grpn/` directory into `scripts/ops/` as part of the OSS
vendor-neutralization (issue #88):
- `scripts/grpn/agnes-tls-rotate.sh``scripts/ops/agnes-tls-rotate.sh`
- `scripts/grpn/agnes-auto-upgrade.sh``scripts/ops/agnes-auto-upgrade.sh`
Downstream consumer infra repos that copy these scripts onto VMs (e.g. via
their own `startup.sh`) must update the source path. The OSS-shipped
`infra/modules/customer-instance/` Terraform module is unaffected — it
embeds equivalent logic inline via heredoc and does not source-by-path
from `scripts/`. Script behaviour and env vars are unchanged. Cross-refs
in `README.md`, `CLAUDE.md`, `docs/DEPLOYMENT.md`, `Caddyfile`, and
`docker-compose.yml` were updated.
- **OSS neutralization (wave 2 — code, tests, planning docs)**. Customer
identifiers replaced with placeholders across the codebase to ready the
repo for public release (issue #88):
- **Code docstrings**: `connectors/openmetadata/{client,transformer,enricher}.py`,
`src/catalog_export.py`, `scripts/duckdb_manager.py``prj-grp-…`
`my-bq-project` / `prj-example-1234`, `AIAgent.FoundryAI`
`AIAgent.MyAgent` (in docstrings) / `AIAgent.Example` (in test fixtures),
`FoundryAIDataModel``AnalyticsDataModel`.
- **Test fixtures** in `tests/test_openmetadata_enricher.py`,
`tests/test_duckdb_manager.py`, `tests/test_catalog_export.py`,
`tests/test_openmetadata_transformer.py` — same set of replacements,
behaviour-preserving (157 tests still green).
- **Terraform module** `infra/modules/customer-instance/variables.tf`:
`customer_name` description rewritten in English, examples switched
from `keboola, grpn` to `acme, example`.
- **Workflow** `.github/workflows/keboola-deploy.yml`: comment "Groupon-side
dev VMs" → generic "per-developer dev VMs".
- **Caddyfile**: TLS-rotation cross-ref updated to `scripts/ops/…` and
Keboola-specific aside removed.
- **Auth docs** `docs/auth-groups.md` and the OAuth probe in
`scripts/debug/probe_google_groups.py`: GCP project name `kids-ai-data-analysis`
replaced with placeholder `acme-internal-prod`.
- **Planning docs** under `docs/superpowers/plans/` and `…/specs/`: the
five hackathon-era documents (`2026-04-21-deployment-log.md`,
`…-multi-customer-deployment.md`, `…-issues-14-and-10.md`,
`…-hackathon-dry-run.md`, the spec) had `34.77.94.14` / `34.77.102.61`
replaced with `<dev-vm-ip>` / `<prod-vm-ip>`, `Groupon`/`GRPN`/`grpn`
with `Acme`/`another-customer`, and `prj-grp-…` with `prj-example-…`.
### Fixed
- **BREAKING (security CRITICAL)**: Jira webhook handler is now
@ -46,52 +105,6 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
clips length to 64 chars, and routes the final filename through
`safe_join_under`.
### Changed
- **BREAKING (ops)**: Generic ops scripts moved out of the customer-named
`scripts/grpn/` directory into `scripts/ops/` as part of the OSS
vendor-neutralization (issue #88):
- `scripts/grpn/agnes-tls-rotate.sh``scripts/ops/agnes-tls-rotate.sh`
- `scripts/grpn/agnes-auto-upgrade.sh``scripts/ops/agnes-auto-upgrade.sh`
Downstream consumer infra repos that copy these scripts onto VMs (e.g. via
their own `startup.sh`) must update the source path. The OSS-shipped
`infra/modules/customer-instance/` Terraform module is unaffected — it
embeds equivalent logic inline via heredoc and does not source-by-path
from `scripts/`. Script behaviour and env vars are unchanged. Cross-refs
in `README.md`, `CLAUDE.md`, `docs/DEPLOYMENT.md`, `Caddyfile`, and
`docker-compose.yml` were updated.
- **OSS neutralization (wave 2 — code, tests, planning docs)**. Customer
identifiers replaced with placeholders across the codebase to ready the
repo for public release (issue #88):
- **Code docstrings**: `connectors/openmetadata/{client,transformer,enricher}.py`,
`src/catalog_export.py`, `scripts/duckdb_manager.py``prj-grp-…`
`my-bq-project` / `prj-example-1234`, `AIAgent.FoundryAI`
`AIAgent.MyAgent` (in docstrings) / `AIAgent.Example` (in test fixtures),
`FoundryAIDataModel``AnalyticsDataModel`.
- **Test fixtures** in `tests/test_openmetadata_enricher.py`,
`tests/test_duckdb_manager.py`, `tests/test_catalog_export.py`,
`tests/test_openmetadata_transformer.py` — same set of replacements,
behaviour-preserving (157 tests still green).
- **Terraform module** `infra/modules/customer-instance/variables.tf`:
`customer_name` description rewritten in English, examples switched
from `keboola, grpn` to `acme, example`.
- **Workflow** `.github/workflows/keboola-deploy.yml`: comment "Groupon-side
dev VMs" → generic "per-developer dev VMs".
- **Caddyfile**: TLS-rotation cross-ref updated to `scripts/ops/…` and
Keboola-specific aside removed.
- **Auth docs** `docs/auth-groups.md` and the OAuth probe in
`scripts/debug/probe_google_groups.py`: GCP project name `kids-ai-data-analysis`
replaced with placeholder `acme-internal-prod`.
- **Planning docs** under `docs/superpowers/plans/` and `…/specs/`: the
five hackathon-era documents (`2026-04-21-deployment-log.md`,
`…-multi-customer-deployment.md`, `…-issues-14-and-10.md`,
`…-hackathon-dry-run.md`, the spec) had `34.77.94.14` / `34.77.102.61`
replaced with `<dev-vm-ip>` / `<prod-vm-ip>`, `Groupon`/`GRPN`/`grpn`
with `Acme`/`another-customer`, and `prj-grp-…` with `prj-example-…`.
### Removed
- Customer-specific manual-deploy helper `scripts/grpn/Makefile` and its
@ -105,6 +118,22 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
`gcloud compute ssh <vm> --command "sed -i …/.env && sudo /usr/local/bin/agnes-auto-upgrade.sh"`
with their own VM details.
### Internal
- Sandbox blocklist now flags introspection-chain dunders explicitly:
`__subclasses__`, `__globals__`, `__class__`, `__base__`, `__bases__`,
`__mro__`, `__dict__`, `__code__`, `__builtins__`. `__init__` and
`__getattribute__` are intentionally **not** in the list — substring match
would flag every legitimate `def __init__(self):`. The chain breaks at
the next link anyway.
- New regression test `test_run_pwn_payload_blocked` parametrized over the
exact PoC from issue #44 plus two equivalent variants (lambda+`__globals__`,
`__mro__` traversal). If the dunder list is silently weakened in a future
refactor, the test fails. New `test_*_requires_admin` tests parametrized
over all three non-admin core roles (analyst, viewer, km_admin).
- `tests/conftest.py::seeded_app` extended with `viewer_token` and
`km_admin_token` so role-gating tests cover all four core roles.
## [0.11.5] — 2026-04-27
Follow-up release for PR #73: addresses four rounds of Devin AI review on the role-management-complete branch. No new public-API surface; the user-visible payoff is that v8→v9-migrated installations now work end-to-end (login flows, user list, admin nav, privilege revocation), and `make local-dev` startup is finally quiet.

View file

@ -5,15 +5,13 @@ import subprocess
import sys
import tempfile
import uuid
from pathlib import Path
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from typing import Optional, List
from typing import Optional
import duckdb
from app.auth.dependencies import get_current_user, require_role, _get_db
from app.auth.dependencies import require_role, _get_db
from src.rbac import Role
from src.repositories.notifications import ScriptRepository
@ -43,9 +41,10 @@ class ScriptResponse(BaseModel):
@router.get("")
async def list_scripts(
user: dict = Depends(get_current_user),
user: dict = Depends(require_role(Role.ADMIN)),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""List deployed scripts. Admin-only."""
repo = ScriptRepository(conn)
scripts = repo.list_all()
return {"scripts": scripts, "count": len(scripts)}
@ -54,10 +53,10 @@ async def list_scripts(
@router.post("/deploy", status_code=201)
async def deploy_script(
request: DeployScriptRequest,
user: dict = Depends(require_role(Role.ANALYST)),
user: dict = Depends(require_role(Role.ADMIN)),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Deploy a Python script to be run on the server (optionally on schedule)."""
"""Deploy a Python script to be run on the server (optionally on schedule). Admin-only."""
repo = ScriptRepository(conn)
script_id = str(uuid.uuid4())
repo.deploy(
@ -76,10 +75,10 @@ async def deploy_script(
@router.post("/{script_id}/run")
async def run_deployed_script(
script_id: str,
user: dict = Depends(require_role(Role.ANALYST)),
user: dict = Depends(require_role(Role.ADMIN)),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Run a deployed script by ID."""
"""Run a deployed script by ID. Admin-only."""
repo = ScriptRepository(conn)
script = repo.get(script_id)
if not script:
@ -90,9 +89,9 @@ async def run_deployed_script(
@router.post("/run")
async def run_adhoc_script(
request: RunScriptRequest,
user: dict = Depends(require_role(Role.ANALYST)),
user: dict = Depends(require_role(Role.ADMIN)),
):
"""Run an ad-hoc Python script (not deployed)."""
"""Run an ad-hoc Python script (not deployed). Admin-only."""
if not request.source:
raise HTTPException(status_code=400, detail="Script source required")
return _execute_script(request.source, request.name or "adhoc")
@ -111,7 +110,11 @@ async def undeploy_script(
def _execute_script(source: str, name: str) -> dict:
"""Execute a Python script in a sandboxed subprocess."""
"""Execute a Python script in a sandboxed subprocess.
The blocklist below is defense-in-depth, not a primary trust boundary.
The role gate on the route (admin-only) is the actual boundary; the
blocklist catches obvious mistakes, not a hostile admin."""
# Comprehensive safety checks — block dangerous patterns
blocked_patterns = [
# Direct imports of dangerous modules
@ -145,13 +148,26 @@ def _execute_script(source: str, name: str) -> dict:
"setattr(",
"delattr(",
"breakpoint(",
# Introspection-chain dunders that can pivot to RCE.
# `__init__`/`__getattribute__` deliberately omitted: substring
# match would flag every `def __init__(self):`.
"__subclasses__",
"__globals__",
"__class__",
"__base__",
"__bases__",
"__mro__",
"__dict__",
"__code__",
"__builtins__",
]
import ast
BLOCKED_MODULES = {"os", "sys", "subprocess", "shutil", "ctypes", "importlib", "socket",
"requests", "httpx", "urllib", "http", "signal", "pathlib", "builtins"}
BLOCKED_FUNCTIONS = {"exec", "eval", "compile", "open", "globals", "locals",
"getattr", "setattr", "delattr", "breakpoint", "__import__"}
"getattr", "setattr", "delattr", "breakpoint", "__import__",
"vars"}
try:
tree = ast.parse(source)

View file

@ -105,7 +105,8 @@ def write_test_parquet(path: str, data: list[dict]):
@pytest.fixture
def seeded_app(e2e_env):
"""FastAPI TestClient with seeded admin + analyst users, JWT tokens."""
"""FastAPI TestClient with all four core role tokens (admin, km_admin,
analyst, viewer). Use the role-specific token in role-gating tests."""
from src.db import get_system_db
from src.repositories.users import UserRepository
from app.auth.jwt import create_access_token
@ -115,18 +116,24 @@ def seeded_app(e2e_env):
conn = get_system_db()
repo = UserRepository(conn)
repo.create(id="admin1", email="admin@test.com", name="Admin", role="admin")
repo.create(id="km_admin1", email="km@test.com", name="KM Admin", role="km_admin")
repo.create(id="analyst1", email="analyst@test.com", name="Analyst", role="analyst")
repo.create(id="viewer1", email="viewer@test.com", name="Viewer", role="viewer")
conn.close()
app = create_app()
client = TestClient(app)
admin_token = create_access_token("admin1", "admin@test.com", "admin")
km_admin_token = create_access_token("km_admin1", "km@test.com", "km_admin")
analyst_token = create_access_token("analyst1", "analyst@test.com", "analyst")
viewer_token = create_access_token("viewer1", "viewer@test.com", "viewer")
return {
"client": client,
"admin_token": admin_token,
"km_admin_token": km_admin_token,
"analyst_token": analyst_token,
"viewer_token": viewer_token,
"env": e2e_env,
}

View file

@ -37,14 +37,14 @@ def client(tmp_path, monkeypatch):
class TestScriptsAPI:
def test_list_scripts_empty(self, client):
c, _, analyst_token = client
resp = c.get("/api/scripts", headers={"Authorization": f"Bearer {analyst_token}"})
c, admin_token, _ = client
resp = c.get("/api/scripts", headers={"Authorization": f"Bearer {admin_token}"})
assert resp.status_code == 200
assert resp.json()["count"] == 0
def test_deploy_and_list(self, client):
c, _, analyst_token = client
headers = {"Authorization": f"Bearer {analyst_token}"}
c, admin_token, _ = client
headers = {"Authorization": f"Bearer {admin_token}"}
resp = c.post("/api/scripts/deploy", json={
"name": "hello", "source": "print('hello world')",
@ -56,8 +56,8 @@ class TestScriptsAPI:
assert resp.json()["count"] == 1
def test_run_script(self, client):
c, _, analyst_token = client
headers = {"Authorization": f"Bearer {analyst_token}"}
c, admin_token, _ = client
headers = {"Authorization": f"Bearer {admin_token}"}
resp = c.post("/api/scripts/run", json={
"source": "print('hello from script')", "name": "test",
@ -68,8 +68,8 @@ class TestScriptsAPI:
assert "hello from script" in data["stdout"]
def test_run_blocked_import(self, client):
c, _, analyst_token = client
headers = {"Authorization": f"Bearer {analyst_token}"}
c, admin_token, _ = client
headers = {"Authorization": f"Bearer {admin_token}"}
resp = c.post("/api/scripts/run", json={
"source": "import subprocess; subprocess.run(['ls'])", "name": "bad",
@ -79,18 +79,15 @@ class TestScriptsAPI:
assert "disallowed" in detail or "Blocked" in detail
def test_deploy_run_undeploy(self, client):
c, admin_token, analyst_token = client
analyst_headers = {"Authorization": f"Bearer {analyst_token}"}
c, admin_token, _ = client
admin_headers = {"Authorization": f"Bearer {admin_token}"}
# Deploy
resp = c.post("/api/scripts/deploy", json={
"name": "calc", "source": "print(2+2)", "schedule": "0 8 * * MON",
}, headers=analyst_headers)
}, headers=admin_headers)
script_id = resp.json()["id"]
# Run
resp = c.post(f"/api/scripts/{script_id}/run", headers=analyst_headers)
resp = c.post(f"/api/scripts/{script_id}/run", headers=admin_headers)
assert resp.status_code == 200
assert "4" in resp.json()["stdout"]

View file

@ -22,11 +22,22 @@ class TestScriptsList:
resp = c.get("/api/scripts")
assert resp.status_code == 401
@pytest.mark.parametrize("role_token", ["analyst_token", "viewer_token", "km_admin_token"])
def test_list_scripts_requires_admin(self, seeded_app, role_token):
"""Non-admin roles must not reach list_all() (returns source code)."""
c = seeded_app["client"]
resp = c.get(
"/api/scripts", headers=_auth(seeded_app[role_token])
)
assert resp.status_code == 403, (
f"role {role_token} should be denied list, got {resp.status_code}"
)
class TestScriptsDeploy:
def test_deploy_safe_script(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/scripts/deploy",
json={"name": "hello", "source": "print('hello world')"},
@ -39,7 +50,7 @@ class TestScriptsDeploy:
def test_deploy_with_schedule(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/scripts/deploy",
json={"name": "scheduled", "source": "print('scheduled')", "schedule": "0 8 * * MON"},
@ -52,18 +63,16 @@ class TestScriptsDeploy:
def test_deploy_script_with_blocked_import_deploys_ok_but_run_fails(self, seeded_app):
"""Deploy stores scripts as-is; safety validation happens at run time, not deploy time."""
c = seeded_app["client"]
token = seeded_app["analyst_token"]
# Deploy succeeds (no pre-validation at deploy time)
admin_token = seeded_app["admin_token"]
deploy_resp = c.post(
"/api/scripts/deploy",
json={"name": "bad_import", "source": "import os; print(os.getcwd())"},
headers=_auth(token),
headers=_auth(admin_token),
)
assert deploy_resp.status_code == 201
script_id = deploy_resp.json()["id"]
# Running it should fail with 400 due to blocked import
run_resp = c.post(f"/api/scripts/{script_id}/run", headers=_auth(token))
run_resp = c.post(f"/api/scripts/{script_id}/run", headers=_auth(admin_token))
assert run_resp.status_code == 400
assert "Blocked" in run_resp.json()["detail"] or "disallowed" in run_resp.json()["detail"]
@ -75,15 +84,28 @@ class TestScriptsDeploy:
)
assert resp.status_code == 401
@pytest.mark.parametrize("role_token", ["analyst_token", "viewer_token", "km_admin_token"])
def test_deploy_requires_admin(self, seeded_app, role_token):
"""Only admin can deploy scripts."""
c = seeded_app["client"]
resp = c.post(
"/api/scripts/deploy",
json={"name": "blocked", "source": "print('x')"},
headers=_auth(seeded_app[role_token]),
)
assert resp.status_code == 403, (
f"role {role_token} should be denied deploy, got {resp.status_code}"
)
def test_deploy_appears_in_list(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
admin_token = seeded_app["admin_token"]
c.post(
"/api/scripts/deploy",
json={"name": "listed_script", "source": "x = 1"},
headers=_auth(token),
headers=_auth(admin_token),
)
resp = c.get("/api/scripts", headers=_auth(token))
resp = c.get("/api/scripts", headers=_auth(admin_token))
assert resp.json()["count"] >= 1
names = [s["name"] for s in resp.json()["scripts"]]
assert "listed_script" in names
@ -92,7 +114,7 @@ class TestScriptsDeploy:
class TestScriptsRun:
def test_run_adhoc_safe_script(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/scripts/run",
json={"source": "print('hello from adhoc')", "name": "adhoc_test"},
@ -105,7 +127,7 @@ class TestScriptsRun:
def test_run_adhoc_blocked_os_module(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/scripts/run",
json={"source": "import sys; print(sys.path)", "name": "bad"},
@ -121,9 +143,50 @@ class TestScriptsRun:
)
assert resp.status_code == 401
@pytest.mark.parametrize("role_token", ["analyst_token", "viewer_token", "km_admin_token"])
def test_run_adhoc_requires_admin(self, seeded_app, role_token):
"""Only admin can run ad-hoc scripts."""
c = seeded_app["client"]
resp = c.post(
"/api/scripts/run",
json={"source": "print('x')", "name": "should_be_blocked"},
headers=_auth(seeded_app[role_token]),
)
assert resp.status_code == 403, (
f"role {role_token} should be denied /run, got {resp.status_code}"
)
@pytest.mark.parametrize(
"pwn_payload",
[
# Exact PoC from issue #44 — pivot through __class__ chain to
# subprocess.Popen. After the role gate is in place, this returns
# 403 (admin-only). If the gate is ever lowered, the blocklist
# must catch the dunder pattern and return 400 — never 200.
"[c for c in ().__class__.__base__.__subclasses__() "
"if c.__name__ == 'Popen'][0](['id'], stdout=-1).communicate()",
# Variant: reach __builtins__ via frame globals.
"(lambda: None).__globals__['__builtins__'].__import__('os').system('id')",
# Variant: __mro__ traversal.
"type(()).__mro__[-1].__subclasses__()",
],
)
def test_run_pwn_payload_blocked(self, seeded_app, pwn_payload):
"""Dunder-chain PoC payloads must be rejected by the sandbox blocklist."""
c = seeded_app["client"]
resp = c.post(
"/api/scripts/run",
json={"source": pwn_payload, "name": "pwn"},
headers=_auth(seeded_app["admin_token"]),
)
assert resp.status_code == 400, (
f"PoC payload {pwn_payload!r} should be blocked at sandbox layer, "
f"got {resp.status_code} body={resp.json()}"
)
def test_run_adhoc_no_source_returns_400(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
token = seeded_app["admin_token"]
resp = c.post(
"/api/scripts/run",
json={"name": "no_source"},
@ -133,24 +196,38 @@ class TestScriptsRun:
def test_run_deployed_script(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
# Deploy first
admin_token = seeded_app["admin_token"]
deploy_resp = c.post(
"/api/scripts/deploy",
json={"name": "calc", "source": "print(2+2)"},
headers=_auth(token),
headers=_auth(admin_token),
)
assert deploy_resp.status_code == 201
script_id = deploy_resp.json()["id"]
# Run deployed script
resp = c.post(f"/api/scripts/{script_id}/run", headers=_auth(token))
resp = c.post(f"/api/scripts/{script_id}/run", headers=_auth(admin_token))
assert resp.status_code == 200
assert "4" in resp.json()["stdout"]
@pytest.mark.parametrize("role_token", ["analyst_token", "viewer_token", "km_admin_token"])
def test_run_deployed_requires_admin(self, seeded_app, role_token):
"""Only admin can run a deployed script."""
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
deploy_resp = c.post(
"/api/scripts/deploy",
json={"name": "blocked_run", "source": "print('hi')"},
headers=_auth(admin_token),
)
script_id = deploy_resp.json()["id"]
resp = c.post(f"/api/scripts/{script_id}/run", headers=_auth(seeded_app[role_token]))
assert resp.status_code == 403, (
f"role {role_token} should be denied /run, got {resp.status_code}"
)
def test_run_nonexistent_script_returns_404(self, seeded_app):
c = seeded_app["client"]
token = seeded_app["analyst_token"]
token = seeded_app["admin_token"]
resp = c.post("/api/scripts/nonexistent-id/run", headers=_auth(token))
assert resp.status_code == 404
@ -162,11 +239,10 @@ class TestScriptsDelete:
admin_token = seeded_app["admin_token"]
analyst_token = seeded_app["analyst_token"]
# Deploy as analyst
deploy_resp = c.post(
"/api/scripts/deploy",
json={"name": "to_delete", "source": "print('bye')"},
headers=_auth(analyst_token),
headers=_auth(admin_token),
)
script_id = deploy_resp.json()["id"]

View file

@ -19,12 +19,14 @@ def client(tmp_path, monkeypatch):
from app.auth.jwt import create_access_token
conn = get_system_db()
UserRepository(conn).create(id="u1", email="user@test.com", name="User", role="analyst")
repo = UserRepository(conn)
repo.create(id="admin1", email="admin@test.com", name="Admin", role="admin")
repo.create(id="u1", email="user@test.com", name="User", role="analyst")
conn.close()
app = create_app()
c = TestClient(app)
token = create_access_token("u1", "user@test.com", "analyst")
token = create_access_token("admin1", "admin@test.com", "admin")
return c, token