diff --git a/CLAUDE.md b/CLAUDE.md index 4171b63..c5fe212 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,6 +1,6 @@ # AI Data Analyst -Open-source data distribution platform for AI analytical systems. Syncs data from various sources, converts to Parquet, and distributes to analysts who use Claude Code for local analysis. +Open-source data distribution platform for AI analytical systems. Extracts data from sources into DuckDB, serves via FastAPI, and distributes parquets to analysts who use Claude Code for local analysis. ## First-Time Setup @@ -9,7 +9,7 @@ When a user opens this project for the first time, guide them through interactiv ### Step 1: Gather Information Ask the user for: 1. Company domain (e.g., "acme.com") - used for Google OAuth -2. Data source type: keboola / csv / bigquery (future) +2. Data source type: keboola / bigquery / csv 3. Instance name (e.g., "Acme Data Analyst") ### Step 2: Generate Configuration @@ -18,201 +18,156 @@ Ask the user for: 3. If Keboola: ask for Storage API token, stack URL, project ID 4. Create `.env` from `config/.env.template` -### Step 3: Generate Data Description -1. If Keboola adapter: use the API to fetch table metadata and generate `docs/data_description.md` -2. If CSV: ask user to describe their data files -3. The file defines tables, sync strategies, and schema +### Step 3: Register Tables +1. Use the FastAPI admin API (`POST /api/admin/tables/{id}`) or webapp UI to register tables +2. Tables are stored in DuckDB `table_registry` with source_type, bucket, source_table, query_mode +3. For migration from old format: `python scripts/migrate_registry_to_duckdb.py` -### Step 4: Server Setup (if deploying) -1. Guide VM provisioning (or use existing server) -2. Run `server/setup.sh` on the target VM -3. Run `server/webapp-setup.sh` for the web portal -4. Set up CI/CD from `.github/workflows/deploy.yml.example` +### Step 4: Docker Deployment +```bash +docker compose up # Start app + scheduler +docker compose --profile full up # Include telegram bot +``` ## Project Structure ``` -├── src/ # Core data sync engine (vendor-neutral) -│ ├── config.py # Configuration from data_description.md -│ ├── data_sync.py # Sync orchestration + DataSource ABC -│ ├── parquet_manager.py # Parquet file management -│ └── profiler.py # Data profiling -├── connectors/ # Data source connectors (pluggable) -│ ├── keboola/ # Keboola Storage connector -│ └── jira/ # Jira webhook connector -├── auth/ # Authentication providers (pluggable) -│ ├── google/ # Google OAuth provider -│ ├── email/ # Email magic link provider -│ └── desktop/ # Desktop JWT provider (API-only) -├── services/ # Standalone services (own systemd units) -│ ├── telegram_bot/ # Telegram notification bot -│ ├── ws_gateway/ # WebSocket notification gateway -│ ├── corporate_memory/ # AI knowledge aggregation -│ └── session_collector/ # Claude Code session collector -├── webapp/ # Flask web portal (login, dashboard, API) -├── server/ # Deployment infrastructure only -├── scripts/ # Utility scripts (sync, DuckDB setup, dev) -├── config/ # Configuration templates -│ ├── instance.yaml.example -│ └── data_description.md.example -├── docs/ # Documentation -│ └── metrics/ # Business metric YAML definitions -│ ├── revenue/ # Revenue metrics (total_revenue, AOV, etc.) -│ ├── customers/ # Customer metrics (count, repeat rate) -│ ├── marketing/ # Marketing metrics (ROI, CPA, conversion) -│ └── support/ # Support metrics (resolution time, CSAT) -└── tests/ # Test suite +├── src/ # Core engine +│ ├── db.py # DuckDB schema (system.duckdb, analytics.duckdb) +│ ├── orchestrator.py # SyncOrchestrator — ATTACHes extract.duckdb files +│ ├── repositories/ # DuckDB-backed CRUD (sync_state, table_registry, users, etc.) +│ ├── profiler.py # Data profiling +│ └── catalog_export.py # OpenMetadata catalog export +├── app/ # FastAPI application +│ ├── main.py # App setup, router registration +│ ├── api/ # REST API (sync, data, catalog, admin, auth) +│ └── web/ # HTML dashboard routes +├── connectors/ # Data source connectors (extract.duckdb contract) +│ ├── keboola/ # Keboola: extractor.py (DuckDB extension) + client.py (fallback) +│ ├── bigquery/ # BigQuery: extractor.py (remote-only via DuckDB BQ extension) +│ └── jira/ # Jira: webhook + incremental parquet → extract.duckdb +├── cli/ # CLI tool (`da sync`, `da query`, `da admin`) +├── auth/ # Authentication providers (google, email, password, desktop) +├── services/ # Standalone services (scheduler, telegram_bot, ws_gateway, etc.) +├── webapp/ # Legacy Flask web portal +├── server/ # Legacy deployment infrastructure +├── scripts/ # Utility + migration scripts +├── config/ # Configuration templates (instance.yaml.example) +├── docs/ # Documentation + metric YAML definitions +└── tests/ # Test suite (704 tests) ``` -## Architecture +## Architecture: extract.duckdb Contract + +Every data source produces the same output: +``` +/data/extracts/{source_name}/ +├── extract.duckdb ← _meta table + views +└── data/ ← parquet files (local sources only) +``` + +The SyncOrchestrator scans `/data/extracts/*/extract.duckdb`, ATTACHes each into master `analytics.duckdb`, and creates views. ``` -Data Source (Keboola / CSV / BigQuery) - │ - ▼ -┌─────────────────────────────────┐ -│ Data Broker Server │ -│ ├── /data/src_data/parquet/ │ Converted data -│ ├── /data/docs/ │ Documentation -│ └── /data/scripts/ │ Helper scripts -└─────────────────────────────────┘ - │ rsync (via ~/server/ symlinks) - ▼ -┌─────────────────────────────────┐ -│ Analyst (local machine) │ -│ ├── ./server/ (read-only) │ parquet, docs, scripts -│ └── ./user/ (workspace) │ duckdb, notifications -└─────────────────────────────────┘ +┌──────────────┐ ┌──────────────┐ ┌──────────────┐ +│ Keboola │ │ BigQuery │ │ Jira │ +│ extractor │ │ extractor │ │ webhooks │ +│ (DuckDB ext) │ │ (remote BQ) │ │ (incremental)│ +└──────┬───────┘ └──────┬───────┘ └──────┬───────┘ + │ │ │ + ▼ ▼ ▼ + extract.duckdb extract.duckdb extract.duckdb + + data/*.parquet (views → BQ) + data/*.parquet + │ │ │ + └─────────────────┼─────────────────┘ + ▼ + SyncOrchestrator.rebuild() + ATTACH → master views in analytics.duckdb + │ + ┌──────────┼──────────┐ + ▼ ▼ ▼ + FastAPI CLI Webapp + (serve) (da sync) (dashboard) ``` +Three source types: +- **Batch pull** (Keboola): DuckDB extension downloads to parquet, scheduled +- **Remote attach** (BigQuery): DuckDB BQ extension, no download, queries go to BQ +- **Real-time push** (Jira): Webhooks update parquets incrementally + ## Configuration -Instance-specific config is in `config/instance.yaml`. See `config/instance.yaml.example` for all options. - -Environment variables go in `.env` (never committed to git). - -Data schema is defined in `docs/data_description.md` (YAML blocks in markdown). - -### Dual-Repo Deployment -Production uses two repos on the server: -- **OSS repo** (`/opt/data-analyst/repo/`): application code, no secrets or config -- **Instance repo** (`/opt/data-analyst/instance/`): private config, secrets template, data schema - -Symlinks bridge them: `repo/config/instance.yaml -> instance/config/instance.yaml`. -Each repo has its own SSH deploy key (github-oss / github-cfg aliases). -See `docs/auto-install.md` for full setup guide. +Instance-specific config: `config/instance.yaml` (see example). +Environment variables: `.env` (never committed). +Table definitions: DuckDB `table_registry` table in `system.duckdb`. ## Development ```bash # Setup -python3 -m venv .venv -source .venv/bin/activate +python3 -m venv .venv && source .venv/bin/activate pip install -r requirements.txt -# Run webapp locally +# Run FastAPI locally +uvicorn app.main:app --reload + +# Run legacy Flask webapp flask --app webapp.app run --debug # Run tests pytest tests/ -v -# Sync data -python -m src.data_sync +# Trigger sync manually +curl -X POST http://localhost:8000/api/sync/trigger + +# Docker +docker compose up ``` ## Extensibility -### Data Sources -Pluggable data source connectors in `connectors/`: -- **Keboola** (`keboola`): Syncs from Keboola Storage API -- **CSV** (`csv`): Import from local CSV files (planned) -- New connector = `connectors//adapter.py` implementing `DataSource` +### Data Sources (extract.duckdb contract) +New connector = `connectors//extractor.py` producing `extract.duckdb + data/`. +Must create `_meta` table with columns: table_name, description, rows, size_bytes, extracted_at, query_mode. +Orchestrator ATTACHes it automatically. ### Authentication Pluggable auth providers in `auth/`: - **Google** (`google`): OAuth via Google -- **Email** (`email`): Email magic link (itsdangerous token, no password needed) -- **Password** (`password`): Username/password authentication -- **Desktop** (`desktop`): JWT for desktop app API +- **Email** (`email`): Email magic link (itsdangerous token) +- **Password** (`password`): Username/password +- **Desktop** (`desktop`): JWT for API - New provider = `auth//provider.py` implementing `AuthProvider` -Configure data source in `config/instance.yaml` under `data_source.type`. - -## Server Management - -```bash -# Add analyst user -sudo add-analyst username "ssh-rsa AAAA..." - -# Add privileged analyst -sudo add-analyst username "ssh-rsa AAAA..." --private - -# List analysts -list-analysts - -# Server monitoring -uptime && free -h && df -h /data -``` - -## Returning Users - -When reopening the project in Claude Code: -1. Sync latest data: `rsync -avz --no-perms --no-group data-analyst:server/parquet/ ./server/parquet/` -2. Verify DuckDB: `ls -lh user/duckdb/analytics.duckdb` -3. Start analyzing with Claude Code - ## Key Implementation Details -### Config Loading Chain -1. `config/loader.py` loads `instance.yaml` (checks `$CONFIG_DIR`, then `./config/`) -2. `webapp/config.py` calls `_load_instance_config()` at module level -3. `_get(config, *keys, default="")` traverses nested dicts safely -4. `inject_config()` context processor exposes `Config` to all Jinja templates -5. Templates use `{{ config.INSTANCE_NAME }}`, `{{ config.INSTANCE_SUBTITLE }}`, etc. +### DuckDB Schema (src/db.py) +- Schema v2 with auto-migration from v1 +- `table_registry`: id, name, source_type, bucket, source_table, query_mode, sync_schedule, etc. +- `sync_state`, `sync_history`: track extraction progress +- `users`, `dataset_permissions`, `audit_log`: auth + RBAC +- System DB at `{DATA_DIR}/state/system.duckdb` +- Analytics DB at `{DATA_DIR}/analytics/server.duckdb` + +### SyncOrchestrator (src/orchestrator.py) +- `rebuild()`: scans extracts dir, ATTACHes all, creates master views, updates sync_state +- `rebuild_source(name)`: single source (used after Jira webhooks) +- Thread-safe via `_rebuild_lock` ### Connector Pattern -- ABC: `DataSource` class in `src/data_sync.py` -- Registry: `create_data_source()` in `src/data_sync.py` auto-discovers connectors in `connectors/` -- Keboola: `connectors/keboola/adapter.py` -> `KeboolaDataSource` implementing `DataSource` -- Core Keboola logic: `connectors/keboola/client.py` (Keboola Storage API wrapper) +- **Keboola**: `connectors/keboola/extractor.py` uses DuckDB Keboola extension, fallback to `client.py` +- **BigQuery**: `connectors/bigquery/extractor.py` uses DuckDB BQ extension (remote-only, no download) +- **Jira**: `connectors/jira/webhook.py` → `incremental_transform.py` → `extract_init.py` updates `_meta` +- `connectors/keboola/client.py`: legacy Keboola Storage API wrapper (kept as fallback) -### Auth Provider Pattern -- ABC: `AuthProvider` class in `auth/__init__.py` -- Discovery: `discover_providers()` scans `auth/*/provider.py` -- Providers: google, email, desktop (each exports `provider` instance) -- Email provider: uses `itsdangerous.URLSafeTimedSerializer` for magic link tokens -- Multi-domain: `auth.allowed_domain` in instance.yaml supports comma-separated domains -- Session contract: all providers set `session["user"] = {"email", "name", "picture"}` - -### Service Pattern -- Self-contained modules in `services/` with `__main__.py` for `python -m services.` -- Systemd files in `services//systemd/`, auto-discovered by `deploy.sh` -- Services: telegram_bot, ws_gateway, corporate_memory, session_collector - -### Business Metrics Pattern -- YAML definitions in `docs/metrics/{category}/{metric}.yml` (list with one dict) -- `webapp/utils/metric_parser.py` - parses YAML, structures for modal UI, auto-discovers `sql_*` fields -- `webapp/app.py` `_load_metrics_data()` - scans metrics dir, groups by category, returns ordered list -- Catalog template renders dynamically via Jinja loop (no hardcoded metrics) -- Profiler links metrics to tables via `used_by_metrics` in `profiles.json` -- Production: metrics in instance repo deployed to `/data/docs/metrics/` -- Sample/dev: OSS repo `docs/metrics/` (10 e-commerce metrics) - -### Table Registry Pattern -- `src/table_registry.py` - central CRUD for registered tables with atomic JSON persistence -- Audit logging for register/unregister operations -- Generates `data_description.md` from registry state - -### Server Patterns -- Atomic JSON writes: `tempfile.mkstemp()` + `os.fchmod(fd, 0o660)` + `os.replace()` -- User home writes: `sudo /usr/bin/install -o {user} -g {user}` pattern -- Staging dir: `/tmp/data_analyst_staging` (deploy.sh creates it with setgid) -- Dev docs: `dev_docs/server.md` documents all established patterns +### Config Loading +1. `config/loader.py` loads `instance.yaml` +2. `app/instance_config.py` exposes `get_data_source_type()`, `get_value()` +3. Table config lives in DuckDB `table_registry` (not markdown files) ### Files NOT to modify (stable infrastructure) -- `src/parquet_manager.py` - Parquet conversion engine - `connectors/jira/file_lock.py` - Advisory file locking -- `connectors/jira/incremental_transform.py` - Jira monthly Parquet transform +- `connectors/jira/transform.py` - Core Jira transform logic - `services/ws_gateway/` - WebSocket notification gateway ## Git Commits & Pull Requests diff --git a/docker-compose.yml b/docker-compose.yml index ff2a074..ebc712b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,6 +16,20 @@ services: timeout: 5s retries: 3 + # One-shot: run extractor then rebuild orchestrator views + extract: + build: . + command: > + sh -c "python -m connectors.keboola.extractor && + python -c 'from src.orchestrator import SyncOrchestrator; print(SyncOrchestrator().rebuild())'" + volumes: + - data:/data + env_file: .env + environment: + - DATA_DIR=/data + profiles: + - extract + scheduler: build: . command: python -m services.scheduler