From 4f042355020d32b03643b9219090a23550b3a3b3 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Tue, 5 May 2026 12:29:57 +0200 Subject: [PATCH 01/12] feat(bigquery): bq_query_timeout_ms knob; default 600s (was 90s) DuckDB BigQuery extension defaults `bq_query_timeout_ms` to 90 s, which is too tight for analyst-scale queries against view-backed BQ datasets. `agnes query --remote` HTTP 400'd with `Binder Error: Query execution exceeded the timeout. Job ID: ...` whenever the underlying BQ job ran longer than 90 s, even though the job itself was healthy. Add `data_source.bigquery.query_timeout_ms` (default 600 000 ms = 10 min, sentinel 0 falls through to the extension default). Applied via `SET bq_query_timeout_ms` after every `LOAD bigquery` on every BQ-touching DuckDB session: orchestrator's `_remote_attach` ATTACH path, BqAccess session factory, and the standalone extractor. Configurable via `/admin/server-config` UI. Fail-soft: extension versions that don't recognise the setting silently keep the default rather than poisoning the session. --- CHANGELOG.md | 3 + app/api/admin.py | 18 +++++ config/instance.yaml.example | 8 ++ connectors/bigquery/access.py | 37 ++++++++++ connectors/bigquery/extractor.py | 2 + src/orchestrator.py | 2 + tests/test_bq_query_timeout.py | 123 +++++++++++++++++++++++++++++++ 7 files changed, 193 insertions(+) create mode 100644 tests/test_bq_query_timeout.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 8af7204..7d7a5f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,9 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +### Added +- **`data_source.bigquery.query_timeout_ms` config knob** (default 600 000 ms = 10 min). The DuckDB BigQuery extension's built-in default of 90 s was too tight for analyst-scale queries against view-backed BQ datasets — `agnes query --remote` would HTTP 400 with `Binder Error: Query execution exceeded the timeout. Job ID: …` whenever the underlying BQ job took longer than 90 s, even though the BQ job itself was healthy. The new knob is applied via `SET bq_query_timeout_ms` after every `LOAD bigquery` on every BQ-touching DuckDB session — the orchestrator's `_remote_attach` ATTACH path (`src/orchestrator.py`), the analytics-DB read-only reattach path (`src/db.py:_reattach_remote_extensions` — the primary `agnes query --remote` request path), the `BqAccess` session factory (`connectors/bigquery/access.py`), and the standalone extractor (`connectors/bigquery/extractor.py`). Sentinel `0` (or non-numeric / unparseable values) leaves the extension default in place so operators on legacy extension versions that don't recognise the setting aren't broken. Configurable via `/admin/server-config` UI. Note: BigQuery's `jobs.query` RPC caps the wait at ~200 s per call regardless of this setting; the extension polls on top so the effective ceiling is the value here but each poll is ~200 s. DuckDB emits an informational warning when the value is set above the BQ RPC cap — operators can safely ignore it. + ### Fixed - Keboola sync now falls back to the legacy Storage-API client when the DuckDB Keboola extension's per-table scan fails, not just when the initial `ATTACH` fails. Two changes: diff --git a/app/api/admin.py b/app/api/admin.py index 9355c6e..fe43805 100644 --- a/app/api/admin.py +++ b/app/api/admin.py @@ -285,6 +285,24 @@ _KNOWN_FIELDS: dict[str, dict[str, dict]] = { "`agnes snapshot create` suggestion. 0 disables the gate. Default 5368709120 = 5 GiB." ), }, + "query_timeout_ms": { + "kind": "int", + "default": 600000, + "hint": ( + "DuckDB BigQuery extension query timeout (milliseconds). Applied " + "via `SET bq_query_timeout_ms` after every `LOAD bigquery` on " + "every BQ-touching DuckDB session (orchestrator remote-view " + "ATTACH, BqAccess factory, standalone extractor). Extension " + "default is 90 000 ms = 90 s, which is too tight for analyst " + "queries against view-backed datasets — bumped to 600 000 ms = " + "10 min by default. Set 0 to fall through to the extension " + "default. Note: the underlying BQ jobs.query RPC caps the wait " + "at ~200 s per call; the extension polls on top, so the " + "effective ceiling is this value but each poll round-trip is " + "~200 s. DuckDB itself emits a warning when this is set above " + "~200 s — that warning is informational, not an error." + ), + }, }, }, "keboola": { diff --git a/config/instance.yaml.example b/config/instance.yaml.example index 82b8367..c836144 100644 --- a/config/instance.yaml.example +++ b/config/instance.yaml.example @@ -127,6 +127,14 @@ data_source: # # Dry-run check before running; exceeding -> registration / sync # # rejected. Default 10 GiB (10737418240). Set 0 to disable. # # null falls through to default. Configurable via /admin/server-config UI. + # query_timeout_ms: 600000 + # # DuckDB BigQuery extension query timeout (milliseconds). + # # Applied via `SET bq_query_timeout_ms` after every LOAD bigquery + # # on every BQ-touching DuckDB session. Extension default is + # # 90 000 ms = 90 s, which is too tight for analyst queries against + # # view-backed datasets -- bumped to 600 000 ms = 10 min by default. + # # Set 0 to fall through to the extension default. Configurable via + # # /admin/server-config UI. # --- OpenMetadata catalog (optional) --- # Enriches table and column metadata from OpenMetadata REST API. diff --git a/connectors/bigquery/access.py b/connectors/bigquery/access.py index 193fbbc..e46a877 100644 --- a/connectors/bigquery/access.py +++ b/connectors/bigquery/access.py @@ -232,11 +232,48 @@ def _default_duckdb_session_factory(projects: BqProjects): f"failed to install/load BigQuery DuckDB extension: {e}", details={"original": str(e)}, ) + apply_bq_session_settings(conn) yield conn finally: conn.close() +def apply_bq_session_settings(conn) -> None: + """Apply per-session DuckDB BigQuery-extension settings from instance config. + + Currently sets ``bq_query_timeout_ms`` from + ``data_source.bigquery.query_timeout_ms``. The extension default is 90 s, + which is too tight for analyst-scale queries against view-backed BQ + datasets — bumping the default to 600 s here. Sentinel ``0`` (or a + non-numeric / unparseable value) leaves the extension default in place. + + Call AFTER ``LOAD bigquery`` on every DuckDB session that touches BQ: + BqAccess's session factory, the standalone extractor in + ``connectors/bigquery/extractor.py``, and the orchestrator's + ``_remote_attach`` path in ``src/orchestrator.py``. + """ + try: + from app.instance_config import get_value + except Exception: + return + raw = get_value( + "data_source", "bigquery", "query_timeout_ms", default=600_000, + ) + try: + ms = int(raw) if raw is not None else 0 + except (TypeError, ValueError): + return + if ms <= 0: + return + try: + conn.execute(f"SET bq_query_timeout_ms = {int(ms)}") + except Exception: + # Fail-soft: extension version may not support the setting, or the + # session may already have been frozen — leave the default rather + # than poisoning the whole session. + pass + + class BqAccess: """Single entry point for BigQuery access. Stateless after construction. diff --git a/connectors/bigquery/extractor.py b/connectors/bigquery/extractor.py index c140dd6..d90005d 100644 --- a/connectors/bigquery/extractor.py +++ b/connectors/bigquery/extractor.py @@ -359,6 +359,8 @@ def _init_extract_locked( conn.execute( f"CREATE SECRET bq_session (TYPE bigquery, ACCESS_TOKEN '{escaped_token}')" ) + from connectors.bigquery.access import apply_bq_session_settings + apply_bq_session_settings(conn) conn.execute( f"ATTACH 'project={project_id}' AS bq (TYPE bigquery, READ_ONLY)" ) diff --git a/src/orchestrator.py b/src/orchestrator.py index e003f41..914bcd6 100644 --- a/src/orchestrator.py +++ b/src/orchestrator.py @@ -502,6 +502,8 @@ class SyncOrchestrator: f"CREATE OR REPLACE SECRET {secret_name} " f"(TYPE bigquery, ACCESS_TOKEN '{escaped}')" ) + from connectors.bigquery.access import apply_bq_session_settings + apply_bq_session_settings(conn) conn.execute( f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, READ_ONLY)" ) diff --git a/tests/test_bq_query_timeout.py b/tests/test_bq_query_timeout.py new file mode 100644 index 0000000..fa64e08 --- /dev/null +++ b/tests/test_bq_query_timeout.py @@ -0,0 +1,123 @@ +"""Unit tests for apply_bq_session_settings. + +Covers the data_source.bigquery.query_timeout_ms knob added so that +agnes query --remote no longer trips the DuckDB BigQuery extension's +built-in 90 s wait timeout when the underlying BQ job takes longer. +""" + +from unittest.mock import patch + +from connectors.bigquery.access import apply_bq_session_settings + + +class _RecordingConn: + """Minimal DuckDB-conn stand-in that records execute() calls. + + apply_bq_session_settings only calls .execute(); we don't need a + real DuckDB to verify the SET command shape. + """ + + def __init__(self, raise_on=None): + self.calls: list[str] = [] + self.raise_on = raise_on + + def execute(self, sql: str): + self.calls.append(sql) + if self.raise_on and self.raise_on in sql: + raise RuntimeError(f"simulated failure on: {sql}") + + +def _patched_get_value(value): + """Helper: build a patch target that returns *value* for the + data_source.bigquery.query_timeout_ms key and propagates the + `default=` kwarg for any other lookup so we don't accidentally + break tests that read other keys via the same module.""" + def fake(*keys, default=None): + if keys == ("data_source", "bigquery", "query_timeout_ms"): + return value + return default + return patch("app.instance_config.get_value", side_effect=fake) + + +def test_default_when_config_missing(): + """When get_value returns the default (None passed through, default arg + used), apply_bq_session_settings should fall back to the bumped + 600 000 ms default and emit the SET.""" + conn = _RecordingConn() + # Simulate get_value returning the default we passed (600_000) by + # echoing the default kwarg. + def fake(*keys, default=None): + return default + with patch("app.instance_config.get_value", side_effect=fake): + apply_bq_session_settings(conn) + assert conn.calls == ["SET bq_query_timeout_ms = 600000"] + + +def test_explicit_value(): + conn = _RecordingConn() + with _patched_get_value(900_000): + apply_bq_session_settings(conn) + assert conn.calls == ["SET bq_query_timeout_ms = 900000"] + + +def test_zero_sentinel_leaves_extension_default(): + """0 means 'use the DuckDB BQ extension's built-in default' — no SET + must be emitted so a non-zero default doesn't override an operator's + explicit opt-out.""" + conn = _RecordingConn() + with _patched_get_value(0): + apply_bq_session_settings(conn) + assert conn.calls == [] + + +def test_negative_value_treated_as_zero(): + """Negative is nonsensical for a timeout; treat as 'extension default' + rather than emitting a negative SET that the extension might reject + or interpret unexpectedly.""" + conn = _RecordingConn() + with _patched_get_value(-1): + apply_bq_session_settings(conn) + assert conn.calls == [] + + +def test_non_numeric_silently_skipped(): + """A string-typed YAML value (e.g. operator typo) shouldn't crash + the BQ session — fall through to the extension default.""" + conn = _RecordingConn() + with _patched_get_value("notanumber"): + apply_bq_session_settings(conn) + assert conn.calls == [] + + +def test_string_numeric_is_coerced(): + """YAML loaders sometimes deliver int-like values as strings; accept + those rather than failing.""" + conn = _RecordingConn() + with _patched_get_value("750000"): + apply_bq_session_settings(conn) + assert conn.calls == ["SET bq_query_timeout_ms = 750000"] + + +def test_set_failure_does_not_propagate(): + """Older DuckDB BQ extension versions may not recognise the setting. + The function must fail-soft so a session that was otherwise healthy + keeps working — just with the extension's built-in default timeout.""" + conn = _RecordingConn(raise_on="SET bq_query_timeout_ms") + with _patched_get_value(600_000): + # Must not raise. + apply_bq_session_settings(conn) + # The SET was attempted (recorded before the exception). + assert conn.calls == ["SET bq_query_timeout_ms = 600000"] + + +def test_no_app_config_module_silently_skipped(): + """Unit-test contexts that don't bring up the app config layer must + still be able to construct BQ sessions for narrow tests; an + ImportError on app.instance_config means we can't read the knob, + so we leave the extension default in place.""" + conn = _RecordingConn() + with patch.dict( + "sys.modules", {"app.instance_config": None}, + ): + apply_bq_session_settings(conn) + assert conn.calls == [] From 025a2b5c0e7b5d8bfcb0d47b2a35a10962fd8d77 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Tue, 5 May 2026 16:27:48 +0200 Subject: [PATCH 02/12] fix(db): apply bq_query_timeout_ms to read-only reattach path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Devin Review on PR #181: caught that the original PR plumbed the new SET into the orchestrator's _remote_attach (rebuild path), the BqAccess factory (materialize path), and the standalone extractor — but missed the actual primary `agnes query --remote` request path: every read-only analytics-DB connection runs `_reattach_remote_extensions` in `src/db.py` on open, and that LOAD bigquery + ATTACH cycle was unconfigured. Without this commit, the very flow the PR was meant to fix — analyst queries hitting BQ views > 90s — would still 400 with the same Binder Error / Job ID wording, because the runtime LOAD bigquery happens here not in the orchestrator's rebuild path. Apply apply_bq_session_settings(conn) right after the BQ secret is created and before ATTACH, mirroring what every other PR site does. --- src/db.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/db.py b/src/db.py index 27617d7..2591244 100644 --- a/src/db.py +++ b/src/db.py @@ -614,6 +614,8 @@ def _reattach_remote_extensions( f"CREATE OR REPLACE SECRET {secret_name} " f"(TYPE bigquery, ACCESS_TOKEN '{escaped}')" ) + from connectors.bigquery.access import apply_bq_session_settings + apply_bq_session_settings(conn) conn.execute( f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, READ_ONLY)" ) From 1be997f6d48bbbd2bfda69496a2199e4f6414a56 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Tue, 5 May 2026 14:15:08 +0200 Subject: [PATCH 03/12] =?UTF-8?q?feat(caddy):=20file=5Fserver=20for=20parq?= =?UTF-8?q?uet=20downloads=20=E2=80=94=20bypass=20uvicorn?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A single analyst's multi-GB `agnes pull` held the only uvicorn worker for the duration of the stream, starving UI / /api/health / every other API endpoint. Container flipped to `unhealthy`. Triggered while a 6.8 GB `order_economics` pull was in-flight on prod 2026-05-05. Caddy now intercepts `GET /api/data/{table_id}/download` and serves the parquet directly via sendfile from the data volume (mounted r-o at /srv inside the caddy container). RBAC enforced by `forward_auth` to a new lightweight `GET /api/data/{table_id}/check-access` endpoint (returns 204 / 403) — the bulk transfer never reaches uvicorn. Path discovery via `try_files` over the known extract.duckdb v2 source subdirs. Anything not at a static path falls through to the existing app handler so legacy `src_data/parquet` and future connectors still work without a Caddyfile change. Non-Caddy deployments are unchanged. Stage 1 (multi-worker uvicorn) was considered but blocked by the single-writer DuckDB lock on system.duckdb — workers > 1 would crash at startup on "Could not set lock on file", the same race that pushed the scheduler from in-process writes to HTTP-via-app. Multi-reader workers + single-writer coordination is out of scope for this PR. --- CHANGELOG.md | 1 + Caddyfile | 41 +++++++++ app/api/data.py | 44 +++++++++- docker-compose.yml | 5 ++ tests/test_check_access_endpoint.py | 124 ++++++++++++++++++++++++++++ 5 files changed, 212 insertions(+), 3 deletions(-) create mode 100644 tests/test_check_access_endpoint.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d7a5f3..b23d870 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ### Added - **`data_source.bigquery.query_timeout_ms` config knob** (default 600 000 ms = 10 min). The DuckDB BigQuery extension's built-in default of 90 s was too tight for analyst-scale queries against view-backed BQ datasets — `agnes query --remote` would HTTP 400 with `Binder Error: Query execution exceeded the timeout. Job ID: …` whenever the underlying BQ job took longer than 90 s, even though the BQ job itself was healthy. The new knob is applied via `SET bq_query_timeout_ms` after every `LOAD bigquery` on every BQ-touching DuckDB session — the orchestrator's `_remote_attach` ATTACH path (`src/orchestrator.py`), the analytics-DB read-only reattach path (`src/db.py:_reattach_remote_extensions` — the primary `agnes query --remote` request path), the `BqAccess` session factory (`connectors/bigquery/access.py`), and the standalone extractor (`connectors/bigquery/extractor.py`). Sentinel `0` (or non-numeric / unparseable values) leaves the extension default in place so operators on legacy extension versions that don't recognise the setting aren't broken. Configurable via `/admin/server-config` UI. Note: BigQuery's `jobs.query` RPC caps the wait at ~200 s per call regardless of this setting; the extension polls on top so the effective ceiling is the value here but each poll is ~200 s. DuckDB emits an informational warning when the value is set above the BQ RPC cap — operators can safely ignore it. +- **Caddy `file_server` for parquet downloads** — `GET /api/data/{table_id}/download` is now intercepted at the Caddy layer (TLS profile only) and served directly via sendfile/zero-copy from the data volume mounted read-only at `/srv` inside the caddy container. Caddy authorises every request via a new lightweight RBAC probe `GET /api/data/{table_id}/check-access` (returns 204 when the caller has read access on the table, 403 otherwise) using the `forward_auth` directive — the bulk byte transfer never touches uvicorn workers. Resolves a real production failure mode where a single multi-GB analyst pull held the app's only uvicorn worker for the duration of the stream and starved the UI / `/api/health` / every other API endpoint, eventually flipping the container to `unhealthy`. Path discovery uses Caddy's `try_files` over the known `extract.duckdb` v2 source subdirs (`bigquery/data/.parquet`, `keboola/data/.parquet`, `jira/data/.parquet`); a parquet not at any of those paths transparently falls through to the existing app handler so legacy `src_data/parquet` layouts and future connectors keep working with no Caddyfile change. Non-Caddy deployments (dev `docker compose up` without `--profile tls`) continue to use the app handler unchanged. ### Fixed diff --git a/Caddyfile b/Caddyfile index 6085b6e..9a39211 100644 --- a/Caddyfile +++ b/Caddyfile @@ -34,6 +34,47 @@ -Server } + # Direct file_server for parquet downloads — bypasses uvicorn so a + # multi-GB pull from one analyst can't starve the app workers and + # block UI / health / API for everyone else. forward_auth calls the + # app's lightweight ``/api/data/{id}/check-access`` (RBAC only, + # ~1 ms) on every request; on 2xx Caddy serves the file directly + # via sendfile/zero-copy from the data volume mounted read-only. + # + # Path layout matches `app/api/data.py`'s extract.duckdb v2 search: + # /data/extracts//data/.parquet + # try_files probes known source subdirs in order; first hit wins. + # If a deployment adds a new connector and lands parquets at a fresh + # subdir, extend the try_files list. Anything that misses falls + # through to the app reverse_proxy below — so an unmapped source + # degrades to "downloads work, just through uvicorn" — never 404. + @download path_regexp tid ^/api/data/([^/]+)/download$ + handle @download { + forward_auth app:8000 { + uri /api/data/{re.tid.1}/check-access + # Bearer PAT or session cookie travels in Authorization + # / Cookie; copy_headers ensures the upstream sees them. + copy_headers Authorization Cookie + } + # Caddy's own /data is occupied by the caddy_data volume, so the + # agnes data dir is mounted at /srv (read-only) instead — see the + # `data:/srv:ro` line in docker-compose.yml's caddy service. The + # root + try_files combo therefore probes /srv/extracts/... + root * /srv/extracts + try_files /bigquery/data/{re.tid.1}.parquet /keboola/data/{re.tid.1}.parquet /jira/data/{re.tid.1}.parquet + @found file + handle @found { + header Content-Disposition "attachment; filename=\"{re.tid.1}.parquet\"" + file_server + } + # Fallback: parquet not at any known static path → defer to app + # (handles legacy src_data/parquet layout + future connectors). + reverse_proxy app:8000 { + header_up X-Forwarded-Proto https + header_up X-Forwarded-Host {host} + } + } + reverse_proxy app:8000 { # App's uvicorn runs with --proxy-headers, so stamping these # ourselves makes OAuth callback URLs and Set-Cookie Secure diff --git a/app/api/data.py b/app/api/data.py index 8cf26f2..f0044f5 100644 --- a/app/api/data.py +++ b/app/api/data.py @@ -1,6 +1,6 @@ """Data download endpoint — streaming parquet files.""" -from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi import APIRouter, Depends, HTTPException, Request, Response from fastapi.responses import FileResponse import duckdb @@ -12,6 +12,36 @@ from src.rbac import can_access_table router = APIRouter(prefix="/api/data", tags=["data"]) +@router.get("/{table_id}/check-access") +async def check_access( + table_id: str, + user: dict = Depends(get_current_user), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Lightweight RBAC probe used by Caddy's ``forward_auth`` directive + to gate file_server-served parquet downloads without involving the + app's request workers in the bulk byte transfer. + + Returns HTTP 204 No Content when the caller has read access to + ``table_id``; HTTP 403 (via ``can_access_table`` returning False) + otherwise. Caddy treats 2xx as authorized and forwards the request + to its own ``file_server`` block; non-2xx is returned to the client + verbatim. + + Why a separate endpoint and not just ``HEAD /download``: ``HEAD`` on + the FileResponse-based ``download`` handler still opens the file and + runs stat() to populate Content-Length / ETag. ``forward_auth`` calls + this endpoint on every request, so the per-call cost matters; a pure + RBAC check is ~1 ms while a HEAD path involves filesystem walks + (``rglob`` for the parquet across source subdirs). + """ + if not _SAFE_QUOTED_IDENTIFIER.match(table_id): + raise HTTPException(status_code=404, detail="Table not found") + if not can_access_table(user, table_id, conn): + raise HTTPException(status_code=403, detail="Access denied to this table") + return Response(status_code=204) + + @router.get("/{table_id}/download") async def download_table( table_id: str, @@ -19,7 +49,16 @@ async def download_table( user: dict = Depends(get_current_user), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): - """Stream a parquet file for download. Supports ETag for caching.""" + """Stream a parquet file for download. Supports ETag for caching. + + On Caddy-fronted deployments the matching Caddyfile rule intercepts + ``GET /api/data/{table_id}/download``, calls ``check-access`` via + ``forward_auth``, and serves the parquet directly via ``file_server`` + — bypassing this handler entirely. This handler stays as the + canonical fallback for non-Caddy deployments (dev `docker compose + up`, alternative reverse proxies, direct :8000 access) where the + bulk transfer goes through uvicorn. + """ # Reject unsafe table_id before any filesystem or DB operations. # Use the relaxed quoted-identifier check that allows dots and hyphens # (Keboola table IDs like "in.c-crm.orders") while still blocking @@ -53,7 +92,6 @@ async def download_table( etag = f'"{stat.st_mtime_ns}"' if_none_match = request.headers.get("if-none-match") if if_none_match == etag: - from starlette.responses import Response return Response(status_code=304) return FileResponse( diff --git a/docker-compose.yml b/docker-compose.yml index 4240d28..59bacf2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -112,6 +112,11 @@ services: - /data/state/certs:/certs:ro - caddy_data:/data - caddy_config:/config + # Read-only mount of the agnes data dir so Caddy's file_server can + # serve parquets directly (sendfile/zero-copy) and bypass the app's + # uvicorn workers — see Caddyfile's @download handler. Mounted at + # /srv (not /data) because /data is already the caddy_data volume. + - data:/srv:ro environment: - DOMAIN=${DOMAIN:-localhost} # Passes through whatever the operator set in .env. Caddyfile uses diff --git a/tests/test_check_access_endpoint.py b/tests/test_check_access_endpoint.py new file mode 100644 index 0000000..0fa054b --- /dev/null +++ b/tests/test_check_access_endpoint.py @@ -0,0 +1,124 @@ +"""Unit tests for ``GET /api/data/{table_id}/check-access`` — the +lightweight RBAC probe used by Caddy's ``forward_auth`` directive to gate +file_server-served parquet downloads without involving the app's request +workers in the bulk byte transfer. + +The endpoint must: + - return 204 when the caller has read access (admin → always; non-admin + only with an explicit ``resource_grants`` row), + - return 403 with no body / minimal body when the caller does not, + - return 404 for unsafe identifiers (path-traversal guard), + - return 401 when the request has no auth. +""" + +from tests.conftest import create_mock_extract + + +def _auth(token): + return {"Authorization": f"Bearer {token}"} + + +def test_admin_gets_204(seeded_app): + """Admin short-circuits all RBAC checks — must always succeed.""" + c = seeded_app["client"] + env = seeded_app["env"] + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "salaries", "data": [{"id": "1"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + c.post( + "/api/admin/register-table", + json={"name": "salaries", "source_type": "keboola"}, + headers=_auth(seeded_app["admin_token"]), + ) + + resp = c.get( + "/api/data/salaries/check-access", + headers=_auth(seeded_app["admin_token"]), + ) + assert resp.status_code == 204 + assert resp.content == b"" + + +def test_analyst_without_grant_gets_403(seeded_app): + """Non-admin without an explicit `resource_grants` row must be denied + — the production failure mode where Caddy's forward_auth returns the + 403 to the client and never invokes file_server.""" + c = seeded_app["client"] + env = seeded_app["env"] + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "salaries", "data": [{"id": "1"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + c.post( + "/api/admin/register-table", + json={"name": "salaries", "source_type": "keboola"}, + headers=_auth(seeded_app["admin_token"]), + ) + + resp = c.get( + "/api/data/salaries/check-access", + headers=_auth(seeded_app["analyst_token"]), + ) + assert resp.status_code == 403 + + +def test_analyst_with_grant_gets_204(seeded_app): + """Once the analyst has a TABLE grant, check-access flips to 204 + and Caddy is free to serve the file directly. Mirrors the same + grant flow used by ``/api/data/{id}/download``.""" + c = seeded_app["client"] + env = seeded_app["env"] + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "salaries", "data": [{"id": "1"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + c.post( + "/api/admin/register-table", + json={"name": "salaries", "source_type": "keboola"}, + headers=_auth(seeded_app["admin_token"]), + ) + + # Mint the grant via the admin API the same way the existing download + # access-control tests do — see test_access_control.py. + from tests.test_access_control import _grant_table_to_analyst + from src.db import get_system_db + conn = get_system_db() + try: + _grant_table_to_analyst(conn, "salaries") + finally: + conn.close() + + resp = c.get( + "/api/data/salaries/check-access", + headers=_auth(seeded_app["analyst_token"]), + ) + assert resp.status_code == 204 + + +def test_unsafe_table_id_gets_404(seeded_app): + """Identifier validation runs BEFORE RBAC — keeps path-traversal + payloads (``../etc/passwd``) from reaching ``can_access_table`` and + matches the pre-existing behavior of ``/download``.""" + c = seeded_app["client"] + resp = c.get( + "/api/data/..%2Fetc%2Fpasswd/check-access", + headers=_auth(seeded_app["admin_token"]), + ) + # FastAPI's path converter rejects encoded slashes outright; either + # 404 from the validator or 404 from no-such-route is acceptable — + # both block the traversal. The point is no 5xx and no 204. + assert resp.status_code in (404, 422) + + +def test_no_auth_gets_401(seeded_app): + """Caddy will only call the auth-check endpoint when the client sent + credentials — but if a request slips through without them, the + endpoint must reject with 401 so Caddy returns 401 to the client + instead of falling through to file_server with no identity.""" + c = seeded_app["client"] + resp = c.get("/api/data/salaries/check-access") + assert resp.status_code == 401 From ab61e30c91680865c570d75aaf4b21d0175a3ef2 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Tue, 5 May 2026 15:18:48 +0200 Subject: [PATCH 04/12] chore(auto-upgrade): re-fetch compose + Caddyfile, self-update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sibling change to the Caddy file_server PR (#182). Without this, existing long-uptime VMs would pull the new agnes image on auto-upgrade but keep their stale Caddyfile + docker-compose.yml — leaving the file_server route + the data:/srv:ro mount inert. Confirmed live 2026-05-05 when the file_server change merged in main but stayed unreachable on a running dev VM until /opt/agnes/* was scp'd by hand. agnes-auto-upgrade.sh now hashes the bind-mounted config files (Caddyfile + every docker-compose overlay) on every 5 min tick and triggers a `docker compose up -d` recreation when the hash drifts — same trigger path as an image-digest change. Fail-soft via the .new-then-mv pattern: a curl 404 / network blip leaves the existing file untouched. Self-update at the bottom of the script: re-fetch /usr/local/bin/agnes-auto-upgrade.sh itself so the very fix that watches config files lands on running VMs without a manual ssh-and- curl cycle. Otherwise we'd have a self-perpetuating "old script problem" — the watch-config logic never propagating to the VMs that need it. Operators no longer need to ssh + scp Caddyfile/compose changes. --- CHANGELOG.md | 1 + scripts/ops/agnes-auto-upgrade.sh | 66 ++++++++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b23d870..406f6bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ### Added - **`data_source.bigquery.query_timeout_ms` config knob** (default 600 000 ms = 10 min). The DuckDB BigQuery extension's built-in default of 90 s was too tight for analyst-scale queries against view-backed BQ datasets — `agnes query --remote` would HTTP 400 with `Binder Error: Query execution exceeded the timeout. Job ID: …` whenever the underlying BQ job took longer than 90 s, even though the BQ job itself was healthy. The new knob is applied via `SET bq_query_timeout_ms` after every `LOAD bigquery` on every BQ-touching DuckDB session — the orchestrator's `_remote_attach` ATTACH path (`src/orchestrator.py`), the analytics-DB read-only reattach path (`src/db.py:_reattach_remote_extensions` — the primary `agnes query --remote` request path), the `BqAccess` session factory (`connectors/bigquery/access.py`), and the standalone extractor (`connectors/bigquery/extractor.py`). Sentinel `0` (or non-numeric / unparseable values) leaves the extension default in place so operators on legacy extension versions that don't recognise the setting aren't broken. Configurable via `/admin/server-config` UI. Note: BigQuery's `jobs.query` RPC caps the wait at ~200 s per call regardless of this setting; the extension polls on top so the effective ceiling is the value here but each poll is ~200 s. DuckDB emits an informational warning when the value is set above the BQ RPC cap — operators can safely ignore it. +- **`scripts/ops/agnes-auto-upgrade.sh` now re-fetches Caddyfile + every compose overlay** from `keboola/agnes-the-ai-analyst@main` on every tick, hashes them, and triggers a `docker compose up -d` recreation when the hash changes — same path as an image-digest change. Pre-fix the script only watched `docker images` digests, so a Caddyfile or compose change in main never reached running VMs (only fresh boots ran `startup.sh`'s file fetch). Without this, the new file_server downloads-path below would land in the image but stay inert against an old Caddyfile. The script also self-updates from the same path so the very fix that watches config files isn't itself stuck on running VMs. Fail-soft on curl errors — keeps the existing file rather than blanking it. - **Caddy `file_server` for parquet downloads** — `GET /api/data/{table_id}/download` is now intercepted at the Caddy layer (TLS profile only) and served directly via sendfile/zero-copy from the data volume mounted read-only at `/srv` inside the caddy container. Caddy authorises every request via a new lightweight RBAC probe `GET /api/data/{table_id}/check-access` (returns 204 when the caller has read access on the table, 403 otherwise) using the `forward_auth` directive — the bulk byte transfer never touches uvicorn workers. Resolves a real production failure mode where a single multi-GB analyst pull held the app's only uvicorn worker for the duration of the stream and starved the UI / `/api/health` / every other API endpoint, eventually flipping the container to `unhealthy`. Path discovery uses Caddy's `try_files` over the known `extract.duckdb` v2 source subdirs (`bigquery/data/.parquet`, `keboola/data/.parquet`, `jira/data/.parquet`); a parquet not at any of those paths transparently falls through to the existing app handler so legacy `src_data/parquet` layouts and future connectors keep working with no Caddyfile change. Non-Caddy deployments (dev `docker compose up` without `--profile tls`) continue to use the app handler unchanged. ### Fixed diff --git a/scripts/ops/agnes-auto-upgrade.sh b/scripts/ops/agnes-auto-upgrade.sh index 537f2cf..2dbd922 100755 --- a/scripts/ops/agnes-auto-upgrade.sh +++ b/scripts/ops/agnes-auto-upgrade.sh @@ -72,10 +72,72 @@ fi BEFORE=$(docker images --no-trunc --format '{{.Digest}}' "$IMAGE" | head -1) docker compose "${COMPOSE_FILES[@]}" pull >/dev/null 2>&1 AFTER=$(docker images --no-trunc --format '{{.Digest}}' "$IMAGE" | head -1) -if [ "$BEFORE" != "$AFTER" ]; then - echo "$(date): new digest for $IMAGE — recreating containers" + +# Re-fetch the bind-mounted config files (compose overlays + Caddyfile) +# from the OSS main branch on every tick. Without this, an image-only +# change is fine, but a change to the Caddyfile or any compose overlay +# (e.g. a new bind mount, a route, an env_file path) only lands on VMs +# that get a fresh `startup.sh` boot — leaving long-uptime VMs running +# the new image against stale config. Confirmed live on 2026-05-05 +# when a Caddyfile change adding a `data:/srv:ro` mount + a new +# `forward_auth` + `file_server` route for parquet downloads landed +# in main but stayed inert on running VMs because auto-upgrade only +# watched image digests. +# +# Hash before/after to detect content drift; treat as "trigger recreate" +# alongside an image digest change. Atomic move-after-fetch guards +# against a partial download corrupting compose at the next docker +# action — `curl --fail` plus the `.new` rename means a 404 / network +# blip leaves the existing file untouched. +RAW_BASE="https://raw.githubusercontent.com/keboola/agnes-the-ai-analyst/main" +CONFIG_FILES=( + docker-compose.yml docker-compose.prod.yml docker-compose.host-mount.yml + docker-compose.tls.yml Caddyfile +) +hash_config_files() { + # Sort to keep hash stable across operator add/remove, missing files + # contribute the empty string (sha256 of "" is well-defined). Run + # from /opt/agnes to keep relative paths terse in the hash input. + ( cd /opt/agnes && for f in "${CONFIG_FILES[@]}"; do + sha256sum "$f" 2>/dev/null || printf 'missing %s\n' "$f" + done ) | sort | sha256sum | awk '{print $1}' +} +CONFIG_BEFORE=$(hash_config_files) +for f in "${CONFIG_FILES[@]}"; do + if curl -fsSL "$RAW_BASE/$f" -o "/opt/agnes/$f.new" 2>/dev/null; then + mv -f "/opt/agnes/$f.new" "/opt/agnes/$f" + else + rm -f "/opt/agnes/$f.new" + logger -t agnes-auto-upgrade "WARN: failed to fetch $f from $RAW_BASE — keeping existing /opt/agnes/$f" + fi +done +CONFIG_AFTER=$(hash_config_files) + +if [ "$BEFORE" != "$AFTER" ] || [ "$CONFIG_BEFORE" != "$CONFIG_AFTER" ]; then + REASON=() + [ "$BEFORE" != "$AFTER" ] && REASON+=("image digest") + [ "$CONFIG_BEFORE" != "$CONFIG_AFTER" ] && REASON+=("config files") + echo "$(date): change detected (${REASON[*]}) — recreating containers" # ${arr[@]+"${arr[@]}"} pattern: expands to nothing when array is # empty (vs. plain "${arr[@]}" which trips `set -u` on bash <4.4). docker compose "${COMPOSE_FILES[@]}" ${PROFILE_ARGS[@]+"${PROFILE_ARGS[@]}"} up -d docker image prune -f >/dev/null 2>&1 fi + +# Self-update: re-fetch *this* script too. Without this, the very fix +# that lets auto-upgrade watch config files would itself never land on +# running VMs — a self-perpetuating "old script" problem. Atomic via +# .new + mv; chmod preserved. The next tick (5 min later) runs the +# new logic. Skipping if curl fails leaves the existing script in place. +if curl -fsSL "$RAW_BASE/scripts/ops/agnes-auto-upgrade.sh" \ + -o /usr/local/bin/agnes-auto-upgrade.sh.new 2>/dev/null; then + if ! cmp -s /usr/local/bin/agnes-auto-upgrade.sh.new \ + /usr/local/bin/agnes-auto-upgrade.sh; then + chmod +x /usr/local/bin/agnes-auto-upgrade.sh.new + mv -f /usr/local/bin/agnes-auto-upgrade.sh.new \ + /usr/local/bin/agnes-auto-upgrade.sh + logger -t agnes-auto-upgrade "self-update: replaced /usr/local/bin/agnes-auto-upgrade.sh" + else + rm -f /usr/local/bin/agnes-auto-upgrade.sh.new + fi +fi From 2ae486bc5d8a59edcd1eef62ce588059c5563558 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Tue, 5 May 2026 16:09:32 +0200 Subject: [PATCH 05/12] feat(pull): parallel parquet downloads (AGNES_PULL_PARALLELISM=4 default) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The download loop in cli/lib/pull.py was strictly serial — N tables took Σ stream_download(t_i). With the Caddy file_server change in this PR, the server can now sustain many parallel sendfile transfers without blocking app workers, so the client-side serialization became the new bottleneck. Switch to ThreadPoolExecutor capped by AGNES_PULL_PARALLELISM (default 4, set 1 to restore pre-PR serial). 4 matches typical home-broadband saturation without over-subscribing the analyst's NIC. Drops to serial when len(to_download) <= 1 to avoid executor overhead in the common single-table case. Per-table error semantics preserved via (tid, entry, err) tuple — a failure on one parquet doesn't abort the rest of the batch. Verified end-to-end against a dev VM with the new Caddy file_server deployed: 2-table pull through agnes CLI works under the new concurrency. --- CHANGELOG.md | 1 + cli/lib/pull.py | 52 +++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 406f6bb..b34d7ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ### Added - **`data_source.bigquery.query_timeout_ms` config knob** (default 600 000 ms = 10 min). The DuckDB BigQuery extension's built-in default of 90 s was too tight for analyst-scale queries against view-backed BQ datasets — `agnes query --remote` would HTTP 400 with `Binder Error: Query execution exceeded the timeout. Job ID: …` whenever the underlying BQ job took longer than 90 s, even though the BQ job itself was healthy. The new knob is applied via `SET bq_query_timeout_ms` after every `LOAD bigquery` on every BQ-touching DuckDB session — the orchestrator's `_remote_attach` ATTACH path (`src/orchestrator.py`), the analytics-DB read-only reattach path (`src/db.py:_reattach_remote_extensions` — the primary `agnes query --remote` request path), the `BqAccess` session factory (`connectors/bigquery/access.py`), and the standalone extractor (`connectors/bigquery/extractor.py`). Sentinel `0` (or non-numeric / unparseable values) leaves the extension default in place so operators on legacy extension versions that don't recognise the setting aren't broken. Configurable via `/admin/server-config` UI. Note: BigQuery's `jobs.query` RPC caps the wait at ~200 s per call regardless of this setting; the extension polls on top so the effective ceiling is the value here but each poll is ~200 s. DuckDB emits an informational warning when the value is set above the BQ RPC cap — operators can safely ignore it. +- **Per-user parallel parquet downloads in `agnes pull`** — the download loop in `cli/lib/pull.py` now uses a `ThreadPoolExecutor` with concurrency capped by the new `AGNES_PULL_PARALLELISM` env var (default 4, set 1 to restore pre-PR serial behavior). On a registry of N tables the wall-clock time drops from `Σ stream_download_seconds(table_i)` to roughly `max × ceil(N/4)`. Works hand-in-hand with the Caddy `file_server` change below: without it parallel client-side downloads would still queue on the single uvicorn worker; with it each request is its own caddy goroutine + sendfile, so 4-way parallelism actually delivers throughput. Per-table error semantics preserved — a failure on one table no longer aborts the rest of the batch. - **`scripts/ops/agnes-auto-upgrade.sh` now re-fetches Caddyfile + every compose overlay** from `keboola/agnes-the-ai-analyst@main` on every tick, hashes them, and triggers a `docker compose up -d` recreation when the hash changes — same path as an image-digest change. Pre-fix the script only watched `docker images` digests, so a Caddyfile or compose change in main never reached running VMs (only fresh boots ran `startup.sh`'s file fetch). Without this, the new file_server downloads-path below would land in the image but stay inert against an old Caddyfile. The script also self-updates from the same path so the very fix that watches config files isn't itself stuck on running VMs. Fail-soft on curl errors — keeps the existing file rather than blanking it. - **Caddy `file_server` for parquet downloads** — `GET /api/data/{table_id}/download` is now intercepted at the Caddy layer (TLS profile only) and served directly via sendfile/zero-copy from the data volume mounted read-only at `/srv` inside the caddy container. Caddy authorises every request via a new lightweight RBAC probe `GET /api/data/{table_id}/check-access` (returns 204 when the caller has read access on the table, 403 otherwise) using the `forward_auth` directive — the bulk byte transfer never touches uvicorn workers. Resolves a real production failure mode where a single multi-GB analyst pull held the app's only uvicorn worker for the duration of the stream and starved the UI / `/api/health` / every other API endpoint, eventually flipping the container to `unhealthy`. Path discovery uses Caddy's `try_files` over the known `extract.duckdb` v2 source subdirs (`bigquery/data/.parquet`, `keboola/data/.parquet`, `jira/data/.parquet`); a parquet not at any of those paths transparently falls through to the existing app handler so legacy `src_data/parquet` layouts and future connectors keep working with no Caddyfile change. Non-Caddy deployments (dev `docker compose up` without `--profile tls`) continue to use the app handler unchanged. diff --git a/cli/lib/pull.py b/cli/lib/pull.py index c2eadc5..5e6445f 100644 --- a/cli/lib/pull.py +++ b/cli/lib/pull.py @@ -178,11 +178,33 @@ def run_pull( result.duration_s = time.monotonic() - started return result - # 4. Download parquets. Lazy mkdir: only create server/parquet/ - # when we have at least one table to write into it. - for tid in to_download: - if not parquet_dir.exists(): - parquet_dir.mkdir(parents=True, exist_ok=True) + # 4. Download parquets in parallel. Lazy mkdir: only create + # server/parquet/ when we have at least one table to write into it. + # Concurrency capped by `AGNES_PULL_PARALLELISM` (default 4) so a + # registry of 50+ tables doesn't open 50+ TCP connections + saturate + # the analyst's NIC; 4 matches typical home-broadband saturation + # without over-subscribing the server's caddy file_server (each + # request is a separate goroutine + sendfile, but the analyst's + # downlink is the more frequent bottleneck). Set to 1 to restore + # the pre-PR serial behavior for debug repro. The server-side + # bypass-uvicorn fix (Caddy file_server) is the other half — + # without it, parallel downloads would still queue on the single + # uvicorn worker. + if to_download and not parquet_dir.exists(): + parquet_dir.mkdir(parents=True, exist_ok=True) + + try: + workers = max(1, int(os.environ.get("AGNES_PULL_PARALLELISM", "4"))) + except ValueError: + workers = 4 + # Drop to serial when there's only one (or zero) tables — avoids + # the executor + thread overhead for the common single-update case. + workers = min(workers, len(to_download)) if to_download else 1 + + def _download_one(tid: str) -> tuple[str, dict | None, str | None]: + """Returns (tid, local_table_entry_or_None, error_or_None). + One bound thread per call; stream_download is sync I/O so a + ThreadPoolExecutor (not asyncio) is the right tool.""" target = parquet_dir / f"{tid}.parquet" expected_hash = server_tables[tid].get("hash", "") try: @@ -197,14 +219,28 @@ def run_pull( elif not _is_valid_parquet(target): target.unlink(missing_ok=True) raise ValueError("not a valid parquet (missing PAR1 magic)") - local_tables[tid] = { + entry = { "hash": expected_hash, "rows": server_tables[tid].get("rows", 0), "size_bytes": server_tables[tid].get("size_bytes", 0), } - result.tables_updated += 1 + return tid, entry, None except Exception as exc: - result.errors.append({"table": tid, "error": str(exc)}) + return tid, None, str(exc) + + if workers <= 1: + outcomes = [_download_one(tid) for tid in to_download] + else: + from concurrent.futures import ThreadPoolExecutor + with ThreadPoolExecutor(max_workers=workers) as ex: + outcomes = list(ex.map(_download_one, to_download)) + + for tid, entry, err in outcomes: + if err is not None: + result.errors.append({"table": tid, "error": err}) + else: + local_tables[tid] = entry + result.tables_updated += 1 # 5. Persist sync state (only on real runs). # TODO(workspace-scoped-sync-state): currently saved to From 30e81a15b99ac918599ddd90ae810e103b902e74 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Tue, 5 May 2026 16:38:32 +0200 Subject: [PATCH 06/12] feat(workspace-prompt): decision tree + size-hint so analyst Claude gets it right first try MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three concrete changes addressing the "analyst Claude misuses the CLI" class of bugs (image.png table — issues #3, #5, plus the recurrent "how big is this table" guesswork): 1. config/claude_md_template.txt — the template agnes init writes to /CLAUDE.md. Surfaces every catalog-row field with a why, adds a query_mode-based decision tree, explicit --estimate scoping (snapshot create ONLY — was the #1 first-try error), an agnes fetch → agnes snapshot create rename note, and a 6-row failure-mode table that maps each common error wording to its right next step. 2. app/api/v2_catalog.py — populate rough_size_hint for local + materialized rows from the on-disk parquet size, bucketed small/medium/large/very_large. Was hardcoded null with a TODO; AI couldn't tell "is this 6.8 GB" without a failed --remote round-trip. 3. cli/update_check.py — the [update] banner survived the da→agnes rename and printed "[update] da X is out of date" on every command, training analysts to associate the binary with the old name. Verified by rendering the template against representative contexts (33/33 tests pass) and running every use case from the original screenshot through the real CLI against a dev VM. --- CHANGELOG.md | 5 +++ app/api/v2_catalog.py | 53 ++++++++++++++++++++++++++- cli/update_check.py | 2 +- config/claude_md_template.txt | 67 +++++++++++++++++++++++++++++------ 4 files changed, 114 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b34d7ae..89f00e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,11 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C - **Per-user parallel parquet downloads in `agnes pull`** — the download loop in `cli/lib/pull.py` now uses a `ThreadPoolExecutor` with concurrency capped by the new `AGNES_PULL_PARALLELISM` env var (default 4, set 1 to restore pre-PR serial behavior). On a registry of N tables the wall-clock time drops from `Σ stream_download_seconds(table_i)` to roughly `max × ceil(N/4)`. Works hand-in-hand with the Caddy `file_server` change below: without it parallel client-side downloads would still queue on the single uvicorn worker; with it each request is its own caddy goroutine + sendfile, so 4-way parallelism actually delivers throughput. Per-table error semantics preserved — a failure on one table no longer aborts the rest of the batch. - **`scripts/ops/agnes-auto-upgrade.sh` now re-fetches Caddyfile + every compose overlay** from `keboola/agnes-the-ai-analyst@main` on every tick, hashes them, and triggers a `docker compose up -d` recreation when the hash changes — same path as an image-digest change. Pre-fix the script only watched `docker images` digests, so a Caddyfile or compose change in main never reached running VMs (only fresh boots ran `startup.sh`'s file fetch). Without this, the new file_server downloads-path below would land in the image but stay inert against an old Caddyfile. The script also self-updates from the same path so the very fix that watches config files isn't itself stuck on running VMs. Fail-soft on curl errors — keeps the existing file rather than blanking it. - **Caddy `file_server` for parquet downloads** — `GET /api/data/{table_id}/download` is now intercepted at the Caddy layer (TLS profile only) and served directly via sendfile/zero-copy from the data volume mounted read-only at `/srv` inside the caddy container. Caddy authorises every request via a new lightweight RBAC probe `GET /api/data/{table_id}/check-access` (returns 204 when the caller has read access on the table, 403 otherwise) using the `forward_auth` directive — the bulk byte transfer never touches uvicorn workers. Resolves a real production failure mode where a single multi-GB analyst pull held the app's only uvicorn worker for the duration of the stream and starved the UI / `/api/health` / every other API endpoint, eventually flipping the container to `unhealthy`. Path discovery uses Caddy's `try_files` over the known `extract.duckdb` v2 source subdirs (`bigquery/data/.parquet`, `keboola/data/.parquet`, `jira/data/.parquet`); a parquet not at any of those paths transparently falls through to the existing app handler so legacy `src_data/parquet` layouts and future connectors keep working with no Caddyfile change. Non-Caddy deployments (dev `docker compose up` without `--profile tls`) continue to use the app handler unchanged. +- **Workspace prompt: decision tree, common-mistakes callout, failure-mode dictionary** in `config/claude_md_template.txt` (the template `agnes init` writes to `/CLAUDE.md`). Surfaces every catalog-row field analyst Claude should read before deciding which command to use (`query_mode`, `sql_flavor`, `where_examples`, `fetch_via`, `rough_size_hint`); explicitly binds `--estimate` to `agnes snapshot create` ONLY (was the most-failed first-try misuse — fails with `No such option: --estimate` on `agnes query`); calls out the `agnes fetch` → `agnes snapshot create` rename so stale-doc analysts don't run a non-command; documents the BQ permission model (server SA, not personal Google identity) and a 6-row failure-mode table mapping each common error wording to its cause + the right next step. +- **`rough_size_hint` populated for `local` + `materialized` catalog rows** in `GET /api/v2/catalog` (was hardcoded `null` with a "Task 8" TODO). Reads the parquet file size at `${DATA_DIR}/extracts//data/.parquet` and buckets into `small` (≤100 MiB), `medium` (≤1 GiB), `large` (≤10 GiB), `very_large` (>10 GiB). `remote` rows stay `null` for now (size requires a BQ INFORMATION_SCHEMA call; tracked separately). Lets analyst Claude pick `agnes snapshot create` over `agnes query --remote` by inspecting `agnes catalog --json` rather than discovering size empirically via a failed `--remote` round-trip. + +### Changed +- **CLI update-banner now says `agnes` instead of `da`** (`cli/update_check.py:format_outdated_notice`). The string `[update] da X is out of date` had survived the `da` → `agnes` CLI rename and was the most-visible stale identifier in the analyst-facing surface — every CLI command printed it on stderr when a newer wheel was available. ### Fixed diff --git a/app/api/v2_catalog.py b/app/api/v2_catalog.py index 426b320..a5b660e 100644 --- a/app/api/v2_catalog.py +++ b/app/api/v2_catalog.py @@ -2,10 +2,12 @@ from __future__ import annotations from datetime import datetime, timezone +from pathlib import Path from fastapi import APIRouter, Depends import duckdb from app.auth.dependencies import get_current_user, _get_db +from app.utils import get_data_dir as _get_data_dir from src.rbac import can_access_table from src.repositories.table_registry import TableRegistryRepository from app.api.v2_cache import TTLCache @@ -43,6 +45,52 @@ def _fetch_hint(table_id: str, source_type: str) -> str: return "already local — query directly via `agnes query`" +# Coarse size buckets for `rough_size_hint`. Boundaries chosen so an analyst +# Claude can decide tool by inspection: anything `large` or worse implies +# `agnes snapshot create` over `agnes query --remote`. Numbers reflect the +# default `bq_max_scan_bytes` 5 GiB ceiling — at "large" you're already at +# half the per-query gate and a naive `--remote` is likely to refuse. +_SIZE_BUCKETS = ( + (10 * 2**20, "small"), # ≤10 MiB + (100 * 2**20, "small"), # ≤100 MiB still small (analyst-laptop scale) + (1 * 2**30, "medium"), # ≤1 GiB + (10 * 2**30, "large"), # ≤10 GiB +) + + +def _bucket_size(byte_count: int) -> str: + for cap, label in _SIZE_BUCKETS: + if byte_count <= cap: + return label + return "very_large" + + +def _materialized_size_hint(table_id: str, source_type: str, query_mode: str) -> str | None: + """Return a rough size bucket for a row whose data is on the server's + local filesystem (any `query_mode` that produces a parquet — `local` and + `materialized`). Returns ``None`` for `remote` (size requires a BQ + INFORMATION_SCHEMA round-trip; tracked separately) and for tables whose + parquet hasn't been materialised yet so the AI gets ``null`` not a + misleading "small". + + Layout matches the v2 extract.duckdb contract: + ${DATA_DIR}/extracts//data/.parquet + """ + if query_mode == "remote": + return None + if not source_type: + return None + try: + path = Path(_get_data_dir()) / "extracts" / source_type / "data" / f"{table_id}.parquet" + if not path.exists(): + return None + return _bucket_size(path.stat().st_size) + except Exception: + # Filesystem stat() race / permissions / weird DATA_DIR — fall back + # to null rather than crash the whole catalog response. + return None + + def build_catalog(conn: duckdb.DuckDBPyConnection, user: dict) -> dict: rows = _table_rows_cache.get(_TABLE_ROWS_KEY) if rows is None: @@ -66,7 +114,10 @@ def build_catalog(conn: duckdb.DuckDBPyConnection, user: dict) -> dict: "sql_flavor": _flavor_for(r.get("source_type") or ""), "where_examples": _examples_for(r.get("source_type") or ""), "fetch_via": _fetch_hint(r["id"], r.get("source_type") or ""), - "rough_size_hint": None, # populated by Task 8 schema endpoint when called + "rough_size_hint": _materialized_size_hint( + r["id"], r.get("source_type") or "", + r.get("query_mode") or "local", + ), }) return { diff --git a/cli/update_check.py b/cli/update_check.py index d278218..90cb2f2 100644 --- a/cli/update_check.py +++ b/cli/update_check.py @@ -184,7 +184,7 @@ def format_outdated_notice(info: UpdateInfo) -> str: literal string "None" into a copy-pasteable command — drop the upgrade snippet in that case. """ - msg = f"[update] da {info.installed} is out of date — latest on this server is {info.latest}." + msg = f"[update] agnes {info.installed} is out of date — latest on this server is {info.latest}." if info.download_url: msg += f" Upgrade: uv tool install --force {info.download_url}" return msg diff --git a/config/claude_md_template.txt b/config/claude_md_template.txt index b3320df..31c78d3 100644 --- a/config/claude_md_template.txt +++ b/config/claude_md_template.txt @@ -58,15 +58,43 @@ Not every table is synced. Tables registered with `query_mode: "remote"` live in BigQuery, accessed server-side via DuckDB's BQ extension — no parquet on disk. Tables you don't see in `server/parquet/` may still be queryable. -### Discovery first +### Discovery first — read `agnes catalog --json` BEFORE every cross-table decision + +`agnes catalog --json` returns one row per table with these fields. Use them; don't guess: + +| Field | What it tells you | How to use it | +|---|---|---| +| `query_mode` | `local` (parquet on laptop) / `remote` (BQ on demand) / `materialized` (synced parquet of a BQ result) | Picks the tool — see decision tree below | +| `source_type` | `keboola` / `bigquery` / `jira` | Determines SQL dialect | +| `sql_flavor` | `duckdb` for local sources, `bigquery` for `--remote` queries on BQ rows | What syntax `--where` expects | +| `where_examples` | 1–3 example WHERE predicates that are valid for this table's dialect | Copy as starting point for `--where` | +| `fetch_via` | Pre-formatted `agnes snapshot create …` template for this table | The canonical "how do I get a slice of this table" command | +| `rough_size_hint` | Coarse size hint (`small` / `medium` / `large` or null when unknown) | Bigger than `medium` → never `agnes query --remote` without a tight `--where`; use `agnes snapshot create` | ``` -agnes catalog --json | jq '.[] | {name, source_type, query_mode}' # see all tables + their modes -agnes schema # columns + types -agnes describe
-n 5 # sample rows +agnes catalog --json # full structured view (use this in scripts) +agnes catalog # human-readable summary +agnes schema
# columns + types (BIGQUERY/DUCKDB dialect printed in header) +agnes describe
-n 5 # sample rows (works on local & materialized only) ``` -For local-mode tables, query directly with `agnes query "SELECT … FROM
"`. +### Decision tree — pick the right tool BEFORE writing SQL + +``` + ┌─ local → agnes query "SELECT ..." +agnes catalog → ─────┤ +query_mode of
├─ materialized → agnes query (parquet was synced by agnes pull) + │ (if missing locally, run `agnes pull` first) + │ + └─ remote → choose by table size + query shape: + - one cheap probe (COUNT, schema-confirm, single agg ≤200s) + → agnes query --remote "..." + - repeated questions on same slice / large scan + → agnes snapshot create
--select ... --where ... --as + then agnes query "SELECT ... FROM " + - join with a local table + → agnes query --register-bq "alias=BQ_SQL" --sql "..." +``` ### Three patterns for `query_mode: "remote"` tables @@ -76,13 +104,30 @@ For local-mode tables, query directly with `agnes query "SELECT … FROM
| **`agnes query --remote`** | one-shot, server-side execution against BigQuery (works for BASE TABLE rows directly + VIEW/MATERIALIZED_VIEW rows via the BQ jobs API; cost-guarded by a 5 GiB scan cap configurable in /admin/server-config) | single aggregate / cheap probe | | **`agnes query --register-bq`** | hybrid joins between local snapshots and ad-hoc BQ subqueries | crossing local + remote | -### Permission model + cost — important +### Common mistakes — avoid on first try -- BQ access goes through the **agnes server's GCE service account**, not your personal Google credentials. If a query fails with a permission error, the table is in a project the server SA cannot read — escalate to admin, do NOT try to authenticate yourself. -- Every BQ query bills the SA's GCP project for **bytes scanned**. A naive `SELECT * FROM ` can cost real money. ALWAYS: - - filter via `--where` on the partition column (typically a date) - - list specific columns in `--select` — column-store BQ skips the rest, cheaper - - run `--estimate` first when unsure of the table size or partitioning +- **`--estimate` is on `agnes snapshot create` ONLY.** Do NOT pass it to `agnes query` — fails with `No such option: --estimate`. The estimate flow is a snapshot-creation cost gate, not a query primitive. +- **Old `agnes fetch` / `da fetch` / `da query` references in stale docs** — the CLI is `agnes`; `agnes fetch` was renamed to `agnes snapshot create`. If you see those names, translate before running. +- **Don't attempt personal GCP auth** if a BQ query fails with permission errors. BQ access uses the **server's service account**, not your Google identity — escalate to admin instead. +- **Don't `agnes query --remote "SELECT * FROM "`** without a `--where`. Even if the scan-byte gate refuses, you've wasted the round-trip; gate yourself first by reading `rough_size_hint` and `where_examples` from `agnes catalog --json`. + +### Failure-mode dictionary — what each error means + the right response + +| Error wording (substring) | Cause | Response | +|---|---|---| +| `Binder Error: Query execution exceeded the timeout. Job ID: ...` | BQ-side query took >~200 s wall-clock; the DuckDB BQ extension's `bq_query_timeout_ms` (default 90 s, server may bump to 600 s) elapsed | Narrow `--where` (especially partition column), drop unused columns from `--select`, or switch to `agnes snapshot create` to materialise once + query locally | +| `HTTP 400: remote_scan_too_large` | Server's `bq_max_scan_bytes` cost gate refused the query (default 5 GiB) | Tighten `--where`; consider `agnes snapshot create` so the cost is paid once, then local queries are free | +| `HTTP 401: ... unauthorized` | PAT expired or wrong | `agnes init --server-url ... --token `; re-mint via the dashboard's "Personal Access Tokens" page | +| `HTTP 403: cross_project_forbidden` (with `serviceusage` mention) | Server SA lacks `serviceusage.services.use` on the BQ data project | Escalate to admin to set `data_source.bigquery.billing_project`; do NOT try personal auth | +| `ReadTimeout` (client-side) on `agnes query --remote` | CLI is older than 0.35.1 (had 30 s default) | `agnes --version`; if <0.35.1, upgrade with `uv tool install --force ` (the URL is in the `[update]` banner that prints on every command). Then retry. | +| `unknown columns: [...]` from `agnes snapshot create` | `--select` lists columns that don't exist | Run `agnes schema
` and copy column names verbatim | + +### Cost discipline — every BQ query bills bytes scanned + +A naive `SELECT * FROM ` can cost real money. ALWAYS: +- filter via `--where` on the partition column (typically a date) — read `where_examples` in `agnes catalog --json` +- list specific columns in `--select` — column-store BQ skips the rest +- run `--estimate` first (only valid on `agnes snapshot create`) when the table is partitioned/clustered or when `rough_size_hint` is unknown ### `agnes snapshot create` discipline From 7a72ea9c3748e1935997fce71d88cc67e3403556 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Tue, 5 May 2026 17:24:42 +0200 Subject: [PATCH 07/12] =?UTF-8?q?fix:=20Devin=20Review=20on=20#188=20?= =?UTF-8?q?=E2=80=94=20try=5Ffiles=20fallback=20+=20auto-upgrade=20orderin?= =?UTF-8?q?g?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugs Devin caught: 1. Caddy `try_files A B C` rewrites the URI to its LAST entry when no file matches (per Caddy docs). Without an explicit "back to original URI" fallback, a parquet missing from all three known static paths would get rewritten to `/jira/data/.parquet`, and the reverse_proxy below would forward THAT rewritten URI to app:8000 → 404. The PR's documented "missed → falls through to app handler" promise didn't actually hold for legacy / future connectors. Append `/api/data//download` as the final try_files entry so the reverse_proxy receives the analyst-facing URI. 2. agnes-auto-upgrade.sh's TLS-overlay decision (which checks Caddyfile existence) ran BEFORE the config re-fetch loop. If a tick's fetch added a previously-missing Caddyfile, this tick's docker compose would still omit `--profile tls` until the next 5-min tick — a window where the recreate uses the wrong overlay set. Move the COMPOSE_FILES tls extension AFTER the fetch. Also strip the workspace prompt of table-list / metric-count enumerations (per user feedback): those are dynamic snapshots that go stale; replace with explicit "use `agnes catalog` / `agnes schema` / `agnes describe` to discover" guidance plus a note about `rough_size_hint` semantics. The Available Datasets `{% for t in tables %}` loop is gone — analysts use the live CLI instead. --- Caddyfile | 15 +++++++++- config/claude_md_template.txt | 29 ++++++++++++------- scripts/ops/agnes-auto-upgrade.sh | 48 ++++++++++++++++++++----------- 3 files changed, 64 insertions(+), 28 deletions(-) diff --git a/Caddyfile b/Caddyfile index 9a39211..d8e33c6 100644 --- a/Caddyfile +++ b/Caddyfile @@ -60,8 +60,21 @@ # agnes data dir is mounted at /srv (read-only) instead — see the # `data:/srv:ro` line in docker-compose.yml's caddy service. The # root + try_files combo therefore probes /srv/extracts/... + # + # Devin Review caught: `try_files A B C` rewrites the URI to its + # LAST entry when no file matches (per Caddy docs). Without an + # explicit "rewrite back to original URI" fallback, a parquet + # missing from all three known paths would get rewritten to the + # last static candidate (`/jira/data/.parquet`), and the + # reverse_proxy below would forward THAT rewritten URI to + # app:8000 → app has no such route → 404. To make the documented + # "missed → falls through to app handler" promise hold, append + # the original `/api/data//download` path as the final + # try_files entry: when no file matches, the URI is rewritten + # back to the analyst-facing path and the app's `download_table` + # handler picks it up via the reverse_proxy fallback below. root * /srv/extracts - try_files /bigquery/data/{re.tid.1}.parquet /keboola/data/{re.tid.1}.parquet /jira/data/{re.tid.1}.parquet + try_files /bigquery/data/{re.tid.1}.parquet /keboola/data/{re.tid.1}.parquet /jira/data/{re.tid.1}.parquet /api/data/{re.tid.1}/download @found file handle @found { header Content-Disposition "attachment; filename=\"{re.tid.1}.parquet\"" diff --git a/config/claude_md_template.txt b/config/claude_md_template.txt index 31c78d3..58c1a64 100644 --- a/config/claude_md_template.txt +++ b/config/claude_md_template.txt @@ -28,22 +28,31 @@ This workspace is connected to {{ server.url }}. - **Personal customizations go in `.claude/CLAUDE.local.md`, NOT here.** This file is regenerated by `agnes init --force`; edits here will be lost. CLAUDE.local.md is preserved across regeneration and uploaded on `agnes push`. ## Metrics Workflow -1. `agnes catalog --metrics` — find the relevant metric ({{ metrics.count }} available, categories: {{ metrics.categories | join(", ") or "none yet" }}) -2. `agnes catalog --metrics --show /` — read SQL and business rules -3. Use the canonical SQL from the metric definition, adapt to the question -4. Never invent metric calculations — always check existing definitions first +1. `agnes catalog --metrics` — list registered metrics + categories +2. `agnes catalog --metrics --show /` — read the canonical SQL + business rules +3. Adapt the canonical SQL; never invent metric calculations ## Data Sync - `agnes pull` — download current data from server - `agnes push` — upload sessions and local notes to server - Data on the server refreshes every {{ sync_interval }} -## Available Datasets -{% for t in tables -%} -- `{{ t.name }}`{% if t.description %} — {{ t.description }}{% endif %}{% if t.query_mode == "remote" %} *(remote, queried on demand)*{% endif %} -{% else -%} -- _No tables registered yet — ask an admin to register tables in the dashboard._ -{% endfor %} +## Discovering tables — never enumerate from memory + +Tables, columns, sizes, and `query_mode` change as admins register / migrate / +drop entries. Always re-discover from the live server, never from this file: + +``` +agnes catalog --json # canonical list with query_mode, sql_flavor, + # where_examples, fetch_via, rough_size_hint per table +agnes schema
# columns + types in the right SQL dialect +agnes describe
-n 5 # sample rows (local + materialized only) +``` + +`rough_size_hint` is server-populated for `local` and `materialized` tables +(`small` ≤100 MiB, `medium` ≤1 GiB, `large` ≤10 GiB, `very_large` >10 GiB) and +`null` for `remote` rows. When `null`, treat the table as potentially large +and use `agnes snapshot create --estimate` to size-check before fetching. {% if marketplaces -%} ## Plugins available to you diff --git a/scripts/ops/agnes-auto-upgrade.sh b/scripts/ops/agnes-auto-upgrade.sh index 2dbd922..54bdf43 100755 --- a/scripts/ops/agnes-auto-upgrade.sh +++ b/scripts/ops/agnes-auto-upgrade.sh @@ -53,25 +53,16 @@ IMAGE="ghcr.io/keboola/agnes-the-ai-analyst:${AGNES_TAG:-stable}" # Array form (vs. word-split string) — quoted expansion survives paths # with spaces and is the modern bash idiom. Functionally identical here # since /opt/agnes paths are tame, but it's a cheap habit to keep. +# +# The TLS-overlay decision deliberately runs BELOW the config re-fetch +# (Devin Review caught: this used to live here, evaluating Caddyfile +# existence against the PRE-fetch state. If the fetch added a +# previously-missing Caddyfile, this tick's docker compose would still +# omit `--profile tls` until the next 5-minute tick — a window where +# the recreate uses the wrong overlay set). Base file list is fine to +# initialise here because the tls overlay is the only conditional one. COMPOSE_FILES=( -f docker-compose.yml -f docker-compose.prod.yml -f docker-compose.host-mount.yml ) PROFILE_ARGS=() -# `-s` (size > 0) instead of `-f` — guards against the corner case where -# rotate.sh wrote a 0-byte cert and exited (or got SIGKILLed mid-write). -# Bringing up the tls profile against an empty cert would just crash -# Caddy on start; better to fall back to plain :8000 until rotate -# regenerates real bytes. Same `-s` rule for Caddyfile: without it (or -# with an empty one) the caddy service crash-loops while the tls overlay -# has already closed :8000 — net effect is "app unreachable". Skipping -# the overlay keeps the app on plain :8000 until config lands. -if [ -s /data/state/certs/fullchain.pem ] && [ -s /data/state/certs/privkey.pem ] && [ -s Caddyfile ]; then - COMPOSE_FILES+=( -f docker-compose.tls.yml ) - PROFILE_ARGS=( --profile tls ) -elif [ -s /data/state/certs/fullchain.pem ] && [ -s /data/state/certs/privkey.pem ]; then - logger -t agnes-auto-upgrade "WARN: certs present but Caddyfile missing/empty — skipping tls overlay" -fi -BEFORE=$(docker images --no-trunc --format '{{.Digest}}' "$IMAGE" | head -1) -docker compose "${COMPOSE_FILES[@]}" pull >/dev/null 2>&1 -AFTER=$(docker images --no-trunc --format '{{.Digest}}' "$IMAGE" | head -1) # Re-fetch the bind-mounted config files (compose overlays + Caddyfile) # from the OSS main branch on every tick. Without this, an image-only @@ -113,6 +104,29 @@ for f in "${CONFIG_FILES[@]}"; do done CONFIG_AFTER=$(hash_config_files) +# `-s` (size > 0) instead of `-f` — guards against the corner case where +# rotate.sh wrote a 0-byte cert and exited (or got SIGKILLed mid-write). +# Bringing up the tls profile against an empty cert would just crash +# Caddy on start; better to fall back to plain :8000 until rotate +# regenerates real bytes. Same `-s` rule for Caddyfile: without it (or +# with an empty one) the caddy service crash-loops while the tls overlay +# has already closed :8000 — net effect is "app unreachable". Skipping +# the overlay keeps the app on plain :8000 until config lands. +# +# Evaluated AFTER the config re-fetch above so a freshly-added or +# freshly-removed Caddyfile is reflected in this tick's compose set, +# not the next one. +if [ -s /data/state/certs/fullchain.pem ] && [ -s /data/state/certs/privkey.pem ] && [ -s Caddyfile ]; then + COMPOSE_FILES+=( -f docker-compose.tls.yml ) + PROFILE_ARGS=( --profile tls ) +elif [ -s /data/state/certs/fullchain.pem ] && [ -s /data/state/certs/privkey.pem ]; then + logger -t agnes-auto-upgrade "WARN: certs present but Caddyfile missing/empty — skipping tls overlay" +fi + +BEFORE=$(docker images --no-trunc --format '{{.Digest}}' "$IMAGE" | head -1) +docker compose "${COMPOSE_FILES[@]}" pull >/dev/null 2>&1 +AFTER=$(docker images --no-trunc --format '{{.Digest}}' "$IMAGE" | head -1) + if [ "$BEFORE" != "$AFTER" ] || [ "$CONFIG_BEFORE" != "$CONFIG_AFTER" ]; then REASON=() [ "$BEFORE" != "$AFTER" ] && REASON+=("image digest") From e5fb913cecaeda02f3a4cbf57bc4c34149983000 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Tue, 5 May 2026 17:44:08 +0200 Subject: [PATCH 08/12] =?UTF-8?q?perf:=20Tier=201=20event-loop=20unblockin?= =?UTF-8?q?g=20=E2=80=94=20async=20def=20=E2=86=92=20def=20on=20BQ-bound?= =?UTF-8?q?=20handlers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five hottest BQ-touching endpoints were `async def` but invoked synchronous DuckDB / BQ-extension calls inside the body. Under uvicorn's single event loop that meant a single heavy `agnes query --remote` (waiting up to ~200 s for BQ's jobs.query) froze EVERY other request — /api/health, dashboard, auth, even another query — for the full BQ wait. Operators saw "VM idle, app frozen" during PR #188's testing. Convert to plain `def` so FastAPI auto-offloads the body to the anyio thread pool. Event loop stays free for non-BQ requests. - app/api/query.py:execute_query - app/api/v2_scan.py:scan_estimate_endpoint, scan_endpoint - app/api/v2_sample.py:sample - app/api/v2_schema.py:schema Audit: 0 `await` statements in any converted handler (verified file-by- file), so the rename is safe. Tests in tests/test_v2_*.py called the handlers via `asyncio.run(...)` which now fails on a non-coroutine return; swapped for direct calls (asyncio.run( -> ( ) — keeps paren balance). Plus AGNES_THREADPOOL_SIZE env var (default 200, was anyio's stock 40) in app/main.py:lifespan. Set via anyio.to_thread.current_default_thread_limiter().total_tokens. 200 is comfortable headroom for <50 concurrent analysts; bump for more. 480/480 impacted tests pass (the 2 remaining errors are a pre-existing fixture setup issue in test_reader_smoke_matrix.py unrelated to this change). --- CHANGELOG.md | 4 ++++ app/api/query.py | 15 +++++++++++++-- app/api/v2_sample.py | 4 +++- app/api/v2_scan.py | 9 +++++++-- app/api/v2_schema.py | 4 +++- app/main.py | 17 +++++++++++++++++ tests/test_v2_sample.py | 8 ++++---- tests/test_v2_scan.py | 6 +++--- tests/test_v2_scan_estimate.py | 6 +++--- tests/test_v2_schema.py | 10 +++++----- 10 files changed, 62 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 89f00e7..881724a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +### Changed +- **Tier 1 event-loop unblocking** — the five hottest BQ-touching endpoints (`POST /api/query`, `POST /api/v2/scan`, `POST /api/v2/scan/estimate`, `GET /api/v2/sample/{id}`, `GET /api/v2/schema/{id}`) were declared `async def` but invoked synchronous DuckDB / BQ-extension calls inside the body. Under uvicorn's single event loop that meant a single heavy `agnes query --remote` (waiting up to ~200 s for BQ's `jobs.query` to return) **froze every other request** — `/api/health`, the dashboard, auth, even another query — for the full duration of the BQ wait. Operators saw "VM idle, app frozen" symptoms during this work. Converted all five to plain `def` so FastAPI auto-offloads the blocking body to the anyio thread pool; the event loop stays free for non-BQ requests. Verified via 0-await audit (no `await` statements in the converted handlers, so the rename is safe). Tests: `tests/test_v2_*.py` were rewritten to call the handlers directly instead of `asyncio.run(...)` (which now fails on a non-coroutine return). Pairs with the thread-pool capacity bump below. +- **`AGNES_THREADPOOL_SIZE` env var** (default 200, was anyio's stock 40) controls the FastAPI / Starlette thread pool capacity used by every plain-`def` route handler. Set in `app/main.py:lifespan` via `anyio.to_thread.current_default_thread_limiter().total_tokens`. 200 leaves comfortable headroom over the BQ extension's connection budget while keeping the per-process thread cost bounded — for the workload of <50 concurrent analysts this is well over what's needed; bump for higher concurrency. + ### Added - **`data_source.bigquery.query_timeout_ms` config knob** (default 600 000 ms = 10 min). The DuckDB BigQuery extension's built-in default of 90 s was too tight for analyst-scale queries against view-backed BQ datasets — `agnes query --remote` would HTTP 400 with `Binder Error: Query execution exceeded the timeout. Job ID: …` whenever the underlying BQ job took longer than 90 s, even though the BQ job itself was healthy. The new knob is applied via `SET bq_query_timeout_ms` after every `LOAD bigquery` on every BQ-touching DuckDB session — the orchestrator's `_remote_attach` ATTACH path (`src/orchestrator.py`), the analytics-DB read-only reattach path (`src/db.py:_reattach_remote_extensions` — the primary `agnes query --remote` request path), the `BqAccess` session factory (`connectors/bigquery/access.py`), and the standalone extractor (`connectors/bigquery/extractor.py`). Sentinel `0` (or non-numeric / unparseable values) leaves the extension default in place so operators on legacy extension versions that don't recognise the setting aren't broken. Configurable via `/admin/server-config` UI. Note: BigQuery's `jobs.query` RPC caps the wait at ~200 s per call regardless of this setting; the extension polls on top so the effective ceiling is the value here but each poll is ~200 s. DuckDB emits an informational warning when the value is set above the BQ RPC cap — operators can safely ignore it. - **Per-user parallel parquet downloads in `agnes pull`** — the download loop in `cli/lib/pull.py` now uses a `ThreadPoolExecutor` with concurrency capped by the new `AGNES_PULL_PARALLELISM` env var (default 4, set 1 to restore pre-PR serial behavior). On a registry of N tables the wall-clock time drops from `Σ stream_download_seconds(table_i)` to roughly `max × ceil(N/4)`. Works hand-in-hand with the Caddy `file_server` change below: without it parallel client-side downloads would still queue on the single uvicorn worker; with it each request is its own caddy goroutine + sendfile, so 4-way parallelism actually delivers throughput. Per-table error semantics preserved — a failure on one table no longer aborts the rest of the batch. diff --git a/app/api/query.py b/app/api/query.py index df6cc50..a758dc4 100644 --- a/app/api/query.py +++ b/app/api/query.py @@ -71,12 +71,23 @@ class QueryResponse(BaseModel): @router.post("", response_model=QueryResponse) -async def execute_query( +def execute_query( request: QueryRequest, user: dict = Depends(get_current_user), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): - """Execute SQL against the server analytics DuckDB.""" + """Execute SQL against the server analytics DuckDB. + + Plain ``def`` (not ``async def``) so FastAPI auto-offloads the call + to the anyio thread pool. The body invokes ``analytics.execute(sql)`` + synchronously, which blocks for the full BQ jobs.query wait when a + referenced view resolves through the BQ extension. Under ``async def`` + that block holds the single uvicorn event loop, freezing every other + request (UI, /api/health, auth) until the query returns. Plain ``def`` + runs each invocation on its own thread, so heavy queries no longer + starve unrelated endpoints. See PR #188's CHANGELOG entry for the + Tier 1 event-loop unblocking rollout. + """ sql_lower = request.sql.strip().lower() # Block everything except SELECT diff --git a/app/api/v2_sample.py b/app/api/v2_sample.py index c30f468..9a5ffab 100644 --- a/app/api/v2_sample.py +++ b/app/api/v2_sample.py @@ -104,13 +104,15 @@ def build_sample( @router.get("/sample/{table_id}") -async def sample( +def sample( table_id: str, n: int = Query(default=5, ge=1, le=_MAX_N), user: dict = Depends(get_current_user), conn: duckdb.DuckDBPyConnection = Depends(_get_db), bq: BqAccess = Depends(get_bq_access), ): + # Plain ``def`` — opens a `bq.duckdb_session()` and runs sync queries + # through the BQ extension. See PR #188 Tier 1 entry. try: return build_sample(conn, user, table_id, n=n, bq=bq) except FileNotFoundError: diff --git a/app/api/v2_scan.py b/app/api/v2_scan.py index ff80b25..a012830 100644 --- a/app/api/v2_scan.py +++ b/app/api/v2_scan.py @@ -218,12 +218,17 @@ def _avg_bytes_for_type(t: str) -> int: @router.post("/scan/estimate") -async def scan_estimate_endpoint( +def scan_estimate_endpoint( raw: dict, user: dict = Depends(get_current_user), conn: duckdb.DuckDBPyConnection = Depends(_get_db), bq: BqAccess = Depends(get_bq_access), ): + # Plain ``def`` so FastAPI auto-offloads to the anyio thread pool — the + # estimate path calls into google-cloud-bigquery's `client.query(..., + # dry_run=True)` which blocks until BQ returns the dry-run cost. Under + # ``async def`` that wait holds the event loop. See PR #188's Tier 1 + # entry for the wider rollout. try: return estimate(conn, user, raw, bq=bq) except WhereValidationError as e: @@ -374,7 +379,7 @@ def run_scan( @router.post("/scan") -async def scan_endpoint( +def scan_endpoint( raw: dict, user: dict = Depends(get_current_user), conn: duckdb.DuckDBPyConnection = Depends(_get_db), diff --git a/app/api/v2_schema.py b/app/api/v2_schema.py index 3194707..a53c2a4 100644 --- a/app/api/v2_schema.py +++ b/app/api/v2_schema.py @@ -209,12 +209,14 @@ def build_schema( @router.get("/schema/{table_id}") -async def schema( +def schema( table_id: str, user: dict = Depends(get_current_user), conn: duckdb.DuckDBPyConnection = Depends(_get_db), bq: BqAccess = Depends(get_bq_access), ): + # Plain ``def`` — opens a `bq.duckdb_session()` and runs sync metadata + # queries through the BQ extension. See PR #188 Tier 1 entry. try: return build_schema(conn, user, table_id, bq=bq) except NotFound: diff --git a/app/main.py b/app/main.py index 612c0ea..371235f 100644 --- a/app/main.py +++ b/app/main.py @@ -141,6 +141,23 @@ async def lifespan(app): log_effective_policy() except Exception: pass # never block startup on a logging convenience + + # Bump anyio's default thread pool size from 40 → AGNES_THREADPOOL_SIZE + # (default 200). FastAPI auto-runs every plain `def` route handler in + # this pool — the Tier 1 endpoints converted in PR #188 (`/api/query`, + # `/api/v2/scan`, `/api/v2/sample`, `/api/v2/schema`) all block on + # synchronous DuckDB / BQ-extension calls inside the handler body and + # would otherwise serialise once 40 are in flight. 200 keeps the per- + # process working set well under the BQ extension's connection cap + # while leaving headroom for concurrent UI / health probes. + try: + import anyio.to_thread + size = int(os.environ.get("AGNES_THREADPOOL_SIZE", "200")) + anyio.to_thread.current_default_thread_limiter().total_tokens = size + logger.info("anyio thread pool capacity set to %d", size) + except Exception as e: + logger.warning("failed to bump anyio thread pool capacity: %s", e) + yield from src.db import close_system_db close_system_db() diff --git a/tests/test_v2_sample.py b/tests/test_v2_sample.py index aa9e0d7..07f89f7 100644 --- a/tests/test_v2_sample.py +++ b/tests/test_v2_sample.py @@ -152,7 +152,7 @@ class TestBqAccessErrors: # Endpoint is async — drive it directly. dependency_overrides only # fires through TestClient/HTTP, so pass `bq=bq` explicitly. with pytest.raises(HTTPException) as exc_info: - asyncio.run(v2_sample.sample( + (v2_sample.sample( table_id="bq_view", n=5, user=user, conn=conn, bq=bq, )) finally: @@ -182,7 +182,7 @@ class TestBqAccessErrors: user = {"id": "admin1", "email": "a@x.com"} with pytest.raises(HTTPException) as exc_info: - asyncio.run(v2_sample.sample( + (v2_sample.sample( table_id="bq_view", n=5, user=user, conn=conn, bq=bq, )) finally: @@ -211,7 +211,7 @@ class TestBqAccessErrors: user = {"id": "admin1", "email": "a@x.com"} with pytest.raises(HTTPException) as exc_info: - asyncio.run(v2_sample.sample( + (v2_sample.sample( table_id="bq_view", n=5, user=user, conn=conn, bq=bq, )) finally: @@ -248,7 +248,7 @@ class TestBqAccessErrors: try: _seed(conn) user = {"id": "admin1", "email": "a@x.com"} - asyncio.run(v2_sample.sample( + (v2_sample.sample( table_id="bq_view", n=5, user=user, conn=conn, bq=bq, )) finally: diff --git a/tests/test_v2_scan.py b/tests/test_v2_scan.py index 7dabe68..289ae02 100644 --- a/tests/test_v2_scan.py +++ b/tests/test_v2_scan.py @@ -298,7 +298,7 @@ class TestBqAccessErrors: lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"}, ): with pytest.raises(HTTPException) as exc_info: - asyncio.run( + ( v2_scan.scan_endpoint(raw=req, user=user, conn=conn, bq=bq) ) finally: @@ -337,7 +337,7 @@ class TestBqAccessErrors: lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"}, ): with pytest.raises(HTTPException) as exc_info: - asyncio.run( + ( v2_scan.scan_endpoint(raw=req, user=user, conn=conn, bq=bq) ) finally: @@ -372,7 +372,7 @@ class TestBqAccessErrors: lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"}, ): with pytest.raises(HTTPException) as exc_info: - asyncio.run( + ( v2_scan.scan_endpoint(raw=req, user=user, conn=conn, bq=bq) ) finally: diff --git a/tests/test_v2_scan_estimate.py b/tests/test_v2_scan_estimate.py index c451490..c2ab56b 100644 --- a/tests/test_v2_scan_estimate.py +++ b/tests/test_v2_scan_estimate.py @@ -117,7 +117,7 @@ class TestBqAccessErrors: lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"}, ): with pytest.raises(HTTPException) as exc_info: - asyncio.run( + ( v2_scan.scan_estimate_endpoint(raw=req, user=user, conn=conn, bq=bq) ) finally: @@ -156,7 +156,7 @@ class TestBqAccessErrors: lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"}, ): with pytest.raises(HTTPException) as exc_info: - asyncio.run( + ( v2_scan.scan_estimate_endpoint(raw=req, user=user, conn=conn, bq=bq) ) finally: @@ -191,7 +191,7 @@ class TestBqAccessErrors: lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"}, ): with pytest.raises(HTTPException) as exc_info: - asyncio.run( + ( v2_scan.scan_estimate_endpoint(raw=req, user=user, conn=conn, bq=bq) ) finally: diff --git a/tests/test_v2_schema.py b/tests/test_v2_schema.py index 8ae15c4..f2bfe09 100644 --- a/tests/test_v2_schema.py +++ b/tests/test_v2_schema.py @@ -161,7 +161,7 @@ class TestBqAccessErrors: # Endpoint is async — drive it directly. dependency_overrides only # fires through TestClient/HTTP, so pass `bq=bq` explicitly. with pytest.raises(HTTPException) as exc_info: - asyncio.run(v2_schema.schema( + (v2_schema.schema( table_id="bq_view", user=user, conn=conn, bq=bq, )) finally: @@ -191,7 +191,7 @@ class TestBqAccessErrors: _seed_bq_table(conn) user = {"id": "admin1", "email": "a@x.com"} with pytest.raises(HTTPException) as exc_info: - asyncio.run(v2_schema.schema( + (v2_schema.schema( table_id="bq_view", user=user, conn=conn, bq=bq, )) finally: @@ -217,7 +217,7 @@ class TestBqAccessErrors: _seed_bq_table(conn) user = {"id": "admin1", "email": "a@x.com"} with pytest.raises(HTTPException) as exc_info: - asyncio.run(v2_schema.schema( + (v2_schema.schema( table_id="bq_view", user=user, conn=conn, bq=bq, )) finally: @@ -259,7 +259,7 @@ class TestBqAccessErrors: try: _seed_bq_table(conn) user = {"id": "admin1", "email": "a@x.com"} - data = asyncio.run(v2_schema.schema( + data = (v2_schema.schema( table_id="bq_view", user=user, conn=conn, bq=_bq(), )) finally: @@ -322,7 +322,7 @@ class TestBqAccessErrors: try: _seed_bq_table(conn) user = {"id": "admin1", "email": "a@x.com"} - asyncio.run(v2_schema.schema( + (v2_schema.schema( table_id="bq_view", user=user, conn=conn, bq=bq, )) finally: From 28423907fde8314af0c9c161680bceeb268be9e0 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Tue, 5 May 2026 18:11:59 +0200 Subject: [PATCH 09/12] feat: clean CLI errors + init progress + skip-materialize + claude.md catalog pointer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three first-try-failure-surface fixes from Pavel's #185 trace + the template guidance question, all under PR #188's umbrella so they land together with the file_server / parallel pull / Tier 1 work. 1. CLI clean-error wrapper — new AgnesTransportError raised by the api_*/stream_download helpers when httpx times out / drops / refuses, plus a top-level Typer wrapper (cli/main.py) that prints one-line "Error: …" + actionable hint and exits non-zero. Full traceback goes to ~/.config/agnes/last-error.log for support forwarding. Unhandled Exceptions are caught at the same boundary so no Python traceback ever leaks to the analyst's terminal. Pavel's #185 Phase 3B: a 30-frame httpx traceback from a slow BQ --remote query made it look like a CLI bug. Now: clean message + hint pointing at `agnes snapshot create` / partition-column guidance. Entry point in pyproject.toml flipped from `cli.main:app` → `cli.main:_run_with_clean_errors` so the wrapper actually runs under the installed `agnes` binary. 2. agnes init / agnes pull --skip-materialize + progress bar. --skip-materialize omits query_mode='materialized' rows from the download set so a first init doesn't spend 44 minutes silently pulling a single 6 GB parquet (Pavel's #185 Phase 1). Rich-driven per-file progress bar with label/bytes/rate/ETA renders to stderr when not --quiet and not --json. Aggregates across the parallel ThreadPoolExecutor workers added earlier in this PR. 3. config/claude_md_template.txt: explicit one-line snippet pointing at `agnes catalog --json | jq '.tables[] | select(.id=="")'` for per-table descriptions + restated invariant: "the description field on each catalog row is the authoritative business-rules text — re-read live, never copy into this file." Resolves the regression-or-feature debate between Pavel (wants annotations) and the user feedback that landed in the prior commit (don't embed table-specific content; tables change). Catalog command stays the source of truth. --- CHANGELOG.md | 5 ++ cli/client.py | 139 +++++++++++++++++++++++++++++++--- cli/commands/init.py | 20 ++++- cli/commands/pull.py | 20 ++++- cli/lib/pull.py | 76 +++++++++++++++++-- cli/main.py | 38 +++++++++- config/claude_md_template.txt | 22 ++++-- pyproject.toml | 2 +- 8 files changed, 294 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 881724a..026573c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,11 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +### Added +- **`agnes init` / `agnes pull --skip-materialize`** — opts the first sync out of materialized-mode tables (server-side scheduled-query parquets, often multi-GB). Pavel's #185 Phase 1: a single 6.3 GB `order_economics` parquet kept first init silent for 44 minutes. Materialized rows stay discoverable via `agnes catalog`; rerun without the flag once the analyst actually needs them locally. +- **`agnes pull` progress bar** — Rich-driven aggregate transfer display rendered to stderr when not `--quiet` and not `--json`. Per-file label + bytes / total / rate / ETA, aggregated across the parallel `ThreadPoolExecutor` workers introduced earlier in this PR. Replaces the prior 0-stdout silence on first init. +- **CLI clean-error wrapper** (`cli/main.py:_run_with_clean_errors`, new entry point in `pyproject.toml`) — `httpx.ReadTimeout` / `ConnectError` / `RemoteProtocolError` etc. used to dump a five-frame Python traceback to the analyst's terminal when a `agnes query --remote` against a slow BQ view timed out client-side. Now: one-line `Error: …` message + actionable hint (e.g. "narrow the WHERE on the partition column from `agnes catalog --json`, or run `agnes snapshot create --estimate`"), exit code 1. Full traceback is appended to `~/.config/agnes/last-error.log` so an operator can recover it for support without spamming the analyst's terminal. Implemented as `AgnesTransportError` raised from the `api_get` / `api_post` / `api_delete` / `api_patch` / `stream_download` helpers in `cli/client.py`; the top-level Typer wrapper renders it. Unhandled `Exception`s are caught at the same boundary, logged, and printed as "internal CLI error (see logfile)" so a Python traceback never leaks to the analyst. + ### Changed - **Tier 1 event-loop unblocking** — the five hottest BQ-touching endpoints (`POST /api/query`, `POST /api/v2/scan`, `POST /api/v2/scan/estimate`, `GET /api/v2/sample/{id}`, `GET /api/v2/schema/{id}`) were declared `async def` but invoked synchronous DuckDB / BQ-extension calls inside the body. Under uvicorn's single event loop that meant a single heavy `agnes query --remote` (waiting up to ~200 s for BQ's `jobs.query` to return) **froze every other request** — `/api/health`, the dashboard, auth, even another query — for the full duration of the BQ wait. Operators saw "VM idle, app frozen" symptoms during this work. Converted all five to plain `def` so FastAPI auto-offloads the blocking body to the anyio thread pool; the event loop stays free for non-BQ requests. Verified via 0-await audit (no `await` statements in the converted handlers, so the rename is safe). Tests: `tests/test_v2_*.py` were rewritten to call the handlers directly instead of `asyncio.run(...)` (which now fails on a non-coroutine return). Pairs with the thread-pool capacity bump below. - **`AGNES_THREADPOOL_SIZE` env var** (default 200, was anyio's stock 40) controls the FastAPI / Starlette thread pool capacity used by every plain-`def` route handler. Set in `app/main.py:lifespan` via `anyio.to_thread.current_default_thread_limiter().total_tokens`. 200 leaves comfortable headroom over the BQ extension's connection budget while keeping the per-process thread cost bounded — for the workload of <50 concurrent analysts this is well over what's needed; bump for higher concurrency. diff --git a/cli/client.py b/cli/client.py index 1efdd7d..e1861d8 100644 --- a/cli/client.py +++ b/cli/client.py @@ -2,12 +2,14 @@ import os import time +import traceback +from datetime import datetime, timezone from pathlib import Path from typing import Optional import httpx -from cli.config import get_server_url, get_token +from cli.config import _config_dir, get_server_url, get_token # Retry policy for transient failures during stream downloads. Scoped to # network issues and 5xx — 4xx (auth, 404, 400) is NOT retried. Tunable via @@ -21,6 +23,105 @@ _RETRY_BACKOFFS_S = (0.3, 1.0, 3.0) # seconds before attempt 2, 3, 4 QUERY_TIMEOUT_S = float(os.environ.get("AGNES_QUERY_TIMEOUT", "300")) +# ── Transport-error translation ───────────────────────────────────────── +# Pavel's Issue #185 Phase 3B caught the failure mode: when httpx raises +# `ReadTimeout` / `ConnectError` / `RemoteProtocolError` and the CLI +# command doesn't catch it, Typer dumps a five-frame Python traceback to +# the analyst's terminal. That looks like a CLI bug to a non-Python user +# and obscures the actionable signal ("server slow, try snapshot create"). +# Translate transport exceptions to `AgnesTransportError` with a typed +# user-facing message, log the full traceback to `~/.config/agnes/last- +# error.log` for debug, and let the top-level CLI handler render the +# clean message + exit non-zero. + +_LOG_FILE = _config_dir() / "last-error.log" + + +class AgnesTransportError(Exception): + """Network / transport failure with a user-actionable message. + + Raised by the api_* / stream_download helpers when httpx surfaces a + connection / timeout / protocol error. The CLI's top-level Typer + handler catches this, prints `.user_message` (NOT the traceback), + and exits non-zero. Full traceback goes to ``~/.config/agnes/last- + error.log`` so an operator can recover it for support. + """ + + def __init__(self, user_message: str, *, hint: str = "", logfile_path: Path | None = None): + super().__init__(user_message) + self.user_message = user_message + self.hint = hint + self.logfile_path = logfile_path + + +def _log_traceback(exc: BaseException, *, context: str) -> Path: + """Append a timestamped traceback to ``~/.config/agnes/last-error.log`` + and return the path. Best-effort — never raises (a logging failure + must not mask the original error).""" + try: + with open(_LOG_FILE, "a", encoding="utf-8") as f: + ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + f.write(f"\n=== {ts} {context} ===\n") + traceback.print_exception(type(exc), exc, exc.__traceback__, file=f) + except Exception: + pass + return _LOG_FILE + + +def _translate_transport_error(exc: Exception, *, context: str) -> AgnesTransportError: + """Map httpx transport exceptions to user-facing CLI messages. The + mapping is intentionally pragmatic — analysts care about "what do I + do next", not the gRPC / TCP detail.""" + log = _log_traceback(exc, context=context) + if isinstance(exc, httpx.ReadTimeout): + return AgnesTransportError( + f"Server didn't respond within the read timeout ({QUERY_TIMEOUT_S:.0f}s) " + f"for {context}.", + hint=( + "If this is `agnes query --remote` against a heavy BQ view, " + "the underlying BQ job took longer than the wait window. Try:\n" + " • narrow the WHERE (especially the partition column from `agnes catalog --json`)\n" + " • `agnes snapshot create
... --estimate` to materialize once + query locally\n" + " • set AGNES_QUERY_TIMEOUT=600 for a longer client-side wait\n" + f"Full traceback: {log}" + ), + logfile_path=log, + ) + if isinstance(exc, httpx.ConnectError): + return AgnesTransportError( + f"Can't reach the agnes server for {context}.", + hint=( + "Check the server URL with `agnes status`, network reachability " + "(VPN / DNS / firewall), and the TLS-trust setup if this is a " + f"corporate-CA deployment.\nFull traceback: {log}" + ), + logfile_path=log, + ) + if isinstance(exc, (httpx.RemoteProtocolError, httpx.ReadError, httpx.WriteError)): + return AgnesTransportError( + f"Connection broke mid-flight on {context}.", + hint=( + "Usually a transient network blip. Re-run the command. If it " + f"keeps happening, check `agnes status`.\nFull traceback: {log}" + ), + logfile_path=log, + ) + if isinstance(exc, httpx.TimeoutException): + return AgnesTransportError( + f"Network timeout on {context}.", + hint=f"Re-run; if persistent, check the server.\nFull traceback: {log}", + logfile_path=log, + ) + # Anything else: re-wrap with a generic message so the CLI doesn't + # dump the traceback. We'd prefer a typed translation; if you hit + # this branch, add a clause above. + return AgnesTransportError( + f"Unexpected error on {context}: {type(exc).__name__}.", + hint=f"Full traceback: {log}", + logfile_path=log, + ) + + def get_client(timeout: float = 30.0) -> httpx.Client: """Get an authenticated httpx client.""" token = get_token() @@ -35,23 +136,35 @@ def get_client(timeout: float = 30.0) -> httpx.Client: def api_get(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: - with get_client(timeout=timeout) as client: - return client.get(path, **kwargs) + try: + with get_client(timeout=timeout) as client: + return client.get(path, **kwargs) + except httpx.HTTPError as exc: + raise _translate_transport_error(exc, context=f"GET {path}") from exc def api_post(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: - with get_client(timeout=timeout) as client: - return client.post(path, **kwargs) + try: + with get_client(timeout=timeout) as client: + return client.post(path, **kwargs) + except httpx.HTTPError as exc: + raise _translate_transport_error(exc, context=f"POST {path}") from exc def api_delete(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: - with get_client(timeout=timeout) as client: - return client.delete(path, **kwargs) + try: + with get_client(timeout=timeout) as client: + return client.delete(path, **kwargs) + except httpx.HTTPError as exc: + raise _translate_transport_error(exc, context=f"DELETE {path}") from exc def api_patch(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: - with get_client(timeout=timeout) as client: - return client.patch(path, **kwargs) + try: + with get_client(timeout=timeout) as client: + return client.patch(path, **kwargs) + except httpx.HTTPError as exc: + raise _translate_transport_error(exc, context=f"PATCH {path}") from exc def _is_transient(exc: Exception) -> bool: @@ -98,7 +211,13 @@ def stream_download(path: str, target_path: str, progress_callback=None) -> int: if attempt == _RETRY_ATTEMPTS or not _is_transient(exc): break time.sleep(_RETRY_BACKOFFS_S[min(attempt, len(_RETRY_BACKOFFS_S) - 1)]) - # Clean up any leftover tmp, then surface the last exception. + # Clean up any leftover tmp, then surface the last exception. Translate + # transport errors to AgnesTransportError so the CLI prints a clean + # message instead of a Python traceback (Pavel's #185 Phase 3B). tmp_path.unlink(missing_ok=True) assert last_exc is not None + if isinstance(last_exc, httpx.HTTPError): + raise _translate_transport_error( + last_exc, context=f"GET {path} (stream → {target_path})" + ) from last_exc raise last_exc diff --git a/cli/commands/init.py b/cli/commands/init.py index 58bbb10..61d539c 100644 --- a/cli/commands/init.py +++ b/cli/commands/init.py @@ -64,6 +64,16 @@ def init( token: str = typer.Option(..., "--token", help="Personal access token"), force: bool = typer.Option(False, "--force", help="Re-initialize an existing workspace"), workspace_str: Optional[str] = typer.Option(None, "--workspace", help="Target dir (default: cwd)"), + skip_materialize: bool = typer.Option( + False, "--skip-materialize", + help=( + "Skip materialized-mode tables on the first pull. The first " + "init can otherwise spend tens of minutes silently downloading " + "a single multi-GB scheduled-query parquet. Materialized rows " + "are still discoverable via `agnes catalog`; rerun `agnes pull` " + "without this flag once you actually need them locally." + ), + ), ): """Bootstrap workspace: auth, CLAUDE.md, hooks, first pull, AGNES_WORKSPACE.md.""" workspace = Path(workspace_str).resolve() if workspace_str else Path.cwd() @@ -176,7 +186,15 @@ def init( # exception escaping here is a programming error worth surfacing. # ------------------------------------------------------------------ try: - result: PullResult = run_pull(server_url, token, workspace) + # `agnes init` always runs interactively (analyst typing the + # command), so progress is on by default — Pavel's #185 Phase 1 + # was a 44-minute silent download on the very first install. + # Pass it through to run_pull. + result: PullResult = run_pull( + server_url, token, workspace, + skip_materialize=skip_materialize, + show_progress=True, + ) except Exception as exc: typer.echo(render_error(0, {"detail": { "kind": "manifest_unauthorized", diff --git a/cli/commands/pull.py b/cli/commands/pull.py index 6f71a63..48924f1 100644 --- a/cli/commands/pull.py +++ b/cli/commands/pull.py @@ -38,6 +38,15 @@ def pull( quiet: bool = typer.Option(False, "--quiet", help="Suppress success stdout (errors still surface on stderr)"), as_json: bool = typer.Option(False, "--json", help="Emit a single JSON object summarizing the pull"), dry_run: bool = typer.Option(False, "--dry-run", help="Compute the delta without writing anything to disk"), + skip_materialize: bool = typer.Option( + False, "--skip-materialize", + help=( + "Skip materialized-mode tables (server-side scheduled BQ " + "scan results, often multi-GB). Their data is still discoverable " + "via `agnes catalog` and remote-mode tables still pull. Useful " + "for a fast first init when an analyst only needs --remote access." + ), + ), ): """Refresh data from the server into ./server/parquet + ./user/duckdb.""" server_url = get_server_url() @@ -68,8 +77,17 @@ def pull( workspace = Path(os.environ.get("AGNES_LOCAL_DIR", ".")).resolve() + # Show progress unless quiet (SessionStart hooks) or json (machine- + # readable output where Rich's terminal-control sequences would be + # garbage in the consumer's parser). + show_progress = not (quiet or as_json) try: - result: PullResult = run_pull(server_url, token, workspace, dry_run=dry_run) + result: PullResult = run_pull( + server_url, token, workspace, + dry_run=dry_run, + skip_materialize=skip_materialize, + show_progress=show_progress, + ) except Exception as exc: # `run_pull` is documented to record per-table / per-stage failures # under `result.errors` rather than raising, so reaching this branch diff --git a/cli/lib/pull.py b/cli/lib/pull.py index 5e6445f..b33c4b3 100644 --- a/cli/lib/pull.py +++ b/cli/lib/pull.py @@ -112,6 +112,8 @@ def run_pull( workspace: Path, *, dry_run: bool = False, + skip_materialize: bool = False, + show_progress: bool = False, ) -> PullResult: """Refresh local parquets + corporate memory rules from the server. @@ -119,6 +121,17 @@ def run_pull( Typer/Rich UI. Returns a `PullResult` summary; never raises for network/server errors (records them under `errors` instead) so the caller can decide whether a partial pull is fatal. + + Args: + skip_materialize: When True, omit `query_mode='materialized'` + tables from the download set. Use for analysts who only + care about `--remote` access on the workspace and don't + want to wait on multi-GB scheduled-query parquets at first + init. Pavel's #185 Phase 1: a 6.3 GB `order_economics` + parquet kept first init silent for 44 minutes. + show_progress: When True, render a per-file progress bar to + stderr via Rich during the parallel download phase. Pass + False from `--quiet` callers (SessionStart hooks). """ started = time.monotonic() result = PullResult() @@ -159,6 +172,11 @@ def run_pull( for tid, info in server_tables.items(): if info.get("query_mode") == "remote": continue + if skip_materialize and info.get("query_mode") == "materialized": + # Operator opt-out for first-init. Materialized rows are + # still discoverable via `agnes catalog` and queryable + # the next time `agnes pull` runs without --skip-materialize. + continue non_remote_total += 1 local_hash = local_tables.get(tid, {}).get("hash", "") server_hash = info.get("hash", "") @@ -201,14 +219,52 @@ def run_pull( # the executor + thread overhead for the common single-update case. workers = min(workers, len(to_download)) if to_download else 1 + # Optional progress bar — Rich's Progress tracks per-file bytes + # streamed, aggregated across the parallel ThreadPoolExecutor + # workers. Pavel's #185 Phase 1: a single 6.3 GB parquet on first + # init went 44 minutes silent, looked frozen. Now: aggregate "X.Y + # GB / Z.A GB · 56 MB/s · ETA 1m 20s" to stderr while threads + # stream. None when show_progress=False (SessionStart hooks etc.). + progress = None + progress_tasks: dict[str, int] = {} + if show_progress and to_download: + from rich.progress import ( + Progress, BarColumn, DownloadColumn, TextColumn, + TimeRemainingColumn, TransferSpeedColumn, + ) + progress = Progress( + TextColumn("[bold]{task.fields[label]}[/]"), + BarColumn(), + DownloadColumn(), + TransferSpeedColumn(), + TimeRemainingColumn(), + transient=False, + ) + progress.start() + for tid in to_download: + size = int(server_tables[tid].get("size_bytes") or 0) + # Some manifest entries don't carry size — Rich shows + # an indeterminate bar in that case. + progress_tasks[tid] = progress.add_task( + "download", label=tid, total=size if size > 0 else None, + ) + def _download_one(tid: str) -> tuple[str, dict | None, str | None]: """Returns (tid, local_table_entry_or_None, error_or_None). One bound thread per call; stream_download is sync I/O so a - ThreadPoolExecutor (not asyncio) is the right tool.""" + ThreadPoolExecutor (not asyncio) is the right tool. The + progress callback is thread-safe — Rich's Progress.update + holds an internal lock.""" target = parquet_dir / f"{tid}.parquet" expected_hash = server_tables[tid].get("hash", "") + cb = None + if progress is not None and tid in progress_tasks: + task_id = progress_tasks[tid] + def cb(n: int, _tid=tid, _task=task_id): + progress.update(_task, advance=n) try: - stream_download(f"/api/data/{tid}/download", str(target)) + stream_download(f"/api/data/{tid}/download", str(target), + progress_callback=cb) if expected_hash: actual_hash = _file_md5(target) if actual_hash != expected_hash: @@ -228,12 +284,16 @@ def run_pull( except Exception as exc: return tid, None, str(exc) - if workers <= 1: - outcomes = [_download_one(tid) for tid in to_download] - else: - from concurrent.futures import ThreadPoolExecutor - with ThreadPoolExecutor(max_workers=workers) as ex: - outcomes = list(ex.map(_download_one, to_download)) + try: + if workers <= 1: + outcomes = [_download_one(tid) for tid in to_download] + else: + from concurrent.futures import ThreadPoolExecutor + with ThreadPoolExecutor(max_workers=workers) as ex: + outcomes = list(ex.map(_download_one, to_download)) + finally: + if progress is not None: + progress.stop() for tid, entry, err in outcomes: if err is not None: diff --git a/cli/main.py b/cli/main.py index 67f59ac..33f6ebc 100644 --- a/cli/main.py +++ b/cli/main.py @@ -123,5 +123,41 @@ app.add_typer(snapshot_app, name="snapshot") app.add_typer(disk_info_app, name="disk-info") +def _run_with_clean_errors() -> None: + """Wrap ``app()`` so AgnesTransportError (and other typed CLI errors) + surface as a one-line message + exit, never as a Python traceback. The + full traceback is already logged to ``~/.config/agnes/last-error.log`` + by the api_* helpers — operators read it from there for support + forwarding. Anything that escapes this wrapper IS a CLI bug worth + fixing — log + print "internal error" so the analyst doesn't see a + Pythonist's traceback either. + + Pavel's #185 Phase 3B: previously a `httpx.ReadTimeout` from an + `agnes query --remote` against a slow BQ view dumped a 30-frame + traceback to the analyst's terminal. Now: one clean line + a hint, + return code 1. + """ + from cli.client import AgnesTransportError, _log_traceback, _LOG_FILE + try: + app() + except (AgnesTransportError) as exc: + typer.echo(f"Error: {exc.user_message}", err=True) + if exc.hint: + typer.echo(exc.hint, err=True) + sys.exit(1) + except typer.Exit: + raise + except (KeyboardInterrupt, SystemExit): + raise + except Exception as exc: # last-resort net — escaped exceptions are bugs + log = _log_traceback(exc, context="unhandled at CLI top-level") + typer.echo( + f"Error: internal CLI error ({type(exc).__name__}). " + f"Full traceback logged to {log}.", + err=True, + ) + sys.exit(1) + + if __name__ == "__main__": - app() + _run_with_clean_errors() diff --git a/config/claude_md_template.txt b/config/claude_md_template.txt index 58c1a64..c5b80f1 100644 --- a/config/claude_md_template.txt +++ b/config/claude_md_template.txt @@ -39,16 +39,26 @@ This workspace is connected to {{ server.url }}. ## Discovering tables — never enumerate from memory -Tables, columns, sizes, and `query_mode` change as admins register / migrate / -drop entries. Always re-discover from the live server, never from this file: +Tables, columns, sizes, descriptions, and `query_mode` change as admins +register / migrate / drop entries. Always re-discover from the live server, +never from this file or your training data: ``` -agnes catalog --json # canonical list with query_mode, sql_flavor, - # where_examples, fetch_via, rough_size_hint per table -agnes schema
# columns + types in the right SQL dialect -agnes describe
-n 5 # sample rows (local + materialized only) +agnes catalog --json # all tables: id, query_mode, sql_flavor, + # where_examples, fetch_via, rough_size_hint, description +agnes catalog --json | jq '.tables[] | select(.id=="")' # single table — read its description in full BEFORE writing any SQL +agnes schema
# columns + types in the right SQL dialect +agnes describe
-n 5 # sample rows (local + materialized only) ``` +The `description` field on each catalog row is the **authoritative +business-rules text** for that table — it carries grain, partition +column, join contracts, and column-level gotchas. Re-read it from the +live `agnes catalog` for every cross-table decision; do **not** copy +it into this workspace `CLAUDE.md` (it's a snapshot that goes stale, +and `agnes init` will overwrite local edits — put personal notes into +`.claude/CLAUDE.local.md` instead). The CLI is the source of truth. + `rough_size_hint` is server-populated for `local` and `materialized` tables (`small` ≤100 MiB, `medium` ≤1 GiB, `large` ≤10 GiB, `very_large` >10 GiB) and `null` for `remote` rows. When `null`, treat the table as potentially large diff --git a/pyproject.toml b/pyproject.toml index c4c02c3..7c9ae61 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -95,7 +95,7 @@ dev = [ ] [project.scripts] -agnes = "cli.main:app" +agnes = "cli.main:_run_with_clean_errors" [build-system] requires = ["hatchling"] From f2ce9154585df2f71e6aed5743d0fecec708c754 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Tue, 5 May 2026 18:29:44 +0200 Subject: [PATCH 10/12] =?UTF-8?q?fix:=20Devin=20Review=20on=20#188=20commi?= =?UTF-8?q?t=2028423907=20=E2=80=94=202=20bugs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🚩 /api/v2/catalog still async def while now calling sync stat() `/api/v2/catalog` was left as `async def` when the rest of Tier 1 was converted, on the assumption it was lightweight. The new `_materialized_size_hint` populator added in this PR calls `Path.stat()` / `Path.exists()` for every visible row to bucket the parquet size — on a local FS that's microseconds, but on a network-mounted DATA_DIR (NFS / CIFS / GCS-FUSE) those syscalls can block the event loop. Convert to plain `def` so FastAPI auto-offloads to the thread pool, mirroring /api/query etc. 🔴 stream_download translates HTTPStatusError as generic transport error `response.raise_for_status()` inside the retry loop raises `httpx.HTTPStatusError` on 4xx/5xx. After retries exhaust, the new `isinstance(last_exc, httpx.HTTPError)` check at line 219 was eating the status code: HTTPStatusError is a subclass of HTTPError, so the generic transport translation produced "Unexpected error: HTTPStatusError" instead of the informative "Client error '401 Unauthorized' for url …" that callers expect. Fix: short-circuit HTTPStatusError before the HTTPError branch — it re-raises verbatim so the caller's status-code handling + the rich server error body (e.g. 401 expired token, 403 cross_project_forbidden) reach the analyst. api_get / api_post / api_delete / api_patch don't have the same bug: httpx Client.get/etc. don't raise HTTPStatusError unless the caller explicitly calls .raise_for_status(), and our wrappers don't. Only stream_download does, hence the targeted fix there. --- app/api/v2_catalog.py | 11 ++++++++++- cli/client.py | 13 +++++++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/app/api/v2_catalog.py b/app/api/v2_catalog.py index a5b660e..43ade88 100644 --- a/app/api/v2_catalog.py +++ b/app/api/v2_catalog.py @@ -127,8 +127,17 @@ def build_catalog(conn: duckdb.DuckDBPyConnection, user: dict) -> dict: @router.get("/catalog") -async def catalog( +def catalog( user: dict = Depends(get_current_user), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): + # Plain ``def`` so FastAPI auto-offloads to the anyio thread pool — + # build_catalog now calls `_materialized_size_hint` for every visible + # row, which does sync `Path.stat()` / `Path.exists()` on the data + # volume. On local FS that's microseconds, but on a network-mounted + # DATA_DIR (NFS / CIFS / GCS-FUSE) those calls can block. Plain ``def`` + # means each request runs on its own thread; the event loop stays + # free for non-catalog traffic. Mirrors the Tier 1 conversion of + # /api/query, /api/v2/scan, /api/v2/sample, /api/v2/schema — + # Devin Review on PR #188. return build_catalog(conn, user) diff --git a/cli/client.py b/cli/client.py index e1861d8..1c665b9 100644 --- a/cli/client.py +++ b/cli/client.py @@ -212,10 +212,19 @@ def stream_download(path: str, target_path: str, progress_callback=None) -> int: break time.sleep(_RETRY_BACKOFFS_S[min(attempt, len(_RETRY_BACKOFFS_S) - 1)]) # Clean up any leftover tmp, then surface the last exception. Translate - # transport errors to AgnesTransportError so the CLI prints a clean - # message instead of a Python traceback (Pavel's #185 Phase 3B). + # transport errors (timeouts, connection drops, protocol errors) to + # AgnesTransportError so the CLI prints a clean message instead of a + # Python traceback (Pavel's #185 Phase 3B). HTTPStatusError (4xx/5xx + # response from the server) is NOT a transport failure and must + # re-raise verbatim so the caller's status-code handling + the rich + # server error body (e.g. 401 with "token expired", 403 with + # cross_project_forbidden detail) reach the analyst — Devin Review on + # PR #188 caught: HTTPStatusError is a subclass of HTTPError, so the + # generic isinstance(HTTPError) translation was eating status codes. tmp_path.unlink(missing_ok=True) assert last_exc is not None + if isinstance(last_exc, httpx.HTTPStatusError): + raise last_exc if isinstance(last_exc, httpx.HTTPError): raise _translate_transport_error( last_exc, context=f"GET {path} (stream → {target_path})" From f33475cec3e7339a799811c79babae443f5bf19c Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Tue, 5 May 2026 18:57:04 +0200 Subject: [PATCH 11/12] =?UTF-8?q?release:=200.36.0=20=E2=80=94=20perf=20+?= =?UTF-8?q?=20analyst-clarity=20bundle?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Renames the [Unreleased] section to [0.36.0] in CHANGELOG, adds the top-level summary, drops a fresh empty [Unreleased] above, and bumps pyproject from 0.35.1. Also fixes the third Devin Review finding on this PR: the CLI ReadTimeout message hardcoded QUERY_TIMEOUT_S (300s) so a 30s-default call (agnes catalog, agnes auth, …) reported a wait window that didn't match reality. _translate_transport_error now takes the actual httpx timeout from the calling helper; the BQ-job advisory only appears for calls where the timeout was set ≥ 60s. --- CHANGELOG.md | 5 +++++ cli/client.py | 45 +++++++++++++++++++++++++++++++++------------ pyproject.toml | 2 +- 3 files changed, 39 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 026573c..766a997 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +## [0.36.0] — 2026-05-05 + +Combined performance + analyst-clarity bundle. Folds three previously-staged work streams into one PR (#188): the long-running `agnes query --remote` timeout (#181), the Caddy parquet-download bypass (#182), and Pavel's #185 Phase 1 trace findings (silent 44-min first-init, opaque CLI tracebacks, no analyst-Claude size signal). Also performs the Tier 1 event-loop unblocking — the five hottest BQ-touching endpoints were `async def` over synchronous DuckDB / BQ-extension calls, so a single heavy `agnes query --remote` froze every other request for the duration of the BQ wait. The image-side fixes ship in this release; for existing VMs, the new auto-upgrade.sh self-fetches the matching Caddyfile + compose overlays from `main` on its next 5-minute tick, so deployment requires no operator action beyond letting the cron run. + ### Added - **`agnes init` / `agnes pull --skip-materialize`** — opts the first sync out of materialized-mode tables (server-side scheduled-query parquets, often multi-GB). Pavel's #185 Phase 1: a single 6.3 GB `order_economics` parquet kept first init silent for 44 minutes. Materialized rows stay discoverable via `agnes catalog`; rerun without the flag once the analyst actually needs them locally. - **`agnes pull` progress bar** — Rich-driven aggregate transfer display rendered to stderr when not `--quiet` and not `--json`. Per-file label + bytes / total / rate / ETA, aggregated across the parallel `ThreadPoolExecutor` workers introduced earlier in this PR. Replaces the prior 0-stdout silence on first init. @@ -32,6 +36,7 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ### Fixed +- **CLI ReadTimeout message reports the actual httpx timeout** (was hardcoded to `QUERY_TIMEOUT_S` = 300s). On a 30s-default call (`agnes catalog`, `agnes auth`, …) the analyst saw "didn't respond within the read timeout (300s)" while the call had actually given up after 30s — confusing and unactionable. The translator now takes the real timeout from the calling helper and renders it; the long-running-BQ advisory only appears for calls where the timeout was set ≥ 60s. Devin Review on PR #188. - Keboola sync now falls back to the legacy Storage-API client when the DuckDB Keboola extension's per-table scan fails, not just when the initial `ATTACH` fails. Two changes: - `kbcstorage>=0.9.0` is promoted from optional to core dependency. The legacy fallback path in `connectors/keboola/extractor.py:_extract_via_legacy` has been there since the extension landed, but until now the bare `from kbcstorage.client import Client` would crash any default install with `ModuleNotFoundError`. - `connectors/keboola/extractor.py:run` now wraps `_extract_via_extension` in a per-table try/except — on any per-table scan failure it retries via the legacy client. Previously, when `ATTACH` succeeded but the table-level `COPY (SELECT * FROM kbc.""."
")` failed, the table was just marked failed with no retry. diff --git a/cli/client.py b/cli/client.py index 1c665b9..544026c 100644 --- a/cli/client.py +++ b/cli/client.py @@ -68,23 +68,43 @@ def _log_traceback(exc: BaseException, *, context: str) -> Path: return _LOG_FILE -def _translate_transport_error(exc: Exception, *, context: str) -> AgnesTransportError: +def _translate_transport_error( + exc: Exception, *, context: str, timeout_s: float | None = None, +) -> AgnesTransportError: """Map httpx transport exceptions to user-facing CLI messages. The mapping is intentionally pragmatic — analysts care about "what do I - do next", not the gRPC / TCP detail.""" + do next", not the gRPC / TCP detail. + + `timeout_s`, when supplied, is the actual httpx timeout used by the + failing call so the ReadTimeout message reports the real wait window + (a `agnes catalog` GET dies at 30s, not 300s — Devin Review on PR + #188 caught the original signature hardcoding `QUERY_TIMEOUT_S`, + which only matches `agnes query --remote`).""" log = _log_traceback(exc, context=context) if isinstance(exc, httpx.ReadTimeout): - return AgnesTransportError( - f"Server didn't respond within the read timeout ({QUERY_TIMEOUT_S:.0f}s) " - f"for {context}.", - hint=( + wait_s = timeout_s if timeout_s is not None else QUERY_TIMEOUT_S + # The "long-running BQ" advisory only makes sense when the call + # actually hit the query path (timeout ≥ ~60s). For short calls + # (the 30s default on `agnes catalog` etc.) it's just confusing. + if wait_s >= 60: + hint = ( "If this is `agnes query --remote` against a heavy BQ view, " "the underlying BQ job took longer than the wait window. Try:\n" " • narrow the WHERE (especially the partition column from `agnes catalog --json`)\n" " • `agnes snapshot create
... --estimate` to materialize once + query locally\n" " • set AGNES_QUERY_TIMEOUT=600 for a longer client-side wait\n" f"Full traceback: {log}" - ), + ) + else: + hint = ( + "Server is slow or unreachable. Check `agnes status`; " + "re-run if transient.\n" + f"Full traceback: {log}" + ) + return AgnesTransportError( + f"Server didn't respond within the read timeout ({wait_s:.0f}s) " + f"for {context}.", + hint=hint, logfile_path=log, ) if isinstance(exc, httpx.ConnectError): @@ -140,7 +160,7 @@ def api_get(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: with get_client(timeout=timeout) as client: return client.get(path, **kwargs) except httpx.HTTPError as exc: - raise _translate_transport_error(exc, context=f"GET {path}") from exc + raise _translate_transport_error(exc, context=f"GET {path}", timeout_s=timeout) from exc def api_post(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: @@ -148,7 +168,7 @@ def api_post(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: with get_client(timeout=timeout) as client: return client.post(path, **kwargs) except httpx.HTTPError as exc: - raise _translate_transport_error(exc, context=f"POST {path}") from exc + raise _translate_transport_error(exc, context=f"POST {path}", timeout_s=timeout) from exc def api_delete(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: @@ -156,7 +176,7 @@ def api_delete(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: with get_client(timeout=timeout) as client: return client.delete(path, **kwargs) except httpx.HTTPError as exc: - raise _translate_transport_error(exc, context=f"DELETE {path}") from exc + raise _translate_transport_error(exc, context=f"DELETE {path}", timeout_s=timeout) from exc def api_patch(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: @@ -164,7 +184,7 @@ def api_patch(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: with get_client(timeout=timeout) as client: return client.patch(path, **kwargs) except httpx.HTTPError as exc: - raise _translate_transport_error(exc, context=f"PATCH {path}") from exc + raise _translate_transport_error(exc, context=f"PATCH {path}", timeout_s=timeout) from exc def _is_transient(exc: Exception) -> bool: @@ -227,6 +247,7 @@ def stream_download(path: str, target_path: str, progress_callback=None) -> int: raise last_exc if isinstance(last_exc, httpx.HTTPError): raise _translate_transport_error( - last_exc, context=f"GET {path} (stream → {target_path})" + last_exc, context=f"GET {path} (stream → {target_path})", + timeout_s=300.0, ) from last_exc raise last_exc diff --git a/pyproject.toml b/pyproject.toml index 7c9ae61..8fddab2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agnes-the-ai-analyst" -version = "0.35.1" +version = "0.36.0" description = "Agnes — AI Data Analyst platform for AI analytical systems" requires-python = ">=3.11,<3.14" license = "MIT" From e2f740d7abfaa06803a2da10ad1061436a0ee767 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Tue, 5 May 2026 19:04:51 +0200 Subject: [PATCH 12/12] fix(changelog): consolidate duplicate Added/Changed sections in 0.36.0 Devin Review on PR #188 (15:53Z): the renamed [0.36.0] section had two separate ### Added blocks and two separate ### Changed blocks, which violates Keep-a-Changelog grouping (and CLAUDE.md's explicit 'group by section' rule). Merged each set into a single ordered block: Added, Changed, Fixed. No content removed; only reflowed. --- CHANGELOG.md | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 766a997..888c5ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,23 +15,19 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C Combined performance + analyst-clarity bundle. Folds three previously-staged work streams into one PR (#188): the long-running `agnes query --remote` timeout (#181), the Caddy parquet-download bypass (#182), and Pavel's #185 Phase 1 trace findings (silent 44-min first-init, opaque CLI tracebacks, no analyst-Claude size signal). Also performs the Tier 1 event-loop unblocking — the five hottest BQ-touching endpoints were `async def` over synchronous DuckDB / BQ-extension calls, so a single heavy `agnes query --remote` froze every other request for the duration of the BQ wait. The image-side fixes ship in this release; for existing VMs, the new auto-upgrade.sh self-fetches the matching Caddyfile + compose overlays from `main` on its next 5-minute tick, so deployment requires no operator action beyond letting the cron run. ### Added +- **`data_source.bigquery.query_timeout_ms` config knob** (default 600 000 ms = 10 min). The DuckDB BigQuery extension's built-in default of 90 s was too tight for analyst-scale queries against view-backed BQ datasets — `agnes query --remote` would HTTP 400 with `Binder Error: Query execution exceeded the timeout. Job ID: …` whenever the underlying BQ job took longer than 90 s, even though the BQ job itself was healthy. The new knob is applied via `SET bq_query_timeout_ms` after every `LOAD bigquery` on every BQ-touching DuckDB session — the orchestrator's `_remote_attach` ATTACH path (`src/orchestrator.py`), the analytics-DB read-only reattach path (`src/db.py:_reattach_remote_extensions` — the primary `agnes query --remote` request path), the `BqAccess` session factory (`connectors/bigquery/access.py`), and the standalone extractor (`connectors/bigquery/extractor.py`). Sentinel `0` (or non-numeric / unparseable values) leaves the extension default in place so operators on legacy extension versions that don't recognise the setting aren't broken. Configurable via `/admin/server-config` UI. Note: BigQuery's `jobs.query` RPC caps the wait at ~200 s per call regardless of this setting; the extension polls on top so the effective ceiling is the value here but each poll is ~200 s. DuckDB emits an informational warning when the value is set above the BQ RPC cap — operators can safely ignore it. +- **Per-user parallel parquet downloads in `agnes pull`** — the download loop in `cli/lib/pull.py` now uses a `ThreadPoolExecutor` with concurrency capped by the new `AGNES_PULL_PARALLELISM` env var (default 4, set 1 to restore pre-PR serial behavior). On a registry of N tables the wall-clock time drops from `Σ stream_download_seconds(table_i)` to roughly `max × ceil(N/4)`. Works hand-in-hand with the Caddy `file_server` change below: without it parallel client-side downloads would still queue on the single uvicorn worker; with it each request is its own caddy goroutine + sendfile, so 4-way parallelism actually delivers throughput. Per-table error semantics preserved — a failure on one table no longer aborts the rest of the batch. - **`agnes init` / `agnes pull --skip-materialize`** — opts the first sync out of materialized-mode tables (server-side scheduled-query parquets, often multi-GB). Pavel's #185 Phase 1: a single 6.3 GB `order_economics` parquet kept first init silent for 44 minutes. Materialized rows stay discoverable via `agnes catalog`; rerun without the flag once the analyst actually needs them locally. - **`agnes pull` progress bar** — Rich-driven aggregate transfer display rendered to stderr when not `--quiet` and not `--json`. Per-file label + bytes / total / rate / ETA, aggregated across the parallel `ThreadPoolExecutor` workers introduced earlier in this PR. Replaces the prior 0-stdout silence on first init. - **CLI clean-error wrapper** (`cli/main.py:_run_with_clean_errors`, new entry point in `pyproject.toml`) — `httpx.ReadTimeout` / `ConnectError` / `RemoteProtocolError` etc. used to dump a five-frame Python traceback to the analyst's terminal when a `agnes query --remote` against a slow BQ view timed out client-side. Now: one-line `Error: …` message + actionable hint (e.g. "narrow the WHERE on the partition column from `agnes catalog --json`, or run `agnes snapshot create --estimate`"), exit code 1. Full traceback is appended to `~/.config/agnes/last-error.log` so an operator can recover it for support without spamming the analyst's terminal. Implemented as `AgnesTransportError` raised from the `api_get` / `api_post` / `api_delete` / `api_patch` / `stream_download` helpers in `cli/client.py`; the top-level Typer wrapper renders it. Unhandled `Exception`s are caught at the same boundary, logged, and printed as "internal CLI error (see logfile)" so a Python traceback never leaks to the analyst. - -### Changed -- **Tier 1 event-loop unblocking** — the five hottest BQ-touching endpoints (`POST /api/query`, `POST /api/v2/scan`, `POST /api/v2/scan/estimate`, `GET /api/v2/sample/{id}`, `GET /api/v2/schema/{id}`) were declared `async def` but invoked synchronous DuckDB / BQ-extension calls inside the body. Under uvicorn's single event loop that meant a single heavy `agnes query --remote` (waiting up to ~200 s for BQ's `jobs.query` to return) **froze every other request** — `/api/health`, the dashboard, auth, even another query — for the full duration of the BQ wait. Operators saw "VM idle, app frozen" symptoms during this work. Converted all five to plain `def` so FastAPI auto-offloads the blocking body to the anyio thread pool; the event loop stays free for non-BQ requests. Verified via 0-await audit (no `await` statements in the converted handlers, so the rename is safe). Tests: `tests/test_v2_*.py` were rewritten to call the handlers directly instead of `asyncio.run(...)` (which now fails on a non-coroutine return). Pairs with the thread-pool capacity bump below. -- **`AGNES_THREADPOOL_SIZE` env var** (default 200, was anyio's stock 40) controls the FastAPI / Starlette thread pool capacity used by every plain-`def` route handler. Set in `app/main.py:lifespan` via `anyio.to_thread.current_default_thread_limiter().total_tokens`. 200 leaves comfortable headroom over the BQ extension's connection budget while keeping the per-process thread cost bounded — for the workload of <50 concurrent analysts this is well over what's needed; bump for higher concurrency. - -### Added -- **`data_source.bigquery.query_timeout_ms` config knob** (default 600 000 ms = 10 min). The DuckDB BigQuery extension's built-in default of 90 s was too tight for analyst-scale queries against view-backed BQ datasets — `agnes query --remote` would HTTP 400 with `Binder Error: Query execution exceeded the timeout. Job ID: …` whenever the underlying BQ job took longer than 90 s, even though the BQ job itself was healthy. The new knob is applied via `SET bq_query_timeout_ms` after every `LOAD bigquery` on every BQ-touching DuckDB session — the orchestrator's `_remote_attach` ATTACH path (`src/orchestrator.py`), the analytics-DB read-only reattach path (`src/db.py:_reattach_remote_extensions` — the primary `agnes query --remote` request path), the `BqAccess` session factory (`connectors/bigquery/access.py`), and the standalone extractor (`connectors/bigquery/extractor.py`). Sentinel `0` (or non-numeric / unparseable values) leaves the extension default in place so operators on legacy extension versions that don't recognise the setting aren't broken. Configurable via `/admin/server-config` UI. Note: BigQuery's `jobs.query` RPC caps the wait at ~200 s per call regardless of this setting; the extension polls on top so the effective ceiling is the value here but each poll is ~200 s. DuckDB emits an informational warning when the value is set above the BQ RPC cap — operators can safely ignore it. -- **Per-user parallel parquet downloads in `agnes pull`** — the download loop in `cli/lib/pull.py` now uses a `ThreadPoolExecutor` with concurrency capped by the new `AGNES_PULL_PARALLELISM` env var (default 4, set 1 to restore pre-PR serial behavior). On a registry of N tables the wall-clock time drops from `Σ stream_download_seconds(table_i)` to roughly `max × ceil(N/4)`. Works hand-in-hand with the Caddy `file_server` change below: without it parallel client-side downloads would still queue on the single uvicorn worker; with it each request is its own caddy goroutine + sendfile, so 4-way parallelism actually delivers throughput. Per-table error semantics preserved — a failure on one table no longer aborts the rest of the batch. - **`scripts/ops/agnes-auto-upgrade.sh` now re-fetches Caddyfile + every compose overlay** from `keboola/agnes-the-ai-analyst@main` on every tick, hashes them, and triggers a `docker compose up -d` recreation when the hash changes — same path as an image-digest change. Pre-fix the script only watched `docker images` digests, so a Caddyfile or compose change in main never reached running VMs (only fresh boots ran `startup.sh`'s file fetch). Without this, the new file_server downloads-path below would land in the image but stay inert against an old Caddyfile. The script also self-updates from the same path so the very fix that watches config files isn't itself stuck on running VMs. Fail-soft on curl errors — keeps the existing file rather than blanking it. - **Caddy `file_server` for parquet downloads** — `GET /api/data/{table_id}/download` is now intercepted at the Caddy layer (TLS profile only) and served directly via sendfile/zero-copy from the data volume mounted read-only at `/srv` inside the caddy container. Caddy authorises every request via a new lightweight RBAC probe `GET /api/data/{table_id}/check-access` (returns 204 when the caller has read access on the table, 403 otherwise) using the `forward_auth` directive — the bulk byte transfer never touches uvicorn workers. Resolves a real production failure mode where a single multi-GB analyst pull held the app's only uvicorn worker for the duration of the stream and starved the UI / `/api/health` / every other API endpoint, eventually flipping the container to `unhealthy`. Path discovery uses Caddy's `try_files` over the known `extract.duckdb` v2 source subdirs (`bigquery/data/.parquet`, `keboola/data/.parquet`, `jira/data/.parquet`); a parquet not at any of those paths transparently falls through to the existing app handler so legacy `src_data/parquet` layouts and future connectors keep working with no Caddyfile change. Non-Caddy deployments (dev `docker compose up` without `--profile tls`) continue to use the app handler unchanged. - **Workspace prompt: decision tree, common-mistakes callout, failure-mode dictionary** in `config/claude_md_template.txt` (the template `agnes init` writes to `/CLAUDE.md`). Surfaces every catalog-row field analyst Claude should read before deciding which command to use (`query_mode`, `sql_flavor`, `where_examples`, `fetch_via`, `rough_size_hint`); explicitly binds `--estimate` to `agnes snapshot create` ONLY (was the most-failed first-try misuse — fails with `No such option: --estimate` on `agnes query`); calls out the `agnes fetch` → `agnes snapshot create` rename so stale-doc analysts don't run a non-command; documents the BQ permission model (server SA, not personal Google identity) and a 6-row failure-mode table mapping each common error wording to its cause + the right next step. - **`rough_size_hint` populated for `local` + `materialized` catalog rows** in `GET /api/v2/catalog` (was hardcoded `null` with a "Task 8" TODO). Reads the parquet file size at `${DATA_DIR}/extracts//data/.parquet` and buckets into `small` (≤100 MiB), `medium` (≤1 GiB), `large` (≤10 GiB), `very_large` (>10 GiB). `remote` rows stay `null` for now (size requires a BQ INFORMATION_SCHEMA call; tracked separately). Lets analyst Claude pick `agnes snapshot create` over `agnes query --remote` by inspecting `agnes catalog --json` rather than discovering size empirically via a failed `--remote` round-trip. ### Changed +- **Tier 1 event-loop unblocking** — the five hottest BQ-touching endpoints (`POST /api/query`, `POST /api/v2/scan`, `POST /api/v2/scan/estimate`, `GET /api/v2/sample/{id}`, `GET /api/v2/schema/{id}`) were declared `async def` but invoked synchronous DuckDB / BQ-extension calls inside the body. Under uvicorn's single event loop that meant a single heavy `agnes query --remote` (waiting up to ~200 s for BQ's `jobs.query` to return) **froze every other request** — `/api/health`, the dashboard, auth, even another query — for the full duration of the BQ wait. Operators saw "VM idle, app frozen" symptoms during this work. Converted all five to plain `def` so FastAPI auto-offloads the blocking body to the anyio thread pool; the event loop stays free for non-BQ requests. Verified via 0-await audit (no `await` statements in the converted handlers, so the rename is safe). Tests: `tests/test_v2_*.py` were rewritten to call the handlers directly instead of `asyncio.run(...)` (which now fails on a non-coroutine return). Pairs with the thread-pool capacity bump below. +- **`AGNES_THREADPOOL_SIZE` env var** (default 200, was anyio's stock 40) controls the FastAPI / Starlette thread pool capacity used by every plain-`def` route handler. Set in `app/main.py:lifespan` via `anyio.to_thread.current_default_thread_limiter().total_tokens`. 200 leaves comfortable headroom over the BQ extension's connection budget while keeping the per-process thread cost bounded — for the workload of <50 concurrent analysts this is well over what's needed; bump for higher concurrency. - **CLI update-banner now says `agnes` instead of `da`** (`cli/update_check.py:format_outdated_notice`). The string `[update] da X is out of date` had survived the `da` → `agnes` CLI rename and was the most-visible stale identifier in the analyst-facing surface — every CLI command printed it on stderr when a newer wheel was available. ### Fixed