Support multiple daily sync times (e.g., "daily 07:00,13:00,18:00")

Scheduler now accepts comma-separated HH:MM times in daily schedules.
Each time slot is independently evaluated - if any slot has passed and
last_sync is before it, the table is marked as due.

This lets tables sync multiple times per day to pick up data refreshes
that happen throughout the day (e.g., Keboola pipelines running 3x/day).
This commit is contained in:
Petr 2026-03-16 23:09:48 +01:00
parent f19ff10e1a
commit 5f27d05894
2 changed files with 106 additions and 35 deletions

View file

@ -5,9 +5,10 @@ Parses sync_schedule strings from table configuration and determines
whether a table is due for synchronization based on its last sync time. whether a table is due for synchronization based on its last sync time.
Schedule formats: Schedule formats:
"every 15m" - every 15 minutes "every 15m" - every 15 minutes
"every 1h" - every hour "every 1h" - every hour
"daily 05:00" - once per day at 05:00 UTC "daily 05:00" - once per day at 05:00 UTC
"daily 07:00,13:00,18:00" - multiple times per day (UTC)
""" """
import logging import logging
@ -20,8 +21,8 @@ logger = logging.getLogger(__name__)
# Pattern: "every 15m", "every 2h" # Pattern: "every 15m", "every 2h"
INTERVAL_PATTERN = re.compile(r"^every (\d+)([mh])$") INTERVAL_PATTERN = re.compile(r"^every (\d+)([mh])$")
# Pattern: "daily 05:00", "daily 17:30" # Pattern: "daily 05:00", "daily 17:30", "daily 07:00,13:00,18:00"
DAILY_PATTERN = re.compile(r"^daily (\d{2}):(\d{2})$") DAILY_PATTERN = re.compile(r"^daily ([\d:,]+)$")
def parse_interval_minutes(schedule: str) -> Optional[int]: def parse_interval_minutes(schedule: str) -> Optional[int]:
@ -87,50 +88,61 @@ def is_table_due(
) )
return due return due
# Check daily schedule: "daily HH:MM" # Check daily schedule: "daily HH:MM" or "daily HH:MM,HH:MM,..."
match = DAILY_PATTERN.match(schedule) match = DAILY_PATTERN.match(schedule)
if match: if match:
target_hour = int(match.group(1)) times_str = match.group(1)
target_minute = int(match.group(2)) target_times = _parse_daily_times(times_str)
return _is_daily_due(last_sync, now, target_hour, target_minute) if not target_times:
logger.warning(f"Invalid daily schedule times: {schedule}")
return False
return _is_daily_due(last_sync, now, target_times)
logger.warning(f"Unknown schedule format: {schedule}") logger.warning(f"Unknown schedule format: {schedule}")
return False return False
def _parse_daily_times(times_str: str) -> list[tuple[int, int]]:
"""Parse comma-separated HH:MM times into list of (hour, minute) tuples."""
time_pattern = re.compile(r"^(\d{2}):(\d{2})$")
result = []
for part in times_str.split(","):
m = time_pattern.match(part.strip())
if not m:
return []
hour, minute = int(m.group(1)), int(m.group(2))
if hour > 23 or minute > 59:
return []
result.append((hour, minute))
return result
def _is_daily_due( def _is_daily_due(
last_sync: datetime, last_sync: datetime,
now: datetime, now: datetime,
target_hour: int, target_times: list[tuple[int, int]],
target_minute: int,
) -> bool: ) -> bool:
"""Check if a daily schedule is due. """Check if a daily schedule is due.
A daily schedule at HH:MM is due when: Supports multiple target times per day. A target time is due when:
1. Current time is at or past HH:MM today, AND 1. Current time is at or past HH:MM today, AND
2. Last sync was before HH:MM today 2. Last sync was before HH:MM today
This means: once HH:MM passes, the first scheduler tick will trigger it, Returns True if ANY of the target times is due.
and subsequent ticks on the same day will skip it.
""" """
# Today's target time for target_hour, target_minute in target_times:
today_target = now.replace( today_target = now.replace(
hour=target_hour, minute=target_minute, second=0, microsecond=0 hour=target_hour, minute=target_minute, second=0, microsecond=0
) )
# Not yet time today if now >= today_target and last_sync < today_target:
if now < today_target: logger.debug(
return False f"Daily schedule: target {target_hour:02d}:{target_minute:02d} UTC, "
f"last sync {last_sync.isoformat()}, now {now.isoformat()} -> due"
)
return True
# Time has passed, check if we already synced after today's target return False
if last_sync >= today_target:
return False
logger.debug(
f"Daily schedule: target {target_hour:02d}:{target_minute:02d} UTC, "
f"last sync {last_sync.isoformat()}, now {now.isoformat()} -> due"
)
return True
def _parse_timestamp(iso_string: str) -> Optional[datetime]: def _parse_timestamp(iso_string: str) -> Optional[datetime]:

