diff --git a/connectors/bigquery/adapter.py b/connectors/bigquery/adapter.py index 19fde49..b63bc04 100644 --- a/connectors/bigquery/adapter.py +++ b/connectors/bigquery/adapter.py @@ -104,17 +104,30 @@ class BigQueryDataSource(DataSource): else: raise ValueError(f"Unknown sync strategy: {table_config.sync_strategy}") - # Update sync state - sync_state.update_sync( - table_id=table_config.id, - table_name=table_config.name, - strategy=table_config.sync_strategy, - rows=result["rows"], - file_size_bytes=result["file_size_bytes"], - columns=result.get("columns", 0), - uncompressed_bytes=result.get("uncompressed_bytes", 0), + # Skip sync state update if partitioned sync got no new data. + # This lets the scheduler retry on the next tick instead of + # marking the sync as done for the day with stale data. + skip_state_update = ( + table_config.sync_strategy == "partitioned" + and result.get("partitions_updated", -1) == 0 ) + if skip_state_update: + logger.warning( + f"Partitioned sync for {table_config.name} got 0 new partitions " + f"- NOT updating last_sync (will retry next tick)" + ) + else: + sync_state.update_sync( + table_id=table_config.id, + table_name=table_config.name, + strategy=table_config.sync_strategy, + rows=result["rows"], + file_size_bytes=result["file_size_bytes"], + columns=result.get("columns", 0), + uncompressed_bytes=result.get("uncompressed_bytes", 0), + ) + return { "success": True, "rows": result["rows"], @@ -412,7 +425,9 @@ class BigQueryDataSource(DataSource): f"{total_rows} total rows processed" ) - return self._get_partition_totals(partition_dir) + result = self._get_partition_totals(partition_dir) + result["partitions_updated"] = partitions_updated + return result @staticmethod def _generate_partition_dates(