feat(pull): aggregated progress + non-TTY textual fallback

Two improvements to `agnes pull` progress reporting:

1. **Aggregated per-file progress across chunked downloads**: the
   existing Rich progress bar already used one task per file, but the
   chunked-download contract (one file = N parallel chunk callbacks
   summing to file size) meant we needed to verify that all chunk
   threads advance the same task. They do — the per-file callback is
   constructed once per tid and routes every chunk's byte delta to the
   same task / textual entry, so the bar shows one aggregated bytes-
   downloaded total rather than N separate sub-bars.

2. **Textual fallback for non-TTY stderr**: when stderr is not a
   terminal (SessionStart hook, CI runner, Docker log capture), Rich
   either suppresses output (silent multi-minute pull on a 5 GB
   parquet) or emits raw control sequences. The new `_TextualProgress`
   helper instead emits one plain-text line per file at most every
   10%-of-total-bytes or 30 s, plus a final `100% done` line per file.
   Format: `[N/T files] <tid>: 25% (16 MB / 66 MB) at 1.5 MB/s`.

The TTY path is unchanged. Detection uses `sys.stderr.isatty()` —
`show_progress=True` flips into the textual fallback when that returns
False. `show_progress=False` (the SessionStart hook) still emits no
progress text in either mode.
This commit is contained in:
ZdenekSrotyr 2026-05-06 13:09:37 +02:00
parent bd1b5ad444
commit e72ff259f9
3 changed files with 294 additions and 8 deletions

View file

@ -34,6 +34,14 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
### Added
- New optional dependency `h2>=4.1.0` (HTTP/2 transport for httpx). Pure
performance — `agnes pull` works on HTTP/1.1 if the install skips it.
- **Textual progress fallback for non-TTY `agnes pull`**: when stderr is
not a terminal (Claude Code SessionStart hook, CI runner, Docker log
capture, …), `agnes pull --no-quiet` now emits a plain-text progress
line per file at most every 10% or 30 s, plus a final completion line.
Replaces the previous Rich-bar-on-pipe behavior that either suppressed
output entirely or leaked ANSI escape sequences. TTY path unchanged
(Rich progress bar with bytes / speed / ETA, aggregated per-file
across chunked-download chunks).
## [0.38.0] — 2026-05-06

View file

