"""
Catalog Export - generate YAML metric and table definitions from OpenMetadata.

Fetches metrics and table metadata from the OpenMetadata catalog API,
transforms them into YAML files compatible with the AI Data Analyst agent
(MetricParser format), and writes them to the docs directory.

Output structure:
    /data/docs/metrics/{category}/{metric_name}.yml  - individual metric files
    /data/docs/metrics/metrics.yml                   - summary index
    /data/docs/tables/{table_name}.yml               - table column descriptions

Usage:
    python -m src.catalog_export
"""

import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List
from urllib.parse import quote

import yaml

from config.loader import load_instance_config
from connectors.openmetadata.client import OpenMetadataClient
from connectors.openmetadata.transformer import (
    has_tag,
    metric_to_yaml_dict,
    table_to_yaml_dict,
)
from src.db import get_system_db
from src.repositories.table_registry import TableRegistryRepository

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

AUTO_GENERATED_MARKER = "# AUTO-GENERATED from OpenMetadata catalog"
CATALOG_SOURCE_LINE = "# Source: {url}"
GENERATED_LINE = "# Generated: {timestamp}"
DO_NOT_EDIT_LINE = "# DO NOT EDIT - changes will be overwritten on next catalog export"


def _get_docs_dir() -> Path:
    """
    Derive docs output directory from DATA_DIR environment variable.

    DATA_DIR is typically /data/src_data -> docs dir is /data/docs.
    Falls back to ./docs if DATA_DIR is not set.

    The directory is NOT created here; the export functions create the
    sub-directories they write to (with parents) before writing.

    Returns:
        Path to docs directory (sibling "docs" of DATA_DIR)
    """
    data_dir = Path(os.environ.get("DATA_DIR", "./data"))
    # /data/src_data -> /data/docs
    return data_dir.parent / "docs"


def _utc_timestamp() -> str:
    """Return the current UTC time formatted as YYYY-MM-DDTHH:MM:SSZ."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _is_inside(base_dir: Path, candidate: Path) -> bool:
    """
    Check that candidate resolves to a location strictly below base_dir.

    Names coming from the catalog / registry are used as path components;
    this guards against values such as "../x" or absolute paths escaping
    the export directory.

    Args:
        base_dir: Directory that must contain the candidate
        candidate: Path about to be written

    Returns:
        True if the resolved candidate has the resolved base_dir as an ancestor
    """
    return base_dir.resolve() in candidate.resolve().parents


def _yaml_header(catalog_url: str, fqn: str = "", entity_type: str = "") -> str:
    """
    Generate AUTO-GENERATED header comment for YAML files.

    Args:
        catalog_url: OpenMetadata instance URL
        fqn: Optional entity FQN for reference
        entity_type: Entity type for catalog URL (e.g., "metric", "table")

    Returns:
        Multi-line comment string (terminated by a newline)
    """
    if fqn and entity_type:
        # Strip a trailing slash so the entity link never contains "//".
        base_url = catalog_url.rstrip("/")
        source_url = f"{base_url}/{entity_type}/{quote(fqn, safe='.')}"
    else:
        source_url = catalog_url

    lines = [
        AUTO_GENERATED_MARKER,
        CATALOG_SOURCE_LINE.format(url=source_url),
        GENERATED_LINE.format(timestamp=_utc_timestamp()),
        DO_NOT_EDIT_LINE,
    ]
    if fqn:
        lines.append(f"# FQN: {fqn}")

    return "\n".join(lines) + "\n"


def _is_auto_generated(file_path: Path) -> bool:
    """
    Check if a YAML file was auto-generated by this script.

    Reads the first line and checks for the AUTO-GENERATED marker.

    Args:
        file_path: Path to YAML file

    Returns:
        True if file starts with AUTO-GENERATED marker; False otherwise,
        including when the file is missing, unreadable or not valid UTF-8
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            first_line = f.readline().strip()
    except (OSError, UnicodeDecodeError):
        # Unreadable files are treated as manual so they are never deleted.
        return False
    return first_line == AUTO_GENERATED_MARKER


def export_metrics(
    client: OpenMetadataClient,
    docs_dir: Path,
    catalog_url: str,
    filter_tag: str = "",
    data_product: str = "",
) -> int:
    """
    Export metrics from OpenMetadata to YAML files.

    Steps:
    1. Discovers metrics via data product (preferred) or fetches all + filters by tag
    2. Transforms each to YAML-compatible dict
    3. Writes individual YAML files: {docs_dir}/metrics/{category}/{name}.yml
    4. Writes index file: {docs_dir}/metrics/metrics.yml
    5. Cleans up stale auto-generated files

    If the fallback fetch returns nothing, existing files are left untouched.
    If metrics were fetched but none could be exported, an existing
    auto-generated index is rewritten as an empty list so it does not keep
    pointing at files removed by the cleanup step.

    Args:
        client: Initialized OpenMetadata API client
        docs_dir: Base docs directory (e.g., /data/docs)
        catalog_url: Catalog URL for header comments
        filter_tag: If set, only export metrics that have this tag
            (e.g., "AIAgent.MyAgent"); applied only on the fallback path
        data_product: If set, discover metrics via data product assets
            (preferred over filter_tag)

    Returns:
        Number of metrics exported
    """
    metrics_dir = docs_dir / "metrics"
    metrics_dir.mkdir(parents=True, exist_ok=True)

    raw_metrics: List[Dict[str, Any]] = []

    # Strategy 1: Discover metrics via data product (preferred)
    # NOTE(review): limit=200 is hard-coded in both strategies; confirm the
    # client paginates or that the catalog never exceeds this.
    if data_product:
        try:
            raw_metrics = client.search_by_data_product(
                data_product_name=data_product,
                entity_type="metric",
                limit=200,
            )
            logger.info(f"Data product '{data_product}': found {len(raw_metrics)} metrics")
        except Exception as e:
            logger.warning(f"Data product search failed, falling back to tag filter: {e}")
            raw_metrics = []

    # Strategy 2: Fallback to tag-based filter
    if not raw_metrics:
        raw_metrics = client.get_metrics(limit=200, fields="tags,owners")
        if not raw_metrics:
            logger.warning("No metrics returned from catalog - preserving existing files")
            return 0

        logger.info(f"Fetched {len(raw_metrics)} metrics from catalog")

        if filter_tag:
            filtered = [m for m in raw_metrics if has_tag(m.get("tags", []), filter_tag)]
            logger.info(f"Tag filter '{filter_tag}': {len(filtered)}/{len(raw_metrics)} metrics matched")
            raw_metrics = filtered

    # Track which files we write (for cleanup)
    written_files: set[Path] = set()
    index_entries: List[Dict[str, Any]] = []

    for raw in raw_metrics:
        # Best-effort per metric: one bad entry must not abort the export.
        try:
            yaml_dict = metric_to_yaml_dict(raw)
            fqn = raw.get("fullyQualifiedName", "")
            category = yaml_dict["category"]
            metric_name = yaml_dict["name"]

            if not metric_name:
                logger.warning(f"Skipping metric with empty name: {fqn}")
                continue

            cat_dir = metrics_dir / category
            file_path = cat_dir / f"{metric_name}.yml"

            # Catalog-supplied names must not escape the metrics directory.
            if not _is_inside(metrics_dir, file_path):
                logger.warning(f"Skipping metric with unsafe output path: {fqn}")
                continue

            # Create category directory
            cat_dir.mkdir(parents=True, exist_ok=True)

            # Write individual metric YAML
            header = _yaml_header(catalog_url, fqn, entity_type="metric")
            # Wrap in list to match MetricParser expected format
            yaml_content = yaml.dump(
                [yaml_dict],
                default_flow_style=False,
                allow_unicode=True,
                sort_keys=False,
            )
            file_path.write_text(header + "\n" + yaml_content, encoding="utf-8")
            written_files.add(file_path)

            # Add to index
            index_entries.append(
                {
                    "name": metric_name,
                    "display_name": yaml_dict["display_name"],
                    "category": category,
                    "grain": yaml_dict["grain"],
                    "file": f"{category}/{metric_name}.yml",
                }
            )
            logger.debug(f"Wrote metric: {file_path}")

        except Exception as e:
            logger.warning(f"Failed to export metric {raw.get('name', '?')}: {e}")
            continue

    # Write metrics index. With no entries, only refresh an index this script
    # generated earlier (never create one, never overwrite a manual one) so it
    # stays consistent with the files left after cleanup.
    index_path = metrics_dir / "metrics.yml"
    if index_entries or _is_auto_generated(index_path):
        _write_metrics_index(metrics_dir, index_entries, catalog_url)
        written_files.add(index_path)

    # Cleanup stale auto-generated metric files
    _cleanup_stale_files(metrics_dir, written_files, "*.yml")

    logger.info(f"Exported {len(index_entries)} metrics to {metrics_dir}")
    return len(index_entries)


