diff --git a/connectors/bigquery/access.py b/connectors/bigquery/access.py index e46a877..48e4e9f 100644 --- a/connectors/bigquery/access.py +++ b/connectors/bigquery/access.py @@ -249,12 +249,24 @@ def apply_bq_session_settings(conn) -> None: Call AFTER ``LOAD bigquery`` on every DuckDB session that touches BQ: BqAccess's session factory, the standalone extractor in - ``connectors/bigquery/extractor.py``, and the orchestrator's - ``_remote_attach`` path in ``src/orchestrator.py``. + ``connectors/bigquery/extractor.py``, the orchestrator's + ``_remote_attach`` path in ``src/orchestrator.py``, and ``src/db.py``'s + read-only analytics-DB factory (called from ``_reattach_remote_extensions`` + plus a belt-and-suspenders call from ``get_analytics_db_readonly`` itself). + + SET failures are logged at WARNING level (previously silent) so operators + can diagnose timeouts that surface as the extension default 90 s when the + intended value was higher. The applied value is verified via + ``current_setting('bq_query_timeout_ms')``; a mismatch is also logged. """ try: from app.instance_config import get_value - except Exception: + except Exception as e: + logger.warning( + "apply_bq_session_settings: instance_config unavailable (%s); " + "extension default bq_query_timeout_ms (90 s) will apply", + e, + ) return raw = get_value( "data_source", "bigquery", "query_timeout_ms", default=600_000, @@ -262,16 +274,61 @@ def apply_bq_session_settings(conn) -> None: try: ms = int(raw) if raw is not None else 0 except (TypeError, ValueError): + logger.warning( + "apply_bq_session_settings: query_timeout_ms=%r is not an int; " + "extension default (90 s) will apply", + raw, + ) return if ms <= 0: + # Operator opt-out: leave extension default in place. Log INFO so the + # choice shows up in startup logs without being noisy. 
+ logger.info( + "apply_bq_session_settings: query_timeout_ms=%d (≤0); extension " + "default bq_query_timeout_ms (90 s) will apply", + ms, + ) return try: conn.execute(f"SET bq_query_timeout_ms = {int(ms)}") - except Exception: - # Fail-soft: extension version may not support the setting, or the - # session may already have been frozen — leave the default rather - # than poisoning the whole session. - pass + except Exception as e: + # Most common cause: the BigQuery extension is not loaded on this + # connection yet (caller forgot the `LOAD bigquery` step), or the + # installed extension version pre-dates the setting. Either way the + # 90 s default sticks and remote queries time out unexpectedly. + # Surface this — silent fallback was the bug behind real outages. + logger.warning( + "apply_bq_session_settings: SET bq_query_timeout_ms=%d failed (%s); " + "extension default (90 s) will apply. Likely cause: BigQuery " + "extension not loaded on this connection, or the installed " + "extension version does not support this setting.", + ms, e, + ) + return + # Verify the setting actually landed — guards against the extension + # silently ignoring the SET in some failure modes. 
+ try: + result = conn.execute( + "SELECT current_setting('bq_query_timeout_ms')" + ).fetchone() + actual = int(result[0]) if result and result[0] is not None else None + except Exception as e: + logger.warning( + "apply_bq_session_settings: could not read back " + "bq_query_timeout_ms (%s); cannot verify setting was applied", + e, + ) + return + if actual != ms: + logger.warning( + "apply_bq_session_settings: requested bq_query_timeout_ms=%d but " + "current_setting reports %r — extension may have ignored the SET", + ms, actual, + ) + else: + logger.debug( + "apply_bq_session_settings: bq_query_timeout_ms=%d applied", ms, + ) class BqAccess: diff --git a/src/db.py b/src/db.py index d6c1ba4..261c7bc 100644 --- a/src/db.py +++ b/src/db.py @@ -690,10 +690,21 @@ def _reattach_remote_extensions( conn.execute( f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, TOKEN '{escaped_token}')" ) + # Apply BQ session settings on every BQ-extension attach, + # not only the metadata-token branch above. Previously the + # token-based branch fell through without setting + # bq_query_timeout_ms, leaving the 90 s extension default + # in place and causing "remote query timeout" surprises. 
+ if extension == "bigquery": + from connectors.bigquery.access import apply_bq_session_settings + apply_bq_session_settings(conn) else: conn.execute( f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, READ_ONLY)" ) + if extension == "bigquery": + from connectors.bigquery.access import apply_bq_session_settings + apply_bq_session_settings(conn) attached_dbs.add(alias) logger.debug("Re-attached remote source %s via %s extension", alias, extension) except Exception as e: diff --git a/src/orchestrator.py b/src/orchestrator.py index 914bcd6..47f1737 100644 --- a/src/orchestrator.py +++ b/src/orchestrator.py @@ -512,11 +512,22 @@ class SyncOrchestrator: conn.execute( f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, TOKEN '{escaped_token}')" ) + # Apply BQ session settings on every BQ-extension attach, + # not only the metadata-token branch above. The token-based + # branch previously fell through without calling + # apply_bq_session_settings, leaving the 90 s extension + # default for bq_query_timeout_ms in place. + if extension == "bigquery": + from connectors.bigquery.access import apply_bq_session_settings + apply_bq_session_settings(conn) else: # No auth required (or extension handles it via env automatically) conn.execute( f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, READ_ONLY)" ) + if extension == "bigquery": + from connectors.bigquery.access import apply_bq_session_settings + apply_bq_session_settings(conn) logger.info("Attached remote source %s via %s extension", alias, extension) except Exception as e: diff --git a/tests/test_bq_query_timeout.py b/tests/test_bq_query_timeout.py index fa64e08..91c7f54 100644 --- a/tests/test_bq_query_timeout.py +++ b/tests/test_bq_query_timeout.py @@ -13,18 +13,45 @@ from connectors.bigquery.access import apply_bq_session_settings class _RecordingConn: """Minimal DuckDB-conn stand-in that records execute() calls. 
- apply_bq_session_settings only calls .execute(); we don't need a - real DuckDB to verify the SET command shape. + apply_bq_session_settings calls .execute() to issue the SET and the + follow-up ``current_setting`` readback (added so the function can + verify the extension actually accepted the setting). The readback + expects .fetchone() on the result — wire it to echo the SET value + so the verification path succeeds when nothing rejects the SET. """ + SETTING_NAME = "bq_query_timeout_ms" + SET_PREFIX = f"SET {SETTING_NAME} = " + def __init__(self, raise_on=None): self.calls: list[str] = [] self.raise_on = raise_on + # Last value the extension would report from + # current_setting('bq_query_timeout_ms') — set when SET is observed, + # echoed back from .fetchone(). + self._reported_setting: str | None = None def execute(self, sql: str): self.calls.append(sql) if self.raise_on and self.raise_on in sql: raise RuntimeError(f"simulated failure on: {sql}") + if sql.startswith(self.SET_PREFIX): + # Capture the value the production code asked the extension to + # apply so the readback below echoes a consistent answer. + self._reported_setting = sql[len(self.SET_PREFIX):] + return _RecordingResult(self._reported_setting) + + +class _RecordingResult: + """Stand-in for the DuckDB result of ``SELECT current_setting(...)``.""" + + def __init__(self, value): + self._value = value + + def fetchone(self): + # current_setting returns a one-tuple. None is the realistic + # answer when the extension doesn't have the setting registered. 
+ return (self._value,) def _patched_get_value(value): @@ -42,7 +69,8 @@ def _patched_get_value(value): def test_default_when_config_missing(): """When get_value returns the default (None passed through, default arg used), apply_bq_session_settings should fall back to the bumped - 600 000 ms default and emit the SET.""" + 600 000 ms default, emit the SET, and verify it landed via the + current_setting readback.""" conn = _RecordingConn() # Simulate get_value returning the default we passed (600_000) by # echoing the default kwarg. @@ -50,14 +78,20 @@ def test_default_when_config_missing(): return default with patch("app.instance_config.get_value", side_effect=fake): apply_bq_session_settings(conn) - assert conn.calls == ["SET bq_query_timeout_ms = 600000"] + assert conn.calls == [ + "SET bq_query_timeout_ms = 600000", + "SELECT current_setting('bq_query_timeout_ms')", + ] def test_explicit_value(): conn = _RecordingConn() with _patched_get_value(900_000): apply_bq_session_settings(conn) - assert conn.calls == ["SET bq_query_timeout_ms = 900000"] + assert conn.calls == [ + "SET bq_query_timeout_ms = 900000", + "SELECT current_setting('bq_query_timeout_ms')", + ] def test_zero_sentinel_leaves_extension_default(): @@ -95,19 +129,63 @@ def test_string_numeric_is_coerced(): conn = _RecordingConn() with _patched_get_value("750000"): apply_bq_session_settings(conn) - assert conn.calls == ["SET bq_query_timeout_ms = 750000"] + assert conn.calls == [ + "SET bq_query_timeout_ms = 750000", + "SELECT current_setting('bq_query_timeout_ms')", + ] -def test_set_failure_does_not_propagate(): +def test_set_failure_does_not_propagate(caplog): """Older DuckDB BQ extension versions may not recognise the setting. The function must fail-soft so a session that was otherwise healthy - keeps working — just with the extension's built-in default timeout.""" + keeps working — just with the extension's built-in default timeout. 
+ The failure is logged at WARNING so an operator who hits the 90 s + extension default unexpectedly can see why.""" conn = _RecordingConn(raise_on="SET bq_query_timeout_ms") with _patched_get_value(600_000): - # Must not raise. - apply_bq_session_settings(conn) - # The SET was attempted (recorded before the exception). + with caplog.at_level("WARNING", logger="connectors.bigquery.access"): + # Must not raise. + apply_bq_session_settings(conn) + # The SET was attempted (recorded before the exception); no readback + # because the SET path raised before reaching it. assert conn.calls == ["SET bq_query_timeout_ms = 600000"] + assert any( + "SET bq_query_timeout_ms=600000 failed" in r.message + for r in caplog.records + ), "expected a WARNING surfacing the silent-failure regression that hid 90 s timeouts" + + +def test_setting_mismatch_is_logged(caplog): + """If the extension accepts the SET silently but doesn't actually apply + it (some failure modes), the readback verification must surface the + mismatch via WARNING so operators can diagnose.""" + conn = _RecordingConn() + # Simulate extension ignoring the SET: keep the readback value at + # whatever it was before (None — extension default in effect). + conn._reported_setting = None # pre-seed: readback returns None + with _patched_get_value(600_000): + with caplog.at_level("WARNING", logger="connectors.bigquery.access"): + # _RecordingConn echoes the SET into _reported_setting on observe; + # to simulate "extension ignored SET" we override execute() to + # NOT update the setting on SET. 
+ original_execute = conn.execute + + def execute_without_capture(sql: str): + conn.calls.append(sql) + if sql.startswith(_RecordingConn.SET_PREFIX): + # Don't update _reported_setting → readback returns None + return _RecordingResult(conn._reported_setting) + return _RecordingResult(conn._reported_setting) + + conn.execute = execute_without_capture # type: ignore[method-assign] + try: + apply_bq_session_settings(conn) + finally: + conn.execute = original_execute # type: ignore[method-assign] + assert any( + "current_setting reports" in r.message + for r in caplog.records + ), "expected a WARNING when the readback disagrees with the SET" def test_no_app_config_module_silently_skipped():