translate_bq_error previously mapped BQ's responseTooLarge failure mode
to bq_bad_request (HTTP 400 with the raw upstream message). The user-
facing implication ('your SQL has a syntax error') is wrong -- the root
cause is query shape (BQ refused to return the result inline because
it exceeded the response size limit), and the actionable remediation is
'narrow the WHERE clause, aggregate further, or use a materialized
table'.
Add bq_response_too_large as a first-class BqAccessError kind (also HTTP
400) with a canonical hint message; the original BQ message is preserved
in details for operator debugging. Detection is substring-based on
'response too large' and runs before the generic BadRequest path, so the
dedicated mapping always wins. This affects every BQ-touching path, since
they all share translate_bq_error: /api/query,
/api/v2/{scan,sample,schema}, and materialize.
774 lines
35 KiB
Python
774 lines
35 KiB
Python
"""Tests for connectors/bigquery/access.py — the BqAccess facade."""
|
|
import pytest
|
|
import threading
|
|
|
|
|
|
class TestBqProjects:
    """Checks on the ``BqProjects`` value object."""

    def test_bq_projects_is_frozen_dataclass(self):
        """Fields are readable after construction and cannot be reassigned."""
        from connectors.bigquery.access import BqProjects

        p = BqProjects(billing="b", data="d")
        assert p.billing == "b"
        assert p.data == "d"
        # dataclasses.FrozenInstanceError is a subclass of AttributeError, so
        # this covers both outcomes the test anticipates without also
        # accepting unrelated failures the way a bare ``Exception`` would.
        with pytest.raises(AttributeError):
            p.billing = "other"
|
|
|
|
|
|
class TestBqAccessError:
    """Contract of the ``BqAccessError`` exception type."""

    def test_carries_kind_message_details(self):
        """kind / message / details are exposed; ``str()`` yields the message."""
        from connectors.bigquery.access import BqAccessError

        payload = {"foo": "bar"}
        err = BqAccessError("my_kind", "boom", payload)

        observed = (err.kind, err.message, err.details, str(err))
        assert observed == ("my_kind", "boom", {"foo": "bar"}, "boom")

    def test_default_details_is_empty_dict(self):
        """Omitting ``details`` leaves an empty dict on the error."""
        from connectors.bigquery.access import BqAccessError

        err = BqAccessError("k", "m")
        assert err.details == {}

    def test_http_status_map_covers_all_kinds(self):
        """``HTTP_STATUS`` must match this table exactly, kind for kind."""
        from connectors.bigquery.access import BqAccessError

        server_side = {
            "not_configured": 500,
            "bq_lib_missing": 500,
        }
        upstream = {
            "auth_failed": 502,
            "cross_project_forbidden": 502,
            "bq_forbidden": 502,
            "bq_upstream_error": 502,
        }
        client_side = {
            "bq_bad_request": 400,
            # "Response too large to return" is an upstream BQ refusal, but
            # it is caused by query shape (too many rows for one jobs.query
            # response), not by auth or syntax. It maps to 400 so the user
            # sees an actionable error instead of a 502 implying BQ is down.
            "bq_response_too_large": 400,
        }

        expected = {**server_side, **upstream, **client_side}
        assert BqAccessError.HTTP_STATUS == expected
