diff --git a/connectors/jira/extract_init.py b/connectors/jira/extract_init.py
new file mode 100644
index 0000000..0b6856d
--- /dev/null
+++ b/connectors/jira/extract_init.py
@@ -0,0 +1,132 @@
+"""Initialize Jira extract.duckdb with _meta table and views for all entity types.
+
+Called once on first webhook or manually via CLI. Creates the extract.duckdb
+contract structure for the Jira connector.
+"""
+
+import logging
+import os
+from datetime import datetime, timezone
+from pathlib import Path
+
+import duckdb
+
+logger = logging.getLogger(__name__)
+
+JIRA_TABLES = ["issues", "comments", "attachments", "changelog", "issuelinks", "remote_links"]
+
+
+def _refresh_view(conn: duckdb.DuckDBPyConnection, table_dir: Path, table_name: str) -> tuple[int, int]:
+    """(Re)create the view for *table_name* and return ``(rows, size_bytes)``.
+
+    The view is only created once at least one parquet file exists: DuckDB
+    resolves a view's query when the view is created, and read_parquet() raises
+    when its glob matches no files. Counting is best effort: a failure is
+    logged and reported as ``(0, 0)`` rather than raised.
+    """
+    parquets = list(table_dir.glob("*.parquet"))
+    if not parquets:
+        return 0, 0
+
+    # Double embedded quotes so neither the path nor the name can end its literal early.
+    glob_literal = str(table_dir / "*.parquet").replace("'", "''")
+    view_name = table_name.replace('"', '""')
+    conn.execute(
+        f'CREATE OR REPLACE VIEW "{view_name}" AS '
+        f"SELECT * FROM read_parquet('{glob_literal}', union_by_name=true, hive_partitioning=false)"
+    )
+
+    try:
+        rows = conn.execute(f'SELECT count(*) FROM "{view_name}"').fetchone()[0]
+        size_bytes = sum(f.stat().st_size for f in parquets)
+    except Exception as e:
+        logger.warning("Could not count rows for %s: %s", table_name, e)
+        return 0, 0
+    return rows, size_bytes
+
+
+def init_extract(output_dir: str | Path) -> None:
+    """Create extract.duckdb in *output_dir* with the _meta table and views.
+
+    Views point to monthly parquet partitions in data/{table}/*.parquet; a
+    table's view is created as soon as its directory holds a parquet file.
+    Safe to call multiple times — recreates _meta and views.
+    """
+    output_path = Path(output_dir)
+    data_dir = output_path / "data"
+    data_dir.mkdir(parents=True, exist_ok=True)
+
+    db_path = output_path / "extract.duckdb"
+    conn = duckdb.connect(str(db_path))
+
+    try:
+        # Create _meta table
+        conn.execute("DROP TABLE IF EXISTS _meta")
+        conn.execute("""CREATE TABLE _meta (
+            table_name VARCHAR NOT NULL,
+            description VARCHAR,
+            rows BIGINT,
+            size_bytes BIGINT,
+            extracted_at TIMESTAMP,
+            query_mode VARCHAR DEFAULT 'local'
+        )""")
+
+        now = datetime.now(timezone.utc)
+        for table_name in JIRA_TABLES:
+            table_dir = data_dir / table_name
+            table_dir.mkdir(exist_ok=True)
+
+            rows, size_bytes = _refresh_view(conn, table_dir, table_name)
+            conn.execute(
+                "INSERT INTO _meta VALUES (?, ?, ?, ?, ?, 'local')",
+                [table_name, f"Jira {table_name}", rows, size_bytes, now],
+            )
+
+        logger.info("Initialized Jira extract.duckdb at %s with %d tables", db_path, len(JIRA_TABLES))
+    finally:
+        conn.close()
+
+
+def update_meta(output_dir: str | Path, table_name: str) -> None:
+    """Update _meta entry for a table after parquet write.
+
+    Called after incremental_transform writes/updates a parquet file. Builds
+    the whole extract via init_extract() when extract.duckdb does not exist
+    yet; otherwise refreshes the table's view and its _meta row.
+    """
+    output_path = Path(output_dir)
+    db_path = output_path / "extract.duckdb"
+
+    if not db_path.exists():
+        init_extract(output_dir)
+        return
+
+    conn = duckdb.connect(str(db_path))
+    try:
+        table_dir = output_path / "data" / table_name
+        rows, size_bytes = _refresh_view(conn, table_dir, table_name)
+
+        known = conn.execute("SELECT count(*) FROM _meta WHERE table_name = ?", [table_name]).fetchone()[0]
+        if not known:
+            # An UPDATE would silently match nothing; make the gap visible instead.
+            logger.warning("No _meta row for %s in %s; re-run init_extract", table_name, db_path)
+            return
+
+        now = datetime.now(timezone.utc)
+        conn.execute(
+            "UPDATE _meta SET rows = ?, size_bytes = ?, extracted_at = ? WHERE table_name = ?",
+            [rows, size_bytes, now, table_name],
+        )
+    finally:
+        conn.close()
+
+
+def get_default_output_dir() -> Path:
+    """Get the default Jira extract output directory."""
+    data_dir = Path(os.environ.get("DATA_DIR", "/data"))
+    return data_dir / "extracts" / "jira"
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
+    init_extract(get_default_output_dir())
diff --git a/connectors/jira/incremental_transform.py b/connectors/jira/incremental_transform.py
index 426368d..a582299 100644
--- a/connectors/jira/incremental_transform.py
+++ b/connectors/jira/incremental_transform.py
@@ -38,8 +38,8 @@ logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
 # Default paths (can be overridden via environment)
-DEFAULT_RAW_DIR = Path("/data/src_data/raw/jira")
-DEFAULT_OUTPUT_DIR = Path("/data/src_data/parquet/jira")
+DEFAULT_RAW_DIR = Path(os.environ.get("DATA_DIR", "/data")) / "extracts" / "jira" / "raw"
+DEFAULT_OUTPUT_DIR = Path(os.environ.get("DATA_DIR", "/data")) / "extracts" / "jira" / "data"
 
 
 def upsert_dataframe(
@@ -214,6 +214,15 @@ def transform_single_issue(
             path = save_parquet_month(updated_remote_links, REMOTE_LINKS_SCHEMA, output_dir / "remote_links", month_key)
             updated_paths.append(path)
 
+        # Update extract.duckdb _meta for all affected tables
+        try:
+            from .extract_init import update_meta
+            extract_dir = output_dir.parent  # output_dir is .../data, parent is .../jira
+            for table_name in ["issues", "comments", "attachments", "changelog", "issuelinks", "remote_links"]:
+                update_meta(extract_dir, table_name)
+        except Exception as meta_err:
+            logger.warning(f"Could not update extract.duckdb _meta: {meta_err}")
+
         logger.info(f"Successfully updated {issue_key} in Parquet files")
         return True
 
diff --git a/connectors/jira/service.py b/connectors/jira/service.py
index 7fc7a20..1265019 100644
--- a/connectors/jira/service.py
+++ b/connectors/jira/service.py
@@ -47,6 +47,12 @@ def trigger_incremental_transform(issue_key: str, deleted: bool = False) -> bool
 
         if success:
             logger.info(f"Incremental transform completed for {issue_key}")
+            # Rebuild Jira views in master analytics.duckdb
+            try:
+                from src.orchestrator import SyncOrchestrator
+                SyncOrchestrator().rebuild_source("jira")
+            except Exception as orch_err:
+                logger.warning(f"Orchestrator rebuild failed: {orch_err}")
         else:
             logger.warning(f"Incremental transform failed for {issue_key}")
 