diff --git a/connectors/openmetadata/client.py b/connectors/openmetadata/client.py index 5ccc102..7784b3d 100644 --- a/connectors/openmetadata/client.py +++ b/connectors/openmetadata/client.py @@ -143,37 +143,34 @@ class OpenMetadataClient: self, data_product_name: str, entity_type: str = "", - limit: int = 50, - fields: str = "tags,owners", + limit: int = 200, ) -> List[Dict[str, Any]]: """ Search for entities belonging to a data product. - Uses OpenMetadata search API with queryFilter to find all assets - (metrics, tables, etc.) that are part of the specified data product. + Uses OpenMetadata search API to fetch entities, then filters client-side + by data product membership (queryFilter is unreliable for dataProducts field). Args: data_product_name: Name of the data product (e.g., "FoundryAIDataModel") entity_type: Filter by entity type (e.g., "metric", "table"). Empty = all types. - limit: Maximum number of results - fields: Comma-separated list of fields to include + limit: Maximum number of results to fetch before filtering Returns: - List of entity dictionaries + List of entity dictionaries that belong to the data product """ - must_clauses = [ - {"term": {"dataProducts.name.keyword": data_product_name}}, - ] - if entity_type: - must_clauses.append({"term": {"entityType": entity_type}}) - - query_filter = json.dumps({"bool": {"must": must_clauses}}) + # Use type-specific index for efficiency (queryFilter is ignored by API, + # so we filter client-side by dataProducts field) + index_map = { + "metric": "metric_search_index", + "table": "table_search_index", + } + index = index_map.get(entity_type, "all") params = { "q": "*", - "index": "all", + "index": index, "size": limit, - "queryFilter": query_filter, } response = self._client.get("/api/v1/search/query", params=params) @@ -181,7 +178,22 @@ class OpenMetadataClient: data = response.json() hits = data.get("hits", {}).get("hits", []) - return [hit.get("_source", {}) for hit in hits] + all_entities = [hit.get("_source", {}) for hit in hits] + + # Client-side filter: only entities that belong to the data product + filtered = [ + entity for entity in all_entities + if any( + dp.get("name") == data_product_name + for dp in entity.get("dataProducts", []) + ) + ] + + logger.info( + f"Data product '{data_product_name}' ({entity_type or 'all'}): " + f"{len(filtered)}/{len(all_entities)} entities matched" + ) + return filtered def close(self): """Close HTTP client session."""