|
|
|
|
|
|
class TestTranslateBqError:
    """Classification rules of ``translate_bq_error``."""

    def setup_method(self):
        from connectors.bigquery.access import BqProjects

        self.projects = BqProjects(billing="bill", data="data")

    def _translate(self, exc, status, projects=None):
        """Run ``translate_bq_error`` against the fixture projects (or an override)."""
        from connectors.bigquery.access import translate_bq_error

        target = self.projects if projects is None else projects
        return translate_bq_error(exc, target, bad_request_status=status)

    def test_passes_through_BqAccessError(self):
        """CRITICAL: an incoming ``BqAccessError`` is returned untouched.

        bq.client() / bq.duckdb_session() raise BqAccessError directly for
        bq_lib_missing / auth_failed; the translator must hand the very same
        object back rather than reclassify it as 'unknown' and re-raise.
        """
        from connectors.bigquery.access import BqAccessError

        already_typed = BqAccessError("bq_lib_missing", "no google lib")
        assert self._translate(already_typed, "client_error") is already_typed

    def test_forbidden_serviceusage_to_cross_project(self):
        """A Forbidden mentioning serviceusage becomes cross_project_forbidden."""
        from google.api_core.exceptions import Forbidden

        upstream = Forbidden("Permission denied: serviceusage.services.use on project foo")
        translated = self._translate(upstream, "client_error")

        assert translated.kind == "cross_project_forbidden"
        for key in ("billing_project", "hint"):
            assert key in translated.details

    def test_forbidden_no_serviceusage_to_bq_forbidden(self):
        """A Forbidden without the serviceusage marker becomes bq_forbidden."""
        from google.api_core.exceptions import Forbidden

        upstream = Forbidden("Permission denied on table-level ACL")
        assert self._translate(upstream, "client_error").kind == "bq_forbidden"

    def test_forbidden_diff_projects_no_serviceusage_still_bq_forbidden(self):
        """billing != data is the NORMAL cross-project setup, not a failure signal.

        The heuristic must rely on the 'serviceusage' substring only.
        """
        from google.api_core.exceptions import Forbidden
        from connectors.bigquery.access import BqProjects

        split_projects = BqProjects(billing="b", data="d")
        upstream = Forbidden("Permission denied on table-level ACL")

        translated = self._translate(upstream, "client_error", projects=split_projects)
        assert translated.kind == "bq_forbidden"  # NOT cross_project_forbidden

    def test_bad_request_client_error_to_bq_bad_request_400(self):
        """BadRequest + 'client_error' routing yields bq_bad_request (HTTP 400)."""
        from google.api_core.exceptions import BadRequest
        from connectors.bigquery.access import BqAccessError

        translated = self._translate(BadRequest("Syntax error at line 1"), "client_error")

        assert translated.kind == "bq_bad_request"
        assert BqAccessError.HTTP_STATUS[translated.kind] == 400

    def test_bad_request_upstream_error_to_bq_upstream_error_502(self):
        """BadRequest + 'upstream_error' routing yields bq_upstream_error (HTTP 502)."""
        from google.api_core.exceptions import BadRequest
        from connectors.bigquery.access import BqAccessError

        translated = self._translate(BadRequest("malformed identifier"), "upstream_error")

        assert translated.kind == "bq_upstream_error"
        assert BqAccessError.HTTP_STATUS[translated.kind] == 502

    def test_other_google_api_error_to_bq_upstream_error(self):
        """Any other Google API error is reported as bq_upstream_error."""
        from google.api_core.exceptions import InternalServerError

        translated = self._translate(InternalServerError("BQ borked"), "client_error")
        assert translated.kind == "bq_upstream_error"

    def test_unknown_exception_reraises(self):
        """Exceptions that are not recognisably BQ errors propagate unchanged."""
        with pytest.raises(RuntimeError, match="oops"):
            self._translate(RuntimeError("oops"), "client_error")

    def test_duckdb_native_forbidden_classified_via_string_match(self):
        """BQ 403s raised by the DuckDB bigquery extension are still classified.

        The extension is a C++ plugin making its own HTTP calls; a BQ 403
        arrives as duckdb.IOException with 'Forbidden' / '403' in the message,
        NOT as gax.Forbidden. The last-resort heuristic must classify these so
        /scan, /sample, /schema don't fall back to a bare 500 in production.
        Devin ANALYSIS on PR #138 review.
        """
        # Stand-in for duckdb.IOException: a plain Exception whose text is the
        # BQ error as embedded by the extension's HTTP layer.
        native = Exception("HTTP 403 Forbidden: serviceusage.services.use denied on project x")
        translated = self._translate(native, "upstream_error")

        assert translated.kind == "cross_project_forbidden"
        assert "billing_project" in translated.details

    def test_duckdb_native_forbidden_non_serviceusage(self):
        """A DuckDB-native 403 without serviceusage maps to bq_forbidden."""
        native = Exception("HTTP 403: User does not have permission to access table foo")
        assert self._translate(native, "upstream_error").kind == "bq_forbidden"

    def test_duckdb_native_bad_request_classified_via_string_match(self):
        """A DuckDB-native 400 is classified from its message text."""
        native = Exception("400 Bad Request: Syntax error at line 1")
        assert self._translate(native, "client_error").kind == "bq_bad_request"

    def test_unknown_exception_without_bq_pattern_still_reraises(self):
        """The heuristic must be specific.

        Random exceptions without HTTP-error keywords still re-raise, so
        programmer bugs are not swallowed.
        """
        with pytest.raises(ValueError, match="not a BQ error"):
            self._translate(ValueError("not a BQ error"), "client_error")

    def test_response_too_large_via_gax_bad_request(self):
        """BQ ``responseTooLarge`` gets its own kind with an actionable message.

        It arrives as ``gax.BadRequest`` (HTTP 400 with a specific `reason`
        field). Pre-fix this fell through to the generic ``bq_bad_request``
        mapping, surfacing the raw upstream message with no hint. Now it
        routes to ``bq_response_too_large``, whose message tells the user what
        to do (narrow WHERE / aggregate / use materialized).
        """
        from google.api_core.exceptions import BadRequest

        upstream = BadRequest(
            "Response too large to return. Consider setting allowLargeResults to true ..."
        )
        translated = self._translate(upstream, "client_error")

        assert translated.kind == "bq_response_too_large", (
            f"got {translated.kind!r}; expected dedicated mapping for "
            "'Response too large' to avoid the generic bq_bad_request 400 "
            "with no actionable hint"
        )

        # The user-facing message must name the problem and a remediation,
        # not just echo the raw BQ string.
        text = translated.message.lower()
        assert any(marker in text for marker in ("exceeded", "too large"))
        assert any(remedy in text for remedy in ("where", "aggregate", "materialized"))

        # The upstream text is kept in details for operator debugging.
        assert "original" in translated.details
        assert "Response too large" in translated.details["original"]

    def test_response_too_large_via_duckdb_native_string(self):
        """DuckDB-native 'Response too large' errors classify like gax.BadRequest.

        The BQ extension's C++ HTTP path carries the same marker in plain
        ``Exception`` messages.
        """
        native = Exception("HTTP 400: Response too large to return.")
        translated = self._translate(native, "upstream_error")
        assert translated.kind == "bq_response_too_large"

    def test_response_too_large_classification_is_status_independent(self):
        """The mapping fires regardless of ``bad_request_status``.

        Some callers route via 'upstream_error', others via 'client_error';
        it is the BQ error shape that matters, not who is calling.
        """
        from google.api_core.exceptions import BadRequest

        upstream = BadRequest("Response too large to return")
        for status in ("client_error", "upstream_error"):
            translated = self._translate(upstream, status)
            assert translated.kind == "bq_response_too_large", (
                f"bad_request_status={status!r} routed to {translated.kind!r}; "
                "expected bq_response_too_large for both"
            )

    def test_response_too_large_does_not_trigger_on_unrelated_bad_request(self):
        """Other BadRequests keep going through the generic bq_bad_request mapping.

        Syntax errors, malformed identifiers and the like are unaffected; only
        the 'Response too large' substring triggers the dedicated kind.
        """
        from google.api_core.exceptions import BadRequest

        upstream = BadRequest("Syntax error at [1:23] near unexpected token")
        translated = self._translate(upstream, "client_error")
        assert translated.kind == "bq_bad_request"
