diff --git a/connectors/openmetadata/enricher.py b/connectors/openmetadata/enricher.py index e7ebfe0..5f01316 100644 --- a/connectors/openmetadata/enricher.py +++ b/connectors/openmetadata/enricher.py @@ -340,6 +340,35 @@ class CatalogEnricher: logger.warning(f"Failed to fetch metrics from catalog: {e}") return [] + def get_metrics_by_data_product(self, data_product_name: str, limit: int = 200) -> List[Dict[str, Any]]: + """ + Fetch metrics belonging to a specific data product. + + Preferred over get_metrics() + tag filter when data_product is configured, + as it returns exactly the metrics in the data product regardless of tags. + + Returns empty list if enricher disabled, catalog unavailable, or on error. + Never raises exception (graceful degradation). + """ + if not self.enabled or not self._client: + return [] + + try: + cache_key = f"__metrics_dp_{data_product_name}__" + cached = self._get_from_cache(cache_key) + if cached is not None: + logger.debug(f"Catalog cache hit: metrics for data product '{data_product_name}'") + return cached + + metrics = self._client.search_by_data_product(data_product_name, entity_type="metric", limit=limit) + self._cache_entry(cache_key, metrics) + logger.info(f"Loaded {len(metrics)} metrics from data product '{data_product_name}'") + return metrics + + except Exception as e: + logger.warning(f"Failed to fetch metrics for data product '{data_product_name}': {e}") + return [] + def clear_cache(self): """Manually clear all cached entries.""" self._cache.clear() diff --git a/webapp/app.py b/webapp/app.py index cbce3cd..2a9ba80 100644 --- a/webapp/app.py +++ b/webapp/app.py @@ -83,6 +83,7 @@ logger = logging.getLogger(__name__) # Global catalog enricher (initialized in create_app) _catalog_enricher = None _catalog_filter_tag = "" +_catalog_data_product = "" def get_git_commit_hash() -> str: @@ -124,11 +125,14 @@ def create_app() -> Flask: _catalog_enricher = CatalogEnricher(instance_config) if _catalog_enricher.enabled: logger.info("OpenMetadata catalog enricher initialized") - # Store filter tag for metric filtering - global _catalog_filter_tag + # Store metric discovery config + global _catalog_filter_tag, _catalog_data_product om_config = instance_config.get("openmetadata", {}) + _catalog_data_product = om_config.get("data_product", "").strip() _catalog_filter_tag = om_config.get("filter_tag", "").strip() - if _catalog_filter_tag: + if _catalog_data_product: + logger.info(f"Catalog metric discovery: data product '{_catalog_data_product}'") + elif _catalog_filter_tag: logger.info(f"Catalog metric filter tag: {_catalog_filter_tag}") except Exception as e: logger.warning(f"Failed to initialize catalog enricher: {e}") @@ -722,19 +726,21 @@ def _load_metrics_from_catalog() -> list: return [] try: - # Fetch metrics from catalog - raw_metrics = _catalog_enricher.get_metrics() + # Fetch metrics - prefer data product filter, fall back to tag filter + if _catalog_data_product: + raw_metrics = _catalog_enricher.get_metrics_by_data_product(_catalog_data_product) + else: + raw_metrics = _catalog_enricher.get_metrics() + if _catalog_filter_tag and _TRANSFORMER_AVAILABLE: + raw_metrics = [ + m for m in raw_metrics + if _transformer_has_tag(m.get("tags", []), _catalog_filter_tag) + ] + if not raw_metrics: logger.debug("No metrics found in catalog") return [] - # Filter by tag if configured - if _catalog_filter_tag and _TRANSFORMER_AVAILABLE: - raw_metrics = [ - m for m in raw_metrics - if _transformer_has_tag(m.get("tags", []), _catalog_filter_tag) - ] - # Parse each metric and group by category categories = {} for raw in raw_metrics: