The BigQuery adapter writes table entries at the top level of sync_state.json instead of nesting them under 'tables'. Detect this flat format by checking whether the top-level values contain a 'rows' key.
1655 lines
62 KiB
Python
1655 lines
62 KiB
Python
"""
|
|
Flask application for Google SSO user management.
|
|
|
|
Allows users to:
|
|
1. Log in with Google (allowed domain only)
|
|
2. View their account status if they exist
|
|
3. Create a new analyst account with their SSH key
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import yaml
|
|
|
|
from flask import Flask, flash, jsonify, redirect, render_template, request, session, url_for
|
|
|
|
from .auth import admin_required, auth_bp, login_required
|
|
from .config import Config
|
|
from .desktop_auth import require_desktop_auth
|
|
from .notification_images import images_bp
|
|
from .account_service import get_account_details
|
|
from .sync_settings_service import get_sync_settings, update_sync_settings, get_table_subscriptions, update_table_subscriptions
|
|
|
|
# Jira connector is optional - only loaded if configured
|
|
try:
|
|
from connectors.jira.webhook import jira_bp
|
|
JIRA_AVAILABLE = True
|
|
except ImportError:
|
|
JIRA_AVAILABLE = False
|
|
jira_bp = None
|
|
from .telegram_service import get_telegram_status, link_telegram, unlink_telegram
|
|
from .corporate_memory_service import (
|
|
get_knowledge,
|
|
get_stats as get_memory_stats,
|
|
get_user_stats as get_memory_user_stats,
|
|
get_user_votes,
|
|
vote as memory_vote,
|
|
)
|
|
from .user_service import (
|
|
UserInfo,
|
|
check_user_exists,
|
|
create_user,
|
|
get_webapp_username,
|
|
is_username_available,
|
|
validate_ssh_key,
|
|
)
|
|
|
|
# Optional OpenMetadata catalog enrichment
|
|
try:
|
|
from connectors.openmetadata.enricher import CatalogEnricher
|
|
_CATALOG_ENRICHER_AVAILABLE = True
|
|
except ImportError:
|
|
_CATALOG_ENRICHER_AVAILABLE = False
|
|
CatalogEnricher = None
|
|
|
|
# Metric parser for modal detail rendering
|
|
try:
|
|
from webapp.utils.metric_parser import MetricParser
|
|
except ImportError:
|
|
MetricParser = None
|
|
|
|
# Process-wide logging: INFO and above, each record prefixed with
# timestamp, logger name and severity.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Shared catalog enricher instance; remains None until create_app() assigns it.
_catalog_enricher = None
|
|
|
|
|
|
def get_git_commit_hash() -> str:
    """Return the short git hash of HEAD, used to cache-bust static assets.

    Runs ``git rev-parse --short HEAD`` in the directory two levels above
    this file, with a 2 second timeout.

    Returns:
        The stripped command output when git exits with status 0, otherwise
        the literal string ``"dev"`` (also used when git cannot be run at all).
    """
    fallback = "dev"
    try:
        import subprocess

        repo_root = Path(__file__).parent.parent
        completed = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            cwd=repo_root,
            capture_output=True,
            text=True,
            timeout=2,
        )
    except Exception:
        # Best effort only: a missing git binary, a timeout, or any other
        # failure must never break page rendering.
        return fallback

    if completed.returncode != 0:
        return fallback
    return completed.stdout.strip()
|
|
|
|
|
|
def create_app() -> Flask:
    """Build and return the configured Flask application.

    Steps, in order: load ``Config``, log configuration problems, initialise
    the optional OpenMetadata enricher (stored in the module-level
    ``_catalog_enricher``), register the auth / provider / image / Jira
    blueprints, register the main routes, and install the template context
    processors (``now``, ``config``, ``static_url``).
    """
    global _catalog_enricher

    flask_app = Flask(__name__)
    flask_app.config.from_object(Config)

    # Configuration problems are only logged (never raised), and only when
    # the app is not in debug mode.
    config_errors = Config.validate()
    if config_errors and not flask_app.debug:
        for problem in config_errors:
            logger.warning(f"Configuration warning: {problem}")

    # Optional OpenMetadata enrichment; any failure disables it.
    if _CATALOG_ENRICHER_AVAILABLE:
        try:
            from config.loader import load_instance_config

            _catalog_enricher = CatalogEnricher(load_instance_config())
            if _catalog_enricher.enabled:
                logger.info("OpenMetadata catalog enricher initialized")
        except Exception as exc:
            logger.warning(f"Failed to initialize catalog enricher: {exc}")
            _catalog_enricher = None

    # Core auth blueprint first, then every auto-discovered auth provider.
    flask_app.register_blueprint(auth_bp)

    from auth import discover_providers

    for provider in discover_providers():
        provider.init_app(flask_app)
        flask_app.register_blueprint(provider.get_blueprint())

    # Remaining blueprints; the Jira one only when its connector imported.
    flask_app.register_blueprint(images_bp)
    if JIRA_AVAILABLE and jira_bp:
        flask_app.register_blueprint(jira_bp)

    register_routes(flask_app)

    def _now_context():
        # Exposes the callable itself, so templates call ``now()``.
        return {"now": datetime.now}

    def _config_context():
        return {"config": Config}

    def _static_url_context():
        def static_url(filename: str) -> str:
            """Generate static URL with cache-busting query parameter."""
            asset = Path(flask_app.static_folder) / filename
            if not asset.exists():
                return url_for("static", filename=filename)
            # The file's mtime becomes the ``v`` query parameter.
            return url_for("static", filename=filename, v=int(asset.stat().st_mtime))

        return {"static_url": static_url}

    # Install template helpers in the same order as before.
    for processor in (_now_context, _config_context, _static_url_context):
        flask_app.context_processor(processor)

    return flask_app
|
|
|
|
|
|
# Socket path handed to the HTTP transport when messaging the notification bot.
NOTIFY_SOCKET_PATH = "/data/notifications/bot.sock"

# Production location of the sync state file (written by the data sync process).
SYNC_STATE_PATH = Path("/data") / "src_data" / "metadata" / "sync_state.json"
# Local development fallback: dev_data/metadata/ two levels above this file.
_DEV_METADATA_PATH = Path(__file__).parent.parent.joinpath("dev_data", "metadata")
|
|
|
|
|
|
def _build_activity_data() -> dict:
    """Return the (currently empty-state) payload for the Activity Center page.

    Every counter is 0, every text field is "-", and every collection is an
    empty list. A fresh structure is built on each call, so callers may
    mutate the result freely.
    """
    executive_summary = {
        "active_today": 0,
        "active_this_week": 0,
        "teams_active": 0,
        "business_processes_identified": 0,
        "decisions_supported_this_week": 0,
        "avg_success_rate": 0,
        "adoption_trend": "-",
    }
    maturity_summary = {
        "overall_score": 0,
        "optimized_count": 0,
        "mature_count": 0,
        "developing_count": 0,
        "total_potential_value": "-",
    }

    activity = {
        "executive_summary": executive_summary,
        "maturity_roadmap": {"summary": maturity_summary, "categories": []},
    }
    # Collections that have no data source yet.
    for section in ("business_processes", "teams", "activity_feed", "data_opportunities"):
        activity[section] = []
    return activity
|
|
|
|
|
|
def _resolve_metadata_path(filename: str) -> Path:
    """Locate a metadata file, preferring production over the dev fallback.

    Args:
        filename: Bare file name, e.g. ``"sync_state.json"``.

    Returns:
        The path next to ``SYNC_STATE_PATH`` when that file exists; otherwise
        the path under ``_DEV_METADATA_PATH``. The dev path is returned
        without checking that it exists.
    """
    production = SYNC_STATE_PATH.parent / filename
    return production if production.exists() else _DEV_METADATA_PATH / filename
|
|
|
|
# Zero/empty statistics returned when no usable metadata file can be read.
FALLBACK_DATA_STATS = dict(
    tables=0,
    columns=0,
    rows=0,
    rows_display="-",
    size_mb=0,
    size_display="0 MB",
    uncompressed_mb=0,
    unstructured_gb=0,
    unstructured_display="",
    last_updated=None,
    highlights={},
)
|
|
|
|
|
|
def _format_rows_display(total_rows) -> str:
    """Abbreviate a row count as "<n>M+" / "<n>K+", or the plain number below 1,000."""
    # Floor, don't round: the "+" suffix means "at least", so 1,600,000 rows
    # must read "1M+" and never "2M+".
    if total_rows >= 1_000_000:
        return f"{int(total_rows // 1_000_000)}M+"
    if total_rows >= 1_000:
        return f"{int(total_rows // 1_000)}K+"
    return str(total_rows)


def _format_size_display(size_mb) -> str:
    """Render a size in MB as "<x.y> GB" from 1000 MB upwards, else "<n> MB"."""
    return f"{size_mb / 1000:.1f} GB" if size_mb >= 1000 else f"{size_mb} MB"


def _sum_table_field(tables: dict, field: str):
    """Sum one numeric field over all table entries; missing or null counts as 0."""
    return sum((entry.get(field) or 0) for entry in tables.values())


def _format_last_updated(raw_value):
    """Format a sync timestamp as "YYYY-MM-DD HH:MM UTC", or None if absent."""
    if not raw_value:
        return None
    try:
        parsed = datetime.fromisoformat(raw_value)
    except (ValueError, TypeError):
        # Unparseable value: show its first 16 characters as-is.
        return str(raw_value)[:16]
    if parsed.tzinfo is not None:
        from datetime import timezone

        # Offset-aware timestamps are converted, so the "UTC" label is true.
        parsed = parsed.astimezone(timezone.utc)
    # NOTE(review): naive timestamps are assumed to already be UTC - confirm
    # against the sync process that writes the file.
    return parsed.strftime("%Y-%m-%d %H:%M") + " UTC"


def _load_data_stats() -> dict:
    """Load aggregate data statistics for the dashboard and catalog pages.

    Sources, in order:

    1. ``sync_state.json``. Both layouts are accepted: entries nested under
       ``"tables"``, or the flat layout where table ids sit at the top level
       (recognised by dict values carrying a ``"rows"`` key). If the file is
       readable but holds no tables, the fallback stats are returned at once,
       without consulting ``profiles.json``.
    2. ``profiles.json``, used when the sync state file is missing or fails
       to load.
    3. A copy of ``FALLBACK_DATA_STATS``.

    Returns:
        Dict with the same keys as ``FALLBACK_DATA_STATS``.
    """
    try:
        sync_path = _resolve_metadata_path("sync_state.json")
        if sync_path.exists():
            with open(sync_path, encoding="utf-8") as f:
                state = json.load(f)

            tables_data = state.get("tables", {})
            if not tables_data:
                # Flat format: table_id at top level, no "tables" wrapper.
                tables_data = {
                    key: value
                    for key, value in state.items()
                    if isinstance(value, dict) and "rows" in value
                }
            if not tables_data:
                return dict(FALLBACK_DATA_STATS)

            total_columns = _sum_table_field(tables_data, "columns")
            total_rows = _sum_table_field(tables_data, "rows")
            size_mb = round(_sum_table_field(tables_data, "file_size_mb"))
            uncompressed_mb = round(_sum_table_field(tables_data, "uncompressed_mb"))

            return {
                "tables": len(tables_data),
                "columns": total_columns if total_columns > 0 else FALLBACK_DATA_STATS["columns"],
                "rows": total_rows,
                "rows_display": _format_rows_display(total_rows),
                "size_mb": size_mb,
                "size_display": _format_size_display(size_mb),
                "uncompressed_mb": uncompressed_mb,
                "unstructured_gb": FALLBACK_DATA_STATS["unstructured_gb"],
                "unstructured_display": FALLBACK_DATA_STATS["unstructured_display"],
                "last_updated": _format_last_updated(state.get("last_updated")),
                "highlights": FALLBACK_DATA_STATS["highlights"],
            }
    except Exception as e:
        logger.warning(f"Could not load data stats from sync_state.json: {e}")

    # Fallback: derive stats from profiles.json (sample data / no-sync setups).
    try:
        profiles_path = _resolve_metadata_path("profiles.json")
        if profiles_path.exists():
            with open(profiles_path, encoding="utf-8") as f:
                profiles = json.load(f)

            tables_data = profiles.get("tables", {})
            if tables_data:
                total_rows = _sum_table_field(tables_data, "row_count")
                size_mb = round(_sum_table_field(tables_data, "file_size_mb"))
                return {
                    "tables": len(tables_data),
                    "columns": _sum_table_field(tables_data, "column_count"),
                    "rows": total_rows,
                    "rows_display": _format_rows_display(total_rows),
                    "size_mb": size_mb,
                    "size_display": _format_size_display(size_mb),
                    "uncompressed_mb": 0,
                    "unstructured_gb": 0,
                    "unstructured_display": "",
                    "last_updated": None,
                    "highlights": {},
                }
    except Exception as e:
        logger.warning(f"Could not load data stats from profiles.json: {e}")

    return dict(FALLBACK_DATA_STATS)
|
|
|
|
|
|
def _sync_tables_from_state(state: dict) -> dict:
    """Return the per-table entries of a parsed sync_state.json.

    Accepts both the nested layout (entries under ``"tables"``) and the flat
    layout where table ids are top-level keys whose dict values carry a
    ``"rows"`` key.
    """
    nested = state.get("tables", {})
    if nested:
        return nested
    return {
        key: value
        for key, value in state.items()
        if isinstance(value, dict) and "rows" in value
    }


def _format_table_rows(rows) -> str:
    """Format one table's row count: "1.2M", "12,345", "42", or "-" when not positive."""
    if rows >= 1_000_000:
        return f"{rows / 1_000_000:.1f}M"
    if rows >= 1_000:
        return f"{rows:,}"
    return str(rows) if rows > 0 else "-"


def _enrich_table_info(table: dict, table_id: str, table_info: dict) -> None:
    """Add OpenMetadata fields to ``table_info`` in place; failures are logged and ignored."""
    if not _catalog_enricher:
        return
    try:
        from src.config import TableConfig

        table_config = TableConfig(
            id=table_id,
            name=table.get("name", ""),
            description=table.get("description", ""),
            primary_key=table.get("primary_key", "id"),
            sync_strategy=table.get("sync_strategy", "full_refresh"),
            incremental_window_days=table.get("incremental_window_days"),
            partition_by=table.get("partition_by"),
            partition_granularity=table.get("partition_granularity"),
            max_history_days=table.get("max_history_days"),
            partition_column_type=table.get("partition_column_type", "TIMESTAMP"),
            catalog_fqn=table.get("catalog_fqn"),
        )
        catalog_data = _catalog_enricher.enrich_table(table_config)
        if not catalog_data:
            return

        table_info["catalog_tags"] = catalog_data.tags
        table_info["catalog_tier"] = catalog_data.tier
        table_info["catalog_owners"] = catalog_data.owners
        table_info["catalog_url"] = catalog_data.catalog_url
        # A catalog description, when present, wins over the YAML one.
        if catalog_data.description:
            table_info["description"] = catalog_data.description
    except Exception as e:
        logger.warning(f"Error enriching {table.get('name')}: {e}")