|
|
|
|
|
|
class TestDefaultClientFactory:
    """Behaviour of ``_default_client_factory``."""

    def test_constructs_client_with_billing_project_as_quota(self, monkeypatch):
        """quota_project_id must be projects.billing, NOT projects.data."""
        import google.api_core.client_options as co_mod
        import google.cloud.bigquery as bq_mod
        from connectors.bigquery.access import _default_client_factory, BqProjects

        seen = {}

        class FakeClientOptions:
            def __init__(self, **kwargs):
                seen["client_options_kwargs"] = kwargs

        class FakeClient:
            def __init__(self, project, client_options):
                seen.update(project=project, client_options=client_options)

        monkeypatch.setattr(bq_mod, "Client", FakeClient)
        monkeypatch.setattr(co_mod, "ClientOptions", FakeClientOptions)

        _default_client_factory(BqProjects(billing="bill", data="data"))

        assert seen["project"] == "bill"
        assert seen["client_options_kwargs"]["quota_project_id"] == "bill"

    def test_raises_bq_lib_missing_on_importerror(self, monkeypatch):
        """If google-cloud-bigquery is not installed, raise BqAccessError, not ImportError."""
        import builtins
        from connectors.bigquery.access import (
            _default_client_factory,
            BqProjects,
            BqAccessError,
        )

        original_import = builtins.__import__

        def fake_import(name, *args, **kwargs):
            # Fail only the BigQuery client imports; everything else resolves.
            blocked = name == "google.cloud" or name.startswith("google.cloud.bigquery")
            if blocked:
                raise ImportError("no google-cloud-bigquery")
            return original_import(name, *args, **kwargs)

        monkeypatch.setattr(builtins, "__import__", fake_import)

        with pytest.raises(BqAccessError) as caught:
            _default_client_factory(BqProjects(billing="b", data="d"))
        assert caught.value.kind == "bq_lib_missing"

    def test_raises_auth_failed_on_default_credentials_error(self, monkeypatch):
        """Missing ADC at client construction surfaces as auth_failed.

        bigquery.Client(...) resolves ADC at construction; missing credentials
        in CI / dev raise google.auth.exceptions.DefaultCredentialsError
        synchronously. That must translate to BqAccessError(auth_failed), not
        propagate raw.
        """
        import google.cloud.bigquery as bq_mod
        from google.auth.exceptions import DefaultCredentialsError
        from connectors.bigquery.access import (
            _default_client_factory,
            BqProjects,
            BqAccessError,
        )

        class FakeClient:
            def __init__(self, project, client_options):
                raise DefaultCredentialsError("no ADC")

        monkeypatch.setattr(bq_mod, "Client", FakeClient)

        with pytest.raises(BqAccessError) as caught:
            _default_client_factory(BqProjects(billing="b", data="d"))

        failure = caught.value
        assert failure.kind == "auth_failed"
        assert "no ADC" in failure.message
        assert "hint" in failure.details
|
|
|
|
|
|
class TestDefaultDuckdbSessionFactory:
    """Behaviour of ``_default_duckdb_session_factory`` against a faked DuckDB."""

    def test_yields_duckdb_conn_with_secret_set_via_pool(self, monkeypatch):
        """First acquire on an empty pool runs the full INSTALL/LOAD/SECRET sequence.

        After the with-block exits the connection is RETURNED to the pool
        (not closed), so the next acquire amortizes the extension-load cost.

        Pre-pool semantics (close-on-exit) are preserved on broken entries and
        on the explicit pool-reset path; covered in TestBqSessionPool.
        """
        from connectors.bigquery.access import (
            _default_duckdb_session_factory, BqProjects,
            _reset_session_pool_for_tests,
        )

        _reset_session_pool_for_tests()

        statements = []

        class _Row:
            def fetchone(self_inner):
                return (1,)

        class FakeConn:
            def __init__(self):
                self.closed = False

            def execute(self, sql, params=None):
                statements.append((sql, params))
                return _Row()

            def close(self):
                self.closed = True

        fake_conn = FakeConn()
        monkeypatch.setattr("duckdb.connect", lambda _: fake_conn)
        monkeypatch.setattr("connectors.bigquery.auth.get_metadata_token", lambda: "tok123")

        with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn:
            assert conn is fake_conn

        # The pool keeps the conn; closing happens at pool reset / shutdown.
        assert fake_conn.closed is False

        # The INSTALL/LOAD/SECRET sequence must have run.
        issued = [sql for sql, _ in statements]
        assert any("INSTALL bigquery" in sql for sql in issued)
        assert any("LOAD bigquery" in sql for sql in issued)
        assert any(
            "CREATE OR REPLACE SECRET" in sql and "tok123" in sql for sql in issued
        )

        # An explicit pool reset closes the retained entry.
        _reset_session_pool_for_tests()
        assert fake_conn.closed is True

    def test_closes_on_exception_inside_with_block(self, monkeypatch):
        """An exception inside the with-block makes the pool close the conn.

        The underlying conn is then in an unknown state (half-completed query,
        dirty session); the pool treats it as broken and closes it rather than
        returning it to the pool.
        """
        from connectors.bigquery.access import (
            _default_duckdb_session_factory, BqProjects,
            _reset_session_pool_for_tests,
        )

        _reset_session_pool_for_tests()

        class _Row:
            def fetchone(self_inner):
                return (1,)

        class FakeConn:
            closed = False

            def execute(self, *a, **kw):
                return _Row()

            def close(self):
                self.closed = True

        fake_conn = FakeConn()
        monkeypatch.setattr("duckdb.connect", lambda _: fake_conn)
        monkeypatch.setattr("connectors.bigquery.auth.get_metadata_token", lambda: "t")

        session = _default_duckdb_session_factory(BqProjects(billing="b", data="d"))
        with pytest.raises(RuntimeError, match="boom"):
            with session as conn:
                raise RuntimeError("boom")

        assert fake_conn.closed is True

    def test_translates_metadata_auth_error_to_auth_failed(self, monkeypatch):
        """A metadata-token failure surfaces as BqAccessError(auth_failed)."""
        from connectors.bigquery.access import (
            _default_duckdb_session_factory,
            BqProjects,
            BqAccessError,
        )
        from connectors.bigquery.auth import BQMetadataAuthError

        def fail():
            raise BQMetadataAuthError("metadata server unreachable")

        monkeypatch.setattr("connectors.bigquery.auth.get_metadata_token", fail)

        with pytest.raises(BqAccessError) as caught:
            with _default_duckdb_session_factory(BqProjects(billing="b", data="d")):
                pass

        assert caught.value.kind == "auth_failed"
|
|
|
|
|
|
class TestBqAccess:
    """Factory wiring of the ``BqAccess`` facade."""

    def test_uses_default_factories_when_none_passed(self, monkeypatch):
        """Without injected factories, ``client()`` goes through the module default."""
        from connectors.bigquery.access import BqAccess, BqProjects

        calls = []

        def recording_factory(projects):
            calls.append(("client", projects))
            return "FAKE_CLIENT"

        monkeypatch.setattr(
            "connectors.bigquery.access._default_client_factory",
            recording_factory,
        )

        facade = BqAccess(BqProjects(billing="b", data="d"))

        assert facade.client() == "FAKE_CLIENT"
        assert calls == [("client", BqProjects(billing="b", data="d"))]

    def test_injected_client_factory_overrides_default(self):
        """An injected ``client_factory`` is the one ``client()`` uses."""
        from connectors.bigquery.access import BqAccess, BqProjects

        def mock_factory(projects):
            return "MOCK_CLIENT"

        facade = BqAccess(
            BqProjects(billing="b", data="d"),
            client_factory=mock_factory,
        )
        assert facade.client() == "MOCK_CLIENT"

    def test_injected_duckdb_session_factory_overrides_default(self):
        """An injected ``duckdb_session_factory`` is the one ``duckdb_session()`` uses."""
        from contextlib import contextmanager
        from connectors.bigquery.access import BqAccess, BqProjects

        @contextmanager
        def fake_session(projects):
            yield "FAKE_CONN"

        facade = BqAccess(
            BqProjects(billing="b", data="d"),
            duckdb_session_factory=fake_session,
        )
        with facade.duckdb_session() as conn:
            assert conn == "FAKE_CONN"

    def test_projects_property(self):
        """``projects`` returns the very object passed to the constructor."""
        from connectors.bigquery.access import BqAccess, BqProjects

        given = BqProjects(billing="b", data="d")
        assert BqAccess(given).projects is given
