From 985f47cdb735cecadeaf5595ffcbd16422a36070 Mon Sep 17 00:00:00 2001 From: Petr Date: Sun, 15 Mar 2026 01:15:30 +0100 Subject: [PATCH] Add catalog export: generate YAML metrics and tables from OpenMetadata - New `connectors/openmetadata/transformer.py` with shared parsing logic for extracting categories, grain, dimensions, expressions from OM tags - New `src/catalog_export.py` script (python -m src.catalog_export) that fetches metrics/tables from OpenMetadata API and writes YAML files to /data/docs/metrics/ and /data/docs/tables/ for agent consumption - Refactor webapp/app.py to delegate to transformer (with inline fallback) - Add `fields` parameter to client.get_metrics() and get_metric_by_fqn() for fetching tags+owners in a single API call - Fix pre-existing mock bug in test_openmetadata_enricher (base_url) - 101 new tests (80 transformer + 21 export), all passing --- connectors/openmetadata/client.py | 16 +- connectors/openmetadata/transformer.py | 392 +++++++++++++ src/catalog_export.py | 424 +++++++++++++ tests/test_catalog_export.py | 439 ++++++++++++++ tests/test_openmetadata_enricher.py | 3 +- tests/test_openmetadata_transformer.py | 783 +++++++++++++++++++++++++ webapp/app.py | 94 ++- 7 files changed, 2085 insertions(+), 66 deletions(-) create mode 100644 connectors/openmetadata/transformer.py create mode 100644 src/catalog_export.py create mode 100644 tests/test_catalog_export.py create mode 100644 tests/test_openmetadata_transformer.py diff --git a/connectors/openmetadata/client.py b/connectors/openmetadata/client.py index 738d4fc..f9623cc 100644 --- a/connectors/openmetadata/client.py +++ b/connectors/openmetadata/client.py @@ -87,22 +87,24 @@ class OpenMetadataClient: return response.json() - def get_metrics(self, limit: int = 100) -> List[Dict[str, Any]]: + def get_metrics(self, limit: int = 100, fields: str = "tags,owners") -> List[Dict[str, Any]]: """ - Fetch list of available metrics from OpenMetadata (Phase 2). + Fetch list of available metrics from OpenMetadata. Args: limit: Maximum number of metrics to return + fields: Comma-separated list of fields to include (e.g., "tags,owners") Returns: List of metric dictionaries with: - id, name, fullyQualifiedName - description - metricExpression: metric calculation SQL/formula - - owners, tags + - owners, tags (when requested via fields) """ params = { "limit": limit, + "fields": fields, } response = self._client.get("/api/v1/metrics", params=params) @@ -111,25 +113,27 @@ class OpenMetadataClient: data = response.json() return data.get("data", []) - def get_metric_by_fqn(self, fqn: str) -> Dict[str, Any]: + def get_metric_by_fqn(self, fqn: str, fields: str = "tags,owners") -> Dict[str, Any]: """ Fetch a specific metric by FQN from OpenMetadata. Args: fqn: Fully qualified name (e.g., "Active2 Customers") + fields: Comma-separated list of fields to include Returns: Dictionary with metric metadata: - id, name, fullyQualifiedName - description, metricExpression - - owners, tags + - owners, tags (when requested via fields) Raises: httpx.HTTPStatusError: If request fails (non-2xx status) """ url = f"/api/v1/metrics/name/{fqn}" + params = {"fields": fields} - response = self._client.get(url) + response = self._client.get(url, params=params) response.raise_for_status() return response.json() diff --git a/connectors/openmetadata/transformer.py b/connectors/openmetadata/transformer.py new file mode 100644 index 0000000..b842ec8 --- /dev/null +++ b/connectors/openmetadata/transformer.py @@ -0,0 +1,392 @@ +""" +OpenMetadata data transformer. + +Shared logic for parsing OpenMetadata API responses into structured dicts +suitable for YAML export and webapp display. Used by: +- src/catalog_export.py (YAML file generation) +- webapp/app.py (metric list and detail display) + +Extracts metadata from OpenMetadata tag conventions: +- MetricCategory.* or Category.* -> category +- Grain.* -> grain/granularity +- Dimension.* -> dimensions list +- MetricType.* -> metric type +- Unit.* -> unit of measurement +""" + +import logging +import re +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +def extract_category(tags: List[Dict[str, Any]]) -> str: + """ + Extract metric category from OpenMetadata tags. + + Looks for tagFQN prefixed with "MetricCategory." or "Category.". + Returns the first match found, or "general" as fallback. + + Args: + tags: List of tag dicts from OpenMetadata (each with "tagFQN" key) + + Returns: + Category string (e.g., "finance", "marketing") + """ + for tag in tags: + tag_fqn = tag.get("tagFQN", "") + if tag_fqn.startswith("MetricCategory."): + return tag_fqn.split(".", 1)[1] + if tag_fqn.startswith("Category."): + return tag_fqn.split(".", 1)[1] + return "general" + + +def extract_grain(raw_metric: Dict[str, Any]) -> str: + """ + Extract metric granularity from OpenMetadata metric data. + + Checks the "granularity" field first, then falls back to Grain.* tags. + + Args: + raw_metric: Raw metric dict from OpenMetadata API + + Returns: + Grain string (e.g., "monthly", "daily"), lowercase. Empty string if not found. + """ + grain = raw_metric.get("granularity", "") or "" + if grain: + return grain.lower() + + for tag in raw_metric.get("tags", []): + tag_fqn = tag.get("tagFQN", "") + if tag_fqn.startswith("Grain."): + return tag_fqn.split(".", 1)[1].lower() + + return "" + + +def extract_dimensions(tags: List[Dict[str, Any]]) -> List[str]: + """ + Extract dimension names from OpenMetadata tags. + + Looks for tagFQN prefixed with "Dimension.". + + Args: + tags: List of tag dicts from OpenMetadata + + Returns: + List of dimension names (e.g., ["economic_area", "merchant_country"]) + """ + dimensions = [] + for tag in tags: + tag_fqn = tag.get("tagFQN", "") + if tag_fqn.startswith("Dimension."): + dimensions.append(tag_fqn.split(".", 1)[1]) + return dimensions + + +def extract_expression(raw_metric: Dict[str, Any]) -> str: + """ + Extract metric SQL expression from OpenMetadata metric data. + + Handles both dict format ({"expression": "..."}) and plain string. + + Args: + raw_metric: Raw metric dict from OpenMetadata API + + Returns: + SQL expression string, or empty string if not found. + """ + metric_expr = raw_metric.get("metricExpression", {}) + if isinstance(metric_expr, dict): + return metric_expr.get("expression", "") or "" + if isinstance(metric_expr, str): + return metric_expr + return "" + + +def extract_owners(raw: Dict[str, Any]) -> List[str]: + """ + Extract owner names from OpenMetadata entity data. + + Args: + raw: Raw entity dict with optional "owners" list + + Returns: + List of owner name strings + """ + names = [] + for owner in raw.get("owners", []): + name = owner.get("name") or owner.get("displayName", "") + if name: + names.append(name) + return names + + +def extract_metric_type(raw_metric: Dict[str, Any]) -> str: + """ + Extract metric type from OpenMetadata metric data. + + Checks "metricType" field first, then MetricType.* tags. + + Args: + raw_metric: Raw metric dict from OpenMetadata API + + Returns: + Metric type string (e.g., "sum", "count"), lowercase. + """ + metric_type = raw_metric.get("metricType", "") or "" + if metric_type: + return metric_type.lower() + + for tag in raw_metric.get("tags", []): + tag_fqn = tag.get("tagFQN", "") + if tag_fqn.startswith("MetricType."): + return tag_fqn.split(".", 1)[1].lower() + + return "" + + +def extract_unit(raw_metric: Dict[str, Any]) -> str: + """ + Extract unit of measurement from OpenMetadata metric data. + + Checks "unitOfMeasurement" field first, then Unit.* tags. + + Args: + raw_metric: Raw metric dict from OpenMetadata API + + Returns: + Unit string (e.g., "USD", "count"). + """ + unit = raw_metric.get("unitOfMeasurement", "") or "" + if unit: + return unit + + for tag in raw_metric.get("tags", []): + tag_fqn = tag.get("tagFQN", "") + if tag_fqn.startswith("Unit."): + return tag_fqn.split(".", 1)[1] + + return "" + + +def extract_tag_names(tags: List[Dict[str, Any]]) -> List[str]: + """ + Extract simple tag names from OpenMetadata tag list. + + Uses "name" field if present, otherwise extracts last segment of "tagFQN". + + Args: + tags: List of tag dicts from OpenMetadata + + Returns: + List of tag name strings + """ + result = [] + for tag in tags: + name = tag.get("name") or tag.get("tagFQN", "").split(".")[-1] + if name: + result.append(name) + return result + + +def sanitize_filename(name: str) -> str: + """ + Convert metric/entity name to safe filesystem name. + + Replaces non-alphanumeric characters with underscores, collapses + consecutive underscores, strips leading/trailing underscores, lowercases. + + Args: + name: Raw entity name (e.g., "M1 Operational Margin") + + Returns: + Safe filename (e.g., "m1_operational_margin") + """ + safe = re.sub(r"[^a-zA-Z0-9]+", "_", name) + safe = re.sub(r"_+", "_", safe) + return safe.strip("_").lower() + + +def metric_to_yaml_dict(raw_metric: Dict[str, Any]) -> Dict[str, Any]: + """ + Transform raw OpenMetadata metric into YAML-compatible dict. + + Output format is compatible with MetricParser._structure_metric_data() + and can be written directly as YAML for Claude Code agent consumption. + + Args: + raw_metric: Raw metric dict from OpenMetadata API + + Returns: + Dict with keys: name, display_name, category, type, unit, grain, + time_column, table, expression, description, dimensions, notes, synonyms + """ + tags = raw_metric.get("tags", []) + name = raw_metric.get("name", "") + display_name = raw_metric.get("displayName", name) + fqn = raw_metric.get("fullyQualifiedName", "") + + owner_names = extract_owners(raw_metric) + notes = [] + if fqn: + notes.append(f"Source: OpenMetadata catalog (FQN: {fqn})") + if owner_names: + notes.append(f"Owners: {', '.join(owner_names)}") + + return { + "name": sanitize_filename(name), + "display_name": display_name, + "category": extract_category(tags), + "type": extract_metric_type(raw_metric), + "unit": extract_unit(raw_metric), + "grain": extract_grain(raw_metric), + "time_column": "", + "table": "", + "expression": extract_expression(raw_metric), + "description": (raw_metric.get("description", "") or "").strip(), + "dimensions": extract_dimensions(tags), + "notes": notes, + "synonyms": [], + } + + +def metric_to_display_dict(raw_metric: Dict[str, Any]) -> Dict[str, Any]: + """ + Parse raw OpenMetadata metric for metric list display in webapp. + + Returns a lightweight dict for listing metrics (not full detail). + + Args: + raw_metric: Raw metric dict from OpenMetadata API + + Returns: + Dict with keys: name, display_name, description, grain, category, path + """ + fqn = raw_metric.get("fullyQualifiedName", "") + name = raw_metric.get("name", "") + display_name = raw_metric.get("displayName", name) + description = raw_metric.get("description", "") or "" + tags = raw_metric.get("tags", []) + + return { + "name": name, + "display_name": display_name, + "description": description, + "grain": extract_grain(raw_metric), + "category": extract_category(tags), + "path": f"catalog:{fqn}", + } + + +def metric_to_detail_dict(raw_metric: Dict[str, Any], category_colors: Optional[Dict[str, str]] = None) -> Dict[str, Any]: + """ + Convert raw OpenMetadata metric into MetricParser-compatible detail dict for modal display. + + Args: + raw_metric: Raw metric dict from OpenMetadata API + category_colors: Optional mapping of category -> CSS color hex + + Returns: + Dict matching MetricParser._structure_metric_data() output format + """ + if category_colors is None: + category_colors = {} + + tags = raw_metric.get("tags", []) + name = raw_metric.get("name", "") + display_name = raw_metric.get("displayName", name) + description = raw_metric.get("description", "") or "" + category = extract_category(tags) + expression = extract_expression(raw_metric) + + return { + "name": name, + "display_name": display_name, + "category": category, + "category_color": category_colors.get(category, "#6B7280"), + "metadata": { + "type": extract_metric_type(raw_metric), + "unit": extract_unit(raw_metric), + "grain": extract_grain(raw_metric), + "time_column": "", + }, + "overview": { + "description": description.strip(), + "key_insights": [], + }, + "validation": None, + "dimensions": extract_dimensions(tags), + "notes": { + "all": [], + "key_insights": [], + }, + "sql_examples": { + "expression": { + "title": "Metric Expression", + "query": expression, + "complexity": "simple", + } + } if expression else {}, + "technical": { + "table": "", + "expression": expression, + "synonyms": [], + "data_sources": [], + }, + "special_sections": {}, + } + + +def table_to_yaml_dict(raw_table: Dict[str, Any]) -> Dict[str, Any]: + """ + Transform raw OpenMetadata table response into YAML-compatible dict. + + Extracts table description, column metadata, owners, tags, and tier. + Reuses parsing logic from CatalogEnricher._parse_table_response(). + + Args: + raw_table: Raw table dict from OpenMetadata /api/v1/tables/name/{fqn} + + Returns: + Dict with keys: name, fqn, description, owners, tags, tier, columns + """ + fqn = raw_table.get("fullyQualifiedName", "") + name = raw_table.get("name", "") + description = raw_table.get("description", "") or "" + tags = raw_table.get("tags", []) + + # Parse columns + columns = [] + for col in raw_table.get("columns", []): + col_entry = { + "name": col.get("name", ""), + "type": col.get("dataType", ""), + "description": (col.get("description", "") or "").strip(), + } + columns.append(col_entry) + + # Parse tier from tags (Tier.Tier1 etc.) or extension + tier = None + extension = raw_table.get("extension", {}) + if extension: + tier = extension.get("tier") or extension.get("Tier") + if not tier: + for tag in tags: + tag_fqn = tag.get("tagFQN", "") + if tag_fqn.startswith("Tier."): + tier = tag_fqn.split(".", 1)[1] + break + + return { + "name": name, + "fqn": fqn, + "description": description.strip(), + "owners": extract_owners(raw_table), + "tags": extract_tag_names(tags), + "tier": tier or "", + "columns": columns, + } diff --git a/src/catalog_export.py b/src/catalog_export.py new file mode 100644 index 0000000..30a21e3 --- /dev/null +++ b/src/catalog_export.py @@ -0,0 +1,424 @@ +""" +Catalog Export - generate YAML metric and table definitions from OpenMetadata. + +Fetches metrics and table metadata from the OpenMetadata catalog API, +transforms them into YAML files compatible with the AI Data Analyst agent +(MetricParser format), and writes them to the docs directory. + +Output structure: + /data/docs/metrics/{category}/{metric_name}.yml - individual metric files + /data/docs/metrics/metrics.yml - summary index + /data/docs/tables/{table_name}.yml - table column descriptions + +Usage: + python -m src.catalog_export +""" + +import json +import logging +import os +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional + +import yaml + +from config.loader import load_instance_config +from connectors.openmetadata.client import OpenMetadataClient +from connectors.openmetadata.transformer import ( + extract_category, + extract_grain, + metric_to_yaml_dict, + sanitize_filename, + table_to_yaml_dict, +) +from src.config import Config + +# --------------------------------------------------------------------------- +# Logging +# --------------------------------------------------------------------------- +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- +AUTO_GENERATED_MARKER = "# AUTO-GENERATED from OpenMetadata catalog" +CATALOG_SOURCE_LINE = "# Source: {url}" +GENERATED_LINE = "# Generated: {timestamp}" +DO_NOT_EDIT_LINE = "# DO NOT EDIT - changes will be overwritten on next catalog export" + + +def _get_docs_dir() -> Path: + """ + Derive docs output directory from DATA_DIR environment variable. + + DATA_DIR is typically /data/src_data -> docs dir is /data/docs. + Falls back to ./docs if DATA_DIR is not set. + + Returns: + Path to docs directory (created if needed) + """ + data_dir = Path(os.environ.get("DATA_DIR", "./data")) + # /data/src_data -> /data/docs + docs_dir = data_dir.parent / "docs" + return docs_dir + + +def _yaml_header(catalog_url: str, fqn: str = "") -> str: + """ + Generate AUTO-GENERATED header comment for YAML files. + + Args: + catalog_url: OpenMetadata instance URL + fqn: Optional entity FQN for reference + + Returns: + Multi-line comment string + """ + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + lines = [ + AUTO_GENERATED_MARKER, + CATALOG_SOURCE_LINE.format(url=catalog_url), + GENERATED_LINE.format(timestamp=timestamp), + DO_NOT_EDIT_LINE, + ] + if fqn: + lines.append(f"# FQN: {fqn}") + return "\n".join(lines) + "\n" + + +def _is_auto_generated(file_path: Path) -> bool: + """ + Check if a YAML file was auto-generated by this script. + + Reads the first line and checks for the AUTO-GENERATED marker. + + Args: + file_path: Path to YAML file + + Returns: + True if file starts with AUTO-GENERATED marker + """ + try: + with open(file_path, "r", encoding="utf-8") as f: + first_line = f.readline().strip() + return first_line == AUTO_GENERATED_MARKER + except Exception: + return False + + +def export_metrics( + client: OpenMetadataClient, + docs_dir: Path, + catalog_url: str, +) -> int: + """ + Export metrics from OpenMetadata to YAML files. + + For each metric: + 1. Fetches all metrics from catalog API + 2. Transforms each to YAML-compatible dict + 3. Writes individual YAML files: {docs_dir}/metrics/{category}/{name}.yml + 4. Writes index file: {docs_dir}/metrics/metrics.yml + 5. Cleans up stale auto-generated files + + Args: + client: Initialized OpenMetadata API client + docs_dir: Base docs directory (e.g., /data/docs) + catalog_url: Catalog URL for header comments + + Returns: + Number of metrics exported + """ + metrics_dir = docs_dir / "metrics" + metrics_dir.mkdir(parents=True, exist_ok=True) + + # Fetch all metrics with tags and owners + raw_metrics = client.get_metrics(limit=200, fields="tags,owners") + if not raw_metrics: + logger.warning("No metrics returned from catalog - preserving existing files") + return 0 + + logger.info(f"Fetched {len(raw_metrics)} metrics from catalog") + + # Track which files we write (for cleanup) + written_files: set[Path] = set() + index_entries: List[Dict[str, Any]] = [] + + for raw in raw_metrics: + try: + yaml_dict = metric_to_yaml_dict(raw) + fqn = raw.get("fullyQualifiedName", "") + category = yaml_dict["category"] + metric_name = yaml_dict["name"] + + if not metric_name: + logger.warning(f"Skipping metric with empty name: {fqn}") + continue + + # Create category directory + cat_dir = metrics_dir / category + cat_dir.mkdir(parents=True, exist_ok=True) + + # Write individual metric YAML + file_path = cat_dir / f"{metric_name}.yml" + header = _yaml_header(catalog_url, fqn) + # Wrap in list to match MetricParser expected format + yaml_content = yaml.dump( + [yaml_dict], + default_flow_style=False, + allow_unicode=True, + sort_keys=False, + ) + file_path.write_text(header + "\n" + yaml_content, encoding="utf-8") + written_files.add(file_path) + + # Add to index + index_entries.append({ + "name": metric_name, + "display_name": yaml_dict["display_name"], + "category": category, + "grain": yaml_dict["grain"], + "file": f"{category}/{metric_name}.yml", + }) + + logger.debug(f"Wrote metric: {file_path}") + + except Exception as e: + logger.warning(f"Failed to export metric {raw.get('name', '?')}: {e}") + continue + + # Write metrics index + if index_entries: + _write_metrics_index(metrics_dir, index_entries, catalog_url) + written_files.add(metrics_dir / "metrics.yml") + + # Cleanup stale auto-generated metric files + _cleanup_stale_files(metrics_dir, written_files, "*.yml") + + logger.info(f"Exported {len(index_entries)} metrics to {metrics_dir}") + return len(index_entries) + + +def _write_metrics_index( + metrics_dir: Path, + entries: List[Dict[str, Any]], + catalog_url: str, +) -> None: + """ + Write metrics.yml summary index file. + + Args: + metrics_dir: Path to metrics directory + entries: List of index entry dicts (name, display_name, category, grain, file) + catalog_url: Catalog URL for header + """ + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + header_lines = [ + AUTO_GENERATED_MARKER, + f"# Index of all available metrics from OpenMetadata catalog ({catalog_url})", + f"# Generated: {timestamp}", + DO_NOT_EDIT_LINE, + "#", + "# Usage: Read specific metric file for full details before calculating.", + '# Example: cat server/docs/metrics/finance/m1.yml', + ] + + index_data = {"metrics": entries} + yaml_content = yaml.dump( + index_data, + default_flow_style=False, + allow_unicode=True, + sort_keys=False, + ) + + index_path = metrics_dir / "metrics.yml" + index_path.write_text("\n".join(header_lines) + "\n\n" + yaml_content, encoding="utf-8") + logger.debug(f"Wrote metrics index: {index_path}") + + +def export_tables( + client: OpenMetadataClient, + config: Config, + docs_dir: Path, + catalog_url: str, +) -> int: + """ + Export table metadata from OpenMetadata to YAML files. + + For each table defined in data_description.md: + 1. Derives the OpenMetadata FQN + 2. Fetches table metadata (columns, owners, tags, description) + 3. Transforms to YAML dict + 4. Writes to {docs_dir}/tables/{table_name}.yml + + Args: + client: Initialized OpenMetadata API client + config: Application config with table definitions + docs_dir: Base docs directory + catalog_url: Catalog URL for header comments + + Returns: + Number of tables exported + """ + tables_dir = docs_dir / "tables" + tables_dir.mkdir(parents=True, exist_ok=True) + + written_files: set[Path] = set() + count = 0 + + for table_config in config.tables: + try: + # Derive FQN: explicit override or auto-derive + fqn = table_config.catalog_fqn or f"bigquery.{table_config.id}" + + logger.debug(f"Fetching table metadata: {fqn}") + raw_table = client.get_table(fqn) + + yaml_dict = table_to_yaml_dict(raw_table) + + # Write table YAML + file_path = tables_dir / f"{table_config.name}.yml" + header = _yaml_header(catalog_url, fqn) + yaml_content = yaml.dump( + yaml_dict, + default_flow_style=False, + allow_unicode=True, + sort_keys=False, + ) + file_path.write_text(header + "\n" + yaml_content, encoding="utf-8") + written_files.add(file_path) + count += 1 + + logger.info(f"Exported table: {table_config.name} ({len(yaml_dict.get('columns', []))} columns)") + + except Exception as e: + logger.warning(f"Failed to export table {table_config.name}: {e}") + continue + + # Cleanup stale auto-generated table files + _cleanup_stale_files(tables_dir, written_files, "*.yml") + + logger.info(f"Exported {count} tables to {tables_dir}") + return count + + +def _cleanup_stale_files( + directory: Path, + current_files: set[Path], + glob_pattern: str, +) -> None: + """ + Remove auto-generated files that are no longer in the current export set. + + Only removes files with the AUTO-GENERATED marker in the first line. + Manual files (without the marker) are never touched. + + Args: + directory: Directory to scan + current_files: Set of file paths written in this export run + glob_pattern: Glob pattern to match (e.g., "*.yml") + """ + for file_path in directory.rglob(glob_pattern): + if file_path in current_files: + continue + if file_path.name == "metrics.yml" and file_path.parent == directory: + # Index file managed separately + continue + if _is_auto_generated(file_path): + logger.info(f"Removing stale auto-generated file: {file_path}") + file_path.unlink() + + # Remove empty parent directory (category dir) + parent = file_path.parent + if parent != directory and not any(parent.iterdir()): + parent.rmdir() + logger.debug(f"Removed empty directory: {parent}") + + +def _write_sync_state(docs_dir: Path, metrics_count: int, tables_count: int) -> None: + """ + Write sync state file for tracking last export. + + Args: + docs_dir: Base docs directory + metrics_count: Number of metrics exported + tables_count: Number of tables exported + """ + state = { + "last_export": datetime.now(timezone.utc).isoformat(), + "metrics_count": metrics_count, + "tables_count": tables_count, + } + state_path = docs_dir / ".catalog_sync_state.json" + state_path.write_text(json.dumps(state, indent=2), encoding="utf-8") + logger.debug(f"Wrote sync state: {state_path}") + + +def main() -> None: + """ + Main entry point for catalog export. + + Loads instance config, initializes OpenMetadata client, + exports metrics and tables to YAML, writes sync state. + Exits gracefully (code 0) on catalog unavailability. + """ + logger.info("=== Catalog Export starting ===") + + try: + instance_config = load_instance_config() + except Exception as e: + logger.error(f"Failed to load instance config: {e}") + return + + # Get OpenMetadata config + om_config = instance_config.get("openmetadata", {}) + catalog_url = om_config.get("url", "").strip() + token = om_config.get("token", "").strip() + + if not catalog_url or not token: + logger.warning("OpenMetadata not configured (url or token missing) - skipping export") + return + + docs_dir = _get_docs_dir() + logger.info(f"Output directory: {docs_dir}") + + # Initialize client + try: + client = OpenMetadataClient(base_url=catalog_url, token=token) + except Exception as e: + logger.warning(f"Failed to initialize OpenMetadata client: {e}") + return + + try: + # Export metrics + metrics_count = export_metrics(client, docs_dir, catalog_url) + + # Export tables + try: + config = Config() + tables_count = export_tables(client, config, docs_dir, catalog_url) + except Exception as e: + logger.warning(f"Table export skipped (config error): {e}") + tables_count = 0 + + # Write sync state + _write_sync_state(docs_dir, metrics_count, tables_count) + + logger.info( + f"=== Catalog Export complete: {metrics_count} metrics, {tables_count} tables ===" + ) + + except Exception as e: + logger.error(f"Catalog export failed: {e}") + + finally: + client.close() + + +if __name__ == "__main__": + main() diff --git a/tests/test_catalog_export.py b/tests/test_catalog_export.py new file mode 100644 index 0000000..af322a6 --- /dev/null +++ b/tests/test_catalog_export.py @@ -0,0 +1,439 @@ +""" +Tests for src/catalog_export.py + +Covers YAML header generation, auto-generated file detection, metric/table +export to YAML files, stale file cleanup, sync state writing, and docs dir +resolution. +""" + +import json +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict, List, Optional +from unittest.mock import MagicMock, patch + +import pytest +import yaml + +from src.catalog_export import ( + AUTO_GENERATED_MARKER, + _get_docs_dir, + _is_auto_generated, + _write_sync_state, + _yaml_header, + export_metrics, + export_tables, +) + + +# --------------------------------------------------------------------------- +# Helpers / fixtures +# --------------------------------------------------------------------------- + + +@dataclass +class FakeTableConfig: + """Minimal stand-in for src.config.TableConfig with attributes used by + export_tables().""" + + name: str + id: str + catalog_fqn: Optional[str] = None + + +def _make_raw_metric( + name: str = "Total Revenue", + fqn: str = "metrics.total_revenue", + category_tag: str = "MetricCategory.finance", + description: str = "Sum of order revenue", + expression: str = "SUM(grs_revenue_plan_local)", +) -> Dict[str, Any]: + """Build a realistic raw metric dict as returned by the catalog API.""" + return { + "id": f"id-{name}", + "name": name, + "fullyQualifiedName": fqn, + "displayName": name, + "description": description, + "metricExpression": {"expression": expression}, + "metricType": "sum", + "unitOfMeasurement": "USD", + "granularity": "monthly", + "tags": [{"tagFQN": category_tag}], + "owners": [{"name": "Data Team"}], + } + + +def _make_raw_table( + name: str = "order_economics", + fqn: str = "bigquery.project.dataset.order_economics", +) -> Dict[str, Any]: + """Build a realistic raw table dict as returned by the catalog API.""" + return { + "id": "tbl-1", + "name": name, + "fullyQualifiedName": fqn, + "description": "Order-level economics table", + "columns": [ + {"name": "order_id", "dataType": "STRING", "description": "PK"}, + {"name": "grs_revenue", "dataType": "FLOAT", "description": "Revenue"}, + ], + "tags": [{"tagFQN": "Tier.Tier1"}], + "owners": [{"name": "Finance"}], + } + + +@pytest.fixture +def mock_client(): + """Return a MagicMock that behaves like OpenMetadataClient.""" + client = MagicMock() + client.get_metrics.return_value = [_make_raw_metric()] + client.get_table.return_value = _make_raw_table() + return client + + +@pytest.fixture +def mock_config(): + """Return a mock Config with a single table.""" + cfg = MagicMock() + cfg.tables = [ + FakeTableConfig( + name="order_economics", + id="prj.dataset.order_economics", + catalog_fqn="bigquery.prj.dataset.order_economics", + ) + ] + return cfg + + +CATALOG_URL = "https://catalog.example.com" + + +# --------------------------------------------------------------------------- +# 1. _yaml_header +# --------------------------------------------------------------------------- + + +class TestYamlHeader: + def test_yaml_header_contains_marker(self): + """_yaml_header() output starts with AUTO-GENERATED marker.""" + header = _yaml_header(CATALOG_URL) + assert header.startswith(AUTO_GENERATED_MARKER) + + def test_yaml_header_contains_url(self): + """Header includes catalog URL.""" + header = _yaml_header(CATALOG_URL) + assert CATALOG_URL in header + + def test_yaml_header_contains_fqn(self): + """Header includes FQN when provided.""" + fqn = "metrics.total_revenue" + header = _yaml_header(CATALOG_URL, fqn=fqn) + assert f"# FQN: {fqn}" in header + + def test_yaml_header_no_fqn_line_when_empty(self): + """Header omits FQN line when fqn argument is empty.""" + header = _yaml_header(CATALOG_URL, fqn="") + assert "# FQN:" not in header + + +# --------------------------------------------------------------------------- +# 2. _is_auto_generated +# --------------------------------------------------------------------------- + + +class TestIsAutoGenerated: + def test_is_auto_generated_true(self, tmp_path: Path): + """File starting with AUTO-GENERATED marker returns True.""" + f = tmp_path / "metric.yml" + f.write_text(AUTO_GENERATED_MARKER + "\nsome: data\n") + assert _is_auto_generated(f) is True + + def test_is_auto_generated_false(self, tmp_path: Path): + """File without marker returns False.""" + f = tmp_path / "manual.yml" + f.write_text("# Manually written metric\nname: custom\n") + assert _is_auto_generated(f) is False + + def test_is_auto_generated_missing_file(self, tmp_path: Path): + """Non-existent file returns False.""" + f = tmp_path / "does_not_exist.yml" + assert _is_auto_generated(f) is False + + +# --------------------------------------------------------------------------- +# 3. export_metrics +# --------------------------------------------------------------------------- + + +class TestExportMetrics: + def test_export_metrics_writes_files(self, tmp_path: Path, mock_client): + """Creates category dirs and .yml files for each metric.""" + docs = tmp_path / "docs" + count = export_metrics(mock_client, docs, CATALOG_URL) + + assert count == 1 + # Category directory should exist + category_dir = docs / "metrics" / "finance" + assert category_dir.is_dir() + + # Metric file should exist + metric_files = list(category_dir.glob("*.yml")) + assert len(metric_files) == 1 + assert metric_files[0].name == "total_revenue.yml" + + def test_export_metrics_writes_index(self, tmp_path: Path, mock_client): + """Creates metrics.yml index with correct structure.""" + docs = tmp_path / "docs" + export_metrics(mock_client, docs, CATALOG_URL) + + index_path = docs / "metrics" / "metrics.yml" + assert index_path.exists() + + # Skip header comments, parse YAML body + content = index_path.read_text() + # Remove all comment lines to parse pure YAML + yaml_lines = [ + line for line in content.splitlines() if not line.startswith("#") + ] + parsed = yaml.safe_load("\n".join(yaml_lines)) + + assert "metrics" in parsed + assert len(parsed["metrics"]) == 1 + entry = parsed["metrics"][0] + assert entry["name"] == "total_revenue" + assert entry["category"] == "finance" + assert "file" in entry + + def test_export_metrics_yaml_parseable(self, tmp_path: Path, mock_client): + """Output metric YAML is valid and parseable by yaml.safe_load.""" + docs = tmp_path / "docs" + export_metrics(mock_client, docs, CATALOG_URL) + + metric_file = docs / "metrics" / "finance" / "total_revenue.yml" + content = metric_file.read_text() + + # Strip header comments before parsing + yaml_lines = [ + line for line in content.splitlines() if not line.startswith("#") + ] + parsed = yaml.safe_load("\n".join(yaml_lines)) + + assert isinstance(parsed, list) + assert len(parsed) == 1 + assert parsed[0]["name"] == "total_revenue" + assert parsed[0]["expression"] == "SUM(grs_revenue_plan_local)" + + def test_export_metrics_preserves_manual_files( + self, tmp_path: Path, mock_client + ): + """Files without AUTO-GENERATED marker are never deleted.""" + docs = tmp_path / "docs" + metrics_dir = docs / "metrics" / "custom" + metrics_dir.mkdir(parents=True) + + manual = metrics_dir / "hand_crafted.yml" + manual.write_text("# My custom metric\nname: hand_crafted\n") + + export_metrics(mock_client, docs, CATALOG_URL) + + # Manual file must survive + assert manual.exists() + assert manual.read_text().startswith("# My custom metric") + + def test_export_metrics_cleans_stale(self, tmp_path: Path, mock_client): + """Old auto-generated files removed when metric no longer in catalog.""" + docs = tmp_path / "docs" + stale_dir = docs / "metrics" / "old_category" + stale_dir.mkdir(parents=True) + + stale = stale_dir / "gone_metric.yml" + stale.write_text(AUTO_GENERATED_MARKER + "\nname: gone\n") + + export_metrics(mock_client, docs, CATALOG_URL) + + # Stale auto-generated file should be removed + assert not stale.exists() + + def test_export_metrics_zero_results_preserves( + self, tmp_path: Path, mock_client + ): + """When API returns 0 metrics, existing files are untouched.""" + mock_client.get_metrics.return_value = [] + + docs = tmp_path / "docs" + metrics_dir = docs / "metrics" / "finance" + metrics_dir.mkdir(parents=True) + + existing = metrics_dir / "existing.yml" + existing.write_text(AUTO_GENERATED_MARKER + "\nname: existing\n") + + count = export_metrics(mock_client, docs, CATALOG_URL) + + assert count == 0 + # Existing file untouched (early return before cleanup) + assert existing.exists() + + def test_export_metrics_multiple(self, tmp_path: Path, mock_client): + """Multiple metrics across categories are all exported.""" + mock_client.get_metrics.return_value = [ + _make_raw_metric( + name="Total Revenue", + fqn="metrics.total_revenue", + category_tag="MetricCategory.finance", + ), + _make_raw_metric( + name="Active Users", + fqn="metrics.active_users", + category_tag="MetricCategory.product", + expression="COUNT(DISTINCT user_id)", + ), + ] + + docs = tmp_path / "docs" + count = export_metrics(mock_client, docs, CATALOG_URL) + + assert count == 2 + assert (docs / "metrics" / "finance" / "total_revenue.yml").exists() + assert (docs / "metrics" / "product" / "active_users.yml").exists() + + +# --------------------------------------------------------------------------- +# 4. export_tables +# --------------------------------------------------------------------------- + + +class TestExportTables: + def test_export_tables_writes_files( + self, tmp_path: Path, mock_client, mock_config + ): + """Creates table YAML with columns.""" + docs = tmp_path / "docs" + count = export_tables(mock_client, mock_config, docs, CATALOG_URL) + + assert count == 1 + + table_file = docs / "tables" / "order_economics.yml" + assert table_file.exists() + + # Parse YAML content (skip header comments) + content = table_file.read_text() + yaml_lines = [ + line for line in content.splitlines() if not line.startswith("#") + ] + parsed = yaml.safe_load("\n".join(yaml_lines)) + + assert parsed["name"] == "order_economics" + assert len(parsed["columns"]) == 2 + assert parsed["columns"][0]["name"] == "order_id" + + def test_export_tables_handles_api_error( + self, tmp_path: Path, mock_client, mock_config + ): + """Continues on per-table errors, exports remaining tables.""" + # Two tables: first will fail, second succeeds + mock_config.tables = [ + FakeTableConfig( + name="broken_table", + id="prj.dataset.broken", + catalog_fqn="bigquery.prj.dataset.broken", + ), + FakeTableConfig( + name="good_table", + id="prj.dataset.good", + catalog_fqn="bigquery.prj.dataset.good", + ), + ] + + def side_effect(fqn): + if "broken" in fqn: + raise RuntimeError("API unreachable") + return _make_raw_table(name="good_table", fqn=fqn) + + mock_client.get_table.side_effect = side_effect + + docs = tmp_path / "docs" + count = export_tables(mock_client, mock_config, docs, CATALOG_URL) + + # Only the good table should succeed + assert count == 1 + assert not (docs / "tables" / "broken_table.yml").exists() + assert (docs / "tables" / "good_table.yml").exists() + + def test_export_tables_uses_catalog_fqn( + self, tmp_path: Path, mock_client, mock_config + ): + """Uses explicit catalog_fqn when set on table config.""" + docs = tmp_path / "docs" + export_tables(mock_client, mock_config, docs, CATALOG_URL) + + mock_client.get_table.assert_called_once_with( + "bigquery.prj.dataset.order_economics" + ) + + def test_export_tables_derives_fqn_from_id( + self, tmp_path: Path, mock_client + ): + """When catalog_fqn is None, derives FQN as 'bigquery.{id}'.""" + cfg = MagicMock() + cfg.tables = [ + FakeTableConfig( + name="my_table", + id="project.dataset.my_table", + catalog_fqn=None, + ) + ] + + docs = tmp_path / "docs" + export_tables(mock_client, cfg, docs, CATALOG_URL) + + mock_client.get_table.assert_called_once_with( + "bigquery.project.dataset.my_table" + ) + + +# --------------------------------------------------------------------------- +# 5. _write_sync_state +# --------------------------------------------------------------------------- + + +class TestWriteSyncState: + def test_write_sync_state(self, tmp_path: Path): + """Writes .catalog_sync_state.json with counts and timestamp.""" + docs = tmp_path / "docs" + docs.mkdir() + + _write_sync_state(docs, metrics_count=5, tables_count=2) + + state_path = docs / ".catalog_sync_state.json" + assert state_path.exists() + + state = json.loads(state_path.read_text()) + assert state["metrics_count"] == 5 + assert state["tables_count"] == 2 + assert "last_export" in state + + # Timestamp should be ISO format + from datetime import datetime + + datetime.fromisoformat(state["last_export"]) # raises on bad format + + +# --------------------------------------------------------------------------- +# 6. _get_docs_dir +# --------------------------------------------------------------------------- + + +class TestGetDocsDir: + def test_get_docs_dir_from_env(self, monkeypatch): + """DATA_DIR env var is used to derive docs directory.""" + monkeypatch.setenv("DATA_DIR", "/data/src_data") + result = _get_docs_dir() + assert result == Path("/data/docs") + + def test_get_docs_dir_default(self, monkeypatch): + """Defaults to ./data/../docs when DATA_DIR is not set.""" + monkeypatch.delenv("DATA_DIR", raising=False) + result = _get_docs_dir() + # ./data -> parent is "." -> docs is "./docs" + assert result == Path("docs") diff --git a/tests/test_openmetadata_enricher.py b/tests/test_openmetadata_enricher.py index e3055ac..66ec8b9 100644 --- a/tests/test_openmetadata_enricher.py +++ b/tests/test_openmetadata_enricher.py @@ -235,7 +235,8 @@ def test_derive_fqn_explicit_override(): def test_parse_table_response(sample_om_response): """Test parsing OpenMetadata table response.""" - with patch("connectors.openmetadata.enricher.OpenMetadataClient"): + with patch("connectors.openmetadata.enricher.OpenMetadataClient") as mock_client_cls: + mock_client_cls.return_value.base_url = "https://catalog.example.com" enricher = CatalogEnricher( { "openmetadata": { diff --git a/tests/test_openmetadata_transformer.py b/tests/test_openmetadata_transformer.py new file mode 100644 index 0000000..15111e8 --- /dev/null +++ b/tests/test_openmetadata_transformer.py @@ -0,0 +1,783 @@ +""" +Tests for OpenMetadata transformer. + +All transformer functions are pure (dict in -> dict/str/list out), so no mocks needed. +""" + +import pytest + +from connectors.openmetadata.transformer import ( + extract_category, + extract_dimensions, + extract_expression, + extract_grain, + extract_metric_type, + extract_owners, + extract_tag_names, + extract_unit, + metric_to_detail_dict, + metric_to_display_dict, + metric_to_yaml_dict, + sanitize_filename, + table_to_yaml_dict, +) + + +# --------------------------------------------------------------------------- +# Helper: build a tag dict the way OpenMetadata returns them +# --------------------------------------------------------------------------- + +def _tag(fqn: str, name: str = "") -> dict: + """Build a minimal OpenMetadata tag dict.""" + tag = {"tagFQN": fqn} + if name: + tag["name"] = name + return tag + + +# =========================================================================== +# extract_category +# =========================================================================== + +class TestExtractCategory: + def test_extract_category_from_metric_category_tag(self): + """MetricCategory.finance tag -> 'finance'.""" + tags = [_tag("MetricCategory.finance")] + assert extract_category(tags) == "finance" + + def test_extract_category_from_category_tag(self): + """Category.marketing tag -> 'marketing'.""" + tags = [_tag("Category.marketing")] + assert extract_category(tags) == "marketing" + + def test_extract_category_default(self): + """No matching tags -> 'general'.""" + tags = [_tag("SomeOther.tag"), _tag("Tier.Tier1")] + assert extract_category(tags) == "general" + + def test_extract_category_empty_tags(self): + """Empty tag list -> 'general'.""" + assert extract_category([]) == "general" + + def test_extract_category_metric_category_takes_priority(self): + """MetricCategory.* is checked before Category.* (iteration order).""" + tags = [_tag("MetricCategory.finance"), _tag("Category.marketing")] + assert extract_category(tags) == "finance" + + def test_extract_category_category_fallback_if_no_metric_category(self): + """Category.* is used when MetricCategory.* is absent.""" + tags = [_tag("Tier.Tier1"), _tag("Category.operations")] + assert extract_category(tags) == "operations" + + def test_extract_category_with_nested_dot_in_value(self): + """MetricCategory.sub.area -> 'sub.area' (split on first dot only).""" + tags = [_tag("MetricCategory.sub.area")] + assert extract_category(tags) == "sub.area" + + def test_extract_category_missing_tagfqn_key(self): + """Tag dict without tagFQN key is safely skipped.""" + tags = [{"name": "orphan"}] + assert extract_category(tags) == "general" + + +# =========================================================================== +# extract_grain +# =========================================================================== + +class TestExtractGrain: + def test_extract_grain_from_field(self): + """granularity field takes priority over tags.""" + raw = { + "granularity": "Daily", + "tags": [_tag("Grain.monthly")], + } + assert extract_grain(raw) == "daily" + + def test_extract_grain_from_tag(self): + """Grain.monthly tag used when granularity field is absent.""" + raw = {"tags": [_tag("Grain.monthly")]} + assert extract_grain(raw) == "monthly" + + def test_extract_grain_empty(self): + """No grain info -> empty string.""" + raw = {"tags": [_tag("Category.finance")]} + assert extract_grain(raw) == "" + + def test_extract_grain_no_tags_no_field(self): + """Completely empty metric -> empty string.""" + assert extract_grain({}) == "" + + def test_extract_grain_field_is_none(self): + """granularity=None should fall through to tags.""" + raw = {"granularity": None, "tags": [_tag("Grain.weekly")]} + assert extract_grain(raw) == "weekly" + + def test_extract_grain_field_is_empty_string(self): + """granularity='' should fall through to tags.""" + raw = {"granularity": "", "tags": [_tag("Grain.yearly")]} + assert extract_grain(raw) == "yearly" + + def test_extract_grain_tag_lowercased(self): + """Grain tag value is lowercased.""" + raw = {"tags": [_tag("Grain.QUARTERLY")]} + assert extract_grain(raw) == "quarterly" + + +# =========================================================================== +# extract_dimensions +# =========================================================================== + +class TestExtractDimensions: + def test_extract_dimensions(self): + """Multiple Dimension.* tags -> list of dimension names.""" + tags = [ + _tag("Dimension.economic_area"), + _tag("Dimension.country"), + _tag("Category.finance"), + ] + result = extract_dimensions(tags) + assert result == ["economic_area", "country"] + + def test_extract_dimensions_empty(self): + """No Dimension tags -> empty list.""" + tags = [_tag("Category.finance"), _tag("Tier.Tier1")] + assert extract_dimensions(tags) == [] + + def test_extract_dimensions_empty_list(self): + """Empty tag list -> empty list.""" + assert extract_dimensions([]) == [] + + def test_extract_dimensions_preserves_order(self): + """Dimensions are returned in tag order.""" + tags = [_tag("Dimension.z_last"), _tag("Dimension.a_first")] + assert extract_dimensions(tags) == ["z_last", "a_first"] + + +# =========================================================================== +# extract_expression +# =========================================================================== + +class TestExtractExpression: + def test_extract_expression_dict(self): + """metricExpression as dict with 'expression' key.""" + raw = {"metricExpression": {"expression": "SUM(revenue_usd)"}} + assert extract_expression(raw) == "SUM(revenue_usd)" + + def test_extract_expression_string(self): + """metricExpression as plain string.""" + raw = {"metricExpression": "COUNT(DISTINCT order_id)"} + assert extract_expression(raw) == "COUNT(DISTINCT order_id)" + + def test_extract_expression_empty(self): + """No metricExpression -> empty string.""" + raw = {"name": "some_metric"} + assert extract_expression(raw) == "" + + def test_extract_expression_dict_missing_key(self): + """Dict without 'expression' key -> empty string.""" + raw = {"metricExpression": {"formula": "x + y"}} + assert extract_expression(raw) == "" + + def test_extract_expression_dict_none_value(self): + """Dict with expression=None -> empty string.""" + raw = {"metricExpression": {"expression": None}} + assert extract_expression(raw) == "" + + def test_extract_expression_none(self): + """metricExpression=None -> empty string (default {} from .get()).""" + raw = {"metricExpression": None} + # None is not dict and not str, so returns "" + assert extract_expression(raw) == "" + + def test_extract_expression_empty_dict(self): + """metricExpression={} -> empty string.""" + raw = {"metricExpression": {}} + assert extract_expression(raw) == "" + + +# =========================================================================== +# extract_owners +# =========================================================================== + +class TestExtractOwners: + def test_extract_owners(self): + """owners list with name/displayName.""" + raw = { + "owners": [ + {"name": "alice", "displayName": "Alice Smith"}, + {"name": "bob"}, + ] + } + assert extract_owners(raw) == ["alice", "bob"] + + def test_extract_owners_display_name_fallback(self): + """displayName is used when name is absent.""" + raw = { + "owners": [ + {"displayName": "Charlie Brown"}, + ] + } + assert extract_owners(raw) == ["Charlie Brown"] + + def test_extract_owners_empty(self): + """No owners key -> empty list.""" + raw = {"name": "something"} + assert extract_owners(raw) == [] + + def test_extract_owners_empty_list(self): + """Empty owners list -> empty list.""" + raw = {"owners": []} + assert extract_owners(raw) == [] + + def test_extract_owners_skips_empty_names(self): + """Owners with no name or displayName are skipped.""" + raw = { + "owners": [ + {"email": "no-name@example.com"}, + {"name": "", "displayName": ""}, + {"name": "valid_user"}, + ] + } + assert extract_owners(raw) == ["valid_user"] + + def test_extract_owners_name_none_falls_to_display_name(self): + """name=None falls back to displayName.""" + raw = { + "owners": [{"name": None, "displayName": "Fallback Name"}] + } + assert extract_owners(raw) == ["Fallback Name"] + + +# =========================================================================== +# extract_metric_type +# =========================================================================== + +class TestExtractMetricType: + def test_extract_metric_type_from_field(self): + """metricType field takes priority.""" + raw = { + "metricType": "SUM", + "tags": [_tag("MetricType.count")], + } + assert extract_metric_type(raw) == "sum" + + def test_extract_metric_type_from_tag(self): + """MetricType.* tag used when field is absent.""" + raw = {"tags": [_tag("MetricType.ratio")]} + assert extract_metric_type(raw) == "ratio" + + def test_extract_metric_type_empty(self): + """No metric type info -> empty string.""" + raw = {"tags": [_tag("Category.finance")]} + assert extract_metric_type(raw) == "" + + def test_extract_metric_type_field_none(self): + """metricType=None falls through to tags.""" + raw = {"metricType": None, "tags": [_tag("MetricType.average")]} + assert extract_metric_type(raw) == "average" + + def test_extract_metric_type_lowercased(self): + """Metric type from field is lowercased.""" + raw = {"metricType": "COUNT", "tags": []} + assert extract_metric_type(raw) == "count" + + def test_extract_metric_type_tag_lowercased(self): + """Metric type from tag is lowercased.""" + raw = {"tags": [_tag("MetricType.PERCENTAGE")]} + assert extract_metric_type(raw) == "percentage" + + +# =========================================================================== +# extract_unit +# =========================================================================== + +class TestExtractUnit: + def test_extract_unit_from_field(self): + """unitOfMeasurement field takes priority.""" + raw = { + "unitOfMeasurement": "USD", + "tags": [_tag("Unit.EUR")], + } + assert extract_unit(raw) == "USD" + + def test_extract_unit_from_tag(self): + """Unit.* tag used when field is absent.""" + raw = {"tags": [_tag("Unit.count")]} + assert extract_unit(raw) == "count" + + def test_extract_unit_empty(self): + """No unit info -> empty string.""" + raw = {"tags": [_tag("Category.finance")]} + assert extract_unit(raw) == "" + + def test_extract_unit_field_none(self): + """unitOfMeasurement=None falls through to tags.""" + raw = {"unitOfMeasurement": None, "tags": [_tag("Unit.percent")]} + assert extract_unit(raw) == "percent" + + def test_extract_unit_field_empty_string(self): + """unitOfMeasurement='' falls through to tags.""" + raw = {"unitOfMeasurement": "", "tags": [_tag("Unit.GBP")]} + assert extract_unit(raw) == "GBP" + + def test_extract_unit_preserves_case(self): + """Unit value from field is NOT lowercased (unlike metric_type).""" + raw = {"unitOfMeasurement": "USD", "tags": []} + assert extract_unit(raw) == "USD" + + +# =========================================================================== +# extract_tag_names +# =========================================================================== + +class TestExtractTagNames: + def test_extract_tag_names_with_name_field(self): + """Tags with 'name' field use that value.""" + tags = [ + {"name": "finance", "tagFQN": "Category.finance"}, + {"name": "Tier1", "tagFQN": "Tier.Tier1"}, + ] + assert extract_tag_names(tags) == ["finance", "Tier1"] + + def test_extract_tag_names_from_fqn(self): + """Tags without 'name' extract last segment of tagFQN.""" + tags = [ + {"tagFQN": "Category.finance"}, + {"tagFQN": "Tier.Tier1"}, + ] + assert extract_tag_names(tags) == ["finance", "Tier1"] + + def test_extract_tag_names_empty(self): + """Empty tag list -> empty list.""" + assert extract_tag_names([]) == [] + + def test_extract_tag_names_mixed(self): + """Mix of tags with and without 'name' field.""" + tags = [ + {"name": "explicit_name", "tagFQN": "Category.something_else"}, + {"tagFQN": "Dimension.country"}, + ] + result = extract_tag_names(tags) + assert result == ["explicit_name", "country"] + + def test_extract_tag_names_no_name_no_fqn(self): + """Tag without name or tagFQN is skipped (empty string).""" + tags = [{"description": "orphan tag"}] + # tagFQN defaults to "" -> split(".")[-1] is "" -> falsy, skipped + assert extract_tag_names(tags) == [] + + +# =========================================================================== +# sanitize_filename +# =========================================================================== + +class TestSanitizeFilename: + def test_sanitize_filename(self): + """Spaces and mixed case -> underscores and lowercase.""" + assert sanitize_filename("M1 Operational Margin") == "m1_operational_margin" + + def test_sanitize_filename_special_chars(self): + """Special characters replaced with underscores.""" + assert sanitize_filename("Revenue (USD) - Net") == "revenue_usd_net" + + def test_sanitize_filename_multiple_underscores_collapsed(self): + """Consecutive underscores are collapsed.""" + assert sanitize_filename("foo---bar___baz") == "foo_bar_baz" + + def test_sanitize_filename_leading_trailing_stripped(self): + """Leading and trailing underscores are stripped.""" + assert sanitize_filename("__hello__") == "hello" + + def test_sanitize_filename_already_clean(self): + """Already clean name passes through unchanged.""" + assert sanitize_filename("total_revenue") == "total_revenue" + + def test_sanitize_filename_numbers(self): + """Numbers are preserved.""" + assert sanitize_filename("M1+VFM Margin 2024") == "m1_vfm_margin_2024" + + def test_sanitize_filename_empty_string(self): + """Empty string -> empty string.""" + assert sanitize_filename("") == "" + + def test_sanitize_filename_only_special_chars(self): + """String of only special chars -> empty string.""" + assert sanitize_filename("@#$%") == "" + + +# =========================================================================== +# metric_to_yaml_dict +# =========================================================================== + +class TestMetricToYamlDict: + def test_metric_to_yaml_dict(self): + """Full transformation with all fields populated.""" + raw = { + "name": "M1 Operational Margin", + "displayName": "M1 Operational Margin", + "fullyQualifiedName": "catalog.metrics.m1_margin", + "description": " Gross margin after operational costs ", + "granularity": "Monthly", + "metricType": "ratio", + "unitOfMeasurement": "USD", + "metricExpression": {"expression": "SUM(m1_margin_usd)"}, + "tags": [ + _tag("MetricCategory.finance"), + _tag("Dimension.economic_area"), + _tag("Dimension.country"), + ], + "owners": [ + {"name": "alice", "displayName": "Alice Smith"}, + ], + } + result = metric_to_yaml_dict(raw) + + assert result["name"] == "m1_operational_margin" + assert result["display_name"] == "M1 Operational Margin" + assert result["category"] == "finance" + assert result["type"] == "ratio" + assert result["unit"] == "USD" + assert result["grain"] == "monthly" + assert result["time_column"] == "" + assert result["table"] == "" + assert result["expression"] == "SUM(m1_margin_usd)" + assert result["description"] == "Gross margin after operational costs" + assert result["dimensions"] == ["economic_area", "country"] + assert result["synonyms"] == [] + # Notes contain FQN and owner info + assert any("catalog.metrics.m1_margin" in n for n in result["notes"]) + assert any("alice" in n for n in result["notes"]) + + def test_metric_to_yaml_dict_minimal(self): + """Minimal metric with empty/missing fields.""" + raw = {"name": "simple"} + result = metric_to_yaml_dict(raw) + + assert result["name"] == "simple" + assert result["display_name"] == "simple" + assert result["category"] == "general" + assert result["type"] == "" + assert result["unit"] == "" + assert result["grain"] == "" + assert result["expression"] == "" + assert result["description"] == "" + assert result["dimensions"] == [] + assert result["synonyms"] == [] + # No FQN -> no source note; no owners -> no owners note + assert result["notes"] == [] + + def test_metric_to_yaml_dict_notes_with_fqn_only(self): + """Notes include FQN source but no owners when owners missing.""" + raw = { + "name": "test", + "fullyQualifiedName": "catalog.test", + } + result = metric_to_yaml_dict(raw) + assert len(result["notes"]) == 1 + assert "FQN: catalog.test" in result["notes"][0] + + def test_metric_to_yaml_dict_description_stripped(self): + """Description whitespace is stripped.""" + raw = { + "name": "test", + "description": "\n Some description with spaces \n", + } + result = metric_to_yaml_dict(raw) + assert result["description"] == "Some description with spaces" + + def test_metric_to_yaml_dict_description_none(self): + """description=None -> empty string.""" + raw = {"name": "test", "description": None} + result = metric_to_yaml_dict(raw) + assert result["description"] == "" + + +# =========================================================================== +# metric_to_display_dict +# =========================================================================== + +class TestMetricToDisplayDict: + def test_metric_to_display_dict(self): + """Full display dict with all fields.""" + raw = { + "name": "total_revenue", + "displayName": "Total Revenue", + "fullyQualifiedName": "catalog.metrics.total_revenue", + "description": "Total revenue in USD", + "granularity": "Daily", + "tags": [_tag("MetricCategory.finance")], + } + result = metric_to_display_dict(raw) + + assert result["name"] == "total_revenue" + assert result["display_name"] == "Total Revenue" + assert result["description"] == "Total revenue in USD" + assert result["grain"] == "daily" + assert result["category"] == "finance" + assert result["path"] == "catalog:catalog.metrics.total_revenue" + + def test_metric_to_display_dict_minimal(self): + """Minimal metric produces valid display dict.""" + raw = {"name": "bare"} + result = metric_to_display_dict(raw) + + assert result["name"] == "bare" + assert result["display_name"] == "bare" + assert result["description"] == "" + assert result["grain"] == "" + assert result["category"] == "general" + assert result["path"] == "catalog:" + + def test_metric_to_display_dict_display_name_fallback(self): + """displayName falls back to name when absent.""" + raw = {"name": "revenue_net"} + assert metric_to_display_dict(raw)["display_name"] == "revenue_net" + + def test_metric_to_display_dict_description_none(self): + """description=None -> empty string.""" + raw = {"name": "test", "description": None} + assert metric_to_display_dict(raw)["description"] == "" + + +# =========================================================================== +# metric_to_detail_dict +# =========================================================================== + +class TestMetricToDetailDict: + def _full_raw_metric(self) -> dict: + """Build a fully-populated raw metric for reuse.""" + return { + "name": "m1_margin", + "displayName": "M1 Margin", + "fullyQualifiedName": "catalog.metrics.m1_margin", + "description": "M1 operational margin in USD", + "granularity": "Monthly", + "metricType": "ratio", + "unitOfMeasurement": "USD", + "metricExpression": {"expression": "SUM(m1_margin_usd)"}, + "tags": [ + _tag("MetricCategory.finance"), + _tag("Dimension.economic_area"), + _tag("Dimension.country"), + ], + } + + def test_metric_to_detail_dict(self): + """Full detail dict with all sections populated.""" + raw = self._full_raw_metric() + result = metric_to_detail_dict(raw) + + assert result["name"] == "m1_margin" + assert result["display_name"] == "M1 Margin" + assert result["category"] == "finance" + # Default color when no category_colors provided + assert result["category_color"] == "#6B7280" + + # metadata section + assert result["metadata"]["type"] == "ratio" + assert result["metadata"]["unit"] == "USD" + assert result["metadata"]["grain"] == "monthly" + assert result["metadata"]["time_column"] == "" + + # overview section + assert result["overview"]["description"] == "M1 operational margin in USD" + assert result["overview"]["key_insights"] == [] + + # dimensions + assert result["dimensions"] == ["economic_area", "country"] + + # sql_examples (expression present) + assert "expression" in result["sql_examples"] + assert result["sql_examples"]["expression"]["query"] == "SUM(m1_margin_usd)" + assert result["sql_examples"]["expression"]["title"] == "Metric Expression" + assert result["sql_examples"]["expression"]["complexity"] == "simple" + + # technical + assert result["technical"]["expression"] == "SUM(m1_margin_usd)" + assert result["technical"]["table"] == "" + assert result["technical"]["synonyms"] == [] + assert result["technical"]["data_sources"] == [] + + # other sections + assert result["validation"] is None + assert result["notes"] == {"all": [], "key_insights": []} + assert result["special_sections"] == {} + + def test_metric_to_detail_dict_with_colors(self): + """category_colors parameter maps category to color.""" + raw = self._full_raw_metric() + colors = {"finance": "#10B981", "marketing": "#F59E0B"} + result = metric_to_detail_dict(raw, category_colors=colors) + + assert result["category_color"] == "#10B981" + + def test_metric_to_detail_dict_color_fallback(self): + """Unknown category falls back to default gray.""" + raw = self._full_raw_metric() + colors = {"marketing": "#F59E0B"} + result = metric_to_detail_dict(raw, category_colors=colors) + + assert result["category_color"] == "#6B7280" + + def test_metric_to_detail_dict_no_expression(self): + """sql_examples is empty dict when no expression.""" + raw = {"name": "test", "tags": []} + result = metric_to_detail_dict(raw) + + assert result["sql_examples"] == {} + assert result["technical"]["expression"] == "" + + def test_metric_to_detail_dict_minimal(self): + """Minimal metric produces valid detail dict with all sections.""" + raw = {"name": "bare"} + result = metric_to_detail_dict(raw) + + assert result["name"] == "bare" + assert result["display_name"] == "bare" + assert result["category"] == "general" + assert result["category_color"] == "#6B7280" + assert result["metadata"]["type"] == "" + assert result["metadata"]["unit"] == "" + assert result["metadata"]["grain"] == "" + assert result["overview"]["description"] == "" + assert result["dimensions"] == [] + assert result["sql_examples"] == {} + + def test_metric_to_detail_dict_description_stripped(self): + """Description whitespace is stripped in detail dict.""" + raw = { + "name": "test", + "description": " leading and trailing spaces ", + "tags": [], + } + result = metric_to_detail_dict(raw) + assert result["overview"]["description"] == "leading and trailing spaces" + + +# =========================================================================== +# table_to_yaml_dict +# =========================================================================== + +class TestTableToYamlDict: + def test_table_to_yaml_dict(self): + """Full table with columns, owners, tags, tier.""" + raw = { + "name": "order_economics", + "fullyQualifiedName": "bigquery.prj.dataset.order_economics", + "description": " Order-level economics data ", + "columns": [ + { + "name": "order_id", + "dataType": "STRING", + "description": "Unique order identifier", + }, + { + "name": "revenue_usd", + "dataType": "FLOAT64", + "description": " Revenue in USD ", + }, + { + "name": "created_at", + "dataType": "TIMESTAMP", + "description": None, + }, + ], + "tags": [ + {"name": "FoundryAI", "tagFQN": "AIAgent.FoundryAI"}, + {"tagFQN": "Tier.Tier1"}, + ], + "owners": [ + {"name": "data_team", "displayName": "Data Team"}, + ], + } + result = table_to_yaml_dict(raw) + + assert result["name"] == "order_economics" + assert result["fqn"] == "bigquery.prj.dataset.order_economics" + assert result["description"] == "Order-level economics data" + assert result["owners"] == ["data_team"] + assert result["tags"] == ["FoundryAI", "Tier1"] + assert result["tier"] == "Tier1" + + # Columns + assert len(result["columns"]) == 3 + assert result["columns"][0] == { + "name": "order_id", + "type": "STRING", + "description": "Unique order identifier", + } + assert result["columns"][1] == { + "name": "revenue_usd", + "type": "FLOAT64", + "description": "Revenue in USD", + } + # description=None -> empty string after strip + assert result["columns"][2]["description"] == "" + + def test_table_to_yaml_dict_minimal(self): + """Minimal table with empty fields.""" + raw = {"name": "empty_table"} + result = table_to_yaml_dict(raw) + + assert result["name"] == "empty_table" + assert result["fqn"] == "" + assert result["description"] == "" + assert result["owners"] == [] + assert result["tags"] == [] + assert result["tier"] == "" + assert result["columns"] == [] + + def test_table_to_yaml_dict_tier_from_extension(self): + """Tier extracted from extension field (priority over tags).""" + raw = { + "name": "test", + "extension": {"tier": "Gold"}, + "tags": [{"tagFQN": "Tier.Silver"}], + } + result = table_to_yaml_dict(raw) + assert result["tier"] == "Gold" + + def test_table_to_yaml_dict_tier_from_extension_capital(self): + """Tier extracted from extension with capital 'Tier' key.""" + raw = { + "name": "test", + "extension": {"Tier": "Platinum"}, + "tags": [], + } + result = table_to_yaml_dict(raw) + assert result["tier"] == "Platinum" + + def test_table_to_yaml_dict_tier_from_tag_fallback(self): + """Tier from tag when extension is absent.""" + raw = { + "name": "test", + "tags": [{"tagFQN": "Tier.Tier2"}], + } + result = table_to_yaml_dict(raw) + assert result["tier"] == "Tier2" + + def test_table_to_yaml_dict_no_tier(self): + """No tier info -> empty string.""" + raw = { + "name": "test", + "tags": [{"tagFQN": "Category.finance"}], + } + result = table_to_yaml_dict(raw) + assert result["tier"] == "" + + def test_table_to_yaml_dict_column_missing_fields(self): + """Columns with missing fields get empty defaults.""" + raw = { + "name": "test", + "columns": [{}], + } + result = table_to_yaml_dict(raw) + assert result["columns"] == [ + {"name": "", "type": "", "description": ""}, + ] + + def test_table_to_yaml_dict_description_none(self): + """description=None -> empty string.""" + raw = {"name": "test", "description": None} + result = table_to_yaml_dict(raw) + assert result["description"] == "" diff --git a/webapp/app.py b/webapp/app.py index f119a13..4e67d28 100644 --- a/webapp/app.py +++ b/webapp/app.py @@ -62,6 +62,16 @@ try: except ImportError: MetricParser = None +# Shared OpenMetadata transformer (catalog -> dict) +try: + from connectors.openmetadata.transformer import ( + metric_to_detail_dict as _transformer_metric_detail, + metric_to_display_dict as _transformer_metric_display, + ) + _TRANSFORMER_AVAILABLE = True +except ImportError: + _TRANSFORMER_AVAILABLE = False + # Configure logging logging.basicConfig( level=logging.INFO, @@ -645,31 +655,29 @@ def _parse_om_metric(raw_metric: dict) -> dict: """ Parse raw OpenMetadata metric dict into format for metric list display. - Extracts category, grain from tags with standard prefixes: - - Category: tagFQN like "MetricCategory.finance" or "Category.marketing" - - Grain: tagFQN like "Grain.monthly" + Delegates to shared transformer module for consistent parsing across + webapp and catalog export. Args: - raw_metric: Raw metric dict from OpenMetadata (id, fullyQualifiedName, description, tags, etc.) + raw_metric: Raw metric dict from OpenMetadata Returns: - Dict with keys: name, display_name, description, grain, path - (path = "catalog:{fullyQualifiedName}" for JS routing) + Dict with keys: name, display_name, description, grain, category, path """ + if _TRANSFORMER_AVAILABLE: + return _transformer_metric_display(raw_metric) + + # Inline fallback if transformer module not available fqn = raw_metric.get("fullyQualifiedName", "") name = raw_metric.get("name", "") display_name = raw_metric.get("displayName", name) description = raw_metric.get("description", "") or "" - - # Extract category from tags, grain from granularity field tags = raw_metric.get("tags", []) category = "general" grain = raw_metric.get("granularity", "") or "" for tag in tags: tag_fqn = tag.get("tagFQN", "") - - # Extract category from MetricCategory.* or Category.* tags if tag_fqn.startswith("MetricCategory."): category = tag_fqn.split(".", 1)[1] elif tag_fqn.startswith("Category."): @@ -679,9 +687,9 @@ def _parse_om_metric(raw_metric: dict) -> dict: "name": name, "display_name": display_name, "description": description, - "grain": grain.lower() if grain else "", # Normalize to lowercase + "grain": grain.lower() if grain else "", "category": category, - "path": f"catalog:{fqn}", # Special prefix for JS routing + "path": f"catalog:{fqn}", } @@ -761,8 +769,7 @@ def _build_om_metric_detail(raw_metric: dict) -> dict: """ Convert raw OpenMetadata metric into MetricParser-compatible JSON for modal. - Maps OpenMetadata fields to MetricParser structure (name, display_name, category, metadata, etc.). - Extracts type, unit, grain from OpenMetadata fields (metricType, unitOfMeasurement, granularity). + Delegates to shared transformer module for consistent parsing. Args: raw_metric: Raw metric dict from OpenMetadata @@ -770,12 +777,17 @@ def _build_om_metric_detail(raw_metric: dict) -> dict: Returns: Dict matching MetricParser._structure_metric_data() format """ + category_colors = MetricParser.CATEGORY_COLORS if MetricParser else {} + + if _TRANSFORMER_AVAILABLE: + return _transformer_metric_detail(raw_metric, category_colors=category_colors) + + # Inline fallback if transformer module not available fqn = raw_metric.get("fullyQualifiedName", "") name = raw_metric.get("name", "") display_name = raw_metric.get("displayName", name) description = raw_metric.get("description", "") or "" - # OpenMetadata uses metricExpression instead of expression expression = "" metric_expr = raw_metric.get("metricExpression", {}) if isinstance(metric_expr, dict): @@ -783,67 +795,31 @@ def _build_om_metric_detail(raw_metric: dict) -> dict: elif isinstance(metric_expr, str): expression = metric_expr - owners = raw_metric.get("owners", []) - - # Extract metadata from OpenMetadata fields and tags metric_type = raw_metric.get("metricType", "") or "" unit = raw_metric.get("unitOfMeasurement", "") or "" grain = raw_metric.get("granularity", "") or "" category = "general" dimensions = [] - # Also check tags for category and dimensions - tags = raw_metric.get("tags", []) - for tag in tags: + for tag in raw_metric.get("tags", []): tag_fqn = tag.get("tagFQN", "") - if tag_fqn.startswith("MetricCategory."): category = tag_fqn.split(".", 1)[1] elif tag_fqn.startswith("Dimension."): dimensions.append(tag_fqn.split(".", 1)[1]) - # Extract owner names - owner_names = [] - for owner in owners: - name_val = owner.get("name") or owner.get("displayName", "") - if name_val: - owner_names.append(name_val) - - # Build MetricParser-compatible structure return { "name": name, "display_name": display_name, "category": category, - "category_color": MetricParser.CATEGORY_COLORS.get(category, "#6B7280"), - "metadata": { - "type": metric_type, - "unit": unit, - "grain": grain, - "time_column": "", # Not available in OpenMetadata - }, - "overview": { - "description": description.strip(), - "key_insights": [], # Not available in OpenMetadata - }, - "validation": None, # Not available in OpenMetadata + "category_color": category_colors.get(category, "#6B7280"), + "metadata": {"type": metric_type, "unit": unit, "grain": grain, "time_column": ""}, + "overview": {"description": description.strip(), "key_insights": []}, + "validation": None, "dimensions": dimensions, - "notes": { - "all": [], # Not available in OpenMetadata - "key_insights": [], - }, - "sql_examples": { - "expression": { - "title": "Metric Expression", - "query": expression, - "complexity": "simple", - } - } if expression else {}, - "technical": { - "table": "", # Not available in OpenMetadata - "expression": expression, - "synonyms": [], - "data_sources": [], - }, + "notes": {"all": [], "key_insights": []}, + "sql_examples": {"expression": {"title": "Metric Expression", "query": expression, "complexity": "simple"}} if expression else {}, + "technical": {"table": "", "expression": expression, "synonyms": [], "data_sources": []}, "special_sections": {}, }