diff --git a/CHANGELOG.md b/CHANGELOG.md index 86e66d8..913af0a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,19 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +### Performance +- **DuckDB BigQuery-extension session pool** + (`connectors/bigquery/access.py`). `BqAccess.duckdb_session()` now acquires + pre-warmed connections from a bounded process-local pool instead of running + `INSTALL bigquery; LOAD bigquery; CREATE SECRET; ATTACH …` on every request. + Each acquire that finds a warm entry skips the extension INSTALL/LOAD part of + the ~0.5 s session setup; the auth SECRET is still re-created on acquire so a + long-lived pooled entry doesn't keep a stale GCE metadata token past its + TTL. Pool size is configurable via `data_source.bigquery.session_pool_size` + (default 4; sentinel `0` disables pooling). Affects every BQ-touching path + — `/api/query`, `/api/v2/scan`, `/api/v2/sample`, `/api/v2/schema`, + materialize, and the orchestrator's remote-attach. + ### Fixed - **`/api/query` (and `agnes query --remote`) now rewrites user SQL referencing `query_mode='remote'` BigQuery rows into a single `bigquery_query()` call diff --git a/config/instance.yaml.example b/config/instance.yaml.example index c836144..be2c8a0 100644 --- a/config/instance.yaml.example +++ b/config/instance.yaml.example @@ -135,6 +135,13 @@ data_source: # # view-backed datasets -- bumped to 600 000 ms = 10 min by default. # # Set 0 to fall through to the extension default. Configurable via # # /admin/server-config UI. + # session_pool_size: 4 + # # Maximum number of warm DuckDB+bigquery-extension sessions kept + # # in a process-local pool. A warm acquire skips the INSTALL/LOAD + # # portion of the ~0.5 s session setup (the auth SECRET is still refreshed); a fresh + # # build only happens when no usable entry is pooled. Default 4. Set 0 + # # to disable pooling (every acquire builds + closes a fresh + # # session; matches pre-pool behavior). 
# --- OpenMetadata catalog (optional) --- # Enriches table and column metadata from OpenMetadata REST API. diff --git a/connectors/bigquery/access.py b/connectors/bigquery/access.py index 48e4e9f..b29483a 100644 --- a/connectors/bigquery/access.py +++ b/connectors/bigquery/access.py @@ -8,6 +8,8 @@ from __future__ import annotations import functools import logging +import threading +from collections import deque from contextlib import contextmanager from dataclasses import dataclass from typing import Callable, Iterator, Literal @@ -196,15 +198,40 @@ def _default_client_factory(projects: BqProjects): ) -@contextmanager -def _default_duckdb_session_factory(projects: BqProjects): - """Yield an in-memory DuckDB conn with bigquery extension loaded + SECRET set - from get_metadata_token(). Auto-cleanup. Translates auth/install failures - to BqAccessError(kind='auth_failed' or 'bq_lib_missing'). +def _default_pool_size() -> int: + """Resolve the BQ DuckDB-extension session pool size from instance.yaml. - Note: `projects.billing` is not used by this factory directly — bigquery_query() - callers pass it themselves as the first positional arg to identify the billing - project. The factory keeps the parameter for symmetry with _default_client_factory. + Reads ``data_source.bigquery.session_pool_size`` (default 4). Sentinel + ``0`` disables pooling (every acquire builds + closes a fresh session; + matches pre-pool behavior). Negative / non-numeric values fall back to + the default — the pool is a perf optimization, not a correctness + boundary, so an unparseable config shouldn't fail-stop the app. 
+ """ + try: + from app.instance_config import get_value + except Exception: + return 4 + raw = get_value("data_source", "bigquery", "session_pool_size", default=4) + try: + n = int(raw) if raw is not None else 4 + except (TypeError, ValueError): + logger.warning( + "BQ session_pool_size=%r is not an int; falling back to default 4", + raw, + ) + return 4 + if n < 0: + return 4 + return n + + +def _build_fresh_bq_session(): + """Build a single fresh in-memory DuckDB conn with the bigquery extension + INSTALL/LOAD'd, the auth SECRET created from get_metadata_token(), and + per-session settings applied. Translates auth / install failures to + BqAccessError. Caller owns the close. + + Used internally by the pool; also used directly when pooling is disabled. """ import duckdb # type: ignore from connectors.bigquery.auth import get_metadata_token, BQMetadataAuthError @@ -220,22 +247,160 @@ def _default_duckdb_session_factory(projects: BqProjects): conn = duckdb.connect(":memory:") try: + conn.execute("INSTALL bigquery FROM community; LOAD bigquery;") + escaped = token.replace("'", "''") + conn.execute( + f"CREATE OR REPLACE SECRET bq_s (TYPE bigquery, ACCESS_TOKEN '{escaped}')" + ) + except Exception as e: + # Build failed — must close the half-initialised conn, otherwise it + # leaks across the pool's lifetime. 
+ try: - conn.execute("INSTALL bigquery FROM community; LOAD bigquery;") - escaped = token.replace("'", "''") - conn.execute( - f"CREATE OR REPLACE SECRET bq_s (TYPE bigquery, ACCESS_TOKEN '{escaped}')" - ) - except Exception as e: - raise BqAccessError( - "bq_lib_missing", - f"failed to install/load BigQuery DuckDB extension: {e}", - details={"original": str(e)}, - ) - apply_bq_session_settings(conn) + conn.close() + except Exception: + pass + raise BqAccessError( + "bq_lib_missing", + f"failed to install/load BigQuery DuckDB extension: {e}", + details={"original": str(e)}, + ) + apply_bq_session_settings(conn) + return conn + + +def _refresh_bq_secret(conn) -> None: + """Refresh the auth SECRET on a pooled connection so token rotation + (default GCE metadata token TTL ~1 hr) doesn't break long-lived + pooled entries. + + Cheap when the token cache is warm (a few µs). Any failure is + re-raised as BqAccessError(kind='auth_failed') so the pool's acquire + loop closes this entry and falls back to another entry or a fresh build. + """ + from connectors.bigquery.auth import get_metadata_token + try: + token = get_metadata_token() + escaped = token.replace("'", "''") + conn.execute( + f"CREATE OR REPLACE SECRET bq_s (TYPE bigquery, ACCESS_TOKEN '{escaped}')" + ) + except Exception as e: + # Bubble up so the pool drops this entry and rebuilds. + raise BqAccessError( + "auth_failed", + f"could not refresh BQ secret on pooled session: {e}", + details={"original": str(e)}, + ) + + +def _is_pool_entry_alive(conn) -> bool: + """Cheap liveness probe — `SELECT 1`. Returns False on any error so + the pool reaper drops the entry and builds a fresh one.""" + try: + result = conn.execute("SELECT 1").fetchone() + return result is not None and result[0] == 1 + except Exception: + return False + + +# Module-level pool state. Process-cached (mirrors get_bq_access's lifetime). +# Not fork-safe — single uvicorn worker process is the supported deployment +# shape per CLAUDE.md. 
+_pool: deque = deque() +_pool_lock = threading.Lock() + + +def _reset_session_pool_for_tests() -> None: + """Drop and close every pooled entry. Test helper — production code + should not call this. Exposed so test fixtures + the existing + test_bq_access tests can pin pre-test pool state to empty.""" + with _pool_lock: + while _pool: + entry = _pool.popleft() + try: + entry.close() + except Exception: + pass + + +@contextmanager +def _default_duckdb_session_factory(projects: BqProjects): + """Yield a pooled in-memory DuckDB conn with bigquery extension loaded + + SECRET set from get_metadata_token(). Translates auth / install + failures to BqAccessError(kind='auth_failed' or 'bq_lib_missing'). + + Pooling: amortizes the ~0.5 s INSTALL/LOAD/ATTACH cost across requests + by keeping pre-warmed connections in a bounded deque. Acquire reuses + an existing entry when available (refreshing its auth SECRET so + token rotation doesn't break long-lived entries) and probes liveness + cheaply via ``SELECT 1`` before handing it to the caller. On normal + exit the connection returns to the pool; on exception it's closed + instead (the underlying session may carry dirty state). + + Pool size is ``data_source.bigquery.session_pool_size`` (default 4; + sentinel ``0`` disables pooling entirely, matching pre-pool + behavior). Process-cached, not fork-safe. + + Note: `projects.billing` is not used by this factory directly — bigquery_query() + callers pass it themselves as the first positional arg to identify the billing + project. The factory keeps the parameter for symmetry with _default_client_factory. + """ + pool_size = _default_pool_size() + + # Acquire: prefer a warm entry, fall back to fresh build. + conn = None + if pool_size > 0: + while True: + with _pool_lock: + entry = _pool.popleft() if _pool else None + if entry is None: + break + if not _is_pool_entry_alive(entry): + # Reaper: drop broken entries. 
+ try: + entry.close() + except Exception: + pass + continue + try: + # Refresh the auth SECRET so a long-lived pool entry + # doesn't keep a stale token past its TTL. Cheap when + # the token cache is warm. + _refresh_bq_secret(entry) + except BqAccessError: + try: + entry.close() + except Exception: + pass + continue + conn = entry + break + + if conn is None: + conn = _build_fresh_bq_session() + + try: yield conn - finally: - conn.close() + except Exception: + # Caller saw an exception — the conn may be in a dirty state. + # Don't return to pool; close to release native resources. + try: + conn.close() + except Exception: + pass + raise + else: + # Normal exit — return to pool if there's room. + if pool_size > 0: + with _pool_lock: + if len(_pool) < pool_size: + _pool.append(conn) + return + # Pool disabled or full — close. + try: + conn.close() + except Exception: + pass def apply_bq_session_settings(conn) -> None: diff --git a/tests/test_bq_access.py b/tests/test_bq_access.py index cfa1282..ed69011 100644 --- a/tests/test_bq_access.py +++ b/tests/test_bq_access.py @@ -1,5 +1,6 @@ """Tests for connectors/bigquery/access.py — the BqAccess facade.""" import pytest +import threading class TestBqProjects: @@ -208,8 +209,21 @@ class TestDefaultClientFactory: class TestDefaultDuckdbSessionFactory: - def test_yields_duckdb_conn_with_secret_then_closes(self, monkeypatch): - from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects + def test_yields_duckdb_conn_with_secret_set_via_pool(self, monkeypatch): + """The pool's first acquire on an empty pool runs the full + INSTALL/LOAD/SECRET sequence. After the with-block exits the + connection is RETURNED to the pool (not closed) so the next + acquire amortizes the extension-load cost. + + Pre-pool semantics (close-on-exit) are preserved on broken + entries + on the explicit pool-reset path; covered in + TestBqSessionPool. 
+ """ + from connectors.bigquery.access import ( + _default_duckdb_session_factory, BqProjects, + _reset_session_pool_for_tests, + ) + _reset_session_pool_for_tests() executed_sql = [] @@ -218,7 +232,10 @@ class TestDefaultDuckdbSessionFactory: self.closed = False def execute(self, sql, params=None): executed_sql.append((sql, params)) - return self + class _Result: + def fetchone(self_inner): + return (1,) + return _Result() def close(self): self.closed = True @@ -228,19 +245,36 @@ class TestDefaultDuckdbSessionFactory: with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn: assert conn is fake_conn - assert fake_conn.closed is True + # Pool retains the conn — close happens at pool reset / shutdown. + assert fake_conn.closed is False # Verify INSTALL/LOAD/SECRET sequence ran assert any("INSTALL bigquery" in sql for sql, _ in executed_sql) assert any("LOAD bigquery" in sql for sql, _ in executed_sql) assert any("CREATE OR REPLACE SECRET" in sql and "tok123" in sql for sql, _ in executed_sql) + # Explicit pool reset closes the retained entry. + _reset_session_pool_for_tests() + assert fake_conn.closed is True + def test_closes_on_exception_inside_with_block(self, monkeypatch): - from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects + """Exceptions inside the with-block leave the underlying conn in + an unknown state (half-completed query, dirty session); the pool + treats it as broken and closes it rather than returning to pool. 
+ """ + from connectors.bigquery.access import ( + _default_duckdb_session_factory, BqProjects, + _reset_session_pool_for_tests, + ) + _reset_session_pool_for_tests() class FakeConn: closed = False - def execute(self, *a, **kw): return self + def execute(self, *a, **kw): + class _Result: + def fetchone(self_inner): + return (1,) + return _Result() def close(self): self.closed = True fake_conn = FakeConn() @@ -449,3 +483,222 @@ class TestGetBqAccess: assert a is b assert isinstance(a, BqAccess) assert a.projects.billing == "" + + +# --------------------------------------------------------------------------- +# DuckDB BQ-extension session pool — amortizes the ~0.5 s INSTALL/LOAD/ATTACH +# cost across requests by keeping pre-warmed DuckDB connections in a +# bounded pool. Each acquire reuses an existing connection (refreshing the +# auth SECRET so token rotation doesn't break long-lived entries) instead +# of spinning up a fresh DuckDB+extension load every time. +# --------------------------------------------------------------------------- + + +class _PoolFakeConn: + """Fake DuckDB connection that records executed SQL and supports + ``close()``. Used across pool tests so we can pin behavior without + booting the real BigQuery extension.""" + _serial = 0 + + def __init__(self): + type(self)._serial += 1 + self.id = type(self)._serial + self.closed = False + self.executed: list[str] = [] + + def execute(self, sql, params=None): + self.executed.append(sql) + # Liveness probe: SELECT 1 returns something fetchable. 
+ class _Result: + def fetchone(self_inner): + return (1,) + def fetchall(self_inner): + return [(1,)] + return _Result() + + def close(self): + self.closed = True + + +@pytest.fixture +def reset_pool(monkeypatch): + """Reset the BQ session pool singleton between tests so leak-detection + assertions don't carry state.""" + from connectors.bigquery import access as bq_access_mod + if hasattr(bq_access_mod, "_reset_session_pool_for_tests"): + bq_access_mod._reset_session_pool_for_tests() + monkeypatch.setattr( + "connectors.bigquery.auth.get_metadata_token", + lambda: "tok-pool", + ) + yield + if hasattr(bq_access_mod, "_reset_session_pool_for_tests"): + bq_access_mod._reset_session_pool_for_tests() + + +class TestBqSessionPool: + def test_pool_reuses_connections_across_acquires(self, monkeypatch, reset_pool): + """Acquiring a session, releasing, then acquiring again must return + the SAME underlying DuckDB connection — no INSTALL/LOAD overhead on + the second request. This is the whole point of the pool.""" + from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects + + # Each duckdb.connect() yields a fresh _PoolFakeConn so we can tell + # them apart by id. + connections_made = [] + def fake_connect(_path): + c = _PoolFakeConn() + connections_made.append(c) + return c + monkeypatch.setattr("duckdb.connect", fake_connect) + + # First acquire: pool is empty, factory builds a new entry. + with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn1: + id1 = conn1.id + + # Second acquire: pool has a warm entry, must hand back the same conn. + with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn2: + id2 = conn2.id + + assert id1 == id2, ( + "expected the same pooled connection across two acquires; " + f"got id1={id1}, id2={id2}" + ) + # And we must NOT have re-INSTALLed/LOADed the extension on reuse — + # only one duckdb.connect() call ever happened. 
+ assert len(connections_made) == 1, ( + f"pool re-built the conn on second acquire; created {len(connections_made)}" + ) + + def test_pool_size_is_configurable(self, monkeypatch, reset_pool): + """``data_source.bigquery.session_pool_size`` controls the upper + bound on warm entries. Above the cap, releasing extra entries + closes them rather than retaining.""" + from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects + + def fake_get_value(*keys, default=None): + if keys == ("data_source", "bigquery", "session_pool_size"): + return 2 # tiny pool + if keys == ("data_source", "bigquery", "query_timeout_ms"): + return 0 # don't try to SET timeout in tests + return default + + monkeypatch.setattr("app.instance_config.get_value", fake_get_value) + monkeypatch.setattr("duckdb.connect", lambda _: _PoolFakeConn()) + + # Acquire 3 in parallel to force 3 simultaneous entries. + cm1 = _default_duckdb_session_factory(BqProjects(billing="b", data="d")) + c1 = cm1.__enter__() + cm2 = _default_duckdb_session_factory(BqProjects(billing="b", data="d")) + c2 = cm2.__enter__() + cm3 = _default_duckdb_session_factory(BqProjects(billing="b", data="d")) + c3 = cm3.__enter__() + + # Release all three. The 3rd release should close the conn since + # the pool already has 2. + cm1.__exit__(None, None, None) + cm2.__exit__(None, None, None) + cm3.__exit__(None, None, None) + + # At least one of the three connections must be closed (pool overflow). + closed_count = sum(1 for c in (c1, c2, c3) if c.closed) + assert closed_count >= 1, ( + "pool retained more than its configured size; expected at least " + f"one close. closed_count={closed_count}" + ) + # Pool retained at most `size` entries, so total live + closed = 3, + # closed >= 1 means pool size <= 2. 
+ assert closed_count == 1 + + def test_pool_replaces_broken_connection(self, monkeypatch, reset_pool): + """If a pooled entry's liveness check fails on acquire (the + underlying DuckDB conn was closed externally, BQ extension state + corrupted, etc.), the pool must drop it and build a fresh entry — + not hand the broken one to the caller.""" + from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects + + # First acquire creates entry #1; we'll then mark it broken. + all_conns: list[_PoolFakeConn] = [] + def fake_connect(_path): + c = _PoolFakeConn() + all_conns.append(c) + return c + monkeypatch.setattr("duckdb.connect", fake_connect) + + with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn1: + id1 = conn1.id + # Simulate corruption: make execute() raise on next call. + def broken_execute(*a, **kw): + raise RuntimeError("connection broken") + conn1.execute = broken_execute # type: ignore[assignment] + + # Second acquire must skip the broken entry and build a fresh one. + with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn2: + id2 = conn2.id + + assert id1 != id2, ( + f"expected a fresh conn after broken-pool reaper; both acquires " + f"returned id={id1}" + ) + assert len(all_conns) >= 2 + + def test_pool_handles_reentrant_acquires_thread_safe(self, monkeypatch, reset_pool): + """Concurrent acquires from multiple threads must never hand the + same underlying DuckDB conn to two threads at once. The pool's + lock acquires/releases are the load-bearing invariant here. 
+ """ + from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects + + monkeypatch.setattr("duckdb.connect", lambda _: _PoolFakeConn()) + + active_ids: set = set() + active_lock = threading.Lock() + violations: list = [] + + def worker(): + for _ in range(20): + with _default_duckdb_session_factory( + BqProjects(billing="b", data="d"), + ) as conn: + with active_lock: + if conn.id in active_ids: + violations.append(conn.id) + active_ids.add(conn.id) + # Hold briefly to give other threads a chance to race. + time.sleep(0.001) + with active_lock: + active_ids.discard(conn.id) + + import time + threads = [threading.Thread(target=worker) for _ in range(4)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not violations, ( + f"pool handed the same conn to multiple threads concurrently: " + f"{violations}" + ) + + def test_pool_does_not_apply_when_factory_is_injected(self, monkeypatch, reset_pool): + """Test fixtures that inject a custom ``duckdb_session_factory`` + (e.g. tests/conftest.py's ``bq_access`` fixture) MUST bypass the + pool entirely — otherwise their nullcontext-wrapped fake would + get retained between tests and corrupt downstream assertions. + """ + from connectors.bigquery.access import BqAccess, BqProjects + from contextlib import contextmanager + + sentinel = object() + + @contextmanager + def custom_factory(_projects): + yield sentinel + + bq = BqAccess( + BqProjects(billing="b", data="d"), + duckdb_session_factory=custom_factory, + ) + with bq.duckdb_session() as conn: + assert conn is sentinel