|
|
|
|
|
|
class TestGetBqAccess:
    """Resolution and caching behaviour of ``get_bq_access``."""

    def setup_method(self):
        # Start every test from an empty cache.
        from connectors.bigquery.access import get_bq_access

        get_bq_access.cache_clear()

    def teardown_method(self):
        # Also clear on the way out: each test caches a BqAccess built from
        # monkeypatched env/config, and without this the last one would stay
        # cached for whatever runs after this class.
        from connectors.bigquery.access import get_bq_access

        get_bq_access.cache_clear()

    def test_env_var_wins(self, monkeypatch):
        """BIGQUERY_PROJECT supplies both the billing and the data project."""
        from connectors.bigquery.access import get_bq_access

        monkeypatch.setenv("BIGQUERY_PROJECT", "env-proj")
        bq = get_bq_access()
        assert bq.projects.billing == "env-proj"
        assert bq.projects.data == "env-proj"

    def test_billing_project_from_yaml_when_no_env(self, monkeypatch):
        """Without the env var, billing/data come from the instance config keys."""
        from connectors.bigquery.access import get_bq_access

        monkeypatch.delenv("BIGQUERY_PROJECT", raising=False)

        def fake_get_value(*keys, default=""):
            return {
                ("data_source", "bigquery", "billing_project"): "yaml-bill",
                ("data_source", "bigquery", "project"): "yaml-data",
            }.get(keys, default)

        monkeypatch.setattr("app.instance_config.get_value", fake_get_value)
        bq = get_bq_access()
        assert bq.projects.billing == "yaml-bill"
        assert bq.projects.data == "yaml-data"

    def test_billing_falls_back_to_project_when_no_billing(self, monkeypatch):
        """With only ``project`` configured, billing falls back to it."""
        from connectors.bigquery.access import get_bq_access

        monkeypatch.delenv("BIGQUERY_PROJECT", raising=False)

        def fake_get_value(*keys, default=""):
            return {
                ("data_source", "bigquery", "project"): "yaml-data",
            }.get(keys, default)

        monkeypatch.setattr("app.instance_config.get_value", fake_get_value)
        bq = get_bq_access()
        assert bq.projects.billing == "yaml-data"
        assert bq.projects.data == "yaml-data"

    def test_returns_sentinel_when_neither_set(self, monkeypatch):
        """get_bq_access() MUST NOT raise during dep-injection on non-BQ instances.

        Raising there would 500 every v2 endpoint request even for
        local-source tables. Instead it returns a sentinel BqAccess whose
        client() / duckdb_session() raise BqAccessError(not_configured) only
        when actually called; the endpoint's try/except BqAccessError catches
        that path normally. Devin BUG_0001 on PR #138 review.
        """
        from connectors.bigquery.access import get_bq_access, BqAccessError, BqAccess

        monkeypatch.delenv("BIGQUERY_PROJECT", raising=False)
        monkeypatch.setattr("app.instance_config.get_value", lambda *k, default="": default)

        bq = get_bq_access()
        assert isinstance(bq, BqAccess)

        with pytest.raises(BqAccessError) as exc_info:
            bq.client()
        assert exc_info.value.kind == "not_configured"
        hint = exc_info.value.details["hint"].lower()
        assert "billing_project" in hint or "project" in hint

        # duckdb_session() is a context manager; the error must surface on __enter__.
        with pytest.raises(BqAccessError) as exc_info:
            with bq.duckdb_session():
                pass
        assert exc_info.value.kind == "not_configured"

    def test_is_cached(self, monkeypatch):
        """Two consecutive calls return the same object."""
        from connectors.bigquery.access import get_bq_access

        monkeypatch.setenv("BIGQUERY_PROJECT", "p")
        a = get_bq_access()
        b = get_bq_access()
        assert a is b

    def test_fetch_helpers_raise_not_configured_on_sentinel_before_identifier_validation(self, monkeypatch):
        """v2 fetch helpers must hit ``not_configured`` before identifier validation.

        The sentinel BqAccess has BqProjects(data=""). The helpers must
        trigger bq.client() (which raises BqAccessError(not_configured))
        BEFORE calling validate_quoted_identifier on the empty string.
        Otherwise the operator sees a confusing HTTP 400 'unsafe_identifier'
        instead of the intended HTTP 500 'not_configured' with hint.
        Devin BUG_0002 on PR #138 review.
        """
        from connectors.bigquery.access import get_bq_access, BqAccessError
        from app.api.v2_sample import _fetch_bq_sample
        from app.api.v2_schema import _fetch_bq_schema, _fetch_bq_table_options

        monkeypatch.delenv("BIGQUERY_PROJECT", raising=False)
        monkeypatch.setattr("app.instance_config.get_value", lambda *k, default="": default)
        bq = get_bq_access()
        assert bq.projects.data == "", "must be the sentinel"

        # Strict paths surface BqAccessError(not_configured), NOT ValueError(unsafe).
        with pytest.raises(BqAccessError) as exc_info:
            _fetch_bq_sample(bq, "ds", "tbl", 5)
        assert exc_info.value.kind == "not_configured"

        with pytest.raises(BqAccessError) as exc_info:
            _fetch_bq_schema(bq, "ds", "tbl")
        assert exc_info.value.kind == "not_configured"

        # The best-effort path returns {} silently.
        assert _fetch_bq_table_options(bq, "ds", "tbl") == {}

    def test_instance_config_reset_cache_invalidates_get_bq_access(self, monkeypatch):
        """``instance_config.reset_cache()`` must also clear the get_bq_access cache.

        admin /api/admin/server-config save -> instance_config.reset_cache()
        must let v2 endpoints pick up new BigQuery project IDs without a
        container restart. Devin ANALYSIS_0004 on PR #138 review: pre-Phase-2
        each request re-read get_value(), so admin hot-reload worked.
        functools.cache on get_bq_access would have broken that contract;
        this test guards against regressing it.
        """
        from connectors.bigquery.access import get_bq_access
        from app.instance_config import reset_cache

        monkeypatch.setenv("BIGQUERY_PROJECT", "first")
        bq1 = get_bq_access()
        assert bq1.projects.billing == "first"

        # The operator updates config and triggers reset_cache via the admin API.
        monkeypatch.setenv("BIGQUERY_PROJECT", "second")
        reset_cache()

        bq2 = get_bq_access()
        assert bq2.projects.billing == "second", \
            "get_bq_access must re-resolve after instance_config.reset_cache()"
        assert bq2 is not bq1

    def test_sentinel_is_cached_per_process(self, monkeypatch):
        """The sentinel BqAccess is cached like any other return value.

        Repeated calls hand back the same sentinel object. It is only
        re-resolved once the cache is cleared, e.g. through
        ``instance_config.reset_cache()`` as exercised by
        ``test_instance_config_reset_cache_invalidates_get_bq_access``.
        """
        from connectors.bigquery.access import get_bq_access, BqAccess

        monkeypatch.delenv("BIGQUERY_PROJECT", raising=False)
        monkeypatch.setattr("app.instance_config.get_value", lambda *k, default="": default)

        a = get_bq_access()
        b = get_bq_access()
        assert a is b
        assert isinstance(a, BqAccess)
        assert a.projects.billing == ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DuckDB BQ-extension session pool — amortizes the ~0.5 s INSTALL/LOAD/ATTACH
|
|
# cost across requests by keeping pre-warmed DuckDB connections in a
|
|
# bounded pool. Each acquire reuses an existing connection (refreshing the
|
|
# auth SECRET so token rotation doesn't break long-lived entries) instead
|
|
# of spinning up a fresh DuckDB+extension load every time.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class _PoolFakeConn:
    """Fake DuckDB connection that records executed SQL and supports ``close()``.

    Used across pool tests so behavior can be pinned without booting the
    real BigQuery extension.

    Each instance receives a unique, monotonically increasing ``id`` so tests
    can tell connections apart.
    """

    # Last id handed out, shared by all instances of the class.
    _serial = 0
    # Guards ``_serial``. Instances may be built from several threads at once
    # (the thread-safety pool test installs this class as ``duckdb.connect``),
    # and an unguarded increment-then-read can hand two instances the same id.
    _serial_lock = threading.Lock()

    def __init__(self):
        cls = type(self)
        with cls._serial_lock:
            cls._serial += 1
            self.id = cls._serial
        self.closed = False
        self.executed: list[str] = []

    def execute(self, sql, params=None):
        """Record ``sql`` and return a result exposing fetchone() / fetchall().

        ``params`` is accepted for signature compatibility and ignored.
        """
        self.executed.append(sql)

        # Liveness probe: SELECT 1 returns something fetchable.
        class _Result:
            def fetchone(self_inner):
                return (1,)

            def fetchall(self_inner):
                return [(1,)]

        return _Result()

    def close(self):
        """Mark the connection as closed."""
        self.closed = True
|
|
|
|
|
|
@pytest.fixture
def reset_pool(monkeypatch):
    """Give a test a clean BQ session pool and a stubbed metadata token.

    The pool singleton is reset before and after the test so leak-detection
    assertions don't carry state from one test to the next.
    """
    from connectors.bigquery import access as bq_access_mod

    def _drain_pool():
        # Skip quietly when the access module exposes no reset hook.
        if hasattr(bq_access_mod, "_reset_session_pool_for_tests"):
            bq_access_mod._reset_session_pool_for_tests()

    _drain_pool()
    monkeypatch.setattr(
        "connectors.bigquery.auth.get_metadata_token",
        lambda: "tok-pool",
    )
    yield
    _drain_pool()
