feat: add Docker, CLI tool, scheduler, and agent skills

- Dockerfile (uv-based) + docker-compose.yml (3 services)
- CLI tool 'da' with commands: auth, sync, query, status, admin, diagnose, skills
- Scheduler sidecar service (replaces systemd timers)
- pyproject.toml for uv distribution
- Built-in skills (setup, troubleshoot) for AI agents
- 17 CLI tests, 75 total tests passing
This commit is contained in:
ZdenekSrotyr 2026-03-27 15:30:03 +01:00
parent a3918d3833
commit 3701130a11
22 changed files with 1155 additions and 0 deletions

17
.dockerignore Normal file
View file

@ -0,0 +1,17 @@
.git
.venv
venv
__pycache__
*.pyc
.env
.env.*
data/
dev_data/
*.egg-info
dist/
build/
.pytest_cache
.mypy_cache
.claude/
node_modules/
docs/ZS_PADAK_*

17
Dockerfile Normal file
View file

@ -0,0 +1,17 @@
FROM python:3.13-slim
# Install uv for fast dependency management
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv
WORKDIR /app
# Copy dependency files first for layer caching
COPY requirements.txt .
RUN uv pip install --system --no-cache -r requirements.txt
# Copy application code
COPY . .
# Default: run FastAPI server
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

0
cli/__init__.py Normal file
View file

50
cli/client.py Normal file
View file

@ -0,0 +1,50 @@
"""HTTP client wrapper for CLI — handles auth, retries, streaming."""
from typing import Optional
import httpx
from cli.config import get_server_url, get_token
def get_client(timeout: float = 30.0) -> httpx.Client:
"""Get an authenticated httpx client."""
token = get_token()
headers = {}
if token:
headers["Authorization"] = f"Bearer {token}"
return httpx.Client(
base_url=get_server_url(),
headers=headers,
timeout=timeout,
)
def api_get(path: str, **kwargs) -> httpx.Response:
with get_client() as client:
return client.get(path, **kwargs)
def api_post(path: str, **kwargs) -> httpx.Response:
with get_client() as client:
return client.post(path, **kwargs)
def api_delete(path: str, **kwargs) -> httpx.Response:
with get_client() as client:
return client.delete(path, **kwargs)
def stream_download(path: str, target_path: str, progress_callback=None) -> int:
"""Stream download a file from the API. Returns bytes written."""
with get_client(timeout=300.0) as client:
with client.stream("GET", path) as response:
response.raise_for_status()
total = 0
with open(target_path, "wb") as f:
for chunk in response.iter_bytes(chunk_size=65536):
f.write(chunk)
total += len(chunk)
if progress_callback:
progress_callback(len(chunk))
return total

0
cli/commands/__init__.py Normal file
View file

52
cli/commands/admin.py Normal file
View file

@ -0,0 +1,52 @@
"""Admin commands — da admin."""
import json
import typer
from cli.client import api_get, api_post, api_delete
admin_app = typer.Typer(help="Admin operations (requires admin role)")
@admin_app.command("add-user")
def add_user(
email: str = typer.Argument(..., help="User email"),
name: str = typer.Option("", help="User display name"),
role: str = typer.Option("analyst", help="Role: viewer, analyst, admin, km_admin"),
):
"""Add a new user."""
resp = api_post("/api/users", json={"email": email, "name": name or email.split("@")[0], "role": role})
if resp.status_code == 201:
data = resp.json()
typer.echo(f"Created user: {data['email']} (id: {data['id']}, role: {data['role']})")
else:
typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
raise typer.Exit(1)
@admin_app.command("list-users")
def list_users(as_json: bool = typer.Option(False, "--json")):
"""List all users."""
resp = api_get("/api/users")
if resp.status_code != 200:
typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
raise typer.Exit(1)
users = resp.json()
if as_json:
typer.echo(json.dumps(users, indent=2))
else:
for u in users:
typer.echo(f" {u['email']:30s} role={u['role']:10s} id={u['id'][:8]}")
@admin_app.command("remove-user")
def remove_user(user_id: str = typer.Argument(..., help="User ID to remove")):
"""Remove a user."""
resp = api_delete(f"/api/users/{user_id}")
if resp.status_code == 204:
typer.echo("User removed.")
else:
typer.echo(f"Failed: {resp.text}", err=True)
raise typer.Exit(1)

58
cli/commands/auth.py Normal file
View file

@ -0,0 +1,58 @@
"""Auth commands — da login, da logout, da whoami."""
import typer
from cli.client import api_post, api_get
from cli.config import save_token, clear_token, get_token, get_server_url
auth_app = typer.Typer(help="Authentication commands")
@auth_app.command()
def login(
email: str = typer.Option(..., prompt=True, help="Your email address"),
server: str = typer.Option(None, help="Server URL override"),
):
"""Login and obtain a JWT token."""
if server:
import os
os.environ["DA_SERVER"] = server
try:
resp = api_post("/auth/token", json={"email": email})
if resp.status_code == 200:
data = resp.json()
save_token(data["access_token"], data["email"], data["role"])
typer.echo(f"Logged in as {data['email']} (role: {data['role']})")
else:
typer.echo(f"Login failed: {resp.json().get('detail', resp.text)}", err=True)
raise typer.Exit(1)
except Exception as e:
typer.echo(f"Connection error: {e}", err=True)
raise typer.Exit(1)
@auth_app.command()
def logout():
"""Clear stored token."""
clear_token()
typer.echo("Logged out.")
@auth_app.command()
def whoami():
"""Show current user info."""
token = get_token()
if not token:
typer.echo("Not logged in. Run: da login")
raise typer.Exit(1)
import jwt
try:
payload = jwt.decode(token, options={"verify_signature": False})
typer.echo(f"Email: {payload.get('email', 'unknown')}")
typer.echo(f"Role: {payload.get('role', 'unknown')}")
typer.echo(f"Server: {get_server_url()}")
except Exception:
typer.echo("Invalid token. Run: da login")
raise typer.Exit(1)

75
cli/commands/diagnose.py Normal file
View file

@ -0,0 +1,75 @@
"""Diagnose command — da diagnose."""
import json
import typer
from cli.client import api_get
diagnose_app = typer.Typer(help="System diagnostics")
@diagnose_app.callback(invoke_without_command=True)
def diagnose(
symptom: str = typer.Option(None, "--symptom", help="Describe the problem"),
component: str = typer.Option(None, "--component", help="Check specific component"),
as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
):
"""Run comprehensive system diagnostics. AI-agent friendly output."""
checks = []
# 1. API reachability
try:
resp = api_get("/api/health")
health = resp.json()
checks.append({"name": "api", "status": "ok", "latency_ms": resp.elapsed.total_seconds() * 1000})
# Extract service checks
for svc_name, svc_data in health.get("services", {}).items():
check = {"name": svc_name, "status": svc_data.get("status", "unknown")}
check.update({k: v for k, v in svc_data.items() if k != "status"})
checks.append(check)
except Exception as e:
checks.append({"name": "api", "status": "error", "detail": str(e)})
# Determine overall
overall = "healthy"
for c in checks:
if c["status"] == "error":
overall = "unhealthy"
break
if c["status"] == "warning":
overall = "degraded"
# Generate suggested actions
actions = []
for c in checks:
if c["status"] == "error" and c["name"] == "api":
actions.append("Server unreachable. Check: docker compose ps, da server logs")
if c.get("stale_tables"):
for t in c["stale_tables"]:
actions.append(f"Table '{t}' is stale. Run: da server logs scheduler | grep {t}")
result = {
"overall": overall,
"checks": checks,
"suggested_actions": actions,
}
if as_json:
typer.echo(json.dumps(result, indent=2))
else:
typer.echo(f"Overall: {overall}")
for c in checks:
detail = ""
if "detail" in c:
detail = f"{c['detail']}"
if "tables" in c:
detail = f" ({c['tables']} tables)"
if "latency_ms" in c:
detail = f" ({c['latency_ms']:.0f}ms)"
typer.echo(f" [{c['status']:7s}] {c['name']}{detail}")
if actions:
typer.echo("\nSuggested actions:")
for a in actions:
typer.echo(f" - {a}")

77
cli/commands/query.py Normal file
View file

@ -0,0 +1,77 @@
"""Query commands — da query."""
import json
import os
from pathlib import Path
import typer
def query_command(
sql: str = typer.Argument(..., help="SQL query to execute"),
remote: bool = typer.Option(False, "--remote", help="Execute on server instead of locally"),
fmt: str = typer.Option("table", "--format", "-f", help="Output format: table, json, csv"),
limit: int = typer.Option(1000, "--limit", help="Max rows to return"),
):
"""Execute SQL query against DuckDB."""
if remote:
_query_remote(sql, fmt, limit)
else:
_query_local(sql, fmt, limit)
def _query_local(sql: str, fmt: str, limit: int):
"""Run query against local DuckDB."""
import duckdb
local_dir = Path(os.environ.get("DA_LOCAL_DIR", "."))
db_path = local_dir / "user" / "duckdb" / "analytics.duckdb"
if not db_path.exists():
typer.echo("Local DuckDB not found. Run: da sync", err=True)
raise typer.Exit(1)
conn = duckdb.connect(str(db_path), read_only=True)
try:
result = conn.execute(sql).fetchmany(limit)
columns = [desc[0] for desc in conn.description] if conn.description else []
_output(columns, result, fmt)
except Exception as e:
typer.echo(f"Query error: {e}", err=True)
raise typer.Exit(1)
finally:
conn.close()
def _query_remote(sql: str, fmt: str, limit: int):
"""Run query against server DuckDB via API."""
from cli.client import api_post
resp = api_post("/api/query", json={"sql": sql, "limit": limit})
if resp.status_code != 200:
typer.echo(f"Query failed: {resp.json().get('detail', resp.text)}", err=True)
raise typer.Exit(1)
data = resp.json()
_output(data["columns"], data["rows"], fmt)
if data.get("truncated"):
typer.echo(f"(truncated at {limit} rows)", err=True)
def _output(columns: list, rows: list, fmt: str):
if fmt == "json":
output = [dict(zip(columns, row)) for row in rows]
typer.echo(json.dumps(output, indent=2, default=str))
elif fmt == "csv":
typer.echo(",".join(columns))
for row in rows:
typer.echo(",".join(str(v) if v is not None else "" for v in row))
else:
# Table format using rich
from rich.console import Console
from rich.table import Table
console = Console()
table = Table()
for col in columns:
table.add_column(col)
for row in rows:
table.add_row(*(str(v) if v is not None else "" for v in row))
console.print(table)

32
cli/commands/skills.py Normal file
View file

@ -0,0 +1,32 @@
"""Skills command — da skills. Knowledge base for AI agents."""
from pathlib import Path
import typer
skills_app = typer.Typer(help="Built-in knowledge base for AI agents")
SKILLS_DIR = Path(__file__).parent.parent / "skills"
@skills_app.command("list")
def list_skills():
"""List available skills."""
if not SKILLS_DIR.exists():
typer.echo("No skills directory found.")
return
for f in sorted(SKILLS_DIR.glob("*.md")):
name = f.stem
# Read first line as description
first_line = f.read_text().split("\n")[0].strip("# ").strip()
typer.echo(f" {name:25s} {first_line}")
@skills_app.command("show")
def show_skill(name: str = typer.Argument(..., help="Skill name to display")):
"""Display a skill's content."""
skill_file = SKILLS_DIR / f"{name}.md"
if not skill_file.exists():
typer.echo(f"Skill '{name}' not found. Run: da skills list", err=True)
raise typer.Exit(1)
typer.echo(skill_file.read_text())

55
cli/commands/status.py Normal file
View file

@ -0,0 +1,55 @@
"""Status commands — da status."""
import json
import typer
from cli.client import api_get
from cli.config import get_sync_state
status_app = typer.Typer(help="System status")
@status_app.callback(invoke_without_command=True)
def status(
local: bool = typer.Option(False, "--local", help="Show local-only status (no server)"),
as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
):
"""Show system health and sync status."""
if local:
state = get_sync_state()
info = {
"mode": "local",
"tables_synced": len(state.get("tables", {})),
"last_sync": state.get("last_sync", "never"),
"tables": state.get("tables", {}),
}
if as_json:
typer.echo(json.dumps(info, indent=2))
else:
typer.echo(f"Mode: offline (local data)")
typer.echo(f"Tables synced: {info['tables_synced']}")
typer.echo(f"Last sync: {info['last_sync']}")
return
try:
resp = api_get("/api/health")
data = resp.json()
if as_json:
typer.echo(json.dumps(data, indent=2))
else:
typer.echo(f"Status: {data.get('status', 'unknown')}")
for name, check in data.get("services", {}).items():
s = check.get("status", "?")
detail = ""
if "tables" in check:
detail = f" ({check['tables']} tables, {check.get('total_rows', 0)} rows)"
if "count" in check:
detail = f" ({check['count']})"
if check.get("stale_tables"):
detail += f" [stale: {', '.join(check['stale_tables'])}]"
typer.echo(f" {name}: {s}{detail}")
except Exception as e:
typer.echo(f"Cannot reach server: {e}", err=True)
typer.echo("Use --local for offline status.")
raise typer.Exit(1)

177
cli/commands/sync.py Normal file
View file

@ -0,0 +1,177 @@
"""Sync commands — da sync."""
import json
import os
from pathlib import Path
import typer
from rich.progress import Progress, SpinnerColumn, TextColumn
from cli.client import api_get, api_post, stream_download
from cli.config import get_sync_state, save_sync_state
sync_app = typer.Typer(help="Data synchronization")
def _local_data_dir() -> Path:
return Path(os.environ.get("DA_LOCAL_DIR", "."))
@sync_app.callback(invoke_without_command=True)
def sync(
table: str = typer.Option(None, "--table", help="Sync specific table only"),
upload_only: bool = typer.Option(False, "--upload-only", help="Only upload sessions/artifacts"),
docs_only: bool = typer.Option(False, "--docs-only", help="Only sync documentation"),
as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
):
"""Sync data between server and local machine."""
if upload_only:
_upload(as_json)
return
with Progress(SpinnerColumn(), TextColumn("[progress.description]{task.description}")) as progress:
# 1. Get manifest
task = progress.add_task("Fetching manifest...", total=None)
try:
resp = api_get("/api/sync/manifest")
resp.raise_for_status()
manifest = resp.json()
except Exception as e:
typer.echo(f"Failed to fetch manifest: {e}", err=True)
raise typer.Exit(1)
server_tables = manifest.get("tables", {})
local_state = get_sync_state()
local_tables = local_state.get("tables", {})
# 2. Determine what to download
to_download = []
for tid, info in server_tables.items():
if table and tid != table:
continue
if docs_only:
continue
local_hash = local_tables.get(tid, {}).get("hash", "")
if info.get("hash", "") != local_hash:
to_download.append(tid)
progress.update(task, description=f"Found {len(to_download)} tables to sync")
# 3. Download parquets
local_dir = _local_data_dir()
parquet_dir = local_dir / "server" / "parquet"
parquet_dir.mkdir(parents=True, exist_ok=True)
results = {"downloaded": [], "skipped": [], "errors": []}
for tid in to_download:
progress.update(task, description=f"Downloading {tid}...")
target = parquet_dir / f"{tid}.parquet"
try:
stream_download(f"/api/data/{tid}/download", str(target))
local_tables[tid] = {
"hash": server_tables[tid].get("hash", ""),
"rows": server_tables[tid].get("rows", 0),
"size_bytes": server_tables[tid].get("size_bytes", 0),
}
results["downloaded"].append(tid)
except Exception as e:
results["errors"].append({"table": tid, "error": str(e)})
# 4. Save local state
from datetime import datetime, timezone
local_state["tables"] = local_tables
local_state["last_sync"] = datetime.now(timezone.utc).isoformat()
save_sync_state(local_state)
# 5. Rebuild DuckDB views
if results["downloaded"]:
progress.update(task, description="Rebuilding DuckDB views...")
_rebuild_duckdb_views(local_dir, parquet_dir)
progress.update(task, description="Sync complete")
# Output
skipped = len(server_tables) - len(to_download)
if as_json:
typer.echo(json.dumps(results, indent=2))
else:
typer.echo(f"Downloaded: {len(results['downloaded'])} tables")
typer.echo(f"Skipped (unchanged): {skipped}")
if results["errors"]:
typer.echo(f"Errors: {len(results['errors'])}")
for err in results["errors"]:
typer.echo(f" {err['table']}: {err['error']}")
def _rebuild_duckdb_views(local_dir: Path, parquet_dir: Path):
"""Recreate DuckDB views from downloaded parquets. Preserve user tables."""
import duckdb
db_path = local_dir / "user" / "duckdb" / "analytics.duckdb"
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = duckdb.connect(str(db_path))
# Get existing user-created tables (not views)
try:
existing_tables = {
row[0] for row in
conn.execute("SELECT table_name FROM information_schema.tables WHERE table_type='BASE TABLE'").fetchall()
}
except Exception:
existing_tables = set()
# Drop all views
try:
views = conn.execute(
"SELECT table_name FROM information_schema.tables WHERE table_type='VIEW'"
).fetchall()
for (view_name,) in views:
conn.execute(f'DROP VIEW IF EXISTS "{view_name}"')
except Exception:
pass
# Create views for each parquet file
for pq_file in parquet_dir.rglob("*.parquet"):
view_name = pq_file.stem
if view_name in existing_tables:
continue # don't shadow user tables
abs_path = str(pq_file.resolve())
conn.execute(f"CREATE VIEW \"{view_name}\" AS SELECT * FROM read_parquet('{abs_path}')")
conn.close()
def _upload(as_json: bool):
"""Upload sessions and CLAUDE.local.md to server."""
local_dir = _local_data_dir()
results = {"sessions": 0, "local_md": False}
# Upload sessions
sessions_dir = local_dir / "user" / "sessions"
if sessions_dir.exists():
for f in sessions_dir.glob("*.jsonl"):
try:
with open(f, "rb") as fh:
resp = api_post("/api/upload/sessions", files={"file": (f.name, fh)})
if resp.status_code == 200:
results["sessions"] += 1
except Exception:
pass
# Upload CLAUDE.local.md
local_md = local_dir / ".claude" / "CLAUDE.local.md"
if local_md.exists():
content = local_md.read_text(encoding="utf-8")
try:
resp = api_post("/api/upload/local-md", json={"content": content})
if resp.status_code == 200:
results["local_md"] = True
except Exception:
pass
if as_json:
typer.echo(json.dumps(results, indent=2))
else:
typer.echo(f"Uploaded {results['sessions']} sessions")
if results["local_md"]:
typer.echo("Uploaded CLAUDE.local.md")

