chore: remove 17 dead files from v1 architecture

Removes unused scripts (collect_session, generate_user_sync_configs,
standalone_profiler, remote_query, update, setup_views, test_sync,
activate_venv, backfill_gap, sync_config_template), legacy config
(data_description.md.example), llms.txt, completed planning docs
(plan-rsync-fix, plan_parquet_types_fix, plan-corporate-memory), and
notification examples/ directory.
This commit is contained in:
ZdenekSrotyr 2026-04-09 17:14:06 +02:00
parent 988cdb4320
commit f3bd378b47
18 changed files with 0 additions and 3143 deletions

View file

@ -1,164 +0,0 @@
# Data Description
This file defines the tables available for synchronization and analysis.
Copy this file to `data_description.md` and customize for your data sources.
## Tables
```yaml
# Folder mapping: data source bucket -> local folder name
folder_mapping:
"in.c-example": "example"
tables:
# Small reference table - full refresh, no automatic sync
- id: "in.c-example.customers"
name: "customers"
description: "Customer master data"
primary_key: "id"
sync_strategy: "full_refresh"
# Large transactional table - daily automatic sync with profiling
- id: "in.c-example.orders"
name: "orders"
description: "Order transactions with line items"
primary_key: "id"
sync_strategy: "partitioned"
partition_by: "created_at"
partition_column_type: "DATE"
partition_granularity: "day"
incremental_window_days: 3
max_history_days: 450
query_mode: "local"
sync_schedule: "daily 05:00"
profile_after_sync: true
# Frequently updated table - sync every hour, skip profiling
- id: "in.c-example.events"
name: "events"
description: "Real-time event stream"
primary_key: "event_id"
sync_strategy: "partitioned"
partition_by: "event_date"
partition_column_type: "DATE"
partition_granularity: "day"
incremental_window_days: 1
query_mode: "local"
sync_schedule: "every 1h"
profile_after_sync: false
# Remote table - too large for local sync, queried via BigQuery
- id: "project.dataset.page_views"
name: "page_views"
description: "Page-level traffic metrics (~5M rows/day)"
primary_key: "page_view_id"
sync_strategy: "partitioned"
partition_by: "event_date"
partition_column_type: "DATE"
partition_granularity: "day"
query_mode: "remote"
bq_entity_type: "view"
```
## Table Configuration Reference
### Required Fields
| Field | Description | Example |
|-------|-------------|---------|
| `id` | Full table identifier in data source | `"in.c-crm.company"` |
| `name` | Short name (used for Parquet filenames) | `"company"` |
| `description` | Human-readable description | `"Company master data"` |
| `primary_key` | Primary key column(s), comma-separated | `"id"` or `"order_id, line_id"` |
| `sync_strategy` | How data is downloaded (see below) | `"full_refresh"` |
### Sync Strategy
| Strategy | Description | Use for |
|----------|-------------|---------|
| `full_refresh` | Downloads entire table each sync | Small reference tables (< 100K rows) |
| `incremental` | Downloads changed rows via changedSince | Medium tables with update tracking |
| `partitioned` | Downloads by time partitions, overwrites only recent ones | Large tables with date column |
### Partitioning
| Field | Default | Description |
|-------|---------|-------------|
| `partition_by` | *(none)* | Column to partition by (e.g., `"created_at"`, `"event_date"`) |
| `partition_granularity` | `"month"` | `"day"`, `"month"`, or `"year"` |
| `partition_column_type` | `"TIMESTAMP"` | SQL type: `"DATE"`, `"TIMESTAMP"`, or `"DATETIME"` |
| `incremental_window_days` | `7` | How many recent days to re-download on each sync |
| `max_history_days` | *(all)* | Maximum history to keep (e.g., `450` for ~15 months) |
| `initial_load_chunk_days` | `30` | Chunk size for first-time download |
### Query Mode
| Field | Default | Description |
|-------|---------|-------------|
| `query_mode` | `"local"` | How the AI agent queries this table |
| `bq_entity_type` | `"view"` | BigQuery entity type: `"view"` (Python BQ client) or `"table"` (DuckDB BQ extension) |
| Mode | Description | Best for |
|------|-------------|----------|
| `local` | Synced to Parquet, queried via DuckDB | Tables < 2 GB, fast queries |
| `remote` | Not synced, queried via BigQuery | Huge tables (100+ GB), live data |
| `hybrid` | Subset synced for profiling, queries go to BigQuery | Medium tables needing live data |
### Remote Table Metadata
Remote tables (`query_mode: "remote"`) should include metadata to help the AI agent
write safe, efficient queries:
| Field | Description |
|-------|-------------|
| `grain` | Row granularity description |
| `volume` | Size estimates (rows_per_day, unique entities) |
| `columns` | Column descriptions with value distributions |
| `dimension_profile` | Cardinality per dimension |
| `query_result_estimates` | Expected rows after GROUP BY combinations |
| `join_keys` | How to join with other tables |
### Automatic Sync Schedule
| Field | Default | Description |
|-------|---------|-------------|
| `sync_schedule` | *(none)* | When to automatically sync this table |
| `profile_after_sync` | `true` | Run data profiler after sync completes |
The `sync_schedule` field controls automatic synchronization via the `data-refresh`
systemd timer (runs every 15 minutes). If omitted, the table is only synced manually.
**Schedule formats:**
| Format | Example | Description |
|--------|---------|-------------|
| `every {N}m` | `"every 15m"`, `"every 30m"` | Sync every N minutes |
| `every {N}h` | `"every 1h"`, `"every 6h"` | Sync every N hours |
| `daily HH:MM` | `"daily 05:00"`, `"daily 17:30"` | Sync once per day at HH:MM UTC |
| *(omitted)* | - | Manual sync only (`python -m src.data_sync`) |
**How scheduling works:**
- A systemd timer runs `python -m src.data_sync --scheduled` every 15 minutes
- For each table with `sync_schedule`, it checks the last sync time from `sync_state.json`
- `every` schedules: syncs if enough time has elapsed since last sync
- `daily` schedules: syncs once after the target time passes (skips if already synced today)
- Tables without `sync_schedule` are never synced automatically
**Profiling control:**
- `profile_after_sync: true` (default) - runs profiler after sync to update column statistics
- `profile_after_sync: false` - skips profiler (use for frequently synced tables where
profiling overhead is not worth it; the AI agent uses slightly older statistics)
- When profiling runs, the webapp is automatically restarted to load new statistics
### Optional Fields
| Field | Default | Description |
|-------|---------|-------------|
| `folder` | *(from folder_mapping)* | Override output folder name |
| `row_filter` | *(none)* | SQL WHERE clause (e.g., `"date >= DATE_SUB(CURRENT_DATE(), INTERVAL 15 MONTH)"`) |
| `columns` | *(all)* | List of columns to sync (subset) |
| `incremental_column` | *(none)* | Column for timestamp-based incremental sync (BigQuery) |
| `dataset` | *(none)* | Dataset group name for on-demand tables |
| `catalog_fqn` | *(auto)* | OpenMetadata FQN override (auto-derived from table ID if not set) |
| `foreign_keys` | `[]` | List of foreign key relationships |
| `where_filters` | `[]` | List of filters for Keboola Storage API |

View file

@ -1,348 +0,0 @@
# Corporate Memory Module - Implementation Plan
## Overview
Server-side modul, který každých 30 minut sbírá znalosti z `CLAUDE.local.md` všech uživatelů, pomocí Claude HAIKU je extrahuje a filtruje, a vytváří sdílenou firemní knowledge base. Uživatelé mohou hlasovat a oblíbené znalosti se jim syncují do `.claude/rules/`.
## Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│ Cron (*/30 * * * *) │
│ └── /usr/local/bin/collect-knowledge │
│ 1. Čte /home/*/CLAUDE.local.md │
│ 2. Volá Claude HAIKU pro extrakci (AI filtering) │
│ 3. Ukládá do /data/corporate-memory/knowledge.json │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ /data/corporate-memory/ (deploy:data-ops 2770)│
│ ├── knowledge.json # {items: {id: {title, content, ...}}} │
│ ├── votes.json # {username: {item_id: 1|-1}} │
│ ├── user_hashes.json # {username: {file_hash, last_proc}} │
│ └── collection.log # Logy z HAIKU procesingu │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ Webapp │
│ ├── Dashboard widget: stats (contributors, items, your rules) │
│ ├── /corporate-memory: sub-page s 👍/👎 hlasováním │
│ └── API: /api/corporate-memory/* │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ sync_data.sh │
│ └── Stáhne upvoted items do .claude/rules/*.md │
└─────────────────────────────────────────────────────────────────┘
```
## Data Model
**knowledge.json:**
```json
{
"items": {
"km_abc123": {
"id": "km_abc123",
"title": "DuckDB query optimization",
"content": "When querying large parquet files...",
"category": "data_analysis",
"tags": ["duckdb", "performance"],
"source_users": ["john.doe", "jane.smith", "bob"],
"votes_up": 5,
"votes_down": 1,
"score": 4,
"extracted_at": "2026-02-05T10:30:00Z",
"updated_at": "2026-02-05T12:00:00Z"
}
},
"metadata": {
"last_collection": "2026-02-05T10:30:00Z",
"total_users": 8
}
}
```
**votes.json** (kdo jak hlasoval - pro UI "already voted"):
```json
{
"john.doe": {"km_abc123": 1, "km_def456": -1},
"jane.smith": {"km_abc123": 1}
}
```
### Voting & Popularity
- `votes_up` / `votes_down` - počet palců nahoru/dolů (viditelné v UI)
- `score` = votes_up - votes_down (pro řazení)
- votes.json trackuje KDO hlasoval (pro zabránění dvojího hlasování)
- UI NEZOBRAZUJE jména hlasujících, jen počty
**Řazení v UI:**
- **Nejoblíbenější** (Most Popular): ORDER BY score DESC
- **Nejméně oblíbené** (Least Popular): ORDER BY score ASC
- **Nejnovější** (Recent): ORDER BY extracted_at DESC
- **Nejvíc přispěvatelů** (Most Contributors): ORDER BY len(source_users) DESC
**user_hashes.json** (tracking změn CLAUDE.local.md):
```json
{
"john.doe": {
"file_hash": "a1b2c3d4e5f6...",
"last_processed": "2026-02-05T10:30:00Z"
},
"jane.smith": {
"file_hash": "f6e5d4c3b2a1...",
"last_processed": "2026-02-05T09:00:00Z"
}
}
```
## Change Detection & Deduplication
### Change Detection (šetření tokenů)
**DŮLEŽITÉ**: Každých 30 minut se zpracovávají POUZE změněné soubory!
```
┌─────────────────────────────────────────────────────────────────┐
│ Cron běží každých 30 min │
│ │
│ Pro každého uživatele: │
│ 1. Spočítej MD5 hash /home/$user/CLAUDE.local.md │
│ 2. Porovnej s uloženým hashem v user_hashes.json │
│ 3. Pokud STEJNÝ → PŘESKOČ (žádné API volání) │
│ 4. Pokud ZMĚNĚNÝ → zpracuj pomocí HAIKU API │
│ │
│ Typicky: 8 uživatelů, 1-2 změny denně = 2-4 API volání/den │
└─────────────────────────────────────────────────────────────────┘
```
```python
def collect_all():
users_processed = 0
users_skipped = 0
for user_dir in Path("/home").iterdir():
claude_file = user_dir / "CLAUDE.local.md"
if not claude_file.exists():
continue
username = user_dir.name
current_hash = hashlib.md5(claude_file.read_bytes()).hexdigest()
stored_hash = user_hashes.get(username, {}).get("file_hash")
if current_hash == stored_hash:
users_skipped += 1
continue # ← PŘESKOČÍ - žádné API volání!
# Pouze změněné soubory volají HAIKU
new_knowledge = extract_knowledge(claude_file.read_text())
deduplicate_and_merge(new_knowledge, username)
update_user_hash(username, current_hash)
users_processed += 1
log(f"Processed: {users_processed}, Skipped: {users_skipped}")
```
### Deduplikace znalostí
Při extrakci nových znalostí od změněného uživatele:
1. HAIKU vrátí seznam znalostí
2. Pro každou novou znalost:
- Hledej podobnou v CELÉ knowledge base (všichni uživatelé)
- Pokud existuje podobná → přidej uživatele do `source_users[]`
- Pokud neexistuje → vytvoř novou
```python
def deduplicate_and_merge(new_items: list, username: str):
"""Deduplikuje nové znalosti proti celé knowledge base."""
for new_item in new_items:
similar_id = find_similar_knowledge(new_item, knowledge_base)
if similar_id:
# Existující znalost - přidej uživatele
knowledge_base["items"][similar_id]["source_users"].append(username)
knowledge_base["items"][similar_id]["updated_at"] = now()
else:
# Nová znalost
item_id = generate_id()
knowledge_base["items"][item_id] = {
**new_item,
"id": item_id,
"source_users": [username],
"extracted_at": now()
}
```
### Příklad deduplikace
5 uživatelů má podobnou znalost o DuckDB:
- User A: "Pro rychlé dotazy v DuckDB použij WHERE před JOIN"
- User B: "DuckDB je rychlejší když filtruješ před joinem"
- User C: "Filtruj data před JOIN operací v DuckDB"
- User D: "WHERE klauzule před JOIN zrychlí DuckDB"
- User E: "V DuckDB dej WHERE před JOIN"
→ HAIKU rozpozná jako JEDNU znalost → `source_users: ["user_a", "user_b", "user_c", "user_d", "user_e"]`
## Files to Create
### 1. Server-side Collector
**`services/corporate_memory/collector.py`** - Main collection logic:
- `collect_all()` - iteruje přes /home/*/CLAUDE.local.md
- `should_process_user(username)` - kontrola hash změn (šetří tokeny)
- `extract_knowledge(content)` - volá HAIKU API
- `find_similar_knowledge(new, existing)` - hledá duplicity
- `merge_knowledge(existing, new, username)` - sloučí a přidá uživatele
- `save_knowledge(data)` - atomic JSON write
- `update_user_hash(username, hash)` - uloží hash pro příští run
**`services/corporate_memory/prompts.py`** - HAIKU prompts:
- Prompt pro extrakci znalostí
- Prompt pro AI filtering citlivých dat
- Prompt pro merge podobných znalostí (optional)
**`server/bin/collect-knowledge`** - Shell wrapper:
```bash
#!/bin/bash
cd /opt/data-analyst/repo
/opt/data-analyst/.venv/bin/python -m services.corporate_memory
```
**`services/corporate_memory/systemd/corporate-memory.timer`** + **`services/corporate_memory/systemd/corporate-memory.service`** - Systemd timer (30 min)
### 2. Webapp Backend
**`webapp/corporate_memory_service.py`** (pattern: telegram_service.py):
- `get_knowledge(filter, page)` - seznam znalostí
- `get_stats()` - pro dashboard widget
- `vote(username, item_id, vote)` - +1/-1
- `get_user_rules(username)` - upvoted items pro sync
- `generate_user_rules_file(username)` - vytvoří JSON pro sync
**`webapp/app.py`** - nové routes:
- `GET /corporate-memory` - sub-page
- `GET /api/corporate-memory/knowledge` - list items
- `GET /api/corporate-memory/stats` - dashboard stats
- `POST /api/corporate-memory/vote` - hlasování
- `GET /api/corporate-memory/my-rules` - user's upvoted
### 3. Webapp Frontend
**`webapp/templates/corporate_memory.html`** - sub-page:
- Header se stats
- Řazení: Most Popular | Least Popular | Recent | Most Contributors
- Seznam znalostí s filtry (category, search)
- Každá položka:
- Title + content preview
- Tags (category badges)
- 👍 (count) / 👎 (count) buttons
- "From X contributors" badge
- "Synced to your rules" indicator (if user upvoted)
```
┌─────────────────────────────────────────────────────────────────┐
│ DuckDB query optimization [data_analysis] │
│ When querying large parquet files, filter before JOIN... │
│ │
│ 👍 5 👎 1 | From 3 contributors | ✓ In your rules │
└─────────────────────────────────────────────────────────────────┘
```
**`webapp/templates/dashboard.html`** - přidat widget:
```html
<div class="card-v2 memory-card">
<h3>Corporate Memory</h3>
<div class="memory-stats">
<div class="stat">{{ stats.contributors }} contributors</div>
<div class="stat">{{ stats.knowledge_count }} knowledge items</div>
<div class="stat">{{ stats.your_rules }} your rules</div>
</div>
<a href="/corporate-memory">Browse Knowledge →</a>
</div>
```
### 4. Client Sync
Server generuje .md soubory přímo do `/home/$user/.claude_rules/` pro každého uživatele.
Klient si je jen stáhne.
**`scripts/sync_data.sh`** - přidat:
```bash
# --- Sync corporate memory rules ---
echo "📚 Syncing corporate memory rules..."
mkdir -p .claude/rules
rsync -avz data-analyst:~/.claude_rules/ .claude/rules/ 2>/dev/null || \
scp -r data-analyst:~/.claude_rules/* .claude/rules/ 2>/dev/null || true
```
**Server-side** (v `corporate_memory_service.py`):
- `generate_user_rules(username)` - při hlasování regeneruje .md soubory
- Zapisuje do `/home/$user/.claude_rules/km_*.md`
- Webapp volá po každém vote změně
### 5. Deployment
**`server/deploy.sh`** - additions:
- Vytvořit /data/corporate-memory/ (2770 setgid)
- Nainstalovat collect-knowledge do /usr/local/bin/
- Deploy systemd timer
- Přidat ANTHROPIC_API_KEY do .env
**`server/sudoers-deploy`** - přidat práva pro /data/corporate-memory/
**GitHub Secrets**: `ANTHROPIC_API_KEY`
## Implementation Phases
### Phase 1: Server Infrastructure
1. `services/corporate_memory/` Python modul
2. `server/bin/collect-knowledge` wrapper
3. `services/corporate_memory/systemd/corporate-memory.service` + `.timer`
4. Update `server/deploy.sh`
5. Update sudoers
### Phase 2: Webapp Backend
1. `webapp/corporate_memory_service.py`
2. API endpoints v `webapp/app.py`
3. Update `webapp/config.py` (ANTHROPIC_API_KEY, paths)
### Phase 3: Webapp Frontend
1. Dashboard widget v `dashboard.html`
2. Sub-page `corporate_memory.html`
3. CSS styles
### Phase 4: Client Sync
1. Update `scripts/sync_data.sh` - přidat rsync pro `.claude_rules/`
## Key Files to Modify
| File | Changes |
|------|---------|
| `server/deploy.sh` | Add /data/corporate-memory/, ANTHROPIC_API_KEY, systemd timer |
| `server/sudoers-deploy` | Add permissions for corporate-memory dir |
| `webapp/app.py` | Add routes and API endpoints |
| `webapp/config.py` | Add ANTHROPIC_API_KEY, CORPORATE_MEMORY_DIR |
| `webapp/templates/dashboard.html` | Add Corporate Memory widget |
| `scripts/sync_data.sh` | Add corporate rules sync step |
## Reusable Patterns (from codebase)
- **JSON I/O**: `webapp/telegram_service.py` - atomic writes with tempfile
- **Dashboard widget**: `webapp/templates/dashboard.html` - KPI card pattern
- **Sub-page**: `webapp/templates/catalog.html` - route + template pattern
- **Systemd service**: `services/telegram_bot/systemd/notify-bot.service` - timer pattern
- **Deploy**: `server/deploy.sh` - directory setup, script installation
## Verification
1. **Collector**: `ssh kids "/usr/local/bin/collect-knowledge --dry-run"`
2. **API**: `curl https://your-instance.example.com/api/corporate-memory/stats`
3. **Widget**: Check dashboard shows stats
4. **Voting**: Click 👍/👎, verify votes.json updated
5. **Sync**: Run `sync_data.sh`, check `.claude/rules/` has .md files

