feat: Jira webhook FastAPI adapter — replaces Flask Blueprint

This commit is contained in:
ZdenekSrotyr 2026-04-08 07:04:50 +02:00
parent 3e3f84a00e
commit 67a1e0bb45
4 changed files with 233 additions and 0 deletions

129
app/api/jira_webhooks.py Normal file
View file

@ -0,0 +1,129 @@
"""
Jira webhook endpoints FastAPI replacement for Flask Blueprint.
Receives Jira webhook notifications, verifies HMAC-SHA256 signatures,
and delegates processing to the Jira service.
"""
import hashlib
import hmac
import json
import logging
from datetime import datetime
from fastapi import APIRouter, Request, Response
from fastapi.responses import JSONResponse
from connectors.jira.service import Config, get_jira_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/webhooks", tags=["jira-webhooks"])
# Path for storing raw webhook events (debugging/audit)
WEBHOOK_LOG_DIR = Config.JIRA_DATA_DIR / "webhook_events"
def _verify_signature(payload: bytes, signature: str | None) -> bool:
"""Verify HMAC-SHA256 signature from Jira webhook."""
secret = Config.JIRA_WEBHOOK_SECRET
if not secret:
logger.warning("JIRA_WEBHOOK_SECRET not configured, skipping signature verification")
return True
if not signature:
logger.warning("No signature provided in webhook request")
return False
if signature.startswith("sha256="):
signature = signature[7:]
expected = hmac.new(
secret.encode("utf-8"),
payload,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(signature, expected)
def _log_webhook_event(event_data: dict) -> None:
"""Log webhook event to file for debugging/audit."""
try:
WEBHOOK_LOG_DIR.mkdir(parents=True, exist_ok=True)
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
event_type = event_data.get("webhookEvent", "unknown").replace(":", "_")
filename = f"{timestamp}_{event_type}.json"
filepath = WEBHOOK_LOG_DIR / filename
with open(filepath, "w") as f:
json.dump(event_data, f, indent=2, default=str)
except Exception as e:
logger.warning(f"Failed to log webhook event: {e}")
@router.post("/jira")
async def receive_jira_webhook(request: Request) -> Response:
"""Receive and process Jira webhook notifications."""
payload = await request.body()
# Verify signature
signature = request.headers.get("X-Hub-Signature-256") or request.headers.get("X-Hub-Signature")
if not _verify_signature(payload, signature):
logger.warning("Invalid webhook signature from %s", request.client.host if request.client else "unknown")
return JSONResponse({"detail": "Invalid signature"}, status_code=401)
# Parse JSON
if not payload:
return JSONResponse({"detail": "Empty payload"}, status_code=400)
try:
event_data = json.loads(payload)
except (json.JSONDecodeError, ValueError) as e:
logger.error(f"Failed to parse webhook JSON: {e}")
return JSONResponse({"detail": "Invalid JSON payload"}, status_code=400)
if not event_data:
return JSONResponse({"detail": "Empty payload"}, status_code=400)
# Log event for debugging
_log_webhook_event(event_data)
webhook_event = event_data.get("webhookEvent", "unknown")
issue = event_data.get("issue", {})
issue_key = issue.get("key", "unknown")
logger.info(f"Received webhook: {webhook_event} for issue {issue_key}")
jira_service = get_jira_service()
if not jira_service.is_configured():
logger.error("Jira service not configured, cannot process webhook")
return JSONResponse(
{"status": "error", "message": "Jira service not configured"},
status_code=503,
)
success = jira_service.process_webhook_event(event_data)
if success:
return JSONResponse({"status": "ok", "event": webhook_event, "issue": issue_key})
else:
return JSONResponse(
{"status": "error", "message": "Failed to process event", "event": webhook_event, "issue": issue_key},
status_code=500,
)
@router.get("/jira/health")
async def jira_webhook_health() -> dict:
"""Health check for Jira webhook endpoint."""
jira_service = get_jira_service()
return {
"status": "ok",
"configured": jira_service.is_configured(),
"webhook_secret_set": bool(Config.JIRA_WEBHOOK_SECRET),
"jira_domain": Config.JIRA_DOMAIN or "(not set)",
}

View file

@ -25,6 +25,7 @@ from app.api.telegram import router as telegram_router
from app.api.admin import router as admin_router
from app.api.permissions import router as permissions_router
from app.api.access_requests import router as access_requests_router
from app.api.jira_webhooks import router as jira_webhooks_router
from app.web.router import router as web_router
logger = logging.getLogger(__name__)
@ -105,6 +106,7 @@ def create_app() -> FastAPI:
app.include_router(admin_router)
app.include_router(permissions_router)
app.include_router(access_requests_router)
app.include_router(jira_webhooks_router)
# Web UI router (must be last — has catch-all routes)
app.include_router(web_router)

View file

@ -30,6 +30,8 @@ class _JiraConfig:
JIRA_CLOUD_ID = os.environ.get("JIRA_CLOUD_ID", "")
JIRA_SLA_EMAIL = os.environ.get("JIRA_SLA_EMAIL", "")
JIRA_SLA_API_TOKEN = os.environ.get("JIRA_SLA_API_TOKEN", "")
JIRA_WEBHOOK_SECRET = os.environ.get("JIRA_WEBHOOK_SECRET", "")
DEBUG = os.environ.get("DEBUG", "").lower() in ("1", "true")
Config = _JiraConfig

100
tests/test_jira_webhooks.py Normal file
View file

@ -0,0 +1,100 @@
"""Tests for Jira webhook FastAPI router."""
import hashlib
import hmac
import json
import os
import tempfile
import pytest
from fastapi.testclient import TestClient
def _sign(payload: bytes, secret: str) -> str:
"""Compute sha256=<HMAC hex> for a given payload and secret."""
mac = hmac.new(secret.encode("utf-8"), payload, hashlib.sha256).hexdigest()
return f"sha256={mac}"
@pytest.fixture()
def webhook_client(tmp_path, monkeypatch):
"""Create a TestClient with required env vars and dirs."""
data_dir = tmp_path / "data"
data_dir.mkdir()
(data_dir / "issues").mkdir()
monkeypatch.setenv("DATA_DIR", str(data_dir))
monkeypatch.setenv("JWT_SECRET_KEY", "test-jwt-secret")
monkeypatch.setenv("JIRA_WEBHOOK_SECRET", "test-webhook-secret")
monkeypatch.setenv("JIRA_DATA_DIR", str(data_dir))
# Re-read env into Config (class attrs read os.environ at import time)
from connectors.jira import service as svc
monkeypatch.setattr(svc.Config, "JIRA_WEBHOOK_SECRET", "test-webhook-secret")
monkeypatch.setattr(svc.Config, "JIRA_DATA_DIR", data_dir)
# Reimport app to pick up router
from app.main import create_app
app = create_app()
return TestClient(app)
def test_health(webhook_client):
"""GET /webhooks/jira/health returns 200."""
resp = webhook_client.get("/webhooks/jira/health")
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "ok"
assert "webhook_secret_set" in body
def test_missing_signature_401(webhook_client):
"""POST without signature header returns 401."""
payload = json.dumps({"webhookEvent": "jira:issue_updated", "issue": {"key": "TEST-1"}}).encode()
resp = webhook_client.post("/webhooks/jira", content=payload, headers={"Content-Type": "application/json"})
assert resp.status_code == 401
def test_invalid_signature_401(webhook_client):
"""POST with wrong signature returns 401."""
payload = json.dumps({"webhookEvent": "jira:issue_updated", "issue": {"key": "TEST-1"}}).encode()
resp = webhook_client.post(
"/webhooks/jira",
content=payload,
headers={
"Content-Type": "application/json",
"X-Hub-Signature-256": "sha256=badhex",
},
)
assert resp.status_code == 401
def test_valid_signature_accepted(webhook_client):
"""POST with correct HMAC-SHA256 is not rejected as 401."""
payload = json.dumps({"webhookEvent": "jira:issue_updated", "issue": {"key": "TEST-1"}}).encode()
sig = _sign(payload, "test-webhook-secret")
resp = webhook_client.post(
"/webhooks/jira",
content=payload,
headers={
"Content-Type": "application/json",
"X-Hub-Signature-256": sig,
},
)
# Should pass signature check; 200 or 503 (service not configured) are fine
assert resp.status_code in (200, 503)
def test_empty_payload_400(webhook_client):
"""POST with empty body and valid signature returns 400."""
payload = b""
sig = _sign(payload, "test-webhook-secret")
resp = webhook_client.post(
"/webhooks/jira",
content=payload,
headers={
"Content-Type": "application/json",
"X-Hub-Signature-256": sig,
},
)
assert resp.status_code == 400