From e72ff259f947904241e4e7e52ef006198a9de7a7 Mon Sep 17 00:00:00 2001
From: ZdenekSrotyr
Date: Wed, 6 May 2026 13:09:37 +0200
Subject: [PATCH] feat(pull): aggregated progress + non-TTY textual fallback
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Two improvements to `agnes pull` progress reporting:

1. **Aggregated per-file progress across chunked downloads**: the
   existing Rich progress bar already used one task per file, but the
   chunked-download contract (one file = N parallel chunk callbacks
   summing to file size) meant we needed to verify that all chunk
   threads advance the same task. They do — the per-file callback is
   constructed once per tid and routes every chunk's byte delta to the
   same task / textual entry, so the bar shows one aggregated
   bytes-downloaded total rather than N separate sub-bars.

2. **Textual fallback for non-TTY stderr**: when stderr is not a
   terminal (SessionStart hook, CI runner, Docker log capture), Rich
   either suppresses output (silent multi-minute pull on a 5 GB
   parquet) or emits raw control sequences. The new `_TextualProgress`
   helper instead emits one plain-text line per file at most every
   10%-of-total-bytes or 30 s, plus a final `100% done` line per file.
   Format: `[N/T files] <tid>: 25% (16 MB / 66 MB) at 1.5 MB/s`.

The TTY path is unchanged. Detection uses `sys.stderr.isatty()` —
`show_progress=True` flips into the textual fallback when that returns
False. `show_progress=False` (the SessionStart hook) still emits no
progress text in either mode.
---
 CHANGELOG.md                |   8 ++
 cli/lib/pull.py             | 204 ++++++++++++++++++++++++++++++++++--
 tests/test_pull_progress.py | 123 ++++++++++++++++++++++++++
 3 files changed, 327 insertions(+), 8 deletions(-)
 create mode 100644 tests/test_pull_progress.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0979a40..a14af7c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -34,6 +34,14 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
 ### Added
 
 - New optional dependency `h2>=4.1.0` (HTTP/2 transport for httpx). Pure performance — `agnes pull` works on HTTP/1.1 if the install skips it.
+- **Textual progress fallback for non-TTY `agnes pull`**: when stderr is
+  not a terminal (Claude Code SessionStart hook, CI runner, Docker log
+  capture, …), `agnes pull --no-quiet` now emits a plain-text progress
+  line per file at most every 10% or 30 s, plus a final completion line.
+  Replaces the previous Rich-bar-on-pipe behavior that either suppressed
+  output entirely or leaked ANSI escape sequences. TTY path unchanged
+  (Rich progress bar with bytes / speed / ETA, aggregated per-file
+  across chunked-download chunks).
 
 ## [0.38.0] — 2026-05-06
 
diff --git a/cli/lib/pull.py b/cli/lib/pull.py
index b33c4b3..59d4db9 100644
--- a/cli/lib/pull.py
+++ b/cli/lib/pull.py
@@ -65,6 +65,161 @@ class PullResult:
 _SAFE_ID_RE = re.compile(r"^[a-zA-Z0-9_\-]{1,128}$")
 
 
+class _TextualProgress:
+    """Plain-text progress emitter for non-TTY stderr.
+
+    When `agnes pull` is invoked from a Claude Code SessionStart hook,
+    a CI runner, or any pipe consumer, stderr is not a terminal. Rich's
+    progress bar in that mode either suppresses output (silent for
+    minutes on a multi-GB parquet) or emits raw ANSI noise. This class
+    instead emits one terse line per file at sensible cadence.
+
+    Cadence policy: emit when *either*:
+      - per-file bytes-downloaded crosses a 10%-of-total boundary, OR
+      - 30 s have elapsed since this file's last emission.
+
+    `finish()` always emits one final line per file so the operator
+    sees an outcome even on tiny files: `100% done` when at least the
+    announced size arrived (or the size was unknown), `incomplete at
+    P%` when fewer bytes than announced were received.
+
+    Format: `[N/T files] <tid>: 25% (16 MB / 66 MB) at 1.5 MB/s`. `N`
+    is the file's fixed 1-based position in `file_sizes`, so every
+    line about one file carries the same prefix even while several
+    files download in parallel.
+
+    Output is best-effort: a failing `write`/`flush` (closed pipe,
+    detached stream) is swallowed so progress reporting can never fail
+    the download whose callback triggered it.
+
+    Thread-safe — `advance` is called from the chunked-download worker
+    threads; an internal lock serializes the update + emit.
+    """
+
+    _HUMAN_UNITS = (
+        (1024 * 1024 * 1024 * 1024, "TB"),
+        (1024 * 1024 * 1024, "GB"),
+        (1024 * 1024, "MB"),
+        (1024, "KB"),
+    )
+    _PCT_STEP = 10  # emit when a file crosses another 10% of its total
+    _MAX_SILENCE_S = 30.0  # ...or after this long without a line for it
+
+    def __init__(self, *, stream, total_files: int, file_sizes: dict[str, int]):
+        """Set up per-file counters.
+
+        `stream` receives the text lines, `total_files` is T in the
+        `[N/T files]` prefix, and `file_sizes` maps each file id to its
+        announced size in bytes (0 is treated as unknown).
+        """
+        import threading
+        self._stream = stream
+        self._total_files = total_files
+        self._file_sizes = file_sizes
+        self._lock = threading.Lock()
+        # Fixed 1-based ordinal per file: N in the `[N/T files]` prefix.
+        self._ordinal: dict[str, int] = {
+            tid: pos for pos, tid in enumerate(file_sizes, start=1)
+        }
+        # Per-file state.
+        self._bytes: dict[str, int] = {tid: 0 for tid in file_sizes}
+        self._started_at: dict[str, float] = {}
+        self._last_emit_at: dict[str, float] = {}
+        self._last_emit_pct: dict[str, int] = {}
+        self._finished_idx: int = 0  # final lines emitted by `finish` so far
+
+    def advance(self, tid: str, n: int) -> None:
+        """Add `n` bytes to the file's total. Emit a textual update if
+        the cadence policy allows."""
+        with self._lock:
+            now = time.monotonic()
+            if tid not in self._started_at:
+                self._started_at[tid] = now
+                self._last_emit_at[tid] = now
+                self._last_emit_pct[tid] = 0
+            current = self._bytes.get(tid, 0) + n
+            self._bytes[tid] = current
+
+            total = self._file_sizes.get(tid, 0)
+            pct = self._percent(current, total)
+            crossed = pct >= self._last_emit_pct[tid] + self._PCT_STEP
+            stale = now - self._last_emit_at[tid] >= self._MAX_SILENCE_S
+            if crossed or stale:
+                self._last_emit_at[tid] = now
+                self._last_emit_pct[tid] = pct - (pct % self._PCT_STEP)
+                self._emit_line(tid, current, total, now)
+
+    def finish(self) -> None:
+        """Emit one final line for every file in `file_sizes`.
+
+        Files that never reported a byte are announced too, so the
+        operator sees an outcome for each requested file.
+        """
+        with self._lock:
+            now = time.monotonic()
+            for tid, total in self._file_sizes.items():
+                self._finished_idx += 1
+                bytes_ = self._bytes.get(tid, 0)
+                duration = max(0.001, now - self._started_at.get(tid, now))
+                rate = int(bytes_ / duration)
+                if total > 0 and bytes_ < total:
+                    # Never claim completion for a short download.
+                    status = f"incomplete at {self._percent(bytes_, total)}%"
+                else:
+                    status = "100% done"
+                self._write(
+                    f"{self._prefix(tid)}{status} "
+                    f"({self._fmt_bytes(bytes_)} in {duration:.1f}s, "
+                    f"{self._fmt_bytes(rate)}/s)\n"
+                )
+
+    def _emit_line(self, tid: str, current: int, total: int, now: float) -> None:
+        """Write one in-flight progress line; the caller holds the lock."""
+        duration = max(0.001, now - self._started_at.get(tid, now))
+        rate = int(current / duration)
+        if total > 0:
+            progress = (
+                f"{self._percent(current, total)}% "
+                f"({self._fmt_bytes(current)} / {self._fmt_bytes(total)})"
+            )
+        else:
+            progress = f"? ({self._fmt_bytes(current)})"
+        self._write(
+            f"{self._prefix(tid)}{progress} at {self._fmt_bytes(rate)}/s\n"
+        )
+
+    def _prefix(self, tid: str) -> str:
+        """Return the `[N/T files] <tid>: ` prefix; the caller holds the lock."""
+        # Ids missing from `file_sizes` get the next free ordinal on first use.
+        pos = self._ordinal.setdefault(tid, len(self._ordinal) + 1)
+        return f"[{pos}/{self._total_files} files] {tid}: "
+
+    def _write(self, line: str) -> None:
+        """Write and flush one line, ignoring stream errors."""
+        try:
+            self._stream.write(line)
+            self._stream.flush()
+        except Exception:
+            # Deliberately broad: this runs inside download callbacks, and
+            # a broken stderr must not turn into a failed download.
+            pass
+
+    @staticmethod
+    def _percent(current: int, total: int) -> int:
+        """Return whole percent of `total`, clamped to 0..100 (0 if unknown)."""
+        if total <= 0:
+            return 0
+        return max(0, min(100, (current * 100) // total))
+
+    @classmethod
+    def _fmt_bytes(cls, n: int) -> str:
+        """Render a byte count with a binary-unit suffix (B/KB/MB/GB/TB)."""
+        for divisor, suffix in cls._HUMAN_UNITS:
+            if n >= divisor:
+                return f"{n / divisor:.1f} {suffix}"
+        return f"{n} B"
+
+
 @contextmanager
 def _override_server_env(server_url: str, token: str) -> Iterator[None]:
     """Set AGNES_SERVER + scoped token override for the duration of the call.
@@ -219,15 +374,34 @@ def run_pull(
     # the executor + thread overhead for the common single-update case.
     workers = min(workers, len(to_download)) if to_download else 1
 
-    # Optional progress bar — Rich's Progress tracks per-file bytes
-    # streamed, aggregated across the parallel ThreadPoolExecutor
-    # workers. Pavel's #185 Phase 1: a single 6.3 GB parquet on first
-    # init went 44 minutes silent, looked frozen. Now: aggregate "X.Y
-    # GB / Z.A GB · 56 MB/s · ETA 1m 20s" to stderr while threads
-    # stream. None when show_progress=False (SessionStart hooks etc.).
+    # Optional progress reporting — two paths.
+    #
+    # 1. Rich progress bar: per-file bytes-streamed bar with speed +
+    #    ETA. Rendered to stderr when stderr is a TTY. Aggregates
+    #    across the parallel ThreadPoolExecutor workers and across
+    #    chunked-download chunks (all chunks call the same callback
+    #    advancing the same task).
+    # 2. Textual fallback: when `show_progress=True` but stderr is
+    #    NOT a TTY (Claude Code SessionStart hook, CI run, Docker
+    #    log capture), Rich would either suppress the bar or emit
+    #    raw control sequences. Instead we emit one plain-text line
+    #    per file at most every 10% or 30 s — enough signal to know
+    #    the pull isn't frozen on a multi-GB parquet, terse enough
+    #    not to spam the consumer's log.
+ # + # Both paths receive the same per-file callback so the chunked- + # download contract ("one file = one task, sum-of-chunks bytes") + # is honored uniformly. + import sys as _sys progress = None progress_tasks: dict[str, int] = {} - if show_progress and to_download: + textual = None + use_textual_fallback = ( + show_progress + and to_download + and not _sys.stderr.isatty() + ) + if show_progress and to_download and not use_textual_fallback: from rich.progress import ( Progress, BarColumn, DownloadColumn, TextColumn, TimeRemainingColumn, TransferSpeedColumn, @@ -248,13 +389,22 @@ def run_pull( progress_tasks[tid] = progress.add_task( "download", label=tid, total=size if size > 0 else None, ) + elif use_textual_fallback: + textual = _TextualProgress( + stream=_sys.stderr, + total_files=len(to_download), + file_sizes={ + tid: int(server_tables[tid].get("size_bytes") or 0) + for tid in to_download + }, + ) def _download_one(tid: str) -> tuple[str, dict | None, str | None]: """Returns (tid, local_table_entry_or_None, error_or_None). One bound thread per call; stream_download is sync I/O so a ThreadPoolExecutor (not asyncio) is the right tool. 
The progress callback is thread-safe — Rich's Progress.update - holds an internal lock.""" + and the textual fallback's lock both serialize internally.""" target = parquet_dir / f"{tid}.parquet" expected_hash = server_tables[tid].get("hash", "") cb = None @@ -262,6 +412,9 @@ def run_pull( task_id = progress_tasks[tid] def cb(n: int, _tid=tid, _task=task_id): progress.update(_task, advance=n) + elif textual is not None: + def cb(n: int, _tid=tid): + textual.advance(_tid, n) try: stream_download(f"/api/data/{tid}/download", str(target), progress_callback=cb) @@ -294,6 +447,8 @@ def run_pull( finally: if progress is not None: progress.stop() + if textual is not None: + textual.finish() for tid, entry, err in outcomes: if err is not None: diff --git a/tests/test_pull_progress.py b/tests/test_pull_progress.py new file mode 100644 index 0000000..fe31c72 --- /dev/null +++ b/tests/test_pull_progress.py @@ -0,0 +1,123 @@ +"""Tests for `agnes pull` progress UX (Change 3). + +The Rich progress bar handles the TTY case fine, but Claude Code's +SessionStart context — and any hook running `agnes pull` non-interactively — +has stderr connected to a pipe, not a TTY. In that case Rich either +suppresses output entirely or emits raw ANSI noise into the consumer's +log. Goal: when the caller asks for progress and stderr is not a TTY, +emit a plain-text per-10%-or-30s update so the operator gets *some* +signal instead of multi-minute silence. 
+"""
+
+from __future__ import annotations
+
+import io
+import time
+from pathlib import Path
+from unittest.mock import MagicMock
+
+import pytest
+
+
+@pytest.fixture(autouse=True)
+def _isolate_config_dir(tmp_path, monkeypatch):
+    cfg = tmp_path / "_cfg"
+    cfg.mkdir()
+    monkeypatch.setenv("AGNES_CONFIG_DIR", str(cfg))
+
+
+@pytest.fixture
+def fake_pull_io(monkeypatch):
+    """Stub the manifest + memory + download endpoints so run_pull can
+    execute end-to-end with a fake parquet write per table."""
+    canned_manifest = {
+        "tables": {
+            "tbl_big": {"hash": "h1", "rows": 0, "size_bytes": 1_000_000},
+        },
+    }
+    canned_memory = {"mandatory": [], "approved": []}
+
+    def _api_get(path, *args, **kwargs):
+        resp = MagicMock()
+        resp.status_code = 200
+        if path == "/api/sync/manifest":
+            resp.json.return_value = canned_manifest
+        elif path == "/api/memory/bundle":
+            resp.json.return_value = canned_memory
+        resp.raise_for_status = lambda: None
+        return resp
+
+    def _stream_download(path, target_path, progress_callback=None):
+        # Simulate a chunked download: emit progress in 4 increments
+        # totaling the announced size.
+        total = 1_000_000
+        slices = [total // 4] * 3 + [total - 3 * (total // 4)]
+        Path(target_path).write_bytes(b"PAR1" + b"\x00" * 1000 + b"PAR1")
+        if progress_callback:
+            for s in slices:
+                progress_callback(s)
+        return total
+
+    monkeypatch.setattr("cli.lib.pull.api_get", _api_get, raising=False)
+    monkeypatch.setattr("cli.lib.pull.stream_download", _stream_download,
+                        raising=False)
+    monkeypatch.setattr("cli.lib.pull._is_valid_parquet", lambda p: True,
+                        raising=False)
+    monkeypatch.setattr("cli.lib.pull._file_md5", lambda p: "h1", raising=False)
+
+
+def test_textual_progress_when_stderr_is_not_tty(
+    tmp_path, fake_pull_io, monkeypatch, capsys,
+):
+    """Non-TTY stderr → emit a plain-text progress line per file."""
+    # Force the non-TTY branch even if pytest's fake stderr is a tty.
+    monkeypatch.setattr("sys.stderr.isatty", lambda: False, raising=False)
+
+    from cli.lib.pull import run_pull
+    result = run_pull(
+        server_url="http://x", token="t", workspace=tmp_path,
+        show_progress=True,
+    )
+    captured = capsys.readouterr()
+    # Some indication of the file + bytes ran; we don't pin exact format.
+    assert "tbl_big" in captured.err
+    assert result.tables_updated == 1
+    # No raw ANSI escape sequences anywhere in the captured stderr.
+    assert "\x1b[" not in captured.err
+
+
+def test_no_progress_output_when_show_progress_is_false(
+    tmp_path, fake_pull_io, monkeypatch, capsys,
+):
+    """`show_progress=False` (the SessionStart hook path) emits no
+    progress text on stderr in either TTY or non-TTY mode."""
+    monkeypatch.setattr("sys.stderr.isatty", lambda: False, raising=False)
+
+    from cli.lib.pull import run_pull
+    run_pull(
+        server_url="http://x", token="t", workspace=tmp_path,
+        show_progress=False,
+    )
+    captured = capsys.readouterr()
+    assert "tbl_big" not in captured.err
+
+
+def test_textual_progress_emits_at_completion(
+    tmp_path, fake_pull_io, monkeypatch, capsys,
+):
+    """At least one final completion line gets emitted per file even if
+    the throttle window doesn't trigger mid-file."""
+    monkeypatch.setattr("sys.stderr.isatty", lambda: False, raising=False)
+    from cli.lib.pull import run_pull
+    run_pull(
+        server_url="http://x", token="t", workspace=tmp_path,
+        show_progress=True,
+    )
+    captured = capsys.readouterr()
+    # Final line marks the file as done — either "100%" or a "✓ tbl_big" /
+    # "tbl_big done" indicator. We accept any final-completion form.
+    assert (
+        "100%" in captured.err
+        or "done" in captured.err.lower()
+        or "complete" in captured.err.lower()
+    )