feat: clean CLI errors + init progress + skip-materialize + claude.md catalog pointer

Three first-try-failure-surface fixes from Pavel's #185 trace + the
template guidance question, all under PR #188's umbrella so they land
together with the file_server / parallel pull / Tier 1 work.

1. CLI clean-error wrapper — new AgnesTransportError raised by the
   api_*/stream_download helpers when httpx times out / drops /
   refuses, plus a top-level Typer wrapper (cli/main.py) that prints
   one-line "Error: …" + actionable hint and exits non-zero. Full
   traceback goes to ~/.config/agnes/last-error.log for support
   forwarding. Unhandled Exceptions are caught at the same boundary
   so no Python traceback ever leaks to the analyst's terminal.

   Pavel's #185 Phase 3B: a 30-frame httpx traceback from a slow BQ
   --remote query made it look like a CLI bug. Now: clean message +
   hint pointing at `agnes snapshot create` / partition-column
   guidance.

   Entry point in pyproject.toml flipped from `cli.main:app` →
   `cli.main:_run_with_clean_errors` so the wrapper actually runs
   under the installed `agnes` binary.

2. agnes init / agnes pull --skip-materialize + progress bar.
   --skip-materialize omits query_mode='materialized' rows from the
   download set so a first init doesn't spend 44 minutes silently
   pulling a single 6.3 GB parquet (Pavel's #185 Phase 1). Rich-driven
   per-file progress bar with label/bytes/rate/ETA renders to stderr
   when not --quiet and not --json. Aggregates across the parallel
   ThreadPoolExecutor workers added earlier in this PR.

3. config/claude_md_template.txt: explicit one-line snippet pointing
   at `agnes catalog --json | jq '.tables[] | select(.id=="<id>")'`
   for per-table descriptions + restated invariant: "the description
   field on each catalog row is the authoritative business-rules
   text — re-read live, never copy into this file." Resolves the
   regression-or-feature debate between Pavel (wants annotations)
   and the user feedback that landed in the prior commit (don't
   embed table-specific content; tables change). Catalog command
   stays the source of truth.
ZdenekSrotyr 2026-05-05 18:11:59 +02:00
parent e5fb913cec
commit 28423907fd
8 changed files with 294 additions and 28 deletions

View file

@ -10,6 +10,11 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
## [Unreleased]
### Added
- **`agnes init` / `agnes pull --skip-materialize`** — opts the first sync out of materialized-mode tables (server-side scheduled-query parquets, often multi-GB). Pavel's #185 Phase 1: a single 6.3 GB `order_economics` parquet kept first init silent for 44 minutes. Materialized rows stay discoverable via `agnes catalog`; rerun without the flag once the analyst actually needs them locally.
- **`agnes pull` progress bar** — Rich-driven aggregate transfer display rendered to stderr when not `--quiet` and not `--json`. Per-file label + bytes / total / rate / ETA, aggregated across the parallel `ThreadPoolExecutor` workers introduced earlier in this PR. Replaces the prior 0-stdout silence on first init.
- **CLI clean-error wrapper** (`cli/main.py:_run_with_clean_errors`, new entry point in `pyproject.toml`): `httpx.ReadTimeout` / `ConnectError` / `RemoteProtocolError` etc. used to dump a 30-frame Python traceback to the analyst's terminal when an `agnes query --remote` against a slow BQ view timed out client-side. Now: one-line `Error: …` message + actionable hint (e.g. "narrow the WHERE on the partition column from `agnes catalog --json`, or run `agnes snapshot create --estimate`"), exit code 1. Full traceback is appended to `~/.config/agnes/last-error.log` so an operator can recover it for support without spamming the analyst's terminal. Implemented as `AgnesTransportError` raised from the `api_get` / `api_post` / `api_delete` / `api_patch` / `stream_download` helpers in `cli/client.py`; the top-level Typer wrapper renders it. Unhandled `Exception`s are caught at the same boundary, logged, and printed as "internal CLI error (see logfile)" so a Python traceback never leaks to the analyst.
### Changed
- **Tier 1 event-loop unblocking**: the five hottest BQ-touching endpoints (`POST /api/query`, `POST /api/v2/scan`, `POST /api/v2/scan/estimate`, `GET /api/v2/sample/{id}`, `GET /api/v2/schema/{id}`) were declared `async def` but invoked synchronous DuckDB / BQ-extension calls inside the body. Under uvicorn's single event loop that meant a single heavy `agnes query --remote` (waiting up to ~200 s for BQ's `jobs.query` to return) **froze every other request** (`/api/health`, the dashboard, auth, even another query) for the full duration of the BQ wait. Operators saw "VM idle, app frozen" symptoms during this work. Converted all five to plain `def` so FastAPI auto-offloads the blocking body to the anyio thread pool; the event loop stays free for non-BQ requests. Verified via 0-await audit (no `await` statements in the converted handlers, so the rename is safe). Tests: `tests/test_v2_*.py` were rewritten to call the handlers directly instead of `asyncio.run(...)` (which now fails on a non-coroutine return). Pairs with the thread-pool capacity bump below.
- **`AGNES_THREADPOOL_SIZE` env var** (default 200, was anyio's stock 40) controls the FastAPI / Starlette thread pool capacity used by every plain-`def` route handler. Set in `app/main.py:lifespan` via `anyio.to_thread.current_default_thread_limiter().total_tokens`. 200 leaves comfortable headroom over the BQ extension's connection budget while keeping the per-process thread cost bounded — for the workload of <50 concurrent analysts this is well over what's needed; bump for higher concurrency.
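
The event-loop stall described under "Tier 1 event-loop unblocking" can be reproduced with the standard library alone. The following is a minimal sketch, not the Agnes code: `asyncio.to_thread` stands in for FastAPI's thread-pool offload of plain-`def` handlers, and `blocking_query`, `blocking_handler`, `offloaded_handler`, `probe_latency`, and `measure` are hypothetical names chosen for illustration.

```python
import asyncio
import time


def blocking_query(seconds: float) -> str:
    """Stand-in for a synchronous DuckDB / BQ call (assumption: a plain sleep)."""
    time.sleep(seconds)
    return "rows"


async def blocking_handler() -> str:
    # Sync call directly inside a coroutine: the event loop cannot run
    # anything else until it returns.
    return blocking_query(0.3)


async def offloaded_handler() -> str:
    # Same call moved to a worker thread; the loop stays free meanwhile.
    return await asyncio.to_thread(blocking_query, 0.3)


async def probe_latency(handler) -> float:
    """Start `handler`, then time a 10 ms "health check" running beside it."""
    started = time.monotonic()
    query = asyncio.create_task(handler())
    await asyncio.sleep(0.01)  # the health check
    latency = time.monotonic() - started
    await query
    return latency


def measure() -> tuple[float, float]:
    """Return (latency beside the blocking handler, latency beside the offloaded one)."""
    blocked = asyncio.run(probe_latency(blocking_handler))
    free = asyncio.run(probe_latency(offloaded_handler))
    return blocked, free
```

Calling `measure()` should show the health check waiting for the whole query in the first case and returning almost immediately in the second, which is the behaviour the `async def` to `def` conversion is meant to buy.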

View file

@ -2,12 +2,14 @@
import os
import time
import traceback
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import httpx
from cli.config import get_server_url, get_token
from cli.config import _config_dir, get_server_url, get_token
# Retry policy for transient failures during stream downloads. Scoped to
# network issues and 5xx — 4xx (auth, 404, 400) is NOT retried. Tunable via
@ -21,6 +23,105 @@ _RETRY_BACKOFFS_S = (0.3, 1.0, 3.0) # seconds before attempt 2, 3, 4
QUERY_TIMEOUT_S = float(os.environ.get("AGNES_QUERY_TIMEOUT", "300"))
# ── Transport-error translation ─────────────────────────────────────────
# Pavel's Issue #185 Phase 3B caught the failure mode: when httpx raises
# `ReadTimeout` / `ConnectError` / `RemoteProtocolError` and the CLI
# command doesn't catch it, Typer dumps a 30-frame Python traceback to
# the analyst's terminal. That looks like a CLI bug to a non-Python user
# and obscures the actionable signal ("server slow, try snapshot create").
# Translate transport exceptions to `AgnesTransportError` with a typed
# user-facing message, log the full traceback to `~/.config/agnes/last-
# error.log` for debug, and let the top-level CLI handler render the
# clean message + exit non-zero.
_LOG_FILE = _config_dir() / "last-error.log"
class AgnesTransportError(Exception):
"""Network / transport failure with a user-actionable message.
Raised by the api_* / stream_download helpers when httpx surfaces a
connection / timeout / protocol error. The CLI's top-level Typer
handler catches this, prints `.user_message` (NOT the traceback),
and exits non-zero. Full traceback goes to ``~/.config/agnes/last-
error.log`` so an operator can recover it for support.
"""
def __init__(self, user_message: str, *, hint: str = "", logfile_path: Path | None = None):
super().__init__(user_message)
self.user_message = user_message
self.hint = hint
self.logfile_path = logfile_path
def _log_traceback(exc: BaseException, *, context: str) -> Path:
"""Append a timestamped traceback to ``~/.config/agnes/last-error.log``
and return the path. Best-effort: never raises (a logging failure
must not mask the original error)."""
try:
with open(_LOG_FILE, "a", encoding="utf-8") as f:
ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
f.write(f"\n=== {ts} {context} ===\n")
traceback.print_exception(type(exc), exc, exc.__traceback__, file=f)
except Exception:
pass
return _LOG_FILE
def _translate_transport_error(exc: Exception, *, context: str) -> AgnesTransportError:
"""Map httpx transport exceptions to user-facing CLI messages. The
mapping is intentionally pragmatic: analysts care about "what do I
do next", not the HTTP / TCP detail."""
log = _log_traceback(exc, context=context)
if isinstance(exc, httpx.ReadTimeout):
return AgnesTransportError(
f"Server didn't respond within the read timeout ({QUERY_TIMEOUT_S:.0f}s) "
f"for {context}.",
hint=(
"If this is `agnes query --remote` against a heavy BQ view, "
"the underlying BQ job took longer than the wait window. Try:\n"
" • narrow the WHERE (especially the partition column from `agnes catalog --json`)\n"
" • `agnes snapshot create <table> ... --estimate` to materialize once + query locally\n"
" • set AGNES_QUERY_TIMEOUT=600 for a longer client-side wait\n"
f"Full traceback: {log}"
),
logfile_path=log,
)
if isinstance(exc, httpx.ConnectError):
return AgnesTransportError(
f"Can't reach the agnes server for {context}.",
hint=(
"Check the server URL with `agnes status`, network reachability "
"(VPN / DNS / firewall), and the TLS-trust setup if this is a "
f"corporate-CA deployment.\nFull traceback: {log}"
),
logfile_path=log,
)
if isinstance(exc, (httpx.RemoteProtocolError, httpx.ReadError, httpx.WriteError)):
return AgnesTransportError(
f"Connection broke mid-flight on {context}.",
hint=(
"Usually a transient network blip. Re-run the command. If it "
f"keeps happening, check `agnes status`.\nFull traceback: {log}"
),
logfile_path=log,
)
if isinstance(exc, httpx.TimeoutException):
return AgnesTransportError(
f"Network timeout on {context}.",
hint=f"Re-run; if persistent, check the server.\nFull traceback: {log}",
logfile_path=log,
)
# Anything else: re-wrap with a generic message so the CLI doesn't
# dump the traceback. We'd prefer a typed translation; if you hit
# this branch, add a clause above.
return AgnesTransportError(
f"Unexpected error on {context}: {type(exc).__name__}.",
hint=f"Full traceback: {log}",
logfile_path=log,
)
def get_client(timeout: float = 30.0) -> httpx.Client:
"""Get an authenticated httpx client."""
token = get_token()
@ -35,23 +136,35 @@ def get_client(timeout: float = 30.0) -> httpx.Client:
def api_get(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
with get_client(timeout=timeout) as client:
return client.get(path, **kwargs)
try:
with get_client(timeout=timeout) as client:
return client.get(path, **kwargs)
except httpx.HTTPError as exc:
raise _translate_transport_error(exc, context=f"GET {path}") from exc
def api_post(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
with get_client(timeout=timeout) as client:
return client.post(path, **kwargs)
try:
with get_client(timeout=timeout) as client:
return client.post(path, **kwargs)
except httpx.HTTPError as exc:
raise _translate_transport_error(exc, context=f"POST {path}") from exc
def api_delete(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
with get_client(timeout=timeout) as client:
return client.delete(path, **kwargs)
try:
with get_client(timeout=timeout) as client:
return client.delete(path, **kwargs)
except httpx.HTTPError as exc:
raise _translate_transport_error(exc, context=f"DELETE {path}") from exc
def api_patch(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
with get_client(timeout=timeout) as client:
return client.patch(path, **kwargs)
try:
with get_client(timeout=timeout) as client:
return client.patch(path, **kwargs)
except httpx.HTTPError as exc:
raise _translate_transport_error(exc, context=f"PATCH {path}") from exc
def _is_transient(exc: Exception) -> bool:
@ -98,7 +211,13 @@ def stream_download(path: str, target_path: str, progress_callback=None) -> int:
if attempt == _RETRY_ATTEMPTS or not _is_transient(exc):
break
time.sleep(_RETRY_BACKOFFS_S[min(attempt, len(_RETRY_BACKOFFS_S) - 1)])
# Clean up any leftover tmp, then surface the last exception.
# Clean up any leftover tmp, then surface the last exception. Translate
# transport errors to AgnesTransportError so the CLI prints a clean
# message instead of a Python traceback (Pavel's #185 Phase 3B).
tmp_path.unlink(missing_ok=True)
assert last_exc is not None
if isinstance(last_exc, httpx.HTTPError):
raise _translate_transport_error(
last_exc, context=f"GET {path} (stream → {target_path})"
) from last_exc
raise last_exc
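
The translate-and-log pattern in this file can be sketched without httpx. The example below is an illustration under stated assumptions, not the shipped implementation: built-in `TimeoutError` / `ConnectionError` types stand in for the httpx exception hierarchy, `CliTransportError`, `log_traceback`, `translate`, and `fetch` are hypothetical names, and the `mkdir` call is an addition that the diff above does not contain.

```python
import traceback
from datetime import datetime, timezone
from pathlib import Path


class CliTransportError(Exception):
    """Hypothetical stand-in for AgnesTransportError."""

    def __init__(self, user_message: str, *, hint: str = "") -> None:
        super().__init__(user_message)
        self.user_message = user_message
        self.hint = hint


def log_traceback(exc: BaseException, log_file: Path, *, context: str) -> Path:
    """Append a timestamped traceback to log_file; never raise from logging."""
    try:
        # Assumption: create the directory on demand so a fresh machine
        # still gets a log file.
        log_file.parent.mkdir(parents=True, exist_ok=True)
        with open(log_file, "a", encoding="utf-8") as f:
            ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
            f.write(f"\n=== {ts} {context} ===\n")
            traceback.print_exception(type(exc), exc, exc.__traceback__, file=f)
    except OSError:
        pass  # a logging failure must not mask the original error
    return log_file


def translate(exc: Exception, log_file: Path, *, context: str) -> CliTransportError:
    """Check the most specific exception types first, generic fallback last."""
    log = log_traceback(exc, log_file, context=context)
    if isinstance(exc, TimeoutError):
        return CliTransportError(
            f"Server didn't respond in time for {context}.",
            hint=f"Narrow the query or raise the timeout.\nFull traceback: {log}",
        )
    if isinstance(exc, ConnectionRefusedError):
        return CliTransportError(
            f"Can't reach the server for {context}.",
            hint=f"Check the URL and network.\nFull traceback: {log}",
        )
    if isinstance(exc, ConnectionError):  # reset / aborted / broken pipe
        return CliTransportError(
            f"Connection broke mid-flight on {context}.",
            hint=f"Usually transient; re-run.\nFull traceback: {log}",
        )
    return CliTransportError(
        f"Unexpected error on {context}: {type(exc).__name__}.",
        hint=f"Full traceback: {log}",
    )


def fetch(path: str, log_file: Path) -> None:
    """Hypothetical helper: the raise simulates a refused connection."""
    try:
        raise ConnectionRefusedError("simulated refusal")
    except OSError as exc:
        raise translate(exc, log_file, context=f"GET {path}") from exc
```

Raising with `from exc` keeps the original exception reachable as `__cause__`, so the clean message and the full diagnostic detail travel together.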

View file

@ -64,6 +64,16 @@ def init(
token: str = typer.Option(..., "--token", help="Personal access token"),
force: bool = typer.Option(False, "--force", help="Re-initialize an existing workspace"),
workspace_str: Optional[str] = typer.Option(None, "--workspace", help="Target dir (default: cwd)"),
skip_materialize: bool = typer.Option(
False, "--skip-materialize",
help=(
"Skip materialized-mode tables on the first pull. The first "
"init can otherwise spend tens of minutes silently downloading "
"a single multi-GB scheduled-query parquet. Materialized rows "
"are still discoverable via `agnes catalog`; rerun `agnes pull` "
"without this flag once you actually need them locally."
),
),
):
"""Bootstrap workspace: auth, CLAUDE.md, hooks, first pull, AGNES_WORKSPACE.md."""
workspace = Path(workspace_str).resolve() if workspace_str else Path.cwd()
@ -176,7 +186,15 @@ def init(
# exception escaping here is a programming error worth surfacing.
# ------------------------------------------------------------------
try:
result: PullResult = run_pull(server_url, token, workspace)
# `agnes init` always runs interactively (analyst typing the
# command), so progress is on by default — Pavel's #185 Phase 1
# was a 44-minute silent download on the very first install.
# Pass it through to run_pull.
result: PullResult = run_pull(
server_url, token, workspace,
skip_materialize=skip_materialize,
show_progress=True,
)
except Exception as exc:
typer.echo(render_error(0, {"detail": {
"kind": "manifest_unauthorized",

View file

@ -38,6 +38,15 @@ def pull(
quiet: bool = typer.Option(False, "--quiet", help="Suppress success stdout (errors still surface on stderr)"),
as_json: bool = typer.Option(False, "--json", help="Emit a single JSON object summarizing the pull"),
dry_run: bool = typer.Option(False, "--dry-run", help="Compute the delta without writing anything to disk"),
skip_materialize: bool = typer.Option(
False, "--skip-materialize",
help=(
"Skip materialized-mode tables (server-side scheduled BQ "
"scan results, often multi-GB). They remain discoverable "
"via `agnes catalog`, and local-mode tables still pull. Useful "
"for a fast first init when an analyst only needs --remote access."
),
),
):
"""Refresh data from the server into ./server/parquet + ./user/duckdb."""
server_url = get_server_url()
@ -68,8 +77,17 @@ def pull(
workspace = Path(os.environ.get("AGNES_LOCAL_DIR", ".")).resolve()
# Show progress unless quiet (SessionStart hooks) or json (machine-
# readable output where Rich's terminal-control sequences would be
# garbage in the consumer's parser).
show_progress = not (quiet or as_json)
try:
result: PullResult = run_pull(server_url, token, workspace, dry_run=dry_run)
result: PullResult = run_pull(
server_url, token, workspace,
dry_run=dry_run,
skip_materialize=skip_materialize,
show_progress=show_progress,
)
except Exception as exc:
# `run_pull` is documented to record per-table / per-stage failures
# under `result.errors` rather than raising, so reaching this branch

View file

@ -112,6 +112,8 @@ def run_pull(
workspace: Path,
*,
dry_run: bool = False,
skip_materialize: bool = False,
show_progress: bool = False,
) -> PullResult:
"""Refresh local parquets + corporate memory rules from the server.
@ -119,6 +121,17 @@ def run_pull(
Typer/Rich UI. Returns a `PullResult` summary; never raises for
network/server errors (records them under `errors` instead) so the
caller can decide whether a partial pull is fatal.
Args:
skip_materialize: When True, omit `query_mode='materialized'`
tables from the download set. Use for analysts who only
care about `--remote` access on the workspace and don't
want to wait on multi-GB scheduled-query parquets at first
init. Pavel's #185 Phase 1: a 6.3 GB `order_economics`
parquet kept first init silent for 44 minutes.
show_progress: When True, render a per-file progress bar to
stderr via Rich during the parallel download phase. Pass
False from `--quiet` callers (SessionStart hooks).
"""
started = time.monotonic()
result = PullResult()
@ -159,6 +172,11 @@ def run_pull(
for tid, info in server_tables.items():
if info.get("query_mode") == "remote":
continue
if skip_materialize and info.get("query_mode") == "materialized":
# Operator opt-out for first-init. Materialized rows are
# still discoverable via `agnes catalog` and queryable
# the next time `agnes pull` runs without --skip-materialize.
continue
non_remote_total += 1
local_hash = local_tables.get(tid, {}).get("hash", "")
server_hash = info.get("hash", "")
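
The selection rule in this hunk can be read as a small pure function. This is a sketch under assumptions: `select_downloads` is a hypothetical name, and the hash comparison that decides whether a table needs downloading is inferred from the `local_hash` / `server_hash` lines rather than shown in the diff.

```python
def select_downloads(
    server_tables: dict[str, dict],
    local_tables: dict[str, dict],
    *,
    skip_materialize: bool = False,
) -> list[str]:
    """Return the table ids whose parquet should be (re)downloaded."""
    to_download: list[str] = []
    for tid, info in server_tables.items():
        mode = info.get("query_mode")
        if mode == "remote":
            continue  # remote tables are queried server-side, never pulled
        if skip_materialize and mode == "materialized":
            continue  # opt-out requested via --skip-materialize
        local_hash = local_tables.get(tid, {}).get("hash", "")
        server_hash = info.get("hash", "")
        # Assumption: a differing (or missing) local hash means "stale".
        if local_hash != server_hash:
            to_download.append(tid)
    return to_download
```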
@ -201,14 +219,52 @@ def run_pull(
# the executor + thread overhead for the common single-update case.
workers = min(workers, len(to_download)) if to_download else 1
# Optional progress bar — Rich's Progress tracks per-file bytes
# streamed, aggregated across the parallel ThreadPoolExecutor
# workers. Pavel's #185 Phase 1: a single 6.3 GB parquet on first
# init went 44 minutes silent, looked frozen. Now: aggregate "X.Y
# GB / Z.A GB · 56 MB/s · ETA 1m 20s" to stderr while threads
# stream. None when show_progress=False (SessionStart hooks etc.).
progress = None
progress_tasks: dict[str, int] = {}
if show_progress and to_download:
from rich.progress import (
Progress, BarColumn, DownloadColumn, TextColumn,
TimeRemainingColumn, TransferSpeedColumn,
)
progress = Progress(
TextColumn("[bold]{task.fields[label]}[/]"),
BarColumn(),
DownloadColumn(),
TransferSpeedColumn(),
TimeRemainingColumn(),
transient=False,
)
progress.start()
for tid in to_download:
size = int(server_tables[tid].get("size_bytes") or 0)
# Some manifest entries don't carry size — Rich shows
# an indeterminate bar in that case.
progress_tasks[tid] = progress.add_task(
"download", label=tid, total=size if size > 0 else None,
)
def _download_one(tid: str) -> tuple[str, dict | None, str | None]:
"""Returns (tid, local_table_entry_or_None, error_or_None).
One bound thread per call; stream_download is sync I/O so a
ThreadPoolExecutor (not asyncio) is the right tool."""
ThreadPoolExecutor (not asyncio) is the right tool. The
progress callback is thread-safe: Rich's Progress.update
holds an internal lock."""
target = parquet_dir / f"{tid}.parquet"
expected_hash = server_tables[tid].get("hash", "")
cb = None
if progress is not None and tid in progress_tasks:
task_id = progress_tasks[tid]
def cb(n: int, _tid=tid, _task=task_id):
progress.update(_task, advance=n)
try:
stream_download(f"/api/data/{tid}/download", str(target))
stream_download(f"/api/data/{tid}/download", str(target),
progress_callback=cb)
if expected_hash:
actual_hash = _file_md5(target)
if actual_hash != expected_hash:
@ -228,12 +284,16 @@ def run_pull(
except Exception as exc:
return tid, None, str(exc)
if workers <= 1:
outcomes = [_download_one(tid) for tid in to_download]
else:
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=workers) as ex:
outcomes = list(ex.map(_download_one, to_download))
try:
if workers <= 1:
outcomes = [_download_one(tid) for tid in to_download]
else:
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=workers) as ex:
outcomes = list(ex.map(_download_one, to_download))
finally:
if progress is not None:
progress.stop()
for tid, entry, err in outcomes:
if err is not None:
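
The progress aggregation across parallel workers relies on every thread reporting bytes through one thread-safe sink. A stdlib-only sketch of that idea follows; it swaps Rich's `Progress` for an explicit lock-guarded counter, and `ByteCounter`, `fake_stream_download`, and `download_all` are hypothetical names that do not exist in the codebase.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ByteCounter:
    """Aggregates byte counts reported by several worker threads."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.per_file: dict[str, int] = {}

    def advance(self, name: str, n: int) -> None:
        # Read-modify-write on a dict entry is not atomic, so guard it.
        with self._lock:
            self.per_file[name] = self.per_file.get(name, 0) + n

    @property
    def total(self) -> int:
        with self._lock:
            return sum(self.per_file.values())


def fake_stream_download(size: int, progress_callback=None, chunk: int = 1024) -> int:
    """Hypothetical stand-in for stream_download: 'reads' size bytes in chunks."""
    sent = 0
    while sent < size:
        n = min(chunk, size - sent)
        sent += n
        if progress_callback is not None:
            progress_callback(n)
    return sent


def download_all(sizes: dict[str, int], workers: int = 4) -> ByteCounter:
    """Download every file in parallel, funnelling progress into one counter."""
    counter = ByteCounter()

    def one(name: str) -> int:
        # `name` is a parameter of this call, so each callback is bound
        # to its own file.
        return fake_stream_download(
            sizes[name], progress_callback=lambda n: counter.advance(name, n)
        )

    with ThreadPoolExecutor(max_workers=workers) as ex:
        list(ex.map(one, sizes))  # drain the iterator so errors surface
    return counter
```

A renderer (Rich or otherwise) would read `counter.total` periodically; keeping the counter separate from the display makes the download path testable without a terminal.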

View file

@ -123,5 +123,41 @@ app.add_typer(snapshot_app, name="snapshot")
app.add_typer(disk_info_app, name="disk-info")
def _run_with_clean_errors() -> None:
"""Wrap ``app()`` so AgnesTransportError (and other typed CLI errors)
surface as a one-line message + exit, never as a Python traceback. The
full traceback is already logged to ``~/.config/agnes/last-error.log``
by the api_* helpers; operators read it from there for support
forwarding. Any other exception that reaches this wrapper IS a CLI bug
worth fixing: log it + print "internal error" so the analyst doesn't
see a Python traceback either.
Pavel's #185 Phase 3B: previously a `httpx.ReadTimeout` from an
`agnes query --remote` against a slow BQ view dumped a 30-frame
traceback to the analyst's terminal. Now: one clean line + a hint,
return code 1.
"""
from cli.client import AgnesTransportError, _log_traceback, _LOG_FILE
try:
app()
except (AgnesTransportError) as exc:
typer.echo(f"Error: {exc.user_message}", err=True)
if exc.hint:
typer.echo(exc.hint, err=True)
sys.exit(1)
except typer.Exit:
raise
except (KeyboardInterrupt, SystemExit):
raise
except Exception as exc: # last-resort net — escaped exceptions are bugs
log = _log_traceback(exc, context="unhandled at CLI top-level")
typer.echo(
f"Error: internal CLI error ({type(exc).__name__}). "
f"Full traceback logged to {log}.",
err=True,
)
sys.exit(1)
if __name__ == "__main__":
app()
_run_with_clean_errors()
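
The shape of the top-level boundary can be sketched independently of Typer. This is a minimal illustration, assuming a hypothetical `CliTransportError` and a `run_with_clean_errors` that returns an exit code instead of calling `sys.exit`, which makes it easy to test; it is not the shipped wrapper.

```python
import sys
from typing import Callable, Optional, TextIO


class CliTransportError(Exception):
    """Hypothetical stand-in for AgnesTransportError."""

    def __init__(self, user_message: str, *, hint: str = "") -> None:
        super().__init__(user_message)
        self.user_message = user_message
        self.hint = hint


def run_with_clean_errors(app: Callable[[], None], *, stderr: Optional[TextIO] = None) -> int:
    """Run app() and map failures to an exit code without printing a traceback."""
    out = stderr if stderr is not None else sys.stderr
    try:
        app()
    except CliTransportError as exc:
        print(f"Error: {exc.user_message}", file=out)
        if exc.hint:
            print(exc.hint, file=out)
        return 1
    except Exception as exc:
        # KeyboardInterrupt and SystemExit derive from BaseException, so
        # they are not caught here and propagate unchanged.
        print(f"Error: internal CLI error ({type(exc).__name__}).", file=out)
        return 1
    return 0
```

A caller would typically end with `sys.exit(run_with_clean_errors(main))`, so the process exit status mirrors the returned code.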

View file

@ -39,16 +39,26 @@ This workspace is connected to {{ server.url }}.
## Discovering tables — never enumerate from memory
Tables, columns, sizes, and `query_mode` change as admins register / migrate /
drop entries. Always re-discover from the live server, never from this file:
Tables, columns, sizes, descriptions, and `query_mode` change as admins
register / migrate / drop entries. Always re-discover from the live server,
never from this file or your training data:
```
agnes catalog --json # canonical list with query_mode, sql_flavor,
# where_examples, fetch_via, rough_size_hint per table
agnes schema <table> # columns + types in the right SQL dialect
agnes describe <table> -n 5 # sample rows (local + materialized only)
agnes catalog --json # all tables: id, query_mode, sql_flavor,
# where_examples, fetch_via, rough_size_hint, description
agnes catalog --json | jq '.tables[] | select(.id=="<id>")' # single table — read its description in full BEFORE writing any SQL
agnes schema <table> # columns + types in the right SQL dialect
agnes describe <table> -n 5 # sample rows (local + materialized only)
```
The `description` field on each catalog row is the **authoritative
business-rules text** for that table — it carries grain, partition
column, join contracts, and column-level gotchas. Re-read it from the
live `agnes catalog` for every cross-table decision; do **not** copy
it into this workspace `CLAUDE.md` (it's a snapshot that goes stale,
and `agnes init` will overwrite local edits — put personal notes into
`.claude/CLAUDE.local.md` instead). The CLI is the source of truth.
`rough_size_hint` is server-populated for `local` and `materialized` tables
(`small` ≤100 MiB, `medium` ≤1 GiB, `large` ≤10 GiB, `very_large` >10 GiB) and
`null` for `remote` rows. When `null`, treat the table as potentially large

View file

@ -95,7 +95,7 @@ dev = [
]
[project.scripts]
agnes = "cli.main:app"
agnes = "cli.main:_run_with_clean_errors"
[build-system]
requires = ["hatchling"]