def _load_catalog_data() -> list:
    """Load catalog data by merging data_description.md (YAML) with sync_state.json.

    Table definitions come from the first fenced yaml block of
    ``docs/data_description.md``; row counts come from ``sync_state.json``
    (nested or flat layout). Tables are grouped by the folder that
    ``folder_mapping`` assigns to their bucket (``"other"`` when unmapped).
    Category labels, icons and ordering come from the instance config. When
    no order can be derived from the config, the folders actually found are
    used, so tables are not silently dropped.

    Returns:
        List of category dicts ``{name, icon_type, tables, count}``; each
        table is ``{name, description, rows, rows_display, rows_large}``
        plus ``catalog_tags``/``catalog_tier``/``catalog_owners``/
        ``catalog_url`` when enrichment succeeded. Empty list on any error.
    """
    import re

    import yaml

    catalog = []

    try:
        desc_path = Path(os.path.dirname(__file__)) / ".." / "docs" / "data_description.md"
        if not desc_path.exists():
            return catalog

        with open(desc_path, encoding="utf-8") as f:
            content = f.read()

        # Extract YAML block between ```yaml and ```
        yaml_match = re.search(r'```yaml\s*\n(.*?)```', content, re.DOTALL)
        if not yaml_match:
            return catalog

        yaml_data = yaml.safe_load(yaml_match.group(1))
        if not yaml_data or "tables" not in yaml_data:
            return catalog

        # Row counts are optional: the catalog still renders without them.
        sync_data = {}
        try:
            sync_path = _resolve_metadata_path("sync_state.json")
            if sync_path.exists():
                with open(sync_path, encoding="utf-8") as f:
                    sync_data = _sync_tables_from_state(json.load(f))
        except Exception as e:
            logger.warning(f"Could not load row counts from sync_state.json: {e}")

        bucket_to_folder = dict(yaml_data.get("folder_mapping") or {})

        # Category labels/icons from instance config, with empty fallback.
        config_loaded = False
        try:
            from config.loader import load_instance_config, get_instance_value

            catalog_config = load_instance_config()
            config_loaded = True
            configured = get_instance_value(catalog_config, "catalog", "categories", default={})
            folder_to_category = {k: v.get("label", k) for k, v in configured.items()}
            folder_to_icon = {k: v.get("icon", k) for k, v in configured.items()}
        except Exception:
            folder_to_category = {}
            folder_to_icon = {}

        # Group tables by category (folder)
        categories = {}
        for table in yaml_data["tables"] or []:
            table_id = table.get("id", "")
            # Bucket is everything before the last dot,
            # e.g. "in.c-crm.company" -> "in.c-crm".
            bucket_id = table_id.rpartition(".")[0]
            folder = bucket_to_folder.get(bucket_id, "other")

            sync_info = sync_data.get(table_id) or {}
            rows = sync_info.get("rows", 0) or 0  # null row counts count as 0

            table_info = {
                "name": table.get("name", ""),
                "description": table.get("description", ""),
                "rows": rows,
                "rows_display": _format_table_rows(rows),
                "rows_large": rows >= 1_000_000,  # drives the "large" badge
            }
            _enrich_table_info(table, table_id, table_info)
            categories.setdefault(folder, []).append(table_info)

        # Ordering: explicit config order, else configured categories,
        # else the folders discovered above.
        category_order = list(folder_to_category)
        if config_loaded:
            try:
                category_order = get_instance_value(
                    catalog_config, "catalog", "order", default=category_order
                )
            except Exception:
                category_order = list(folder_to_category)
        if not category_order:
            category_order = list(categories)

        for folder in category_order:
            if folder in categories:
                catalog.append({
                    "name": folder_to_category.get(folder, folder),
                    "icon_type": folder_to_icon.get(folder, folder),
                    "tables": categories[folder],
                    "count": len(categories[folder]),
                })

    except Exception as e:
        logger.warning(f"Could not load catalog data: {e}")

    return catalog
|
|
|
|
|
|
# Display metadata for the Business Metrics card: label, CSS class and
# 1-based sort position per known metric category.
METRIC_CATEGORY_META = {
    key: {'label': label, 'css': css_class, 'order': position}
    for position, (key, label, css_class) in enumerate(
        (
            ('revenue', 'Revenue', 'sales'),
            ('customers', 'Customers', 'hr'),
            ('marketing', 'Marketing', 'telemetry'),
            ('support', 'Support', 'support'),
        ),
        start=1,
    )
}
|
|
|
|
|
|
def _load_metrics_data():
    """Load business metric definitions for catalog display.

    Metrics from the OpenMetadata catalog win when any are returned.
    Otherwise every ``<category>/<metric>.yml`` file under the metrics
    directory is read (``/data/docs/metrics`` first, then ``docs/metrics``
    two levels above this file). A file may hold one mapping or a list whose
    first item is used; unparseable files are logged and skipped.

    Returns:
        List of ``{'key', 'label', 'css', 'metrics'}`` dicts: categories
        known to ``METRIC_CATEGORY_META`` first, in their configured order,
        then any other categories alphabetically. Empty list when no
        metrics directory exists.
    """
    from_catalog = _load_metrics_from_catalog()
    if from_catalog:
        return from_catalog

    # Production location first, local development checkout second.
    candidate_dirs = (
        Path("/data/docs/metrics"),
        Path(__file__).parent.parent / "docs" / "metrics",
    )
    metrics_dir = next((folder for folder in candidate_dirs if folder.exists()), None)
    if metrics_dir is None:
        return []

    by_category = {}
    for yml_file in sorted(metrics_dir.glob("*/*.yml")):
        try:
            with open(yml_file, 'r', encoding='utf-8') as handle:
                parsed = yaml.safe_load(handle)

            if isinstance(parsed, dict):
                metric = parsed
            elif isinstance(parsed, list) and parsed:
                metric = parsed[0]
            else:
                continue

            # The parent directory name is the category key.
            cat_key = yml_file.parent.name
            bucket = by_category.setdefault(cat_key, [])
            bucket.append({
                'name': metric.get('name', yml_file.stem),
                'display_name': metric.get('display_name', yml_file.stem),
                'description': metric.get('description', ''),
                'grain': metric.get('grain', ''),
                'path': f"{cat_key}/{yml_file.name}",
            })
        except Exception as exc:
            logger.warning(f"Could not parse metric {yml_file}: {exc}")

    # Known categories, in their configured order.
    known_in_order = sorted(METRIC_CATEGORY_META.items(), key=lambda item: item[1]['order'])
    result = [
        {
            'key': cat_key,
            'label': meta['label'],
            'css': meta['css'],
            'metrics': by_category[cat_key],
        }
        for cat_key, meta in known_in_order
        if cat_key in by_category
    ]

    # Unknown categories follow, with a label derived from the directory name.
    result.extend(
        {
            'key': cat_key,
            'label': cat_key.replace('_', ' ').title(),
            'css': cat_key,
            'metrics': metrics,
        }
        for cat_key, metrics in sorted(by_category.items())
        if cat_key not in METRIC_CATEGORY_META
    )

    return result
|
|
|
|
|
|
def _parse_om_metric(raw_metric: dict) -> dict:
    """Parse a raw OpenMetadata metric dict into the metric-list display format.

    The category is taken from a tag whose ``tagFQN`` starts with
    ``"MetricCategory."`` or ``"Category."`` (the last matching tag wins),
    defaulting to ``"general"``. The grain is read from the ``granularity``
    field, not from tags. Null ``tags``, ``displayName``, ``description``,
    ``granularity`` and ``tagFQN`` values are handled like missing ones.

    Args:
        raw_metric: Raw metric dict from OpenMetadata.

    Returns:
        Dict with keys ``name``, ``display_name``, ``description``,
        ``grain`` (lower-cased), ``category`` and ``path``, where ``path``
        is ``"catalog:{fullyQualifiedName}"`` for JS routing.
    """
    fqn = raw_metric.get("fullyQualifiedName", "")
    name = raw_metric.get("name", "")

    category = "general"
    for tag in raw_metric.get("tags") or []:
        tag_fqn = (tag.get("tagFQN") or "") if isinstance(tag, dict) else ""
        if tag_fqn.startswith(("MetricCategory.", "Category.")):
            category = tag_fqn.split(".", 1)[1]

    grain = raw_metric.get("granularity", "") or ""

    return {
        "name": name,
        # Fall back to the technical name when no display name is set.
        "display_name": raw_metric.get("displayName") or name,
        "description": raw_metric.get("description", "") or "",
        "grain": grain.lower(),
        "category": category,
        "path": f"catalog:{fqn}",
    }
|
|
|
|
|
|
def _load_metrics_from_catalog() -> list:
    """Fetch business metrics from the OpenMetadata catalog, grouped by category.

    Each raw metric goes through ``_parse_om_metric``; metrics that fail to
    parse are logged and skipped.

    Returns:
        List of ``{"key", "label", "css", "metrics"}`` dicts: categories
        known to ``METRIC_CATEGORY_META`` first, in their configured order,
        then any other categories alphabetically. Empty list when the
        enricher is missing or disabled, when the catalog returns no
        metrics, or when loading fails.
    """
    enricher = _catalog_enricher
    if not (enricher and enricher.enabled):
        return []

    try:
        raw_metrics = enricher.get_metrics()
        if not raw_metrics:
            logger.debug("No metrics found in catalog")
            return []

        # Group parsed metrics by their category key.
        grouped = {}
        for raw in raw_metrics:
            try:
                parsed = _parse_om_metric(raw)
                grouped.setdefault(parsed["category"], []).append(parsed)
            except Exception as exc:
                logger.warning(f"Failed to parse metric {raw.get('name', '?')}: {exc}")

        # Known categories first, in their configured order.
        known_in_order = sorted(METRIC_CATEGORY_META.items(), key=lambda item: item[1]["order"])
        result = [
            {
                "key": cat_key,
                "label": meta["label"],
                "css": meta["css"],
                "metrics": grouped[cat_key],
            }
            for cat_key, meta in known_in_order
            if cat_key in grouped
        ]

        # Everything else follows, labelled from the category key.
        result.extend(
            {
                "key": cat_key,
                "label": cat_key.replace("_", " ").title(),
                "css": cat_key,
                "metrics": metrics,
            }
            for cat_key, metrics in sorted(grouped.items())
            if cat_key not in METRIC_CATEGORY_META
        )

        total = sum(len(entry["metrics"]) for entry in result)
        logger.info(f"Loaded {total} metrics from catalog")
        return result

    except Exception as exc:
        logger.warning(f"Failed to load metrics from catalog: {exc}")
        return []
