Three first-try-failure-surface fixes from Pavel's #185 trace + the template guidance question, all under PR #188's umbrella so they land together with the file_server / parallel pull / Tier 1 work. 1. CLI clean-error wrapper — new AgnesTransportError raised by the api_*/stream_download helpers when httpx times out / drops / refuses, plus a top-level Typer wrapper (cli/main.py) that prints one-line "Error: …" + actionable hint and exits non-zero. Full traceback goes to ~/.config/agnes/last-error.log for support forwarding. Unhandled Exceptions are caught at the same boundary so no Python traceback ever leaks to the analyst's terminal. Pavel's #185 Phase 3B: a 30-frame httpx traceback from a slow BQ --remote query made it look like a CLI bug. Now: clean message + hint pointing at `agnes snapshot create` / partition-column guidance. Entry point in pyproject.toml flipped from `cli.main:app` → `cli.main:_run_with_clean_errors` so the wrapper actually runs under the installed `agnes` binary. 2. agnes init / agnes pull --skip-materialize + progress bar. --skip-materialize omits query_mode='materialized' rows from the download set so a first init doesn't spend 44 minutes silently pulling a single 6 GB parquet (Pavel's #185 Phase 1). Rich-driven per-file progress bar with label/bytes/rate/ETA renders to stderr when not --quiet and not --json. Aggregates across the parallel ThreadPoolExecutor workers added earlier in this PR. 3. config/claude_md_template.txt: explicit one-line snippet pointing at `agnes catalog --json | jq '.tables[] | select(.id=="<id>")'` for per-table descriptions + restated invariant: "the description field on each catalog row is the authoritative business-rules text — re-read live, never copy into this file." Resolves the regression-or-feature debate between Pavel (wants annotations) and the user feedback that landed in the prior commit (don't embed table-specific content; tables change). Catalog command stays the source of truth.
223 lines
9.5 KiB
Python
223 lines
9.5 KiB
Python
"""HTTP client wrapper for CLI — handles auth, retries, streaming."""
|
|
|
|
import os
|
|
import time
|
|
import traceback
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
|
|
from cli.config import _config_dir, get_server_url, get_token
|
|
|
|
def _env_int(name: str, default: int, *, minimum: int = 0) -> int:
    """Read an integer tuning knob from the environment variable ``name``.

    Returns ``default`` when the variable is unset, is not an integer, or is
    below ``minimum``. These knobs are parsed at import time, so a bad value
    must degrade to the default instead of raising — an import-time
    ``ValueError`` would presumably escape as a raw traceback before the
    CLI's top-level error handling can render it.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        value = int(raw)
    except ValueError:
        return default
    return value if value >= minimum else default


def _env_float(name: str, default: float) -> float:
    """Read a positive, finite float (e.g. a timeout in seconds) from ``name``.

    Returns ``default`` when the variable is unset, unparsable, or outside
    ``(0, inf)`` — zero, negative, NaN and infinite values are rejected
    because none of them is a usable HTTP timeout.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        value = float(raw)
    except ValueError:
        return default
    # The chained comparison is False for NaN, so NaN falls back too.
    return value if 0 < value < float("inf") else default


# Retry policy for transient failures during stream downloads. Scoped to
# network issues and 5xx — 4xx (auth, 404, 400) is NOT retried. Tunable via
# env for tests; defaults sit in the "one flaky network blip" window.
# _RETRY_ATTEMPTS counts retries (extra attempts after the first), never < 0.
_RETRY_ATTEMPTS = _env_int("AGNES_STREAM_RETRIES", 3)
_RETRY_BACKOFFS_S = (0.3, 1.0, 3.0)  # seconds before attempt 2, 3, 4

# Long-running query timeout. /api/query forwards to BigQuery for remote
# tables, where SELECTs routinely run for minutes. The default 30s HTTP
# timeout dies long before BQ finishes. Operators tune via AGNES_QUERY_TIMEOUT.
QUERY_TIMEOUT_S = _env_float("AGNES_QUERY_TIMEOUT", 300.0)
|
|
|
|
|
|
# ── Transport-error translation ─────────────────────────────────────────
# Motivation (Pavel's Issue #185 Phase 3B): an httpx `ReadTimeout`,
# `ConnectError` or `RemoteProtocolError` that no CLI command caught made
# Typer print a raw Python traceback. To a non-Python user that reads like
# a CLI bug, and it buries the useful signal ("server slow, try snapshot
# create").
#
# The helpers below therefore turn transport exceptions into
# `AgnesTransportError` carrying a user-facing message, append the full
# traceback to `last-error.log` inside the agnes config directory
# (`_config_dir()`), and leave printing the short message + non-zero exit
# to the top-level CLI handler.
_LOG_FILE = _config_dir().joinpath("last-error.log")
|
|
|
|
|
|
class AgnesTransportError(Exception):
    """A network/transport failure carrying text meant for the end user.

    The ``api_*`` and ``stream_download`` helpers raise this instead of
    letting a raw httpx exception escape. The CLI's top-level handler is
    expected to print ``user_message`` (plus ``hint``) rather than a
    traceback, and exit non-zero; the full traceback is kept in the error
    log whose path is stored in ``logfile_path``.

    Attributes:
        user_message: One-line description of what went wrong; also the
            exception's sole positional arg, so ``str(err)`` returns it.
        hint: Optional follow-up advice (may span several lines).
        logfile_path: Where the full traceback was written, if known.
    """

    user_message: str
    hint: str
    logfile_path: Optional[Path]

    def __init__(
        self,
        user_message: str,
        *,
        hint: str = "",
        logfile_path: Optional[Path] = None,
    ):
        self.user_message, self.hint, self.logfile_path = user_message, hint, logfile_path
        super().__init__(user_message)
|
|
|
|
|
|
def _log_traceback(exc: BaseException, *, context: str) -> Path:
    """Append a timestamped traceback for ``exc`` to ``_LOG_FILE`` and return that path.

    Each entry starts with a ``=== <UTC timestamp> <context> ===`` header
    followed by the formatted traceback. Best-effort: any failure while
    creating the directory or writing the file is swallowed, so a logging
    problem never masks the original error. The path is returned even when
    the write failed.

    Args:
        exc: The exception whose traceback should be recorded.
        context: Short description of the failing operation, written into
            the entry header.
    """
    try:
        # The config directory may not exist yet. Without this, open() fails,
        # the except below swallows it, and the "Full traceback: <path>" hint
        # built from our return value points at a file that was never written.
        _LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
        with open(_LOG_FILE, "a", encoding="utf-8") as f:
            ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
            f.write(f"\n=== {ts} {context} ===\n")
            traceback.print_exception(type(exc), exc, exc.__traceback__, file=f)
    except Exception:
        # Deliberately best-effort: never let logging replace the real error.
        pass
    return _LOG_FILE
|
|
|
|
|
|
def _read_timeout_of(exc: Exception) -> Optional[float]:
    """Return the read timeout, in seconds, of the request that raised ``exc``.

    httpx stores a request's effective timeouts in
    ``request.extensions["timeout"]`` (a dict with a ``"read"`` key).
    Returns ``None`` when ``exc`` has no request attached or the read
    timeout is not a number.
    """
    try:
        # httpx.RequestError.request raises RuntimeError when no request is attached.
        timeouts = exc.request.extensions.get("timeout")  # type: ignore[attr-defined]
    except (AttributeError, RuntimeError):
        return None
    read = timeouts.get("read") if isinstance(timeouts, dict) else None
    return float(read) if isinstance(read, (int, float)) else None


def _translate_transport_error(exc: Exception, *, context: str) -> AgnesTransportError:
    """Map an httpx exception to an :class:`AgnesTransportError` for the CLI.

    The full traceback is first appended to the error log via
    ``_log_traceback``, and the log path is embedded in every hint. The
    mapping is intentionally pragmatic — analysts care about "what do I do
    next", not the socket / TLS detail.

    Args:
        exc: The exception raised by httpx.
        context: Short description of the operation (e.g. ``"GET /api/x"``),
            interpolated into the user-facing message.

    Returns:
        The translated error; callers ``raise`` it (chained ``from exc``).
    """
    log = _log_traceback(exc, context=context)
    if isinstance(exc, httpx.ReadTimeout):
        # Report the timeout the request actually used. Callers pass different
        # values (api_* default to 30s, stream_download to 300s), so quoting
        # QUERY_TIMEOUT_S here was frequently wrong.
        waited = _read_timeout_of(exc)
        window = f" ({waited:.0f}s)" if waited is not None else ""
        return AgnesTransportError(
            f"Server didn't respond within the read timeout{window} "
            f"for {context}.",
            hint=(
                "If this is `agnes query --remote` against a heavy BQ view, "
                "the underlying BQ job took longer than the wait window. Try:\n"
                "  • narrow the WHERE (especially the partition column from `agnes catalog --json`)\n"
                "  • `agnes snapshot create <table> ... --estimate` to materialize once + query locally\n"
                "  • set AGNES_QUERY_TIMEOUT=600 for a longer client-side wait\n"
                f"Full traceback: {log}"
            ),
            logfile_path=log,
        )
    if isinstance(exc, httpx.ConnectError):
        return AgnesTransportError(
            f"Can't reach the agnes server for {context}.",
            hint=(
                "Check the server URL with `agnes status`, network reachability "
                "(VPN / DNS / firewall), and the TLS-trust setup if this is a "
                f"corporate-CA deployment.\nFull traceback: {log}"
            ),
            logfile_path=log,
        )
    if isinstance(exc, (httpx.RemoteProtocolError, httpx.ReadError, httpx.WriteError)):
        return AgnesTransportError(
            f"Connection broke mid-flight on {context}.",
            hint=(
                "Usually a transient network blip. Re-run the command. If it "
                f"keeps happening, check `agnes status`.\nFull traceback: {log}"
            ),
            logfile_path=log,
        )
    if isinstance(exc, httpx.TimeoutException):
        # Connect / write / pool timeouts (ReadTimeout was handled above).
        return AgnesTransportError(
            f"Network timeout on {context}.",
            hint=f"Re-run; if persistent, check the server.\nFull traceback: {log}",
            logfile_path=log,
        )
    # Anything else: re-wrap with a generic message so the CLI doesn't
    # dump the traceback. We'd prefer a typed translation; if you hit
    # this branch, add a clause above.
    return AgnesTransportError(
        f"Unexpected error on {context}: {type(exc).__name__}.",
        hint=f"Full traceback: {log}",
        logfile_path=log,
    )
|
|
|
|
|
|
def get_client(timeout: float = 30.0) -> httpx.Client:
    """Build an ``httpx.Client`` pointed at the configured agnes server.

    The base URL comes from ``get_server_url()``. When ``get_token()``
    yields a truthy token it is sent as a ``Bearer`` ``Authorization``
    header; otherwise no headers are set. ``timeout`` is passed straight
    to httpx.
    """
    bearer = get_token()
    auth_headers = {"Authorization": f"Bearer {bearer}"} if bearer else {}
    return httpx.Client(base_url=get_server_url(), headers=auth_headers, timeout=timeout)
|
|
|
|
|
|
def _api_request(method: str, path: str, *, timeout: float, **kwargs) -> httpx.Response:
    """Send one non-streaming ``method`` request to ``path`` on a fresh client.

    A new authenticated client (``get_client``) is opened and closed per
    call. Any ``httpx.HTTPError`` raised while sending is translated into an
    ``AgnesTransportError`` (the original is chained as ``__cause__``).
    HTTP error statuses are not raised here — no ``raise_for_status()`` is
    called — so callers inspect ``response.status_code`` themselves.
    """
    try:
        with get_client(timeout=timeout) as client:
            return client.request(method, path, **kwargs)
    except httpx.HTTPError as exc:
        raise _translate_transport_error(exc, context=f"{method} {path}") from exc


def api_get(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
    """GET ``path``; extra ``kwargs`` are forwarded to httpx.

    Raises:
        AgnesTransportError: on connection / timeout / protocol failures.
    """
    return _api_request("GET", path, timeout=timeout, **kwargs)


def api_post(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
    """POST to ``path``; extra ``kwargs`` (e.g. ``json=``) are forwarded to httpx.

    Raises:
        AgnesTransportError: on connection / timeout / protocol failures.
    """
    return _api_request("POST", path, timeout=timeout, **kwargs)


def api_delete(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
    """DELETE ``path``; extra ``kwargs`` are forwarded to httpx.

    Raises:
        AgnesTransportError: on connection / timeout / protocol failures.
    """
    return _api_request("DELETE", path, timeout=timeout, **kwargs)


def api_patch(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
    """PATCH ``path``; extra ``kwargs`` (e.g. ``json=``) are forwarded to httpx.

    Raises:
        AgnesTransportError: on connection / timeout / protocol failures.
    """
    return _api_request("PATCH", path, timeout=timeout, **kwargs)
|
|
|
|
|
|
def _is_transient(exc: Exception) -> bool:
    """Decide whether a failed download attempt deserves another try.

    Network-level hiccups (connect / read / write failures, protocol
    breaks, any httpx timeout) and 5xx responses count as transient.
    Everything else — 4xx responses included — does not.
    """
    network_blips = (
        httpx.ConnectError,
        httpx.ReadError,
        httpx.WriteError,
        httpx.RemoteProtocolError,
        httpx.TimeoutException,
    )
    if isinstance(exc, network_blips):
        return True
    if not isinstance(exc, httpx.HTTPStatusError):
        return False
    status = exc.response.status_code
    return status >= 500 and status < 600
|
|
|
|
|
|
def _http_status_error(exc: httpx.HTTPStatusError, *, context: str) -> AgnesTransportError:
    """Turn a non-2xx download response into an AgnesTransportError that names the status."""
    log = _log_traceback(exc, context=context)
    status = exc.response.status_code
    if status in (401, 403):
        advice = "The server rejected the credentials. Check your token and the server URL with `agnes status`."
    elif status >= 500:
        advice = "Server-side error. Re-run later; if it keeps happening, check `agnes status`."
    else:
        advice = "Check the server URL with `agnes status`."
    return AgnesTransportError(
        f"Server returned HTTP {status} for {context}.",
        hint=f"{advice}\nFull traceback: {log}",
        logfile_path=log,
    )


def stream_download(
    path: str,
    target_path: str,
    progress_callback=None,
    *,
    timeout: float = 300.0,
) -> int:
    """Stream a file to `target_path` atomically and with retries.

    Durability properties:
    - Writes to `target_path + ".tmp"`, then `os.replace` on success. The
      real target file never exists in a half-written state.
    - Retries up to `_RETRY_ATTEMPTS` times on transient errors (network
      blip, 5xx); 4xx (auth/404) fails immediately without retrying.
    - No hash check here — that's done in the sync command against the
      manifest hash, because only the caller knows the expected value.

    Args:
        path: Server path to GET.
        target_path: Final destination on disk.
        progress_callback: Optional callable, invoked with the byte length
            of each chunk as it is written.
        timeout: httpx timeout in seconds for the streaming client
            (defaults to the previously hard-coded 300 s).

    Returns:
        Number of bytes written to `target_path`.

    Raises:
        AgnesTransportError: for any httpx failure once retries are
            exhausted or the error is not transient. HTTP status failures
            name the status code in the message.
        Exception: any non-httpx failure (e.g. ``OSError`` writing the temp
            file, or an error raised by ``progress_callback``) propagates
            unchanged.
    """
    tmp_path = Path(f"{target_path}.tmp")
    # Always make at least one attempt, even if AGNES_STREAM_RETRIES is negative.
    attempts = max(_RETRY_ATTEMPTS, 0) + 1
    last_exc: Optional[Exception] = None

    for attempt in range(attempts):
        try:
            tmp_path.unlink(missing_ok=True)
            with get_client(timeout=timeout) as client:
                with client.stream("GET", path) as response:
                    response.raise_for_status()
                    total = 0
                    with open(tmp_path, "wb") as f:
                        for chunk in response.iter_bytes(chunk_size=65536):
                            f.write(chunk)
                            total += len(chunk)
                            if progress_callback:
                                progress_callback(len(chunk))
                    # os.replace is atomic on POSIX and Windows for same-filesystem moves.
                    os.replace(tmp_path, target_path)
                    return total
        except Exception as exc:
            last_exc = exc
            if attempt == attempts - 1 or not _is_transient(exc):
                break
            # NOTE(review): bytes already reported to progress_callback by the
            # failed attempt are not rolled back, so a progress bar can
            # overshoot after a retry.
            time.sleep(_RETRY_BACKOFFS_S[min(attempt, len(_RETRY_BACKOFFS_S) - 1)])

    # Clean up any leftover tmp, then surface the last exception. httpx errors
    # become AgnesTransportError so the CLI prints a clean message instead of
    # a Python traceback (Pavel's #185 Phase 3B).
    tmp_path.unlink(missing_ok=True)
    # Every non-returning exit from the loop above goes through `break`
    # after assigning last_exc, and the loop runs at least once.
    assert last_exc is not None
    context = f"GET {path} (stream → {target_path})"
    if isinstance(last_exc, httpx.HTTPStatusError):
        # Not a transport failure: the generic translation would hide the status code.
        raise _http_status_error(last_exc, context=context) from last_exc
    if isinstance(last_exc, httpx.HTTPError):
        raise _translate_transport_error(last_exc, context=context) from last_exc
    raise last_exc
|