60
cli/config.py Normal file
View file

@ -0,0 +1,60 @@
"""CLI configuration — token storage, server URL, sync state."""
import json
import os
from pathlib import Path
from typing import Optional
def _config_dir() -> Path:
d = Path(os.environ.get("DA_CONFIG_DIR", os.path.expanduser("~/.config/da")))
d.mkdir(parents=True, exist_ok=True)
return d
def get_server_url() -> str:
config = load_config()
return os.environ.get("DA_SERVER", config.get("server", "http://localhost:8000"))
def get_token() -> Optional[str]:
token_file = _config_dir() / "token.json"
if token_file.exists():
data = json.loads(token_file.read_text())
return data.get("access_token")
return os.environ.get("DA_TOKEN")
def save_token(token: str, email: str, role: str):
token_file = _config_dir() / "token.json"
token_file.write_text(json.dumps({
"access_token": token,
"email": email,
"role": role,
}, indent=2))
def clear_token():
token_file = _config_dir() / "token.json"
if token_file.exists():
token_file.unlink()
def load_config() -> dict:
config_file = _config_dir() / "config.yaml"
if config_file.exists():
import yaml
return yaml.safe_load(config_file.read_text()) or {}
return {}
def get_sync_state() -> dict:
state_file = _config_dir() / "sync_state.json"
if state_file.exists():
return json.loads(state_file.read_text())
return {}
def save_sync_state(state: dict):
state_file = _config_dir() / "sync_state.json"
state_file.write_text(json.dumps(state, indent=2))

33
cli/main.py Normal file
View file

@ -0,0 +1,33 @@
"""da — CLI tool for AI Data Analyst.
Primary interface for AI agents. Install: uv tool install data-analyst
"""
import typer
from cli.commands.auth import auth_app
from cli.commands.sync import sync_app
from cli.commands.query import query_command
from cli.commands.status import status_app
from cli.commands.admin import admin_app
from cli.commands.diagnose import diagnose_app
from cli.commands.skills import skills_app
app = typer.Typer(
name="da",
help="AI Data Analyst CLI — data sync, queries, and admin for AI agents",
no_args_is_help=True,
)
# Register subcommands
app.add_typer(auth_app, name="auth")
app.add_typer(sync_app, name="sync")
app.command("query")(query_command)
app.add_typer(status_app, name="status")
app.add_typer(admin_app, name="admin")
app.add_typer(diagnose_app, name="diagnose")
app.add_typer(skills_app, name="skills")
if __name__ == "__main__":
app()

58
cli/skills/setup.md Normal file
View file

@ -0,0 +1,58 @@
# Setup — Complete guide for deploying a new instance
## Prerequisites
- Docker and Docker Compose installed
- Domain name pointing to server IP (for SSL)
- Data source credentials (Keboola token OR BigQuery service account)
## Steps
1. Clone the repository:
```bash
git clone <repo-url>
cd data-analyst
```
2. Create configuration:
```bash
cp config/instance.yaml.example config/instance.yaml
# Edit instance.yaml with your settings
```
3. Create environment file:
```bash
cp config/.env.template .env
# Fill in: JWT_SECRET_KEY, KEBOOLA_STORAGE_TOKEN (or BIGQUERY_PROJECT), etc.
```
4. Start services:
```bash
docker compose up -d
```
5. Verify health:
```bash
da status --server http://your-server:8000
```
6. Create first admin user:
```bash
da login --email admin@company.com --server http://your-server:8000
da admin add-user admin@company.com --role admin
```
7. Trigger initial data sync:
```bash
da admin trigger-sync
```
8. Verify data:
```bash
da status
```
## Troubleshooting
- **Cannot connect:** Check `docker compose ps`, verify port 8000 is exposed
- **Auth fails:** Verify JWT_SECRET_KEY is set in .env
- **No data:** Check data source credentials, run `da diagnose`

View file

@ -0,0 +1,34 @@
# Troubleshoot — Diagnostic procedures
## Quick Check
```bash
da diagnose --json
```
## Common Issues
### Data not updating
1. `da diagnose --component data` — check data freshness
2. `da server logs scheduler --since 1h` — check scheduler logs
3. Verify data source credentials: `da admin test-connection`
### Cannot login
1. Check server is running: `curl http://server:8000/api/health`
2. Check user exists: `da admin list-users` (from admin account)
3. Re-generate token: `da login --email your@email.com`
### DuckDB errors locally
1. Re-sync: `da sync` (rebuilds views)
2. Check disk space: `du -sh user/duckdb/`
3. Delete and re-create: `rm user/duckdb/analytics.duckdb && da sync`
### Server unresponsive
1. `docker compose ps` — check container status
2. `docker compose logs app --tail 50` — check app logs
3. `docker compose restart app` — restart app
## Escalation
If automated diagnostics don't help:
1. Collect full diagnostic: `da diagnose --json > /tmp/diag.json`
2. Collect server logs: `docker compose logs > /tmp/logs.txt`
3. Share both files with admin

31
docker-compose.test.yml Normal file
View file

@ -0,0 +1,31 @@
services:
app:
build: .
command: uvicorn app.main:app --host 0.0.0.0 --port 8000
environment:
- DATA_DIR=/data
- JWT_SECRET_KEY=test-secret-for-ci
- TESTING=true
volumes:
- test-data:/data
healthcheck:
test: ["CMD", "python", "-c", "import httpx; r=httpx.get('http://localhost:8000/api/health'); exit(0 if r.status_code==200 else 1)"]
interval: 5s
timeout: 3s
retries: 10
test-runner:
build: .
command: python -m pytest tests/test_db.py tests/test_repositories.py tests/test_migration.py tests/test_api.py -v
environment:
- DATA_DIR=/data
- JWT_SECRET_KEY=test-secret-for-ci
- API_URL=http://app:8000
volumes:
- test-data:/data
depends_on:
app:
condition: service_healthy
volumes:
test-data:

48
docker-compose.yml Normal file
View file

@ -0,0 +1,48 @@
services:
app:
build: .
command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
ports:
- "8000:8000"
volumes:
- .:/app
- data:/data
env_file: .env
environment:
- DATA_DIR=/data
healthcheck:
test: ["CMD", "python", "-c", "import httpx; r=httpx.get('http://localhost:8000/api/health'); exit(0 if r.status_code==200 else 1)"]
interval: 30s
timeout: 5s
retries: 3
scheduler:
build: .
command: python -m services.scheduler
volumes:
- data:/data
env_file: .env
environment:
- DATA_DIR=/data
- API_URL=http://app:8000
depends_on:
app:
condition: service_healthy
restart: unless-stopped
telegram-bot:
build: .
command: python -m services.telegram_bot
volumes:
- data:/data
env_file: .env
environment:
- DATA_DIR=/data
depends_on:
- app
profiles:
- full
restart: unless-stopped
volumes:
data:

