From e5fb913cecaeda02f3a4cbf57bc4c34149983000 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Tue, 5 May 2026 17:44:08 +0200 Subject: [PATCH] =?UTF-8?q?perf:=20Tier=201=20event-loop=20unblocking=20?= =?UTF-8?q?=E2=80=94=20async=20def=20=E2=86=92=20def=20on=20BQ-bound=20han?= =?UTF-8?q?dlers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five hottest BQ-touching endpoints were `async def` but invoked synchronous DuckDB / BQ-extension calls inside the body. Under uvicorn's single event loop that meant a single heavy `agnes query --remote` (waiting up to ~200 s for BQ's jobs.query) froze EVERY other request — /api/health, dashboard, auth, even another query — for the full BQ wait. Operators saw "VM idle, app frozen" during PR #188's testing. Convert to plain `def` so FastAPI auto-offloads the body to the anyio thread pool. Event loop stays free for non-BQ requests. - app/api/query.py:execute_query - app/api/v2_scan.py:scan_estimate_endpoint, scan_endpoint - app/api/v2_sample.py:sample - app/api/v2_schema.py:schema Audit: 0 `await` statements in any converted handler (verified file-by-file), so the conversion is safe. Tests in tests/test_v2_*.py called the handlers via `asyncio.run(...)` which now fails on a non-coroutine return; swapped for direct calls (`asyncio.run(` was replaced with a bare `(`, which keeps the parentheses balanced). Plus AGNES_THREADPOOL_SIZE env var (default 200, was anyio's stock 40) in app/main.py:lifespan. Set via anyio.to_thread.current_default_thread_limiter().total_tokens. 200 is comfortable headroom for <50 concurrent analysts; bump for more. 480/480 impacted tests pass (the 2 remaining errors are a pre-existing fixture setup issue in test_reader_smoke_matrix.py unrelated to this change). 
--- CHANGELOG.md | 4 ++++ app/api/query.py | 15 +++++++++++++-- app/api/v2_sample.py | 4 +++- app/api/v2_scan.py | 9 +++++++-- app/api/v2_schema.py | 4 +++- app/main.py | 17 +++++++++++++++++ tests/test_v2_sample.py | 8 ++++---- tests/test_v2_scan.py | 6 +++--- tests/test_v2_scan_estimate.py | 6 +++--- tests/test_v2_schema.py | 10 +++++----- 10 files changed, 62 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 89f00e7..881724a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +### Changed +- **Tier 1 event-loop unblocking** — the five hottest BQ-touching endpoints (`POST /api/query`, `POST /api/v2/scan`, `POST /api/v2/scan/estimate`, `GET /api/v2/sample/{id}`, `GET /api/v2/schema/{id}`) were declared `async def` but invoked synchronous DuckDB / BQ-extension calls inside the body. Under uvicorn's single event loop that meant a single heavy `agnes query --remote` (waiting up to ~200 s for BQ's `jobs.query` to return) **froze every other request** — `/api/health`, the dashboard, auth, even another query — for the full duration of the BQ wait. Operators saw "VM idle, app frozen" symptoms during this work. Converted all five to plain `def` so FastAPI auto-offloads the blocking body to the anyio thread pool; the event loop stays free for non-BQ requests. Verified via 0-await audit (no `await` statements in the converted handlers, so the rename is safe). Tests: `tests/test_v2_*.py` were rewritten to call the handlers directly instead of `asyncio.run(...)` (which now fails on a non-coroutine return). Pairs with the thread-pool capacity bump below. +- **`AGNES_THREADPOOL_SIZE` env var** (default 200, was anyio's stock 40) controls the FastAPI / Starlette thread pool capacity used by every plain-`def` route handler. Set in `app/main.py:lifespan` via `anyio.to_thread.current_default_thread_limiter().total_tokens`. 
200 leaves comfortable headroom over the BQ extension's connection budget while keeping the per-process thread cost bounded — for the workload of <50 concurrent analysts this is well over what's needed; bump for higher concurrency. + ### Added - **`data_source.bigquery.query_timeout_ms` config knob** (default 600 000 ms = 10 min). The DuckDB BigQuery extension's built-in default of 90 s was too tight for analyst-scale queries against view-backed BQ datasets — `agnes query --remote` would HTTP 400 with `Binder Error: Query execution exceeded the timeout. Job ID: …` whenever the underlying BQ job took longer than 90 s, even though the BQ job itself was healthy. The new knob is applied via `SET bq_query_timeout_ms` after every `LOAD bigquery` on every BQ-touching DuckDB session — the orchestrator's `_remote_attach` ATTACH path (`src/orchestrator.py`), the analytics-DB read-only reattach path (`src/db.py:_reattach_remote_extensions` — the primary `agnes query --remote` request path), the `BqAccess` session factory (`connectors/bigquery/access.py`), and the standalone extractor (`connectors/bigquery/extractor.py`). Sentinel `0` (or non-numeric / unparseable values) leaves the extension default in place so operators on legacy extension versions that don't recognise the setting aren't broken. Configurable via `/admin/server-config` UI. Note: BigQuery's `jobs.query` RPC caps the wait at ~200 s per call regardless of this setting; the extension polls on top so the effective ceiling is the value here but each poll is ~200 s. DuckDB emits an informational warning when the value is set above the BQ RPC cap — operators can safely ignore it. - **Per-user parallel parquet downloads in `agnes pull`** — the download loop in `cli/lib/pull.py` now uses a `ThreadPoolExecutor` with concurrency capped by the new `AGNES_PULL_PARALLELISM` env var (default 4, set 1 to restore pre-PR serial behavior). 
On a registry of N tables the wall-clock time drops from `Σ stream_download_seconds(table_i)` to roughly `max × ceil(N/4)`. Works hand-in-hand with the Caddy `file_server` change below: without it parallel client-side downloads would still queue on the single uvicorn worker; with it each request is its own caddy goroutine + sendfile, so 4-way parallelism actually delivers throughput. Per-table error semantics preserved — a failure on one table no longer aborts the rest of the batch. diff --git a/app/api/query.py b/app/api/query.py index df6cc50..a758dc4 100644 --- a/app/api/query.py +++ b/app/api/query.py @@ -71,12 +71,23 @@ class QueryResponse(BaseModel): @router.post("", response_model=QueryResponse) -async def execute_query( +def execute_query( request: QueryRequest, user: dict = Depends(get_current_user), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): - """Execute SQL against the server analytics DuckDB.""" + """Execute SQL against the server analytics DuckDB. + + Plain ``def`` (not ``async def``) so FastAPI auto-offloads the call + to the anyio thread pool. The body invokes ``analytics.execute(sql)`` + synchronously, which blocks for the full BQ jobs.query wait when a + referenced view resolves through the BQ extension. Under ``async def`` + that block holds the single uvicorn event loop, freezing every other + request (UI, /api/health, auth) until the query returns. Plain ``def`` + runs each invocation on its own thread, so heavy queries no longer + starve unrelated endpoints. See PR #188's CHANGELOG entry for the + Tier 1 event-loop unblocking rollout. 
+ """ sql_lower = request.sql.strip().lower() # Block everything except SELECT diff --git a/app/api/v2_sample.py b/app/api/v2_sample.py index c30f468..9a5ffab 100644 --- a/app/api/v2_sample.py +++ b/app/api/v2_sample.py @@ -104,13 +104,15 @@ def build_sample( @router.get("/sample/{table_id}") -async def sample( +def sample( table_id: str, n: int = Query(default=5, ge=1, le=_MAX_N), user: dict = Depends(get_current_user), conn: duckdb.DuckDBPyConnection = Depends(_get_db), bq: BqAccess = Depends(get_bq_access), ): + # Plain ``def`` — opens a `bq.duckdb_session()` and runs sync queries + # through the BQ extension. See PR #188 Tier 1 entry. try: return build_sample(conn, user, table_id, n=n, bq=bq) except FileNotFoundError: diff --git a/app/api/v2_scan.py b/app/api/v2_scan.py index ff80b25..a012830 100644 --- a/app/api/v2_scan.py +++ b/app/api/v2_scan.py @@ -218,12 +218,17 @@ def _avg_bytes_for_type(t: str) -> int: @router.post("/scan/estimate") -async def scan_estimate_endpoint( +def scan_estimate_endpoint( raw: dict, user: dict = Depends(get_current_user), conn: duckdb.DuckDBPyConnection = Depends(_get_db), bq: BqAccess = Depends(get_bq_access), ): + # Plain ``def`` so FastAPI auto-offloads to the anyio thread pool — the + # estimate path calls into google-cloud-bigquery's `client.query(..., + # dry_run=True)` which blocks until BQ returns the dry-run cost. Under + # ``async def`` that wait holds the event loop. See PR #188's Tier 1 + # entry for the wider rollout. 
try: return estimate(conn, user, raw, bq=bq) except WhereValidationError as e: @@ -374,7 +379,7 @@ def run_scan( @router.post("/scan") -async def scan_endpoint( +def scan_endpoint( raw: dict, user: dict = Depends(get_current_user), conn: duckdb.DuckDBPyConnection = Depends(_get_db), diff --git a/app/api/v2_schema.py b/app/api/v2_schema.py index 3194707..a53c2a4 100644 --- a/app/api/v2_schema.py +++ b/app/api/v2_schema.py @@ -209,12 +209,14 @@ def build_schema( @router.get("/schema/{table_id}") -async def schema( +def schema( table_id: str, user: dict = Depends(get_current_user), conn: duckdb.DuckDBPyConnection = Depends(_get_db), bq: BqAccess = Depends(get_bq_access), ): + # Plain ``def`` — opens a `bq.duckdb_session()` and runs sync metadata + # queries through the BQ extension. See PR #188 Tier 1 entry. try: return build_schema(conn, user, table_id, bq=bq) except NotFound: diff --git a/app/main.py b/app/main.py index 612c0ea..371235f 100644 --- a/app/main.py +++ b/app/main.py @@ -141,6 +141,23 @@ async def lifespan(app): log_effective_policy() except Exception: pass # never block startup on a logging convenience + + # Bump anyio's default thread pool size from 40 → AGNES_THREADPOOL_SIZE + # (default 200). FastAPI auto-runs every plain `def` route handler in + # this pool — the Tier 1 endpoints converted in PR #188 (`/api/query`, + # `/api/v2/scan`, `/api/v2/sample`, `/api/v2/schema`) all block on + # synchronous DuckDB / BQ-extension calls inside the handler body and + # would otherwise serialise once 40 are in flight. 200 keeps the per- + # process working set well under the BQ extension's connection cap + # while leaving headroom for concurrent UI / health probes. 
+ try: + import anyio.to_thread + size = int(os.environ.get("AGNES_THREADPOOL_SIZE", "200")) + anyio.to_thread.current_default_thread_limiter().total_tokens = size + logger.info("anyio thread pool capacity set to %d", size) + except Exception as e: + logger.warning("failed to bump anyio thread pool capacity: %s", e) + yield from src.db import close_system_db close_system_db() diff --git a/tests/test_v2_sample.py b/tests/test_v2_sample.py index aa9e0d7..07f89f7 100644 --- a/tests/test_v2_sample.py +++ b/tests/test_v2_sample.py @@ -152,7 +152,7 @@ class TestBqAccessErrors: # Endpoint is async — drive it directly. dependency_overrides only # fires through TestClient/HTTP, so pass `bq=bq` explicitly. with pytest.raises(HTTPException) as exc_info: - asyncio.run(v2_sample.sample( + (v2_sample.sample( table_id="bq_view", n=5, user=user, conn=conn, bq=bq, )) finally: @@ -182,7 +182,7 @@ class TestBqAccessErrors: user = {"id": "admin1", "email": "a@x.com"} with pytest.raises(HTTPException) as exc_info: - asyncio.run(v2_sample.sample( + (v2_sample.sample( table_id="bq_view", n=5, user=user, conn=conn, bq=bq, )) finally: @@ -211,7 +211,7 @@ class TestBqAccessErrors: user = {"id": "admin1", "email": "a@x.com"} with pytest.raises(HTTPException) as exc_info: - asyncio.run(v2_sample.sample( + (v2_sample.sample( table_id="bq_view", n=5, user=user, conn=conn, bq=bq, )) finally: @@ -248,7 +248,7 @@ class TestBqAccessErrors: try: _seed(conn) user = {"id": "admin1", "email": "a@x.com"} - asyncio.run(v2_sample.sample( + (v2_sample.sample( table_id="bq_view", n=5, user=user, conn=conn, bq=bq, )) finally: diff --git a/tests/test_v2_scan.py b/tests/test_v2_scan.py index 7dabe68..289ae02 100644 --- a/tests/test_v2_scan.py +++ b/tests/test_v2_scan.py @@ -298,7 +298,7 @@ class TestBqAccessErrors: lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"}, ): with pytest.raises(HTTPException) as exc_info: - asyncio.run( + ( v2_scan.scan_endpoint(raw=req, user=user, conn=conn, bq=bq) ) 
finally: @@ -337,7 +337,7 @@ class TestBqAccessErrors: lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"}, ): with pytest.raises(HTTPException) as exc_info: - asyncio.run( + ( v2_scan.scan_endpoint(raw=req, user=user, conn=conn, bq=bq) ) finally: @@ -372,7 +372,7 @@ class TestBqAccessErrors: lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"}, ): with pytest.raises(HTTPException) as exc_info: - asyncio.run( + ( v2_scan.scan_endpoint(raw=req, user=user, conn=conn, bq=bq) ) finally: diff --git a/tests/test_v2_scan_estimate.py b/tests/test_v2_scan_estimate.py index c451490..c2ab56b 100644 --- a/tests/test_v2_scan_estimate.py +++ b/tests/test_v2_scan_estimate.py @@ -117,7 +117,7 @@ class TestBqAccessErrors: lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"}, ): with pytest.raises(HTTPException) as exc_info: - asyncio.run( + ( v2_scan.scan_estimate_endpoint(raw=req, user=user, conn=conn, bq=bq) ) finally: @@ -156,7 +156,7 @@ class TestBqAccessErrors: lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"}, ): with pytest.raises(HTTPException) as exc_info: - asyncio.run( + ( v2_scan.scan_estimate_endpoint(raw=req, user=user, conn=conn, bq=bq) ) finally: @@ -191,7 +191,7 @@ class TestBqAccessErrors: lambda *a, **kw: {"event_date": "DATE", "country_code": "STRING"}, ): with pytest.raises(HTTPException) as exc_info: - asyncio.run( + ( v2_scan.scan_estimate_endpoint(raw=req, user=user, conn=conn, bq=bq) ) finally: diff --git a/tests/test_v2_schema.py b/tests/test_v2_schema.py index 8ae15c4..f2bfe09 100644 --- a/tests/test_v2_schema.py +++ b/tests/test_v2_schema.py @@ -161,7 +161,7 @@ class TestBqAccessErrors: # Endpoint is async — drive it directly. dependency_overrides only # fires through TestClient/HTTP, so pass `bq=bq` explicitly. 
with pytest.raises(HTTPException) as exc_info: - asyncio.run(v2_schema.schema( + (v2_schema.schema( table_id="bq_view", user=user, conn=conn, bq=bq, )) finally: @@ -191,7 +191,7 @@ class TestBqAccessErrors: _seed_bq_table(conn) user = {"id": "admin1", "email": "a@x.com"} with pytest.raises(HTTPException) as exc_info: - asyncio.run(v2_schema.schema( + (v2_schema.schema( table_id="bq_view", user=user, conn=conn, bq=bq, )) finally: @@ -217,7 +217,7 @@ class TestBqAccessErrors: _seed_bq_table(conn) user = {"id": "admin1", "email": "a@x.com"} with pytest.raises(HTTPException) as exc_info: - asyncio.run(v2_schema.schema( + (v2_schema.schema( table_id="bq_view", user=user, conn=conn, bq=bq, )) finally: @@ -259,7 +259,7 @@ class TestBqAccessErrors: try: _seed_bq_table(conn) user = {"id": "admin1", "email": "a@x.com"} - data = asyncio.run(v2_schema.schema( + data = (v2_schema.schema( table_id="bq_view", user=user, conn=conn, bq=_bq(), )) finally: @@ -322,7 +322,7 @@ class TestBqAccessErrors: try: _seed_bq_table(conn) user = {"id": "admin1", "email": "a@x.com"} - asyncio.run(v2_schema.schema( + (v2_schema.schema( table_id="bq_view", user=user, conn=conn, bq=bq, )) finally: