diff --git a/config/data_description.md.example b/config/data_description.md.example
deleted file mode 100644
index 033c25c..0000000
--- a/config/data_description.md.example
+++ /dev/null
@@ -1,164 +0,0 @@
-# Data Description
-
-This file defines the tables available for synchronization and analysis.
-Copy this file to `data_description.md` and customize for your data sources.
-
-## Tables
-
-```yaml
-# Folder mapping: data source bucket -> local folder name
-folder_mapping:
- "in.c-example": "example"
-
-tables:
- # Small reference table - full refresh, no automatic sync
- - id: "in.c-example.customers"
- name: "customers"
- description: "Customer master data"
- primary_key: "id"
- sync_strategy: "full_refresh"
-
- # Large transactional table - daily automatic sync with profiling
- - id: "in.c-example.orders"
- name: "orders"
- description: "Order transactions with line items"
- primary_key: "id"
- sync_strategy: "partitioned"
- partition_by: "created_at"
- partition_column_type: "DATE"
- partition_granularity: "day"
- incremental_window_days: 3
- max_history_days: 450
- query_mode: "local"
- sync_schedule: "daily 05:00"
- profile_after_sync: true
-
- # Frequently updated table - sync every hour, skip profiling
- - id: "in.c-example.events"
- name: "events"
- description: "Real-time event stream"
- primary_key: "event_id"
- sync_strategy: "partitioned"
- partition_by: "event_date"
- partition_column_type: "DATE"
- partition_granularity: "day"
- incremental_window_days: 1
- query_mode: "local"
- sync_schedule: "every 1h"
- profile_after_sync: false
-
- # Remote table - too large for local sync, queried via BigQuery
- - id: "project.dataset.page_views"
- name: "page_views"
- description: "Page-level traffic metrics (~5M rows/day)"
- primary_key: "page_view_id"
- sync_strategy: "partitioned"
- partition_by: "event_date"
- partition_column_type: "DATE"
- partition_granularity: "day"
- query_mode: "remote"
- bq_entity_type: "view"
-```
-
-## Table Configuration Reference
-
-### Required Fields
-
-| Field | Description | Example |
-|-------|-------------|---------|
-| `id` | Full table identifier in data source | `"in.c-crm.company"` |
-| `name` | Short name (used for Parquet filenames) | `"company"` |
-| `description` | Human-readable description | `"Company master data"` |
-| `primary_key` | Primary key column(s), comma-separated | `"id"` or `"order_id, line_id"` |
-| `sync_strategy` | How data is downloaded (see below) | `"full_refresh"` |
-
-### Sync Strategy
-
-| Strategy | Description | Use for |
-|----------|-------------|---------|
-| `full_refresh` | Downloads entire table each sync | Small reference tables (< 100K rows) |
-| `incremental` | Downloads changed rows via changedSince | Medium tables with update tracking |
-| `partitioned` | Downloads by time partitions, overwrites only recent ones | Large tables with date column |
-
-### Partitioning
-
-| Field | Default | Description |
-|-------|---------|-------------|
-| `partition_by` | *(none)* | Column to partition by (e.g., `"created_at"`, `"event_date"`) |
-| `partition_granularity` | `"month"` | `"day"`, `"month"`, or `"year"` |
-| `partition_column_type` | `"TIMESTAMP"` | SQL type: `"DATE"`, `"TIMESTAMP"`, or `"DATETIME"` |
-| `incremental_window_days` | `7` | How many recent days to re-download on each sync |
-| `max_history_days` | *(all)* | Maximum history to keep (e.g., `450` for ~15 months) |
-| `initial_load_chunk_days` | `30` | Chunk size for first-time download |
-
-### Query Mode
-
-| Field | Default | Description |
-|-------|---------|-------------|
-| `query_mode` | `"local"` | How the AI agent queries this table |
-| `bq_entity_type` | `"view"` | BigQuery entity type: `"view"` (Python BQ client) or `"table"` (DuckDB BQ extension) |
-
-| Mode | Description | Best for |
-|------|-------------|----------|
-| `local` | Synced to Parquet, queried via DuckDB | Tables < 2 GB, fast queries |
-| `remote` | Not synced, queried via BigQuery | Huge tables (100+ GB), live data |
-| `hybrid` | Subset synced for profiling, queries go to BigQuery | Medium tables needing live data |
-
-### Remote Table Metadata
-
-Remote tables (`query_mode: "remote"`) should include metadata to help the AI agent
-write safe, efficient queries:
-
-| Field | Description |
-|-------|-------------|
-| `grain` | Row granularity description |
-| `volume` | Size estimates (rows_per_day, unique entities) |
-| `columns` | Column descriptions with value distributions |
-| `dimension_profile` | Cardinality per dimension |
-| `query_result_estimates` | Expected rows after GROUP BY combinations |
-| `join_keys` | How to join with other tables |
-
-### Automatic Sync Schedule
-
-| Field | Default | Description |
-|-------|---------|-------------|
-| `sync_schedule` | *(none)* | When to automatically sync this table |
-| `profile_after_sync` | `true` | Run data profiler after sync completes |
-
-The `sync_schedule` field controls automatic synchronization via the `data-refresh`
-systemd timer (runs every 15 minutes). If omitted, the table is only synced manually.
-
-**Schedule formats:**
-
-| Format | Example | Description |
-|--------|---------|-------------|
-| `every {N}m` | `"every 15m"`, `"every 30m"` | Sync every N minutes |
-| `every {N}h` | `"every 1h"`, `"every 6h"` | Sync every N hours |
-| `daily HH:MM` | `"daily 05:00"`, `"daily 17:30"` | Sync once per day at HH:MM UTC |
-| *(omitted)* | - | Manual sync only (`python -m src.data_sync`) |
-
-**How scheduling works:**
-- A systemd timer runs `python -m src.data_sync --scheduled` every 15 minutes
-- For each table with `sync_schedule`, it checks the last sync time from `sync_state.json`
-- `every` schedules: syncs if enough time has elapsed since last sync
-- `daily` schedules: syncs once after the target time passes (skips if already synced today)
-- Tables without `sync_schedule` are never synced automatically
-
-**Profiling control:**
-- `profile_after_sync: true` (default) - runs profiler after sync to update column statistics
-- `profile_after_sync: false` - skips profiler (use for frequently synced tables where
- profiling overhead is not worth it; the AI agent uses slightly older statistics)
-- When profiling runs, the webapp is automatically restarted to load new statistics
-
-### Optional Fields
-
-| Field | Default | Description |
-|-------|---------|-------------|
-| `folder` | *(from folder_mapping)* | Override output folder name |
-| `row_filter` | *(none)* | SQL WHERE clause (e.g., `"date >= DATE_SUB(CURRENT_DATE(), INTERVAL 15 MONTH)"`) |
-| `columns` | *(all)* | List of columns to sync (subset) |
-| `incremental_column` | *(none)* | Column for timestamp-based incremental sync (BigQuery) |
-| `dataset` | *(none)* | Dataset group name for on-demand tables |
-| `catalog_fqn` | *(auto)* | OpenMetadata FQN override (auto-derived from table ID if not set) |
-| `foreign_keys` | `[]` | List of foreign key relationships |
-| `where_filters` | `[]` | List of filters for Keboola Storage API |
diff --git a/dev_docs/plan-corporate-memory.md b/dev_docs/plan-corporate-memory.md
deleted file mode 100644
index 4eb65f8..0000000
--- a/dev_docs/plan-corporate-memory.md
+++ /dev/null
@@ -1,348 +0,0 @@
-# Corporate Memory Module - Implementation Plan
-
-## Overview
-
-Server-side modul, který každých 30 minut sbírá znalosti z `CLAUDE.local.md` všech uživatelů, pomocí Claude HAIKU je extrahuje a filtruje, a vytváří sdílenou firemní knowledge base. Uživatelé mohou hlasovat a oblíbené znalosti se jim syncují do `.claude/rules/`.
-
-## Architecture
-
-```
-┌─────────────────────────────────────────────────────────────────┐
-│ Cron (*/30 * * * *) │
-│ └── /usr/local/bin/collect-knowledge │
-│ 1. Čte /home/*/CLAUDE.local.md │
-│ 2. Volá Claude HAIKU pro extrakci (AI filtering) │
-│ 3. Ukládá do /data/corporate-memory/knowledge.json │
-└─────────────────────────────────────────────────────────────────┘
- │
- ▼
-┌─────────────────────────────────────────────────────────────────┐
-│ /data/corporate-memory/ (deploy:data-ops 2770)│
-│ ├── knowledge.json # {items: {id: {title, content, ...}}} │
-│ ├── votes.json # {username: {item_id: 1|-1}} │
-│ ├── user_hashes.json # {username: {file_hash, last_proc}} │
-│ └── collection.log # Logy z HAIKU procesingu │
-└─────────────────────────────────────────────────────────────────┘
- │
- ▼
-┌─────────────────────────────────────────────────────────────────┐
-│ Webapp │
-│ ├── Dashboard widget: stats (contributors, items, your rules) │
-│ ├── /corporate-memory: sub-page s 👍/👎 hlasováním │
-│ └── API: /api/corporate-memory/* │
-└─────────────────────────────────────────────────────────────────┘
- │
- ▼
-┌─────────────────────────────────────────────────────────────────┐
-│ sync_data.sh │
-│ └── Stáhne upvoted items do .claude/rules/*.md │
-└─────────────────────────────────────────────────────────────────┘
-```
-
-## Data Model
-
-**knowledge.json:**
-```json
-{
- "items": {
- "km_abc123": {
- "id": "km_abc123",
- "title": "DuckDB query optimization",
- "content": "When querying large parquet files...",
- "category": "data_analysis",
- "tags": ["duckdb", "performance"],
- "source_users": ["john.doe", "jane.smith", "bob"],
- "votes_up": 5,
- "votes_down": 1,
- "score": 4,
- "extracted_at": "2026-02-05T10:30:00Z",
- "updated_at": "2026-02-05T12:00:00Z"
- }
- },
- "metadata": {
- "last_collection": "2026-02-05T10:30:00Z",
- "total_users": 8
- }
-}
-```
-
-**votes.json** (kdo jak hlasoval - pro UI "already voted"):
-```json
-{
- "john.doe": {"km_abc123": 1, "km_def456": -1},
- "jane.smith": {"km_abc123": 1}
-}
-```
-
-### Voting & Popularity
-
-- `votes_up` / `votes_down` - počet palců nahoru/dolů (viditelné v UI)
-- `score` = votes_up - votes_down (pro řazení)
-- votes.json trackuje KDO hlasoval (pro zabránění dvojího hlasování)
-- UI NEZOBRAZUJE jména hlasujících, jen počty
-
-**Řazení v UI:**
-- **Nejoblíbenější** (Most Popular): ORDER BY score DESC
-- **Nejméně oblíbené** (Least Popular): ORDER BY score ASC
-- **Nejnovější** (Recent): ORDER BY extracted_at DESC
-- **Nejvíc přispěvatelů** (Most Contributors): ORDER BY len(source_users) DESC
-
-**user_hashes.json** (tracking změn CLAUDE.local.md):
-```json
-{
- "john.doe": {
- "file_hash": "a1b2c3d4e5f6...",
- "last_processed": "2026-02-05T10:30:00Z"
- },
- "jane.smith": {
- "file_hash": "f6e5d4c3b2a1...",
- "last_processed": "2026-02-05T09:00:00Z"
- }
-}
-```
-
-## Change Detection & Deduplication
-
-### Change Detection (šetření tokenů)
-
-**DŮLEŽITÉ**: Každých 30 minut se zpracovávají POUZE změněné soubory!
-
-```
-┌─────────────────────────────────────────────────────────────────┐
-│ Cron běží každých 30 min │
-│ │
-│ Pro každého uživatele: │
-│ 1. Spočítej MD5 hash /home/$user/CLAUDE.local.md │
-│ 2. Porovnej s uloženým hashem v user_hashes.json │
-│ 3. Pokud STEJNÝ → PŘESKOČ (žádné API volání) │
-│ 4. Pokud ZMĚNĚNÝ → zpracuj pomocí HAIKU API │
-│ │
-│ Typicky: 8 uživatelů, 1-2 změny denně = 2-4 API volání/den │
-└─────────────────────────────────────────────────────────────────┘
-```
-
-```python
-def collect_all():
- users_processed = 0
- users_skipped = 0
-
- for user_dir in Path("/home").iterdir():
- claude_file = user_dir / "CLAUDE.local.md"
- if not claude_file.exists():
- continue
-
- username = user_dir.name
- current_hash = hashlib.md5(claude_file.read_bytes()).hexdigest()
- stored_hash = user_hashes.get(username, {}).get("file_hash")
-
- if current_hash == stored_hash:
- users_skipped += 1
- continue # ← PŘESKOČÍ - žádné API volání!
-
- # Pouze změněné soubory volají HAIKU
- new_knowledge = extract_knowledge(claude_file.read_text())
- deduplicate_and_merge(new_knowledge, username)
- update_user_hash(username, current_hash)
- users_processed += 1
-
- log(f"Processed: {users_processed}, Skipped: {users_skipped}")
-```
-
-### Deduplikace znalostí
-
-Při extrakci nových znalostí od změněného uživatele:
-1. HAIKU vrátí seznam znalostí
-2. Pro každou novou znalost:
- - Hledej podobnou v CELÉ knowledge base (všichni uživatelé)
- - Pokud existuje podobná → přidej uživatele do `source_users[]`
- - Pokud neexistuje → vytvoř novou
-
-```python
-def deduplicate_and_merge(new_items: list, username: str):
- """Deduplikuje nové znalosti proti celé knowledge base."""
- for new_item in new_items:
- similar_id = find_similar_knowledge(new_item, knowledge_base)
-
- if similar_id:
- # Existující znalost - přidej uživatele
- knowledge_base["items"][similar_id]["source_users"].append(username)
- knowledge_base["items"][similar_id]["updated_at"] = now()
- else:
- # Nová znalost
- item_id = generate_id()
- knowledge_base["items"][item_id] = {
- **new_item,
- "id": item_id,
- "source_users": [username],
- "extracted_at": now()
- }
-```
-
-### Příklad deduplikace
-
-5 uživatelů má podobnou znalost o DuckDB:
-- User A: "Pro rychlé dotazy v DuckDB použij WHERE před JOIN"
-- User B: "DuckDB je rychlejší když filtruješ před joinem"
-- User C: "Filtruj data před JOIN operací v DuckDB"
-- User D: "WHERE klauzule před JOIN zrychlí DuckDB"
-- User E: "V DuckDB dej WHERE před JOIN"
-
-→ HAIKU rozpozná jako JEDNU znalost → `source_users: ["user_a", "user_b", "user_c", "user_d", "user_e"]`
-
-## Files to Create
-
-### 1. Server-side Collector
-
-**`services/corporate_memory/collector.py`** - Main collection logic:
-- `collect_all()` - iteruje přes /home/*/CLAUDE.local.md
-- `should_process_user(username)` - kontrola hash změn (šetří tokeny)
-- `extract_knowledge(content)` - volá HAIKU API
-- `find_similar_knowledge(new, existing)` - hledá duplicity
-- `merge_knowledge(existing, new, username)` - sloučí a přidá uživatele
-- `save_knowledge(data)` - atomic JSON write
-- `update_user_hash(username, hash)` - uloží hash pro příští run
-
-**`services/corporate_memory/prompts.py`** - HAIKU prompts:
-- Prompt pro extrakci znalostí
-- Prompt pro AI filtering citlivých dat
-- Prompt pro merge podobných znalostí (optional)
-
-**`server/bin/collect-knowledge`** - Shell wrapper:
-```bash
-#!/bin/bash
-cd /opt/data-analyst/repo
-/opt/data-analyst/.venv/bin/python -m services.corporate_memory
-```
-
-**`services/corporate_memory/systemd/corporate-memory.timer`** + **`services/corporate_memory/systemd/corporate-memory.service`** - Systemd timer (30 min)
-
-### 2. Webapp Backend
-
-**`webapp/corporate_memory_service.py`** (pattern: telegram_service.py):
-- `get_knowledge(filter, page)` - seznam znalostí
-- `get_stats()` - pro dashboard widget
-- `vote(username, item_id, vote)` - +1/-1
-- `get_user_rules(username)` - upvoted items pro sync
-- `generate_user_rules_file(username)` - vytvoří JSON pro sync
-
-**`webapp/app.py`** - nové routes:
-- `GET /corporate-memory` - sub-page
-- `GET /api/corporate-memory/knowledge` - list items
-- `GET /api/corporate-memory/stats` - dashboard stats
-- `POST /api/corporate-memory/vote` - hlasování
-- `GET /api/corporate-memory/my-rules` - user's upvoted
-
-### 3. Webapp Frontend
-
-**`webapp/templates/corporate_memory.html`** - sub-page:
-- Header se stats
-- Řazení: Most Popular | Least Popular | Recent | Most Contributors
-- Seznam znalostí s filtry (category, search)
-- Každá položka:
- - Title + content preview
- - Tags (category badges)
- - 👍 (count) / 👎 (count) buttons
- - "From X contributors" badge
- - "Synced to your rules" indicator (if user upvoted)
-
-```
-┌─────────────────────────────────────────────────────────────────┐
-│ DuckDB query optimization [data_analysis] │
-│ When querying large parquet files, filter before JOIN... │
-│ │
-│ 👍 5 👎 1 | From 3 contributors | ✓ In your rules │
-└─────────────────────────────────────────────────────────────────┘
-```
-
-**`webapp/templates/dashboard.html`** - přidat widget:
-```html
-
-
Corporate Memory
-
-
{{ stats.contributors }} contributors
-
{{ stats.knowledge_count }} knowledge items
-
{{ stats.your_rules }} your rules
-
-
Browse Knowledge →
-
-```
-
-### 4. Client Sync
-
-Server generuje .md soubory přímo do `/home/$user/.claude_rules/` pro každého uživatele.
-Klient si je jen stáhne.
-
-**`scripts/sync_data.sh`** - přidat:
-```bash
-# --- Sync corporate memory rules ---
-echo "📚 Syncing corporate memory rules..."
-mkdir -p .claude/rules
-rsync -avz data-analyst:~/.claude_rules/ .claude/rules/ 2>/dev/null || \
- scp -r data-analyst:~/.claude_rules/* .claude/rules/ 2>/dev/null || true
-```
-
-**Server-side** (v `corporate_memory_service.py`):
-- `generate_user_rules(username)` - při hlasování regeneruje .md soubory
-- Zapisuje do `/home/$user/.claude_rules/km_*.md`
-- Webapp volá po každém vote změně
-
-### 5. Deployment
-
-**`server/deploy.sh`** - additions:
-- Vytvořit /data/corporate-memory/ (2770 setgid)
-- Nainstalovat collect-knowledge do /usr/local/bin/
-- Deploy systemd timer
-- Přidat ANTHROPIC_API_KEY do .env
-
-**`server/sudoers-deploy`** - přidat práva pro /data/corporate-memory/
-
-**GitHub Secrets**: `ANTHROPIC_API_KEY`
-
-## Implementation Phases
-
-### Phase 1: Server Infrastructure
-1. `services/corporate_memory/` Python modul
-2. `server/bin/collect-knowledge` wrapper
-3. `services/corporate_memory/systemd/corporate-memory.service` + `.timer`
-4. Update `server/deploy.sh`
-5. Update sudoers
-
-### Phase 2: Webapp Backend
-1. `webapp/corporate_memory_service.py`
-2. API endpoints v `webapp/app.py`
-3. Update `webapp/config.py` (ANTHROPIC_API_KEY, paths)
-
-### Phase 3: Webapp Frontend
-1. Dashboard widget v `dashboard.html`
-2. Sub-page `corporate_memory.html`
-3. CSS styles
-
-### Phase 4: Client Sync
-1. Update `scripts/sync_data.sh` - přidat rsync pro `.claude_rules/`
-
-## Key Files to Modify
-
-| File | Changes |
-|------|---------|
-| `server/deploy.sh` | Add /data/corporate-memory/, ANTHROPIC_API_KEY, systemd timer |
-| `server/sudoers-deploy` | Add permissions for corporate-memory dir |
-| `webapp/app.py` | Add routes and API endpoints |
-| `webapp/config.py` | Add ANTHROPIC_API_KEY, CORPORATE_MEMORY_DIR |
-| `webapp/templates/dashboard.html` | Add Corporate Memory widget |
-| `scripts/sync_data.sh` | Add corporate rules sync step |
-
-## Reusable Patterns (from codebase)
-
-- **JSON I/O**: `webapp/telegram_service.py` - atomic writes with tempfile
-- **Dashboard widget**: `webapp/templates/dashboard.html` - KPI card pattern
-- **Sub-page**: `webapp/templates/catalog.html` - route + template pattern
-- **Systemd service**: `services/telegram_bot/systemd/notify-bot.service` - timer pattern
-- **Deploy**: `server/deploy.sh` - directory setup, script installation
-
-## Verification
-
-1. **Collector**: `ssh kids "/usr/local/bin/collect-knowledge --dry-run"`
-2. **API**: `curl https://your-instance.example.com/api/corporate-memory/stats`
-3. **Widget**: Check dashboard shows stats
-4. **Voting**: Click 👍/👎, verify votes.json updated
-5. **Sync**: Run `sync_data.sh`, check `.claude/rules/` has .md files
diff --git a/dev_docs/plan-rsync-fix.md b/dev_docs/plan-rsync-fix.md
deleted file mode 100644
index 7f37e3b..0000000
--- a/dev_docs/plan-rsync-fix.md
+++ /dev/null
@@ -1,137 +0,0 @@
-# Plan: Fix rsync synchronization hang (#197)
-
-## Problem
-
-Rsync from GCP server (YOUR_SERVER_IP) hangs after 1-5 minutes. Process exists but has 0% CPU and no network activity. 100% reproducible with ~7000 parquet files.
-
-## Root Cause Analysis
-
-The rsync commands in `scripts/sync_data.sh` use plain `rsync -avz` without:
-
-1. **SSH keepalive** - No `-e "ssh -o ServerAliveInterval=60"` to keep the TCP connection alive through firewall/NAT
-2. **I/O timeout** - No `--timeout=N` to detect stalled transfers
-3. **Retry logic** - A single connection drop kills the entire sync
-4. **Partial resume** - No `--partial-dir` so retries restart file transfers from scratch
-5. **Compression skip** - `-z` compresses parquet files that are already snappy-compressed (CPU waste)
-
-A stateful firewall or NAT device (GCP Cloud NAT, VPC firewall, or analyst's home router) drops idle TCP connections. Server-side `ClientAliveInterval=300s` sends keepalive, but the client side sends nothing. The firewall sees idle traffic and silently drops the connection. Neither rsync nor SSH client detects the dead connection.
-
-Note: GCP VPC firewall default idle timeout is 600s (10 min). The 1-5 minute hang suggests a more aggressive timeout on Cloud NAT or analyst's local network.
-
-## Implementation Plan
-
-### Step 1: Create diagnostic tests (`tests/test_sync_data.py`)
-
-Real end-to-end tests that download data to gitignored `data/` folder with detailed logging to pinpoint exactly where and why the hang occurs.
-
-**Live diagnostic tests (require server access, marked `@pytest.mark.live`):**
-
-| Test | Purpose |
-|------|---------|
-| `test_ssh_connectivity` | Basic SSH connection test with timeout |
-| `test_rsync_small_directory` | Sync docs (~few files) to verify baseline works |
-| `test_rsync_with_keepalive` | Sync parquet with SSH keepalive options |
-| `test_rsync_with_timeout` | Sync parquet with `--timeout=60` to detect hang quickly |
-| `test_rsync_per_subdirectory` | Sync each parquet subdir separately to isolate which triggers the hang |
-| `test_rsync_without_compression` | Sync parquet without `-z` to test if compression causes stall |
-
-Each test logs to `data/sync_diagnostics/` with timestamps, exit codes, bytes transferred, duration.
-
-**Static regression tests (run in CI without server access):**
-
-| Test | Purpose |
-|------|---------|
-| `test_rsync_commands_use_ssh_keepalive` | Every rsync in scripts has `ServerAliveInterval` |
-| `test_rsync_commands_have_timeout` | Every rsync has `--timeout=N` |
-| `test_parquet_rsync_does_not_compress` | Parquet rsync uses `-av` not `-avz` |
-| `test_sync_scripts_have_retry_logic` | Scripts define retry wrapper |
-
-### Step 2: Run diagnostic tests and analyze results
-
-Before implementing any fix, run the live diagnostic tests to confirm the root cause:
-
-```bash
-pytest tests/test_sync_data.py -v -k "live" --tb=short 2>&1 | tee data/sync_diagnostics/test_run.log
-```
-
-**Expected outcomes to validate:**
-- `test_ssh_connectivity` passes → SSH connection itself is fine
-- `test_rsync_small_directory` passes → small syncs work, problem is scale/duration-related
-- `test_rsync_with_timeout` fails with timeout → confirms connection drop (rsync exits instead of hanging forever)
-- `test_rsync_with_keepalive` passes → confirms keepalive fixes the issue
-- `test_rsync_per_subdirectory` → identifies if specific directory triggers the hang or if it's purely time-based
-- `test_rsync_without_compression` → reveals if `-z` contributes to the stall
-
-**Decision gate:** Based on results, confirm which combination of fixes is needed before proceeding to Step 3. If keepalive alone fixes it, the other changes are still applied as defense-in-depth.
-
-### Step 3: Fix `scripts/sync_data.sh`
-
-Add reliability wrapper after argument parsing:
-
-```bash
-# --- Rsync reliability settings (Issue #197) ---
-RSYNC_SSH_OPTS='ssh -o ServerAliveInterval=60 -o ServerAliveCountMax=3 -o ConnectTimeout=30'
-RSYNC_TIMEOUT=300
-RSYNC_MAX_RETRIES=3
-RSYNC_RETRY_DELAY=5
-
-rsync_reliable() {
- local attempt=1
- local delay=$RSYNC_RETRY_DELAY
- while [[ $attempt -le $RSYNC_MAX_RETRIES ]]; do
- if rsync -e "$RSYNC_SSH_OPTS" --timeout="$RSYNC_TIMEOUT" \
- --partial-dir=.rsync-partial "$@"; then
- return 0
- fi
- local exit_code=$?
- if [[ $attempt -lt $RSYNC_MAX_RETRIES ]]; then
- echo " Rsync failed (exit $exit_code), retrying in ${delay}s ($attempt/$RSYNC_MAX_RETRIES)..."
- sleep "$delay"
- delay=$((delay * 2))
- fi
- attempt=$((attempt + 1))
- done
- echo " ERROR: Rsync failed after $RSYNC_MAX_RETRIES attempts"
- return 1
-}
-```
-
-Replace all rsync invocations to use `rsync_reliable()` wrapper:
-- Drop `-z` for parquet transfers (already snappy-compressed)
-- Keep `-z` for text transfers (docs, scripts, metadata)
-- `--partial-dir=.rsync-partial` enables resume on retry (Gemini review recommendation)
-
-### Step 4: Fix `scripts/sync_jira.sh`
-
-Same `rsync_reliable()` wrapper and fixes.
-
-### Step 5: Update CI
-
-Add `tests/test_sync_data.py` to `.github/workflows/deploy-guard.yml` (static regression tests only, live diagnostics excluded via marker).
-
-### Step 6: Verify fix end-to-end
-
-Run full sync and confirm no hanging:
-```bash
-bash scripts/sync_data.sh --dry-run # script syntax OK
-bash scripts/sync_data.sh # full sync completes
-```
-
-## Files to Modify
-
-| File | Action | Description |
-|------|--------|-------------|
-| `tests/test_sync_data.py` | CREATE | Diagnostic + regression tests |
-| `scripts/sync_data.sh` | EDIT | Add reliability wrapper, fix all rsync calls |
-| `scripts/sync_jira.sh` | EDIT | Same fixes |
-| `.github/workflows/deploy-guard.yml` | EDIT | Add static tests to CI |
-
-## Review Notes
-
-Plan reviewed by Google Gemini (2026-02-17). Key feedback incorporated:
-- **`--partial-dir=.rsync-partial`** added to wrapper for efficient retry resume
-- **Root cause confirmed** as most likely firewall/NAT idle timeout (textbook signature)
-- **`ServerAliveInterval=60`** confirmed as correct value (low overhead, safe interval)
-- **`--timeout=300`** confirmed as reasonable (long enough to avoid false positives)
-- **Drop `-z`** confirmed correct (compressing snappy data wastes CPU)
-- **Future optimization**: `--files-from` approach for large file sets if needed
diff --git a/dev_docs/plan_parquet_types_fix.md b/dev_docs/plan_parquet_types_fix.md
deleted file mode 100644
index bf324be..0000000
--- a/dev_docs/plan_parquet_types_fix.md
+++ /dev/null
@@ -1,139 +0,0 @@
-# Plan: Fix Parquet Type Preservation & DuckDB Schema Mismatch
-
-## Context
-
-Issues #185, #186, #187 report that DuckDB views fail for partitioned tables when parquet files have schema mismatches. The root cause is twofold:
-1. **DuckDB** reads partitioned parquet files without `union_by_name=true`, so if a column is `null` type in one partition and `VARCHAR` in another, it crashes.
-2. **Parquet writing** uses `pa.Table.from_pandas()` without explicit schema — columns with all NULLs get inferred as `null` type instead of proper type (STRING, DATE, etc.).
-
-**Note:** #187 is a duplicate of #186.
-
-## Changes
-
-### 1. DuckDB fix — `scripts/duckdb_manager.py` (line 235)
-
-Add `union_by_name=true` to `read_parquet()` for partitioned views:
-
-```python
-# Before:
-sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{glob_pattern}')"
-# After:
-sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{glob_pattern}', union_by_name=true)"
-```
-
-This is an immediate fix — existing partition files with `null`-typed columns will be readable without re-syncing.
-
-### 2. PyArrow type mapping — `src/keboola_client.py` (after line 48)
-
-Add `KEBOOLA_TO_PYARROW_TYPES` dict alongside existing `KEBOOLA_TO_PANDAS_TYPES`:
-
-```python
-KEBOOLA_TO_PYARROW_TYPES = {
- "STRING": pa.string(), "VARCHAR": pa.string(), "TEXT": pa.string(),
- "INTEGER": pa.int64(), "BIGINT": pa.int64(),
- "NUMERIC": pa.float64(), "DECIMAL": pa.float64(), "FLOAT": pa.float64(), "DOUBLE": pa.float64(),
- "BOOLEAN": pa.bool_(),
- "DATE": pa.date32(),
- "TIMESTAMP": pa.timestamp("us"), "TIMESTAMP_NTZ": pa.timestamp("us"), "TIMESTAMP_TZ": pa.timestamp("us", tz="UTC"),
-}
-```
-
-### 3. Refactor provider cascade + new `get_pyarrow_schema()` — `src/keboola_client.py`
-
-**3a.** Extract duplicated provider cascade logic from `get_pandas_dtypes()`, `get_date_columns()` into a shared `_resolve_keboola_type(col_meta_list) -> str` helper. This logic is repeated 3x — the new method would be the 4th copy without refactoring.
-
-**3b.** Add `get_pyarrow_schema(table_id) -> Optional[pa.Schema]` method to `KeboolaClient`:
-- Builds `pa.Schema` from Keboola metadata using `_resolve_keboola_type()` + `KEBOOLA_TO_PYARROW_TYPES`
-- Returns `None` with WARNING log if metadata unavailable (graceful fallback)
-- Refactor `get_pandas_dtypes()` and `get_date_columns()` to also use `_resolve_keboola_type()`
-
-### 4. New helper `apply_schema_to_table()` — `src/parquet_manager.py` (after line 103)
-
-Function that casts a `pa.Table` to match a target `pa.Schema`:
-- Matches columns by name (not position)
-- For `null`-type columns: creates typed null array via `pa.nulls(len, type=target_type)`
-- For other mismatches: uses `col.cast(target_type, safe=True)` — if cast fails, keeps original type and logs warning (no data is changed or lost)
-- Columns not in schema keep their inferred type
-- Logs warnings on cast failures, doesn't crash
-
-### 5. Integrate schema into all parquet write paths
-
-Add `pyarrow_schema` parameter to these methods and apply `apply_schema_to_table()` after `pa.Table.from_pandas()` and `convert_date_columns_to_date32()`:
-
-| Method | File | Line | Current issue |
-|--------|------|------|---------------|
-| `csv_to_parquet()` | `parquet_manager.py` | 274 | No explicit schema |
-| `merge_parquet()` | `parquet_manager.py` | 493 | No explicit schema |
-| `_process_csv_to_partitions()` | `data_sync.py` | 848, 854 | No explicit schema |
-| `_deduplicate_partitions()` | `data_sync.py` | 890 | Uses `df.to_parquet()` — bypasses entire PyArrow pipeline |
-
-**`_deduplicate_partitions()` fix (line 890):** Replace `df.to_parquet()` with full PyArrow pipeline:
-```python
-table = pa.Table.from_pandas(df, preserve_index=False)
-if date_columns:
- table = convert_date_columns_to_date32(table, date_columns)
-if pyarrow_schema:
- table = apply_schema_to_table(table, pyarrow_schema)
-pq.write_table(table, partition_path, compression="snappy")
-```
-
-### 6. Update all callers to fetch and pass `pyarrow_schema`
-
-Each caller already fetches `dtypes` and `date_columns`. Add one line after them:
-
-```python
-pyarrow_schema = self.keboola_client.get_pyarrow_schema(table_config.id)
-```
-
-| Caller method | File:Line | Passes to |
-|---------------|-----------|-----------|
-| `_full_refresh()` | `data_sync.py:318-328` | `csv_to_parquet()` |
-| `_incremental_single_file_sync()` | `data_sync.py:455-508` | `merge_parquet()`, `csv_to_parquet()` |
-| `_incremental_partitioned_sync()` | `data_sync.py:600-612` | `_process_csv_to_partitions()`, `_deduplicate_partitions()` |
-| `_chunked_initial_load()` | `data_sync.py:673-766` | `_process_csv_to_partitions()`, `_deduplicate_partitions()` |
-| `_partitioned_sync()` | `data_sync.py:989-1001` | `_process_csv_to_partitions()`, `_deduplicate_partitions()` |
-
-## Tests — `data/tests/test_parquet_types.py` (local only, gitignored)
-
-Test file lives in `data/tests/` which is gitignored (`data/` is in `.gitignore`). Tests are for **local verification only** — they don't go to the server or CI.
-
-All test data is created via pytest's `tmp_path` fixture (auto-cleaned) or in `data/tests/tmp/`. No test artifacts persist after the run.
-
-10 test cases, all local (no Keboola API calls):
-
-**Core tests:**
-1. **`test_union_by_name_resolves_schema_mismatch`** — Two parquet files with conflicting types, DuckDB reads with `union_by_name=true`
-2. **`test_apply_schema_fixes_null_type_columns`** — `apply_schema_to_table()` converts null-type columns to proper types
-3. **`test_parquet_preserves_types_with_all_null_columns`** — End-to-end: CSV with all-NULL column → Parquet → verify correct type
-4. **`test_deduplicate_preserves_date32_types`** — After dedup, DATE32 columns retain their type
-5. **`test_keboola_to_pyarrow_mapping_completeness`** — Every key in `KEBOOLA_TO_PANDAS_TYPES` exists in `KEBOOLA_TO_PYARROW_TYPES`
-6. **`test_get_pyarrow_schema_from_metadata`** — Unit test with mocked Keboola metadata → verify correct schema
-
-**Additional tests (from Gemini review):**
-7. **`test_apply_schema_handles_cast_error_gracefully`** — Column with uncastable values (e.g. "abc" → int64) doesn't crash, logs warning, keeps original type
-8. **`test_schema_with_extra_csv_column`** — CSV has column not in metadata → column preserved with inferred type
-9. **`test_get_pyarrow_schema_handles_all_types`** — All types from `KEBOOLA_TO_PYARROW_TYPES` mapping verified
-10. **`test_deduplicate_with_mixed_types_and_nulls`** — Partition with all-null + mixed-data columns → correct schema after dedup
-
-## Files modified
-
-- `scripts/duckdb_manager.py` — 1 line change (union_by_name)
-- `src/keboola_client.py` — new `KEBOOLA_TO_PYARROW_TYPES` mapping + `_resolve_keboola_type()` helper + `get_pyarrow_schema()` method + refactor existing methods
-- `src/parquet_manager.py` — new `apply_schema_to_table()` function + `pyarrow_schema` param on `csv_to_parquet()` and `merge_parquet()`
-- `src/data_sync.py` — `pyarrow_schema` param on `_process_csv_to_partitions()` and `_deduplicate_partitions()` + update all 5 callers
-- `data/tests/test_parquet_types.py` — new file, 10 tests (**gitignored**, local verification only)
-
-## Verification
-
-1. Run `pytest data/tests/test_parquet_types.py -v` — all 10 tests pass
-2. Run `pytest tests/` — existing CI tests still pass (no regressions)
-3. Existing parquet files don't need re-sync — the `union_by_name=true` fix handles them immediately
-4. On next sync cycle, partitions get re-written with correct schema
-
-## Cleanup after verification
-
-After all tests pass and code changes are committed:
-```bash
-rm -rf data/tests/
-```
-Test file and all test artifacts live in `data/` (gitignored) — nothing gets committed or deployed to server.
diff --git a/examples/notifications/data_freshness.py b/examples/notifications/data_freshness.py
deleted file mode 100644
index e2c1510..0000000
--- a/examples/notifications/data_freshness.py
+++ /dev/null
@@ -1,62 +0,0 @@
-#!/usr/bin/env python3
-"""
-Example notification: Data freshness alert.
-
-Checks if the local data is stale (older than threshold) and notifies.
-Outputs JSON to stdout for notify-runner.
-"""
-
-import json
-import os
-import sys
-from datetime import datetime, timezone
-from pathlib import Path
-
-# Configuration
-DATA_DIR = Path.home() / "server" / "parquet"
-STALE_THRESHOLD_HOURS = 24
-
-
-def check_freshness() -> dict:
- """Check data freshness by examining parquet file modification times."""
- if not DATA_DIR.exists():
- return {
- "notify": True,
- "title": "Data directory missing",
- "message": f"Data directory not found: {DATA_DIR}\nRun: bash scripts/sync_data.sh",
- "cooldown": "6h",
- }
-
- # Find newest parquet file
- parquet_files = list(DATA_DIR.rglob("*.parquet"))
- if not parquet_files:
- return {
- "notify": True,
- "title": "No data files found",
- "message": "No parquet files in data directory.\nRun: bash scripts/sync_data.sh",
- "cooldown": "6h",
- }
-
- newest_mtime = max(f.stat().st_mtime for f in parquet_files)
- newest_dt = datetime.fromtimestamp(newest_mtime, tz=timezone.utc)
- now = datetime.now(tz=timezone.utc)
- age_hours = (now - newest_dt).total_seconds() / 3600
-
- if age_hours > STALE_THRESHOLD_HOURS:
- return {
- "notify": True,
- "title": f"Data is {age_hours:.0f}h old",
- "message": (
- f"Latest file: {newest_dt:%Y-%m-%d %H:%M} UTC\n"
- f"Age: {age_hours:.1f} hours (threshold: {STALE_THRESHOLD_HOURS}h)\n"
- f"Run: bash scripts/sync_data.sh"
- ),
- "cooldown": "6h",
- }
-
- return {"notify": False}
-
-
-if __name__ == "__main__":
- result = check_freshness()
- print(json.dumps(result))
diff --git a/examples/notifications/metric_report.py b/examples/notifications/metric_report.py
deleted file mode 100644
index f9eca15..0000000
--- a/examples/notifications/metric_report.py
+++ /dev/null
@@ -1,125 +0,0 @@
-#!/usr/bin/env python3
-"""
-Example notification: Daily metric report with chart image.
-
-Generates a summary chart using matplotlib and sends it as a Telegram photo.
-Outputs JSON to stdout for notify-runner.
-"""
-
-import json
-import os
-import sys
-import tempfile
-from datetime import datetime
-from pathlib import Path
-
-import duckdb
-
-DB_PATH = Path.home() / "user" / "duckdb" / "analytics.duckdb"
-
-
-def generate_chart(data: list[tuple]) -> str | None:
- """Generate a bar chart and return the file path."""
- try:
- import matplotlib
-
- matplotlib.use("Agg") # Non-interactive backend
- import matplotlib.pyplot as plt
-
- dates = [row[0].strftime("%m/%d") for row in data]
- values = [float(row[1]) for row in data]
-
- fig, ax = plt.subplots(figsize=(8, 4))
- bars = ax.bar(dates, values, color="#0073D1", width=0.6)
-
- # Highlight today
- if len(bars) > 0:
- bars[-1].set_color("#EA580C")
-
- ax.set_title("Daily Revenue - Last 7 Days", fontsize=14, fontweight="bold")
- ax.set_ylabel("Revenue ($)")
- ax.spines["top"].set_visible(False)
- ax.spines["right"].set_visible(False)
- plt.xticks(rotation=45, ha="right")
- plt.tight_layout()
-
- # Save to temp file
- chart_path = os.path.join(
- tempfile.gettempdir(),
- f"notify_{os.environ.get('USER', 'user')}_metric_{datetime.now():%Y%m%d}.png",
- )
- plt.savefig(chart_path, dpi=150, bbox_inches="tight")
- plt.close()
-
- return chart_path
-
- except ImportError:
- print("matplotlib not installed, skipping chart", file=sys.stderr)
- return None
- except Exception as e:
- print(f"Chart generation error: {e}", file=sys.stderr)
- return None
-
-
-def build_report() -> dict:
- """Build daily metric report."""
- if not DB_PATH.exists():
- return {"notify": False}
-
- try:
- conn = duckdb.connect(str(DB_PATH), read_only=True)
-
- # TODO: Adapt this query to your schema.
- # DuckDB views use relative paths, so scripts must run from ~/
- # (notify-runner sets cwd to home directory automatically).
- #
- # Example for a table with date + numeric columns:
- # SELECT DATE_TRUNC('day', created_at)::DATE AS day,
- # SUM(amount) AS revenue
- # FROM my_table
- # WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
- # GROUP BY 1 ORDER BY 1
- rows = conn.execute("""
- SELECT
- DATE_TRUNC('day', created_at)::DATE AS day,
- COUNT(*) AS cnt
- FROM kbc_project
- WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
- GROUP BY 1
- ORDER BY 1
- """).fetchall()
-
- conn.close()
-
- if not rows:
- return {"notify": False}
-
- today_val = float(rows[-1][1]) if rows else 0
- total_7d = sum(float(r[1]) for r in rows)
-
- chart_path = generate_chart(rows)
-
- result = {
- "notify": True,
- "title": "Daily Metric Report",
- "message": (
- f"Today: {today_val:,.0f}\n"
- f"7d total: {total_7d:,.0f}\n"
- f"7d avg: {total_7d / len(rows):,.0f}"
- ),
- "cooldown": "1d",
- }
-
- if chart_path:
- result["image_path"] = chart_path
-
- return result
-
- except Exception as e:
- print(f"metric_report error: {e}", file=sys.stderr)
- return {"notify": False}
-
-
-if __name__ == "__main__":
- result = build_report()
- print(json.dumps(result))
diff --git a/examples/notifications/revenue_drop.py b/examples/notifications/revenue_drop.py
deleted file mode 100644
index 8c437e8..0000000
--- a/examples/notifications/revenue_drop.py
+++ /dev/null
@@ -1,83 +0,0 @@
-#!/usr/bin/env python3
-"""
-Example notification: Revenue drop alert (text only).
-
-Checks if today's revenue dropped significantly vs 7-day average.
-Outputs JSON to stdout for notify-runner.
-"""
-
-import json
-import sys
-from pathlib import Path
-
-import duckdb
-
-# Configuration
-DB_PATH = Path.home() / "user" / "duckdb" / "analytics.duckdb"
-DROP_THRESHOLD_PERCENT = 20
-
-
-def check_revenue_drop() -> dict:
- """Query revenue data and check for significant drop."""
- if not DB_PATH.exists():
- return {"notify": False}
-
- try:
- conn = duckdb.connect(str(DB_PATH), read_only=True)
-
- # Example query - adjust table/column names to your schema
- result = conn.execute("""
- WITH daily AS (
- SELECT
- DATE_TRUNC('day', created_at) AS day,
- SUM(amount) AS revenue
- FROM payments
- WHERE created_at >= CURRENT_DATE - INTERVAL '8 days'
- GROUP BY 1
- ),
- stats AS (
- SELECT
- (SELECT revenue FROM daily WHERE day = CURRENT_DATE) AS today_rev,
- AVG(CASE WHEN day < CURRENT_DATE AND day >= CURRENT_DATE - INTERVAL '7 days'
- THEN revenue END) AS avg_7d
- FROM daily
- )
- SELECT today_rev, avg_7d,
- ROUND((1 - today_rev / NULLIF(avg_7d, 0)) * 100, 1) AS drop_pct
- FROM stats
- """).fetchone()
-
- conn.close()
-
- if result is None or result[0] is None or result[1] is None:
- return {"notify": False}
-
- today_rev, avg_7d, drop_pct = result
-
- if drop_pct >= DROP_THRESHOLD_PERCENT:
- return {
- "notify": True,
- "title": f"Revenue dropped {drop_pct}%",
- "message": (
- f"Today: ${today_rev:,.0f}\n"
- f"7d avg: ${avg_7d:,.0f}\n"
- f"Drop: {drop_pct}%"
- ),
- "cooldown": "6h",
- "data": {
- "today_revenue": float(today_rev),
- "avg_7d_revenue": float(avg_7d),
- "drop_percent": float(drop_pct),
- },
- }
-
- return {"notify": False}
-
- except Exception as e:
- print(f"revenue_drop error: {e}", file=sys.stderr)
- return {"notify": False}
-
-
-if __name__ == "__main__":
- result = check_revenue_drop()
- print(json.dumps(result))
diff --git a/llms.txt b/llms.txt
deleted file mode 100644
index efc1a0d..0000000
--- a/llms.txt
+++ /dev/null
@@ -1,47 +0,0 @@
-# AI Data Analyst
-
-> A data distribution platform for AI analytical systems. Syncs data from configured sources (Keboola, CSV, BigQuery), converts to Parquet, and distributes to analysts who query locally using Claude Code and DuckDB.
-
-## Key Files
-
-- [README](README.md): Project overview, quick start, and feature list
-- [CLAUDE.md](CLAUDE.md): Claude Code project context and development instructions
-- [ARCHITECTURE.md](ARCHITECTURE.md): System architecture, data flow, and component overview
-
-## Documentation
-
-- [Quick Start](docs/QUICKSTART.md): End-to-end setup for new deployments
-- [Configuration](docs/CONFIGURATION.md): All instance.yaml options explained
-- [Deployment](docs/DEPLOYMENT.md): Server provisioning and deployment guide
-- [Data Sources](docs/DATA_SOURCES.md): Adapter system and data source configuration
-
-## Core Source Code
-
-- [src/data_sync.py](src/data_sync.py): Data sync orchestration and DataSource ABC
-- [src/adapters/](src/adapters/): Pluggable data source adapters (Keboola, CSV)
-- [src/parquet_manager.py](src/parquet_manager.py): CSV to Parquet conversion engine
-- [src/config.py](src/config.py): Runtime configuration from data_description.md
-- [config/loader.py](config/loader.py): Instance config loader with ${ENV_VAR} interpolation
-
-## Web Application
-
-- [webapp/app.py](webapp/app.py): Flask application entry point
-- [webapp/config.py](webapp/config.py): Webapp configuration from instance.yaml
-- [webapp/account_service.py](webapp/account_service.py): User account and sync management
-
-## Server Operations
-
-- [server/deploy.sh](server/deploy.sh): Deployment script
-- [server/setup.sh](server/setup.sh): Initial server provisioning
-- [server/webapp-setup.sh](server/webapp-setup.sh): Web application setup (Nginx, SSL, systemd)
-- [config/instance.yaml.example](config/instance.yaml.example): Configuration template
-
-## Configuration
-
-Instance config: `config/instance.yaml` (YAML with `${ENV_VAR}` references for secrets).
-Environment variables: `.env` file (never committed). See `config/.env.template`.
-Data schema: `docs/data_description.md` (YAML blocks in markdown).
-
-## Architecture Summary
-
-Data Source -> Source Adapter -> Parquet files on server -> rsync over SSH -> Analyst machine -> DuckDB views -> Claude Code queries
diff --git a/scripts/activate_venv.sh b/scripts/activate_venv.sh
deleted file mode 100644
index d60306f..0000000
--- a/scripts/activate_venv.sh
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/bin/bash
-# Helper script to activate virtual environment
-# Usage: source scripts/activate_venv.sh
-
-# Detect project root (should be run from project directory)
-if [ ! -d ".venv" ]; then
- echo "❌ Virtual environment not found. Are you in the project directory?"
- echo " Expected: A directory with .venv/ folder"
- return 1 2>/dev/null || exit 1
-fi
-
-# Activate based on platform
-if [ -f ".venv/bin/activate" ]; then
- # Unix/macOS
- source .venv/bin/activate
- echo "✅ Virtual environment activated (Unix)"
-elif [ -f ".venv/Scripts/activate" ]; then
- # Windows (Git Bash)
- source .venv/Scripts/activate
- echo "✅ Virtual environment activated (Windows)"
-else
- echo "❌ Could not find activation script in .venv/"
- return 1 2>/dev/null || exit 1
-fi
-
-# Show Python version
-echo " Python: $(python --version 2>&1)"
-echo " Location: $(which python)"
diff --git a/scripts/backfill_gap.sh b/scripts/backfill_gap.sh
deleted file mode 100755
index cd910a0..0000000
--- a/scripts/backfill_gap.sh
+++ /dev/null
@@ -1,109 +0,0 @@
-#!/bin/bash
-# Backfill missing Jira issues from GitHub issue #101
-# Range: SUPPORT-15166 to SUPPORT-15243 (71 missing of 78 total)
-# Safe to run while webhook processing is active.
-#
-# Usage:
-# ssh kids
-# cd /opt/data-analyst/repo
-# source /opt/data-analyst/.venv/bin/activate
-# bash scripts/backfill_gap.sh [--dry-run]
-
-set -euo pipefail
-
-REPO_DIR="/opt/data-analyst/repo"
-VENV_DIR="/opt/data-analyst/.venv"
-RAW_DIR="/data/src_data/raw/jira"
-PARQUET_DIR="/data/src_data/parquet/jira"
-LOG_FILE="/opt/data-analyst/logs/backfill_gap.log"
-JIRA_PROJECT="${JIRA_PROJECT:-}"
-if [ -n "$JIRA_PROJECT" ]; then
- JQL="project = \"${JIRA_PROJECT}\" AND key >= SUPPORT-15166 AND key <= SUPPORT-15243"
-else
- JQL='key >= SUPPORT-15166 AND key <= SUPPORT-15243'
-fi
-RANGE_START=15166
-RANGE_END=15243
-DRY_RUN=false
-
-# Parse args
-if [[ "${1:-}" == "--dry-run" ]]; then
- DRY_RUN=true
-fi
-
-# Ensure log directory exists
-mkdir -p "$(dirname "$LOG_FILE")"
-
-# Log to both stdout and file
-exec > >(tee -a "$LOG_FILE") 2>&1
-echo "=== Backfill started: $(date -u +%Y-%m-%dT%H:%M:%SZ) ==="
-
-cd "$REPO_DIR"
-
-# --- Phase 1: Download raw JSON ---
-echo ""
-echo "--- Phase 1: Download raw JSON ---"
-if $DRY_RUN; then
- python -m connectors.jira.scripts.backfill --jql "$JQL" --dry-run
- echo "Dry run complete. Exiting."
- exit 0
-fi
-
-python -m connectors.jira.scripts.backfill --jql "$JQL" --skip-existing --parallel 4
-
-# --- Phase 2: Incremental Parquet transform ---
-echo ""
-echo "--- Phase 2: Incremental Parquet transform ---"
-success=0
-skipped=0
-failed=0
-
-for issue_num in $(seq $RANGE_START $RANGE_END); do
- issue_key="SUPPORT-${issue_num}"
- json_file="${RAW_DIR}/issues/${issue_key}.json"
-
- if [ ! -f "$json_file" ]; then
- echo "SKIP: $issue_key (no JSON)"
- skipped=$((skipped + 1))
- continue
- fi
-
- echo -n "Transform $issue_key... "
- if python -m src.incremental_jira_transform "$issue_key" 2>&1 | tail -1; then
- success=$((success + 1))
- else
- echo "FAILED: $issue_key"
- failed=$((failed + 1))
- fi
-
- sleep 0.5 # reduce collision window with live webhooks
-done
-
-echo ""
-echo "Transform complete: $success ok, $skipped skipped, $failed failed"
-
-# --- Phase 3: Verification ---
-echo ""
-echo "--- Phase 3: Verification ---"
-python -c "
-import pyarrow.parquet as pq
-from pathlib import Path
-
-parquet_dir = Path('$PARQUET_DIR/issues')
-all_keys = set()
-for pf in parquet_dir.glob('*.parquet'):
- table = pq.read_table(pf, columns=['issue_key'])
- all_keys.update(table.column('issue_key').to_pylist())
-
-expected = {f'SUPPORT-{n}' for n in range($RANGE_START, $RANGE_END + 1)}
-found = expected & all_keys
-missing = expected - all_keys
-print(f'Found: {len(found)}/{len(expected)} issues in Parquet')
-if missing:
- print(f'STILL MISSING ({len(missing)}): {sorted(missing)}')
-else:
- print('SUCCESS: All issues present in Parquet')
-"
-
-echo ""
-echo "=== Backfill finished: $(date -u +%Y-%m-%dT%H:%M:%SZ) ==="
diff --git a/scripts/collect_session.py b/scripts/collect_session.py
deleted file mode 100644
index d3d0779..0000000
--- a/scripts/collect_session.py
+++ /dev/null
@@ -1,69 +0,0 @@
-#!/usr/bin/env python3
-"""Collect Claude Code session transcript to user/sessions/.
-
-This script is invoked by Claude Code's SessionEnd hook.
-It reads JSON from stdin containing session_id, transcript_path, and cwd,
-then copies the transcript JSONL file to user/sessions/ with a date prefix.
-
-Design principles:
-- Stdlib only (no external dependencies)
-- Must NEVER crash or produce non-zero exit - Claude Code expects clean exit
-- Uses shutil.copy2 (not move) - Claude Code still references the transcript
-- UTC date for consistency across timezones
-"""
-
-import json
-import shutil
-import sys
-from datetime import datetime, timezone
-from pathlib import Path
-
-
-def main() -> None:
- try:
- raw = sys.stdin.read()
- if not raw.strip():
- return
-
- data = json.loads(raw)
- session_id = data.get("session_id", "")
- transcript_path = data.get("transcript_path", "")
- cwd = data.get("cwd", "")
-
- if not transcript_path or not session_id:
- return
-
- source = Path(transcript_path)
- if not source.exists() or not source.is_file():
- return
-
- # Determine target directory: cwd/user/sessions/
- if not cwd:
- return
-
- target_dir = Path(cwd) / "user" / "sessions"
-
- # Only collect if we're inside a project that has user/ directory
- user_dir = Path(cwd) / "user"
- if not user_dir.is_dir():
- return
-
- target_dir.mkdir(parents=True, exist_ok=True)
-
- # Format: YYYY-MM-DD_{full_session_id}.jsonl
- date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
- target_file = target_dir / f"{date_str}_{session_id}.jsonl"
-
- # Skip if already collected (check by session_id suffix to handle date changes)
- for existing in target_dir.glob(f"*_{session_id}.jsonl"):
- return
-
- shutil.copy2(str(source), str(target_file))
-
- except Exception:
- # Must never crash - Claude Code depends on clean exit
- pass
-
-
-if __name__ == "__main__":
- main()
diff --git a/scripts/generate_user_sync_configs.py b/scripts/generate_user_sync_configs.py
deleted file mode 100755
index 5c692f7..0000000
--- a/scripts/generate_user_sync_configs.py
+++ /dev/null
@@ -1,162 +0,0 @@
-#!/usr/bin/env python3
-"""
-Generate per-user sync config files from central sync_settings.json.
-
-This script reads /data/notifications/sync_settings.json and writes
-~/.sync_settings.yaml for each user with their configured settings.
-
-Usage:
- python3 generate_user_sync_configs.py [--dry-run]
-
-Run this script:
-- After manual changes to sync_settings.json
-- As a cron job to ensure configs stay in sync
-- The webapp calls this automatically after settings changes
-"""
-
-import argparse
-import json
-import os
-import pwd
-import sys
-from pathlib import Path
-
-SYNC_SETTINGS_FILE = Path("/data/notifications/sync_settings.json")
-DEFAULT_SETTINGS = {
- "jira": False,
- "jira_attachments": False,
-}
-
-
-def read_settings() -> dict:
- """Read sync settings from JSON file."""
- if not SYNC_SETTINGS_FILE.exists():
- return {}
- try:
- with open(SYNC_SETTINGS_FILE) as f:
- return json.load(f)
- except (json.JSONDecodeError, IOError) as e:
- print(f"Error reading {SYNC_SETTINGS_FILE}: {e}", file=sys.stderr)
- return {}
-
-
-def generate_yaml(settings: dict) -> str:
- """Generate YAML content for a user's sync config."""
- lines = [
- "# AI Data Analyst - Data Sync Configuration",
- "# Managed by web portal - changes here may be overwritten",
- "#",
- "# To manage settings, visit the web portal (Data Settings page)",
- "",
- "datasets:",
- ]
-
- for dataset, enabled in sorted(settings.items()):
- value = "true" if enabled else "false"
- lines.append(f" {dataset}: {value}")
-
- lines.append("")
- return "\n".join(lines)
-
-
-def get_user_home(username: str) -> Path | None:
- """Get home directory for a user."""
- try:
- return Path(pwd.getpwnam(username).pw_dir)
- except KeyError:
- return None
-
-
-def write_user_config(username: str, yaml_content: str, dry_run: bool = False) -> bool:
- """Write config file to user's home directory.
-
- Returns True on success, False on failure.
- """
- home = get_user_home(username)
- if not home:
- print(f" [SKIP] User {username} not found on system")
- return False
-
- config_path = home / ".sync_settings.yaml"
-
- if dry_run:
- print(f" [DRY-RUN] Would write {config_path}")
- return True
-
- try:
- # Write to temp file first
- import tempfile
-
- fd, tmp_path = tempfile.mkstemp(suffix=".yaml", dir=str(home.parent))
- try:
- with os.fdopen(fd, "w") as f:
- f.write(yaml_content)
-
- # Get user's uid/gid
- user_info = pwd.getpwnam(username)
-
- # Set ownership
- os.chown(tmp_path, user_info.pw_uid, user_info.pw_gid)
- os.chmod(tmp_path, 0o644)
-
- # Atomic move
- os.replace(tmp_path, config_path)
-
- print(f" [OK] {config_path}")
- return True
-
- except Exception as e:
- os.unlink(tmp_path)
- raise e
-
- except PermissionError:
- print(f" [ERROR] Permission denied for {username}")
- return False
- except Exception as e:
- print(f" [ERROR] {username}: {e}")
- return False
-
-
-def main():
- parser = argparse.ArgumentParser(
- description="Generate per-user sync config files from central settings."
- )
- parser.add_argument(
- "--dry-run", action="store_true", help="Show what would be done without making changes"
- )
- parser.add_argument("--user", help="Generate config for specific user only")
- args = parser.parse_args()
-
- all_settings = read_settings()
-
- if not all_settings:
- print("No sync settings found or file is empty.")
- return 0
-
- if args.user:
- users = {args.user: all_settings.get(args.user, {})}
- else:
- users = all_settings
-
- print(f"Generating sync configs for {len(users)} user(s)...")
- success = 0
- failed = 0
-
- for username, user_data in users.items():
- # Merge with defaults
- settings = dict(DEFAULT_SETTINGS)
- settings.update(user_data.get("datasets", {}))
-
- yaml_content = generate_yaml(settings)
-
- if write_user_config(username, yaml_content, args.dry_run):
- success += 1
- else:
- failed += 1
-
- print(f"\nDone: {success} succeeded, {failed} failed")
- return 0 if failed == 0 else 1
-
-
-if __name__ == "__main__":
- sys.exit(main())
diff --git a/scripts/remote_query.sh b/scripts/remote_query.sh
deleted file mode 100644
index 5e05492..0000000
--- a/scripts/remote_query.sh
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/bin/bash
-# Remote Query - wrapper for src.remote_query
-#
-# Runs DuckDB queries spanning local Parquet + remote BigQuery tables.
-# Sets up the correct environment (PYTHONPATH, CONFIG_DIR, env vars) automatically.
-#
-# Usage (via SSH from analyst machine):
-# ssh 'bash ~/server/scripts/remote_query.sh \
-# --register-bq "traffic=SELECT ... FROM \`project.dataset.table\` WHERE ... GROUP BY ..." \
-# --sql "SELECT * FROM traffic ORDER BY ..." \
-# --format table'
-#
-# All arguments are passed directly to python -m src.remote_query.
-# See: python -m src.remote_query --help
-
-set -e
-
-APP_DIR="/opt/data-analyst"
-SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
-
-# Load BigQuery environment variables
-# Try analyst-readable env first (deployed to /data/scripts/), fall back to app .env
-if [[ -f "${SCRIPT_DIR}/.remote_query.env" ]]; then
- set -a
- source "${SCRIPT_DIR}/.remote_query.env"
- set +a
-elif [[ -r "${APP_DIR}/.env" ]]; then
- set -a
- source "${APP_DIR}/.env"
- set +a
-else
- echo "ERROR: No environment file found. Contact your admin." >&2
- echo " Tried: ${SCRIPT_DIR}/.remote_query.env, ${APP_DIR}/.env" >&2
- exit 1
-fi
-
-# Run remote_query with correct paths
-cd "${APP_DIR}"
-PYTHONPATH="${APP_DIR}/repo" \
-CONFIG_DIR="${APP_DIR}/instance/config" \
-exec "${APP_DIR}/.venv/bin/python3" -m src.remote_query "$@"
diff --git a/scripts/setup_views.sh b/scripts/setup_views.sh
deleted file mode 100644
index 2029710..0000000
--- a/scripts/setup_views.sh
+++ /dev/null
@@ -1,126 +0,0 @@
-#!/bin/bash
-
-# setup_views.sh - DuckDB views initialization script
-#
-# This script performs:
-# 1. DuckDB views initialization from Parquet files
-#
-# Run this after syncing Parquet files (either via update.sh or rsync)
-
-set -e # Exit on error
-
-echo "🦆 DuckDB Views Setup"
-echo ""
-
-# Check that we're in the correct folder
-# Support both local (server/docs/) and server (~/ with server/ symlinks) layouts
-if [ ! -f "server/docs/data_description.md" ] && [ ! -f "docs/data_description.md" ]; then
- echo "❌ Run script from project root (folder with server/docs/data_description.md)"
- exit 1
-fi
-
-# Activate virtual environment
-echo "1️⃣ Activating virtual environment..."
-if [ -d ".venv" ]; then
- # Try Unix-style activation first
- if [ -f ".venv/bin/activate" ]; then
- source .venv/bin/activate
- # Windows-style activation
- elif [ -f ".venv/Scripts/activate" ]; then
- source .venv/Scripts/activate
- else
- echo " ❌ Virtual environment activation script not found."
- exit 1
- fi
- echo " ✅ Virtual environment activated"
-elif [ -d "venv" ]; then
- # Legacy support for old venv name
- if [ -f "venv/bin/activate" ]; then
- source venv/bin/activate
- elif [ -f "venv/Scripts/activate" ]; then
- source venv/Scripts/activate
- else
- echo " ❌ Virtual environment activation script not found."
- exit 1
- fi
- echo " ✅ Virtual environment activated (legacy venv)"
-else
- echo " ❌ Virtual environment not found (.venv or venv)."
- echo " Run bootstrap setup first."
- exit 1
-fi
-
-# Initialize DuckDB views
-echo ""
-echo "2️⃣ Initializing DuckDB views..."
-echo ""
-
-# Use python3 if available, otherwise python (Windows compatibility)
-if command -v python3 >/dev/null 2>&1; then
- PYTHON_CMD=python3
-else
- PYTHON_CMD=python
-fi
-
-# Determine scripts location (local: server/scripts/, server deploy: same dir as this script)
-SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
-DUCKDB_MANAGER="${SCRIPT_DIR}/duckdb_manager.py"
-
-if $PYTHON_CMD "$DUCKDB_MANAGER" --reinit; then
- echo ""
- echo " ✅ DuckDB views initialized"
-else
- echo ""
- echo " ❌ DuckDB initialization failed. Check logs above."
- exit 1
-fi
-
-# Optional dataset views (each sync script manages its own views)
-if [ -d "server/parquet/jira" ]; then
- JIRA_SCRIPT="${SCRIPT_DIR}/sync_jira.sh"
- if [ -f "$JIRA_SCRIPT" ]; then
- echo ""
- echo "3️⃣ Creating Jira views..."
- bash "$JIRA_SCRIPT" --views-only
- fi
-fi
-
-# Display sync state
-echo ""
-echo "📊 Data state:"
-# Check for sync state in either location
-SYNC_STATE=""
-if [ -f "server/metadata/sync_state.json" ]; then
- SYNC_STATE="server/metadata/sync_state.json"
-elif [ -f "data/metadata/sync_state.json" ]; then
- SYNC_STATE="data/metadata/sync_state.json"
-fi
-
-if [ -n "$SYNC_STATE" ]; then
- $PYTHON_CMD << PYTHON
-import json
-from datetime import datetime
-
-with open("$SYNC_STATE", "r") as f:
- state = json.load(f)
-
-print(f"\n Last update: {state.get('last_updated', 'N/A')}")
-print(f" Tables: {len(state.get('tables', {}))}\n")
-
-for table_id, table_state in state.get('tables', {}).items():
- table_name = table_state.get('table_name', table_id)
- rows = table_state.get('rows', 0)
- size_mb = table_state.get('file_size_mb', 0)
- strategy = table_state.get('strategy', 'N/A')
-
- print(f" - {table_name}: {rows:,} rows, {size_mb:.2f} MB ({strategy})")
-PYTHON
-fi
-
-# Done
-echo ""
-echo "✅ Setup complete!"
-echo ""
-echo "💡 Data is ready for analysis with Claude Code"
-echo " DuckDB database: user/duckdb/analytics.duckdb"
-echo ""
diff --git a/scripts/standalone_profiler.py b/scripts/standalone_profiler.py
deleted file mode 100644
index 69fb241..0000000
--- a/scripts/standalone_profiler.py
+++ /dev/null
@@ -1,1271 +0,0 @@
-#!/usr/bin/env python3
-"""
-Standalone Data Profiler — DuckDB-based table profiling for Parquet/CSV files.
-
-Zero external dependencies beyond DuckDB. Produces a comprehensive JSON profile
-with column statistics, histograms, alerts, and sample data.
-
-Usage:
- # Profile a single Parquet file
- python standalone_profiler.py data/orders.parquet
-
- # Profile a directory of Parquet files (treated as one table)
- python standalone_profiler.py data/partitioned_orders/
-
- # Profile a CSV file
- python standalone_profiler.py data/customers.csv
-
- # Custom output path
- python standalone_profiler.py data/orders.parquet -o profiles/orders_profile.json
-
- # Specify primary key for duplicate detection
- python standalone_profiler.py data/orders.parquet --primary-key order_id
-
- # Composite primary key
- python standalone_profiler.py data/orders.parquet --primary-key "order_id,line_id"
-
- # Profile multiple files at once
- python standalone_profiler.py data/orders.parquet data/customers.parquet data/products.csv
-
- # Generate HTML report alongside JSON
- python standalone_profiler.py data/orders.parquet --html
-
- # Generate HTML from existing profile JSON
- python standalone_profiler.py --from-json profile.json
-
-Output:
- JSON file with table-level and column-level statistics, alerts, histograms,
- top values for categorical columns, and sample rows.
- With --html: self-contained HTML file viewable in any browser.
-
-Requirements:
- pip install duckdb
-"""
-
-import argparse
-import html as html_mod
-import json
-import logging
-import math
-import os
-import sys
-import tempfile
-from datetime import datetime, timezone
-from pathlib import Path
-from typing import Any, Dict, List, Optional, Tuple
-
-import duckdb
-
-# ---------------------------------------------------------------------------
-# Logging
-# ---------------------------------------------------------------------------
-logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
-)
-logger = logging.getLogger("profiler")
-
-# ---------------------------------------------------------------------------
-# Profiler configuration
-# ---------------------------------------------------------------------------
-SAMPLE_THRESHOLD = 500_000 # Sample tables larger than this
-SAMPLE_SIZE = 500_000
-MAX_CATEGORICAL_DISTINCT = 50 # Treat as categorical if unique <= this
-TOP_VALUES_LIMIT = 10 # Number of top values for categorical columns
-HISTOGRAM_BINS = 15 # Number of bins for numeric histograms
-SAMPLE_ROWS_LIMIT = 5 # Number of sample rows to include
-SAMPLE_VALUES_LIMIT = 5 # Number of sample distinct values per column
-
-# Alert thresholds
-ALERT_HIGH_MISSING_PCT = 30.0
-ALERT_MISSING_PCT = 5.0
-ALERT_IMBALANCE_PCT = 60.0
-ALERT_ZEROS_PCT = 50.0
-ALERT_HIGH_CARDINALITY = 50
-
-
-# ---------------------------------------------------------------------------
-# DuckDB type classification
-# ---------------------------------------------------------------------------
-def classify_type(duckdb_type: str) -> str:
- """Map a DuckDB type string to a simplified category."""
- t = duckdb_type.upper()
- if t in ("BOOLEAN", "BOOL"):
- return "BOOLEAN"
- if t in ("DATE",):
- return "DATE"
- if "TIMESTAMP" in t:
- return "TIMESTAMP"
- base_type = t.split("(")[0].strip()
- if base_type in (
- "FLOAT", "DOUBLE", "DECIMAL", "REAL", "FLOAT4", "FLOAT8",
- "NUMERIC", "HUGEINT", "INTEGER", "INT", "BIGINT", "SMALLINT",
- "TINYINT", "INT8", "INT4", "INT2", "INT1", "UBIGINT",
- "UINTEGER", "USMALLINT", "UTINYINT",
- ):
- return "NUMERIC"
- return "STRING"
-
-
-# ---------------------------------------------------------------------------
-# Helpers
-# ---------------------------------------------------------------------------
-def _round(value: Any, digits: int = 2) -> Any:
- """Round a value if it is a float, otherwise return as-is."""
- if value is None:
- return None
- if isinstance(value, float):
- if math.isnan(value) or math.isinf(value):
- return None
- return round(value, digits)
- return value
-
-
-def _format_number(n: float) -> str:
- """Format large numbers with human-readable suffixes for histogram bin labels."""
- if n is None:
- return "?"
- abs_n = abs(n)
- if abs_n >= 1_000_000_000:
- return f"{n / 1_000_000_000:.1f}B"
- if abs_n >= 1_000_000:
- return f"{n / 1_000_000:.1f}M"
- if abs_n >= 1_000:
- return f"{n / 1_000:.1f}K"
- if isinstance(n, float) and n != int(n):
- return f"{n:.2f}"
- return str(int(n))
-
-
-def write_json_atomic(path: Path, data: Any) -> None:
- """Write JSON to path atomically via tempfile + os.replace."""
- path.parent.mkdir(parents=True, exist_ok=True)
- fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
- try:
- with os.fdopen(fd, "w") as f:
- json.dump(data, f, indent=2, default=str)
- os.chmod(tmp_path, 0o644)
- os.replace(tmp_path, str(path))
- logger.info("Wrote %s", path)
- except Exception:
- try:
- os.unlink(tmp_path)
- except OSError:
- pass
- raise
-
-
-# ---------------------------------------------------------------------------
-# Batch statistics functions
-# ---------------------------------------------------------------------------
-def _batch_base_stats(
- con: duckdb.DuckDBPyConnection,
- view_name: str,
- columns: List[str],
-) -> Dict[str, Tuple[int, int]]:
- """Get non_null and unique counts for all columns in a single query.
-
- Returns: {col_name: (non_null_count, unique_count)}
- """
- if not columns:
- return {}
-
- parts = []
- for col_name in columns:
- safe = f'"{col_name}"'
- parts.append(f"COUNT({safe})")
- parts.append(f"COUNT(DISTINCT {safe})")
-
- sql = f"SELECT {', '.join(parts)} FROM {view_name}"
- row = con.execute(sql).fetchone()
-
- result: Dict[str, Tuple[int, int]] = {}
- idx = 0
- for col_name in columns:
- result[col_name] = (row[idx], row[idx + 1])
- idx += 2
- return result
-
-
-def _batch_numeric_stats(
- con: duckdb.DuckDBPyConnection,
- view_name: str,
- numeric_cols: List[str],
-) -> Dict[str, Dict[str, Any]]:
- """Get aggregate statistics for all numeric columns in a single query."""
- if not numeric_cols:
- return {}
-
- parts = []
- for col_name in numeric_cols:
- safe = f'"{col_name}"'
- parts.extend([
- f"MIN({safe})",
- f"MAX({safe})",
- f"AVG({safe})",
- f"MEDIAN({safe})",
- f"STDDEV({safe})",
- f"PERCENTILE_CONT(0.05) WITHIN GROUP (ORDER BY {safe})",
- f"PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY {safe})",
- f"PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY {safe})",
- f"PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY {safe})",
- f"SUM(CASE WHEN {safe} = 0 THEN 1 ELSE 0 END)",
- f"SUM(CASE WHEN {safe} < 0 THEN 1 ELSE 0 END)",
- ])
-
- sql = f"SELECT {', '.join(parts)} FROM {view_name}"
- row = con.execute(sql).fetchone()
-
- result: Dict[str, Dict[str, Any]] = {}
- idx = 0
- for col_name in numeric_cols:
- result[col_name] = {
- "min": row[idx], "max": row[idx + 1], "mean": row[idx + 2],
- "median": row[idx + 3], "stddev": row[idx + 4],
- "p5": row[idx + 5], "p25": row[idx + 6],
- "p75": row[idx + 7], "p95": row[idx + 8],
- "zeros": row[idx + 9], "negative": row[idx + 10],
- }
- idx += 11
- return result
-
-
-def _batch_string_stats(
- con: duckdb.DuckDBPyConnection,
- view_name: str,
- string_cols: List[str],
-) -> Dict[str, Dict[str, Any]]:
- """Get string length statistics for all string columns in a single query."""
- if not string_cols:
- return {}
-
- parts = []
- for col_name in string_cols:
- safe = f'"{col_name}"'
- parts.extend([
- f"MIN(LENGTH({safe}))",
- f"MAX(LENGTH({safe}))",
- f"AVG(LENGTH({safe}))",
- ])
-
- sql = f"SELECT {', '.join(parts)} FROM {view_name}"
- row = con.execute(sql).fetchone()
-
- result: Dict[str, Dict[str, Any]] = {}
- idx = 0
- for col_name in string_cols:
- result[col_name] = {
- "min_length": row[idx] if row[idx] is not None else 0,
- "max_length": row[idx + 1] if row[idx + 1] is not None else 0,
- "avg_length": _round(row[idx + 2]) if row[idx + 2] is not None else 0.0,
- }
- idx += 3
- return result
-
-
-def _batch_date_stats(
- con: duckdb.DuckDBPyConnection,
- view_name: str,
- date_cols: List[str],
- category_map: Dict[str, str],
-) -> Dict[str, Dict[str, Any]]:
- """Get date range statistics for all date/timestamp columns in a single query."""
- if not date_cols:
- return {}
-
- parts = []
- for col_name in date_cols:
- safe = f'"{col_name}"'
- cast_expr = f"CAST({safe} AS DATE)" if category_map[col_name] == "TIMESTAMP" else safe
- parts.extend([
- f"MIN({cast_expr})",
- f"MAX({cast_expr})",
- ])
-
- sql = f"SELECT {', '.join(parts)} FROM {view_name}"
- row = con.execute(sql).fetchone()
-
- result: Dict[str, Dict[str, Any]] = {}
- idx = 0
- for col_name in date_cols:
- earliest = row[idx]
- latest = row[idx + 1]
- span_days = None
- if earliest is not None and latest is not None:
- try:
- delta = latest - earliest
- span_days = delta.days if hasattr(delta, "days") else int(delta)
- except (TypeError, ValueError):
- span_days = None
- result[col_name] = {
- "earliest": str(earliest) if earliest is not None else None,
- "latest": str(latest) if latest is not None else None,
- "span_days": span_days,
- }
- idx += 2
- return result
-
-
-def _batch_boolean_stats(
- con: duckdb.DuckDBPyConnection,
- view_name: str,
- bool_cols: List[str],
-) -> Dict[str, Dict[str, Any]]:
- """Get boolean true/false counts for all boolean columns in a single query."""
- if not bool_cols:
- return {}
-
- parts = []
- for col_name in bool_cols:
- safe = f'"{col_name}"'
- parts.extend([
- f"SUM(CASE WHEN {safe} = TRUE THEN 1 ELSE 0 END)",
- f"SUM(CASE WHEN {safe} = FALSE THEN 1 ELSE 0 END)",
- ])
-
- sql = f"SELECT {', '.join(parts)} FROM {view_name}"
- row = con.execute(sql).fetchone()
-
- result: Dict[str, Dict[str, Any]] = {}
- idx = 0
- for col_name in bool_cols:
- true_count = int(row[idx]) if row[idx] is not None else 0
- false_count = int(row[idx + 1]) if row[idx + 1] is not None else 0
- total = true_count + false_count
- result[col_name] = {
- "true_count": true_count,
- "false_count": false_count,
- "true_pct": _round(100.0 * true_count / total) if total > 0 else 0.0,
- }
- idx += 2
- return result
-
-
-# ---------------------------------------------------------------------------
-# Core: profile a single file/table
-# ---------------------------------------------------------------------------
-def profile_table(
- source_path: Path,
- table_name: Optional[str] = None,
- primary_key: Optional[str] = None,
-) -> Dict[str, Any]:
- """Profile a single Parquet file, Parquet directory, or CSV file.
-
- Args:
- source_path: Path to .parquet file, directory of .parquet files, or .csv file.
- table_name: Display name for the table (defaults to filename stem).
- primary_key: Comma-separated primary key column(s) for duplicate detection.
-
- Returns:
- Dict with complete profile (table-level + column-level statistics).
- """
- source_path = Path(source_path)
- if table_name is None:
- table_name = source_path.stem
-
- pk_columns: List[str] = []
- if primary_key:
- pk_columns = [c.strip() for c in primary_key.split(",")]
-
- con = duckdb.connect()
-
- # Determine read expression based on file type
- if source_path.is_dir():
- read_expr = f"read_parquet('{source_path}/*.parquet')"
- elif source_path.suffix.lower() == ".csv":
- read_expr = f"read_csv_auto('{source_path}')"
- else:
- read_expr = f"read_parquet('{source_path}')"
-
- # Get row count to decide on sampling
- total_rows = con.execute(f"SELECT COUNT(*) FROM {read_expr}").fetchone()[0]
-
- # Materialize into temp table (reads source files once instead of per-query)
- view_name = "tbl"
- sampled = total_rows > SAMPLE_THRESHOLD
- if sampled:
- con.execute(
- f"CREATE TEMP TABLE {view_name} AS SELECT * FROM {read_expr} USING SAMPLE {SAMPLE_SIZE} ROWS"
- )
- working_rows = con.execute(f"SELECT COUNT(*) FROM {view_name}").fetchone()[0]
- else:
- con.execute(f"CREATE TEMP TABLE {view_name} AS SELECT * FROM {read_expr}")
- working_rows = total_rows
-
- # Column metadata
- col_info = con.execute(f"DESCRIBE {view_name}").fetchall()
-
- # Classify columns by type
- all_col_names: List[str] = []
- type_map: Dict[str, str] = {}
- category_map: Dict[str, str] = {}
- numeric_cols: List[str] = []
- string_cols: List[str] = []
- date_cols: List[str] = []
- bool_cols: List[str] = []
-
- for col_row in col_info:
- col_name = col_row[0]
- col_type = col_row[1]
- all_col_names.append(col_name)
- type_map[col_name] = col_type
- category = classify_type(col_type)
- category_map[col_name] = category
- if category == "NUMERIC":
- numeric_cols.append(col_name)
- elif category == "STRING":
- string_cols.append(col_name)
- elif category in ("DATE", "TIMESTAMP"):
- date_cols.append(col_name)
- elif category == "BOOLEAN":
- bool_cols.append(col_name)
-
- # ---- Batch queries (one scan per type category) ----
- base_stats = _batch_base_stats(con, view_name, all_col_names)
-
- numeric_batch: Dict[str, Dict[str, Any]] = {}
- try:
- numeric_batch = _batch_numeric_stats(con, view_name, numeric_cols)
- except Exception as exc:
- logger.warning("Batch numeric stats failed: %s", exc)
-
- string_batch: Dict[str, Dict[str, Any]] = {}
- try:
- string_batch = _batch_string_stats(con, view_name, string_cols)
- except Exception as exc:
- logger.warning("Batch string stats failed: %s", exc)
-
- date_batch: Dict[str, Dict[str, Any]] = {}
- try:
- date_batch = _batch_date_stats(con, view_name, date_cols, category_map)
- except Exception as exc:
- logger.warning("Batch date stats failed: %s", exc)
-
- boolean_batch: Dict[str, Dict[str, Any]] = {}
- try:
- boolean_batch = _batch_boolean_stats(con, view_name, bool_cols)
- except Exception as exc:
- logger.warning("Batch boolean stats failed: %s", exc)
-
- # ---- Build column profiles ----
- columns: List[Dict[str, Any]] = []
- variable_types: Dict[str, int] = {}
- total_null_count = 0
- total_cells = working_rows * len(col_info) if col_info else 0
- first_date_col: Optional[Dict[str, Any]] = None
-
- for col_name in all_col_names:
- col_type = type_map[col_name]
- category = category_map[col_name]
- safe_col = f'"{col_name}"'
- variable_types[category] = variable_types.get(category, 0) + 1
-
- non_null, unique_count = base_stats.get(col_name, (0, 0))
- null_count = working_rows - non_null
-
- completeness_pct = _round(100.0 * non_null / working_rows) if working_rows > 0 else 0.0
- unique_pct = _round(100.0 * unique_count / non_null) if non_null > 0 else 0.0
- missing_pct = _round(100.0 * null_count / working_rows) if working_rows > 0 else 0.0
- is_pk = col_name in pk_columns
-
- # Sample values
- sample_values: List[str] = []
- try:
- rows = con.execute(
- f"""
- SELECT DISTINCT CAST({safe_col} AS VARCHAR) AS v
- FROM {view_name}
- WHERE {safe_col} IS NOT NULL
- LIMIT {SAMPLE_VALUES_LIMIT}
- """
- ).fetchall()
- sample_values = [r[0] for r in rows if r[0] is not None]
- except Exception:
- pass
-
- # Alerts
- alerts: List[str] = []
- if unique_count == 1 and null_count == 0:
- alerts.append("constant")
- if unique_pct == 100.0 and null_count == 0 and non_null > 0:
- alerts.append("unique")
- if missing_pct > ALERT_HIGH_MISSING_PCT:
- alerts.append("high_missing")
- elif missing_pct > ALERT_MISSING_PCT:
- alerts.append("missing")
-
- col_profile: Dict[str, Any] = {
- "name": col_name,
- "type": col_type,
- "type_category": category,
- "completeness_pct": completeness_pct,
- "null_count": null_count,
- "unique_count": unique_count,
- "unique_pct": unique_pct,
- "sample_values": sample_values,
- "is_primary_key": is_pk,
- "alerts": alerts,
- }
-
- # Type-specific stats
- try:
- if category == "NUMERIC" and col_name in numeric_batch:
- raw = numeric_batch[col_name]
- min_val = _round(raw["min"])
- max_val = _round(raw["max"])
- zeros = int(raw["zeros"]) if raw["zeros"] is not None else 0
- negative = int(raw["negative"]) if raw["negative"] is not None else 0
- zeros_pct = _round(100.0 * zeros / non_null) if non_null > 0 else 0.0
- negative_pct = _round(100.0 * negative / non_null) if non_null > 0 else 0.0
-
- if zeros_pct > ALERT_ZEROS_PCT and "zeros" not in alerts:
- alerts.append("zeros")
-
- # Histogram (FLOOR-based bucketing, works in all DuckDB versions)
- histogram: Dict[str, Any] = {"bins": [], "counts": []}
- if min_val is not None and max_val is not None and min_val != max_val:
- try:
- bin_width = (float(max_val) - float(min_val)) / HISTOGRAM_BINS
- bucket_rows = con.execute(
- f"""
- SELECT
- LEAST(FLOOR((CAST({safe_col} AS DOUBLE) - {float(min_val)}) / {bin_width}), {HISTOGRAM_BINS - 1}) + 1 AS bucket,
- COUNT(*) AS cnt
- FROM {view_name}
- WHERE {safe_col} IS NOT NULL
- GROUP BY bucket
- ORDER BY bucket
- """
- ).fetchall()
-
- bin_labels: List[str] = []
- bin_counts: List[int] = []
- bucket_dict = {int(r[0]): int(r[1]) for r in bucket_rows if r[0] is not None}
- for i in range(1, HISTOGRAM_BINS + 1):
- lo = float(min_val) + (i - 1) * bin_width
- hi = float(min_val) + i * bin_width
- bin_labels.append(f"{_format_number(lo)}-{_format_number(hi)}")
- bin_counts.append(bucket_dict.get(i, 0))
- histogram = {"bins": bin_labels, "counts": bin_counts}
- except Exception as exc:
- logger.debug("Histogram failed for column %s: %s", col_name, exc)
-
- col_profile["numeric_stats"] = {
- "min": min_val,
- "max": max_val,
- "mean": _round(raw["mean"]),
- "median": _round(raw["median"]),
- "stddev": _round(raw["stddev"]),
- "p5": _round(raw["p5"]),
- "p25": _round(raw["p25"]),
- "p75": _round(raw["p75"]),
- "p95": _round(raw["p95"]),
- "zeros": zeros,
- "zeros_pct": zeros_pct,
- "negative": negative,
- "negative_pct": negative_pct,
- "histogram": histogram,
- }
-
- elif category == "STRING" and col_name in string_batch:
- sl = string_batch[col_name]
- is_categorical = unique_count <= MAX_CATEGORICAL_DISTINCT
-
- top_values: List[Dict[str, Any]] = []
- if is_categorical and non_null > 0:
- rows = con.execute(
- f"""
- SELECT {safe_col} AS val, COUNT(*) AS cnt
- FROM {view_name}
- WHERE {safe_col} IS NOT NULL
- GROUP BY {safe_col}
- ORDER BY cnt DESC
- LIMIT {TOP_VALUES_LIMIT}
- """
- ).fetchall()
- for row in rows:
- pct = _round(100.0 * row[1] / non_null) if non_null > 0 else 0.0
- top_values.append({"value": str(row[0]), "count": row[1], "pct": pct})
-
- if top_values and top_values[0]["pct"] > ALERT_IMBALANCE_PCT:
- if "imbalance" not in alerts:
- alerts.append("imbalance")
- else:
- if unique_count > ALERT_HIGH_CARDINALITY and "high_cardinality" not in alerts:
- alerts.append("high_cardinality")
-
- col_profile["string_stats"] = {
- "min_length": sl["min_length"],
- "max_length": sl["max_length"],
- "avg_length": sl["avg_length"],
- "top_values": top_values,
- }
-
- elif category in ("DATE", "TIMESTAMP") and col_name in date_batch:
- dr = date_batch[col_name]
- cast_expr = f"CAST({safe_col} AS DATE)" if category == "TIMESTAMP" else safe_col
-
- # Date histogram (YEAR/QUARTER grouping)
- histogram = {"bins": [], "counts": []}
- try:
- rows = con.execute(
- f"""
- SELECT
- YEAR({cast_expr}) AS yr,
- QUARTER({cast_expr}) AS qtr,
- COUNT(*) AS cnt
- FROM {view_name}
- WHERE {safe_col} IS NOT NULL
- GROUP BY yr, qtr
- ORDER BY yr, qtr
- """
- ).fetchall()
- histogram["bins"] = [f"{int(r[0])}-Q{int(r[1])}" for r in rows]
- histogram["counts"] = [int(r[2]) for r in rows]
- except Exception as exc:
- logger.debug("Date histogram failed for %s: %s", col_name, exc)
-
- col_profile["date_stats"] = {
- "earliest": dr["earliest"],
- "latest": dr["latest"],
- "span_days": dr["span_days"],
- "histogram": histogram,
- }
-
- if first_date_col is None and dr["earliest"]:
- first_date_col = col_profile["date_stats"]
-
- elif category == "BOOLEAN" and col_name in boolean_batch:
- col_profile["boolean_stats"] = boolean_batch[col_name]
-
- except Exception as exc:
- logger.warning("Type-specific stats failed for %s: %s", col_name, exc)
-
- columns.append(col_profile)
- total_null_count += null_count
-
- # Table-level completeness
- avg_completeness = 0.0
- if columns:
- avg_completeness = _round(
- sum(c["completeness_pct"] for c in columns) / len(columns)
- )
- missing_cells_pct = _round(100.0 * total_null_count / total_cells) if total_cells > 0 else 0.0
-
- # Duplicate rows (by primary key)
- duplicate_rows = 0
- if pk_columns and working_rows > 0:
- try:
- pk_expr = ", ".join(f'"{c}"' for c in pk_columns)
- distinct_pk = con.execute(
- f"SELECT COUNT(DISTINCT ({pk_expr})) FROM {view_name}"
- ).fetchone()[0]
- duplicate_rows = working_rows - distinct_pk
- except Exception as exc:
- logger.debug("Duplicate check failed: %s", exc)
-
- # Sample rows
- sample_rows: List[Dict[str, Any]] = []
- try:
- sample_result = con.execute(f"SELECT * FROM {view_name} LIMIT {SAMPLE_ROWS_LIMIT}")
- sample_col_names = [desc[0] for desc in sample_result.description]
- for row in sample_result.fetchall():
- sample_rows.append(
- {sample_col_names[i]: str(v) if v is not None else None for i, v in enumerate(row)}
- )
- except Exception as exc:
- logger.debug("Sample rows failed: %s", exc)
-
- # Aggregate column alerts to table level
- table_alerts: List[Dict[str, str]] = []
- alert_messages = {
- "constant": "{col} is constant (single value)",
- "unique": "{col} has all unique values",
- "high_missing": "{col} has {pct}% missing values",
- "missing": "{col} has {pct}% missing values",
- "imbalance": "{col} is highly imbalanced (top value {pct}%)",
- "zeros": "{col} has {pct}% zero values",
- "high_cardinality": "{col} has high cardinality ({n} distinct)",
- }
- for col in columns:
- col_alert_name = col.get("name", "")
- missing_pct_val = _round(100.0 - col.get("completeness_pct", 100.0))
- for a in col.get("alerts", []):
- if a in ("high_missing", "missing"):
- msg = alert_messages[a].format(col=col_alert_name, pct=missing_pct_val)
- elif a == "imbalance":
- top_pct = 0.0
- ss = col.get("string_stats", {})
- tv = ss.get("top_values", [])
- if tv:
- top_pct = tv[0].get("pct", 0.0)
- msg = alert_messages[a].format(col=col_alert_name, pct=top_pct)
- elif a == "zeros":
- ns = col.get("numeric_stats", {})
- msg = alert_messages[a].format(col=col_alert_name, pct=ns.get("zeros_pct", 0.0))
- elif a == "high_cardinality":
- msg = alert_messages[a].format(col=col_alert_name, n=col.get("unique_count", 0))
- else:
- msg = alert_messages.get(a, f"{col_alert_name}: {a}").format(col=col_alert_name)
- table_alerts.append({"column": col_alert_name, "type": a, "message": msg})
-
- # File size
- file_size_mb = None
- try:
- if source_path.is_dir():
- total_bytes = sum(f.stat().st_size for f in source_path.glob("*.parquet"))
- elif source_path.exists():
- total_bytes = source_path.stat().st_size
- else:
- total_bytes = 0
- file_size_mb = _round(total_bytes / (1024 * 1024))
- except OSError:
- pass
-
- # Date range from first date column
- date_range = None
- if first_date_col:
- date_range = {
- "earliest": first_date_col.get("earliest"),
- "latest": first_date_col.get("latest"),
- "span_days": first_date_col.get("span_days"),
- }
-
- con.close()
-
- return {
- "table_name": table_name,
- "source_path": str(source_path),
- "row_count": total_rows,
- "column_count": len(col_info),
- "file_size_mb": file_size_mb,
- "primary_key": primary_key,
- "avg_completeness": avg_completeness,
- "missing_cells": total_null_count,
- "missing_cells_pct": missing_cells_pct,
- "duplicate_rows": duplicate_rows,
- "variable_types": variable_types,
- "date_range": date_range,
- "alerts": table_alerts,
- "sampled": sampled,
- "columns": columns,
- "sample_rows": sample_rows,
- }
-
-
-# ---------------------------------------------------------------------------
-# HTML report generation
-# ---------------------------------------------------------------------------
-
-_TYPE_COLORS = {
- "NUMERIC": "#8b5cf6",
- "STRING": "#3b82f6",
- "DATE": "#f59e0b",
- "TIMESTAMP": "#f59e0b",
- "BOOLEAN": "#10b981",
-}
-
-_ALERT_SEVERITY = {
- "high_missing": "e",
- "missing": "w",
- "constant": "i",
- "unique": "i",
- "imbalance": "w",
- "zeros": "w",
- "high_cardinality": "i",
-}
-
-_CSS = """
-*{margin:0;padding:0;box-sizing:border-box}
-body{font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',system-ui,sans-serif;
- background:#f8fafc;color:#0f172a;line-height:1.5;font-size:14px}
-.wrap{max-width:1200px;margin:0 auto;padding:20px 24px 60px}
-header{padding:20px 0 16px;border-bottom:1px solid #e2e8f0;margin-bottom:24px}
-h1{font-size:22px;font-weight:700}
-.meta{color:#64748b;font-size:12px;margin-top:2px}
-.cards{display:grid;grid-template-columns:repeat(auto-fit,minmax(140px,1fr));gap:10px;margin:16px 0}
-.card{background:#fff;border-radius:8px;box-shadow:0 1px 3px rgba(0,0,0,.08);padding:14px 16px;text-align:center}
-.card-v{font-size:26px;font-weight:700}.card-l{font-size:10px;color:#64748b;text-transform:uppercase;letter-spacing:.05em;margin-top:2px}
-.tabs{display:flex;gap:4px;margin-bottom:20px;flex-wrap:wrap}
-.tab{padding:7px 14px;border-radius:6px;cursor:pointer;font-size:13px;border:1px solid #e2e8f0;background:#fff;transition:all .15s}
-.tab:hover{border-color:#93c5fd}.tab.active{background:#3b82f6;color:#fff;border-color:#3b82f6}
-.tsec{display:none}.tsec.active{display:block}
-.alerts{margin:12px 0}
-.alert{padding:7px 12px;border-radius:6px;margin:3px 0;font-size:12px}
-.alert-w{background:#fef3c7;color:#92400e}.alert-e{background:#fee2e2;color:#991b1b}.alert-i{background:#dbeafe;color:#1e40af}
-.types{display:flex;gap:6px;margin:10px 0;flex-wrap:wrap}
-.tbadge{padding:2px 10px;border-radius:12px;font-size:11px;font-weight:600;color:#fff}
-.stitle{font-size:15px;font-weight:600;margin:20px 0 8px}
-.col-list{background:#fff;border-radius:8px;box-shadow:0 1px 3px rgba(0,0,0,.08);overflow:hidden}
-.col-hdr{display:grid;grid-template-columns:minmax(140px,1.5fr) 56px minmax(100px,1fr) 90px 50px;
- align-items:center;padding:8px 14px;cursor:pointer;border-bottom:1px solid #f1f5f9;gap:8px;transition:background .1s}
-.col-hdr:hover{background:#f8fafc}
-.col-hdr-label{cursor:default;font-weight:600;font-size:11px;color:#64748b;border-bottom-width:2px}
-.cn{font-weight:600;font-size:13px;overflow:hidden;text-overflow:ellipsis;white-space:nowrap}
-.pk{color:#f59e0b;font-size:10px;font-weight:700;margin-left:3px}
-.ct{font-size:10px;padding:2px 6px;border-radius:4px;text-align:center;font-weight:600;color:#fff;white-space:nowrap}
-.cbar-bg{height:5px;background:#e2e8f0;border-radius:3px;overflow:hidden;flex:1}
-.cbar{height:100%;border-radius:3px}
-.compl{display:flex;align-items:center;gap:6px}
-.cpct{font-size:11px;color:#64748b;min-width:32px;text-align:right}
-.cuniq{font-size:11px;color:#64748b;text-align:right;overflow:hidden;text-overflow:ellipsis;white-space:nowrap}
-.calerts span{padding:1px 5px;border-radius:8px;background:#fee2e2;color:#991b1b;font-size:10px}
-.col-det{display:none;padding:14px 16px;border-bottom:1px solid #e2e8f0;background:#fafbfc}
-.col-det.open{display:block}
-.dgrid{display:grid;grid-template-columns:1fr 1fr;gap:16px}
-@media(max-width:768px){.dgrid{grid-template-columns:1fr}.col-hdr{grid-template-columns:1fr 50px 1fr 70px 40px;font-size:12px}}
-.stbl{font-size:12px;width:100%;border-collapse:collapse}
-.stbl td{padding:2px 0}.stbl td:first-child{color:#64748b;padding-right:10px;white-space:nowrap}
-.stbl td:last-child{font-weight:500;text-align:right}
-.histogram{display:flex;align-items:flex-end;gap:1px;height:72px;margin:10px 0}
-.h-bar{flex:1;background:#3b82f6;border-radius:2px 2px 0 0;min-width:3px;transition:background .15s;cursor:default;min-height:1px}
-.h-bar:hover{background:#2563eb}
-.h-labels{display:flex;justify-content:space-between;font-size:9px;color:#94a3b8;margin-top:2px}
-.tvr{display:grid;grid-template-columns:110px 1fr 42px 52px;align-items:center;gap:6px;padding:2px 0;font-size:12px}
-.tvl{overflow:hidden;text-overflow:ellipsis;white-space:nowrap}
-.tvb-bg{height:7px;background:#e2e8f0;border-radius:4px;overflow:hidden}
-.tvb{height:100%;background:#3b82f6;border-radius:4px}
-.tvp{text-align:right;color:#64748b;font-size:11px}
-.tvc{text-align:right;color:#94a3b8;font-size:10px}
-.bbar{display:flex;height:18px;border-radius:4px;overflow:hidden;font-size:10px}
-.bt{background:#22c55e;color:#fff;display:flex;align-items:center;justify-content:center}
-.bf{background:#e2e8f0;color:#64748b;display:flex;align-items:center;justify-content:center}
-.svs{display:flex;gap:4px;flex-wrap:wrap;margin-top:6px}
-.sv{background:#f1f5f9;padding:1px 7px;border-radius:4px;font-size:11px;color:#475569}
-.swrap{margin-top:20px}
-.stog{cursor:pointer;color:#3b82f6;font-size:13px;font-weight:500;user-select:none}
-.sdata{display:none;margin-top:8px;overflow-x:auto}
-.sdata.open{display:block}
-table.dt{border-collapse:collapse;font-size:11px;width:100%}
-table.dt th{background:#f1f5f9;padding:5px 8px;text-align:left;font-weight:600;border:1px solid #e2e8f0;white-space:nowrap}
-table.dt td{padding:5px 8px;border:1px solid #e2e8f0;max-width:180px;overflow:hidden;text-overflow:ellipsis;white-space:nowrap}
-.foot{text-align:center;color:#94a3b8;font-size:11px;margin-top:40px;padding-top:16px;border-top:1px solid #e2e8f0}
-@media print{.tabs,.stog{display:none}.tsec,.col-det,.sdata{display:block!important}body{background:#fff}.card{box-shadow:none;border:1px solid #e2e8f0}}
-"""
-
-_JS = """
-function switchTab(n){
- document.querySelectorAll('.tab').forEach(function(t){t.classList.toggle('active',t.dataset.t===n)});
- document.querySelectorAll('.tsec').forEach(function(s){s.classList.toggle('active',s.id==='t-'+n)});
-}
-function toggleCol(el){el.nextElementSibling.classList.toggle('open')}
-function toggleSample(el){el.nextElementSibling.classList.toggle('open')}
-"""
-
-
-def _esc(s: Any) -> str:
- return html_mod.escape(str(s)) if s is not None else ""
-
-
-def _slug(name: str) -> str:
- return name.replace(" ", "-").replace(".", "-").replace("/", "-")
-
-
-def _fnum(n: Any) -> str:
- if n is None:
- return "-"
- if isinstance(n, float):
- if n == int(n) and abs(n) < 1e15:
- return f"{int(n):,}"
- return f"{n:,.2f}"
- if isinstance(n, int):
- return f"{n:,}"
- return str(n)
-
-
-def _compl_color(pct: float) -> str:
- if pct >= 95:
- return "#22c55e"
- if pct >= 70:
- return "#eab308"
- return "#ef4444"
-
-
-def _render_hist(bins: list, counts: list) -> str:
- if not bins or not counts:
- return ""
- max_c = max(counts) or 1
- bars = []
- for b, c in zip(bins, counts):
- pct = c / max_c * 100
- bars.append(f'')
- return (
- f'{"".join(bars)}
'
- f'{_esc(bins[0])}{_esc(bins[-1])}
'
- )
-
-
-def _render_top_vals(top_values: list) -> str:
- if not top_values:
- return ""
- max_pct = max((tv.get("pct", 0) for tv in top_values), default=1) or 1
- rows = []
- for tv in top_values:
- bar_w = tv.get("pct", 0) / max_pct * 100
- rows.append(
- f''
- f'
{_esc(str(tv["value"])[:30])}'
- f'
'
- f'
{tv.get("pct", 0)}%'
- f'
({_fnum(tv.get("count", 0))})'
- f'
'
- )
- return "".join(rows)
-
-
-def _render_col_detail(col: dict) -> str:
- parts: List[str] = []
- ns = col.get("numeric_stats")
- if ns:
- parts.append('')
- for label, key in [
- ("Min", "min"), ("Max", "max"), ("Mean", "mean"),
- ("Median", "median"), ("Std Dev", "stddev"),
- ("P5", "p5"), ("P25", "p25"), ("P75", "p75"), ("P95", "p95"),
- ("Zeros", "zeros"), ("Zeros %", "zeros_pct"),
- ("Negative", "negative"), ("Negative %", "negative_pct"),
- ]:
- parts.append(f'| {label} | {_fnum(ns.get(key))} |
')
- parts.append('
')
- h = ns.get("histogram", {})
- parts.append(_render_hist(h.get("bins", []), h.get("counts", [])))
- parts.append('
')
-
- ss = col.get("string_stats")
- if ss:
- parts.append('')
- parts.append(f'| Min length | {_fnum(ss.get("min_length"))} |
')
- parts.append(f'| Max length | {_fnum(ss.get("max_length"))} |
')
- parts.append(f'| Avg length | {_fnum(ss.get("avg_length"))} |
')
- parts.append('
')
- tv = ss.get("top_values", [])
- if tv:
- parts.append('Top Values
')
- parts.append(_render_top_vals(tv))
-
- ds = col.get("date_stats")
- if ds:
- parts.append('')
- parts.append(f'| Earliest | {_esc(ds.get("earliest", "-"))} |
')
- parts.append(f'| Latest | {_esc(ds.get("latest", "-"))} |
')
- parts.append(f'| Span | {_fnum(ds.get("span_days"))} days |
')
- parts.append('
')
- h = ds.get("histogram", {})
- parts.append(_render_hist(h.get("bins", []), h.get("counts", [])))
- parts.append('
')
-
- bs = col.get("boolean_stats")
- if bs:
- tc, fc = bs.get("true_count", 0), bs.get("false_count", 0)
- tp = bs.get("true_pct", 0)
- fp = round(100 - tp, 1) if tp else 0
- parts.append(
- f''
- f'
True {tp}% ({tc:,})
'
- f'
False {fp}% ({fc:,})
'
- f'
'
- )
-
- sv = col.get("sample_values", [])
- if sv:
- parts.append('Sample values:
')
- parts.append('')
- for v in sv:
- parts.append(f'{_esc(str(v)[:50])}')
- parts.append('
')
-
- return "".join(parts)
-
-
-def generate_html_report(profile_data: Dict[str, Any], output_path: Path) -> None:
- """Generate a standalone HTML report from profile data.
-
- Args:
- profile_data: Full profile dict with "tables" key.
- output_path: Path to write the HTML file.
- """
- tables = profile_data.get("tables", {})
- generated_at = profile_data.get("generated_at", "")
- if not tables:
- logger.warning("No tables in profile data")
- return
-
- total_tables = len(tables)
- total_rows = sum(t.get("row_count", 0) for t in tables.values())
- total_cols = sum(t.get("column_count", 0) for t in tables.values())
- compl_vals = [t.get("avg_completeness", 0) for t in tables.values()]
- avg_compl = round(sum(compl_vals) / len(compl_vals), 1) if compl_vals else 0
- total_alerts = sum(len(t.get("alerts", [])) for t in tables.values())
- table_names = list(tables.keys())
-
- h: List[str] = []
- h.append('')
- h.append('')
- h.append('Data Profile Report')
- h.append(f'')
-
- # Header
- h.append('
')
-
- # Summary cards
- h.append('
')
- for val, label in [
- (_fnum(total_tables), "Tables"),
- (_fnum(total_rows), "Total Rows"),
- (_fnum(total_cols), "Total Columns"),
- (f"{avg_compl}%", "Avg Completeness"),
- (_fnum(total_alerts), "Alerts"),
- ]:
- h.append(f'
')
- h.append('
')
-
- # Table tabs
- if total_tables > 1:
- h.append('
')
- for i, name in enumerate(table_names):
- act = " active" if i == 0 else ""
- sl = _slug(name)
- h.append(f'
{_esc(name)}
')
- h.append('
')
-
- # Table sections
- for i, (name, tbl) in enumerate(tables.items()):
- act = " active" if i == 0 or total_tables == 1 else ""
- sl = _slug(name)
- h.append(f'
')
- h.append(f'{_esc(name)}
')
-
- # Stat cards
- h.append('')
- rc = tbl.get("row_count", 0)
- cc = tbl.get("column_count", 0)
- tc = tbl.get("avg_completeness", 0)
- sz = tbl.get("file_size_mb")
- dupes = tbl.get("duplicate_rows", 0)
- sampled = tbl.get("sampled", False)
- for val, label in [
- (_fnum(rc), "Rows"),
- (_fnum(cc), "Columns"),
- (f"{tc}%", "Completeness"),
- (f"{sz} MB" if sz is not None else "-", "File Size"),
- ]:
- h.append(f'
')
- dr = tbl.get("date_range")
- if dr and dr.get("earliest"):
- h.append(
- f'
'
- f'{_esc(dr["earliest"])} — {_esc(dr["latest"])}
'
- f'
Date Range ({_fnum(dr.get("span_days"))} days)
'
- )
- if dupes:
- h.append(f'
{_fnum(dupes)}
Duplicate Rows
')
- if sampled:
- h.append(f'
')
- h.append('
')
-
- # Variable types
- vt = tbl.get("variable_types", {})
- if vt:
- h.append('')
- for cat, cnt in sorted(vt.items()):
- color = _TYPE_COLORS.get(cat, "#6b7280")
- h.append(f'{cat} {cnt}')
- h.append('
')
-
- # Alerts
- alerts = tbl.get("alerts", [])
- if alerts:
- h.append('')
- for a in alerts:
- sev = _ALERT_SEVERITY.get(a.get("type", ""), "i")
- h.append(f'
{_esc(a.get("message", ""))}
')
- h.append('
')
-
- # Column list
- columns = tbl.get("columns", [])
- if columns:
- h.append('Columns
')
- h.append('')
- # Header row
- h.append('
')
- h.append('
Name
Type
')
- h.append('
Completeness
')
- h.append('
Unique
')
- h.append('
')
-
- for col in columns:
- cname = col.get("name", "")
- cat = col.get("type_category", "STRING")
- ctype = col.get("type", "")
- cpct = col.get("completeness_pct", 0)
- uniq = col.get("unique_count", 0)
- upct = col.get("unique_pct", 0)
- ca = col.get("alerts", [])
- is_pk = col.get("is_primary_key", False)
- color = _TYPE_COLORS.get(cat, "#6b7280")
- cc_col = _compl_color(cpct)
- pk_html = '
PK' if is_pk else ""
- alert_html = f'
{len(ca)}' if ca else ""
-
- h.append('
')
- h.append(f'
{_esc(cname)}{pk_html}
')
- h.append(f'
{_esc(cat[:4])}
')
- h.append(f'
')
- h.append(f'
{_fnum(uniq)} ({upct}%)
')
- h.append(f'
{alert_html}
')
- h.append('
')
- h.append(f'
{_render_col_detail(col)}
')
-
- h.append('
')
-
- # Sample data
- sample_rows = tbl.get("sample_rows", [])
- if sample_rows:
- h.append('')
- h.append(f'
▶ Sample Data ({len(sample_rows)} rows)
')
- h.append('
')
- headers = list(sample_rows[0].keys())
- h.append('' + ''.join(f'| {_esc(hd)} | ' for hd in headers) + '
')
- for row in sample_rows:
- h.append('' + ''.join(
- f'| {_esc(str(row.get(hd, ""))[:60])} | '
- for hd in headers
- ) + '
')
- h.append('
')
-
- h.append('')
-
- # Footer + JS
- h.append('')
- h.append(f'')
- h.append('
')
-
- output_path.parent.mkdir(parents=True, exist_ok=True)
- output_path.write_text("\n".join(h), encoding="utf-8")
- logger.info("Wrote HTML report: %s", output_path)
-
-
-# ---------------------------------------------------------------------------
-# CLI
-# ---------------------------------------------------------------------------
-def main() -> None:
- parser = argparse.ArgumentParser(
- description="Profile Parquet/CSV files and output JSON statistics + optional HTML report.",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog="""
-Examples:
- %(prog)s data/orders.parquet
- %(prog)s data/orders.parquet --primary-key order_id --html
- %(prog)s data/orders.parquet data/customers.csv -o profiles.json --html
- %(prog)s --from-json profile.json
- """,
- )
- parser.add_argument(
- "files",
- nargs="*",
- help="Parquet file(s), directory of Parquet files, or CSV file(s) to profile",
- )
- parser.add_argument(
- "-o", "--output",
- default="profile.json",
- help="Output JSON file path (default: profile.json)",
- )
- parser.add_argument(
- "--primary-key",
- default=None,
- help="Comma-separated primary key column(s) for duplicate detection",
- )
- parser.add_argument(
- "--html",
- action="store_true",
- help="Also generate a standalone HTML report",
- )
- parser.add_argument(
- "--from-json",
- metavar="PATH",
- default=None,
- help="Generate HTML report from existing profile JSON (no profiling)",
- )
- parser.add_argument(
- "--quiet", "-q",
- action="store_true",
- help="Suppress info logging",
- )
- args = parser.parse_args()
-
- if args.quiet:
- logging.getLogger("profiler").setLevel(logging.WARNING)
-
- # Mode 1: Generate HTML from existing JSON
- if args.from_json:
- json_path = Path(args.from_json)
- if not json_path.exists():
- logger.error("File not found: %s", json_path)
- sys.exit(1)
- with open(json_path) as f:
- profile_data = json.load(f)
- html_path = json_path.with_suffix(".html")
- generate_html_report(profile_data, html_path)
- logger.info("Done: HTML report at %s", html_path)
- return
-
- # Mode 2: Profile files
- if not args.files:
- parser.error("Provide files to profile, or use --from-json")
-
- profiles: Dict[str, Any] = {}
- success = 0
- errors = 0
-
- for file_path_str in args.files:
- file_path = Path(file_path_str)
- if not file_path.exists():
- logger.error("File not found: %s", file_path)
- errors += 1
- continue
-
- try:
- logger.info("Profiling %s ...", file_path)
- profile = profile_table(
- source_path=file_path,
- primary_key=args.primary_key,
- )
- profiles[profile["table_name"]] = profile
- success += 1
- logger.info(
- " %s: %d rows, %d cols, %d alerts",
- profile["table_name"],
- profile["row_count"],
- profile["column_count"],
- len(profile["alerts"]),
- )
- except Exception as exc:
- logger.error("Failed to profile %s: %s", file_path, exc)
- errors += 1
-
- if not profiles:
- logger.error("No tables profiled successfully")
- sys.exit(1)
-
- output = {
- "generated_at": datetime.now(timezone.utc).isoformat(),
- "version": "1.0",
- "tables": profiles,
- }
-
- output_path = Path(args.output)
- write_json_atomic(output_path, output)
-
- # Generate HTML if requested
- if args.html:
- html_path = output_path.with_suffix(".html")
- generate_html_report(output, html_path)
-
- logger.info("Done: %d profiled, %d errors. Output: %s", success, errors, output_path)
-
-
-if __name__ == "__main__":
- main()
diff --git a/scripts/sync_config_template.yaml b/scripts/sync_config_template.yaml
deleted file mode 100644
index 47c9358..0000000
--- a/scripts/sync_config_template.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-# Data Analyst - Sync Configuration
-#
-# This file controls which additional datasets are synchronized.
-# Core telemetry data is ALWAYS synced (projects, users, jobs, etc.)
-#
-# To enable a dataset: change false -> true
-# To disable a dataset: change true -> false
-#
-# After changing, run: bash server/scripts/sync_data.sh
-
-datasets:
- # Jira Support tickets (SUPPORT project)
- # Size: ~50MB (parquet only), Updated: real-time via webhooks
- # Recommended for: Support team, Customer Success
- jira: false
-
- # Jira attachment files (images, logs, etc.)
- # Size: ~500MB+, requires jira: true
- # Only enable if you need to view actual attachment files locally
- jira_attachments: false
-
- # Future datasets:
- # finance: false # Investor reporting (coming soon)
diff --git a/scripts/test_sync.sh b/scripts/test_sync.sh
deleted file mode 100755
index 343bfb9..0000000
--- a/scripts/test_sync.sh
+++ /dev/null
@@ -1,138 +0,0 @@
-#!/bin/bash
-# Test rsync reliability fixes (Issue #197)
-#
-# Downloads data into gitignored data/test_sync/ to verify rsync_reliable
-# wrapper works end-to-end without touching repo directories.
-#
-# Usage:
-# bash scripts/test_sync.sh # Full test sync
-# bash scripts/test_sync.sh --dry-run # Preview only
-
-set -e
-
-DRY_RUN=""
-for arg in "$@"; do
- case "$arg" in
- --dry-run) DRY_RUN="--dry-run" ;;
- esac
-done
-
-# --- Rsync reliability settings (same as sync_data.sh) ---
-RSYNC_SSH_OPTS='ssh -o ServerAliveInterval=60 -o ServerAliveCountMax=3 -o ConnectTimeout=30'
-RSYNC_TIMEOUT=300
-RSYNC_MAX_RETRIES=3
-RSYNC_RETRY_DELAY=5
-
-rsync_reliable() {
- local attempt=1
- local delay=$RSYNC_RETRY_DELAY
- while [[ $attempt -le $RSYNC_MAX_RETRIES ]]; do
- rsync -e "$RSYNC_SSH_OPTS" --timeout="$RSYNC_TIMEOUT" \
- --partial-dir=.rsync-partial "$@" && return 0
- local exit_code=$?
- # Exit codes 23/24 = partial transfer (permission denied, vanished files) — not retryable
- if [[ $exit_code -eq 23 || $exit_code -eq 24 ]]; then
- echo " Warning: rsync partial transfer (exit $exit_code), continuing..."
- return 0
- fi
- if [[ $attempt -lt $RSYNC_MAX_RETRIES ]]; then
- echo " Rsync failed (exit $exit_code), retrying in ${delay}s (attempt $attempt/$RSYNC_MAX_RETRIES)..."
- sleep "$delay"
- delay=$((delay * 2))
- fi
- attempt=$((attempt + 1))
- done
- echo " ERROR: Rsync failed after $RSYNC_MAX_RETRIES attempts"
- return 1
-}
-
-# --- Test destination (gitignored) ---
-DEST="./data/test_sync"
-LOG_DIR="./data/sync_diagnostics"
-mkdir -p "$DEST" "$LOG_DIR"
-LOG_FILE="$LOG_DIR/test_sync_$(date '+%Y%m%d_%H%M%S').log"
-
-log() {
- echo "$@" | tee -a "$LOG_FILE"
-}
-
-log "=== Rsync reliability test ==="
-log "Started: $(date -u '+%Y-%m-%dT%H:%M:%SZ')"
-log "Destination: $DEST"
-log "SSH opts: $RSYNC_SSH_OPTS"
-log "Timeout: ${RSYNC_TIMEOUT}s, Retries: $RSYNC_MAX_RETRIES"
-log ""
-
-# --- Test 1: SSH connectivity ---
-log "--- Test 1: SSH connectivity ---"
-START=$(date +%s)
-if ssh -o ConnectTimeout=10 data-analyst "echo ok" >> "$LOG_FILE" 2>&1; then
- ELAPSED=$(($(date +%s) - START))
- log "PASS (${ELAPSED}s)"
-else
- ELAPSED=$(($(date +%s) - START))
- log "FAIL (${ELAPSED}s) — cannot reach server, aborting"
- exit 1
-fi
-log ""
-
-# --- Test 2: Small directory (docs) ---
-log "--- Test 2: Sync docs (small, text, -avz) ---"
-mkdir -p "$DEST/docs"
-START=$(date +%s)
-rsync_reliable -avz --delete $DRY_RUN "data-analyst:server/docs/" "$DEST/docs/" >> "$LOG_FILE" 2>&1
-ELAPSED=$(($(date +%s) - START))
-if [[ -z "$DRY_RUN" ]]; then
- DOC_COUNT=$(find "$DEST/docs" -type f 2>/dev/null | wc -l | tr -d ' ')
- DOC_SIZE=$(du -sh "$DEST/docs" 2>/dev/null | cut -f1)
- log "PASS (${ELAPSED}s) — $DOC_COUNT files, $DOC_SIZE"
-else
- log "PASS dry-run (${ELAPSED}s)"
-fi
-log ""
-
-# --- Test 3: Scripts ---
-log "--- Test 3: Sync scripts (small, text, -avz) ---"
-mkdir -p "$DEST/scripts"
-START=$(date +%s)
-rsync_reliable -avz --delete $DRY_RUN "data-analyst:server/scripts/" "$DEST/scripts/" >> "$LOG_FILE" 2>&1
-ELAPSED=$(($(date +%s) - START))
-if [[ -z "$DRY_RUN" ]]; then
- SCRIPT_COUNT=$(find "$DEST/scripts" -type f 2>/dev/null | wc -l | tr -d ' ')
- log "PASS (${ELAPSED}s) — $SCRIPT_COUNT files"
-else
- log "PASS dry-run (${ELAPSED}s)"
-fi
-log ""
-
-# --- Test 4: Core parquet (the big one — no -z, with keepalive) ---
-log "--- Test 4: Sync core parquet (large, binary, -av WITHOUT -z) ---"
-log "This is the transfer that was hanging. Expect ~7000 files..."
-mkdir -p "$DEST/parquet"
-START=$(date +%s)
-rsync_reliable -av --delete --progress \
- --exclude='jira/' --exclude='kbc_telemetry_expert/' \
- $DRY_RUN data-analyst:server/parquet/ "$DEST/parquet/" 2>&1 | \
- tee -a "$LOG_FILE" | \
- grep -E '(^sent |^total size|^rsync|Warning:|ERROR:)' || true
-ELAPSED=$(($(date +%s) - START))
-if [[ -z "$DRY_RUN" ]]; then
- PARQUET_COUNT=$(find "$DEST/parquet" -name '*.parquet' -type f 2>/dev/null | wc -l | tr -d ' ')
- PARQUET_SIZE=$(du -sh "$DEST/parquet" 2>/dev/null | cut -f1)
- log "DONE (${ELAPSED}s) — $PARQUET_COUNT parquet files, $PARQUET_SIZE"
-else
- log "DONE dry-run (${ELAPSED}s)"
-fi
-log ""
-
-# --- Summary ---
-log "=== Summary ==="
-log "Finished: $(date -u '+%Y-%m-%dT%H:%M:%SZ')"
-if [[ -z "$DRY_RUN" ]]; then
- TOTAL_SIZE=$(du -sh "$DEST" 2>/dev/null | cut -f1)
- TOTAL_FILES=$(find "$DEST" -type f 2>/dev/null | wc -l | tr -d ' ')
- log "Total: $TOTAL_FILES files, $TOTAL_SIZE in $DEST"
-fi
-log "Full log: $LOG_FILE"
-log ""
-log "To clean up test data: rm -rf $DEST"
diff --git a/scripts/update.sh b/scripts/update.sh
deleted file mode 100755
index 88a2d68..0000000
--- a/scripts/update.sh
+++ /dev/null
@@ -1,71 +0,0 @@
-#!/bin/bash
-
-# update.sh - Data synchronization script
-#
-# This script performs:
-# 1. Data synchronization from configured data source
-# 2. DuckDB views reinitialization
-#
-# Note: Git pull and dependency updates are handled by deploy.sh (GitHub Actions)
-
-set -e # Exit on error
-
-echo "🔄 AI Data Analyst - Data Update"
-echo ""
-
-# Check that we're in the correct folder (same check as config.py uses)
-if [ ! -f "docs/data_description.md" ]; then
- echo "❌ Run script from project root (folder with docs/data_description.md)"
- exit 1
-fi
-
-# Note: Git pull and dependency updates are handled by deploy.sh (GitHub Actions)
-# This script focuses only on data synchronization
-
-# Activate virtual environment
-# Supports both local (./.venv) and server (/opt/data-analyst/.venv) setups
-echo ""
-echo "1️⃣ Activating virtual environment..."
-if [ -d ".venv" ]; then
- source .venv/bin/activate
- echo " ✅ Virtual environment activated (local)"
-elif [ -d "/opt/data-analyst/.venv" ]; then
- source /opt/data-analyst/.venv/bin/activate
- echo " ✅ Virtual environment activated (server)"
-else
- echo " ❌ Virtual environment not found. Run init.sh first."
- exit 1
-fi
-
-# Data synchronization
-echo ""
-echo "2️⃣ Synchronizing data..."
-echo ""
-
-# Run data sync
-if python3 -m src.data_sync; then
- echo ""
- echo " ✅ Data synchronization complete"
-else
- echo ""
- echo " ❌ Data synchronization failed. Check logs above."
- exit 1
-fi
-
-# Generate data profiles (for catalog profiler)
-echo ""
-echo "3️⃣ Generating data profiles..."
-if python3 -m src.profiler; then
- echo " ✅ Data profiles generated"
-else
- echo " ⚠️ Data profiling failed (non-fatal). Check logs above."
- # Non-fatal: profiling failure should not break the pipeline
-fi
-
-# Done
-echo ""
-echo "✅ Data sync complete!"
-echo ""
-echo "💡 Parquet files are ready in data/parquet/"
-echo " To setup DuckDB views, run: ./scripts/setup_views.sh"
-echo ""