agnes-the-ai-analyst/connectors/jira/incremental_transform.py
Vojtech 38f6b639d2
feat(observability): request_id end-to-end + dev debug toolbar + centralized logging (#136)
Cuts release 0.20.0.

## Highlights
- X-Request-ID header on every response + sanitized to [A-Za-z0-9_-] (CRLF log-forging mitigation)
- Error pages (HTML + JSON 500) surface request_id for support tickets
- Dev debug toolbar gated by DEBUG=1 — fastapi-debug-toolbar with custom DuckDBPanel
- Centralized app.logging_config.setup_logging() replaces 23 scattered basicConfig calls
- Telegram bot drops bot.log file — stdout only (BREAKING)

## Devin findings addressed
- BUG_0001: .env.template no longer claims FastAPI debug=True
- BUG_0002: subprocess extractor logs INFO to stderr again
- ANALYSIS_0003: _wants_html no longer matches Accept: */* (curl gets JSON as before)
- BUG on b1c6ee9: HTML 500 page no longer leaks str(exc) in production
- BUG on b13d2fe: 2 CLAUDE.md compliance flags (transform.py + ws_gateway) accepted as scope-limited logging refactor — follow-up to update CLAUDE.md if needed

See CHANGELOG [0.20.0] for full notes.
2026-04-29 22:54:21 +02:00

299 lines
11 KiB
Python

"""
Incremental Jira transform - update single issue in Parquet files.
Called by webhook handler after issue JSON and attachments are saved.
Updates only the affected monthly Parquet file for efficient rsync.
"""
import json
import logging
import os
from pathlib import Path
import pandas as pd
import pyarrow.parquet as pq
# Import transform functions from batch transform
from .file_lock import parquet_month_lock
from .validation import is_valid_issue_key, safe_join_under
from .transform import (
ATTACHMENTS_SCHEMA,
CHANGELOG_SCHEMA,
COMMENTS_SCHEMA,
ISSUES_SCHEMA,
ISSUELINKS_SCHEMA,
REMOTE_LINKS_SCHEMA,
apply_schema,
get_month_key,
transform_attachments,
transform_changelog,
transform_comments,
transform_issue,
transform_issuelinks,
transform_remote_links,
)
logger = logging.getLogger(__name__)
# Default paths (can be overridden via environment)
DEFAULT_RAW_DIR = Path(os.environ.get("DATA_DIR", "/data")) / "extracts" / "jira" / "raw"
DEFAULT_OUTPUT_DIR = Path(os.environ.get("DATA_DIR", "/data")) / "extracts" / "jira" / "data"
def upsert_dataframe(
existing_df: pd.DataFrame | None,
new_records: list[dict],
key_column: str,
issue_key: str,
) -> pd.DataFrame:
"""
Upsert new records into existing DataFrame.
- Removes all rows matching issue_key
- Adds new records
Args:
existing_df: Existing DataFrame (or None if new file)
new_records: List of new records to add
key_column: Column used for matching (e.g., 'issue_key')
issue_key: Issue key to remove/replace
Returns:
Updated DataFrame
"""
new_df = pd.DataFrame(new_records) if new_records else pd.DataFrame()
if existing_df is None or existing_df.empty:
return new_df
if new_df.empty:
# Remove issue from existing data (deletion case)
return existing_df[existing_df[key_column] != issue_key].copy()
# Remove old records for this issue, add new ones
filtered = existing_df[existing_df[key_column] != issue_key]
return pd.concat([filtered, new_df], ignore_index=True)
def load_parquet_month(parquet_dir: Path, month_key: str) -> pd.DataFrame | None:
"""Load existing Parquet file for a month, or return None."""
parquet_path = parquet_dir / f"{month_key}.parquet"
if parquet_path.exists():
try:
return pd.read_parquet(parquet_path)
except Exception as e:
logger.warning(f"Failed to read {parquet_path}: {e}")
return None
def save_parquet_month(
df: pd.DataFrame,
schema: dict,
output_dir: Path,
month_key: str,
) -> Path:
"""Save DataFrame to monthly Parquet file with explicit schema."""
output_dir.mkdir(parents=True, exist_ok=True)
output_path = output_dir / f"{month_key}.parquet"
if df.empty:
# Don't write empty files, but delete if exists
if output_path.exists():
output_path.unlink()
logger.info(f"Removed empty {output_path}")
return output_path
table = apply_schema(df, schema)
pq.write_table(table, output_path)
logger.info(f"Saved {len(df)} records to {output_path}")
return output_path
def transform_single_issue(
issue_key: str,
raw_dir: Path | None = None,
output_dir: Path | None = None,
attachments_dir: Path | None = None,
deleted: bool = False,
) -> bool:
"""
Transform a single issue and update monthly Parquet files.
This is called by webhook handler after issue JSON is saved.
Only updates the month that the issue belongs to.
Args:
issue_key: Jira issue key (e.g., "SUPPORT-1234")
raw_dir: Directory with raw JSON files
output_dir: Output directory for Parquet files
attachments_dir: Directory with downloaded attachments
deleted: If True, remove issue from Parquet (deletion event)
Returns:
True if successful, False otherwise
"""
raw_dir = raw_dir or DEFAULT_RAW_DIR
output_dir = output_dir or DEFAULT_OUTPUT_DIR
attachments_dir = attachments_dir or (raw_dir / "attachments")
# Defense-in-depth: even if a stale/legacy code path bypasses webhook
# validation, the transform step will refuse a malformed key (issue #83).
if not is_valid_issue_key(issue_key):
logger.error(f"Refusing transform for malformed issue key: {issue_key!r}")
return False
issues_dir = raw_dir / "issues"
try:
json_path = safe_join_under(issues_dir, f"{issue_key}.json")
except ValueError as e:
logger.error(f"Path traversal blocked in transform for {issue_key!r}: {e}")
return False
if deleted:
# For deletion, we need to find which month the issue was in
# Check all monthly files - this is rare so OK to be slower
logger.info(f"Processing deletion for {issue_key}")
return _handle_deletion(issue_key, output_dir)
if not json_path.exists():
logger.error(f"Issue JSON not found: {json_path}")
return False
try:
# Load raw issue data
with open(json_path) as f:
raw_issue = json.load(f)
# Transform issue
issue_record = transform_issue(raw_issue)
issue_record["_raw_file"] = json_path.name
# Determine month
month_key = get_month_key(issue_record.get("created_at"))
logger.info(f"Updating {issue_key} in month {month_key}")
# Transform related data
comments_records = transform_comments(raw_issue)
attachments_records = transform_attachments(raw_issue, attachments_dir)
changelog_records = transform_changelog(raw_issue)
# Transform link/remote data outside lock (minimize hold time)
issuelinks_records = transform_issuelinks(raw_issue)
remote_links_records = transform_remote_links(raw_issue)
# Parquet read-modify-write under per-month lock to prevent
# "last writer wins" race when concurrent webhooks touch the
# same monthly partition (see issue #205).
with parquet_month_lock(output_dir, month_key):
updated_paths = []
# Issues
existing_issues = load_parquet_month(output_dir / "issues", month_key)
updated_issues = upsert_dataframe(existing_issues, [issue_record], "issue_key", issue_key)
path = save_parquet_month(updated_issues, ISSUES_SCHEMA, output_dir / "issues", month_key)
updated_paths.append(path)
# Comments
existing_comments = load_parquet_month(output_dir / "comments", month_key)
updated_comments = upsert_dataframe(existing_comments, comments_records, "issue_key", issue_key)
path = save_parquet_month(updated_comments, COMMENTS_SCHEMA, output_dir / "comments", month_key)
updated_paths.append(path)
# Attachments
existing_attachments = load_parquet_month(output_dir / "attachments", month_key)
updated_attachments = upsert_dataframe(existing_attachments, attachments_records, "issue_key", issue_key)
path = save_parquet_month(updated_attachments, ATTACHMENTS_SCHEMA, output_dir / "attachments", month_key)
updated_paths.append(path)
# Changelog
existing_changelog = load_parquet_month(output_dir / "changelog", month_key)
updated_changelog = upsert_dataframe(existing_changelog, changelog_records, "issue_key", issue_key)
path = save_parquet_month(updated_changelog, CHANGELOG_SCHEMA, output_dir / "changelog", month_key)
updated_paths.append(path)
# Issue links
existing_issuelinks = load_parquet_month(output_dir / "issuelinks", month_key)
updated_issuelinks = upsert_dataframe(existing_issuelinks, issuelinks_records, "issue_key", issue_key)
path = save_parquet_month(updated_issuelinks, ISSUELINKS_SCHEMA, output_dir / "issuelinks", month_key)
updated_paths.append(path)
# Remote links
existing_remote_links = load_parquet_month(output_dir / "remote_links", month_key)
updated_remote_links = upsert_dataframe(existing_remote_links, remote_links_records, "issue_key", issue_key)
path = save_parquet_month(updated_remote_links, REMOTE_LINKS_SCHEMA, output_dir / "remote_links", month_key)
updated_paths.append(path)
# Update extract.duckdb _meta for all affected tables
try:
from .extract_init import update_meta
extract_dir = output_dir.parent # output_dir is .../data, parent is .../jira
for table_name in ["issues", "comments", "attachments", "changelog", "issuelinks", "remote_links"]:
update_meta(extract_dir, table_name)
except Exception as meta_err:
logger.warning(f"Could not update extract.duckdb _meta: {meta_err}")
logger.info(f"Successfully updated {issue_key} in Parquet files")
return True
except Exception as e:
logger.error(f"Error transforming {issue_key}: {e}", exc_info=True)
return False
def _handle_deletion(
issue_key: str,
output_dir: Path,
) -> bool:
"""Handle issue deletion by removing from all monthly files."""
found = False
for table_name, schema in [
("issues", ISSUES_SCHEMA),
("comments", COMMENTS_SCHEMA),
("attachments", ATTACHMENTS_SCHEMA),
("changelog", CHANGELOG_SCHEMA),
("issuelinks", ISSUELINKS_SCHEMA),
("remote_links", REMOTE_LINKS_SCHEMA),
]:
table_dir = output_dir / table_name
if not table_dir.exists():
continue
for parquet_file in table_dir.glob("*.parquet"):
month_key = parquet_file.stem
try:
with parquet_month_lock(output_dir, month_key):
df = pd.read_parquet(parquet_file)
if "issue_key" in df.columns and issue_key in df["issue_key"].values:
df = df[df["issue_key"] != issue_key]
save_parquet_month(df, schema, table_dir, month_key)
found = True
logger.info(f"Removed {issue_key} from {parquet_file}")
except Exception as e:
logger.warning(f"Error checking {parquet_file}: {e}")
return found
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Incremental Jira transform")
parser.add_argument("issue_key", help="Jira issue key (e.g., SUPPORT-1234)")
parser.add_argument("--raw-dir", type=Path, help="Raw JSON directory")
parser.add_argument("--output-dir", type=Path, help="Output Parquet directory")
parser.add_argument("--attachments-dir", type=Path, help="Attachments directory")
parser.add_argument("--deleted", action="store_true", help="Issue was deleted")
args = parser.parse_args()
success = transform_single_issue(
issue_key=args.issue_key,
raw_dir=args.raw_dir,
output_dir=args.output_dir,
attachments_dir=args.attachments_dir,
deleted=args.deleted,
)
exit(0 if success else 1)