View file

@ -1,137 +0,0 @@
# Plan: Fix rsync synchronization hang (#197)
## Problem
Rsync from GCP server (YOUR_SERVER_IP) hangs after 1-5 minutes. Process exists but has 0% CPU and no network activity. 100% reproducible with ~7000 parquet files.
## Root Cause Analysis
The rsync commands in `scripts/sync_data.sh` use plain `rsync -avz` without:
1. **SSH keepalive** - No `-e "ssh -o ServerAliveInterval=60"` to keep the TCP connection alive through firewall/NAT
2. **I/O timeout** - No `--timeout=N` to detect stalled transfers
3. **Retry logic** - A single connection drop kills the entire sync
4. **Partial resume** - No `--partial-dir` so retries restart file transfers from scratch
5. **Compression skip** - `-z` compresses parquet files that are already snappy-compressed (CPU waste)
A stateful firewall or NAT device (GCP Cloud NAT, VPC firewall, or analyst's home router) drops idle TCP connections. Server-side `ClientAliveInterval=300s` sends keepalive, but the client side sends nothing. The firewall sees idle traffic and silently drops the connection. Neither rsync nor SSH client detects the dead connection.
Note: GCP VPC firewall default idle timeout is 600s (10 min). The 1-5 minute hang suggests a more aggressive timeout on Cloud NAT or analyst's local network.
## Implementation Plan
### Step 1: Create diagnostic tests (`tests/test_sync_data.py`)
Real end-to-end tests that download data to gitignored `data/` folder with detailed logging to pinpoint exactly where and why the hang occurs.
**Live diagnostic tests (require server access, marked `@pytest.mark.live`):**
| Test | Purpose |
|------|---------|
| `test_ssh_connectivity` | Basic SSH connection test with timeout |
| `test_rsync_small_directory` | Sync docs (~few files) to verify baseline works |
| `test_rsync_with_keepalive` | Sync parquet with SSH keepalive options |
| `test_rsync_with_timeout` | Sync parquet with `--timeout=60` to detect hang quickly |
| `test_rsync_per_subdirectory` | Sync each parquet subdir separately to isolate which triggers the hang |
| `test_rsync_without_compression` | Sync parquet without `-z` to test if compression causes stall |
Each test logs to `data/sync_diagnostics/` with timestamps, exit codes, bytes transferred, duration.
**Static regression tests (run in CI without server access):**
| Test | Purpose |
|------|---------|
| `test_rsync_commands_use_ssh_keepalive` | Every rsync in scripts has `ServerAliveInterval` |
| `test_rsync_commands_have_timeout` | Every rsync has `--timeout=N` |
| `test_parquet_rsync_does_not_compress` | Parquet rsync uses `-av` not `-avz` |
| `test_sync_scripts_have_retry_logic` | Scripts define retry wrapper |
### Step 2: Run diagnostic tests and analyze results
Before implementing any fix, run the live diagnostic tests to confirm the root cause:
```bash
pytest tests/test_sync_data.py -v -k "live" --tb=short 2>&1 | tee data/sync_diagnostics/test_run.log
```
**Expected outcomes to validate:**
- `test_ssh_connectivity` passes → SSH connection itself is fine
- `test_rsync_small_directory` passes → small syncs work, problem is scale/duration-related
- `test_rsync_with_timeout` fails with timeout → confirms connection drop (rsync exits instead of hanging forever)
- `test_rsync_with_keepalive` passes → confirms keepalive fixes the issue
- `test_rsync_per_subdirectory` → identifies if specific directory triggers the hang or if it's purely time-based
- `test_rsync_without_compression` → reveals if `-z` contributes to the stall
**Decision gate:** Based on results, confirm which combination of fixes is needed before proceeding to Step 3. If keepalive alone fixes it, the other changes are still applied as defense-in-depth.
### Step 3: Fix `scripts/sync_data.sh`
Add reliability wrapper after argument parsing:
```bash
# --- Rsync reliability settings (Issue #197) ---
RSYNC_SSH_OPTS='ssh -o ServerAliveInterval=60 -o ServerAliveCountMax=3 -o ConnectTimeout=30'
RSYNC_TIMEOUT=300
RSYNC_MAX_RETRIES=3
RSYNC_RETRY_DELAY=5
rsync_reliable() {
local attempt=1
local delay=$RSYNC_RETRY_DELAY
while [[ $attempt -le $RSYNC_MAX_RETRIES ]]; do
if rsync -e "$RSYNC_SSH_OPTS" --timeout="$RSYNC_TIMEOUT" \
--partial-dir=.rsync-partial "$@"; then
return 0
fi
local exit_code=$?
if [[ $attempt -lt $RSYNC_MAX_RETRIES ]]; then
echo " Rsync failed (exit $exit_code), retrying in ${delay}s ($attempt/$RSYNC_MAX_RETRIES)..."
sleep "$delay"
delay=$((delay * 2))
fi
attempt=$((attempt + 1))
done
echo " ERROR: Rsync failed after $RSYNC_MAX_RETRIES attempts"
return 1
}
```
Replace all rsync invocations to use `rsync_reliable()` wrapper:
- Drop `-z` for parquet transfers (already snappy-compressed)
- Keep `-z` for text transfers (docs, scripts, metadata)
- `--partial-dir=.rsync-partial` enables resume on retry (Gemini review recommendation)
### Step 4: Fix `scripts/sync_jira.sh`
Same `rsync_reliable()` wrapper and fixes.
### Step 5: Update CI
Add `tests/test_sync_data.py` to `.github/workflows/deploy-guard.yml` (static regression tests only, live diagnostics excluded via marker).
### Step 6: Verify fix end-to-end
Run full sync and confirm no hanging:
```bash
bash scripts/sync_data.sh --dry-run # script syntax OK
bash scripts/sync_data.sh # full sync completes
```
## Files to Modify
| File | Action | Description |
|------|--------|-------------|
| `tests/test_sync_data.py` | CREATE | Diagnostic + regression tests |
| `scripts/sync_data.sh` | EDIT | Add reliability wrapper, fix all rsync calls |
| `scripts/sync_jira.sh` | EDIT | Same fixes |
| `.github/workflows/deploy-guard.yml` | EDIT | Add static tests to CI |
## Review Notes
Plan reviewed by Google Gemini (2026-02-17). Key feedback incorporated:
- **`--partial-dir=.rsync-partial`** added to wrapper for efficient retry resume
- **Root cause confirmed** as most likely firewall/NAT idle timeout (textbook signature)
- **`ServerAliveInterval=60`** confirmed as correct value (low overhead, safe interval)
- **`--timeout=300`** confirmed as reasonable (long enough to avoid false positives)
- **Drop `-z`** confirmed correct (compressing snappy data wastes CPU)
- **Future optimization**: `--files-from` approach for large file sets if needed

View file

@ -1,139 +0,0 @@
# Plan: Fix Parquet Type Preservation & DuckDB Schema Mismatch
## Context
Issues #185, #186, #187 report that DuckDB views fail for partitioned tables when parquet files have schema mismatches. The root cause is twofold:
1. **DuckDB** reads partitioned parquet files without `union_by_name=true`, so if a column is `null` type in one partition and `VARCHAR` in another, it crashes.
2. **Parquet writing** uses `pa.Table.from_pandas()` without explicit schema — columns with all NULLs get inferred as `null` type instead of proper type (STRING, DATE, etc.).
**Note:** #187 is a duplicate of #186.
## Changes
### 1. DuckDB fix — `scripts/duckdb_manager.py` (line 235)
Add `union_by_name=true` to `read_parquet()` for partitioned views:
```python
# Before:
sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{glob_pattern}')"
# After:
sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{glob_pattern}', union_by_name=true)"
```
This is an immediate fix — existing partition files with `null`-typed columns will be readable without re-syncing.
### 2. PyArrow type mapping — `src/keboola_client.py` (after line 48)
Add `KEBOOLA_TO_PYARROW_TYPES` dict alongside existing `KEBOOLA_TO_PANDAS_TYPES`:
```python
KEBOOLA_TO_PYARROW_TYPES = {
"STRING": pa.string(), "VARCHAR": pa.string(), "TEXT": pa.string(),
"INTEGER": pa.int64(), "BIGINT": pa.int64(),
"NUMERIC": pa.float64(), "DECIMAL": pa.float64(), "FLOAT": pa.float64(), "DOUBLE": pa.float64(),
"BOOLEAN": pa.bool_(),
"DATE": pa.date32(),
"TIMESTAMP": pa.timestamp("us"), "TIMESTAMP_NTZ": pa.timestamp("us"), "TIMESTAMP_TZ": pa.timestamp("us", tz="UTC"),
}
```
### 3. Refactor provider cascade + new `get_pyarrow_schema()``src/keboola_client.py`
**3a.** Extract duplicated provider cascade logic from `get_pandas_dtypes()`, `get_date_columns()` into a shared `_resolve_keboola_type(col_meta_list) -> str` helper. This logic is repeated 3x — the new method would be the 4th copy without refactoring.
**3b.** Add `get_pyarrow_schema(table_id) -> Optional[pa.Schema]` method to `KeboolaClient`:
- Builds `pa.Schema` from Keboola metadata using `_resolve_keboola_type()` + `KEBOOLA_TO_PYARROW_TYPES`
- Returns `None` with WARNING log if metadata unavailable (graceful fallback)
- Refactor `get_pandas_dtypes()` and `get_date_columns()` to also use `_resolve_keboola_type()`
### 4. New helper `apply_schema_to_table()``src/parquet_manager.py` (after line 103)
Function that casts a `pa.Table` to match a target `pa.Schema`:
- Matches columns by name (not position)
- For `null`-type columns: creates typed null array via `pa.nulls(len, type=target_type)`
- For other mismatches: uses `col.cast(target_type, safe=True)` — if cast fails, keeps original type and logs warning (no data is changed or lost)
- Columns not in schema keep their inferred type
- Logs warnings on cast failures, doesn't crash
### 5. Integrate schema into all parquet write paths
Add `pyarrow_schema` parameter to these methods and apply `apply_schema_to_table()` after `pa.Table.from_pandas()` and `convert_date_columns_to_date32()`:
| Method | File | Line | Current issue |
|--------|------|------|---------------|
| `csv_to_parquet()` | `parquet_manager.py` | 274 | No explicit schema |
| `merge_parquet()` | `parquet_manager.py` | 493 | No explicit schema |
| `_process_csv_to_partitions()` | `data_sync.py` | 848, 854 | No explicit schema |
| `_deduplicate_partitions()` | `data_sync.py` | 890 | Uses `df.to_parquet()` — bypasses entire PyArrow pipeline |
**`_deduplicate_partitions()` fix (line 890):** Replace `df.to_parquet()` with full PyArrow pipeline:
```python
table = pa.Table.from_pandas(df, preserve_index=False)
if date_columns:
table = convert_date_columns_to_date32(table, date_columns)
if pyarrow_schema:
table = apply_schema_to_table(table, pyarrow_schema)
pq.write_table(table, partition_path, compression="snappy")
```
### 6. Update all callers to fetch and pass `pyarrow_schema`
Each caller already fetches `dtypes` and `date_columns`. Add one line after them:
```python
pyarrow_schema = self.keboola_client.get_pyarrow_schema(table_config.id)
```
| Caller method | File:Line | Passes to |
|---------------|-----------|-----------|
| `_full_refresh()` | `data_sync.py:318-328` | `csv_to_parquet()` |
| `_incremental_single_file_sync()` | `data_sync.py:455-508` | `merge_parquet()`, `csv_to_parquet()` |
| `_incremental_partitioned_sync()` | `data_sync.py:600-612` | `_process_csv_to_partitions()`, `_deduplicate_partitions()` |
| `_chunked_initial_load()` | `data_sync.py:673-766` | `_process_csv_to_partitions()`, `_deduplicate_partitions()` |
| `_partitioned_sync()` | `data_sync.py:989-1001` | `_process_csv_to_partitions()`, `_deduplicate_partitions()` |
## Tests — `data/tests/test_parquet_types.py` (local only, gitignored)
Test file lives in `data/tests/` which is gitignored (`data/` is in `.gitignore`). Tests are for **local verification only** — they don't go to the server or CI.
All test data is created via pytest's `tmp_path` fixture (auto-cleaned) or in `data/tests/tmp/`. No test artifacts persist after the run.
10 test cases, all local (no Keboola API calls):
**Core tests:**
1. **`test_union_by_name_resolves_schema_mismatch`** — Two parquet files with conflicting types, DuckDB reads with `union_by_name=true`
2. **`test_apply_schema_fixes_null_type_columns`** — `apply_schema_to_table()` converts null-type columns to proper types
3. **`test_parquet_preserves_types_with_all_null_columns`** — End-to-end: CSV with all-NULL column → Parquet → verify correct type
4. **`test_deduplicate_preserves_date32_types`** — After dedup, DATE32 columns retain their type
5. **`test_keboola_to_pyarrow_mapping_completeness`** — Every key in `KEBOOLA_TO_PANDAS_TYPES` exists in `KEBOOLA_TO_PYARROW_TYPES`
6. **`test_get_pyarrow_schema_from_metadata`** — Unit test with mocked Keboola metadata → verify correct schema
**Additional tests (from Gemini review):**
7. **`test_apply_schema_handles_cast_error_gracefully`** — Column with uncastable values (e.g. "abc" → int64) doesn't crash, logs warning, keeps original type
8. **`test_schema_with_extra_csv_column`** — CSV has column not in metadata → column preserved with inferred type
9. **`test_get_pyarrow_schema_handles_all_types`** — All types from `KEBOOLA_TO_PYARROW_TYPES` mapping verified
10. **`test_deduplicate_with_mixed_types_and_nulls`** — Partition with all-null + mixed-data columns → correct schema after dedup
## Files modified
- `scripts/duckdb_manager.py` — 1 line change (union_by_name)
- `src/keboola_client.py` — new `KEBOOLA_TO_PYARROW_TYPES` mapping + `_resolve_keboola_type()` helper + `get_pyarrow_schema()` method + refactor existing methods
- `src/parquet_manager.py` — new `apply_schema_to_table()` function + `pyarrow_schema` param on `csv_to_parquet()` and `merge_parquet()`
- `src/data_sync.py``pyarrow_schema` param on `_process_csv_to_partitions()` and `_deduplicate_partitions()` + update all 5 callers
- `data/tests/test_parquet_types.py` — new file, 10 tests (**gitignored**, local verification only)
## Verification
1. Run `pytest data/tests/test_parquet_types.py -v` — all 10 tests pass
2. Run `pytest tests/` — existing CI tests still pass (no regressions)
3. Existing parquet files don't need re-sync — the `union_by_name=true` fix handles them immediately
4. On next sync cycle, partitions get re-written with correct schema
## Cleanup after verification
After all tests pass and code changes are committed:
```bash
rm -rf data/tests/
```
Test file and all test artifacts live in `data/` (gitignored) — nothing gets committed or deployed to server.

View file

@ -1,62 +0,0 @@
#!/usr/bin/env python3
"""
Example notification: Data freshness alert.
Checks if the local data is stale (older than threshold) and notifies.
Outputs JSON to stdout for notify-runner.
"""
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
# Configuration
DATA_DIR = Path.home() / "server" / "parquet"
STALE_THRESHOLD_HOURS = 24
def check_freshness() -> dict:
"""Check data freshness by examining parquet file modification times."""
if not DATA_DIR.exists():
return {
"notify": True,
"title": "Data directory missing",
"message": f"Data directory not found: {DATA_DIR}\nRun: bash scripts/sync_data.sh",
"cooldown": "6h",
}
# Find newest parquet file
parquet_files = list(DATA_DIR.rglob("*.parquet"))
if not parquet_files:
return {
"notify": True,
"title": "No data files found",
"message": "No parquet files in data directory.\nRun: bash scripts/sync_data.sh",
"cooldown": "6h",
}
newest_mtime = max(f.stat().st_mtime for f in parquet_files)
newest_dt = datetime.fromtimestamp(newest_mtime, tz=timezone.utc)
now = datetime.now(tz=timezone.utc)
age_hours = (now - newest_dt).total_seconds() / 3600
if age_hours > STALE_THRESHOLD_HOURS:
return {
"notify": True,
"title": f"Data is {age_hours:.0f}h old",
"message": (
f"Latest file: {newest_dt:%Y-%m-%d %H:%M} UTC\n"
f"Age: {age_hours:.1f} hours (threshold: {STALE_THRESHOLD_HOURS}h)\n"
f"Run: bash scripts/sync_data.sh"
),
"cooldown": "6h",
}
return {"notify": False}
if __name__ == "__main__":
result = check_freshness()
print(json.dumps(result))

View file

@ -1,125 +0,0 @@
#!/usr/bin/env python3
"""
Example notification: Daily metric report with chart image.
Generates a summary chart using matplotlib and sends it as a Telegram photo.
Outputs JSON to stdout for notify-runner.
"""
import json
import os
import sys
import tempfile
from datetime import datetime
from pathlib import Path
import duckdb
DB_PATH = Path.home() / "user" / "duckdb" / "analytics.duckdb"
def generate_chart(data: list[tuple]) -> str | None:
"""Generate a bar chart and return the file path."""
try:
import matplotlib
matplotlib.use("Agg") # Non-interactive backend
import matplotlib.pyplot as plt
dates = [row[0].strftime("%m/%d") for row in data]
values = [float(row[1]) for row in data]
fig, ax = plt.subplots(figsize=(8, 4))
bars = ax.bar(dates, values, color="#0073D1", width=0.6)
# Highlight today
if len(bars) > 0:
bars[-1].set_color("#EA580C")
ax.set_title("Daily Revenue - Last 7 Days", fontsize=14, fontweight="bold")
ax.set_ylabel("Revenue ($)")
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
plt.xticks(rotation=45, ha="right")
plt.tight_layout()
# Save to temp file
chart_path = os.path.join(
tempfile.gettempdir(),
f"notify_{os.environ.get('USER', 'user')}_metric_{datetime.now():%Y%m%d}.png",
)
plt.savefig(chart_path, dpi=150, bbox_inches="tight")
plt.close()
return chart_path
except ImportError:
print("matplotlib not installed, skipping chart", file=sys.stderr)
return None
except Exception as e:
print(f"Chart generation error: {e}", file=sys.stderr)
return None
def build_report() -> dict:
"""Build daily metric report."""
if not DB_PATH.exists():
return {"notify": False}
try:
conn = duckdb.connect(str(DB_PATH), read_only=True)
# TODO: Adapt this query to your schema.
# DuckDB views use relative paths, so scripts must run from ~/
# (notify-runner sets cwd to home directory automatically).
#
# Example for a table with date + numeric columns:
# SELECT DATE_TRUNC('day', created_at)::DATE AS day,
# SUM(amount) AS revenue
# FROM my_table
# WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
# GROUP BY 1 ORDER BY 1
rows = conn.execute("""
SELECT
DATE_TRUNC('day', created_at)::DATE AS day,
COUNT(*) AS cnt
FROM kbc_project
WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY 1
ORDER BY 1
""").fetchall()
conn.close()
if not rows:
return {"notify": False}
today_val = float(rows[-1][1]) if rows else 0
total_7d = sum(float(r[1]) for r in rows)
chart_path = generate_chart(rows)
result = {
"notify": True,
"title": "Daily Metric Report",
"message": (
f"Today: {today_val:,.0f}\n"
f"7d total: {total_7d:,.0f}\n"
f"7d avg: {total_7d / len(rows):,.0f}"
),
"cooldown": "1d",
}
if chart_path:
result["image_path"] = chart_path
return result
except Exception as e:
print(f"metric_report error: {e}", file=sys.stderr)
return {"notify": False}
if __name__ == "__main__":
result = build_report()
print(json.dumps(result))

View file

@ -1,83 +0,0 @@
#!/usr/bin/env python3
"""
Example notification: Revenue drop alert (text only).
Checks if today's revenue dropped significantly vs 7-day average.
Outputs JSON to stdout for notify-runner.
"""
import json
import sys
from pathlib import Path
import duckdb
# Configuration
DB_PATH = Path.home() / "user" / "duckdb" / "analytics.duckdb"
DROP_THRESHOLD_PERCENT = 20
def check_revenue_drop() -> dict:
"""Query revenue data and check for significant drop."""
if not DB_PATH.exists():
return {"notify": False}
try:
conn = duckdb.connect(str(DB_PATH), read_only=True)
# Example query - adjust table/column names to your schema
result = conn.execute("""
WITH daily AS (
SELECT
DATE_TRUNC('day', created_at) AS day,
SUM(amount) AS revenue
FROM payments
WHERE created_at >= CURRENT_DATE - INTERVAL '8 days'
GROUP BY 1
),
stats AS (
SELECT
(SELECT revenue FROM daily WHERE day = CURRENT_DATE) AS today_rev,
AVG(CASE WHEN day < CURRENT_DATE AND day >= CURRENT_DATE - INTERVAL '7 days'
THEN revenue END) AS avg_7d
FROM daily
)
SELECT today_rev, avg_7d,
ROUND((1 - today_rev / NULLIF(avg_7d, 0)) * 100, 1) AS drop_pct
FROM stats
""").fetchone()
conn.close()
if result is None or result[0] is None or result[1] is None:
return {"notify": False}
today_rev, avg_7d, drop_pct = result
if drop_pct >= DROP_THRESHOLD_PERCENT:
return {
"notify": True,
"title": f"Revenue dropped {drop_pct}%",
"message": (
f"Today: ${today_rev:,.0f}\n"
f"7d avg: ${avg_7d:,.0f}\n"
f"Drop: {drop_pct}%"
),
"cooldown": "6h",
"data": {
"today_revenue": float(today_rev),
"avg_7d_revenue": float(avg_7d),
"drop_percent": float(drop_pct),
},
}
return {"notify": False}
except Exception as e:
print(f"revenue_drop error: {e}", file=sys.stderr)
return {"notify": False}
if __name__ == "__main__":
result = check_revenue_drop()
print(json.dumps(result))

View file

@ -1,47 +0,0 @@
# AI Data Analyst
> A data distribution platform for AI analytical systems. Syncs data from configured sources (Keboola, CSV, BigQuery), converts to Parquet, and distributes to analysts who query locally using Claude Code and DuckDB.
## Key Files
- [README](README.md): Project overview, quick start, and feature list
- [CLAUDE.md](CLAUDE.md): Claude Code project context and development instructions
- [ARCHITECTURE.md](ARCHITECTURE.md): System architecture, data flow, and component overview
## Documentation
- [Quick Start](docs/QUICKSTART.md): End-to-end setup for new deployments
- [Configuration](docs/CONFIGURATION.md): All instance.yaml options explained
- [Deployment](docs/DEPLOYMENT.md): Server provisioning and deployment guide
- [Data Sources](docs/DATA_SOURCES.md): Adapter system and data source configuration
## Core Source Code
- [src/data_sync.py](src/data_sync.py): Data sync orchestration and DataSource ABC
- [src/adapters/](src/adapters/): Pluggable data source adapters (Keboola, CSV)
- [src/parquet_manager.py](src/parquet_manager.py): CSV to Parquet conversion engine
- [src/config.py](src/config.py): Runtime configuration from data_description.md
- [config/loader.py](config/loader.py): Instance config loader with ${ENV_VAR} interpolation
## Web Application
- [webapp/app.py](webapp/app.py): Flask application entry point
- [webapp/config.py](webapp/config.py): Webapp configuration from instance.yaml
- [webapp/account_service.py](webapp/account_service.py): User account and sync management
## Server Operations
- [server/deploy.sh](server/deploy.sh): Deployment script
- [server/setup.sh](server/setup.sh): Initial server provisioning
- [server/webapp-setup.sh](server/webapp-setup.sh): Web application setup (Nginx, SSL, systemd)
- [config/instance.yaml.example](config/instance.yaml.example): Configuration template
## Configuration
Instance config: `config/instance.yaml` (YAML with `${ENV_VAR}` references for secrets).
Environment variables: `.env` file (never committed). See `config/.env.template`.
Data schema: `docs/data_description.md` (YAML blocks in markdown).
## Architecture Summary
Data Source -> Source Adapter -> Parquet files on server -> rsync over SSH -> Analyst machine -> DuckDB views -> Claude Code queries

View file

@ -1,28 +0,0 @@
#!/bin/bash
# Helper script to activate virtual environment
# Usage: source scripts/activate_venv.sh
# Detect project root (should be run from project directory)
if [ ! -d ".venv" ]; then
echo "❌ Virtual environment not found. Are you in the project directory?"
echo " Expected: A directory with .venv/ folder"
return 1 2>/dev/null || exit 1
fi
# Activate based on platform
if [ -f ".venv/bin/activate" ]; then
# Unix/macOS
source .venv/bin/activate
echo "✅ Virtual environment activated (Unix)"
elif [ -f ".venv/Scripts/activate" ]; then
# Windows (Git Bash)
source .venv/Scripts/activate
echo "✅ Virtual environment activated (Windows)"
else
echo "❌ Could not find activation script in .venv/"
return 1 2>/dev/null || exit 1
fi
# Show Python version
echo " Python: $(python --version 2>&1)"
echo " Location: $(which python)"

View file

@ -1,109 +0,0 @@
#!/bin/bash
# Backfill missing Jira issues from GitHub issue #101
# Range: SUPPORT-15166 to SUPPORT-15243 (71 missing of 78 total)
# Safe to run while webhook processing is active.
#
# Usage:
# ssh kids
# cd /opt/data-analyst/repo
# source /opt/data-analyst/.venv/bin/activate
# bash scripts/backfill_gap.sh [--dry-run]
set -euo pipefail
REPO_DIR="/opt/data-analyst/repo"
VENV_DIR="/opt/data-analyst/.venv"
RAW_DIR="/data/src_data/raw/jira"
PARQUET_DIR="/data/src_data/parquet/jira"
LOG_FILE="/opt/data-analyst/logs/backfill_gap.log"
JIRA_PROJECT="${JIRA_PROJECT:-}"
if [ -n "$JIRA_PROJECT" ]; then
JQL="project = \"${JIRA_PROJECT}\" AND key >= SUPPORT-15166 AND key <= SUPPORT-15243"
else
JQL='key >= SUPPORT-15166 AND key <= SUPPORT-15243'
fi
RANGE_START=15166
RANGE_END=15243
DRY_RUN=false
# Parse args
if [[ "${1:-}" == "--dry-run" ]]; then
DRY_RUN=true
fi
# Ensure log directory exists
mkdir -p "$(dirname "$LOG_FILE")"
# Log to both stdout and file
exec > >(tee -a "$LOG_FILE") 2>&1
echo "=== Backfill started: $(date -u +%Y-%m-%dT%H:%M:%SZ) ==="
cd "$REPO_DIR"
# --- Phase 1: Download raw JSON ---
echo ""
echo "--- Phase 1: Download raw JSON ---"
if $DRY_RUN; then
python -m connectors.jira.scripts.backfill --jql "$JQL" --dry-run
echo "Dry run complete. Exiting."
exit 0
fi
python -m connectors.jira.scripts.backfill --jql "$JQL" --skip-existing --parallel 4
# --- Phase 2: Incremental Parquet transform ---
echo ""
echo "--- Phase 2: Incremental Parquet transform ---"
success=0
skipped=0
failed=0
for issue_num in $(seq $RANGE_START $RANGE_END); do
issue_key="SUPPORT-${issue_num}"
json_file="${RAW_DIR}/issues/${issue_key}.json"
if [ ! -f "$json_file" ]; then
echo "SKIP: $issue_key (no JSON)"
skipped=$((skipped + 1))
continue
fi
echo -n "Transform $issue_key... "
if python -m src.incremental_jira_transform "$issue_key" 2>&1 | tail -1; then
success=$((success + 1))
else
echo "FAILED: $issue_key"
failed=$((failed + 1))
fi
sleep 0.5 # reduce collision window with live webhooks
done
echo ""
echo "Transform complete: $success ok, $skipped skipped, $failed failed"
# --- Phase 3: Verification ---
echo ""
echo "--- Phase 3: Verification ---"
python -c "
import pyarrow.parquet as pq
from pathlib import Path
parquet_dir = Path('$PARQUET_DIR/issues')
all_keys = set()
for pf in parquet_dir.glob('*.parquet'):
table = pq.read_table(pf, columns=['issue_key'])
all_keys.update(table.column('issue_key').to_pylist())
expected = {f'SUPPORT-{n}' for n in range($RANGE_START, $RANGE_END + 1)}
found = expected & all_keys
missing = expected - all_keys
print(f'Found: {len(found)}/{len(expected)} issues in Parquet')
if missing:
print(f'STILL MISSING ({len(missing)}): {sorted(missing)}')
else:
print('SUCCESS: All issues present in Parquet')
"
echo ""
echo "=== Backfill finished: $(date -u +%Y-%m-%dT%H:%M:%SZ) ==="

View file

@ -1,69 +0,0 @@
#!/usr/bin/env python3
"""Collect Claude Code session transcript to user/sessions/.
This script is invoked by Claude Code's SessionEnd hook.
It reads JSON from stdin containing session_id, transcript_path, and cwd,
then copies the transcript JSONL file to user/sessions/ with a date prefix.
Design principles:
- Stdlib only (no external dependencies)
- Must NEVER crash or produce non-zero exit - Claude Code expects clean exit
- Uses shutil.copy2 (not move) - Claude Code still references the transcript
- UTC date for consistency across timezones
"""
import json
import shutil
import sys
from datetime import datetime, timezone
from pathlib import Path
def main() -> None:
try:
raw = sys.stdin.read()
if not raw.strip():
return
data = json.loads(raw)
session_id = data.get("session_id", "")
transcript_path = data.get("transcript_path", "")
cwd = data.get("cwd", "")
if not transcript_path or not session_id:
return
source = Path(transcript_path)
if not source.exists() or not source.is_file():
return
# Determine target directory: cwd/user/sessions/
if not cwd:
return
target_dir = Path(cwd) / "user" / "sessions"
# Only collect if we're inside a project that has user/ directory
user_dir = Path(cwd) / "user"
if not user_dir.is_dir():
return
target_dir.mkdir(parents=True, exist_ok=True)
# Format: YYYY-MM-DD_{full_session_id}.jsonl
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
target_file = target_dir / f"{date_str}_{session_id}.jsonl"
# Skip if already collected (check by session_id suffix to handle date changes)
for existing in target_dir.glob(f"*_{session_id}.jsonl"):
return
shutil.copy2(str(source), str(target_file))
except Exception:
# Must never crash - Claude Code depends on clean exit
pass
if __name__ == "__main__":
main()

View file

@ -1,162 +0,0 @@
#!/usr/bin/env python3
"""
Generate per-user sync config files from central sync_settings.json.
This script reads /data/notifications/sync_settings.json and writes
~/.sync_settings.yaml for each user with their configured settings.
Usage:
python3 generate_user_sync_configs.py [--dry-run]
Run this script:
- After manual changes to sync_settings.json
- As a cron job to ensure configs stay in sync
- The webapp calls this automatically after settings changes
"""
import argparse
import json
import os
import pwd
import sys
from pathlib import Path
SYNC_SETTINGS_FILE = Path("/data/notifications/sync_settings.json")
DEFAULT_SETTINGS = {
"jira": False,
"jira_attachments": False,
}
def read_settings() -> dict:
"""Read sync settings from JSON file."""
if not SYNC_SETTINGS_FILE.exists():
return {}
try:
with open(SYNC_SETTINGS_FILE) as f:
return json.load(f)
except (json.JSONDecodeError, IOError) as e:
print(f"Error reading {SYNC_SETTINGS_FILE}: {e}", file=sys.stderr)
return {}
def generate_yaml(settings: dict) -> str:
"""Generate YAML content for a user's sync config."""
lines = [
"# AI Data Analyst - Data Sync Configuration",
"# Managed by web portal - changes here may be overwritten",
"#",
"# To manage settings, visit the web portal (Data Settings page)",
"",
"datasets:",
]
for dataset, enabled in sorted(settings.items()):
value = "true" if enabled else "false"
lines.append(f" {dataset}: {value}")
lines.append("")
return "\n".join(lines)
def get_user_home(username: str) -> Path | None:
"""Get home directory for a user."""
try:
return Path(pwd.getpwnam(username).pw_dir)
except KeyError:
return None
def write_user_config(username: str, yaml_content: str, dry_run: bool = False) -> bool:
"""Write config file to user's home directory.
Returns True on success, False on failure.
"""
home = get_user_home(username)
if not home:
print(f" [SKIP] User {username} not found on system")
return False
config_path = home / ".sync_settings.yaml"
if dry_run:
print(f" [DRY-RUN] Would write {config_path}")
return True
try:
# Write to temp file first
import tempfile
fd, tmp_path = tempfile.mkstemp(suffix=".yaml", dir=str(home.parent))
try:
with os.fdopen(fd, "w") as f:
f.write(yaml_content)
# Get user's uid/gid
user_info = pwd.getpwnam(username)
# Set ownership
os.chown(tmp_path, user_info.pw_uid, user_info.pw_gid)
os.chmod(tmp_path, 0o644)
# Atomic move
os.replace(tmp_path, config_path)
print(f" [OK] {config_path}")
return True
except Exception as e:
os.unlink(tmp_path)
raise e
except PermissionError:
print(f" [ERROR] Permission denied for {username}")
return False
except Exception as e:
print(f" [ERROR] {username}: {e}")
return False
def main():
parser = argparse.ArgumentParser(
description="Generate per-user sync config files from central settings."
)
parser.add_argument(
"--dry-run", action="store_true", help="Show what would be done without making changes"
)
parser.add_argument("--user", help="Generate config for specific user only")
args = parser.parse_args()
all_settings = read_settings()
if not all_settings:
print("No sync settings found or file is empty.")
return 0
if args.user:
users = {args.user: all_settings.get(args.user, {})}
else:
users = all_settings
print(f"Generating sync configs for {len(users)} user(s)...")
success = 0
failed = 0
for username, user_data in users.items():
# Merge with defaults
settings = dict(DEFAULT_SETTINGS)
settings.update(user_data.get("datasets", {}))
yaml_content = generate_yaml(settings)
if write_user_config(username, yaml_content, args.dry_run):
success += 1
else:
failed += 1
print(f"\nDone: {success} succeeded, {failed} failed")
return 0 if failed == 0 else 1
if __name__ == "__main__":
sys.exit(main())

View file

@ -1,41 +0,0 @@
#!/bin/bash
# Remote Query - wrapper for src.remote_query
#
# Runs DuckDB queries spanning local Parquet + remote BigQuery tables.
# Sets up the correct environment (PYTHONPATH, CONFIG_DIR, env vars) automatically.
#
# Usage (via SSH from analyst machine):
# ssh <alias> 'bash ~/server/scripts/remote_query.sh \
# --register-bq "traffic=SELECT ... FROM \`project.dataset.table\` WHERE ... GROUP BY ..." \
# --sql "SELECT * FROM traffic ORDER BY ..." \
# --format table'
#
# All arguments are passed directly to python -m src.remote_query.
# See: python -m src.remote_query --help
set -e
APP_DIR="/opt/data-analyst"
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
# Load BigQuery environment variables
# Try analyst-readable env first (deployed to /data/scripts/), fall back to app .env
if [[ -f "${SCRIPT_DIR}/.remote_query.env" ]]; then
set -a
source "${SCRIPT_DIR}/.remote_query.env"
set +a
elif [[ -r "${APP_DIR}/.env" ]]; then
set -a
source "${APP_DIR}/.env"
set +a
else
echo "ERROR: No environment file found. Contact your admin." >&2
echo " Tried: ${SCRIPT_DIR}/.remote_query.env, ${APP_DIR}/.env" >&2
exit 1
fi
# Run remote_query with correct paths
cd "${APP_DIR}"
PYTHONPATH="${APP_DIR}/repo" \
CONFIG_DIR="${APP_DIR}/instance/config" \
exec "${APP_DIR}/.venv/bin/python3" -m src.remote_query "$@"

View file

@ -1,126 +0,0 @@
#!/bin/bash
# setup_views.sh - DuckDB views initialization script
#
# This script performs:
# 1. DuckDB views initialization from Parquet files
#
# Run this after syncing Parquet files (either via update.sh or rsync)
set -e # Exit on error
echo "🦆 DuckDB Views Setup"
echo ""
# Check that we're in the correct folder
# Support both local (server/docs/) and server (~/ with server/ symlinks) layouts
if [ ! -f "server/docs/data_description.md" ] && [ ! -f "docs/data_description.md" ]; then
echo "❌ Run script from project root (folder with server/docs/data_description.md)"
exit 1
fi
# Activate virtual environment
echo "1⃣ Activating virtual environment..."
if [ -d ".venv" ]; then
# Try Unix-style activation first
if [ -f ".venv/bin/activate" ]; then
source .venv/bin/activate
# Windows-style activation
elif [ -f ".venv/Scripts/activate" ]; then
source .venv/Scripts/activate
else
echo " ❌ Virtual environment activation script not found."
exit 1
fi
echo " ✅ Virtual environment activated"
elif [ -d "venv" ]; then
# Legacy support for old venv name
if [ -f "venv/bin/activate" ]; then
source venv/bin/activate
elif [ -f "venv/Scripts/activate" ]; then
source venv/Scripts/activate
else
echo " ❌ Virtual environment activation script not found."
exit 1
fi
echo " ✅ Virtual environment activated (legacy venv)"
else
echo " ❌ Virtual environment not found (.venv or venv)."
echo " Run bootstrap setup first."
exit 1
fi
# Initialize DuckDB views
echo ""
echo "2⃣ Initializing DuckDB views..."
echo ""
# Use python3 if available, otherwise python (Windows compatibility)
if command -v python3 >/dev/null 2>&1; then
PYTHON_CMD=python3
else
PYTHON_CMD=python
fi
# Determine scripts location (local: server/scripts/, server deploy: same dir as this script)
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
DUCKDB_MANAGER="${SCRIPT_DIR}/duckdb_manager.py"
if $PYTHON_CMD "$DUCKDB_MANAGER" --reinit; then
echo ""
echo " ✅ DuckDB views initialized"
else
echo ""
echo " ❌ DuckDB initialization failed. Check logs above."
exit 1
fi
# Optional dataset views (each sync script manages its own views)
if [ -d "server/parquet/jira" ]; then
JIRA_SCRIPT="${SCRIPT_DIR}/sync_jira.sh"
if [ -f "$JIRA_SCRIPT" ]; then
echo ""
echo "3⃣ Creating Jira views..."
bash "$JIRA_SCRIPT" --views-only
fi
fi
# Display sync state
echo ""
echo "📊 Data state:"
# Check for sync state in either location
SYNC_STATE=""
if [ -f "server/metadata/sync_state.json" ]; then
SYNC_STATE="server/metadata/sync_state.json"
elif [ -f "data/metadata/sync_state.json" ]; then
SYNC_STATE="data/metadata/sync_state.json"
fi
if [ -n "$SYNC_STATE" ]; then
$PYTHON_CMD << PYTHON
import json
from datetime import datetime
with open("$SYNC_STATE", "r") as f:
state = json.load(f)
print(f"\n Last update: {state.get('last_updated', 'N/A')}")
print(f" Tables: {len(state.get('tables', {}))}\n")
for table_id, table_state in state.get('tables', {}).items():
table_name = table_state.get('table_name', table_id)
rows = table_state.get('rows', 0)
size_mb = table_state.get('file_size_mb', 0)
strategy = table_state.get('strategy', 'N/A')
print(f" - {table_name}: {rows:,} rows, {size_mb:.2f} MB ({strategy})")
PYTHON
fi
# Done
echo ""
echo "✅ Setup complete!"
echo ""
echo "💡 Data is ready for analysis with Claude Code"
echo " DuckDB database: user/duckdb/analytics.duckdb"
echo ""

File diff suppressed because it is too large Load diff

View file

@ -1,23 +0,0 @@
# Data Analyst - Sync Configuration
#
# This file controls which additional datasets are synchronized.
# Core telemetry data is ALWAYS synced (projects, users, jobs, etc.)
#
# To enable a dataset: change false -> true
# To disable a dataset: change true -> false
#
# After changing, run: bash server/scripts/sync_data.sh
datasets:
# Jira Support tickets (SUPPORT project)
# Size: ~50MB (parquet only), Updated: real-time via webhooks
# Recommended for: Support team, Customer Success
jira: false
# Jira attachment files (images, logs, etc.)
# Size: ~500MB+, requires jira: true
# Only enable if you need to view actual attachment files locally
jira_attachments: false
# Future datasets:
# finance: false # Investor reporting (coming soon)

View file

@ -1,138 +0,0 @@
#!/bin/bash
# Test rsync reliability fixes (Issue #197)
#
# Downloads data into gitignored data/test_sync/ to verify rsync_reliable
# wrapper works end-to-end without touching repo directories.
#
# Usage:
# bash scripts/test_sync.sh # Full test sync
# bash scripts/test_sync.sh --dry-run # Preview only
set -e
DRY_RUN=""
for arg in "$@"; do
case "$arg" in
--dry-run) DRY_RUN="--dry-run" ;;
esac
done
# --- Rsync reliability settings (same as sync_data.sh) ---
RSYNC_SSH_OPTS='ssh -o ServerAliveInterval=60 -o ServerAliveCountMax=3 -o ConnectTimeout=30'
RSYNC_TIMEOUT=300
RSYNC_MAX_RETRIES=3
RSYNC_RETRY_DELAY=5
rsync_reliable() {
local attempt=1
local delay=$RSYNC_RETRY_DELAY
while [[ $attempt -le $RSYNC_MAX_RETRIES ]]; do
rsync -e "$RSYNC_SSH_OPTS" --timeout="$RSYNC_TIMEOUT" \
--partial-dir=.rsync-partial "$@" && return 0
local exit_code=$?
# Exit codes 23/24 = partial transfer (permission denied, vanished files) — not retryable
if [[ $exit_code -eq 23 || $exit_code -eq 24 ]]; then
echo " Warning: rsync partial transfer (exit $exit_code), continuing..."
return 0
fi
if [[ $attempt -lt $RSYNC_MAX_RETRIES ]]; then
echo " Rsync failed (exit $exit_code), retrying in ${delay}s (attempt $attempt/$RSYNC_MAX_RETRIES)..."
sleep "$delay"
delay=$((delay * 2))
fi
attempt=$((attempt + 1))
done
echo " ERROR: Rsync failed after $RSYNC_MAX_RETRIES attempts"
return 1
}
# --- Test destination (gitignored) ---
DEST="./data/test_sync"
LOG_DIR="./data/sync_diagnostics"
mkdir -p "$DEST" "$LOG_DIR"
LOG_FILE="$LOG_DIR/test_sync_$(date '+%Y%m%d_%H%M%S').log"
log() {
echo "$@" | tee -a "$LOG_FILE"
}
log "=== Rsync reliability test ==="
log "Started: $(date -u '+%Y-%m-%dT%H:%M:%SZ')"
log "Destination: $DEST"
log "SSH opts: $RSYNC_SSH_OPTS"
log "Timeout: ${RSYNC_TIMEOUT}s, Retries: $RSYNC_MAX_RETRIES"
log ""
# --- Test 1: SSH connectivity ---
log "--- Test 1: SSH connectivity ---"
START=$(date +%s)
if ssh -o ConnectTimeout=10 data-analyst "echo ok" >> "$LOG_FILE" 2>&1; then
ELAPSED=$(($(date +%s) - START))
log "PASS (${ELAPSED}s)"
else
ELAPSED=$(($(date +%s) - START))
log "FAIL (${ELAPSED}s) — cannot reach server, aborting"
exit 1
fi
log ""
# --- Test 2: Small directory (docs) ---
log "--- Test 2: Sync docs (small, text, -avz) ---"
mkdir -p "$DEST/docs"
START=$(date +%s)
rsync_reliable -avz --delete $DRY_RUN "data-analyst:server/docs/" "$DEST/docs/" >> "$LOG_FILE" 2>&1
ELAPSED=$(($(date +%s) - START))
if [[ -z "$DRY_RUN" ]]; then
DOC_COUNT=$(find "$DEST/docs" -type f 2>/dev/null | wc -l | tr -d ' ')
DOC_SIZE=$(du -sh "$DEST/docs" 2>/dev/null | cut -f1)
log "PASS (${ELAPSED}s) — $DOC_COUNT files, $DOC_SIZE"
else
log "PASS dry-run (${ELAPSED}s)"
fi
log ""
# --- Test 3: Scripts ---
log "--- Test 3: Sync scripts (small, text, -avz) ---"
mkdir -p "$DEST/scripts"
START=$(date +%s)
rsync_reliable -avz --delete $DRY_RUN "data-analyst:server/scripts/" "$DEST/scripts/" >> "$LOG_FILE" 2>&1
ELAPSED=$(($(date +%s) - START))
if [[ -z "$DRY_RUN" ]]; then
SCRIPT_COUNT=$(find "$DEST/scripts" -type f 2>/dev/null | wc -l | tr -d ' ')
log "PASS (${ELAPSED}s) — $SCRIPT_COUNT files"
else
log "PASS dry-run (${ELAPSED}s)"
fi
log ""
# --- Test 4: Core parquet (the big one — no -z, with keepalive) ---
log "--- Test 4: Sync core parquet (large, binary, -av WITHOUT -z) ---"
log "This is the transfer that was hanging. Expect ~7000 files..."
mkdir -p "$DEST/parquet"
START=$(date +%s)
rsync_reliable -av --delete --progress \
--exclude='jira/' --exclude='kbc_telemetry_expert/' \
$DRY_RUN data-analyst:server/parquet/ "$DEST/parquet/" 2>&1 | \
tee -a "$LOG_FILE" | \
grep -E '(^sent |^total size|^rsync|Warning:|ERROR:)' || true
ELAPSED=$(($(date +%s) - START))
if [[ -z "$DRY_RUN" ]]; then
PARQUET_COUNT=$(find "$DEST/parquet" -name '*.parquet' -type f 2>/dev/null | wc -l | tr -d ' ')
PARQUET_SIZE=$(du -sh "$DEST/parquet" 2>/dev/null | cut -f1)
log "DONE (${ELAPSED}s) — $PARQUET_COUNT parquet files, $PARQUET_SIZE"
else
log "DONE dry-run (${ELAPSED}s)"
fi
log ""
# --- Summary ---
log "=== Summary ==="
log "Finished: $(date -u '+%Y-%m-%dT%H:%M:%SZ')"
if [[ -z "$DRY_RUN" ]]; then
TOTAL_SIZE=$(du -sh "$DEST" 2>/dev/null | cut -f1)
TOTAL_FILES=$(find "$DEST" -type f 2>/dev/null | wc -l | tr -d ' ')
log "Total: $TOTAL_FILES files, $TOTAL_SIZE in $DEST"
fi
log "Full log: $LOG_FILE"
log ""
log "To clean up test data: rm -rf $DEST"