|
|
|
|
|
|
def _build_om_metric_detail(raw_metric: dict) -> dict:
    """Convert a raw OpenMetadata metric into the JSON structure used by the metric modal.

    Field mapping: ``metricType`` -> type, ``unitOfMeasurement`` -> unit,
    ``granularity`` -> grain, ``metricExpression`` (dict with an
    ``expression`` key, or a plain string) -> expression. The category comes
    from ``MetricCategory.*`` or ``Category.*`` tags (same rule as the
    metric list), dimensions from ``Dimension.*`` tags. Null ``tags``,
    ``owners`` and ``displayName`` values are treated as missing.

    The category colour falls back to ``"#6B7280"`` when the category is
    unknown or when the optional ``MetricParser`` import is unavailable.

    Args:
        raw_metric: Raw metric dict from OpenMetadata.

    Returns:
        Dict intended to match the structure produced by MetricParser
        (name, display_name, category, metadata, overview, ...).
        NOTE(review): the MetricParser format is not visible here - confirm.
    """
    name = raw_metric.get("name", "")
    display_name = raw_metric.get("displayName") or name
    description = raw_metric.get("description", "") or ""

    # OpenMetadata uses metricExpression instead of expression.
    expression = ""
    metric_expr = raw_metric.get("metricExpression", {})
    if isinstance(metric_expr, dict):
        expression = metric_expr.get("expression", "") or ""
    elif isinstance(metric_expr, str):
        expression = metric_expr

    metric_type = raw_metric.get("metricType", "") or ""
    unit = raw_metric.get("unitOfMeasurement", "") or ""
    grain = raw_metric.get("granularity", "") or ""

    # Category and dimensions are carried as tags.
    category = "general"
    dimensions = []
    for tag in raw_metric.get("tags") or []:
        tag_fqn = (tag.get("tagFQN") or "") if isinstance(tag, dict) else ""
        if tag_fqn.startswith(("MetricCategory.", "Category.")):
            category = tag_fqn.split(".", 1)[1]
        elif tag_fqn.startswith("Dimension."):
            dimensions.append(tag_fqn.split(".", 1)[1])

    # NOTE(review): owner names are collected but not part of the returned
    # structure, exactly as before - confirm whether the modal should show them.
    owner_names = []
    for owner in raw_metric.get("owners") or []:
        if not isinstance(owner, dict):
            continue
        name_val = owner.get("name") or owner.get("displayName", "")
        if name_val:
            owner_names.append(name_val)

    # MetricParser is an optional import and may be None.
    category_colors = getattr(MetricParser, "CATEGORY_COLORS", None) or {}

    sql_examples = {}
    if expression:
        sql_examples = {
            "expression": {
                "title": "Metric Expression",
                "query": expression,
                "complexity": "simple",
            }
        }

    return {
        "name": name,
        "display_name": display_name,
        "category": category,
        "category_color": category_colors.get(category, "#6B7280"),
        "metadata": {
            "type": metric_type,
            "unit": unit,
            "grain": grain,
            "time_column": "",  # Not available in OpenMetadata
        },
        "overview": {
            "description": description.strip(),
            "key_insights": [],  # Not available in OpenMetadata
        },
        "validation": None,  # Not available in OpenMetadata
        "dimensions": dimensions,
        "notes": {
            "all": [],  # Not available in OpenMetadata
            "key_insights": [],
        },
        "sql_examples": sql_examples,
        "technical": {
            "table": "",  # Not available in OpenMetadata
            "expression": expression,
            "synonyms": [],
            "data_sources": [],
        },
        "special_sections": {},
    }
|
|
|
|
|
|
def _send_welcome_message(username: str) -> None:
    """Post a Markdown welcome message for ``username`` to the bot socket.

    The message is sent as a JSON POST to ``http://localhost/send`` over the
    socket at ``NOTIFY_SOCKET_PATH``, with a 10 second timeout. Any failure
    (including a missing ``httpx``) is logged as a warning and never raised.

    Args:
        username: Server login, used as recipient and shown in the text.
    """
    try:
        import httpx

        welcome_text = "".join((
            "Account linked!\n\n",
            f"Your server login: *{username}*\n",
            "Notifications dir: `~/user/notifications/`\n\n",
            "To create notification scripts, ask your local AI assistant ",
            "(Claude Code). It knows how to build them for you.\n\n",
            "You will receive alerts from your scripts here.",
        ))
        payload = {
            "user": username,
            "text": welcome_text,
            "parse_mode": "Markdown",
        }

        socket_transport = httpx.HTTPTransport(uds=NOTIFY_SOCKET_PATH)
        with httpx.Client(transport=socket_transport, timeout=10) as client:
            client.post("http://localhost/send", json=payload)
    except Exception as exc:
        logger.warning(f"Failed to send welcome message to {username}: {exc}")
