Fix search_by_data_product: client-side filtering

The OpenMetadata search API ignores queryFilter for the dataProducts field.
Query the type-specific index and filter client-side by dataProducts
membership instead. Correctly returns 16 of the 32 fetched metrics for FoundryAI.
This commit is contained in:
Petr 2026-03-18 12:54:59 +01:00
parent fb63a72a98
commit 908d1f2247

View file

@ -143,37 +143,34 @@ class OpenMetadataClient:
self,
data_product_name: str,
entity_type: str = "",
limit: int = 50,
fields: str = "tags,owners",
limit: int = 200,
) -> List[Dict[str, Any]]:
"""
Search for entities belonging to a data product.
Uses OpenMetadata search API with queryFilter to find all assets
(metrics, tables, etc.) that are part of the specified data product.
Uses OpenMetadata search API to fetch entities, then filters client-side
by data product membership (queryFilter is unreliable for dataProducts field).
Args:
data_product_name: Name of the data product (e.g., "FoundryAIDataModel")
entity_type: Filter by entity type (e.g., "metric", "table"). Empty = all types.
limit: Maximum number of results
fields: Comma-separated list of fields to include
limit: Maximum number of results to fetch before filtering
Returns:
List of entity dictionaries
List of entity dictionaries that belong to the data product
"""
must_clauses = [
{"term": {"dataProducts.name.keyword": data_product_name}},
]
if entity_type:
must_clauses.append({"term": {"entityType": entity_type}})
query_filter = json.dumps({"bool": {"must": must_clauses}})
# Use type-specific index for efficiency (queryFilter is ignored by API,
# so we filter client-side by dataProducts field)
index_map = {
"metric": "metric_search_index",
"table": "table_search_index",
}
index = index_map.get(entity_type, "all")
params = {
"q": "*",
"index": "all",
"index": index,
"size": limit,
"queryFilter": query_filter,
}
response = self._client.get("/api/v1/search/query", params=params)
@ -181,7 +178,22 @@ class OpenMetadataClient:
data = response.json()
hits = data.get("hits", {}).get("hits", [])
return [hit.get("_source", {}) for hit in hits]
all_entities = [hit.get("_source", {}) for hit in hits]
# Client-side filter: only entities that belong to the data product
filtered = [
entity for entity in all_entities
if any(
dp.get("name") == data_product_name
for dp in entity.get("dataProducts", [])
)
]
logger.info(
f"Data product '{data_product_name}' ({entity_type or 'all'}): "
f"{len(filtered)}/{len(all_entities)} entities matched"
)
return filtered
def close(self):
"""Close HTTP client session."""