feat(pull): parallel parquet downloads (AGNES_PULL_PARALLELISM=4 default)
The download loop in cli/lib/pull.py was strictly serial — N tables took Σ stream_download(t_i). With the Caddy file_server change in this PR, the server can now sustain many parallel sendfile transfers without blocking app workers, so the client-side serialization became the new bottleneck. Switch to ThreadPoolExecutor capped by AGNES_PULL_PARALLELISM (default 4, set 1 to restore pre-PR serial). 4 matches typical home-broadband saturation without over-subscribing the analyst's NIC. Drops to serial when len(to_download) <= 1 to avoid executor overhead in the common single-table case. Per-table error semantics preserved via (tid, entry, err) tuple — a failure on one parquet doesn't abort the rest of the batch. Verified end-to-end against a dev VM with the new Caddy file_server deployed: 2-table pull through agnes CLI works under the new concurrency.
This commit is contained in:
parent
ab61e30c91
commit
2ae486bc5d
2 changed files with 45 additions and 8 deletions
|
|
@ -12,6 +12,7 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
- **`data_source.bigquery.query_timeout_ms` config knob** (default 600 000 ms = 10 min). The DuckDB BigQuery extension's built-in default of 90 s was too tight for analyst-scale queries against view-backed BQ datasets — `agnes query --remote` would HTTP 400 with `Binder Error: Query execution exceeded the timeout. Job ID: …` whenever the underlying BQ job took longer than 90 s, even though the BQ job itself was healthy. The new knob is applied via `SET bq_query_timeout_ms` after every `LOAD bigquery` on every BQ-touching DuckDB session — the orchestrator's `_remote_attach` ATTACH path (`src/orchestrator.py`), the analytics-DB read-only reattach path (`src/db.py:_reattach_remote_extensions` — the primary `agnes query --remote` request path), the `BqAccess` session factory (`connectors/bigquery/access.py`), and the standalone extractor (`connectors/bigquery/extractor.py`). Sentinel `0` (or non-numeric / unparseable values) leaves the extension default in place so operators on legacy extension versions that don't recognise the setting aren't broken. Configurable via `/admin/server-config` UI. Note: BigQuery's `jobs.query` RPC caps the wait at ~200 s per call regardless of this setting; the extension polls on top so the effective ceiling is the value here but each poll is ~200 s. DuckDB emits an informational warning when the value is set above the BQ RPC cap — operators can safely ignore it.
|
- **`data_source.bigquery.query_timeout_ms` config knob** (default 600 000 ms = 10 min). The DuckDB BigQuery extension's built-in default of 90 s was too tight for analyst-scale queries against view-backed BQ datasets — `agnes query --remote` would HTTP 400 with `Binder Error: Query execution exceeded the timeout. Job ID: …` whenever the underlying BQ job took longer than 90 s, even though the BQ job itself was healthy. The new knob is applied via `SET bq_query_timeout_ms` after every `LOAD bigquery` on every BQ-touching DuckDB session — the orchestrator's `_remote_attach` ATTACH path (`src/orchestrator.py`), the analytics-DB read-only reattach path (`src/db.py:_reattach_remote_extensions` — the primary `agnes query --remote` request path), the `BqAccess` session factory (`connectors/bigquery/access.py`), and the standalone extractor (`connectors/bigquery/extractor.py`). Sentinel `0` (or non-numeric / unparseable values) leaves the extension default in place so operators on legacy extension versions that don't recognise the setting aren't broken. Configurable via `/admin/server-config` UI. Note: BigQuery's `jobs.query` RPC caps the wait at ~200 s per call regardless of this setting; the extension polls on top so the effective ceiling is the value here but each poll is ~200 s. DuckDB emits an informational warning when the value is set above the BQ RPC cap — operators can safely ignore it.
|
||||||
|
- **Per-user parallel parquet downloads in `agnes pull`** — the download loop in `cli/lib/pull.py` now uses a `ThreadPoolExecutor` with concurrency capped by the new `AGNES_PULL_PARALLELISM` env var (default 4, set 1 to restore pre-PR serial behavior). On a registry of N tables the wall-clock time drops from `Σ stream_download_seconds(table_i)` to roughly `max × ceil(N/4)`. Works hand-in-hand with the Caddy `file_server` change below: without it parallel client-side downloads would still queue on the single uvicorn worker; with it each request is its own caddy goroutine + sendfile, so 4-way parallelism actually delivers throughput. Per-table error semantics preserved — a failure on one table no longer aborts the rest of the batch.
|
||||||
- **`scripts/ops/agnes-auto-upgrade.sh` now re-fetches Caddyfile + every compose overlay** from `keboola/agnes-the-ai-analyst@main` on every tick, hashes them, and triggers a `docker compose up -d` recreation when the hash changes — same path as an image-digest change. Pre-fix the script only watched `docker images` digests, so a Caddyfile or compose change in main never reached running VMs (only fresh boots ran `startup.sh`'s file fetch). Without this, the new file_server downloads-path below would land in the image but stay inert against an old Caddyfile. The script also self-updates from the same path so the very fix that watches config files isn't itself stuck on running VMs. Fail-soft on curl errors — keeps the existing file rather than blanking it.
|
- **`scripts/ops/agnes-auto-upgrade.sh` now re-fetches Caddyfile + every compose overlay** from `keboola/agnes-the-ai-analyst@main` on every tick, hashes them, and triggers a `docker compose up -d` recreation when the hash changes — same path as an image-digest change. Pre-fix the script only watched `docker images` digests, so a Caddyfile or compose change in main never reached running VMs (only fresh boots ran `startup.sh`'s file fetch). Without this, the new file_server downloads-path below would land in the image but stay inert against an old Caddyfile. The script also self-updates from the same path so the very fix that watches config files isn't itself stuck on running VMs. Fail-soft on curl errors — keeps the existing file rather than blanking it.
|
||||||
- **Caddy `file_server` for parquet downloads** — `GET /api/data/{table_id}/download` is now intercepted at the Caddy layer (TLS profile only) and served directly via sendfile/zero-copy from the data volume mounted read-only at `/srv` inside the caddy container. Caddy authorises every request via a new lightweight RBAC probe `GET /api/data/{table_id}/check-access` (returns 204 when the caller has read access on the table, 403 otherwise) using the `forward_auth` directive — the bulk byte transfer never touches uvicorn workers. Resolves a real production failure mode where a single multi-GB analyst pull held the app's only uvicorn worker for the duration of the stream and starved the UI / `/api/health` / every other API endpoint, eventually flipping the container to `unhealthy`. Path discovery uses Caddy's `try_files` over the known `extract.duckdb` v2 source subdirs (`bigquery/data/<id>.parquet`, `keboola/data/<id>.parquet`, `jira/data/<id>.parquet`); a parquet not at any of those paths transparently falls through to the existing app handler so legacy `src_data/parquet` layouts and future connectors keep working with no Caddyfile change. Non-Caddy deployments (dev `docker compose up` without `--profile tls`) continue to use the app handler unchanged.
|
- **Caddy `file_server` for parquet downloads** — `GET /api/data/{table_id}/download` is now intercepted at the Caddy layer (TLS profile only) and served directly via sendfile/zero-copy from the data volume mounted read-only at `/srv` inside the caddy container. Caddy authorises every request via a new lightweight RBAC probe `GET /api/data/{table_id}/check-access` (returns 204 when the caller has read access on the table, 403 otherwise) using the `forward_auth` directive — the bulk byte transfer never touches uvicorn workers. Resolves a real production failure mode where a single multi-GB analyst pull held the app's only uvicorn worker for the duration of the stream and starved the UI / `/api/health` / every other API endpoint, eventually flipping the container to `unhealthy`. Path discovery uses Caddy's `try_files` over the known `extract.duckdb` v2 source subdirs (`bigquery/data/<id>.parquet`, `keboola/data/<id>.parquet`, `jira/data/<id>.parquet`); a parquet not at any of those paths transparently falls through to the existing app handler so legacy `src_data/parquet` layouts and future connectors keep working with no Caddyfile change. Non-Caddy deployments (dev `docker compose up` without `--profile tls`) continue to use the app handler unchanged.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -178,11 +178,33 @@ def run_pull(
|
||||||
result.duration_s = time.monotonic() - started
|
result.duration_s = time.monotonic() - started
|
||||||
return result
|
return result
|
||||||
|
|
||||||
# 4. Download parquets. Lazy mkdir: only create server/parquet/
|
# 4. Download parquets in parallel. Lazy mkdir: only create
|
||||||
# when we have at least one table to write into it.
|
# server/parquet/ when we have at least one table to write into it.
|
||||||
for tid in to_download:
|
# Concurrency capped by `AGNES_PULL_PARALLELISM` (default 4) so a
|
||||||
if not parquet_dir.exists():
|
# registry of 50+ tables doesn't open 50+ TCP connections + saturate
|
||||||
parquet_dir.mkdir(parents=True, exist_ok=True)
|
# the analyst's NIC; 4 matches typical home-broadband saturation
|
||||||
|
# without over-subscribing the server's caddy file_server (each
|
||||||
|
# request is a separate goroutine + sendfile, but the analyst's
|
||||||
|
# downlink is the more frequent bottleneck). Set to 1 to restore
|
||||||
|
# the pre-PR serial behavior for debug repro. The server-side
|
||||||
|
# bypass-uvicorn fix (Caddy file_server) is the other half —
|
||||||
|
# without it, parallel downloads would still queue on the single
|
||||||
|
# uvicorn worker.
|
||||||
|
if to_download and not parquet_dir.exists():
|
||||||
|
parquet_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
try:
|
||||||
|
workers = max(1, int(os.environ.get("AGNES_PULL_PARALLELISM", "4")))
|
||||||
|
except ValueError:
|
||||||
|
workers = 4
|
||||||
|
# Drop to serial when there's only one (or zero) tables — avoids
|
||||||
|
# the executor + thread overhead for the common single-update case.
|
||||||
|
workers = min(workers, len(to_download)) if to_download else 1
|
||||||
|
|
||||||
|
def _download_one(tid: str) -> tuple[str, dict | None, str | None]:
|
||||||
|
"""Returns (tid, local_table_entry_or_None, error_or_None).
|
||||||
|
One bound thread per call; stream_download is sync I/O so a
|
||||||
|
ThreadPoolExecutor (not asyncio) is the right tool."""
|
||||||
target = parquet_dir / f"{tid}.parquet"
|
target = parquet_dir / f"{tid}.parquet"
|
||||||
expected_hash = server_tables[tid].get("hash", "")
|
expected_hash = server_tables[tid].get("hash", "")
|
||||||
try:
|
try:
|
||||||
|
|
@ -197,14 +219,28 @@ def run_pull(
|
||||||
elif not _is_valid_parquet(target):
|
elif not _is_valid_parquet(target):
|
||||||
target.unlink(missing_ok=True)
|
target.unlink(missing_ok=True)
|
||||||
raise ValueError("not a valid parquet (missing PAR1 magic)")
|
raise ValueError("not a valid parquet (missing PAR1 magic)")
|
||||||
local_tables[tid] = {
|
entry = {
|
||||||
"hash": expected_hash,
|
"hash": expected_hash,
|
||||||
"rows": server_tables[tid].get("rows", 0),
|
"rows": server_tables[tid].get("rows", 0),
|
||||||
"size_bytes": server_tables[tid].get("size_bytes", 0),
|
"size_bytes": server_tables[tid].get("size_bytes", 0),
|
||||||
}
|
}
|
||||||
result.tables_updated += 1
|
return tid, entry, None
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
result.errors.append({"table": tid, "error": str(exc)})
|
return tid, None, str(exc)
|
||||||
|
|
||||||
|
if workers <= 1:
|
||||||
|
outcomes = [_download_one(tid) for tid in to_download]
|
||||||
|
else:
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
with ThreadPoolExecutor(max_workers=workers) as ex:
|
||||||
|
outcomes = list(ex.map(_download_one, to_download))
|
||||||
|
|
||||||
|
for tid, entry, err in outcomes:
|
||||||
|
if err is not None:
|
||||||
|
result.errors.append({"table": tid, "error": err})
|
||||||
|
else:
|
||||||
|
local_tables[tid] = entry
|
||||||
|
result.tables_updated += 1
|
||||||
|
|
||||||
# 5. Persist sync state (only on real runs).
|
# 5. Persist sync state (only on real runs).
|
||||||
# TODO(workspace-scoped-sync-state): currently saved to
|
# TODO(workspace-scoped-sync-state): currently saved to
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue