Merge pull request #188 from keboola/zs/combined-perf-and-clarity

release: 0.36.0 — perf + analyst-clarity bundle

BQ query timeout knob, Caddy file_server parquet bypass, parallel
parquet pulls, auto-upgrade self-update, Tier 1 event-loop unblocking,
clean CLI errors + init progress + skip-materialize, workspace prompt
decision tree + size hint.
ZdenekSrotyr 2026-05-05 19:22:53 +02:00 committed by GitHub
commit 1315f9f93c
GPG key ID: B5690EEEBB952194
31 changed files with 1062 additions and 74 deletions


@ -10,8 +10,29 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
## [Unreleased]
## [0.36.0] — 2026-05-05
Combined performance + analyst-clarity bundle. Folds three previously-staged work streams into one PR (#188): the long-running `agnes query --remote` timeout (#181), the Caddy parquet-download bypass (#182), and Pavel's #185 Phase 1 trace findings (silent 44-min first-init, opaque CLI tracebacks, no analyst-Claude size signal). Also performs the Tier 1 event-loop unblocking — the five hottest BQ-touching endpoints were `async def` over synchronous DuckDB / BQ-extension calls, so a single heavy `agnes query --remote` froze every other request for the duration of the BQ wait. The image-side fixes ship in this release; for existing VMs, the new auto-upgrade.sh self-fetches the matching Caddyfile + compose overlays from `main` on its next 5-minute tick, so deployment requires no operator action beyond letting the cron run.
### Added
- **`data_source.bigquery.query_timeout_ms` config knob** (default 600 000 ms = 10 min). The DuckDB BigQuery extension's built-in default of 90 s was too tight for analyst-scale queries against view-backed BQ datasets — `agnes query --remote` would HTTP 400 with `Binder Error: Query execution exceeded the timeout. Job ID: …` whenever the underlying BQ job took longer than 90 s, even though the BQ job itself was healthy. The new knob is applied via `SET bq_query_timeout_ms` after every `LOAD bigquery` on every BQ-touching DuckDB session — the orchestrator's `_remote_attach` ATTACH path (`src/orchestrator.py`), the analytics-DB read-only reattach path (`src/db.py:_reattach_remote_extensions` — the primary `agnes query --remote` request path), the `BqAccess` session factory (`connectors/bigquery/access.py`), and the standalone extractor (`connectors/bigquery/extractor.py`). Sentinel `0` (or non-numeric / unparseable values) leaves the extension default in place so operators on legacy extension versions that don't recognise the setting aren't broken. Configurable via `/admin/server-config` UI. Note: BigQuery's `jobs.query` RPC caps the wait at ~200 s per call regardless of this setting; the extension polls on top so the effective ceiling is the value here but each poll is ~200 s. DuckDB emits an informational warning when the value is set above the BQ RPC cap — operators can safely ignore it.
- **Per-user parallel parquet downloads in `agnes pull`**: the download loop in `cli/lib/pull.py` now uses a `ThreadPoolExecutor` with concurrency capped by the new `AGNES_PULL_PARALLELISM` env var (default 4; set 1 to restore pre-PR serial behavior). On a registry of N tables the wall-clock time drops from `Σ stream_download_seconds(table_i)` to roughly `max_i stream_download_seconds(table_i) × ceil(N/4)`. Works hand-in-hand with the Caddy `file_server` change below: without it parallel client-side downloads would still queue on the single uvicorn worker; with it each request is its own caddy goroutine + sendfile, so 4-way parallelism actually delivers throughput. Per-table error semantics preserved: a failure on one table does not abort the rest of the batch.
- **`agnes init` / `agnes pull --skip-materialize`** — opts the first sync out of materialized-mode tables (server-side scheduled-query parquets, often multi-GB). Pavel's #185 Phase 1: a single 6.3 GB `order_economics` parquet kept first init silent for 44 minutes. Materialized rows stay discoverable via `agnes catalog`; rerun without the flag once the analyst actually needs them locally.
- **`agnes pull` progress bar** — Rich-driven aggregate transfer display rendered to stderr when not `--quiet` and not `--json`. Per-file label + bytes / total / rate / ETA, aggregated across the parallel `ThreadPoolExecutor` workers introduced earlier in this PR. Replaces the prior 0-stdout silence on first init.
- **CLI clean-error wrapper** (`cli/main.py:_run_with_clean_errors`, new entry point in `pyproject.toml`): `httpx.ReadTimeout` / `ConnectError` / `RemoteProtocolError` etc. used to dump a five-frame Python traceback to the analyst's terminal when an `agnes query --remote` against a slow BQ view timed out client-side. Now: one-line `Error: …` message + actionable hint (e.g. "narrow the WHERE on the partition column from `agnes catalog --json`, or run `agnes snapshot create --estimate`"), exit code 1. Full traceback is appended to `~/.config/agnes/last-error.log` so an operator can recover it for support without spamming the analyst's terminal. Implemented as `AgnesTransportError` raised from the `api_get` / `api_post` / `api_delete` / `api_patch` / `stream_download` helpers in `cli/client.py`; the top-level Typer wrapper renders it. Unhandled `Exception`s are caught at the same boundary, logged, and printed as "internal CLI error (see logfile)" so a Python traceback never leaks to the analyst.
- **`scripts/ops/agnes-auto-upgrade.sh` now re-fetches Caddyfile + every compose overlay** from `keboola/agnes-the-ai-analyst@main` on every tick, hashes them, and triggers a `docker compose up -d` recreation when the hash changes — same path as an image-digest change. Pre-fix the script only watched `docker images` digests, so a Caddyfile or compose change in main never reached running VMs (only fresh boots ran `startup.sh`'s file fetch). Without this, the new file_server downloads-path below would land in the image but stay inert against an old Caddyfile. The script also self-updates from the same path so the very fix that watches config files isn't itself stuck on running VMs. Fail-soft on curl errors — keeps the existing file rather than blanking it.
- **Caddy `file_server` for parquet downloads**: `GET /api/data/{table_id}/download` is now intercepted at the Caddy layer (TLS profile only) and served directly via sendfile/zero-copy from the data volume mounted read-only at `/srv` inside the caddy container. Caddy authorises every request via a new lightweight RBAC probe `GET /api/data/{table_id}/check-access` (returns 204 when the caller has read access on the table, 403 otherwise) using the `forward_auth` directive, so the bulk byte transfer never touches uvicorn workers. Resolves a real production failure mode where a single multi-GB analyst pull held the app's only uvicorn worker for the duration of the stream and starved the UI / `/api/health` / every other API endpoint, eventually flipping the container to `unhealthy`. Path discovery uses Caddy's `try_files` over the known `extract.duckdb` v2 source subdirs (`bigquery/data/<id>.parquet`, `keboola/data/<id>.parquet`, `jira/data/<id>.parquet`); a parquet not at any of those paths transparently falls through to the existing app handler so legacy `src_data/parquet` layouts and future connectors keep working with no Caddyfile change. Non-Caddy deployments (dev `docker compose up` without `--profile tls`) continue to use the app handler unchanged.
- **Workspace prompt: decision tree, common-mistakes callout, failure-mode dictionary** in `config/claude_md_template.txt` (the template `agnes init` writes to `<workspace>/CLAUDE.md`). Surfaces every catalog-row field analyst Claude should read before deciding which command to use (`query_mode`, `sql_flavor`, `where_examples`, `fetch_via`, `rough_size_hint`); explicitly binds `--estimate` to `agnes snapshot create` ONLY (was the most-failed first-try misuse; fails with `No such option: --estimate` on `agnes query`); calls out the `agnes fetch` → `agnes snapshot create` rename so stale-doc analysts don't run a non-command; documents the BQ permission model (server SA, not personal Google identity) and a 6-row failure-mode table mapping each common error wording to its cause + the right next step.
- **`rough_size_hint` populated for `local` + `materialized` catalog rows** in `GET /api/v2/catalog` (was hardcoded `null` with a "Task 8" TODO). Reads the parquet file size at `${DATA_DIR}/extracts/<source_type>/data/<table_id>.parquet` and buckets into `small` (≤100 MiB), `medium` (≤1 GiB), `large` (≤10 GiB), `very_large` (>10 GiB). `remote` rows stay `null` for now (size requires a BQ INFORMATION_SCHEMA call; tracked separately). Lets analyst Claude pick `agnes snapshot create` over `agnes query --remote` by inspecting `agnes catalog --json` rather than discovering size empirically via a failed `--remote` round-trip.
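
The parallel-pull entry above can be illustrated with a small sketch. This is not the code in `cli/lib/pull.py`, which is not shown here: `pull_all` and the `download` callable are hypothetical names, and only the `AGNES_PULL_PARALLELISM` variable and its default of 4 come from the entry. The sketch assumes each download either returns a byte count or raises.

```python
from __future__ import annotations

import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable


def pull_all(
    table_ids: list[str],
    download: Callable[[str], int],  # hypothetical per-table download helper
    parallelism: int | None = None,
) -> tuple[dict[str, int], dict[str, str]]:
    """Download tables concurrently and collect failures instead of raising."""
    if parallelism is None:
        # Env var name and default taken from the changelog entry.
        parallelism = int(os.environ.get("AGNES_PULL_PARALLELISM", "4"))
    parallelism = max(1, parallelism)  # 1 means serial behaviour

    done: dict[str, int] = {}
    failed: dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        futures = {pool.submit(download, tid): tid for tid in table_ids}
        for fut in as_completed(futures):
            tid = futures[fut]
            try:
                done[tid] = fut.result()
            except Exception as exc:
                # One failing table is recorded; the others keep going.
                failed[tid] = str(exc)
    return done, failed
```

Collecting errors per future, rather than letting the first exception propagate out of the `with` block, is what keeps one bad table from aborting the batch in this sketch.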
### Changed
- **Tier 1 event-loop unblocking**: the five hottest BQ-touching endpoints (`POST /api/query`, `POST /api/v2/scan`, `POST /api/v2/scan/estimate`, `GET /api/v2/sample/{id}`, `GET /api/v2/schema/{id}`) were declared `async def` but invoked synchronous DuckDB / BQ-extension calls inside the body. Under uvicorn's single event loop that meant a single heavy `agnes query --remote` (waiting up to ~200 s for BQ's `jobs.query` to return) **froze every other request** (`/api/health`, the dashboard, auth, even another query) for the full duration of the BQ wait. Operators saw "VM idle, app frozen" symptoms during this work. Converted all five to plain `def` so FastAPI auto-offloads the blocking body to the anyio thread pool; the event loop stays free for non-BQ requests. Verified via 0-await audit (no `await` statements in the converted handlers, so the rename is safe). Tests: `tests/test_v2_*.py` were rewritten to call the handlers directly instead of `asyncio.run(...)` (which now fails on a non-coroutine return). Pairs with the thread-pool capacity bump below.
- **`AGNES_THREADPOOL_SIZE` env var** (default 200, was anyio's stock 40) controls the FastAPI / Starlette thread pool capacity used by every plain-`def` route handler. Set in `app/main.py:lifespan` via `anyio.to_thread.current_default_thread_limiter().total_tokens`. 200 leaves comfortable headroom over the BQ extension's connection budget while keeping the per-process thread cost bounded — for the workload of <50 concurrent analysts this is well over what's needed; bump for higher concurrency.
- **CLI update-banner now says `agnes` instead of `da`** (`cli/update_check.py:format_outdated_notice`). The string `[update] da X is out of date` had survived the `da` → `agnes` CLI rename and was the most-visible stale identifier in the analyst-facing surface: every CLI command printed it on stderr when a newer wheel was available.
### Fixed
- **CLI ReadTimeout message reports the actual httpx timeout** (was hardcoded to `QUERY_TIMEOUT_S` = 300s). On a 30s-default call (`agnes catalog`, `agnes auth`, …) the analyst saw "didn't respond within the read timeout (300s)" while the call had actually given up after 30s — confusing and unactionable. The translator now takes the real timeout from the calling helper and renders it; the long-running-BQ advisory only appears for calls where the timeout was set ≥ 60s. Devin Review on PR #188.
- Keboola sync now falls back to the legacy Storage-API client when the DuckDB Keboola extension's per-table scan fails, not just when the initial `ATTACH` fails. Two changes:
  - `kbcstorage>=0.9.0` is promoted from optional to core dependency. The legacy fallback path in `connectors/keboola/extractor.py:_extract_via_legacy` has been there since the extension landed, but until now the bare `from kbcstorage.client import Client` would crash any default install with `ModuleNotFoundError`.
  - `connectors/keboola/extractor.py:run` now wraps `_extract_via_extension` in a per-table try/except: on any per-table scan failure it retries via the legacy client. Previously, when `ATTACH` succeeded but the table-level `COPY (SELECT * FROM kbc."<bucket>"."<table>")` failed, the table was just marked failed with no retry.
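
The per-table fallback described in the Keboola entry follows a common try-primary-then-legacy shape. The sketch below is only an illustration of that shape: `extract_tables`, `via_extension` and `via_legacy` are hypothetical stand-ins, not the signatures of `_extract_via_extension` or `_extract_via_legacy`.

```python
from __future__ import annotations

from typing import Callable


def extract_tables(
    table_ids: list[str],
    via_extension: Callable[[str], None],  # hypothetical primary path
    via_legacy: Callable[[str], None],     # hypothetical fallback path
) -> dict[str, str]:
    """Try the extension per table; on failure retry that table via legacy."""
    outcome: dict[str, str] = {}
    for tid in table_ids:
        try:
            via_extension(tid)
            outcome[tid] = "extension"
            continue
        except Exception:
            pass  # fall through to the legacy client for this table only
        try:
            via_legacy(tid)
            outcome[tid] = "legacy"
        except Exception as exc:
            outcome[tid] = f"failed: {exc}"
    return outcome
```

Scoping the try/except to a single table means a scan failure on one table does not change the path taken by the others.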


@ -34,6 +34,60 @@
-Server
}
# Direct file_server for parquet downloads bypasses uvicorn so a
# multi-GB pull from one analyst can't starve the app workers and
# block UI / health / API for everyone else. forward_auth calls the
# app's lightweight ``/api/data/{id}/check-access`` (RBAC only,
# ~1 ms) on every request; on 2xx Caddy serves the file directly
# via sendfile/zero-copy from the data volume mounted read-only.
#
# Path layout matches `app/api/data.py`'s extract.duckdb v2 search:
#   /data/extracts/<source_type>/data/<table_id>.parquet
# try_files probes known source subdirs in order; first hit wins.
# If a deployment adds a new connector and lands parquets at a fresh
# subdir, extend the try_files list. Anything that misses falls
# through to the app reverse_proxy below so an unmapped source
# degrades to "downloads work, just through uvicorn", never 404.
@download path_regexp tid ^/api/data/([^/]+)/download$
handle @download {
    forward_auth app:8000 {
        uri /api/data/{re.tid.1}/check-access
        # Bearer PAT or session cookie travels in Authorization
        # / Cookie; copy_headers ensures the upstream sees them.
        copy_headers Authorization Cookie
    }
    # Caddy's own /data is occupied by the caddy_data volume, so the
    # agnes data dir is mounted at /srv (read-only) instead; see the
    # `data:/srv:ro` line in docker-compose.yml's caddy service. The
    # root + try_files combo therefore probes /srv/extracts/...
    #
    # Devin Review caught: `try_files A B C` rewrites the URI to its
    # LAST entry when no file matches (per Caddy docs). Without an
    # explicit "rewrite back to original URI" fallback, a parquet
    # missing from all three known paths would get rewritten to the
    # last static candidate (`/jira/data/<id>.parquet`), and the
    # reverse_proxy below would forward THAT rewritten URI to
    # app:8000 → app has no such route → 404. To make the documented
    # "missed falls through to app handler" promise hold, append
    # the original `/api/data/<id>/download` path as the final
    # try_files entry: when no file matches, the URI is rewritten
    # back to the analyst-facing path and the app's `download_table`
    # handler picks it up via the reverse_proxy fallback below.
    root * /srv/extracts
    try_files /bigquery/data/{re.tid.1}.parquet /keboola/data/{re.tid.1}.parquet /jira/data/{re.tid.1}.parquet /api/data/{re.tid.1}/download
    @found file
    handle @found {
        header Content-Disposition "attachment; filename=\"{re.tid.1}.parquet\""
        file_server
    }
    # Fallback: parquet not at any known static path → defer to app
    # (handles legacy src_data/parquet layout + future connectors).
    reverse_proxy app:8000 {
        header_up X-Forwarded-Proto https
        header_up X-Forwarded-Host {host}
    }
}
reverse_proxy app:8000 {
    # App's uvicorn runs with --proxy-headers, so stamping these
    # ourselves makes OAuth callback URLs and Set-Cookie Secure
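
The path-resolution behaviour that the `try_files` line in this hunk is meant to achieve can be modelled in a few lines of Python. This is a simplified model of the intended outcome only, not of Caddy's implementation; `resolve_download`, `KNOWN_SUBDIRS` and `demo` are hypothetical names, and the subdirectory list mirrors the three candidates in the Caddyfile.

```python
import tempfile
from pathlib import Path

# Source subdirs probed in order, as listed in the try_files directive.
KNOWN_SUBDIRS = ("bigquery", "keboola", "jira")


def resolve_download(root: Path, table_id: str) -> str:
    """Return the first existing parquet candidate, else the original API path."""
    for source in KNOWN_SUBDIRS:
        candidate = root / source / "data" / f"{table_id}.parquet"
        if candidate.is_file():
            return f"/{source}/data/{table_id}.parquet"
    # No static file matched: keep the analyst-facing URI so the request
    # can still be proxied to the app's download handler.
    return f"/api/data/{table_id}/download"


def demo() -> tuple[str, str]:
    """Build a throwaway extracts tree and resolve one hit and one miss."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        data_dir = root / "keboola" / "data"
        data_dir.mkdir(parents=True)
        (data_dir / "orders.parquet").write_bytes(b"")
        return resolve_download(root, "orders"), resolve_download(root, "missing")
```

In this model a hit yields a path under the extracts root for direct serving, while a miss yields the unchanged download URI, which corresponds to the final `try_files` entry.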


@ -285,6 +285,24 @@ _KNOWN_FIELDS: dict[str, dict[str, dict]] = {
"`agnes snapshot create` suggestion. 0 disables the gate. Default 5368709120 = 5 GiB."
),
},
"query_timeout_ms": {
"kind": "int",
"default": 600000,
"hint": (
"DuckDB BigQuery extension query timeout (milliseconds). Applied "
"via `SET bq_query_timeout_ms` after every `LOAD bigquery` on "
"every BQ-touching DuckDB session (orchestrator remote-view "
"ATTACH, BqAccess factory, standalone extractor). Extension "
"default is 90 000 ms = 90 s, which is too tight for analyst "
"queries against view-backed datasets — bumped to 600 000 ms = "
"10 min by default. Set 0 to fall through to the extension "
"default. Note: the underlying BQ jobs.query RPC caps the wait "
"at ~200 s per call; the extension polls on top, so the "
"effective ceiling is this value but each poll round-trip is "
"~200 s. DuckDB itself emits a warning when this is set above "
"~200 s — that warning is informational, not an error."
),
},
},
},
"keboola": {
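
The sentinel handling described in the `query_timeout_ms` hint (0 or an unparseable value leaves the extension default alone) could look roughly like the sketch below. `timeout_statement` is a hypothetical helper, not a function from this PR; only the setting name `bq_query_timeout_ms` comes from the source, and treating negative values like 0 is an assumption made here.

```python
from __future__ import annotations


def timeout_statement(raw: object) -> str | None:
    """Return the SET statement to run after LOAD bigquery, or None to skip it."""
    try:
        value = int(raw)  # accepts ints and numeric strings such as "600000"
    except (TypeError, ValueError):
        return None  # unparseable: keep the extension default
    if value <= 0:
        return None  # 0 is the documented sentinel; negatives are an assumption
    return f"SET bq_query_timeout_ms = {value}"
```

Returning `None` instead of a statement lets a caller skip the `SET` entirely, which is what keeps extension versions that do not know the setting from failing.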


@ -1,6 +1,6 @@
"""Data download endpoint — streaming parquet files.""" """Data download endpoint — streaming parquet files."""
-from fastapi import APIRouter, Depends, HTTPException, Request
+from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi.responses import FileResponse
import duckdb
@ -12,6 +12,36 @@ from src.rbac import can_access_table
router = APIRouter(prefix="/api/data", tags=["data"])


@router.get("/{table_id}/check-access")
async def check_access(
    table_id: str,
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Lightweight RBAC probe used by Caddy's ``forward_auth`` directive
    to gate file_server-served parquet downloads without involving the
    app's request workers in the bulk byte transfer.

    Returns HTTP 204 No Content when the caller has read access to
    ``table_id``; HTTP 403 (via ``can_access_table`` returning False)
    otherwise. Caddy treats 2xx as authorized and forwards the request
    to its own ``file_server`` block; non-2xx is returned to the client
    verbatim.

    Why a separate endpoint and not just ``HEAD /download``: ``HEAD`` on
    the FileResponse-based ``download`` handler still opens the file and
    runs stat() to populate Content-Length / ETag. ``forward_auth`` calls
    this endpoint on every request, so the per-call cost matters; a pure
    RBAC check is ~1 ms while a HEAD path involves filesystem walks
    (``rglob`` for the parquet across source subdirs).
    """
    if not _SAFE_QUOTED_IDENTIFIER.match(table_id):
        raise HTTPException(status_code=404, detail="Table not found")
    if not can_access_table(user, table_id, conn):
        raise HTTPException(status_code=403, detail="Access denied to this table")
    return Response(status_code=204)


@router.get("/{table_id}/download")
async def download_table(
    table_id: str,
@ -19,7 +49,16 @@ async def download_table(
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
-    """Stream a parquet file for download. Supports ETag for caching."""
+    """Stream a parquet file for download. Supports ETag for caching.

    On Caddy-fronted deployments the matching Caddyfile rule intercepts
    ``GET /api/data/{table_id}/download``, calls ``check-access`` via
    ``forward_auth``, and serves the parquet directly via ``file_server``,
    bypassing this handler entirely. This handler stays as the
    canonical fallback for non-Caddy deployments (dev `docker compose
    up`, alternative reverse proxies, direct :8000 access) where the
    bulk transfer goes through uvicorn.
    """
    # Reject unsafe table_id before any filesystem or DB operations.
    # Use the relaxed quoted-identifier check that allows dots and hyphens
    # (Keboola table IDs like "in.c-crm.orders") while still blocking
@ -53,7 +92,6 @@ async def download_table(
    etag = f'"{stat.st_mtime_ns}"'
    if_none_match = request.headers.get("if-none-match")
    if if_none_match == etag:
-        from starlette.responses import Response
        return Response(status_code=304)
    return FileResponse(
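
The decision order of the `check-access` probe and the way a fronting proxy reacts to it can be summarised in a framework-free sketch. Everything here is illustrative: `SAFE_ID` is a hypothetical stand-in for `_SAFE_QUOTED_IDENTIFIER` (whose real pattern is not shown in this diff), and `readable` replaces the `can_access_table` lookup with a plain set.

```python
import re

# Hypothetical pattern: letters, digits, underscore, dot and hyphen only.
SAFE_ID = re.compile(r"^[A-Za-z0-9_.\-]+$")


def probe_status(table_id: str, readable: set[str]) -> int:
    """Mirror the probe's order: unsafe id -> 404, no access -> 403, else 204."""
    if not SAFE_ID.match(table_id):
        return 404
    if table_id not in readable:
        return 403
    return 204


def proxy_action(status: int) -> str:
    """2xx from the probe means serve the file; anything else goes back verbatim."""
    return "serve_file" if 200 <= status < 300 else "return_to_client"
```

Checking the identifier before the access lookup keeps malformed ids away from any database or filesystem work, which matches the ordering in the handler above.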


@ -71,12 +71,23 @@ class QueryResponse(BaseModel):
@router.post("", response_model=QueryResponse)
-async def execute_query(
+def execute_query(
    request: QueryRequest,
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
-    """Execute SQL against the server analytics DuckDB."""
+    """Execute SQL against the server analytics DuckDB.

    Plain ``def`` (not ``async def``) so FastAPI auto-offloads the call
    to the anyio thread pool. The body invokes ``analytics.execute(sql)``
    synchronously, which blocks for the full BQ jobs.query wait when a
    referenced view resolves through the BQ extension. Under ``async def``
    that block holds the single uvicorn event loop, freezing every other
    request (UI, /api/health, auth) until the query returns. Plain ``def``
    runs each invocation on its own thread, so heavy queries no longer
    starve unrelated endpoints. See PR #188's CHANGELOG entry for the
    Tier 1 event-loop unblocking rollout.
    """
    sql_lower = request.sql.strip().lower()
    # Block everything except SELECT
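
The event-loop starvation that the docstring describes can be reproduced with the standard library alone. The sketch below is a stand-in, not the application code: `blocking_query` simulates the synchronous DuckDB / BQ call with `time.sleep`, and `asyncio.to_thread` plays the role of the thread-pool offload that FastAPI performs for plain `def` handlers. A heartbeat task counts how often the loop gets to run while the "query" is in flight.

```python
import asyncio
import time


def blocking_query(seconds: float) -> str:
    """Stand-in for a synchronous DuckDB / BQ call."""
    time.sleep(seconds)
    return "done"


async def heartbeat(ticks: list, interval: float) -> None:
    """Record a tick every time the event loop is free to schedule us."""
    while True:
        await asyncio.sleep(interval)
        ticks.append(1)


async def run_case(offload: bool, seconds: float = 0.2) -> int:
    ticks: list = []
    hb = asyncio.create_task(heartbeat(ticks, 0.01))
    await asyncio.sleep(0)  # let the heartbeat task start
    if offload:
        # Runs on a worker thread; the loop keeps serving the heartbeat.
        await asyncio.to_thread(blocking_query, seconds)
    else:
        # Runs on the loop thread; nothing else can be scheduled meanwhile.
        blocking_query(seconds)
    count = len(ticks)
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return count
```

Calling `asyncio.run(run_case(offload=False))` yields zero heartbeat ticks during the blocking call, while `asyncio.run(run_case(offload=True))` yields a positive number, which is the difference between the frozen and the responsive server in miniature.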


@ -2,10 +2,12 @@
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from fastapi import APIRouter, Depends
import duckdb
from app.auth.dependencies import get_current_user, _get_db
from app.utils import get_data_dir as _get_data_dir
from src.rbac import can_access_table
from src.repositories.table_registry import TableRegistryRepository
from app.api.v2_cache import TTLCache
@ -43,6 +45,52 @@ def _fetch_hint(table_id: str, source_type: str) -> str:
return "already local — query directly via `agnes query`" return "already local — query directly via `agnes query`"
# Coarse size buckets for `rough_size_hint`. Boundaries chosen so an analyst
# Claude can decide tool by inspection: anything `large` or worse implies
# `agnes snapshot create` over `agnes query --remote`. Numbers reflect the
# default `bq_max_scan_bytes` 5 GiB ceiling: at "large" you're already at
# half the per-query gate and a naive `--remote` is likely to refuse.
_SIZE_BUCKETS = (
    (10 * 2**20, "small"),    # ≤10 MiB
    (100 * 2**20, "small"),   # ≤100 MiB, still small (analyst-laptop scale)
    (1 * 2**30, "medium"),    # ≤1 GiB
    (10 * 2**30, "large"),    # ≤10 GiB
)


def _bucket_size(byte_count: int) -> str:
    for cap, label in _SIZE_BUCKETS:
        if byte_count <= cap:
            return label
    return "very_large"


def _materialized_size_hint(table_id: str, source_type: str, query_mode: str) -> str | None:
    """Return a rough size bucket for a row whose data is on the server's
    local filesystem (any `query_mode` that produces a parquet: `local` and
    `materialized`). Returns ``None`` for `remote` (size requires a BQ
    INFORMATION_SCHEMA round-trip; tracked separately) and for tables whose
    parquet hasn't been materialised yet, so the AI gets ``null`` not a
    misleading "small".

    Layout matches the v2 extract.duckdb contract:
        ${DATA_DIR}/extracts/<source_type>/data/<table_id>.parquet
    """
    if query_mode == "remote":
        return None
    if not source_type:
        return None
    try:
        path = Path(_get_data_dir()) / "extracts" / source_type / "data" / f"{table_id}.parquet"
        if not path.exists():
            return None
        return _bucket_size(path.stat().st_size)
    except Exception:
        # Filesystem stat() race / permissions / weird DATA_DIR: fall back
        # to null rather than crash the whole catalog response.
        return None


def build_catalog(conn: duckdb.DuckDBPyConnection, user: dict) -> dict:
    rows = _table_rows_cache.get(_TABLE_ROWS_KEY)
    if rows is None:
@ -66,7 +114,10 @@ def build_catalog(conn: duckdb.DuckDBPyConnection, user: dict) -> dict:
"sql_flavor": _flavor_for(r.get("source_type") or ""),
"where_examples": _examples_for(r.get("source_type") or ""),
"fetch_via": _fetch_hint(r["id"], r.get("source_type") or ""),
-"rough_size_hint": None, # populated by Task 8 schema endpoint when called
+"rough_size_hint": _materialized_size_hint(
    r["id"], r.get("source_type") or "",
    r.get("query_mode") or "local",
),
})
return {
@ -76,8 +127,17 @@ def build_catalog(conn: duckdb.DuckDBPyConnection, user: dict) -> dict:
@router.get("/catalog")
-async def catalog(
+def catalog(
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    # Plain ``def`` so FastAPI auto-offloads to the anyio thread pool:
    # build_catalog now calls `_materialized_size_hint` for every visible
    # row, which does sync `Path.stat()` / `Path.exists()` on the data
    # volume. On local FS that's microseconds, but on a network-mounted
    # DATA_DIR (NFS / CIFS / GCS-FUSE) those calls can block. Plain ``def``
    # means each request runs on its own thread; the event loop stays
    # free for non-catalog traffic. Mirrors the Tier 1 conversion of
    # /api/query, /api/v2/scan, /api/v2/sample, /api/v2/schema.
    # Devin Review on PR #188.
    return build_catalog(conn, user)
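
The bucket boundaries used for `rough_size_hint` are inclusive upper bounds, which is easy to get wrong at the edges. The following sketch restates them with `bisect` purely to make the boundary behaviour checkable; it is an alternative formulation, not the loop used in `_bucket_size`, and `size_bucket`, `MIB` and `GIB` are hypothetical names.

```python
from bisect import bisect_left

MIB = 2**20
GIB = 2**30

# Inclusive upper bounds for small / medium / large, as in the changelog.
_UPPER_BOUNDS = (100 * MIB, 1 * GIB, 10 * GIB)
_LABELS = ("small", "medium", "large", "very_large")


def size_bucket(byte_count: int) -> str:
    # bisect_left returns the index of the first bound >= byte_count,
    # so a value exactly on a bound stays in the lower bucket.
    return _LABELS[bisect_left(_UPPER_BOUNDS, byte_count)]
```

For example, a file of exactly 100 MiB is `small`, one byte more is `medium`, and anything above 10 GiB is `very_large`.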


@ -104,13 +104,15 @@ def build_sample(
@router.get("/sample/{table_id}")
-async def sample(
+def sample(
    table_id: str,
    n: int = Query(default=5, ge=1, le=_MAX_N),
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
    bq: BqAccess = Depends(get_bq_access),
):
    # Plain ``def``: opens a `bq.duckdb_session()` and runs sync queries
    # through the BQ extension. See PR #188 Tier 1 entry.
    try:
        return build_sample(conn, user, table_id, n=n, bq=bq)
    except FileNotFoundError:


@ -218,12 +218,17 @@ def _avg_bytes_for_type(t: str) -> int:
@router.post("/scan/estimate")
-async def scan_estimate_endpoint(
+def scan_estimate_endpoint(
    raw: dict,
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
    bq: BqAccess = Depends(get_bq_access),
):
    # Plain ``def`` so FastAPI auto-offloads to the anyio thread pool: the
    # estimate path calls into google-cloud-bigquery's `client.query(...,
    # dry_run=True)` which blocks until BQ returns the dry-run cost. Under
    # ``async def`` that wait holds the event loop. See PR #188's Tier 1
    # entry for the wider rollout.
    try:
        return estimate(conn, user, raw, bq=bq)
    except WhereValidationError as e:
@ -374,7 +379,7 @@ def run_scan(
@router.post("/scan")
-async def scan_endpoint(
+def scan_endpoint(
    raw: dict,
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),


@ -209,12 +209,14 @@ def build_schema(
@router.get("/schema/{table_id}")
-async def schema(
+def schema(
    table_id: str,
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
    bq: BqAccess = Depends(get_bq_access),
):
    # Plain ``def``: opens a `bq.duckdb_session()` and runs sync metadata
    # queries through the BQ extension. See PR #188 Tier 1 entry.
    try:
        return build_schema(conn, user, table_id, bq=bq)
    except NotFound:


@ -141,6 +141,23 @@ async def lifespan(app):
        log_effective_policy()
    except Exception:
        pass  # never block startup on a logging convenience

    # Bump anyio's default thread pool size from 40 → AGNES_THREADPOOL_SIZE
    # (default 200). FastAPI auto-runs every plain `def` route handler in
    # this pool; the Tier 1 endpoints converted in PR #188 (`/api/query`,
    # `/api/v2/scan`, `/api/v2/sample`, `/api/v2/schema`) all block on
    # synchronous DuckDB / BQ-extension calls inside the handler body and
    # would otherwise serialise once 40 are in flight. 200 keeps the
    # per-process working set well under the BQ extension's connection cap
    # while leaving headroom for concurrent UI / health probes.
    try:
        import anyio.to_thread
        size = int(os.environ.get("AGNES_THREADPOOL_SIZE", "200"))
        anyio.to_thread.current_default_thread_limiter().total_tokens = size
        logger.info("anyio thread pool capacity set to %d", size)
    except Exception as e:
        logger.warning("failed to bump anyio thread pool capacity: %s", e)

    yield
    from src.db import close_system_db
    close_system_db()


@ -2,12 +2,14 @@
import os
import time
import traceback
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import httpx
-from cli.config import get_server_url, get_token
+from cli.config import _config_dir, get_server_url, get_token
# Retry policy for transient failures during stream downloads. Scoped to
# network issues and 5xx; 4xx (auth, 404, 400) is NOT retried. Tunable via
@ -21,6 +23,125 @@ _RETRY_BACKOFFS_S = (0.3, 1.0, 3.0) # seconds before attempt 2, 3, 4
QUERY_TIMEOUT_S = float(os.environ.get("AGNES_QUERY_TIMEOUT", "300")) QUERY_TIMEOUT_S = float(os.environ.get("AGNES_QUERY_TIMEOUT", "300"))
# ── Transport-error translation ─────────────────────────────────────────
# Pavel's Issue #185 Phase 3B caught the failure mode: when httpx raises
# `ReadTimeout` / `ConnectError` / `RemoteProtocolError` and the CLI
# command doesn't catch it, Typer dumps a five-frame Python traceback to
# the analyst's terminal. That looks like a CLI bug to a non-Python user
# and obscures the actionable signal ("server slow, try snapshot create").
# Translate transport exceptions to `AgnesTransportError` with a typed
# user-facing message, log the full traceback to `~/.config/agnes/last-
# error.log` for debug, and let the top-level CLI handler render the
# clean message + exit non-zero.
_LOG_FILE = _config_dir() / "last-error.log"
class AgnesTransportError(Exception):
"""Network / transport failure with a user-actionable message.
Raised by the api_* / stream_download helpers when httpx surfaces a
connection / timeout / protocol error. The CLI's top-level Typer
handler catches this, prints `.user_message` (NOT the traceback),
and exits non-zero. Full traceback goes to ``~/.config/agnes/last-
error.log`` so an operator can recover it for support.
"""
def __init__(self, user_message: str, *, hint: str = "", logfile_path: Path | None = None):
super().__init__(user_message)
self.user_message = user_message
self.hint = hint
self.logfile_path = logfile_path
def _log_traceback(exc: BaseException, *, context: str) -> Path:
"""Append a timestamped traceback to ``~/.config/agnes/last-error.log``
and return the path. Best-effort: never raises (a logging failure
must not mask the original error)."""
try:
with open(_LOG_FILE, "a", encoding="utf-8") as f:
ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
f.write(f"\n=== {ts} {context} ===\n")
traceback.print_exception(type(exc), exc, exc.__traceback__, file=f)
except Exception:
pass
return _LOG_FILE
def _translate_transport_error(
exc: Exception, *, context: str, timeout_s: float | None = None,
) -> AgnesTransportError:
"""Map httpx transport exceptions to user-facing CLI messages. The
mapping is intentionally pragmatic: analysts care about "what do I
do next", not the gRPC / TCP detail.
`timeout_s`, when supplied, is the actual httpx timeout used by the
failing call so the ReadTimeout message reports the real wait window
(an `agnes catalog` GET dies at 30s, not 300s; Devin Review on PR
#188 caught the original signature hardcoding `QUERY_TIMEOUT_S`,
which only matches `agnes query --remote`)."""
log = _log_traceback(exc, context=context)
if isinstance(exc, httpx.ReadTimeout):
wait_s = timeout_s if timeout_s is not None else QUERY_TIMEOUT_S
# The "long-running BQ" advisory only makes sense when the call
# actually hit the query path (timeout ≥ ~60s). For short calls
# (the 30s default on `agnes catalog` etc.) it's just confusing.
if wait_s >= 60:
hint = (
"If this is `agnes query --remote` against a heavy BQ view, "
"the underlying BQ job took longer than the wait window. Try:\n"
" • narrow the WHERE (especially the partition column from `agnes catalog --json`)\n"
" • `agnes snapshot create <table> ... --estimate` to materialize once + query locally\n"
" • set AGNES_QUERY_TIMEOUT=600 for a longer client-side wait\n"
f"Full traceback: {log}"
)
else:
hint = (
"Server is slow or unreachable. Check `agnes status`; "
"re-run if transient.\n"
f"Full traceback: {log}"
)
return AgnesTransportError(
f"Server didn't respond within the read timeout ({wait_s:.0f}s) "
f"for {context}.",
hint=hint,
logfile_path=log,
)
if isinstance(exc, httpx.ConnectError):
return AgnesTransportError(
f"Can't reach the agnes server for {context}.",
hint=(
"Check the server URL with `agnes status`, network reachability "
"(VPN / DNS / firewall), and the TLS-trust setup if this is a "
f"corporate-CA deployment.\nFull traceback: {log}"
),
logfile_path=log,
)
if isinstance(exc, (httpx.RemoteProtocolError, httpx.ReadError, httpx.WriteError)):
return AgnesTransportError(
f"Connection broke mid-flight on {context}.",
hint=(
"Usually a transient network blip. Re-run the command. If it "
f"keeps happening, check `agnes status`.\nFull traceback: {log}"
),
logfile_path=log,
)
if isinstance(exc, httpx.TimeoutException):
return AgnesTransportError(
f"Network timeout on {context}.",
hint=f"Re-run; if persistent, check the server.\nFull traceback: {log}",
logfile_path=log,
)
# Anything else: re-wrap with a generic message so the CLI doesn't
# dump the traceback. We'd prefer a typed translation; if you hit
# this branch, add a clause above.
return AgnesTransportError(
f"Unexpected error on {context}: {type(exc).__name__}.",
hint=f"Full traceback: {log}",
logfile_path=log,
)
def get_client(timeout: float = 30.0) -> httpx.Client: def get_client(timeout: float = 30.0) -> httpx.Client:
"""Get an authenticated httpx client.""" """Get an authenticated httpx client."""
token = get_token() token = get_token()
@ -35,23 +156,35 @@ def get_client(timeout: float = 30.0) -> httpx.Client:
def api_get(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: def api_get(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
try:
with get_client(timeout=timeout) as client: with get_client(timeout=timeout) as client:
return client.get(path, **kwargs) return client.get(path, **kwargs)
except httpx.HTTPError as exc:
raise _translate_transport_error(exc, context=f"GET {path}", timeout_s=timeout) from exc
def api_post(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: def api_post(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
try:
with get_client(timeout=timeout) as client: with get_client(timeout=timeout) as client:
return client.post(path, **kwargs) return client.post(path, **kwargs)
except httpx.HTTPError as exc:
raise _translate_transport_error(exc, context=f"POST {path}", timeout_s=timeout) from exc
def api_delete(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: def api_delete(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
try:
with get_client(timeout=timeout) as client: with get_client(timeout=timeout) as client:
return client.delete(path, **kwargs) return client.delete(path, **kwargs)
except httpx.HTTPError as exc:
raise _translate_transport_error(exc, context=f"DELETE {path}", timeout_s=timeout) from exc
def api_patch(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: def api_patch(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response:
try:
with get_client(timeout=timeout) as client: with get_client(timeout=timeout) as client:
return client.patch(path, **kwargs) return client.patch(path, **kwargs)
except httpx.HTTPError as exc:
raise _translate_transport_error(exc, context=f"PATCH {path}", timeout_s=timeout) from exc
def _is_transient(exc: Exception) -> bool: def _is_transient(exc: Exception) -> bool:
@ -98,7 +231,23 @@ def stream_download(path: str, target_path: str, progress_callback=None) -> int:
if attempt == _RETRY_ATTEMPTS or not _is_transient(exc): if attempt == _RETRY_ATTEMPTS or not _is_transient(exc):
break break
time.sleep(_RETRY_BACKOFFS_S[min(attempt, len(_RETRY_BACKOFFS_S) - 1)]) time.sleep(_RETRY_BACKOFFS_S[min(attempt, len(_RETRY_BACKOFFS_S) - 1)])
# Clean up any leftover tmp, then surface the last exception. # Clean up any leftover tmp, then surface the last exception. Translate
# transport errors (timeouts, connection drops, protocol errors) to
# AgnesTransportError so the CLI prints a clean message instead of a
# Python traceback (Pavel's #185 Phase 3B). HTTPStatusError (4xx/5xx
# response from the server) is NOT a transport failure and must
# re-raise verbatim so the caller's status-code handling + the rich
# server error body (e.g. 401 with "token expired", 403 with
# cross_project_forbidden detail) reach the analyst — Devin Review on
# PR #188 caught: HTTPStatusError is a subclass of HTTPError, so the
# generic isinstance(HTTPError) translation was eating status codes.
tmp_path.unlink(missing_ok=True) tmp_path.unlink(missing_ok=True)
assert last_exc is not None assert last_exc is not None
if isinstance(last_exc, httpx.HTTPStatusError):
raise last_exc
if isinstance(last_exc, httpx.HTTPError):
raise _translate_transport_error(
last_exc, context=f"GET {path} (stream → {target_path})",
timeout_s=300.0,
) from last_exc
raise last_exc raise last_exc
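
The ordering concern in the comment above (a status error is a subclass of the broad HTTP error, so the subclass check has to run first) can be illustrated without httpx. This is a minimal sketch under stated assumptions: `FakeHTTPError`, `FakeStatusError`, `FakeReadTimeout`, `CleanError`, and `surface` are hypothetical stand-ins invented for illustration, not httpx or Agnes names.

```python
# Hypothetical stand-ins for illustration only; not httpx or Agnes classes.
class FakeHTTPError(Exception):
    """Plays the role of the broad base class."""


class FakeStatusError(FakeHTTPError):
    """Plays the role of a 4xx/5xx response error (subclass of the base)."""

    def __init__(self, status_code: int):
        super().__init__(f"status {status_code}")
        self.status_code = status_code


class FakeReadTimeout(FakeHTTPError):
    """Plays the role of a transport-level timeout."""


class CleanError(Exception):
    """User-facing error carrying a short message instead of a traceback."""

    def __init__(self, user_message: str):
        super().__init__(user_message)
        self.user_message = user_message


def surface(last_exc: Exception, context: str) -> Exception:
    """Return the exception the caller should raise.

    The subclass check comes first: a status error is returned unchanged
    so its status code survives. Only the remaining base-class errors
    are translated into a clean message.
    """
    if isinstance(last_exc, FakeStatusError):
        return last_exc
    if isinstance(last_exc, FakeHTTPError):
        return CleanError(f"Network problem on {context}: {type(last_exc).__name__}.")
    return last_exc


status = surface(FakeStatusError(403), "GET /example")
timeout = surface(FakeReadTimeout("slow"), "GET /example")
other = surface(ValueError("bad"), "GET /example")
```

Swapping the two `isinstance` checks would turn the 403 into a generic network message, which is the regression the comment describes.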


@ -64,6 +64,16 @@ def init(
token: str = typer.Option(..., "--token", help="Personal access token"), token: str = typer.Option(..., "--token", help="Personal access token"),
force: bool = typer.Option(False, "--force", help="Re-initialize an existing workspace"), force: bool = typer.Option(False, "--force", help="Re-initialize an existing workspace"),
workspace_str: Optional[str] = typer.Option(None, "--workspace", help="Target dir (default: cwd)"), workspace_str: Optional[str] = typer.Option(None, "--workspace", help="Target dir (default: cwd)"),
skip_materialize: bool = typer.Option(
False, "--skip-materialize",
help=(
"Skip materialized-mode tables on the first pull. The first "
"init can otherwise spend tens of minutes silently downloading "
"a single multi-GB scheduled-query parquet. Materialized rows "
"are still discoverable via `agnes catalog`; rerun `agnes pull` "
"without this flag once you actually need them locally."
),
),
): ):
"""Bootstrap workspace: auth, CLAUDE.md, hooks, first pull, AGNES_WORKSPACE.md.""" """Bootstrap workspace: auth, CLAUDE.md, hooks, first pull, AGNES_WORKSPACE.md."""
workspace = Path(workspace_str).resolve() if workspace_str else Path.cwd() workspace = Path(workspace_str).resolve() if workspace_str else Path.cwd()
@ -176,7 +186,15 @@ def init(
# exception escaping here is a programming error worth surfacing. # exception escaping here is a programming error worth surfacing.
# ------------------------------------------------------------------ # ------------------------------------------------------------------
try: try:
result: PullResult = run_pull(server_url, token, workspace) # `agnes init` always runs interactively (analyst typing the
# command), so progress is on by default — Pavel's #185 Phase 1
# was a 44-minute silent download on the very first install.
# Pass it through to run_pull.
result: PullResult = run_pull(
server_url, token, workspace,
skip_materialize=skip_materialize,
show_progress=True,
)
except Exception as exc: except Exception as exc:
typer.echo(render_error(0, {"detail": { typer.echo(render_error(0, {"detail": {
"kind": "manifest_unauthorized", "kind": "manifest_unauthorized",


@ -38,6 +38,15 @@ def pull(
quiet: bool = typer.Option(False, "--quiet", help="Suppress success stdout (errors still surface on stderr)"), quiet: bool = typer.Option(False, "--quiet", help="Suppress success stdout (errors still surface on stderr)"),
as_json: bool = typer.Option(False, "--json", help="Emit a single JSON object summarizing the pull"), as_json: bool = typer.Option(False, "--json", help="Emit a single JSON object summarizing the pull"),
dry_run: bool = typer.Option(False, "--dry-run", help="Compute the delta without writing anything to disk"), dry_run: bool = typer.Option(False, "--dry-run", help="Compute the delta without writing anything to disk"),
skip_materialize: bool = typer.Option(
False, "--skip-materialize",
help=(
"Skip materialized-mode tables (server-side scheduled BQ "
"scan results, often multi-GB). Their data is still discoverable "
"via `agnes catalog` and remote-mode tables still pull. Useful "
"for a fast first init when an analyst only needs --remote access."
),
),
): ):
"""Refresh data from the server into ./server/parquet + ./user/duckdb.""" """Refresh data from the server into ./server/parquet + ./user/duckdb."""
server_url = get_server_url() server_url = get_server_url()
@ -68,8 +77,17 @@ def pull(
workspace = Path(os.environ.get("AGNES_LOCAL_DIR", ".")).resolve() workspace = Path(os.environ.get("AGNES_LOCAL_DIR", ".")).resolve()
# Show progress unless quiet (SessionStart hooks) or json (machine-
# readable output where Rich's terminal-control sequences would be
# garbage in the consumer's parser).
show_progress = not (quiet or as_json)
try: try:
result: PullResult = run_pull(server_url, token, workspace, dry_run=dry_run) result: PullResult = run_pull(
server_url, token, workspace,
dry_run=dry_run,
skip_materialize=skip_materialize,
show_progress=show_progress,
)
except Exception as exc: except Exception as exc:
# `run_pull` is documented to record per-table / per-stage failures # `run_pull` is documented to record per-table / per-stage failures
# under `result.errors` rather than raising, so reaching this branch # under `result.errors` rather than raising, so reaching this branch


@ -112,6 +112,8 @@ def run_pull(
workspace: Path, workspace: Path,
*, *,
dry_run: bool = False, dry_run: bool = False,
skip_materialize: bool = False,
show_progress: bool = False,
) -> PullResult: ) -> PullResult:
"""Refresh local parquets + corporate memory rules from the server. """Refresh local parquets + corporate memory rules from the server.
@ -119,6 +121,17 @@ def run_pull(
Typer/Rich UI. Returns a `PullResult` summary; never raises for Typer/Rich UI. Returns a `PullResult` summary; never raises for
network/server errors (records them under `errors` instead) so the network/server errors (records them under `errors` instead) so the
caller can decide whether a partial pull is fatal. caller can decide whether a partial pull is fatal.
Args:
skip_materialize: When True, omit `query_mode='materialized'`
tables from the download set. Use for analysts who only
care about `--remote` access on the workspace and don't
want to wait on multi-GB scheduled-query parquets at first
init. Pavel's #185 Phase 1: a 6.3 GB `order_economics`
parquet kept first init silent for 44 minutes.
show_progress: When True, render a per-file progress bar to
stderr via Rich during the parallel download phase. Pass
False from `--quiet` callers (SessionStart hooks).
""" """
started = time.monotonic() started = time.monotonic()
result = PullResult() result = PullResult()
@ -159,6 +172,11 @@ def run_pull(
for tid, info in server_tables.items(): for tid, info in server_tables.items():
if info.get("query_mode") == "remote": if info.get("query_mode") == "remote":
continue continue
if skip_materialize and info.get("query_mode") == "materialized":
# Operator opt-out for first-init. Materialized rows are
# still discoverable via `agnes catalog` and queryable
# the next time `agnes pull` runs without --skip-materialize.
continue
non_remote_total += 1 non_remote_total += 1
local_hash = local_tables.get(tid, {}).get("hash", "") local_hash = local_tables.get(tid, {}).get("hash", "")
server_hash = info.get("hash", "") server_hash = info.get("hash", "")
@ -178,15 +196,75 @@ def run_pull(
result.duration_s = time.monotonic() - started result.duration_s = time.monotonic() - started
return result return result
# 4. Download parquets. Lazy mkdir: only create server/parquet/ # 4. Download parquets in parallel. Lazy mkdir: only create
# when we have at least one table to write into it. # server/parquet/ when we have at least one table to write into it.
for tid in to_download: # Concurrency capped by `AGNES_PULL_PARALLELISM` (default 4) so a
if not parquet_dir.exists(): # registry of 50+ tables doesn't open 50+ TCP connections + saturate
# the analyst's NIC; 4 matches typical home-broadband saturation
# without over-subscribing the server's caddy file_server (each
# request is a separate goroutine + sendfile, but the analyst's
# downlink is the more frequent bottleneck). Set to 1 to restore
# the pre-PR serial behavior for debug repro. The server-side
# bypass-uvicorn fix (Caddy file_server) is the other half —
# without it, parallel downloads would still queue on the single
# uvicorn worker.
if to_download and not parquet_dir.exists():
parquet_dir.mkdir(parents=True, exist_ok=True) parquet_dir.mkdir(parents=True, exist_ok=True)
try:
workers = max(1, int(os.environ.get("AGNES_PULL_PARALLELISM", "4")))
except ValueError:
workers = 4
# Drop to serial when there's only one (or zero) tables — avoids
# the executor + thread overhead for the common single-update case.
workers = min(workers, len(to_download)) if to_download else 1
# Optional progress bar — Rich's Progress tracks per-file bytes
# streamed, aggregated across the parallel ThreadPoolExecutor
# workers. Pavel's #185 Phase 1: a single 6.3 GB parquet on first
# init went 44 minutes silent, looked frozen. Now: aggregate "X.Y
# GB / Z.A GB · 56 MB/s · ETA 1m 20s" to stderr while threads
# stream. None when show_progress=False (SessionStart hooks etc.).
progress = None
progress_tasks: dict[str, int] = {}
if show_progress and to_download:
from rich.progress import (
Progress, BarColumn, DownloadColumn, TextColumn,
TimeRemainingColumn, TransferSpeedColumn,
)
progress = Progress(
TextColumn("[bold]{task.fields[label]}[/]"),
BarColumn(),
DownloadColumn(),
TransferSpeedColumn(),
TimeRemainingColumn(),
transient=False,
)
progress.start()
for tid in to_download:
size = int(server_tables[tid].get("size_bytes") or 0)
# Some manifest entries don't carry size — Rich shows
# an indeterminate bar in that case.
progress_tasks[tid] = progress.add_task(
"download", label=tid, total=size if size > 0 else None,
)
def _download_one(tid: str) -> tuple[str, dict | None, str | None]:
"""Returns (tid, local_table_entry_or_None, error_or_None).
One bound thread per call; stream_download is sync I/O so a
ThreadPoolExecutor (not asyncio) is the right tool. The
progress callback is thread-safe: Rich's Progress.update
holds an internal lock."""
target = parquet_dir / f"{tid}.parquet" target = parquet_dir / f"{tid}.parquet"
expected_hash = server_tables[tid].get("hash", "") expected_hash = server_tables[tid].get("hash", "")
cb = None
if progress is not None and tid in progress_tasks:
task_id = progress_tasks[tid]
def cb(n: int, _tid=tid, _task=task_id):
progress.update(_task, advance=n)
try: try:
stream_download(f"/api/data/{tid}/download", str(target)) stream_download(f"/api/data/{tid}/download", str(target),
progress_callback=cb)
if expected_hash: if expected_hash:
actual_hash = _file_md5(target) actual_hash = _file_md5(target)
if actual_hash != expected_hash: if actual_hash != expected_hash:
@ -197,14 +275,32 @@ def run_pull(
elif not _is_valid_parquet(target): elif not _is_valid_parquet(target):
target.unlink(missing_ok=True) target.unlink(missing_ok=True)
raise ValueError("not a valid parquet (missing PAR1 magic)") raise ValueError("not a valid parquet (missing PAR1 magic)")
local_tables[tid] = { entry = {
"hash": expected_hash, "hash": expected_hash,
"rows": server_tables[tid].get("rows", 0), "rows": server_tables[tid].get("rows", 0),
"size_bytes": server_tables[tid].get("size_bytes", 0), "size_bytes": server_tables[tid].get("size_bytes", 0),
} }
result.tables_updated += 1 return tid, entry, None
except Exception as exc: except Exception as exc:
result.errors.append({"table": tid, "error": str(exc)}) return tid, None, str(exc)
try:
if workers <= 1:
outcomes = [_download_one(tid) for tid in to_download]
else:
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=workers) as ex:
outcomes = list(ex.map(_download_one, to_download))
finally:
if progress is not None:
progress.stop()
for tid, entry, err in outcomes:
if err is not None:
result.errors.append({"table": tid, "error": err})
else:
local_tables[tid] = entry
result.tables_updated += 1
# 5. Persist sync state (only on real runs). # 5. Persist sync state (only on real runs).
# TODO(workspace-scoped-sync-state): currently saved to # TODO(workspace-scoped-sync-state): currently saved to
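
The shape of the parallel download step (workers return outcome tuples, and shared state is only updated afterwards on the calling thread) can be sketched with the standard library alone. This is an illustrative sketch, not the code from this PR: `parse_workers`, `fake_download`, and `pull_all` are hypothetical names, and the "download" is simulated.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Optional


def parse_workers(raw: str, n_items: int, default: int = 4) -> int:
    """Parse a worker count, fall back on bad input, never exceed n_items."""
    try:
        workers = max(1, int(raw))
    except ValueError:
        workers = default
    return min(workers, n_items) if n_items else 1


def fake_download(name: str) -> tuple[str, Optional[dict], Optional[str]]:
    """Hypothetical stand-in for one download: returns (name, entry, error)."""
    try:
        if name.startswith("bad"):
            raise ValueError("hash mismatch")
        return name, {"rows": len(name)}, None
    except Exception as exc:
        # Errors are returned, not raised, so one failure cannot abort the batch.
        return name, None, str(exc)


def pull_all(names: list[str], raw_workers: str) -> tuple[dict, list]:
    workers = parse_workers(raw_workers, len(names))
    if workers <= 1:
        outcomes = [fake_download(n) for n in names]
    else:
        with ThreadPoolExecutor(max_workers=workers) as ex:
            # Executor.map yields results in input order.
            outcomes = list(ex.map(fake_download, names))
    # Shared state is only touched here, on the calling thread.
    updated: dict = {}
    errors: list = []
    for name, entry, err in outcomes:
        if err is not None:
            errors.append({"table": name, "error": err})
        else:
            updated[name] = entry
    return updated, errors


updated, errors = pull_all(["orders", "bad_table", "users"], "not-a-number")
```

Because the worker function never mutates `updated` or `errors` itself, no lock is needed around the result bookkeeping.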


@ -123,5 +123,41 @@ app.add_typer(snapshot_app, name="snapshot")
app.add_typer(disk_info_app, name="disk-info") app.add_typer(disk_info_app, name="disk-info")
if __name__ == "__main__": def _run_with_clean_errors() -> None:
"""Wrap ``app()`` so AgnesTransportError (and other typed CLI errors)
surface as a one-line message + exit, never as a Python traceback. The
full traceback is already logged to ``~/.config/agnes/last-error.log``
by the api_* helpers; operators read it from there for support
forwarding. Anything that escapes this wrapper IS a CLI bug worth
fixing: log + print "internal error" so the analyst doesn't see a
Pythonist's traceback either.
Pavel's #185 Phase 3B: previously a `httpx.ReadTimeout` from an
`agnes query --remote` against a slow BQ view dumped a 30-frame
traceback to the analyst's terminal. Now: one clean line + a hint,
return code 1.
"""
from cli.client import AgnesTransportError, _log_traceback, _LOG_FILE
try:
app() app()
except AgnesTransportError as exc:
typer.echo(f"Error: {exc.user_message}", err=True)
if exc.hint:
typer.echo(exc.hint, err=True)
sys.exit(1)
except typer.Exit:
raise
except (KeyboardInterrupt, SystemExit):
raise
except Exception as exc: # last-resort net — escaped exceptions are bugs
log = _log_traceback(exc, context="unhandled at CLI top-level")
typer.echo(
f"Error: internal CLI error ({type(exc).__name__}). "
f"Full traceback logged to {log}.",
err=True,
)
sys.exit(1)
if __name__ == "__main__":
_run_with_clean_errors()
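
The wrapper pattern above (typed errors become one clean line, anything else becomes a generic "internal error") can be sketched in a testable form by returning an exit code instead of calling `sys.exit`. This is a simplified sketch under assumptions: `CleanError`, `run_with_clean_errors`, `slow_server`, and `buggy` are hypothetical names, and the traceback logging step is omitted.

```python
import io


class CleanError(Exception):
    """Hypothetical typed CLI error with a user-facing message and hint."""

    def __init__(self, user_message: str, hint: str = ""):
        super().__init__(user_message)
        self.user_message = user_message
        self.hint = hint


def run_with_clean_errors(command, err_stream) -> int:
    """Run `command` and return a process exit code, never a traceback."""
    try:
        command()
    except CleanError as exc:
        print(f"Error: {exc.user_message}", file=err_stream)
        if exc.hint:
            print(exc.hint, file=err_stream)
        return 1
    except Exception as exc:  # last-resort net: anything landing here is a bug
        print(f"Error: internal CLI error ({type(exc).__name__}).", file=err_stream)
        return 1
    return 0


def slow_server():
    raise CleanError("Server didn't respond.", hint="Re-run if transient.")


def buggy():
    raise KeyError("oops")


out = io.StringIO()
codes = [run_with_clean_errors(f, out) for f in (slow_server, buggy, lambda: None)]
```

`KeyboardInterrupt` and `SystemExit` do not derive from `Exception`, so the last-resort clause in this sketch lets them propagate without extra handling.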


@ -184,7 +184,7 @@ def format_outdated_notice(info: UpdateInfo) -> str:
literal string "None" into a copy-pasteable command; drop the upgrade literal string "None" into a copy-pasteable command; drop the upgrade
snippet in that case. snippet in that case.
""" """
msg = f"[update] da {info.installed} is out of date — latest on this server is {info.latest}." msg = f"[update] agnes {info.installed} is out of date — latest on this server is {info.latest}."
if info.download_url: if info.download_url:
msg += f" Upgrade: uv tool install --force {info.download_url}" msg += f" Upgrade: uv tool install --force {info.download_url}"
return msg return msg


@ -28,22 +28,41 @@ This workspace is connected to {{ server.url }}.
- **Personal customizations go in `.claude/CLAUDE.local.md`, NOT here.** This file is regenerated by `agnes init --force`; edits here will be lost. CLAUDE.local.md is preserved across regeneration and uploaded on `agnes push`. - **Personal customizations go in `.claude/CLAUDE.local.md`, NOT here.** This file is regenerated by `agnes init --force`; edits here will be lost. CLAUDE.local.md is preserved across regeneration and uploaded on `agnes push`.
## Metrics Workflow ## Metrics Workflow
1. `agnes catalog --metrics` — find the relevant metric ({{ metrics.count }} available, categories: {{ metrics.categories | join(", ") or "none yet" }}) 1. `agnes catalog --metrics` — list registered metrics + categories
2. `agnes catalog --metrics --show <category>/<name>` — read SQL and business rules 2. `agnes catalog --metrics --show <category>/<name>` — read the canonical SQL + business rules
3. Use the canonical SQL from the metric definition, adapt to the question 3. Adapt the canonical SQL; never invent metric calculations
4. Never invent metric calculations — always check existing definitions first
## Data Sync ## Data Sync
- `agnes pull` — download current data from server - `agnes pull` — download current data from server
- `agnes push` — upload sessions and local notes to server - `agnes push` — upload sessions and local notes to server
- Data on the server refreshes every {{ sync_interval }} - Data on the server refreshes every {{ sync_interval }}
## Available Datasets ## Discovering tables — never enumerate from memory
{% for t in tables -%}
- `{{ t.name }}`{% if t.description %} — {{ t.description }}{% endif %}{% if t.query_mode == "remote" %} *(remote, queried on demand)*{% endif %} Tables, columns, sizes, descriptions, and `query_mode` change as admins
{% else -%} register / migrate / drop entries. Always re-discover from the live server,
- _No tables registered yet — ask an admin to register tables in the dashboard._ never from this file or your training data:
{% endfor %}
```
agnes catalog --json # all tables: id, query_mode, sql_flavor,
# where_examples, fetch_via, rough_size_hint, description
agnes catalog --json | jq '.tables[] | select(.id=="<id>")' # single table — read its description in full BEFORE writing any SQL
agnes schema <table> # columns + types in the right SQL dialect
agnes describe <table> -n 5 # sample rows (local + materialized only)
```
The `description` field on each catalog row is the **authoritative
business-rules text** for that table — it carries grain, partition
column, join contracts, and column-level gotchas. Re-read it from the
live `agnes catalog` for every cross-table decision; do **not** copy
it into this workspace `CLAUDE.md` (it's a snapshot that goes stale,
and `agnes init` will overwrite local edits — put personal notes into
`.claude/CLAUDE.local.md` instead). The CLI is the source of truth.
`rough_size_hint` is server-populated for `local` and `materialized` tables
(`small` ≤100 MiB, `medium` ≤1 GiB, `large` ≤10 GiB, `very_large` >10 GiB) and
`null` for `remote` rows. When `null`, treat the table as potentially large
and use `agnes snapshot create --estimate` to size-check before fetching.
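
The size buckets listed above can be read as a simple threshold ladder. The sketch below is an illustration only: the function name `rough_size_hint` and the inclusive upper bounds are assumptions taken from the wording here, and the server's actual implementation is not shown in this diff.

```python
from typing import Optional

MIB = 1024 ** 2
GIB = 1024 ** 3


def rough_size_hint(size_bytes: Optional[int]) -> Optional[str]:
    """Map a byte count to a coarse bucket; None means size unknown."""
    if size_bytes is None:
        return None  # e.g. remote rows, where no parquet size exists
    if size_bytes <= 100 * MIB:
        return "small"
    if size_bytes <= 1 * GIB:
        return "medium"
    if size_bytes <= 10 * GIB:
        return "large"
    return "very_large"
```

Under these thresholds a 6.3 GB parquet (6 300 000 000 bytes) falls in the `large` bucket.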
{% if marketplaces -%} {% if marketplaces -%}
## Plugins available to you ## Plugins available to you
@ -58,15 +77,43 @@ Not every table is synced. Tables registered with `query_mode: "remote"` live in
BigQuery, accessed server-side via DuckDB's BQ extension — no parquet on disk. BigQuery, accessed server-side via DuckDB's BQ extension — no parquet on disk.
Tables you don't see in `server/parquet/` may still be queryable. Tables you don't see in `server/parquet/` may still be queryable.
### Discovery first ### Discovery first — read `agnes catalog --json` BEFORE every cross-table decision
`agnes catalog --json` returns one row per table with these fields. Use them; don't guess:
| Field | What it tells you | How to use it |
|---|---|---|
| `query_mode` | `local` (parquet on laptop) / `remote` (BQ on demand) / `materialized` (synced parquet of a BQ result) | Picks the tool — see decision tree below |
| `source_type` | `keboola` / `bigquery` / `jira` | Determines SQL dialect |
| `sql_flavor` | `duckdb` for local sources, `bigquery` for `--remote` queries on BQ rows | What syntax `--where` expects |
| `where_examples` | 1-3 example WHERE predicates that are valid for this table's dialect | Copy as starting point for `--where` |
| `fetch_via` | Pre-formatted `agnes snapshot create …` template for this table | The canonical "how do I get a slice of this table" command |
| `rough_size_hint` | Coarse size hint (`small` / `medium` / `large` / `very_large`, or null when unknown) | Bigger than `medium` → never `agnes query --remote` without a tight `--where`; use `agnes snapshot create` |
``` ```
agnes catalog --json | jq '.[] | {name, source_type, query_mode}' # see all tables + their modes agnes catalog --json # full structured view (use this in scripts)
agnes schema <table> # columns + types agnes catalog # human-readable summary
agnes describe <table> -n 5 # sample rows agnes schema <table> # columns + types (BIGQUERY/DUCKDB dialect printed in header)
agnes describe <table> -n 5 # sample rows (works on local & materialized only)
``` ```
For local-mode tables, query directly with `agnes query "SELECT … FROM <table>"`. ### Decision tree — pick the right tool BEFORE writing SQL
```
┌─ local → agnes query "SELECT ..."
agnes catalog → ─────┤
query_mode of <table> ├─ materialized → agnes query (parquet was synced by agnes pull)
│ (if missing locally, run `agnes pull` first)
└─ remote → choose by table size + query shape:
- one cheap probe (COUNT, schema-confirm, single agg ≤200s)
→ agnes query --remote "..."
- repeated questions on same slice / large scan
→ agnes snapshot create <table> --select ... --where ... --as <name>
then agnes query "SELECT ... FROM <name>"
- join with a local table
→ agnes query --register-bq "alias=BQ_SQL" --sql "..."
```
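
The decision tree above can also be written as a small function. This is a hedged sketch: `pick_tool` and its two boolean flags are hypothetical, and giving the local-join case precedence over the repeated/large case is an assumption the tree itself does not state.

```python
def pick_tool(query_mode: str, *, repeated_or_large: bool = False,
              joins_local: bool = False) -> str:
    """Return the CLI command family suggested for a table's query_mode."""
    if query_mode in ("local", "materialized"):
        # Both are backed by a parquet on disk once `agnes pull` has run.
        return "agnes query"
    if query_mode == "remote":
        if joins_local:
            return "agnes query --register-bq"
        if repeated_or_large:
            return "agnes snapshot create"
        # Default for remote: a single cheap probe.
        return "agnes query --remote"
    raise ValueError(f"unknown query_mode: {query_mode!r}")
```

For example, `pick_tool("remote", repeated_or_large=True)` points at `agnes snapshot create`, after which the slice is queried locally with plain `agnes query`.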
### Three patterns for `query_mode: "remote"` tables ### Three patterns for `query_mode: "remote"` tables
@ -76,13 +123,30 @@ For local-mode tables, query directly with `agnes query "SELECT … FROM <table>
| **`agnes query --remote`** | one-shot, server-side execution against BigQuery (works for BASE TABLE rows directly + VIEW/MATERIALIZED_VIEW rows via the BQ jobs API; cost-guarded by a 5 GiB scan cap configurable in /admin/server-config) | single aggregate / cheap probe | | **`agnes query --remote`** | one-shot, server-side execution against BigQuery (works for BASE TABLE rows directly + VIEW/MATERIALIZED_VIEW rows via the BQ jobs API; cost-guarded by a 5 GiB scan cap configurable in /admin/server-config) | single aggregate / cheap probe |
| **`agnes query --register-bq`** | hybrid joins between local snapshots and ad-hoc BQ subqueries | crossing local + remote | | **`agnes query --register-bq`** | hybrid joins between local snapshots and ad-hoc BQ subqueries | crossing local + remote |
### Permission model + cost — important ### Common mistakes — avoid on first try
- BQ access goes through the **agnes server's GCE service account**, not your personal Google credentials. If a query fails with a permission error, the table is in a project the server SA cannot read — escalate to admin, do NOT try to authenticate yourself. - **`--estimate` is on `agnes snapshot create` ONLY.** Do NOT pass it to `agnes query` — fails with `No such option: --estimate`. The estimate flow is a snapshot-creation cost gate, not a query primitive.
- Every BQ query bills the SA's GCP project for **bytes scanned**. A naive `SELECT * FROM <large_table>` can cost real money. ALWAYS: - **Old `agnes fetch` / `da fetch` / `da query` references in stale docs** — the CLI is `agnes`; `agnes fetch` was renamed to `agnes snapshot create`. If you see those names, translate before running.
- filter via `--where` on the partition column (typically a date) - **Don't attempt personal GCP auth** if a BQ query fails with permission errors. BQ access uses the **server's service account**, not your Google identity — escalate to admin instead.
- list specific columns in `--select` — column-store BQ skips the rest, cheaper - **Don't `agnes query --remote "SELECT * FROM <large_table>"`** without a `--where`. Even if the scan-byte gate refuses, you've wasted the round-trip; gate yourself first by reading `rough_size_hint` and `where_examples` from `agnes catalog --json`.
- run `--estimate` first when unsure of the table size or partitioning
### Failure-mode dictionary — what each error means + the right response
| Error wording (substring) | Cause | Response |
|---|---|---|
| `Binder Error: Query execution exceeded the timeout. Job ID: ...` | BQ-side query took >~200 s wall-clock; the DuckDB BQ extension's `bq_query_timeout_ms` (default 90 s, server may bump to 600 s) elapsed | Narrow `--where` (especially partition column), drop unused columns from `--select`, or switch to `agnes snapshot create` to materialise once + query locally |
| `HTTP 400: remote_scan_too_large` | Server's `bq_max_scan_bytes` cost gate refused the query (default 5 GiB) | Tighten `--where`; consider `agnes snapshot create` so the cost is paid once, then local queries are free |
| `HTTP 401: ... unauthorized` | PAT expired or wrong | `agnes init --server-url ... --token <new-PAT>`; re-mint via the dashboard's "Personal Access Tokens" page |
| `HTTP 403: cross_project_forbidden` (with `serviceusage` mention) | Server SA lacks `serviceusage.services.use` on the BQ data project | Escalate to admin to set `data_source.bigquery.billing_project`; do NOT try personal auth |
| `ReadTimeout` (client-side) on `agnes query --remote` | CLI is older than 0.35.1 (had 30 s default) | `agnes --version`; if <0.35.1, upgrade with `uv tool install --force <wheel-from-server>` (the URL is in the `[update]` banner that prints on every command). Then retry. |
| `unknown columns: [...]` from `agnes snapshot create` | `--select` lists columns that don't exist | Run `agnes schema <table>` and copy column names verbatim |
### Cost discipline — every BQ query bills bytes scanned
A naive `SELECT * FROM <large_table>` can cost real money. ALWAYS:
- filter via `--where` on the partition column (typically a date) — read `where_examples` in `agnes catalog --json`
- list specific columns in `--select` — column-store BQ skips the rest
- run `--estimate` first (only valid on `agnes snapshot create`) when the table is partitioned/clustered or when `rough_size_hint` is unknown
### `agnes snapshot create` discipline ### `agnes snapshot create` discipline


@ -127,6 +127,14 @@ data_source:
# # Dry-run check before running; exceeding -> registration / sync
# # rejected. Default 10 GiB (10737418240). Set 0 to disable.
# # null falls through to default. Configurable via /admin/server-config UI.
# query_timeout_ms: 600000
# # DuckDB BigQuery extension query timeout (milliseconds).
# # Applied via `SET bq_query_timeout_ms` after every LOAD bigquery
# # on every BQ-touching DuckDB session. Extension default is
# # 90 000 ms = 90 s, which is too tight for analyst queries against
# # view-backed datasets -- bumped to 600 000 ms = 10 min by default.
# # Set 0 to fall through to the extension default. Configurable via
# # /admin/server-config UI.
# --- OpenMetadata catalog (optional) ---
# Enriches table and column metadata from OpenMetadata REST API.

@ -232,11 +232,48 @@ def _default_duckdb_session_factory(projects: BqProjects):
f"failed to install/load BigQuery DuckDB extension: {e}",
details={"original": str(e)},
)
apply_bq_session_settings(conn)
yield conn
finally:
conn.close()
def apply_bq_session_settings(conn) -> None:
    """Apply per-session DuckDB BigQuery-extension settings from instance config.

    Currently sets ``bq_query_timeout_ms`` from
    ``data_source.bigquery.query_timeout_ms``. The extension default is 90 s,
    which is too tight for analyst-scale queries against view-backed BQ
    datasets, so the default is bumped to 600 s here. Sentinel ``0`` (or a
    non-numeric / unparseable value) leaves the extension default in place.

    Call AFTER ``LOAD bigquery`` on every DuckDB session that touches BQ:
    BqAccess's session factory, the standalone extractor in
    ``connectors/bigquery/extractor.py``, the orchestrator's
    ``_remote_attach`` path in ``src/orchestrator.py``, and the read-only
    reattach path ``_reattach_remote_extensions`` in ``src/db.py``.
    """
    try:
        from app.instance_config import get_value
    except Exception:
        return
    raw = get_value(
        "data_source", "bigquery", "query_timeout_ms", default=600_000,
    )
    try:
        ms = int(raw) if raw is not None else 0
    except (TypeError, ValueError):
        return
    if ms <= 0:
        return
    try:
        conn.execute(f"SET bq_query_timeout_ms = {int(ms)}")
    except Exception:
        # Fail-soft: extension version may not support the setting, or the
        # session may already have been frozen; leave the default rather
        # than poisoning the whole session.
        pass
class BqAccess:
"""Single entry point for BigQuery access. Stateless after construction.

@ -359,6 +359,8 @@ def _init_extract_locked(
conn.execute(
f"CREATE SECRET bq_session (TYPE bigquery, ACCESS_TOKEN '{escaped_token}')"
)
from connectors.bigquery.access import apply_bq_session_settings
apply_bq_session_settings(conn)
conn.execute(
f"ATTACH 'project={project_id}' AS bq (TYPE bigquery, READ_ONLY)"
)

@ -112,6 +112,11 @@ services:
- /data/state/certs:/certs:ro
- caddy_data:/data
- caddy_config:/config
# Read-only mount of the agnes data dir so Caddy's file_server can
# serve parquets directly (sendfile/zero-copy) and bypass the app's
# uvicorn workers — see Caddyfile's @download handler. Mounted at
# /srv (not /data) because /data is already the caddy_data volume.
- data:/srv:ro
environment:
- DOMAIN=${DOMAIN:-localhost}
# Passes through whatever the operator set in .env. Caddyfile uses

@ -1,6 +1,6 @@
[project]
name = "agnes-the-ai-analyst"
-version = "0.35.1"
+version = "0.36.0"
description = "Agnes — AI Data Analyst platform for AI analytical systems"
requires-python = ">=3.11,<3.14"
license = "MIT"
@ -95,7 +95,7 @@ dev = [
]
[project.scripts]
-agnes = "cli.main:app"
+agnes = "cli.main:_run_with_clean_errors"
[build-system]
requires = ["hatchling"]

@ -53,8 +53,57 @@ IMAGE="ghcr.io/keboola/agnes-the-ai-analyst:${AGNES_TAG:-stable}"
# Array form (vs. word-split string) — quoted expansion survives paths
# with spaces and is the modern bash idiom. Functionally identical here
# since /opt/agnes paths are tame, but it's a cheap habit to keep.
#
# The TLS-overlay decision deliberately runs BELOW the config re-fetch
# (Devin Review caught: this used to live here, evaluating Caddyfile
# existence against the PRE-fetch state. If the fetch added a
# previously-missing Caddyfile, this tick's docker compose would still
# omit `--profile tls` until the next 5-minute tick — a window where
# the recreate uses the wrong overlay set). Base file list is fine to
# initialise here because the tls overlay is the only conditional one.
COMPOSE_FILES=( -f docker-compose.yml -f docker-compose.prod.yml -f docker-compose.host-mount.yml )
PROFILE_ARGS=()
# Re-fetch the bind-mounted config files (compose overlays + Caddyfile)
# from the OSS main branch on every tick. Without this, an image-only
# change is fine, but a change to the Caddyfile or any compose overlay
# (e.g. a new bind mount, a route, an env_file path) only lands on VMs
# that get a fresh `startup.sh` boot — leaving long-uptime VMs running
# the new image against stale config. Confirmed live on 2026-05-05
# when a Caddyfile change adding a `data:/srv:ro` mount + a new
# `forward_auth` + `file_server` route for parquet downloads landed
# in main but stayed inert on running VMs because auto-upgrade only
# watched image digests.
#
# Hash before/after to detect content drift; treat as "trigger recreate"
# alongside an image digest change. Atomic move-after-fetch guards
# against a partial download corrupting compose at the next docker
# action — `curl --fail` plus the `.new` rename means a 404 / network
# blip leaves the existing file untouched.
RAW_BASE="https://raw.githubusercontent.com/keboola/agnes-the-ai-analyst/main"
CONFIG_FILES=(
  docker-compose.yml docker-compose.prod.yml docker-compose.host-mount.yml
  docker-compose.tls.yml Caddyfile
)
hash_config_files() {
  # Sort so the combined hash does not depend on listing order. A missing
  # file contributes a `missing <name>` marker line in place of its
  # checksum, so a file appearing or disappearing also changes the hash.
  # Run from /opt/agnes to keep relative paths terse in the hash input.
  ( cd /opt/agnes && for f in "${CONFIG_FILES[@]}"; do
      sha256sum "$f" 2>/dev/null || printf 'missing %s\n' "$f"
    done ) | sort | sha256sum | awk '{print $1}'
}
CONFIG_BEFORE=$(hash_config_files)
for f in "${CONFIG_FILES[@]}"; do
  if curl -fsSL "$RAW_BASE/$f" -o "/opt/agnes/$f.new" 2>/dev/null; then
    mv -f "/opt/agnes/$f.new" "/opt/agnes/$f"
  else
    rm -f "/opt/agnes/$f.new"
    logger -t agnes-auto-upgrade "WARN: failed to fetch $f from $RAW_BASE — keeping existing /opt/agnes/$f"
  fi
done
CONFIG_AFTER=$(hash_config_files)
# `-s` (size > 0) instead of `-f` — guards against the corner case where
# rotate.sh wrote a 0-byte cert and exited (or got SIGKILLed mid-write).
# Bringing up the tls profile against an empty cert would just crash
@ -63,19 +112,46 @@ PROFILE_ARGS=()
# with an empty one) the caddy service crash-loops while the tls overlay
# has already closed :8000 — net effect is "app unreachable". Skipping
# the overlay keeps the app on plain :8000 until config lands.
#
# Evaluated AFTER the config re-fetch above so a freshly-added or
# freshly-removed Caddyfile is reflected in this tick's compose set,
# not the next one.
if [ -s /data/state/certs/fullchain.pem ] && [ -s /data/state/certs/privkey.pem ] && [ -s Caddyfile ]; then
COMPOSE_FILES+=( -f docker-compose.tls.yml )
PROFILE_ARGS=( --profile tls )
elif [ -s /data/state/certs/fullchain.pem ] && [ -s /data/state/certs/privkey.pem ]; then
logger -t agnes-auto-upgrade "WARN: certs present but Caddyfile missing/empty — skipping tls overlay"
fi
BEFORE=$(docker images --no-trunc --format '{{.Digest}}' "$IMAGE" | head -1)
docker compose "${COMPOSE_FILES[@]}" pull >/dev/null 2>&1
AFTER=$(docker images --no-trunc --format '{{.Digest}}' "$IMAGE" | head -1)
-if [ "$BEFORE" != "$AFTER" ]; then
-echo "$(date): new digest for $IMAGE — recreating containers"
+if [ "$BEFORE" != "$AFTER" ] || [ "$CONFIG_BEFORE" != "$CONFIG_AFTER" ]; then
+REASON=()
+[ "$BEFORE" != "$AFTER" ] && REASON+=("image digest")
+[ "$CONFIG_BEFORE" != "$CONFIG_AFTER" ] && REASON+=("config files")
+echo "$(date): change detected (${REASON[*]}) — recreating containers"
# ${arr[@]+"${arr[@]}"} pattern: expands to nothing when array is
# empty (vs. plain "${arr[@]}" which trips `set -u` on bash <4.4).
docker compose "${COMPOSE_FILES[@]}" ${PROFILE_ARGS[@]+"${PROFILE_ARGS[@]}"} up -d
docker image prune -f >/dev/null 2>&1
fi
# Self-update: re-fetch *this* script too. Without this, the very fix
# that lets auto-upgrade watch config files would itself never land on
# running VMs — a self-perpetuating "old script" problem. Atomic via
# .new + mv; chmod preserved. The next tick (5 min later) runs the
# new logic. Skipping if curl fails leaves the existing script in place.
if curl -fsSL "$RAW_BASE/scripts/ops/agnes-auto-upgrade.sh" \
    -o /usr/local/bin/agnes-auto-upgrade.sh.new 2>/dev/null; then
  if ! cmp -s /usr/local/bin/agnes-auto-upgrade.sh.new \
      /usr/local/bin/agnes-auto-upgrade.sh; then
    chmod +x /usr/local/bin/agnes-auto-upgrade.sh.new
    mv -f /usr/local/bin/agnes-auto-upgrade.sh.new \
      /usr/local/bin/agnes-auto-upgrade.sh
    logger -t agnes-auto-upgrade "self-update: replaced /usr/local/bin/agnes-auto-upgrade.sh"
  else
    rm -f /usr/local/bin/agnes-auto-upgrade.sh.new
  fi
fi
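
The drift check in the script above fingerprints the config files before and after the fetch and recreates containers when the two values differ. A minimal Python sketch of the same idea follows. `config_fingerprint` is a hypothetical helper written for illustration, and its output is not meant to match the shell pipeline byte for byte.

```python
import hashlib
from pathlib import Path
from typing import Iterable


def config_fingerprint(base_dir: Path, names: Iterable[str]) -> str:
    """Hash a set of config files into one hex digest.

    Each existing file contributes "<sha256>  <name>"; a missing file
    contributes "missing <name>". Lines are sorted so the result does
    not depend on the order in which names are listed.
    """
    lines = []
    for name in names:
        path = base_dir / name
        if path.is_file():
            digest = hashlib.sha256(path.read_bytes()).hexdigest()
            lines.append(f"{digest}  {name}")
        else:
            lines.append(f"missing {name}")
    joined = "".join(line + "\n" for line in sorted(lines))
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


def needs_recreate(before: str, after: str) -> bool:
    """Containers are recreated only when the fingerprint changed."""
    return before != after
```

Because a missing file still contributes a line, a file that appears or disappears between the two calls changes the fingerprint just as an edit would.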

@ -614,6 +614,8 @@ def _reattach_remote_extensions(
f"CREATE OR REPLACE SECRET {secret_name} "
f"(TYPE bigquery, ACCESS_TOKEN '{escaped}')"
)
from connectors.bigquery.access import apply_bq_session_settings
apply_bq_session_settings(conn)
conn.execute(
f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, READ_ONLY)"
)

@ -502,6 +502,8 @@ class SyncOrchestrator:
f"CREATE OR REPLACE SECRET {secret_name} "
f"(TYPE bigquery, ACCESS_TOKEN '{escaped}')"
)
from connectors.bigquery.access import apply_bq_session_settings
apply_bq_session_settings(conn)
conn.execute(
f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, READ_ONLY)"
)

@ -0,0 +1,123 @@
"""Unit tests for apply_bq_session_settings.

Covers the data_source.bigquery.query_timeout_ms knob added so that
agnes query --remote no longer trips the DuckDB BigQuery extension's
built-in 90 s wait timeout when the underlying BQ job takes longer.
"""
from unittest.mock import patch

from connectors.bigquery.access import apply_bq_session_settings


class _RecordingConn:
    """Minimal DuckDB-conn stand-in that records execute() calls.

    apply_bq_session_settings only calls .execute(); we don't need a
    real DuckDB to verify the SET command shape.
    """

    def __init__(self, raise_on=None):
        self.calls: list[str] = []
        self.raise_on = raise_on

    def execute(self, sql: str):
        self.calls.append(sql)
        if self.raise_on and self.raise_on in sql:
            raise RuntimeError(f"simulated failure on: {sql}")


def _patched_get_value(value):
    """Helper: build a patch target that returns *value* for the
    data_source.bigquery.query_timeout_ms key and propagates the
    `default=` kwarg for any other lookup so we don't accidentally
    break tests that read other keys via the same module."""

    def fake(*keys, default=None):
        if keys == ("data_source", "bigquery", "query_timeout_ms"):
            return value
        return default

    return patch("app.instance_config.get_value", side_effect=fake)


def test_default_when_config_missing():
    """When the key is missing and get_value hands back the `default=`
    argument, apply_bq_session_settings should fall back to the bumped
    600 000 ms default and emit the SET."""
    conn = _RecordingConn()

    # Simulate get_value returning the default we passed (600_000) by
    # echoing the default kwarg.
    def fake(*keys, default=None):
        return default

    with patch("app.instance_config.get_value", side_effect=fake):
        apply_bq_session_settings(conn)
    assert conn.calls == ["SET bq_query_timeout_ms = 600000"]


def test_explicit_value():
    conn = _RecordingConn()
    with _patched_get_value(900_000):
        apply_bq_session_settings(conn)
    assert conn.calls == ["SET bq_query_timeout_ms = 900000"]


def test_zero_sentinel_leaves_extension_default():
    """0 means 'use the DuckDB BQ extension's built-in default': no SET
    must be emitted so a non-zero default doesn't override an operator's
    explicit opt-out."""
    conn = _RecordingConn()
    with _patched_get_value(0):
        apply_bq_session_settings(conn)
    assert conn.calls == []


def test_negative_value_treated_as_zero():
    """Negative is nonsensical for a timeout; treat as 'extension default'
    rather than emitting a negative SET that the extension might reject
    or interpret unexpectedly."""
    conn = _RecordingConn()
    with _patched_get_value(-1):
        apply_bq_session_settings(conn)
    assert conn.calls == []


def test_non_numeric_silently_skipped():
    """A string-typed YAML value (e.g. operator typo) shouldn't crash
    the BQ session; fall through to the extension default instead."""
    conn = _RecordingConn()
    with _patched_get_value("notanumber"):
        apply_bq_session_settings(conn)
    assert conn.calls == []


def test_string_numeric_is_coerced():
    """YAML loaders sometimes deliver int-like values as strings; accept
    those rather than failing."""
    conn = _RecordingConn()
    with _patched_get_value("750000"):
        apply_bq_session_settings(conn)
    assert conn.calls == ["SET bq_query_timeout_ms = 750000"]


def test_set_failure_does_not_propagate():
    """Older DuckDB BQ extension versions may not recognise the setting.
    The function must fail-soft so a session that was otherwise healthy
    keeps working, just with the extension's built-in default timeout."""
    conn = _RecordingConn(raise_on="SET bq_query_timeout_ms")
    with _patched_get_value(600_000):
        # Must not raise.
        apply_bq_session_settings(conn)
    # The SET was attempted (recorded before the exception).
    assert conn.calls == ["SET bq_query_timeout_ms = 600000"]


def test_no_app_config_module_silently_skipped():
    """Unit-test contexts that don't bring up the app config layer must
    still be able to construct BQ sessions for narrow tests; an
    ImportError on app.instance_config means we can't read the knob,
    so we leave the extension default in place."""
    conn = _RecordingConn()
    with patch.dict(
        "sys.modules", {"app.instance_config": None},
    ):
        apply_bq_session_settings(conn)
    assert conn.calls == []
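
The last test above relies on a standard import-system behaviour: an entry of `None` in `sys.modules` makes any import of that name raise `ImportError`. The sketch below shows the trick in isolation. `import_is_blocked` is a hypothetical helper, and `colorsys` is an arbitrary standard-library module chosen only for illustration.

```python
import importlib
from unittest.mock import patch


def import_is_blocked(module_name: str) -> bool:
    """Return True if importing module_name fails while it is masked.

    patch.dict temporarily sets sys.modules[module_name] = None and
    restores the previous state when the block exits.
    """
    with patch.dict("sys.modules", {module_name: None}):
        try:
            importlib.import_module(module_name)
        except ImportError:
            return True
        return False


if __name__ == "__main__":
    print(import_is_blocked("colorsys"))
    # Outside the patch the module imports normally again.
    print(importlib.import_module("colorsys").__name__)
```

Masking the module this way exercises the `except` branch around the import without uninstalling or renaming anything on disk.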

@ -0,0 +1,124 @@
"""Unit tests for ``GET /api/data/{table_id}/check-access``, the
lightweight RBAC probe used by Caddy's ``forward_auth`` directive to gate
file_server-served parquet downloads without involving the app's request
workers in the bulk byte transfer.

The endpoint must:
- return 204 when the caller has read access (admin always; non-admin
  only with an explicit ``resource_grants`` row),
- return 403 with no body / minimal body when the caller does not,
- return 404 for unsafe identifiers (path-traversal guard),
- return 401 when the request has no auth.
"""
from tests.conftest import create_mock_extract


def _auth(token):
    return {"Authorization": f"Bearer {token}"}


def test_admin_gets_204(seeded_app):
    """Admin short-circuits all RBAC checks, so this must always succeed."""
    c = seeded_app["client"]
    env = seeded_app["env"]
    create_mock_extract(env["extracts_dir"], "keboola", [
        {"name": "salaries", "data": [{"id": "1"}]},
    ])
    from src.orchestrator import SyncOrchestrator
    SyncOrchestrator().rebuild()
    c.post(
        "/api/admin/register-table",
        json={"name": "salaries", "source_type": "keboola"},
        headers=_auth(seeded_app["admin_token"]),
    )
    resp = c.get(
        "/api/data/salaries/check-access",
        headers=_auth(seeded_app["admin_token"]),
    )
    assert resp.status_code == 204
    assert resp.content == b""


def test_analyst_without_grant_gets_403(seeded_app):
    """Non-admin without an explicit `resource_grants` row must be denied.
    This is the production failure mode, where Caddy's forward_auth returns
    the 403 to the client and never invokes file_server."""
    c = seeded_app["client"]
    env = seeded_app["env"]
    create_mock_extract(env["extracts_dir"], "keboola", [
        {"name": "salaries", "data": [{"id": "1"}]},
    ])
    from src.orchestrator import SyncOrchestrator
    SyncOrchestrator().rebuild()
    c.post(
        "/api/admin/register-table",
        json={"name": "salaries", "source_type": "keboola"},
        headers=_auth(seeded_app["admin_token"]),
    )
    resp = c.get(
        "/api/data/salaries/check-access",
        headers=_auth(seeded_app["analyst_token"]),
    )
    assert resp.status_code == 403


def test_analyst_with_grant_gets_204(seeded_app):
    """Once the analyst has a TABLE grant, check-access flips to 204
    and Caddy is free to serve the file directly. Mirrors the same
    grant flow used by ``/api/data/{id}/download``."""
    c = seeded_app["client"]
    env = seeded_app["env"]
    create_mock_extract(env["extracts_dir"], "keboola", [
        {"name": "salaries", "data": [{"id": "1"}]},
    ])
    from src.orchestrator import SyncOrchestrator
    SyncOrchestrator().rebuild()
    c.post(
        "/api/admin/register-table",
        json={"name": "salaries", "source_type": "keboola"},
        headers=_auth(seeded_app["admin_token"]),
    )
    # Mint the grant the same way the existing download access-control
    # tests do (see test_access_control.py): through the shared helper,
    # using a system DB connection.
    from tests.test_access_control import _grant_table_to_analyst
    from src.db import get_system_db
    conn = get_system_db()
    try:
        _grant_table_to_analyst(conn, "salaries")
    finally:
        conn.close()
    resp = c.get(
        "/api/data/salaries/check-access",
        headers=_auth(seeded_app["analyst_token"]),
    )
    assert resp.status_code == 204


def test_unsafe_table_id_gets_404(seeded_app):
    """Identifier validation runs BEFORE RBAC. This keeps path-traversal
    payloads (``../etc/passwd``) from reaching ``can_access_table`` and
    matches the pre-existing behavior of ``/download``."""
    c = seeded_app["client"]
    resp = c.get(
        "/api/data/..%2Fetc%2Fpasswd/check-access",
        headers=_auth(seeded_app["admin_token"]),
    )
    # FastAPI's path converter rejects encoded slashes outright; a 404
    # (from the validator or from no-such-route) or a 422 is acceptable,
    # since each blocks the traversal. The point is no 5xx and no 204.
    assert resp.status_code in (404, 422)


def test_no_auth_gets_401(seeded_app):
    """Caddy will only call the auth-check endpoint when the client sent
    credentials, but if a request slips through without them, the
    endpoint must reject with 401 so Caddy returns 401 to the client
    instead of falling through to file_server with no identity."""
    c = seeded_app["client"]
    resp = c.get("/api/data/salaries/check-access")
    assert resp.status_code == 401
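
The status codes these tests expect can be summarised as one decision function. This is a sketch of the contract only, not the endpoint's code: `check_access_status`, the `is_admin` flag, the `granted_tables` set and the `SAFE_TABLE_ID` pattern are all hypothetical, and checking authentication before the identifier is an assumption (the tests only establish that identifier validation precedes RBAC).

```python
import re
from typing import Optional

# Hypothetical identifier rule: letters, digits and underscores only.
SAFE_TABLE_ID = re.compile(r"[A-Za-z0-9_]+")


def check_access_status(
    user: Optional[dict],
    table_id: str,
    granted_tables: set[str],
) -> int:
    """Return the HTTP status a forward_auth probe would receive."""
    if user is None:
        return 401  # no credentials at all
    if not SAFE_TABLE_ID.fullmatch(table_id):
        return 404  # unsafe identifier, rejected before any RBAC lookup
    if user.get("is_admin") or table_id in granted_tables:
        return 204  # allowed: the proxy may serve the file itself
    return 403  # authenticated but no grant for this table


if __name__ == "__main__":
    analyst = {"is_admin": False}
    print(check_access_status(analyst, "salaries", set()))
    print(check_access_status(analyst, "salaries", {"salaries"}))
```

An empty 204 suits this kind of probe because the proxy only needs a yes or no; the file bytes never pass through the application.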

@ -152,7 +152,7 @@ class TestBqAccessErrors:
# Endpoint is async — drive it directly. dependency_overrides only
# fires through TestClient/HTTP, so pass `bq=bq` explicitly.
with pytest.raises(HTTPException) as exc_info:
-asyncio.run(v2_sample.sample(
+(v2_sample.sample(
table_id="bq_view", n=5, user=user, conn=conn, bq=bq,
))
finally:
@ -182,7 +182,7 @@ class TestBqAccessErrors:
user = {"id": "admin1", "email": "a@x.com"}
with pytest.raises(HTTPException) as exc_info:
-asyncio.run(v2_sample.sample(
+(v2_sample.sample(
table_id="bq_view", n=5, user=user, conn=conn, bq=bq,
))
finally:
@ -211,7 +211,7 @@ class TestBqAccessErrors:
user = {"id": "admin1", "email": "a@x.com"}
with pytest.raises(HTTPException) as exc_info:
-asyncio.run(v2_sample.sample(
+(v2_sample.sample(
table_id="bq_view", n=5, user=user, conn=conn, bq=bq,
))
finally:
@ -248,7 +248,7 @@ class TestBqAccessErrors:
try:
_seed(conn)
user = {"id": "admin1", "email": "a@x.com"}
-asyncio.run(v2_sample.sample(
+(v2_sample.sample(
table_id="bq_view", n=5, user=user, conn=conn, bq=bq,
))
finally:

@ -298,7 +298,7 @@ class TestBqAccessErrors:
lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"},
):
with pytest.raises(HTTPException) as exc_info:
-asyncio.run(
+(
v2_scan.scan_endpoint(raw=req, user=user, conn=conn, bq=bq)
)
finally:
@ -337,7 +337,7 @@ class TestBqAccessErrors:
lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"},
):
with pytest.raises(HTTPException) as exc_info:
-asyncio.run(
+(
v2_scan.scan_endpoint(raw=req, user=user, conn=conn, bq=bq)
)
finally:
@ -372,7 +372,7 @@ class TestBqAccessErrors:
lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"},
):
with pytest.raises(HTTPException) as exc_info:
-asyncio.run(
+(
v2_scan.scan_endpoint(raw=req, user=user, conn=conn, bq=bq)
)
finally:

@ -117,7 +117,7 @@ class TestBqAccessErrors:
lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"},
):
with pytest.raises(HTTPException) as exc_info:
-asyncio.run(
+(
v2_scan.scan_estimate_endpoint(raw=req, user=user, conn=conn, bq=bq)
)
finally:
@ -156,7 +156,7 @@ class TestBqAccessErrors:
lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"},
):
with pytest.raises(HTTPException) as exc_info:
-asyncio.run(
+(
v2_scan.scan_estimate_endpoint(raw=req, user=user, conn=conn, bq=bq)
)
finally:
@ -191,7 +191,7 @@ class TestBqAccessErrors:
lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"},
):
with pytest.raises(HTTPException) as exc_info:
-asyncio.run(
+(
v2_scan.scan_estimate_endpoint(raw=req, user=user, conn=conn, bq=bq)
)
finally:

@ -161,7 +161,7 @@ class TestBqAccessErrors:
# Endpoint is async — drive it directly. dependency_overrides only
# fires through TestClient/HTTP, so pass `bq=bq` explicitly.
with pytest.raises(HTTPException) as exc_info:
-asyncio.run(v2_schema.schema(
+(v2_schema.schema(
table_id="bq_view", user=user, conn=conn, bq=bq,
))
finally:
@ -191,7 +191,7 @@ class TestBqAccessErrors:
_seed_bq_table(conn)
user = {"id": "admin1", "email": "a@x.com"}
with pytest.raises(HTTPException) as exc_info:
-asyncio.run(v2_schema.schema(
+(v2_schema.schema(
table_id="bq_view", user=user, conn=conn, bq=bq,
))
finally:
@ -217,7 +217,7 @@ class TestBqAccessErrors:
_seed_bq_table(conn)
user = {"id": "admin1", "email": "a@x.com"}
with pytest.raises(HTTPException) as exc_info:
-asyncio.run(v2_schema.schema(
+(v2_schema.schema(
table_id="bq_view", user=user, conn=conn, bq=bq,
))
finally:
@ -259,7 +259,7 @@ class TestBqAccessErrors:
try:
_seed_bq_table(conn)
user = {"id": "admin1", "email": "a@x.com"}
-data = asyncio.run(v2_schema.schema(
+data = (v2_schema.schema(
table_id="bq_view", user=user, conn=conn, bq=_bq(),
))
finally:
@ -322,7 +322,7 @@ class TestBqAccessErrors:
try:
_seed_bq_table(conn)
user = {"id": "admin1", "email": "a@x.com"}
-asyncio.run(v2_schema.schema(
+(v2_schema.schema(
table_id="bq_view", user=user, conn=conn, bq=bq,
))
finally:
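
The test changes above drop `asyncio.run(...)` because the endpoints are now called as ordinary functions. The motivation stated in the release notes is that a synchronous call inside an `async def` handler stalls the event loop for every other request. The sketch below illustrates that effect in isolation. All names are hypothetical, `time.sleep` stands in for a blocking DuckDB / BigQuery call, and `asyncio.to_thread` stands in for the worker threadpool a framework such as FastAPI uses for plain `def` endpoints.

```python
import asyncio
import time


def slow_sync_call(seconds: float) -> str:
    """Stand-in for a blocking database call."""
    time.sleep(seconds)
    return "done"


async def blocking_endpoint(seconds: float) -> str:
    # Runs the blocking call on the event loop thread: nothing else
    # scheduled on the loop can make progress until it returns.
    return slow_sync_call(seconds)


async def offloaded_endpoint(seconds: float) -> str:
    # Runs the blocking call in a worker thread; the loop stays free.
    return await asyncio.to_thread(slow_sync_call, seconds)


async def max_heartbeat_gap(endpoint, seconds: float = 0.3) -> float:
    """Largest pause seen by a 10 ms heartbeat while endpoint runs."""
    gaps = []
    stop = asyncio.Event()

    async def heartbeat():
        last = time.monotonic()
        while not stop.is_set():
            await asyncio.sleep(0.01)
            now = time.monotonic()
            gaps.append(now - last)
            last = now

    task = asyncio.create_task(heartbeat())
    await asyncio.sleep(0.05)   # let the heartbeat start ticking
    await endpoint(seconds)
    await asyncio.sleep(0.05)   # let the heartbeat record any stall
    stop.set()
    await task
    return max(gaps)


if __name__ == "__main__":
    print("blocking :", asyncio.run(max_heartbeat_gap(blocking_endpoint)))
    print("offloaded:", asyncio.run(max_heartbeat_gap(offloaded_endpoint)))
```

With the blocking variant the heartbeat stalls for roughly the full duration of the call, while the offloaded variant keeps ticking at its normal interval.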