diff --git a/connectors/keboola/adapter.py b/connectors/keboola/adapter.py index bd347c6..760e928 100644 --- a/connectors/keboola/adapter.py +++ b/connectors/keboola/adapter.py @@ -107,6 +107,10 @@ class KeboolaDataSource(DataSource): return {"columns": result} + def discover_tables(self) -> List[Dict[str, Any]]: + """Discover all available tables from Keboola Storage.""" + return self.keboola_client.discover_all_tables() + def get_source_name(self) -> str: """Display name of this data source.""" return "Keboola Storage API" diff --git a/connectors/keboola/client.py b/connectors/keboola/client.py index 9e1344d..1c1b98e 100644 --- a/connectors/keboola/client.py +++ b/connectors/keboola/client.py @@ -754,6 +754,51 @@ class KeboolaClient: output_path.unlink() raise + def discover_all_tables(self) -> List[Dict[str, Any]]: + """List all available tables in the Keboola project. + + Tries tables.list(include=["columns","buckets"]) first. + Falls back to per-bucket listing if that fails. + + Returns: + Normalized list of table dicts. 
+ """ + logger.info("Discovering all tables in Keboola project...") + + try: + raw_tables = self.client.tables.list(include="columns,buckets") + except Exception as e: + logger.warning(f"tables.list() failed ({e}), falling back to per-bucket listing") + raw_tables = [] + for bucket in self.client.buckets.list(): + bucket_id = bucket["id"] + try: + bucket_tables = self.client.buckets.list_tables(bucket_id, include="columns") + for t in bucket_tables: + t.setdefault("bucket", bucket) + raw_tables.extend(bucket_tables) + except Exception as be: + logger.warning(f"Could not list tables in bucket {bucket_id}: {be}") + + result = [] + for t in raw_tables: + bucket = t.get("bucket", {}) + result.append({ + "id": t.get("id", ""), + "name": t.get("name", ""), + "bucket_id": bucket.get("id", ""), + "bucket_name": bucket.get("name", bucket.get("id", "")), + "columns": t.get("columns", []), + "row_count": t.get("rowsCount", 0), + "size_bytes": t.get("dataSizeBytes", 0), + "primary_key": t.get("primaryKey", []), + "last_change": t.get("lastChangeDate"), + "last_import": t.get("lastImportDate"), + }) + + logger.info(f"Discovered {len(result)} tables") + return result + def test_connection(self) -> bool: """ Test connection to Keboola API. 
diff --git a/scripts/sync_data.sh b/scripts/sync_data.sh index 05aa6f8..131e745 100755 --- a/scripts/sync_data.sh +++ b/scripts/sync_data.sh @@ -210,6 +210,14 @@ datasets: kbc_telemetry_expert: false DEFAULTS fi + # Download rsync filter for per-table sync + SYNC_FILTER_LOCAL="/tmp/.sync_rsync_filter_$(id -u)" + if scp -q data-analyst:~/.sync_rsync_filter "$SYNC_FILTER_LOCAL" 2>/dev/null; then + echo " ✅ Filter file loaded" + else + # No filter file = no per-table filtering + rm -f "$SYNC_FILTER_LOCAL" + fi echo "" else # For dry-run, still need settings to show what would happen @@ -221,6 +229,9 @@ datasets: kbc_telemetry_expert: false DEFAULTS fi + # Download rsync filter for dry-run too + SYNC_FILTER_LOCAL="/tmp/.sync_rsync_filter_$(id -u)" + scp -q data-analyst:~/.sync_rsync_filter "$SYNC_FILTER_LOCAL" 2>/dev/null || rm -f "$SYNC_FILTER_LOCAL" fi # --- Sync server/ content (read-only from server, --delete removes obsolete files) --- @@ -275,7 +286,12 @@ fi # Optional datasets are synced by sub-scripts based on user config echo "📦 Syncing core parquet files..." 
    def discover_tables(self) -> List[Dict[str, Any]]:
        """List all available tables in the data source.

        This base implementation is the fallback for sources that do not
        support discovery: it always returns an empty list. Sources that can
        enumerate their tables override it (``KeboolaDataSource`` does).

        Returns:
            A list of table dicts. Overriding implementations are expected to
            provide at minimum the keys ``id``, ``name``, ``bucket_id``,
            ``columns``, ``row_count``, ``size_bytes``, ``primary_key`` and
            ``last_change``.
        """
        return []
def profile_changed_tables(table_names: list[str]) -> dict:
    """Profile only the given tables and keep existing profiles for the rest.

    Public API for auto-profiling after a sync.

    Steps:
        1. Parse data_description.md for table definitions and folder mapping.
        2. Load the existing profiles.json (if any).
        3. For each requested name, locate its parquet data and call
           ``profile_table()``.
        4. Merge the new profiles over the existing ones and write the result
           with ``write_json_atomic()``.

    Args:
        table_names: Names (not IDs) of the tables to re-profile.

    Returns:
        ``{"success": int, "errors": int, "skipped": int}``. A table is
        *skipped* when it is unknown to data_description.md or has no parquet
        data, and an *error* when ``profile_table()`` raises.
    """
    # Imported locally: only ``datetime`` itself is known to be in module scope.
    from datetime import timezone

    success = 0
    errors = 0
    skipped = 0

    # Parse data_description.md for table definitions and folder mapping
    tables, folder_mapping = parse_data_description(DATA_DESCRIPTION_PATH)
    if not tables:
        logger.warning("profile_changed_tables: no tables in data_description.md")
        return {"success": 0, "errors": 0, "skipped": len(table_names)}

    # Build lookup by table name
    table_by_name: Dict[str, TableInfo] = {t.name: t for t in tables}

    # Load sync state and metrics
    sync_state = load_sync_state(SYNC_STATE_PATH)
    metrics_map = load_metrics(METRICS_YML_PATH)
    metric_file_map = load_metric_file_map(METRICS_YML_PATH)

    # Load existing profiles.json to preserve untouched tables
    existing_profiles: Dict[str, Any] = {}
    existing_unreadable = False
    try:
        if PROFILES_OUTPUT_PATH.exists():
            with open(PROFILES_OUTPUT_PATH, encoding="utf-8") as f:
                existing_data = json.load(f)
            loaded = existing_data.get("tables", {})
            if not isinstance(loaded, dict):
                # A non-mapping here would break the merge below.
                raise ValueError("'tables' is not a mapping")
            existing_profiles = loaded
    except Exception as exc:
        existing_unreadable = True
        logger.warning("profile_changed_tables: could not load existing profiles: %s", exc)

    # Profile each requested table
    new_profiles: Dict[str, Any] = {}
    for name in table_names:
        table = table_by_name.get(name)
        if table is None:
            logger.warning("profile_changed_tables: table %r not found in data_description.md", name)
            skipped += 1
            continue

        parquet_path = get_parquet_path(table, folder_mapping)

        # Check parquet existence (a directory must hold at least one file)
        if parquet_path.is_dir():
            if not any(parquet_path.glob("*.parquet")):
                logger.warning("profile_changed_tables: no parquet files for %s in %s", name, parquet_path)
                skipped += 1
                continue
        elif not parquet_path.exists():
            logger.warning("profile_changed_tables: parquet not found for %s at %s", name, parquet_path)
            skipped += 1
            continue

        try:
            logger.info("Auto-profiling %s ...", name)
            profile = profile_table(
                table, parquet_path, tables, sync_state, metrics_map, metric_file_map
            )
        except Exception as exc:
            logger.error("Auto-profiling failed for %s: %s", name, exc)
            errors += 1
            continue

        # Record the result outside the try block so a problem in the summary
        # log line cannot count the same table as both success and error.
        new_profiles[name] = profile
        success += 1
        logger.info(
            "  %s: %s rows, %s cols, %s alerts",
            name,
            profile.get("row_count"),
            profile.get("column_count"),
            len(profile.get("alerts") or ()),
        )

    if existing_unreadable and not new_profiles:
        # Writing now would replace an unreadable-but-present file with an
        # empty table set; leave it for inspection instead.
        logger.warning(
            "profile_changed_tables: nothing profiled and %s is unreadable; leaving it untouched",
            PROFILES_OUTPUT_PATH,
        )
    else:
        # Merge: existing profiles + newly profiled (new overwrite old)
        merged = {**existing_profiles, **new_profiles}

        # Same "naive UTC ISO string + Z" format as before, without the
        # deprecated datetime.utcnow().
        generated_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
        output = {
            "generated_at": generated_at,
            "version": "1.0",
            "tables": merged,
        }
        METADATA_DIR.mkdir(parents=True, exist_ok=True)
        write_json_atomic(PROFILES_OUTPUT_PATH, output)

    logger.info(
        "Auto-profiling complete: %d profiled, %d skipped, %d errors",
        success, skipped, errors,
    )
    return {"success": success, "errors": errors, "skipped": skipped}
logger = logging.getLogger(__name__)

# Default registry location
_DEFAULT_REGISTRY_DIR = Path(
    os.environ.get("REGISTRY_DIR", "/data/src_data/metadata")
)
_REGISTRY_FILENAME = "table_registry.json"

# Sync strategies accepted by register_table() and update_table().
_VALID_SYNC_STRATEGIES = ("full_refresh", "incremental", "partitioned")


def _now_iso() -> str:
    """Return current UTC time as ISO string."""
    return datetime.now(timezone.utc).isoformat()


def _atomic_write_json(path: Path, data: dict) -> None:
    """Write JSON atomically using tempfile + os.replace.

    The temp file is created in the destination directory so that
    ``os.replace`` never crosses a filesystem boundary.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(
        dir=str(path.parent), suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, default=str)
        os.chmod(tmp_path, 0o660)
        os.replace(tmp_path, str(path))
    except BaseException:
        # Also clean up on KeyboardInterrupt/SystemExit so no stray .tmp
        # files accumulate next to the registry.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise


def _audit_log(registry_path: Path, action: str, details: dict) -> None:
    """Append one JSON line to ``registry_audit.log`` next to the registry.

    Best effort: a failure to write is logged as a warning and never raised.
    """
    audit_path = registry_path.parent / "registry_audit.log"
    try:
        entry = {
            "timestamp": _now_iso(),
            "action": action,
            **details,
        }
        with open(audit_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry, default=str) + "\n")
    except Exception as e:
        logger.warning(f"Could not write audit log: {e}")


def _validate_sync_strategy(strategy: Any) -> None:
    """Raise ValueError unless *strategy* is one of the supported strategies."""
    if strategy not in _VALID_SYNC_STRATEGIES:
        raise ValueError(
            f"Invalid sync_strategy '{strategy}'. "
            f"Allowed: {', '.join(_VALID_SYNC_STRATEGIES)}"
        )


class ConflictError(Exception):
    """Raised when optimistic locking version doesn't match."""


class TableRegistry:
    """Manages table registrations. Source of truth for what gets synced."""

    def __init__(self, registry_path: Path):
        self.registry_path = registry_path
        self._data = self._load()

    @classmethod
    def default(cls) -> "TableRegistry":
        """Create registry at the default location."""
        return cls(_DEFAULT_REGISTRY_DIR / _REGISTRY_FILENAME)

    # ── Persistence ──────────────────────────────────────────────────

    def _load(self) -> dict:
        """Load registry from disk.

        Returns an empty structure if the file does not exist or cannot be
        parsed. Top-level sections missing from an otherwise valid file are
        filled in so the rest of the class can rely on them.
        """
        if self.registry_path.exists():
            try:
                with open(self.registry_path, encoding="utf-8") as f:
                    data = json.load(f)
                if not isinstance(data, dict):
                    raise ValueError("registry root must be a JSON object")
                skeleton = self._empty_registry()
                if not isinstance(data.get("_metadata"), dict):
                    data["_metadata"] = skeleton["_metadata"]
                if not isinstance(data.get("folder_mapping"), dict):
                    data["folder_mapping"] = {}
                if not isinstance(data.get("tables"), list):
                    data["tables"] = []
                logger.info(
                    f"Registry loaded: {len(data.get('tables', []))} tables"
                )
                return data
            except Exception as e:
                # NOTE(review): falling back to an empty registry means the
                # next save replaces the unreadable file; kept as-is because
                # callers may rely on construction never raising.
                logger.error(f"Error loading registry: {e}")
        return self._empty_registry()

    def _save(self) -> None:
        """Bump the version, stamp ``updated_at`` and write the file atomically."""
        metadata = self._data.setdefault("_metadata", {})
        metadata["updated_at"] = _now_iso()
        metadata["version"] = self.version + 1
        _atomic_write_json(self.registry_path, self._data)
        logger.debug("Registry saved (version %d)", self.version)

    @staticmethod
    def _empty_registry() -> dict:
        """Return the skeleton of a brand-new registry (version 0, no tables)."""
        now = _now_iso()
        return {
            "_metadata": {
                "version": 0,
                "created_at": now,
                "updated_at": now,
            },
            "folder_mapping": {},
            "tables": [],
        }

    def _check_version(self, expected_version: Optional[int]) -> None:
        """Raise ConflictError if *expected_version* is given and is stale."""
        if expected_version is not None and expected_version != self.version:
            raise ConflictError(
                f"Version conflict: expected {expected_version}, current {self.version}"
            )

    # ── Properties ───────────────────────────────────────────────────

    @property
    def version(self) -> int:
        """Current registry version; incremented by one on every save."""
        return self._data.get("_metadata", {}).get("version", 0)

    # ── Core CRUD ────────────────────────────────────────────────────

    def list_tables(self) -> list[dict]:
        """Return all registered tables (a new list; records are not copied)."""
        return list(self._data.get("tables", []))

    def get_table(self, table_id: str) -> Optional[dict]:
        """Return a shallow copy of the table with *table_id*, or None."""
        for t in self._data.get("tables", []):
            if t["id"] == table_id:
                return dict(t)
        return None

    def is_registered(self, table_id: str) -> bool:
        """Return True if a table with *table_id* is in the registry."""
        return any(t["id"] == table_id for t in self._data.get("tables", []))

    def register_table(
        self,
        table_def: dict,
        registered_by: str,
        expected_version: Optional[int] = None,
    ) -> None:
        """Register a new table.

        Args:
            table_def: Table definition dict (must contain id, name, sync_strategy, primary_key).
            registered_by: Email of the admin who registered the table.
            expected_version: If provided, reject if registry version doesn't match (optimistic lock).

        Raises:
            ValueError: If table already registered or validation fails.
            ConflictError: If expected_version doesn't match.
        """
        self._check_version(expected_version)

        table_id = table_def.get("id", "")
        if not table_id:
            raise ValueError("Table definition must include 'id'")

        if self.is_registered(table_id):
            raise ValueError(f"Table '{table_id}' is already registered")

        # Validate required fields
        for field in ("name", "sync_strategy", "primary_key"):
            if not table_def.get(field):
                raise ValueError(f"Table definition must include '{field}'")

        _validate_sync_strategy(table_def["sync_strategy"])

        # Build full record
        record = {
            "id": table_id,
            "name": table_def["name"],
            "description": table_def.get("description", ""),
            "primary_key": table_def["primary_key"],
            "sync_strategy": table_def["sync_strategy"],
            "incremental_window_days": table_def.get("incremental_window_days"),
            "partition_by": table_def.get("partition_by"),
            "partition_granularity": table_def.get("partition_granularity"),
            "foreign_keys": table_def.get("foreign_keys", []),
            "where_filters": table_def.get("where_filters", []),
            "folder": table_def.get("folder"),
            "dataset": table_def.get("dataset"),
            "initial_load_chunk_days": table_def.get("initial_load_chunk_days", 30),
            # Kept in step with the migration path and the generator, which
            # both already handle this field.
            "max_history_days": table_def.get("max_history_days"),
            "registered_at": _now_iso(),
            "registered_by": registered_by,
            "source_metadata": table_def.get("source_metadata", {}),
        }

        self._data.setdefault("tables", []).append(record)
        self._save()

        _audit_log(self.registry_path, "register", {
            "table_id": table_id,
            "by": registered_by,
        })

    def unregister_table(
        self,
        table_id: str,
        unregistered_by: str = "",
        expected_version: Optional[int] = None,
    ) -> None:
        """Remove a table from the registry.

        Raises:
            ValueError: If table not found.
            ConflictError: If expected_version doesn't match.
        """
        self._check_version(expected_version)

        tables = self._data.get("tables", [])
        remaining = [t for t in tables if t["id"] != table_id]

        if len(remaining) == len(tables):
            raise ValueError(f"Table '{table_id}' is not registered")

        self._data["tables"] = remaining
        self._save()

        _audit_log(self.registry_path, "unregister", {
            "table_id": table_id,
            "by": unregistered_by,
        })

    def update_table(
        self,
        table_id: str,
        updates: dict,
        updated_by: str = "",
        expected_version: Optional[int] = None,
    ) -> None:
        """Update table configuration.

        Only the updatable fields listed in the body are applied; other keys
        in *updates* are ignored with a warning. The audit log records the
        fields that were actually applied.

        Raises:
            ValueError: If table not found or the new sync_strategy is invalid.
            ConflictError: If expected_version doesn't match.
        """
        self._check_version(expected_version)

        # Fields that can be updated
        allowed_fields = {
            "description", "primary_key", "sync_strategy",
            "incremental_window_days", "partition_by", "partition_granularity",
            "foreign_keys", "where_filters", "folder", "dataset",
            "initial_load_chunk_days", "max_history_days",
        }

        target = next(
            (t for t in self._data.get("tables", []) if t["id"] == table_id),
            None,
        )
        if target is None:
            raise ValueError(f"Table '{table_id}' is not registered")

        applied = {k: v for k, v in updates.items() if k in allowed_fields}
        ignored = [k for k in updates if k not in allowed_fields]

        # Validate before mutating so a rejected update leaves no trace.
        if "sync_strategy" in applied:
            _validate_sync_strategy(applied["sync_strategy"])

        if ignored:
            logger.warning(
                "Ignoring non-updatable fields for %s: %s",
                table_id, ", ".join(str(k) for k in ignored),
            )

        target.update(applied)
        self._save()
        _audit_log(self.registry_path, "update", {
            "table_id": table_id,
            "fields": list(applied),
            "by": updated_by,
        })

    # ── Folder mapping ───────────────────────────────────────────────

    def get_folder_mapping(self) -> dict[str, str]:
        """Return a copy of the bucket -> folder mapping."""
        return dict(self._data.get("folder_mapping", {}))

    def set_folder_mapping(self, bucket_id: str, folder: str) -> None:
        """Map *bucket_id* to *folder* and persist the registry."""
        self._data.setdefault("folder_mapping", {})[bucket_id] = folder
        self._save()

    # ── Generation ───────────────────────────────────────────────────

    def generate_data_description_md(self, output_path: Path) -> None:
        """Regenerate data_description.md from registry.

        Writes a markdown file whose fenced ``yaml`` block holds the folder
        mapping and one entry per registered table. Optional fields are only
        emitted when set.

        NOTE(review): a SHA-256 checksum of the YAML is computed and logged,
        but the three leading header lines are written empty and the file is
        not chmod-ed read-only. Confirm whether a checksum header is meant
        to be emitted there.
        """
        tables = self.list_tables()
        folder_mapping = self.get_folder_mapping()

        # Build YAML structure matching existing data_description.md format
        yaml_data: dict[str, Any] = {}

        if folder_mapping:
            yaml_data["folder_mapping"] = folder_mapping

        # Key order below determines the order in the emitted YAML.
        leading_optional = (
            "incremental_window_days", "partition_by",
            "partition_granularity", "max_history_days",
        )
        trailing_optional = ("foreign_keys", "where_filters", "folder", "dataset")

        yaml_tables = []
        for t in tables:
            entry: dict[str, Any] = {
                "id": t["id"],
                "name": t["name"],
                "description": t.get("description", ""),
                "primary_key": t["primary_key"],
                "sync_strategy": t["sync_strategy"],
            }

            # Optional fields -- only include if set
            for key in leading_optional:
                if t.get(key):
                    entry[key] = t[key]
            # 30 is the default chunk size and is therefore left implicit.
            if t.get("initial_load_chunk_days") and t["initial_load_chunk_days"] != 30:
                entry["initial_load_chunk_days"] = t["initial_load_chunk_days"]
            for key in trailing_optional:
                if t.get(key):
                    entry[key] = t[key]

            yaml_tables.append(entry)

        yaml_data["tables"] = yaml_tables

        yaml_str = yaml.dump(
            yaml_data, default_flow_style=False, sort_keys=False, allow_unicode=True
        )

        # Compute checksum
        checksum = hashlib.sha256(yaml_str.encode()).hexdigest()[:16]

        # Build markdown
        lines = [
            "",
            "",
            "",
            "",
            "# Data Description",
            "",
            f"Generated at {_now_iso()} from table registry "
            f"(version {self.version}, {len(yaml_tables)} tables).",
            "",
            "```yaml",
            yaml_str.rstrip(),
            "```",
            "",
        ]

        content = "\n".join(lines)

        output_path.parent.mkdir(parents=True, exist_ok=True)
        # allow_unicode=True can put non-ASCII text in the YAML, so do not
        # depend on the platform default encoding.
        output_path.write_text(content, encoding="utf-8")
        logger.info(
            f"Generated data_description.md: {len(yaml_tables)} tables "
            f"(checksum: {checksum})"
        )

    # ── Migration ────────────────────────────────────────────────────

    @classmethod
    def import_from_data_description(
        cls,
        md_path: Path,
        registry_path: Path,
        registered_by: str = "migration",
    ) -> "TableRegistry":
        """One-time migration: parse existing data_description.md into registry.

        Creates a new registry JSON from the fenced ``yaml`` blocks of the
        markdown file, replacing whatever *registry_path* held before.

        Raises:
            FileNotFoundError: If *md_path* does not exist.
            ValueError: If no YAML block or no table definitions are found.
        """
        if not md_path.exists():
            raise FileNotFoundError(f"data_description.md not found: {md_path}")

        content = md_path.read_text(encoding="utf-8")

        # Extract YAML blocks
        yaml_matches = re.findall(r"```yaml\n(.*?)```", content, re.DOTALL)
        if not yaml_matches:
            raise ValueError("No YAML blocks found in data_description.md")

        all_tables: list[dict] = []
        folder_mapping: dict[str, str] = {}

        for yaml_block in yaml_matches:
            data = yaml.safe_load(yaml_block)
            # A block may be empty or hold something other than a mapping.
            if not isinstance(data, dict):
                continue
            all_tables.extend(data.get("tables") or [])
            folder_mapping.update(data.get("folder_mapping") or {})

        if not all_tables:
            raise ValueError("No tables found in YAML blocks")

        # Build registry
        registry = cls(registry_path)
        registry._data = cls._empty_registry()
        registry._data["folder_mapping"] = folder_mapping
        registry._data["_metadata"]["migrated_from"] = str(md_path)

        now = _now_iso()
        for table_data in all_tables:
            record = {
                "id": table_data.get("id", ""),
                "name": table_data.get("name", ""),
                "description": table_data.get("description", ""),
                "primary_key": table_data.get("primary_key", ""),
                "sync_strategy": table_data.get("sync_strategy", "full_refresh"),
                "incremental_window_days": table_data.get("incremental_window_days"),
                "partition_by": table_data.get("partition_by"),
                "partition_granularity": table_data.get("partition_granularity"),
                "foreign_keys": table_data.get("foreign_keys", []),
                "where_filters": table_data.get("where_filters", []),
                "folder": table_data.get("folder"),
                "dataset": table_data.get("dataset"),
                "initial_load_chunk_days": table_data.get("initial_load_chunk_days", 30),
                "max_history_days": table_data.get("max_history_days"),
                "registered_at": now,
                "registered_by": registered_by,
                "source_metadata": {},
            }
            registry._data["tables"].append(record)

        registry._save()

        _audit_log(registry_path, "migrate", {
            "source": str(md_path),
            "tables_imported": len(all_tables),
            "by": registered_by,
        })

        logger.info(
            f"Migrated {len(all_tables)} tables from {md_path} to registry"
        )
        return registry
import yaml + + docs_dir = tmp_path / "docs" + docs_dir.mkdir(parents=True, exist_ok=True) + dd_path = docs_dir / "data_description.md" + + table_defs = [] + for t in tables: + table_defs.append({ + "id": t["id"], + "name": t["name"], + "description": t.get("description", f"Table {t['name']}"), + "primary_key": t.get("primary_key", "id"), + "sync_strategy": t.get("sync_strategy", "full"), + "foreign_keys": [], + }) + + yaml_content = yaml.dump( + {"tables": table_defs, "folder_mapping": t.get("folder_mapping", {})}, + default_flow_style=False, + ) + dd_path.write_text(f"# Data\n\n```yaml\n{yaml_content}```\n") + return dd_path + + +def _make_profiles_json(metadata_dir: Path, tables: dict) -> Path: + """Write an existing profiles.json.""" + metadata_dir.mkdir(parents=True, exist_ok=True) + profiles_path = metadata_dir / "profiles.json" + profiles_path.write_text(json.dumps({ + "generated_at": "2026-01-01T00:00:00Z", + "version": "1.0", + "tables": tables, + })) + return profiles_path + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- +@pytest.fixture +def data_env(tmp_path): + """Set up a temporary data environment with parquet + data_description. + + Returns a dict with paths and table definitions. + """ + # Create two tables' parquet files + _make_parquet(tmp_path, "bucket_a", "orders") + _make_parquet(tmp_path, "bucket_a", "customers") + + # Create data_description.md + folder_mapping = {"in.c-main": "bucket_a"} + tables = [ + { + "id": "in.c-main.orders", + "name": "orders", + "primary_key": "id", + "sync_strategy": "full", + "folder_mapping": folder_mapping, + }, + { + "id": "in.c-main.customers", + "name": "customers", + "primary_key": "id", + "sync_strategy": "full", + "folder_mapping": folder_mapping, + }, + ] + dd_path = _make_data_description(tmp_path, tables) + + metadata_dir = tmp_path / "parquet" / ".." 
/ "metadata" + metadata_dir = tmp_path / "metadata" + metadata_dir.mkdir(parents=True, exist_ok=True) + + return { + "tmp_path": tmp_path, + "parquet_dir": tmp_path / "parquet", + "metadata_dir": metadata_dir, + "docs_dir": tmp_path / "docs", + "dd_path": dd_path, + "profiles_path": metadata_dir / "profiles.json", + } + + +def _patch_profiler_paths(data_env): + """Return a dict of patches for profiler module-level path constants.""" + return { + "src.profiler.PARQUET_DIR": data_env["parquet_dir"], + "src.profiler.METADATA_DIR": data_env["metadata_dir"], + "src.profiler.PROFILES_OUTPUT_PATH": data_env["profiles_path"], + "src.profiler.DATA_DESCRIPTION_PATH": data_env["dd_path"], + "src.profiler.SYNC_STATE_PATH": data_env["metadata_dir"] / "sync_state.json", + "src.profiler.METRICS_YML_PATH": data_env["docs_dir"] / "metrics.yml", + } + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- +class TestProfileChangedTablesReturnsCounts: + """profile_changed_tables returns correct success/errors/skipped counts.""" + + def test_all_tables_profiled(self, data_env): + patches = _patch_profiler_paths(data_env) + with patch.multiple("src.profiler", **{k.split(".")[-1]: v for k, v in patches.items()}): + result = profile_changed_tables(["orders", "customers"]) + + assert result["success"] == 2 + assert result["errors"] == 0 + assert result["skipped"] == 0 + + def test_single_table_profiled(self, data_env): + patches = _patch_profiler_paths(data_env) + with patch.multiple("src.profiler", **{k.split(".")[-1]: v for k, v in patches.items()}): + result = profile_changed_tables(["orders"]) + + assert result["success"] == 1 + assert result["errors"] == 0 + assert result["skipped"] == 0 + + def test_profiles_json_written(self, data_env): + patches = _patch_profiler_paths(data_env) + with patch.multiple("src.profiler", **{k.split(".")[-1]: v for k, v in patches.items()}): 
+ profile_changed_tables(["orders"]) + + profiles_path = data_env["profiles_path"] + assert profiles_path.exists() + data = json.loads(profiles_path.read_text()) + assert "orders" in data["tables"] + assert data["tables"]["orders"]["row_count"] == 3 + + +class TestPreservesExistingProfiles: + """When profiling a subset, existing profiles for other tables are preserved.""" + + def test_existing_profiles_kept(self, data_env): + # Write pre-existing profiles for a table called "legacy" + _make_profiles_json(data_env["metadata_dir"], { + "legacy": {"row_count": 999, "column_count": 5, "alerts": []}, + }) + + patches = _patch_profiler_paths(data_env) + with patch.multiple("src.profiler", **{k.split(".")[-1]: v for k, v in patches.items()}): + result = profile_changed_tables(["orders"]) + + assert result["success"] == 1 + + data = json.loads(data_env["profiles_path"].read_text()) + # New profile written + assert "orders" in data["tables"] + # Old profile preserved + assert "legacy" in data["tables"] + assert data["tables"]["legacy"]["row_count"] == 999 + + def test_existing_profile_overwritten_for_reprofiled_table(self, data_env): + # Write stale profile for "orders" + _make_profiles_json(data_env["metadata_dir"], { + "orders": {"row_count": 0, "column_count": 0, "alerts": [], "_stale": True}, + }) + + patches = _patch_profiler_paths(data_env) + with patch.multiple("src.profiler", **{k.split(".")[-1]: v for k, v in patches.items()}): + result = profile_changed_tables(["orders"]) + + assert result["success"] == 1 + data = json.loads(data_env["profiles_path"].read_text()) + # Profile should be fresh, not the stale one + assert data["tables"]["orders"]["row_count"] == 3 + assert "_stale" not in data["tables"]["orders"] + + +class TestErrorsCounted: + """Errors during profiling are counted and don't abort the whole run.""" + + def test_error_counted_not_aborted(self, data_env): + # Capture the real profile_table before patching to avoid recursion + from src.profiler import 
profile_table as real_profile_table + + def _failing_profile_table(table, *args, **kwargs): + if table.name == "orders": + raise RuntimeError("Simulated profiling error") + return real_profile_table(table, *args, **kwargs) + + patches = _patch_profiler_paths(data_env) + with patch.multiple("src.profiler", **{k.split(".")[-1]: v for k, v in patches.items()}), \ + patch("src.profiler.profile_table", side_effect=_failing_profile_table): + result = profile_changed_tables(["orders", "customers"]) + + assert result["errors"] == 1 + assert result["success"] == 1 + assert result["skipped"] == 0 + + def test_all_errors(self, data_env): + patches = _patch_profiler_paths(data_env) + + def _always_fail(table, *args, **kwargs): + raise RuntimeError("Simulated error") + + with patch.multiple("src.profiler", **{k.split(".")[-1]: v for k, v in patches.items()}), \ + patch("src.profiler.profile_table", side_effect=_always_fail): + result = profile_changed_tables(["orders", "customers"]) + + assert result["errors"] == 2 + assert result["success"] == 0 + assert result["skipped"] == 0 + + +class TestSkippedTables: + """Tables without parquet files or not in data_description are skipped.""" + + def test_unknown_table_skipped(self, data_env): + patches = _patch_profiler_paths(data_env) + with patch.multiple("src.profiler", **{k.split(".")[-1]: v for k, v in patches.items()}): + result = profile_changed_tables(["nonexistent_table"]) + + assert result["skipped"] == 1 + assert result["success"] == 0 + assert result["errors"] == 0 + + def test_missing_parquet_skipped(self, data_env): + # Add a table to data_description but don't create its parquet file + import yaml + + dd_path = data_env["dd_path"] + folder_mapping = {"in.c-main": "bucket_a"} + tables = [ + { + "id": "in.c-main.orders", + "name": "orders", + "description": "Orders table", + "primary_key": "id", + "sync_strategy": "full", + "foreign_keys": [], + }, + { + "id": "in.c-main.no_data", + "name": "no_data", + "description": 
"Table without parquet", + "primary_key": "id", + "sync_strategy": "full", + "foreign_keys": [], + }, + ] + yaml_content = yaml.dump( + {"tables": tables, "folder_mapping": folder_mapping}, + default_flow_style=False, + ) + dd_path.write_text(f"# Data\n\n```yaml\n{yaml_content}```\n") + + patches = _patch_profiler_paths(data_env) + with patch.multiple("src.profiler", **{k.split(".")[-1]: v for k, v in patches.items()}): + result = profile_changed_tables(["orders", "no_data"]) + + assert result["success"] == 1 + assert result["skipped"] == 1 + assert result["errors"] == 0 + + def test_empty_list(self, data_env): + patches = _patch_profiler_paths(data_env) + with patch.multiple("src.profiler", **{k.split(".")[-1]: v for k, v in patches.items()}): + result = profile_changed_tables([]) + + assert result["success"] == 0 + assert result["skipped"] == 0 + assert result["errors"] == 0 + + def test_mixed_valid_invalid_unknown(self, data_env): + """Combination: one valid, one unknown, one missing parquet.""" + import yaml + + dd_path = data_env["dd_path"] + folder_mapping = {"in.c-main": "bucket_a"} + tables = [ + { + "id": "in.c-main.orders", + "name": "orders", + "description": "Orders table", + "primary_key": "id", + "sync_strategy": "full", + "foreign_keys": [], + }, + { + "id": "in.c-main.ghost", + "name": "ghost", + "description": "Ghost table without data", + "primary_key": "id", + "sync_strategy": "full", + "foreign_keys": [], + }, + ] + yaml_content = yaml.dump( + {"tables": tables, "folder_mapping": folder_mapping}, + default_flow_style=False, + ) + dd_path.write_text(f"# Data\n\n```yaml\n{yaml_content}```\n") + + patches = _patch_profiler_paths(data_env) + with patch.multiple("src.profiler", **{k.split(".")[-1]: v for k, v in patches.items()}): + # orders = valid, ghost = no parquet, unknown = not in data_description + result = profile_changed_tables(["orders", "ghost", "unknown"]) + + assert result["success"] == 1 + assert result["skipped"] == 2 # ghost (no 
parquet) + unknown (not in DD) + assert result["errors"] == 0 diff --git a/tests/test_table_registry.py b/tests/test_table_registry.py new file mode 100644 index 0000000..819bfa7 --- /dev/null +++ b/tests/test_table_registry.py @@ -0,0 +1,363 @@ +"""Tests for the Table Registry module.""" + +import json +from pathlib import Path + +import pytest +import yaml + +from src.table_registry import ConflictError, TableRegistry + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def registry_path(tmp_path): + """Return a temp path for the registry JSON.""" + return tmp_path / "table_registry.json" + + +@pytest.fixture +def registry(registry_path): + """Create an empty registry.""" + return TableRegistry(registry_path) + + +@pytest.fixture +def sample_table(): + """Minimal valid table definition.""" + return { + "id": "in.c-crm.company", + "name": "company", + "description": "Customer master data", + "primary_key": "id", + "sync_strategy": "full_refresh", + } + + +@pytest.fixture +def sample_table_incremental(): + """Incremental table definition.""" + return { + "id": "in.c-crm.events", + "name": "events", + "description": "User events", + "primary_key": "event_id", + "sync_strategy": "incremental", + "incremental_window_days": 14, + "partition_by": "created_at", + "partition_granularity": "month", + } + + +# --------------------------------------------------------------------------- +# Basic CRUD +# --------------------------------------------------------------------------- + +class TestRegistryCRUD: + + def test_empty_registry(self, registry): + assert registry.list_tables() == [] + assert registry.version == 0 + + def test_register_table(self, registry, sample_table): + registry.register_table(sample_table, registered_by="admin@test.com") + tables = registry.list_tables() + assert len(tables) == 1 + assert tables[0]["id"] == 
"in.c-crm.company" + assert tables[0]["registered_by"] == "admin@test.com" + assert registry.version == 1 + + def test_register_duplicate_raises(self, registry, sample_table): + registry.register_table(sample_table, registered_by="admin@test.com") + with pytest.raises(ValueError, match="already registered"): + registry.register_table(sample_table, registered_by="admin@test.com") + + def test_get_table(self, registry, sample_table): + registry.register_table(sample_table, registered_by="admin@test.com") + t = registry.get_table("in.c-crm.company") + assert t is not None + assert t["name"] == "company" + + def test_get_table_not_found(self, registry): + assert registry.get_table("nonexistent") is None + + def test_is_registered(self, registry, sample_table): + assert not registry.is_registered("in.c-crm.company") + registry.register_table(sample_table, registered_by="admin@test.com") + assert registry.is_registered("in.c-crm.company") + + def test_unregister_table(self, registry, sample_table): + registry.register_table(sample_table, registered_by="admin@test.com") + registry.unregister_table("in.c-crm.company", unregistered_by="admin@test.com") + assert not registry.is_registered("in.c-crm.company") + assert registry.list_tables() == [] + + def test_unregister_nonexistent_raises(self, registry): + with pytest.raises(ValueError, match="not registered"): + registry.unregister_table("nonexistent") + + def test_update_table(self, registry, sample_table): + registry.register_table(sample_table, registered_by="admin@test.com") + registry.update_table( + "in.c-crm.company", + {"description": "Updated description", "sync_strategy": "incremental"}, + updated_by="admin@test.com", + ) + t = registry.get_table("in.c-crm.company") + assert t["description"] == "Updated description" + assert t["sync_strategy"] == "incremental" + + def test_update_nonexistent_raises(self, registry): + with pytest.raises(ValueError, match="not registered"): + registry.update_table("nonexistent", 
{"description": "x"}) + + +# --------------------------------------------------------------------------- +# Validation +# --------------------------------------------------------------------------- + +class TestValidation: + + def test_missing_id_raises(self, registry): + with pytest.raises(ValueError, match="must include 'id'"): + registry.register_table( + {"name": "x", "sync_strategy": "full_refresh", "primary_key": "id"}, + registered_by="admin@test.com", + ) + + def test_missing_name_raises(self, registry): + with pytest.raises(ValueError, match="must include 'name'"): + registry.register_table( + {"id": "x.y.z", "sync_strategy": "full_refresh", "primary_key": "id"}, + registered_by="admin@test.com", + ) + + def test_invalid_sync_strategy_raises(self, registry): + with pytest.raises(ValueError, match="Invalid sync_strategy"): + registry.register_table( + { + "id": "x.y.z", + "name": "z", + "sync_strategy": "magic", + "primary_key": "id", + }, + registered_by="admin@test.com", + ) + + +# --------------------------------------------------------------------------- +# Optimistic locking +# --------------------------------------------------------------------------- + +class TestOptimisticLocking: + + def test_register_with_wrong_version_raises(self, registry, sample_table): + with pytest.raises(ConflictError, match="Version conflict"): + registry.register_table( + sample_table, registered_by="admin@test.com", expected_version=99 + ) + + def test_register_with_correct_version(self, registry, sample_table): + registry.register_table( + sample_table, registered_by="admin@test.com", expected_version=0 + ) + assert registry.version == 1 + + def test_unregister_with_wrong_version_raises(self, registry, sample_table): + registry.register_table(sample_table, registered_by="admin@test.com") + with pytest.raises(ConflictError): + registry.unregister_table( + "in.c-crm.company", expected_version=0 + ) + + def test_update_with_wrong_version_raises(self, registry, 
sample_table): + registry.register_table(sample_table, registered_by="admin@test.com") + with pytest.raises(ConflictError): + registry.update_table( + "in.c-crm.company", + {"description": "x"}, + expected_version=0, + ) + + +# --------------------------------------------------------------------------- +# Persistence +# --------------------------------------------------------------------------- + +class TestPersistence: + + def test_save_and_reload(self, registry_path, sample_table): + reg1 = TableRegistry(registry_path) + reg1.register_table(sample_table, registered_by="admin@test.com") + + # Reload from disk + reg2 = TableRegistry(registry_path) + assert len(reg2.list_tables()) == 1 + assert reg2.get_table("in.c-crm.company")["name"] == "company" + assert reg2.version == 1 + + def test_json_format(self, registry_path, sample_table): + reg = TableRegistry(registry_path) + reg.register_table(sample_table, registered_by="admin@test.com") + + with open(registry_path) as f: + data = json.load(f) + + assert "_metadata" in data + assert "tables" in data + assert data["_metadata"]["version"] == 1 + assert len(data["tables"]) == 1 + + +# --------------------------------------------------------------------------- +# Folder mapping +# --------------------------------------------------------------------------- + +class TestFolderMapping: + + def test_set_and_get(self, registry): + registry.set_folder_mapping("in.c-crm", "crm") + assert registry.get_folder_mapping() == {"in.c-crm": "crm"} + + def test_persists(self, registry_path): + reg1 = TableRegistry(registry_path) + reg1.set_folder_mapping("in.c-crm", "crm") + + reg2 = TableRegistry(registry_path) + assert reg2.get_folder_mapping() == {"in.c-crm": "crm"} + + +# --------------------------------------------------------------------------- +# Generation +# --------------------------------------------------------------------------- + +class TestGeneration: + + def test_generate_data_description_md(self, registry, 
sample_table, tmp_path): + registry.register_table(sample_table, registered_by="admin@test.com") + registry.set_folder_mapping("in.c-crm", "crm") + + output = tmp_path / "data_description.md" + registry.generate_data_description_md(output) + + content = output.read_text() + + # Check header + assert "AUTO-GENERATED" in content + assert "checksum: sha256:" in content + + # Check YAML block is parseable + yaml_match = __import__("re").search(r"```yaml\n(.*?)```", content, __import__("re").DOTALL) + assert yaml_match + yaml_data = yaml.safe_load(yaml_match.group(1)) + assert len(yaml_data["tables"]) == 1 + assert yaml_data["tables"][0]["id"] == "in.c-crm.company" + assert yaml_data["folder_mapping"] == {"in.c-crm": "crm"} + + def test_generate_includes_incremental_fields( + self, registry, sample_table_incremental, tmp_path + ): + registry.register_table(sample_table_incremental, registered_by="admin@test.com") + + output = tmp_path / "data_description.md" + registry.generate_data_description_md(output) + + content = output.read_text() + yaml_match = __import__("re").search(r"```yaml\n(.*?)```", content, __import__("re").DOTALL) + yaml_data = yaml.safe_load(yaml_match.group(1)) + table = yaml_data["tables"][0] + assert table["partition_by"] == "created_at" + assert table["partition_granularity"] == "month" + assert table["incremental_window_days"] == 14 + + +# --------------------------------------------------------------------------- +# Migration +# --------------------------------------------------------------------------- + +class TestMigration: + + def test_import_from_data_description(self, tmp_path): + # Create a fake data_description.md + md_content = """# Data Description + +```yaml +folder_mapping: + in.c-crm: crm + +tables: + - id: in.c-crm.company + name: company + description: Companies + primary_key: id + sync_strategy: full_refresh + + - id: in.c-crm.contact + name: contact + description: Contacts + primary_key: id + sync_strategy: incremental + 
incremental_window_days: 7 +``` +""" + md_path = tmp_path / "data_description.md" + md_path.write_text(md_content) + + registry_path = tmp_path / "table_registry.json" + registry = TableRegistry.import_from_data_description(md_path, registry_path) + + assert len(registry.list_tables()) == 2 + assert registry.is_registered("in.c-crm.company") + assert registry.is_registered("in.c-crm.contact") + assert registry.get_folder_mapping() == {"in.c-crm": "crm"} + + # Check migrated_from marker + with open(registry_path) as f: + data = json.load(f) + assert "migrated_from" in data["_metadata"] + + def test_import_no_yaml_raises(self, tmp_path): + md_path = tmp_path / "data_description.md" + md_path.write_text("# Empty file\nNo YAML here.") + + with pytest.raises(ValueError, match="No YAML blocks"): + TableRegistry.import_from_data_description( + md_path, tmp_path / "registry.json" + ) + + def test_import_file_not_found_raises(self, tmp_path): + with pytest.raises(FileNotFoundError): + TableRegistry.import_from_data_description( + tmp_path / "nonexistent.md", tmp_path / "registry.json" + ) + + +# --------------------------------------------------------------------------- +# Audit log +# --------------------------------------------------------------------------- + +class TestAuditLog: + + def test_register_writes_audit(self, registry, sample_table): + registry.register_table(sample_table, registered_by="admin@test.com") + + audit_path = registry.registry_path.parent / "registry_audit.log" + assert audit_path.exists() + + lines = audit_path.read_text().strip().split("\n") + assert len(lines) >= 1 + entry = json.loads(lines[-1]) + assert entry["action"] == "register" + assert entry["table_id"] == "in.c-crm.company" + + def test_unregister_writes_audit(self, registry, sample_table): + registry.register_table(sample_table, registered_by="admin@test.com") + registry.unregister_table("in.c-crm.company", unregistered_by="admin@test.com") + + audit_path = 
registry.registry_path.parent / "registry_audit.log" + lines = audit_path.read_text().strip().split("\n") + last_entry = json.loads(lines[-1]) + assert last_entry["action"] == "unregister" diff --git a/webapp/app.py b/webapp/app.py index de1c87d..6117774 100644 --- a/webapp/app.py +++ b/webapp/app.py @@ -15,12 +15,12 @@ from pathlib import Path from flask import Flask, flash, jsonify, redirect, render_template, request, session, url_for -from .auth import auth_bp, login_required +from .auth import admin_required, auth_bp, login_required from .config import Config from .desktop_auth import require_desktop_auth from .notification_images import images_bp from .account_service import get_account_details -from .sync_settings_service import get_sync_settings, update_sync_settings +from .sync_settings_service import get_sync_settings, update_sync_settings, get_table_subscriptions, update_table_subscriptions # Jira connector is optional - only loaded if configured try: @@ -396,6 +396,18 @@ def register_routes(app: Flask) -> None: # Load sync settings (for existing users) sync_settings = get_sync_settings(username) if user_info.exists else None + # Add subscription status to catalog tables + if user_info.exists: + subs = get_table_subscriptions(username) + table_mode = subs.get("table_mode", "all") + table_subs = subs.get("tables", {}) + for cat in catalog_data: + for table in cat.get("tables", []): + if table_mode == "all": + table["subscribed"] = True + else: + table["subscribed"] = table_subs.get(table["name"], False) + # Gather account widget details (notification scripts, cron, last sync) account_details = get_account_details(username) if user_info.exists else None @@ -432,6 +444,18 @@ def register_routes(app: Flask) -> None: data_stats = _load_data_stats() catalog_data = _load_catalog_data() sync_settings = get_sync_settings(username) + + # Add subscription status to catalog tables + subs = get_table_subscriptions(username) + table_mode = subs.get("table_mode", "all") 
+ table_subs = subs.get("tables", {}) + for cat in catalog_data: + for table in cat.get("tables", []): + if table_mode == "all": + table["subscribed"] = True + else: + table["subscribed"] = table_subs.get(table["name"], False) + return render_template( "catalog.html", data_stats=data_stats, @@ -686,6 +710,37 @@ def register_routes(app: Flask) -> None: return jsonify({"ok": True, "message": message}) return jsonify({"error": message}), 400 + @app.route("/api/table-subscriptions") + @login_required + def table_subscriptions_get(): + """Get per-table subscriptions for current user.""" + user = session.get("user", {}) + email = user.get("email", "") + username = get_username_from_email(email) + subs = get_table_subscriptions(username) + return jsonify(subs) + + @app.route("/api/table-subscriptions", methods=["POST"]) + @login_required + def table_subscriptions_update(): + """Update per-table subscriptions for current user.""" + user = session.get("user", {}) + email = user.get("email", "") + username = get_username_from_email(email) + + data = request.get_json(silent=True) or {} + table_mode = data.get("table_mode", "all") + tables = data.get("tables", {}) + + if table_mode not in ("all", "explicit"): + return jsonify({"error": "table_mode must be 'all' or 'explicit'"}), 400 + + success, message = update_table_subscriptions(username, table_mode, tables) + if success: + logger.info(f"Table subscriptions updated for {username}") + return jsonify({"ok": True, "message": message}) + return jsonify({"error": message}), 400 + # ───────────────────────────────────────────────────────────────── # Corporate Memory routes # ───────────────────────────────────────────────────────────────── @@ -809,6 +864,215 @@ def register_routes(app: Flask) -> None: votes = get_user_votes(username) return jsonify({"votes": votes}) + # ───────────────────────────────────────────────────────────────── + # Admin pages + # ───────────────────────────────────────────────────────────────── + + 
@app.route("/admin/tables")
@login_required
@admin_required
def admin_tables():
    """Render the admin table management page."""
    return render_template("admin_tables.html")


# ─────────────────────────────────────────────────────────────────
# Admin API routes
# ─────────────────────────────────────────────────────────────────


def _regenerate_data_description(registry) -> None:
    """Ask ``registry`` to rewrite ``../docs/data_description.md`` (relative to this module)."""
    docs_path = Path(os.path.dirname(__file__)) / ".." / "docs" / "data_description.md"
    registry.generate_data_description_md(docs_path.resolve())


def _json_object_body():
    """Return the request's JSON body as a dict, or None if it is not a JSON object.

    A missing or unparsable body is treated as an empty object.
    """
    payload = request.get_json(silent=True) or {}
    return payload if isinstance(payload, dict) else None


@app.route("/api/admin/discover-tables")
@login_required
@admin_required
def admin_discover_tables():
    """Discover all tables in the data source, grouped by bucket.

    Each table dict gets an ``is_registered`` flag telling whether its id is
    already present in the table registry.

    Returns:
        ``{"ok": True, "total": int, "buckets": [...]}`` on success, or
        ``{"error": ...}`` with HTTP status 500 if discovery fails.
    """
    try:
        from src.data_sync import create_data_source

        raw_tables = create_data_source().discover_tables()

        # Registry lookup is best effort: discovery results are still useful
        # without the flags, but the failure is logged instead of hidden.
        registered_ids = set()
        try:
            from src.table_registry import TableRegistry

            registered_ids = {t["id"] for t in TableRegistry.default().list_tables()}
        except Exception as reg_err:
            logger.warning(f"Could not load table registry during discovery: {reg_err}")

        buckets: dict = {}
        for table in raw_tables:
            bid = table.get("bucket_id", "other")
            if bid not in buckets:
                buckets[bid] = {
                    "bucket_id": bid,
                    "bucket_name": table.get("bucket_name", bid),
                    "tables": [],
                }
            # .get() so one malformed entry cannot fail the whole discovery.
            table["is_registered"] = table.get("id") in registered_ids
            buckets[bid]["tables"].append(table)

        return jsonify({
            "ok": True,
            "total": len(raw_tables),
            "buckets": list(buckets.values()),
        })

    except Exception as e:
        logger.exception(f"Discovery failed: {e}")
        return jsonify({"error": str(e)}), 500


@app.route("/api/admin/registry")
@login_required
@admin_required
def admin_registry_list():
    """Return the registry version, folder mapping and registered tables as JSON."""
    try:
        from src.table_registry import TableRegistry

        registry = TableRegistry.default()
        return jsonify({
            "ok": True,
            "version": registry.version,
            "folder_mapping": registry.get_folder_mapping(),
            "tables": registry.list_tables(),
        })
    except Exception as e:
        logger.exception(f"Registry list failed: {e}")
        return jsonify({"error": str(e)}), 500


@app.route("/api/admin/register-table", methods=["POST"])
@login_required
@admin_required
def admin_register_table():
    """Register a new table from discovery results.

    The JSON body is passed to the registry as the table definition; its
    optional ``version`` key is also forwarded as ``expected_version``.

    Returns:
        200 with the new registry version, 400 for a bad body or a
        ``ValueError``, 409 for ``ConflictError``, 500 for anything else.
    """
    from src.table_registry import ConflictError, TableRegistry

    email = session.get("user", {}).get("email", "")

    data = _json_object_body()
    if data is None:
        return jsonify({"error": "Request body must be a JSON object"}), 400
    if not data.get("id"):
        return jsonify({"error": "Missing table 'id'"}), 400

    try:
        registry = TableRegistry.default()
        registry.register_table(
            table_def=data,
            registered_by=email,
            expected_version=data.get("version"),
        )
        _regenerate_data_description(registry)
        return jsonify({"ok": True, "version": registry.version})

    except ConflictError as e:
        return jsonify({"error": str(e)}), 409
    except ValueError as e:
        return jsonify({"error": str(e)}), 400
    except Exception as e:
        logger.exception(f"Register table failed: {e}")
        return jsonify({"error": str(e)}), 500


# NOTE(review): the rule needs a URL variable for Flask to pass ``table_id``
# to the view; ``path`` also accepts ids containing "/". Confirm the
# converter against the client code that builds these URLs.
@app.route("/api/admin/registry/<path:table_id>", methods=["PUT"])
@login_required
@admin_required
def admin_update_table(table_id):
    """Update the configuration of a registered table.

    The JSON body holds the fields to update; its optional ``version`` key is
    removed from the updates and forwarded as ``expected_version``.

    Returns:
        200 with the new registry version, 400 for a bad body or a
        ``ValueError``, 409 for ``ConflictError``, 500 for anything else.
    """
    from src.table_registry import ConflictError, TableRegistry

    email = session.get("user", {}).get("email", "")

    data = _json_object_body()
    if data is None:
        return jsonify({"error": "Request body must be a JSON object"}), 400

    # Pop before the call so it is explicit that "version" never reaches
    # the registry as a field update.
    expected_version = data.pop("version", None)

    try:
        registry = TableRegistry.default()
        registry.update_table(
            table_id=table_id,
            updates=data,
            updated_by=email,
            expected_version=expected_version,
        )
        _regenerate_data_description(registry)
        return jsonify({"ok": True, "version": registry.version})

    except ConflictError as e:
        return jsonify({"error": str(e)}), 409
    except ValueError as e:
        return jsonify({"error": str(e)}), 400
    except Exception as e:
        logger.exception(f"Update table failed: {e}")
        return jsonify({"error": str(e)}), 500


@app.route("/api/admin/registry/<path:table_id>", methods=["DELETE"])
@login_required
@admin_required
def admin_unregister_table(table_id):
    """Unregister a table and remove it from users' per-table subscriptions.

    Subscription cleanup is best effort: a failure there is logged and does
    not fail the request.

    Returns:
        200 with the new registry version, 400 for a bad body or a
        ``ValueError``, 409 for ``ConflictError``, 500 for anything else.
    """
    from src.table_registry import ConflictError, TableRegistry

    email = session.get("user", {}).get("email", "")

    data = _json_object_body()
    if data is None:
        return jsonify({"error": "Request body must be a JSON object"}), 400

    try:
        registry = TableRegistry.default()

        # The name must be captured before deletion; subscriptions are keyed
        # by table name, not by table id.
        table_info = registry.get_table(table_id)
        table_name = table_info["name"] if table_info else None

        registry.unregister_table(
            table_id=table_id,
            unregistered_by=email,
            expected_version=data.get("version"),
        )

        if table_name:
            try:
                _cleanup_table_subscriptions(table_name)
            except Exception as ce:
                logger.warning(f"Subscription cleanup for {table_name} failed: {ce}")

        _regenerate_data_description(registry)
        return jsonify({"ok": True, "version": registry.version})

    except ConflictError as e:
        return jsonify({"error": str(e)}), 409
    except ValueError as e:
        return jsonify({"error": str(e)}), 400
    except Exception as e:
        logger.exception(f"Unregister table failed: {e}")
        return jsonify({"error": str(e)}), 500


def _cleanup_table_subscriptions(table_name: str) -> None:
    """Remove ``table_name`` from every user's per-table subscriptions.

    The settings file is rewritten only when at least one entry changed.
    """
    from webapp.sync_settings_service import _read_json, _write_json, SYNC_SETTINGS_FILE

    all_settings = _read_json(SYNC_SETTINGS_FILE)
    changed = False
    for user_data in all_settings.values():
        if not isinstance(user_data, dict):
            continue  # skip malformed entries instead of aborting the cleanup
        tables = user_data.get("tables")
        if isinstance(tables, dict) and table_name in tables:
            del tables[table_name]
            changed = True

    if changed:
        # NOTE(review): only the JSON settings file is updated here; the
        # per-user YAML / rsync filter files are not regenerated - confirm
        # whether they should be.
        _write_json(SYNC_SETTINGS_FILE, all_settings)
        logger.info(f"Cleaned up subscriptions for removed table: {table_name}")


def admin_required(f):
    """Decorator to require admin privileges for a route.

    Admin status is looked up again on every request rather than cached in
    the session. Paths starting with ``/api/`` get JSON errors (401 when not
    logged in, 403 when not an admin); other paths get a redirect.
    """

    @functools.wraps(f)
    def decorated_function(*args, **kwargs):
        is_api_call = request.path.startswith("/api/")

        if "user" not in session:
            if is_api_call:
                return jsonify({"error": "Authentication required"}), 401
            return redirect(url_for("auth.login"))

        from .user_service import check_user_exists, get_username_from_email

        email = session.get("user", {}).get("email", "")
        username = get_username_from_email(email)
        user_info = check_user_exists(username)

        # Fail closed: a lookup result without a truthy ``is_admin`` (including
        # no record at all) is denied rather than raising AttributeError.
        if not getattr(user_info, "is_admin", False):
            if is_api_call:
                return jsonify({"error": "Admin access required"}), 403
            flash("Admin access required.", "error")
            return redirect(url_for("dashboard"))

        return f(*args, **kwargs)

    return decorated_function
def _write_rsync_filter(username: str, dataset_settings: dict, table_mode: str, table_settings: dict) -> bool:
    """Write ~/.sync_rsync_filter for a user on the server.

    The filter text comes from ``generate_rsync_filter``. It is written to a
    temp file in /tmp and then installed into the user's home with
    ``sudo install``. The temp file is removed on every exit path.

    Returns:
        True on success, False on failure.
    """
    # Load folder_mapping from the table registry, falling back to the
    # instance config; if both fail an empty mapping is used.
    folder_mapping = {}
    try:
        from src.table_registry import TableRegistry

        folder_mapping = TableRegistry.default().get_folder_mapping()
    except Exception as registry_err:
        logger.debug(f"Table registry unavailable for folder_mapping: {registry_err}")
        try:
            from config.loader import load_instance_config, get_instance_value

            folder_mapping = get_instance_value(load_instance_config(), "folder_mapping", default={})
        except Exception as config_err:
            logger.debug(f"Instance config unavailable for folder_mapping: {config_err}")

    filter_content = generate_rsync_filter(dataset_settings, table_mode, table_settings, folder_mapping)
    user_filter_path = f"/home/{username}/.sync_rsync_filter"

    tmp_path = None
    try:
        # IMPORTANT: Must use /tmp/ explicitly - sudoers rule restrictions
        with tempfile.NamedTemporaryFile(mode="w", suffix=".filter", delete=False, dir="/tmp") as f:
            tmp_path = f.name  # recorded before writing so a failed write is cleaned up too
            f.write(filter_content)

        result = subprocess.run(
            ["/usr/bin/sudo", "-n", "/usr/bin/install", "-o", username, "-g", username, "-m", "644", tmp_path, user_filter_path],
            capture_output=True,
            text=True,
            timeout=10,
        )

        if result.returncode != 0:
            logger.error(f"Failed to install rsync filter for {username}: {result.stderr}")
            return False

        return True

    except subprocess.TimeoutExpired:
        logger.error(f"Timeout installing rsync filter for {username}")
        return False
    except Exception as e:
        logger.error(f"Error installing rsync filter for {username}: {e}")
        return False
    finally:
        # Always remove the temp file; previously a timeout or other error
        # from subprocess.run left it behind in /tmp.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError as cleanup_err:
                logger.warning(f"Could not remove temp file {tmp_path}: {cleanup_err}")


def get_table_subscriptions(username: str) -> dict:
    """Get per-table subscription settings for a user.

    Returns:
        {"table_mode": "all"|"explicit", "tables": {"name": bool, ...}};
        missing values default to "all" and an empty dict.
    """
    user_settings = _read_json(SYNC_SETTINGS_FILE).get(username, {})
    return {
        "table_mode": user_settings.get("table_mode", "all"),
        "tables": user_settings.get("tables", {}),
    }


def _is_safe_table_name(name) -> bool:
    """Return True if ``name`` can be embedded in the generated filter/YAML lines.

    Names are interpolated verbatim into line-oriented files, so line breaks
    and other control characters (which could add extra rules) and "/"
    (which could address other paths) are rejected.
    """
    return isinstance(name, str) and bool(name) and name.isprintable() and "/" not in name


def update_table_subscriptions(username: str, table_mode: str, table_settings: dict) -> tuple[bool, str]:
    """Update per-table subscriptions for a user.

    Args:
        username: The username
        table_mode: "all" or "explicit"
        table_settings: Dict with table names as keys and bool as values

    Returns:
        (success, message). Validation failures return ``(False, reason)``
        before anything is written. A failure to regenerate the user's
        config files is only logged; the settings are still saved.
    """
    # Validate table_mode
    if table_mode not in ("all", "explicit"):
        return False, f"Invalid table_mode: {table_mode}. Must be 'all' or 'explicit'"

    # Validate the container, then every name and value
    if not isinstance(table_settings, dict):
        return False, "Invalid tables: must be a mapping of table name to boolean"
    for key, value in table_settings.items():
        if not _is_safe_table_name(key):
            return False, f"Invalid table name {key!r}"
        if not isinstance(value, bool):
            return False, f"Invalid value for table '{key}': must be boolean"

    # Read current settings and update
    all_settings = _read_json(SYNC_SETTINGS_FILE)
    user_entry = all_settings.setdefault(username, {})
    user_entry["table_mode"] = table_mode
    user_entry["tables"] = table_settings
    user_entry["updated_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    # Write back
    _write_json(SYNC_SETTINGS_FILE, all_settings)

    # Regenerate user's config file (with dataset + table settings)
    dataset_settings = user_entry.get("datasets", dict(DEFAULT_SETTINGS))
    if not _regenerate_user_config(username, dataset_settings, table_mode, table_settings):
        logger.warning(f"Failed to regenerate config for {username}")

    logger.info(f"Updated table subscriptions for '{username}': mode={table_mode}, tables={table_settings}")
    return True, "Table subscriptions saved. Changes take effect on next sync."
def generate_rsync_filter(dataset_settings: dict, table_mode: str, table_settings: dict, folder_mapping: dict) -> str:
    """Generate rsync filter file content for per-table sync.

    In "all" mode the result is a comment-only file (no rules). In any other
    mode, each table with a truthy setting gets include rules, each table
    with a falsy setting gets exclude rules, and the file ends with rules
    that keep directories and drop every other file.

    Args:
        dataset_settings: {"jira": True, ...}; accepted for interface
            compatibility, not used when building the rules
        table_mode: "all" or "explicit"
        table_settings: {"company": True, "events": False, ...}; ``None`` is
            treated as empty
        folder_mapping: {"in.c-crm": "crm", ...} from registry/config;
            accepted for interface compatibility, not used when building
            the rules

    Returns:
        Rsync filter file content string, ending with a newline.
    """
    header = "# AUTO-GENERATED rsync filter for per-table sync"

    if table_mode == "all":
        # No filtering needed - include everything
        return "\n".join([
            header,
            "# table_mode: all",
            "",
            "# No filtering - all tables included",
        ]) + "\n"

    table_settings = table_settings or {}
    lines = [header, "# table_mode: explicit", ""]

    subscribed = sorted(name for name, enabled in table_settings.items() if enabled)
    unsubscribed = sorted(name for name, enabled in table_settings.items() if not enabled)

    # Rule order is preserved: all includes first, then all excludes.
    for heading, sign, names in (
        ("# Subscribed tables", "+", subscribed),
        ("# Excluded tables", "-", unsubscribed),
    ):
        if not names:
            continue
        lines.append(heading)
        for name in names:
            # Single parquet file and partitioned directory of the same name
            lines.append(f"{sign} **/{name}.parquet")
            lines.append(f"{sign} **/{name}/***")
        lines.append("")

    # Include folder structure but exclude unknown files
    lines.extend(["# Include folder structure", "+ */", "- *", ""])

    return "\n".join(lines)
content for sync config. Args: settings: Dict with dataset names and enabled status + table_mode: "all" or "explicit" (default "all") + table_settings: Dict with table names and subscription status (optional) Returns: YAML string content @@ -193,5 +381,18 @@ def generate_user_config_yaml(settings: dict) -> str: value = "true" if enabled else "false" lines.append(f" {dataset}: {value}") + lines.append("") + + # Per-table subscriptions + lines.append(f"table_mode: {table_mode}") + + if table_settings: + lines.append("tables:") + for table_name, subscribed in sorted(table_settings.items()): + value = "true" if subscribed else "false" + lines.append(f" {table_name}: {value}") + else: + lines.append("tables: {}") + lines.append("") return "\n".join(lines) diff --git a/webapp/templates/admin_tables.html b/webapp/templates/admin_tables.html new file mode 100644 index 0000000..5cf14a8 --- /dev/null +++ b/webapp/templates/admin_tables.html @@ -0,0 +1,1336 @@ + + + + + + Table Management - {{ config.INSTANCE_NAME }} + + + + + + + + +
+
+ + + + + +
+ + Table Management +
+
+
+ Admin +
+
+ + +
+

Table Management

+

Discover, register, and manage data tables from your source

+
+ + +
+ + +
+
+
+
+ + + + +
+
+
Discover Tables
+
Scan your data source for available tables
+
+
+ +
+
+
+ Click "Discover tables from source" to scan for available tables +
+
+
+ + +
+
+
+
+ + + + + + + +
+
+
Registered Tables
+
Tables currently configured for sync
+
+
+ +
+
+
+
+ Loading registry... +
+
+
+ +
+ + + + + + + + +
+
+ +
+ + + + + + +