agnes-the-ai-analyst/connectors/openmetadata/transformer.py
ZdenekSrotyr 4e4d2a39e6
chore(oss): isolate customer-specific deploy bits from scripts/grpn/ (#88, wave 1) (#94)
* chore(oss): isolate customer-specific deploy bits from scripts/grpn/ (#88)

Vendor-neutralization step before public release. The directory mixed
two concerns: (1) generic ops scripts referenced from mainline OSS
infrastructure (TLS rotation, auto-upgrade cron) and (2) one operator's
hackathon manual-deploy helper with hardcoded GCP project IDs, VM names,
and admin emails. Splitting them per concern.

Moved (still in OSS, just under a vendor-neutral name):
- scripts/grpn/agnes-tls-rotate.sh   → scripts/ops/agnes-tls-rotate.sh
- scripts/grpn/agnes-auto-upgrade.sh → scripts/ops/agnes-auto-upgrade.sh

Removed (belongs in private consumer infra repos, not upstream OSS):
- scripts/grpn/Makefile (hardcoded prj-grp-foundryai-dev-7c37, foundryai-development VM name, e_zsrotyr@groupon.com bootstrap email)
- scripts/grpn/README.md (GRPN hackathon deploy walkthrough)
- docs/superpowers/plans/2026-04-22-grpn-deploy-learnings.md (org-specific deploy log)

Cross-refs updated in README.md, CLAUDE.md, docs/DEPLOYMENT.md,
docker-compose.yml. CHANGELOG entry flags BREAKING (ops) for any
consumer infra repo that installs these scripts via path-based systemd
timers.

This is the first wave of #88 — the remaining leaks (test data with
prj-grp-dataview-prod-1ff9, AIAgent.FoundryAI tags in OpenMetadata test
fixtures, docstrings in connectors/openmetadata/enricher.py) will be a
separate, smaller PR.

Refs #88.

* chore(oss): comprehensive vendor-neutralization (#88 wave 2 + review fixes)

PR #94 review found that the original wave-1 grep was scoped wrong and
many leaks survived. This commit closes wave 1 properly AND folds in all
wave-2 anonymization in a single pass — easier to review than two PRs.

Wave-1 review-fix corrections:
- Caddyfile: scripts/grpn/agnes-tls-rotate.sh → scripts/ops/ (the original
  wave-1 grep filter excluded extensionless files like Caddyfile).
- CHANGELOG bullet rewritten — original wording implied an in-repo migration
  for infra/modules/customer-instance/, which is wrong (the TF module embeds
  the script inline via heredoc, never sourced from scripts/grpn/). Now
  flags downstream consumer infra repos only.
- infra/modules/customer-instance/variables.tf: Czech docstring with `grpn`
  example → English description with `acme, example` placeholders.

Wave-2 anonymization:
- Code docstrings (connectors/openmetadata/{client,transformer,enricher}.py,
  src/catalog_export.py, scripts/duckdb_manager.py): prj-grp-… →
  my-bq-project / prj-example-1234, AIAgent.FoundryAI → AIAgent.MyAgent,
  FoundryAIDataModel → AnalyticsDataModel.
- Test fixtures (4 files): same set of replacements — 157 tests still pass.
- .github/workflows/keboola-deploy.yml: "Groupon-side dev VMs" comment →
  generic "per-developer dev VMs".
- docs/auth-groups.md + scripts/debug/probe_google_groups.py:
  kids-ai-data-analysis project name → acme-internal-prod placeholder.
- 5 planning/spec docs under docs/superpowers/{plans,specs}/2026-04-21-*:
  hardcoded IPs (34.77.94.14, 34.77.102.61) → <dev-vm-ip>/<prod-vm-ip>;
  GRPN/Groupon → Acme/another-customer; prj-grp-… → prj-example-….
- scripts/switch-dev-vm.sh deleted — hackathon-era helper hardcoded to a
  specific shared dev VM. Per-developer dev VMs are the supported pattern.

Final grep `groupon|grpn|foundryai|prj-grp|groupondev|34\.77\.(94|102)\.…|kids-ai-data`
returns zero hits (excluding CHANGELOG.md historical entries).

CHANGELOG entry expanded to document both waves under one bullet, with
the BREAKING (ops) clarification about the TF module being unaffected.

Refs review of #94, closes #88.

* fix(oss): close remaining #94 review-2 findings (Czech, padak refs, CHANGELOG)

Reviewer of PR #94 round 2 caught 4 remaining items the wave-2 pass missed:

1. infra/modules/customer-instance/variables.tf had Czech descriptions on
   8 more variables. Previous review only flagged line 19; this round
   audited the rest. Translated lines 2, 28, 42-46 (heredoc), 60, 65, 71,
   78, 84 to English. Same review concern: a Terraform module that is
   the customer-facing API surface in Czech is unfit for OSS distribution.

2. infra/modules/customer-instance/outputs.tf had Czech descriptions on
   four outputs. Same fix.

3. docs/padak-security.md referenced a private repo (padak/keboola_agent_cli#206)
   in two places. Replaced with generic 'tracked upstream in the auth-CLI repo'
   per CLAUDE.md vendor-agnostic rule (no cross-refs to private repos).

4. scripts/fetch-env-from-secrets.sh:41 had a Czech comment.
   Translated.

5. CHANGELOG cosmetic: bullet said 'AIAgent.FoundryAI -> AIAgent.MyAgent'
   but the actual code uses both MyAgent (in docstrings) and Example
   (in test fixtures). Reworded to mention both targets.

Final grep across all shipping file types (.md, .py, .yml, .yaml, .sh,
Makefile, .json, .tf, .tpl, Caddyfile, .toml) for groupon|grpn|foundryai|
prj-grp|groupondev|34.77.94.14|34.77.102.61|kids-ai-data|padak/keboola_agent_cli
returns ZERO hits (excluding CHANGELOG.md). Czech-diacritic grep across
.tf/.toml/Caddyfile/Makefile/.yml returns ZERO hits.

157/157 OpenMetadata + DuckDB tests still pass.

* fix(oss): close #94 round-3 leaks (env.template, instance.yaml.example, padak typo)

Round-3 reviewer caught two MUST-FIX leaks the round-2 grep missed
(grep was scoped to extensions that did not include .template / .example
suffixes — the audit was right, the previous grep was not paranoid enough):

1. config/instance.yaml.example:114 — '(optional - Groupon-specific)' brand
   leak in a shipping config example. Replaced with '(optional)'.

2. config/.env.template:68 — stale path 'scripts/grpn/agnes-tls-rotate.sh'
   in operator-facing env-template comment. The script lives at
   scripts/ops/ now (commit 16a85cc); this comment had been pointing
   operators at a non-existent path.

3. docs/padak-security.md:188 — phrase duplication 'tracked in tracked
   upstream' from a sloppy substitution in round-2. Trivial wording fix.

Final paranoid grep across .md/.py/.yml/.yaml/.sh/Makefile/.json/.tf/.tpl/
Caddyfile/.toml/.template/.example/.env* with the full token set
(groupon|grpn|foundryai|prj-grp|groupondev|34\.77\.94\.14|34\.77\.102\.61|
kids-ai-data|padak/keboola_agent_cli) returns ZERO hits, excluding
CHANGELOG.md historical entries.

* fix(oss): #94 round-4 — QUICKSTART.md + rename padak-security.md

Devin Review caught two findings on the latest round-3 commit:

1. docs/QUICKSTART.md:67 still pointed users at the deleted
   scripts/switch-dev-vm.sh. A Quickstart user following step-by-step
   would hit a missing-file error at the final step. Replaced with the
   inline gcloud-ssh equivalent that the Removed bullet documents.

2. docs/padak-security.md filename retains the personal identifier
   'padak'. The PR fixed the body content (replaced
   padak/keboola_agent_cli#206 references with generic wording) but
   missed the filename. Renamed to docs/security-audit-2026-04.md
   (date-anchored, vendor-neutral). Updated the historical CHANGELOG
   link to point at the new path with an inline note about the rename.

* fix(oss): redact remaining hardcoded IPs from planning docs + remove default email

Devin Review caught two more leaks:
1. scripts/fetch-env-from-secrets.sh line 16 had a hardcoded
   personal-email default (zdenek.srotyr@keboola.com). Replaced with
   ':?' bash error so SEED_ADMIN_EMAIL must be explicitly set —
   safer than carrying any specific identity.
2. Planning docs still had 35.195.96.98 and 34.62.223.189 (legacy
   prod/dev IPs) that the round-1 IP-replace pattern missed (it only
   targeted 34.77.x.x). Generic regex redaction across all five
   planning docs replaces every public IP with <redacted-ip>,
   preserving private/loopback/IAP ranges.
2026-04-27 20:24:34 +02:00

463 lines
14 KiB
Python

"""
OpenMetadata data transformer.
Shared logic for parsing OpenMetadata API responses into structured dicts
suitable for YAML export and webapp display. Used by:
- src/catalog_export.py (YAML file generation)
- webapp/app.py (metric list and detail display)
Extracts metadata from OpenMetadata tag conventions:
- MetricCategory.* or Category.* -> category
- Grain.* -> grain/granularity
- Dimension.* -> dimensions list
- MetricType.* -> metric type
- Unit.* -> unit of measurement
"""
import html
import logging
import re
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
def extract_category(tags: List[Dict[str, Any]]) -> str:
"""
Extract metric category from OpenMetadata tags.
Looks for tagFQN prefixed with "MetricCategory." or "Category.".
Returns the first match found, or "general" as fallback.
Args:
tags: List of tag dicts from OpenMetadata (each with "tagFQN" key)
Returns:
Category string (e.g., "finance", "marketing")
"""
for tag in tags:
tag_fqn = tag.get("tagFQN", "")
if tag_fqn.startswith("MetricCategory."):
return tag_fqn.split(".", 1)[1]
if tag_fqn.startswith("Category."):
return tag_fqn.split(".", 1)[1]
return "general"
def extract_grain(raw_metric: Dict[str, Any]) -> str:
"""
Extract metric granularity from OpenMetadata metric data.
Checks the "granularity" field first, then falls back to Grain.* tags.
Args:
raw_metric: Raw metric dict from OpenMetadata API
Returns:
Grain string (e.g., "monthly", "daily"), lowercase. Empty string if not found.
"""
grain = raw_metric.get("granularity", "") or ""
if grain:
return grain.lower()
for tag in raw_metric.get("tags", []):
tag_fqn = tag.get("tagFQN", "")
if tag_fqn.startswith("Grain."):
return tag_fqn.split(".", 1)[1].lower()
return ""
def extract_dimensions(tags: List[Dict[str, Any]]) -> List[str]:
"""
Extract dimension names from OpenMetadata tags.
Looks for tagFQN prefixed with "Dimension.".
Args:
tags: List of tag dicts from OpenMetadata
Returns:
List of dimension names (e.g., ["economic_area", "merchant_country"])
"""
dimensions = []
for tag in tags:
tag_fqn = tag.get("tagFQN", "")
if tag_fqn.startswith("Dimension."):
dimensions.append(tag_fqn.split(".", 1)[1])
return dimensions
def extract_expression(raw_metric: Dict[str, Any]) -> str:
"""
Extract metric SQL expression from OpenMetadata metric data.
Handles both dict format ({"expression": "..."}) and plain string.
Args:
raw_metric: Raw metric dict from OpenMetadata API
Returns:
SQL expression string, or empty string if not found.
"""
metric_expr = raw_metric.get("metricExpression", {})
if isinstance(metric_expr, dict):
# OpenMetadata uses "code" field for the SQL expression
result = metric_expr.get("code", "") or metric_expr.get("expression", "") or ""
if result:
return result
elif isinstance(metric_expr, str) and metric_expr:
return metric_expr
# Fallback: top-level expression field (OpenMetadata format varies)
return raw_metric.get("expression", "") or ""
def extract_owners(raw: Dict[str, Any]) -> List[str]:
"""
Extract owner names from OpenMetadata entity data.
Args:
raw: Raw entity dict with optional "owners" list
Returns:
List of owner name strings
"""
names = []
for owner in raw.get("owners", []):
name = owner.get("name") or owner.get("displayName", "")
if name:
names.append(name)
return names
def extract_metric_type(raw_metric: Dict[str, Any]) -> str:
"""
Extract metric type from OpenMetadata metric data.
Checks "metricType" field first, then MetricType.* tags.
Args:
raw_metric: Raw metric dict from OpenMetadata API
Returns:
Metric type string (e.g., "sum", "count"), lowercase.
"""
metric_type = raw_metric.get("metricType", "") or ""
if metric_type:
return metric_type.lower()
for tag in raw_metric.get("tags", []):
tag_fqn = tag.get("tagFQN", "")
if tag_fqn.startswith("MetricType."):
return tag_fqn.split(".", 1)[1].lower()
return ""
def extract_unit(raw_metric: Dict[str, Any]) -> str:
"""
Extract unit of measurement from OpenMetadata metric data.
Checks "unitOfMeasurement" field first, then Unit.* tags.
Args:
raw_metric: Raw metric dict from OpenMetadata API
Returns:
Unit string (e.g., "USD", "count").
"""
unit = raw_metric.get("unitOfMeasurement", "") or ""
if unit:
return unit
for tag in raw_metric.get("tags", []):
tag_fqn = tag.get("tagFQN", "")
if tag_fqn.startswith("Unit."):
return tag_fqn.split(".", 1)[1]
return ""
def has_tag(tags: List[Dict[str, Any]], tag_fqn: str) -> bool:
"""
Check if a specific tag (by FQN) is present in the tag list.
Args:
tags: List of tag dicts from OpenMetadata
tag_fqn: Fully qualified tag name to check (e.g., "AIAgent.MyAgent")
Returns:
True if the tag is found
"""
return any(t.get("tagFQN", "") == tag_fqn for t in tags)
def extract_tag_names(tags: List[Dict[str, Any]]) -> List[str]:
"""
Extract simple tag names from OpenMetadata tag list.
Uses "name" field if present, otherwise extracts last segment of "tagFQN".
Args:
tags: List of tag dicts from OpenMetadata
Returns:
List of tag name strings
"""
result = []
for tag in tags:
name = tag.get("name") or tag.get("tagFQN", "").split(".")[-1]
if name:
result.append(name)
return result
def strip_html(text: str) -> str:
"""
Strip HTML tags and decode entities from OpenMetadata descriptions.
OpenMetadata stores descriptions as rich HTML. This converts to clean
plain text suitable for YAML files and agent consumption.
Handles:
- HTML tags (<p>, <strong>, <em>, <ul>, <li>, etc.)
- HTML entities (&nbsp;, &amp;, etc.)
- List items (converted to "- " prefix)
- Excessive whitespace from tag removal
Args:
text: Raw HTML string from OpenMetadata
Returns:
Clean plain text string
"""
if not text:
return ""
# Convert <li> to list items before stripping tags
result = re.sub(r"<li[^>]*>", "\n- ", text)
# Convert block-level tags to newlines
result = re.sub(r"<br\s*/?>", "\n", result)
result = re.sub(r"</(?:p|div|h[1-6]|tr|ul|ol)>", "\n", result)
# Remove all remaining HTML tags
result = re.sub(r"<[^>]+>", "", result)
# Decode HTML entities (&nbsp; -> space, &amp; -> &, etc.)
result = html.unescape(result)
# Clean up whitespace: collapse multiple spaces, strip lines
lines = []
for line in result.split("\n"):
cleaned = " ".join(line.split())
if cleaned:
lines.append(cleaned)
return "\n".join(lines)
def sanitize_filename(name: str) -> str:
"""
Convert metric/entity name to safe filesystem name.
Replaces non-alphanumeric characters with underscores, collapses
consecutive underscores, strips leading/trailing underscores, lowercases.
Args:
name: Raw entity name (e.g., "M1 Operational Margin")
Returns:
Safe filename (e.g., "m1_operational_margin")
"""
safe = re.sub(r"[^a-zA-Z0-9]+", "_", name)
safe = re.sub(r"_+", "_", safe)
return safe.strip("_").lower()
def metric_to_yaml_dict(raw_metric: Dict[str, Any]) -> Dict[str, Any]:
"""
Transform raw OpenMetadata metric into YAML-compatible dict.
Output format is compatible with MetricParser._structure_metric_data()
and can be written directly as YAML for Claude Code agent consumption.
Args:
raw_metric: Raw metric dict from OpenMetadata API
Returns:
Dict with keys: name, display_name, category, type, unit, grain,
time_column, table, expression, description, dimensions, notes, synonyms
"""
tags = raw_metric.get("tags", [])
name = raw_metric.get("name", "")
display_name = raw_metric.get("displayName", name)
fqn = raw_metric.get("fullyQualifiedName", "")
owner_names = extract_owners(raw_metric)
notes = []
if fqn:
notes.append(f"Source: OpenMetadata catalog (FQN: {fqn})")
if owner_names:
notes.append(f"Owners: {', '.join(owner_names)}")
return {
"name": sanitize_filename(name),
"display_name": display_name,
"category": extract_category(tags),
"type": extract_metric_type(raw_metric),
"unit": extract_unit(raw_metric),
"grain": extract_grain(raw_metric),
"time_column": "",
"table": "",
"expression": extract_expression(raw_metric),
"description": strip_html(raw_metric.get("description", "") or ""),
"dimensions": extract_dimensions(tags),
"notes": notes,
"synonyms": [],
}
def metric_to_display_dict(raw_metric: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse raw OpenMetadata metric for metric list display in webapp.
Returns a lightweight dict for listing metrics (not full detail).
Description is stripped of HTML and truncated for list view.
Args:
raw_metric: Raw metric dict from OpenMetadata API
Returns:
Dict with keys: name, display_name, description, grain, category, path
"""
fqn = raw_metric.get("fullyQualifiedName", "")
name = raw_metric.get("name", "")
display_name = raw_metric.get("displayName", name)
description = raw_metric.get("description", "") or ""
tags = raw_metric.get("tags", [])
# Strip HTML and truncate for list excerpt
clean_desc = strip_html(description)
if len(clean_desc) > 150:
clean_desc = clean_desc[:147] + "..."
return {
"name": name,
"display_name": display_name,
"description": clean_desc,
"grain": extract_grain(raw_metric),
"category": extract_category(tags),
"path": f"catalog:{fqn}",
}
def metric_to_detail_dict(raw_metric: Dict[str, Any], category_colors: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
"""
Convert raw OpenMetadata metric into MetricParser-compatible detail dict for modal display.
Args:
raw_metric: Raw metric dict from OpenMetadata API
category_colors: Optional mapping of category -> CSS color hex
Returns:
Dict matching MetricParser._structure_metric_data() output format
"""
if category_colors is None:
category_colors = {}
tags = raw_metric.get("tags", [])
name = raw_metric.get("name", "")
display_name = raw_metric.get("displayName", name)
description = raw_metric.get("description", "") or ""
category = extract_category(tags)
expression = extract_expression(raw_metric)
return {
"name": name,
"display_name": display_name,
"category": category,
"category_color": category_colors.get(category, "#6B7280"),
"metadata": {
"type": extract_metric_type(raw_metric),
"unit": extract_unit(raw_metric),
"grain": extract_grain(raw_metric),
"time_column": "",
},
"overview": {
"description": strip_html(description),
"description_html": description,
"key_insights": [],
},
"validation": None,
"dimensions": extract_dimensions(tags),
"notes": {
"all": [],
"key_insights": [],
},
"sql_examples": {
"expression": {
"title": "Metric Expression",
"query": expression,
"complexity": "simple",
}
} if expression else {},
"technical": {
"table": "",
"expression": expression,
"synonyms": [],
"data_sources": [],
},
"special_sections": {},
}
def table_to_yaml_dict(raw_table: Dict[str, Any]) -> Dict[str, Any]:
"""
Transform raw OpenMetadata table response into YAML-compatible dict.
Extracts table description, column metadata, owners, tags, and tier.
Reuses parsing logic from CatalogEnricher._parse_table_response().
Args:
raw_table: Raw table dict from OpenMetadata /api/v1/tables/name/{fqn}
Returns:
Dict with keys: name, fqn, description, owners, tags, tier, columns
"""
fqn = raw_table.get("fullyQualifiedName", "")
name = raw_table.get("name", "")
description = strip_html(raw_table.get("description", "") or "")
tags = raw_table.get("tags", [])
# Parse columns
columns = []
for col in raw_table.get("columns", []):
col_entry = {
"name": col.get("name", ""),
"type": col.get("dataType", ""),
"description": strip_html(col.get("description", "") or ""),
}
columns.append(col_entry)
# Parse tier from tags (Tier.Tier1 etc.) or extension
tier = None
extension = raw_table.get("extension", {})
if extension:
tier = extension.get("tier") or extension.get("Tier")
if not tier:
for tag in tags:
tag_fqn = tag.get("tagFQN", "")
if tag_fqn.startswith("Tier."):
tier = tag_fqn.split(".", 1)[1]
break
return {
"name": name,
"fqn": fqn,
"description": description.strip(),
"owners": extract_owners(raw_table),
"tags": extract_tag_names(tags),
"tier": tier or "",
"columns": columns,
}