diff --git a/connectors/bigquery/adapter.py b/connectors/bigquery/adapter.py index e20fe0c..62ecd30 100644 --- a/connectors/bigquery/adapter.py +++ b/connectors/bigquery/adapter.py @@ -141,7 +141,11 @@ class BigQueryDataSource(DataSource): pyarrow_schema = self.bq_client.get_pyarrow_schema(table_config.id) # Read full table from BigQuery -> PyArrow - arrow_table = self.bq_client.read_table(table_config.id) + arrow_table = self.bq_client.read_table( + table_config.id, + columns=table_config.columns, + row_filter=table_config.row_filter, + ) # Apply schema enforcement if date_columns: @@ -226,6 +230,7 @@ class BigQueryDataSource(DataSource): table_id=table_config.id, incremental_column=table_config.incremental_column, since_value=since_value, + columns=table_config.columns, ) if new_data.num_rows == 0: @@ -275,9 +280,14 @@ class BigQueryDataSource(DataSource): table_id=table_config.id, incremental_column=table_config.incremental_column, since_value=since_dt.isoformat(), + columns=table_config.columns, ) else: - arrow_table = self.bq_client.read_table(table_config.id) + arrow_table = self.bq_client.read_table( + table_config.id, + columns=table_config.columns, + row_filter=table_config.row_filter, + ) # Apply schema enforcement if date_columns: @@ -349,9 +359,14 @@ class BigQueryDataSource(DataSource): table_id=table_config.id, partition_column=partition_col, start=start_dt.isoformat(), + columns=table_config.columns, ) else: - arrow_table = self.bq_client.read_table(table_config.id) + arrow_table = self.bq_client.read_table( + table_config.id, + columns=table_config.columns, + row_filter=table_config.row_filter, + ) if arrow_table.num_rows == 0: logger.info(" -> No data to sync") diff --git a/src/config.py b/src/config.py index 578f6ee..980d08e 100644 --- a/src/config.py +++ b/src/config.py @@ -102,6 +102,8 @@ class TableConfig: dataset: Optional[str] = None initial_load_chunk_days: int = 30 incremental_column: Optional[str] = None # Column for timestamp-based incremental sync (BigQuery) + columns: Optional[List[str]] = None # Subset of columns to sync (None = all) + row_filter: Optional[str] = None # SQL WHERE clause for filtering (e.g., "event_date >= '2024-01-01'") def __post_init__(self): """Validate configuration after initialization.""" @@ -431,6 +433,8 @@ class Config: dataset=table_data.get("dataset"), initial_load_chunk_days=table_data.get("initial_load_chunk_days", 30), incremental_column=table_data.get("incremental_column"), + columns=table_data.get("columns"), + row_filter=table_data.get("row_filter"), ) table_configs.append(config) diff --git a/tests/test_bigquery_adapter.py b/tests/test_bigquery_adapter.py index edec77a..d85e11d 100644 --- a/tests/test_bigquery_adapter.py +++ b/tests/test_bigquery_adapter.py @@ -494,7 +494,9 @@ class TestFirstSyncDownloadsAll: assert result["success"] is True assert result["rows"] == 3 # Should call read_table (full), not read_table_incremental - mock_bq_client.read_table.assert_called_once_with(table_config.id) + mock_bq_client.read_table.assert_called_once_with( + table_config.id, columns=None, row_filter=None, + ) mock_bq_client.read_table_incremental.assert_not_called() def test_first_sync_with_max_history_days(