|
|
|
|
|
|
def register_routes(app: Flask) -> None:
|
|
"""Register main application routes."""
|
|
|
|
@app.route("/")
|
|
def index():
|
|
"""Redirect to dashboard or login."""
|
|
if "user" in session:
|
|
return redirect(url_for("dashboard"))
|
|
return redirect(url_for("auth.login"))
|
|
|
|
@app.route("/dashboard")
|
|
@login_required
|
|
def dashboard():
|
|
"""Show user dashboard with account info or registration form."""
|
|
user = session.get("user", {})
|
|
email = user.get("email", "")
|
|
username = get_webapp_username(email)
|
|
|
|
# Check if user exists on the system
|
|
user_info = check_user_exists(username)
|
|
|
|
# Check if username is available (for new registrations)
|
|
username_available, username_error = is_username_available(username)
|
|
|
|
# Read bootstrap YAML for Claude Code setup instructions
|
|
bootstrap_yaml = ""
|
|
try:
|
|
bootstrap_path = os.path.join(os.path.dirname(__file__), "..", "docs", "setup", "bootstrap.yaml")
|
|
with open(bootstrap_path, "r") as f:
|
|
bootstrap_yaml_template = f.read()
|
|
|
|
# Inject username and server info into template
|
|
bootstrap_yaml = bootstrap_yaml_template.replace("{username}", username)
|
|
bootstrap_yaml = bootstrap_yaml.replace("{server_host}", Config.SERVER_HOST)
|
|
bootstrap_yaml = bootstrap_yaml.replace("{server_hostname}", Config.SERVER_HOSTNAME)
|
|
bootstrap_yaml = bootstrap_yaml.replace("{ssh_alias}", Config.SSH_ALIAS)
|
|
bootstrap_yaml = bootstrap_yaml.replace("{ssh_key}", Config.SSH_KEY)
|
|
bootstrap_yaml = bootstrap_yaml.replace("{project_dir}", Config.PROJECT_DIR)
|
|
webapp_url = f"https://{Config.SERVER_HOSTNAME}" if Config.SERVER_HOSTNAME else ""
|
|
bootstrap_yaml = bootstrap_yaml.replace("{webapp_url}", webapp_url)
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Could not read bootstrap.yaml: {e}")
|
|
|
|
# Get Telegram link status
|
|
telegram_status = get_telegram_status(username)
|
|
|
|
# Get desktop app link status
|
|
from .desktop_auth import get_desktop_status
|
|
desktop_status = get_desktop_status(username)
|
|
|
|
# Load data stats
|
|
data_stats = _load_data_stats()
|
|
catalog_data = _load_catalog_data()
|
|
|
|
# Load sync settings (for existing users)
|
|
sync_settings = get_sync_settings(username) if user_info.exists else None
|
|
|
|
# Add subscription status to catalog tables
|
|
if user_info.exists:
|
|
subs = get_table_subscriptions(username)
|
|
table_mode = subs.get("table_mode", "all")
|
|
table_subs = subs.get("tables", {})
|
|
for cat in catalog_data:
|
|
for table in cat.get("tables", []):
|
|
if table_mode == "all":
|
|
table["subscribed"] = True
|
|
else:
|
|
table["subscribed"] = table_subs.get(table["name"], False)
|
|
|
|
# Gather account widget details (notification scripts, cron, last sync)
|
|
account_details = get_account_details(username) if user_info.exists else None
|
|
|
|
# Activity Center summary for dashboard widget (empty fallback)
|
|
activity_summary = {}
|
|
|
|
# Load business metrics for dashboard widget
|
|
metrics_data = _load_metrics_data()
|
|
|
|
return render_template(
|
|
"dashboard.html",
|
|
user=user,
|
|
username=username,
|
|
user_info=user_info,
|
|
username_available=username_available,
|
|
username_error=username_error,
|
|
server_host=Config.SERVER_HOST,
|
|
server_hostname=Config.SERVER_HOSTNAME,
|
|
ssh_alias=Config.SSH_ALIAS,
|
|
ssh_key=Config.SSH_KEY,
|
|
project_dir=Config.PROJECT_DIR,
|
|
bootstrap_yaml=bootstrap_yaml,
|
|
telegram_status=telegram_status,
|
|
desktop_status=desktop_status,
|
|
data_stats=data_stats,
|
|
catalog_data=catalog_data,
|
|
sync_settings=sync_settings,
|
|
account_details=account_details,
|
|
activity_summary=activity_summary,
|
|
metrics_data=metrics_data,
|
|
)
|
|
|
|
@app.route("/catalog")
|
|
@login_required
|
|
def catalog():
|
|
"""Data catalog page."""
|
|
user = session.get("user", {})
|
|
email = user.get("email", "")
|
|
username = get_webapp_username(email)
|
|
|
|
data_stats = _load_data_stats()
|
|
catalog_data = _load_catalog_data()
|
|
sync_settings = get_sync_settings(username)
|
|
|
|
# Add subscription status to catalog tables
|
|
subs = get_table_subscriptions(username)
|
|
table_mode = subs.get("table_mode", "all")
|
|
table_subs = subs.get("tables", {})
|
|
for cat in catalog_data:
|
|
for table in cat.get("tables", []):
|
|
if table_mode == "all":
|
|
table["subscribed"] = True
|
|
else:
|
|
table["subscribed"] = table_subs.get(table["name"], False)
|
|
|
|
metrics_data = _load_metrics_data()
|
|
|
|
return render_template(
|
|
"catalog.html",
|
|
data_stats=data_stats,
|
|
catalog_data=catalog_data,
|
|
sync_settings=sync_settings,
|
|
metrics_data=metrics_data,
|
|
git_version=get_git_commit_hash(),
|
|
)
|
|
|
|
def _profile_table_def_from_docs(table_name):
    """Return the raw table definition for *table_name* from data_description.md.

    Searches the first fenced ``yaml`` block of ``docs/data_description.md``
    for a ``tables`` list and returns the first entry whose ``name`` equals
    *table_name*. Returns ``None`` when the file, the YAML block, the
    ``tables`` key, or a matching entry is missing.
    """
    import re

    import yaml

    desc_path = Path(os.path.dirname(__file__)) / ".." / "docs" / "data_description.md"
    if not desc_path.exists():
        return None

    # Explicit encoding: the platform default is locale-dependent.
    content = desc_path.read_text(encoding="utf-8")

    yaml_match = re.search(r'```yaml\s*\n(.*?)```', content, re.DOTALL)
    if not yaml_match:
        return None

    yaml_data = yaml.safe_load(yaml_match.group(1))
    if not yaml_data or "tables" not in yaml_data:
        return None

    for table_def in yaml_data["tables"]:
        if table_def.get("name") == table_name:
            return table_def
    return None


@app.route("/api/catalog/profile/<table_name>")
@login_required
def catalog_profile(table_name):
    """Return profiler data for a single table with OpenMetadata catalog enrichment.

    Reads ``profiles.json`` (located via ``_resolve_metadata_path``) and looks
    up *table_name* under its ``tables`` key.

    Returns:
        * 404 JSON error when the profile file or the table entry is missing.
        * 200 JSON profile otherwise. When the catalog enricher is enabled and
          returns data for the table, a ``catalog`` section is added and the
          profile ``description`` is overridden by a non-empty catalog one.
        * 500 JSON error when loading the profile itself fails.

    Enrichment is best effort: any failure there is logged as a warning and
    the plain profile is still returned.
    """
    profiles_path = _resolve_metadata_path("profiles.json")
    try:
        if not profiles_path.exists():
            return jsonify({"error": "Profiler data not available yet"}), 404

        with open(profiles_path, encoding="utf-8") as f:
            profiles = json.load(f)

        table_profile = profiles.get("tables", {}).get(table_name)
        if not table_profile:
            return jsonify({"error": f"No profile for table '{table_name}'"}), 404

        # Enrich with OpenMetadata catalog data if available
        if _catalog_enricher and _catalog_enricher.enabled:
            try:
                from src.config import TableConfig

                table_def = _profile_table_def_from_docs(table_name)
                if table_def is not None:
                    table_config = TableConfig(
                        id=table_def.get("id", ""),
                        name=table_def.get("name", ""),
                        description=table_def.get("description", ""),
                        primary_key=table_def.get("primary_key", "id"),
                        sync_strategy=table_def.get("sync_strategy", "full_refresh"),
                        incremental_window_days=table_def.get("incremental_window_days"),
                        partition_by=table_def.get("partition_by"),
                        partition_granularity=table_def.get("partition_granularity"),
                        max_history_days=table_def.get("max_history_days"),
                        partition_column_type=table_def.get("partition_column_type", "TIMESTAMP"),
                        catalog_fqn=table_def.get("catalog_fqn"),
                    )
                    catalog_data = _catalog_enricher.enrich_table(table_config)
                    if catalog_data:
                        # Add catalog enrichment to profile
                        table_profile["catalog"] = {
                            "description": catalog_data.description,
                            "tags": catalog_data.tags,
                            "tier": catalog_data.tier,
                            "owners": catalog_data.owners,
                            "url": catalog_data.catalog_url,
                        }
                        # Override description with catalog version
                        if catalog_data.description:
                            table_profile["description"] = catalog_data.description
            except Exception as e:
                # Deliberate best effort: never fail the profile over enrichment.
                logger.warning(f"Error enriching profile for {table_name}: {e}")

        return jsonify(table_profile)
    except Exception as e:
        # logger.exception keeps the traceback for diagnosis.
        logger.exception(f"Error loading profile for {table_name}: {e}")
        return jsonify({"error": "Failed to load profile data"}), 500
|
|
|
|
@app.route("/api/metrics/<path:metric_path>")
@login_required
def api_metric(metric_path):
    """API endpoint to serve metric definition as structured JSON.

    Args:
        metric_path: Relative path of the form ``category/file.yml`` using
            only lowercase letters and underscores.

    Returns:
        * 400 JSON error when the path does not match the allowed pattern or
          cannot be resolved.
        * 404 JSON error when the file does not exist inside the metrics dir.
        * 200 JSON produced by ``MetricParser.parse_metric`` on success.
        * 500 JSON error when parsing fails.
    """
    import re

    # Validate path to prevent directory traversal (allow category/file.yml
    # pattern). fullmatch is used because ``$`` with re.match also accepts a
    # trailing newline, which would let "a/b.yml\n" through the check.
    if not re.fullmatch(r"[a-z_]+/[a-z_]+\.yml", metric_path):
        return jsonify({"error": "Invalid metric path"}), 400

    # Try production path first, fall back to local dev path
    docs_dir = Path("/data/docs/metrics")
    if not docs_dir.exists():
        # Local development: use docs/metrics relative to project root
        docs_dir = Path(__file__).parent.parent / "docs" / "metrics"

    file_path = docs_dir / metric_path

    # Security check: ensure path stays within docs_dir
    try:
        if not file_path.is_file() or not file_path.resolve().is_relative_to(
            docs_dir.resolve()
        ):
            return jsonify({"error": "Metric file not found"}), 404
    except (ValueError, OSError):
        return jsonify({"error": "Invalid path"}), 400

    # Parse metric YAML and return structured JSON
    try:
        from webapp.utils.metric_parser import MetricParser

        parser = MetricParser(docs_dir)
        metric_data = parser.parse_metric(metric_path)
        return jsonify(metric_data)
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"Error parsing metric {metric_path}: {e}")
        return jsonify({"error": f"Failed to parse metric: {str(e)}"}), 500