43
pyproject.toml Normal file
View file

@ -0,0 +1,43 @@
[project]
name = "data-analyst"
version = "2.0.0"
description = "AI Data Analyst — data distribution platform for AI analytical systems"
requires-python = ">=3.9"
license = "MIT"
dependencies = [
"duckdb>=0.9.0",
"fastapi>=0.115.0",
"uvicorn[standard]>=0.32.0",
"python-multipart>=0.0.9",
"jinja2>=3.1.0",
"PyJWT>=2.8.0",
"httpx>=0.27.0",
"typer>=0.12.0",
"rich>=13.0.0",
"python-dotenv>=1.0.0",
"pyyaml>=6.0",
]
[project.scripts]
da = "cli.main:app"
[project.optional-dependencies]
connectors = [
"kbcstorage>=0.9.0",
"google-cloud-bigquery>=3.0.0",
"google-cloud-bigquery-storage>=2.0.0",
"pandas>=2.0.0",
"pyarrow>=12.0.0",
]
telegram = [
"aiohttp>=3.9.0",
]
dev = [
"pytest>=7.0.0",
"pytest-mock>=3.0.0",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

View file

View file

@ -0,0 +1,86 @@
"""Scheduler service — replaces systemd timers.
Lightweight sidecar that triggers jobs by calling the main app's API.
Keeps all business logic in the main app.
Usage: python -m services.scheduler
"""
import logging
import os
import signal
import sys
import time
from datetime import datetime, timezone
import httpx
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", "INFO").upper(),
format="%(asctime)s %(levelname)s [scheduler] %(message)s",
)
logger = logging.getLogger(__name__)
API_URL = os.environ.get("API_URL", "http://localhost:8000")
SCHEDULER_API_TOKEN = os.environ.get("SCHEDULER_API_TOKEN", "")
# Schedule definitions: (name, interval_seconds, api_endpoint, http_method)
JOBS = [
("data-refresh", 15 * 60, "/api/sync/trigger", "POST"),
("health-check", 5 * 60, "/api/health", "GET"),
]
_running = True
def _signal_handler(sig, frame):
global _running
logger.info(f"Received signal {sig}, shutting down...")
_running = False
def _call_api(endpoint: str, method: str = "POST") -> bool:
"""Call the main app API. Returns True on success."""
url = f"{API_URL}{endpoint}"
headers = {}
if SCHEDULER_API_TOKEN:
headers["Authorization"] = f"Bearer {SCHEDULER_API_TOKEN}"
try:
if method == "POST":
resp = httpx.post(url, headers=headers, timeout=120)
else:
resp = httpx.get(url, headers=headers, timeout=30)
if resp.status_code < 400:
logger.info(f"Job {endpoint}: {resp.status_code}")
return True
else:
logger.warning(f"Job {endpoint}: HTTP {resp.status_code} - {resp.text[:200]}")
return False
except Exception as e:
logger.error(f"Job {endpoint} failed: {e}")
return False
def run():
signal.signal(signal.SIGTERM, _signal_handler)
signal.signal(signal.SIGINT, _signal_handler)
logger.info(f"Scheduler started. API_URL={API_URL}, {len(JOBS)} jobs configured.")
# Track last run time per job
last_run = {name: 0.0 for name, _, _, _ in JOBS}
while _running:
now = time.time()
for name, interval, endpoint, method in JOBS:
if now - last_run[name] >= interval:
logger.info(f"Running job: {name}")
_call_api(endpoint, method)
last_run[name] = now
time.sleep(10) # check every 10 seconds
logger.info("Scheduler stopped.")
if __name__ == "__main__":
run()

152
tests/test_cli.py Normal file
View file

