From f3bd378b4743b10249d8638f80b842f174af30bc Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Thu, 9 Apr 2026 17:14:06 +0200 Subject: [PATCH] chore: remove 18 dead files from v1 architecture Removes unused scripts (collect_session, generate_user_sync_configs, standalone_profiler, remote_query, update, setup_views, test_sync, activate_venv, backfill_gap, sync_config_template), legacy config (data_description.md.example), llms.txt, completed planning docs (plan-rsync-fix, plan_parquet_types_fix, plan-corporate-memory), and the examples/notifications/ directory. --- config/data_description.md.example | 164 --- dev_docs/plan-corporate-memory.md | 348 ------ dev_docs/plan-rsync-fix.md | 137 --- dev_docs/plan_parquet_types_fix.md | 139 --- examples/notifications/data_freshness.py | 62 -- examples/notifications/metric_report.py | 125 --- examples/notifications/revenue_drop.py | 83 -- llms.txt | 47 - scripts/activate_venv.sh | 28 - scripts/backfill_gap.sh | 109 -- scripts/collect_session.py | 69 -- scripts/generate_user_sync_configs.py | 162 --- scripts/remote_query.sh | 41 - scripts/setup_views.sh | 126 --- scripts/standalone_profiler.py | 1271 ---------------------- scripts/sync_config_template.yaml | 23 - scripts/test_sync.sh | 138 --- scripts/update.sh | 71 -- 18 files changed, 3143 deletions(-) delete mode 100644 config/data_description.md.example delete mode 100644 dev_docs/plan-corporate-memory.md delete mode 100644 dev_docs/plan-rsync-fix.md delete mode 100644 dev_docs/plan_parquet_types_fix.md delete mode 100644 examples/notifications/data_freshness.py delete mode 100644 examples/notifications/metric_report.py delete mode 100644 examples/notifications/revenue_drop.py delete mode 100644 llms.txt delete mode 100644 scripts/activate_venv.sh delete mode 100755 scripts/backfill_gap.sh delete mode 100644 scripts/collect_session.py delete mode 100755 scripts/generate_user_sync_configs.py delete mode 100644 scripts/remote_query.sh delete mode 100644 scripts/setup_views.sh 
delete mode 100644 scripts/standalone_profiler.py delete mode 100644 scripts/sync_config_template.yaml delete mode 100755 scripts/test_sync.sh delete mode 100755 scripts/update.sh diff --git a/config/data_description.md.example b/config/data_description.md.example deleted file mode 100644 index 033c25c..0000000 --- a/config/data_description.md.example +++ /dev/null @@ -1,164 +0,0 @@ -# Data Description - -This file defines the tables available for synchronization and analysis. -Copy this file to `data_description.md` and customize for your data sources. - -## Tables - -```yaml -# Folder mapping: data source bucket -> local folder name -folder_mapping: - "in.c-example": "example" - -tables: - # Small reference table - full refresh, no automatic sync - - id: "in.c-example.customers" - name: "customers" - description: "Customer master data" - primary_key: "id" - sync_strategy: "full_refresh" - - # Large transactional table - daily automatic sync with profiling - - id: "in.c-example.orders" - name: "orders" - description: "Order transactions with line items" - primary_key: "id" - sync_strategy: "partitioned" - partition_by: "created_at" - partition_column_type: "DATE" - partition_granularity: "day" - incremental_window_days: 3 - max_history_days: 450 - query_mode: "local" - sync_schedule: "daily 05:00" - profile_after_sync: true - - # Frequently updated table - sync every hour, skip profiling - - id: "in.c-example.events" - name: "events" - description: "Real-time event stream" - primary_key: "event_id" - sync_strategy: "partitioned" - partition_by: "event_date" - partition_column_type: "DATE" - partition_granularity: "day" - incremental_window_days: 1 - query_mode: "local" - sync_schedule: "every 1h" - profile_after_sync: false - - # Remote table - too large for local sync, queried via BigQuery - - id: "project.dataset.page_views" - name: "page_views" - description: "Page-level traffic metrics (~5M rows/day)" - primary_key: "page_view_id" - sync_strategy: 
"partitioned" - partition_by: "event_date" - partition_column_type: "DATE" - partition_granularity: "day" - query_mode: "remote" - bq_entity_type: "view" -``` - -## Table Configuration Reference - -### Required Fields - -| Field | Description | Example | -|-------|-------------|---------| -| `id` | Full table identifier in data source | `"in.c-crm.company"` | -| `name` | Short name (used for Parquet filenames) | `"company"` | -| `description` | Human-readable description | `"Company master data"` | -| `primary_key` | Primary key column(s), comma-separated | `"id"` or `"order_id, line_id"` | -| `sync_strategy` | How data is downloaded (see below) | `"full_refresh"` | - -### Sync Strategy - -| Strategy | Description | Use for | -|----------|-------------|---------| -| `full_refresh` | Downloads entire table each sync | Small reference tables (< 100K rows) | -| `incremental` | Downloads changed rows via changedSince | Medium tables with update tracking | -| `partitioned` | Downloads by time partitions, overwrites only recent ones | Large tables with date column | - -### Partitioning - -| Field | Default | Description | -|-------|---------|-------------| -| `partition_by` | *(none)* | Column to partition by (e.g., `"created_at"`, `"event_date"`) | -| `partition_granularity` | `"month"` | `"day"`, `"month"`, or `"year"` | -| `partition_column_type` | `"TIMESTAMP"` | SQL type: `"DATE"`, `"TIMESTAMP"`, or `"DATETIME"` | -| `incremental_window_days` | `7` | How many recent days to re-download on each sync | -| `max_history_days` | *(all)* | Maximum history to keep (e.g., `450` for ~15 months) | -| `initial_load_chunk_days` | `30` | Chunk size for first-time download | - -### Query Mode - -| Field | Default | Description | -|-------|---------|-------------| -| `query_mode` | `"local"` | How the AI agent queries this table | -| `bq_entity_type` | `"view"` | BigQuery entity type: `"view"` (Python BQ client) or `"table"` (DuckDB BQ extension) | - -| Mode | Description | Best 
for | -|------|-------------|----------| -| `local` | Synced to Parquet, queried via DuckDB | Tables < 2 GB, fast queries | -| `remote` | Not synced, queried via BigQuery | Huge tables (100+ GB), live data | -| `hybrid` | Subset synced for profiling, queries go to BigQuery | Medium tables needing live data | - -### Remote Table Metadata - -Remote tables (`query_mode: "remote"`) should include metadata to help the AI agent -write safe, efficient queries: - -| Field | Description | -|-------|-------------| -| `grain` | Row granularity description | -| `volume` | Size estimates (rows_per_day, unique entities) | -| `columns` | Column descriptions with value distributions | -| `dimension_profile` | Cardinality per dimension | -| `query_result_estimates` | Expected rows after GROUP BY combinations | -| `join_keys` | How to join with other tables | - -### Automatic Sync Schedule - -| Field | Default | Description | -|-------|---------|-------------| -| `sync_schedule` | *(none)* | When to automatically sync this table | -| `profile_after_sync` | `true` | Run data profiler after sync completes | - -The `sync_schedule` field controls automatic synchronization via the `data-refresh` -systemd timer (runs every 15 minutes). If omitted, the table is only synced manually. 
- -**Schedule formats:** - -| Format | Example | Description | -|--------|---------|-------------| -| `every {N}m` | `"every 15m"`, `"every 30m"` | Sync every N minutes | -| `every {N}h` | `"every 1h"`, `"every 6h"` | Sync every N hours | -| `daily HH:MM` | `"daily 05:00"`, `"daily 17:30"` | Sync once per day at HH:MM UTC | -| *(omitted)* | - | Manual sync only (`python -m src.data_sync`) | - -**How scheduling works:** -- A systemd timer runs `python -m src.data_sync --scheduled` every 15 minutes -- For each table with `sync_schedule`, it checks the last sync time from `sync_state.json` -- `every` schedules: syncs if enough time has elapsed since last sync -- `daily` schedules: syncs once after the target time passes (skips if already synced today) -- Tables without `sync_schedule` are never synced automatically - -**Profiling control:** -- `profile_after_sync: true` (default) - runs profiler after sync to update column statistics -- `profile_after_sync: false` - skips profiler (use for frequently synced tables where - profiling overhead is not worth it; the AI agent uses slightly older statistics) -- When profiling runs, the webapp is automatically restarted to load new statistics - -### Optional Fields - -| Field | Default | Description | -|-------|---------|-------------| -| `folder` | *(from folder_mapping)* | Override output folder name | -| `row_filter` | *(none)* | SQL WHERE clause (e.g., `"date >= DATE_SUB(CURRENT_DATE(), INTERVAL 15 MONTH)"`) | -| `columns` | *(all)* | List of columns to sync (subset) | -| `incremental_column` | *(none)* | Column for timestamp-based incremental sync (BigQuery) | -| `dataset` | *(none)* | Dataset group name for on-demand tables | -| `catalog_fqn` | *(auto)* | OpenMetadata FQN override (auto-derived from table ID if not set) | -| `foreign_keys` | `[]` | List of foreign key relationships | -| `where_filters` | `[]` | List of filters for Keboola Storage API | diff --git a/dev_docs/plan-corporate-memory.md 
b/dev_docs/plan-corporate-memory.md deleted file mode 100644 index 4eb65f8..0000000 --- a/dev_docs/plan-corporate-memory.md +++ /dev/null @@ -1,348 +0,0 @@ -# Corporate Memory Module - Implementation Plan - -## Overview - -Server-side modul, který každých 30 minut sbírá znalosti z `CLAUDE.local.md` všech uživatelů, pomocí Claude HAIKU je extrahuje a filtruje, a vytváří sdílenou firemní knowledge base. Uživatelé mohou hlasovat a oblíbené znalosti se jim syncují do `.claude/rules/`. - -## Architecture - -``` -┌─────────────────────────────────────────────────────────────────┐ -│ Cron (*/30 * * * *) │ -│ └── /usr/local/bin/collect-knowledge │ -│ 1. Čte /home/*/CLAUDE.local.md │ -│ 2. Volá Claude HAIKU pro extrakci (AI filtering) │ -│ 3. Ukládá do /data/corporate-memory/knowledge.json │ -└─────────────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────────┐ -│ /data/corporate-memory/ (deploy:data-ops 2770)│ -│ ├── knowledge.json # {items: {id: {title, content, ...}}} │ -│ ├── votes.json # {username: {item_id: 1|-1}} │ -│ ├── user_hashes.json # {username: {file_hash, last_proc}} │ -│ └── collection.log # Logy z HAIKU procesingu │ -└─────────────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────────┐ -│ Webapp │ -│ ├── Dashboard widget: stats (contributors, items, your rules) │ -│ ├── /corporate-memory: sub-page s 👍/👎 hlasováním │ -│ └── API: /api/corporate-memory/* │ -└─────────────────────────────────────────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────────────────────────┐ -│ sync_data.sh │ -│ └── Stáhne upvoted items do .claude/rules/*.md │ -└─────────────────────────────────────────────────────────────────┘ -``` - -## Data Model - -**knowledge.json:** -```json -{ - "items": { - "km_abc123": { - "id": "km_abc123", - "title": "DuckDB query optimization", - "content": "When querying 
large parquet files...", - "category": "data_analysis", - "tags": ["duckdb", "performance"], - "source_users": ["john.doe", "jane.smith", "bob"], - "votes_up": 5, - "votes_down": 1, - "score": 4, - "extracted_at": "2026-02-05T10:30:00Z", - "updated_at": "2026-02-05T12:00:00Z" - } - }, - "metadata": { - "last_collection": "2026-02-05T10:30:00Z", - "total_users": 8 - } -} -``` - -**votes.json** (kdo jak hlasoval - pro UI "already voted"): -```json -{ - "john.doe": {"km_abc123": 1, "km_def456": -1}, - "jane.smith": {"km_abc123": 1} -} -``` - -### Voting & Popularity - -- `votes_up` / `votes_down` - počet palců nahoru/dolů (viditelné v UI) -- `score` = votes_up - votes_down (pro řazení) -- votes.json trackuje KDO hlasoval (pro zabránění dvojího hlasování) -- UI NEZOBRAZUJE jména hlasujících, jen počty - -**Řazení v UI:** -- **Nejoblíbenější** (Most Popular): ORDER BY score DESC -- **Nejméně oblíbené** (Least Popular): ORDER BY score ASC -- **Nejnovější** (Recent): ORDER BY extracted_at DESC -- **Nejvíc přispěvatelů** (Most Contributors): ORDER BY len(source_users) DESC - -**user_hashes.json** (tracking změn CLAUDE.local.md): -```json -{ - "john.doe": { - "file_hash": "a1b2c3d4e5f6...", - "last_processed": "2026-02-05T10:30:00Z" - }, - "jane.smith": { - "file_hash": "f6e5d4c3b2a1...", - "last_processed": "2026-02-05T09:00:00Z" - } -} -``` - -## Change Detection & Deduplication - -### Change Detection (šetření tokenů) - -**DŮLEŽITÉ**: Každých 30 minut se zpracovávají POUZE změněné soubory! - -``` -┌─────────────────────────────────────────────────────────────────┐ -│ Cron běží každých 30 min │ -│ │ -│ Pro každého uživatele: │ -│ 1. Spočítej MD5 hash /home/$user/CLAUDE.local.md │ -│ 2. Porovnej s uloženým hashem v user_hashes.json │ -│ 3. Pokud STEJNÝ → PŘESKOČ (žádné API volání) │ -│ 4. 
Pokud ZMĚNĚNÝ → zpracuj pomocí HAIKU API │ -│ │ -│ Typicky: 8 uživatelů, 1-2 změny denně = 2-4 API volání/den │ -└─────────────────────────────────────────────────────────────────┘ -``` - -```python -def collect_all(): - users_processed = 0 - users_skipped = 0 - - for user_dir in Path("/home").iterdir(): - claude_file = user_dir / "CLAUDE.local.md" - if not claude_file.exists(): - continue - - username = user_dir.name - current_hash = hashlib.md5(claude_file.read_bytes()).hexdigest() - stored_hash = user_hashes.get(username, {}).get("file_hash") - - if current_hash == stored_hash: - users_skipped += 1 - continue # ← PŘESKOČÍ - žádné API volání! - - # Pouze změněné soubory volají HAIKU - new_knowledge = extract_knowledge(claude_file.read_text()) - deduplicate_and_merge(new_knowledge, username) - update_user_hash(username, current_hash) - users_processed += 1 - - log(f"Processed: {users_processed}, Skipped: {users_skipped}") -``` - -### Deduplikace znalostí - -Při extrakci nových znalostí od změněného uživatele: -1. HAIKU vrátí seznam znalostí -2. 
Pro každou novou znalost: - - Hledej podobnou v CELÉ knowledge base (všichni uživatelé) - - Pokud existuje podobná → přidej uživatele do `source_users[]` - - Pokud neexistuje → vytvoř novou - -```python -def deduplicate_and_merge(new_items: list, username: str): - """Deduplikuje nové znalosti proti celé knowledge base.""" - for new_item in new_items: - similar_id = find_similar_knowledge(new_item, knowledge_base) - - if similar_id: - # Existující znalost - přidej uživatele - knowledge_base["items"][similar_id]["source_users"].append(username) - knowledge_base["items"][similar_id]["updated_at"] = now() - else: - # Nová znalost - item_id = generate_id() - knowledge_base["items"][item_id] = { - **new_item, - "id": item_id, - "source_users": [username], - "extracted_at": now() - } -``` - -### Příklad deduplikace - -5 uživatelů má podobnou znalost o DuckDB: -- User A: "Pro rychlé dotazy v DuckDB použij WHERE před JOIN" -- User B: "DuckDB je rychlejší když filtruješ před joinem" -- User C: "Filtruj data před JOIN operací v DuckDB" -- User D: "WHERE klauzule před JOIN zrychlí DuckDB" -- User E: "V DuckDB dej WHERE před JOIN" - -→ HAIKU rozpozná jako JEDNU znalost → `source_users: ["user_a", "user_b", "user_c", "user_d", "user_e"]` - -## Files to Create - -### 1. 
Server-side Collector - -**`services/corporate_memory/collector.py`** - Main collection logic: -- `collect_all()` - iteruje přes /home/*/CLAUDE.local.md -- `should_process_user(username)` - kontrola hash změn (šetří tokeny) -- `extract_knowledge(content)` - volá HAIKU API -- `find_similar_knowledge(new, existing)` - hledá duplicity -- `merge_knowledge(existing, new, username)` - sloučí a přidá uživatele -- `save_knowledge(data)` - atomic JSON write -- `update_user_hash(username, hash)` - uloží hash pro příští run - -**`services/corporate_memory/prompts.py`** - HAIKU prompts: -- Prompt pro extrakci znalostí -- Prompt pro AI filtering citlivých dat -- Prompt pro merge podobných znalostí (optional) - -**`server/bin/collect-knowledge`** - Shell wrapper: -```bash -#!/bin/bash -cd /opt/data-analyst/repo -/opt/data-analyst/.venv/bin/python -m services.corporate_memory -``` - -**`services/corporate_memory/systemd/corporate-memory.timer`** + **`services/corporate_memory/systemd/corporate-memory.service`** - Systemd timer (30 min) - -### 2. Webapp Backend - -**`webapp/corporate_memory_service.py`** (pattern: telegram_service.py): -- `get_knowledge(filter, page)` - seznam znalostí -- `get_stats()` - pro dashboard widget -- `vote(username, item_id, vote)` - +1/-1 -- `get_user_rules(username)` - upvoted items pro sync -- `generate_user_rules_file(username)` - vytvoří JSON pro sync - -**`webapp/app.py`** - nové routes: -- `GET /corporate-memory` - sub-page -- `GET /api/corporate-memory/knowledge` - list items -- `GET /api/corporate-memory/stats` - dashboard stats -- `POST /api/corporate-memory/vote` - hlasování -- `GET /api/corporate-memory/my-rules` - user's upvoted - -### 3. 
Webapp Frontend - -**`webapp/templates/corporate_memory.html`** - sub-page: -- Header se stats -- Řazení: Most Popular | Least Popular | Recent | Most Contributors -- Seznam znalostí s filtry (category, search) -- Každá položka: - - Title + content preview - - Tags (category badges) - - 👍 (count) / 👎 (count) buttons - - "From X contributors" badge - - "Synced to your rules" indicator (if user upvoted) - -``` -┌─────────────────────────────────────────────────────────────────┐ -│ DuckDB query optimization [data_analysis] │ -│ When querying large parquet files, filter before JOIN... │ -│ │ -│ 👍 5 👎 1 | From 3 contributors | ✓ In your rules │ -└─────────────────────────────────────────────────────────────────┘ -``` - -**`webapp/templates/dashboard.html`** - přidat widget: -```html -
-

Corporate Memory

-
-
{{ stats.contributors }} contributors
-
{{ stats.knowledge_count }} knowledge items
-
{{ stats.your_rules }} your rules
-
- Browse Knowledge → -
-``` - -### 4. Client Sync - -Server generuje .md soubory přímo do `/home/$user/.claude_rules/` pro každého uživatele. -Klient si je jen stáhne. - -**`scripts/sync_data.sh`** - přidat: -```bash -# --- Sync corporate memory rules --- -echo "📚 Syncing corporate memory rules..." -mkdir -p .claude/rules -rsync -avz data-analyst:~/.claude_rules/ .claude/rules/ 2>/dev/null || \ - scp -r data-analyst:~/.claude_rules/* .claude/rules/ 2>/dev/null || true -``` - -**Server-side** (v `corporate_memory_service.py`): -- `generate_user_rules(username)` - při hlasování regeneruje .md soubory -- Zapisuje do `/home/$user/.claude_rules/km_*.md` -- Webapp volá po každém vote změně - -### 5. Deployment - -**`server/deploy.sh`** - additions: -- Vytvořit /data/corporate-memory/ (2770 setgid) -- Nainstalovat collect-knowledge do /usr/local/bin/ -- Deploy systemd timer -- Přidat ANTHROPIC_API_KEY do .env - -**`server/sudoers-deploy`** - přidat práva pro /data/corporate-memory/ - -**GitHub Secrets**: `ANTHROPIC_API_KEY` - -## Implementation Phases - -### Phase 1: Server Infrastructure -1. `services/corporate_memory/` Python modul -2. `server/bin/collect-knowledge` wrapper -3. `services/corporate_memory/systemd/corporate-memory.service` + `.timer` -4. Update `server/deploy.sh` -5. Update sudoers - -### Phase 2: Webapp Backend -1. `webapp/corporate_memory_service.py` -2. API endpoints v `webapp/app.py` -3. Update `webapp/config.py` (ANTHROPIC_API_KEY, paths) - -### Phase 3: Webapp Frontend -1. Dashboard widget v `dashboard.html` -2. Sub-page `corporate_memory.html` -3. CSS styles - -### Phase 4: Client Sync -1. 
Update `scripts/sync_data.sh` - přidat rsync pro `.claude_rules/` - -## Key Files to Modify - -| File | Changes | -|------|---------| -| `server/deploy.sh` | Add /data/corporate-memory/, ANTHROPIC_API_KEY, systemd timer | -| `server/sudoers-deploy` | Add permissions for corporate-memory dir | -| `webapp/app.py` | Add routes and API endpoints | -| `webapp/config.py` | Add ANTHROPIC_API_KEY, CORPORATE_MEMORY_DIR | -| `webapp/templates/dashboard.html` | Add Corporate Memory widget | -| `scripts/sync_data.sh` | Add corporate rules sync step | - -## Reusable Patterns (from codebase) - -- **JSON I/O**: `webapp/telegram_service.py` - atomic writes with tempfile -- **Dashboard widget**: `webapp/templates/dashboard.html` - KPI card pattern -- **Sub-page**: `webapp/templates/catalog.html` - route + template pattern -- **Systemd service**: `services/telegram_bot/systemd/notify-bot.service` - timer pattern -- **Deploy**: `server/deploy.sh` - directory setup, script installation - -## Verification - -1. **Collector**: `ssh kids "/usr/local/bin/collect-knowledge --dry-run"` -2. **API**: `curl https://your-instance.example.com/api/corporate-memory/stats` -3. **Widget**: Check dashboard shows stats -4. **Voting**: Click 👍/👎, verify votes.json updated -5. **Sync**: Run `sync_data.sh`, check `.claude/rules/` has .md files diff --git a/dev_docs/plan-rsync-fix.md b/dev_docs/plan-rsync-fix.md deleted file mode 100644 index 7f37e3b..0000000 --- a/dev_docs/plan-rsync-fix.md +++ /dev/null @@ -1,137 +0,0 @@ -# Plan: Fix rsync synchronization hang (#197) - -## Problem - -Rsync from GCP server (YOUR_SERVER_IP) hangs after 1-5 minutes. Process exists but has 0% CPU and no network activity. 100% reproducible with ~7000 parquet files. - -## Root Cause Analysis - -The rsync commands in `scripts/sync_data.sh` use plain `rsync -avz` without: - -1. **SSH keepalive** - No `-e "ssh -o ServerAliveInterval=60"` to keep the TCP connection alive through firewall/NAT -2. 
**I/O timeout** - No `--timeout=N` to detect stalled transfers -3. **Retry logic** - A single connection drop kills the entire sync -4. **Partial resume** - No `--partial-dir` so retries restart file transfers from scratch -5. **Compression skip** - `-z` compresses parquet files that are already snappy-compressed (CPU waste) - -A stateful firewall or NAT device (GCP Cloud NAT, VPC firewall, or analyst's home router) drops idle TCP connections. Server-side `ClientAliveInterval=300s` sends keepalive, but the client side sends nothing. The firewall sees idle traffic and silently drops the connection. Neither rsync nor SSH client detects the dead connection. - -Note: GCP VPC firewall default idle timeout is 600s (10 min). The 1-5 minute hang suggests a more aggressive timeout on Cloud NAT or analyst's local network. - -## Implementation Plan - -### Step 1: Create diagnostic tests (`tests/test_sync_data.py`) - -Real end-to-end tests that download data to gitignored `data/` folder with detailed logging to pinpoint exactly where and why the hang occurs. - -**Live diagnostic tests (require server access, marked `@pytest.mark.live`):** - -| Test | Purpose | -|------|---------| -| `test_ssh_connectivity` | Basic SSH connection test with timeout | -| `test_rsync_small_directory` | Sync docs (~few files) to verify baseline works | -| `test_rsync_with_keepalive` | Sync parquet with SSH keepalive options | -| `test_rsync_with_timeout` | Sync parquet with `--timeout=60` to detect hang quickly | -| `test_rsync_per_subdirectory` | Sync each parquet subdir separately to isolate which triggers the hang | -| `test_rsync_without_compression` | Sync parquet without `-z` to test if compression causes stall | - -Each test logs to `data/sync_diagnostics/` with timestamps, exit codes, bytes transferred, duration. 
- -**Static regression tests (run in CI without server access):** - -| Test | Purpose | -|------|---------| -| `test_rsync_commands_use_ssh_keepalive` | Every rsync in scripts has `ServerAliveInterval` | -| `test_rsync_commands_have_timeout` | Every rsync has `--timeout=N` | -| `test_parquet_rsync_does_not_compress` | Parquet rsync uses `-av` not `-avz` | -| `test_sync_scripts_have_retry_logic` | Scripts define retry wrapper | - -### Step 2: Run diagnostic tests and analyze results - -Before implementing any fix, run the live diagnostic tests to confirm the root cause: - -```bash -pytest tests/test_sync_data.py -v -k "live" --tb=short 2>&1 | tee data/sync_diagnostics/test_run.log -``` - -**Expected outcomes to validate:** -- `test_ssh_connectivity` passes → SSH connection itself is fine -- `test_rsync_small_directory` passes → small syncs work, problem is scale/duration-related -- `test_rsync_with_timeout` fails with timeout → confirms connection drop (rsync exits instead of hanging forever) -- `test_rsync_with_keepalive` passes → confirms keepalive fixes the issue -- `test_rsync_per_subdirectory` → identifies if specific directory triggers the hang or if it's purely time-based -- `test_rsync_without_compression` → reveals if `-z` contributes to the stall - -**Decision gate:** Based on results, confirm which combination of fixes is needed before proceeding to Step 3. If keepalive alone fixes it, the other changes are still applied as defense-in-depth. 
- -### Step 3: Fix `scripts/sync_data.sh` - -Add reliability wrapper after argument parsing: - -```bash -# --- Rsync reliability settings (Issue #197) --- -RSYNC_SSH_OPTS='ssh -o ServerAliveInterval=60 -o ServerAliveCountMax=3 -o ConnectTimeout=30' -RSYNC_TIMEOUT=300 -RSYNC_MAX_RETRIES=3 -RSYNC_RETRY_DELAY=5 - -rsync_reliable() { - local attempt=1 - local delay=$RSYNC_RETRY_DELAY - while [[ $attempt -le $RSYNC_MAX_RETRIES ]]; do - if rsync -e "$RSYNC_SSH_OPTS" --timeout="$RSYNC_TIMEOUT" \ - --partial-dir=.rsync-partial "$@"; then - return 0 - fi - local exit_code=$? - if [[ $attempt -lt $RSYNC_MAX_RETRIES ]]; then - echo " Rsync failed (exit $exit_code), retrying in ${delay}s ($attempt/$RSYNC_MAX_RETRIES)..." - sleep "$delay" - delay=$((delay * 2)) - fi - attempt=$((attempt + 1)) - done - echo " ERROR: Rsync failed after $RSYNC_MAX_RETRIES attempts" - return 1 -} -``` - -Replace all rsync invocations to use `rsync_reliable()` wrapper: -- Drop `-z` for parquet transfers (already snappy-compressed) -- Keep `-z` for text transfers (docs, scripts, metadata) -- `--partial-dir=.rsync-partial` enables resume on retry (Gemini review recommendation) - -### Step 4: Fix `scripts/sync_jira.sh` - -Same `rsync_reliable()` wrapper and fixes. - -### Step 5: Update CI - -Add `tests/test_sync_data.py` to `.github/workflows/deploy-guard.yml` (static regression tests only, live diagnostics excluded via marker). 
- -### Step 6: Verify fix end-to-end - -Run full sync and confirm no hanging: -```bash -bash scripts/sync_data.sh --dry-run # script syntax OK -bash scripts/sync_data.sh # full sync completes -``` - -## Files to Modify - -| File | Action | Description | -|------|--------|-------------| -| `tests/test_sync_data.py` | CREATE | Diagnostic + regression tests | -| `scripts/sync_data.sh` | EDIT | Add reliability wrapper, fix all rsync calls | -| `scripts/sync_jira.sh` | EDIT | Same fixes | -| `.github/workflows/deploy-guard.yml` | EDIT | Add static tests to CI | - -## Review Notes - -Plan reviewed by Google Gemini (2026-02-17). Key feedback incorporated: -- **`--partial-dir=.rsync-partial`** added to wrapper for efficient retry resume -- **Root cause confirmed** as most likely firewall/NAT idle timeout (textbook signature) -- **`ServerAliveInterval=60`** confirmed as correct value (low overhead, safe interval) -- **`--timeout=300`** confirmed as reasonable (long enough to avoid false positives) -- **Drop `-z`** confirmed correct (compressing snappy data wastes CPU) -- **Future optimization**: `--files-from` approach for large file sets if needed diff --git a/dev_docs/plan_parquet_types_fix.md b/dev_docs/plan_parquet_types_fix.md deleted file mode 100644 index bf324be..0000000 --- a/dev_docs/plan_parquet_types_fix.md +++ /dev/null @@ -1,139 +0,0 @@ -# Plan: Fix Parquet Type Preservation & DuckDB Schema Mismatch - -## Context - -Issues #185, #186, #187 report that DuckDB views fail for partitioned tables when parquet files have schema mismatches. The root cause is twofold: -1. **DuckDB** reads partitioned parquet files without `union_by_name=true`, so if a column is `null` type in one partition and `VARCHAR` in another, it crashes. -2. **Parquet writing** uses `pa.Table.from_pandas()` without explicit schema — columns with all NULLs get inferred as `null` type instead of proper type (STRING, DATE, etc.). - -**Note:** #187 is a duplicate of #186. - -## Changes - -### 1. 
DuckDB fix — `scripts/duckdb_manager.py` (line 235) - -Add `union_by_name=true` to `read_parquet()` for partitioned views: - -```python -# Before: -sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{glob_pattern}')" -# After: -sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{glob_pattern}', union_by_name=true)" -``` - -This is an immediate fix — existing partition files with `null`-typed columns will be readable without re-syncing. - -### 2. PyArrow type mapping — `src/keboola_client.py` (after line 48) - -Add `KEBOOLA_TO_PYARROW_TYPES` dict alongside existing `KEBOOLA_TO_PANDAS_TYPES`: - -```python -KEBOOLA_TO_PYARROW_TYPES = { - "STRING": pa.string(), "VARCHAR": pa.string(), "TEXT": pa.string(), - "INTEGER": pa.int64(), "BIGINT": pa.int64(), - "NUMERIC": pa.float64(), "DECIMAL": pa.float64(), "FLOAT": pa.float64(), "DOUBLE": pa.float64(), - "BOOLEAN": pa.bool_(), - "DATE": pa.date32(), - "TIMESTAMP": pa.timestamp("us"), "TIMESTAMP_NTZ": pa.timestamp("us"), "TIMESTAMP_TZ": pa.timestamp("us", tz="UTC"), -} -``` - -### 3. Refactor provider cascade + new `get_pyarrow_schema()` — `src/keboola_client.py` - -**3a.** Extract duplicated provider cascade logic from `get_pandas_dtypes()`, `get_date_columns()` into a shared `_resolve_keboola_type(col_meta_list) -> str` helper. This logic is repeated 3x — the new method would be the 4th copy without refactoring. - -**3b.** Add `get_pyarrow_schema(table_id) -> Optional[pa.Schema]` method to `KeboolaClient`: -- Builds `pa.Schema` from Keboola metadata using `_resolve_keboola_type()` + `KEBOOLA_TO_PYARROW_TYPES` -- Returns `None` with WARNING log if metadata unavailable (graceful fallback) -- Refactor `get_pandas_dtypes()` and `get_date_columns()` to also use `_resolve_keboola_type()` - -### 4. 
New helper `apply_schema_to_table()` — `src/parquet_manager.py` (after line 103) - -Function that casts a `pa.Table` to match a target `pa.Schema`: -- Matches columns by name (not position) -- For `null`-type columns: creates typed null array via `pa.nulls(len, type=target_type)` -- For other mismatches: uses `col.cast(target_type, safe=True)` — if cast fails, keeps original type and logs warning (no data is changed or lost) -- Columns not in schema keep their inferred type -- Logs warnings on cast failures, doesn't crash - -### 5. Integrate schema into all parquet write paths - -Add `pyarrow_schema` parameter to these methods and apply `apply_schema_to_table()` after `pa.Table.from_pandas()` and `convert_date_columns_to_date32()`: - -| Method | File | Line | Current issue | -|--------|------|------|---------------| -| `csv_to_parquet()` | `parquet_manager.py` | 274 | No explicit schema | -| `merge_parquet()` | `parquet_manager.py` | 493 | No explicit schema | -| `_process_csv_to_partitions()` | `data_sync.py` | 848, 854 | No explicit schema | -| `_deduplicate_partitions()` | `data_sync.py` | 890 | Uses `df.to_parquet()` — bypasses entire PyArrow pipeline | - -**`_deduplicate_partitions()` fix (line 890):** Replace `df.to_parquet()` with full PyArrow pipeline: -```python -table = pa.Table.from_pandas(df, preserve_index=False) -if date_columns: - table = convert_date_columns_to_date32(table, date_columns) -if pyarrow_schema: - table = apply_schema_to_table(table, pyarrow_schema) -pq.write_table(table, partition_path, compression="snappy") -``` - -### 6. Update all callers to fetch and pass `pyarrow_schema` - -Each caller already fetches `dtypes` and `date_columns`. 
Add one line after them: - -```python -pyarrow_schema = self.keboola_client.get_pyarrow_schema(table_config.id) -``` - -| Caller method | File:Line | Passes to | -|---------------|-----------|-----------| -| `_full_refresh()` | `data_sync.py:318-328` | `csv_to_parquet()` | -| `_incremental_single_file_sync()` | `data_sync.py:455-508` | `merge_parquet()`, `csv_to_parquet()` | -| `_incremental_partitioned_sync()` | `data_sync.py:600-612` | `_process_csv_to_partitions()`, `_deduplicate_partitions()` | -| `_chunked_initial_load()` | `data_sync.py:673-766` | `_process_csv_to_partitions()`, `_deduplicate_partitions()` | -| `_partitioned_sync()` | `data_sync.py:989-1001` | `_process_csv_to_partitions()`, `_deduplicate_partitions()` | - -## Tests — `data/tests/test_parquet_types.py` (local only, gitignored) - -Test file lives in `data/tests/` which is gitignored (`data/` is in `.gitignore`). Tests are for **local verification only** — they don't go to the server or CI. - -All test data is created via pytest's `tmp_path` fixture (auto-cleaned) or in `data/tests/tmp/`. No test artifacts persist after the run. - -10 test cases, all local (no Keboola API calls): - -**Core tests:** -1. **`test_union_by_name_resolves_schema_mismatch`** — Two parquet files with conflicting types, DuckDB reads with `union_by_name=true` -2. **`test_apply_schema_fixes_null_type_columns`** — `apply_schema_to_table()` converts null-type columns to proper types -3. **`test_parquet_preserves_types_with_all_null_columns`** — End-to-end: CSV with all-NULL column → Parquet → verify correct type -4. **`test_deduplicate_preserves_date32_types`** — After dedup, DATE32 columns retain their type -5. **`test_keboola_to_pyarrow_mapping_completeness`** — Every key in `KEBOOLA_TO_PANDAS_TYPES` exists in `KEBOOLA_TO_PYARROW_TYPES` -6. **`test_get_pyarrow_schema_from_metadata`** — Unit test with mocked Keboola metadata → verify correct schema - -**Additional tests (from Gemini review):** -7. 
**`test_apply_schema_handles_cast_error_gracefully`** — Column with uncastable values (e.g. "abc" → int64) doesn't crash, logs warning, keeps original type -8. **`test_schema_with_extra_csv_column`** — CSV has column not in metadata → column preserved with inferred type -9. **`test_get_pyarrow_schema_handles_all_types`** — All types from `KEBOOLA_TO_PYARROW_TYPES` mapping verified -10. **`test_deduplicate_with_mixed_types_and_nulls`** — Partition with all-null + mixed-data columns → correct schema after dedup - -## Files modified - -- `scripts/duckdb_manager.py` — 1 line change (union_by_name) -- `src/keboola_client.py` — new `KEBOOLA_TO_PYARROW_TYPES` mapping + `_resolve_keboola_type()` helper + `get_pyarrow_schema()` method + refactor existing methods -- `src/parquet_manager.py` — new `apply_schema_to_table()` function + `pyarrow_schema` param on `csv_to_parquet()` and `merge_parquet()` -- `src/data_sync.py` — `pyarrow_schema` param on `_process_csv_to_partitions()` and `_deduplicate_partitions()` + update all 5 callers -- `data/tests/test_parquet_types.py` — new file, 10 tests (**gitignored**, local verification only) - -## Verification - -1. Run `pytest data/tests/test_parquet_types.py -v` — all 10 tests pass -2. Run `pytest tests/` — existing CI tests still pass (no regressions) -3. Existing parquet files don't need re-sync — the `union_by_name=true` fix handles them immediately -4. On next sync cycle, partitions get re-written with correct schema - -## Cleanup after verification - -After all tests pass and code changes are committed: -```bash -rm -rf data/tests/ -``` -Test file and all test artifacts live in `data/` (gitignored) — nothing gets committed or deployed to server. 
diff --git a/examples/notifications/data_freshness.py b/examples/notifications/data_freshness.py deleted file mode 100644 index e2c1510..0000000 --- a/examples/notifications/data_freshness.py +++ /dev/null @@ -1,62 +0,0 @@ -#!/usr/bin/env python3 -""" -Example notification: Data freshness alert. - -Checks if the local data is stale (older than threshold) and notifies. -Outputs JSON to stdout for notify-runner. -""" - -import json -import os -import sys -from datetime import datetime, timezone -from pathlib import Path - -# Configuration -DATA_DIR = Path.home() / "server" / "parquet" -STALE_THRESHOLD_HOURS = 24 - - -def check_freshness() -> dict: - """Check data freshness by examining parquet file modification times.""" - if not DATA_DIR.exists(): - return { - "notify": True, - "title": "Data directory missing", - "message": f"Data directory not found: {DATA_DIR}\nRun: bash scripts/sync_data.sh", - "cooldown": "6h", - } - - # Find newest parquet file - parquet_files = list(DATA_DIR.rglob("*.parquet")) - if not parquet_files: - return { - "notify": True, - "title": "No data files found", - "message": "No parquet files in data directory.\nRun: bash scripts/sync_data.sh", - "cooldown": "6h", - } - - newest_mtime = max(f.stat().st_mtime for f in parquet_files) - newest_dt = datetime.fromtimestamp(newest_mtime, tz=timezone.utc) - now = datetime.now(tz=timezone.utc) - age_hours = (now - newest_dt).total_seconds() / 3600 - - if age_hours > STALE_THRESHOLD_HOURS: - return { - "notify": True, - "title": f"Data is {age_hours:.0f}h old", - "message": ( - f"Latest file: {newest_dt:%Y-%m-%d %H:%M} UTC\n" - f"Age: {age_hours:.1f} hours (threshold: {STALE_THRESHOLD_HOURS}h)\n" - f"Run: bash scripts/sync_data.sh" - ), - "cooldown": "6h", - } - - return {"notify": False} - - -if __name__ == "__main__": - result = check_freshness() - print(json.dumps(result)) diff --git a/examples/notifications/metric_report.py b/examples/notifications/metric_report.py deleted file mode 100644 index 
f9eca15..0000000 --- a/examples/notifications/metric_report.py +++ /dev/null @@ -1,125 +0,0 @@ -#!/usr/bin/env python3 -""" -Example notification: Daily metric report with chart image. - -Generates a summary chart using matplotlib and sends it as a Telegram photo. -Outputs JSON to stdout for notify-runner. -""" - -import json -import os -import sys -import tempfile -from datetime import datetime -from pathlib import Path - -import duckdb - -DB_PATH = Path.home() / "user" / "duckdb" / "analytics.duckdb" - - -def generate_chart(data: list[tuple]) -> str | None: - """Generate a bar chart and return the file path.""" - try: - import matplotlib - - matplotlib.use("Agg") # Non-interactive backend - import matplotlib.pyplot as plt - - dates = [row[0].strftime("%m/%d") for row in data] - values = [float(row[1]) for row in data] - - fig, ax = plt.subplots(figsize=(8, 4)) - bars = ax.bar(dates, values, color="#0073D1", width=0.6) - - # Highlight today - if len(bars) > 0: - bars[-1].set_color("#EA580C") - - ax.set_title("Daily Revenue - Last 7 Days", fontsize=14, fontweight="bold") - ax.set_ylabel("Revenue ($)") - ax.spines["top"].set_visible(False) - ax.spines["right"].set_visible(False) - plt.xticks(rotation=45, ha="right") - plt.tight_layout() - - # Save to temp file - chart_path = os.path.join( - tempfile.gettempdir(), - f"notify_{os.environ.get('USER', 'user')}_metric_{datetime.now():%Y%m%d}.png", - ) - plt.savefig(chart_path, dpi=150, bbox_inches="tight") - plt.close() - - return chart_path - - except ImportError: - print("matplotlib not installed, skipping chart", file=sys.stderr) - return None - except Exception as e: - print(f"Chart generation error: {e}", file=sys.stderr) - return None - - -def build_report() -> dict: - """Build daily metric report.""" - if not DB_PATH.exists(): - return {"notify": False} - - try: - conn = duckdb.connect(str(DB_PATH), read_only=True) - - # TODO: Adapt this query to your schema. 
- # DuckDB views use relative paths, so scripts must run from ~/ - # (notify-runner sets cwd to home directory automatically). - # - # Example for a table with date + numeric columns: - # SELECT DATE_TRUNC('day', created_at)::DATE AS day, - # SUM(amount) AS revenue - # FROM my_table - # WHERE created_at >= CURRENT_DATE - INTERVAL '7 days' - # GROUP BY 1 ORDER BY 1 - rows = conn.execute(""" - SELECT - DATE_TRUNC('day', created_at)::DATE AS day, - COUNT(*) AS cnt - FROM kbc_project - WHERE created_at >= CURRENT_DATE - INTERVAL '7 days' - GROUP BY 1 - ORDER BY 1 - """).fetchall() - - conn.close() - - if not rows: - return {"notify": False} - - today_val = float(rows[-1][1]) if rows else 0 - total_7d = sum(float(r[1]) for r in rows) - - chart_path = generate_chart(rows) - - result = { - "notify": True, - "title": "Daily Metric Report", - "message": ( - f"Today: {today_val:,.0f}\n" - f"7d total: {total_7d:,.0f}\n" - f"7d avg: {total_7d / len(rows):,.0f}" - ), - "cooldown": "1d", - } - - if chart_path: - result["image_path"] = chart_path - - return result - - except Exception as e: - print(f"metric_report error: {e}", file=sys.stderr) - return {"notify": False} - - -if __name__ == "__main__": - result = build_report() - print(json.dumps(result)) diff --git a/examples/notifications/revenue_drop.py b/examples/notifications/revenue_drop.py deleted file mode 100644 index 8c437e8..0000000 --- a/examples/notifications/revenue_drop.py +++ /dev/null @@ -1,83 +0,0 @@ -#!/usr/bin/env python3 -""" -Example notification: Revenue drop alert (text only). - -Checks if today's revenue dropped significantly vs 7-day average. -Outputs JSON to stdout for notify-runner. 
-""" - -import json -import sys -from pathlib import Path - -import duckdb - -# Configuration -DB_PATH = Path.home() / "user" / "duckdb" / "analytics.duckdb" -DROP_THRESHOLD_PERCENT = 20 - - -def check_revenue_drop() -> dict: - """Query revenue data and check for significant drop.""" - if not DB_PATH.exists(): - return {"notify": False} - - try: - conn = duckdb.connect(str(DB_PATH), read_only=True) - - # Example query - adjust table/column names to your schema - result = conn.execute(""" - WITH daily AS ( - SELECT - DATE_TRUNC('day', created_at) AS day, - SUM(amount) AS revenue - FROM payments - WHERE created_at >= CURRENT_DATE - INTERVAL '8 days' - GROUP BY 1 - ), - stats AS ( - SELECT - (SELECT revenue FROM daily WHERE day = CURRENT_DATE) AS today_rev, - AVG(CASE WHEN day < CURRENT_DATE AND day >= CURRENT_DATE - INTERVAL '7 days' - THEN revenue END) AS avg_7d - FROM daily - ) - SELECT today_rev, avg_7d, - ROUND((1 - today_rev / NULLIF(avg_7d, 0)) * 100, 1) AS drop_pct - FROM stats - """).fetchone() - - conn.close() - - if result is None or result[0] is None or result[1] is None: - return {"notify": False} - - today_rev, avg_7d, drop_pct = result - - if drop_pct >= DROP_THRESHOLD_PERCENT: - return { - "notify": True, - "title": f"Revenue dropped {drop_pct}%", - "message": ( - f"Today: ${today_rev:,.0f}\n" - f"7d avg: ${avg_7d:,.0f}\n" - f"Drop: {drop_pct}%" - ), - "cooldown": "6h", - "data": { - "today_revenue": float(today_rev), - "avg_7d_revenue": float(avg_7d), - "drop_percent": float(drop_pct), - }, - } - - return {"notify": False} - - except Exception as e: - print(f"revenue_drop error: {e}", file=sys.stderr) - return {"notify": False} - - -if __name__ == "__main__": - result = check_revenue_drop() - print(json.dumps(result)) diff --git a/llms.txt b/llms.txt deleted file mode 100644 index efc1a0d..0000000 --- a/llms.txt +++ /dev/null @@ -1,47 +0,0 @@ -# AI Data Analyst - -> A data distribution platform for AI analytical systems. 
Syncs data from configured sources (Keboola, CSV, BigQuery), converts to Parquet, and distributes to analysts who query locally using Claude Code and DuckDB. - -## Key Files - -- [README](README.md): Project overview, quick start, and feature list -- [CLAUDE.md](CLAUDE.md): Claude Code project context and development instructions -- [ARCHITECTURE.md](ARCHITECTURE.md): System architecture, data flow, and component overview - -## Documentation - -- [Quick Start](docs/QUICKSTART.md): End-to-end setup for new deployments -- [Configuration](docs/CONFIGURATION.md): All instance.yaml options explained -- [Deployment](docs/DEPLOYMENT.md): Server provisioning and deployment guide -- [Data Sources](docs/DATA_SOURCES.md): Adapter system and data source configuration - -## Core Source Code - -- [src/data_sync.py](src/data_sync.py): Data sync orchestration and DataSource ABC -- [src/adapters/](src/adapters/): Pluggable data source adapters (Keboola, CSV) -- [src/parquet_manager.py](src/parquet_manager.py): CSV to Parquet conversion engine -- [src/config.py](src/config.py): Runtime configuration from data_description.md -- [config/loader.py](config/loader.py): Instance config loader with ${ENV_VAR} interpolation - -## Web Application - -- [webapp/app.py](webapp/app.py): Flask application entry point -- [webapp/config.py](webapp/config.py): Webapp configuration from instance.yaml -- [webapp/account_service.py](webapp/account_service.py): User account and sync management - -## Server Operations - -- [server/deploy.sh](server/deploy.sh): Deployment script -- [server/setup.sh](server/setup.sh): Initial server provisioning -- [server/webapp-setup.sh](server/webapp-setup.sh): Web application setup (Nginx, SSL, systemd) -- [config/instance.yaml.example](config/instance.yaml.example): Configuration template - -## Configuration - -Instance config: `config/instance.yaml` (YAML with `${ENV_VAR}` references for secrets). -Environment variables: `.env` file (never committed). 
See `config/.env.template`. -Data schema: `docs/data_description.md` (YAML blocks in markdown). - -## Architecture Summary - -Data Source -> Source Adapter -> Parquet files on server -> rsync over SSH -> Analyst machine -> DuckDB views -> Claude Code queries diff --git a/scripts/activate_venv.sh b/scripts/activate_venv.sh deleted file mode 100644 index d60306f..0000000 --- a/scripts/activate_venv.sh +++ /dev/null @@ -1,28 +0,0 @@ -#!/bin/bash -# Helper script to activate virtual environment -# Usage: source scripts/activate_venv.sh - -# Detect project root (should be run from project directory) -if [ ! -d ".venv" ]; then - echo "❌ Virtual environment not found. Are you in the project directory?" - echo " Expected: A directory with .venv/ folder" - return 1 2>/dev/null || exit 1 -fi - -# Activate based on platform -if [ -f ".venv/bin/activate" ]; then - # Unix/macOS - source .venv/bin/activate - echo "✅ Virtual environment activated (Unix)" -elif [ -f ".venv/Scripts/activate" ]; then - # Windows (Git Bash) - source .venv/Scripts/activate - echo "✅ Virtual environment activated (Windows)" -else - echo "❌ Could not find activation script in .venv/" - return 1 2>/dev/null || exit 1 -fi - -# Show Python version -echo " Python: $(python --version 2>&1)" -echo " Location: $(which python)" diff --git a/scripts/backfill_gap.sh b/scripts/backfill_gap.sh deleted file mode 100755 index cd910a0..0000000 --- a/scripts/backfill_gap.sh +++ /dev/null @@ -1,109 +0,0 @@ -#!/bin/bash -# Backfill missing Jira issues from GitHub issue #101 -# Range: SUPPORT-15166 to SUPPORT-15243 (71 missing of 78 total) -# Safe to run while webhook processing is active. 
-# -# Usage: -# ssh kids -# cd /opt/data-analyst/repo -# source /opt/data-analyst/.venv/bin/activate -# bash scripts/backfill_gap.sh [--dry-run] - -set -euo pipefail - -REPO_DIR="/opt/data-analyst/repo" -VENV_DIR="/opt/data-analyst/.venv" -RAW_DIR="/data/src_data/raw/jira" -PARQUET_DIR="/data/src_data/parquet/jira" -LOG_FILE="/opt/data-analyst/logs/backfill_gap.log" -JIRA_PROJECT="${JIRA_PROJECT:-}" -if [ -n "$JIRA_PROJECT" ]; then - JQL="project = \"${JIRA_PROJECT}\" AND key >= SUPPORT-15166 AND key <= SUPPORT-15243" -else - JQL='key >= SUPPORT-15166 AND key <= SUPPORT-15243' -fi -RANGE_START=15166 -RANGE_END=15243 -DRY_RUN=false - -# Parse args -if [[ "${1:-}" == "--dry-run" ]]; then - DRY_RUN=true -fi - -# Ensure log directory exists -mkdir -p "$(dirname "$LOG_FILE")" - -# Log to both stdout and file -exec > >(tee -a "$LOG_FILE") 2>&1 -echo "=== Backfill started: $(date -u +%Y-%m-%dT%H:%M:%SZ) ===" - -cd "$REPO_DIR" - -# --- Phase 1: Download raw JSON --- -echo "" -echo "--- Phase 1: Download raw JSON ---" -if $DRY_RUN; then - python -m connectors.jira.scripts.backfill --jql "$JQL" --dry-run - echo "Dry run complete. Exiting." - exit 0 -fi - -python -m connectors.jira.scripts.backfill --jql "$JQL" --skip-existing --parallel 4 - -# --- Phase 2: Incremental Parquet transform --- -echo "" -echo "--- Phase 2: Incremental Parquet transform ---" -success=0 -skipped=0 -failed=0 - -for issue_num in $(seq $RANGE_START $RANGE_END); do - issue_key="SUPPORT-${issue_num}" - json_file="${RAW_DIR}/issues/${issue_key}.json" - - if [ ! -f "$json_file" ]; then - echo "SKIP: $issue_key (no JSON)" - skipped=$((skipped + 1)) - continue - fi - - echo -n "Transform $issue_key... 
" - if python -m src.incremental_jira_transform "$issue_key" 2>&1 | tail -1; then - success=$((success + 1)) - else - echo "FAILED: $issue_key" - failed=$((failed + 1)) - fi - - sleep 0.5 # reduce collision window with live webhooks -done - -echo "" -echo "Transform complete: $success ok, $skipped skipped, $failed failed" - -# --- Phase 3: Verification --- -echo "" -echo "--- Phase 3: Verification ---" -python -c " -import pyarrow.parquet as pq -from pathlib import Path - -parquet_dir = Path('$PARQUET_DIR/issues') -all_keys = set() -for pf in parquet_dir.glob('*.parquet'): - table = pq.read_table(pf, columns=['issue_key']) - all_keys.update(table.column('issue_key').to_pylist()) - -expected = {f'SUPPORT-{n}' for n in range($RANGE_START, $RANGE_END + 1)} -found = expected & all_keys -missing = expected - all_keys -print(f'Found: {len(found)}/{len(expected)} issues in Parquet') -if missing: - print(f'STILL MISSING ({len(missing)}): {sorted(missing)}') -else: - print('SUCCESS: All issues present in Parquet') -" - -echo "" -echo "=== Backfill finished: $(date -u +%Y-%m-%dT%H:%M:%SZ) ===" diff --git a/scripts/collect_session.py b/scripts/collect_session.py deleted file mode 100644 index d3d0779..0000000 --- a/scripts/collect_session.py +++ /dev/null @@ -1,69 +0,0 @@ -#!/usr/bin/env python3 -"""Collect Claude Code session transcript to user/sessions/. - -This script is invoked by Claude Code's SessionEnd hook. -It reads JSON from stdin containing session_id, transcript_path, and cwd, -then copies the transcript JSONL file to user/sessions/ with a date prefix. 
- -Design principles: -- Stdlib only (no external dependencies) -- Must NEVER crash or produce non-zero exit - Claude Code expects clean exit -- Uses shutil.copy2 (not move) - Claude Code still references the transcript -- UTC date for consistency across timezones -""" - -import json -import shutil -import sys -from datetime import datetime, timezone -from pathlib import Path - - -def main() -> None: - try: - raw = sys.stdin.read() - if not raw.strip(): - return - - data = json.loads(raw) - session_id = data.get("session_id", "") - transcript_path = data.get("transcript_path", "") - cwd = data.get("cwd", "") - - if not transcript_path or not session_id: - return - - source = Path(transcript_path) - if not source.exists() or not source.is_file(): - return - - # Determine target directory: cwd/user/sessions/ - if not cwd: - return - - target_dir = Path(cwd) / "user" / "sessions" - - # Only collect if we're inside a project that has user/ directory - user_dir = Path(cwd) / "user" - if not user_dir.is_dir(): - return - - target_dir.mkdir(parents=True, exist_ok=True) - - # Format: YYYY-MM-DD_{full_session_id}.jsonl - date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") - target_file = target_dir / f"{date_str}_{session_id}.jsonl" - - # Skip if already collected (check by session_id suffix to handle date changes) - for existing in target_dir.glob(f"*_{session_id}.jsonl"): - return - - shutil.copy2(str(source), str(target_file)) - - except Exception: - # Must never crash - Claude Code depends on clean exit - pass - - -if __name__ == "__main__": - main() diff --git a/scripts/generate_user_sync_configs.py b/scripts/generate_user_sync_configs.py deleted file mode 100755 index 5c692f7..0000000 --- a/scripts/generate_user_sync_configs.py +++ /dev/null @@ -1,162 +0,0 @@ -#!/usr/bin/env python3 -""" -Generate per-user sync config files from central sync_settings.json. 
- -This script reads /data/notifications/sync_settings.json and writes -~/.sync_settings.yaml for each user with their configured settings. - -Usage: - python3 generate_user_sync_configs.py [--dry-run] - -Run this script: -- After manual changes to sync_settings.json -- As a cron job to ensure configs stay in sync -- The webapp calls this automatically after settings changes -""" - -import argparse -import json -import os -import pwd -import sys -from pathlib import Path - -SYNC_SETTINGS_FILE = Path("/data/notifications/sync_settings.json") -DEFAULT_SETTINGS = { - "jira": False, - "jira_attachments": False, -} - - -def read_settings() -> dict: - """Read sync settings from JSON file.""" - if not SYNC_SETTINGS_FILE.exists(): - return {} - try: - with open(SYNC_SETTINGS_FILE) as f: - return json.load(f) - except (json.JSONDecodeError, IOError) as e: - print(f"Error reading {SYNC_SETTINGS_FILE}: {e}", file=sys.stderr) - return {} - - -def generate_yaml(settings: dict) -> str: - """Generate YAML content for a user's sync config.""" - lines = [ - "# AI Data Analyst - Data Sync Configuration", - "# Managed by web portal - changes here may be overwritten", - "#", - "# To manage settings, visit the web portal (Data Settings page)", - "", - "datasets:", - ] - - for dataset, enabled in sorted(settings.items()): - value = "true" if enabled else "false" - lines.append(f" {dataset}: {value}") - - lines.append("") - return "\n".join(lines) - - -def get_user_home(username: str) -> Path | None: - """Get home directory for a user.""" - try: - return Path(pwd.getpwnam(username).pw_dir) - except KeyError: - return None - - -def write_user_config(username: str, yaml_content: str, dry_run: bool = False) -> bool: - """Write config file to user's home directory. - - Returns True on success, False on failure. 
- """ - home = get_user_home(username) - if not home: - print(f" [SKIP] User {username} not found on system") - return False - - config_path = home / ".sync_settings.yaml" - - if dry_run: - print(f" [DRY-RUN] Would write {config_path}") - return True - - try: - # Write to temp file first - import tempfile - - fd, tmp_path = tempfile.mkstemp(suffix=".yaml", dir=str(home.parent)) - try: - with os.fdopen(fd, "w") as f: - f.write(yaml_content) - - # Get user's uid/gid - user_info = pwd.getpwnam(username) - - # Set ownership - os.chown(tmp_path, user_info.pw_uid, user_info.pw_gid) - os.chmod(tmp_path, 0o644) - - # Atomic move - os.replace(tmp_path, config_path) - - print(f" [OK] {config_path}") - return True - - except Exception as e: - os.unlink(tmp_path) - raise e - - except PermissionError: - print(f" [ERROR] Permission denied for {username}") - return False - except Exception as e: - print(f" [ERROR] {username}: {e}") - return False - - -def main(): - parser = argparse.ArgumentParser( - description="Generate per-user sync config files from central settings." 
- ) - parser.add_argument( - "--dry-run", action="store_true", help="Show what would be done without making changes" - ) - parser.add_argument("--user", help="Generate config for specific user only") - args = parser.parse_args() - - all_settings = read_settings() - - if not all_settings: - print("No sync settings found or file is empty.") - return 0 - - if args.user: - users = {args.user: all_settings.get(args.user, {})} - else: - users = all_settings - - print(f"Generating sync configs for {len(users)} user(s)...") - success = 0 - failed = 0 - - for username, user_data in users.items(): - # Merge with defaults - settings = dict(DEFAULT_SETTINGS) - settings.update(user_data.get("datasets", {})) - - yaml_content = generate_yaml(settings) - - if write_user_config(username, yaml_content, args.dry_run): - success += 1 - else: - failed += 1 - - print(f"\nDone: {success} succeeded, {failed} failed") - return 0 if failed == 0 else 1 - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/scripts/remote_query.sh b/scripts/remote_query.sh deleted file mode 100644 index 5e05492..0000000 --- a/scripts/remote_query.sh +++ /dev/null @@ -1,41 +0,0 @@ -#!/bin/bash -# Remote Query - wrapper for src.remote_query -# -# Runs DuckDB queries spanning local Parquet + remote BigQuery tables. -# Sets up the correct environment (PYTHONPATH, CONFIG_DIR, env vars) automatically. -# -# Usage (via SSH from analyst machine): -# ssh 'bash ~/server/scripts/remote_query.sh \ -# --register-bq "traffic=SELECT ... FROM \`project.dataset.table\` WHERE ... GROUP BY ..." \ -# --sql "SELECT * FROM traffic ORDER BY ..." \ -# --format table' -# -# All arguments are passed directly to python -m src.remote_query. 
-# See: python -m src.remote_query --help - -set -e - -APP_DIR="/opt/data-analyst" -SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" - -# Load BigQuery environment variables -# Try analyst-readable env first (deployed to /data/scripts/), fall back to app .env -if [[ -f "${SCRIPT_DIR}/.remote_query.env" ]]; then - set -a - source "${SCRIPT_DIR}/.remote_query.env" - set +a -elif [[ -r "${APP_DIR}/.env" ]]; then - set -a - source "${APP_DIR}/.env" - set +a -else - echo "ERROR: No environment file found. Contact your admin." >&2 - echo " Tried: ${SCRIPT_DIR}/.remote_query.env, ${APP_DIR}/.env" >&2 - exit 1 -fi - -# Run remote_query with correct paths -cd "${APP_DIR}" -PYTHONPATH="${APP_DIR}/repo" \ -CONFIG_DIR="${APP_DIR}/instance/config" \ -exec "${APP_DIR}/.venv/bin/python3" -m src.remote_query "$@" diff --git a/scripts/setup_views.sh b/scripts/setup_views.sh deleted file mode 100644 index 2029710..0000000 --- a/scripts/setup_views.sh +++ /dev/null @@ -1,126 +0,0 @@ -#!/bin/bash - -# setup_views.sh - DuckDB views initialization script -# -# This script performs: -# 1. DuckDB views initialization from Parquet files -# -# Run this after syncing Parquet files (either via update.sh or rsync) - -set -e # Exit on error - -echo "🦆 DuckDB Views Setup" -echo "" - -# Check that we're in the correct folder -# Support both local (server/docs/) and server (~/ with server/ symlinks) layouts -if [ ! -f "server/docs/data_description.md" ] && [ ! -f "docs/data_description.md" ]; then - echo "❌ Run script from project root (folder with server/docs/data_description.md)" - exit 1 -fi - -# Activate virtual environment -echo "1️⃣ Activating virtual environment..." -if [ -d ".venv" ]; then - # Try Unix-style activation first - if [ -f ".venv/bin/activate" ]; then - source .venv/bin/activate - # Windows-style activation - elif [ -f ".venv/Scripts/activate" ]; then - source .venv/Scripts/activate - else - echo " ❌ Virtual environment activation script not found." 
- exit 1 - fi - echo " ✅ Virtual environment activated" -elif [ -d "venv" ]; then - # Legacy support for old venv name - if [ -f "venv/bin/activate" ]; then - source venv/bin/activate - elif [ -f "venv/Scripts/activate" ]; then - source venv/Scripts/activate - else - echo " ❌ Virtual environment activation script not found." - exit 1 - fi - echo " ✅ Virtual environment activated (legacy venv)" -else - echo " ❌ Virtual environment not found (.venv or venv)." - echo " Run bootstrap setup first." - exit 1 -fi - -# Initialize DuckDB views -echo "" -echo "2️⃣ Initializing DuckDB views..." -echo "" - -# Use python3 if available, otherwise python (Windows compatibility) -if command -v python3 >/dev/null 2>&1; then - PYTHON_CMD=python3 -else - PYTHON_CMD=python -fi - -# Determine scripts location (local: server/scripts/, server deploy: same dir as this script) -SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" -DUCKDB_MANAGER="${SCRIPT_DIR}/duckdb_manager.py" - -if $PYTHON_CMD "$DUCKDB_MANAGER" --reinit; then - echo "" - echo " ✅ DuckDB views initialized" -else - echo "" - echo " ❌ DuckDB initialization failed. Check logs above." - exit 1 -fi - -# Optional dataset views (each sync script manages its own views) -if [ -d "server/parquet/jira" ]; then - JIRA_SCRIPT="${SCRIPT_DIR}/sync_jira.sh" - if [ -f "$JIRA_SCRIPT" ]; then - echo "" - echo "3️⃣ Creating Jira views..." 
- bash "$JIRA_SCRIPT" --views-only - fi -fi - -# Display sync state -echo "" -echo "📊 Data state:" -# Check for sync state in either location -SYNC_STATE="" -if [ -f "server/metadata/sync_state.json" ]; then - SYNC_STATE="server/metadata/sync_state.json" -elif [ -f "data/metadata/sync_state.json" ]; then - SYNC_STATE="data/metadata/sync_state.json" -fi - -if [ -n "$SYNC_STATE" ]; then - $PYTHON_CMD << PYTHON -import json -from datetime import datetime - -with open("$SYNC_STATE", "r") as f: - state = json.load(f) - -print(f"\n Last update: {state.get('last_updated', 'N/A')}") -print(f" Tables: {len(state.get('tables', {}))}\n") - -for table_id, table_state in state.get('tables', {}).items(): - table_name = table_state.get('table_name', table_id) - rows = table_state.get('rows', 0) - size_mb = table_state.get('file_size_mb', 0) - strategy = table_state.get('strategy', 'N/A') - - print(f" - {table_name}: {rows:,} rows, {size_mb:.2f} MB ({strategy})") -PYTHON -fi - -# Done -echo "" -echo "✅ Setup complete!" -echo "" -echo "💡 Data is ready for analysis with Claude Code" -echo " DuckDB database: user/duckdb/analytics.duckdb" -echo "" diff --git a/scripts/standalone_profiler.py b/scripts/standalone_profiler.py deleted file mode 100644 index 69fb241..0000000 --- a/scripts/standalone_profiler.py +++ /dev/null @@ -1,1271 +0,0 @@ -#!/usr/bin/env python3 -""" -Standalone Data Profiler — DuckDB-based table profiling for Parquet/CSV files. - -Zero external dependencies beyond DuckDB. Produces a comprehensive JSON profile -with column statistics, histograms, alerts, and sample data. 
- -Usage: - # Profile a single Parquet file - python standalone_profiler.py data/orders.parquet - - # Profile a directory of Parquet files (treated as one table) - python standalone_profiler.py data/partitioned_orders/ - - # Profile a CSV file - python standalone_profiler.py data/customers.csv - - # Custom output path - python standalone_profiler.py data/orders.parquet -o profiles/orders_profile.json - - # Specify primary key for duplicate detection - python standalone_profiler.py data/orders.parquet --primary-key order_id - - # Composite primary key - python standalone_profiler.py data/orders.parquet --primary-key "order_id,line_id" - - # Profile multiple files at once - python standalone_profiler.py data/orders.parquet data/customers.parquet data/products.csv - - # Generate HTML report alongside JSON - python standalone_profiler.py data/orders.parquet --html - - # Generate HTML from existing profile JSON - python standalone_profiler.py --from-json profile.json - -Output: - JSON file with table-level and column-level statistics, alerts, histograms, - top values for categorical columns, and sample rows. - With --html: self-contained HTML file viewable in any browser. 
- -Requirements: - pip install duckdb -""" - -import argparse -import html as html_mod -import json -import logging -import math -import os -import sys -import tempfile -from datetime import datetime, timezone -from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple - -import duckdb - -# --------------------------------------------------------------------------- -# Logging -# --------------------------------------------------------------------------- -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", -) -logger = logging.getLogger("profiler") - -# --------------------------------------------------------------------------- -# Profiler configuration -# --------------------------------------------------------------------------- -SAMPLE_THRESHOLD = 500_000 # Sample tables larger than this -SAMPLE_SIZE = 500_000 -MAX_CATEGORICAL_DISTINCT = 50 # Treat as categorical if unique <= this -TOP_VALUES_LIMIT = 10 # Number of top values for categorical columns -HISTOGRAM_BINS = 15 # Number of bins for numeric histograms -SAMPLE_ROWS_LIMIT = 5 # Number of sample rows to include -SAMPLE_VALUES_LIMIT = 5 # Number of sample distinct values per column - -# Alert thresholds -ALERT_HIGH_MISSING_PCT = 30.0 -ALERT_MISSING_PCT = 5.0 -ALERT_IMBALANCE_PCT = 60.0 -ALERT_ZEROS_PCT = 50.0 -ALERT_HIGH_CARDINALITY = 50 - - -# --------------------------------------------------------------------------- -# DuckDB type classification -# --------------------------------------------------------------------------- -def classify_type(duckdb_type: str) -> str: - """Map a DuckDB type string to a simplified category.""" - t = duckdb_type.upper() - if t in ("BOOLEAN", "BOOL"): - return "BOOLEAN" - if t in ("DATE",): - return "DATE" - if "TIMESTAMP" in t: - return "TIMESTAMP" - base_type = t.split("(")[0].strip() - if base_type in ( - "FLOAT", "DOUBLE", "DECIMAL", "REAL", "FLOAT4", "FLOAT8", - "NUMERIC", "HUGEINT", "INTEGER", 
"INT", "BIGINT", "SMALLINT", - "TINYINT", "INT8", "INT4", "INT2", "INT1", "UBIGINT", - "UINTEGER", "USMALLINT", "UTINYINT", - ): - return "NUMERIC" - return "STRING" - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- -def _round(value: Any, digits: int = 2) -> Any: - """Round a value if it is a float, otherwise return as-is.""" - if value is None: - return None - if isinstance(value, float): - if math.isnan(value) or math.isinf(value): - return None - return round(value, digits) - return value - - -def _format_number(n: float) -> str: - """Format large numbers with human-readable suffixes for histogram bin labels.""" - if n is None: - return "?" - abs_n = abs(n) - if abs_n >= 1_000_000_000: - return f"{n / 1_000_000_000:.1f}B" - if abs_n >= 1_000_000: - return f"{n / 1_000_000:.1f}M" - if abs_n >= 1_000: - return f"{n / 1_000:.1f}K" - if isinstance(n, float) and n != int(n): - return f"{n:.2f}" - return str(int(n)) - - -def write_json_atomic(path: Path, data: Any) -> None: - """Write JSON to path atomically via tempfile + os.replace.""" - path.parent.mkdir(parents=True, exist_ok=True) - fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp") - try: - with os.fdopen(fd, "w") as f: - json.dump(data, f, indent=2, default=str) - os.chmod(tmp_path, 0o644) - os.replace(tmp_path, str(path)) - logger.info("Wrote %s", path) - except Exception: - try: - os.unlink(tmp_path) - except OSError: - pass - raise - - -# --------------------------------------------------------------------------- -# Batch statistics functions -# --------------------------------------------------------------------------- -def _batch_base_stats( - con: duckdb.DuckDBPyConnection, - view_name: str, - columns: List[str], -) -> Dict[str, Tuple[int, int]]: - """Get non_null and unique counts for all columns in a single query. 
- - Returns: {col_name: (non_null_count, unique_count)} - """ - if not columns: - return {} - - parts = [] - for col_name in columns: - safe = f'"{col_name}"' - parts.append(f"COUNT({safe})") - parts.append(f"COUNT(DISTINCT {safe})") - - sql = f"SELECT {', '.join(parts)} FROM {view_name}" - row = con.execute(sql).fetchone() - - result: Dict[str, Tuple[int, int]] = {} - idx = 0 - for col_name in columns: - result[col_name] = (row[idx], row[idx + 1]) - idx += 2 - return result - - -def _batch_numeric_stats( - con: duckdb.DuckDBPyConnection, - view_name: str, - numeric_cols: List[str], -) -> Dict[str, Dict[str, Any]]: - """Get aggregate statistics for all numeric columns in a single query.""" - if not numeric_cols: - return {} - - parts = [] - for col_name in numeric_cols: - safe = f'"{col_name}"' - parts.extend([ - f"MIN({safe})", - f"MAX({safe})", - f"AVG({safe})", - f"MEDIAN({safe})", - f"STDDEV({safe})", - f"PERCENTILE_CONT(0.05) WITHIN GROUP (ORDER BY {safe})", - f"PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY {safe})", - f"PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY {safe})", - f"PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY {safe})", - f"SUM(CASE WHEN {safe} = 0 THEN 1 ELSE 0 END)", - f"SUM(CASE WHEN {safe} < 0 THEN 1 ELSE 0 END)", - ]) - - sql = f"SELECT {', '.join(parts)} FROM {view_name}" - row = con.execute(sql).fetchone() - - result: Dict[str, Dict[str, Any]] = {} - idx = 0 - for col_name in numeric_cols: - result[col_name] = { - "min": row[idx], "max": row[idx + 1], "mean": row[idx + 2], - "median": row[idx + 3], "stddev": row[idx + 4], - "p5": row[idx + 5], "p25": row[idx + 6], - "p75": row[idx + 7], "p95": row[idx + 8], - "zeros": row[idx + 9], "negative": row[idx + 10], - } - idx += 11 - return result - - -def _batch_string_stats( - con: duckdb.DuckDBPyConnection, - view_name: str, - string_cols: List[str], -) -> Dict[str, Dict[str, Any]]: - """Get string length statistics for all string columns in a single query.""" - if not string_cols: - return {} - 
- parts = [] - for col_name in string_cols: - safe = f'"{col_name}"' - parts.extend([ - f"MIN(LENGTH({safe}))", - f"MAX(LENGTH({safe}))", - f"AVG(LENGTH({safe}))", - ]) - - sql = f"SELECT {', '.join(parts)} FROM {view_name}" - row = con.execute(sql).fetchone() - - result: Dict[str, Dict[str, Any]] = {} - idx = 0 - for col_name in string_cols: - result[col_name] = { - "min_length": row[idx] if row[idx] is not None else 0, - "max_length": row[idx + 1] if row[idx + 1] is not None else 0, - "avg_length": _round(row[idx + 2]) if row[idx + 2] is not None else 0.0, - } - idx += 3 - return result - - -def _batch_date_stats( - con: duckdb.DuckDBPyConnection, - view_name: str, - date_cols: List[str], - category_map: Dict[str, str], -) -> Dict[str, Dict[str, Any]]: - """Get date range statistics for all date/timestamp columns in a single query.""" - if not date_cols: - return {} - - parts = [] - for col_name in date_cols: - safe = f'"{col_name}"' - cast_expr = f"CAST({safe} AS DATE)" if category_map[col_name] == "TIMESTAMP" else safe - parts.extend([ - f"MIN({cast_expr})", - f"MAX({cast_expr})", - ]) - - sql = f"SELECT {', '.join(parts)} FROM {view_name}" - row = con.execute(sql).fetchone() - - result: Dict[str, Dict[str, Any]] = {} - idx = 0 - for col_name in date_cols: - earliest = row[idx] - latest = row[idx + 1] - span_days = None - if earliest is not None and latest is not None: - try: - delta = latest - earliest - span_days = delta.days if hasattr(delta, "days") else int(delta) - except (TypeError, ValueError): - span_days = None - result[col_name] = { - "earliest": str(earliest) if earliest is not None else None, - "latest": str(latest) if latest is not None else None, - "span_days": span_days, - } - idx += 2 - return result - - -def _batch_boolean_stats( - con: duckdb.DuckDBPyConnection, - view_name: str, - bool_cols: List[str], -) -> Dict[str, Dict[str, Any]]: - """Get boolean true/false counts for all boolean columns in a single query.""" - if not bool_cols: - 
return {} - - parts = [] - for col_name in bool_cols: - safe = f'"{col_name}"' - parts.extend([ - f"SUM(CASE WHEN {safe} = TRUE THEN 1 ELSE 0 END)", - f"SUM(CASE WHEN {safe} = FALSE THEN 1 ELSE 0 END)", - ]) - - sql = f"SELECT {', '.join(parts)} FROM {view_name}" - row = con.execute(sql).fetchone() - - result: Dict[str, Dict[str, Any]] = {} - idx = 0 - for col_name in bool_cols: - true_count = int(row[idx]) if row[idx] is not None else 0 - false_count = int(row[idx + 1]) if row[idx + 1] is not None else 0 - total = true_count + false_count - result[col_name] = { - "true_count": true_count, - "false_count": false_count, - "true_pct": _round(100.0 * true_count / total) if total > 0 else 0.0, - } - idx += 2 - return result - - -# --------------------------------------------------------------------------- -# Core: profile a single file/table -# --------------------------------------------------------------------------- -def profile_table( - source_path: Path, - table_name: Optional[str] = None, - primary_key: Optional[str] = None, -) -> Dict[str, Any]: - """Profile a single Parquet file, Parquet directory, or CSV file. - - Args: - source_path: Path to .parquet file, directory of .parquet files, or .csv file. - table_name: Display name for the table (defaults to filename stem). - primary_key: Comma-separated primary key column(s) for duplicate detection. - - Returns: - Dict with complete profile (table-level + column-level statistics). 
- """ - source_path = Path(source_path) - if table_name is None: - table_name = source_path.stem - - pk_columns: List[str] = [] - if primary_key: - pk_columns = [c.strip() for c in primary_key.split(",")] - - con = duckdb.connect() - - # Determine read expression based on file type - if source_path.is_dir(): - read_expr = f"read_parquet('{source_path}/*.parquet')" - elif source_path.suffix.lower() == ".csv": - read_expr = f"read_csv_auto('{source_path}')" - else: - read_expr = f"read_parquet('{source_path}')" - - # Get row count to decide on sampling - total_rows = con.execute(f"SELECT COUNT(*) FROM {read_expr}").fetchone()[0] - - # Materialize into temp table (reads source files once instead of per-query) - view_name = "tbl" - sampled = total_rows > SAMPLE_THRESHOLD - if sampled: - con.execute( - f"CREATE TEMP TABLE {view_name} AS SELECT * FROM {read_expr} USING SAMPLE {SAMPLE_SIZE} ROWS" - ) - working_rows = con.execute(f"SELECT COUNT(*) FROM {view_name}").fetchone()[0] - else: - con.execute(f"CREATE TEMP TABLE {view_name} AS SELECT * FROM {read_expr}") - working_rows = total_rows - - # Column metadata - col_info = con.execute(f"DESCRIBE {view_name}").fetchall() - - # Classify columns by type - all_col_names: List[str] = [] - type_map: Dict[str, str] = {} - category_map: Dict[str, str] = {} - numeric_cols: List[str] = [] - string_cols: List[str] = [] - date_cols: List[str] = [] - bool_cols: List[str] = [] - - for col_row in col_info: - col_name = col_row[0] - col_type = col_row[1] - all_col_names.append(col_name) - type_map[col_name] = col_type - category = classify_type(col_type) - category_map[col_name] = category - if category == "NUMERIC": - numeric_cols.append(col_name) - elif category == "STRING": - string_cols.append(col_name) - elif category in ("DATE", "TIMESTAMP"): - date_cols.append(col_name) - elif category == "BOOLEAN": - bool_cols.append(col_name) - - # ---- Batch queries (one scan per type category) ---- - base_stats = _batch_base_stats(con, 
view_name, all_col_names) - - numeric_batch: Dict[str, Dict[str, Any]] = {} - try: - numeric_batch = _batch_numeric_stats(con, view_name, numeric_cols) - except Exception as exc: - logger.warning("Batch numeric stats failed: %s", exc) - - string_batch: Dict[str, Dict[str, Any]] = {} - try: - string_batch = _batch_string_stats(con, view_name, string_cols) - except Exception as exc: - logger.warning("Batch string stats failed: %s", exc) - - date_batch: Dict[str, Dict[str, Any]] = {} - try: - date_batch = _batch_date_stats(con, view_name, date_cols, category_map) - except Exception as exc: - logger.warning("Batch date stats failed: %s", exc) - - boolean_batch: Dict[str, Dict[str, Any]] = {} - try: - boolean_batch = _batch_boolean_stats(con, view_name, bool_cols) - except Exception as exc: - logger.warning("Batch boolean stats failed: %s", exc) - - # ---- Build column profiles ---- - columns: List[Dict[str, Any]] = [] - variable_types: Dict[str, int] = {} - total_null_count = 0 - total_cells = working_rows * len(col_info) if col_info else 0 - first_date_col: Optional[Dict[str, Any]] = None - - for col_name in all_col_names: - col_type = type_map[col_name] - category = category_map[col_name] - safe_col = f'"{col_name}"' - variable_types[category] = variable_types.get(category, 0) + 1 - - non_null, unique_count = base_stats.get(col_name, (0, 0)) - null_count = working_rows - non_null - - completeness_pct = _round(100.0 * non_null / working_rows) if working_rows > 0 else 0.0 - unique_pct = _round(100.0 * unique_count / non_null) if non_null > 0 else 0.0 - missing_pct = _round(100.0 * null_count / working_rows) if working_rows > 0 else 0.0 - is_pk = col_name in pk_columns - - # Sample values - sample_values: List[str] = [] - try: - rows = con.execute( - f""" - SELECT DISTINCT CAST({safe_col} AS VARCHAR) AS v - FROM {view_name} - WHERE {safe_col} IS NOT NULL - LIMIT {SAMPLE_VALUES_LIMIT} - """ - ).fetchall() - sample_values = [r[0] for r in rows if r[0] is not None] - 
except Exception: - pass - - # Alerts - alerts: List[str] = [] - if unique_count == 1 and null_count == 0: - alerts.append("constant") - if unique_pct == 100.0 and null_count == 0 and non_null > 0: - alerts.append("unique") - if missing_pct > ALERT_HIGH_MISSING_PCT: - alerts.append("high_missing") - elif missing_pct > ALERT_MISSING_PCT: - alerts.append("missing") - - col_profile: Dict[str, Any] = { - "name": col_name, - "type": col_type, - "type_category": category, - "completeness_pct": completeness_pct, - "null_count": null_count, - "unique_count": unique_count, - "unique_pct": unique_pct, - "sample_values": sample_values, - "is_primary_key": is_pk, - "alerts": alerts, - } - - # Type-specific stats - try: - if category == "NUMERIC" and col_name in numeric_batch: - raw = numeric_batch[col_name] - min_val = _round(raw["min"]) - max_val = _round(raw["max"]) - zeros = int(raw["zeros"]) if raw["zeros"] is not None else 0 - negative = int(raw["negative"]) if raw["negative"] is not None else 0 - zeros_pct = _round(100.0 * zeros / non_null) if non_null > 0 else 0.0 - negative_pct = _round(100.0 * negative / non_null) if non_null > 0 else 0.0 - - if zeros_pct > ALERT_ZEROS_PCT and "zeros" not in alerts: - alerts.append("zeros") - - # Histogram (FLOOR-based bucketing, works in all DuckDB versions) - histogram: Dict[str, Any] = {"bins": [], "counts": []} - if min_val is not None and max_val is not None and min_val != max_val: - try: - bin_width = (float(max_val) - float(min_val)) / HISTOGRAM_BINS - bucket_rows = con.execute( - f""" - SELECT - LEAST(FLOOR((CAST({safe_col} AS DOUBLE) - {float(min_val)}) / {bin_width}), {HISTOGRAM_BINS - 1}) + 1 AS bucket, - COUNT(*) AS cnt - FROM {view_name} - WHERE {safe_col} IS NOT NULL - GROUP BY bucket - ORDER BY bucket - """ - ).fetchall() - - bin_labels: List[str] = [] - bin_counts: List[int] = [] - bucket_dict = {int(r[0]): int(r[1]) for r in bucket_rows if r[0] is not None} - for i in range(1, HISTOGRAM_BINS + 1): - lo = 
float(min_val) + (i - 1) * bin_width - hi = float(min_val) + i * bin_width - bin_labels.append(f"{_format_number(lo)}-{_format_number(hi)}") - bin_counts.append(bucket_dict.get(i, 0)) - histogram = {"bins": bin_labels, "counts": bin_counts} - except Exception as exc: - logger.debug("Histogram failed for column %s: %s", col_name, exc) - - col_profile["numeric_stats"] = { - "min": min_val, - "max": max_val, - "mean": _round(raw["mean"]), - "median": _round(raw["median"]), - "stddev": _round(raw["stddev"]), - "p5": _round(raw["p5"]), - "p25": _round(raw["p25"]), - "p75": _round(raw["p75"]), - "p95": _round(raw["p95"]), - "zeros": zeros, - "zeros_pct": zeros_pct, - "negative": negative, - "negative_pct": negative_pct, - "histogram": histogram, - } - - elif category == "STRING" and col_name in string_batch: - sl = string_batch[col_name] - is_categorical = unique_count <= MAX_CATEGORICAL_DISTINCT - - top_values: List[Dict[str, Any]] = [] - if is_categorical and non_null > 0: - rows = con.execute( - f""" - SELECT {safe_col} AS val, COUNT(*) AS cnt - FROM {view_name} - WHERE {safe_col} IS NOT NULL - GROUP BY {safe_col} - ORDER BY cnt DESC - LIMIT {TOP_VALUES_LIMIT} - """ - ).fetchall() - for row in rows: - pct = _round(100.0 * row[1] / non_null) if non_null > 0 else 0.0 - top_values.append({"value": str(row[0]), "count": row[1], "pct": pct}) - - if top_values and top_values[0]["pct"] > ALERT_IMBALANCE_PCT: - if "imbalance" not in alerts: - alerts.append("imbalance") - else: - if unique_count > ALERT_HIGH_CARDINALITY and "high_cardinality" not in alerts: - alerts.append("high_cardinality") - - col_profile["string_stats"] = { - "min_length": sl["min_length"], - "max_length": sl["max_length"], - "avg_length": sl["avg_length"], - "top_values": top_values, - } - - elif category in ("DATE", "TIMESTAMP") and col_name in date_batch: - dr = date_batch[col_name] - cast_expr = f"CAST({safe_col} AS DATE)" if category == "TIMESTAMP" else safe_col - - # Date histogram (YEAR/QUARTER 
grouping) - histogram = {"bins": [], "counts": []} - try: - rows = con.execute( - f""" - SELECT - YEAR({cast_expr}) AS yr, - QUARTER({cast_expr}) AS qtr, - COUNT(*) AS cnt - FROM {view_name} - WHERE {safe_col} IS NOT NULL - GROUP BY yr, qtr - ORDER BY yr, qtr - """ - ).fetchall() - histogram["bins"] = [f"{int(r[0])}-Q{int(r[1])}" for r in rows] - histogram["counts"] = [int(r[2]) for r in rows] - except Exception as exc: - logger.debug("Date histogram failed for %s: %s", col_name, exc) - - col_profile["date_stats"] = { - "earliest": dr["earliest"], - "latest": dr["latest"], - "span_days": dr["span_days"], - "histogram": histogram, - } - - if first_date_col is None and dr["earliest"]: - first_date_col = col_profile["date_stats"] - - elif category == "BOOLEAN" and col_name in boolean_batch: - col_profile["boolean_stats"] = boolean_batch[col_name] - - except Exception as exc: - logger.warning("Type-specific stats failed for %s: %s", col_name, exc) - - columns.append(col_profile) - total_null_count += null_count - - # Table-level completeness - avg_completeness = 0.0 - if columns: - avg_completeness = _round( - sum(c["completeness_pct"] for c in columns) / len(columns) - ) - missing_cells_pct = _round(100.0 * total_null_count / total_cells) if total_cells > 0 else 0.0 - - # Duplicate rows (by primary key) - duplicate_rows = 0 - if pk_columns and working_rows > 0: - try: - pk_expr = ", ".join(f'"{c}"' for c in pk_columns) - distinct_pk = con.execute( - f"SELECT COUNT(DISTINCT ({pk_expr})) FROM {view_name}" - ).fetchone()[0] - duplicate_rows = working_rows - distinct_pk - except Exception as exc: - logger.debug("Duplicate check failed: %s", exc) - - # Sample rows - sample_rows: List[Dict[str, Any]] = [] - try: - sample_result = con.execute(f"SELECT * FROM {view_name} LIMIT {SAMPLE_ROWS_LIMIT}") - sample_col_names = [desc[0] for desc in sample_result.description] - for row in sample_result.fetchall(): - sample_rows.append( - {sample_col_names[i]: str(v) if v is not None 
else None for i, v in enumerate(row)} - ) - except Exception as exc: - logger.debug("Sample rows failed: %s", exc) - - # Aggregate column alerts to table level - table_alerts: List[Dict[str, str]] = [] - alert_messages = { - "constant": "{col} is constant (single value)", - "unique": "{col} has all unique values", - "high_missing": "{col} has {pct}% missing values", - "missing": "{col} has {pct}% missing values", - "imbalance": "{col} is highly imbalanced (top value {pct}%)", - "zeros": "{col} has {pct}% zero values", - "high_cardinality": "{col} has high cardinality ({n} distinct)", - } - for col in columns: - col_alert_name = col.get("name", "") - missing_pct_val = _round(100.0 - col.get("completeness_pct", 100.0)) - for a in col.get("alerts", []): - if a in ("high_missing", "missing"): - msg = alert_messages[a].format(col=col_alert_name, pct=missing_pct_val) - elif a == "imbalance": - top_pct = 0.0 - ss = col.get("string_stats", {}) - tv = ss.get("top_values", []) - if tv: - top_pct = tv[0].get("pct", 0.0) - msg = alert_messages[a].format(col=col_alert_name, pct=top_pct) - elif a == "zeros": - ns = col.get("numeric_stats", {}) - msg = alert_messages[a].format(col=col_alert_name, pct=ns.get("zeros_pct", 0.0)) - elif a == "high_cardinality": - msg = alert_messages[a].format(col=col_alert_name, n=col.get("unique_count", 0)) - else: - msg = alert_messages.get(a, f"{col_alert_name}: {a}").format(col=col_alert_name) - table_alerts.append({"column": col_alert_name, "type": a, "message": msg}) - - # File size - file_size_mb = None - try: - if source_path.is_dir(): - total_bytes = sum(f.stat().st_size for f in source_path.glob("*.parquet")) - elif source_path.exists(): - total_bytes = source_path.stat().st_size - else: - total_bytes = 0 - file_size_mb = _round(total_bytes / (1024 * 1024)) - except OSError: - pass - - # Date range from first date column - date_range = None - if first_date_col: - date_range = { - "earliest": first_date_col.get("earliest"), - "latest": 
first_date_col.get("latest"), - "span_days": first_date_col.get("span_days"), - } - - con.close() - - return { - "table_name": table_name, - "source_path": str(source_path), - "row_count": total_rows, - "column_count": len(col_info), - "file_size_mb": file_size_mb, - "primary_key": primary_key, - "avg_completeness": avg_completeness, - "missing_cells": total_null_count, - "missing_cells_pct": missing_cells_pct, - "duplicate_rows": duplicate_rows, - "variable_types": variable_types, - "date_range": date_range, - "alerts": table_alerts, - "sampled": sampled, - "columns": columns, - "sample_rows": sample_rows, - } - - -# --------------------------------------------------------------------------- -# HTML report generation -# --------------------------------------------------------------------------- - -_TYPE_COLORS = { - "NUMERIC": "#8b5cf6", - "STRING": "#3b82f6", - "DATE": "#f59e0b", - "TIMESTAMP": "#f59e0b", - "BOOLEAN": "#10b981", -} - -_ALERT_SEVERITY = { - "high_missing": "e", - "missing": "w", - "constant": "i", - "unique": "i", - "imbalance": "w", - "zeros": "w", - "high_cardinality": "i", -} - -_CSS = """ -*{margin:0;padding:0;box-sizing:border-box} -body{font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',system-ui,sans-serif; - background:#f8fafc;color:#0f172a;line-height:1.5;font-size:14px} -.wrap{max-width:1200px;margin:0 auto;padding:20px 24px 60px} -header{padding:20px 0 16px;border-bottom:1px solid #e2e8f0;margin-bottom:24px} -h1{font-size:22px;font-weight:700} -.meta{color:#64748b;font-size:12px;margin-top:2px} -.cards{display:grid;grid-template-columns:repeat(auto-fit,minmax(140px,1fr));gap:10px;margin:16px 0} -.card{background:#fff;border-radius:8px;box-shadow:0 1px 3px rgba(0,0,0,.08);padding:14px 16px;text-align:center} -.card-v{font-size:26px;font-weight:700}.card-l{font-size:10px;color:#64748b;text-transform:uppercase;letter-spacing:.05em;margin-top:2px} -.tabs{display:flex;gap:4px;margin-bottom:20px;flex-wrap:wrap} -.tab{padding:7px 
14px;border-radius:6px;cursor:pointer;font-size:13px;border:1px solid #e2e8f0;background:#fff;transition:all .15s} -.tab:hover{border-color:#93c5fd}.tab.active{background:#3b82f6;color:#fff;border-color:#3b82f6} -.tsec{display:none}.tsec.active{display:block} -.alerts{margin:12px 0} -.alert{padding:7px 12px;border-radius:6px;margin:3px 0;font-size:12px} -.alert-w{background:#fef3c7;color:#92400e}.alert-e{background:#fee2e2;color:#991b1b}.alert-i{background:#dbeafe;color:#1e40af} -.types{display:flex;gap:6px;margin:10px 0;flex-wrap:wrap} -.tbadge{padding:2px 10px;border-radius:12px;font-size:11px;font-weight:600;color:#fff} -.stitle{font-size:15px;font-weight:600;margin:20px 0 8px} -.col-list{background:#fff;border-radius:8px;box-shadow:0 1px 3px rgba(0,0,0,.08);overflow:hidden} -.col-hdr{display:grid;grid-template-columns:minmax(140px,1.5fr) 56px minmax(100px,1fr) 90px 50px; - align-items:center;padding:8px 14px;cursor:pointer;border-bottom:1px solid #f1f5f9;gap:8px;transition:background .1s} -.col-hdr:hover{background:#f8fafc} -.col-hdr-label{cursor:default;font-weight:600;font-size:11px;color:#64748b;border-bottom-width:2px} -.cn{font-weight:600;font-size:13px;overflow:hidden;text-overflow:ellipsis;white-space:nowrap} -.pk{color:#f59e0b;font-size:10px;font-weight:700;margin-left:3px} -.ct{font-size:10px;padding:2px 6px;border-radius:4px;text-align:center;font-weight:600;color:#fff;white-space:nowrap} -.cbar-bg{height:5px;background:#e2e8f0;border-radius:3px;overflow:hidden;flex:1} -.cbar{height:100%;border-radius:3px} -.compl{display:flex;align-items:center;gap:6px} -.cpct{font-size:11px;color:#64748b;min-width:32px;text-align:right} -.cuniq{font-size:11px;color:#64748b;text-align:right;overflow:hidden;text-overflow:ellipsis;white-space:nowrap} -.calerts span{padding:1px 5px;border-radius:8px;background:#fee2e2;color:#991b1b;font-size:10px} -.col-det{display:none;padding:14px 16px;border-bottom:1px solid #e2e8f0;background:#fafbfc} -.col-det.open{display:block} 
-.dgrid{display:grid;grid-template-columns:1fr 1fr;gap:16px} -@media(max-width:768px){.dgrid{grid-template-columns:1fr}.col-hdr{grid-template-columns:1fr 50px 1fr 70px 40px;font-size:12px}} -.stbl{font-size:12px;width:100%;border-collapse:collapse} -.stbl td{padding:2px 0}.stbl td:first-child{color:#64748b;padding-right:10px;white-space:nowrap} -.stbl td:last-child{font-weight:500;text-align:right} -.histogram{display:flex;align-items:flex-end;gap:1px;height:72px;margin:10px 0} -.h-bar{flex:1;background:#3b82f6;border-radius:2px 2px 0 0;min-width:3px;transition:background .15s;cursor:default;min-height:1px} -.h-bar:hover{background:#2563eb} -.h-labels{display:flex;justify-content:space-between;font-size:9px;color:#94a3b8;margin-top:2px} -.tvr{display:grid;grid-template-columns:110px 1fr 42px 52px;align-items:center;gap:6px;padding:2px 0;font-size:12px} -.tvl{overflow:hidden;text-overflow:ellipsis;white-space:nowrap} -.tvb-bg{height:7px;background:#e2e8f0;border-radius:4px;overflow:hidden} -.tvb{height:100%;background:#3b82f6;border-radius:4px} -.tvp{text-align:right;color:#64748b;font-size:11px} -.tvc{text-align:right;color:#94a3b8;font-size:10px} -.bbar{display:flex;height:18px;border-radius:4px;overflow:hidden;font-size:10px} -.bt{background:#22c55e;color:#fff;display:flex;align-items:center;justify-content:center} -.bf{background:#e2e8f0;color:#64748b;display:flex;align-items:center;justify-content:center} -.svs{display:flex;gap:4px;flex-wrap:wrap;margin-top:6px} -.sv{background:#f1f5f9;padding:1px 7px;border-radius:4px;font-size:11px;color:#475569} -.swrap{margin-top:20px} -.stog{cursor:pointer;color:#3b82f6;font-size:13px;font-weight:500;user-select:none} -.sdata{display:none;margin-top:8px;overflow-x:auto} -.sdata.open{display:block} -table.dt{border-collapse:collapse;font-size:11px;width:100%} -table.dt th{background:#f1f5f9;padding:5px 8px;text-align:left;font-weight:600;border:1px solid #e2e8f0;white-space:nowrap} -table.dt td{padding:5px 8px;border:1px 
solid #e2e8f0;max-width:180px;overflow:hidden;text-overflow:ellipsis;white-space:nowrap} -.foot{text-align:center;color:#94a3b8;font-size:11px;margin-top:40px;padding-top:16px;border-top:1px solid #e2e8f0} -@media print{.tabs,.stog{display:none}.tsec,.col-det,.sdata{display:block!important}body{background:#fff}.card{box-shadow:none;border:1px solid #e2e8f0}} -""" - -_JS = """ -function switchTab(n){ - document.querySelectorAll('.tab').forEach(function(t){t.classList.toggle('active',t.dataset.t===n)}); - document.querySelectorAll('.tsec').forEach(function(s){s.classList.toggle('active',s.id==='t-'+n)}); -} -function toggleCol(el){el.nextElementSibling.classList.toggle('open')} -function toggleSample(el){el.nextElementSibling.classList.toggle('open')} -""" - - -def _esc(s: Any) -> str: - return html_mod.escape(str(s)) if s is not None else "" - - -def _slug(name: str) -> str: - return name.replace(" ", "-").replace(".", "-").replace("/", "-") - - -def _fnum(n: Any) -> str: - if n is None: - return "-" - if isinstance(n, float): - if n == int(n) and abs(n) < 1e15: - return f"{int(n):,}" - return f"{n:,.2f}" - if isinstance(n, int): - return f"{n:,}" - return str(n) - - -def _compl_color(pct: float) -> str: - if pct >= 95: - return "#22c55e" - if pct >= 70: - return "#eab308" - return "#ef4444" - - -def _render_hist(bins: list, counts: list) -> str: - if not bins or not counts: - return "" - max_c = max(counts) or 1 - bars = [] - for b, c in zip(bins, counts): - pct = c / max_c * 100 - bars.append(f'
') - return ( - f'
{"".join(bars)}
' - f'
{_esc(bins[0])}{_esc(bins[-1])}
' - ) - - -def _render_top_vals(top_values: list) -> str: - if not top_values: - return "" - max_pct = max((tv.get("pct", 0) for tv in top_values), default=1) or 1 - rows = [] - for tv in top_values: - bar_w = tv.get("pct", 0) / max_pct * 100 - rows.append( - f'
' - f'{_esc(str(tv["value"])[:30])}' - f'
' - f'{tv.get("pct", 0)}%' - f'({_fnum(tv.get("count", 0))})' - f'
' - ) - return "".join(rows) - - -def _render_col_detail(col: dict) -> str: - parts: List[str] = [] - ns = col.get("numeric_stats") - if ns: - parts.append('
') - for label, key in [ - ("Min", "min"), ("Max", "max"), ("Mean", "mean"), - ("Median", "median"), ("Std Dev", "stddev"), - ("P5", "p5"), ("P25", "p25"), ("P75", "p75"), ("P95", "p95"), - ("Zeros", "zeros"), ("Zeros %", "zeros_pct"), - ("Negative", "negative"), ("Negative %", "negative_pct"), - ]: - parts.append(f'') - parts.append('
{label}{_fnum(ns.get(key))}
') - h = ns.get("histogram", {}) - parts.append(_render_hist(h.get("bins", []), h.get("counts", []))) - parts.append('
') - - ss = col.get("string_stats") - if ss: - parts.append('') - parts.append(f'') - parts.append(f'') - parts.append(f'') - parts.append('
Min length{_fnum(ss.get("min_length"))}
Max length{_fnum(ss.get("max_length"))}
Avg length{_fnum(ss.get("avg_length"))}
') - tv = ss.get("top_values", []) - if tv: - parts.append('
Top Values
') - parts.append(_render_top_vals(tv)) - - ds = col.get("date_stats") - if ds: - parts.append('
') - parts.append(f'') - parts.append(f'') - parts.append(f'') - parts.append('
Earliest{_esc(ds.get("earliest", "-"))}
Latest{_esc(ds.get("latest", "-"))}
Span{_fnum(ds.get("span_days"))} days
') - h = ds.get("histogram", {}) - parts.append(_render_hist(h.get("bins", []), h.get("counts", []))) - parts.append('
') - - bs = col.get("boolean_stats") - if bs: - tc, fc = bs.get("true_count", 0), bs.get("false_count", 0) - tp = bs.get("true_pct", 0) - fp = round(100 - tp, 1) if tp else 0 - parts.append( - f'
' - f'
True {tp}% ({tc:,})
' - f'
False {fp}% ({fc:,})
' - f'
' - ) - - sv = col.get("sample_values", []) - if sv: - parts.append('
Sample values:
') - parts.append('
') - for v in sv: - parts.append(f'{_esc(str(v)[:50])}') - parts.append('
') - - return "".join(parts) - - -def generate_html_report(profile_data: Dict[str, Any], output_path: Path) -> None: - """Generate a standalone HTML report from profile data. - - Args: - profile_data: Full profile dict with "tables" key. - output_path: Path to write the HTML file. - """ - tables = profile_data.get("tables", {}) - generated_at = profile_data.get("generated_at", "") - if not tables: - logger.warning("No tables in profile data") - return - - total_tables = len(tables) - total_rows = sum(t.get("row_count", 0) for t in tables.values()) - total_cols = sum(t.get("column_count", 0) for t in tables.values()) - compl_vals = [t.get("avg_completeness", 0) for t in tables.values()] - avg_compl = round(sum(compl_vals) / len(compl_vals), 1) if compl_vals else 0 - total_alerts = sum(len(t.get("alerts", [])) for t in tables.values()) - table_names = list(tables.keys()) - - h: List[str] = [] - h.append('') - h.append('') - h.append('Data Profile Report') - h.append(f'
') - - # Header - h.append('
') - h.append('

Data Profile Report

') - h.append(f'
Generated: {_esc(generated_at)}
') - h.append('
') - - # Summary cards - h.append('
') - for val, label in [ - (_fnum(total_tables), "Tables"), - (_fnum(total_rows), "Total Rows"), - (_fnum(total_cols), "Total Columns"), - (f"{avg_compl}%", "Avg Completeness"), - (_fnum(total_alerts), "Alerts"), - ]: - h.append(f'
{val}
{label}
') - h.append('
') - - # Table tabs - if total_tables > 1: - h.append('
') - for i, name in enumerate(table_names): - act = " active" if i == 0 else "" - sl = _slug(name) - h.append(f'
{_esc(name)}
') - h.append('
') - - # Table sections - for i, (name, tbl) in enumerate(tables.items()): - act = " active" if i == 0 or total_tables == 1 else "" - sl = _slug(name) - h.append(f'
') - h.append(f'

{_esc(name)}

') - - # Stat cards - h.append('
') - rc = tbl.get("row_count", 0) - cc = tbl.get("column_count", 0) - tc = tbl.get("avg_completeness", 0) - sz = tbl.get("file_size_mb") - dupes = tbl.get("duplicate_rows", 0) - sampled = tbl.get("sampled", False) - for val, label in [ - (_fnum(rc), "Rows"), - (_fnum(cc), "Columns"), - (f"{tc}%", "Completeness"), - (f"{sz} MB" if sz is not None else "-", "File Size"), - ]: - h.append(f'
{val}
{label}
') - dr = tbl.get("date_range") - if dr and dr.get("earliest"): - h.append( - f'
' - f'{_esc(dr["earliest"])} — {_esc(dr["latest"])}
' - f'
Date Range ({_fnum(dr.get("span_days"))} days)
' - ) - if dupes: - h.append(f'
{_fnum(dupes)}
Duplicate Rows
') - if sampled: - h.append(f'
Sampled
500K rows
') - h.append('
') - - # Variable types - vt = tbl.get("variable_types", {}) - if vt: - h.append('
') - for cat, cnt in sorted(vt.items()): - color = _TYPE_COLORS.get(cat, "#6b7280") - h.append(f'{cat} {cnt}') - h.append('
') - - # Alerts - alerts = tbl.get("alerts", []) - if alerts: - h.append('
') - for a in alerts: - sev = _ALERT_SEVERITY.get(a.get("type", ""), "i") - h.append(f'
{_esc(a.get("message", ""))}
') - h.append('
') - - # Column list - columns = tbl.get("columns", []) - if columns: - h.append('
Columns
') - h.append('
') - # Header row - h.append('
') - h.append('
Name
Type
') - h.append('
Completeness
') - h.append('
Unique
') - h.append('
') - - for col in columns: - cname = col.get("name", "") - cat = col.get("type_category", "STRING") - ctype = col.get("type", "") - cpct = col.get("completeness_pct", 0) - uniq = col.get("unique_count", 0) - upct = col.get("unique_pct", 0) - ca = col.get("alerts", []) - is_pk = col.get("is_primary_key", False) - color = _TYPE_COLORS.get(cat, "#6b7280") - cc_col = _compl_color(cpct) - pk_html = 'PK' if is_pk else "" - alert_html = f'{len(ca)}' if ca else "" - - h.append('
') - h.append(f'
{_esc(cname)}{pk_html}
') - h.append(f'
{_esc(cat[:4])}
') - h.append(f'
{cpct}%
') - h.append(f'
{_fnum(uniq)} ({upct}%)
') - h.append(f'
{alert_html}
') - h.append('
') - h.append(f'
{_render_col_detail(col)}
') - - h.append('
') - - # Sample data - sample_rows = tbl.get("sample_rows", []) - if sample_rows: - h.append('
') - h.append(f'
▶ Sample Data ({len(sample_rows)} rows)
') - h.append('
') - headers = list(sample_rows[0].keys()) - h.append('' + ''.join(f'' for hd in headers) + '') - for row in sample_rows: - h.append('' + ''.join( - f'' - for hd in headers - ) + '') - h.append('
{_esc(hd)}
{_esc(str(row.get(hd, ""))[:60])}
') - - h.append('
') - - # Footer + JS - h.append('
Generated by Standalone Data Profiler
') - h.append(f'') - h.append('
') - - output_path.parent.mkdir(parents=True, exist_ok=True) - output_path.write_text("\n".join(h), encoding="utf-8") - logger.info("Wrote HTML report: %s", output_path) - - -# --------------------------------------------------------------------------- -# CLI -# --------------------------------------------------------------------------- -def main() -> None: - parser = argparse.ArgumentParser( - description="Profile Parquet/CSV files and output JSON statistics + optional HTML report.", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - %(prog)s data/orders.parquet - %(prog)s data/orders.parquet --primary-key order_id --html - %(prog)s data/orders.parquet data/customers.csv -o profiles.json --html - %(prog)s --from-json profile.json - """, - ) - parser.add_argument( - "files", - nargs="*", - help="Parquet file(s), directory of Parquet files, or CSV file(s) to profile", - ) - parser.add_argument( - "-o", "--output", - default="profile.json", - help="Output JSON file path (default: profile.json)", - ) - parser.add_argument( - "--primary-key", - default=None, - help="Comma-separated primary key column(s) for duplicate detection", - ) - parser.add_argument( - "--html", - action="store_true", - help="Also generate a standalone HTML report", - ) - parser.add_argument( - "--from-json", - metavar="PATH", - default=None, - help="Generate HTML report from existing profile JSON (no profiling)", - ) - parser.add_argument( - "--quiet", "-q", - action="store_true", - help="Suppress info logging", - ) - args = parser.parse_args() - - if args.quiet: - logging.getLogger("profiler").setLevel(logging.WARNING) - - # Mode 1: Generate HTML from existing JSON - if args.from_json: - json_path = Path(args.from_json) - if not json_path.exists(): - logger.error("File not found: %s", json_path) - sys.exit(1) - with open(json_path) as f: - profile_data = json.load(f) - html_path = json_path.with_suffix(".html") - generate_html_report(profile_data, html_path) - 
logger.info("Done: HTML report at %s", html_path) - return - - # Mode 2: Profile files - if not args.files: - parser.error("Provide files to profile, or use --from-json") - - profiles: Dict[str, Any] = {} - success = 0 - errors = 0 - - for file_path_str in args.files: - file_path = Path(file_path_str) - if not file_path.exists(): - logger.error("File not found: %s", file_path) - errors += 1 - continue - - try: - logger.info("Profiling %s ...", file_path) - profile = profile_table( - source_path=file_path, - primary_key=args.primary_key, - ) - profiles[profile["table_name"]] = profile - success += 1 - logger.info( - " %s: %d rows, %d cols, %d alerts", - profile["table_name"], - profile["row_count"], - profile["column_count"], - len(profile["alerts"]), - ) - except Exception as exc: - logger.error("Failed to profile %s: %s", file_path, exc) - errors += 1 - - if not profiles: - logger.error("No tables profiled successfully") - sys.exit(1) - - output = { - "generated_at": datetime.now(timezone.utc).isoformat(), - "version": "1.0", - "tables": profiles, - } - - output_path = Path(args.output) - write_json_atomic(output_path, output) - - # Generate HTML if requested - if args.html: - html_path = output_path.with_suffix(".html") - generate_html_report(output, html_path) - - logger.info("Done: %d profiled, %d errors. Output: %s", success, errors, output_path) - - -if __name__ == "__main__": - main() diff --git a/scripts/sync_config_template.yaml b/scripts/sync_config_template.yaml deleted file mode 100644 index 47c9358..0000000 --- a/scripts/sync_config_template.yaml +++ /dev/null @@ -1,23 +0,0 @@ -# Data Analyst - Sync Configuration -# -# This file controls which additional datasets are synchronized. -# Core telemetry data is ALWAYS synced (projects, users, jobs, etc.) 
-# -# To enable a dataset: change false -> true -# To disable a dataset: change true -> false -# -# After changing, run: bash server/scripts/sync_data.sh - -datasets: - # Jira Support tickets (SUPPORT project) - # Size: ~50MB (parquet only), Updated: real-time via webhooks - # Recommended for: Support team, Customer Success - jira: false - - # Jira attachment files (images, logs, etc.) - # Size: ~500MB+, requires jira: true - # Only enable if you need to view actual attachment files locally - jira_attachments: false - - # Future datasets: - # finance: false # Investor reporting (coming soon) diff --git a/scripts/test_sync.sh b/scripts/test_sync.sh deleted file mode 100755 index 343bfb9..0000000 --- a/scripts/test_sync.sh +++ /dev/null @@ -1,138 +0,0 @@ -#!/bin/bash -# Test rsync reliability fixes (Issue #197) -# -# Downloads data into gitignored data/test_sync/ to verify rsync_reliable -# wrapper works end-to-end without touching repo directories. -# -# Usage: -# bash scripts/test_sync.sh # Full test sync -# bash scripts/test_sync.sh --dry-run # Preview only - -set -e - -DRY_RUN="" -for arg in "$@"; do - case "$arg" in - --dry-run) DRY_RUN="--dry-run" ;; - esac -done - -# --- Rsync reliability settings (same as sync_data.sh) --- -RSYNC_SSH_OPTS='ssh -o ServerAliveInterval=60 -o ServerAliveCountMax=3 -o ConnectTimeout=30' -RSYNC_TIMEOUT=300 -RSYNC_MAX_RETRIES=3 -RSYNC_RETRY_DELAY=5 - -rsync_reliable() { - local attempt=1 - local delay=$RSYNC_RETRY_DELAY - while [[ $attempt -le $RSYNC_MAX_RETRIES ]]; do - rsync -e "$RSYNC_SSH_OPTS" --timeout="$RSYNC_TIMEOUT" \ - --partial-dir=.rsync-partial "$@" && return 0 - local exit_code=$? - # Exit codes 23/24 = partial transfer (permission denied, vanished files) — not retryable - if [[ $exit_code -eq 23 || $exit_code -eq 24 ]]; then - echo " Warning: rsync partial transfer (exit $exit_code), continuing..." 
- return 0 - fi - if [[ $attempt -lt $RSYNC_MAX_RETRIES ]]; then - echo " Rsync failed (exit $exit_code), retrying in ${delay}s (attempt $attempt/$RSYNC_MAX_RETRIES)..." - sleep "$delay" - delay=$((delay * 2)) - fi - attempt=$((attempt + 1)) - done - echo " ERROR: Rsync failed after $RSYNC_MAX_RETRIES attempts" - return 1 -} - -# --- Test destination (gitignored) --- -DEST="./data/test_sync" -LOG_DIR="./data/sync_diagnostics" -mkdir -p "$DEST" "$LOG_DIR" -LOG_FILE="$LOG_DIR/test_sync_$(date '+%Y%m%d_%H%M%S').log" - -log() { - echo "$@" | tee -a "$LOG_FILE" -} - -log "=== Rsync reliability test ===" -log "Started: $(date -u '+%Y-%m-%dT%H:%M:%SZ')" -log "Destination: $DEST" -log "SSH opts: $RSYNC_SSH_OPTS" -log "Timeout: ${RSYNC_TIMEOUT}s, Retries: $RSYNC_MAX_RETRIES" -log "" - -# --- Test 1: SSH connectivity --- -log "--- Test 1: SSH connectivity ---" -START=$(date +%s) -if ssh -o ConnectTimeout=10 data-analyst "echo ok" >> "$LOG_FILE" 2>&1; then - ELAPSED=$(($(date +%s) - START)) - log "PASS (${ELAPSED}s)" -else - ELAPSED=$(($(date +%s) - START)) - log "FAIL (${ELAPSED}s) — cannot reach server, aborting" - exit 1 -fi -log "" - -# --- Test 2: Small directory (docs) --- -log "--- Test 2: Sync docs (small, text, -avz) ---" -mkdir -p "$DEST/docs" -START=$(date +%s) -rsync_reliable -avz --delete $DRY_RUN "data-analyst:server/docs/" "$DEST/docs/" >> "$LOG_FILE" 2>&1 -ELAPSED=$(($(date +%s) - START)) -if [[ -z "$DRY_RUN" ]]; then - DOC_COUNT=$(find "$DEST/docs" -type f 2>/dev/null | wc -l | tr -d ' ') - DOC_SIZE=$(du -sh "$DEST/docs" 2>/dev/null | cut -f1) - log "PASS (${ELAPSED}s) — $DOC_COUNT files, $DOC_SIZE" -else - log "PASS dry-run (${ELAPSED}s)" -fi -log "" - -# --- Test 3: Scripts --- -log "--- Test 3: Sync scripts (small, text, -avz) ---" -mkdir -p "$DEST/scripts" -START=$(date +%s) -rsync_reliable -avz --delete $DRY_RUN "data-analyst:server/scripts/" "$DEST/scripts/" >> "$LOG_FILE" 2>&1 -ELAPSED=$(($(date +%s) - START)) -if [[ -z "$DRY_RUN" ]]; then - 
SCRIPT_COUNT=$(find "$DEST/scripts" -type f 2>/dev/null | wc -l | tr -d ' ') - log "PASS (${ELAPSED}s) — $SCRIPT_COUNT files" -else - log "PASS dry-run (${ELAPSED}s)" -fi -log "" - -# --- Test 4: Core parquet (the big one — no -z, with keepalive) --- -log "--- Test 4: Sync core parquet (large, binary, -av WITHOUT -z) ---" -log "This is the transfer that was hanging. Expect ~7000 files..." -mkdir -p "$DEST/parquet" -START=$(date +%s) -rsync_reliable -av --delete --progress \ - --exclude='jira/' --exclude='kbc_telemetry_expert/' \ - $DRY_RUN data-analyst:server/parquet/ "$DEST/parquet/" 2>&1 | \ - tee -a "$LOG_FILE" | \ - grep -E '(^sent |^total size|^rsync|Warning:|ERROR:)' || true -ELAPSED=$(($(date +%s) - START)) -if [[ -z "$DRY_RUN" ]]; then - PARQUET_COUNT=$(find "$DEST/parquet" -name '*.parquet' -type f 2>/dev/null | wc -l | tr -d ' ') - PARQUET_SIZE=$(du -sh "$DEST/parquet" 2>/dev/null | cut -f1) - log "DONE (${ELAPSED}s) — $PARQUET_COUNT parquet files, $PARQUET_SIZE" -else - log "DONE dry-run (${ELAPSED}s)" -fi -log "" - -# --- Summary --- -log "=== Summary ===" -log "Finished: $(date -u '+%Y-%m-%dT%H:%M:%SZ')" -if [[ -z "$DRY_RUN" ]]; then - TOTAL_SIZE=$(du -sh "$DEST" 2>/dev/null | cut -f1) - TOTAL_FILES=$(find "$DEST" -type f 2>/dev/null | wc -l | tr -d ' ') - log "Total: $TOTAL_FILES files, $TOTAL_SIZE in $DEST" -fi -log "Full log: $LOG_FILE" -log "" -log "To clean up test data: rm -rf $DEST" diff --git a/scripts/update.sh b/scripts/update.sh deleted file mode 100755 index 88a2d68..0000000 --- a/scripts/update.sh +++ /dev/null @@ -1,71 +0,0 @@ -#!/bin/bash - -# update.sh - Data synchronization script -# -# This script performs: -# 1. Data synchronization from configured data source -# 2. 
DuckDB views reinitialization -# -# Note: Git pull and dependency updates are handled by deploy.sh (GitHub Actions) - -set -e # Exit on error - -echo "🔄 AI Data Analyst - Data Update" -echo "" - -# Check that we're in the correct folder (same check as config.py uses) -if [ ! -f "docs/data_description.md" ]; then - echo "❌ Run script from project root (folder with docs/data_description.md)" - exit 1 -fi - -# Note: Git pull and dependency updates are handled by deploy.sh (GitHub Actions) -# This script focuses only on data synchronization - -# Activate virtual environment -# Supports both local (./.venv) and server (/opt/data-analyst/.venv) setups -echo "" -echo "1️⃣ Activating virtual environment..." -if [ -d ".venv" ]; then - source .venv/bin/activate - echo " ✅ Virtual environment activated (local)" -elif [ -d "/opt/data-analyst/.venv" ]; then - source /opt/data-analyst/.venv/bin/activate - echo " ✅ Virtual environment activated (server)" -else - echo " ❌ Virtual environment not found. Run init.sh first." - exit 1 -fi - -# Data synchronization -echo "" -echo "2️⃣ Synchronizing data..." -echo "" - -# Run data sync -if python3 -m src.data_sync; then - echo "" - echo " ✅ Data synchronization complete" -else - echo "" - echo " ❌ Data synchronization failed. Check logs above." - exit 1 -fi - -# Generate data profiles (for catalog profiler) -echo "" -echo "3️⃣ Generating data profiles..." -if python3 -m src.profiler; then - echo " ✅ Data profiles generated" -else - echo " ⚠️ Data profiling failed (non-fatal). Check logs above." - # Non-fatal: profiling failure should not break the pipeline -fi - -# Done -echo "" -echo "✅ Data sync complete!" -echo "" -echo "💡 Parquet files are ready in data/parquet/" -echo " To setup DuckDB views, run: ./scripts/setup_views.sh" -echo ""