From 3701130a11b3a4c117229d1246a1ad32cc28fda7 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Fri, 27 Mar 2026 15:30:03 +0100 Subject: [PATCH] feat: add Docker, CLI tool, scheduler, and agent skills - Dockerfile (uv-based) + docker-compose.yml (3 services) - CLI tool 'da' with commands: auth, sync, query, status, admin, diagnose, skills - Scheduler sidecar service (replaces systemd timers) - pyproject.toml for uv distribution - Built-in skills (setup, troubleshoot) for AI agents - 17 CLI tests, 75 total tests passing --- .dockerignore | 17 ++++ Dockerfile | 17 ++++ cli/__init__.py | 0 cli/client.py | 50 ++++++++++ cli/commands/__init__.py | 0 cli/commands/admin.py | 52 ++++++++++ cli/commands/auth.py | 58 +++++++++++ cli/commands/diagnose.py | 75 ++++++++++++++ cli/commands/query.py | 77 ++++++++++++++ cli/commands/skills.py | 32 ++++++ cli/commands/status.py | 55 ++++++++++ cli/commands/sync.py | 177 +++++++++++++++++++++++++++++++++ cli/config.py | 60 +++++++++++ cli/main.py | 33 ++++++ cli/skills/setup.md | 58 +++++++++++ cli/skills/troubleshoot.md | 34 +++++++ docker-compose.test.yml | 31 ++++++ docker-compose.yml | 48 +++++++++ pyproject.toml | 43 ++++++++ services/scheduler/__init__.py | 0 services/scheduler/__main__.py | 86 ++++++++++++++++ tests/test_cli.py | 152 ++++++++++++++++++++++++++++ 22 files changed, 1155 insertions(+) create mode 100644 .dockerignore create mode 100644 Dockerfile create mode 100644 cli/__init__.py create mode 100644 cli/client.py create mode 100644 cli/commands/__init__.py create mode 100644 cli/commands/admin.py create mode 100644 cli/commands/auth.py create mode 100644 cli/commands/diagnose.py create mode 100644 cli/commands/query.py create mode 100644 cli/commands/skills.py create mode 100644 cli/commands/status.py create mode 100644 cli/commands/sync.py create mode 100644 cli/config.py create mode 100644 cli/main.py create mode 100644 cli/skills/setup.md create mode 100644 cli/skills/troubleshoot.md create mode 100644 
docker-compose.test.yml
 create mode 100644 docker-compose.yml
 create mode 100644 pyproject.toml
 create mode 100644 services/scheduler/__init__.py
 create mode 100644 services/scheduler/__main__.py
 create mode 100644 tests/test_cli.py

diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..584f9b8
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,17 @@
+.git
+.venv
+venv
+__pycache__
+*.pyc
+.env
+.env.*
+data/
+dev_data/
+*.egg-info
+dist/
+build/
+.pytest_cache
+.mypy_cache
+.claude/
+node_modules/
+docs/ZS_PADAK_*
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..b036f0d
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,17 @@
+FROM python:3.13-slim
+
+# Install uv for fast dependency management
+COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv
+
+WORKDIR /app
+
+# Copy dependency files first for layer caching
+COPY requirements.txt .
+RUN uv pip install --system --no-cache -r requirements.txt
+
+# Copy application code
+COPY . .
+
+# Default: run FastAPI server
+EXPOSE 8000
+CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
diff --git a/cli/__init__.py b/cli/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/cli/client.py b/cli/client.py
new file mode 100644
index 0000000..cf26e55
--- /dev/null
+++ b/cli/client.py
@@ -0,0 +1,65 @@
+"""HTTP client wrapper for CLI — handles auth, retries, streaming."""
+
+import os
+from typing import Optional
+
+import httpx
+
+from cli.config import get_server_url, get_token
+
+
+def get_client(timeout: float = 30.0) -> httpx.Client:
+    """Get an authenticated httpx client."""
+    token = get_token()
+    headers = {}
+    if token:
+        headers["Authorization"] = f"Bearer {token}"
+    return httpx.Client(
+        base_url=get_server_url(),
+        headers=headers,
+        timeout=timeout,
+    )
+
+
+def api_get(path: str, **kwargs) -> httpx.Response:
+    with get_client() as client:
+        return client.get(path, **kwargs)
+
+
+def api_post(path: str, **kwargs) -> httpx.Response:
+    with get_client() as client:
+        return client.post(path, **kwargs)
+
+
+def api_delete(path: str, **kwargs) -> httpx.Response:
+    with get_client() as client:
+        return client.delete(path, **kwargs)
+
+
+def stream_download(path: str, target_path: str, progress_callback=None) -> int:
+    """Stream download a file from the API. Returns bytes written.
+
+    The payload is written to a temporary ``.part`` file next to
+    ``target_path`` and moved into place only after the transfer completes,
+    so a failed download never leaves a truncated file behind.
+    """
+    tmp_path = f"{target_path}.part"
+    total = 0
+    try:
+        with get_client(timeout=300.0) as client:
+            with client.stream("GET", path) as response:
+                response.raise_for_status()
+                with open(tmp_path, "wb") as f:
+                    for chunk in response.iter_bytes(chunk_size=65536):
+                        f.write(chunk)
+                        total += len(chunk)
+                        if progress_callback:
+                            progress_callback(len(chunk))
+        # Atomic on the same filesystem: readers see the old or the new file.
+        os.replace(tmp_path, target_path)
+    except BaseException:
+        # Clean up the partial file, then let the caller see the failure.
+        if os.path.exists(tmp_path):
+            os.remove(tmp_path)
+        raise
+    return total
diff --git a/cli/commands/__init__.py b/cli/commands/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/cli/commands/admin.py b/cli/commands/admin.py
new file mode 100644
index 0000000..c467634
--- /dev/null
+++ b/cli/commands/admin.py
@@ -0,0 +1,52 @@
+"""Admin commands — da admin."""
+
+import json
+
+import typer
+
+from cli.client import api_get, api_post, api_delete
+
+admin_app = typer.Typer(help="Admin operations (requires admin role)")
+
+
+@admin_app.command("add-user")
+def add_user(
+    email: str = typer.Argument(..., help="User email"),
+    name: str = typer.Option("", help="User display name"),
+    role: str = typer.Option("analyst", help="Role: viewer, analyst, admin, km_admin"),
+):
+    """Add a new user."""
+    resp = api_post("/api/users", json={"email": email, "name": name or email.split("@")[0], "role": role})
+    if resp.status_code == 201:
+        data = resp.json()
+        typer.echo(f"Created user: {data['email']} (id: {data['id']}, role: {data['role']})")
+    else:
+        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
+        raise typer.Exit(1)
+
+
+@admin_app.command("list-users")
+def list_users(as_json: bool = typer.Option(False, "--json")):
+    """List all users."""
+    resp = api_get("/api/users")
+    if resp.status_code != 200:
+        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
+        raise typer.Exit(1)
+
+    users = resp.json()
+    if as_json:
+        typer.echo(json.dumps(users, indent=2))
+    else:
+        for u in users:
+            typer.echo(f" {u['email']:30s} role={u['role']:10s} id={u['id'][:8]}")
+
+
+@admin_app.command("remove-user")
+def remove_user(user_id: str = typer.Argument(..., help="User ID to remove")):
+    """Remove a user."""
+    resp = api_delete(f"/api/users/{user_id}")
+    if resp.status_code == 204:
+        typer.echo("User removed.")
+    else:
+        typer.echo(f"Failed: {resp.text}", err=True)
+        raise typer.Exit(1)
diff --git a/cli/commands/auth.py b/cli/commands/auth.py
new file mode 100644
index 0000000..7577165
--- /dev/null
+++ b/cli/commands/auth.py
@@ -0,0 +1,66 @@
+"""Auth commands — da auth login, da auth logout, da auth whoami."""
+
+import typer
+
+from cli.client import api_post, api_get
+from cli.config import save_token, clear_token, get_token, get_server_url
+
+auth_app = typer.Typer(help="Authentication commands")
+
+
+@auth_app.command()
+def login(
+    email: str = typer.Option(..., prompt=True, help="Your email address"),
+    server: str = typer.Option(None, help="Server URL override"),
+):
+    """Login and obtain a JWT token."""
+    if server:
+        import os
+        os.environ["DA_SERVER"] = server
+
+    # Only the network call is guarded: typer.Exit derives from Exception, so
+    # raising it inside a broad try would be re-reported as a connection error.
+    try:
+        resp = api_post("/auth/token", json={"email": email})
+    except Exception as e:
+        typer.echo(f"Connection error: {e}", err=True)
+        raise typer.Exit(1)
+
+    if resp.status_code != 200:
+        try:
+            detail = resp.json().get("detail", resp.text)
+        except (ValueError, AttributeError):
+            # Body is not a JSON object (e.g. an HTML error page from a proxy).
+            detail = resp.text
+        typer.echo(f"Login failed: {detail}", err=True)
+        raise typer.Exit(1)
+
+    data = resp.json()
+    save_token(data["access_token"], data["email"], data["role"])
+    typer.echo(f"Logged in as {data['email']} (role: {data['role']})")
+
+
+@auth_app.command()
+def logout():
+    """Clear stored token."""
+    clear_token()
+    typer.echo("Logged out.")
+
+
+@auth_app.command()
+def whoami():
+    """Show current user info."""
+    token = get_token()
+    if not token:
+        typer.echo("Not logged in. Run: da auth login")
+        raise typer.Exit(1)
+
+    import jwt
+    try:
+        payload = jwt.decode(token, options={"verify_signature": False})
+        typer.echo(f"Email: {payload.get('email', 'unknown')}")
+        typer.echo(f"Role: {payload.get('role', 'unknown')}")
+        typer.echo(f"Server: {get_server_url()}")
+    except Exception:
+        typer.echo("Invalid token. Run: da auth login")
+        raise typer.Exit(1)
diff --git a/cli/commands/diagnose.py b/cli/commands/diagnose.py
new file mode 100644
index 0000000..ea29539
--- /dev/null
+++ b/cli/commands/diagnose.py
@@ -0,0 +1,75 @@
+"""Diagnose command — da diagnose."""
+
+import json
+
+import typer
+
+from cli.client import api_get
+
+diagnose_app = typer.Typer(help="System diagnostics")
+
+
+@diagnose_app.callback(invoke_without_command=True)
+def diagnose(
+    symptom: str = typer.Option(None, "--symptom", help="Describe the problem"),
+    component: str = typer.Option(None, "--component", help="Check specific component"),
+    as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
+):
+    """Run comprehensive system diagnostics. AI-agent friendly output."""
+    checks = []
+
+    # 1. API reachability
+    try:
+        resp = api_get("/api/health")
+        health = resp.json()
+        checks.append({"name": "api", "status": "ok", "latency_ms": resp.elapsed.total_seconds() * 1000})
+
+        # Extract service checks
+        for svc_name, svc_data in health.get("services", {}).items():
+            check = {"name": svc_name, "status": svc_data.get("status", "unknown")}
+            check.update({k: v for k, v in svc_data.items() if k != "status"})
+            checks.append(check)
+    except Exception as e:
+        checks.append({"name": "api", "status": "error", "detail": str(e)})
+
+    # Determine overall
+    overall = "healthy"
+    for c in checks:
+        if c["status"] == "error":
+            overall = "unhealthy"
+            break
+        if c["status"] == "warning":
+            overall = "degraded"
+
+    # Generate suggested actions
+    actions = []
+    for c in checks:
+        if c["status"] == "error" and c["name"] == "api":
+            actions.append("Server unreachable. Check: docker compose ps, da server logs")
+        if c.get("stale_tables"):
+            for t in c["stale_tables"]:
+                actions.append(f"Table '{t}' is stale. Run: da server logs scheduler | grep {t}")
+
+    result = {
+        "overall": overall,
+        "checks": checks,
+        "suggested_actions": actions,
+    }
+
+    if as_json:
+        typer.echo(json.dumps(result, indent=2))
+    else:
+        typer.echo(f"Overall: {overall}")
+        for c in checks:
+            detail = ""
+            if "detail" in c:
+                detail = f" — {c['detail']}"
+            if "tables" in c:
+                detail = f" ({c['tables']} tables)"
+            if "latency_ms" in c:
+                detail = f" ({c['latency_ms']:.0f}ms)"
+            typer.echo(f" [{c['status']:7s}] {c['name']}{detail}")
+        if actions:
+            typer.echo("\nSuggested actions:")
+            for a in actions:
+                typer.echo(f" - {a}")
diff --git a/cli/commands/query.py b/cli/commands/query.py
new file mode 100644
index 0000000..461d68f
--- /dev/null
+++ b/cli/commands/query.py
@@ -0,0 +1,84 @@
+"""Query commands — da query."""
+
+import csv
+import io
+import json
+import os
+from pathlib import Path
+
+import typer
+
+def query_command(
+    sql: str = typer.Argument(..., help="SQL query to execute"),
+    remote: bool = typer.Option(False, "--remote", help="Execute on server instead of locally"),
+    fmt: str = typer.Option("table", "--format", "-f", help="Output format: table, json, csv"),
+    limit: int = typer.Option(1000, "--limit", help="Max rows to return"),
+):
+    """Execute SQL query against DuckDB."""
+    if remote:
+        _query_remote(sql, fmt, limit)
+    else:
+        _query_local(sql, fmt, limit)
+
+
+def _query_local(sql: str, fmt: str, limit: int):
+    """Run query against local DuckDB."""
+    import duckdb
+
+    local_dir = Path(os.environ.get("DA_LOCAL_DIR", "."))
+    db_path = local_dir / "user" / "duckdb" / "analytics.duckdb"
+    if not db_path.exists():
+        typer.echo("Local DuckDB not found. Run: da sync", err=True)
+        raise typer.Exit(1)
+
+    conn = duckdb.connect(str(db_path), read_only=True)
+    try:
+        result = conn.execute(sql).fetchmany(limit)
+        columns = [desc[0] for desc in conn.description] if conn.description else []
+        _output(columns, result, fmt)
+    except Exception as e:
+        typer.echo(f"Query error: {e}", err=True)
+        raise typer.Exit(1)
+    finally:
+        conn.close()
+
+
+def _query_remote(sql: str, fmt: str, limit: int):
+    """Run query against server DuckDB via API."""
+    from cli.client import api_post
+
+    resp = api_post("/api/query", json={"sql": sql, "limit": limit})
+    if resp.status_code != 200:
+        typer.echo(f"Query failed: {resp.json().get('detail', resp.text)}", err=True)
+        raise typer.Exit(1)
+
+    data = resp.json()
+    _output(data["columns"], data["rows"], fmt)
+    if data.get("truncated"):
+        typer.echo(f"(truncated at {limit} rows)", err=True)
+
+
+def _output(columns: list, rows: list, fmt: str):
+    """Print rows as JSON, CSV, or (for any other fmt) a rich table."""
+    if fmt == "json":
+        output = [dict(zip(columns, row)) for row in rows]
+        typer.echo(json.dumps(output, indent=2, default=str))
+    elif fmt == "csv":
+        # csv.writer quotes fields containing commas, quotes or newlines;
+        # None is written as an empty field, matching the previous output.
+        buf = io.StringIO()
+        writer = csv.writer(buf, lineterminator="\n")
+        writer.writerow(columns)
+        writer.writerows(rows)
+        typer.echo(buf.getvalue(), nl=False)
+    else:
+        # Table format using rich
+        from rich.console import Console
+        from rich.table import Table
+        console = Console()
+        table = Table()
+        for col in columns:
+            table.add_column(col)
+        for row in rows:
+            table.add_row(*(str(v) if v is not None else "" for v in row))
+        console.print(table)
diff --git a/cli/commands/skills.py b/cli/commands/skills.py
new file mode 100644
index 0000000..c3a4e62
--- /dev/null
+++ b/cli/commands/skills.py
@@ -0,0 +1,32 @@
+"""Skills command — da skills.
Knowledge base for AI agents."""
+
+from pathlib import Path
+
+import typer
+
+skills_app = typer.Typer(help="Built-in knowledge base for AI agents")
+
+SKILLS_DIR = Path(__file__).parent.parent / "skills"
+
+
+@skills_app.command("list")
+def list_skills():
+    """List available skills."""
+    if not SKILLS_DIR.exists():
+        typer.echo("No skills directory found.")
+        return
+    for f in sorted(SKILLS_DIR.glob("*.md")):
+        name = f.stem
+        # Read first line as description
+        first_line = f.read_text().split("\n")[0].strip("# ").strip()
+        typer.echo(f" {name:25s} {first_line}")
+
+
+@skills_app.command("show")
+def show_skill(name: str = typer.Argument(..., help="Skill name to display")):
+    """Display a skill's content."""
+    skill_file = SKILLS_DIR / f"{name}.md"
+    if not skill_file.exists():
+        typer.echo(f"Skill '{name}' not found. Run: da skills list", err=True)
+        raise typer.Exit(1)
+    typer.echo(skill_file.read_text())
diff --git a/cli/commands/status.py b/cli/commands/status.py
new file mode 100644
index 0000000..8168c2d
--- /dev/null
+++ b/cli/commands/status.py
@@ -0,0 +1,55 @@
+"""Status commands — da status."""
+
+import json
+
+import typer
+
+from cli.client import api_get
+from cli.config import get_sync_state
+
+status_app = typer.Typer(help="System status")
+
+
+@status_app.callback(invoke_without_command=True)
+def status(
+    local: bool = typer.Option(False, "--local", help="Show local-only status (no server)"),
+    as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
+):
+    """Show system health and sync status."""
+    if local:
+        state = get_sync_state()
+        info = {
+            "mode": "local",
+            "tables_synced": len(state.get("tables", {})),
+            "last_sync": state.get("last_sync", "never"),
+            "tables": state.get("tables", {}),
+        }
+        if as_json:
+            typer.echo(json.dumps(info, indent=2))
+        else:
+            typer.echo(f"Mode: offline (local data)")
+            typer.echo(f"Tables synced: {info['tables_synced']}")
+            typer.echo(f"Last sync: {info['last_sync']}")
+        return
+
+    try:
+        resp = api_get("/api/health")
+        data = resp.json()
+        if as_json:
+            typer.echo(json.dumps(data, indent=2))
+        else:
+            typer.echo(f"Status: {data.get('status', 'unknown')}")
+            for name, check in data.get("services", {}).items():
+                s = check.get("status", "?")
+                detail = ""
+                if "tables" in check:
+                    detail = f" ({check['tables']} tables, {check.get('total_rows', 0)} rows)"
+                if "count" in check:
+                    detail = f" ({check['count']})"
+                if check.get("stale_tables"):
+                    detail += f" [stale: {', '.join(check['stale_tables'])}]"
+                typer.echo(f" {name}: {s}{detail}")
+    except Exception as e:
+        typer.echo(f"Cannot reach server: {e}", err=True)
+        typer.echo("Use --local for offline status.")
+        raise typer.Exit(1)
diff --git a/cli/commands/sync.py b/cli/commands/sync.py
new file mode 100644
index 0000000..ae31674
--- /dev/null
+++ b/cli/commands/sync.py
@@ -0,0 +1,191 @@
+"""Sync commands — da sync."""
+
+import json
+import os
+from pathlib import Path
+
+import typer
+from rich.progress import Progress, SpinnerColumn, TextColumn
+
+from cli.client import api_get, api_post, stream_download
+from cli.config import get_sync_state, save_sync_state
+
+sync_app = typer.Typer(help="Data synchronization")
+
+
+def _local_data_dir() -> Path:
+    return Path(os.environ.get("DA_LOCAL_DIR", "."))
+
+
+@sync_app.callback(invoke_without_command=True)
+def sync(
+    table: str = typer.Option(None, "--table", help="Sync specific table only"),
+    upload_only: bool = typer.Option(False, "--upload-only", help="Only upload sessions/artifacts"),
+    docs_only: bool = typer.Option(False, "--docs-only", help="Only sync documentation"),
+    as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
+):
+    """Sync data between server and local machine."""
+    if upload_only:
+        _upload(as_json)
+        return
+
+    with Progress(SpinnerColumn(), TextColumn("[progress.description]{task.description}")) as progress:
+        # 1. Get manifest
+        task = progress.add_task("Fetching manifest...", total=None)
+        try:
+            resp = api_get("/api/sync/manifest")
+            resp.raise_for_status()
+            manifest = resp.json()
+        except Exception as e:
+            typer.echo(f"Failed to fetch manifest: {e}", err=True)
+            raise typer.Exit(1)
+
+        server_tables = manifest.get("tables", {})
+        local_state = get_sync_state()
+        local_tables = local_state.get("tables", {})
+        results = {"downloaded": [], "skipped": [], "errors": []}
+
+        # 2. Determine what to download
+        to_download = []
+        for tid, info in server_tables.items():
+            if table and tid != table:
+                continue
+            if docs_only:
+                continue
+            local_hash = local_tables.get(tid, {}).get("hash", "")
+            if info.get("hash", "") != local_hash:
+                to_download.append(tid)
+            else:
+                # Up to date. Tables excluded by --table/--docs-only are
+                # deliberately not counted as skipped.
+                results["skipped"].append(tid)
+
+        progress.update(task, description=f"Found {len(to_download)} tables to sync")
+
+        # 3. Download parquets
+        local_dir = _local_data_dir()
+        parquet_dir = local_dir / "server" / "parquet"
+        parquet_dir.mkdir(parents=True, exist_ok=True)
+
+        for tid in to_download:
+            progress.update(task, description=f"Downloading {tid}...")
+            target = parquet_dir / f"{tid}.parquet"
+            try:
+                stream_download(f"/api/data/{tid}/download", str(target))
+                local_tables[tid] = {
+                    "hash": server_tables[tid].get("hash", ""),
+                    "rows": server_tables[tid].get("rows", 0),
+                    "size_bytes": server_tables[tid].get("size_bytes", 0),
+                }
+                results["downloaded"].append(tid)
+            except Exception as e:
+                results["errors"].append({"table": tid, "error": str(e)})
+
+        # 4. Save local state
+        from datetime import datetime, timezone
+        local_state["tables"] = local_tables
+        local_state["last_sync"] = datetime.now(timezone.utc).isoformat()
+        save_sync_state(local_state)
+
+        # 5. Rebuild DuckDB views
+        if results["downloaded"]:
+            progress.update(task, description="Rebuilding DuckDB views...")
+            _rebuild_duckdb_views(local_dir, parquet_dir)
+
+        progress.update(task, description="Sync complete")
+
+    # Output
+    skipped = len(results["skipped"])
+    if as_json:
+        typer.echo(json.dumps(results, indent=2))
+    else:
+        typer.echo(f"Downloaded: {len(results['downloaded'])} tables")
+        typer.echo(f"Skipped (unchanged): {skipped}")
+        if results["errors"]:
+            typer.echo(f"Errors: {len(results['errors'])}")
+            for err in results["errors"]:
+                typer.echo(f" {err['table']}: {err['error']}")
+
+
+def _rebuild_duckdb_views(local_dir: Path, parquet_dir: Path):
+    """Recreate DuckDB views from downloaded parquets. Preserve user tables.
+
+    Every existing view is dropped, then one view per ``*.parquet`` file under
+    ``parquet_dir`` is created, named after the file stem. Stems that collide
+    with an existing base table are skipped. The connection is always closed,
+    even if creating a view fails.
+    """
+    import duckdb
+
+    db_path = local_dir / "user" / "duckdb" / "analytics.duckdb"
+    db_path.parent.mkdir(parents=True, exist_ok=True)
+    conn = duckdb.connect(str(db_path))
+    try:
+        # Get existing user-created tables (not views)
+        try:
+            existing_tables = {
+                row[0] for row in
+                conn.execute("SELECT table_name FROM information_schema.tables WHERE table_type='BASE TABLE'").fetchall()
+            }
+        except Exception:
+            existing_tables = set()
+
+        # Drop all views
+        try:
+            views = conn.execute(
+                "SELECT table_name FROM information_schema.tables WHERE table_type='VIEW'"
+            ).fetchall()
+            for (view_name,) in views:
+                quoted = view_name.replace('"', '""')
+                conn.execute(f'DROP VIEW IF EXISTS "{quoted}"')
+        except Exception:
+            pass
+
+        # Create views for each parquet file
+        for pq_file in parquet_dir.rglob("*.parquet"):
+            view_name = pq_file.stem
+            if view_name in existing_tables:
+                continue  # don't shadow user tables
+            # Double embedded quotes so unusual file names cannot break out of
+            # the SQL string literal or the quoted identifier.
+            abs_path = str(pq_file.resolve()).replace("'", "''")
+            ident = view_name.replace('"', '""')
+            conn.execute(f"CREATE VIEW \"{ident}\" AS SELECT * FROM read_parquet('{abs_path}')")
+    finally:
+        conn.close()
+
+
+def _upload(as_json: bool):
+    """Upload sessions and CLAUDE.local.md to server."""
+    local_dir = _local_data_dir()
+    results = {"sessions": 0, "local_md": False}
+
+    # Upload sessions
+    sessions_dir = local_dir / "user" / "sessions"
+    if sessions_dir.exists():
+        for f in sessions_dir.glob("*.jsonl"):
+            try:
+                with open(f, "rb") as fh:
+                    resp = api_post("/api/upload/sessions", files={"file": (f.name, fh)})
+                if resp.status_code == 200:
+                    results["sessions"] += 1
+            except Exception:
+                pass
+
+    # Upload CLAUDE.local.md
+    local_md = local_dir / ".claude" / "CLAUDE.local.md"
+    if local_md.exists():
+        content = local_md.read_text(encoding="utf-8")
+        try:
+            resp = api_post("/api/upload/local-md", json={"content": content})
+            if resp.status_code == 200:
+                results["local_md"] = True
+        except Exception:
+            pass
+
+    if as_json:
+        typer.echo(json.dumps(results, indent=2))
+    else:
+        typer.echo(f"Uploaded {results['sessions']} sessions")
+        if results["local_md"]:
+            typer.echo("Uploaded CLAUDE.local.md")
diff --git a/cli/config.py b/cli/config.py
new file mode 100644
index 0000000..e75961a
--- /dev/null
+++ b/cli/config.py
@@ -0,0 +1,64 @@
+"""CLI configuration — token storage, server URL, sync state."""
+
+import json
+import os
+from pathlib import Path
+from typing import Optional
+
+
+def _config_dir() -> Path:
+    d = Path(os.environ.get("DA_CONFIG_DIR", os.path.expanduser("~/.config/da")))
+    d.mkdir(parents=True, exist_ok=True)
+    return d
+
+
+def get_server_url() -> str:
+    config = load_config()
+    return os.environ.get("DA_SERVER", config.get("server", "http://localhost:8000"))
+
+
+def get_token() -> Optional[str]:
+    token_file = _config_dir() / "token.json"
+    if token_file.exists():
+        data = json.loads(token_file.read_text())
+        return data.get("access_token")
+    return os.environ.get("DA_TOKEN")
+
+
+def save_token(token: str, email: str, role: str):
+    token_file = _config_dir() / "token.json"
+    # The file holds a bearer credential: create it owner-only and tighten the
+    # mode of a pre-existing file before the secret is written.
+    token_file.touch(mode=0o600, exist_ok=True)
+    token_file.chmod(0o600)
+    token_file.write_text(json.dumps({
+        "access_token": token,
+        "email": email,
+        "role": role,
+    }, indent=2))
+
+
+def clear_token():
+    token_file = _config_dir() / "token.json"
+    if token_file.exists():
+        token_file.unlink()
+
+
+def load_config() -> dict:
+    config_file = _config_dir() / "config.yaml"
+    if config_file.exists():
+        import yaml
+        return yaml.safe_load(config_file.read_text()) or {}
+    return {}
+
+
+def get_sync_state() -> dict:
+    state_file = _config_dir() / "sync_state.json"
+    if state_file.exists():
+        return json.loads(state_file.read_text())
+    return {}
+
+
+def save_sync_state(state: dict):
+    state_file = _config_dir() / "sync_state.json"
+    state_file.write_text(json.dumps(state, indent=2))
diff --git a/cli/main.py b/cli/main.py
new file mode 100644
index 0000000..024d31a
--- /dev/null
+++ b/cli/main.py
@@ -0,0 +1,33 @@
+"""da — CLI tool for AI Data Analyst.
+
+Primary interface for AI agents. Install: uv tool install data-analyst
+"""
+
+import typer
+
+from cli.commands.auth import auth_app
+from cli.commands.sync import sync_app
+from cli.commands.query import query_command
+from cli.commands.status import status_app
+from cli.commands.admin import admin_app
+from cli.commands.diagnose import diagnose_app
+from cli.commands.skills import skills_app
+
+app = typer.Typer(
+    name="da",
+    help="AI Data Analyst CLI — data sync, queries, and admin for AI agents",
+    no_args_is_help=True,
+)
+
+# Register subcommands
+app.add_typer(auth_app, name="auth")
+app.add_typer(sync_app, name="sync")
+app.command("query")(query_command)
+app.add_typer(status_app, name="status")
+app.add_typer(admin_app, name="admin")
+app.add_typer(diagnose_app, name="diagnose")
+app.add_typer(skills_app, name="skills")
+
+
+if __name__ == "__main__":
+    app()
diff --git a/cli/skills/setup.md b/cli/skills/setup.md
new file mode 100644
index 0000000..8f79eb4
--- /dev/null
+++ b/cli/skills/setup.md
@@ -0,0 +1,58 @@
+# Setup — Complete guide for deploying a new instance
+
+## Prerequisites
+- Docker and Docker Compose installed
+- Domain name pointing to server IP (for SSL)
+- Data source credentials (Keboola token OR BigQuery service account)
+
+## Steps
+
+1. Clone the repository:
+   ```bash
+   git clone <repository-url>
+   cd data-analyst
+   ```
+
+2.
Create configuration:
+   ```bash
+   cp config/instance.yaml.example config/instance.yaml
+   # Edit instance.yaml with your settings
+   ```
+
+3. Create environment file:
+   ```bash
+   cp config/.env.template .env
+   # Fill in: JWT_SECRET_KEY, KEBOOLA_STORAGE_TOKEN (or BIGQUERY_PROJECT), etc.
+   ```
+
+4. Start services:
+   ```bash
+   docker compose up -d
+   ```
+
+5. Verify health:
+   ```bash
+   DA_SERVER=http://your-server:8000 da status
+   ```
+
+6. Create first admin user:
+   ```bash
+   da auth login --email admin@company.com --server http://your-server:8000
+   da admin add-user admin@company.com --role admin
+   ```
+
+7. Trigger initial data sync:
+   ```bash
+   da admin trigger-sync
+   ```
+
+8. Verify data:
+   ```bash
+   da status
+   ```
+
+## Troubleshooting
+
+- **Cannot connect:** Check `docker compose ps`, verify port 8000 is exposed
+- **Auth fails:** Verify JWT_SECRET_KEY is set in .env
+- **No data:** Check data source credentials, run `da diagnose`
diff --git a/cli/skills/troubleshoot.md b/cli/skills/troubleshoot.md
new file mode 100644
index 0000000..ec1beb0
--- /dev/null
+++ b/cli/skills/troubleshoot.md
@@ -0,0 +1,34 @@
+# Troubleshoot — Diagnostic procedures
+
+## Quick Check
+```bash
+da diagnose --json
+```
+
+## Common Issues
+
+### Data not updating
+1. `da diagnose --component data` — check data freshness
+2. `da server logs scheduler --since 1h` — check scheduler logs
+3. Verify data source credentials: `da admin test-connection`
+
+### Cannot login
+1. Check server is running: `curl http://server:8000/api/health`
+2. Check user exists: `da admin list-users` (from admin account)
+3. Re-generate token: `da auth login --email your@email.com`
+
+### DuckDB errors locally
+1. Re-sync: `da sync` (rebuilds views)
+2. Check disk space: `du -sh user/duckdb/`
+3. Delete and re-create: `rm user/duckdb/analytics.duckdb && da sync`
+
+### Server unresponsive
+1. `docker compose ps` — check container status
+2. `docker compose logs app --tail 50` — check app logs
+3.
`docker compose restart app` — restart app
+
+## Escalation
+If automated diagnostics don't help:
+1. Collect full diagnostic: `da diagnose --json > /tmp/diag.json`
+2. Collect server logs: `docker compose logs > /tmp/logs.txt`
+3. Share both files with admin
diff --git a/docker-compose.test.yml b/docker-compose.test.yml
new file mode 100644
index 0000000..c66f97b
--- /dev/null
+++ b/docker-compose.test.yml
@@ -0,0 +1,32 @@
+services:
+  app:
+    build: .
+    command: uvicorn app.main:app --host 0.0.0.0 --port 8000
+    environment:
+      - DATA_DIR=/data
+      - JWT_SECRET_KEY=test-secret-for-ci
+      - TESTING=true
+    volumes:
+      - test-data:/data
+    healthcheck:
+      test: ["CMD", "python", "-c", "import httpx; r=httpx.get('http://localhost:8000/api/health'); exit(0 if r.status_code==200 else 1)"]
+      interval: 5s
+      timeout: 3s
+      retries: 10
+
+  # Runs pytest once the app service reports healthy; shares the test-data volume.
+  test-runner:
+    build: .
+    command: python -m pytest tests/test_db.py tests/test_repositories.py tests/test_migration.py tests/test_api.py -v
+    environment:
+      - DATA_DIR=/data
+      - JWT_SECRET_KEY=test-secret-for-ci
+      - API_URL=http://app:8000
+    volumes:
+      - test-data:/data
+    depends_on:
+      app:
+        condition: service_healthy
+
+volumes:
+  test-data:
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..ff2a074
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,51 @@
+services:
+  app:
+    build: .
+    # NOTE(review): --reload plus the .:/app bind mount below looks like a
+    # development configuration — confirm before deploying with this file.
+    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
+    ports:
+      - "8000:8000"
+    volumes:
+      - .:/app
+      - data:/data
+    env_file: .env
+    environment:
+      - DATA_DIR=/data
+    healthcheck:
+      test: ["CMD", "python", "-c", "import httpx; r=httpx.get('http://localhost:8000/api/health'); exit(0 if r.status_code==200 else 1)"]
+      interval: 30s
+      timeout: 5s
+      retries: 3
+
+  # Runs `python -m services.scheduler`; started only after app is healthy.
+  scheduler:
+    build: .
+ command: python -m services.scheduler + volumes: + - data:/data + env_file: .env + environment: + - DATA_DIR=/data + - API_URL=http://app:8000 + depends_on: + app: + condition: service_healthy + restart: unless-stopped + + telegram-bot: + build: . + command: python -m services.telegram_bot + volumes: + - data:/data + env_file: .env + environment: + - DATA_DIR=/data + depends_on: + - app + profiles: + - full + restart: unless-stopped + +volumes: + data: diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..ff8bd3a --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,43 @@ +[project] +name = "data-analyst" +version = "2.0.0" +description = "AI Data Analyst — data distribution platform for AI analytical systems" +requires-python = ">=3.9" +license = "MIT" + +dependencies = [ + "duckdb>=0.9.0", + "fastapi>=0.115.0", + "uvicorn[standard]>=0.32.0", + "python-multipart>=0.0.9", + "jinja2>=3.1.0", + "PyJWT>=2.8.0", + "httpx>=0.27.0", + "typer>=0.12.0", + "rich>=13.0.0", + "python-dotenv>=1.0.0", + "pyyaml>=6.0", +] + +[project.scripts] +da = "cli.main:app" + +[project.optional-dependencies] +connectors = [ + "kbcstorage>=0.9.0", + "google-cloud-bigquery>=3.0.0", + "google-cloud-bigquery-storage>=2.0.0", + "pandas>=2.0.0", + "pyarrow>=12.0.0", +] +telegram = [ + "aiohttp>=3.9.0", +] +dev = [ + "pytest>=7.0.0", + "pytest-mock>=3.0.0", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" diff --git a/services/scheduler/__init__.py b/services/scheduler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/scheduler/__main__.py b/services/scheduler/__main__.py new file mode 100644 index 0000000..800b866 --- /dev/null +++ b/services/scheduler/__main__.py @@ -0,0 +1,86 @@ +"""Scheduler service — replaces systemd timers. + +Lightweight sidecar that triggers jobs by calling the main app's API. +Keeps all business logic in the main app. 
+ +Usage: python -m services.scheduler +""" + +import logging +import os +import signal +import sys +import time +from datetime import datetime, timezone + +import httpx + +logging.basicConfig( + level=os.environ.get("LOG_LEVEL", "INFO").upper(), + format="%(asctime)s %(levelname)s [scheduler] %(message)s", +) +logger = logging.getLogger(__name__) + +API_URL = os.environ.get("API_URL", "http://localhost:8000") +SCHEDULER_API_TOKEN = os.environ.get("SCHEDULER_API_TOKEN", "") + +# Schedule definitions: (name, interval_seconds, api_endpoint, http_method) +JOBS = [ + ("data-refresh", 15 * 60, "/api/sync/trigger", "POST"), + ("health-check", 5 * 60, "/api/health", "GET"), +] + +_running = True + + +def _signal_handler(sig, frame): + global _running + logger.info(f"Received signal {sig}, shutting down...") + _running = False + + +def _call_api(endpoint: str, method: str = "POST") -> bool: + """Call the main app API. Returns True on success.""" + url = f"{API_URL}{endpoint}" + headers = {} + if SCHEDULER_API_TOKEN: + headers["Authorization"] = f"Bearer {SCHEDULER_API_TOKEN}" + try: + if method == "POST": + resp = httpx.post(url, headers=headers, timeout=120) + else: + resp = httpx.get(url, headers=headers, timeout=30) + if resp.status_code < 400: + logger.info(f"Job {endpoint}: {resp.status_code}") + return True + else: + logger.warning(f"Job {endpoint}: HTTP {resp.status_code} - {resp.text[:200]}") + return False + except Exception as e: + logger.error(f"Job {endpoint} failed: {e}") + return False + + +def run(): + signal.signal(signal.SIGTERM, _signal_handler) + signal.signal(signal.SIGINT, _signal_handler) + + logger.info(f"Scheduler started. 
def run():
    """Run the scheduler loop until SIGTERM or SIGINT is received.

    Installs ``_signal_handler`` for both signals, then repeatedly walks
    ``JOBS`` and calls ``_call_api`` for every job whose interval has elapsed.
    Every job is due on the first pass. The loop re-checks the stop flag about
    once per second so a shutdown request is honoured promptly instead of
    after a full 10-second sleep.
    """
    signal.signal(signal.SIGTERM, _signal_handler)
    signal.signal(signal.SIGINT, _signal_handler)

    logger.info(
        "Scheduler started. API_URL=%s, %d jobs configured.", API_URL, len(JOBS)
    )

    # Last start time per job on the monotonic clock; None means "never run",
    # which makes the job due immediately regardless of the clock's origin.
    last_run = {name: None for name, _, _, _ in JOBS}

    poll_seconds = 10  # how often the schedule is evaluated
    slice_seconds = 1  # upper bound on shutdown latency while idle

    while _running:
        for name, interval, endpoint, method in JOBS:
            if not _running:
                # Stop requested while an earlier job was executing.
                break
            # Monotonic time is immune to wall-clock adjustments (NTP, DST).
            now = time.monotonic()
            previous = last_run[name]
            if previous is None or now - previous >= interval:
                logger.info("Running job: %s", name)
                _call_api(endpoint, method)
                # Stamp with the start time so the cadence does not drift by
                # the duration of the call.
                last_run[name] = now

        # Sleep in short slices so the stop flag is noticed quickly.
        waited = 0
        while _running and waited < poll_seconds:
            time.sleep(slice_seconds)
            waited += slice_seconds

    logger.info("Scheduler stopped.")
class TestSkills:
    """Exercise the ``skills`` sub-commands through the CLI test runner."""

    def test_list_skills(self):
        """``skills list`` exits with 0 and mentions both expected skills."""
        outcome = runner.invoke(app, ["skills", "list"])
        assert outcome.exit_code == 0
        for skill_name in ("setup", "troubleshoot"):
            assert skill_name in outcome.output

    def test_show_skill(self):
        """``skills show setup`` exits with 0 and prints the skill text."""
        argv = ["skills", "show", "setup"]
        outcome = runner.invoke(app, argv)
        assert outcome.exit_code == 0
        assert "Prerequisites" in outcome.output

    def test_show_nonexistent_skill(self):
        """Asking for an unknown skill exits with status 1."""
        argv = ["skills", "show", "nonexistent"]
        outcome = runner.invoke(app, argv)
        assert outcome.exit_code == 1
class TestQuery:
    """Run the ``query`` command with and without a local DuckDB file."""

    def test_query_no_db(self, tmp_config):
        """With no database file present the command exits 1 and says so."""
        result = runner.invoke(app, ["query", "SELECT 1"])
        assert result.exit_code == 1
        assert "not found" in result.output

    def test_query_with_db(self, tmp_config):
        """A seeded database is queried and the result is emitted as JSON."""
        import duckdb

        db_dir = tmp_config / "local" / "user" / "duckdb"
        db_dir.mkdir(parents=True)

        # The context manager closes the connection even when seeding raises,
        # so the file is never left open for the CLI invocation below.
        with duckdb.connect(str(db_dir / "analytics.duckdb")) as conn:
            conn.execute("CREATE TABLE test_table (id INT, name VARCHAR)")
            conn.execute("INSERT INTO test_table VALUES (1, 'hello'), (2, 'world')")

        result = runner.invoke(
            app,
            ["query", "SELECT count(*) as cnt FROM test_table", "--format", "json"],
        )
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert data[0]["cnt"] == 2