agnes-the-ai-analyst/connectors/jira/transform.py
ZdenekSrotyr ed3e8337ab
fix(jira): harden _remote_links fetch — prevent transient outage from wiping parquet rows (#319)
* fix(jira): harden _remote_links fetch — transient API failure no longer wipes parquet rows

Pre-fix, all three fetch_remote_links call sites (service.py,
scripts/backfill.py, scripts/backfill_remote_links.py) silently
returned [] on 401/403/429/5xx or httpx.RequestError. Callers overlaid
that [] onto cached issue JSON, and transform_remote_links interpreted
the empty list as 'issue legitimately has no remote links — delete
existing rows', so a transient Jira auth blip permanently wiped
remote-link history.

Now:
- Every fetch site raises JiraFetchError on non-200/non-404 status,
  on httpx.RequestError, and on the 'service not configured' path.
- Overlay sites skip the _remote_links key when fetch raises, leaving
  it ABSENT (not present-but-empty).
- transform_remote_links returns None for absent/null keys (preserve
  existing rows) vs [] (legitimate empty — wipe).
- Both consumers (batch transform_all, incremental
  transform_single_issue) honor the new contract.
- End-to-end tests
  test_incremental_preserves_remote_links_when_overlay_absent and
  test_incremental_wipes_remote_links_when_overlay_present_but_empty
  lock both halves.

Adversarial-review fixes bundled:
- service.py: unconfigured-service path now raises JiraFetchError
  instead of returning [] (a webhook can arrive while API creds are
  missing — HMAC verification uses a separate JIRA_WEBHOOK_SECRET).
  Regression guard test_raises_when_unconfigured added.
- consistency_check.py: AUTO_FIX_THRESHOLD bumped 10 -> 20 to cover
  typical SLA-poller hiccups before escalating to ERROR.
- CLAUDE.md: connectors/jira/transform.py removed from 'Files NOT to
  modify' (overlay-contract change required touching it; module
  remains sensitive but is no longer off-limits).

* release: 0.54.19 — jira remote_links hardening (transient API failure no longer wipes parquet rows)
2026-05-15 19:09:46 +02:00

772 lines
29 KiB
Python