|
|
|
|
@app.route("/api/catalog/metrics/<path:metric_fqn>")
@login_required
def api_catalog_metric(metric_fqn):
    """
    Serve a metric from the OpenMetadata catalog as structured JSON.

    Args:
        metric_fqn: Fully qualified metric name taken from the URL path
            (e.g., "Active2%20Customers" when still URL-encoded).

    Returns:
        JSON built by ``_build_om_metric_detail`` for modal rendering,
        a 503 JSON error when the catalog enricher is missing or disabled,
        or a 500 JSON error when the lookup or conversion raises.
    """
    global _catalog_enricher

    # Guard: nothing to query without an enabled enricher.
    if not (_catalog_enricher and _catalog_enricher.enabled):
        return jsonify({"error": "Catalog not available"}), 503

    try:
        from urllib.parse import unquote

        # Flask has already decoded the path parameter; decode again defensively.
        decoded_fqn = unquote(metric_fqn)

        # Look the metric up in the catalog, then reshape it for the modal.
        catalog_record = _catalog_enricher._client.get_metric_by_fqn(decoded_fqn)
        detail = _build_om_metric_detail(catalog_record)
        return jsonify(detail)
    except Exception as exc:
        logger.error(f"Error fetching catalog metric {metric_fqn}: {exc}")
        return jsonify({"error": f"Failed to fetch metric: {str(exc)}"}), 500
|
|
|
|
@app.route("/docs/metrics/<path:metric_path>")
@login_required
def serve_metric(metric_path):
    """Serve metric definition YAML files (legacy endpoint for backward compatibility).

    Args:
        metric_path: Relative path of the form ``category/file.yml`` using
            only lowercase letters and underscores.

    Returns:
        The file as ``text/plain`` on success, or the rendered ``error.html``
        page with status 400 (bad path) or 404 (file not found).
    """
    import re

    # Validate path to prevent directory traversal (allow category/file.yml
    # pattern). fullmatch is used because ``$`` with re.match also accepts a
    # trailing newline, which would let "a/b.yml\n" through the check.
    if not re.fullmatch(r"[a-z_]+/[a-z_]+\.yml", metric_path):
        return render_template("error.html", error="Invalid metric path", code=400), 400

    docs_dir = Path("/data/docs/metrics")
    file_path = docs_dir / metric_path

    # Security check: ensure path stays within docs_dir
    try:
        if not file_path.is_file() or not file_path.resolve().is_relative_to(
            docs_dir.resolve()
        ):
            return (
                render_template("error.html", error="Metric file not found", code=404),
                404,
            )
    except (ValueError, OSError):
        return render_template("error.html", error="Invalid path", code=400), 400

    from flask import send_file as flask_send_file

    return flask_send_file(file_path, mimetype="text/plain")
|
|
|
|
@app.route("/register", methods=["POST"])
@login_required
def register():
    """Create an analyst account for the logged-in user from a submitted SSH key.

    Every path ends in a redirect to the dashboard; the outcome is reported
    through a flashed message.
    """
    account_email = session.get("user", {}).get("email", "")
    username = get_webapp_username(account_email)

    # Nothing to do when the account is already there.
    if check_user_exists(username).exists:
        flash("Your account already exists.", "info")
        return redirect(url_for("dashboard"))

    # Pasted keys often carry line breaks or tabs from terminal wrapping:
    # collapse every whitespace run into a single space before validating.
    submitted_key = request.form.get("ssh_key", "")
    ssh_key = " ".join(submitted_key.split())

    key_ok, key_problem = validate_ssh_key(ssh_key)
    if not key_ok:
        flash(key_problem, "error")
        return redirect(url_for("dashboard"))

    created, outcome = create_user(username, ssh_key)
    if not created:
        flash(outcome, "error")
        logger.error(f"Failed to create account for {account_email}: {outcome}")
    else:
        flash(outcome, "success")
        logger.info(f"Account created for {account_email} (username: {username})")

    return redirect(url_for("dashboard"))
|
|
|
|
@app.route("/api/telegram/verify", methods=["POST"])
@login_required
def telegram_verify():
    """Verify a Telegram verification code and link the account.

    Expects a JSON object with a ``code`` field. The code may be sent as a
    string or as a JSON integer; any other type, a missing field, or a body
    that is not a JSON object is treated as "no code supplied".

    Returns:
        ``{"ok": True, "message": ...}`` on success, or a 400 JSON error when
        the code is missing or ``link_telegram`` rejects it.
    """
    user = session.get("user", {})
    email = user.get("email", "")
    username = get_webapp_username(email)

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON array/scalar body has no fields; avoid AttributeError -> 500.
        data = {}

    raw_code = data.get("code", "")
    if isinstance(raw_code, str):
        code = raw_code.strip()
    elif isinstance(raw_code, int) and not isinstance(raw_code, bool):
        # Clients may send a numeric code unquoted; previously this crashed
        # on ``.strip()``.
        code = str(raw_code)
    else:
        code = ""

    if not code:
        return jsonify({"error": "Verification code is required"}), 400

    success, message = link_telegram(username, code)
    if success:
        logger.info(f"Telegram linked for {username}")
        # Send welcome message via bot socket
        _send_welcome_message(username)
        return jsonify({"ok": True, "message": message})
    return jsonify({"error": message}), 400
|
|
|
|
@app.route("/api/telegram/unlink", methods=["POST"])
@login_required
def telegram_unlink():
    """Detach the Telegram link from the current user's account.

    Returns ``{"ok": True, "message": ...}`` on success, otherwise a 400 JSON
    error carrying the message from ``unlink_telegram``.
    """
    username = get_webapp_username(session.get("user", {}).get("email", ""))

    unlinked, detail = unlink_telegram(username)
    if not unlinked:
        return jsonify({"error": detail}), 400

    logger.info(f"Telegram unlinked for {username}")
    return jsonify({"ok": True, "message": detail})
|
|
|
|
@app.route("/api/telegram/status")
@login_required
def telegram_status():
    """Report the Telegram link status of the current user as JSON."""
    current_email = session.get("user", {}).get("email", "")
    username = get_webapp_username(current_email)
    return jsonify(get_telegram_status(username))
|
|
|
|
@app.route("/download/<filename>")
@login_required
def download(filename):
    """Serve downloadable files (e.g., desktop app).

    Args:
        filename: Bare file name made of letters, digits, ``_`` or ``-`` with
            a ``.zip`` or ``.dmg`` extension.

    Returns:
        The file from ``/data/downloads`` as an attachment, or the rendered
        ``error.html`` page with status 400 (bad name) or 404 (missing file).
    """
    import re

    # fullmatch rather than match + ``$``: ``$`` also matches just before a
    # trailing newline, so "app.zip\n" would otherwise pass validation.
    if not re.fullmatch(r"[a-zA-Z0-9_\-]+\.(zip|dmg)", filename):
        return render_template("error.html", error="Invalid filename", code=400), 400

    download_dir = Path("/data/downloads")
    file_path = download_dir / filename
    if not file_path.is_file():
        return render_template("error.html", error="File not found", code=404), 404

    from flask import send_file as flask_send_file

    return flask_send_file(file_path, as_attachment=True)
|
|
|
|
@app.route("/api/desktop/scripts")
def desktop_scripts():
    """Return the notification scripts of the authenticated desktop user as JSON."""
    # Authenticate first; the username comes from the desktop auth helper.
    username = require_desktop_auth()

    from services.telegram_bot.status import get_script_list_structured

    return jsonify(get_script_list_structured(username))
|
|
|
|
@app.route("/api/desktop/scripts/run", methods=["POST"])
def desktop_run_script():
    """Run a notification script on-demand for the authenticated desktop user.

    Expects a JSON object with a string ``name`` field. A missing, blank, or
    non-string name, or a body that is not a JSON object, yields a 400.

    Returns:
        ``{"ok": True}`` after the script ran (its output is dispatched to
        the WebSocket gateway when the output's ``notify`` flag is truthy),
        a 400 JSON error for a missing name, or a 500 JSON error when
        ``run_user_script`` returns ``None``.
    """
    username = require_desktop_auth()

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON array/scalar body has no fields; avoid AttributeError -> 500.
        data = {}

    raw_name = data.get("name", "")
    # Only strings can name a script; other JSON types previously crashed on
    # ``.strip()`` and surfaced as a 500.
    script_name = raw_name.strip() if isinstance(raw_name, str) else ""
    if not script_name:
        return jsonify({"error": "Missing 'name' field"}), 400

    from services.telegram_bot.runner import run_user_script
    from services.telegram_bot.dispatch import dispatch_to_ws_gateway

    output = run_user_script(username, script_name)
    if output is None:
        return jsonify({"error": f"Script '{script_name}' failed or not found"}), 500

    if output.get("notify", False):
        dispatch_to_ws_gateway(username, output, script_name)

    return jsonify({"ok": True})
|
|
|
|
@app.route("/api/sync-settings")
@login_required
def sync_settings_get():
    """Return the current user's sync settings as JSON."""
    current_email = session.get("user", {}).get("email", "")
    username = get_webapp_username(current_email)
    return jsonify(get_sync_settings(username))
|
|
|
|
@app.route("/api/sync-settings", methods=["POST"])
@login_required
def sync_settings_update():
    """Store new sync settings for the current user.

    Reads ``datasets`` from the JSON body; an absent or empty value is
    rejected with a 400. Otherwise the result of ``update_sync_settings``
    decides between ``{"ok": True, ...}`` and a 400 JSON error.
    """
    username = get_webapp_username(session.get("user", {}).get("email", ""))

    payload = request.get_json(silent=True) or {}
    datasets = payload.get("datasets", {})
    if not datasets:
        return jsonify({"error": "Missing datasets field"}), 400

    saved, detail = update_sync_settings(username, datasets)
    if not saved:
        return jsonify({"error": detail}), 400

    logger.info(f"Sync settings updated for {username}")
    return jsonify({"ok": True, "message": detail})
|
|
|
|
@app.route("/api/table-subscriptions")
@login_required
def table_subscriptions_get():
    """Return the current user's per-table subscriptions as JSON."""
    current_email = session.get("user", {}).get("email", "")
    username = get_webapp_username(current_email)
    return jsonify(get_table_subscriptions(username))
|
|
|
|
@app.route("/api/table-subscriptions", methods=["POST"])
@login_required
def table_subscriptions_update():
    """Store per-table subscriptions for the current user.

    The JSON body may carry ``table_mode`` (defaults to ``"all"``) and
    ``tables`` (defaults to an empty mapping). Any mode other than ``"all"``
    or ``"explicit"`` is rejected with a 400.
    """
    username = get_webapp_username(session.get("user", {}).get("email", ""))

    payload = request.get_json(silent=True) or {}
    table_mode = payload.get("table_mode", "all")
    tables = payload.get("tables", {})

    allowed_modes = ("all", "explicit")
    if table_mode not in allowed_modes:
        return jsonify({"error": "table_mode must be 'all' or 'explicit'"}), 400

    saved, detail = update_table_subscriptions(username, table_mode, tables)
    if not saved:
        return jsonify({"error": detail}), 400

    logger.info(f"Table subscriptions updated for {username}")
    return jsonify({"ok": True, "message": detail})
|
|
|
|
# ─────────────────────────────────────────────────────────────────
|
|
# Corporate Memory routes
|
|
# ─────────────────────────────────────────────────────────────────
|
|
|
|
@app.route("/corporate-memory")
@login_required
def corporate_memory():
    """Render the Corporate Memory knowledge browser.

    The template receives the global stats, the current user's stats and
    votes, and the first page (20 items) of knowledge.
    """
    username = get_webapp_username(session.get("user", {}).get("email", ""))

    # Gathered in a single context mapping; keys are the template variables.
    context = {
        "stats": get_memory_stats(),
        "user_stats": get_memory_user_stats(username),
        "user_votes": get_user_votes(username),
        "knowledge": get_knowledge(page=0, per_page=20),
    }
    return render_template("corporate_memory.html", **context)
|
|
|
|
# ─────────────────────────────────────────────────────────────────
|
|
# Activity Center routes
|
|
# ─────────────────────────────────────────────────────────────────
|
|
|
|
@app.route("/activity-center")
@login_required
def activity_center():
    """Render the Activity Center page from ``_build_activity_data()``."""
    return render_template("activity_center.html", activity=_build_activity_data())
|
|
|
|
@app.route("/api/corporate-memory/knowledge")
@login_required
def api_corporate_memory_knowledge():
    """Get knowledge items with optional filtering.

    Query parameters:
        category, search: optional filters passed through unchanged.
        page: zero-based page index (default 0; negative values become 0).
        per_page: page size (default 20; clamped to the range 1-100).
        sort: sort key (default ``"score"``).
        my_rules: ``"true"`` (case-insensitive) to enable the my-rules filter.

    Returns:
        JSON result of ``get_knowledge`` for the current user.
    """
    category = request.args.get("category")
    search = request.args.get("search")
    page = request.args.get("page", 0, type=int)
    per_page = request.args.get("per_page", 20, type=int)
    sort = request.args.get("sort", "score")
    my_rules = request.args.get("my_rules", "").lower() == "true"

    # Get username for my_rules filter
    user = session.get("user", {})
    email = user.get("email", "")
    username = get_webapp_username(email)

    # Keep pagination inside sane bounds. The original only capped the upper
    # end, so ?per_page=0, ?per_page=-5 or ?page=-1 reached the service as-is.
    page = max(page, 0)
    per_page = max(1, min(per_page, 100))

    result = get_knowledge(
        category=category,
        search=search,
        page=page,
        per_page=per_page,
        sort=sort,
        username=username,
        my_rules=my_rules,
    )
    return jsonify(result)
|
|
|
|
@app.route("/api/corporate-memory/stats")
@login_required
def api_corporate_memory_stats():
    """Return global and per-user corporate memory statistics as one JSON object.

    Per-user keys win when both sources define the same key.
    """
    username = get_webapp_username(session.get("user", {}).get("email", ""))

    combined = dict(get_memory_stats())
    combined.update(get_memory_user_stats(username))
    return jsonify(combined)
