feat: clean CLI errors + init progress + skip-materialize + claude.md catalog pointer

Three first-try-failure-surface fixes from Pavel's #185 trace + the
template guidance question, all under PR #188's umbrella so they land
together with the file_server / parallel pull / Tier 1 work.

1. CLI clean-error wrapper — new AgnesTransportError raised by the
   api_*/stream_download helpers when httpx times out / drops /
   refuses, plus a top-level Typer wrapper (cli/main.py) that prints
   one-line "Error: …" + actionable hint and exits non-zero. Full
   traceback goes to ~/.config/agnes/last-error.log for support
   forwarding. Unhandled Exceptions are caught at the same boundary
   so no Python traceback ever leaks to the analyst's terminal.

   Pavel's #185 Phase 3B: a 30-frame httpx traceback from a slow BQ
   --remote query made it look like a CLI bug. Now: clean message +
   hint pointing at `agnes snapshot create` / partition-column
   guidance.

   Entry point in pyproject.toml flipped from `cli.main:app` →
   `cli.main:_run_with_clean_errors` so the wrapper actually runs
   under the installed `agnes` binary.

2. agnes init / agnes pull --skip-materialize + progress bar.
   --skip-materialize omits query_mode='materialized' rows from the
   download set so a first init doesn't spend 44 minutes silently
   pulling a single 6 GB parquet (Pavel's #185 Phase 1). Rich-driven
   per-file progress bar with label/bytes/rate/ETA renders to stderr
   when not --quiet and not --json. Aggregates across the parallel
   ThreadPoolExecutor workers added earlier in this PR.

3. config/claude_md_template.txt: explicit one-line snippet pointing
   at `agnes catalog --json | jq '.tables[] | select(.id=="<id>")'`
   for per-table descriptions + restated invariant: "the description
   field on each catalog row is the authoritative business-rules
   text — re-read live, never copy into this file." Resolves the
   regression-or-feature debate between Pavel (wants annotations)
   and the user feedback that landed in the prior commit (don't
   embed table-specific content; tables change). Catalog command
   stays the source of truth.
This commit is contained in:
ZdenekSrotyr 2026-05-05 18:11:59 +02:00
parent e5fb913cec
commit 28423907fd
8 changed files with 294 additions and 28 deletions

View file

@ -10,6 +10,11 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
## [Unreleased] ## [Unreleased]
### Added
- **`agnes init` / `agnes pull --skip-materialize`** — opts the first sync out of materialized-mode tables (server-side scheduled-query parquets, often multi-GB). Pavel's #185 Phase 1: a single 6.3 GB `order_economics` parquet kept first init silent for 44 minutes. Materialized rows stay discoverable via `agnes catalog`; rerun without the flag once the analyst actually needs them locally.
- **`agnes pull` progress bar** — Rich-driven aggregate transfer display rendered to stderr when not `--quiet` and not `--json`. Per-file label + bytes / total / rate / ETA, aggregated across the parallel `ThreadPoolExecutor` workers introduced earlier in this PR. Replaces the prior 0-stdout silence on first init.
- **CLI clean-error wrapper** (`cli/main.py:_run_with_clean_errors`, new entry point in `pyproject.toml`) — `httpx.ReadTimeout` / `ConnectError` / `RemoteProtocolError` etc. used to dump a five-frame Python traceback to the analyst's terminal when a `agnes query --remote` against a slow BQ view timed out client-side. Now: one-line `Error: …` message + actionable hint (e.g. "narrow the WHERE on the partition column from `agnes catalog --json`, or run `agnes snapshot create --estimate`"), exit code 1. Full traceback is appended to `~/.config/agnes/last-error.log` so an operator can recover it for support without spamming the analyst's terminal. Implemented as `AgnesTransportError` raised from the `api_get` / `api_post` / `api_delete` / `api_patch` / `stream_download` helpers in `cli/client.py`; the top-level Typer wrapper renders it. Unhandled `Exception`s are caught at the same boundary, logged, and printed as "internal CLI error (see logfile)" so a Python traceback never leaks to the analyst.
### Changed ### Changed
- **Tier 1 event-loop unblocking** — the five hottest BQ-touching endpoints (`POST /api/query`, `POST /api/v2/scan`, `POST /api/v2/scan/estimate`, `GET /api/v2/sample/{id}`, `GET /api/v2/schema/{id}`) were declared `async def` but invoked synchronous DuckDB / BQ-extension calls inside the body. Under uvicorn's single event loop that meant a single heavy `agnes query --remote` (waiting up to ~200 s for BQ's `jobs.query` to return) **froze every other request**`/api/health`, the dashboard, auth, even another query — for the full duration of the BQ wait. Operators saw "VM idle, app frozen" symptoms during this work. Converted all five to plain `def` so FastAPI auto-offloads the blocking body to the anyio thread pool; the event loop stays free for non-BQ requests. Verified via 0-await audit (no `await` statements in the converted handlers, so the rename is safe). Tests: `tests/test_v2_*.py` were rewritten to call the handlers directly instead of `asyncio.run(...)` (which now fails on a non-coroutine return). Pairs with the thread-pool capacity bump below. - **Tier 1 event-loop unblocking** — the five hottest BQ-touching endpoints (`POST /api/query`, `POST /api/v2/scan`, `POST /api/v2/scan/estimate`, `GET /api/v2/sample/{id}`, `GET /api/v2/schema/{id}`) were declared `async def` but invoked synchronous DuckDB / BQ-extension calls inside the body. Under uvicorn's single event loop that meant a single heavy `agnes query --remote` (waiting up to ~200 s for BQ's `jobs.query` to return) **froze every other request**`/api/health`, the dashboard, auth, even another query — for the full duration of the BQ wait. Operators saw "VM idle, app frozen" symptoms during this work. Converted all five to plain `def` so FastAPI auto-offloads the blocking body to the anyio thread pool; the event loop stays free for non-BQ requests. Verified via 0-await audit (no `await` statements in the converted handlers, so the rename is safe). Tests: `tests/test_v2_*.py` were rewritten to call the handlers directly instead of `asyncio.run(...)` (which now fails on a non-coroutine return). Pairs with the thread-pool capacity bump below.
- **`AGNES_THREADPOOL_SIZE` env var** (default 200, was anyio's stock 40) controls the FastAPI / Starlette thread pool capacity used by every plain-`def` route handler. Set in `app/main.py:lifespan` via `anyio.to_thread.current_default_thread_limiter().total_tokens`. 200 leaves comfortable headroom over the BQ extension's connection budget while keeping the per-process thread cost bounded — for the workload of <50 concurrent analysts this is well over what's needed; bump for higher concurrency. - **`AGNES_THREADPOOL_SIZE` env var** (default 200, was anyio's stock 40) controls the FastAPI / Starlette thread pool capacity used by every plain-`def` route handler. Set in `app/main.py:lifespan` via `anyio.to_thread.current_default_thread_limiter().total_tokens`. 200 leaves comfortable headroom over the BQ extension's connection budget while keeping the per-process thread cost bounded — for the workload of <50 concurrent analysts this is well over what's needed; bump for higher concurrency.

