Move all Jira-specific code into a self-contained connector module:
- 22 files moved via git mv (transform, service, webhook, scripts, systemd units, tests, docs, bin helper)
- All imports updated to use connectors.jira.* paths
- Jira is now conditional: auto-detected via JIRA_DOMAIN env var
- Webapp registers Jira blueprint only when available
- Health service monitors Jira timers only when enabled
- Profiler loads Jira tables dynamically from filesystem
- Sync settings uses config-driven dependency validation
- Renamed keboola_platform_url -> custom_url in transform
- Updated deploy.sh, sudoers-deploy, backfill_gap.sh paths
- Fixed pytest.ini to skip live tests by default
281 lines
10 KiB
Python
281 lines
10 KiB
Python
"""
|
|
Incremental Jira transform - update single issue in Parquet files.
|
|
|
|
Called by webhook handler after issue JSON and attachments are saved.
|
|
Updates only the affected monthly Parquet file for efficient rsync.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import pandas as pd
|
|
import pyarrow as pa
|
|
import pyarrow.parquet as pq
|
|
|
|
# Import transform functions from batch transform
|
|
from .file_lock import parquet_month_lock
|
|
from .transform import (
|
|
ATTACHMENTS_SCHEMA,
|
|
CHANGELOG_SCHEMA,
|
|
COMMENTS_SCHEMA,
|
|
ISSUES_SCHEMA,
|
|
ISSUELINKS_SCHEMA,
|
|
REMOTE_LINKS_SCHEMA,
|
|
apply_schema,
|
|
get_month_key,
|
|
transform_attachments,
|
|
transform_changelog,
|
|
transform_comments,
|
|
transform_issue,
|
|
transform_issuelinks,
|
|
transform_remote_links,
|
|
)
|
|
|
|
# Configure the root logger at INFO level. This runs at import time, so it
# also takes effect when this module is imported by another process
# (e.g. the webhook handler), not only when run as a script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Default paths, used by transform_single_issue() when raw_dir / output_dir
# are not passed explicitly.
# NOTE(review): the original comment said these "can be overridden via
# environment", but no environment lookup is visible in this module --
# confirm where (or whether) that override happens.
DEFAULT_RAW_DIR = Path("/data/src_data/raw/jira")
DEFAULT_OUTPUT_DIR = Path("/data/src_data/parquet/jira")
|
|
|
|
|
|
def upsert_dataframe(
|
|
existing_df: pd.DataFrame | None,
|
|
new_records: list[dict],
|
|
key_column: str,
|
|
issue_key: str,
|
|
) -> pd.DataFrame:
|
|
"""
|
|
Upsert new records into existing DataFrame.
|
|
|
|
- Removes all rows matching issue_key
|
|
- Adds new records
|
|
|
|
Args:
|
|
existing_df: Existing DataFrame (or None if new file)
|
|
new_records: List of new records to add
|
|
key_column: Column used for matching (e.g., 'issue_key')
|
|
issue_key: Issue key to remove/replace
|
|
|
|
Returns:
|
|
Updated DataFrame
|
|
"""
|
|
new_df = pd.DataFrame(new_records) if new_records else pd.DataFrame()
|
|
|
|
if existing_df is None or existing_df.empty:
|
|
return new_df
|
|
|
|
if new_df.empty:
|
|
# Remove issue from existing data (deletion case)
|
|
return existing_df[existing_df[key_column] != issue_key].copy()
|
|
|
|
# Remove old records for this issue, add new ones
|
|
filtered = existing_df[existing_df[key_column] != issue_key]
|
|
return pd.concat([filtered, new_df], ignore_index=True)
|
|
|
|
|
|
def load_parquet_month(parquet_dir: Path, month_key: str) -> pd.DataFrame | None:
|
|
"""Load existing Parquet file for a month, or return None."""
|
|
parquet_path = parquet_dir / f"{month_key}.parquet"
|
|
if parquet_path.exists():
|
|
try:
|
|
return pd.read_parquet(parquet_path)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to read {parquet_path}: {e}")
|
|
return None
|
|
|
|
|
|
def save_parquet_month(
    df: pd.DataFrame,
    schema: dict,
    output_dir: Path,
    month_key: str,
) -> Path:
    """Save DataFrame to monthly Parquet file with explicit schema.

    The file is written to ``<output_dir>/<month_key>.parquet``;
    ``output_dir`` is created if needed. An empty DataFrame is never
    written: an existing file for that month is removed instead.

    Non-empty data is first written to a sibling ``*.parquet.tmp`` file and
    then moved over the target with ``os.replace``, so readers (and a crash
    mid-write) never observe a partially written month file.

    Args:
        df: Rows to persist for the month.
        schema: Schema description passed through to ``apply_schema``.
        output_dir: Directory holding one Parquet file per month.
        month_key: Month identifier used as the file stem.

    Returns:
        Path of the month file (which does not exist afterwards when ``df``
        was empty).
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / f"{month_key}.parquet"

    if df.empty:
        # Don't write empty files, but delete if exists
        if output_path.exists():
            output_path.unlink()
            logger.info(f"Removed empty {output_path}")
        return output_path

    # presumably apply_schema returns a pyarrow Table -- it is handed
    # straight to pq.write_table below.
    table = apply_schema(df, schema)

    # The temp name ends in ".tmp", so "*.parquet" globs do not pick it up.
    # It lives in the same directory so the final rename stays on one
    # filesystem.
    tmp_path = output_path.with_name(f"{output_path.name}.tmp")
    try:
        pq.write_table(table, tmp_path)
        os.replace(tmp_path, output_path)
    finally:
        # No-op after a successful replace; cleans up after a failed write.
        tmp_path.unlink(missing_ok=True)

    logger.info(f"Saved {len(df)} records to {output_path}")
    return output_path