"""
Transform raw Jira JSON data into clean Parquet format for analysis.
Extracts key fields from Jira issues including custom fields used by support team.
Converts Atlassian Document Format (ADF) to plain text.
"""
import json
import logging
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
logger = logging.getLogger(__name__)
# Custom field mapping (Jira custom field ID -> human-readable column name).
# Verified against Jira field configuration (Feb 2026).
# The "(was: ...)" notes presumably record the name each ID was mapped to
# before that verification — confirm against history before relying on them.
# NOTE: transform_issue reads these fields via literal "customfield_*" keys
# rather than through this mapping, so keep the two in sync when an ID changes.
CUSTOM_FIELD_NAMES = {
    "customfield_10156": "participants", # List of users watching/participating
    "customfield_10002": "organizations", # Organizations
    "customfield_10010": "request_type_info", # Service Desk request type details
    "customfield_10004": "severity", # Severity level
    "customfield_10365": "spam", # Spam flag
    "customfield_10157": "satisfaction", # Customer satisfaction (was: sla_info)
    "customfield_10323": "triage", # Triage multi-select (was: team_tier)
    "customfield_10330": "context", # Context field (was: root_cause)
    "customfield_10325": "custom_url", # Custom URL (was: resolution_summary)
    "customfield_10350": "slack_link", # Slack link (was: customer_type)
    "customfield_10475": "email_address", # Email address (was: context)
    "customfield_10511": "configuration_item", # Configuration item (was: categories)
    "customfield_10676": "technical_issue_category", # Technical issue category (was: satisfaction_rating)
    "customfield_10328": "first_response_time", # SLA: first response time (new)
    "customfield_10161": "time_to_resolution", # SLA: time to resolution (new)
    "customfield_11831": "l3_team", # L3 team assignment (new)
}
# Explicit schema definitions for consistent types across monthly chunks.
# This prevents DuckDB union errors when some months have all-NULL columns.
#
# Each schema maps column name -> dtype tag. The tags understood by
# apply_schema / get_pyarrow_schema are:
#   "string"              -> Arrow string
#   "datetime64[ns, UTC]" -> Arrow timestamp("us", tz="UTC")
#   "Int64"               -> Arrow int64
# Dict order is the column order of the written Parquet files.
#
# Columns of the issues table; keys match the record built by transform_issue.
ISSUES_SCHEMA = {
    "issue_key": "string",
    "issue_id": "string",
    "issue_url": "string",
    "summary": "string",
    "description": "string",
    "issue_type": "string",
    "status": "string",
    "status_category": "string",
    "priority": "string",
    "resolution": "string",
    "project_key": "string",
    "project_name": "string",
    "creator_email": "string",
    "creator_name": "string",
    "reporter_email": "string",
    "reporter_name": "string",
    "assignee_email": "string",
    "assignee_name": "string",
    "created_at": "datetime64[ns, UTC]",
    "updated_at": "datetime64[ns, UTC]",
    "resolved_at": "datetime64[ns, UTC]",
    "due_date": "string",
    # List-valued fields (labels, triage, participants, ...) are stored as
    # JSON-encoded strings by transform_issue.
    "labels": "string",
    "attachment_count": "Int64",
    "comment_count": "Int64",
    "issuelink_count": "Int64",
    "request_type": "string",
    "request_status": "string",
    "severity": "string",
    "triage": "string",
    "configuration_item": "string",
    "participants": "string",
    "organizations": "string",
    "spam": "string",
    "context": "string",
    "custom_url": "string",
    "slack_link": "string",
    "technical_issue_category": "string",
    "email_address": "string",
    "satisfaction": "Int64",
    # SLA columns come from extract_sla_metrics; "breached" is stored as the
    # string form of the boolean ("True"/"False"), hence the string dtype.
    "first_response_breached": "string",
    "first_response_goal_millis": "Int64",
    "first_response_elapsed_millis": "Int64",
    "time_to_resolution_breached": "string",
    "time_to_resolution_goal_millis": "Int64",
    "time_to_resolution_elapsed_millis": "Int64",
    "l3_team": "string",
    # Bookkeeping columns: _synced_at is copied from the raw issue,
    # _raw_file is filled in by transform_all with the source file name.
    "_synced_at": "string",
    "_raw_file": "string",
}
# Columns of the comments table (one row per comment, see transform_comments).
COMMENTS_SCHEMA = {
    "comment_id": "string",
    "issue_key": "string",
    "author_email": "string",
    "author_name": "string",
    "body": "string",
    "created_at": "datetime64[ns, UTC]",
    "updated_at": "datetime64[ns, UTC]",
    "update_author_email": "string",
}
# Columns of the attachments metadata table (see transform_attachments).
# "hierarchical_path" is not produced by transform_attachments itself;
# transform_all adds it for attachments that have a local file.
ATTACHMENTS_SCHEMA = {
    "attachment_id": "string",
    "issue_key": "string",
    "filename": "string",
    "local_path": "string",
    "hierarchical_path": "string",
    "size_bytes": "Int64",
    "mime_type": "string",
    "author_email": "string",
    "created_at": "datetime64[ns, UTC]",
    "content_url": "string",
    "thumbnail_url": "string",
}
# Columns of the changelog table (one row per changed field within a history
# entry, see transform_changelog; rows of the same entry share change_id).
CHANGELOG_SCHEMA = {
    "change_id": "string",
    "issue_key": "string",
    "author_email": "string",
    "author_name": "string",
    "field_name": "string",
    "field_type": "string",
    "from_value": "string",
    "to_value": "string",
    "changed_at": "datetime64[ns, UTC]",
}
# Columns of the issue-links table (see transform_issuelinks).
# "direction" is "inward" or "outward".
ISSUELINKS_SCHEMA = {
    "issue_key": "string",
    "link_id": "string",
    "link_type": "string",
    "direction": "string",
    "linked_issue_key": "string",
    "linked_issue_summary": "string",
    "linked_issue_status": "string",
    "linked_issue_priority": "string",
}
# Columns of the remote-links table (see transform_remote_links).
REMOTE_LINKS_SCHEMA = {
    "issue_key": "string",
    "remote_link_id": "string",
    "url": "string",
    "title": "string",
    "application_name": "string",
    "application_type": "string",
}
def get_pyarrow_schema(schema_dict: dict) -> pa.Schema:
    """Build the PyArrow schema that corresponds to a column -> dtype-tag dict.

    Tags starting with ``datetime64`` become UTC microsecond timestamps,
    ``Int64`` becomes a 64-bit integer, and everything else (including the
    explicit ``string`` tag and any unknown tag) becomes a string column.
    Field order follows the dict's insertion order.
    """

    def _arrow_type(dtype_tag):
        # "string" never starts with "datetime64", so testing the datetime
        # prefix first is equivalent to testing "string" first.
        if dtype_tag.startswith("datetime64"):
            return pa.timestamp("us", tz="UTC")
        if dtype_tag == "Int64":
            return pa.int64()
        return pa.string()

    return pa.schema(
        [pa.field(column, _arrow_type(dtype_tag)) for column, dtype_tag in schema_dict.items()]
    )