|
|
|
|
|
|
class TestBqSessionPool:
    """Behaviour of the pooled ``_default_duckdb_session_factory``."""

    def test_pool_reuses_connections_across_acquires(self, monkeypatch, reset_pool):
        """Acquire, release, acquire again must return the SAME DuckDB connection.

        There must be no INSTALL/LOAD overhead on the second request; this is
        the whole point of the pool.
        """
        from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects

        # Each duckdb.connect() yields a fresh _PoolFakeConn so they can be
        # told apart by id.
        connections_made = []

        def fake_connect(_path):
            c = _PoolFakeConn()
            connections_made.append(c)
            return c

        monkeypatch.setattr("duckdb.connect", fake_connect)

        # First acquire: the pool is empty, so the factory builds a new entry.
        with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn1:
            id1 = conn1.id

        # Second acquire: the pool has a warm entry and must hand it back.
        with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn2:
            id2 = conn2.id

        assert id1 == id2, (
            "expected the same pooled connection across two acquires; "
            f"got id1={id1}, id2={id2}"
        )
        # Reuse must NOT have re-INSTALLed/LOADed the extension: only one
        # duckdb.connect() call ever happened.
        assert len(connections_made) == 1, (
            f"pool re-built the conn on second acquire; created {len(connections_made)}"
        )

    def test_pool_size_is_configurable(self, monkeypatch, reset_pool):
        """``data_source.bigquery.session_pool_size`` caps the number of warm entries.

        Above the cap, releasing extra entries closes them rather than
        retaining them.
        """
        from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects

        def fake_get_value(*keys, default=None):
            if keys == ("data_source", "bigquery", "session_pool_size"):
                return 2  # tiny pool
            if keys == ("data_source", "bigquery", "query_timeout_ms"):
                return 0  # don't try to SET timeout in tests
            return default

        monkeypatch.setattr("app.instance_config.get_value", fake_get_value)
        monkeypatch.setattr("duckdb.connect", lambda _: _PoolFakeConn())

        # Hold 3 sessions open at once to force 3 simultaneous entries.
        cm1 = _default_duckdb_session_factory(BqProjects(billing="b", data="d"))
        c1 = cm1.__enter__()
        cm2 = _default_duckdb_session_factory(BqProjects(billing="b", data="d"))
        c2 = cm2.__enter__()
        cm3 = _default_duckdb_session_factory(BqProjects(billing="b", data="d"))
        c3 = cm3.__enter__()

        # Release all three. The 3rd release should close its conn since the
        # pool already holds 2.
        cm1.__exit__(None, None, None)
        cm2.__exit__(None, None, None)
        cm3.__exit__(None, None, None)

        # With a cap of 2 and 3 released connections, exactly one must have
        # been closed: zero means the pool kept more than its size, more than
        # one means it retained fewer than allowed.
        closed_count = sum(1 for c in (c1, c2, c3) if c.closed)
        assert closed_count == 1, (
            "expected exactly one close on pool overflow (size=2, 3 released); "
            f"closed_count={closed_count}"
        )

    def test_pool_replaces_broken_connection(self, monkeypatch, reset_pool):
        """A pooled entry failing its liveness check is dropped and replaced.

        If the underlying DuckDB conn was closed externally, the BQ extension
        state got corrupted, etc., the pool must build a fresh entry, not
        hand the broken one to the caller.
        """
        from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects

        # The first acquire creates entry #1, which is then marked broken.
        all_conns: list[_PoolFakeConn] = []

        def fake_connect(_path):
            c = _PoolFakeConn()
            all_conns.append(c)
            return c

        monkeypatch.setattr("duckdb.connect", fake_connect)

        with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn1:
            id1 = conn1.id

        # Simulate corruption: make execute() raise on the next call.
        def broken_execute(*a, **kw):
            raise RuntimeError("connection broken")

        conn1.execute = broken_execute  # type: ignore[assignment]

        # The second acquire must skip the broken entry and build a fresh one.
        with _default_duckdb_session_factory(BqProjects(billing="b", data="d")) as conn2:
            id2 = conn2.id

        assert id1 != id2, (
            f"expected a fresh conn after broken-pool reaper; both acquires "
            f"returned id={id1}"
        )
        assert len(all_conns) >= 2

    def test_pool_handles_reentrant_acquires_thread_safe(self, monkeypatch, reset_pool):
        """Concurrent acquires must never hand one conn to two threads at once.

        The pool's lock acquires/releases are the load-bearing invariant here.
        """
        # Imported before ``worker`` is defined so the closure never depends
        # on statement order further down in this test.
        import time

        from connectors.bigquery.access import _default_duckdb_session_factory, BqProjects

        monkeypatch.setattr("duckdb.connect", lambda _: _PoolFakeConn())

        active_ids: set = set()
        active_lock = threading.Lock()
        violations: list = []
        # Exceptions raised inside worker threads do not reach the main
        # thread on their own; collect them so a crashing worker fails the
        # test instead of leaving ``violations`` empty and the test green.
        worker_errors: list = []

        def worker():
            try:
                for _ in range(20):
                    with _default_duckdb_session_factory(
                        BqProjects(billing="b", data="d"),
                    ) as conn:
                        with active_lock:
                            if conn.id in active_ids:
                                violations.append(conn.id)
                            active_ids.add(conn.id)
                        # Hold briefly to give other threads a chance to race.
                        time.sleep(0.001)
                        with active_lock:
                            active_ids.discard(conn.id)
            except Exception as exc:  # re-surfaced on the main thread below
                worker_errors.append(exc)

        threads = [threading.Thread(target=worker) for _ in range(4)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert not worker_errors, (
            f"worker thread(s) raised instead of completing: {worker_errors!r}"
        )
        assert not violations, (
            f"pool handed the same conn to multiple threads concurrently: "
            f"{violations}"
        )

    def test_pool_does_not_apply_when_factory_is_injected(self, monkeypatch, reset_pool):
        """An injected ``duckdb_session_factory`` MUST bypass the pool entirely.

        Test fixtures inject custom factories (e.g. tests/conftest.py's
        ``bq_access`` fixture); otherwise their nullcontext-wrapped fake would
        get retained between tests and corrupt downstream assertions.
        """
        from connectors.bigquery.access import BqAccess, BqProjects
        from contextlib import contextmanager

        sentinel = object()

        @contextmanager
        def custom_factory(_projects):
            yield sentinel

        bq = BqAccess(
            BqProjects(billing="b", data="d"),
            duckdb_session_factory=custom_factory,
        )
        with bq.duckdb_session() as conn:
            assert conn is sentinel
|