Cuts release 0.20.0.

## Highlights
- X-Request-ID header on every response, sanitized to [A-Za-z0-9_-] (CRLF log-forging mitigation)
- Error pages (HTML + JSON 500) surface request_id for support tickets
- Dev debug toolbar gated by DEBUG=1 — fastapi-debug-toolbar with custom DuckDBPanel
- Centralized app.logging_config.setup_logging() replaces 23 scattered basicConfig calls
- Telegram bot drops the bot.log file — stdout only (BREAKING)

## Devin findings addressed
- BUG_0001: .env.template no longer claims FastAPI debug=True
- BUG_0002: subprocess extractor logs INFO to stderr again
- ANALYSIS_0003: _wants_html no longer matches Accept: */* (curl gets JSON as before)
- BUG on b1c6ee9: HTML 500 page no longer leaks str(exc) in production
- BUG on b13d2fe: 2 CLAUDE.md compliance flags (transform.py + ws_gateway) accepted as a scope-limited logging refactor — follow-up to update CLAUDE.md if needed

See CHANGELOG [0.20.0] for full notes.
746 lines
27 KiB
Python
746 lines
27 KiB
Python
"""
|
|
Transform raw Jira JSON data into clean Parquet format for analysis.
|
|
|
|
Extracts key fields from Jira issues including custom fields used by support team.
|
|
Converts Atlassian Document Format (ADF) to plain text.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pandas as pd
|
|
import pyarrow as pa
|
|
import pyarrow.parquet as pq
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Custom field mapping (ID -> human readable name)
# Verified against Jira field configuration (Feb 2026)
# The "(was: ...)" notes record the name a field ID was previously mapped to.
# NOTE(review): within this module the mapping serves as a reference table only;
# transform_issue() addresses the custom fields by their literal IDs.
CUSTOM_FIELD_NAMES = {
    "customfield_10156": "participants",  # List of users watching/participating
    "customfield_10002": "organizations",  # Organizations
    "customfield_10010": "request_type_info",  # Service Desk request type details
    "customfield_10004": "severity",  # Severity level
    "customfield_10365": "spam",  # Spam flag
    "customfield_10157": "satisfaction",  # Customer satisfaction (was: sla_info)
    "customfield_10323": "triage",  # Triage multi-select (was: team_tier)
    "customfield_10330": "context",  # Context field (was: root_cause)
    "customfield_10325": "custom_url",  # Custom URL (was: resolution_summary)
    "customfield_10350": "slack_link",  # Slack link (was: customer_type)
    "customfield_10475": "email_address",  # Email address (was: context)
    "customfield_10511": "configuration_item",  # Configuration item (was: categories)
    "customfield_10676": "technical_issue_category",  # Technical issue category (was: satisfaction_rating)
    "customfield_10328": "first_response_time",  # SLA: first response time (new)
    "customfield_10161": "time_to_resolution",  # SLA: time to resolution (new)
    "customfield_11831": "l3_team",  # L3 team assignment (new)
}
|
|
|
|
# Explicit schema definitions for consistent types across monthly chunks
# This prevents DuckDB union errors when some months have all-NULL columns
#
# Each schema maps column name -> dtype name. The dtype names understood by
# get_pyarrow_schema()/apply_schema() are "string", "Int64" and anything that
# starts with "datetime64" (written to Parquet as a microsecond UTC timestamp).
# Key order is the column order of the resulting Parquet files.
ISSUES_SCHEMA = {
    # Core identifiers
    "issue_key": "string",
    "issue_id": "string",
    "issue_url": "string",
    # Standard fields
    "summary": "string",
    "description": "string",  # plain text extracted from ADF
    "issue_type": "string",
    "status": "string",
    "status_category": "string",
    "priority": "string",
    "resolution": "string",
    # Project
    "project_key": "string",
    "project_name": "string",
    # People
    "creator_email": "string",
    "creator_name": "string",
    "reporter_email": "string",
    "reporter_name": "string",
    "assignee_email": "string",
    "assignee_name": "string",
    # Dates
    "created_at": "datetime64[ns, UTC]",
    "updated_at": "datetime64[ns, UTC]",
    "resolved_at": "datetime64[ns, UTC]",
    "due_date": "string",  # passed through from Jira unparsed
    # Arrays are stored as JSON-encoded strings
    "labels": "string",
    # Counts
    "attachment_count": "Int64",
    "comment_count": "Int64",
    "issuelink_count": "Int64",
    # Service Desk specific
    "request_type": "string",
    "request_status": "string",
    # Custom fields (multi-value ones are JSON-encoded strings)
    "severity": "string",
    "triage": "string",
    "configuration_item": "string",
    "participants": "string",
    "organizations": "string",
    "spam": "string",
    "context": "string",
    "custom_url": "string",
    "slack_link": "string",
    "technical_issue_category": "string",
    "email_address": "string",
    "satisfaction": "Int64",
    # SLA metrics flattened by extract_sla_metrics(); "breached" is the
    # stringified boolean ("True"/"False"), hence the string dtype.
    "first_response_breached": "string",
    "first_response_goal_millis": "Int64",
    "first_response_elapsed_millis": "Int64",
    "time_to_resolution_breached": "string",
    "time_to_resolution_goal_millis": "Int64",
    "time_to_resolution_elapsed_millis": "Int64",
    "l3_team": "string",
    # Metadata
    "_synced_at": "string",
    "_raw_file": "string",  # name of the raw JSON file the row came from
}
|
|
|
|
# Columns of the comments table: one row per comment, produced by
# transform_comments(). Key order is the Parquet column order.
COMMENTS_SCHEMA = {
    "comment_id": "string",
    "issue_key": "string",  # key of the issue the comment belongs to
    "author_email": "string",
    "author_name": "string",
    "body": "string",  # plain text extracted from ADF
    "created_at": "datetime64[ns, UTC]",
    "updated_at": "datetime64[ns, UTC]",
    "update_author_email": "string",
}
|
|
|
|
# Columns of the attachments metadata table: one row per attachment, produced
# by transform_attachments(). Key order is the Parquet column order.
ATTACHMENTS_SCHEMA = {
    "attachment_id": "string",
    "issue_key": "string",
    "filename": "string",
    "local_path": "string",  # set only when the downloaded file exists on disk
    "hierarchical_path": "string",  # filled in by transform_all() when local_path is set
    "size_bytes": "Int64",
    "mime_type": "string",
    "author_email": "string",
    "created_at": "datetime64[ns, UTC]",
    "content_url": "string",
    "thumbnail_url": "string",
}
|
|
|
|
# Columns of the changelog table: one row per changed field within a history
# entry, produced by transform_changelog(). Key order is the Parquet column order.
CHANGELOG_SCHEMA = {
    "change_id": "string",  # id of the history entry; shared by all items of that entry
    "issue_key": "string",
    "author_email": "string",
    "author_name": "string",
    "field_name": "string",
    "field_type": "string",
    "from_value": "string",
    "to_value": "string",
    "changed_at": "datetime64[ns, UTC]",
}
|
|
|
|
# Columns of the issue links table: one row per link, produced by
# transform_issuelinks(). Key order is the Parquet column order.
ISSUELINKS_SCHEMA = {
    "issue_key": "string",  # issue that owns the link
    "link_id": "string",
    "link_type": "string",
    "direction": "string",  # "inward" or "outward"
    "linked_issue_key": "string",
    "linked_issue_summary": "string",
    "linked_issue_status": "string",
    "linked_issue_priority": "string",
}
|
|
|
|
# Columns of the remote links table: one row per remote link, produced by
# transform_remote_links(). Key order is the Parquet column order.
REMOTE_LINKS_SCHEMA = {
    "issue_key": "string",
    "remote_link_id": "string",
    "url": "string",
    "title": "string",
    "application_name": "string",
    "application_type": "string",
}
|
|
|
|
|
|
def get_pyarrow_schema(schema_dict: dict) -> pa.Schema:
    """Build the PyArrow schema matching a column -> dtype-name mapping.

    Dtype names translate as follows: ``"Int64"`` becomes ``int64``, any name
    starting with ``"datetime64"`` becomes a microsecond UTC timestamp, and
    ``"string"`` -- as well as every unrecognised name -- becomes ``string``.
    Fields appear in the mapping's iteration order.
    """

    def _arrow_type(dtype_name: str) -> pa.DataType:
        if dtype_name == "Int64":
            return pa.int64()
        if dtype_name.startswith("datetime64"):
            return pa.timestamp("us", tz="UTC")
        # "string" and anything unknown are stored as text.
        return pa.string()

    return pa.schema(
        [pa.field(column, _arrow_type(dtype_name)) for column, dtype_name in schema_dict.items()]
    )
|
|
|
|
|
|
def apply_schema(df: pd.DataFrame, schema: dict) -> pa.Table:
    """
    Apply explicit schema to DataFrame and return PyArrow Table.

    This ensures all monthly chunks have the same column types,
    preventing DuckDB union errors when querying with glob patterns.

    Columns missing from ``df`` are added as all-null, columns not named in
    ``schema`` are dropped, and the result follows the schema's key order.
    The caller's DataFrame is left untouched.

    Args:
        df: Records to convert.
        schema: Mapping of column name -> dtype name ("string", "Int64" or a
            name starting with "datetime64").

    Returns:
        A PyArrow table typed according to ``get_pyarrow_schema(schema)``.
    """

    def _as_text(value: Any) -> str | None:
        # Only scalars can be "missing". Containers such as lists must not be
        # passed to pd.isna(): it would return an array whose truth value is
        # ambiguous and raise.
        if value is None:
            return None
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return None
        return str(value)

    # reindex() yields a new frame holding exactly the schema columns, in
    # schema order, with absent columns filled with nulls -- so the
    # assignments below never write into the caller's object.
    frame = df.reindex(columns=list(schema))

    for column, dtype in schema.items():
        if dtype == "string":
            # Convert to string, keeping missing values as None
            frame[column] = frame[column].apply(_as_text)
        elif dtype.startswith("datetime64"):
            # Unparseable values become NaT instead of raising
            frame[column] = pd.to_datetime(frame[column], utc=True, errors="coerce")
        elif dtype == "Int64":
            # Unparseable values become NaN, written as nulls
            frame[column] = pd.to_numeric(frame[column], errors="coerce")

    # Convert to PyArrow with explicit schema
    return pa.Table.from_pandas(frame, schema=get_pyarrow_schema(schema), preserve_index=False)
|
|
|
|
|
|
def extract_text_from_adf(node: dict | list | None) -> str:
|
|
"""
|
|
Extract plain text from Atlassian Document Format (ADF) content.
|
|
|
|
ADF is a nested JSON structure used by Jira for rich text.
|
|
This function recursively extracts all text content.
|
|
"""
|
|
if node is None:
|
|
return ""
|
|
|
|
if isinstance(node, str):
|
|
return node
|
|
|
|
if isinstance(node, list):
|
|
return " ".join(extract_text_from_adf(item) for item in node)
|
|
|
|
if not isinstance(node, dict):
|
|
return ""
|
|
|
|
# Get text from this node
|
|
text_parts = []
|
|
|
|
# Direct text content
|
|
if "text" in node:
|
|
text_parts.append(node["text"])
|
|
|
|
# Recursive content
|
|
if "content" in node:
|
|
text_parts.append(extract_text_from_adf(node["content"]))
|
|
|
|
return " ".join(text_parts).strip()
|
|
|
|
|
|
def extract_user_info(user: dict | None) -> dict:
|
|
"""Extract key user information from Jira user object."""
|
|
if not user:
|
|
return {"email": None, "name": None, "account_id": None}
|
|
|
|
return {
|
|
"email": user.get("emailAddress"),
|
|
"name": user.get("displayName"),
|
|
"account_id": user.get("accountId"),
|
|
}
|
|
|
|
|
|
def extract_option_value(field: Any) -> str | None:
|
|
"""Extract value from Jira option field (select/radio)."""
|
|
if field is None:
|
|
return None
|
|
if isinstance(field, dict):
|
|
return field.get("value") or field.get("name")
|
|
return str(field)
|
|
|
|
|
|
def extract_option_list(field: Any) -> list[str]:
    """Return the display values of a Jira multi-select field.

    Anything that is not a list yields an empty list. Falsy items are
    skipped; the rest are converted with ``extract_option_value``.
    """
    if not isinstance(field, list):
        return []

    values = []
    for option in field:
        if option:
            values.append(extract_option_value(option))
    return values
|
|
|
|
|
|
def parse_datetime(dt_str: str | None) -> datetime | None:
|
|
"""Parse Jira datetime string to datetime object."""
|
|
if not dt_str:
|
|
return None
|
|
try:
|
|
# Jira format: "2026-02-03T12:06:52.829+0100"
|
|
# Remove milliseconds and parse
|
|
dt_str = re.sub(r"\.\d+", "", dt_str)
|
|
return datetime.fromisoformat(dt_str)
|
|
except (ValueError, TypeError):
|
|
return None
|
|
|
|
|
|
def extract_sla_metrics(sla_field: Any) -> dict:
    """
    Extract flattened SLA metrics from a Jira SLA field.

    Prefers ongoingCycle (active tickets), falls back to last completedCycle.

    Args:
        sla_field: Raw value of the SLA custom field.

    Returns:
        Dict with ``breached`` (stringified boolean or None), ``goal_millis``
        and ``elapsed_millis``. All three are None when the field is not a
        dict, carries an ``errorMessage``, or has no usable cycle.
    """
    result = {"breached": None, "goal_millis": None, "elapsed_millis": None}

    def _millis(duration: Any) -> Any:
        # The duration may be missing, null or malformed; only dicts carry millis.
        return duration.get("millis") if isinstance(duration, dict) else None

    # Skip non-dict values and error responses from permission issues
    if not isinstance(sla_field, dict) or "errorMessage" in sla_field:
        return result

    # Prefer ongoing cycle, fall back to last completed cycle
    cycle = sla_field.get("ongoingCycle")
    if not cycle:
        completed = sla_field.get("completedCycles")
        cycle = completed[-1] if isinstance(completed, list) and completed else None

    if not isinstance(cycle, dict):
        return result

    breached = cycle.get("breached")
    if breached is not None:
        result["breached"] = str(breached)
    result["goal_millis"] = _millis(cycle.get("goalDuration"))
    result["elapsed_millis"] = _millis(cycle.get("elapsedTime"))

    return result
|
|
|
|
|
|
def transform_issue(raw_issue: dict) -> dict:
    """
    Transform a single raw Jira issue into clean format.

    Nested Jira objects are read null-safely: a key that is absent and a key
    that is present with a JSON null are treated the same way.

    Args:
        raw_issue: Issue as returned by the Jira API (``key``, ``id``,
            ``fields`` and the ``_synced_at`` marker are read here).

    Returns:
        A flat dictionary suitable for DataFrame conversion. Multi-value
        fields are JSON-encoded strings; ``_raw_file`` is left as None for
        the caller to fill in; ``issue_url`` is None when the issue has no key.
    """
    fields = raw_issue.get("fields") or {}
    issue_key = raw_issue.get("key")

    # Extract user info
    creator = extract_user_info(fields.get("creator"))
    reporter = extract_user_info(fields.get("reporter"))
    assignee = extract_user_info(fields.get("assignee"))

    # Nested standard objects; `or {}` covers both a missing key and a null
    issue_type = fields.get("issuetype") or {}
    status = fields.get("status") or {}
    status_category = status.get("statusCategory") or {}
    priority = fields.get("priority") or {}
    resolution = fields.get("resolution") or {}
    project = fields.get("project") or {}
    comment_info = fields.get("comment") or {}

    # Extract request type info from Service Desk field
    request_type_info = fields.get("customfield_10010") or {}
    request_type = request_type_info.get("requestType") or {}
    current_status = request_type_info.get("currentStatus") or {}

    satisfaction = fields.get("customfield_10157")
    first_response = extract_sla_metrics(fields.get("customfield_10328"))
    time_to_resolution = extract_sla_metrics(fields.get("customfield_10161"))

    jira_domain = os.environ.get("JIRA_DOMAIN", "your-org.atlassian.net")

    # Build clean record
    record = {
        # Core identifiers
        "issue_key": issue_key,
        "issue_id": raw_issue.get("id"),
        # Without a key there is nothing to link to (avoids ".../browse/None")
        "issue_url": f"https://{jira_domain}/browse/{issue_key}" if issue_key else None,
        # Standard fields
        "summary": fields.get("summary"),
        "description": extract_text_from_adf(fields.get("description")),
        "issue_type": issue_type.get("name"),
        "status": status.get("name"),
        "status_category": status_category.get("name"),
        "priority": priority.get("name"),
        "resolution": resolution.get("name"),
        # Project
        "project_key": project.get("key"),
        "project_name": project.get("name"),
        # People
        "creator_email": creator["email"],
        "creator_name": creator["name"],
        "reporter_email": reporter["email"],
        "reporter_name": reporter["name"],
        "assignee_email": assignee["email"],
        "assignee_name": assignee["name"],
        # Dates
        "created_at": parse_datetime(fields.get("created")),
        "updated_at": parse_datetime(fields.get("updated")),
        "resolved_at": parse_datetime(fields.get("resolutiondate")),
        "due_date": fields.get("duedate"),
        # Arrays as JSON strings for Parquet compatibility
        "labels": json.dumps(fields.get("labels") or []),
        # Counts
        "attachment_count": len(fields.get("attachment") or []),
        "comment_count": comment_info.get("total", 0),
        "issuelink_count": len(fields.get("issuelinks") or []),
        # Service Desk specific
        "request_type": request_type.get("name"),
        "request_status": current_status.get("status"),
        # Custom fields (verified against Jira field configuration Feb 2026)
        "severity": extract_option_value(fields.get("customfield_10004")),
        "triage": json.dumps(extract_option_list(fields.get("customfield_10323"))),
        "configuration_item": json.dumps(extract_option_list(fields.get("customfield_10511"))),
        "participants": json.dumps(
            [extract_user_info(u).get("email") for u in (fields.get("customfield_10156") or [])]
        ),
        "organizations": json.dumps(extract_option_list(fields.get("customfield_10002"))),
        "spam": extract_option_value(fields.get("customfield_10365")),
        "context": extract_text_from_adf(fields.get("customfield_10330")) or None,
        "custom_url": fields.get("customfield_10325"),
        "slack_link": extract_option_value(fields.get("customfield_10350")),
        "technical_issue_category": extract_option_value(fields.get("customfield_10676")),
        "email_address": extract_option_value(fields.get("customfield_10475")),
        "satisfaction": satisfaction.get("rating") if isinstance(satisfaction, dict) else None,
        # SLA metrics, flattened into prefixed columns
        **{f"first_response_{k}": v for k, v in first_response.items()},
        **{f"time_to_resolution_{k}": v for k, v in time_to_resolution.items()},
        "l3_team": extract_option_value(fields.get("customfield_11831")),
        # Metadata
        "_synced_at": raw_issue.get("_synced_at"),
        "_raw_file": None,  # Will be set by caller
    }

    return record
|
|
|
|
|
|
def transform_comments(raw_issue: dict) -> list[dict]:
    """Extract and transform comments from an issue.

    Args:
        raw_issue: Raw Jira issue; comments are read from
            ``fields.comment.comments``.

    Returns:
        One flat record per comment. The list is empty when the issue has no
        comments or any level of that path is missing or null.
    """
    issue_key = raw_issue.get("key")
    # `or` guards against keys that are present but hold a JSON null
    fields = raw_issue.get("fields") or {}
    comments_data = fields.get("comment") or {}
    comments = comments_data.get("comments") or []

    records = []
    for comment in comments:
        author = extract_user_info(comment.get("author"))
        update_author = extract_user_info(comment.get("updateAuthor"))

        records.append(
            {
                "comment_id": comment.get("id"),
                "issue_key": issue_key,
                "author_email": author["email"],
                "author_name": author["name"],
                "body": extract_text_from_adf(comment.get("body")),
                "created_at": parse_datetime(comment.get("created")),
                "updated_at": parse_datetime(comment.get("updated")),
                "update_author_email": update_author["email"],
            }
        )

    return records
|
|
|
|
|
|
def transform_attachments(raw_issue: dict, attachments_dir: Path | None = None) -> list[dict]:
|
|
"""Extract and transform attachments from an issue."""
|
|
issue_key = raw_issue.get("key")
|
|
fields = raw_issue.get("fields", {})
|
|
attachments = fields.get("attachment", [])
|
|
|
|
records = []
|
|
for att in attachments:
|
|
author = extract_user_info(att.get("author"))
|
|
att_id = att.get("id")
|
|
filename = att.get("filename")
|
|
|
|
# Check if local file exists
|
|
local_path = None
|
|
if attachments_dir and issue_key:
|
|
expected_path = attachments_dir / issue_key / f"{att_id}_{filename}"
|
|
if expected_path.exists():
|
|
local_path = str(expected_path)
|
|
|
|
records.append(
|
|
{
|
|
"attachment_id": att_id,
|
|
"issue_key": issue_key,
|
|
"filename": filename,
|
|
"local_path": local_path,
|
|
"size_bytes": att.get("size"),
|
|
"mime_type": att.get("mimeType"),
|
|
"author_email": author["email"],
|
|
"created_at": parse_datetime(att.get("created")),
|
|
"content_url": att.get("content"),
|
|
"thumbnail_url": att.get("thumbnail"),
|
|
}
|
|
)
|
|
|
|
return records
|
|
|
|
|
|
def transform_changelog(raw_issue: dict) -> list[dict]:
    """Extract and transform changelog entries from an issue.

    Args:
        raw_issue: Raw Jira issue; history is read from
            ``changelog.histories``.

    Returns:
        One record per changed field. All items of one history entry share
        its id, author and timestamp. The list is empty when the changelog
        is missing, null or has no histories.
    """
    issue_key = raw_issue.get("key")
    # `or` guards against keys that are present but hold a JSON null
    changelog = raw_issue.get("changelog") or {}
    histories = changelog.get("histories") or []

    records = []
    for history in histories:
        author = extract_user_info(history.get("author"))
        changed_at = parse_datetime(history.get("created"))

        for item in history.get("items") or []:
            records.append(
                {
                    "change_id": history.get("id"),
                    "issue_key": issue_key,
                    "author_email": author["email"],
                    "author_name": author["name"],
                    "field_name": item.get("field"),
                    "field_type": item.get("fieldtype"),
                    "from_value": item.get("fromString"),
                    "to_value": item.get("toString"),
                    "changed_at": changed_at,
                }
            )

    return records
|
|
|
|
|
|
def transform_issuelinks(raw_issue: dict) -> list[dict]:
    """Extract and transform issue links from an issue.

    Args:
        raw_issue: Raw Jira issue; links are read from ``fields.issuelinks``.

    Returns:
        One record per link that references another issue. Links carrying
        neither ``inwardIssue`` nor ``outwardIssue`` are skipped. Missing or
        null nested objects (type, fields, status, priority) produce None
        values -- an empty string for the link type name -- instead of errors.
    """
    issue_key = raw_issue.get("key")
    # `or` guards against keys that are present but hold a JSON null
    fields = raw_issue.get("fields") or {}
    issuelinks = fields.get("issuelinks") or []

    records = []
    for link in issuelinks:
        if not isinstance(link, dict):
            continue

        link_type = link.get("type") or {}

        # Each link has either inwardIssue or outwardIssue
        if link.get("inwardIssue") is not None:
            linked, direction = link["inwardIssue"], "inward"
        elif link.get("outwardIssue") is not None:
            linked, direction = link["outwardIssue"], "outward"
        else:
            continue

        linked_fields = linked.get("fields") or {}
        linked_status = linked_fields.get("status") or {}
        linked_priority = linked_fields.get("priority") or {}

        records.append(
            {
                "issue_key": issue_key,
                "link_id": link.get("id"),
                "link_type": link_type.get("name", ""),
                "direction": direction,
                "linked_issue_key": linked.get("key"),
                "linked_issue_summary": linked_fields.get("summary"),
                "linked_issue_status": linked_status.get("name"),
                "linked_issue_priority": linked_priority.get("name"),
            }
        )

    return records
|
|
|
|
|
|
def transform_remote_links(raw_issue: dict) -> list[dict]:
    """Extract and transform remote links from an issue.

    Remote links are embedded in the raw issue JSON as `_remote_links`
    by the fetch layer (jira_service.py / jira_backfill.py).

    Args:
        raw_issue: Raw Jira issue, optionally carrying ``_remote_links``.

    Returns:
        One record per remote link. ``remote_link_id`` is the stringified
        id, or an empty string when the id is missing or null. Missing or
        null ``object`` / ``application`` entries yield None values.
    """
    issue_key = raw_issue.get("key")
    # `or` guards against keys that are present but hold a JSON null
    remote_links = raw_issue.get("_remote_links") or []

    records = []
    for rl in remote_links:
        obj = rl.get("object") or {}
        app = rl.get("application") or {}
        link_id = rl.get("id")
        records.append(
            {
                "issue_key": issue_key,
                # str(None) would store the literal text "None"
                "remote_link_id": "" if link_id is None else str(link_id),
                "url": obj.get("url"),
                "title": obj.get("title"),
                "application_name": app.get("name"),
                "application_type": app.get("type"),
            }
        )

    return records
|
|
|
|
|
|
def get_month_key(dt: datetime | None) -> str:
|
|
"""Get month key (YYYY-MM) from datetime, defaulting to current month."""
|
|
if dt is None:
|
|
dt = datetime.now(timezone.utc)
|
|
return dt.strftime("%Y-%m")
|
|
|
|
|
|
def get_attachment_path(issue_key: str, attachment_id: str, filename: str) -> str:
    """
    Build the hierarchical storage path of an attachment.

    The trailing number of the issue key is split into everything before its
    last three digits (``"0"`` when nothing remains) and the last three
    digits themselves:

    SUPPORT-14991 -> 14/991/54908_files.zip

    Keys that do not end in a number are placed under ``other/<issue_key>/``.
    """
    leaf = f"{attachment_id}_{filename}"

    # Extract number from issue key (e.g., "SUPPORT-14991" -> "14991")
    trailing_number = re.search(r"(\d+)$", issue_key)
    if trailing_number is None:
        return f"other/{issue_key}/{leaf}"

    digits = trailing_number.group(1)
    # Slicing copes with short numbers: the tail is the whole number and the
    # head is empty, which maps to the "0" bucket.
    head, tail = digits[:-3], digits[-3:]
    return f"{head or '0'}/{tail}/{leaf}"
|
|
|
|
|
|
def transform_all(
    raw_dir: Path,
    output_dir: Path,
    attachments_dir: Path | None = None,
) -> dict[str, int]:
    """
    Transform all raw Jira JSON files into monthly Parquet chunks.

    Every issue and all of its related records are written to the chunk of
    the month the issue was created in. Issues without a parseable creation
    date fall into the current month (see ``get_month_key``). A file that
    cannot be read or transformed is logged and skipped as a whole, so no
    partial records of it reach the output.

    Output structure:
        output_dir/
        ├── issues/
        │   ├── 2025-01.parquet
        │   └── 2026-02.parquet
        ├── comments/
        │   └── ...
        ├── attachments/
        │   └── ... (metadata only)
        ├── changelog/
        │   └── ...
        ├── issuelinks/
        │   └── ...
        └── remote_links/
            └── ...

    Attachment files themselves are not moved or copied here; the
    ``hierarchical_path`` column (e.g. ``14/991/54908_files.zip``) only
    records where a locally available file belongs in the hierarchical layout.

    Args:
        raw_dir: Directory containing raw JSON files (issues/*.json)
        output_dir: Directory for output Parquet files
        attachments_dir: Directory containing downloaded attachments

    Returns:
        Dict with counts of records per table, or an empty dict when
        ``raw_dir/issues`` does not exist.
    """
    issues_dir = raw_dir / "issues"
    if not issues_dir.exists():
        logger.error(f"Issues directory not found: {issues_dir}")
        return {}

    # (table / sub-directory name, schema, noun used in the log line)
    tables = (
        ("issues", ISSUES_SCHEMA, "issues"),
        ("comments", COMMENTS_SCHEMA, "comments"),
        ("attachments", ATTACHMENTS_SCHEMA, "attachments"),
        ("changelog", CHANGELOG_SCHEMA, "changelog entries"),
        ("issuelinks", ISSUELINKS_SCHEMA, "issue links"),
        ("remote_links", REMOTE_LINKS_SCHEMA, "remote links"),
    )

    # month key -> table name -> records (bucketed by issue created_at)
    records_by_month: dict[str, dict[str, list]] = {}

    # Process each issue file; sorted so the row order is reproducible
    json_files = sorted(issues_dir.glob("*.json"))
    logger.info(f"Processing {len(json_files)} issue files...")

    for json_file in json_files:
        try:
            # JSON exports are UTF-8; do not depend on the platform default
            with open(json_file, encoding="utf-8") as f:
                raw_issue = json.load(f)

            # Transform issue
            issue_record = transform_issue(raw_issue)
            issue_record["_raw_file"] = json_file.name

            # Determine month key based on issue creation date
            month_key = get_month_key(issue_record.get("created_at"))

            # Transform attachments with hierarchical paths
            issue_key = raw_issue.get("key") or "unknown"
            attachment_records = transform_attachments(raw_issue, attachments_dir)
            for att_record in attachment_records:
                if att_record.get("local_path"):
                    att_record["hierarchical_path"] = get_attachment_path(
                        issue_key, att_record["attachment_id"], att_record["filename"]
                    )

            # Related data all goes to the same month as the parent issue
            issue_tables = {
                "issues": [issue_record],
                "comments": transform_comments(raw_issue),
                "attachments": attachment_records,
                "changelog": transform_changelog(raw_issue),
                "issuelinks": transform_issuelinks(raw_issue),
                "remote_links": transform_remote_links(raw_issue),
            }
        except Exception as e:
            # Best effort: one bad file must not abort the whole run
            logger.exception(f"Error processing {json_file}: {e}")
            continue

        # Commit only after every transform succeeded, so a failing issue
        # leaves nothing behind in the buckets.
        month_bucket = records_by_month.setdefault(month_key, {name: [] for name, _, _ in tables})
        for name, records in issue_tables.items():
            month_bucket[name].extend(records)

    # Create output directories
    for name, _, _ in tables:
        (output_dir / name).mkdir(parents=True, exist_ok=True)

    # Save to monthly Parquet files
    counts = {name: 0 for name, _, _ in tables}

    for month_key in sorted(records_by_month):
        for name, schema, noun in tables:
            records = records_by_month[month_key][name]
            if not records:
                continue
            table = apply_schema(pd.DataFrame(records), schema)
            pq.write_table(table, output_dir / name / f"{month_key}.parquet")
            counts[name] += table.num_rows
            logger.info(f"Saved {table.num_rows} {noun} to {name}/{month_key}.parquet")

    logger.info(f"Created monthly chunks for {len(records_by_month)} months")
    return counts
|
|
|
|
|
|
# Command-line entry point: parse the directory arguments, run the full
# transformation and print the per-table record counts.
if __name__ == "__main__":
    import argparse

    cli = argparse.ArgumentParser(description="Transform raw Jira JSON to Parquet")
    cli.add_argument("--raw-dir", type=Path, required=True, help="Directory with raw JSON files")
    cli.add_argument("--output-dir", type=Path, required=True, help="Output directory for Parquet files")
    cli.add_argument("--attachments-dir", type=Path, help="Directory with downloaded attachments")
    options = cli.parse_args()

    summary = transform_all(options.raw_dir, options.output_dir, options.attachments_dir)

    print(f"\nTransformation complete: {summary}")
|