diff --git a/CHANGELOG.md b/CHANGELOG.md index 881724a..026573c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,11 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +### Added +- **`agnes init` / `agnes pull --skip-materialize`** — opts the first sync out of materialized-mode tables (server-side scheduled-query parquets, often multi-GB). Pavel's #185 Phase 1: a single 6.3 GB `order_economics` parquet kept first init silent for 44 minutes. Materialized rows stay discoverable via `agnes catalog`; rerun without the flag once the analyst actually needs them locally. +- **`agnes pull` progress bar** — Rich-driven aggregate transfer display rendered to stderr when not `--quiet` and not `--json`. Per-file label + bytes / total / rate / ETA, aggregated across the parallel `ThreadPoolExecutor` workers introduced earlier in this PR. Replaces the prior 0-stdout silence on first init. +- **CLI clean-error wrapper** (`cli/main.py:_run_with_clean_errors`, new entry point in `pyproject.toml`) — `httpx.ReadTimeout` / `ConnectError` / `RemoteProtocolError` etc. used to dump a five-frame Python traceback to the analyst's terminal when a `agnes query --remote` against a slow BQ view timed out client-side. Now: one-line `Error: …` message + actionable hint (e.g. "narrow the WHERE on the partition column from `agnes catalog --json`, or run `agnes snapshot create --estimate`"), exit code 1. Full traceback is appended to `~/.config/agnes/last-error.log` so an operator can recover it for support without spamming the analyst's terminal. Implemented as `AgnesTransportError` raised from the `api_get` / `api_post` / `api_delete` / `api_patch` / `stream_download` helpers in `cli/client.py`; the top-level Typer wrapper renders it. Unhandled `Exception`s are caught at the same boundary, logged, and printed as "internal CLI error (see logfile)" so a Python traceback never leaks to the analyst. + ### Changed - **Tier 1 event-loop unblocking** — the five hottest BQ-touching endpoints (`POST /api/query`, `POST /api/v2/scan`, `POST /api/v2/scan/estimate`, `GET /api/v2/sample/{id}`, `GET /api/v2/schema/{id}`) were declared `async def` but invoked synchronous DuckDB / BQ-extension calls inside the body. Under uvicorn's single event loop that meant a single heavy `agnes query --remote` (waiting up to ~200 s for BQ's `jobs.query` to return) **froze every other request** — `/api/health`, the dashboard, auth, even another query — for the full duration of the BQ wait. Operators saw "VM idle, app frozen" symptoms during this work. Converted all five to plain `def` so FastAPI auto-offloads the blocking body to the anyio thread pool; the event loop stays free for non-BQ requests. Verified via 0-await audit (no `await` statements in the converted handlers, so the rename is safe). Tests: `tests/test_v2_*.py` were rewritten to call the handlers directly instead of `asyncio.run(...)` (which now fails on a non-coroutine return). Pairs with the thread-pool capacity bump below. - **`AGNES_THREADPOOL_SIZE` env var** (default 200, was anyio's stock 40) controls the FastAPI / Starlette thread pool capacity used by every plain-`def` route handler. Set in `app/main.py:lifespan` via `anyio.to_thread.current_default_thread_limiter().total_tokens`. 200 leaves comfortable headroom over the BQ extension's connection budget while keeping the per-process thread cost bounded — for the workload of <50 concurrent analysts this is well over what's needed; bump for higher concurrency. diff --git a/cli/client.py b/cli/client.py index 1efdd7d..e1861d8 100644 --- a/cli/client.py +++ b/cli/client.py @@ -2,12 +2,14 @@ import os import time +import traceback +from datetime import datetime, timezone from pathlib import Path from typing import Optional import httpx -from cli.config import get_server_url, get_token +from cli.config import _config_dir, get_server_url, get_token # Retry policy for transient failures during stream downloads. Scoped to # network issues and 5xx — 4xx (auth, 404, 400) is NOT retried. Tunable via @@ -21,6 +23,105 @@ _RETRY_BACKOFFS_S = (0.3, 1.0, 3.0) # seconds before attempt 2, 3, 4 QUERY_TIMEOUT_S = float(os.environ.get("AGNES_QUERY_TIMEOUT", "300")) +# ── Transport-error translation ───────────────────────────────────────── +# Pavel's Issue #185 Phase 3B caught the failure mode: when httpx raises +# `ReadTimeout` / `ConnectError` / `RemoteProtocolError` and the CLI +# command doesn't catch it, Typer dumps a five-frame Python traceback to +# the analyst's terminal. That looks like a CLI bug to a non-Python user +# and obscures the actionable signal ("server slow, try snapshot create"). +# Translate transport exceptions to `AgnesTransportError` with a typed +# user-facing message, log the full traceback to `~/.config/agnes/last- +# error.log` for debug, and let the top-level CLI handler render the +# clean message + exit non-zero. + +_LOG_FILE = _config_dir() / "last-error.log" + + +class AgnesTransportError(Exception): + """Network / transport failure with a user-actionable message. + + Raised by the api_* / stream_download helpers when httpx surfaces a + connection / timeout / protocol error. The CLI's top-level Typer + handler catches this, prints `.user_message` (NOT the traceback), + and exits non-zero. Full traceback goes to ``~/.config/agnes/last- + error.log`` so an operator can recover it for support. + """ + + def __init__(self, user_message: str, *, hint: str = "", logfile_path: Path | None = None): + super().__init__(user_message) + self.user_message = user_message + self.hint = hint + self.logfile_path = logfile_path + + +def _log_traceback(exc: BaseException, *, context: str) -> Path: + """Append a timestamped traceback to ``~/.config/agnes/last-error.log`` + and return the path. Best-effort — never raises (a logging failure + must not mask the original error).""" + try: + with open(_LOG_FILE, "a", encoding="utf-8") as f: + ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + f.write(f"\n=== {ts} {context} ===\n") + traceback.print_exception(type(exc), exc, exc.__traceback__, file=f) + except Exception: + pass + return _LOG_FILE + + +def _translate_transport_error(exc: Exception, *, context: str) -> AgnesTransportError: + """Map httpx transport exceptions to user-facing CLI messages. The + mapping is intentionally pragmatic — analysts care about "what do I + do next", not the gRPC / TCP detail.""" + log = _log_traceback(exc, context=context) + if isinstance(exc, httpx.ReadTimeout): + return AgnesTransportError( + f"Server didn't respond within the read timeout ({QUERY_TIMEOUT_S:.0f}s) " + f"for {context}.", + hint=( + "If this is `agnes query --remote` against a heavy BQ view, " + "the underlying BQ job took longer than the wait window. Try:\n" + " • narrow the WHERE (especially the partition column from `agnes catalog --json`)\n" + " • `agnes snapshot create ... --estimate` to materialize once + query locally\n" + " • set AGNES_QUERY_TIMEOUT=600 for a longer client-side wait\n" + f"Full traceback: {log}" + ), + logfile_path=log, + ) + if isinstance(exc, httpx.ConnectError): + return AgnesTransportError( + f"Can't reach the agnes server for {context}.", + hint=( + "Check the server URL with `agnes status`, network reachability " + "(VPN / DNS / firewall), and the TLS-trust setup if this is a " + f"corporate-CA deployment.\nFull traceback: {log}" + ), + logfile_path=log, + ) + if isinstance(exc, (httpx.RemoteProtocolError, httpx.ReadError, httpx.WriteError)): + return AgnesTransportError( + f"Connection broke mid-flight on {context}.", + hint=( + "Usually a transient network blip. Re-run the command. If it " + f"keeps happening, check `agnes status`.\nFull traceback: {log}" + ), + logfile_path=log, + ) + if isinstance(exc, httpx.TimeoutException): + return AgnesTransportError( + f"Network timeout on {context}.", + hint=f"Re-run; if persistent, check the server.\nFull traceback: {log}", + logfile_path=log, + ) + # Anything else: re-wrap with a generic message so the CLI doesn't + # dump the traceback. We'd prefer a typed translation; if you hit + # this branch, add a clause above. + return AgnesTransportError( + f"Unexpected error on {context}: {type(exc).__name__}.", + hint=f"Full traceback: {log}", + logfile_path=log, + ) + + def get_client(timeout: float = 30.0) -> httpx.Client: """Get an authenticated httpx client.""" token = get_token() @@ -35,23 +136,35 @@ def get_client(timeout: float = 30.0) -> httpx.Client: def api_get(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: - with get_client(timeout=timeout) as client: - return client.get(path, **kwargs) + try: + with get_client(timeout=timeout) as client: + return client.get(path, **kwargs) + except httpx.HTTPError as exc: + raise _translate_transport_error(exc, context=f"GET {path}") from exc def api_post(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: - with get_client(timeout=timeout) as client: - return client.post(path, **kwargs) + try: + with get_client(timeout=timeout) as client: + return client.post(path, **kwargs) + except httpx.HTTPError as exc: + raise _translate_transport_error(exc, context=f"POST {path}") from exc def api_delete(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: - with get_client(timeout=timeout) as client: - return client.delete(path, **kwargs) + try: + with get_client(timeout=timeout) as client: + return client.delete(path, **kwargs) + except httpx.HTTPError as exc: + raise _translate_transport_error(exc, context=f"DELETE {path}") from exc def api_patch(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: - with get_client(timeout=timeout) as client: - return client.patch(path, **kwargs) + try: + with get_client(timeout=timeout) as client: + return client.patch(path, **kwargs) + except httpx.HTTPError as exc: + raise _translate_transport_error(exc, context=f"PATCH {path}") from exc def _is_transient(exc: Exception) -> bool: @@ -98,7 +211,13 @@ def stream_download(path: str, target_path: str, progress_callback=None) -> int: if attempt == _RETRY_ATTEMPTS or not _is_transient(exc): break time.sleep(_RETRY_BACKOFFS_S[min(attempt, len(_RETRY_BACKOFFS_S) - 1)]) - # Clean up any leftover tmp, then surface the last exception. + # Clean up any leftover tmp, then surface the last exception. Translate + # transport errors to AgnesTransportError so the CLI prints a clean + # message instead of a Python traceback (Pavel's #185 Phase 3B). tmp_path.unlink(missing_ok=True) assert last_exc is not None + if isinstance(last_exc, httpx.HTTPError): + raise _translate_transport_error( + last_exc, context=f"GET {path} (stream → {target_path})" + ) from last_exc raise last_exc diff --git a/cli/commands/init.py b/cli/commands/init.py index 58bbb10..61d539c 100644 --- a/cli/commands/init.py +++ b/cli/commands/init.py @@ -64,6 +64,16 @@ def init( token: str = typer.Option(..., "--token", help="Personal access token"), force: bool = typer.Option(False, "--force", help="Re-initialize an existing workspace"), workspace_str: Optional[str] = typer.Option(None, "--workspace", help="Target dir (default: cwd)"), + skip_materialize: bool = typer.Option( + False, "--skip-materialize", + help=( + "Skip materialized-mode tables on the first pull. The first " + "init can otherwise spend tens of minutes silently downloading " + "a single multi-GB scheduled-query parquet. Materialized rows " + "are still discoverable via `agnes catalog`; rerun `agnes pull` " + "without this flag once you actually need them locally." + ), + ), ): """Bootstrap workspace: auth, CLAUDE.md, hooks, first pull, AGNES_WORKSPACE.md.""" workspace = Path(workspace_str).resolve() if workspace_str else Path.cwd() @@ -176,7 +186,15 @@ def init( # exception escaping here is a programming error worth surfacing. # ------------------------------------------------------------------ try: - result: PullResult = run_pull(server_url, token, workspace) + # `agnes init` always runs interactively (analyst typing the + # command), so progress is on by default — Pavel's #185 Phase 1 + # was a 44-minute silent download on the very first install. + # Pass it through to run_pull. + result: PullResult = run_pull( + server_url, token, workspace, + skip_materialize=skip_materialize, + show_progress=True, + ) except Exception as exc: typer.echo(render_error(0, {"detail": { "kind": "manifest_unauthorized", diff --git a/cli/commands/pull.py b/cli/commands/pull.py index 6f71a63..48924f1 100644 --- a/cli/commands/pull.py +++ b/cli/commands/pull.py @@ -38,6 +38,15 @@ def pull( quiet: bool = typer.Option(False, "--quiet", help="Suppress success stdout (errors still surface on stderr)"), as_json: bool = typer.Option(False, "--json", help="Emit a single JSON object summarizing the pull"), dry_run: bool = typer.Option(False, "--dry-run", help="Compute the delta without writing anything to disk"), + skip_materialize: bool = typer.Option( + False, "--skip-materialize", + help=( + "Skip materialized-mode tables (server-side scheduled BQ " + "scan results, often multi-GB). Their data is still discoverable " + "via `agnes catalog` and remote-mode tables still pull. Useful " + "for a fast first init when an analyst only needs --remote access." + ), + ), ): """Refresh data from the server into ./server/parquet + ./user/duckdb.""" server_url = get_server_url() @@ -68,8 +77,17 @@ def pull( workspace = Path(os.environ.get("AGNES_LOCAL_DIR", ".")).resolve() + # Show progress unless quiet (SessionStart hooks) or json (machine- + # readable output where Rich's terminal-control sequences would be + # garbage in the consumer's parser). + show_progress = not (quiet or as_json) try: - result: PullResult = run_pull(server_url, token, workspace, dry_run=dry_run) + result: PullResult = run_pull( + server_url, token, workspace, + dry_run=dry_run, + skip_materialize=skip_materialize, + show_progress=show_progress, + ) except Exception as exc: # `run_pull` is documented to record per-table / per-stage failures # under `result.errors` rather than raising, so reaching this branch diff --git a/cli/lib/pull.py b/cli/lib/pull.py index 5e6445f..b33c4b3 100644 --- a/cli/lib/pull.py +++ b/cli/lib/pull.py @@ -112,6 +112,8 @@ def run_pull( workspace: Path, *, dry_run: bool = False, + skip_materialize: bool = False, + show_progress: bool = False, ) -> PullResult: """Refresh local parquets + corporate memory rules from the server. @@ -119,6 +121,17 @@ def run_pull( Typer/Rich UI. Returns a `PullResult` summary; never raises for network/server errors (records them under `errors` instead) so the caller can decide whether a partial pull is fatal. + + Args: + skip_materialize: When True, omit `query_mode='materialized'` + tables from the download set. Use for analysts who only + care about `--remote` access on the workspace and don't + want to wait on multi-GB scheduled-query parquets at first + init. Pavel's #185 Phase 1: a 6.3 GB `order_economics` + parquet kept first init silent for 44 minutes. + show_progress: When True, render a per-file progress bar to + stderr via Rich during the parallel download phase. Pass + False from `--quiet` callers (SessionStart hooks). """ started = time.monotonic() result = PullResult() @@ -159,6 +172,11 @@ def run_pull( for tid, info in server_tables.items(): if info.get("query_mode") == "remote": continue + if skip_materialize and info.get("query_mode") == "materialized": + # Operator opt-out for first-init. Materialized rows are + # still discoverable via `agnes catalog` and queryable + # the next time `agnes pull` runs without --skip-materialize. + continue non_remote_total += 1 local_hash = local_tables.get(tid, {}).get("hash", "") server_hash = info.get("hash", "") @@ -201,14 +219,52 @@ def run_pull( # the executor + thread overhead for the common single-update case. workers = min(workers, len(to_download)) if to_download else 1 + # Optional progress bar — Rich's Progress tracks per-file bytes + # streamed, aggregated across the parallel ThreadPoolExecutor + # workers. Pavel's #185 Phase 1: a single 6.3 GB parquet on first + # init went 44 minutes silent, looked frozen. Now: aggregate "X.Y + # GB / Z.A GB · 56 MB/s · ETA 1m 20s" to stderr while threads + # stream. None when show_progress=False (SessionStart hooks etc.). + progress = None + progress_tasks: dict[str, int] = {} + if show_progress and to_download: + from rich.progress import ( + Progress, BarColumn, DownloadColumn, TextColumn, + TimeRemainingColumn, TransferSpeedColumn, + ) + progress = Progress( + TextColumn("[bold]{task.fields[label]}[/]"), + BarColumn(), + DownloadColumn(), + TransferSpeedColumn(), + TimeRemainingColumn(), + transient=False, + ) + progress.start() + for tid in to_download: + size = int(server_tables[tid].get("size_bytes") or 0) + # Some manifest entries don't carry size — Rich shows + # an indeterminate bar in that case. + progress_tasks[tid] = progress.add_task( + "download", label=tid, total=size if size > 0 else None, + ) + def _download_one(tid: str) -> tuple[str, dict | None, str | None]: """Returns (tid, local_table_entry_or_None, error_or_None). One bound thread per call; stream_download is sync I/O so a - ThreadPoolExecutor (not asyncio) is the right tool.""" + ThreadPoolExecutor (not asyncio) is the right tool. The + progress callback is thread-safe — Rich's Progress.update + holds an internal lock.""" target = parquet_dir / f"{tid}.parquet" expected_hash = server_tables[tid].get("hash", "") + cb = None + if progress is not None and tid in progress_tasks: + task_id = progress_tasks[tid] + def cb(n: int, _tid=tid, _task=task_id): + progress.update(_task, advance=n) try: - stream_download(f"/api/data/{tid}/download", str(target)) + stream_download(f"/api/data/{tid}/download", str(target), + progress_callback=cb) if expected_hash: actual_hash = _file_md5(target) if actual_hash != expected_hash: @@ -228,12 +284,16 @@ def run_pull( except Exception as exc: return tid, None, str(exc) - if workers <= 1: - outcomes = [_download_one(tid) for tid in to_download] - else: - from concurrent.futures import ThreadPoolExecutor - with ThreadPoolExecutor(max_workers=workers) as ex: - outcomes = list(ex.map(_download_one, to_download)) + try: + if workers <= 1: + outcomes = [_download_one(tid) for tid in to_download] + else: + from concurrent.futures import ThreadPoolExecutor + with ThreadPoolExecutor(max_workers=workers) as ex: + outcomes = list(ex.map(_download_one, to_download)) + finally: + if progress is not None: + progress.stop() for tid, entry, err in outcomes: if err is not None: diff --git a/cli/main.py b/cli/main.py index 67f59ac..33f6ebc 100644 --- a/cli/main.py +++ b/cli/main.py @@ -123,5 +123,41 @@ app.add_typer(snapshot_app, name="snapshot") app.add_typer(disk_info_app, name="disk-info") +def _run_with_clean_errors() -> None: + """Wrap ``app()`` so AgnesTransportError (and other typed CLI errors) + surface as a one-line message + exit, never as a Python traceback. The + full traceback is already logged to ``~/.config/agnes/last-error.log`` + by the api_* helpers — operators read it from there for support + forwarding. Anything that escapes this wrapper IS a CLI bug worth + fixing — log + print "internal error" so the analyst doesn't see a + Pythonist's traceback either. + + Pavel's #185 Phase 3B: previously a `httpx.ReadTimeout` from an + `agnes query --remote` against a slow BQ view dumped a 30-frame + traceback to the analyst's terminal. Now: one clean line + a hint, + return code 1. + """ + from cli.client import AgnesTransportError, _log_traceback, _LOG_FILE + try: + app() + except (AgnesTransportError) as exc: + typer.echo(f"Error: {exc.user_message}", err=True) + if exc.hint: + typer.echo(exc.hint, err=True) + sys.exit(1) + except typer.Exit: + raise + except (KeyboardInterrupt, SystemExit): + raise + except Exception as exc: # last-resort net — escaped exceptions are bugs + log = _log_traceback(exc, context="unhandled at CLI top-level") + typer.echo( + f"Error: internal CLI error ({type(exc).__name__}). " + f"Full traceback logged to {log}.", + err=True, + ) + sys.exit(1) + + if __name__ == "__main__": - app() + _run_with_clean_errors() diff --git a/config/claude_md_template.txt b/config/claude_md_template.txt index 58c1a64..c5b80f1 100644 --- a/config/claude_md_template.txt +++ b/config/claude_md_template.txt @@ -39,16 +39,26 @@ This workspace is connected to {{ server.url }}. ## Discovering tables — never enumerate from memory -Tables, columns, sizes, and `query_mode` change as admins register / migrate / -drop entries. Always re-discover from the live server, never from this file: +Tables, columns, sizes, descriptions, and `query_mode` change as admins +register / migrate / drop entries. Always re-discover from the live server, +never from this file or your training data: ``` -agnes catalog --json # canonical list with query_mode, sql_flavor, - # where_examples, fetch_via, rough_size_hint per table -agnes schema
# columns + types in the right SQL dialect -agnes describe
-n 5 # sample rows (local + materialized only) +agnes catalog --json # all tables: id, query_mode, sql_flavor, + # where_examples, fetch_via, rough_size_hint, description +agnes catalog --json | jq '.tables[] | select(.id=="")' # single table — read its description in full BEFORE writing any SQL +agnes schema
# columns + types in the right SQL dialect +agnes describe
-n 5 # sample rows (local + materialized only) ``` +The `description` field on each catalog row is the **authoritative +business-rules text** for that table — it carries grain, partition +column, join contracts, and column-level gotchas. Re-read it from the +live `agnes catalog` for every cross-table decision; do **not** copy +it into this workspace `CLAUDE.md` (it's a snapshot that goes stale, +and `agnes init` will overwrite local edits — put personal notes into +`.claude/CLAUDE.local.md` instead). The CLI is the source of truth. + `rough_size_hint` is server-populated for `local` and `materialized` tables (`small` ≤100 MiB, `medium` ≤1 GiB, `large` ≤10 GiB, `very_large` >10 GiB) and `null` for `remote` rows. When `null`, treat the table as potentially large diff --git a/pyproject.toml b/pyproject.toml index c4c02c3..7c9ae61 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -95,7 +95,7 @@ dev = [ ] [project.scripts] -agnes = "cli.main:app" +agnes = "cli.main:_run_with_clean_errors" [build-system] requires = ["hatchling"]