agnes-the-ai-analyst/cli/client.py
Vojtech Rysanek 0843c2bd1b fix(cli): bump --remote query timeout to 300s, add AGNES_QUERY_TIMEOUT
The httpx client behind 'agnes query --remote' used the default 30s
timeout, killing every BigQuery SELECT that took longer than half a
minute — i.e. most non-trivial remote queries.

cli/client.py now exposes QUERY_TIMEOUT_S (default 300s, override via
AGNES_QUERY_TIMEOUT) and propagates a kw-only 'timeout' through
api_get/post/delete/patch. _query_remote passes QUERY_TIMEOUT_S so only
the long-running /api/query path gets the bump; every other CLI call
keeps the 30s default.

Server-side has no read deadline on /api/query, so the client cap was
the sole bottleneck.
2026-05-05 16:40:54 +04:00

104 lines
4 KiB
Python

"""HTTP client wrapper for CLI — handles auth, retries, streaming."""
import os
import time
from pathlib import Path
from typing import Optional
import httpx
from cli.config import get_server_url, get_token
# Stream-download retry policy. Only transient failures are retried: network
# errors and 5xx responses. 4xx responses (auth, 404, 400) are never retried.
# The attempt count is env-tunable so tests can adjust it; the defaults are
# sized to ride out a single flaky network blip.
_RETRY_ATTEMPTS = int(os.getenv("AGNES_STREAM_RETRIES", "3"))
_RETRY_BACKOFFS_S = (0.3, 1.0, 3.0)  # sleep (seconds) before attempts 2, 3 and 4
# Timeout for long-running queries. For remote tables /api/query forwards to
# BigQuery, where a SELECT routinely takes minutes, so the default 30s HTTP
# timeout expires long before BQ finishes. Operators override the value with
# the AGNES_QUERY_TIMEOUT environment variable.
QUERY_TIMEOUT_S = float(os.getenv("AGNES_QUERY_TIMEOUT", "300"))
def get_client(timeout: float = 30.0) -> httpx.Client:
    """Build an httpx client aimed at the configured server.

    Args:
        timeout: Passed straight through as the client's ``timeout``
            (30.0 by default).

    Returns:
        An ``httpx.Client`` whose base URL comes from ``get_server_url()``.
        An ``Authorization: Bearer <token>`` header is attached only when
        ``get_token()`` returns a truthy token.
    """
    bearer = get_token()
    # No token -> send no Authorization header at all.
    auth_headers = {"Authorization": f"Bearer {bearer}"} if bearer else {}
    return httpx.Client(
        base_url=get_server_url(),
        headers=auth_headers,
        timeout=timeout,
    )
def api_get(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
    """Issue a GET for `path` through a short-lived authenticated client.

    Args:
        path: Request path, resolved against the client's base URL.
        timeout: Keyword-only timeout handed to ``get_client``.
        **kwargs: Forwarded unchanged to ``httpx.Client.get``.

    Returns:
        The ``httpx.Response`` produced by the request.
    """
    with get_client(timeout=timeout) as session:
        response = session.get(path, **kwargs)
    return response
def api_post(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
    """Issue a POST to `path` through a short-lived authenticated client.

    Args:
        path: Request path, resolved against the client's base URL.
        timeout: Keyword-only timeout handed to ``get_client``.
        **kwargs: Forwarded unchanged to ``httpx.Client.post``.

    Returns:
        The ``httpx.Response`` produced by the request.
    """
    with get_client(timeout=timeout) as session:
        response = session.post(path, **kwargs)
    return response
def api_delete(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
    """Issue a DELETE for `path` through a short-lived authenticated client.

    Args:
        path: Request path, resolved against the client's base URL.
        timeout: Keyword-only timeout handed to ``get_client``.
        **kwargs: Forwarded unchanged to ``httpx.Client.delete``.

    Returns:
        The ``httpx.Response`` produced by the request.
    """
    with get_client(timeout=timeout) as session:
        response = session.delete(path, **kwargs)
    return response
def api_patch(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
    """Issue a PATCH to `path` through a short-lived authenticated client.

    Args:
        path: Request path, resolved against the client's base URL.
        timeout: Keyword-only timeout handed to ``get_client``.
        **kwargs: Forwarded unchanged to ``httpx.Client.patch``.

    Returns:
        The ``httpx.Response`` produced by the request.
    """
    with get_client(timeout=timeout) as session:
        response = session.patch(path, **kwargs)
    return response
def _is_transient(exc: Exception) -> bool:
    """Tell whether `exc` is the kind of failure that merits another attempt.

    Transport-level problems (connect/read/write errors, protocol errors and
    timeouts) and HTTP 5xx responses count as transient. Anything else,
    including auth failures and other 4xx responses, does not.
    """
    transport_failures = (
        httpx.ConnectError,
        httpx.ReadError,
        httpx.WriteError,
        httpx.RemoteProtocolError,
        httpx.TimeoutException,
    )
    if isinstance(exc, transport_failures):
        return True
    if not isinstance(exc, httpx.HTTPStatusError):
        return False
    # Only server-side (5xx) status errors are worth retrying.
    status = exc.response.status_code
    return 500 <= status < 600
def stream_download(path: str, target_path: str, progress_callback=None, *,
                    timeout: float = 300.0) -> int:
    """Stream a file to `target_path` atomically and with retries.

    Durability properties:
    - Writes to `target_path + ".tmp"`, then `os.replace` on success. The
      real target file never exists in a half-written state.
    - Retries up to `_RETRY_ATTEMPTS` times on transient errors (network
      blip, 5xx); 4xx (auth/404) is raised immediately. A negative
      `_RETRY_ATTEMPTS` is treated as zero retries, so the download is
      always attempted at least once.
    - No hash check here — that's done in the sync command against the
      manifest hash, because only the caller knows the expected value.

    Args:
        path: Request path of the file, resolved against the client's base URL.
        target_path: Final location of the downloaded file.
        progress_callback: Optional callable invoked with the byte length of
            each chunk as it is written. Every retry restarts the download
            from the first chunk, so the callback sees those bytes again;
            callers that accumulate progress will overcount after a retry.
        timeout: Keyword-only timeout handed to ``get_client`` for each
            attempt (300.0 by default).

    Returns:
        Total number of bytes written to `target_path`.

    Raises:
        Exception: The error from the final attempt — either a non-transient
            failure, or a transient one once the retry budget is exhausted.
            The temporary file is removed before the error propagates.
    """
    tmp_path = Path(f"{target_path}.tmp")
    # Clamp so a misconfigured (negative) AGNES_STREAM_RETRIES cannot skip the
    # loop entirely and leave us with no result and no exception to raise.
    retries = max(_RETRY_ATTEMPTS, 0)
    for attempt in range(retries + 1):
        try:
            # Start every attempt from a clean slate; partial data from a
            # previous attempt must never be appended to.
            tmp_path.unlink(missing_ok=True)
            with get_client(timeout=timeout) as client:
                with client.stream("GET", path) as response:
                    response.raise_for_status()
                    total = 0
                    with open(tmp_path, "wb") as f:
                        for chunk in response.iter_bytes(chunk_size=65536):
                            f.write(chunk)
                            total += len(chunk)
                            if progress_callback:
                                progress_callback(len(chunk))
            # os.replace is atomic on POSIX and Windows for same-filesystem moves.
            os.replace(tmp_path, target_path)
            return total
        except Exception as exc:
            if attempt == retries or not _is_transient(exc):
                # Clean up any leftover tmp, then surface the exception.
                tmp_path.unlink(missing_ok=True)
                raise
            time.sleep(_RETRY_BACKOFFS_S[min(attempt, len(_RETRY_BACKOFFS_S) - 1)])
    # The final loop iteration always returns or raises; this line only guards
    # against future edits breaking that invariant.
    raise RuntimeError("stream_download exhausted its attempts without a result")