|
|
|
|
|
|
def transform_single_issue(
    issue_key: str,
    raw_dir: Path | None = None,
    output_dir: Path | None = None,
    attachments_dir: Path | None = None,
    deleted: bool = False,
) -> bool:
    """
    Transform a single issue and update monthly Parquet files.

    This is called by webhook handler after issue JSON is saved.
    Only updates the month that the issue belongs to (derived from the
    issue's ``created_at``), except for deletions, which scan every month.

    Args:
        issue_key: Jira issue key (e.g., "SUPPORT-1234")
        raw_dir: Directory with raw JSON files; the issue is read from
            ``<raw_dir>/issues/<issue_key>.json``. Defaults to
            DEFAULT_RAW_DIR.
        output_dir: Output directory for Parquet files, one sub-directory
            per table. Defaults to DEFAULT_OUTPUT_DIR.
        attachments_dir: Directory with downloaded attachments. Defaults
            to ``<raw_dir>/attachments``.
        deleted: If True, remove issue from Parquet (deletion event)

    Returns:
        True if successful, False otherwise. Errors are logged, not
        raised. For deletions the result is whatever ``_handle_deletion``
        returns.
    """
    raw_dir = raw_dir or DEFAULT_RAW_DIR
    output_dir = output_dir or DEFAULT_OUTPUT_DIR
    attachments_dir = attachments_dir or (raw_dir / "attachments")

    if deleted:
        # For deletion, we need to find which month the issue was in
        # Check all monthly files - this is rare so OK to be slower
        logger.info(f"Processing deletion for {issue_key}")
        return _handle_deletion(issue_key, output_dir)

    json_path = raw_dir / "issues" / f"{issue_key}.json"
    if not json_path.exists():
        logger.error(f"Issue JSON not found: {json_path}")
        return False

    try:
        # Read as UTF-8 explicitly rather than with the locale default.
        # NOTE(review): assumes the webhook writer emits UTF-8 (or pure
        # ASCII) JSON -- confirm against the writer.
        with open(json_path, encoding="utf-8") as f:
            raw_issue = json.load(f)

        issue_record = transform_issue(raw_issue)
        issue_record["_raw_file"] = json_path.name

        # The month partition is determined by the issue's creation date.
        month_key = get_month_key(issue_record.get("created_at"))
        logger.info(f"Updating {issue_key} in month {month_key}")

        # Run every transform before taking the lock (minimize hold time).
        # Entries are (table sub-directory, schema, records) and are
        # processed in this order.
        table_updates = [
            ("issues", ISSUES_SCHEMA, [issue_record]),
            ("comments", COMMENTS_SCHEMA, transform_comments(raw_issue)),
            (
                "attachments",
                ATTACHMENTS_SCHEMA,
                transform_attachments(raw_issue, attachments_dir),
            ),
            ("changelog", CHANGELOG_SCHEMA, transform_changelog(raw_issue)),
            ("issuelinks", ISSUELINKS_SCHEMA, transform_issuelinks(raw_issue)),
            (
                "remote_links",
                REMOTE_LINKS_SCHEMA,
                transform_remote_links(raw_issue),
            ),
        ]

        # Parquet read-modify-write under per-month lock to prevent
        # "last writer wins" race when concurrent webhooks touch the
        # same monthly partition (see issue #205).
        with parquet_month_lock(output_dir, month_key):
            for table_name, schema, records in table_updates:
                table_dir = output_dir / table_name
                existing = load_parquet_month(table_dir, month_key)
                updated = upsert_dataframe(existing, records, "issue_key", issue_key)
                save_parquet_month(updated, schema, table_dir, month_key)

        logger.info(f"Successfully updated {issue_key} in Parquet files")
        return True

    except Exception as e:
        # Top-level boundary for the webhook path: report failure via the
        # return value and keep the traceback in the log.
        logger.error(f"Error transforming {issue_key}: {e}", exc_info=True)
        return False