@ -0,0 +1,152 @@
"""Tests for CLI commands."""
import json
import os
import pytest
from unittest.mock import patch, MagicMock
from typer.testing import CliRunner
from cli.main import app
runner = CliRunner()
@pytest.fixture(autouse=True)
def tmp_config(tmp_path, monkeypatch):
monkeypatch.setenv("DA_CONFIG_DIR", str(tmp_path / "config"))
monkeypatch.setenv("DA_LOCAL_DIR", str(tmp_path / "local"))
monkeypatch.setenv("DATA_DIR", str(tmp_path / "data"))
monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-for-cli-tests")
(tmp_path / "config").mkdir()
(tmp_path / "local").mkdir()
(tmp_path / "data").mkdir()
yield tmp_path
class TestCLIHelp:
def test_main_help(self):
result = runner.invoke(app, ["--help"])
assert result.exit_code == 0
assert "AI Data Analyst CLI" in result.output
def test_auth_help(self):
result = runner.invoke(app, ["auth", "--help"])
assert result.exit_code == 0
assert "login" in result.output
def test_sync_help(self):
result = runner.invoke(app, ["sync", "--help"])
assert result.exit_code == 0
def test_query_help(self):
result = runner.invoke(app, ["query", "--help"])
assert result.exit_code == 0
def test_admin_help(self):
result = runner.invoke(app, ["admin", "--help"])
assert result.exit_code == 0
def test_diagnose_help(self):
result = runner.invoke(app, ["diagnose", "--help"])
assert result.exit_code == 0
def test_skills_help(self):
result = runner.invoke(app, ["skills", "--help"])
assert result.exit_code == 0
class TestSkills:
def test_list_skills(self):
result = runner.invoke(app, ["skills", "list"])
assert result.exit_code == 0
assert "setup" in result.output
assert "troubleshoot" in result.output
def test_show_skill(self):
result = runner.invoke(app, ["skills", "show", "setup"])
assert result.exit_code == 0
assert "Prerequisites" in result.output
def test_show_nonexistent_skill(self):
result = runner.invoke(app, ["skills", "show", "nonexistent"])
assert result.exit_code == 1
class TestAuth:
def test_whoami_not_logged_in(self):
result = runner.invoke(app, ["auth", "whoami"])
assert result.exit_code == 1
assert "Not logged in" in result.output
def test_logout(self):
result = runner.invoke(app, ["auth", "logout"])
assert result.exit_code == 0
assert "Logged out" in result.output
def test_login_with_mock_server(self, tmp_config):
"""Test login against a real FastAPI test server."""
from src.db import get_system_db
from src.repositories.users import UserRepository
conn = get_system_db()
repo = UserRepository(conn)
repo.create(id="u1", email="test@acme.com", name="Test", role="analyst")
conn.close()
from fastapi.testclient import TestClient
from app.main import create_app
test_app = create_app()
with patch("cli.client.get_client") as mock_get_client:
client = TestClient(test_app)
mock_get_client.return_value.__enter__ = MagicMock(return_value=client)
mock_get_client.return_value.__exit__ = MagicMock(return_value=False)
# Simulate the API call
resp = client.post("/auth/token", json={"email": "test@acme.com"})
assert resp.status_code == 200
token = resp.json()["access_token"]
# Save token manually (since we can't easily mock typer prompts)
from cli.config import save_token
save_token(token, "test@acme.com", "analyst")
# Now whoami should work
result = runner.invoke(app, ["auth", "whoami"])
assert result.exit_code == 0
assert "test@acme.com" in result.output
class TestStatus:
def test_local_status_empty(self):
result = runner.invoke(app, ["status", "--local"])
assert result.exit_code == 0
assert "Tables synced: 0" in result.output
def test_local_status_json(self):
result = runner.invoke(app, ["status", "--local", "--json"])
assert result.exit_code == 0
data = json.loads(result.output)
assert data["mode"] == "local"
class TestQuery:
def test_query_no_db(self, tmp_config):
result = runner.invoke(app, ["query", "SELECT 1"])
assert result.exit_code == 1
assert "not found" in result.output
def test_query_with_db(self, tmp_config):
import duckdb
local_dir = tmp_config / "local"
db_dir = local_dir / "user" / "duckdb"
db_dir.mkdir(parents=True)
conn = duckdb.connect(str(db_dir / "analytics.duckdb"))
conn.execute("CREATE TABLE test_table (id INT, name VARCHAR)")
conn.execute("INSERT INTO test_table VALUES (1, 'hello'), (2, 'world')")
conn.close()
result = runner.invoke(app, ["query", "SELECT count(*) as cnt FROM test_table", "--format", "json"])
assert result.exit_code == 0
data = json.loads(result.output)
assert data[0]["cnt"] == 2