View file

@ -1,71 +0,0 @@
#!/bin/bash
# update.sh - Data synchronization script
#
# This script performs:
# 1. Data synchronization from configured data source
# 2. DuckDB views reinitialization
#
# Note: Git pull and dependency updates are handled by deploy.sh (GitHub Actions)
set -e # Exit on error
echo "🔄 AI Data Analyst - Data Update"
echo ""
# Check that we're in the correct folder (same check as config.py uses)
if [ ! -f "docs/data_description.md" ]; then
echo "❌ Run script from project root (folder with docs/data_description.md)"
exit 1
fi
# Note: Git pull and dependency updates are handled by deploy.sh (GitHub Actions)
# This script focuses only on data synchronization
# Activate virtual environment
# Supports both local (./.venv) and server (/opt/data-analyst/.venv) setups
echo ""
echo "1⃣ Activating virtual environment..."
if [ -d ".venv" ]; then
source .venv/bin/activate
echo " ✅ Virtual environment activated (local)"
elif [ -d "/opt/data-analyst/.venv" ]; then
source /opt/data-analyst/.venv/bin/activate
echo " ✅ Virtual environment activated (server)"
else
echo " ❌ Virtual environment not found. Run init.sh first."
exit 1
fi
# Data synchronization
echo ""
echo "2⃣ Synchronizing data..."
echo ""
# Run data sync
if python3 -m src.data_sync; then
echo ""
echo " ✅ Data synchronization complete"
else
echo ""
echo " ❌ Data synchronization failed. Check logs above."
exit 1
fi
# Generate data profiles (for catalog profiler)
echo ""
echo "3⃣ Generating data profiles..."
if python3 -m src.profiler; then
echo " ✅ Data profiles generated"
else
echo " ⚠️ Data profiling failed (non-fatal). Check logs above."
# Non-fatal: profiling failure should not break the pipeline
fi
# Done
echo ""
echo "✅ Data sync complete!"
echo ""
echo "💡 Parquet files are ready in data/parquet/"
echo " To setup DuckDB views, run: ./scripts/setup_views.sh"
echo ""