def apply_schema(df: pd.DataFrame, schema: dict) -> pa.Table:
    """
    Apply an explicit schema to a DataFrame and return a PyArrow Table.

    This ensures all monthly chunks have the same column types,
    preventing DuckDB union errors when querying with glob patterns.

    Args:
        df: Records to normalise. It is not modified; all work happens on a copy.
        schema: Mapping of column name -> dtype tag ("string",
            "datetime64[...]" or "Int64"). Columns missing from ``df`` are
            added as all-null; columns not in ``schema`` are dropped.

    Returns:
        A PyArrow Table whose columns are exactly the schema's columns, in
        schema order, typed via get_pyarrow_schema.
    """
    # Copy first: the steps below add columns and overwrite existing ones, and
    # the caller's DataFrame must not change as a side effect of serialising it.
    df = df.copy()

    # Ensure all schema columns exist
    for col in schema:
        if col not in df.columns:
            df[col] = None

    # Convert types
    for col, dtype in schema.items():
        if dtype == "string":
            # Convert to string, keeping None/NaN as None so they become Arrow nulls
            df[col] = df[col].apply(lambda x: str(x) if x is not None and pd.notna(x) else None)
        elif dtype.startswith("datetime64"):
            # Unparseable values are coerced to NaT instead of raising
            df[col] = pd.to_datetime(df[col], utc=True, errors="coerce")
        elif dtype == "Int64":
            # Unparseable values are coerced to NaN instead of raising
            df[col] = pd.to_numeric(df[col], errors="coerce")

    # Reorder (and restrict) columns to match the schema
    df = df[list(schema)]

    # Convert to PyArrow with explicit schema
    pa_schema = get_pyarrow_schema(schema)
    return pa.Table.from_pandas(df, schema=pa_schema, preserve_index=False)
def extract_text_from_adf(node: dict | list | None) -> str:
"""
Extract plain text from Atlassian Document Format (ADF) content.
ADF is a nested JSON structure used by Jira for rich text.
This function recursively extracts all text content.
"""
if node is None:
return ""
if isinstance(node, str):
return node
if isinstance(node, list):
return " ".join(extract_text_from_adf(item) for item in node)
if not isinstance(node, dict):
return ""
# Get text from this node
text_parts = []
# Direct text content
if "text" in node:
text_parts.append(node["text"])
# Recursive content
if "content" in node:
text_parts.append(extract_text_from_adf(node["content"]))
return " ".join(text_parts).strip()
def extract_user_info(user: dict | None) -> dict:
"""Extract key user information from Jira user object."""
if not user:
return {"email": None, "name": None, "account_id": None}
return {
"email": user.get("emailAddress"),
"name": user.get("displayName"),
"account_id": user.get("accountId"),
}
def extract_option_value(field: Any) -> str | None:
"""Extract value from Jira option field (select/radio)."""
if field is None:
return None
if isinstance(field, dict):
return field.get("value") or field.get("name")
return str(field)
def extract_option_list(field: Any) -> list[str]:
    """Return the display values of a Jira multi-select field.

    Anything that is not a non-empty list yields []. Falsy entries are
    skipped; each remaining entry goes through extract_option_value.
    """
    if field and isinstance(field, list):
        return [extract_option_value(option) for option in field if option]
    return []