|
|
|
|
|
|
def _handle_deletion(
    issue_key: str,
    output_dir: Path,
) -> bool:
    """Purge every row of ``issue_key`` from all monthly Parquet files.

    Each table sub-directory under ``output_dir`` is scanned file by file;
    a file is rewritten (under that month's lock) only when it actually
    contains the issue. A failure on one file is logged as a warning and
    the scan continues with the next file.

    Args:
        issue_key: Issue key whose rows should be removed.
        output_dir: Root directory containing one sub-directory per table.

    Returns:
        True if the issue was removed from at least one file, else False.
    """
    table_schemas = (
        ("issues", ISSUES_SCHEMA),
        ("comments", COMMENTS_SCHEMA),
        ("attachments", ATTACHMENTS_SCHEMA),
        ("changelog", CHANGELOG_SCHEMA),
        ("issuelinks", ISSUELINKS_SCHEMA),
        ("remote_links", REMOTE_LINKS_SCHEMA),
    )
    found = False

    for table_name, schema in table_schemas:
        table_dir = output_dir / table_name
        if not table_dir.exists():
            continue

        for parquet_file in table_dir.glob("*.parquet"):
            # Month files are named <month_key>.parquet.
            month_key = parquet_file.stem
            try:
                with parquet_month_lock(output_dir, month_key):
                    df = pd.read_parquet(parquet_file)

                    # Skip files that cannot or do not contain the issue.
                    if "issue_key" not in df.columns:
                        continue
                    if issue_key not in df["issue_key"].values:
                        continue

                    remaining = df[df["issue_key"] != issue_key]
                    save_parquet_month(remaining, schema, table_dir, month_key)
                    found = True
                    logger.info(f"Removed {issue_key} from {parquet_file}")
            except Exception as e:
                logger.warning(f"Error checking {parquet_file}: {e}")

    return found
|
|
|
|
|
|
# Command-line entry point: transform (or delete) one issue and report the
# outcome through the process exit status (0 = success, 1 = failure).
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Incremental Jira transform")
    parser.add_argument("issue_key", help="Jira issue key (e.g., SUPPORT-1234)")
    parser.add_argument("--raw-dir", type=Path, help="Raw JSON directory")
    parser.add_argument("--output-dir", type=Path, help="Output Parquet directory")
    parser.add_argument("--attachments-dir", type=Path, help="Attachments directory")
    parser.add_argument("--deleted", action="store_true", help="Issue was deleted")

    args = parser.parse_args()

    # Omitted path options stay None, which makes transform_single_issue
    # fall back to its defaults.
    success = transform_single_issue(
        issue_key=args.issue_key,
        raw_dir=args.raw_dir,
        output_dir=args.output_dir,
        attachments_dir=args.attachments_dir,
        deleted=args.deleted,
    )

    # The bare `exit` helper is injected by the `site` module for
    # interactive use and is not guaranteed to exist (e.g. `python -S`);
    # raising SystemExit works everywhere.
    raise SystemExit(0 if success else 1)
|