View file

@ -7,6 +7,7 @@ import pytest
from src.scheduler import ( from src.scheduler import (
_is_daily_due, _is_daily_due,
_parse_daily_times,
_parse_timestamp, _parse_timestamp,
is_table_due, is_table_due,
parse_interval_minutes, parse_interval_minutes,
@ -229,27 +230,85 @@ class TestIsDailyDue:
def test_before_target_not_due(self) -> None: def test_before_target_not_due(self) -> None:
now = datetime(2026, 3, 15, 4, 0, 0, tzinfo=timezone.utc) now = datetime(2026, 3, 15, 4, 0, 0, tzinfo=timezone.utc)
last_sync = datetime(2026, 3, 14, 5, 30, 0, tzinfo=timezone.utc) last_sync = datetime(2026, 3, 14, 5, 30, 0, tzinfo=timezone.utc)
assert _is_daily_due(last_sync, now, target_hour=5, target_minute=0) is False assert _is_daily_due(last_sync, now, [(5, 0)]) is False
def test_after_target_last_sync_before_target_is_due(self) -> None: def test_after_target_last_sync_before_target_is_due(self) -> None:
now = datetime(2026, 3, 15, 6, 0, 0, tzinfo=timezone.utc) now = datetime(2026, 3, 15, 6, 0, 0, tzinfo=timezone.utc)
last_sync = datetime(2026, 3, 15, 4, 0, 0, tzinfo=timezone.utc) last_sync = datetime(2026, 3, 15, 4, 0, 0, tzinfo=timezone.utc)
assert _is_daily_due(last_sync, now, target_hour=5, target_minute=0) is True assert _is_daily_due(last_sync, now, [(5, 0)]) is True
def test_after_target_last_sync_after_target_not_due(self) -> None: def test_after_target_last_sync_after_target_not_due(self) -> None:
now = datetime(2026, 3, 15, 6, 0, 0, tzinfo=timezone.utc) now = datetime(2026, 3, 15, 6, 0, 0, tzinfo=timezone.utc)
last_sync = datetime(2026, 3, 15, 5, 30, 0, tzinfo=timezone.utc) last_sync = datetime(2026, 3, 15, 5, 30, 0, tzinfo=timezone.utc)
assert _is_daily_due(last_sync, now, target_hour=5, target_minute=0) is False assert _is_daily_due(last_sync, now, [(5, 0)]) is False
def test_target_with_minutes(self) -> None: def test_target_with_minutes(self) -> None:
now = datetime(2026, 3, 15, 17, 45, 0, tzinfo=timezone.utc) now = datetime(2026, 3, 15, 17, 45, 0, tzinfo=timezone.utc)
last_sync = datetime(2026, 3, 15, 10, 0, 0, tzinfo=timezone.utc) last_sync = datetime(2026, 3, 15, 10, 0, 0, tzinfo=timezone.utc)
assert _is_daily_due(last_sync, now, target_hour=17, target_minute=30) is True assert _is_daily_due(last_sync, now, [(17, 30)]) is True
def test_target_with_minutes_not_yet(self) -> None: def test_target_with_minutes_not_yet(self) -> None:
now = datetime(2026, 3, 15, 17, 15, 0, tzinfo=timezone.utc) now = datetime(2026, 3, 15, 17, 15, 0, tzinfo=timezone.utc)
last_sync = datetime(2026, 3, 15, 10, 0, 0, tzinfo=timezone.utc) last_sync = datetime(2026, 3, 15, 10, 0, 0, tzinfo=timezone.utc)
assert _is_daily_due(last_sync, now, target_hour=17, target_minute=30) is False assert _is_daily_due(last_sync, now, [(17, 30)]) is False
class TestMultipleDailyTimes:
"""Tests for multiple daily schedule times."""
def test_multi_time_first_due(self) -> None:
now = datetime(2026, 3, 15, 8, 0, 0, tzinfo=timezone.utc)
last_sync = datetime(2026, 3, 14, 19, 0, 0, tzinfo=timezone.utc)
assert _is_daily_due(last_sync, now, [(7, 0), (13, 0), (18, 0)]) is True
def test_multi_time_second_due(self) -> None:
now = datetime(2026, 3, 15, 14, 0, 0, tzinfo=timezone.utc)
last_sync = datetime(2026, 3, 15, 7, 30, 0, tzinfo=timezone.utc)
assert _is_daily_due(last_sync, now, [(7, 0), (13, 0), (18, 0)]) is True
def test_multi_time_third_due(self) -> None:
now = datetime(2026, 3, 15, 19, 0, 0, tzinfo=timezone.utc)
last_sync = datetime(2026, 3, 15, 13, 30, 0, tzinfo=timezone.utc)
assert _is_daily_due(last_sync, now, [(7, 0), (13, 0), (18, 0)]) is True
def test_multi_time_between_slots_not_due(self) -> None:
now = datetime(2026, 3, 15, 10, 0, 0, tzinfo=timezone.utc)
last_sync = datetime(2026, 3, 15, 7, 30, 0, tzinfo=timezone.utc)
assert _is_daily_due(last_sync, now, [(7, 0), (13, 0), (18, 0)]) is False
def test_multi_time_all_done_not_due(self) -> None:
now = datetime(2026, 3, 15, 20, 0, 0, tzinfo=timezone.utc)
last_sync = datetime(2026, 3, 15, 18, 30, 0, tzinfo=timezone.utc)
assert _is_daily_due(last_sync, now, [(7, 0), (13, 0), (18, 0)]) is False
def test_is_table_due_multi_time_format(self) -> None:
now = datetime(2026, 3, 15, 14, 0, 0, tzinfo=timezone.utc)
last_sync = datetime(2026, 3, 15, 7, 30, 0, tzinfo=timezone.utc).isoformat()
assert is_table_due("daily 07:00,13:00,18:00", last_sync_iso=last_sync, now=now) is True
def test_is_table_due_multi_time_not_due(self) -> None:
now = datetime(2026, 3, 15, 10, 0, 0, tzinfo=timezone.utc)
last_sync = datetime(2026, 3, 15, 7, 30, 0, tzinfo=timezone.utc).isoformat()
assert is_table_due("daily 07:00,13:00,18:00", last_sync_iso=last_sync, now=now) is False
class TestParseDailyTimes:
"""Tests for _parse_daily_times()."""
def test_single_time(self) -> None:
assert _parse_daily_times("05:00") == [(5, 0)]
def test_multiple_times(self) -> None:
assert _parse_daily_times("07:00,13:00,18:00") == [(7, 0), (13, 0), (18, 0)]
def test_invalid_format(self) -> None:
assert _parse_daily_times("7:00") == []
def test_invalid_hour(self) -> None:
assert _parse_daily_times("25:00") == []
def test_invalid_minute(self) -> None:
assert _parse_daily_times("12:60") == []
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------