def parse_datetime(dt_str: str | None) -> datetime | None:
"""Parse Jira datetime string to datetime object."""
if not dt_str:
return None
try:
# Jira format: "2026-02-03T12:06:52.829+0100"
# Remove milliseconds and parse
dt_str = re.sub(r"\.\d+", "", dt_str)
return datetime.fromisoformat(dt_str)
except (ValueError, TypeError):
return None
def extract_sla_metrics(sla_field: Any) -> dict:
    """
    Extract flattened SLA metrics from a Jira SLA field.

    Prefers ongoingCycle (active tickets) and falls back to the last entry of
    completedCycles.

    Returns:
        Dict with keys "breached" (the string form of the boolean, or None),
        "goal_millis" and "elapsed_millis". All values are None when the
        field is not a dict, is an error response, or has no usable cycle.
        Sub-objects that are missing or JSON null simply yield None values
        rather than raising.
    """
    result = {"breached": None, "goal_millis": None, "elapsed_millis": None}

    # Non-dict input, or an error response caused by permission issues
    if not isinstance(sla_field, dict) or "errorMessage" in sla_field:
        return result

    # Prefer ongoing cycle, fall back to last completed cycle
    cycle = sla_field.get("ongoingCycle")
    if not cycle:
        completed = sla_field.get("completedCycles")
        cycle = completed[-1] if isinstance(completed, list) and completed else None
    if not isinstance(cycle, dict):
        return result

    breached = cycle.get("breached")
    if breached is not None:
        result["breached"] = str(breached)
    # `or {}` rather than a .get default: the key may be present with a null value
    result["goal_millis"] = (cycle.get("goalDuration") or {}).get("millis")
    result["elapsed_millis"] = (cycle.get("elapsedTime") or {}).get("millis")
    return result
def transform_issue(raw_issue: dict) -> dict:
    """
    Transform a single raw Jira issue into clean format.

    Object- and list-valued fields that are missing or JSON null are treated
    as empty, so a sparse payload yields None / 0 / "[]" values rather than
    raising.

    Args:
        raw_issue: Raw issue JSON (expects "key", "id", "fields" and
            optionally "_synced_at").

    Returns:
        A flat dictionary suitable for DataFrame conversion; its keys match
        ISSUES_SCHEMA. "_raw_file" is left as None for the caller to fill in.
    """
    # `or {}` / `or []` (rather than .get defaults) also cover keys that are
    # present but null, which a .get default does not.
    fields = raw_issue.get("fields") or {}
    issue_key = raw_issue.get("key")
    jira_domain = os.environ.get("JIRA_DOMAIN", "your-org.atlassian.net")

    def _name_of(field_key: str) -> str | None:
        """Return the "name" of an object-valued field, or None when unset."""
        return (fields.get(field_key) or {}).get("name")

    # Extract user info
    creator = extract_user_info(fields.get("creator"))
    reporter = extract_user_info(fields.get("reporter"))
    assignee = extract_user_info(fields.get("assignee"))

    status = fields.get("status") or {}
    project = fields.get("project") or {}

    # Extract request type info from Service Desk field
    request_type_info = fields.get("customfield_10010") or {}
    request_type = request_type_info.get("requestType") or {}
    current_status = request_type_info.get("currentStatus") or {}

    satisfaction = fields.get("customfield_10157")

    # Build clean record
    record = {
        # Core identifiers
        "issue_key": issue_key,
        "issue_id": raw_issue.get("id"),
        "issue_url": f"https://{jira_domain}/browse/{issue_key}",
        # Standard fields
        "summary": fields.get("summary"),
        "description": extract_text_from_adf(fields.get("description")),
        "issue_type": _name_of("issuetype"),
        "status": status.get("name"),
        "status_category": (status.get("statusCategory") or {}).get("name"),
        "priority": _name_of("priority"),
        "resolution": _name_of("resolution"),
        # Project
        "project_key": project.get("key"),
        "project_name": project.get("name"),
        # People
        "creator_email": creator["email"],
        "creator_name": creator["name"],
        "reporter_email": reporter["email"],
        "reporter_name": reporter["name"],
        "assignee_email": assignee["email"],
        "assignee_name": assignee["name"],
        # Dates
        "created_at": parse_datetime(fields.get("created")),
        "updated_at": parse_datetime(fields.get("updated")),
        "resolved_at": parse_datetime(fields.get("resolutiondate")),
        "due_date": fields.get("duedate"),
        # Arrays as JSON strings for Parquet compatibility
        "labels": json.dumps(fields.get("labels") or []),
        # Counts
        "attachment_count": len(fields.get("attachment") or []),
        "comment_count": (fields.get("comment") or {}).get("total", 0),
        "issuelink_count": len(fields.get("issuelinks") or []),
        # Service Desk specific
        "request_type": request_type.get("name"),
        "request_status": current_status.get("status"),
        # Custom fields (verified against Jira field configuration Feb 2026)
        "severity": extract_option_value(fields.get("customfield_10004")),
        "triage": json.dumps(extract_option_list(fields.get("customfield_10323"))),
        "configuration_item": json.dumps(extract_option_list(fields.get("customfield_10511"))),
        "participants": json.dumps(
            [extract_user_info(u).get("email") for u in (fields.get("customfield_10156") or [])]
        ),
        "organizations": json.dumps(extract_option_list(fields.get("customfield_10002"))),
        "spam": extract_option_value(fields.get("customfield_10365")),
        # Empty extracted text is stored as None rather than ""
        "context": extract_text_from_adf(fields.get("customfield_10330")) or None,
        "custom_url": fields.get("customfield_10325"),
        "slack_link": extract_option_value(fields.get("customfield_10350")),
        "technical_issue_category": extract_option_value(fields.get("customfield_10676")),
        "email_address": extract_option_value(fields.get("customfield_10475")),
        "satisfaction": satisfaction.get("rating") if isinstance(satisfaction, dict) else None,
        # SLA metrics are flattened into <prefix>_breached / _goal_millis / _elapsed_millis
        **{f"first_response_{k}": v for k, v in extract_sla_metrics(fields.get("customfield_10328")).items()},
        **{f"time_to_resolution_{k}": v for k, v in extract_sla_metrics(fields.get("customfield_10161")).items()},
        "l3_team": extract_option_value(fields.get("customfield_11831")),
        # Metadata
        "_synced_at": raw_issue.get("_synced_at"),
        "_raw_file": None,  # Will be set by caller
    }
    return record
def transform_comments(raw_issue: dict) -> list[dict]:
    """Extract and transform comments from an issue.

    Reads ``fields.comment.comments``; any level that is missing or JSON
    null is treated as "no comments".

    Returns:
        One record per comment with the keys of COMMENTS_SCHEMA. Comment
        bodies are flattened from ADF to plain text.
    """
    issue_key = raw_issue.get("key")
    # `or {}` / `or []` also cover keys that are present but null
    fields = raw_issue.get("fields") or {}
    comments = (fields.get("comment") or {}).get("comments") or []

    records = []
    for comment in comments:
        author = extract_user_info(comment.get("author"))
        update_author = extract_user_info(comment.get("updateAuthor"))
        records.append(
            {
                "comment_id": comment.get("id"),
                "issue_key": issue_key,
                "author_email": author["email"],
                "author_name": author["name"],
                "body": extract_text_from_adf(comment.get("body")),
                "created_at": parse_datetime(comment.get("created")),
                "updated_at": parse_datetime(comment.get("updated")),
                "update_author_email": update_author["email"],
            }
        )
    return records