View file

@ -2,12 +2,14 @@
import os import os
import time import time
import traceback
from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional
import httpx import httpx
from cli.config import get_server_url, get_token from cli.config import _config_dir, get_server_url, get_token
# Retry policy for transient failures during stream downloads. Scoped to # Retry policy for transient failures during stream downloads. Scoped to
# network issues and 5xx — 4xx (auth, 404, 400) is NOT retried. Tunable via # network issues and 5xx — 4xx (auth, 404, 400) is NOT retried. Tunable via
@ -21,6 +23,105 @@ _RETRY_BACKOFFS_S = (0.3, 1.0, 3.0) # seconds before attempt 2, 3, 4
QUERY_TIMEOUT_S = float(os.environ.get("AGNES_QUERY_TIMEOUT", "300")) QUERY_TIMEOUT_S = float(os.environ.get("AGNES_QUERY_TIMEOUT", "300"))
# ── Transport-error translation ─────────────────────────────────────────
# Pavel's Issue #185 Phase 3B caught the failure mode: when httpx raises
# `ReadTimeout` / `ConnectError` / `RemoteProtocolError` and the CLI
# command doesn't catch it, Typer dumps a five-frame Python traceback to
# the analyst's terminal. That looks like a CLI bug to a non-Python user
# and obscures the actionable signal ("server slow, try snapshot create").
# Translate transport exceptions to `AgnesTransportError` with a typed
# user-facing message, log the full traceback to `~/.config/agnes/last-
# error.log` for debug, and let the top-level CLI handler render the
# clean message + exit non-zero.
_LOG_FILE = _config_dir() / "last-error.log"
class AgnesTransportError(Exception):
"""Network / transport failure with a user-actionable message.
Raised by the api_* / stream_download helpers when httpx surfaces a
connection / timeout / protocol error. The CLI's top-level Typer
handler catches this, prints `.user_message` (NOT the traceback),
and exits non-zero. Full traceback goes to ``~/.config/agnes/last-
error.log`` so an operator can recover it for support.
"""
def __init__(self, user_message: str, *, hint: str = "", logfile_path: Path | None = None):
super().__init__(user_message)
self.user_message = user_message
self.hint = hint
self.logfile_path = logfile_path
def _log_traceback(exc: BaseException, *, context: str) -> Path:
"""Append a timestamped traceback to ``~/.config/agnes/last-error.log``
and return the path. Best-effort never raises (a logging failure
must not mask the original error)."""
try:
with open(_LOG_FILE, "a", encoding="utf-8") as f:
ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
f.write(f"\n=== {ts} {context} ===\n")
traceback.print_exception(type(exc), exc, exc.__traceback__, file=f)
except Exception:
pass
return _LOG_FILE
def _translate_transport_error(exc: Exception, *, context: str) -> AgnesTransportError:
"""Map httpx transport exceptions to user-facing CLI messages. The
mapping is intentionally pragmatic analysts care about "what do I
do next", not the gRPC / TCP detail."""
log = _log_traceback(exc, context=context)
if isinstance(exc, httpx.ReadTimeout):
return AgnesTransportError(
f"Server didn't respond within the read timeout ({QUERY_TIMEOUT_S:.0f}s) "
f"for {context}.",
hint=(
"If this is `agnes query --remote` against a heavy BQ view, "
"the underlying BQ job took longer than the wait window. Try:\n"
" • narrow the WHERE (especially the partition column from `agnes catalog --json`)\n"
" • `agnes snapshot create <table> ... --estimate` to materialize once + query locally\n"
" • set AGNES_QUERY_TIMEOUT=600 for a longer client-side wait\n"
f"Full traceback: {log}"
),
logfile_path=log,
)
if isinstance(exc, httpx.ConnectError):
return AgnesTransportError(
f"Can't reach the agnes server for {context}.",
hint=(
"Check the server URL with `agnes status`, network reachability "
"(VPN / DNS / firewall), and the TLS-trust setup if this is a "
f"corporate-CA deployment.\nFull traceback: {log}"
),
logfile_path=log,
)
if isinstance(exc, (httpx.RemoteProtocolError, httpx.ReadError, httpx.WriteError)):
return AgnesTransportError(
f"Connection broke mid-flight on {context}.",
hint=(
"Usually a transient network blip. Re-run the command. If it "
f"keeps happening, check `agnes status`.\nFull traceback: {log}"
),
logfile_path=log,
)
if isinstance(exc, httpx.TimeoutException):
return AgnesTransportError(
f"Network timeout on {context}.",
hint=f"Re-run; if persistent, check the server.\nFull traceback: {log}",
logfile_path=log,
)
# Anything else: re-wrap with a generic message so the CLI doesn't
# dump the traceback. We'd prefer a typed translation; if you hit
# this branch, add a clause above.
return AgnesTransportError(
f"Unexpected error on {context}: {type(exc).__name__}.",
hint=f"Full traceback: {log}",
logfile_path=log,
)
def get_client(timeout: float = 30.0) -> httpx.Client: def get_client(timeout: float = 30.0) -> httpx.Client:
"""Get an authenticated httpx client.""" """Get an authenticated httpx client."""
token = get_token() token = get_token()
@ -35,23 +136,35 @@ def get_client(timeout: float = 30.0) -> httpx.Client:
def api_get(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: def api_get(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
with get_client(timeout=timeout) as client: try:
return client.get(path, **kwargs) with get_client(timeout=timeout) as client:
return client.get(path, **kwargs)
except httpx.HTTPError as exc:
raise _translate_transport_error(exc, context=f"GET {path}") from exc
def api_post(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: def api_post(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
with get_client(timeout=timeout) as client: try:
return client.post(path, **kwargs) with get_client(timeout=timeout) as client:
return client.post(path, **kwargs)
except httpx.HTTPError as exc:
raise _translate_transport_error(exc, context=f"POST {path}") from exc
def api_delete(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: def api_delete(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
with get_client(timeout=timeout) as client: try:
return client.delete(path, **kwargs) with get_client(timeout=timeout) as client:
return client.delete(path, **kwargs)
except httpx.HTTPError as exc:
raise _translate_transport_error(exc, context=f"DELETE {path}") from exc
def api_patch(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: def api_patch(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
with get_client(timeout=timeout) as client: try:
return client.patch(path, **kwargs) with get_client(timeout=timeout) as client:
return client.patch(path, **kwargs)
except httpx.HTTPError as exc:
raise _translate_transport_error(exc, context=f"PATCH {path}") from exc
def _is_transient(exc: Exception) -> bool: def _is_transient(exc: Exception) -> bool:
@ -98,7 +211,13 @@ def stream_download(path: str, target_path: str, progress_callback=None) -> int:
if attempt == _RETRY_ATTEMPTS or not _is_transient(exc): if attempt == _RETRY_ATTEMPTS or not _is_transient(exc):
break break
time.sleep(_RETRY_BACKOFFS_S[min(attempt, len(_RETRY_BACKOFFS_S) - 1)]) time.sleep(_RETRY_BACKOFFS_S[min(attempt, len(_RETRY_BACKOFFS_S) - 1)])
# Clean up any leftover tmp, then surface the last exception. # Clean up any leftover tmp, then surface the last exception. Translate
# transport errors to AgnesTransportError so the CLI prints a clean
# message instead of a Python traceback (Pavel's #185 Phase 3B).
tmp_path.unlink(missing_ok=True) tmp_path.unlink(missing_ok=True)
assert last_exc is not None assert last_exc is not None
if isinstance(last_exc, httpx.HTTPError):
raise _translate_transport_error(
last_exc, context=f"GET {path} (stream → {target_path})"
) from last_exc
raise last_exc raise last_exc

View file

@ -64,6 +64,16 @@ def init(
token: str = typer.Option(..., "--token", help="Personal access token"), token: str = typer.Option(..., "--token", help="Personal access token"),
force: bool = typer.Option(False, "--force", help="Re-initialize an existing workspace"), force: bool = typer.Option(False, "--force", help="Re-initialize an existing workspace"),
workspace_str: Optional[str] = typer.Option(None, "--workspace", help="Target dir (default: cwd)"), workspace_str: Optional[str] = typer.Option(None, "--workspace", help="Target dir (default: cwd)"),
skip_materialize: bool = typer.Option(
False, "--skip-materialize",
help=(
"Skip materialized-mode tables on the first pull. The first "
"init can otherwise spend tens of minutes silently downloading "
"a single multi-GB scheduled-query parquet. Materialized rows "
"are still discoverable via `agnes catalog`; rerun `agnes pull` "
"without this flag once you actually need them locally."
),
),
): ):
"""Bootstrap workspace: auth, CLAUDE.md, hooks, first pull, AGNES_WORKSPACE.md.""" """Bootstrap workspace: auth, CLAUDE.md, hooks, first pull, AGNES_WORKSPACE.md."""
workspace = Path(workspace_str).resolve() if workspace_str else Path.cwd() workspace = Path(workspace_str).resolve() if workspace_str else Path.cwd()
@ -176,7 +186,15 @@ def init(
# exception escaping here is a programming error worth surfacing. # exception escaping here is a programming error worth surfacing.
# ------------------------------------------------------------------ # ------------------------------------------------------------------
try: try:
result: PullResult = run_pull(server_url, token, workspace) # `agnes init` always runs interactively (analyst typing the
# command), so progress is on by default — Pavel's #185 Phase 1
# was a 44-minute silent download on the very first install.
# Pass it through to run_pull.
result: PullResult = run_pull(
server_url, token, workspace,
skip_materialize=skip_materialize,
show_progress=True,
)
except Exception as exc: except Exception as exc:
typer.echo(render_error(0, {"detail": { typer.echo(render_error(0, {"detail": {
"kind": "manifest_unauthorized", "kind": "manifest_unauthorized",

View file

@ -38,6 +38,15 @@ def pull(
quiet: bool = typer.Option(False, "--quiet", help="Suppress success stdout (errors still surface on stderr)"), quiet: bool = typer.Option(False, "--quiet", help="Suppress success stdout (errors still surface on stderr)"),
as_json: bool = typer.Option(False, "--json", help="Emit a single JSON object summarizing the pull"), as_json: bool = typer.Option(False, "--json", help="Emit a single JSON object summarizing the pull"),
dry_run: bool = typer.Option(False, "--dry-run", help="Compute the delta without writing anything to disk"), dry_run: bool = typer.Option(False, "--dry-run", help="Compute the delta without writing anything to disk"),
skip_materialize: bool = typer.Option(
False, "--skip-materialize",
help=(
"Skip materialized-mode tables (server-side scheduled BQ "
"scan results, often multi-GB). Their data is still discoverable "
"via `agnes catalog` and remote-mode tables still pull. Useful "
"for a fast first init when an analyst only needs --remote access."
),
),
): ):
"""Refresh data from the server into ./server/parquet + ./user/duckdb.""" """Refresh data from the server into ./server/parquet + ./user/duckdb."""
server_url = get_server_url() server_url = get_server_url()
@ -68,8 +77,17 @@ def pull(
workspace = Path(os.environ.get("AGNES_LOCAL_DIR", ".")).resolve() workspace = Path(os.environ.get("AGNES_LOCAL_DIR", ".")).resolve()
# Show progress unless quiet (SessionStart hooks) or json (machine-
# readable output where Rich's terminal-control sequences would be
# garbage in the consumer's parser).
show_progress = not (quiet or as_json)
try: try:
result: PullResult = run_pull(server_url, token, workspace, dry_run=dry_run) result: PullResult = run_pull(
server_url, token, workspace,
dry_run=dry_run,
skip_materialize=skip_materialize,
show_progress=show_progress,
)
except Exception as exc: except Exception as exc:
# `run_pull` is documented to record per-table / per-stage failures # `run_pull` is documented to record per-table / per-stage failures
# under `result.errors` rather than raising, so reaching this branch # under `result.errors` rather than raising, so reaching this branch

View file

@ -112,6 +112,8 @@ def run_pull(
workspace: Path, workspace: Path,
*, *,
dry_run: bool = False, dry_run: bool = False,
skip_materialize: bool = False,
show_progress: bool = False,
) -> PullResult: ) -> PullResult:
"""Refresh local parquets + corporate memory rules from the server. """Refresh local parquets + corporate memory rules from the server.
@ -119,6 +121,17 @@ def run_pull(
Typer/Rich UI. Returns a `PullResult` summary; never raises for Typer/Rich UI. Returns a `PullResult` summary; never raises for
network/server errors (records them under `errors` instead) so the network/server errors (records them under `errors` instead) so the
caller can decide whether a partial pull is fatal. caller can decide whether a partial pull is fatal.
Args:
skip_materialize: When True, omit `query_mode='materialized'`
tables from the download set. Use for analysts who only
care about `--remote` access on the workspace and don't
want to wait on multi-GB scheduled-query parquets at first
init. Pavel's #185 Phase 1: a 6.3 GB `order_economics`
parquet kept first init silent for 44 minutes.
show_progress: When True, render a per-file progress bar to
stderr via Rich during the parallel download phase. Pass
False from `--quiet` callers (SessionStart hooks).
""" """
started = time.monotonic() started = time.monotonic()
result = PullResult() result = PullResult()
@ -159,6 +172,11 @@ def run_pull(
for tid, info in server_tables.items(): for tid, info in server_tables.items():
if info.get("query_mode") == "remote": if info.get("query_mode") == "remote":
continue continue
if skip_materialize and info.get("query_mode") == "materialized":
# Operator opt-out for first-init. Materialized rows are
# still discoverable via `agnes catalog` and queryable
# the next time `agnes pull` runs without --skip-materialize.
continue
non_remote_total += 1 non_remote_total += 1
local_hash = local_tables.get(tid, {}).get("hash", "") local_hash = local_tables.get(tid, {}).get("hash", "")
server_hash = info.get("hash", "") server_hash = info.get("hash", "")
@ -201,14 +219,52 @@ def run_pull(
# the executor + thread overhead for the common single-update case. # the executor + thread overhead for the common single-update case.
workers = min(workers, len(to_download)) if to_download else 1 workers = min(workers, len(to_download)) if to_download else 1
# Optional progress bar — Rich's Progress tracks per-file bytes
# streamed, aggregated across the parallel ThreadPoolExecutor
# workers. Pavel's #185 Phase 1: a single 6.3 GB parquet on first
# init went 44 minutes silent, looked frozen. Now: aggregate "X.Y
# GB / Z.A GB · 56 MB/s · ETA 1m 20s" to stderr while threads
# stream. None when show_progress=False (SessionStart hooks etc.).
progress = None
progress_tasks: dict[str, int] = {}
if show_progress and to_download:
from rich.progress import (
Progress, BarColumn, DownloadColumn, TextColumn,
TimeRemainingColumn, TransferSpeedColumn,
)
progress = Progress(
TextColumn("[bold]{task.fields[label]}[/]"),
BarColumn(),
DownloadColumn(),
TransferSpeedColumn(),
TimeRemainingColumn(),
transient=False,
)
progress.start()
for tid in to_download:
size = int(server_tables[tid].get("size_bytes") or 0)
# Some manifest entries don't carry size — Rich shows
# an indeterminate bar in that case.
progress_tasks[tid] = progress.add_task(
"download", label=tid, total=size if size > 0 else None,
)
def _download_one(tid: str) -> tuple[str, dict | None, str | None]: def _download_one(tid: str) -> tuple[str, dict | None, str | None]:
"""Returns (tid, local_table_entry_or_None, error_or_None). """Returns (tid, local_table_entry_or_None, error_or_None).
One bound thread per call; stream_download is sync I/O so a One bound thread per call; stream_download is sync I/O so a
ThreadPoolExecutor (not asyncio) is the right tool.""" ThreadPoolExecutor (not asyncio) is the right tool. The
progress callback is thread-safe Rich's Progress.update
holds an internal lock."""
target = parquet_dir / f"{tid}.parquet" target = parquet_dir / f"{tid}.parquet"
expected_hash = server_tables[tid].get("hash", "") expected_hash = server_tables[tid].get("hash", "")
cb = None
if progress is not None and tid in progress_tasks:
task_id = progress_tasks[tid]
def cb(n: int, _tid=tid, _task=task_id):
progress.update(_task, advance=n)
try: try:
stream_download(f"/api/data/{tid}/download", str(target)) stream_download(f"/api/data/{tid}/download", str(target),
progress_callback=cb)
if expected_hash: if expected_hash:
actual_hash = _file_md5(target) actual_hash = _file_md5(target)
if actual_hash != expected_hash: if actual_hash != expected_hash:
@ -228,12 +284,16 @@ def run_pull(
except Exception as exc: except Exception as exc:
return tid, None, str(exc) return tid, None, str(exc)
if workers <= 1: try:
outcomes = [_download_one(tid) for tid in to_download] if workers <= 1:
else: outcomes = [_download_one(tid) for tid in to_download]
from concurrent.futures import ThreadPoolExecutor else:
with ThreadPoolExecutor(max_workers=workers) as ex: from concurrent.futures import ThreadPoolExecutor
outcomes = list(ex.map(_download_one, to_download)) with ThreadPoolExecutor(max_workers=workers) as ex:
outcomes = list(ex.map(_download_one, to_download))
finally:
if progress is not None:
progress.stop()
for tid, entry, err in outcomes: for tid, entry, err in outcomes:
if err is not None: if err is not None:

View file

@ -123,5 +123,41 @@ app.add_typer(snapshot_app, name="snapshot")
app.add_typer(disk_info_app, name="disk-info") app.add_typer(disk_info_app, name="disk-info")
def _run_with_clean_errors() -> None:
"""Wrap ``app()`` so AgnesTransportError (and other typed CLI errors)
surface as a one-line message + exit, never as a Python traceback. The
full traceback is already logged to ``~/.config/agnes/last-error.log``
by the api_* helpers operators read it from there for support
forwarding. Anything that escapes this wrapper IS a CLI bug worth
fixing log + print "internal error" so the analyst doesn't see a
Pythonist's traceback either.
Pavel's #185 Phase 3B: previously a `httpx.ReadTimeout` from an
`agnes query --remote` against a slow BQ view dumped a 30-frame
traceback to the analyst's terminal. Now: one clean line + a hint,
return code 1.
"""
from cli.client import AgnesTransportError, _log_traceback, _LOG_FILE
try:
app()
except (AgnesTransportError) as exc:
typer.echo(f"Error: {exc.user_message}", err=True)
if exc.hint:
typer.echo(exc.hint, err=True)
sys.exit(1)
except typer.Exit:
raise
except (KeyboardInterrupt, SystemExit):
raise
except Exception as exc: # last-resort net — escaped exceptions are bugs
log = _log_traceback(exc, context="unhandled at CLI top-level")
typer.echo(
f"Error: internal CLI error ({type(exc).__name__}). "
f"Full traceback logged to {log}.",
err=True,
)
sys.exit(1)
if __name__ == "__main__": if __name__ == "__main__":
app() _run_with_clean_errors()

View file

@ -39,16 +39,26 @@ This workspace is connected to {{ server.url }}.
## Discovering tables — never enumerate from memory ## Discovering tables — never enumerate from memory
Tables, columns, sizes, and `query_mode` change as admins register / migrate / Tables, columns, sizes, descriptions, and `query_mode` change as admins
drop entries. Always re-discover from the live server, never from this file: register / migrate / drop entries. Always re-discover from the live server,
never from this file or your training data:
``` ```
agnes catalog --json # canonical list with query_mode, sql_flavor, agnes catalog --json # all tables: id, query_mode, sql_flavor,
# where_examples, fetch_via, rough_size_hint per table # where_examples, fetch_via, rough_size_hint, description
agnes schema <table> # columns + types in the right SQL dialect agnes catalog --json | jq '.tables[] | select(.id=="<id>")' # single table — read its description in full BEFORE writing any SQL
agnes describe <table> -n 5 # sample rows (local + materialized only) agnes schema <table> # columns + types in the right SQL dialect
agnes describe <table> -n 5 # sample rows (local + materialized only)
``` ```
The `description` field on each catalog row is the **authoritative
business-rules text** for that table — it carries grain, partition
column, join contracts, and column-level gotchas. Re-read it from the
live `agnes catalog` for every cross-table decision; do **not** copy
it into this workspace `CLAUDE.md` (it's a snapshot that goes stale,
and `agnes init` will overwrite local edits — put personal notes into
`.claude/CLAUDE.local.md` instead). The CLI is the source of truth.
`rough_size_hint` is server-populated for `local` and `materialized` tables `rough_size_hint` is server-populated for `local` and `materialized` tables
(`small` ≤100 MiB, `medium` ≤1 GiB, `large` ≤10 GiB, `very_large` >10 GiB) and (`small` ≤100 MiB, `medium` ≤1 GiB, `large` ≤10 GiB, `very_large` >10 GiB) and
`null` for `remote` rows. When `null`, treat the table as potentially large `null` for `remote` rows. When `null`, treat the table as potentially large

View file

@ -95,7 +95,7 @@ dev = [
] ]
[project.scripts] [project.scripts]
agnes = "cli.main:app" agnes = "cli.main:_run_with_clean_errors"
[build-system] [build-system]
requires = ["hatchling"] requires = ["hatchling"]