def _write_metrics_index(
    metrics_dir: Path,
    entries: List[Dict[str, Any]],
    catalog_url: str,
) -> None:
    """
    Write metrics.yml summary index file.

    Args:
        metrics_dir: Path to metrics directory
        entries: List of index entry dicts (name, display_name, category, grain, file)
        catalog_url: Catalog URL for header
    """
    header_lines = [
        AUTO_GENERATED_MARKER,
        f"# Index of all available metrics from OpenMetadata catalog ({catalog_url})",
        f"# Generated: {_utc_timestamp()}",
        DO_NOT_EDIT_LINE,
        "#",
        "# Usage: Read specific metric file for full details before calculating.",
        "# Example: cat server/docs/metrics/finance/m1.yml",
    ]

    index_data = {"metrics": entries}
    yaml_content = yaml.dump(
        index_data,
        default_flow_style=False,
        allow_unicode=True,
        sort_keys=False,
    )

    index_path = metrics_dir / "metrics.yml"
    index_path.write_text("\n".join(header_lines) + "\n\n" + yaml_content, encoding="utf-8")
    logger.debug(f"Wrote metrics index: {index_path}")


def export_tables(
    client: OpenMetadataClient,
    tables: list[dict],
    docs_dir: Path,
    catalog_url: str,
) -> int:
    """
    Export table metadata from OpenMetadata to YAML files.

    For each table in the registry:
    1. Derives the OpenMetadata FQN
    2. Fetches table metadata (columns, owners, tags, description)
    3. Transforms to YAML dict
    4. Writes to {docs_dir}/tables/{table_name}.yml

    A table whose fetch or transform fails keeps its previously exported
    file: only files belonging to tables no longer in the registry are
    removed by the cleanup step.

    Args:
        client: Initialized OpenMetadata API client
        tables: List of table dicts from TableRegistryRepository.list_all()
        docs_dir: Base docs directory
        catalog_url: Catalog URL for header comments

    Returns:
        Number of tables exported successfully in this run
    """
    tables_dir = docs_dir / "tables"
    tables_dir.mkdir(parents=True, exist_ok=True)

    # Files that belong to a currently registered table (written or not).
    keep_files: set[Path] = set()
    count = 0

    for tbl in tables:
        table_id = tbl.get("id", "")
        table_name = tbl.get("name", "")

        if not table_name:
            # An empty name would produce a file literally called ".yml".
            logger.warning(f"Skipping table with empty name (id: {table_id or '?'})")
            continue

        file_path = tables_dir / f"{table_name}.yml"
        if not _is_inside(tables_dir, file_path):
            logger.warning(f"Skipping table with unsafe output path: {table_name}")
            continue

        # Registered before the fetch so a transient catalog error does not
        # cause cleanup to delete the last good export of this table.
        keep_files.add(file_path)

        try:
            # Use explicit catalog_fqn if set, otherwise derive from table id
            fqn = tbl.get("catalog_fqn") or f"bigquery.{table_id}"

            logger.debug(f"Fetching table metadata: {fqn}")
            raw_table = client.get_table(fqn)
            yaml_dict = table_to_yaml_dict(raw_table)

            # Write table YAML
            header = _yaml_header(catalog_url, fqn, entity_type="table")
            yaml_content = yaml.dump(
                yaml_dict,
                default_flow_style=False,
                allow_unicode=True,
                sort_keys=False,
            )
            file_path.write_text(header + "\n" + yaml_content, encoding="utf-8")
            count += 1
            logger.info(f"Exported table: {table_name} ({len(yaml_dict.get('columns', []))} columns)")

        except Exception as e:
            logger.warning(f"Failed to export table {table_name}: {e}")
            continue

    # Cleanup stale auto-generated table files
    _cleanup_stale_files(tables_dir, keep_files, "*.yml")

    logger.info(f"Exported {count} tables to {tables_dir}")
    return count


