diff --git a/CHANGELOG.md b/CHANGELOG.md index 406f6bb..b34d7ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ### Added - **`data_source.bigquery.query_timeout_ms` config knob** (default 600 000 ms = 10 min). The DuckDB BigQuery extension's built-in default of 90 s was too tight for analyst-scale queries against view-backed BQ datasets — `agnes query --remote` would HTTP 400 with `Binder Error: Query execution exceeded the timeout. Job ID: …` whenever the underlying BQ job took longer than 90 s, even though the BQ job itself was healthy. The new knob is applied via `SET bq_query_timeout_ms` after every `LOAD bigquery` on every BQ-touching DuckDB session — the orchestrator's `_remote_attach` ATTACH path (`src/orchestrator.py`), the analytics-DB read-only reattach path (`src/db.py:_reattach_remote_extensions` — the primary `agnes query --remote` request path), the `BqAccess` session factory (`connectors/bigquery/access.py`), and the standalone extractor (`connectors/bigquery/extractor.py`). Sentinel `0` (or non-numeric / unparseable values) leaves the extension default in place so operators on legacy extension versions that don't recognise the setting aren't broken. Configurable via `/admin/server-config` UI. Note: BigQuery's `jobs.query` RPC caps the wait at ~200 s per call regardless of this setting; the extension polls on top so the effective ceiling is the value here but each poll is ~200 s. DuckDB emits an informational warning when the value is set above the BQ RPC cap — operators can safely ignore it. +- **Per-user parallel parquet downloads in `agnes pull`** — the download loop in `cli/lib/pull.py` now uses a `ThreadPoolExecutor` with concurrency capped by the new `AGNES_PULL_PARALLELISM` env var (default 4, set 1 to restore pre-PR serial behavior). On a registry of N tables the wall-clock time drops from `Σ stream_download_seconds(table_i)` to roughly `max × ceil(N/4)`. Works hand-in-hand with the Caddy `file_server` change below: without it parallel client-side downloads would still queue on the single uvicorn worker; with it each request is its own caddy goroutine + sendfile, so 4-way parallelism actually delivers throughput. Per-table error semantics preserved — a failure on one table no longer aborts the rest of the batch. - **`scripts/ops/agnes-auto-upgrade.sh` now re-fetches Caddyfile + every compose overlay** from `keboola/agnes-the-ai-analyst@main` on every tick, hashes them, and triggers a `docker compose up -d` recreation when the hash changes — same path as an image-digest change. Pre-fix the script only watched `docker images` digests, so a Caddyfile or compose change in main never reached running VMs (only fresh boots ran `startup.sh`'s file fetch). Without this, the new file_server downloads-path below would land in the image but stay inert against an old Caddyfile. The script also self-updates from the same path so the very fix that watches config files isn't itself stuck on running VMs. Fail-soft on curl errors — keeps the existing file rather than blanking it. - **Caddy `file_server` for parquet downloads** — `GET /api/data/{table_id}/download` is now intercepted at the Caddy layer (TLS profile only) and served directly via sendfile/zero-copy from the data volume mounted read-only at `/srv` inside the caddy container. Caddy authorises every request via a new lightweight RBAC probe `GET /api/data/{table_id}/check-access` (returns 204 when the caller has read access on the table, 403 otherwise) using the `forward_auth` directive — the bulk byte transfer never touches uvicorn workers. Resolves a real production failure mode where a single multi-GB analyst pull held the app's only uvicorn worker for the duration of the stream and starved the UI / `/api/health` / every other API endpoint, eventually flipping the container to `unhealthy`. Path discovery uses Caddy's `try_files` over the known `extract.duckdb` v2 source subdirs (`bigquery/data/.parquet`, `keboola/data/.parquet`, `jira/data/.parquet`); a parquet not at any of those paths transparently falls through to the existing app handler so legacy `src_data/parquet` layouts and future connectors keep working with no Caddyfile change. Non-Caddy deployments (dev `docker compose up` without `--profile tls`) continue to use the app handler unchanged. diff --git a/cli/lib/pull.py b/cli/lib/pull.py index c2eadc5..5e6445f 100644 --- a/cli/lib/pull.py +++ b/cli/lib/pull.py @@ -178,11 +178,33 @@ def run_pull( result.duration_s = time.monotonic() - started return result - # 4. Download parquets. Lazy mkdir: only create server/parquet/ - # when we have at least one table to write into it. - for tid in to_download: - if not parquet_dir.exists(): - parquet_dir.mkdir(parents=True, exist_ok=True) + # 4. Download parquets in parallel. Lazy mkdir: only create + # server/parquet/ when we have at least one table to write into it. + # Concurrency capped by `AGNES_PULL_PARALLELISM` (default 4) so a + # registry of 50+ tables doesn't open 50+ TCP connections + saturate + # the analyst's NIC; 4 matches typical home-broadband saturation + # without over-subscribing the server's caddy file_server (each + # request is a separate goroutine + sendfile, but the analyst's + # downlink is the more frequent bottleneck). Set to 1 to restore + # the pre-PR serial behavior for debug repro. The server-side + # bypass-uvicorn fix (Caddy file_server) is the other half — + # without it, parallel downloads would still queue on the single + # uvicorn worker. + if to_download and not parquet_dir.exists(): + parquet_dir.mkdir(parents=True, exist_ok=True) + + try: + workers = max(1, int(os.environ.get("AGNES_PULL_PARALLELISM", "4"))) + except ValueError: + workers = 4 + # Drop to serial when there's only one (or zero) tables — avoids + # the executor + thread overhead for the common single-update case. + workers = min(workers, len(to_download)) if to_download else 1 + + def _download_one(tid: str) -> tuple[str, dict | None, str | None]: + """Returns (tid, local_table_entry_or_None, error_or_None). + One bound thread per call; stream_download is sync I/O so a + ThreadPoolExecutor (not asyncio) is the right tool.""" target = parquet_dir / f"{tid}.parquet" expected_hash = server_tables[tid].get("hash", "") try: @@ -197,14 +219,28 @@ def run_pull( elif not _is_valid_parquet(target): target.unlink(missing_ok=True) raise ValueError("not a valid parquet (missing PAR1 magic)") - local_tables[tid] = { + entry = { "hash": expected_hash, "rows": server_tables[tid].get("rows", 0), "size_bytes": server_tables[tid].get("size_bytes", 0), } - result.tables_updated += 1 + return tid, entry, None except Exception as exc: - result.errors.append({"table": tid, "error": str(exc)}) + return tid, None, str(exc) + + if workers <= 1: + outcomes = [_download_one(tid) for tid in to_download] + else: + from concurrent.futures import ThreadPoolExecutor + with ThreadPoolExecutor(max_workers=workers) as ex: + outcomes = list(ex.map(_download_one, to_download)) + + for tid, entry, err in outcomes: + if err is not None: + result.errors.append({"table": tid, "error": err}) + else: + local_tables[tid] = entry + result.tables_updated += 1 # 5. Persist sync state (only on real runs). # TODO(workspace-scoped-sync-state): currently saved to