docs: rewrite CLAUDE.md for extract.duckdb architecture
Update project structure, architecture diagram, key implementation details, development commands, and extensibility docs. Add extract service to docker-compose.yml for one-shot extraction.
This commit is contained in:
parent
b502bd8bdd
commit
9fef90a729
2 changed files with 123 additions and 154 deletions
263
CLAUDE.md
263
CLAUDE.md
|
|
@ -1,6 +1,6 @@
|
||||||
# AI Data Analyst
|
# AI Data Analyst
|
||||||
|
|
||||||
Open-source data distribution platform for AI analytical systems. Syncs data from various sources, converts to Parquet, and distributes to analysts who use Claude Code for local analysis.
|
Open-source data distribution platform for AI analytical systems. Extracts data from sources into DuckDB, serves via FastAPI, and distributes parquets to analysts who use Claude Code for local analysis.
|
||||||
|
|
||||||
## First-Time Setup
|
## First-Time Setup
|
||||||
|
|
||||||
|
|
@ -9,7 +9,7 @@ When a user opens this project for the first time, guide them through interactiv
|
||||||
### Step 1: Gather Information
|
### Step 1: Gather Information
|
||||||
Ask the user for:
|
Ask the user for:
|
||||||
1. Company domain (e.g., "acme.com") - used for Google OAuth
|
1. Company domain (e.g., "acme.com") - used for Google OAuth
|
||||||
2. Data source type: keboola / csv / bigquery (future)
|
2. Data source type: keboola / bigquery / csv
|
||||||
3. Instance name (e.g., "Acme Data Analyst")
|
3. Instance name (e.g., "Acme Data Analyst")
|
||||||
|
|
||||||
### Step 2: Generate Configuration
|
### Step 2: Generate Configuration
|
||||||
|
|
@ -18,201 +18,156 @@ Ask the user for:
|
||||||
3. If Keboola: ask for Storage API token, stack URL, project ID
|
3. If Keboola: ask for Storage API token, stack URL, project ID
|
||||||
4. Create `.env` from `config/.env.template`
|
4. Create `.env` from `config/.env.template`
|
||||||
|
|
||||||
### Step 3: Generate Data Description
|
### Step 3: Register Tables
|
||||||
1. If Keboola adapter: use the API to fetch table metadata and generate `docs/data_description.md`
|
1. Use the FastAPI admin API (`POST /api/admin/tables/{id}`) or webapp UI to register tables
|
||||||
2. If CSV: ask user to describe their data files
|
2. Tables are stored in DuckDB `table_registry` with source_type, bucket, source_table, query_mode
|
||||||
3. The file defines tables, sync strategies, and schema
|
3. For migration from old format: `python scripts/migrate_registry_to_duckdb.py`
|
||||||
|
|
||||||
### Step 4: Server Setup (if deploying)
|
### Step 4: Docker Deployment
|
||||||
1. Guide VM provisioning (or use existing server)
|
```bash
|
||||||
2. Run `server/setup.sh` on the target VM
|
docker compose up # Start app + scheduler
|
||||||
3. Run `server/webapp-setup.sh` for the web portal
|
docker compose --profile full up # Include telegram bot
|
||||||
4. Set up CI/CD from `.github/workflows/deploy.yml.example`
|
```
|
||||||
|
|
||||||
## Project Structure
|
## Project Structure
|
||||||
|
|
||||||
```
|
```
|
||||||
├── src/ # Core data sync engine (vendor-neutral)
|
├── src/ # Core engine
|
||||||
│ ├── config.py # Configuration from data_description.md
|
│ ├── db.py # DuckDB schema (system.duckdb, analytics.duckdb)
|
||||||
│ ├── data_sync.py # Sync orchestration + DataSource ABC
|
│ ├── orchestrator.py # SyncOrchestrator — ATTACHes extract.duckdb files
|
||||||
│ ├── parquet_manager.py # Parquet file management
|
│ ├── repositories/ # DuckDB-backed CRUD (sync_state, table_registry, users, etc.)
|
||||||
│ └── profiler.py # Data profiling
|
│ ├── profiler.py # Data profiling
|
||||||
├── connectors/ # Data source connectors (pluggable)
|
│ └── catalog_export.py # OpenMetadata catalog export
|
||||||
│ ├── keboola/ # Keboola Storage connector
|
├── app/ # FastAPI application
|
||||||
│ └── jira/ # Jira webhook connector
|
│ ├── main.py # App setup, router registration
|
||||||
├── auth/ # Authentication providers (pluggable)
|
│ ├── api/ # REST API (sync, data, catalog, admin, auth)
|
||||||
│ ├── google/ # Google OAuth provider
|
│ └── web/ # HTML dashboard routes
|
||||||
│ ├── email/ # Email magic link provider
|
├── connectors/ # Data source connectors (extract.duckdb contract)
|
||||||
│ └── desktop/ # Desktop JWT provider (API-only)
|
│ ├── keboola/ # Keboola: extractor.py (DuckDB extension) + client.py (fallback)
|
||||||
├── services/ # Standalone services (own systemd units)
|
│ ├── bigquery/ # BigQuery: extractor.py (remote-only via DuckDB BQ extension)
|
||||||
│ ├── telegram_bot/ # Telegram notification bot
|
│ └── jira/ # Jira: webhook + incremental parquet → extract.duckdb
|
||||||
│ ├── ws_gateway/ # WebSocket notification gateway
|
├── cli/ # CLI tool (`da sync`, `da query`, `da admin`)
|
||||||
│ ├── corporate_memory/ # AI knowledge aggregation
|
├── auth/ # Authentication providers (google, email, password, desktop)
|
||||||
│ └── session_collector/ # Claude Code session collector
|
├── services/ # Standalone services (scheduler, telegram_bot, ws_gateway, etc.)
|
||||||
├── webapp/ # Flask web portal (login, dashboard, API)
|
├── webapp/ # Legacy Flask web portal
|
||||||
├── server/ # Deployment infrastructure only
|
├── server/ # Legacy deployment infrastructure
|
||||||
├── scripts/ # Utility scripts (sync, DuckDB setup, dev)
|
├── scripts/ # Utility + migration scripts
|
||||||
├── config/ # Configuration templates
|
├── config/ # Configuration templates (instance.yaml.example)
|
||||||
│ ├── instance.yaml.example
|
├── docs/ # Documentation + metric YAML definitions
|
||||||
│ └── data_description.md.example
|
└── tests/ # Test suite (704 tests)
|
||||||
├── docs/ # Documentation
|
|
||||||
│ └── metrics/ # Business metric YAML definitions
|
|
||||||
│ ├── revenue/ # Revenue metrics (total_revenue, AOV, etc.)
|
|
||||||
│ ├── customers/ # Customer metrics (count, repeat rate)
|
|
||||||
│ ├── marketing/ # Marketing metrics (ROI, CPA, conversion)
|
|
||||||
│ └── support/ # Support metrics (resolution time, CSAT)
|
|
||||||
└── tests/ # Test suite
|
|
||||||
```
|
```
|
||||||
|
|
||||||
## Architecture
|
## Architecture: extract.duckdb Contract
|
||||||
|
|
||||||
|
Every data source produces the same output:
|
||||||
|
```
|
||||||
|
/data/extracts/{source_name}/
|
||||||
|
├── extract.duckdb ← _meta table + views
|
||||||
|
└── data/ ← parquet files (local sources only)
|
||||||
|
```
|
||||||
|
|
||||||
|
The SyncOrchestrator scans `/data/extracts/*/extract.duckdb`, ATTACHes each into master `analytics.duckdb`, and creates views.
|
||||||
|
|
||||||
```
|
```
|
||||||
Data Source (Keboola / CSV / BigQuery)
|
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
|
||||||
│
|
│ Keboola │ │ BigQuery │ │ Jira │
|
||||||
▼
|
│ extractor │ │ extractor │ │ webhooks │
|
||||||
┌─────────────────────────────────┐
|
│ (DuckDB ext) │ │ (remote BQ) │ │ (incremental)│
|
||||||
│ Data Broker Server │
|
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
|
||||||
│ ├── /data/src_data/parquet/ │ Converted data
|
│ │ │
|
||||||
│ ├── /data/docs/ │ Documentation
|
▼ ▼ ▼
|
||||||
│ └── /data/scripts/ │ Helper scripts
|
extract.duckdb extract.duckdb extract.duckdb
|
||||||
└─────────────────────────────────┘
|
+ data/*.parquet (views → BQ) + data/*.parquet
|
||||||
│ rsync (via ~/server/ symlinks)
|
│ │ │
|
||||||
▼
|
└─────────────────┼─────────────────┘
|
||||||
┌─────────────────────────────────┐
|
▼
|
||||||
│ Analyst (local machine) │
|
SyncOrchestrator.rebuild()
|
||||||
│ ├── ./server/ (read-only) │ parquet, docs, scripts
|
ATTACH → master views in analytics.duckdb
|
||||||
│ └── ./user/ (workspace) │ duckdb, notifications
|
│
|
||||||
└─────────────────────────────────┘
|
┌──────────┼──────────┐
|
||||||
|
▼ ▼ ▼
|
||||||
|
FastAPI CLI Webapp
|
||||||
|
(serve) (da sync) (dashboard)
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Three source types:
|
||||||
|
- **Batch pull** (Keboola): DuckDB extension downloads to parquet, scheduled
|
||||||
|
- **Remote attach** (BigQuery): DuckDB BQ extension, no download, queries go to BQ
|
||||||
|
- **Real-time push** (Jira): Webhooks update parquets incrementally
|
||||||
|
|
||||||
## Configuration
|
## Configuration
|
||||||
|
|
||||||
Instance-specific config is in `config/instance.yaml`. See `config/instance.yaml.example` for all options.
|
Instance-specific config: `config/instance.yaml` (see example).
|
||||||
|
Environment variables: `.env` (never committed).
|
||||||
Environment variables go in `.env` (never committed to git).
|
Table definitions: DuckDB `table_registry` table in `system.duckdb`.
|
||||||
|
|
||||||
Data schema is defined in `docs/data_description.md` (YAML blocks in markdown).
|
|
||||||
|
|
||||||
### Dual-Repo Deployment
|
|
||||||
Production uses two repos on the server:
|
|
||||||
- **OSS repo** (`/opt/data-analyst/repo/`): application code, no secrets or config
|
|
||||||
- **Instance repo** (`/opt/data-analyst/instance/`): private config, secrets template, data schema
|
|
||||||
|
|
||||||
Symlinks bridge them: `repo/config/instance.yaml -> instance/config/instance.yaml`.
|
|
||||||
Each repo has its own SSH deploy key (github-oss / github-cfg aliases).
|
|
||||||
See `docs/auto-install.md` for full setup guide.
|
|
||||||
|
|
||||||
## Development
|
## Development
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# Setup
|
# Setup
|
||||||
python3 -m venv .venv
|
python3 -m venv .venv && source .venv/bin/activate
|
||||||
source .venv/bin/activate
|
|
||||||
pip install -r requirements.txt
|
pip install -r requirements.txt
|
||||||
|
|
||||||
# Run webapp locally
|
# Run FastAPI locally
|
||||||
|
uvicorn app.main:app --reload
|
||||||
|
|
||||||
|
# Run legacy Flask webapp
|
||||||
flask --app webapp.app run --debug
|
flask --app webapp.app run --debug
|
||||||
|
|
||||||
# Run tests
|
# Run tests
|
||||||
pytest tests/ -v
|
pytest tests/ -v
|
||||||
|
|
||||||
# Sync data
|
# Trigger sync manually
|
||||||
python -m src.data_sync
|
curl -X POST http://localhost:8000/api/sync/trigger
|
||||||
|
|
||||||
|
# Docker
|
||||||
|
docker compose up
|
||||||
```
|
```
|
||||||
|
|
||||||
## Extensibility
|
## Extensibility
|
||||||
|
|
||||||
### Data Sources
|
### Data Sources (extract.duckdb contract)
|
||||||
Pluggable data source connectors in `connectors/`:
|
New connector = `connectors/<name>/extractor.py` producing `extract.duckdb + data/`.
|
||||||
- **Keboola** (`keboola`): Syncs from Keboola Storage API
|
Must create `_meta` table with columns: table_name, description, rows, size_bytes, extracted_at, query_mode.
|
||||||
- **CSV** (`csv`): Import from local CSV files (planned)
|
Orchestrator ATTACHes it automatically.
|
||||||
- New connector = `connectors/<name>/adapter.py` implementing `DataSource`
|
|
||||||
|
|
||||||
### Authentication
|
### Authentication
|
||||||
Pluggable auth providers in `auth/`:
|
Pluggable auth providers in `auth/`:
|
||||||
- **Google** (`google`): OAuth via Google
|
- **Google** (`google`): OAuth via Google
|
||||||
- **Email** (`email`): Email magic link (itsdangerous token, no password needed)
|
- **Email** (`email`): Email magic link (itsdangerous token)
|
||||||
- **Password** (`password`): Username/password authentication
|
- **Password** (`password`): Username/password
|
||||||
- **Desktop** (`desktop`): JWT for desktop app API
|
- **Desktop** (`desktop`): JWT for API
|
||||||
- New provider = `auth/<name>/provider.py` implementing `AuthProvider`
|
- New provider = `auth/<name>/provider.py` implementing `AuthProvider`
|
||||||
|
|
||||||
Configure data source in `config/instance.yaml` under `data_source.type`.
|
|
||||||
|
|
||||||
## Server Management
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# Add analyst user
|
|
||||||
sudo add-analyst username "ssh-rsa AAAA..."
|
|
||||||
|
|
||||||
# Add privileged analyst
|
|
||||||
sudo add-analyst username "ssh-rsa AAAA..." --private
|
|
||||||
|
|
||||||
# List analysts
|
|
||||||
list-analysts
|
|
||||||
|
|
||||||
# Server monitoring
|
|
||||||
uptime && free -h && df -h /data
|
|
||||||
```
|
|
||||||
|
|
||||||
## Returning Users
|
|
||||||
|
|
||||||
When reopening the project in Claude Code:
|
|
||||||
1. Sync latest data: `rsync -avz --no-perms --no-group data-analyst:server/parquet/ ./server/parquet/`
|
|
||||||
2. Verify DuckDB: `ls -lh user/duckdb/analytics.duckdb`
|
|
||||||
3. Start analyzing with Claude Code
|
|
||||||
|
|
||||||
## Key Implementation Details
|
## Key Implementation Details
|
||||||
|
|
||||||
### Config Loading Chain
|
### DuckDB Schema (src/db.py)
|
||||||
1. `config/loader.py` loads `instance.yaml` (checks `$CONFIG_DIR`, then `./config/`)
|
- Schema v2 with auto-migration from v1
|
||||||
2. `webapp/config.py` calls `_load_instance_config()` at module level
|
- `table_registry`: id, name, source_type, bucket, source_table, query_mode, sync_schedule, etc.
|
||||||
3. `_get(config, *keys, default="")` traverses nested dicts safely
|
- `sync_state`, `sync_history`: track extraction progress
|
||||||
4. `inject_config()` context processor exposes `Config` to all Jinja templates
|
- `users`, `dataset_permissions`, `audit_log`: auth + RBAC
|
||||||
5. Templates use `{{ config.INSTANCE_NAME }}`, `{{ config.INSTANCE_SUBTITLE }}`, etc.
|
- System DB at `{DATA_DIR}/state/system.duckdb`
|
||||||
|
- Analytics DB at `{DATA_DIR}/analytics/server.duckdb`
|
||||||
|
|
||||||
|
### SyncOrchestrator (src/orchestrator.py)
|
||||||
|
- `rebuild()`: scans extracts dir, ATTACHes all, creates master views, updates sync_state
|
||||||
|
- `rebuild_source(name)`: single source (used after Jira webhooks)
|
||||||
|
- Thread-safe via `_rebuild_lock`
|
||||||
|
|
||||||
### Connector Pattern
|
### Connector Pattern
|
||||||
- ABC: `DataSource` class in `src/data_sync.py`
|
- **Keboola**: `connectors/keboola/extractor.py` uses DuckDB Keboola extension, fallback to `client.py`
|
||||||
- Registry: `create_data_source()` in `src/data_sync.py` auto-discovers connectors in `connectors/`
|
- **BigQuery**: `connectors/bigquery/extractor.py` uses DuckDB BQ extension (remote-only, no download)
|
||||||
- Keboola: `connectors/keboola/adapter.py` -> `KeboolaDataSource` implementing `DataSource`
|
- **Jira**: `connectors/jira/webhook.py` → `incremental_transform.py` → `extract_init.py` updates `_meta`
|
||||||
- Core Keboola logic: `connectors/keboola/client.py` (Keboola Storage API wrapper)
|
- `connectors/keboola/client.py`: legacy Keboola Storage API wrapper (kept as fallback)
|
||||||
|
|
||||||
### Auth Provider Pattern
|
### Config Loading
|
||||||
- ABC: `AuthProvider` class in `auth/__init__.py`
|
1. `config/loader.py` loads `instance.yaml`
|
||||||
- Discovery: `discover_providers()` scans `auth/*/provider.py`
|
2. `app/instance_config.py` exposes `get_data_source_type()`, `get_value()`
|
||||||
- Providers: google, email, desktop (each exports `provider` instance)
|
3. Table config lives in DuckDB `table_registry` (not markdown files)
|
||||||
- Email provider: uses `itsdangerous.URLSafeTimedSerializer` for magic link tokens
|
|
||||||
- Multi-domain: `auth.allowed_domain` in instance.yaml supports comma-separated domains
|
|
||||||
- Session contract: all providers set `session["user"] = {"email", "name", "picture"}`
|
|
||||||
|
|
||||||
### Service Pattern
|
|
||||||
- Self-contained modules in `services/` with `__main__.py` for `python -m services.<name>`
|
|
||||||
- Systemd files in `services/<name>/systemd/`, auto-discovered by `deploy.sh`
|
|
||||||
- Services: telegram_bot, ws_gateway, corporate_memory, session_collector
|
|
||||||
|
|
||||||
### Business Metrics Pattern
|
|
||||||
- YAML definitions in `docs/metrics/{category}/{metric}.yml` (list with one dict)
|
|
||||||
- `webapp/utils/metric_parser.py` - parses YAML, structures for modal UI, auto-discovers `sql_*` fields
|
|
||||||
- `webapp/app.py` `_load_metrics_data()` - scans metrics dir, groups by category, returns ordered list
|
|
||||||
- Catalog template renders dynamically via Jinja loop (no hardcoded metrics)
|
|
||||||
- Profiler links metrics to tables via `used_by_metrics` in `profiles.json`
|
|
||||||
- Production: metrics in instance repo deployed to `/data/docs/metrics/`
|
|
||||||
- Sample/dev: OSS repo `docs/metrics/` (10 e-commerce metrics)
|
|
||||||
|
|
||||||
### Table Registry Pattern
|
|
||||||
- `src/table_registry.py` - central CRUD for registered tables with atomic JSON persistence
|
|
||||||
- Audit logging for register/unregister operations
|
|
||||||
- Generates `data_description.md` from registry state
|
|
||||||
|
|
||||||
### Server Patterns
|
|
||||||
- Atomic JSON writes: `tempfile.mkstemp()` + `os.fchmod(fd, 0o660)` + `os.replace()`
|
|
||||||
- User home writes: `sudo /usr/bin/install -o {user} -g {user}` pattern
|
|
||||||
- Staging dir: `/tmp/data_analyst_staging` (deploy.sh creates it with setgid)
|
|
||||||
- Dev docs: `dev_docs/server.md` documents all established patterns
|
|
||||||
|
|
||||||
### Files NOT to modify (stable infrastructure)
|
### Files NOT to modify (stable infrastructure)
|
||||||
- `src/parquet_manager.py` - Parquet conversion engine
|
|
||||||
- `connectors/jira/file_lock.py` - Advisory file locking
|
- `connectors/jira/file_lock.py` - Advisory file locking
|
||||||
- `connectors/jira/incremental_transform.py` - Jira monthly Parquet transform
|
- `connectors/jira/transform.py` - Core Jira transform logic
|
||||||
- `services/ws_gateway/` - WebSocket notification gateway
|
- `services/ws_gateway/` - WebSocket notification gateway
|
||||||
|
|
||||||
## Git Commits & Pull Requests
|
## Git Commits & Pull Requests
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,20 @@ services:
|
||||||
timeout: 5s
|
timeout: 5s
|
||||||
retries: 3
|
retries: 3
|
||||||
|
|
||||||
|
# One-shot: run extractor then rebuild orchestrator views
|
||||||
|
extract:
|
||||||
|
build: .
|
||||||
|
command: >
|
||||||
|
sh -c "python -m connectors.keboola.extractor &&
|
||||||
|
python -c 'from src.orchestrator import SyncOrchestrator; print(SyncOrchestrator().rebuild())'"
|
||||||
|
volumes:
|
||||||
|
- data:/data
|
||||||
|
env_file: .env
|
||||||
|
environment:
|
||||||
|
- DATA_DIR=/data
|
||||||
|
profiles:
|
||||||
|
- extract
|
||||||
|
|
||||||
scheduler:
|
scheduler:
|
||||||
build: .
|
build: .
|
||||||
command: python -m services.scheduler
|
command: python -m services.scheduler
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue