Fix: don't update last_sync when partitioned sync gets 0 new rows

When BQ returns empty results (e.g., data not yet refreshed), the
scheduler was marking sync as complete for the day. This meant the
next 15-min tick would skip it ("none are due") and data would stay
stale until the next day's scheduled run.

Now: if partitioned sync processes partitions but gets 0 new rows,
last_sync is NOT updated. The scheduler will retry on the next tick
(15 min later) when data may be available.
This commit is contained in:
Petr 2026-03-16 23:01:35 +01:00
parent 6c0abf275b
commit f19ff10e1a

View file

@ -104,17 +104,30 @@ class BigQueryDataSource(DataSource):
else: else:
raise ValueError(f"Unknown sync strategy: {table_config.sync_strategy}") raise ValueError(f"Unknown sync strategy: {table_config.sync_strategy}")
# Update sync state # Skip sync state update if partitioned sync got no new data.
sync_state.update_sync( # This lets the scheduler retry on the next tick instead of
table_id=table_config.id, # marking the sync as done for the day with stale data.
table_name=table_config.name, skip_state_update = (
strategy=table_config.sync_strategy, table_config.sync_strategy == "partitioned"
rows=result["rows"], and result.get("partitions_updated", -1) == 0
file_size_bytes=result["file_size_bytes"],
columns=result.get("columns", 0),
uncompressed_bytes=result.get("uncompressed_bytes", 0),
) )
if skip_state_update:
logger.warning(
f"Partitioned sync for {table_config.name} got 0 new partitions "
f"- NOT updating last_sync (will retry next tick)"
)
else:
sync_state.update_sync(
table_id=table_config.id,
table_name=table_config.name,
strategy=table_config.sync_strategy,
rows=result["rows"],
file_size_bytes=result["file_size_bytes"],
columns=result.get("columns", 0),
uncompressed_bytes=result.get("uncompressed_bytes", 0),
)
return { return {
"success": True, "success": True,
"rows": result["rows"], "rows": result["rows"],
@ -412,7 +425,9 @@ class BigQueryDataSource(DataSource):
f"{total_rows} total rows processed" f"{total_rows} total rows processed"
) )
return self._get_partition_totals(partition_dir) result = self._get_partition_totals(partition_dir)
result["partitions_updated"] = partitions_updated
return result
@staticmethod @staticmethod
def _generate_partition_dates( def _generate_partition_dates(