Pass explicit bqstorage_client to to_arrow_iterable() for Storage API

Without explicit bqstorage_client parameter, to_arrow_iterable() silently
falls back to REST API pagination (~5K rows/sec). With explicit client,
it uses parallel gRPC streams via BQ Storage API (~300K rows/sec).

No temp table materialization - BQ already writes query results to an
internal temp table automatically. We just tell the reader to use the
fast gRPC path instead of slow HTTP pagination.
This commit is contained in:
Petr 2026-03-12 10:51:44 +01:00
parent 4f74543a12
commit 85c87ec375

View file

@ -21,6 +21,13 @@ from datetime import datetime, timedelta
import pyarrow as pa
from google.cloud import bigquery
try:
from google.cloud import bigquery_storage_v1
_HAS_BQ_STORAGE = True
except ImportError:
_HAS_BQ_STORAGE = False
from src.config import get_config
@ -97,6 +104,20 @@ class BigQueryClient:
client_kwargs["location"] = self.location
self.client = bigquery.Client(**client_kwargs)
# BQ Storage API client for fast parallel reads (gRPC streams).
# Without explicit bqstorage_client, to_arrow_iterable() silently
# falls back to slow REST API pagination (~5K rows/sec vs ~300K rows/sec).
if _HAS_BQ_STORAGE:
try:
self.bqstorage_client = bigquery_storage_v1.BigQueryReadClient()
logger.info("BQ Storage API client initialized (fast parallel gRPC reads)")
except Exception as e:
self.bqstorage_client = None
logger.warning(f"BQ Storage API client failed to initialize: {e}")
else:
self.bqstorage_client = None
logger.info("BQ Storage API not available (install google-cloud-bigquery-storage)")
# Metadata cache
config = get_config()
self.metadata_cache: Dict[str, Dict[str, Any]] = {}
@ -275,10 +296,13 @@ class BigQueryClient:
query_job = self.client.query(sql, job_config=job_config)
# Try BQ Storage API first (faster for large results).
# Use BQ Storage API for fast reads (parallel gRPC) if available.
# Fall back to REST API if SA lacks bigquery.readsessions.create permission.
try:
arrow_table = query_job.to_arrow()
if self.bqstorage_client:
arrow_table = query_job.to_arrow(bqstorage_client=self.bqstorage_client)
else:
arrow_table = query_job.to_arrow()
except Exception as storage_err:
if "readsessions" in str(storage_err) or "PERMISSION_DENIED" in str(storage_err):
logger.warning(
@ -323,10 +347,16 @@ class BigQueryClient:
# (QueryJob itself only has to_arrow(), not to_arrow_iterable())
row_iter = query_job.result()
# Try BQ Storage API first (faster, parallel gRPC streams).
# Fall back to REST API if SA lacks bigquery.readsessions.create permission.
# IMPORTANT: to_arrow_iterable() requires explicit bqstorage_client
# to use BQ Storage API (parallel gRPC streams, ~300K rows/sec).
# Without it, silently falls back to REST pagination (~5K rows/sec).
# This is critical when querying VIEWS (DataView): BQ materializes
# the view into a temp table, and Storage API reads from that temp table.
try:
batch_iter = row_iter.to_arrow_iterable()
storage_kwargs = {}
if self.bqstorage_client:
storage_kwargs["bqstorage_client"] = self.bqstorage_client
batch_iter = row_iter.to_arrow_iterable(**storage_kwargs)
# Probe first batch to detect Storage API permission errors early
first_batch = next(batch_iter, None)
if first_batch is not None:
@ -368,7 +398,11 @@ class BigQueryClient:
if row_filter:
sql += f" WHERE {row_filter}"
logger.info(f"Streaming BQ table: {table_id} (filter: {row_filter or 'none'})")
logger.info(
f"Streaming BQ table: {table_id} "
f"(filter: {row_filter or 'none'}, "
f"storage_api={'yes' if self.bqstorage_client else 'no'})"
)
yield from self.query_to_arrow_batches(sql)
def read_table(
@ -442,6 +476,7 @@ class BigQueryClient:
start: str,
end: Optional[str] = None,
columns: Optional[List[str]] = None,
column_type: str = "TIMESTAMP",
) -> pa.Table:
"""
Read data within a partition range.
@ -452,6 +487,7 @@ class BigQueryClient:
start: Start date/timestamp (inclusive)
end: End date/timestamp (exclusive). If None, reads to present.
columns: Optional list of columns to select
column_type: BQ SQL type for the partition column ("DATE", "TIMESTAMP", "DATETIME")
Returns:
PyArrow Table with partition range data
@ -463,13 +499,13 @@ class BigQueryClient:
f"WHERE `{partition_column}` >= @start_value"
)
params = [
bigquery.ScalarQueryParameter("start_value", "TIMESTAMP", start),
bigquery.ScalarQueryParameter("start_value", column_type, start),
]
if end:
sql += f" AND `{partition_column}` < @end_value"
params.append(
bigquery.ScalarQueryParameter("end_value", "TIMESTAMP", end),
bigquery.ScalarQueryParameter("end_value", column_type, end),
)
logger.info(
@ -477,6 +513,53 @@ class BigQueryClient:
)
return self.query_to_arrow(sql, params=params)
def read_table_partitioned_streaming(
self,
table_id: str,
partition_column: str,
start: str,
end: Optional[str] = None,
columns: Optional[List[str]] = None,
column_type: str = "TIMESTAMP",
):
"""
Read data within a partition range as streaming RecordBatches (constant memory).
Unlike read_table_partitioned(), this does NOT load entire result into memory.
Each RecordBatch is a small chunk that can be written to disk immediately.
Args:
table_id: Full table ID
partition_column: Partition column name
start: Start date/timestamp (inclusive)
end: End date/timestamp (exclusive). If None, reads to present.
columns: Optional list of columns to select
column_type: BQ SQL type for the partition column ("DATE", "TIMESTAMP", "DATETIME")
Yields:
pyarrow.RecordBatch objects
"""
select_cols = ", ".join(f"`{c}`" for c in columns) if columns else "*"
sql = (
f"SELECT {select_cols} FROM `{table_id}` "
f"WHERE `{partition_column}` >= @start_value"
)
params = [
bigquery.ScalarQueryParameter("start_value", column_type, start),
]
if end:
sql += f" AND `{partition_column}` < @end_value"
params.append(
bigquery.ScalarQueryParameter("end_value", column_type, end),
)
logger.info(
f"Partitioned streaming read: {table_id} [{start} .. {end or 'now'})"
)
yield from self.query_to_arrow_batches(sql, params=params)
def discover_all_tables(self, dataset_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""
List all tables in the project (or specific dataset).