@ -65,6 +65,128 @@ class PullResult:
_SAFE_ID_RE = re.compile(r"^[a-zA-Z0-9_\-]{1,128}$")
class _TextualProgress:
"""Plain-text progress emitter for non-TTY stderr.
When `agnes pull` is invoked from a Claude Code SessionStart hook,
a CI runner, or any pipe consumer, stderr is not a terminal. Rich's
progress bar in that mode either suppresses output (silent for
minutes on a multi-GB parquet) or emits raw ANSI noise. This class
instead emits one terse line per file at sensible cadence.
Cadence policy: emit when *either*:
- per-file bytes-downloaded crosses a 10%-of-total boundary, OR
- 30 s have elapsed since this file's last emission.
Always emits one final "done" line per file via `finish()` so the
operator sees a confirmed completion even on tiny files.
Format: `[N/T files] <tid>: 25% (16 MB / 66 MB) at 1.5 MB/s` the
"[N/T files]" prefix lets the operator see overall pull progress
in a multi-table run without buffering all per-file lines.
Thread-safe `advance` is called from the chunked-download worker
threads; an internal lock serializes the update + emit.
"""
_HUMAN_UNITS = (
(1024 * 1024 * 1024 * 1024, "TB"),
(1024 * 1024 * 1024, "GB"),
(1024 * 1024, "MB"),
(1024, "KB"),
)
def __init__(self, *, stream, total_files: int, file_sizes: dict[str, int]):
import threading
self._stream = stream
self._total_files = total_files
self._file_sizes = file_sizes
self._lock = threading.Lock()
# Per-file state.
self._bytes: dict[str, int] = {tid: 0 for tid in file_sizes}
self._started_at: dict[str, float] = {}
self._last_emit_at: dict[str, float] = {}
self._last_emit_pct: dict[str, int] = {}
self._finished_idx: int = 0 # files whose `finish` line has been emitted
def advance(self, tid: str, n: int) -> None:
"""Add `n` bytes to the file's total. Emit a textual update if
the cadence policy allows."""
with self._lock:
now = time.monotonic()
if tid not in self._started_at:
self._started_at[tid] = now
self._last_emit_at[tid] = now
self._last_emit_pct[tid] = 0
self._bytes[tid] = self._bytes.get(tid, 0) + n
total = self._file_sizes.get(tid, 0)
current = self._bytes[tid]
pct = int((current * 100) / total) if total > 0 else 0
elapsed = now - self._last_emit_at[tid]
crossed_10 = pct >= self._last_emit_pct[tid] + 10
if crossed_10 or elapsed >= 30.0:
self._last_emit_at[tid] = now
self._last_emit_pct[tid] = pct - (pct % 10)
self._emit_line(tid, current, total, now)
def finish(self) -> None:
"""Emit a final `done` line for any file we never closed out."""
with self._lock:
now = time.monotonic()
for tid, total in self._file_sizes.items():
# Treat any file we observed bytes for as needing a
# final line. Files that errored out before any callback
# are still announced (operator wants visibility even on
# zero-byte attempts).
self._finished_idx += 1
bytes_ = self._bytes.get(tid, 0)
started = self._started_at.get(tid, now)
duration = max(0.001, now - started)
rate = bytes_ / duration
line = (
f"[{self._finished_idx}/{self._total_files} files] "
f"{tid}: 100% done "
f"({self._fmt_bytes(bytes_)} in {duration:.1f}s, "
f"{self._fmt_bytes(int(rate))}/s)\n"
)
self._stream.write(line)
try:
self._stream.flush()
except Exception:
pass
def _emit_line(self, tid: str, current: int, total: int, now: float) -> None:
started = self._started_at.get(tid, now)
duration = max(0.001, now - started)
rate = current / duration
if total > 0:
pct_str = f"{int((current * 100) / total)}%"
size_str = (
f"({self._fmt_bytes(current)} / {self._fmt_bytes(total)})"
)
else:
pct_str = "?"
size_str = f"({self._fmt_bytes(current)})"
idx = self._finished_idx + 1 # 1-based "currently working on file N"
line = (
f"[{idx}/{self._total_files} files] {tid}: {pct_str} "
f"{size_str} at {self._fmt_bytes(int(rate))}/s\n"
)
self._stream.write(line)
try:
self._stream.flush()
except Exception:
pass
@classmethod
def _fmt_bytes(cls, n: int) -> str:
for divisor, suffix in cls._HUMAN_UNITS:
if n >= divisor:
return f"{n / divisor:.1f} {suffix}"
return f"{n} B"
@contextmanager
def _override_server_env(server_url: str, token: str) -> Iterator[None]:
"""Set AGNES_SERVER + scoped token override for the duration of the call.
@ -219,15 +341,34 @@ def run_pull(
# the executor + thread overhead for the common single-update case.
workers = min(workers, len(to_download)) if to_download else 1
# Optional progress bar — Rich's Progress tracks per-file bytes
# streamed, aggregated across the parallel ThreadPoolExecutor
# workers. Pavel's #185 Phase 1: a single 6.3 GB parquet on first
# init went 44 minutes silent, looked frozen. Now: aggregate "X.Y
# GB / Z.A GB · 56 MB/s · ETA 1m 20s" to stderr while threads
# stream. None when show_progress=False (SessionStart hooks etc.).
# Optional progress reporting — two paths.
#
# 1. Rich progress bar: per-file bytes-streamed bar with speed +
# ETA. Rendered to stderr when stderr is a TTY. Aggregates
# across the parallel ThreadPoolExecutor workers and across
# chunked-download chunks (all chunks call the same callback
# advancing the same task).
# 2. Textual fallback: when `show_progress=True` but stderr is
# NOT a TTY (Claude Code SessionStart hook, CI run, Docker
# log capture), Rich would either suppress the bar or emit
# raw control sequences. Instead we emit one plain-text line
# per file at most every 10% or 30 s — enough signal to know
# the pull isn't frozen on a multi-GB parquet, terse enough
# not to spam the consumer's log.
#
# Both paths receive the same per-file callback so the chunked-
# download contract ("one file = one task, sum-of-chunks bytes")
# is honored uniformly.
import sys as _sys
progress = None
progress_tasks: dict[str, int] = {}
if show_progress and to_download:
textual = None
use_textual_fallback = (
show_progress
and to_download
and not _sys.stderr.isatty()
)
if show_progress and to_download and not use_textual_fallback:
from rich.progress import (
Progress, BarColumn, DownloadColumn, TextColumn,
TimeRemainingColumn, TransferSpeedColumn,
@ -248,13 +389,22 @@ def run_pull(
progress_tasks[tid] = progress.add_task(
"download", label=tid, total=size if size > 0 else None,
)
elif use_textual_fallback:
textual = _TextualProgress(
stream=_sys.stderr,
total_files=len(to_download),
file_sizes={
tid: int(server_tables[tid].get("size_bytes") or 0)
for tid in to_download
},
)
def _download_one(tid: str) -> tuple[str, dict | None, str | None]:
"""Returns (tid, local_table_entry_or_None, error_or_None).
One bound thread per call; stream_download is sync I/O so a
ThreadPoolExecutor (not asyncio) is the right tool. The
progress callback is thread-safe Rich's Progress.update
holds an internal lock."""
and the textual fallback's lock both serialize internally."""
target = parquet_dir / f"{tid}.parquet"
expected_hash = server_tables[tid].get("hash", "")
cb = None
@ -262,6 +412,9 @@ def run_pull(
task_id = progress_tasks[tid]
def cb(n: int, _tid=tid, _task=task_id):
progress.update(_task, advance=n)
elif textual is not None:
def cb(n: int, _tid=tid):
textual.advance(_tid, n)
try:
stream_download(f"/api/data/{tid}/download", str(target),
progress_callback=cb)
@ -294,6 +447,8 @@ def run_pull(
finally:
if progress is not None:
progress.stop()
if textual is not None:
textual.finish()
for tid, entry, err in outcomes:
if err is not None:

123
tests/test_pull_progress.py Normal file
View file

@ -0,0 +1,123 @@
"""Tests for `agnes pull` progress UX (Change 3).
The Rich progress bar handles the TTY case fine, but Claude Code's
SessionStart context and any hook running `agnes pull` non-interactively
has stderr connected to a pipe, not a TTY. In that case Rich either
suppresses output entirely or emits raw ANSI noise into the consumer's
log. Goal: when the caller asks for progress and stderr is not a TTY,
emit a plain-text per-10%-or-30s update so the operator gets *some*
signal instead of multi-minute silence.
"""
from __future__ import annotations
import io
import time
from pathlib import Path
from unittest.mock import MagicMock
import pytest
@pytest.fixture(autouse=True)
def _isolate_config_dir(tmp_path, monkeypatch):
cfg = tmp_path / "_cfg"
cfg.mkdir()
monkeypatch.setenv("AGNES_CONFIG_DIR", str(cfg))
@pytest.fixture
def fake_pull_io(monkeypatch):
"""Stub the manifest + memory + download endpoints so run_pull can
execute end-to-end with a fake parquet write per table."""
canned_manifest = {
"tables": {
"tbl_big": {"hash": "h1", "rows": 0, "size_bytes": 1_000_000},
},
}
canned_memory = {"mandatory": [], "approved": []}
def _api_get(path, *args, **kwargs):
resp = MagicMock()
resp.status_code = 200
if path == "/api/sync/manifest":
resp.json.return_value = canned_manifest
elif path == "/api/memory/bundle":
resp.json.return_value = canned_memory
resp.raise_for_status = lambda: None
return resp
def _stream_download(path, target_path, progress_callback=None):
# Simulate a chunked download: emit progress in 4 increments
# totaling the announced size.
total = 1_000_000
slices = [total // 4] * 3 + [total - 3 * (total // 4)]
Path(target_path).write_bytes(b"PAR1" + b"\x00" * 1000 + b"PAR1")
if progress_callback:
for s in slices:
progress_callback(s)
return total
monkeypatch.setattr("cli.lib.pull.api_get", _api_get, raising=False)
monkeypatch.setattr("cli.lib.pull.stream_download", _stream_download,
raising=False)
monkeypatch.setattr("cli.lib.pull._is_valid_parquet", lambda p: True,
raising=False)
monkeypatch.setattr("cli.lib.pull._file_md5", lambda p: "h1", raising=False)
def test_textual_progress_when_stderr_is_not_tty(
tmp_path, fake_pull_io, monkeypatch, capsys,
):
"""Non-TTY stderr → emit a plain-text progress line per file."""
# Force the non-TTY branch even if pytest's fake stderr is a tty.
monkeypatch.setattr("sys.stderr.isatty", lambda: False, raising=False)
from cli.lib.pull import run_pull
result = run_pull(
server_url="http://x", token="t", workspace=tmp_path,
show_progress=True,
)
captured = capsys.readouterr()
# Some indication of the file + bytes ran; we don't pin exact format.
assert "tbl_big" in captured.err
assert result.tables_updated == 1
# No raw ANSI escape sequences in the textual fallback.
assert "\x1b[" not in captured.err.split("tbl_big")[0]
def test_no_progress_output_when_show_progress_is_false(
tmp_path, fake_pull_io, monkeypatch, capsys,
):
"""`show_progress=False` (the SessionStart hook path) emits no
progress text on stderr in either TTY or non-TTY mode."""
monkeypatch.setattr("sys.stderr.isatty", lambda: False, raising=False)
from cli.lib.pull import run_pull
run_pull(
server_url="http://x", token="t", workspace=tmp_path,
show_progress=False,
)
captured = capsys.readouterr()
assert "tbl_big" not in captured.err
def test_textual_progress_emits_at_completion(
tmp_path, fake_pull_io, monkeypatch, capsys,
):
"""At least one final completion line gets emitted per file even if
the throttle window doesn't trigger mid-file."""
monkeypatch.setattr("sys.stderr.isatty", lambda: False, raising=False)
from cli.lib.pull import run_pull
run_pull(
server_url="http://x", token="t", workspace=tmp_path,
show_progress=True,
)
captured = capsys.readouterr()
# Final line marks the file as done — either "100%" or a "✓ tbl_big" /
# "tbl_big done" indicator. We accept any final-completion form.
assert (
"100%" in captured.err
or "done" in captured.err.lower()
or "complete" in captured.err.lower()
)