def transform_attachments(raw_issue: dict, attachments_dir: Path | None = None) -> list[dict]:
"""Extract and transform attachments from an issue."""
issue_key = raw_issue.get("key")
fields = raw_issue.get("fields", {})
attachments = fields.get("attachment", [])
records = []
for att in attachments:
author = extract_user_info(att.get("author"))
att_id = att.get("id")
filename = att.get("filename")
# Check if local file exists
local_path = None
if attachments_dir and issue_key:
expected_path = attachments_dir / issue_key / f"{att_id}_{filename}"
if expected_path.exists():
local_path = str(expected_path)
records.append(
{
"attachment_id": att_id,
"issue_key": issue_key,
"filename": filename,
"local_path": local_path,
"size_bytes": att.get("size"),
"mime_type": att.get("mimeType"),
"author_email": author["email"],
"created_at": parse_datetime(att.get("created")),
"content_url": att.get("content"),
"thumbnail_url": att.get("thumbnail"),
}
)
return records
def transform_changelog(raw_issue: dict) -> list[dict]:
    """Extract and transform changelog entries from an issue.

    Reads ``changelog.histories[*].items[*]``; any level that is missing or
    JSON null is treated as empty.

    Returns:
        One record per changed field. Items belonging to the same history
        entry share its ``change_id``, author and ``changed_at``.
    """
    issue_key = raw_issue.get("key")
    # `or {}` / `or []` also cover keys that are present but null
    changelog = raw_issue.get("changelog") or {}
    histories = changelog.get("histories") or []

    records = []
    for history in histories:
        author = extract_user_info(history.get("author"))
        changed_at = parse_datetime(history.get("created"))
        for item in history.get("items") or []:
            records.append(
                {
                    "change_id": history.get("id"),
                    "issue_key": issue_key,
                    "author_email": author["email"],
                    "author_name": author["name"],
                    "field_name": item.get("field"),
                    "field_type": item.get("fieldtype"),
                    "from_value": item.get("fromString"),
                    "to_value": item.get("toString"),
                    "changed_at": changed_at,
                }
            )
    return records
def transform_issuelinks(raw_issue: dict) -> list[dict]:
    """Extract and transform issue links from an issue.

    Reads ``fields.issuelinks``. Links that carry neither "inwardIssue" nor
    "outwardIssue" are skipped. Sub-objects that are missing or JSON null
    (link type, linked issue, its fields) yield None / "" values rather than
    raising.

    Returns:
        One record per link with the keys of ISSUELINKS_SCHEMA; "direction"
        is "inward" or "outward".
    """
    issue_key = raw_issue.get("key")
    # `or {}` / `or []` also cover keys that are present but null
    fields = raw_issue.get("fields") or {}

    records = []
    for link in fields.get("issuelinks") or []:
        link_type_name = (link.get("type") or {}).get("name", "")

        # Each link has either inwardIssue or outwardIssue
        if "inwardIssue" in link:
            linked = link["inwardIssue"] or {}
            direction = "inward"
        elif "outwardIssue" in link:
            linked = link["outwardIssue"] or {}
            direction = "outward"
        else:
            continue

        linked_fields = linked.get("fields") or {}
        records.append(
            {
                "issue_key": issue_key,
                "link_id": link.get("id"),
                "link_type": link_type_name,
                "direction": direction,
                "linked_issue_key": linked.get("key"),
                "linked_issue_summary": linked_fields.get("summary"),
                "linked_issue_status": (linked_fields.get("status") or {}).get("name"),
                "linked_issue_priority": (linked_fields.get("priority") or {}).get("name"),
            }
        )
    return records
