Use data_product config for metric discovery instead of filter_tag in webapp
This commit is contained in:
parent
e63c8747b5
commit
ed16122994
2 changed files with 47 additions and 12 deletions
|
|
@ -340,6 +340,35 @@ class CatalogEnricher:
|
||||||
logger.warning(f"Failed to fetch metrics from catalog: {e}")
|
logger.warning(f"Failed to fetch metrics from catalog: {e}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
def get_metrics_by_data_product(self, data_product_name: str, limit: int = 200) -> List[Dict[str, Any]]:
    """
    Fetch metrics belonging to a specific data product.

    Preferred over get_metrics() + tag filter when data_product is configured,
    as it returns exactly the metrics in the data product regardless of tags.

    Results are cached per (data product, limit) pair, so a list that was
    truncated by a small ``limit`` is never served to a later call that
    asks for more entries.

    Args:
        data_product_name: Data product name passed to the catalog client.
        limit: Maximum number of metrics requested from the catalog client.

    Returns:
        The metrics returned by the catalog client, or the cached result of
        an earlier identical request. Empty list if the enricher is disabled,
        no client is set, or on error.

    Never raises exception (graceful degradation).
    """
    if not self.enabled or not self._client:
        return []

    try:
        # The limit is part of the key: the stored list is only valid for
        # requests made with the same upper bound.
        cache_key = f"__metrics_dp_{data_product_name}_{limit}__"
        cached = self._get_from_cache(cache_key)
        if cached is not None:
            logger.debug(f"Catalog cache hit: metrics for data product '{data_product_name}'")
            return cached

        metrics = self._client.search_by_data_product(data_product_name, entity_type="metric", limit=limit)
        self._cache_entry(cache_key, metrics)
        logger.info(f"Loaded {len(metrics)} metrics from data product '{data_product_name}'")
        return metrics

    except Exception as e:
        # Best-effort by design: callers get an empty list when the catalog
        # lookup fails for any reason.
        logger.warning(f"Failed to fetch metrics for data product '{data_product_name}': {e}")
        return []
|
||||||
|
|
||||||
def clear_cache(self):
|
def clear_cache(self):
|
||||||
"""Manually clear all cached entries."""
|
"""Manually clear all cached entries."""
|
||||||
self._cache.clear()
|
self._cache.clear()
|
||||||
|
|
|
||||||
|
|
@ -83,6 +83,7 @@ logger = logging.getLogger(__name__)
|
||||||
# Global catalog enricher (initialized in create_app)
|
# Global catalog enricher (initialized in create_app)
|
||||||
_catalog_enricher = None
|
_catalog_enricher = None
|
||||||
_catalog_filter_tag = ""
|
_catalog_filter_tag = ""
|
||||||
|
_catalog_data_product = ""
|
||||||
|
|
||||||
|
|
||||||
def get_git_commit_hash() -> str:
|
def get_git_commit_hash() -> str:
|
||||||
|
|
@ -124,11 +125,14 @@ def create_app() -> Flask:
|
||||||
_catalog_enricher = CatalogEnricher(instance_config)
|
_catalog_enricher = CatalogEnricher(instance_config)
|
||||||
if _catalog_enricher.enabled:
|
if _catalog_enricher.enabled:
|
||||||
logger.info("OpenMetadata catalog enricher initialized")
|
logger.info("OpenMetadata catalog enricher initialized")
|
||||||
# Store filter tag for metric filtering
|
# Store metric discovery config
|
||||||
global _catalog_filter_tag
|
global _catalog_filter_tag, _catalog_data_product
|
||||||
om_config = instance_config.get("openmetadata", {})
|
om_config = instance_config.get("openmetadata", {})
|
||||||
|
_catalog_data_product = om_config.get("data_product", "").strip()
|
||||||
_catalog_filter_tag = om_config.get("filter_tag", "").strip()
|
_catalog_filter_tag = om_config.get("filter_tag", "").strip()
|
||||||
if _catalog_filter_tag:
|
if _catalog_data_product:
|
||||||
|
logger.info(f"Catalog metric discovery: data product '{_catalog_data_product}'")
|
||||||
|
elif _catalog_filter_tag:
|
||||||
logger.info(f"Catalog metric filter tag: {_catalog_filter_tag}")
|
logger.info(f"Catalog metric filter tag: {_catalog_filter_tag}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to initialize catalog enricher: {e}")
|
logger.warning(f"Failed to initialize catalog enricher: {e}")
|
||||||
|
|
@ -722,19 +726,21 @@ def _load_metrics_from_catalog() -> list:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Fetch metrics from catalog
|
# Fetch metrics - prefer data product filter, fall back to tag filter
|
||||||
raw_metrics = _catalog_enricher.get_metrics()
|
if _catalog_data_product:
|
||||||
|
raw_metrics = _catalog_enricher.get_metrics_by_data_product(_catalog_data_product)
|
||||||
|
else:
|
||||||
|
raw_metrics = _catalog_enricher.get_metrics()
|
||||||
|
if _catalog_filter_tag and _TRANSFORMER_AVAILABLE:
|
||||||
|
raw_metrics = [
|
||||||
|
m for m in raw_metrics
|
||||||
|
if _transformer_has_tag(m.get("tags", []), _catalog_filter_tag)
|
||||||
|
]
|
||||||
|
|
||||||
if not raw_metrics:
|
if not raw_metrics:
|
||||||
logger.debug("No metrics found in catalog")
|
logger.debug("No metrics found in catalog")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# Filter by tag if configured
|
|
||||||
if _catalog_filter_tag and _TRANSFORMER_AVAILABLE:
|
|
||||||
raw_metrics = [
|
|
||||||
m for m in raw_metrics
|
|
||||||
if _transformer_has_tag(m.get("tags", []), _catalog_filter_tag)
|
|
||||||
]
|
|
||||||
|
|
||||||
# Parse each metric and group by category
|
# Parse each metric and group by category
|
||||||
categories = {}
|
categories = {}
|
||||||
for raw in raw_metrics:
|
for raw in raw_metrics:
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue