Merge pull request #197 from keboola/fix/bigquery-extension-timeout
fix(bigquery): apply bq_query_timeout_ms on every BQ attach + surface silent failures
This commit is contained in:
commit
226ec9e189
6 changed files with 199 additions and 20 deletions
22
CHANGELOG.md
22
CHANGELOG.md
|
|
@ -10,6 +10,28 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
|
||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
## [0.38.2] — 2026-05-06
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
- **`bq_query_timeout_ms` was not applied on every BigQuery ATTACH branch**
|
||||||
|
(`src/db.py:_reattach_remote_extensions`,
|
||||||
|
`src/orchestrator.py:_attach_remote_extensions`). Pre-fix only the
|
||||||
|
metadata-token branch (the BqAccess contract, `token_env=''`) called
|
||||||
|
`apply_bq_session_settings`. BigQuery sources registered with an explicit
|
||||||
|
`token_env`, or with no auth env, ATTACH'd without ever applying the
|
||||||
|
timeout — falling back to the extension's 90 s default. Default-config
|
||||||
|
operators on those branches now consistently get the configured 600 s
|
||||||
|
(or whatever `data_source.bigquery.query_timeout_ms` is set to).
|
||||||
|
- **`apply_bq_session_settings` swallowed every `Exception` silently**
|
||||||
|
(`connectors/bigquery/access.py`). Two realistic failure modes — the
|
||||||
|
BigQuery extension not yet loaded on the connection, or an installed
|
||||||
|
extension version that doesn't recognise the setting — left the 90 s
|
||||||
|
default in place with no log line explaining why. Each failure path
|
||||||
|
now logs `WARNING` with the actionable cause; on success the applied
|
||||||
|
value is verified via a `current_setting('bq_query_timeout_ms')`
|
||||||
|
readback (catches the silent-ignore mode some extension versions
|
||||||
|
exhibit) and a mismatch logs `WARNING` too.
|
||||||
|
|
||||||
## [0.38.1] — 2026-05-06
|
## [0.38.1] — 2026-05-06
|
||||||
|
|
||||||
### Internal
|
### Internal
|
||||||
|
|
|
||||||
|
|
@ -249,12 +249,24 @@ def apply_bq_session_settings(conn) -> None:
|
||||||
|
|
||||||
Call AFTER ``LOAD bigquery`` on every DuckDB session that touches BQ:
|
Call AFTER ``LOAD bigquery`` on every DuckDB session that touches BQ:
|
||||||
BqAccess's session factory, the standalone extractor in
|
BqAccess's session factory, the standalone extractor in
|
||||||
``connectors/bigquery/extractor.py``, and the orchestrator's
|
``connectors/bigquery/extractor.py``, the orchestrator's
|
||||||
``_remote_attach`` path in ``src/orchestrator.py``.
|
``_remote_attach`` path in ``src/orchestrator.py``, and ``src/db.py``'s
|
||||||
|
read-only analytics-DB factory (called from ``_reattach_remote_extensions``
|
||||||
|
plus a belt-and-suspenders call from ``get_analytics_db_readonly`` itself).
|
||||||
|
|
||||||
|
SET failures are logged at WARNING level (previously silent) so operators
|
||||||
|
can diagnose timeouts that surface as the extension default 90 s when the
|
||||||
|
intended value was higher. The applied value is verified via
|
||||||
|
``current_setting('bq_query_timeout_ms')``; a mismatch is also logged.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
from app.instance_config import get_value
|
from app.instance_config import get_value
|
||||||
except Exception:
|
except Exception as e:
|
||||||
|
logger.warning(
|
||||||
|
"apply_bq_session_settings: instance_config unavailable (%s); "
|
||||||
|
"extension default bq_query_timeout_ms (90 s) will apply",
|
||||||
|
e,
|
||||||
|
)
|
||||||
return
|
return
|
||||||
raw = get_value(
|
raw = get_value(
|
||||||
"data_source", "bigquery", "query_timeout_ms", default=600_000,
|
"data_source", "bigquery", "query_timeout_ms", default=600_000,
|
||||||
|
|
@ -262,16 +274,61 @@ def apply_bq_session_settings(conn) -> None:
|
||||||
try:
|
try:
|
||||||
ms = int(raw) if raw is not None else 0
|
ms = int(raw) if raw is not None else 0
|
||||||
except (TypeError, ValueError):
|
except (TypeError, ValueError):
|
||||||
|
logger.warning(
|
||||||
|
"apply_bq_session_settings: query_timeout_ms=%r is not an int; "
|
||||||
|
"extension default (90 s) will apply",
|
||||||
|
raw,
|
||||||
|
)
|
||||||
return
|
return
|
||||||
if ms <= 0:
|
if ms <= 0:
|
||||||
|
# Operator opt-out: leave extension default in place. Log INFO so the
|
||||||
|
# choice shows up in startup logs without being noisy.
|
||||||
|
logger.info(
|
||||||
|
"apply_bq_session_settings: query_timeout_ms=%d (≤0); extension "
|
||||||
|
"default bq_query_timeout_ms (90 s) will apply",
|
||||||
|
ms,
|
||||||
|
)
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
conn.execute(f"SET bq_query_timeout_ms = {int(ms)}")
|
conn.execute(f"SET bq_query_timeout_ms = {int(ms)}")
|
||||||
except Exception:
|
except Exception as e:
|
||||||
# Fail-soft: extension version may not support the setting, or the
|
# Most common cause: the BigQuery extension is not loaded on this
|
||||||
# session may already have been frozen — leave the default rather
|
# connection yet (caller forgot the `LOAD bigquery` step), or the
|
||||||
# than poisoning the whole session.
|
# installed extension version pre-dates the setting. Either way the
|
||||||
pass
|
# 90 s default sticks and remote queries time out unexpectedly.
|
||||||
|
# Surface this — silent fallback was the bug behind real outages.
|
||||||
|
logger.warning(
|
||||||
|
"apply_bq_session_settings: SET bq_query_timeout_ms=%d failed (%s); "
|
||||||
|
"extension default (90 s) will apply. Likely cause: BigQuery "
|
||||||
|
"extension not loaded on this connection, or the installed "
|
||||||
|
"extension version does not support this setting.",
|
||||||
|
ms, e,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
# Verify the setting actually landed — protects against silent ignores
|
||||||
|
# the extension might do in some failure modes.
|
||||||
|
try:
|
||||||
|
result = conn.execute(
|
||||||
|
"SELECT current_setting('bq_query_timeout_ms')"
|
||||||
|
).fetchone()
|
||||||
|
actual = int(result[0]) if result and result[0] is not None else None
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(
|
||||||
|
"apply_bq_session_settings: could not read back "
|
||||||
|
"bq_query_timeout_ms (%s); cannot verify setting was applied",
|
||||||
|
e,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
if actual != ms:
|
||||||
|
logger.warning(
|
||||||
|
"apply_bq_session_settings: requested bq_query_timeout_ms=%d but "
|
||||||
|
"current_setting reports %r — extension may have ignored the SET",
|
||||||
|
ms, actual,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.debug(
|
||||||
|
"apply_bq_session_settings: bq_query_timeout_ms=%d applied", ms,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class BqAccess:
|
class BqAccess:
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
[project]
|
[project]
|
||||||
name = "agnes-the-ai-analyst"
|
name = "agnes-the-ai-analyst"
|
||||||
version = "0.38.1"
|
version = "0.38.2"
|
||||||
description = "Agnes — AI Data Analyst platform for AI analytical systems"
|
description = "Agnes — AI Data Analyst platform for AI analytical systems"
|
||||||
requires-python = ">=3.11,<3.14"
|
requires-python = ">=3.11,<3.14"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
|
|
|
||||||
11
src/db.py
11
src/db.py
|
|
@ -690,10 +690,21 @@ def _reattach_remote_extensions(
|
||||||
conn.execute(
|
conn.execute(
|
||||||
f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, TOKEN '{escaped_token}')"
|
f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, TOKEN '{escaped_token}')"
|
||||||
)
|
)
|
||||||
|
# Apply BQ session settings on every BQ-extension attach,
|
||||||
|
# not only the metadata-token branch above. Previously the
|
||||||
|
# token-based branch fell through without setting
|
||||||
|
# bq_query_timeout_ms, leaving the 90 s extension default
|
||||||
|
# in place and causing "remote query timeout" surprises.
|
||||||
|
if extension == "bigquery":
|
||||||
|
from connectors.bigquery.access import apply_bq_session_settings
|
||||||
|
apply_bq_session_settings(conn)
|
||||||
else:
|
else:
|
||||||
conn.execute(
|
conn.execute(
|
||||||
f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, READ_ONLY)"
|
f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, READ_ONLY)"
|
||||||
)
|
)
|
||||||
|
if extension == "bigquery":
|
||||||
|
from connectors.bigquery.access import apply_bq_session_settings
|
||||||
|
apply_bq_session_settings(conn)
|
||||||
attached_dbs.add(alias)
|
attached_dbs.add(alias)
|
||||||
logger.debug("Re-attached remote source %s via %s extension", alias, extension)
|
logger.debug("Re-attached remote source %s via %s extension", alias, extension)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
|
||||||
|
|
@ -512,11 +512,22 @@ class SyncOrchestrator:
|
||||||
conn.execute(
|
conn.execute(
|
||||||
f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, TOKEN '{escaped_token}')"
|
f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, TOKEN '{escaped_token}')"
|
||||||
)
|
)
|
||||||
|
# Apply BQ session settings on every BQ-extension attach,
|
||||||
|
# not only the metadata-token branch above. The token-based
|
||||||
|
# branch previously fell through without calling
|
||||||
|
# apply_bq_session_settings, leaving the 90 s extension
|
||||||
|
# default for bq_query_timeout_ms in place.
|
||||||
|
if extension == "bigquery":
|
||||||
|
from connectors.bigquery.access import apply_bq_session_settings
|
||||||
|
apply_bq_session_settings(conn)
|
||||||
else:
|
else:
|
||||||
# No auth required (or extension handles it via env automatically)
|
# No auth required (or extension handles it via env automatically)
|
||||||
conn.execute(
|
conn.execute(
|
||||||
f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, READ_ONLY)"
|
f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, READ_ONLY)"
|
||||||
)
|
)
|
||||||
|
if extension == "bigquery":
|
||||||
|
from connectors.bigquery.access import apply_bq_session_settings
|
||||||
|
apply_bq_session_settings(conn)
|
||||||
|
|
||||||
logger.info("Attached remote source %s via %s extension", alias, extension)
|
logger.info("Attached remote source %s via %s extension", alias, extension)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
|
||||||
|
|
@ -13,18 +13,45 @@ from connectors.bigquery.access import apply_bq_session_settings
|
||||||
class _RecordingConn:
|
class _RecordingConn:
|
||||||
"""Minimal DuckDB-conn stand-in that records execute() calls.
|
"""Minimal DuckDB-conn stand-in that records execute() calls.
|
||||||
|
|
||||||
apply_bq_session_settings only calls .execute(); we don't need a
|
apply_bq_session_settings calls .execute() to issue the SET and the
|
||||||
real DuckDB to verify the SET command shape.
|
follow-up ``current_setting`` readback (added so the function can
|
||||||
|
verify the extension actually accepted the setting). The readback
|
||||||
|
expects .fetchone() on the result — wire it to echo the SET value
|
||||||
|
so the verification path succeeds when nothing rejects the SET.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
SETTING_NAME = "bq_query_timeout_ms"
|
||||||
|
SET_PREFIX = f"SET {SETTING_NAME} = "
|
||||||
|
|
||||||
def __init__(self, raise_on=None):
|
def __init__(self, raise_on=None):
|
||||||
self.calls: list[str] = []
|
self.calls: list[str] = []
|
||||||
self.raise_on = raise_on
|
self.raise_on = raise_on
|
||||||
|
# Last value the extension would report from
|
||||||
|
# current_setting('bq_query_timeout_ms') — set when SET is observed,
|
||||||
|
# echoed back from .fetchone().
|
||||||
|
self._reported_setting: str | None = None
|
||||||
|
|
||||||
def execute(self, sql: str):
|
def execute(self, sql: str):
|
||||||
self.calls.append(sql)
|
self.calls.append(sql)
|
||||||
if self.raise_on and self.raise_on in sql:
|
if self.raise_on and self.raise_on in sql:
|
||||||
raise RuntimeError(f"simulated failure on: {sql}")
|
raise RuntimeError(f"simulated failure on: {sql}")
|
||||||
|
if sql.startswith(self.SET_PREFIX):
|
||||||
|
# Capture the value the production code asked the extension to
|
||||||
|
# apply so the readback below echoes a consistent answer.
|
||||||
|
self._reported_setting = sql[len(self.SET_PREFIX):]
|
||||||
|
return _RecordingResult(self._reported_setting)
|
||||||
|
|
||||||
|
|
||||||
|
class _RecordingResult:
|
||||||
|
"""Stand-in for the DuckDB result of ``SELECT current_setting(...)``."""
|
||||||
|
|
||||||
|
def __init__(self, value):
|
||||||
|
self._value = value
|
||||||
|
|
||||||
|
def fetchone(self):
|
||||||
|
# current_setting returns a one-tuple. None is the realistic
|
||||||
|
# answer when the extension doesn't have the setting registered.
|
||||||
|
return (self._value,)
|
||||||
|
|
||||||
|
|
||||||
def _patched_get_value(value):
|
def _patched_get_value(value):
|
||||||
|
|
@ -42,7 +69,8 @@ def _patched_get_value(value):
|
||||||
def test_default_when_config_missing():
|
def test_default_when_config_missing():
|
||||||
"""When get_value returns the default (None passed through, default arg
|
"""When get_value returns the default (None passed through, default arg
|
||||||
used), apply_bq_session_settings should fall back to the bumped
|
used), apply_bq_session_settings should fall back to the bumped
|
||||||
600 000 ms default and emit the SET."""
|
600 000 ms default, emit the SET, and verify it landed via the
|
||||||
|
current_setting readback."""
|
||||||
conn = _RecordingConn()
|
conn = _RecordingConn()
|
||||||
# Simulate get_value returning the default we passed (600_000) by
|
# Simulate get_value returning the default we passed (600_000) by
|
||||||
# echoing the default kwarg.
|
# echoing the default kwarg.
|
||||||
|
|
@ -50,14 +78,20 @@ def test_default_when_config_missing():
|
||||||
return default
|
return default
|
||||||
with patch("app.instance_config.get_value", side_effect=fake):
|
with patch("app.instance_config.get_value", side_effect=fake):
|
||||||
apply_bq_session_settings(conn)
|
apply_bq_session_settings(conn)
|
||||||
assert conn.calls == ["SET bq_query_timeout_ms = 600000"]
|
assert conn.calls == [
|
||||||
|
"SET bq_query_timeout_ms = 600000",
|
||||||
|
"SELECT current_setting('bq_query_timeout_ms')",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def test_explicit_value():
|
def test_explicit_value():
|
||||||
conn = _RecordingConn()
|
conn = _RecordingConn()
|
||||||
with _patched_get_value(900_000):
|
with _patched_get_value(900_000):
|
||||||
apply_bq_session_settings(conn)
|
apply_bq_session_settings(conn)
|
||||||
assert conn.calls == ["SET bq_query_timeout_ms = 900000"]
|
assert conn.calls == [
|
||||||
|
"SET bq_query_timeout_ms = 900000",
|
||||||
|
"SELECT current_setting('bq_query_timeout_ms')",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def test_zero_sentinel_leaves_extension_default():
|
def test_zero_sentinel_leaves_extension_default():
|
||||||
|
|
@ -95,19 +129,63 @@ def test_string_numeric_is_coerced():
|
||||||
conn = _RecordingConn()
|
conn = _RecordingConn()
|
||||||
with _patched_get_value("750000"):
|
with _patched_get_value("750000"):
|
||||||
apply_bq_session_settings(conn)
|
apply_bq_session_settings(conn)
|
||||||
assert conn.calls == ["SET bq_query_timeout_ms = 750000"]
|
assert conn.calls == [
|
||||||
|
"SET bq_query_timeout_ms = 750000",
|
||||||
|
"SELECT current_setting('bq_query_timeout_ms')",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def test_set_failure_does_not_propagate():
|
def test_set_failure_does_not_propagate(caplog):
|
||||||
"""Older DuckDB BQ extension versions may not recognise the setting.
|
"""Older DuckDB BQ extension versions may not recognise the setting.
|
||||||
The function must fail-soft so a session that was otherwise healthy
|
The function must fail-soft so a session that was otherwise healthy
|
||||||
keeps working — just with the extension's built-in default timeout."""
|
keeps working — just with the extension's built-in default timeout.
|
||||||
|
The failure is logged at WARNING so an operator who hits the 90 s
|
||||||
|
extension default unexpectedly can see why."""
|
||||||
conn = _RecordingConn(raise_on="SET bq_query_timeout_ms")
|
conn = _RecordingConn(raise_on="SET bq_query_timeout_ms")
|
||||||
with _patched_get_value(600_000):
|
with _patched_get_value(600_000):
|
||||||
# Must not raise.
|
with caplog.at_level("WARNING", logger="connectors.bigquery.access"):
|
||||||
apply_bq_session_settings(conn)
|
# Must not raise.
|
||||||
# The SET was attempted (recorded before the exception).
|
apply_bq_session_settings(conn)
|
||||||
|
# The SET was attempted (recorded before the exception); no readback
|
||||||
|
# because the SET path raised before reaching it.
|
||||||
assert conn.calls == ["SET bq_query_timeout_ms = 600000"]
|
assert conn.calls == ["SET bq_query_timeout_ms = 600000"]
|
||||||
|
assert any(
|
||||||
|
"SET bq_query_timeout_ms=600000 failed" in r.message
|
||||||
|
for r in caplog.records
|
||||||
|
), "expected a WARNING surfacing the silent-failure regression that hid 90 s timeouts"
|
||||||
|
|
||||||
|
|
||||||
|
def test_setting_mismatch_is_logged(caplog):
|
||||||
|
"""If the extension accepts the SET silently but doesn't actually apply
|
||||||
|
it (some failure modes), the readback verification must surface the
|
||||||
|
mismatch via WARNING so operators can diagnose."""
|
||||||
|
conn = _RecordingConn()
|
||||||
|
# Simulate extension ignoring the SET: keep the readback value at
|
||||||
|
# whatever it was before (None — extension default in effect).
|
||||||
|
conn._reported_setting = None # pre-seed: readback returns None
|
||||||
|
with _patched_get_value(600_000):
|
||||||
|
with caplog.at_level("WARNING", logger="connectors.bigquery.access"):
|
||||||
|
# _RecordingConn echoes the SET into _reported_setting on observe;
|
||||||
|
# to simulate "extension ignored SET" we override execute() to
|
||||||
|
# NOT update the setting on SET.
|
||||||
|
original_execute = conn.execute
|
||||||
|
|
||||||
|
def execute_without_capture(sql: str):
|
||||||
|
conn.calls.append(sql)
|
||||||
|
if sql.startswith(_RecordingConn.SET_PREFIX):
|
||||||
|
# Don't update _reported_setting → readback returns None
|
||||||
|
return _RecordingResult(conn._reported_setting)
|
||||||
|
return _RecordingResult(conn._reported_setting)
|
||||||
|
|
||||||
|
conn.execute = execute_without_capture # type: ignore[method-assign]
|
||||||
|
try:
|
||||||
|
apply_bq_session_settings(conn)
|
||||||
|
finally:
|
||||||
|
conn.execute = original_execute # type: ignore[method-assign]
|
||||||
|
assert any(
|
||||||
|
"current_setting reports" in r.message
|
||||||
|
for r in caplog.records
|
||||||
|
), "expected a WARNING when the readback disagrees with the SET"
|
||||||
|
|
||||||
|
|
||||||
def test_no_app_config_module_silently_skipped():
|
def test_no_app_config_module_silently_skipped():
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue