fix(bigquery): apply bq_query_timeout_ms on every BQ-extension attach + surface silent failures
The DuckDB BigQuery extension defaults bq_query_timeout_ms to 90 s,
which is too tight for analyst-scale queries against view-backed BQ
datasets. Agnes already has apply_bq_session_settings() that bumps it
to 600 s (configurable via data_source.bigquery.query_timeout_ms), but
two regressions let the 90 s default leak through to live queries:
1. apply_bq_session_settings() swallowed every Exception silently. If
the BigQuery extension wasn't loaded on the connection yet, or the
installed extension version didn't recognise the setting, the SET
would fail and the function would return without surfacing the
problem. Operators saw 90 s timeouts on 'agnes query --remote' with
no log line explaining why.
2. The call sites in src/db.py:_reattach_remote_extensions and
src/orchestrator.py:_remote_attach only invoked
apply_bq_session_settings on the metadata-token branch (token_env
empty, the BqAccess contract). The token-based and no-auth branches
ran ATTACH against the BigQuery extension without ever applying the
timeout setting — so any BQ source registered with an explicit
token_env, or with no auth env at all, fell back to the 90 s default.
Fix:
- apply_bq_session_settings now logs WARNING on each failure path
(instance_config import error, non-numeric value, SET execution
failure, readback error). It also verifies the setting actually
landed via SELECT current_setting('bq_query_timeout_ms') and logs
WARNING when the readback disagrees with the requested value, which
catches the silent-ignore case some extension versions exhibit.
- Both _reattach_remote_extensions (src/db.py) and _remote_attach
(src/orchestrator.py) now call apply_bq_session_settings on every
branch that ATTACHes a BigQuery alias, not only the metadata-token
branch. Idempotent: calling it twice on the metadata-token path is a
no-op SET.
Tests:
- Extended the _RecordingConn fixture to support .fetchone() so the
readback assertion path works. Updated existing call-shape
assertions to expect the SELECT current_setting readback alongside
the SET. Added two new tests covering the WARNING surfaces for SET
failure and readback mismatch — regression guards for the silent-
fallback bug this PR addresses.
- Full BQ-touching suite (398 tests) passes.
This commit is contained in:
parent
f598b7e2f6
commit
32c8ea601a
4 changed files with 176 additions and 19 deletions
|
|
@ -249,12 +249,24 @@ def apply_bq_session_settings(conn) -> None:
|
|||
|
||||
Call AFTER ``LOAD bigquery`` on every DuckDB session that touches BQ:
|
||||
BqAccess's session factory, the standalone extractor in
|
||||
``connectors/bigquery/extractor.py``, and the orchestrator's
|
||||
``_remote_attach`` path in ``src/orchestrator.py``.
|
||||
``connectors/bigquery/extractor.py``, the orchestrator's
|
||||
``_remote_attach`` path in ``src/orchestrator.py``, and ``src/db.py``'s
|
||||
read-only analytics-DB factory (called from ``_reattach_remote_extensions``
|
||||
plus a belt-and-suspenders call from ``get_analytics_db_readonly`` itself).
|
||||
|
||||
SET failures are logged at WARNING level (previously silent) so operators
|
||||
can diagnose timeouts that surface as the extension default 90 s when the
|
||||
intended value was higher. The applied value is verified via
|
||||
``current_setting('bq_query_timeout_ms')``; a mismatch is also logged.
|
||||
"""
|
||||
try:
|
||||
from app.instance_config import get_value
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"apply_bq_session_settings: instance_config unavailable (%s); "
|
||||
"extension default bq_query_timeout_ms (90 s) will apply",
|
||||
e,
|
||||
)
|
||||
return
|
||||
raw = get_value(
|
||||
"data_source", "bigquery", "query_timeout_ms", default=600_000,
|
||||
|
|
@ -262,16 +274,61 @@ def apply_bq_session_settings(conn) -> None:
|
|||
try:
|
||||
ms = int(raw) if raw is not None else 0
|
||||
except (TypeError, ValueError):
|
||||
logger.warning(
|
||||
"apply_bq_session_settings: query_timeout_ms=%r is not an int; "
|
||||
"extension default (90 s) will apply",
|
||||
raw,
|
||||
)
|
||||
return
|
||||
if ms <= 0:
|
||||
# Operator opt-out: leave extension default in place. Log INFO so the
|
||||
# choice shows up in startup logs without being noisy.
|
||||
logger.info(
|
||||
"apply_bq_session_settings: query_timeout_ms=%d (≤0); extension "
|
||||
"default bq_query_timeout_ms (90 s) will apply",
|
||||
ms,
|
||||
)
|
||||
return
|
||||
try:
|
||||
conn.execute(f"SET bq_query_timeout_ms = {int(ms)}")
|
||||
except Exception:
|
||||
# Fail-soft: extension version may not support the setting, or the
|
||||
# session may already have been frozen — leave the default rather
|
||||
# than poisoning the whole session.
|
||||
pass
|
||||
except Exception as e:
|
||||
# Most common cause: the BigQuery extension is not loaded on this
|
||||
# connection yet (caller forgot the `LOAD bigquery` step), or the
|
||||
# installed extension version pre-dates the setting. Either way the
|
||||
# 90 s default sticks and remote queries time out unexpectedly.
|
||||
# Surface this — silent fallback was the bug behind real outages.
|
||||
logger.warning(
|
||||
"apply_bq_session_settings: SET bq_query_timeout_ms=%d failed (%s); "
|
||||
"extension default (90 s) will apply. Likely cause: BigQuery "
|
||||
"extension not loaded on this connection, or the installed "
|
||||
"extension version does not support this setting.",
|
||||
ms, e,
|
||||
)
|
||||
return
|
||||
# Verify the setting actually landed — protects against silent ignores
|
||||
# the extension might do in some failure modes.
|
||||
try:
|
||||
result = conn.execute(
|
||||
"SELECT current_setting('bq_query_timeout_ms')"
|
||||
).fetchone()
|
||||
actual = int(result[0]) if result and result[0] is not None else None
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"apply_bq_session_settings: could not read back "
|
||||
"bq_query_timeout_ms (%s); cannot verify setting was applied",
|
||||
e,
|
||||
)
|
||||
return
|
||||
if actual != ms:
|
||||
logger.warning(
|
||||
"apply_bq_session_settings: requested bq_query_timeout_ms=%d but "
|
||||
"current_setting reports %r — extension may have ignored the SET",
|
||||
ms, actual,
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
"apply_bq_session_settings: bq_query_timeout_ms=%d applied", ms,
|
||||
)
|
||||
|
||||
|
||||
class BqAccess:
|
||||
|
|
|
|||
11
src/db.py
11
src/db.py
|
|
@ -690,10 +690,21 @@ def _reattach_remote_extensions(
|
|||
conn.execute(
|
||||
f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, TOKEN '{escaped_token}')"
|
||||
)
|
||||
# Apply BQ session settings on every BQ-extension attach,
|
||||
# not only the metadata-token branch above. Previously the
|
||||
# token-based branch fell through without setting
|
||||
# bq_query_timeout_ms, leaving the 90 s extension default
|
||||
# in place and causing "remote query timeout" surprises.
|
||||
if extension == "bigquery":
|
||||
from connectors.bigquery.access import apply_bq_session_settings
|
||||
apply_bq_session_settings(conn)
|
||||
else:
|
||||
conn.execute(
|
||||
f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, READ_ONLY)"
|
||||
)
|
||||
if extension == "bigquery":
|
||||
from connectors.bigquery.access import apply_bq_session_settings
|
||||
apply_bq_session_settings(conn)
|
||||
attached_dbs.add(alias)
|
||||
logger.debug("Re-attached remote source %s via %s extension", alias, extension)
|
||||
except Exception as e:
|
||||
|
|
|
|||
|
|
@ -512,11 +512,22 @@ class SyncOrchestrator:
|
|||
conn.execute(
|
||||
f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, TOKEN '{escaped_token}')"
|
||||
)
|
||||
# Apply BQ session settings on every BQ-extension attach,
|
||||
# not only the metadata-token branch above. The token-based
|
||||
# branch previously fell through without calling
|
||||
# apply_bq_session_settings, leaving the 90 s extension
|
||||
# default for bq_query_timeout_ms in place.
|
||||
if extension == "bigquery":
|
||||
from connectors.bigquery.access import apply_bq_session_settings
|
||||
apply_bq_session_settings(conn)
|
||||
else:
|
||||
# No auth required (or extension handles it via env automatically)
|
||||
conn.execute(
|
||||
f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, READ_ONLY)"
|
||||
)
|
||||
if extension == "bigquery":
|
||||
from connectors.bigquery.access import apply_bq_session_settings
|
||||
apply_bq_session_settings(conn)
|
||||
|
||||
logger.info("Attached remote source %s via %s extension", alias, extension)
|
||||
except Exception as e:
|
||||
|
|
|
|||
|
|
@ -13,18 +13,45 @@ from connectors.bigquery.access import apply_bq_session_settings
|
|||
class _RecordingConn:
|
||||
"""Minimal DuckDB-conn stand-in that records execute() calls.
|
||||
|
||||
apply_bq_session_settings only calls .execute(); we don't need a
|
||||
real DuckDB to verify the SET command shape.
|
||||
apply_bq_session_settings calls .execute() to issue the SET and the
|
||||
follow-up ``current_setting`` readback (added so the function can
|
||||
verify the extension actually accepted the setting). The readback
|
||||
expects .fetchone() on the result — wire it to echo the SET value
|
||||
so the verification path succeeds when nothing rejects the SET.
|
||||
"""
|
||||
|
||||
SETTING_NAME = "bq_query_timeout_ms"
|
||||
SET_PREFIX = f"SET {SETTING_NAME} = "
|
||||
|
||||
def __init__(self, raise_on=None):
|
||||
self.calls: list[str] = []
|
||||
self.raise_on = raise_on
|
||||
# Last value the extension would report from
|
||||
# current_setting('bq_query_timeout_ms') — set when SET is observed,
|
||||
# echoed back from .fetchone().
|
||||
self._reported_setting: str | None = None
|
||||
|
||||
def execute(self, sql: str):
|
||||
self.calls.append(sql)
|
||||
if self.raise_on and self.raise_on in sql:
|
||||
raise RuntimeError(f"simulated failure on: {sql}")
|
||||
if sql.startswith(self.SET_PREFIX):
|
||||
# Capture the value the production code asked the extension to
|
||||
# apply so the readback below echoes a consistent answer.
|
||||
self._reported_setting = sql[len(self.SET_PREFIX):]
|
||||
return _RecordingResult(self._reported_setting)
|
||||
|
||||
|
||||
class _RecordingResult:
|
||||
"""Stand-in for the DuckDB result of ``SELECT current_setting(...)``."""
|
||||
|
||||
def __init__(self, value):
|
||||
self._value = value
|
||||
|
||||
def fetchone(self):
|
||||
# current_setting returns a one-tuple. None is the realistic
|
||||
# answer when the extension doesn't have the setting registered.
|
||||
return (self._value,)
|
||||
|
||||
|
||||
def _patched_get_value(value):
|
||||
|
|
@ -42,7 +69,8 @@ def _patched_get_value(value):
|
|||
def test_default_when_config_missing():
|
||||
"""When get_value returns the default (None passed through, default arg
|
||||
used), apply_bq_session_settings should fall back to the bumped
|
||||
600 000 ms default and emit the SET."""
|
||||
600 000 ms default, emit the SET, and verify it landed via the
|
||||
current_setting readback."""
|
||||
conn = _RecordingConn()
|
||||
# Simulate get_value returning the default we passed (600_000) by
|
||||
# echoing the default kwarg.
|
||||
|
|
@ -50,14 +78,20 @@ def test_default_when_config_missing():
|
|||
return default
|
||||
with patch("app.instance_config.get_value", side_effect=fake):
|
||||
apply_bq_session_settings(conn)
|
||||
assert conn.calls == ["SET bq_query_timeout_ms = 600000"]
|
||||
assert conn.calls == [
|
||||
"SET bq_query_timeout_ms = 600000",
|
||||
"SELECT current_setting('bq_query_timeout_ms')",
|
||||
]
|
||||
|
||||
|
||||
def test_explicit_value():
|
||||
conn = _RecordingConn()
|
||||
with _patched_get_value(900_000):
|
||||
apply_bq_session_settings(conn)
|
||||
assert conn.calls == ["SET bq_query_timeout_ms = 900000"]
|
||||
assert conn.calls == [
|
||||
"SET bq_query_timeout_ms = 900000",
|
||||
"SELECT current_setting('bq_query_timeout_ms')",
|
||||
]
|
||||
|
||||
|
||||
def test_zero_sentinel_leaves_extension_default():
|
||||
|
|
@ -95,19 +129,63 @@ def test_string_numeric_is_coerced():
|
|||
conn = _RecordingConn()
|
||||
with _patched_get_value("750000"):
|
||||
apply_bq_session_settings(conn)
|
||||
assert conn.calls == ["SET bq_query_timeout_ms = 750000"]
|
||||
assert conn.calls == [
|
||||
"SET bq_query_timeout_ms = 750000",
|
||||
"SELECT current_setting('bq_query_timeout_ms')",
|
||||
]
|
||||
|
||||
|
||||
def test_set_failure_does_not_propagate():
|
||||
def test_set_failure_does_not_propagate(caplog):
|
||||
"""Older DuckDB BQ extension versions may not recognise the setting.
|
||||
The function must fail-soft so a session that was otherwise healthy
|
||||
keeps working — just with the extension's built-in default timeout."""
|
||||
keeps working — just with the extension's built-in default timeout.
|
||||
The failure is logged at WARNING so an operator who hits the 90 s
|
||||
extension default unexpectedly can see why."""
|
||||
conn = _RecordingConn(raise_on="SET bq_query_timeout_ms")
|
||||
with _patched_get_value(600_000):
|
||||
# Must not raise.
|
||||
apply_bq_session_settings(conn)
|
||||
# The SET was attempted (recorded before the exception).
|
||||
with caplog.at_level("WARNING", logger="connectors.bigquery.access"):
|
||||
# Must not raise.
|
||||
apply_bq_session_settings(conn)
|
||||
# The SET was attempted (recorded before the exception); no readback
|
||||
# because the SET path raised before reaching it.
|
||||
assert conn.calls == ["SET bq_query_timeout_ms = 600000"]
|
||||
assert any(
|
||||
"SET bq_query_timeout_ms=600000 failed" in r.message
|
||||
for r in caplog.records
|
||||
), "expected a WARNING surfacing the silent-failure regression that hid 90 s timeouts"
|
||||
|
||||
|
||||
def test_setting_mismatch_is_logged(caplog):
|
||||
"""If the extension accepts the SET silently but doesn't actually apply
|
||||
it (some failure modes), the readback verification must surface the
|
||||
mismatch via WARNING so operators can diagnose."""
|
||||
conn = _RecordingConn()
|
||||
# Simulate extension ignoring the SET: keep the readback value at
|
||||
# whatever it was before (None — extension default in effect).
|
||||
conn._reported_setting = None # pre-seed: readback returns None
|
||||
with _patched_get_value(600_000):
|
||||
with caplog.at_level("WARNING", logger="connectors.bigquery.access"):
|
||||
# _RecordingConn echoes the SET into _reported_setting on observe;
|
||||
# to simulate "extension ignored SET" we override execute() to
|
||||
# NOT update the setting on SET.
|
||||
original_execute = conn.execute
|
||||
|
||||
def execute_without_capture(sql: str):
|
||||
conn.calls.append(sql)
|
||||
if sql.startswith(_RecordingConn.SET_PREFIX):
|
||||
# Don't update _reported_setting → readback returns None
|
||||
return _RecordingResult(conn._reported_setting)
|
||||
return _RecordingResult(conn._reported_setting)
|
||||
|
||||
conn.execute = execute_without_capture # type: ignore[method-assign]
|
||||
try:
|
||||
apply_bq_session_settings(conn)
|
||||
finally:
|
||||
conn.execute = original_execute # type: ignore[method-assign]
|
||||
assert any(
|
||||
"current_setting reports" in r.message
|
||||
for r in caplog.records
|
||||
), "expected a WARNING when the readback disagrees with the SET"
|
||||
|
||||
|
||||
def test_no_app_config_module_silently_skipped():
|
||||
|
|
|
|||
Loading…
Reference in a new issue