From ad525a96aa4ca306544877770009183d5cba45f9 Mon Sep 17 00:00:00 2001 From: Petr Date: Mon, 16 Mar 2026 22:03:53 +0100 Subject: [PATCH] Filter catalog metrics by configurable tag (e.g., AIAgent.FoundryAI) Add filter_tag support to catalog_export and webapp so only metrics with the required tag are exported to YAML and displayed in UI. Previously all 19+ metrics were exported regardless of relevance. - Add has_tag() helper to transformer module - catalog_export.py: filter_tag parameter from instance.yaml openmetadata config - webapp/app.py: filter metrics in _load_metrics_from_catalog() - 7 new tests (has_tag, filter_tag export, stale cleanup) --- connectors/openmetadata/transformer.py | 14 ++++++++ src/catalog_export.py | 25 +++++++++++--- tests/test_catalog_export.py | 46 ++++++++++++++++++++++++++ tests/test_openmetadata_transformer.py | 25 ++++++++++++++ webapp/app.py | 15 +++++++++ 5 files changed, 120 insertions(+), 5 deletions(-) diff --git a/connectors/openmetadata/transformer.py b/connectors/openmetadata/transformer.py index 76c4feb..6452478 100644 --- a/connectors/openmetadata/transformer.py +++ b/connectors/openmetadata/transformer.py @@ -174,6 +174,20 @@ def extract_unit(raw_metric: Dict[str, Any]) -> str: return "" +def has_tag(tags: List[Dict[str, Any]], tag_fqn: str) -> bool: + """ + Check if a specific tag (by FQN) is present in the tag list. + + Args: + tags: List of tag dicts from OpenMetadata + tag_fqn: Fully qualified tag name to check (e.g., "AIAgent.FoundryAI") + + Returns: + True if the tag is found + """ + return any(t.get("tagFQN", "") == tag_fqn for t in tags) + + def extract_tag_names(tags: List[Dict[str, Any]]) -> List[str]: """ Extract simple tag names from OpenMetadata tag list. diff --git a/src/catalog_export.py b/src/catalog_export.py index c56ddf0..83c17c2 100644 --- a/src/catalog_export.py +++ b/src/catalog_export.py @@ -29,6 +29,7 @@ from connectors.openmetadata.client import OpenMetadataClient from connectors.openmetadata.transformer import ( extract_category, extract_grain, + has_tag, metric_to_yaml_dict, sanitize_filename, table_to_yaml_dict, @@ -121,21 +122,24 @@ def export_metrics( client: OpenMetadataClient, docs_dir: Path, catalog_url: str, + filter_tag: str = "", ) -> int: """ Export metrics from OpenMetadata to YAML files. For each metric: 1. Fetches all metrics from catalog API - 2. Transforms each to YAML-compatible dict - 3. Writes individual YAML files: {docs_dir}/metrics/{category}/{name}.yml - 4. Writes index file: {docs_dir}/metrics/metrics.yml - 5. Cleans up stale auto-generated files + 2. Filters by required tag (if configured) + 3. Transforms each to YAML-compatible dict + 4. Writes individual YAML files: {docs_dir}/metrics/{category}/{name}.yml + 5. Writes index file: {docs_dir}/metrics/metrics.yml + 6. Cleans up stale auto-generated files Args: client: Initialized OpenMetadata API client docs_dir: Base docs directory (e.g., /data/docs) catalog_url: Catalog URL for header comments + filter_tag: If set, only export metrics that have this tag (e.g., "AIAgent.FoundryAI") Returns: Number of metrics exported @@ -151,6 +155,14 @@ def export_metrics( logger.info(f"Fetched {len(raw_metrics)} metrics from catalog") + # Filter by tag if configured + if filter_tag: + filtered = [m for m in raw_metrics if has_tag(m.get("tags", []), filter_tag)] + logger.info( + f"Tag filter '{filter_tag}': {len(filtered)}/{len(raw_metrics)} metrics matched" + ) + raw_metrics = filtered + # Track which files we write (for cleanup) written_files: set[Path] = set() index_entries: List[Dict[str, Any]] = [] @@ -400,9 +412,12 @@ def main() -> None: logger.warning(f"Failed to initialize OpenMetadata client: {e}") return + # Optional tag filter (only export metrics with this tag) + filter_tag = om_config.get("filter_tag", "").strip() + try: # Export metrics - metrics_count = export_metrics(client, docs_dir, catalog_url) + metrics_count = export_metrics(client, docs_dir, catalog_url, filter_tag=filter_tag) # Export tables try: diff --git a/tests/test_catalog_export.py b/tests/test_catalog_export.py index af322a6..082dcb5 100644 --- a/tests/test_catalog_export.py +++ b/tests/test_catalog_export.py @@ -297,6 +297,52 @@ class TestExportMetrics: assert (docs / "metrics" / "finance" / "total_revenue.yml").exists() assert (docs / "metrics" / "product" / "active_users.yml").exists() + def test_export_metrics_filter_tag_keeps_matching(self, tmp_path: Path, mock_client): + """Only metrics with the filter_tag are exported.""" + tagged = _make_raw_metric(name="M1", fqn="M1", category_tag="MetricCategory.finance") + tagged["tags"].append({"tagFQN": "AIAgent.FoundryAI", "name": "FoundryAI"}) + + untagged = _make_raw_metric( + name="Live Deals", fqn="LiveDeals", category_tag="MetricCategory.supply" + ) + + mock_client.get_metrics.return_value = [tagged, untagged] + + docs = tmp_path / "docs" + count = export_metrics(mock_client, docs, CATALOG_URL, filter_tag="AIAgent.FoundryAI") + + assert count == 1 + assert (docs / "metrics" / "finance" / "m1.yml").exists() + assert not (docs / "metrics" / "supply").exists() + + def test_export_metrics_filter_tag_empty_exports_all(self, tmp_path: Path, mock_client): + """Empty filter_tag means no filtering - all metrics exported.""" + mock_client.get_metrics.return_value = [ + _make_raw_metric(name="A", fqn="A"), + _make_raw_metric(name="B", fqn="B"), + ] + + docs = tmp_path / "docs" + count = export_metrics(mock_client, docs, CATALOG_URL, filter_tag="") + + assert count == 2 + + def test_export_metrics_filter_tag_cleans_stale_untagged(self, tmp_path: Path, mock_client): + """Stale files from previously-exported untagged metrics get cleaned up.""" + tagged = _make_raw_metric(name="M1", fqn="M1", category_tag="MetricCategory.finance") + tagged["tags"].append({"tagFQN": "AIAgent.FoundryAI", "name": "FoundryAI"}) + mock_client.get_metrics.return_value = [tagged] + + docs = tmp_path / "docs" + stale_dir = docs / "metrics" / "general" + stale_dir.mkdir(parents=True) + stale = stale_dir / "livedeals.yml" + stale.write_text(AUTO_GENERATED_MARKER + "\nname: livedeals\n") + + export_metrics(mock_client, docs, CATALOG_URL, filter_tag="AIAgent.FoundryAI") + + assert not stale.exists() + # --------------------------------------------------------------------------- # 4. export_tables diff --git a/tests/test_openmetadata_transformer.py b/tests/test_openmetadata_transformer.py index 8ca2c8f..7d16570 100644 --- a/tests/test_openmetadata_transformer.py +++ b/tests/test_openmetadata_transformer.py @@ -15,6 +15,7 @@ from connectors.openmetadata.transformer import ( extract_owners, extract_tag_names, extract_unit, + has_tag, metric_to_detail_dict, metric_to_display_dict, metric_to_yaml_dict, @@ -331,6 +332,30 @@ class TestExtractUnit: # extract_tag_names # =========================================================================== +class TestHasTag: + def test_has_tag_present(self): + """Returns True when tag with matching FQN is in the list.""" + tags = [ + {"tagFQN": "AIAgent.FoundryAI", "name": "FoundryAI"}, + {"tagFQN": "Tier.Tier1"}, + ] + assert has_tag(tags, "AIAgent.FoundryAI") is True + + def test_has_tag_absent(self): + """Returns False when tag is not in the list.""" + tags = [{"tagFQN": "Tier.Tier2"}] + assert has_tag(tags, "AIAgent.FoundryAI") is False + + def test_has_tag_empty_list(self): + """Returns False for empty tag list.""" + assert has_tag([], "AIAgent.FoundryAI") is False + + def test_has_tag_partial_match(self): + """Does not match partial FQN.""" + tags = [{"tagFQN": "AIAgent.FoundryAI_v2"}] + assert has_tag(tags, "AIAgent.FoundryAI") is False + + class TestExtractTagNames: def test_extract_tag_names_with_name_field(self): """Tags with 'name' field use that value.""" diff --git a/webapp/app.py b/webapp/app.py index 4e67d28..14b5d79 100644 --- a/webapp/app.py +++ b/webapp/app.py @@ -65,6 +65,7 @@ except ImportError: # Shared OpenMetadata transformer (catalog -> dict) try: from connectors.openmetadata.transformer import ( + has_tag as _transformer_has_tag, metric_to_detail_dict as _transformer_metric_detail, metric_to_display_dict as _transformer_metric_display, ) @@ -81,6 +82,7 @@ logger = logging.getLogger(__name__) # Global catalog enricher (initialized in create_app) _catalog_enricher = None +_catalog_filter_tag = "" def get_git_commit_hash() -> str: @@ -122,6 +124,12 @@ def create_app() -> Flask: _catalog_enricher = CatalogEnricher(instance_config) if _catalog_enricher.enabled: logger.info("OpenMetadata catalog enricher initialized") + # Store filter tag for metric filtering + global _catalog_filter_tag + om_config = instance_config.get("openmetadata", {}) + _catalog_filter_tag = om_config.get("filter_tag", "").strip() + if _catalog_filter_tag: + logger.info(f"Catalog metric filter tag: {_catalog_filter_tag}") except Exception as e: logger.warning(f"Failed to initialize catalog enricher: {e}") _catalog_enricher = None @@ -720,6 +728,13 @@ def _load_metrics_from_catalog() -> list: logger.debug("No metrics found in catalog") return [] + # Filter by tag if configured + if _catalog_filter_tag and _TRANSFORMER_AVAILABLE: + raw_metrics = [ + m for m in raw_metrics + if _transformer_has_tag(m.get("tags", []), _catalog_filter_tag) + ] + # Parse each metric and group by category categories = {} for raw in raw_metrics: