agnes-the-ai-analyst/tests/test_jira_webhooks.py
Vojtech c5948f26fc
fix(api): harden API surface before Swagger (issue #336) (#339)
* fix(api): harden API surface before Swagger — 9 findings from issue #336

ADV-001: POST /api/sync/table-subscriptions now checks can_access() per
table entry, matching the gate already on POST /api/sync/settings.

ADV-002: GET /webhooks/jira/health gated behind require_admin; jira_domain
removed from response to prevent anonymous info disclosure.

ADV-003: GET /api/version no longer exposes commit_sha or schema_version.

ADV-005: /docs, /redoc, /openapi.json now require a valid session via custom
FastAPI routes (docs_url=None, redoc_url=None, openapi_url=None).

ADV-006: /cli/ and /webhooks/ added to _API_PATH_PREFIXES so future
auth-gated routes there return JSON 401 not an HTML redirect.

ADV-007: GET /api/catalog/tables wired to CatalogTablesResponse model.

ADV-008: TableSubscriptionUpdate.tables capped at max_length=500.

ADV-009: GET /api/users and GET /auth/admin/tokens accept limit/offset
(default 1000, max 10000); repositories updated accordingly.
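
A minimal sketch of the ADV-006 idea, for illustration only. Only the name `_API_PATH_PREFIXES` and the `/cli/` and `/webhooks/` entries come from this commit message; the `/api/` entry and the helper `unauthenticated_response_kind` are assumptions, not the project's actual code.

```python
# Hypothetical prefix tuple: "/cli/" and "/webhooks/" are named in ADV-006,
# "/api/" is assumed to have been present already.
_API_PATH_PREFIXES = ("/api/", "/cli/", "/webhooks/")


def unauthenticated_response_kind(path: str) -> str:
    """Decide how an unauthenticated request to `path` should be answered.

    API-style paths get a machine-readable JSON 401; everything else is
    treated as a browser page and redirected to an HTML login.
    """
    # str.startswith accepts a tuple and matches if any prefix matches.
    if path.startswith(_API_PATH_PREFIXES):
        return "json_401"
    return "html_redirect"
```

With this shape, a future auth-gated route under `/webhooks/` is answered with JSON rather than a login redirect, without any per-route handling.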

Tests: 11 new regression tests in TestApiHardening336; test_jira_webhooks
fixture updated with seeded admin user; OpenAPI snapshot regenerated.

* fix(test): update test_journey_jira health check to use admin auth after ADV-002 gate

* fix(security): close /auth/bootstrap auth-bypass + BREAKING markers on ADV-002/003/005

Reviewer-flagged regression introduced by ADV-009's pagination on
UserRepository.list_all(): the silent default LIMIT 1000 broke the
bootstrap check at app/auth/router.py and the startup no-password
warning at app/main.py — both call list_all() with no args and depend
on exhaustive enumeration.

On an instance with >1000 users where no password-holder lands in
the email-sorted first page, [u for u in list_all() if
u.get('password_hash')] becomes empty → bootstrap re-opens → an
unauthenticated caller can claim admin via /auth/bootstrap. Real
auth-bypass on a security-sensitive boot path.
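
The failure mode above can be reproduced with a small in-memory sketch. The `USERS` list, the `limit` parameter, and `bootstrap_is_open` are hypothetical stand-ins for the repository and the bootstrap check; only the filter expression mirrors the one quoted above.

```python
from typing import Optional

# Hypothetical users table: 1,500 rows, and the only password holder has the
# email that sorts last, so it falls outside an email-sorted first page of 1000.
USERS = [
    {"email": f"user{i:04d}@example.com", "password_hash": None}
    for i in range(1500)
]
USERS[-1]["password_hash"] = "hashed"


def list_all(limit: Optional[int] = None) -> list:
    """Return users sorted by email; `limit` models the silent default LIMIT."""
    rows = sorted(USERS, key=lambda u: u["email"])
    return rows if limit is None else rows[:limit]


def bootstrap_is_open(users: list) -> bool:
    """Bootstrap is allowed only when no user has a password yet."""
    return not [u for u in users if u.get("password_hash")]
```

Here `bootstrap_is_open(list_all(limit=1000))` evaluates to true even though a password holder exists, while `bootstrap_is_open(list_all())` does not, which is why the check needs exhaustive enumeration.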

Fix:
- src/repositories/users.py: list_all() restored to no-arg, returns
  EVERY row (no LIMIT). Comment explicitly warns against re-adding
  pagination here. API-surface pagination moved to a new
  list_paginated(limit, offset) method with its own docstring.
- app/api/users.py: GET /api/users now calls list_paginated().
  Existing query-param validation (limit <= 10000) preserved.

Regression guards in tests/test_security.py::TestApiHardening336:
- test_users_list_all_returns_every_row_no_silent_limit asserts
  list_all() takes no params other than self (via inspect.signature)
  so a future cleanup can't accidentally re-add limit/offset.
- test_users_list_paginated_is_separate_method asserts the
  paginated variant is a distinct method, not an overload.
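
A rough sketch of what a signature-based guard like the ones described above might look like. The `UserRepository` class here is a hypothetical stand-in and the helper names are assumptions; the real tests live in tests/test_security.py and may differ.

```python
import inspect


class UserRepository:
    """Hypothetical stand-in for the real repository class."""

    def list_all(self):
        # Exhaustive enumeration: deliberately no limit/offset parameters.
        return []

    def list_paginated(self, limit, offset):
        # API-facing variant with explicit paging.
        return []


def list_all_is_exhaustive(repo_cls) -> bool:
    """True when list_all() accepts no parameters other than self."""
    params = list(inspect.signature(repo_cls.list_all).parameters)
    return params == ["self"]


def paginated_is_separate(repo_cls) -> bool:
    """True when list_paginated exists and is a different function object."""
    paginated = getattr(repo_cls, "list_paginated", None)
    return paginated is not None and paginated is not repo_cls.list_all
```

Checking the signature rather than the returned row count keeps the guard cheap: it fails as soon as someone adds `limit` or `offset` back to `list_all()`, without needing more than 1000 seeded rows.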

CHANGELOG: added **BREAKING** markers per CLAUDE.md release
discipline to three pre-existing ADV bullets that are observable
breaking changes for external consumers:
- ADV-002 (webhook health going from anonymous to admin-only)
- ADV-003 (/api/version dropping commit_sha + schema_version)
- ADV-005 (/docs, /redoc, /openapi.json going from anonymous to
  session-required)

* release: 0.54.25 — API hardening before Swagger (ADV-001..009) + bootstrap-bypass regression fix

---------

Co-authored-by: ZdenekSrotyr <zdenek.srotyr@keboola.com>
2026-05-18 15:13:21 +02:00

539 lines
19 KiB
Python

"""Tests for Jira webhook FastAPI router."""

import hashlib
import hmac
import json

import pytest
from fastapi.testclient import TestClient


def _sign(payload: bytes, secret: str) -> str:
    """Compute sha256=<HMAC hex> for a given payload and secret."""
    mac = hmac.new(secret.encode("utf-8"), payload, hashlib.sha256).hexdigest()
    return f"sha256={mac}"


@pytest.fixture()
def webhook_client(tmp_path, monkeypatch):
    """Create a TestClient with required env vars, dirs, and a seeded admin user."""
    data_dir = tmp_path / "data"
    data_dir.mkdir()
    (data_dir / "issues").mkdir()
    state_dir = tmp_path / "state"
    state_dir.mkdir()

    monkeypatch.setenv("DATA_DIR", str(data_dir))
    monkeypatch.setenv("STATE_DIR", str(state_dir))
    monkeypatch.setenv("JWT_SECRET_KEY", "test-jwt-secret")
    monkeypatch.setenv("JIRA_WEBHOOK_SECRET", "test-webhook-secret")
    monkeypatch.setenv("JIRA_DATA_DIR", str(data_dir))

    # Re-read env into Config (class attrs read os.environ at import time)
    from connectors.jira import service as svc

    monkeypatch.setattr(svc.Config, "JIRA_WEBHOOK_SECRET", "test-webhook-secret")
    monkeypatch.setattr(svc.Config, "JIRA_DATA_DIR", data_dir)
    # Reset singleton so it picks up fresh Config values
    svc._jira_service = None

    from src.db import SYSTEM_ADMIN_GROUP, get_system_db
    from src.repositories.user_group_members import UserGroupMembersRepository
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token
    from app.main import create_app

    # Seed an admin user so admin-gated routes can be exercised.
    conn = get_system_db()
    UserRepository(conn).create(id="wh_admin", email="whadmin@test.com", name="WH Admin")
    admin_gid = conn.execute(
        "SELECT id FROM user_groups WHERE name = ?", [SYSTEM_ADMIN_GROUP]
    ).fetchone()[0]
    UserGroupMembersRepository(conn).add_member("wh_admin", admin_gid, source="system_seed")
    conn.close()

    app = create_app()
    admin_token = create_access_token("wh_admin", "whadmin@test.com")
    return {"client": TestClient(app), "admin_token": admin_token}


def test_health_requires_auth(webhook_client):
    """GET /webhooks/jira/health returns 401 without credentials (ADV-002)."""
    resp = webhook_client["client"].get("/webhooks/jira/health")
    assert resp.status_code == 401


def test_health(webhook_client):
    """GET /webhooks/jira/health returns 200 for admin; jira_domain not exposed."""
    headers = {"Authorization": f"Bearer {webhook_client['admin_token']}"}
    resp = webhook_client["client"].get("/webhooks/jira/health", headers=headers)
    assert resp.status_code == 200
    body = resp.json()
    assert body["status"] == "ok"
    assert "webhook_secret_set" in body
    assert "jira_domain" not in body


def test_missing_signature_401(webhook_client):
    """POST without signature header returns 401."""
    payload = json.dumps({"webhookEvent": "jira:issue_updated", "issue": {"key": "TEST-1"}}).encode()
    resp = webhook_client["client"].post(
        "/webhooks/jira",
        content=payload,
        headers={"Content-Type": "application/json"},
    )
    assert resp.status_code == 401


def test_invalid_signature_401(webhook_client):
    """POST with wrong signature returns 401."""
    payload = json.dumps({"webhookEvent": "jira:issue_updated", "issue": {"key": "TEST-1"}}).encode()
    resp = webhook_client["client"].post(
        "/webhooks/jira",
        content=payload,
        headers={
            "Content-Type": "application/json",
            "X-Hub-Signature-256": "sha256=badhex",
        },
    )
    assert resp.status_code == 401


def test_valid_signature_accepted(webhook_client):
    """POST with correct HMAC-SHA256 passes signature check (not 401)."""
    from unittest.mock import patch

    payload = json.dumps({"webhookEvent": "jira:issue_updated", "issue": {"key": "TEST-1"}}).encode()
    sig = _sign(payload, "test-webhook-secret")
    # Mock process_webhook_event so the test only checks HMAC validation,
    # not the full Jira API flow (which requires a real Jira connection).
    with patch("app.api.jira_webhooks.get_jira_service") as mock_svc:
        mock_svc.return_value.is_configured.return_value = True
        mock_svc.return_value.process_webhook_event.return_value = True
        resp = webhook_client["client"].post(
            "/webhooks/jira",
            content=payload,
            headers={
                "Content-Type": "application/json",
                "X-Hub-Signature-256": sig,
            },
        )
    assert resp.status_code == 200


def test_empty_payload_400(webhook_client):
    """POST with empty body and valid signature returns 400."""
    payload = b""
    sig = _sign(payload, "test-webhook-secret")
    resp = webhook_client["client"].post(
        "/webhooks/jira",
        content=payload,
        headers={
            "Content-Type": "application/json",
            "X-Hub-Signature-256": sig,
        },
    )
    assert resp.status_code == 400


def test_unconfigured_secret_returns_503(tmp_path, monkeypatch):
    """Issue #83: missing JIRA_WEBHOOK_SECRET must fail-closed (no fall-through to 200)."""
    data_dir = tmp_path / "data"
    data_dir.mkdir()
    (data_dir / "issues").mkdir()

    monkeypatch.setenv("DATA_DIR", str(data_dir))
    monkeypatch.setenv("JWT_SECRET_KEY", "test-jwt-secret")
    monkeypatch.delenv("JIRA_WEBHOOK_SECRET", raising=False)
    monkeypatch.setenv("JIRA_DATA_DIR", str(data_dir))

    from connectors.jira import service as svc

    monkeypatch.setattr(svc.Config, "JIRA_WEBHOOK_SECRET", "")
    monkeypatch.setattr(svc.Config, "JIRA_DATA_DIR", data_dir)
    svc._jira_service = None

    from app.main import create_app

    client = TestClient(create_app())
    payload = json.dumps({"webhookEvent": "jira:issue_updated", "issue": {"key": "TEST-1"}}).encode()
    resp = client.post(
        "/webhooks/jira",
        content=payload,
        headers={"Content-Type": "application/json"},
    )
    assert resp.status_code == 503
    assert "secret" in resp.json()["detail"].lower()


@pytest.mark.parametrize(
    "bad_key",
    [
        "../../etc/passwd",
        "../foo",
        "TEST-1/../../../bar",
        "TEST-1\x00.json",
        "TEST-1\r\n",  # CRLF injection
        "test-1",  # lowercase project; Jira keys are uppercase
        "TEST",  # missing -<num>
        "TEST-",  # missing num
        "-1",  # missing project
        "",  # empty
        "A" * 100 + "-1",  # absurd length
        "ABC_DEF-1",  # underscore; not allowed in real Jira
        "А-1",  # Cyrillic А (looks like Latin A)
    ],
)
def test_path_traversal_in_issue_key_rejected(webhook_client, bad_key):
    """Issue #83: malformed issue keys must be rejected with 400, not used in paths."""
    payload = json.dumps({
        "webhookEvent": "jira:issue_updated",
        "issue": {"key": bad_key},
    }).encode()
    sig = _sign(payload, "test-webhook-secret")
    resp = webhook_client["client"].post(
        "/webhooks/jira",
        content=payload,
        headers={
            "Content-Type": "application/json",
            "X-Hub-Signature-256": sig,
        },
    )
    assert resp.status_code == 400, f"key {bad_key!r} should have been rejected, got {resp.status_code}"


def test_null_issue_field_does_not_crash(webhook_client):
    """Issue #83 round-5: a payload with `issue: null` (not just missing)
    used to raise AttributeError on `issue.get('key')` → unhandled 500.
    The handler now normalises None to {} and falls through to the
    400 'Malformed or missing issue key' response."""
    payload = json.dumps({
        "webhookEvent": "jira:issue_updated",
        "issue": None,
    }).encode()
    sig = _sign(payload, "test-webhook-secret")
    resp = webhook_client["client"].post(
        "/webhooks/jira",
        content=payload,
        headers={
            "Content-Type": "application/json",
            "X-Hub-Signature-256": sig,
        },
    )
    assert resp.status_code == 400
    assert "issue key" in resp.json()["detail"].lower()


def test_valid_issue_key_accepted(webhook_client):
    """Sanity: a well-formed issue key still passes validation."""
    from unittest.mock import patch

    payload = json.dumps({
        "webhookEvent": "jira:issue_updated",
        "issue": {"key": "PROJ-42"},
    }).encode()
    sig = _sign(payload, "test-webhook-secret")
    with patch("app.api.jira_webhooks.get_jira_service") as mock_svc:
        mock_svc.return_value.is_configured.return_value = True
        mock_svc.return_value.process_webhook_event.return_value = True
        resp = webhook_client["client"].post(
            "/webhooks/jira",
            content=payload,
            headers={
                "Content-Type": "application/json",
                "X-Hub-Signature-256": sig,
            },
        )
    assert resp.status_code == 200


def test_webhook_event_path_traversal_sanitized(webhook_client, tmp_path, monkeypatch):
    """Issue #83: `webhookEvent` is attacker-controlled and was used to build
    the webhook log filename. A payload with `../../tmp/pwn` for `webhookEvent`
    must NOT escape the WEBHOOK_LOG_DIR; the file (if written at all) lands
    under WEBHOOK_LOG_DIR with the traversal characters sanitized."""
    from unittest.mock import patch

    import app.api.jira_webhooks as wh

    log_dir = tmp_path / "webhook_log"
    log_dir.mkdir()
    monkeypatch.setattr(wh, "WEBHOOK_LOG_DIR", log_dir)

    payload = json.dumps({
        "webhookEvent": "../../tmp/pwn",
        "issue": {"key": "TEST-1"},
    }).encode()
    sig = _sign(payload, "test-webhook-secret")
    with patch("app.api.jira_webhooks.get_jira_service") as mock_svc:
        mock_svc.return_value.is_configured.return_value = True
        mock_svc.return_value.process_webhook_event.return_value = True
        resp = webhook_client["client"].post(
            "/webhooks/jira",
            content=payload,
            headers={
                "Content-Type": "application/json",
                "X-Hub-Signature-256": sig,
            },
        )
    assert resp.status_code == 200

    # No file landed outside log_dir.
    parent = log_dir.parent
    assert not (parent / "tmp" / "pwn.json").exists(), "path traversal succeeded"
    # Either nothing was written (refused), or file is under log_dir with
    # traversal chars replaced by underscores.
    written = list(log_dir.glob("*.json"))
    for f in written:
        assert f.is_relative_to(log_dir), f"file {f} escaped log dir"
        assert "/" not in f.name and ".." not in f.name


# ---------------------------------------------------------------------------
# Additional HMAC validation + error handling tests
# ---------------------------------------------------------------------------


def test_valid_hmac_signature_accepted(webhook_client):
    """Webhook with valid HMAC-SHA256 signature is accepted (200)."""
    from unittest.mock import patch

    payload = json.dumps({
        "webhookEvent": "jira:issue_updated",
        "issue": {"key": "PROJ-1"},
    }).encode()
    sig = _sign(payload, "test-webhook-secret")
    with patch("app.api.jira_webhooks.get_jira_service") as mock_svc:
        mock_svc.return_value.is_configured.return_value = True
        mock_svc.return_value.process_webhook_event.return_value = True
        resp = webhook_client["client"].post(
            "/webhooks/jira",
            content=payload,
            headers={
                "Content-Type": "application/json",
                "X-Hub-Signature-256": sig,
            },
        )
    assert resp.status_code == 200


def test_invalid_hmac_signature_rejected_401(webhook_client):
    """Webhook with wrong HMAC signature is rejected with 401."""
    payload = json.dumps({
        "webhookEvent": "jira:issue_updated",
        "issue": {"key": "PROJ-1"},
    }).encode()
    # Sign with the wrong secret
    sig = _sign(payload, "wrong-secret")
    resp = webhook_client["client"].post(
        "/webhooks/jira",
        content=payload,
        headers={
            "Content-Type": "application/json",
            "X-Hub-Signature-256": sig,
        },
    )
    assert resp.status_code == 401


def test_missing_signature_header_rejected(webhook_client):
    """Webhook with no signature header at all is rejected with 401."""
    payload = json.dumps({
        "webhookEvent": "jira:issue_updated",
        "issue": {"key": "PROJ-1"},
    }).encode()
    resp = webhook_client["client"].post(
        "/webhooks/jira",
        content=payload,
        headers={"Content-Type": "application/json"},
    )
    assert resp.status_code == 401


def test_x_hub_signature_legacy_header_accepted(webhook_client):
    """A legacy X-Hub-Signature header with a non-matching sha1= value is
    rejected with 401 (the header is consulted, but it does not authenticate)."""
    payload = json.dumps({
        "webhookEvent": "jira:issue_updated",
        "issue": {"key": "PROJ-1"},
    }).encode()
    # The handler falls back to X-Hub-Signature if X-Hub-Signature-256 is absent.
    # _verify_signature strips "sha256=" prefix; for sha1 it strips "sha1=".
    # Since the handler uses hmac.new with sha256, a sha1= prefix will still
    # be checked against the sha256 HMAC, so "somehex" cannot match.
    # Note: a missing signature also yields 401, so this test only confirms
    # that a bogus legacy value does not pass; it cannot by itself prove that
    # the fallback header was read.
    resp = webhook_client["client"].post(
        "/webhooks/jira",
        content=payload,
        headers={
            "Content-Type": "application/json",
            "X-Hub-Signature": "sha1=somehex",
        },
    )
    # Legacy header value does not match the sha256 HMAC → 401
    assert resp.status_code == 401


def test_malformed_json_payload_handled_gracefully(webhook_client):
    """Malformed webhook payload (invalid JSON) is handled gracefully with 400."""
    payload = b'this is not json {!><'
    sig = _sign(payload, "test-webhook-secret")
    resp = webhook_client["client"].post(
        "/webhooks/jira",
        content=payload,
        headers={
            "Content-Type": "application/json",
            "X-Hub-Signature-256": sig,
        },
    )
    assert resp.status_code == 400
    detail = resp.json()["detail"].lower()
    assert "json" in detail or "invalid" in detail


def test_duplicate_event_processed_twice(webhook_client):
    """Same Jira event ID sent twice is processed both times (idempotent at
    the service layer, not rejected at the webhook layer)."""
    from unittest.mock import patch

    payload = json.dumps({
        "webhookEvent": "jira:issue_updated",
        "issue": {"key": "DUP-1"},
    }).encode()
    sig = _sign(payload, "test-webhook-secret")
    headers = {
        "Content-Type": "application/json",
        "X-Hub-Signature-256": sig,
    }
    with patch("app.api.jira_webhooks.get_jira_service") as mock_svc:
        mock_svc.return_value.is_configured.return_value = True
        mock_svc.return_value.process_webhook_event.return_value = True
        resp1 = webhook_client["client"].post("/webhooks/jira", content=payload, headers=headers)
        resp2 = webhook_client["client"].post("/webhooks/jira", content=payload, headers=headers)
    # Both requests succeed; deduplication is the service layer's job
    assert resp1.status_code == 200
    assert resp2.status_code == 200


def test_signature_without_sha256_prefix(webhook_client):
    """A raw hex signature without 'sha256=' prefix is also accepted by
    _verify_signature (it strips the prefix if present)."""
    from unittest.mock import patch

    payload = json.dumps({
        "webhookEvent": "jira:issue_updated",
        "issue": {"key": "PROJ-1"},
    }).encode()
    # Compute raw hex without prefix (uses the module-level hmac import)
    mac = hmac.new("test-webhook-secret".encode(), payload, hashlib.sha256).hexdigest()
    with patch("app.api.jira_webhooks.get_jira_service") as mock_svc:
        mock_svc.return_value.is_configured.return_value = True
        mock_svc.return_value.process_webhook_event.return_value = True
        resp = webhook_client["client"].post(
            "/webhooks/jira",
            content=payload,
            headers={
                "Content-Type": "application/json",
                "X-Hub-Signature-256": mac,  # no sha256= prefix
            },
        )
    assert resp.status_code == 200


def test_jira_service_not_configured_returns_503(webhook_client):
    """When Jira service is not configured, webhook returns 503."""
    from unittest.mock import patch

    payload = json.dumps({
        "webhookEvent": "jira:issue_updated",
        "issue": {"key": "PROJ-1"},
    }).encode()
    sig = _sign(payload, "test-webhook-secret")
    with patch("app.api.jira_webhooks.get_jira_service") as mock_svc:
        mock_svc.return_value.is_configured.return_value = False
        resp = webhook_client["client"].post(
            "/webhooks/jira",
            content=payload,
            headers={
                "Content-Type": "application/json",
                "X-Hub-Signature-256": sig,
            },
        )
    assert resp.status_code == 503


def test_process_webhook_event_failure_returns_500(webhook_client):
    """When process_webhook_event returns False, the endpoint returns 500."""
    from unittest.mock import patch

    payload = json.dumps({
        "webhookEvent": "jira:issue_updated",
        "issue": {"key": "PROJ-1"},
    }).encode()
    sig = _sign(payload, "test-webhook-secret")
    with patch("app.api.jira_webhooks.get_jira_service") as mock_svc:
        mock_svc.return_value.is_configured.return_value = True
        mock_svc.return_value.process_webhook_event.return_value = False
        resp = webhook_client["client"].post(
            "/webhooks/jira",
            content=payload,
            headers={
                "Content-Type": "application/json",
                "X-Hub-Signature-256": sig,
            },
        )
    assert resp.status_code == 500


def test_issue_key_at_top_level_accepted(webhook_client):
    """Some Jira event types deliver issue_key at the top level instead of
    issue.key. The handler should accept these."""
    from unittest.mock import patch

    payload = json.dumps({
        "webhookEvent": "jira:issue_deleted",
        "issue_key": "PROJ-99",
    }).encode()
    sig = _sign(payload, "test-webhook-secret")
    with patch("app.api.jira_webhooks.get_jira_service") as mock_svc:
        mock_svc.return_value.is_configured.return_value = True
        mock_svc.return_value.process_webhook_event.return_value = True
        resp = webhook_client["client"].post(
            "/webhooks/jira",
            content=payload,
            headers={
                "Content-Type": "application/json",
                "X-Hub-Signature-256": sig,
            },
        )
    assert resp.status_code == 200
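
The keys rejected in test_path_traversal_in_issue_key_rejected suggest an allow-list check on the issue key before it is used in any path. The following is only a sketch of such a check: the pattern, the length bounds, and the name `is_valid_issue_key` are assumptions chosen to agree with the parametrized cases above, not the handler's actual implementation.

```python
import re

# Hypothetical allow-list pattern: an uppercase ASCII project key of 2 to 10
# characters, a hyphen, then a numeric issue id of up to 10 digits.
_ISSUE_KEY_RE = re.compile(r"[A-Z][A-Z0-9]{1,9}-[0-9]{1,10}")


def is_valid_issue_key(key: object) -> bool:
    """Return True only for well-formed keys such as "PROJ-42".

    fullmatch (rather than match with "$") is used so that a trailing
    newline, NUL byte, or path separator can never slip through.
    """
    return isinstance(key, str) and _ISSUE_KEY_RE.fullmatch(key) is not None
```

Validating against an explicit pattern, rather than stripping known-bad characters, means look-alike input such as a Cyrillic "А" is rejected without needing a special case.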