Implement OpenMetadata catalog integration (Phase 1)
Add OpenMetadata REST API connector and enricher to merge table/column metadata
from OpenMetadata catalog at sync and query time.
Changes:
- connectors/openmetadata/client.py: HTTP client for OM API
- connectors/openmetadata/enricher.py: Data enrichment with TTL cache
- tests/test_openmetadata_*: Unit tests for client and enricher
- src/config.py: Add catalog_fqn field to TableConfig
- src/data_sync.py: Use enricher in _generate_schema_yaml (catalog > BQ API > data_description.md)
- webapp/app.py: Initialize enricher, enrich catalog data with tags/tier/owners/url
- config/instance.yaml.example: Document openmetadata section
Features:
- FQN auto-derivation: bigquery.{table.id} (override per table with catalog_fqn)
- TTL cache (default 1h) to avoid repeated API calls
- Graceful degradation: disabled if URL or token is missing; HTTP errors are logged as warnings and enrichment is skipped
- Column description priority: catalog > BQ API > (none)
- Table description priority: catalog > data_description.md
This commit is contained in:
parent
8bb46a9e0a
commit
c5c24cb45b
9 changed files with 1086 additions and 6 deletions
|
|
@ -77,6 +77,14 @@ data_source:
|
|||
# Uses ADC (Application Default Credentials) - VM service account on GCP
|
||||
# Data can live in a different project -- use fully-qualified table IDs in data_description.md
|
||||
|
||||
# --- OpenMetadata catalog (optional - Groupon-specific) ---
|
||||
# Enriches table and column metadata from OpenMetadata REST API.
|
||||
# If not configured, app works normally without catalog enrichment.
|
||||
# openmetadata:
|
||||
# url: "https://your-catalog.example.com"
|
||||
# token: "${OPENMETADATA_TOKEN}" # JWT bearer token
|
||||
# cache_ttl_seconds: 3600 # Cache TTL in seconds
|
||||
|
||||
# --- Email delivery (optional, for magic link auth) ---
|
||||
# Without SMTP, magic links are shown directly in browser (development mode).
|
||||
# For production, configure any SMTP relay (Gmail, Mailgun, SendGrid SMTP, etc.)
|
||||
|
|
|
|||
1
connectors/openmetadata/__init__.py
Normal file
1
connectors/openmetadata/__init__.py
Normal file
|
|
@ -0,0 +1 @@
|
|||
"""OpenMetadata catalog integration connector."""
|
||||
120
connectors/openmetadata/client.py
Normal file
120
connectors/openmetadata/client.py
Normal file
|
|
@ -0,0 +1,120 @@
|
|||
"""
|
||||
OpenMetadata REST API Client
|
||||
|
||||
Low-level HTTP wrapper for OpenMetadata REST API with these functions:
|
||||
1. Authentication using JWT bearer token
|
||||
2. Get table metadata (description, columns, tags, owners)
|
||||
3. Get metrics (for Phase 2)
|
||||
4. Proper error handling and logging
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, List, Optional, Any
|
||||
|
||||
import httpx
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OpenMetadataClient:
    """
    HTTP client for OpenMetadata REST API.

    Provides methods for querying table metadata:
    - get_table(fqn) -> table metadata with columns, owners, tags
    - get_metrics() -> list of available business metrics

    The instance owns a single ``httpx.Client``; call ``close()`` or use the
    object as a context manager to release it.
    """

    def __init__(
        self,
        base_url: str,
        token: str,
        timeout: int = 30,
    ):
        """
        Initialize OpenMetadata API client.

        Args:
            base_url: Base URL of OpenMetadata instance (e.g., "https://catalog.example.com")
            token: JWT bearer token for authentication
            timeout: HTTP request timeout in seconds
        """
        # A trailing slash is dropped so request paths can always start with "/".
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.timeout = timeout
        self._client = httpx.Client(
            base_url=self.base_url,
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            },
            timeout=timeout,
        )

    def get_table(self, fqn: str) -> Dict[str, Any]:
        """
        Fetch table metadata from OpenMetadata.

        Args:
            fqn: Fully qualified name (e.g., "bigquery.project.dataset.table")

        Returns:
            Dictionary with table metadata including:
            - id, name, fullyQualifiedName
            - description
            - columns: list of column dicts with name, dataType, description
            - tags: list of tag dicts
            - owners: list of owner dicts with name, email
            - extension: custom metadata (e.g., tier)

        Raises:
            ValueError: If fqn is empty or whitespace only
            httpx.HTTPStatusError: If request fails (non-2xx status)
        """
        # Imported locally so this method does not depend on the module's import block.
        from urllib.parse import quote

        if not fqn or not fqn.strip():
            raise ValueError("fqn must be a non-empty string")

        # The FQN is a single path segment: percent-encode everything that is
        # not an unreserved character (including "/") so that quoted FQN parts
        # cannot change which route the request hits.
        url = f"/api/v1/tables/name/{quote(fqn, safe='')}"
        params = {
            "fields": "columns,owners,tags,extension",
            "include": "all",
        }

        response = self._client.get(url, params=params)
        response.raise_for_status()

        return response.json()

    def get_metrics(self, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Fetch list of available metrics from OpenMetadata (Phase 2).

        Args:
            limit: Maximum number of metrics to return

        Returns:
            List of metric dictionaries with:
            - id, name, fullyQualifiedName
            - description
            - expression: metric calculation SQL/formula
            - owners, tags
            An empty list is returned when the response has no "data" entry.

        Raises:
            httpx.HTTPStatusError: If request fails (non-2xx status)
        """
        params = {
            "limit": limit,
            "fields": "description,expression,owners,tags",
        }

        response = self._client.get("/api/v1/metrics", params=params)
        response.raise_for_status()

        data = response.json()
        # "or []" also covers an explicit null, keeping the declared return type.
        return data.get("data") or []

    def close(self) -> None:
        """Close HTTP client session."""
        self._client.close()

    def __enter__(self) -> "OpenMetadataClient":
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit: closes the session, never suppresses exceptions."""
        self.close()
|
||||
312
connectors/openmetadata/enricher.py
Normal file
312
connectors/openmetadata/enricher.py
Normal file
|
|
@ -0,0 +1,312 @@
|
|||
"""
|
||||
OpenMetadata Catalog Data Enricher
|
||||
|
||||
High-level enrichment layer that:
|
||||
1. Initializes from instance config (disabled gracefully if no token)
|
||||
2. Caches table metadata with TTL (default 1 hour)
|
||||
3. Parses OpenMetadata responses into typed data
|
||||
4. Enriches table/column metadata at sync and query time
|
||||
5. Gracefully degrades on errors (never crashes app)
|
||||
"""
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Any
|
||||
|
||||
from src.config import TableConfig
|
||||
from .client import OpenMetadataClient
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class CatalogColumnData:
    """Per-column metadata taken from the OpenMetadata catalog."""

    # Column description text as stored in the catalog.
    description: str
    # Data type string as reported by the catalog.
    data_type: str
    # Tag names attached to the column; each instance gets its own list.
    tags: List[str] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
class CatalogTableData:
    """Table-level metadata taken from the OpenMetadata catalog."""

    # Table description text as stored in the catalog.
    description: str
    # Column metadata keyed by the lowercase column name.
    columns: Dict[str, CatalogColumnData]
    # Tag names attached to the table.
    tags: List[str] = field(default_factory=list)
    # Owner names.
    owners: List[str] = field(default_factory=list)
    # Tier label such as "Tier1" or "Tier2"; None when the catalog has none.
    tier: Optional[str] = None
    # Direct link to the table in the catalog; None when it cannot be built.
    catalog_url: Optional[str] = None
|
||||
|
||||
|
||||
class CatalogEnricher:
    """
    Enriches table and column metadata from OpenMetadata catalog.

    Usage:
        enricher = CatalogEnricher(instance_config)
        if enricher.enabled:
            catalog_data = enricher.enrich_table(table_config)
            # Use catalog_data.description, columns, tags, owners, tier

    Successful lookups are cached in memory per FQN for the configured TTL.
    Initialization and ``enrich_table`` never raise: on any problem the
    enricher logs and returns ``None`` so the app runs without enrichment.
    """

    enabled: bool

    # TTL used when "cache_ttl_seconds" is missing or unusable.
    _DEFAULT_CACHE_TTL_SECONDS = 3600

    def __init__(self, instance_config: Dict[str, Any]):
        """
        Initialize enricher from instance config.

        Args:
            instance_config: Dictionary with optional "openmetadata" section:
                {
                    "openmetadata": {
                        "url": "https://catalog.example.com",
                        "token": "jwt-token-or-env-var",
                        "cache_ttl_seconds": 3600
                    }
                }

        Sets self.enabled = False if:
        - No "openmetadata" section in config
        - No URL or no token provided (returns gracefully, not an error)
        - Creating the HTTP client fails
        """
        self.enabled = False
        self._client: Optional[OpenMetadataClient] = None
        self._cache: Dict[str, Dict[str, Any]] = {}
        self._cache_ttl_seconds = self._DEFAULT_CACHE_TTL_SECONDS
        # Kept on the enricher itself so catalog links are built from the
        # configured URL rather than from an attribute of the HTTP client.
        self._base_url = ""

        try:
            om_config = (instance_config or {}).get("openmetadata") or {}
            if not om_config:
                logger.debug("OpenMetadata not configured (openmetadata section missing)")
                return

            # "or ''" covers keys that are present but null (e.g. an empty YAML value).
            url = str(om_config.get("url") or "").strip()
            token = str(om_config.get("token") or "").strip()

            if not url or not token:
                logger.debug(
                    "OpenMetadata disabled: url=%s, token=%s", bool(url), bool(token)
                )
                return

            self._cache_ttl_seconds = self._coerce_ttl(om_config.get("cache_ttl_seconds"))
            self._base_url = url.rstrip("/")
            self._client = OpenMetadataClient(base_url=url, token=token)
            self.enabled = True

            logger.info(
                "OpenMetadata enricher initialized: %s (cache TTL: %ss)",
                url,
                self._cache_ttl_seconds,
            )

        except Exception as e:
            # Catalog enrichment is optional; a bad config must not stop the app.
            logger.warning(
                "Failed to initialize OpenMetadata enricher: %s. Continuing without catalog enrichment.",
                e,
            )
            self.enabled = False

    def _coerce_ttl(self, value: Any):
        """
        Turn a configured TTL into a usable number of seconds.

        Numeric strings are accepted. Missing, non-numeric, negative,
        NaN or infinite values fall back to the default TTL, because an
        unusable TTL would otherwise make every cache lookup fail.
        """
        default = self._DEFAULT_CACHE_TTL_SECONDS
        if value is None:
            return default
        try:
            ttl = float(value)
        except (TypeError, ValueError):
            logger.warning("Invalid cache_ttl_seconds %r; using %ss", value, default)
            return default
        # NaN fails both comparisons, so it is rejected here as well.
        if not (0 <= ttl < float("inf")):
            logger.warning("Invalid cache_ttl_seconds %r; using %ss", value, default)
            return default
        return int(ttl) if ttl.is_integer() else ttl

    def enrich_table(
        self, table_config: "TableConfig"
    ) -> "Optional[CatalogTableData]":
        """
        Enrich table metadata from catalog.

        Args:
            table_config: Table configuration with id and optional catalog_fqn

        Returns:
            CatalogTableData with description, columns, tags, owners, tier
            or None if:
            - enricher is disabled
            - FQN derivation fails
            - HTTP request fails
            - Parsing fails
            Always returns None gracefully, never raises exception.
        """
        if not self.enabled or not self._client:
            return None

        try:
            fqn = self._derive_fqn(table_config)
            if not fqn:
                return None

            # Check cache
            cached = self._get_from_cache(fqn)
            if cached is not None:
                logger.debug("Catalog cache hit: %s", fqn)
                return cached

            # Fetch from API
            logger.debug("Fetching catalog metadata: %s", fqn)
            raw_response = self._client.get_table(fqn)

            # Parse and cache; failed parses are not cached so the next call retries.
            parsed = self._parse_table_response(raw_response)
            if parsed is not None:
                self._cache_entry(fqn, parsed)

            return parsed

        except Exception as e:
            logger.warning(
                "Error enriching table %s: %s",
                getattr(table_config, "name", table_config),
                e,
            )
            return None

    def _derive_fqn(self, table_config: "TableConfig") -> Optional[str]:
        """
        Derive OpenMetadata FQN from table config.

        Auto-derivation: bigquery.{table_config.id}
        Example: table_config.id = "prj-grp-dataview-prod-1ff9.marketing.roi_datamart_v2"
                 -> FQN = "bigquery.prj-grp-dataview-prod-1ff9.marketing.roi_datamart_v2"

        Args:
            table_config: Configuration with id and optional catalog_fqn

        Returns:
            The explicit catalog_fqn when set, otherwise the derived FQN,
            or None when the config has no usable id.
        """
        # Use explicit override if provided
        explicit = getattr(table_config, "catalog_fqn", None)
        if explicit:
            return explicit

        # Auto-derive: bigquery.{table_id}
        table_id = getattr(table_config, "id", None)
        if not table_id:
            return None
        return f"bigquery.{table_id}"

    def _parse_table_response(self, raw: Dict[str, Any]) -> "Optional[CatalogTableData]":
        """
        Parse OpenMetadata table response into typed CatalogTableData.

        Keys that are missing or null are treated as empty, and columns
        without a name are skipped.

        Args:
            raw: Raw response from OpenMetadata /api/v1/tables/name/{fqn}

        Returns:
            CatalogTableData or None if parsing fails
        """
        try:
            description = raw.get("description") or ""

            # Parse columns, keyed by lowercase name for case-insensitive lookup.
            columns = {}
            for col in raw.get("columns") or []:
                col_name = (col.get("name") or "").lower()
                if not col_name:
                    continue
                columns[col_name] = CatalogColumnData(
                    description=col.get("description") or "",
                    data_type=col.get("dataType") or "",
                    tags=self._extract_column_tags(col),
                )

            tags = self._extract_tags(raw.get("tags"))
            owners = self._extract_owners(raw.get("owners"))
            tier = self._extract_tier(raw)

            # Build catalog URL from the configured base URL.
            # NOTE(review): the "/explore/" path is kept from the original
            # implementation - confirm it matches the deployed catalog UI routes.
            fqn = raw.get("fullyQualifiedName") or ""
            base_url = getattr(self, "_base_url", "")
            catalog_url = f"{base_url}/explore/{fqn}" if fqn and base_url else None

            return CatalogTableData(
                description=description,
                columns=columns,
                tags=tags,
                owners=owners,
                tier=tier,
                catalog_url=catalog_url,
            )

        except Exception as e:
            logger.warning("Failed to parse catalog response: %s", e)
            return None

    def _extract_tier(self, raw: Dict[str, Any]) -> Optional[str]:
        """
        Return the table tier, or None when the response carries none.

        The "tier"/"Tier" key of the extension metadata wins. Otherwise the
        first tag whose tagFQN starts with "Tier." is used, e.g.
        "Tier.Tier1" -> "Tier1".
        """
        extension = raw.get("extension") or {}
        if isinstance(extension, dict):
            tier = extension.get("tier") or extension.get("Tier")
            if tier:
                return tier

        for tag in raw.get("tags") or []:
            if not isinstance(tag, dict):
                continue
            tag_fqn = tag.get("tagFQN") or ""
            if tag_fqn.startswith("Tier."):
                return tag_fqn.split(".", 1)[1] or None
        return None

    def _extract_tags(self, tags_list: List[Dict[str, Any]]) -> List[str]:
        """
        Extract tag names from tags list.

        Uses each tag's "name", falling back to the last dot-separated part
        of "tagFQN". Entries that are not dicts or yield no name are skipped,
        so one malformed entry does not discard the rest.
        """
        names = []
        for tag in tags_list or []:
            if not isinstance(tag, dict):
                continue
            label = tag.get("name") or (tag.get("tagFQN") or "").split(".")[-1]
            if label:
                names.append(label)
        return names

    def _extract_column_tags(self, column: Dict[str, Any]) -> List[str]:
        """Extract tags for a single column."""
        return self._extract_tags(column.get("tags"))

    def _extract_owners(self, owners_list: List[Dict[str, Any]]) -> List[str]:
        """
        Extract owner names from owners list.

        Uses each owner's "name", falling back to "displayName"; entries
        that are not dicts or have neither are skipped.
        """
        names = []
        for owner in owners_list or []:
            if not isinstance(owner, dict):
                continue
            name = owner.get("name") or owner.get("displayName") or ""
            if name:
                names.append(name)
        return names

    def _get_from_cache(self, fqn: str) -> "Optional[CatalogTableData]":
        """
        Check if cached entry exists and is still valid (TTL).

        Expired entries are removed as a side effect.

        Args:
            fqn: Fully qualified name

        Returns:
            CatalogTableData if valid, None if expired or missing
        """
        entry = self._cache.get(fqn)
        if entry is None:
            return None

        fetched_at = entry.get("fetched_at")
        if not fetched_at:
            return None

        age = datetime.now() - fetched_at
        if age > timedelta(seconds=self._cache_ttl_seconds):
            self._cache.pop(fqn, None)
            return None

        return entry.get("data")

    def _cache_entry(self, fqn: str, data: "CatalogTableData"):
        """Cache an enriched table entry together with its fetch time."""
        self._cache[fqn] = {
            "data": data,
            "fetched_at": datetime.now(),
        }

    def clear_cache(self):
        """Manually clear all cached entries."""
        self._cache.clear()
        logger.info("Catalog cache cleared")

    def __del__(self):
        """Cleanup HTTP client on deletion."""
        # getattr: __del__ also runs on instances whose __init__ never ran.
        client = getattr(self, "_client", None)
        if client:
            try:
                client.close()
            except Exception:
                # Best effort only: errors cannot be handled usefully during
                # garbage collection or interpreter shutdown.
                pass
|
||||
|
|
@ -106,6 +106,7 @@ class TableConfig:
|
|||
row_filter: Optional[str] = None # SQL WHERE clause for filtering (e.g., "event_date >= '2024-01-01'")
|
||||
query_mode: str = "local" # "local" (Parquet) | "remote" (BQ direct) | "hybrid" (sync subset, query BQ)
|
||||
partition_column_type: str = "TIMESTAMP" # BQ SQL type for partition column: "DATE", "TIMESTAMP", "DATETIME"
|
||||
catalog_fqn: Optional[str] = None # Explicit OpenMetadata FQN override (auto-derived if not set)
|
||||
|
||||
def __post_init__(self):
|
||||
"""Validate configuration after initialization."""
|
||||
|
|
@ -455,6 +456,7 @@ class Config:
|
|||
row_filter=table_data.get("row_filter"),
|
||||
query_mode=table_data.get("query_mode", "local"),
|
||||
partition_column_type=table_data.get("partition_column_type", "TIMESTAMP"),
|
||||
catalog_fqn=table_data.get("catalog_fqn"),
|
||||
)
|
||||
table_configs.append(config)
|
||||
|
||||
|
|
|
|||
|
|
@ -27,6 +27,8 @@ from datetime import datetime
|
|||
from tqdm import tqdm
|
||||
|
||||
from .config import get_config, TableConfig
|
||||
from config.loader import load_instance_config
|
||||
from connectors.openmetadata.enricher import CatalogEnricher
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -217,6 +219,14 @@ class DataSyncManager:
|
|||
)
|
||||
self.data_source = create_data_source()
|
||||
|
||||
# Initialize OpenMetadata catalog enricher
|
||||
try:
|
||||
instance_config = load_instance_config()
|
||||
self.catalog_enricher = CatalogEnricher(instance_config)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to initialize catalog enricher: {e}")
|
||||
self.catalog_enricher = CatalogEnricher({}) # Disabled enricher
|
||||
|
||||
def _generate_schema_yaml(self):
|
||||
"""
|
||||
Generate schema.yml file with actual table schemas from Parquet files.
|
||||
|
|
@ -269,10 +279,14 @@ class DataSyncManager:
|
|||
# Get column metadata from data source (if supported)
|
||||
col_metadata = self.data_source.get_column_metadata(table_config.id)
|
||||
|
||||
# Enrich with catalog metadata (OpenMetadata)
|
||||
catalog_data = self.catalog_enricher.enrich_table(table_config)
|
||||
|
||||
# Extract column information
|
||||
columns = []
|
||||
for field_item in arrow_schema:
|
||||
col_name = field_item.name
|
||||
col_name_lower = col_name.lower()
|
||||
pyarrow_type = str(field_item.type)
|
||||
|
||||
column_info = {
|
||||
|
|
@ -280,21 +294,35 @@ class DataSyncManager:
|
|||
"type": pyarrow_type,
|
||||
}
|
||||
|
||||
# Add source type and description from connector metadata
|
||||
# Priority for description: catalog > BQ API > (nothing)
|
||||
description = None
|
||||
if catalog_data and col_name_lower in catalog_data.columns:
|
||||
description = catalog_data.columns[col_name_lower].description
|
||||
elif col_metadata and "columns" in col_metadata:
|
||||
col_meta = col_metadata["columns"].get(col_name, {})
|
||||
description = col_meta.get("description")
|
||||
|
||||
if description:
|
||||
column_info["description"] = description
|
||||
|
||||
# Add source type from connector metadata
|
||||
if col_metadata and "columns" in col_metadata:
|
||||
col_meta = col_metadata["columns"].get(col_name, {})
|
||||
if "source_type" in col_meta:
|
||||
column_info["source_type"] = col_meta["source_type"]
|
||||
if "description" in col_meta:
|
||||
column_info["description"] = col_meta["description"]
|
||||
|
||||
columns.append(column_info)
|
||||
|
||||
primary_key = table_config.get_primary_key_columns()
|
||||
|
||||
# Priority for table description: catalog > data_description.md
|
||||
table_description = table_config.description
|
||||
if catalog_data:
|
||||
table_description = catalog_data.description or table_description
|
||||
|
||||
table_info = {
|
||||
"table_id": table_config.id,
|
||||
"description": table_config.description,
|
||||
"description": table_description,
|
||||
"primary_key": primary_key,
|
||||
"sync_strategy": table_config.sync_strategy,
|
||||
"columns": columns,
|
||||
|
|
|
|||
157
tests/test_openmetadata_client.py
Normal file
157
tests/test_openmetadata_client.py
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
"""
|
||||
Tests for OpenMetadata client
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import httpx
|
||||
from unittest.mock import Mock, patch, MagicMock
|
||||
|
||||
from connectors.openmetadata.client import OpenMetadataClient
|
||||
|
||||
|
||||
@pytest.fixture
def mock_httpx_client():
    """Yield a mock that replaces httpx.Client while a test runs."""
    patcher = patch("connectors.openmetadata.client.httpx.Client")
    with patcher as patched_client_cls:
        yield patched_client_cls
|
||||
|
||||
|
||||
def test_client_init(mock_httpx_client):
    """The constructor stores its arguments and hands the bearer token to httpx."""
    om_client = OpenMetadataClient(
        base_url="https://catalog.example.com", token="test-token", timeout=30
    )

    stored = (om_client.base_url, om_client.token, om_client.timeout)
    assert stored == ("https://catalog.example.com", "test-token", 30)

    # httpx.Client must be built exactly once, with the Authorization header set
    mock_httpx_client.assert_called_once()
    _, passed_kwargs = mock_httpx_client.call_args
    assert passed_kwargs["headers"]["Authorization"] == "Bearer test-token"
|
||||
|
||||
|
||||
def test_client_init_strips_trailing_slash():
    """A trailing slash on base_url is removed by the constructor."""
    url_with_slash = "https://catalog.example.com/"
    with patch("connectors.openmetadata.client.httpx.Client"):
        om_client = OpenMetadataClient(base_url=url_with_slash, token="test-token")
        assert om_client.base_url == "https://catalog.example.com"
|
||||
|
||||
|
||||
def test_get_table_success():
    """get_table() returns the decoded JSON body and calls the by-name endpoint."""
    table_payload = {
        "id": "table-id",
        "name": "roi_datamart_v2",
        "fullyQualifiedName": "bigquery.project.dataset.table",
        "description": "Test table",
        "columns": [
            {"name": "id", "dataType": "INTEGER", "description": "ID column"},
            {"name": "name", "dataType": "STRING", "description": "Name column"},
        ],
        "tags": [{"name": "important"}],
        "owners": [{"name": "Data Team", "email": "data@example.com"}],
    }

    with patch("connectors.openmetadata.client.httpx.Client") as client_cls:
        http = MagicMock()
        client_cls.return_value = http

        fake_response = MagicMock()
        fake_response.json.return_value = table_payload
        http.get.return_value = fake_response

        om_client = OpenMetadataClient(
            base_url="https://catalog.example.com", token="test-token"
        )
        table = om_client.get_table("bigquery.project.dataset.table")

        assert table["name"] == "roi_datamart_v2"
        assert len(table["columns"]) == 2

        # Exactly one GET, aimed at the table-by-name route
        http.get.assert_called_once()
        recorded_call = http.get.call_args
        assert "/api/v1/tables/name/bigquery.project.dataset.table" in str(recorded_call)
|
||||
|
||||
|
||||
def test_get_table_http_error():
    """get_table() lets the HTTPStatusError from raise_for_status() propagate."""
    unauthorized = httpx.HTTPStatusError(
        "401 Unauthorized",
        request=MagicMock(),
        response=MagicMock(status_code=401),
    )

    with patch("connectors.openmetadata.client.httpx.Client") as client_cls:
        http = MagicMock()
        client_cls.return_value = http

        failing_response = MagicMock()
        failing_response.raise_for_status.side_effect = unauthorized
        http.get.return_value = failing_response

        om_client = OpenMetadataClient(
            base_url="https://catalog.example.com", token="invalid-token"
        )

        with pytest.raises(httpx.HTTPStatusError):
            om_client.get_table("bigquery.project.dataset.table")
|
||||
|
||||
|
||||
def test_get_metrics_success():
    """get_metrics() returns the list stored under the response's "data" key."""
    revenue_metric = {
        "id": "metric-1",
        "name": "revenue",
        "fullyQualifiedName": "metrics.revenue",
        "description": "Total revenue",
        "expression": "SUM(amount)",
    }
    users_metric = {
        "id": "metric-2",
        "name": "users",
        "fullyQualifiedName": "metrics.users",
        "description": "Active users",
        "expression": "COUNT(DISTINCT user_id)",
    }

    with patch("connectors.openmetadata.client.httpx.Client") as client_cls:
        http = MagicMock()
        client_cls.return_value = http

        fake_response = MagicMock()
        fake_response.json.return_value = {"data": [revenue_metric, users_metric]}
        http.get.return_value = fake_response

        om_client = OpenMetadataClient(
            base_url="https://catalog.example.com", token="test-token"
        )
        metrics = om_client.get_metrics(limit=10)

        assert len(metrics) == 2
        first, second = metrics
        assert first["name"] == "revenue"
        assert second["name"] == "users"
|
||||
|
||||
|
||||
def test_context_manager():
    """Leaving the with-block closes the underlying HTTP client once."""
    with patch("connectors.openmetadata.client.httpx.Client") as client_cls:
        http = MagicMock()
        client_cls.return_value = http

        om_client_cm = OpenMetadataClient(
            base_url="https://catalog.example.com", token="test-token"
        )
        with om_client_cm as om_client:
            assert om_client is not None

        # close() must have been forwarded to the httpx client
        http.close.assert_called_once()
|
||||
399
tests/test_openmetadata_enricher.py
Normal file
399
tests/test_openmetadata_enricher.py
Normal file
|
|
@ -0,0 +1,399 @@
|
|||
"""
|
||||
Tests for OpenMetadata catalog enricher
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from datetime import datetime, timedelta
|
||||
from unittest.mock import Mock, patch, MagicMock
|
||||
from dataclasses import dataclass
|
||||
|
||||
from src.config import TableConfig
|
||||
from connectors.openmetadata.enricher import (
|
||||
CatalogEnricher,
|
||||
CatalogTableData,
|
||||
CatalogColumnData,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
def sample_table_config():
    """TableConfig for the ROI datamart table."""
    settings = {
        "id": "prj-grp-dataview-prod-1ff9.marketing.roi_datamart_v2",
        "name": "roi_datamart_v2",
        "description": "ROI metrics",
        "primary_key": "id",
        "sync_strategy": "full_refresh",
    }
    return TableConfig(**settings)
|
||||
|
||||
|
||||
@pytest.fixture
def sample_om_response():
    """Table payload shaped like an OpenMetadata API response."""
    id_column = {
        "name": "id",
        "dataType": "BIGINT",
        "description": "Record ID",
        "tags": [{"name": "pii"}],
    }
    revenue_column = {
        "name": "revenue",
        "dataType": "DECIMAL",
        "description": "Revenue amount",
        "tags": [],
    }
    owner = {"name": "Analytics Team", "email": "analytics@example.com"}

    return {
        "id": "table-uuid",
        "name": "roi_datamart_v2",
        "fullyQualifiedName": "bigquery.prj-grp-dataview-prod-1ff9.marketing.roi_datamart_v2",
        "description": "Daily ROI analytics",
        "columns": [id_column, revenue_column],
        "tags": [{"name": "analytics"}, {"name": "daily"}],
        "owners": [owner],
        "extension": {"tier": "Tier1"},
    }
|
||||
|
||||
|
||||
def test_enricher_disabled_no_config():
    """An empty instance config leaves the enricher disabled."""
    empty_config = {}
    assert CatalogEnricher(empty_config).enabled is False
|
||||
|
||||
|
||||
def test_enricher_disabled_no_token():
    """A config with a URL but no token leaves the enricher disabled."""
    # "token" key deliberately absent
    om_section = {"url": "https://catalog.example.com"}
    enricher = CatalogEnricher({"openmetadata": om_section})
    assert enricher.enabled is False
|
||||
|
||||
|
||||
def test_enricher_disabled_no_url():
    """A config with a token but no URL leaves the enricher disabled."""
    # "url" key deliberately absent
    om_section = {"token": "test-token"}
    enricher = CatalogEnricher({"openmetadata": om_section})
    assert enricher.enabled is False
|
||||
|
||||
|
||||
def test_enricher_init_success():
    """A config with URL, token and TTL enables the enricher."""
    instance_config = {
        "openmetadata": {
            "url": "https://catalog.example.com",
            "token": "test-token",
            "cache_ttl_seconds": 3600,
        }
    }
    with patch("connectors.openmetadata.enricher.OpenMetadataClient"):
        enricher = CatalogEnricher(instance_config)
        assert enricher.enabled is True
|
||||
|
||||
|
||||
def test_enrich_table_disabled():
    """A disabled enricher returns None from enrich_table()."""
    disabled_enricher = CatalogEnricher({})
    config_fields = {
        "id": "test.table",
        "name": "test",
        "description": "Test",
        "primary_key": "id",
        "sync_strategy": "full_refresh",
    }

    outcome = disabled_enricher.enrich_table(TableConfig(**config_fields))

    assert outcome is None
|
||||
|
||||
|
||||
def test_enrich_table_cache_hit():
    """enrich_table() returns the entry already cached under the derived FQN."""
    instance_config = {
        "openmetadata": {
            "url": "https://catalog.example.com",
            "token": "test-token",
        }
    }
    with patch("connectors.openmetadata.enricher.OpenMetadataClient"):
        enricher = CatalogEnricher(instance_config)

        # Seed the cache under the FQN that auto-derivation yields for the table below
        seeded = CatalogTableData(
            description="Cached description",
            columns={"id": CatalogColumnData(description="ID", data_type="BIGINT")},
        )
        enricher._cache_entry("bigquery.prj-grp-dataview-prod-1ff9.marketing.test", seeded)

        config_fields = {
            "id": "prj-grp-dataview-prod-1ff9.marketing.test",
            "name": "test",
            "description": "Test",
            "primary_key": "id",
            "sync_strategy": "full_refresh",
        }
        outcome = enricher.enrich_table(TableConfig(**config_fields))

        assert outcome is not None
        assert outcome.description == "Cached description"
|
||||
|
||||
|
||||
def test_enrich_table_cache_expiry():
    """A cache entry older than the TTL is reported as missing."""
    instance_config = {
        "openmetadata": {
            "url": "https://catalog.example.com",
            "token": "test-token",
            "cache_ttl_seconds": 1,  # 1 second TTL
        }
    }
    with patch("connectors.openmetadata.enricher.OpenMetadataClient"):
        enricher = CatalogEnricher(instance_config)

        # Insert an entry stamped 2 seconds in the past, i.e. beyond the TTL
        cache_key = "bigquery.prj-grp-dataview-prod-1ff9.marketing.test"
        stale_timestamp = datetime.now() - timedelta(seconds=2)
        enricher._cache[cache_key] = {
            "data": CatalogTableData(description="Old data", columns={}),
            "fetched_at": stale_timestamp,
        }

        assert enricher._get_from_cache(cache_key) is None
|
||||
|
||||
|
||||
def test_derive_fqn_auto():
    """Without an override, the FQN is the table id prefixed with "bigquery."."""
    instance_config = {
        "openmetadata": {
            "url": "https://catalog.example.com",
            "token": "test-token",
        }
    }
    with patch("connectors.openmetadata.enricher.OpenMetadataClient"):
        enricher = CatalogEnricher(instance_config)

        config_fields = {
            "id": "prj-grp-dataview-prod-1ff9.marketing.roi_datamart_v2",
            "name": "roi_datamart_v2",
            "description": "Test",
            "primary_key": "id",
            "sync_strategy": "full_refresh",
        }
        derived = enricher._derive_fqn(TableConfig(**config_fields))

        assert derived == "bigquery.prj-grp-dataview-prod-1ff9.marketing.roi_datamart_v2"
|
||||
|
||||
|
||||
def test_derive_fqn_explicit_override():
    """A catalog_fqn set on the table config wins over auto-derivation."""
    instance_config = {
        "openmetadata": {
            "url": "https://catalog.example.com",
            "token": "test-token",
        }
    }
    with patch("connectors.openmetadata.enricher.OpenMetadataClient"):
        enricher = CatalogEnricher(instance_config)

        config_fields = {
            "id": "prj-grp-dataview-prod-1ff9.marketing.roi_datamart_v2",
            "name": "roi_datamart_v2",
            "description": "Test",
            "primary_key": "id",
            "sync_strategy": "full_refresh",
        }
        overridden_config = TableConfig(**config_fields)
        # Assigned after construction, as in the original test
        overridden_config.catalog_fqn = "bigquery.custom.fqn.override"

        derived = enricher._derive_fqn(overridden_config)

        assert derived == "bigquery.custom.fqn.override"
|
||||
|
||||
|
||||
def test_parse_table_response(sample_om_response):
    """Check each field produced by parsing the ``sample_om_response`` fixture."""
    om_settings = {
        "openmetadata": {
            "url": "https://catalog.example.com",
            "token": "test-token",
        }
    }

    # The client class is patched so the enricher is built against a mock.
    with patch("connectors.openmetadata.enricher.OpenMetadataClient"):
        enricher = CatalogEnricher(om_settings)
        parsed = enricher._parse_table_response(sample_om_response)

        # Table-level description and column count.
        assert parsed is not None
        assert parsed.description == "Daily ROI analytics"
        assert len(parsed.columns) == 2

        # Columns are looked up by their lowercase name.
        assert "id" in parsed.columns
        id_column = parsed.columns["id"]
        assert id_column.description == "Record ID"
        assert id_column.data_type == "BIGINT"

        # Tags.
        assert len(parsed.tags) == 2
        assert "analytics" in parsed.tags

        # Owners.
        assert len(parsed.owners) == 1
        assert "Analytics Team" in parsed.owners

        # Tier and link back to the catalog.
        assert parsed.tier == "Tier1"
        assert "catalog.example.com" in parsed.catalog_url
|
||||
|
||||
|
||||
def test_parse_table_response_with_minimal_data():
    """Check that a response carrying only name and FQN parses to empty defaults."""
    om_settings = {
        "openmetadata": {
            "url": "https://catalog.example.com",
            "token": "test-token",
        }
    }
    # Deliberately omits description, columns, tags, owners and extension.
    bare_payload = {
        "name": "minimal_table",
        "fullyQualifiedName": "bigquery.minimal.table",
    }

    # The client class is patched so the enricher is built against a mock.
    with patch("connectors.openmetadata.enricher.OpenMetadataClient"):
        enricher = CatalogEnricher(om_settings)
        parsed = enricher._parse_table_response(bare_payload)

        assert parsed is not None
        assert parsed.description == ""
        # Every collection field is expected to come back empty.
        for collection_name in ("columns", "tags", "owners"):
            assert len(getattr(parsed, collection_name)) == 0
        assert parsed.tier is None
|
||||
|
||||
|
||||
def test_extract_tags():
    """Check tag extraction from entries keyed by ``name``, by ``tagFQN``, or by both."""
    om_settings = {
        "openmetadata": {
            "url": "https://catalog.example.com",
            "token": "test-token",
        }
    }
    raw_tags = [
        {"name": "important"},
        {"tagFQN": "tags.sensitive"},
        # Empty name: the expected result comes from the tagFQN instead.
        {"name": "", "tagFQN": "tags.fallback"},
    ]

    # The client class is patched so the enricher is built against a mock.
    with patch("connectors.openmetadata.enricher.OpenMetadataClient"):
        enricher = CatalogEnricher(om_settings)
        extracted = enricher._extract_tags(raw_tags)

        for expected_tag in ("important", "sensitive", "fallback"):
            assert expected_tag in extracted
|
||||
|
||||
|
||||
def test_cache_behavior():
    """Check that a lookup misses before an entry is cached and hits afterwards."""
    om_settings = {
        "openmetadata": {
            "url": "https://catalog.example.com",
            "token": "test-token",
            "cache_ttl_seconds": 3600,
        }
    }
    cache_key = "bigquery.test.table"

    # The client class is patched so the enricher is built against a mock.
    with patch("connectors.openmetadata.enricher.OpenMetadataClient"):
        enricher = CatalogEnricher(om_settings)
        payload = CatalogTableData(
            description="Test",
            columns={},
        )

        # Nothing stored yet, so the lookup must miss.
        assert enricher._get_from_cache(cache_key) is None

        # Store the entry, then read it back.
        enricher._cache_entry(cache_key, payload)
        hit = enricher._get_from_cache(cache_key)

        assert hit is not None
        assert hit.description == "Test"
|
||||
|
||||
|
||||
def test_clear_cache():
    """Check that ``clear_cache`` empties a cache holding two entries."""
    om_settings = {
        "openmetadata": {
            "url": "https://catalog.example.com",
            "token": "test-token",
        }
    }

    # The client class is patched so the enricher is built against a mock.
    with patch("connectors.openmetadata.enricher.OpenMetadataClient"):
        enricher = CatalogEnricher(om_settings)

        payload = CatalogTableData(description="Test", columns={})
        # The same payload is stored under two distinct keys.
        for cache_key in ("bigquery.test1", "bigquery.test2"):
            enricher._cache_entry(cache_key, payload)

        assert len(enricher._cache) == 2

        enricher.clear_cache()
        assert len(enricher._cache) == 0
|
||||
|
||||
|
||||
def test_enrich_table_http_error_graceful():
    """Test enrich_table gracefully handles HTTP errors.

    The mocked client raises from ``get_table``; ``enrich_table`` must
    return ``None`` instead of propagating the exception.
    """
    mock_client = MagicMock()
    mock_client.get_table.side_effect = Exception("Connection refused")

    with patch("connectors.openmetadata.enricher.OpenMetadataClient", return_value=mock_client):
        enricher = CatalogEnricher(
            {
                "openmetadata": {
                    "url": "https://catalog.example.com",
                    "token": "test-token",
                }
            }
        )

        table_config = TableConfig(
            id="test.table",
            name="test",
            description="Test",
            primary_key="id",
            sync_strategy="full_refresh",
        )

        # Should return None, not raise
        result = enricher.enrich_table(table_config)
        assert result is None

        # Guard against a vacuous pass: a None result only demonstrates
        # error handling if the failing client call was actually attempted.
        assert mock_client.get_table.called
|
||||
|
|
@ -48,6 +48,14 @@ from .user_service import (
|
|||
validate_ssh_key,
|
||||
)
|
||||
|
||||
# Optional OpenMetadata catalog enrichment
|
||||
try:
|
||||
from connectors.openmetadata.enricher import CatalogEnricher
|
||||
_CATALOG_ENRICHER_AVAILABLE = True
|
||||
except ImportError:
|
||||
_CATALOG_ENRICHER_AVAILABLE = False
|
||||
CatalogEnricher = None
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
|
|
@ -55,9 +63,14 @@ logging.basicConfig(
|
|||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Global catalog enricher (initialized in create_app)
|
||||
_catalog_enricher = None
|
||||
|
||||
|
||||
def create_app() -> Flask:
|
||||
"""Create and configure the Flask application."""
|
||||
global _catalog_enricher
|
||||
|
||||
app = Flask(__name__)
|
||||
app.config.from_object(Config)
|
||||
|
||||
|
|
@ -67,6 +80,18 @@ def create_app() -> Flask:
|
|||
for error in errors:
|
||||
logger.warning(f"Configuration warning: {error}")
|
||||
|
||||
# Initialize OpenMetadata catalog enricher
|
||||
if _CATALOG_ENRICHER_AVAILABLE:
|
||||
try:
|
||||
from config.loader import load_instance_config
|
||||
instance_config = load_instance_config()
|
||||
_catalog_enricher = CatalogEnricher(instance_config)
|
||||
if _catalog_enricher.enabled:
|
||||
logger.info("OpenMetadata catalog enricher initialized")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to initialize catalog enricher: {e}")
|
||||
_catalog_enricher = None
|
||||
|
||||
# Register core auth blueprint (login_required, login page, logout)
|
||||
app.register_blueprint(auth_bp)
|
||||
|
||||
|
|
@ -360,13 +385,41 @@ def _load_catalog_data() -> list:
|
|||
# Determine if "large" badge
|
||||
rows_large = rows >= 1_000_000
|
||||
|
||||
categories[folder].append({
|
||||
table_info = {
|
||||
"name": table.get("name", ""),
|
||||
"description": table.get("description", ""),
|
||||
"rows": rows,
|
||||
"rows_display": rows_display,
|
||||
"rows_large": rows_large,
|
||||
})
|
||||
}
|
||||
|
||||
# Enrich with catalog metadata (OpenMetadata)
|
||||
if _catalog_enricher:
|
||||
try:
|
||||
# Create minimal config for enrichment
|
||||
from src.config import TableConfig
|
||||
table_config = TableConfig(
|
||||
id=table_id,
|
||||
name=table.get("name", ""),
|
||||
description=table.get("description", ""),
|
||||
primary_key=table.get("primary_key", "id"),
|
||||
sync_strategy=table.get("sync_strategy", "full_refresh"),
|
||||
catalog_fqn=table.get("catalog_fqn"),
|
||||
)
|
||||
catalog_data = _catalog_enricher.enrich_table(table_config)
|
||||
if catalog_data:
|
||||
# Enrich table info with catalog data
|
||||
table_info["catalog_tags"] = catalog_data.tags
|
||||
table_info["catalog_tier"] = catalog_data.tier
|
||||
table_info["catalog_owners"] = catalog_data.owners
|
||||
table_info["catalog_url"] = catalog_data.catalog_url
|
||||
# Override description if catalog has one
|
||||
if catalog_data.description:
|
||||
table_info["description"] = catalog_data.description
|
||||
except Exception as e:
|
||||
logger.warning(f"Error enriching {table.get('name')}: {e}")
|
||||
|
||||
categories[folder].append(table_info)
|
||||
|
||||
# Build ordered catalog (from instance config or use discovered folders)
|
||||
try:
|
||||
|
|
|
|||
Loading…
Reference in a new issue