diff --git a/connectors/bigquery/client.py b/connectors/bigquery/client.py index 90c1161..59b0341 100644 --- a/connectors/bigquery/client.py +++ b/connectors/bigquery/client.py @@ -21,6 +21,13 @@ from datetime import datetime, timedelta import pyarrow as pa from google.cloud import bigquery +try: + from google.cloud import bigquery_storage_v1 + + _HAS_BQ_STORAGE = True +except ImportError: + _HAS_BQ_STORAGE = False + from src.config import get_config @@ -97,6 +104,20 @@ class BigQueryClient: client_kwargs["location"] = self.location self.client = bigquery.Client(**client_kwargs) + # BQ Storage API client for fast parallel reads (gRPC streams). + # Without explicit bqstorage_client, to_arrow_iterable() silently + # falls back to slow REST API pagination (~5K rows/sec vs ~300K rows/sec). + if _HAS_BQ_STORAGE: + try: + self.bqstorage_client = bigquery_storage_v1.BigQueryReadClient() + logger.info("BQ Storage API client initialized (fast parallel gRPC reads)") + except Exception as e: + self.bqstorage_client = None + logger.warning(f"BQ Storage API client failed to initialize: {e}") + else: + self.bqstorage_client = None + logger.info("BQ Storage API not available (install google-cloud-bigquery-storage)") + # Metadata cache config = get_config() self.metadata_cache: Dict[str, Dict[str, Any]] = {} @@ -275,10 +296,13 @@ class BigQueryClient: query_job = self.client.query(sql, job_config=job_config) - # Try BQ Storage API first (faster for large results). + # Use BQ Storage API for fast reads (parallel gRPC) if available. # Fall back to REST API if SA lacks bigquery.readsessions.create permission. try: - arrow_table = query_job.to_arrow() + if self.bqstorage_client: + arrow_table = query_job.to_arrow(bqstorage_client=self.bqstorage_client) + else: + arrow_table = query_job.to_arrow() except Exception as storage_err: if "readsessions" in str(storage_err) or "PERMISSION_DENIED" in str(storage_err): logger.warning( @@ -323,10 +347,16 @@ class BigQueryClient: # (QueryJob itself only has to_arrow(), not to_arrow_iterable()) row_iter = query_job.result() - # Try BQ Storage API first (faster, parallel gRPC streams). - # Fall back to REST API if SA lacks bigquery.readsessions.create permission. + # IMPORTANT: to_arrow_iterable() requires explicit bqstorage_client + # to use BQ Storage API (parallel gRPC streams, ~300K rows/sec). + # Without it, silently falls back to REST pagination (~5K rows/sec). + # This is critical when querying VIEWS (DataView): BQ materializes + # the view into a temp table, and Storage API reads from that temp table. try: - batch_iter = row_iter.to_arrow_iterable() + storage_kwargs = {} + if self.bqstorage_client: + storage_kwargs["bqstorage_client"] = self.bqstorage_client + batch_iter = row_iter.to_arrow_iterable(**storage_kwargs) # Probe first batch to detect Storage API permission errors early first_batch = next(batch_iter, None) if first_batch is not None: @@ -368,7 +398,11 @@ class BigQueryClient: if row_filter: sql += f" WHERE {row_filter}" - logger.info(f"Streaming BQ table: {table_id} (filter: {row_filter or 'none'})") + logger.info( + f"Streaming BQ table: {table_id} " + f"(filter: {row_filter or 'none'}, " + f"storage_api={'yes' if self.bqstorage_client else 'no'})" + ) yield from self.query_to_arrow_batches(sql) def read_table( @@ -442,6 +476,7 @@ class BigQueryClient: start: str, end: Optional[str] = None, columns: Optional[List[str]] = None, + column_type: str = "TIMESTAMP", ) -> pa.Table: """ Read data within a partition range. @@ -452,6 +487,7 @@ class BigQueryClient: start: Start date/timestamp (inclusive) end: End date/timestamp (exclusive). If None, reads to present. columns: Optional list of columns to select + column_type: BQ SQL type for the partition column ("DATE", "TIMESTAMP", "DATETIME") Returns: PyArrow Table with partition range data @@ -463,13 +499,13 @@ class BigQueryClient: f"WHERE `{partition_column}` >= @start_value" ) params = [ - bigquery.ScalarQueryParameter("start_value", "TIMESTAMP", start), + bigquery.ScalarQueryParameter("start_value", column_type, start), ] if end: sql += f" AND `{partition_column}` < @end_value" params.append( - bigquery.ScalarQueryParameter("end_value", "TIMESTAMP", end), + bigquery.ScalarQueryParameter("end_value", column_type, end), ) logger.info( @@ -477,6 +513,53 @@ class BigQueryClient: ) return self.query_to_arrow(sql, params=params) + def read_table_partitioned_streaming( + self, + table_id: str, + partition_column: str, + start: str, + end: Optional[str] = None, + columns: Optional[List[str]] = None, + column_type: str = "TIMESTAMP", + ): + """ + Read data within a partition range as streaming RecordBatches (constant memory). + + Unlike read_table_partitioned(), this does NOT load entire result into memory. + Each RecordBatch is a small chunk that can be written to disk immediately. + + Args: + table_id: Full table ID + partition_column: Partition column name + start: Start date/timestamp (inclusive) + end: End date/timestamp (exclusive). If None, reads to present. + columns: Optional list of columns to select + column_type: BQ SQL type for the partition column ("DATE", "TIMESTAMP", "DATETIME") + + Yields: + pyarrow.RecordBatch objects + """ + select_cols = ", ".join(f"`{c}`" for c in columns) if columns else "*" + + sql = ( + f"SELECT {select_cols} FROM `{table_id}` " + f"WHERE `{partition_column}` >= @start_value" + ) + params = [ + bigquery.ScalarQueryParameter("start_value", column_type, start), + ] + + if end: + sql += f" AND `{partition_column}` < @end_value" + params.append( + bigquery.ScalarQueryParameter("end_value", column_type, end), + ) + + logger.info( + f"Partitioned streaming read: {table_id} [{start} .. {end or 'now'})" + ) + yield from self.query_to_arrow_batches(sql, params=params) + def discover_all_tables(self, dataset_id: Optional[str] = None) -> List[Dict[str, Any]]: """ List all tables in the project (or specific dataset).