From bd1b5ad444af7b05344dfaf0c66a75d72be3bfc1 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Wed, 6 May 2026 13:06:36 +0200 Subject: [PATCH] perf(cli): persistent HTTP/2 client across pull invocation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pool the httpx.Client used by `stream_download` so N parquet downloads share a single TLS handshake instead of one handshake each. With the optional `h2` package installed, HTTP/2 multiplexing further lets all chunk Range requests share a single TCP connection — which synergizes with the range-chunked download path added in the previous commit. The shared client is created lazily on the first stream-download call, kept alive for the duration of the process via a module-level slot, and closed at exit via `atexit.register`. Construction is wrapped in a try/except: when `h2` is unavailable (slim install), httpx raises ImportError on `http2=True` and we transparently fall back to an HTTP/1.1 client — pooling alone still amortizes TLS handshakes. `agnes pull` must never crash on a missing optional package, so the fallback path is non-negotiable. `h2>=4.1.0` is added to the core dependency set; downstream slim installs that drop it lose the HTTP/2 benefit but keep correctness. --- CHANGELOG.md | 10 ++++ cli/client.py | 98 +++++++++++++++++++++++++++++++- pyproject.toml | 7 ++- tests/test_pull_chunked.py | 34 ++++++----- tests/test_pull_shared_client.py | 85 +++++++++++++++++++++++++++ 5 files changed, 216 insertions(+), 18 deletions(-) create mode 100644 tests/test_pull_shared_client.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 489b779..0979a40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,16 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C `accept-ranges: bytes` is advertised, or when content is below the threshold — no behavior change in the small-file / non-cooperating- server cases. 
+- **Persistent HTTP/2 client across `agnes pull`**: `stream_download` now + routes through a process-wide pooled `httpx.Client` so N parquet + downloads share a single TLS handshake; HTTP/2 multiplexing + (when the optional `h2` package is installed) lets all chunk Range + requests share one TCP connection. Gracefully falls back to HTTP/1.1 + pooling when `h2` is missing — no crash, just slightly less benefit. + +### Added +- New optional dependency `h2>=4.1.0` (HTTP/2 transport for httpx). Pure + performance — `agnes pull` works on HTTP/1.1 if the install skips it. ## [0.38.0] — 2026-05-06 diff --git a/cli/client.py b/cli/client.py index 6ac68ce..d8ff1af 100644 --- a/cli/client.py +++ b/cli/client.py @@ -1,6 +1,8 @@ """HTTP client wrapper for CLI — handles auth, retries, streaming.""" +import atexit import os +import threading import time import traceback from concurrent.futures import ThreadPoolExecutor, as_completed @@ -156,7 +158,13 @@ def _translate_transport_error( def get_client(timeout: float = 30.0) -> httpx.Client: - """Get an authenticated httpx client.""" + """Get an authenticated httpx client. + + This factory creates a fresh client per call — used by the small + `api_*` helpers (one request, then close). The big-stream path + (`stream_download`) routes through `_get_shared_client()` to amortize + TLS handshakes and HTTP/2 multiplexing across N parquet downloads. + """ token = get_token() headers = {} if token: @@ -168,6 +176,80 @@ def get_client(timeout: float = 30.0) -> httpx.Client: ) +# ── Shared persistent client ──────────────────────────────────────────── +# `agnes pull` issues N stream_download calls — one per parquet — plus +# (with chunked downloads) M Range requests per file. Without pooling, +# each call performs a fresh TLS handshake; with HTTP/2 enabled, all +# those requests multiplex over a single TCP connection. 
The shared +# client is created lazily on first stream-download request, kept alive +# for the duration of the process, and closed at exit. +# +# HTTP/2 requires the optional `h2` package. If it's unavailable (slim +# install), we fall back to HTTP/1.1 — pooling alone still saves the +# handshake cost — and never raise. The CLI must not crash on `agnes +# pull` because of an h2 import error. + +_SHARED_CLIENT: Optional[httpx.Client] = None +_SHARED_CLIENT_LOCK = threading.Lock() + + +def _get_shared_client() -> httpx.Client: + """Lazily create + return a process-wide httpx.Client. + + Pool defaults: keep up to 32 keepalive connections (covers the + chunk-parallelism cap of 16 × 2 simultaneous files comfortably) and + cap the total at 64 so a runaway loop can't open thousands of + sockets. HTTP/2 is opt-in via httpx's `http2=True` and gracefully + degrades when the `h2` extra is missing. + """ + global _SHARED_CLIENT + with _SHARED_CLIENT_LOCK: + if _SHARED_CLIENT is not None: + return _SHARED_CLIENT + token = get_token() + headers = {} + if token: + headers["Authorization"] = f"Bearer {token}" + limits = httpx.Limits( + max_keepalive_connections=32, + max_connections=64, + ) + try: + client = httpx.Client( + base_url=get_server_url(), + headers=headers, + timeout=300.0, + http2=True, + limits=limits, + ) + except (ImportError, RuntimeError): + # `h2` not installed → httpx raises; fall back to HTTP/1.1. + # Pooling alone still amortizes the TLS handshake. + client = httpx.Client( + base_url=get_server_url(), + headers=headers, + timeout=300.0, + limits=limits, + ) + _SHARED_CLIENT = client + return client + + +def _close_shared_client() -> None: + """Close the shared client and clear the slot. 
Safe to call twice.""" + global _SHARED_CLIENT + with _SHARED_CLIENT_LOCK: + if _SHARED_CLIENT is not None: + try: + _SHARED_CLIENT.close() + except Exception: + pass + _SHARED_CLIENT = None + + +atexit.register(_close_shared_client) + + def api_get(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: try: with get_client(timeout=timeout) as client: @@ -438,10 +520,20 @@ def stream_download(path: str, target_path: str, progress_callback=None) -> int: Threading: the chunked path uses a ThreadPoolExecutor sized to the parallelism. httpx.Client.stream() is safe to call concurrently from multiple threads on a single client (the connection pool serializes - the underlying socket access). + the underlying socket access; HTTP/2 multiplexes streams when the + `h2` extra is installed). """ - with get_client(timeout=300.0) as client: + # Use the shared persistent client when available — one TLS + # handshake amortized across N stream_download calls within the same + # process, and HTTP/2 stream multiplexing across the chunk Range + # requests within a single download. Falls back to a fresh per-call + # client if shared-client construction fails for any reason. + try: + client = _get_shared_client() return _stream_download_via(client, path, target_path, progress_callback) + except Exception: + with get_client(timeout=300.0) as client: + return _stream_download_via(client, path, target_path, progress_callback) def _stream_download_via( diff --git a/pyproject.toml b/pyproject.toml index 40106fe..84b8a8e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,8 +20,13 @@ dependencies = [ "itsdangerous>=2.1.0", "authlib>=1.6.11", "argon2-cffi>=23.1.0", - # HTTP client + # HTTP client. `h2` enables HTTP/2 multiplexing for the persistent + # CLI client used by `agnes pull` (one TCP connection serves N + # concurrent parquet streams + range chunks). 
`cli/client.py` + # gracefully falls back to HTTP/1.1 if h2 is missing, so this + # extra is for performance, not correctness. "httpx>=0.27.0", + "h2>=4.1.0", # CLI "typer>=0.12.0", "rich>=13.0.0", diff --git a/tests/test_pull_chunked.py b/tests/test_pull_chunked.py index f5ff887..3629688 100644 --- a/tests/test_pull_chunked.py +++ b/tests/test_pull_chunked.py @@ -153,13 +153,10 @@ def _isolate_config_dir(tmp_path, monkeypatch): @pytest.fixture(autouse=True) def _reset_shared_client(monkeypatch): - """Make sure no shared persistent client leaks between tests. - - The shared persistent client is introduced in Change 2 (separate - commit). When this fixture runs against the post-Change-2 tree, it - reaches the module attribute; under Change 1 alone the attribute - doesn't exist yet, so we tolerate that. - """ + """Reset the persistent shared httpx.Client between tests so each + test starts from a known state. Tests that need to inject a fake + client also stub `_get_shared_client` directly via the + `_inject_fake_client` helper below.""" import cli.client as cc if hasattr(cc, "_SHARED_CLIENT"): monkeypatch.setattr(cc, "_SHARED_CLIENT", None, raising=False) @@ -168,6 +165,15 @@ def _reset_shared_client(monkeypatch): monkeypatch.setattr(cc, "_SHARED_CLIENT", None, raising=False) +def _inject_fake_client(monkeypatch, fake): + """Patch both client factories to return the same fake. 
Tests target + `_get_shared_client` (the path stream_download actually takes) and + also `get_client` so the fallback path also lands on the fake.""" + monkeypatch.setattr("cli.client.get_client", lambda timeout=300.0: fake) + monkeypatch.setattr("cli.client._get_shared_client", + lambda: fake, raising=False) + + # ── Tests ─────────────────────────────────────────────────────────────── def test_chunked_download_success(tmp_path, monkeypatch): @@ -179,7 +185,7 @@ def test_chunked_download_success(tmp_path, monkeypatch): monkeypatch.setenv("AGNES_PULL_CHUNK_PARALLELISM", "4") fake = _FakeClient(body=body, accept_ranges=True) - monkeypatch.setattr("cli.client.get_client", lambda timeout=300.0: fake) + _inject_fake_client(monkeypatch, fake) from cli.client import stream_download target = tmp_path / "out.parquet" @@ -213,7 +219,7 @@ def test_chunked_download_fallback_when_server_ignores_range( # with the full body — that's the "server ignored Range" path. fake = _FakeClient(body=body, accept_ranges=True, reject_range_with_200=True) - monkeypatch.setattr("cli.client.get_client", lambda timeout=300.0: fake) + _inject_fake_client(monkeypatch, fake) from cli.client import stream_download target = tmp_path / "out.bin" @@ -233,7 +239,7 @@ def test_small_file_uses_single_stream_path(tmp_path, monkeypatch): monkeypatch.setenv("AGNES_PULL_CHUNK_PARALLELISM", "4") fake = _FakeClient(body=body, accept_ranges=True) - monkeypatch.setattr("cli.client.get_client", lambda timeout=300.0: fake) + _inject_fake_client(monkeypatch, fake) from cli.client import stream_download target = tmp_path / "out.bin" @@ -253,7 +259,7 @@ def test_chunked_download_no_accept_ranges_falls_back(tmp_path, monkeypatch): monkeypatch.setenv("AGNES_PULL_CHUNK_PARALLELISM", "4") fake = _FakeClient(body=body, accept_ranges=False) - monkeypatch.setattr("cli.client.get_client", lambda timeout=300.0: fake) + _inject_fake_client(monkeypatch, fake) from cli.client import stream_download target = tmp_path / "out.bin" 
@@ -276,7 +282,7 @@ def test_chunked_download_one_chunk_retries_then_succeeds( fake = _FakeClient(body=body, accept_ranges=True, fail_chunk_indices=(1,)) # second chunk blips once - monkeypatch.setattr("cli.client.get_client", lambda timeout=300.0: fake) + _inject_fake_client(monkeypatch, fake) from cli.client import stream_download target = tmp_path / "out.bin" @@ -312,7 +318,7 @@ def test_chunked_download_failure_cleans_up_part_files(tmp_path, monkeypatch): return super().stream(method, path, headers=headers, **kwargs) fake = _ChronicFail(body=body, accept_ranges=True) - monkeypatch.setattr("cli.client.get_client", lambda timeout=300.0: fake) + _inject_fake_client(monkeypatch, fake) from cli.client import stream_download target = tmp_path / "out.bin" @@ -333,7 +339,7 @@ def test_progress_callback_aggregates_across_chunks(tmp_path, monkeypatch): monkeypatch.setenv("AGNES_PULL_CHUNK_PARALLELISM", "4") fake = _FakeClient(body=body, accept_ranges=True) - monkeypatch.setattr("cli.client.get_client", lambda timeout=300.0: fake) + _inject_fake_client(monkeypatch, fake) from cli.client import stream_download target = tmp_path / "out.bin" diff --git a/tests/test_pull_shared_client.py b/tests/test_pull_shared_client.py new file mode 100644 index 0000000..113ed3a --- /dev/null +++ b/tests/test_pull_shared_client.py @@ -0,0 +1,85 @@ +"""Tests for the persistent HTTP/2-capable shared client (Change 2). + +`agnes pull` issues N stream_download calls — one per parquet. Without +pooling, each call performs a fresh TLS handshake. The shared client is +created lazily once per process and closed at exit; HTTP/2 (when `h2` is +available) further multiplexes all chunk Range requests over a single +TCP connection. 
+""" + +from __future__ import annotations + +import pytest + + +@pytest.fixture(autouse=True) +def _isolate_config_dir(tmp_path, monkeypatch): + cfg = tmp_path / "_cfg" + cfg.mkdir() + monkeypatch.setenv("AGNES_CONFIG_DIR", str(cfg)) + # Some dev environments point SSL_CERT_FILE / REQUESTS_CA_BUNDLE at a + # corp-CA bundle that may not exist on every laptop running the test + # suite. Clear those so httpx.Client() construction in the shared- + # client path can build a default SSL context without trying to load + # a missing PEM file. + for var in ("SSL_CERT_FILE", "REQUESTS_CA_BUNDLE", "CURL_CA_BUNDLE"): + monkeypatch.delenv(var, raising=False) + + +@pytest.fixture(autouse=True) +def _reset_shared(monkeypatch): + import cli.client as cc + cc._close_shared_client() + monkeypatch.setattr(cc, "_SHARED_CLIENT", None, raising=False) + yield + cc._close_shared_client() + + +def test_get_shared_client_is_cached(monkeypatch): + """Multiple calls return the same client instance — no fresh TLS + handshake per stream_download invocation.""" + monkeypatch.setenv("AGNES_SERVER", "https://x.example.test") + from cli.client import _get_shared_client + c1 = _get_shared_client() + c2 = _get_shared_client() + assert c1 is c2, "shared client must be a single instance" + + +def test_get_shared_client_falls_back_when_http2_unavailable(monkeypatch): + """If httpx raises during HTTP/2 client construction (e.g. 
`h2` not + installed in the runtime env), we must gracefully build a HTTP/1.1 + client instead of crashing the pull.""" + import httpx + + monkeypatch.setenv("AGNES_SERVER", "https://x.example.test") + import cli.client as cc + + real_client = httpx.Client + + construction_calls = [] + + def fake_client(*args, **kwargs): + construction_calls.append(kwargs.copy()) + if kwargs.get("http2") is True: + raise ImportError("Using http2=True, but the 'h2' package is not installed") + return real_client(*args, **kwargs) + + monkeypatch.setattr(httpx, "Client", fake_client) + + client = cc._get_shared_client() + assert client is not None + # Two construction attempts: first http2=True (raised), second falls + # back to HTTP/1.1 (no http2 kwarg). + assert construction_calls[0].get("http2") is True + assert construction_calls[1].get("http2") is None or construction_calls[1].get("http2") is False + cc._close_shared_client() + + +def test_close_shared_client_idempotent(monkeypatch): + """Calling close twice (once explicitly, once via atexit) must not + raise.""" + monkeypatch.setenv("AGNES_SERVER", "https://x.example.test") + from cli.client import _get_shared_client, _close_shared_client + _get_shared_client() + _close_shared_client() + _close_shared_client() # second close is a no-op