|
|
|
|
@app.route("/api/corporate-memory/vote", methods=["POST"])
@login_required
def api_corporate_memory_vote():
    """Vote on a knowledge item.

    Expects a JSON body with ``item_id`` and an integer-convertible ``vote``
    (defaults to 0).

    Returns:
        ``{"ok": True, "message": ...}`` on success, or a 400 JSON error when
        ``item_id`` is missing, ``vote`` cannot be converted to an int, or
        ``memory_vote`` rejects the vote.
    """
    user = session.get("user", {})
    email = user.get("email", "")
    username = get_webapp_username(email)

    data = request.get_json(silent=True) or {}
    item_id = data.get("item_id")
    vote_value = data.get("vote", 0)

    if not item_id:
        return jsonify({"error": "Missing item_id"}), 400

    try:
        vote_value = int(vote_value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: Python's JSON decoder accepts ``Infinity``, and
        # int(float("inf")) raises OverflowError rather than ValueError.
        return jsonify({"error": "Invalid vote value"}), 400

    success, message = memory_vote(username, item_id, vote_value)
    if success:
        return jsonify({"ok": True, "message": message})
    return jsonify({"error": message}), 400
|
|
|
|
@app.route("/api/corporate-memory/my-votes")
@login_required
def api_corporate_memory_my_votes():
    """Return the current user's votes as ``{"votes": ...}``."""
    current_email = session.get("user", {}).get("email", "")
    username = get_webapp_username(current_email)
    return jsonify({"votes": get_user_votes(username)})
|
|
|
|
# ─────────────────────────────────────────────────────────────────
|
|
# Admin pages
|
|
# ─────────────────────────────────────────────────────────────────
|
|
|
|
@app.route("/admin/tables")
@login_required
@admin_required
def admin_tables():
    """Render the admin table management page (``admin_tables.html``)."""
    page = render_template("admin_tables.html")
    return page
|
|
|
|
# ─────────────────────────────────────────────────────────────────
|
|
# Admin API routes
|
|
# ─────────────────────────────────────────────────────────────────
|
|
|
|
@app.route("/api/admin/discover-tables")
@login_required
@admin_required
def admin_discover_tables():
    """Discover all available tables from the data source.

    Tables returned by the data source are grouped by ``bucket_id``
    (``"other"`` when absent) and each one is flagged with ``is_registered``
    according to the table registry.

    Returns:
        ``{"ok": True, "total": <count>, "buckets": [...]}`` on success, or a
        500 JSON error when discovery itself fails. A failure to read the
        registry does not fail the request; tables are then reported as not
        registered.
    """
    try:
        from src.data_sync import create_data_source

        ds = create_data_source()
        raw_tables = ds.discover_tables()

        # Check which tables are already registered
        registered_ids = set()
        try:
            from src.table_registry import TableRegistry

            registry = TableRegistry.default()
            registered_ids = {t["id"] for t in registry.list_tables()}
        except Exception as registry_error:
            # Still best effort, but no longer silent: without this log every
            # table would show as unregistered with no hint as to why.
            logger.warning(f"Could not load table registry during discovery: {registry_error}")

        # Group by bucket
        buckets: dict = {}
        for t in raw_tables:
            bid = t.get("bucket_id", "other")
            bucket = buckets.setdefault(
                bid,
                {
                    "bucket_id": bid,
                    "bucket_name": t.get("bucket_name", bid),
                    "tables": [],
                },
            )
            # .get(): one table without an id must not fail the whole listing.
            t["is_registered"] = t.get("id") in registered_ids
            bucket["tables"].append(t)

        return jsonify({
            "ok": True,
            "total": len(raw_tables),
            "buckets": list(buckets.values()),
        })

    except Exception as e:
        logger.error(f"Discovery failed: {e}")
        return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route("/api/admin/registry")
@login_required
@admin_required
def admin_registry_list():
    """Return the table registry (version, folder mapping, tables) as JSON.

    Any failure is logged and reported as a 500 JSON error.
    """
    try:
        from src.table_registry import TableRegistry

        registry = TableRegistry.default()
        snapshot = {
            "ok": True,
            "version": registry.version,
            "folder_mapping": registry.get_folder_mapping(),
            "tables": registry.list_tables(),
        }
        return jsonify(snapshot)
    except Exception as exc:
        logger.error(f"Registry list failed: {exc}")
        return jsonify({"error": str(exc)}), 500
|
|
|
|
@app.route("/api/admin/register-table", methods=["POST"])
@login_required
@admin_required
def admin_register_table():
    """Register a table described by the JSON body and regenerate the docs.

    The body must contain a truthy ``id``; its ``version`` (if any) is passed
    to the registry as the expected version.

    Returns ``{"ok": True, "version": ...}`` on success, 400 for a missing id
    or a ``ValueError``, 409 for a ``ConflictError``, 500 for anything else.
    """
    from src.table_registry import ConflictError, TableRegistry

    requester = session.get("user", {}).get("email", "")

    payload = request.get_json(silent=True) or {}
    if not payload.get("id"):
        return jsonify({"error": "Missing table 'id'"}), 400

    try:
        registry = TableRegistry.default()
        registry.register_table(
            table_def=payload,
            registered_by=requester,
            expected_version=payload.get("version"),
        )

        # Keep data_description.md in step with the registry.
        description_md = Path(os.path.dirname(__file__), "..", "docs", "data_description.md")
        registry.generate_data_description_md(description_md.resolve())

        return jsonify({"ok": True, "version": registry.version})
    except ConflictError as conflict:
        return jsonify({"error": str(conflict)}), 409
    except ValueError as invalid:
        return jsonify({"error": str(invalid)}), 400
    except Exception as exc:
        logger.error(f"Register table failed: {exc}")
        return jsonify({"error": str(exc)}), 500
|
|
|
|
@app.route("/api/admin/registry/<path:table_id>", methods=["PUT"])
@login_required
@admin_required
def admin_update_table(table_id):
    """Apply the JSON body as updates to a registered table and regenerate the docs.

    ``version`` is removed from the body and passed separately as the
    expected registry version; the remaining keys are the updates.

    Returns ``{"ok": True, "version": ...}`` on success, 409 for a
    ``ConflictError``, 400 for a ``ValueError``, 500 for anything else.
    """
    from src.table_registry import ConflictError, TableRegistry

    editor = session.get("user", {}).get("email", "")
    changes = request.get_json(silent=True) or {}

    try:
        registry = TableRegistry.default()

        # Taken out up front so it is obvious that the registry receives the
        # updates without the ``version`` key.
        expected_version = changes.pop("version", None)
        registry.update_table(
            table_id=table_id,
            updates=changes,
            updated_by=editor,
            expected_version=expected_version,
        )

        # Keep data_description.md in step with the registry.
        description_md = Path(os.path.dirname(__file__), "..", "docs", "data_description.md")
        registry.generate_data_description_md(description_md.resolve())

        return jsonify({"ok": True, "version": registry.version})
    except ConflictError as conflict:
        return jsonify({"error": str(conflict)}), 409
    except ValueError as invalid:
        return jsonify({"error": str(invalid)}), 400
    except Exception as exc:
        logger.error(f"Update table failed: {exc}")
        return jsonify({"error": str(exc)}), 500
|
|
|
|
@app.route("/api/admin/registry/<path:table_id>", methods=["DELETE"])
@login_required
@admin_required
def admin_unregister_table(table_id):
    """Remove a table from the registry, drop user subscriptions to it, regenerate the docs.

    The optional JSON body may carry ``version`` as the expected registry
    version. Subscription cleanup is best effort: a failure there is logged
    as a warning and does not fail the request.

    Returns ``{"ok": True, "version": ...}`` on success, 409 for a
    ``ConflictError``, 400 for a ``ValueError``, 500 for anything else.
    """
    from src.table_registry import ConflictError, TableRegistry

    requester = session.get("user", {}).get("email", "")
    payload = request.get_json(silent=True) or {}

    try:
        registry = TableRegistry.default()

        # The name must be captured before the entry disappears; it keys the
        # per-user subscription records cleaned up below.
        existing = registry.get_table(table_id)
        table_name = None
        if existing:
            table_name = existing["name"]

        registry.unregister_table(
            table_id=table_id,
            unregistered_by=requester,
            expected_version=payload.get("version"),
        )

        if table_name:
            try:
                _cleanup_table_subscriptions(table_name)
            except Exception as cleanup_error:
                logger.warning(f"Subscription cleanup for {table_name} failed: {cleanup_error}")

        # Keep data_description.md in step with the registry.
        description_md = Path(os.path.dirname(__file__), "..", "docs", "data_description.md")
        registry.generate_data_description_md(description_md.resolve())

        return jsonify({"ok": True, "version": registry.version})
    except ConflictError as conflict:
        return jsonify({"error": str(conflict)}), 409
    except ValueError as invalid:
        return jsonify({"error": str(invalid)}), 400
    except Exception as exc:
        logger.error(f"Unregister table failed: {exc}")
        return jsonify({"error": str(exc)}), 500
|
|
|
|
def _cleanup_table_subscriptions(table_name: str) -> None:
    """Drop *table_name* from the ``tables`` mapping of every user in the sync settings file.

    The file is rewritten (and an info line logged) only when at least one
    user actually had an entry for the table.
    """
    from webapp.sync_settings_service import _read_json, _write_json, SYNC_SETTINGS_FILE

    settings_by_user = _read_json(SYNC_SETTINGS_FILE)

    removed_any = False
    for user_settings in settings_by_user.values():
        subscribed = user_settings.get("tables", {})
        if table_name not in subscribed:
            continue
        del subscribed[table_name]
        removed_any = True

    if not removed_any:
        return

    _write_json(SYNC_SETTINGS_FILE, settings_by_user)
    logger.info(f"Cleaned up subscriptions for removed table: {table_name}")
|
|
|
|
@app.route("/health")
def health():
    """
    Health check endpoint for monitoring.

    Delegates to ``webapp.health_service.health_check`` and passes its
    (response, status code) pair straight through. Per that service's
    contract as noted here: 200 when healthy, 503 when degraded.
    """
    from webapp.health_service import health_check

    body, http_status = health_check()
    return body, http_status
|
|
|
|
@app.errorhandler(404)
def not_found(e):
    """Render the shared error page for 404 responses."""
    page = render_template("error.html", error="Page not found", code=404)
    return page, 404
|
|
|
|
@app.errorhandler(500)
def server_error(e):
    """Log the active exception and render the shared error page for 500 responses."""
    logger.exception("Server error")
    page = render_template("error.html", error="Internal server error", code=500)
    return page, 500
|
|
|
|
|
|
# Module-level WSGI application object, created at import time so that
# Gunicorn can load it from this module.
app = create_app()


if __name__ == "__main__":
    # Direct execution only: Flask's built-in development server with the
    # debugger enabled, bound to the loopback interface on port 5000.
    app.run(debug=True, host="127.0.0.1", port=5000)
|