def transform_remote_links(raw_issue: dict) -> list[dict] | None:
"""Extract and transform remote links from an issue.
Returns:
- list[dict]: fresh records to upsert into parquet. May be empty,
meaning the issue legitimately has no remote links right now
(HTTP 200 with [] or HTTP 404 from the fetch).
- None: the _remote_links key was absent from raw_issue, which
signals that save_issue (or the equivalent backfill writer)
could not refresh remote links — typically a 401/403/5xx from
the Jira API. Callers MUST treat None as "skip the upsert";
overwriting with [] would delete existing parquet rows for
this issue.
The key shape is set by the writers (JiraService.save_issue,
JiraBackfiller, backfill_remote_links): present means the fetch
succeeded (200 or 404), absent means the fetch raised.
"""
issue_key = raw_issue.get("key")
# Treat both absent key and explicit None as the "no fresh data" signal —
# absent is the contract from save_issue/backfill writers, None is the
# defensive case where a JSON edit or older buggy code stored an explicit
# null (would otherwise blow up on `for rl in None`).
remote_links = raw_issue.get("_remote_links")
if remote_links is None:
return None
records = []
for rl in remote_links:
obj = rl.get("object", {})
app = rl.get("application", {})
records.append(
{
"issue_key": issue_key,
"remote_link_id": str(rl.get("id", "")),
"url": obj.get("url"),
"title": obj.get("title"),
"application_name": app.get("name"),
"application_type": app.get("type"),
}
)
return records
def get_month_key(dt: datetime | None) -> str:
"""Get month key (YYYY-MM) from datetime, defaulting to current month."""
if dt is None:
dt = datetime.now(timezone.utc)
return dt.strftime("%Y-%m")
def get_attachment_path(issue_key: str, attachment_id: str, filename: str) -> str:
    """
    Build the hierarchical storage path of an attachment.

    The issue number is split into thousands / last three digits:
    SUPPORT-14991 -> 14/991/54908_files.zip
    Numbers below 1000 land under "0/"; keys without a trailing number land
    under "other/<issue_key>/".
    """
    leaf = f"{attachment_id}_{filename}"

    # Trailing digits of the issue key (e.g. "SUPPORT-14991" -> "14991")
    trailing = re.search(r"(\d+)$", issue_key)
    if trailing is None:
        return f"other/{issue_key}/{leaf}"

    digits = trailing.group(1)
    # Everything before the last three digits; empty for numbers < 1000
    thousands = digits[:-3] or "0"
    # Slicing already returns the whole string when it is shorter than three
    remainder = digits[-3:]
    return f"{thousands}/{remainder}/{leaf}"
def transform_all(
    raw_dir: Path,
    output_dir: Path,
    attachments_dir: Path | None = None,
) -> dict[str, int]:
    """
    Transform all raw Jira JSON files into monthly Parquet chunks.

    Every issue is bucketed by the month of its creation date, and all of its
    related rows (comments, attachments, changelog, links) go to that same
    month. A file that fails to load or transform is logged and skipped.

    Output structure:
        output_dir/
        ├── issues/
        │   ├── 2025-01.parquet
        │   └── 2026-02.parquet
        ├── comments/
        │   └── ...
        ├── changelog/
        │   └── ...
        ├── attachments/
        │   └── ... (metadata only)
        └── attachments_files/
            └── 14/991/54908_files.zip (hierarchical)

    A month/table combination with no records gets no file written; this
    function never deletes files that already exist in ``output_dir``.

    Args:
        raw_dir: Directory containing raw JSON files (issues/*.json)
        output_dir: Directory for output Parquet files
        attachments_dir: Directory containing downloaded attachments

    Returns:
        Dict with counts of records per table, or {} when ``raw_dir/issues``
        does not exist.
    """
    issues_dir = raw_dir / "issues"
    if not issues_dir.exists():
        logger.error(f"Issues directory not found: {issues_dir}")
        return {}

    # (table / sub-directory name, schema, noun used in the per-file log line)
    tables = (
        ("issues", ISSUES_SCHEMA, "issues"),
        ("comments", COMMENTS_SCHEMA, "comments"),
        ("attachments", ATTACHMENTS_SCHEMA, "attachments"),
        ("changelog", CHANGELOG_SCHEMA, "changelog entries"),
        ("issuelinks", ISSUELINKS_SCHEMA, "issue links"),
        ("remote_links", REMOTE_LINKS_SCHEMA, "remote links"),
    )

    # Records grouped by table, then by month (based on issue created_at)
    buckets: dict[str, dict[str, list]] = {name: {} for name, _, _ in tables}

    # Process each issue file
    json_files = list(issues_dir.glob("*.json"))
    logger.info(f"Processing {len(json_files)} issue files...")
    for json_file in json_files:
        try:
            # JSON is UTF-8 by specification; do not depend on the locale encoding
            with open(json_file, encoding="utf-8") as f:
                raw_issue = json.load(f)

            # Transform issue
            issue_record = transform_issue(raw_issue)
            issue_record["_raw_file"] = json_file.name

            # Determine month key based on issue creation date
            month_key = get_month_key(issue_record.get("created_at"))

            # Make sure every table has a bucket for this month
            for per_month in buckets.values():
                per_month.setdefault(month_key, [])

            buckets["issues"][month_key].append(issue_record)

            # Transform related data (all go to same month as parent issue)
            buckets["comments"][month_key].extend(transform_comments(raw_issue))

            # Transform attachments with hierarchical paths
            issue_key = raw_issue.get("key", "unknown")
            for att_record in transform_attachments(raw_issue, attachments_dir):
                # Only attachments with a downloaded file get a hierarchical path
                if att_record.get("local_path"):
                    att_record["hierarchical_path"] = get_attachment_path(
                        issue_key, att_record["attachment_id"], att_record["filename"]
                    )
                buckets["attachments"][month_key].append(att_record)

            buckets["changelog"][month_key].extend(transform_changelog(raw_issue))
            buckets["issuelinks"][month_key].extend(transform_issuelinks(raw_issue))

            rl_records = transform_remote_links(raw_issue)
            if rl_records is not None:
                buckets["remote_links"][month_key].extend(rl_records)
            # else: _remote_links overlay was skipped (fetch failure). The batch
            # rebuild writes monthly parquets from scratch, so this issue simply
            # contributes no rows to the rebuild — it doesn't "preserve" anything.
            # A re-run after the outage clears will repopulate. The incremental
            # path (incremental_transform.py) is what genuinely preserves
            # existing rows; batch mode is full-rebuild and not the hot path.
        except Exception as e:
            # Best-effort batch: one bad file must not abort the whole run
            logger.error(f"Error processing {json_file}: {e}")

    # Create output directories
    for name, _, _ in tables:
        (output_dir / name).mkdir(parents=True, exist_ok=True)

    # Save to monthly Parquet files
    counts = {name: 0 for name, _, _ in tables}
    issues_by_month = buckets["issues"]
    for month_key in sorted(issues_by_month):
        for name, schema, noun in tables:
            records = buckets[name][month_key]
            if not records:
                continue
            table = apply_schema(pd.DataFrame(records), schema)
            pq.write_table(table, output_dir / name / f"{month_key}.parquet")
            counts[name] += table.num_rows
            logger.info(f"Saved {table.num_rows} {noun} to {name}/{month_key}.parquet")

    logger.info(f"Created monthly chunks for {len(issues_by_month)} months")
    return counts
if __name__ == "__main__":
    # Command-line entry point: run the full batch transform and print the
    # per-table record counts.
    import argparse

    cli = argparse.ArgumentParser(description="Transform raw Jira JSON to Parquet")
    cli.add_argument("--raw-dir", type=Path, required=True, help="Directory with raw JSON files")
    cli.add_argument("--output-dir", type=Path, required=True, help="Output directory for Parquet files")
    cli.add_argument("--attachments-dir", type=Path, help="Directory with downloaded attachments")
    options = cli.parse_args()

    counts = transform_all(options.raw_dir, options.output_dir, options.attachments_dir)
    print(f"\nTransformation complete: {counts}")