From 80c5b902e0e19ca1b009d8da6e76b28fc491c9c7 Mon Sep 17 00:00:00 2001 From: Petr Date: Sun, 15 Mar 2026 02:16:31 +0100 Subject: [PATCH] Add scheduled data sync and catalog refresh with systemd timers - New sync_schedule and profile_after_sync fields in TableConfig (formats: "every 15m", "every 1h", "daily 05:00") - New src/scheduler.py with schedule evaluation logic (is_table_due) - New --scheduled mode in data_sync.py: only syncs tables that are due, respects profile_after_sync flag, auto-restarts webapp after profiling - Systemd timer+service for data-refresh (every 15 min) - Systemd timer+service for catalog-refresh (every 15 min) - deploy.sh enables new timers automatically - Complete table config reference in data_description.md.example - 58 new scheduler tests --- config/data_description.md.example | 119 ++++++- server/deploy.sh | 2 +- .../systemd/catalog-refresh.service | 27 ++ .../systemd/catalog-refresh.timer | 11 + .../data_refresh/systemd/data-refresh.service | 33 ++ .../data_refresh/systemd/data-refresh.timer | 11 + src/config.py | 19 ++ src/data_sync.py | 177 ++++++++-- src/scheduler.py | 158 +++++++++ tests/test_scheduler.py | 321 ++++++++++++++++++ 10 files changed, 846 insertions(+), 32 deletions(-) create mode 100644 services/catalog_refresh/systemd/catalog-refresh.service create mode 100644 services/catalog_refresh/systemd/catalog-refresh.timer create mode 100644 services/data_refresh/systemd/data-refresh.service create mode 100644 services/data_refresh/systemd/data-refresh.timer create mode 100644 src/scheduler.py create mode 100644 tests/test_scheduler.py diff --git a/config/data_description.md.example b/config/data_description.md.example index 7dd7596..a246e33 100644 --- a/config/data_description.md.example +++ b/config/data_description.md.example @@ -11,30 +11,127 @@ folder_mapping: "in.c-example": "example" tables: + # Small reference table - full refresh, no automatic sync - id: "in.c-example.customers" name: "customers" description: "Customer master data" primary_key: "id" sync_strategy: "full_refresh" + # Large transactional table - daily automatic sync with profiling - id: "in.c-example.orders" name: "orders" description: "Order transactions with line items" primary_key: "id" - sync_strategy: "incremental" - incremental_window_days: 7 + sync_strategy: "partitioned" partition_by: "created_at" - partition_granularity: "month" + partition_column_type: "DATE" + partition_granularity: "day" + incremental_window_days: 3 + max_history_days: 450 + query_mode: "local" + sync_schedule: "daily 05:00" + profile_after_sync: true + + # Frequently updated table - sync every hour, skip profiling + - id: "in.c-example.events" + name: "events" + description: "Real-time event stream" + primary_key: "event_id" + sync_strategy: "partitioned" + partition_by: "event_date" + partition_column_type: "DATE" + partition_granularity: "day" + incremental_window_days: 1 + query_mode: "local" + sync_schedule: "every 1h" + profile_after_sync: false ``` -## Sync Strategies +## Table Configuration Reference -- **full_refresh**: Downloads entire table on each sync. Best for small reference tables. -- **incremental**: Downloads only new/changed rows based on a date column. Best for large transactional tables. +### Required Fields -## Partition Granularity +| Field | Description | Example | +|-------|-------------|---------| +| `id` | Full table identifier in data source | `"in.c-crm.company"` | +| `name` | Short name (used for Parquet filenames) | `"company"` | +| `description` | Human-readable description | `"Company master data"` | +| `primary_key` | Primary key column(s), comma-separated | `"id"` or `"order_id, line_id"` | +| `sync_strategy` | How data is downloaded (see below) | `"full_refresh"` | -When using `partition_by`, data is split into separate Parquet files by time period: -- **month**: One file per month (e.g., `orders/2024-01.parquet`) -- **day**: One file per day (e.g., `events/2024-01-15.parquet`) -- **none**: Single file (default) +### Sync Strategy + +| Strategy | Description | Use for | +|----------|-------------|---------| +| `full_refresh` | Downloads entire table each sync | Small reference tables (< 100K rows) | +| `incremental` | Downloads changed rows via changedSince | Medium tables with update tracking | +| `partitioned` | Downloads by time partitions, overwrites only recent ones | Large tables with date column | + +### Partitioning + +| Field | Default | Description | +|-------|---------|-------------| +| `partition_by` | *(none)* | Column to partition by (e.g., `"created_at"`, `"event_date"`) | +| `partition_granularity` | `"month"` | `"day"`, `"month"`, or `"year"` | +| `partition_column_type` | `"TIMESTAMP"` | SQL type: `"DATE"`, `"TIMESTAMP"`, or `"DATETIME"` | +| `incremental_window_days` | `7` | How many recent days to re-download on each sync | +| `max_history_days` | *(all)* | Maximum history to keep (e.g., `450` for ~15 months) | +| `initial_load_chunk_days` | `30` | Chunk size for first-time download | + +### Query Mode + +| Field | Default | Description | +|-------|---------|-------------| +| `query_mode` | `"local"` | How the AI agent queries this table | + +| Mode | Description | Best for | +|------|-------------|----------| +| `local` | Synced to Parquet, queried via DuckDB | Tables < 2 GB, fast queries | +| `remote` | Not synced, queried via BigQuery | Huge tables (100+ GB), live data | +| `hybrid` | Subset synced for profiling, queries go to BigQuery | Medium tables needing live data | + +### Automatic Sync Schedule + +| Field | Default | Description | +|-------|---------|-------------| +| `sync_schedule` | *(none)* | When to automatically sync this table | +| `profile_after_sync` | `true` | Run data profiler after sync completes | + +The `sync_schedule` field controls automatic synchronization via the `data-refresh` +systemd timer (runs every 15 minutes). If omitted, the table is only synced manually. + +**Schedule formats:** + +| Format | Example | Description | +|--------|---------|-------------| +| `every {N}m` | `"every 15m"`, `"every 30m"` | Sync every N minutes | +| `every {N}h` | `"every 1h"`, `"every 6h"` | Sync every N hours | +| `daily HH:MM` | `"daily 05:00"`, `"daily 17:30"` | Sync once per day at HH:MM UTC | +| *(omitted)* | - | Manual sync only (`python -m src.data_sync`) | + +**How scheduling works:** +- A systemd timer runs `python -m src.data_sync --scheduled` every 15 minutes +- For each table with `sync_schedule`, it checks the last sync time from `sync_state.json` +- `every` schedules: syncs if enough time has elapsed since last sync +- `daily` schedules: syncs once after the target time passes (skips if already synced today) +- Tables without `sync_schedule` are never synced automatically + +**Profiling control:** +- `profile_after_sync: true` (default) - runs profiler after sync to update column statistics +- `profile_after_sync: false` - skips profiler (use for frequently synced tables where + profiling overhead is not worth it; the AI agent uses slightly older statistics) +- When profiling runs, the webapp is automatically restarted to load new statistics + +### Optional Fields + +| Field | Default | Description | +|-------|---------|-------------| +| `folder` | *(from folder_mapping)* | Override output folder name | +| `row_filter` | *(none)* | SQL WHERE clause (e.g., `"date >= DATE_SUB(CURRENT_DATE(), INTERVAL 15 MONTH)"`) | +| `columns` | *(all)* | List of columns to sync (subset) | +| `incremental_column` | *(none)* | Column for timestamp-based incremental sync (BigQuery) | +| `dataset` | *(none)* | Dataset group name for on-demand tables | +| `catalog_fqn` | *(auto)* | OpenMetadata FQN override (auto-derived from table ID if not set) | +| `foreign_keys` | `[]` | List of foreign key relationships | +| `where_filters` | `[]` | List of filters for Keboola Storage API | diff --git a/server/deploy.sh b/server/deploy.sh index 1850b24..55d667c 100755 --- a/server/deploy.sh +++ b/server/deploy.sh @@ -363,7 +363,7 @@ if [[ -n "${DESKTOP_JWT_SECRET:-}" ]] && ! systemctl is-active --quiet ws-gatewa fi # Enable timers (only if service files exist) -for timer in corporate-memory session-collector jira-sla-poll jira-consistency jira-consistency-deep; do +for timer in corporate-memory session-collector jira-sla-poll jira-consistency jira-consistency-deep data-refresh catalog-refresh; do if [[ -f "/etc/systemd/system/${timer}.timer" ]]; then if ! systemctl is-enabled --quiet "${timer}.timer" 2>/dev/null; then log "Enabling ${timer} timer..." diff --git a/services/catalog_refresh/systemd/catalog-refresh.service b/services/catalog_refresh/systemd/catalog-refresh.service new file mode 100644 index 0000000..8b82895 --- /dev/null +++ b/services/catalog_refresh/systemd/catalog-refresh.service @@ -0,0 +1,27 @@ +[Unit] +Description=Catalog Refresh - export metrics and tables from OpenMetadata to YAML +After=network-online.target +Wants=network-online.target + +[Service] +Type=oneshot +User=root +Group=data-ops +WorkingDirectory=/opt/data-analyst/repo +ExecStart=/opt/data-analyst/.venv/bin/python3 -m src.catalog_export + +# Environment +EnvironmentFile=/opt/data-analyst/.env +Environment=PYTHONPATH=/opt/data-analyst/repo +Environment=CONFIG_DIR=/opt/data-analyst/instance/config + +# Write access to docs output directory +ProtectSystem=strict +ReadWritePaths=/data/docs /opt/data-analyst/logs +PrivateTmp=true + +# Catalog export is fast (seconds) +TimeoutStartSec=120 + +[Install] +WantedBy=multi-user.target diff --git a/services/catalog_refresh/systemd/catalog-refresh.timer b/services/catalog_refresh/systemd/catalog-refresh.timer new file mode 100644 index 0000000..fdde9c0 --- /dev/null +++ b/services/catalog_refresh/systemd/catalog-refresh.timer @@ -0,0 +1,11 @@ +[Unit] +Description=Run Catalog Refresh every 15 minutes + +[Timer] +OnBootSec=1min +OnUnitActiveSec=15min +RandomizedDelaySec=30 +Persistent=true + +[Install] +WantedBy=timers.target diff --git a/services/data_refresh/systemd/data-refresh.service b/services/data_refresh/systemd/data-refresh.service new file mode 100644 index 0000000..7ed0b9c --- /dev/null +++ b/services/data_refresh/systemd/data-refresh.service @@ -0,0 +1,33 @@ +[Unit] +Description=Data Refresh - scheduled sync from BigQuery to local Parquet +After=network-online.target +Wants=network-online.target + +[Service] +Type=oneshot +User=root +Group=data-ops +WorkingDirectory=/opt/data-analyst/repo +ExecStart=/opt/data-analyst/.venv/bin/python3 -m src.data_sync --scheduled + +# Environment +EnvironmentFile=/opt/data-analyst/.env +Environment=PYTHONPATH=/opt/data-analyst/repo +Environment=CONFIG_DIR=/opt/data-analyst/instance/config + +# Write access to data directory and logs +ProtectSystem=strict +ReadWritePaths=/data /opt/data-analyst/logs /tmp/data_analyst_staging +PrivateTmp=false + +# Sync can take a while for large tables +TimeoutStartSec=3600 + +# Prevent overlapping runs +ExecCondition=/usr/bin/test ! -f /tmp/data-refresh.lock +ExecStartPre=/usr/bin/touch /tmp/data-refresh.lock +ExecStartPost=/usr/bin/rm -f /tmp/data-refresh.lock +ExecStopPost=/usr/bin/rm -f /tmp/data-refresh.lock + +[Install] +WantedBy=multi-user.target diff --git a/services/data_refresh/systemd/data-refresh.timer b/services/data_refresh/systemd/data-refresh.timer new file mode 100644 index 0000000..8500599 --- /dev/null +++ b/services/data_refresh/systemd/data-refresh.timer @@ -0,0 +1,11 @@ +[Unit] +Description=Run Data Refresh every 15 minutes + +[Timer] +OnBootSec=3min +OnUnitActiveSec=15min +RandomizedDelaySec=30 +Persistent=true + +[Install] +WantedBy=timers.target diff --git a/src/config.py b/src/config.py index 1975dc9..3ff5374 100644 --- a/src/config.py +++ b/src/config.py @@ -86,6 +86,8 @@ class TableConfig: max_history_days: Max days of history for initial incremental load (None = download all) dataset: Dataset group name for on-demand tables (e.g., "kbc_telemetry_expert") initial_load_chunk_days: Chunk size in days for chunked initial load (default: 30) + sync_schedule: Schedule for automatic sync: "every 15m", "every 1h", "daily 05:00" (UTC) + profile_after_sync: Run profiler after sync (default True; disable for frequently synced tables) """ id: str name: str @@ -107,6 +109,8 @@ class TableConfig: query_mode: str = "local" # "local" (Parquet) | "remote" (BQ direct) | "hybrid" (sync subset, query BQ) partition_column_type: str = "TIMESTAMP" # BQ SQL type for partition column: "DATE", "TIMESTAMP", "DATETIME" catalog_fqn: Optional[str] = None # Explicit OpenMetadata FQN override (auto-derived if not set) + sync_schedule: Optional[str] = None # Schedule: "every 15m", "every 1h", "daily 05:00" (UTC) + profile_after_sync: bool = True # Run profiler after sync (disable for frequently synced tables) def __post_init__(self): """Validate configuration after initialization.""" @@ -158,6 +162,19 @@ class TableConfig: f"Allowed values: {', '.join(valid_column_types)}" ) + # Validate sync_schedule format + if self.sync_schedule: + import re as _re + valid_schedule = ( + _re.match(r"^every \d+[mh]$", self.sync_schedule) + or _re.match(r"^daily \d{2}:\d{2}$", self.sync_schedule) + ) + if not valid_schedule: + raise ValueError( + f"Invalid sync_schedule '{self.sync_schedule}' for table {self.id}. " + f"Allowed formats: 'every 15m', 'every 1h', 'daily 05:00'" + ) + # For partitioned, partition_by must be defined if self.sync_strategy == "partitioned": if not self.partition_by: @@ -457,6 +474,8 @@ class Config: query_mode=table_data.get("query_mode", "local"), partition_column_type=table_data.get("partition_column_type", "TIMESTAMP"), catalog_fqn=table_data.get("catalog_fqn"), + sync_schedule=table_data.get("sync_schedule"), + profile_after_sync=table_data.get("profile_after_sync", True), ) table_configs.append(config) diff --git a/src/data_sync.py b/src/data_sync.py index 337c42f..34f97f0 100644 --- a/src/data_sync.py +++ b/src/data_sync.py @@ -481,24 +481,152 @@ class DataSyncManager: # Auto-profile changed tables if success_count > 0: - try: - from src.profiler import profile_changed_tables - changed = [ - self.config.get_table_config(tid).name - for tid, r in results.items() - if r.get("success") and self.config.get_table_config(tid) - ] - if changed: - result = profile_changed_tables(changed) - logger.info( - f"Auto-profiling: {result['success']} profiled, " - f"{result['errors']} errors, {result['skipped']} skipped" - ) - except Exception as e: - logger.warning(f"Auto-profiling failed (non-fatal): {e}") + self._auto_profile(results) return results + def _auto_profile( + self, + results: Dict[str, Dict[str, Any]], + skip_tables: Optional[List[str]] = None, + ): + """Run profiler on successfully synced tables. + + Args: + results: Sync results dict {table_id: result} + skip_tables: Table IDs to skip profiling for + """ + skip_set = set(skip_tables or []) + try: + from src.profiler import profile_changed_tables + changed = [ + self.config.get_table_config(tid).name + for tid, r in results.items() + if r.get("success") + and self.config.get_table_config(tid) + and tid not in skip_set + ] + if changed: + result = profile_changed_tables(changed) + logger.info( + f"Auto-profiling: {result['success']} profiled, " + f"{result['errors']} errors, {result['skipped']} skipped" + ) + else: + logger.info("No tables to profile (all skipped or none succeeded)") + except Exception as e: + logger.warning(f"Auto-profiling failed (non-fatal): {e}") + + def sync_scheduled(self) -> Dict[str, Dict[str, Any]]: + """Synchronize only tables whose sync_schedule says they are due. + + Evaluates each table's sync_schedule against its last_sync timestamp. + Only syncs tables that are due. Respects profile_after_sync flag. + + Returns: + Dictionary {table_id: result} with sync results (only for synced tables) + """ + from src.scheduler import is_table_due + + scheduled_tables = [ + tc for tc in self.config.tables + if tc.sync_schedule and tc.query_mode != "remote" + ] + + if not scheduled_tables: + logger.info("No tables with sync_schedule configured") + return {} + + # Evaluate which tables are due + due_tables = [] + for tc in scheduled_tables: + last_sync = self.sync_state.get_last_sync(tc.id) + if is_table_due(tc.sync_schedule, last_sync): + due_tables.append(tc) + logger.info(f"Table {tc.name} is DUE (schedule: {tc.sync_schedule})") + else: + logger.debug(f"Table {tc.name} is not due (schedule: {tc.sync_schedule})") + + if not due_tables: + logger.info( + f"Checked {len(scheduled_tables)} scheduled tables, none are due" + ) + return {} + + logger.info( + f"Syncing {len(due_tables)}/{len(scheduled_tables)} due tables: " + f"{', '.join(tc.name for tc in due_tables)}" + ) + + # Sync due tables + results = {} + for table_config in due_tables: + try: + result = self.data_source.sync_table(table_config, self.sync_state) + results[table_config.id] = result + if result["success"]: + logger.info( + f" {table_config.name}: {result['rows']:,} rows, " + f"{result['file_size_mb']:.2f} MB" + ) + else: + logger.error(f" {table_config.name}: {result['error']}") + except Exception as e: + logger.error(f" {table_config.name}: sync failed: {e}") + results[table_config.id] = {"success": False, "error": str(e)} + + success_count = sum(1 for r in results.values() if r["success"]) + logger.info(f"Scheduled sync: {success_count}/{len(results)} tables successful") + + # Generate schema.yml + if success_count > 0: + try: + self._generate_schema_yaml() + except Exception as e: + logger.warning(f"Failed to generate schema.yml: {e}") + + # Profile only tables with profile_after_sync=True + skip_profiler = [ + tc.id for tc in due_tables if not tc.profile_after_sync + ] + if skip_profiler: + logger.info( + f"Skipping profiler for: " + f"{', '.join(self.config.get_table_config(tid).name for tid in skip_profiler)}" + ) + + profiled_any = False + if success_count > 0: + tables_to_profile = [ + tid for tid, r in results.items() + if r.get("success") and tid not in set(skip_profiler) + ] + if tables_to_profile: + self._auto_profile(results, skip_tables=skip_profiler) + profiled_any = True + + # Restart webapp if profiler ran (new profiles.json needs reload) + if profiled_any: + self._restart_webapp() + + return results + + def _restart_webapp(self): + """Restart webapp service to pick up new profiles.json.""" + import subprocess + try: + subprocess.run( + ["sudo", "systemctl", "restart", "webapp"], + check=True, + capture_output=True, + timeout=30, + ) + logger.info("Webapp restarted successfully") + except subprocess.CalledProcessError as e: + logger.warning(f"Failed to restart webapp: {e.stderr.decode() if e.stderr else e}") + except FileNotFoundError: + logger.debug("systemctl not found (not running on server)") + def create_sync_manager() -> DataSyncManager: """ @@ -563,16 +691,25 @@ if __name__ == "__main__": # CLI interface for sync import sys - print("Data Sync") + scheduled_mode = "--scheduled" in sys.argv + table_args = [a for a in sys.argv[1:] if a != "--scheduled"] try: manager = create_sync_manager() - if len(sys.argv) > 1: - tables_to_sync = sys.argv[1:] - print(f"\nSynchronizing selected tables: {', '.join(tables_to_sync)}") - results = manager.sync_all(tables=tables_to_sync) + if scheduled_mode: + print("Data Sync (scheduled mode)") + results = manager.sync_scheduled() + + if not results: + print("No tables due for sync") + sys.exit(0) + elif table_args: + print("Data Sync") + print(f"\nSynchronizing selected tables: {', '.join(table_args)}") + results = manager.sync_all(tables=table_args) else: + print("Data Sync") print("\nSynchronizing all tables...") results = manager.sync_all() diff --git a/src/scheduler.py b/src/scheduler.py new file mode 100644 index 0000000..d307e32 --- /dev/null +++ b/src/scheduler.py @@ -0,0 +1,158 @@ +""" +Schedule evaluation for automatic data sync. + +Parses sync_schedule strings from table configuration and determines +whether a table is due for synchronization based on its last sync time. + +Schedule formats: + "every 15m" - every 15 minutes + "every 1h" - every hour + "daily 05:00" - once per day at 05:00 UTC +""" + +import logging +import re +from datetime import datetime, timezone +from typing import Optional + +logger = logging.getLogger(__name__) + +# Pattern: "every 15m", "every 2h" +INTERVAL_PATTERN = re.compile(r"^every (\d+)([mh])$") + +# Pattern: "daily 05:00", "daily 17:30" +DAILY_PATTERN = re.compile(r"^daily (\d{2}):(\d{2})$") + + +def parse_interval_minutes(schedule: str) -> Optional[int]: + """Parse an interval schedule into minutes. + + Args: + schedule: Schedule string like "every 15m" or "every 1h" + + Returns: + Interval in minutes, or None if not an interval schedule. + """ + match = INTERVAL_PATTERN.match(schedule) + if not match: + return None + value = int(match.group(1)) + unit = match.group(2) + if unit == "h": + return value * 60 + return value + + +def is_table_due( + schedule: str, + last_sync_iso: Optional[str], + now: Optional[datetime] = None, +) -> bool: + """Determine whether a table is due for sync based on its schedule. + + Args: + schedule: Schedule string from table config (e.g., "every 1h", "daily 05:00") + last_sync_iso: ISO timestamp of last sync, or None if never synced + now: Current time (UTC). Defaults to datetime.now(timezone.utc). + + Returns: + True if the table should be synced now. + """ + if now is None: + now = datetime.now(timezone.utc) + + # Never synced -> always due + if not last_sync_iso: + logger.info("Table never synced, marking as due") + return True + + # Parse last_sync timestamp + last_sync = _parse_timestamp(last_sync_iso) + if last_sync is None: + logger.warning(f"Cannot parse last_sync timestamp: {last_sync_iso}, marking as due") + return True + + # Ensure timezone-aware comparison + if last_sync.tzinfo is None: + last_sync = last_sync.replace(tzinfo=timezone.utc) + + # Check interval schedule: "every Xm" / "every Xh" + interval_minutes = parse_interval_minutes(schedule) + if interval_minutes is not None: + elapsed_minutes = (now - last_sync).total_seconds() / 60 + due = elapsed_minutes >= interval_minutes + if due: + logger.debug( + f"Interval schedule: {elapsed_minutes:.0f}m elapsed >= {interval_minutes}m interval" + ) + return due + + # Check daily schedule: "daily HH:MM" + match = DAILY_PATTERN.match(schedule) + if match: + target_hour = int(match.group(1)) + target_minute = int(match.group(2)) + return _is_daily_due(last_sync, now, target_hour, target_minute) + + logger.warning(f"Unknown schedule format: {schedule}") + return False + + +def _is_daily_due( + last_sync: datetime, + now: datetime, + target_hour: int, + target_minute: int, +) -> bool: + """Check if a daily schedule is due. + + A daily schedule at HH:MM is due when: + 1. Current time is at or past HH:MM today, AND + 2. Last sync was before HH:MM today + + This means: once HH:MM passes, the first scheduler tick will trigger it, + and subsequent ticks on the same day will skip it. + """ + # Today's target time + today_target = now.replace( + hour=target_hour, minute=target_minute, second=0, microsecond=0 + ) + + # Not yet time today + if now < today_target: + return False + + # Time has passed, check if we already synced after today's target + if last_sync >= today_target: + return False + + logger.debug( + f"Daily schedule: target {target_hour:02d}:{target_minute:02d} UTC, " + f"last sync {last_sync.isoformat()}, now {now.isoformat()} -> due" + ) + return True + + +def _parse_timestamp(iso_string: str) -> Optional[datetime]: + """Parse an ISO timestamp string, handling various formats. + + Args: + iso_string: ISO 8601 timestamp string + + Returns: + datetime object or None if parsing fails + """ + try: + # Python 3.11+ fromisoformat handles most formats + return datetime.fromisoformat(iso_string) + except (ValueError, TypeError): + pass + + # Fallback: try common formats + for fmt in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"): + try: + return datetime.strptime(iso_string, fmt) + except ValueError: + continue + + return None diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py new file mode 100644 index 0000000..3a94486 --- /dev/null +++ b/tests/test_scheduler.py @@ -0,0 +1,321 @@ +"""Tests for src.scheduler - schedule parsing and sync-due evaluation.""" + +from datetime import datetime, timedelta, timezone +from typing import Optional + +import pytest + +from src.scheduler import ( + _is_daily_due, + _parse_timestamp, + is_table_due, + parse_interval_minutes, +) + +# Fixed reference time: 2026-03-15 12:00:00 UTC +NOW = datetime(2026, 3, 15, 12, 0, 0, tzinfo=timezone.utc) + + +# --------------------------------------------------------------------------- +# parse_interval_minutes +# --------------------------------------------------------------------------- + + +class TestParseIntervalMinutes: + """Tests for parse_interval_minutes().""" + + def test_minutes_basic(self) -> None: + assert parse_interval_minutes("every 15m") == 15 + + def test_minutes_single_digit(self) -> None: + assert parse_interval_minutes("every 5m") == 5 + + def test_minutes_large(self) -> None: + assert parse_interval_minutes("every 120m") == 120 + + def test_hours_basic(self) -> None: + assert parse_interval_minutes("every 2h") == 120 + + def test_hours_single(self) -> None: + assert parse_interval_minutes("every 1h") == 60 + + def test_hours_large(self) -> None: + assert parse_interval_minutes("every 24h") == 1440 + + def test_daily_returns_none(self) -> None: + assert parse_interval_minutes("daily 05:00") is None + + def test_invalid_format_returns_none(self) -> None: + assert parse_interval_minutes("not a schedule") is None + + def test_empty_string_returns_none(self) -> None: + assert parse_interval_minutes("") is None + + def test_missing_unit_returns_none(self) -> None: + assert parse_interval_minutes("every 15") is None + + def test_wrong_unit_returns_none(self) -> None: + assert parse_interval_minutes("every 15s") is None + + def test_no_space_returns_none(self) -> None: + assert parse_interval_minutes("every15m") is None + + def test_extra_whitespace_returns_none(self) -> None: + # Strict parsing: extra whitespace is rejected + assert parse_interval_minutes("every 15m") is None + + def test_negative_not_matched(self) -> None: + # Regex uses \d+ so negative sign won't match + assert parse_interval_minutes("every -5m") is None + + def test_zero_minutes(self) -> None: + # "every 0m" matches the pattern, returns 0 + assert parse_interval_minutes("every 0m") == 0 + + +# --------------------------------------------------------------------------- +# is_table_due - interval schedules +# --------------------------------------------------------------------------- + + +class TestIsTableDueInterval: + """Tests for is_table_due() with interval-based schedules.""" + + def test_never_synced_is_due(self) -> None: + assert is_table_due("every 15m", last_sync_iso=None, now=NOW) is True + + def test_empty_last_sync_is_due(self) -> None: + assert is_table_due("every 15m", last_sync_iso="", now=NOW) is True + + def test_synced_10min_ago_every_15m_not_due(self) -> None: + last_sync = (NOW - timedelta(minutes=10)).isoformat() + assert is_table_due("every 15m", last_sync_iso=last_sync, now=NOW) is False + + def test_synced_20min_ago_every_15m_is_due(self) -> None: + last_sync = (NOW - timedelta(minutes=20)).isoformat() + assert is_table_due("every 15m", last_sync_iso=last_sync, now=NOW) is True + + def test_synced_exactly_15min_ago_every_15m_is_due(self) -> None: + last_sync = (NOW - timedelta(minutes=15)).isoformat() + assert is_table_due("every 15m", last_sync_iso=last_sync, now=NOW) is True + + def test_synced_30min_ago_every_1h_not_due(self) -> None: + last_sync = (NOW - timedelta(minutes=30)).isoformat() + assert is_table_due("every 1h", last_sync_iso=last_sync, now=NOW) is False + + def test_synced_90min_ago_every_1h_is_due(self) -> None: + last_sync = (NOW - timedelta(minutes=90)).isoformat() + assert is_table_due("every 1h", last_sync_iso=last_sync, now=NOW) is True + + def test_synced_exactly_1h_ago_every_1h_is_due(self) -> None: + last_sync = (NOW - timedelta(hours=1)).isoformat() + assert is_table_due("every 1h", last_sync_iso=last_sync, now=NOW) is True + + def test_synced_59min_ago_every_1h_not_due(self) -> None: + last_sync = (NOW - timedelta(minutes=59)).isoformat() + assert is_table_due("every 1h", last_sync_iso=last_sync, now=NOW) is False + + def test_synced_3h_ago_every_2h_is_due(self) -> None: + last_sync = (NOW - timedelta(hours=3)).isoformat() + assert is_table_due("every 2h", last_sync_iso=last_sync, now=NOW) is True + + +# --------------------------------------------------------------------------- +# is_table_due - daily schedules +# --------------------------------------------------------------------------- + + +class TestIsTableDueDaily: + """Tests for is_table_due() with daily schedules.""" + + def test_before_target_time_not_due(self) -> None: + now = datetime(2026, 3, 15, 4, 30, 0, tzinfo=timezone.utc) + last_sync = datetime(2026, 3, 14, 6, 0, 0, tzinfo=timezone.utc).isoformat() + assert is_table_due("daily 05:00", last_sync_iso=last_sync, now=now) is False + + def test_past_target_not_synced_today_is_due(self) -> None: + now = datetime(2026, 3, 15, 5, 30, 0, tzinfo=timezone.utc) + last_sync = datetime(2026, 3, 15, 4, 0, 0, tzinfo=timezone.utc).isoformat() + assert is_table_due("daily 05:00", last_sync_iso=last_sync, now=now) is True + + def test_past_target_already_synced_after_target_not_due(self) -> None: + now = datetime(2026, 3, 15, 5, 30, 0, tzinfo=timezone.utc) + last_sync = datetime(2026, 3, 15, 5, 15, 0, tzinfo=timezone.utc).isoformat() + assert is_table_due("daily 05:00", last_sync_iso=last_sync, now=now) is False + + def test_evening_schedule_past_target_last_sync_yesterday_is_due(self) -> None: + now = datetime(2026, 3, 15, 18, 0, 0, tzinfo=timezone.utc) + last_sync = datetime(2026, 3, 14, 17, 30, 0, tzinfo=timezone.utc).isoformat() + assert is_table_due("daily 17:00", last_sync_iso=last_sync, now=now) is True + + def test_daily_never_synced_is_due(self) -> None: + now = datetime(2026, 3, 15, 6, 0, 0, tzinfo=timezone.utc) + assert is_table_due("daily 05:00", last_sync_iso=None, now=now) is True + + def test_daily_never_synced_before_target_still_due(self) -> None: + # Never synced always returns True regardless of target time + now = datetime(2026, 3, 15, 3, 0, 0, tzinfo=timezone.utc) + assert is_table_due("daily 05:00", last_sync_iso=None, now=now) is True + + def test_daily_exactly_at_target_time_is_due(self) -> None: + now = datetime(2026, 3, 15, 5, 0, 0, tzinfo=timezone.utc) + last_sync = datetime(2026, 3, 14, 5, 0, 0, tzinfo=timezone.utc).isoformat() + # now == today_target, so now < today_target is False + # last_sync (yesterday) < today_target => due + assert is_table_due("daily 05:00", last_sync_iso=last_sync, now=now) is True + + def test_daily_synced_at_exactly_target_not_due_again(self) -> None: + now = datetime(2026, 3, 15, 5, 30, 0, tzinfo=timezone.utc) + last_sync = datetime(2026, 3, 15, 5, 0, 0, tzinfo=timezone.utc).isoformat() + # last_sync == today_target => last_sync >= today_target => not due + assert is_table_due("daily 05:00", last_sync_iso=last_sync, now=now) is False + + def test_midnight_schedule(self) -> None: + now = datetime(2026, 3, 15, 0, 30, 0, tzinfo=timezone.utc) + last_sync = datetime(2026, 3, 14, 0, 15, 0, tzinfo=timezone.utc).isoformat() + assert is_table_due("daily 00:00", last_sync_iso=last_sync, now=now) is True + + def test_end_of_day_schedule(self) -> None: + now = datetime(2026, 3, 15, 23, 59, 0, tzinfo=timezone.utc) + last_sync = datetime(2026, 3, 14, 23, 50, 0, tzinfo=timezone.utc).isoformat() + assert is_table_due("daily 23:30", last_sync_iso=last_sync, now=now) is True + + +# --------------------------------------------------------------------------- +# is_table_due - edge cases +# --------------------------------------------------------------------------- + + +class TestIsTableDueEdgeCases: + """Edge case tests for is_table_due().""" + + def test_unparseable_last_sync_returns_true(self) -> None: + # Fail-safe: if we can't parse last_sync, assume sync is needed + assert is_table_due("every 15m", last_sync_iso="garbage", now=NOW) is True + + def test_unknown_schedule_format_returns_false(self) -> None: + last_sync = (NOW - timedelta(hours=2)).isoformat() + assert is_table_due("weekly", last_sync_iso=last_sync, now=NOW) is False + + def test_unknown_schedule_never_synced_returns_true(self) -> None: + # Never synced takes priority over unknown schedule + assert is_table_due("weekly", last_sync_iso=None, now=NOW) is True + + def test_now_defaults_to_current_time(self) -> None: + # When now is not provided, it defaults to current UTC time + # A table that was never synced should be due regardless + assert is_table_due("every 15m", last_sync_iso=None) is True + + def test_naive_last_sync_treated_as_utc(self) -> None: + # Naive timestamp (no timezone) should be treated as UTC + naive_ts = "2026-03-15T11:50:00" + # 10 minutes ago from NOW (12:00), with 15m interval -> not due + assert is_table_due("every 15m", last_sync_iso=naive_ts, now=NOW) is False + + def test_last_sync_in_future_not_due(self) -> None: + # Edge case: last_sync in the future (clock skew, etc.) + future = (NOW + timedelta(hours=1)).isoformat() + assert is_table_due("every 15m", last_sync_iso=future, now=NOW) is False + + +# --------------------------------------------------------------------------- +# _is_daily_due (internal function, direct tests) +# --------------------------------------------------------------------------- + + +class TestIsDailyDue: + """Direct tests for _is_daily_due() internal function.""" + + def test_before_target_not_due(self) -> None: + now = datetime(2026, 3, 15, 4, 0, 0, tzinfo=timezone.utc) + last_sync = datetime(2026, 3, 14, 5, 30, 0, tzinfo=timezone.utc) + assert _is_daily_due(last_sync, now, target_hour=5, target_minute=0) is False + + def test_after_target_last_sync_before_target_is_due(self) -> None: + now = datetime(2026, 3, 15, 6, 0, 0, tzinfo=timezone.utc) + last_sync = datetime(2026, 3, 15, 4, 0, 0, tzinfo=timezone.utc) + assert _is_daily_due(last_sync, now, target_hour=5, target_minute=0) is True + + def test_after_target_last_sync_after_target_not_due(self) -> None: + now = datetime(2026, 3, 15, 6, 0, 0, tzinfo=timezone.utc) + last_sync = datetime(2026, 3, 15, 5, 30, 0, tzinfo=timezone.utc) + assert _is_daily_due(last_sync, now, target_hour=5, target_minute=0) is False + + def test_target_with_minutes(self) -> None: + now = datetime(2026, 3, 15, 17, 45, 0, tzinfo=timezone.utc) + last_sync = datetime(2026, 3, 15, 10, 0, 0, tzinfo=timezone.utc) + assert _is_daily_due(last_sync, now, target_hour=17, target_minute=30) is True + + def test_target_with_minutes_not_yet(self) -> None: + now = datetime(2026, 3, 15, 17, 15, 0, tzinfo=timezone.utc) + last_sync = datetime(2026, 3, 15, 10, 0, 0, tzinfo=timezone.utc) + assert _is_daily_due(last_sync, now, target_hour=17, target_minute=30) is False + + +# --------------------------------------------------------------------------- +# _parse_timestamp +# --------------------------------------------------------------------------- + + +class TestParseTimestamp: + """Tests for _parse_timestamp() internal function.""" + + def test_iso_with_timezone(self) -> None: + result = _parse_timestamp("2026-03-15T12:00:00+00:00") + assert result is not None + assert result.year == 2026 + assert result.month == 3 + assert result.day == 15 + assert result.hour == 12 + + def test_iso_with_z_suffix(self) -> None: + # Python 3.11+ fromisoformat handles Z + result = _parse_timestamp("2026-03-15T12:00:00Z") + assert result is not None + assert result.hour == 12 + + def test_iso_without_timezone(self) -> None: + result = _parse_timestamp("2026-03-15T12:00:00") + assert result is not None + assert result.hour == 12 + assert result.tzinfo is None + + def test_iso_with_microseconds(self) -> None: + result = _parse_timestamp("2026-03-15T12:00:00.123456") + assert result is not None + assert result.microsecond == 123456 + + def test_space_separated(self) -> None: + result = _parse_timestamp("2026-03-15 12:00:00") + assert result is not None + assert result.hour == 12 + + def test_invalid_string_returns_none(self) -> None: + assert _parse_timestamp("not-a-date") is None + + def test_empty_string_returns_none(self) -> None: + assert _parse_timestamp("") is None + + def test_partial_date_returns_none(self) -> None: + # "2026-03-15" alone - fromisoformat handles date-only in 3.11+ + result = _parse_timestamp("2026-03-15") + # Should parse as a date (with hour=0, minute=0) + assert result is not None + assert result.hour == 0 + + def test_iso_with_positive_offset(self) -> None: + result = _parse_timestamp("2026-03-15T12:00:00+05:30") + assert result is not None + assert result.hour == 12 + assert result.utcoffset() is not None + + def test_iso_with_negative_offset(self) -> None: + result = _parse_timestamp("2026-03-15T12:00:00-07:00") + assert result is not None + assert result.utcoffset() is not None + + def test_numeric_garbage_returns_none(self) -> None: + assert _parse_timestamp("12345") is None + + def test_none_like_string_returns_none(self) -> None: + assert _parse_timestamp("None") is None