From c5c24cb45b6f361957bdee711b58bcabb3532d40 Mon Sep 17 00:00:00 2001 From: Petr Date: Thu, 12 Mar 2026 14:07:13 +0100 Subject: [PATCH] Implement OpenMetadata catalog integration (Phase 1) Add OpenMetadata REST API connector and enricher to merge table/column metadata from OpenMetadata catalog at sync and query time. Changes: - connectors/openmetadata/client.py: HTTP client for OM API - connectors/openmetadata/enricher.py: Data enrichment with TTL cache - tests/test_openmetadata_*: Unit tests for client and enricher - src/config.py: Add catalog_fqn field to TableConfig - src/data_sync.py: Use enricher in _generate_schema_yaml (catalog > BQ API > data_description.md) - webapp/app.py: Initialize enricher, enrich catalog data with tags/tier/owners/url - config/instance.yaml.example: Document openmetadata section Features: - FQN auto-derivation: bigquery.{table.id} - TTL cache (default 1h) to avoid repeated API calls - Graceful degradation: disabled if token missing, silent on HTTP errors - Column description priority: catalog > BQ API > (none) - Table description priority: catalog > data_description.md --- config/instance.yaml.example | 8 + connectors/openmetadata/__init__.py | 1 + connectors/openmetadata/client.py | 120 +++++++++ connectors/openmetadata/enricher.py | 312 ++++++++++++++++++++++ src/config.py | 2 + src/data_sync.py | 36 ++- tests/test_openmetadata_client.py | 157 +++++++++++ tests/test_openmetadata_enricher.py | 399 ++++++++++++++++++++++++++++ webapp/app.py | 57 +++- 9 files changed, 1086 insertions(+), 6 deletions(-) create mode 100644 connectors/openmetadata/__init__.py create mode 100644 connectors/openmetadata/client.py create mode 100644 connectors/openmetadata/enricher.py create mode 100644 tests/test_openmetadata_client.py create mode 100644 tests/test_openmetadata_enricher.py diff --git a/config/instance.yaml.example b/config/instance.yaml.example index c783409..4409de4 100644 --- a/config/instance.yaml.example +++ 
class OpenMetadataClient:
    """
    HTTP client for the OpenMetadata REST API.

    Wraps one ``httpx.Client`` configured with the instance base URL and a
    JWT bearer token, and exposes:
    - get_table(fqn) -> table metadata with columns, owners, tags
    - get_metrics() -> list of available business metrics

    The instance can be used as a context manager; leaving the ``with`` block
    closes the underlying HTTP client.
    """

    def __init__(
        self,
        base_url: str,
        token: str,
        timeout: int = 30,
    ):
        """
        Initialize OpenMetadata API client.

        Args:
            base_url: Base URL of OpenMetadata instance
                (e.g., "https://catalog.example.com"). A trailing slash is
                stripped.
            token: JWT bearer token for authentication
            timeout: HTTP request timeout in seconds
        """
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.timeout = timeout
        self._client = httpx.Client(
            base_url=self.base_url,
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            },
            timeout=timeout,
        )

    def get_table(self, fqn: str) -> Dict[str, Any]:
        """
        Fetch table metadata from OpenMetadata.

        Args:
            fqn: Fully qualified name (e.g., "bigquery.project.dataset.table").
                It is percent-encoded before being placed in the URL path, so
                names containing quotes, slashes, spaces, ``#`` or ``?`` are
                sent as a single path segment.

        Returns:
            The decoded JSON body of the response. The request asks for the
            ``columns``, ``owners``, ``tags`` and ``extension`` fields.

        Raises:
            httpx.HTTPStatusError: If request fails (non-2xx status)
        """
        # Local import: keeps this block self-contained without touching the
        # module's import section.
        from urllib.parse import quote

        # safe="" so that "/" inside a name is escaped too; plain FQNs made of
        # letters, digits, ".", "-" and "_" are left unchanged.
        url = f"/api/v1/tables/name/{quote(fqn, safe='')}"
        params = {
            "fields": "columns,owners,tags,extension",
            # NOTE(review): "all" presumably also returns soft-deleted
            # tables -- confirm that is intended for enrichment.
            "include": "all",
        }

        response = self._client.get(url, params=params)
        response.raise_for_status()

        return response.json()

    def get_metrics(self, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Fetch list of available metrics from OpenMetadata (Phase 2).

        Args:
            limit: Maximum number of metrics to request

        Returns:
            The list stored under the ``data`` key of the response body, or an
            empty list when that key is absent.

        Raises:
            httpx.HTTPStatusError: If request fails (non-2xx status)
        """
        params = {
            "limit": limit,
            "fields": "description,expression,owners,tags",
        }

        response = self._client.get("/api/v1/metrics", params=params)
        response.raise_for_status()

        return response.json().get("data", [])

    def close(self) -> None:
        """Close HTTP client session."""
        self._client.close()

    def __enter__(self) -> "OpenMetadataClient":
        """Context manager entry: return the client itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit: close the HTTP client; exceptions propagate."""
        self.close()
@dataclass
class CatalogColumnData:
    """Column metadata enriched from OpenMetadata catalog."""

    description: str  # "" when the catalog has no description
    data_type: str  # value of the column's ``dataType`` field
    tags: List[str] = field(default_factory=list)  # short tag names


@dataclass
class CatalogTableData:
    """Table metadata enriched from OpenMetadata catalog."""

    description: str
    columns: Dict[str, CatalogColumnData]  # key = lowercase column name
    tags: List[str] = field(default_factory=list)
    owners: List[str] = field(default_factory=list)  # owner names
    tier: Optional[str] = None  # "Tier1", "Tier2", etc.
    catalog_url: Optional[str] = None  # Direct link to catalog


class CatalogEnricher:
    """
    Enriches table and column metadata from OpenMetadata catalog.

    Usage:
        enricher = CatalogEnricher(instance_config)
        if enricher.enabled:
            catalog_data = enricher.enrich_table(table_config)
            # Use catalog_data.description, columns, tags, owners, tier

    Successful lookups are cached in memory per FQN for
    ``cache_ttl_seconds``; failed lookups are not cached.
    """

    enabled: bool

    _DEFAULT_CACHE_TTL_SECONDS = 3600
    _DEFAULT_SERVICE_NAME = "bigquery"

    def __init__(self, instance_config: Dict[str, Any]):
        """
        Initialize enricher from instance config.

        Args:
            instance_config: Dictionary with optional "openmetadata" section:
                {
                    "openmetadata": {
                        "url": "https://catalog.example.com",
                        "token": "jwt-token-or-env-var",
                        "cache_ttl_seconds": 3600,
                        "service_name": "bigquery"
                    }
                }
                ``service_name`` is optional and is the first FQN segment used
                for auto-derived FQNs (default "bigquery").

        Sets self.enabled = False if:
        - No "openmetadata" section in config
        - URL or token is missing, empty or null (not an error)
        - Client construction raises

        An invalid ``cache_ttl_seconds`` (non-numeric, negative, NaN) falls
        back to the default with a warning instead of breaking cache lookups
        later.
        """
        self.enabled = False
        self._client: Optional[OpenMetadataClient] = None
        self._cache: Dict[str, Dict[str, Any]] = {}
        self._cache_ttl_seconds = self._DEFAULT_CACHE_TTL_SECONDS
        # Kept on the enricher so catalog links do not depend on client
        # internals (the client may be a test double).
        self._base_url = ""
        self._service_name = self._DEFAULT_SERVICE_NAME

        try:
            om_config = (instance_config or {}).get("openmetadata") or {}
            if not om_config:
                logger.debug("OpenMetadata not configured (openmetadata section missing)")
                return

            # ``or ""`` covers keys that are present but null in YAML.
            url = str(om_config.get("url") or "").strip()
            token = str(om_config.get("token") or "").strip()

            if not url or not token:
                logger.debug(
                    "OpenMetadata disabled: url=%s, token=%s", bool(url), bool(token)
                )
                return

            raw_ttl = om_config.get("cache_ttl_seconds", self._DEFAULT_CACHE_TTL_SECONDS)
            cache_ttl = self._coerce_ttl(raw_ttl)
            if cache_ttl is None:
                logger.warning(
                    "Invalid openmetadata.cache_ttl_seconds=%r; using default %ss",
                    raw_ttl,
                    self._DEFAULT_CACHE_TTL_SECONDS,
                )
                cache_ttl = self._DEFAULT_CACHE_TTL_SECONDS

            service_name = str(om_config.get("service_name") or "").strip()
            if service_name:
                self._service_name = service_name

            self._cache_ttl_seconds = cache_ttl
            self._base_url = url.rstrip("/")
            self._client = OpenMetadataClient(base_url=url, token=token)
            self.enabled = True

            logger.info(
                "OpenMetadata enricher initialized: %s (cache TTL: %ss)", url, cache_ttl
            )

        except Exception as e:
            # Top-level boundary: enrichment is optional, so any failure here
            # only disables it.
            logger.warning(
                "Failed to initialize OpenMetadata enricher: %s. "
                "Continuing without catalog enrichment.",
                e,
            )
            self.enabled = False

    @staticmethod
    def _coerce_ttl(value: Any) -> Optional[float]:
        """Return *value* as a non-negative number of seconds, or None if invalid."""
        if isinstance(value, bool):
            return None
        if isinstance(value, str):
            try:
                value = float(value.strip())
            except ValueError:
                return None
        # ``>= 0`` is False for NaN, so NaN is rejected as well.
        if isinstance(value, (int, float)) and value >= 0:
            return value
        return None

    def enrich_table(
        self, table_config: "TableConfig"
    ) -> Optional[CatalogTableData]:
        """
        Enrich table metadata from catalog.

        Args:
            table_config: Table configuration with id and optional catalog_fqn

        Returns:
            CatalogTableData with description, columns, tags, owners, tier
            or None if:
            - enricher is disabled
            - FQN derivation fails
            - HTTP request fails
            - Parsing fails
            Always returns None gracefully, never raises exception.
        """
        if not self.enabled or not self._client:
            return None

        try:
            fqn = self._derive_fqn(table_config)
            if not fqn:
                return None

            # Check cache
            cached = self._get_from_cache(fqn)
            if cached is not None:
                logger.debug("Catalog cache hit: %s", fqn)
                return cached

            # Fetch from API
            logger.debug("Fetching catalog metadata: %s", fqn)
            raw_response = self._client.get_table(fqn)

            # Parse and cache (only successful parses are cached)
            parsed = self._parse_table_response(raw_response)
            if parsed is not None:
                self._cache_entry(fqn, parsed)

            return parsed

        except Exception as e:
            # getattr: the handler itself must not raise if the config object
            # has no ``name``.
            logger.warning(
                "Error enriching table %s: %s",
                getattr(table_config, "name", "<unknown>"),
                e,
            )
            return None

    def _derive_fqn(self, table_config: "TableConfig") -> Optional[str]:
        """
        Derive OpenMetadata FQN from table config.

        An explicit ``catalog_fqn`` wins. Otherwise the FQN is
        ``{service_name}.{table_config.id}`` where service_name defaults to
        "bigquery".
        Example: table_config.id = "prj-grp-dataview-prod-1ff9.marketing.roi_datamart_v2"
                 -> FQN = "bigquery.prj-grp-dataview-prod-1ff9.marketing.roi_datamart_v2"

        Args:
            table_config: Configuration with id and optional catalog_fqn

        Returns:
            FQN string or None if the config has neither catalog_fqn nor id
        """
        explicit_fqn = getattr(table_config, "catalog_fqn", None)
        if explicit_fqn:
            return explicit_fqn

        table_id = getattr(table_config, "id", None)
        if not table_id:
            logger.warning(
                "FQN derivation failed for %s: table id is empty",
                getattr(table_config, "name", "<unknown>"),
            )
            return None

        service = getattr(self, "_service_name", None) or self._DEFAULT_SERVICE_NAME
        return f"{service}.{table_id}"

    def _parse_table_response(self, raw: Dict[str, Any]) -> Optional[CatalogTableData]:
        """
        Parse OpenMetadata table response into typed CatalogTableData.

        Missing or null fields are treated as empty. Columns without a name
        are skipped. The tier is read from ``extension`` ("tier" / "Tier") and
        falls back to the first table tag whose tagFQN starts with "Tier.".

        Args:
            raw: Raw response from OpenMetadata /api/v1/tables/name/{fqn}

        Returns:
            CatalogTableData or None if parsing fails
        """
        # Local import: keeps this block self-contained without touching the
        # module's import section.
        from urllib.parse import quote

        try:
            description = raw.get("description") or ""

            # Parse columns (``or []`` covers an explicit null)
            columns: Dict[str, CatalogColumnData] = {}
            for col in raw.get("columns") or []:
                col_name = (col.get("name") or "").lower()
                if not col_name:
                    continue
                columns[col_name] = CatalogColumnData(
                    description=col.get("description") or "",
                    data_type=col.get("dataType") or "",
                    tags=self._extract_column_tags(col),
                )

            raw_tags = raw.get("tags") or []
            tags = self._extract_tags(raw_tags)
            owners = self._extract_owners(raw.get("owners") or [])

            # Parse tier: custom extension property first, then Tier.* tag
            tier = None
            extension = raw.get("extension") or {}
            if isinstance(extension, dict):
                tier = extension.get("tier") or extension.get("Tier")
            if not tier:
                tier = self._tier_from_tags(raw_tags)

            # Build catalog URL from the enricher's own base URL.
            fqn = raw.get("fullyQualifiedName") or ""
            base_url = getattr(self, "_base_url", "")
            catalog_url = None
            if fqn and base_url:
                # NOTE(review): "/table/{fqn}" is the OpenMetadata UI entity
                # route as far as I know -- confirm against your deployment.
                catalog_url = f"{base_url}/table/{quote(fqn, safe='')}"

            return CatalogTableData(
                description=description,
                columns=columns,
                tags=tags,
                owners=owners,
                tier=tier or None,
                catalog_url=catalog_url,
            )

        except Exception as e:
            logger.warning("Failed to parse catalog response: %s", e)
            return None

    @staticmethod
    def _tier_from_tags(tags_list: Any) -> Optional[str]:
        """Return the part after "Tier." of the first Tier.* tagFQN, else None."""
        if not isinstance(tags_list, (list, tuple)):
            return None
        for tag in tags_list:
            if not isinstance(tag, dict):
                continue
            tag_fqn = tag.get("tagFQN")
            if isinstance(tag_fqn, str) and tag_fqn.startswith("Tier."):
                return tag_fqn.split(".", 1)[1] or None
        return None

    def _extract_tags(self, tags_list: List[Dict[str, Any]]) -> List[str]:
        """
        Extract tag names from tags list.

        Uses ``name`` when present, otherwise the last dot-separated segment
        of ``tagFQN``. Malformed entries and entries that yield an empty name
        are skipped individually rather than discarding the whole list.
        """
        if not isinstance(tags_list, (list, tuple)):
            return []
        names = []
        for tag in tags_list:
            if not isinstance(tag, dict):
                continue
            name = tag.get("name") or str(tag.get("tagFQN") or "").split(".")[-1]
            if name:
                names.append(name)
        return names

    def _extract_column_tags(self, column: Dict[str, Any]) -> List[str]:
        """Extract tags for a single column."""
        return self._extract_tags(column.get("tags") or [])

    def _extract_owners(self, owners_list: List[Dict[str, Any]]) -> List[str]:
        """
        Extract owner names from owners list.

        Uses ``name`` when present, otherwise ``displayName``; entries with
        neither, or that are not dicts, are skipped.
        """
        if not isinstance(owners_list, (list, tuple)):
            return []
        names = []
        for owner in owners_list:
            if not isinstance(owner, dict):
                continue
            name = owner.get("name") or owner.get("displayName") or ""
            if name:
                names.append(name)
        return names

    def _get_from_cache(self, fqn: str) -> Optional[CatalogTableData]:
        """
        Check if cached entry exists and is still valid (TTL).

        An expired entry is removed from the cache.

        Args:
            fqn: Fully qualified name

        Returns:
            CatalogTableData if valid, None if expired or missing
        """
        entry = self._cache.get(fqn)
        if entry is None:
            return None

        fetched_at = entry.get("fetched_at")
        if not fetched_at:
            return None

        # Compared in seconds so very large TTLs cannot overflow timedelta.
        age_seconds = (datetime.now() - fetched_at).total_seconds()
        if age_seconds > self._cache_ttl_seconds:
            self._cache.pop(fqn, None)
            return None

        return entry.get("data")

    def _cache_entry(self, fqn: str, data: CatalogTableData) -> None:
        """Cache an enriched table entry, stamped with the current local time."""
        self._cache[fqn] = {
            "data": data,
            "fetched_at": datetime.now(),
        }

    def clear_cache(self) -> None:
        """Manually clear all cached entries."""
        self._cache.clear()
        logger.info("Catalog cache cleared")

    def __del__(self):
        """Cleanup HTTP client on deletion (best effort)."""
        # getattr: __del__ can run on an instance whose __init__ never
        # assigned _client.
        client = getattr(self, "_client", None)
        if client is None:
            return
        try:
            client.close()
        except Exception:
            # Deliberately swallowed: nothing useful can be done with a close
            # failure during garbage collection / interpreter shutdown.
            pass
table_configs.append(config) diff --git a/src/data_sync.py b/src/data_sync.py index bd601ca..337c42f 100644 --- a/src/data_sync.py +++ b/src/data_sync.py @@ -27,6 +27,8 @@ from datetime import datetime from tqdm import tqdm from .config import get_config, TableConfig +from config.loader import load_instance_config +from connectors.openmetadata.enricher import CatalogEnricher logger = logging.getLogger(__name__) @@ -217,6 +219,14 @@ class DataSyncManager: ) self.data_source = create_data_source() + # Initialize OpenMetadata catalog enricher + try: + instance_config = load_instance_config() + self.catalog_enricher = CatalogEnricher(instance_config) + except Exception as e: + logger.warning(f"Failed to initialize catalog enricher: {e}") + self.catalog_enricher = CatalogEnricher({}) # Disabled enricher + def _generate_schema_yaml(self): """ Generate schema.yml file with actual table schemas from Parquet files. @@ -269,10 +279,14 @@ class DataSyncManager: # Get column metadata from data source (if supported) col_metadata = self.data_source.get_column_metadata(table_config.id) + # Enrich with catalog metadata (OpenMetadata) + catalog_data = self.catalog_enricher.enrich_table(table_config) + # Extract column information columns = [] for field_item in arrow_schema: col_name = field_item.name + col_name_lower = col_name.lower() pyarrow_type = str(field_item.type) column_info = { @@ -280,21 +294,35 @@ class DataSyncManager: "type": pyarrow_type, } - # Add source type and description from connector metadata + # Priority for description: catalog > BQ API > (nothing) + description = None + if catalog_data and col_name_lower in catalog_data.columns: + description = catalog_data.columns[col_name_lower].description + elif col_metadata and "columns" in col_metadata: + col_meta = col_metadata["columns"].get(col_name, {}) + description = col_meta.get("description") + + if description: + column_info["description"] = description + + # Add source type from connector metadata if 
@pytest.fixture
def mock_httpx_client():
    """Patch ``httpx.Client`` in the client module and yield the patched class.

    ``mock_httpx_client.return_value`` is the object the client under test
    stores as its HTTP session.
    """
    with patch("connectors.openmetadata.client.httpx.Client") as mock:
        yield mock


def _json_response(payload):
    """Return a stand-in HTTP response whose ``json()`` yields *payload*."""
    response = MagicMock()
    response.json.return_value = payload
    return response


def _make_client(base_url="https://catalog.example.com", token="test-token", **kwargs):
    """Construct the client under test (httpx must already be patched)."""
    return OpenMetadataClient(base_url=base_url, token=token, **kwargs)


def test_client_init(mock_httpx_client):
    """Test OpenMetadataClient initialization."""
    client = _make_client(timeout=30)

    assert client.base_url == "https://catalog.example.com"
    assert client.token == "test-token"
    assert client.timeout == 30

    # Verify httpx.Client was called with correct headers
    mock_httpx_client.assert_called_once()
    call_kwargs = mock_httpx_client.call_args[1]
    assert call_kwargs["headers"]["Authorization"] == "Bearer test-token"


def test_client_init_strips_trailing_slash(mock_httpx_client):
    """Test that base_url trailing slash is stripped."""
    client = _make_client(base_url="https://catalog.example.com/")
    assert client.base_url == "https://catalog.example.com"


def test_get_table_success(mock_httpx_client):
    """Test successful get_table() call."""
    http = mock_httpx_client.return_value
    http.get.return_value = _json_response(
        {
            "id": "table-id",
            "name": "roi_datamart_v2",
            "fullyQualifiedName": "bigquery.project.dataset.table",
            "description": "Test table",
            "columns": [
                {"name": "id", "dataType": "INTEGER", "description": "ID column"},
                {"name": "name", "dataType": "STRING", "description": "Name column"},
            ],
            "tags": [{"name": "important"}],
            "owners": [{"name": "Data Team", "email": "data@example.com"}],
        }
    )

    result = _make_client().get_table("bigquery.project.dataset.table")

    assert result["name"] == "roi_datamart_v2"
    assert len(result["columns"]) == 2

    # Assert the exact endpoint and query params instead of a substring of
    # str(call_args), which would also match text inside other arguments.
    http.get.assert_called_once_with(
        "/api/v1/tables/name/bigquery.project.dataset.table",
        params={"fields": "columns,owners,tags,extension", "include": "all"},
    )


def test_get_table_http_error(mock_httpx_client):
    """Test get_table() with HTTP error."""
    response = MagicMock()
    response.raise_for_status.side_effect = httpx.HTTPStatusError(
        "401 Unauthorized",
        request=MagicMock(),
        response=MagicMock(status_code=401),
    )
    mock_httpx_client.return_value.get.return_value = response

    client = _make_client(token="invalid-token")

    with pytest.raises(httpx.HTTPStatusError):
        client.get_table("bigquery.project.dataset.table")


def test_get_metrics_success(mock_httpx_client):
    """Test successful get_metrics() call."""
    http = mock_httpx_client.return_value
    http.get.return_value = _json_response(
        {
            "data": [
                {
                    "id": "metric-1",
                    "name": "revenue",
                    "fullyQualifiedName": "metrics.revenue",
                    "description": "Total revenue",
                    "expression": "SUM(amount)",
                },
                {
                    "id": "metric-2",
                    "name": "users",
                    "fullyQualifiedName": "metrics.users",
                    "description": "Active users",
                    "expression": "COUNT(DISTINCT user_id)",
                },
            ]
        }
    )

    result = _make_client().get_metrics(limit=10)

    assert len(result) == 2
    assert result[0]["name"] == "revenue"
    assert result[1]["name"] == "users"

    # The limit argument must be forwarded to the API.
    http.get.assert_called_once_with(
        "/api/v1/metrics",
        params={"limit": 10, "fields": "description,expression,owners,tags"},
    )


def test_context_manager(mock_httpx_client):
    """Test client can be used as context manager."""
    with _make_client() as client:
        assert client is not None

    # Verify close() was called
    mock_httpx_client.return_value.close.assert_called_once()
CATALOG_URL = "https://catalog.example.com"


def _make_enricher(**om_overrides):
    """Build an enabled enricher whose HTTP client is a MagicMock.

    The mock's ``base_url`` is set to a real string: the enricher formats it
    into ``catalog_url``, and an unconfigured MagicMock attribute would render
    as "<MagicMock ...>" instead of the catalog host.
    """
    om_config = {"url": CATALOG_URL, "token": "test-token", **om_overrides}
    with patch("connectors.openmetadata.enricher.OpenMetadataClient") as client_cls:
        client_cls.return_value.base_url = CATALOG_URL
        return CatalogEnricher({"openmetadata": om_config})


def _table_config(table_id, name, **kwargs):
    """Build a minimal valid TableConfig for the given id/name."""
    return TableConfig(
        id=table_id,
        name=name,
        description=kwargs.pop("description", "Test"),
        primary_key="id",
        sync_strategy="full_refresh",
        **kwargs,
    )


@pytest.fixture
def sample_table_config():
    """Sample table configuration."""
    return _table_config(
        "prj-grp-dataview-prod-1ff9.marketing.roi_datamart_v2",
        "roi_datamart_v2",
        description="ROI metrics",
    )


@pytest.fixture
def sample_om_response():
    """Sample OpenMetadata API response."""
    return {
        "id": "table-uuid",
        "name": "roi_datamart_v2",
        "fullyQualifiedName": "bigquery.prj-grp-dataview-prod-1ff9.marketing.roi_datamart_v2",
        "description": "Daily ROI analytics",
        "columns": [
            {
                "name": "id",
                "dataType": "BIGINT",
                "description": "Record ID",
                "tags": [{"name": "pii"}],
            },
            {
                "name": "revenue",
                "dataType": "DECIMAL",
                "description": "Revenue amount",
                "tags": [],
            },
        ],
        "tags": [{"name": "analytics"}, {"name": "daily"}],
        "owners": [
            {"name": "Analytics Team", "email": "analytics@example.com"},
        ],
        "extension": {"tier": "Tier1"},
    }


def test_enricher_disabled_no_config():
    """Test enricher is disabled when openmetadata section is missing."""
    assert CatalogEnricher({}).enabled is False


def test_enricher_disabled_no_token():
    """Test enricher is disabled when token is missing."""
    enricher = CatalogEnricher({"openmetadata": {"url": CATALOG_URL}})
    assert enricher.enabled is False


def test_enricher_disabled_no_url():
    """Test enricher is disabled when URL is missing."""
    enricher = CatalogEnricher({"openmetadata": {"token": "test-token"}})
    assert enricher.enabled is False


def test_enricher_init_success():
    """Test enricher initialization with valid config."""
    assert _make_enricher(cache_ttl_seconds=3600).enabled is True


def test_enrich_table_disabled():
    """Test enrich_table returns None when enricher is disabled."""
    enricher = CatalogEnricher({})
    assert enricher.enrich_table(_table_config("test.table", "test")) is None


def test_enrich_table_cache_hit():
    """Test enrich_table returns cached data without calling the API."""
    enricher = _make_enricher()

    # Pre-populate cache
    cached_data = CatalogTableData(
        description="Cached description",
        columns={"id": CatalogColumnData(description="ID", data_type="BIGINT")},
    )
    enricher._cache_entry(
        "bigquery.prj-grp-dataview-prod-1ff9.marketing.test",
        cached_data,
    )

    result = enricher.enrich_table(
        _table_config("prj-grp-dataview-prod-1ff9.marketing.test", "test")
    )

    assert result is not None
    assert result.description == "Cached description"
    enricher._client.get_table.assert_not_called()


def test_enrich_table_fetches_and_caches(sample_table_config, sample_om_response):
    """Test a cache miss fetches from the API once and then serves the cache."""
    enricher = _make_enricher()
    enricher._client.get_table.return_value = sample_om_response

    first = enricher.enrich_table(sample_table_config)
    second = enricher.enrich_table(sample_table_config)

    assert first is not None
    assert first.description == "Daily ROI analytics"
    assert second is first
    enricher._client.get_table.assert_called_once_with(
        "bigquery.prj-grp-dataview-prod-1ff9.marketing.roi_datamart_v2"
    )


def test_enrich_table_cache_expiry():
    """Test cache entry expires after TTL."""
    enricher = _make_enricher(cache_ttl_seconds=1)  # 1 second TTL

    # Pre-populate cache with an entry older than the TTL
    fqn = "bigquery.prj-grp-dataview-prod-1ff9.marketing.test"
    enricher._cache[fqn] = {
        "data": CatalogTableData(description="Old data", columns={}),
        "fetched_at": datetime.now() - timedelta(seconds=2),  # 2 seconds old
    }

    # Should return None due to expiry
    assert enricher._get_from_cache(fqn) is None


def test_derive_fqn_auto(sample_table_config):
    """Test FQN auto-derivation from table ID."""
    fqn = _make_enricher()._derive_fqn(sample_table_config)
    assert fqn == "bigquery.prj-grp-dataview-prod-1ff9.marketing.roi_datamart_v2"


def test_derive_fqn_explicit_override():
    """Test FQN explicit override via catalog_fqn."""
    table_config = _table_config(
        "prj-grp-dataview-prod-1ff9.marketing.roi_datamart_v2",
        "roi_datamart_v2",
        catalog_fqn="bigquery.custom.fqn.override",
    )

    fqn = _make_enricher()._derive_fqn(table_config)
    assert fqn == "bigquery.custom.fqn.override"


def test_parse_table_response(sample_om_response):
    """Test parsing OpenMetadata table response."""
    result = _make_enricher()._parse_table_response(sample_om_response)

    assert result is not None
    assert result.description == "Daily ROI analytics"
    assert len(result.columns) == 2

    # Check lowercase column key
    assert "id" in result.columns
    assert result.columns["id"].description == "Record ID"
    assert result.columns["id"].data_type == "BIGINT"

    assert len(result.tags) == 2
    assert "analytics" in result.tags

    assert len(result.owners) == 1
    assert "Analytics Team" in result.owners

    assert result.tier == "Tier1"
    assert "catalog.example.com" in result.catalog_url


def test_parse_table_response_with_minimal_data():
    """Test parsing response with minimal fields."""
    minimal_response = {
        "name": "minimal_table",
        "fullyQualifiedName": "bigquery.minimal.table",
        # Missing description, columns, tags, owners, extension
    }

    result = _make_enricher()._parse_table_response(minimal_response)

    assert result is not None
    assert result.description == ""
    assert len(result.columns) == 0
    assert len(result.tags) == 0
    assert len(result.owners) == 0
    assert result.tier is None


def test_extract_tags():
    """Test tag extraction."""
    tags = [
        {"name": "important"},
        {"tagFQN": "tags.sensitive"},
        {"name": "", "tagFQN": "tags.fallback"},  # Test fallback
    ]

    result = _make_enricher()._extract_tags(tags)

    assert "important" in result
    assert "sensitive" in result
    assert "fallback" in result


def test_cache_behavior():
    """Test cache hit and miss."""
    enricher = _make_enricher(cache_ttl_seconds=3600)
    fqn = "bigquery.test.table"

    # Cache miss
    assert enricher._get_from_cache(fqn) is None

    # Cache entry
    enricher._cache_entry(fqn, CatalogTableData(description="Test", columns={}))

    # Cache hit
    cached = enricher._get_from_cache(fqn)
    assert cached is not None
    assert cached.description == "Test"


def test_clear_cache():
    """Test cache clearing."""
    enricher = _make_enricher()

    data = CatalogTableData(description="Test", columns={})
    enricher._cache_entry("bigquery.test1", data)
    enricher._cache_entry("bigquery.test2", data)
    assert len(enricher._cache) == 2

    enricher.clear_cache()
    assert len(enricher._cache) == 0


def test_enrich_table_http_error_graceful():
    """Test enrich_table gracefully handles HTTP errors."""
    enricher = _make_enricher()
    enricher._client.get_table.side_effect = Exception("Connection refused")

    # Should return None, not raise
    result = enricher.enrich_table(_table_config("test.table", "test"))
    assert result is None
+_catalog_enricher = None + def create_app() -> Flask: """Create and configure the Flask application.""" + global _catalog_enricher + app = Flask(__name__) app.config.from_object(Config) @@ -67,6 +80,18 @@ def create_app() -> Flask: for error in errors: logger.warning(f"Configuration warning: {error}") + # Initialize OpenMetadata catalog enricher + if _CATALOG_ENRICHER_AVAILABLE: + try: + from config.loader import load_instance_config + instance_config = load_instance_config() + _catalog_enricher = CatalogEnricher(instance_config) + if _catalog_enricher.enabled: + logger.info("OpenMetadata catalog enricher initialized") + except Exception as e: + logger.warning(f"Failed to initialize catalog enricher: {e}") + _catalog_enricher = None + # Register core auth blueprint (login_required, login page, logout) app.register_blueprint(auth_bp) @@ -360,13 +385,41 @@ def _load_catalog_data() -> list: # Determine if "large" badge rows_large = rows >= 1_000_000 - categories[folder].append({ + table_info = { "name": table.get("name", ""), "description": table.get("description", ""), "rows": rows, "rows_display": rows_display, "rows_large": rows_large, - }) + } + + # Enrich with catalog metadata (OpenMetadata) + if _catalog_enricher: + try: + # Create minimal config for enrichment + from src.config import TableConfig + table_config = TableConfig( + id=table_id, + name=table.get("name", ""), + description=table.get("description", ""), + primary_key=table.get("primary_key", "id"), + sync_strategy=table.get("sync_strategy", "full_refresh"), + catalog_fqn=table.get("catalog_fqn"), + ) + catalog_data = _catalog_enricher.enrich_table(table_config) + if catalog_data: + # Enrich table info with catalog data + table_info["catalog_tags"] = catalog_data.tags + table_info["catalog_tier"] = catalog_data.tier + table_info["catalog_owners"] = catalog_data.owners + table_info["catalog_url"] = catalog_data.catalog_url + # Override description if catalog has one + if catalog_data.description: + 
table_info["description"] = catalog_data.description + except Exception as e: + logger.warning(f"Error enriching {table.get('name')}: {e}") + + categories[folder].append(table_info) # Build ordered catalog (from instance config or use discovered folders) try: