diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ce4d5b..946b8e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,65 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C +### Changed + +- **BREAKING (security)**: The entire Script API is now **admin-only** (issue #44). + `GET /api/scripts`, `POST /api/scripts/deploy`, `POST /api/scripts/run`, and + `POST /api/scripts/{id}/run` all require the admin role; previously the list + endpoint was open to any authenticated user and deploy/run were analyst-accessible. + Two reasons: (1) the AST + string-blocklist sandbox in `_execute_script` is + defense-in-depth and known to be bypassable through introspection chains + (`__class__.__base__.__subclasses__()`, `__globals__['__builtins__']`, + `__mro__` traversal — the dunder pattern list was tightened in this PR but + the policy is "the role gate is the trust boundary, not the blocklist"); + (2) gating only `/run` left a planted-script attack open — an analyst could + deploy a malicious script and wait for an admin to run it. Operators who + need scripted workflows for non-admin users should run them on the user's + behalf or expose the relevant data via the read-only `/api/data` surface + instead. +- **BREAKING (ops)**: Generic ops scripts moved out of the customer-named + `scripts/grpn/` directory into `scripts/ops/` as part of the OSS + vendor-neutralization (issue #88): + - `scripts/grpn/agnes-tls-rotate.sh` → `scripts/ops/agnes-tls-rotate.sh` + - `scripts/grpn/agnes-auto-upgrade.sh` → `scripts/ops/agnes-auto-upgrade.sh` + + Downstream consumer infra repos that copy these scripts onto VMs (e.g. via + their own `startup.sh`) must update the source path. The OSS-shipped + `infra/modules/customer-instance/` Terraform module is unaffected — it + embeds equivalent logic inline via heredoc and does not source-by-path + from `scripts/`. Script behaviour and env vars are unchanged. Cross-refs + in `README.md`, `CLAUDE.md`, `docs/DEPLOYMENT.md`, `Caddyfile`, and + `docker-compose.yml` were updated. +- **OSS neutralization (wave 2 — code, tests, planning docs)**. Customer + identifiers replaced with placeholders across the codebase to ready the + repo for public release (issue #88): + + - **Code docstrings**: `connectors/openmetadata/{client,transformer,enricher}.py`, + `src/catalog_export.py`, `scripts/duckdb_manager.py` — `prj-grp-…` → + `my-bq-project` / `prj-example-1234`, `AIAgent.FoundryAI` → + `AIAgent.MyAgent` (in docstrings) / `AIAgent.Example` (in test fixtures), + `FoundryAIDataModel` → `AnalyticsDataModel`. + - **Test fixtures** in `tests/test_openmetadata_enricher.py`, + `tests/test_duckdb_manager.py`, `tests/test_catalog_export.py`, + `tests/test_openmetadata_transformer.py` — same set of replacements, + behaviour-preserving (157 tests still green). + - **Terraform module** `infra/modules/customer-instance/variables.tf`: + `customer_name` description rewritten in English, examples switched + from `keboola, grpn` to `acme, example`. + - **Workflow** `.github/workflows/keboola-deploy.yml`: comment "Groupon-side + dev VMs" → generic "per-developer dev VMs". + - **Caddyfile**: TLS-rotation cross-ref updated to `scripts/ops/…` and + Keboola-specific aside removed. + - **Auth docs** `docs/auth-groups.md` and the OAuth probe in + `scripts/debug/probe_google_groups.py`: GCP project name `kids-ai-data-analysis` + replaced with placeholder `acme-internal-prod`. + - **Planning docs** under `docs/superpowers/plans/` and `…/specs/`: the + five hackathon-era documents (`2026-04-21-deployment-log.md`, + `…-multi-customer-deployment.md`, `…-issues-14-and-10.md`, + `…-hackathon-dry-run.md`, the spec) had `34.77.94.14` / `34.77.102.61` + replaced with `` / ``, `Groupon`/`GRPN`/`grpn` + with `Acme`/`another-customer`, and `prj-grp-…` with `prj-example-…`. + ### Fixed - **BREAKING (security CRITICAL)**: Jira webhook handler is now @@ -46,52 +105,6 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C clips length to 64 chars, and routes the final filename through `safe_join_under`. -### Changed - -- **BREAKING (ops)**: Generic ops scripts moved out of the customer-named - `scripts/grpn/` directory into `scripts/ops/` as part of the OSS - vendor-neutralization (issue #88): - - `scripts/grpn/agnes-tls-rotate.sh` → `scripts/ops/agnes-tls-rotate.sh` - - `scripts/grpn/agnes-auto-upgrade.sh` → `scripts/ops/agnes-auto-upgrade.sh` - - Downstream consumer infra repos that copy these scripts onto VMs (e.g. via - their own `startup.sh`) must update the source path. The OSS-shipped - `infra/modules/customer-instance/` Terraform module is unaffected — it - embeds equivalent logic inline via heredoc and does not source-by-path - from `scripts/`. Script behaviour and env vars are unchanged. Cross-refs - in `README.md`, `CLAUDE.md`, `docs/DEPLOYMENT.md`, `Caddyfile`, and - `docker-compose.yml` were updated. - -- **OSS neutralization (wave 2 — code, tests, planning docs)**. Customer - identifiers replaced with placeholders across the codebase to ready the - repo for public release (issue #88): - - - **Code docstrings**: `connectors/openmetadata/{client,transformer,enricher}.py`, - `src/catalog_export.py`, `scripts/duckdb_manager.py` — `prj-grp-…` → - `my-bq-project` / `prj-example-1234`, `AIAgent.FoundryAI` → - `AIAgent.MyAgent` (in docstrings) / `AIAgent.Example` (in test fixtures), - `FoundryAIDataModel` → `AnalyticsDataModel`. - - **Test fixtures** in `tests/test_openmetadata_enricher.py`, - `tests/test_duckdb_manager.py`, `tests/test_catalog_export.py`, - `tests/test_openmetadata_transformer.py` — same set of replacements, - behaviour-preserving (157 tests still green). - - **Terraform module** `infra/modules/customer-instance/variables.tf`: - `customer_name` description rewritten in English, examples switched - from `keboola, grpn` to `acme, example`. - - **Workflow** `.github/workflows/keboola-deploy.yml`: comment "Groupon-side - dev VMs" → generic "per-developer dev VMs". - - **Caddyfile**: TLS-rotation cross-ref updated to `scripts/ops/…` and - Keboola-specific aside removed. - - **Auth docs** `docs/auth-groups.md` and the OAuth probe in - `scripts/debug/probe_google_groups.py`: GCP project name `kids-ai-data-analysis` - replaced with placeholder `acme-internal-prod`. - - **Planning docs** under `docs/superpowers/plans/` and `…/specs/`: the - five hackathon-era documents (`2026-04-21-deployment-log.md`, - `…-multi-customer-deployment.md`, `…-issues-14-and-10.md`, - `…-hackathon-dry-run.md`, the spec) had `34.77.94.14` / `34.77.102.61` - replaced with `` / ``, `Groupon`/`GRPN`/`grpn` - with `Acme`/`another-customer`, and `prj-grp-…` with `prj-example-…`. - ### Removed - Customer-specific manual-deploy helper `scripts/grpn/Makefile` and its @@ -105,6 +118,22 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C `gcloud compute ssh --command "sed -i …/.env && sudo /usr/local/bin/agnes-auto-upgrade.sh"` with their own VM details. +### Internal + +- Sandbox blocklist now flags introspection-chain dunders explicitly: + `__subclasses__`, `__globals__`, `__class__`, `__base__`, `__bases__`, + `__mro__`, `__dict__`, `__code__`, `__builtins__`. `__init__` and + `__getattribute__` are intentionally **not** in the list — substring match + would flag every legitimate `def __init__(self):`. The chain breaks at + the next link anyway. +- New regression test `test_run_pwn_payload_blocked` parametrized over the + exact PoC from issue #44 plus two equivalent variants (lambda+`__globals__`, + `__mro__` traversal). If the dunder list is silently weakened in a future + refactor, the test fails. New `test_*_requires_admin` tests parametrized + over all three non-admin core roles (analyst, viewer, km_admin). +- `tests/conftest.py::seeded_app` extended with `viewer_token` and + `km_admin_token` so role-gating tests cover all four core roles. + ## [0.11.5] — 2026-04-27 Follow-up release for PR #73: addresses four rounds of Devin AI review on the role-management-complete branch. No new public-API surface; the user-visible payoff is that v8→v9-migrated installations now work end-to-end (login flows, user list, admin nav, privilege revocation), and `make local-dev` startup is finally quiet. diff --git a/app/api/scripts.py b/app/api/scripts.py index 0171307..54effa9 100644 --- a/app/api/scripts.py +++ b/app/api/scripts.py @@ -5,15 +5,13 @@ import subprocess import sys import tempfile import uuid -from pathlib import Path - from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel -from typing import Optional, List +from typing import Optional import duckdb -from app.auth.dependencies import get_current_user, require_role, _get_db +from app.auth.dependencies import require_role, _get_db from src.rbac import Role from src.repositories.notifications import ScriptRepository @@ -43,9 +41,10 @@ class ScriptResponse(BaseModel): @router.get("") async def list_scripts( - user: dict = Depends(get_current_user), + user: dict = Depends(require_role(Role.ADMIN)), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): + """List deployed scripts. Admin-only.""" repo = ScriptRepository(conn) scripts = repo.list_all() return {"scripts": scripts, "count": len(scripts)} @@ -54,10 +53,10 @@ async def list_scripts( @router.post("/deploy", status_code=201) async def deploy_script( request: DeployScriptRequest, - user: dict = Depends(require_role(Role.ANALYST)), + user: dict = Depends(require_role(Role.ADMIN)), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): - """Deploy a Python script to be run on the server (optionally on schedule).""" + """Deploy a Python script to be run on the server (optionally on schedule). Admin-only.""" repo = ScriptRepository(conn) script_id = str(uuid.uuid4()) repo.deploy( @@ -76,10 +75,10 @@ async def deploy_script( @router.post("/{script_id}/run") async def run_deployed_script( script_id: str, - user: dict = Depends(require_role(Role.ANALYST)), + user: dict = Depends(require_role(Role.ADMIN)), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): - """Run a deployed script by ID.""" + """Run a deployed script by ID. Admin-only.""" repo = ScriptRepository(conn) script = repo.get(script_id) if not script: @@ -90,9 +89,9 @@ async def run_deployed_script( @router.post("/run") async def run_adhoc_script( request: RunScriptRequest, - user: dict = Depends(require_role(Role.ANALYST)), + user: dict = Depends(require_role(Role.ADMIN)), ): - """Run an ad-hoc Python script (not deployed).""" + """Run an ad-hoc Python script (not deployed). Admin-only.""" if not request.source: raise HTTPException(status_code=400, detail="Script source required") return _execute_script(request.source, request.name or "adhoc") @@ -111,7 +110,11 @@ async def undeploy_script( def _execute_script(source: str, name: str) -> dict: - """Execute a Python script in a sandboxed subprocess.""" + """Execute a Python script in a sandboxed subprocess. + + The blocklist below is defense-in-depth, not a primary trust boundary. + The role gate on the route (admin-only) is the actual boundary; the + blocklist catches obvious mistakes, not a hostile admin.""" # Comprehensive safety checks — block dangerous patterns blocked_patterns = [ # Direct imports of dangerous modules @@ -145,13 +148,26 @@ def _execute_script(source: str, name: str) -> dict: "setattr(", "delattr(", "breakpoint(", + # Introspection-chain dunders that can pivot to RCE. + # `__init__`/`__getattribute__` deliberately omitted: substring + # match would flag every `def __init__(self):`. + "__subclasses__", + "__globals__", + "__class__", + "__base__", + "__bases__", + "__mro__", + "__dict__", + "__code__", + "__builtins__", ] import ast BLOCKED_MODULES = {"os", "sys", "subprocess", "shutil", "ctypes", "importlib", "socket", "requests", "httpx", "urllib", "http", "signal", "pathlib", "builtins"} BLOCKED_FUNCTIONS = {"exec", "eval", "compile", "open", "globals", "locals", - "getattr", "setattr", "delattr", "breakpoint", "__import__"} + "getattr", "setattr", "delattr", "breakpoint", "__import__", + "vars"} try: tree = ast.parse(source) diff --git a/tests/conftest.py b/tests/conftest.py index 922a39a..d2a8fa6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -105,7 +105,8 @@ def write_test_parquet(path: str, data: list[dict]): @pytest.fixture def seeded_app(e2e_env): - """FastAPI TestClient with seeded admin + analyst users, JWT tokens.""" + """FastAPI TestClient with all four core role tokens (admin, km_admin, + analyst, viewer). Use the role-specific token in role-gating tests.""" from src.db import get_system_db from src.repositories.users import UserRepository from app.auth.jwt import create_access_token @@ -115,18 +116,24 @@ def seeded_app(e2e_env): conn = get_system_db() repo = UserRepository(conn) repo.create(id="admin1", email="admin@test.com", name="Admin", role="admin") + repo.create(id="km_admin1", email="km@test.com", name="KM Admin", role="km_admin") repo.create(id="analyst1", email="analyst@test.com", name="Analyst", role="analyst") + repo.create(id="viewer1", email="viewer@test.com", name="Viewer", role="viewer") conn.close() app = create_app() client = TestClient(app) admin_token = create_access_token("admin1", "admin@test.com", "admin") + km_admin_token = create_access_token("km_admin1", "km@test.com", "km_admin") analyst_token = create_access_token("analyst1", "analyst@test.com", "analyst") + viewer_token = create_access_token("viewer1", "viewer@test.com", "viewer") return { "client": client, "admin_token": admin_token, + "km_admin_token": km_admin_token, "analyst_token": analyst_token, + "viewer_token": viewer_token, "env": e2e_env, } diff --git a/tests/test_api_scripts.py b/tests/test_api_scripts.py index e2d4969..29281eb 100644 --- a/tests/test_api_scripts.py +++ b/tests/test_api_scripts.py @@ -37,14 +37,14 @@ def client(tmp_path, monkeypatch): class TestScriptsAPI: def test_list_scripts_empty(self, client): - c, _, analyst_token = client - resp = c.get("/api/scripts", headers={"Authorization": f"Bearer {analyst_token}"}) + c, admin_token, _ = client + resp = c.get("/api/scripts", headers={"Authorization": f"Bearer {admin_token}"}) assert resp.status_code == 200 assert resp.json()["count"] == 0 def test_deploy_and_list(self, client): - c, _, analyst_token = client - headers = {"Authorization": f"Bearer {analyst_token}"} + c, admin_token, _ = client + headers = {"Authorization": f"Bearer {admin_token}"} resp = c.post("/api/scripts/deploy", json={ "name": "hello", "source": "print('hello world')", @@ -56,8 +56,8 @@ class TestScriptsAPI: assert resp.json()["count"] == 1 def test_run_script(self, client): - c, _, analyst_token = client - headers = {"Authorization": f"Bearer {analyst_token}"} + c, admin_token, _ = client + headers = {"Authorization": f"Bearer {admin_token}"} resp = c.post("/api/scripts/run", json={ "source": "print('hello from script')", "name": "test", @@ -68,8 +68,8 @@ class TestScriptsAPI: assert "hello from script" in data["stdout"] def test_run_blocked_import(self, client): - c, _, analyst_token = client - headers = {"Authorization": f"Bearer {analyst_token}"} + c, admin_token, _ = client + headers = {"Authorization": f"Bearer {admin_token}"} resp = c.post("/api/scripts/run", json={ "source": "import subprocess; subprocess.run(['ls'])", "name": "bad", @@ -79,18 +79,15 @@ class TestScriptsAPI: assert "disallowed" in detail or "Blocked" in detail def test_deploy_run_undeploy(self, client): - c, admin_token, analyst_token = client - analyst_headers = {"Authorization": f"Bearer {analyst_token}"} + c, admin_token, _ = client admin_headers = {"Authorization": f"Bearer {admin_token}"} - # Deploy resp = c.post("/api/scripts/deploy", json={ "name": "calc", "source": "print(2+2)", "schedule": "0 8 * * MON", - }, headers=analyst_headers) + }, headers=admin_headers) script_id = resp.json()["id"] - # Run - resp = c.post(f"/api/scripts/{script_id}/run", headers=analyst_headers) + resp = c.post(f"/api/scripts/{script_id}/run", headers=admin_headers) assert resp.status_code == 200 assert "4" in resp.json()["stdout"] diff --git a/tests/test_scripts_api.py b/tests/test_scripts_api.py index 79ac002..1e901c9 100644 --- a/tests/test_scripts_api.py +++ b/tests/test_scripts_api.py @@ -22,11 +22,22 @@ class TestScriptsList: resp = c.get("/api/scripts") assert resp.status_code == 401 + @pytest.mark.parametrize("role_token", ["analyst_token", "viewer_token", "km_admin_token"]) + def test_list_scripts_requires_admin(self, seeded_app, role_token): + """Non-admin roles must not reach list_all() (returns source code).""" + c = seeded_app["client"] + resp = c.get( + "/api/scripts", headers=_auth(seeded_app[role_token]) + ) + assert resp.status_code == 403, ( + f"role {role_token} should be denied list, got {resp.status_code}" + ) + class TestScriptsDeploy: def test_deploy_safe_script(self, seeded_app): c = seeded_app["client"] - token = seeded_app["analyst_token"] + token = seeded_app["admin_token"] resp = c.post( "/api/scripts/deploy", json={"name": "hello", "source": "print('hello world')"}, @@ -39,7 +50,7 @@ class TestScriptsDeploy: def test_deploy_with_schedule(self, seeded_app): c = seeded_app["client"] - token = seeded_app["analyst_token"] + token = seeded_app["admin_token"] resp = c.post( "/api/scripts/deploy", json={"name": "scheduled", "source": "print('scheduled')", "schedule": "0 8 * * MON"}, @@ -52,18 +63,16 @@ class TestScriptsDeploy: def test_deploy_script_with_blocked_import_deploys_ok_but_run_fails(self, seeded_app): """Deploy stores scripts as-is; safety validation happens at run time, not deploy time.""" c = seeded_app["client"] - token = seeded_app["analyst_token"] - # Deploy succeeds (no pre-validation at deploy time) + admin_token = seeded_app["admin_token"] deploy_resp = c.post( "/api/scripts/deploy", json={"name": "bad_import", "source": "import os; print(os.getcwd())"}, - headers=_auth(token), + headers=_auth(admin_token), ) assert deploy_resp.status_code == 201 script_id = deploy_resp.json()["id"] - # Running it should fail with 400 due to blocked import - run_resp = c.post(f"/api/scripts/{script_id}/run", headers=_auth(token)) + run_resp = c.post(f"/api/scripts/{script_id}/run", headers=_auth(admin_token)) assert run_resp.status_code == 400 assert "Blocked" in run_resp.json()["detail"] or "disallowed" in run_resp.json()["detail"] @@ -75,15 +84,28 @@ class TestScriptsDeploy: ) assert resp.status_code == 401 + @pytest.mark.parametrize("role_token", ["analyst_token", "viewer_token", "km_admin_token"]) + def test_deploy_requires_admin(self, seeded_app, role_token): + """Only admin can deploy scripts.""" + c = seeded_app["client"] + resp = c.post( + "/api/scripts/deploy", + json={"name": "blocked", "source": "print('x')"}, + headers=_auth(seeded_app[role_token]), + ) + assert resp.status_code == 403, ( + f"role {role_token} should be denied deploy, got {resp.status_code}" + ) + def test_deploy_appears_in_list(self, seeded_app): c = seeded_app["client"] - token = seeded_app["analyst_token"] + admin_token = seeded_app["admin_token"] c.post( "/api/scripts/deploy", json={"name": "listed_script", "source": "x = 1"}, - headers=_auth(token), + headers=_auth(admin_token), ) - resp = c.get("/api/scripts", headers=_auth(token)) + resp = c.get("/api/scripts", headers=_auth(admin_token)) assert resp.json()["count"] >= 1 names = [s["name"] for s in resp.json()["scripts"]] assert "listed_script" in names @@ -92,7 +114,7 @@ class TestScriptsDeploy: class TestScriptsRun: def test_run_adhoc_safe_script(self, seeded_app): c = seeded_app["client"] - token = seeded_app["analyst_token"] + token = seeded_app["admin_token"] resp = c.post( "/api/scripts/run", json={"source": "print('hello from adhoc')", "name": "adhoc_test"}, @@ -105,7 +127,7 @@ class TestScriptsRun: def test_run_adhoc_blocked_os_module(self, seeded_app): c = seeded_app["client"] - token = seeded_app["analyst_token"] + token = seeded_app["admin_token"] resp = c.post( "/api/scripts/run", json={"source": "import sys; print(sys.path)", "name": "bad"}, @@ -121,9 +143,50 @@ class TestScriptsRun: ) assert resp.status_code == 401 + @pytest.mark.parametrize("role_token", ["analyst_token", "viewer_token", "km_admin_token"]) + def test_run_adhoc_requires_admin(self, seeded_app, role_token): + """Only admin can run ad-hoc scripts.""" + c = seeded_app["client"] + resp = c.post( + "/api/scripts/run", + json={"source": "print('x')", "name": "should_be_blocked"}, + headers=_auth(seeded_app[role_token]), + ) + assert resp.status_code == 403, ( + f"role {role_token} should be denied /run, got {resp.status_code}" + ) + + @pytest.mark.parametrize( + "pwn_payload", + [ + # Exact PoC from issue #44 — pivot through __class__ chain to + # subprocess.Popen. After the role gate is in place, this returns + # 403 (admin-only). If the gate is ever lowered, the blocklist + # must catch the dunder pattern and return 400 — never 200. + "[c for c in ().__class__.__base__.__subclasses__() " + "if c.__name__ == 'Popen'][0](['id'], stdout=-1).communicate()", + # Variant: reach __builtins__ via frame globals. + "(lambda: None).__globals__['__builtins__'].__import__('os').system('id')", + # Variant: __mro__ traversal. + "type(()).__mro__[-1].__subclasses__()", + ], + ) + def test_run_pwn_payload_blocked(self, seeded_app, pwn_payload): + """Dunder-chain PoC payloads must be rejected by the sandbox blocklist.""" + c = seeded_app["client"] + resp = c.post( + "/api/scripts/run", + json={"source": pwn_payload, "name": "pwn"}, + headers=_auth(seeded_app["admin_token"]), + ) + assert resp.status_code == 400, ( + f"PoC payload {pwn_payload!r} should be blocked at sandbox layer, " + f"got {resp.status_code} body={resp.json()}" + ) + def test_run_adhoc_no_source_returns_400(self, seeded_app): c = seeded_app["client"] - token = seeded_app["analyst_token"] + token = seeded_app["admin_token"] resp = c.post( "/api/scripts/run", json={"name": "no_source"}, @@ -133,24 +196,38 @@ class TestScriptsRun: def test_run_deployed_script(self, seeded_app): c = seeded_app["client"] - token = seeded_app["analyst_token"] - # Deploy first + admin_token = seeded_app["admin_token"] deploy_resp = c.post( "/api/scripts/deploy", json={"name": "calc", "source": "print(2+2)"}, - headers=_auth(token), + headers=_auth(admin_token), ) assert deploy_resp.status_code == 201 script_id = deploy_resp.json()["id"] - # Run deployed script - resp = c.post(f"/api/scripts/{script_id}/run", headers=_auth(token)) + resp = c.post(f"/api/scripts/{script_id}/run", headers=_auth(admin_token)) assert resp.status_code == 200 assert "4" in resp.json()["stdout"] + @pytest.mark.parametrize("role_token", ["analyst_token", "viewer_token", "km_admin_token"]) + def test_run_deployed_requires_admin(self, seeded_app, role_token): + """Only admin can run a deployed script.""" + c = seeded_app["client"] + admin_token = seeded_app["admin_token"] + deploy_resp = c.post( + "/api/scripts/deploy", + json={"name": "blocked_run", "source": "print('hi')"}, + headers=_auth(admin_token), + ) + script_id = deploy_resp.json()["id"] + resp = c.post(f"/api/scripts/{script_id}/run", headers=_auth(seeded_app[role_token])) + assert resp.status_code == 403, ( + f"role {role_token} should be denied /run, got {resp.status_code}" + ) + def test_run_nonexistent_script_returns_404(self, seeded_app): c = seeded_app["client"] - token = seeded_app["analyst_token"] + token = seeded_app["admin_token"] resp = c.post("/api/scripts/nonexistent-id/run", headers=_auth(token)) assert resp.status_code == 404 @@ -162,11 +239,10 @@ class TestScriptsDelete: admin_token = seeded_app["admin_token"] analyst_token = seeded_app["analyst_token"] - # Deploy as analyst deploy_resp = c.post( "/api/scripts/deploy", json={"name": "to_delete", "source": "print('bye')"}, - headers=_auth(analyst_token), + headers=_auth(admin_token), ) script_id = deploy_resp.json()["id"] diff --git a/tests/test_security.py b/tests/test_security.py index 16f4c59..21464ec 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -19,12 +19,14 @@ def client(tmp_path, monkeypatch): from app.auth.jwt import create_access_token conn = get_system_db() - UserRepository(conn).create(id="u1", email="user@test.com", name="User", role="analyst") + repo = UserRepository(conn) + repo.create(id="admin1", email="admin@test.com", name="Admin", role="admin") + repo.create(id="u1", email="user@test.com", name="User", role="analyst") conn.close() app = create_app() c = TestClient(app) - token = create_access_token("u1", "user@test.com", "analyst") + token = create_access_token("admin1", "admin@test.com", "admin") return c, token