From f19ff10e1a1f0ce742570096c9fa87ae4d888683 Mon Sep 17 00:00:00 2001 From: Petr Date: Mon, 16 Mar 2026 23:01:35 +0100 Subject: [PATCH] Fix: don't update last_sync when partitioned sync gets 0 new rows When BQ returns empty results (e.g., data not yet refreshed), the scheduler was marking sync as complete for the day. This meant the next 15-min tick would skip it ("none are due") and data would stay stale until the next day's scheduled run. Now: if a partitioned sync gets no new data (the sync result reports partitions_updated == 0), last_sync is NOT updated. The scheduler will retry on the next tick (15 min later) when data may be available. --- connectors/bigquery/adapter.py | 35 ++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/connectors/bigquery/adapter.py b/connectors/bigquery/adapter.py index 19fde49..b63bc04 100644 --- a/connectors/bigquery/adapter.py +++ b/connectors/bigquery/adapter.py @@ -104,17 +104,30 @@ class BigQueryDataSource(DataSource): else: raise ValueError(f"Unknown sync strategy: {table_config.sync_strategy}") - # Update sync state - sync_state.update_sync( - table_id=table_config.id, - table_name=table_config.name, - strategy=table_config.sync_strategy, - rows=result["rows"], - file_size_bytes=result["file_size_bytes"], - columns=result.get("columns", 0), - uncompressed_bytes=result.get("uncompressed_bytes", 0), + # Skip sync state update if partitioned sync got no new data. + # This lets the scheduler retry on the next tick instead of + # marking the sync as done for the day with stale data.
+ skip_state_update = ( + table_config.sync_strategy == "partitioned" + and result.get("partitions_updated", -1) == 0 ) + if skip_state_update: + logger.warning( + f"Partitioned sync for {table_config.name} got 0 new partitions " + f"- NOT updating last_sync (will retry next tick)" + ) + else: + sync_state.update_sync( + table_id=table_config.id, + table_name=table_config.name, + strategy=table_config.sync_strategy, + rows=result["rows"], + file_size_bytes=result["file_size_bytes"], + columns=result.get("columns", 0), + uncompressed_bytes=result.get("uncompressed_bytes", 0), + ) + return { "success": True, "rows": result["rows"], @@ -412,7 +425,9 @@ class BigQueryDataSource(DataSource): f"{total_rows} total rows processed" ) - return self._get_partition_totals(partition_dir) + result = self._get_partition_totals(partition_dir) + result["partitions_updated"] = partitions_updated + return result @staticmethod def _generate_partition_dates(