def _cleanup_stale_files(
    directory: Path,
    current_files: set[Path],
    glob_pattern: str,
) -> None:
    """
    Remove auto-generated files that are no longer in the current export set.

    Only removes files with the AUTO-GENERATED marker in the first line.
    Manual files (without the marker) are never touched. A top-level
    "metrics.yml" in the scanned directory is always skipped.

    Args:
        directory: Directory to scan (recursively)
        current_files: Set of file paths to keep
        glob_pattern: Glob pattern to match (e.g., "*.yml")
    """
    # Snapshot the matches first: files and directories are deleted below and
    # the tree should not change underneath a lazy rglob traversal.
    candidates = [p for p in directory.rglob(glob_pattern) if p.is_file()]

    for file_path in candidates:
        if file_path in current_files:
            continue
        if file_path.name == "metrics.yml" and file_path.parent == directory:
            # Index file managed separately
            continue
        if not _is_auto_generated(file_path):
            continue

        logger.info(f"Removing stale auto-generated file: {file_path}")
        file_path.unlink()

        # Remove empty parent directory (category dir)
        parent = file_path.parent
        if parent != directory and not any(parent.iterdir()):
            parent.rmdir()
            logger.debug(f"Removed empty directory: {parent}")


def _write_sync_state(docs_dir: Path, metrics_count: int, tables_count: int) -> None:
    """
    Write sync state file for tracking last export.

    Args:
        docs_dir: Base docs directory
        metrics_count: Number of metrics exported
        tables_count: Number of tables exported
    """
    state = {
        "last_export": datetime.now(timezone.utc).isoformat(),
        "metrics_count": metrics_count,
        "tables_count": tables_count,
    }
    state_path = docs_dir / ".catalog_sync_state.json"
    state_path.write_text(json.dumps(state, indent=2), encoding="utf-8")
    logger.debug(f"Wrote sync state: {state_path}")


def _config_str(section: Dict[str, Any], key: str) -> str:
    """
    Read a config value as a stripped string.

    A missing key or an explicit null both yield "" instead of raising on
    None.strip().
    """
    value = section.get(key)
    return "" if value is None else str(value).strip()


def main() -> None:
    """
    Main entry point for catalog export.

    Loads instance config, initializes OpenMetadata client, exports metrics
    and tables to YAML, writes sync state.

    Every failure path (config load error, missing url/token, client init
    error, export error) is logged and the function returns normally, so the
    process exits with code 0 when the catalog is unavailable.
    """
    logger.info("=== Catalog Export starting ===")

    try:
        instance_config = load_instance_config()
    except Exception as e:
        logger.error(f"Failed to load instance config: {e}")
        return

    # Get OpenMetadata config ("openmetadata:" may be present but null)
    om_config = instance_config.get("openmetadata") or {}
    catalog_url = _config_str(om_config, "url")
    token = _config_str(om_config, "token")
    verify_ssl = om_config.get("verify_ssl", True)

    if not catalog_url or not token:
        logger.warning("OpenMetadata not configured (url or token missing) - skipping export")
        return

    docs_dir = _get_docs_dir()
    logger.info(f"Output directory: {docs_dir}")

    # Initialize client
    try:
        client = OpenMetadataClient(
            base_url=catalog_url,
            token=token,
            verify=verify_ssl,
        )
    except Exception as e:
        logger.warning(f"Failed to initialize OpenMetadata client: {e}")
        return

    # Discovery config: data product (preferred) or tag filter (fallback)
    filter_tag = _config_str(om_config, "filter_tag")
    data_product = _config_str(om_config, "data_product")

    try:
        # Export metrics
        metrics_count = export_metrics(
            client,
            docs_dir,
            catalog_url,
            filter_tag=filter_tag,
            data_product=data_product,
        )

        # Export tables; a registry problem must not discard the metric export.
        # NOTE(review): the connection from get_system_db() is not closed
        # here - confirm it is a shared/managed connection.
        try:
            conn = get_system_db()
            repo = TableRegistryRepository(conn)
            registered_tables = repo.list_all()
            tables_count = export_tables(client, registered_tables, docs_dir, catalog_url)
        except Exception as e:
            logger.warning(f"Table export skipped (registry error): {e}")
            tables_count = 0

        # Write sync state
        _write_sync_state(docs_dir, metrics_count, tables_count)

        logger.info(f"=== Catalog Export complete: {metrics_count} metrics, {tables_count} tables ===")

    except Exception as e:
        # Top-level boundary: keep the traceback for diagnosis, exit normally.
        logger.error(f"Catalog export failed: {e}", exc_info=True)
    finally:
        client.close()


if __name__ == "__main__":
    main()