From b7a1795834a47e1072836b1e3a1ffa462abdb41e Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr <139972147+ZdenekSrotyr@users.noreply.github.com> Date: Wed, 29 Apr 2026 22:06:30 +0200 Subject: [PATCH] feat(scheduler): re-wire sync_schedule + script.schedule; tune via env; OpenMetadata TLS (#135) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bundles 4 issues: - #79 — table_registry.sync_schedule honored at runtime (API-side filter + Pydantic validators) - #78 — script_registry.schedule honored via new POST /api/scripts/run-due (atomic claim, BackgroundTask exec, deploy-time safety validation) - #77 — sidecar JOBS env-driven (SCHEDULER_DATA_REFRESH_INTERVAL/HEALTH_CHECK_INTERVAL/SCRIPT_RUN_INTERVAL/TICK_SECONDS) - #89 — OpenMetadataClient verify=True default (BREAKING for self-signed) Cuts release 0.19.0. See CHANGELOG for full notes incl. Known Limitations. --- CHANGELOG.md | 24 +- app/api/admin.py | 36 + app/api/scripts.py | 132 +- app/api/sync.py | 28 +- config/.env.template | 8 + config/instance.yaml.example | 4 + connectors/openmetadata/client.py | 13 +- connectors/openmetadata/enricher.py | 5 +- docs/DEPLOYMENT.md | 39 + .../plans/2026-04-29-issues-77-78-79-89.md | 1601 +++++++++++++++++ pyproject.toml | 2 +- services/scheduler/__main__.py | 113 +- src/catalog_export.py | 5 +- src/repositories/notifications.py | 37 + src/scheduler.py | 87 + tests/helpers/factories.py | 2 +- tests/test_admin_bq_register.py | 46 + tests/test_admin_configure_api.py | 2 +- tests/test_api_scripts.py | 2 +- tests/test_openmetadata_client.py | 47 + tests/test_repositories.py | 4 +- tests/test_run_due_scripts.py | 238 +++ tests/test_scheduler_sidecar.py | 93 + tests/test_scripts_api.py | 22 +- tests/test_sync_filter.py | 314 ++++ 25 files changed, 2839 insertions(+), 65 deletions(-) create mode 100644 docs/superpowers/plans/2026-04-29-issues-77-78-79-89.md create mode 100644 tests/test_run_due_scripts.py create mode 100644 tests/test_scheduler_sidecar.py create mode 100644 tests/test_sync_filter.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a6ea9d..1b3b3df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,13 +10,31 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] -### Changed +## [0.19.0] — 2026-04-29 -- **BREAKING** `GET /marketplace/info` (admin-only debug endpoint) `name` field now returns the plugin's authoritative name from its `plugin.json` (e.g. `plug-x`) instead of the slug-prefixed form (`-`). The slug-prefixed form moved to a new `prefixed_name` field next to it; `original_name` is unchanged. Side-effect of the `/plugin` UI fix below — the synth marketplace.json's `name` field had to switch over for Claude Code's catalog lookup to work, and `/marketplace/info` mirrors that surface for consistency. Any downstream tooling that parsed the `name` field expecting the slug-prefixed format must now read `prefixed_name`. +### Added +- `table_registry.sync_schedule` is now honored at runtime. `POST /api/sync/trigger` (called by the scheduler sidecar every 15 min by default) drops local tables whose schedule says they are not due. Tables without a schedule continue to sync on every tick (opt-in feature). Manual `POST /api/sync/trigger {"tables":[...]}` bypasses the schedule filter — operator override always wins. (#79) +- `script_registry.schedule` is now honored at runtime via the new endpoint `POST /api/scripts/run-due` (admin-only). The scheduler sidecar fires this every 60 s by default. Each due script is claimed atomically (`last_status='running'`), executed in a BackgroundTask, and the outcome written to `last_run` / `last_status`. Scripts already in `running` state are skipped — no concurrent runs of the same script. (#78) +- Four new env vars on the scheduler sidecar: `SCHEDULER_DATA_REFRESH_INTERVAL`, `SCHEDULER_HEALTH_CHECK_INTERVAL`, `SCHEDULER_SCRIPT_RUN_INTERVAL`, `SCHEDULER_TICK_SECONDS`. All accept positive integers (seconds); tick must be ≤ smallest job interval. Documented in `docs/DEPLOYMENT.md` → Scheduler tuning. (#77) +- `RegisterTableRequest.sync_schedule`, `UpdateTableRequest.sync_schedule`, and `DeployScriptRequest.schedule` now reject malformed strings with a Pydantic 422 (e.g. `"hourly"`, `"daily 25:00"`). The accepted forms are unchanged: `every Nm`, `every Nh`, `daily HH:MM[,HH:MM,...]`. **Note**: cron expressions (`"0 8 * * MON"` etc.) were never honoured by the runtime evaluator — they used to round-trip through the API as a silent no-op, and now they get a loud 422 at register/deploy time. Operators using cron strings must convert to one of the supported forms. (#78, #79) +- New `verify_ssl` knob in the `openmetadata:` section of `instance.yaml` (default `true`). Operators on internal CAs / self-signed catalogs must set it explicitly. (#89) + +### Changed +- **BREAKING** `POST /api/scripts/deploy` now validates the source against the safety blocklist BEFORE persisting (previously safety checks ran only at execution time). Scripts containing blocked imports / patterns return 400 from `/deploy` instead of being stored and failing every scheduler tick. Closes the claim-fail-retry loop where the new `/api/scripts/run-due` endpoint would re-claim and re-fail an unrunnable deployed script every minute. (#78) +- **BREAKING** `OpenMetadataClient` now defaults to `verify=True` for TLS. The previous version hardcoded `verify=False` and suppressed urllib3's "Unverified HTTPS request" warning at import time (which leaked to every other httpx client in the process). Existing deployments on self-signed certificates without an explicit opt-out will start failing TLS verification — set `verify_ssl: false` in the `openmetadata:` block of `instance.yaml`, or supply a CA bundle path, before upgrading. Both production call sites (`connectors/openmetadata/enricher.py`, `src/catalog_export.py`) read the new `verify_ssl` config knob and pass it through. (#89) +- **BREAKING** `GET /marketplace/info` (admin-only debug endpoint) `name` field now returns the plugin's authoritative name from its `plugin.json` (e.g. `plug-x`) instead of the slug-prefixed form (`-`). The slug-prefixed form moved to a new `prefixed_name` field next to it; `original_name` is unchanged. Side-effect of the `/plugin` UI fix below — the synth marketplace.json's `name` field had to switch over for Claude Code's catalog lookup to work, and `/marketplace/info` mirrors that surface for consistency. Any downstream tooling that parsed the `name` field expecting the slug-prefixed format must now read `prefixed_name`. (#133) ### Fixed +- **`/plugin` UI in Claude Code rendered "Plugin not found in marketplace" in the Components panel** for every plugin Agnes served, even though agents/skills/commands loaded correctly under the plugin's own namespace. Root cause: the synthetic `.claude-plugin/marketplace.json` listed each plugin under a slug-prefixed `name` (`-`) while the plugin's authoritative `.claude-plugin/plugin.json` kept the original name. Claude Code resolves the loaded plugin back to its catalog entry by `plugin.json` name, so the lookup missed every entry. The synth manifest now reads the plugin's authoritative name from `/.claude-plugin/plugin.json` (falling back to the upstream marketplace.json's `name` when the plugin manifest is absent or unreadable). The directory layout under `plugins/-/...` keeps the prefix so two upstream marketplaces that ship a same-named plugin still get distinct on-disk paths in the ZIP / git tree — their catalog entries will then collide under the same `name`, which is the correct surface (admin RBAC decides which upstream wins, same as if a user added both upstream marketplaces directly to Claude Code). `/marketplace/info` now exposes `prefixed_name` alongside `name` so operators can still disambiguate cross-marketplace shadowing. (#133) -- **`/plugin` UI in Claude Code rendered "Plugin not found in marketplace" in the Components panel** for every plugin Agnes served, even though agents/skills/commands loaded correctly under the plugin's own namespace. Root cause: the synthetic `.claude-plugin/marketplace.json` listed each plugin under a slug-prefixed `name` (`-`) while the plugin's authoritative `.claude-plugin/plugin.json` kept the original name. Claude Code resolves the loaded plugin back to its catalog entry by `plugin.json` name, so the lookup missed every entry. The synth manifest now reads the plugin's authoritative name from `/.claude-plugin/plugin.json` (falling back to the upstream marketplace.json's `name` when the plugin manifest is absent or unreadable). The directory layout under `plugins/-/...` keeps the prefix so two upstream marketplaces that ship a same-named plugin still get distinct on-disk paths in the ZIP / git tree — their catalog entries will then collide under the same `name`, which is the correct surface (admin RBAC decides which upstream wins, same as if a user added both upstream marketplaces directly to Claude Code). `/marketplace/info` now exposes `prefixed_name` alongside `name` so operators can still disambiguate cross-marketplace shadowing. +### Internal +- `src/scheduler.py` now exports `is_valid_schedule(s)` and `filter_due_tables(configs, sync_state_repo)` for reuse across the sync filter, the script runner, and Pydantic validators. +- `ScriptRepository` gains `claim_for_run(script_id)` and `record_run_result(script_id, status)` — the atomic primitives for the scheduled-script execution path. `claim_for_run` uses `UPDATE … WHERE last_status IS DISTINCT FROM 'running' RETURNING id` for race-free claim. +- `services/scheduler/__main__.py` JOBS list refactored to a `build_jobs()` factory that reads + validates env at startup. + +### Known limitations +- **Stuck `last_status='running'`**: a scheduled script whose BackgroundTask crashes mid-run (process killed, OOM, gateway timeout) stays claimed forever. Recovery: `UPDATE script_registry SET last_status = NULL WHERE id = ?` from a DuckDB shell. Auto-recovery via max-runtime detection is intentionally out of scope for v0.19.0; revisit if it bites in practice. +- **Schedule quantization rounds up**: `SCHEDULER_*_INTERVAL` accepts seconds but the underlying schedule grammar is minute-grained. Non-multiples of 60 round UP to the next minute (90 s → `every 2m`, never `every 1m`) so a job never fires more often than the operator configured. Sub-minute values clamp to `every 1m`. Documented in `docs/DEPLOYMENT.md` → Scheduler tuning. ## [0.18.0] — 2026-04-29 diff --git a/app/api/admin.py b/app/api/admin.py index a0c53f3..64c8395 100644 --- a/app/api/admin.py +++ b/app/api/admin.py @@ -25,6 +25,7 @@ from src.identifier_validation import ( is_safe_quoted_identifier as _is_safe_quoted_identifier, ) from src.sql_safe import is_safe_project_id as _is_safe_project_id +from src.scheduler import is_valid_schedule logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/admin", tags=["admin"]) @@ -677,6 +678,22 @@ class RegisterTableRequest(BaseModel): ) return v + @field_validator("sync_schedule", mode="before") + @classmethod + def _validate_sync_schedule(cls, v): + # None / "" → no schedule, accepted. + # Any non-empty string (including pure whitespace) must parse as a + # valid schedule — otherwise it would be persisted and silently + # ignored by the runtime evaluator. + if v in (None, ""): + return v + if not is_valid_schedule(v): + raise ValueError( + f"sync_schedule must be 'every Nm' / 'every Nh' / " + f"'daily HH:MM[,HH:MM,...]', got {v!r}" + ) + return v + def _validate_bigquery_register_payload(req: "RegisterTableRequest") -> None: """Enforce BQ-specific shape on a register/precheck request. @@ -794,6 +811,25 @@ class UpdateTableRequest(BaseModel): def _coerce_primary_key(cls, v): return _normalize_primary_key(v) + # Duplicated from RegisterTableRequest — Pydantic v2 validators don't + # inherit cleanly across unrelated BaseModel classes; a shared mixin + # would be overkill for two fields. + @field_validator("sync_schedule", mode="before") + @classmethod + def _validate_sync_schedule(cls, v): + # None / "" → no schedule, accepted. + # Any non-empty string (including pure whitespace) must parse as a + # valid schedule — otherwise it would be persisted and silently + # ignored by the runtime evaluator. + if v in (None, ""): + return v + if not is_valid_schedule(v): + raise ValueError( + f"sync_schedule must be 'every Nm' / 'every Nh' / " + f"'daily HH:MM[,HH:MM,...]', got {v!r}" + ) + return v + class ConfigureRequest(BaseModel): data_source: str # "keboola" | "bigquery" | "local" diff --git a/app/api/scripts.py b/app/api/scripts.py index 3f10e16..261e27b 100644 --- a/app/api/scripts.py +++ b/app/api/scripts.py @@ -5,15 +5,18 @@ import subprocess import sys import tempfile import uuid -from fastapi import APIRouter, Depends, HTTPException -from pydantic import BaseModel +from datetime import datetime, timezone +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException +from pydantic import BaseModel, field_validator from typing import Optional import duckdb from app.auth.access import require_admin from app.auth.dependencies import _get_db +from src.db import get_system_db from src.repositories.notifications import ScriptRepository +from src.scheduler import is_valid_schedule, is_table_due router = APIRouter(prefix="/api/scripts", tags=["scripts"]) @@ -26,6 +29,22 @@ class DeployScriptRequest(BaseModel): source: str schedule: Optional[str] = None + @field_validator("schedule", mode="before") + @classmethod + def _validate_schedule(cls, v): + if v in (None, ""): + return None + # Pure-whitespace strings (" ") fall through to is_valid_schedule + # and reject — same convention as RegisterTableRequest.sync_schedule. + # We do NOT silently normalise whitespace to None; surfacing the + # caller's mistake at register time beats persisting an unusable value. + if not is_valid_schedule(v): + raise ValueError( + f"schedule must be 'every Nm' / 'every Nh' / " + f"'daily HH:MM[,HH:MM,...]', got {v!r}" + ) + return v + class RunScriptRequest(BaseModel): name: Optional[str] = None @@ -56,7 +75,14 @@ async def deploy_script( user: dict = Depends(require_admin), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): - """Deploy a Python script to be run on the server (optionally on schedule). Admin-only.""" + """Deploy a Python script to be run on the server (optionally on schedule). Admin-only. + + Validates the source against the safety blocklist BEFORE persisting — + closes the Devin claim-fail-retry loop where a script with blocked + patterns would land in script_registry, fail every scheduler tick, and + re-claim itself perpetually. + """ + _validate_script_source(request.source) repo = ScriptRepository(conn) script_id = str(uuid.uuid4()) repo.deploy( @@ -109,13 +135,91 @@ async def undeploy_script( repo.undeploy(script_id) -def _execute_script(source: str, name: str) -> dict: - """Execute a Python script in a sandboxed subprocess. +@router.post("/run-due") +async def run_due_scripts( + background_tasks: BackgroundTasks, + user: dict = Depends(require_admin), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Run every deployed script whose ``schedule`` says it is due. - The blocklist below is defense-in-depth, not a primary trust boundary. - The role gate on the route (admin-only) is the actual boundary; the - blocklist catches obvious mistakes, not a hostile admin.""" - # Comprehensive safety checks — block dangerous patterns + Iterates ``script_registry``, skips rows without a schedule (those run + only via explicit POST /{id}/run), evaluates ``is_table_due(schedule, + last_run)``, and atomically claims each due row via + ``ScriptRepository.claim_for_run``. Execution is queued as a + ``BackgroundTask`` so the response returns immediately — the sidecar + must not block waiting on a long-running script. + + Concurrency: ``claim_for_run`` flips ``last_status`` to ``'running'`` + inside the same UPDATE; a script already in that state is skipped on + subsequent ticks until the BackgroundTask writes a terminal status via + ``record_run_result``. There is no max-runtime detection in this PR — + if a BackgroundTask crashes without writing a terminal status, the + script stays stuck in ``'running'`` until an operator clears it + manually (``UPDATE script_registry SET last_status = NULL WHERE id = + ?``). Documenting this as an accepted v0 limitation; revisit if it + bites in practice. + """ + repo = ScriptRepository(conn) + claimed: list[str] = [] + for script in repo.list_all(): + schedule = script.get("schedule") + if not schedule: + continue + last_run = script.get("last_run") + last_run_iso = last_run.isoformat() if last_run else None + if not is_table_due(schedule, last_run_iso): + continue + if not repo.claim_for_run(script["id"]): + # Lost the race / already running — next tick will retry. + continue + claimed.append(script["id"]) + background_tasks.add_task( + _run_claimed_script, + script_id=script["id"], + source=script["source"], + name=script["name"], + ) + return {"claimed": claimed, "count": len(claimed)} + + +def _run_claimed_script(script_id: str, source: str, name: str) -> None: + """Execute a previously-claimed script and write the terminal status. + + Runs in a FastAPI BackgroundTask, so it owns its own DB connection + (the request-scoped conn is already gone by the time this fires). + ``_execute_script`` only raises on safety-check violations — runtime + failures (non-zero exit code, ``subprocess.TimeoutExpired`` → exit -1) + are returned in the result dict, so we must inspect ``exit_code`` to + decide success vs failure rather than treating "no exception" as + success. Any exception still writes 'failure' and re-raises so the BG + handler surfaces the traceback in logs. + """ + # Fresh connection for the background task — the request-scoped conn + # was returned to FastAPI by the time this fires. + bg_conn = get_system_db() + try: + bg_repo = ScriptRepository(bg_conn) + try: + result = _execute_script(source, name) + status = "success" if result.get("exit_code", 1) == 0 else "failure" + bg_repo.record_run_result(script_id, status=status) + except Exception: + bg_repo.record_run_result(script_id, status="failure") + raise + finally: + bg_conn.close() + + +def _validate_script_source(source: str) -> None: + """Reject scripts containing blocked imports / patterns. + + Raises HTTPException(400) on any violation. Called from BOTH the deploy + endpoint (so bad scripts never land in script_registry — closes the + Devin claim-fail-retry loop where the scheduler would re-claim and + re-fail a deployed-but-unrunnable script every tick) and from + ``_execute_script`` as defense-in-depth. + """ blocked_patterns = [ # Direct imports of dangerous modules "import subprocess", "from subprocess", @@ -194,6 +298,16 @@ def _execute_script(source: str, name: str) -> dict: detail=f"Script contains disallowed pattern: {pattern.split('(')[0].strip()}", ) + +def _execute_script(source: str, name: str) -> dict: + """Execute a Python script in a sandboxed subprocess. + + Defense-in-depth: re-runs ``_validate_script_source`` even though the + deploy endpoint already validates. A bad script reaching this path would + indicate a registry write that bypassed the deploy contract; reject it + rather than spawn a subprocess. + """ + _validate_script_source(source) data_dir = os.environ.get("DATA_DIR", "./data") with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: diff --git a/app/api/sync.py b/app/api/sync.py index e62761a..6744ce9 100644 --- a/app/api/sync.py +++ b/app/api/sync.py @@ -3,6 +3,7 @@ import hashlib import logging import os +import subprocess import traceback from datetime import datetime, timezone from pathlib import Path @@ -19,6 +20,7 @@ from src.repositories.sync_state import SyncStateRepository from src.repositories.sync_settings import SyncSettingsRepository, DatasetPermissionRepository from src.repositories.table_registry import TableRegistryRepository from src.rbac import can_access_table +from src.scheduler import filter_due_tables logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/sync", tags=["sync"]) @@ -42,32 +44,50 @@ def _run_sync(tables: Optional[List[str]] = None): This avoids DuckDB lock conflicts — subprocess never opens system.duckdb. """ import json as _json - import subprocess import sys try: from app.instance_config import get_data_source_type, get_value from src.db import get_system_db - from src.repositories.table_registry import TableRegistryRepository source_type = get_data_source_type() data_dir = _get_data_dir() # Read table configs in main process (has shared DuckDB connection) sys_conn = get_system_db() + # Track whether the REGISTRY (not the post-filter list) was empty. + # Auto-discovery must only fire on a truly empty registry; if the + # filter returned [] because nothing was due, re-discovering would + # bypass the schedule entirely on Keboola instances. (Devin BUG_0001 + # on ebb8cc9.) + registry_has_tables = False try: repo = TableRegistryRepository(sys_conn) if tables: + # Manual operator override — bypass schedule filter entirely + # so an admin saying "sync these specific tables now" wins. all_configs = [repo.get(t) for t in tables] table_configs = [c for c in all_configs if c is not None] + registry_has_tables = bool(table_configs) else: table_configs = repo.list_local(source_type) if source_type else repo.list_local() + registry_has_tables = bool(table_configs) + # Without this filter, every scheduler tick would re-sync + # every table regardless of its sync_schedule cadence, + # making the field a no-op at trigger time. Tables with + # no schedule pass through unchanged (opt-in feature). + state_repo = SyncStateRepository(sys_conn) + table_configs = filter_due_tables(table_configs, state_repo) finally: sys_conn.close() if not table_configs: - # Auto-discover tables on first sync when registry is empty - if source_type == "keboola" and os.environ.get("KEBOOLA_STORAGE_TOKEN"): + # Auto-discover tables on first sync when registry is empty. + # `not registry_has_tables` is the load-bearing guard — without + # it, "filter excluded everything" looks identical to "registry + # empty" and we'd re-discover + re-sync every tick regardless of + # sync_schedule. + if not registry_has_tables and source_type == "keboola" and os.environ.get("KEBOOLA_STORAGE_TOKEN"): logger.info("No tables registered — running auto-discovery from Keboola") try: from app.api.admin import _discover_and_register_tables diff --git a/config/.env.template b/config/.env.template index 0d63274..3fc8f7f 100644 --- a/config/.env.template +++ b/config/.env.template @@ -45,6 +45,14 @@ SESSION_SECRET= # python -c "import secrets; print(secrets.token_he # LOG_LEVEL=info # debug, info, warning, error # CORS_ORIGINS=http://localhost:3000,http://localhost:8000 +# ── SCHEDULER (sidecar tuning) ────────────────────── +# All values are in seconds and must be positive integers. SCHEDULER_TICK_SECONDS +# must be <= the smallest job interval below. +# SCHEDULER_DATA_REFRESH_INTERVAL=900 # default 15 min — POST /api/sync/trigger +# SCHEDULER_HEALTH_CHECK_INTERVAL=300 # default 5 min — GET /api/health +# SCHEDULER_SCRIPT_RUN_INTERVAL=60 # default 1 min — POST /api/scripts/run-due +# SCHEDULER_TICK_SECONDS=30 # default 30 s — loop polling cadence + # ── HTTPS / REVERSE PROXY ─────────────────────────── # Set these when the app runs behind a TLS terminator (Caddy, Cloudflare # Tunnel, nginx, GCP LB, etc.). The app itself speaks plain HTTP on :8000; diff --git a/config/instance.yaml.example b/config/instance.yaml.example index d0623a3..63462c7 100644 --- a/config/instance.yaml.example +++ b/config/instance.yaml.example @@ -130,6 +130,10 @@ data_source: # url: "https://your-catalog.example.com" # token: "${OPENMETADATA_TOKEN}" # JWT bearer token # cache_ttl_seconds: 3600 # Cache TTL in seconds +# verify_ssl: true # set to false ONLY for internal +# # CAs / self-signed certs; defaults +# # to true. Setting false ships the +# # JWT over an unverified channel. # --- Email delivery (optional, for magic link auth) --- # Without SMTP, magic links are shown directly in browser (development mode). diff --git a/connectors/openmetadata/client.py b/connectors/openmetadata/client.py index 6f4e0ac..fb36a27 100644 --- a/connectors/openmetadata/client.py +++ b/connectors/openmetadata/client.py @@ -11,13 +11,9 @@ Low-level HTTP wrapper for OpenMetadata REST API with these functions: import json import logging from typing import Dict, List, Optional, Any -import warnings import httpx -# Suppress SSL warnings for self-signed certificates -warnings.filterwarnings("ignore", message="Unverified HTTPS request") - logger = logging.getLogger(__name__) @@ -36,6 +32,7 @@ class OpenMetadataClient: base_url: str, token: str, timeout: int = 30, + verify: bool | str = True, ): """ Initialize OpenMetadata API client. @@ -44,6 +41,12 @@ class OpenMetadataClient: base_url: Base URL of OpenMetadata instance (e.g., "https://catalog.example.com") token: JWT bearer token for authentication timeout: HTTP request timeout in seconds + verify: TLS verification — True (default), False to disable + (e.g., for self-signed certificates on internal CAs), or a + path to a CA bundle. The previous version hardcoded False + globally and suppressed warnings — both removed in #89. + Operators with self-signed certs should pass an explicit + ``verify=False`` or a CA bundle path from their config. """ self.base_url = base_url.rstrip("/") self.token = token @@ -55,7 +58,7 @@ class OpenMetadataClient: "Content-Type": "application/json", }, timeout=timeout, - verify=False, # Allow self-signed certificates (internal networks) + verify=verify, ) def get_table(self, fqn: str) -> Dict[str, Any]: diff --git a/connectors/openmetadata/enricher.py b/connectors/openmetadata/enricher.py index b30ef45..8913ddc 100644 --- a/connectors/openmetadata/enricher.py +++ b/connectors/openmetadata/enricher.py @@ -103,6 +103,7 @@ class CatalogEnricher: url = om_config.get("url", "").strip() token = om_config.get("token", "").strip() cache_ttl = om_config.get("cache_ttl_seconds", 3600) + verify_ssl = om_config.get("verify_ssl", True) if not url or not token: logger.debug( @@ -111,7 +112,9 @@ class CatalogEnricher: return self._cache_ttl_seconds = cache_ttl - self._client = OpenMetadataClient(base_url=url, token=token) + self._client = OpenMetadataClient( + base_url=url, token=token, verify=verify_ssl, + ) self.enabled = True logger.info( diff --git a/docs/DEPLOYMENT.md b/docs/DEPLOYMENT.md index f59ccf6..44c32fd 100644 --- a/docs/DEPLOYMENT.md +++ b/docs/DEPLOYMENT.md @@ -157,6 +157,45 @@ Two health endpoints serve different audiences: The Docker Compose `healthcheck` uses the minimal endpoint (`curl -sf http://localhost:8000/api/health`). For external monitoring tools (Datadog, Prometheus, UptimeRobot, etc.) that need service-level detail (DuckDB status, sync freshness, user count), point them at `/api/health/detailed` with an `Authorization: Bearer ` header. Any authenticated user can call it; a personal access token (`da admin create-pat`) works well for service accounts. +### Scheduler tuning + +The scheduler sidecar (`services/scheduler/__main__.py`) fires periodic +HTTP calls against the main app. Job cadences are configurable via env +vars on the scheduler container: + +| Env var | Default | Purpose | +| ---------------------------------- | ------- | --------------------------------------------- | +| `SCHEDULER_DATA_REFRESH_INTERVAL` | `900` | seconds between `POST /api/sync/trigger` | +| `SCHEDULER_HEALTH_CHECK_INTERVAL` | `300` | seconds between `GET /api/health` | +| `SCHEDULER_SCRIPT_RUN_INTERVAL` | `60` | seconds between `POST /api/scripts/run-due` | +| `SCHEDULER_TICK_SECONDS` | `30` | loop polling cadence; must be ≤ smallest interval above | + +`/api/sync/trigger` walks `table_registry`; tables with a per-row +`sync_schedule` (`every Nm` / `every Nh` / `daily HH:MM[,...]`) are +filtered to only those due for sync since their last run. Tables without +a schedule continue to run on every tick. The marketplace job runs at +`daily 03:00` UTC and is not currently env-tunable. + +`/api/scripts/run-due` walks `script_registry` and runs each deployed +script whose `schedule` says it is due. Scripts in the `running` state +are skipped on subsequent ticks until the previous run writes a terminal +status. The endpoint requires admin auth (the sidecar's +`SCHEDULER_API_TOKEN` resolves to a synthetic Admin user). + +#### Caveats + +- **Schedule quantization rounds up.** The schedule grammar has minute- + level resolution. Non-multiples of 60 seconds round UP to the next + minute (`SCHEDULER_DATA_REFRESH_INTERVAL=90` → `every 2m`, not `every 1m`) + so a job never fires more often than configured. Sub-minute values + clamp to `every 1m`. Use multiples of 60 for predictable cadence. +- **A crashed BackgroundTask can leave a script stuck in `last_status='running'`.** + The next sidecar tick will skip the stuck script forever. Recovery is + manual: open a DuckDB shell on `system.duckdb` and run + `UPDATE script_registry SET last_status = NULL WHERE id = '';` + Auto-recovery via max-runtime detection is intentionally out of scope + for v0; revisit if it happens in practice. + ## Which path should I pick? | | Terraform | Docker Compose | diff --git a/docs/superpowers/plans/2026-04-29-issues-77-78-79-89.md b/docs/superpowers/plans/2026-04-29-issues-77-78-79-89.md new file mode 100644 index 0000000..ae5d3fb --- /dev/null +++ b/docs/superpowers/plans/2026-04-29-issues-77-78-79-89.md @@ -0,0 +1,1601 @@ +# Issues #77, #78, #79, #89 — Re-wire Scheduler + TLS Hardening + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Honor per-table `sync_schedule` and per-script `schedule` at runtime (Option A — re-implement); make sidecar job intervals operator-tunable; stop disabling TLS verification globally in the OpenMetadata client. + +**Architecture:** + +- **API-side filter** for `table_registry.sync_schedule` (#79). New helper `filter_due_tables()` in `src/scheduler.py` is called from `app/api/sync.py:_run_sync()` after `repo.list_local()`. Tables with no schedule keep current "always sync" behavior (opt-in feature). Manual `POST /api/sync/trigger {"tables": [...]}` bypasses the filter (operator override always wins). +- **Server-side runner endpoint** `POST /api/scripts/run-due` (#78). The sidecar fires the endpoint on a configurable cadence; the API claims due scripts atomically (`last_status='running'` UPDATE … RETURNING), runs each via existing `_execute_script` in BackgroundTasks, and writes `last_run` + `last_status` on completion. Concurrency: a script in `running` state is skipped on the next tick. +- **Env-driven sidecar JOBS** (#77). Three documented env overrides for the existing two interval jobs + tick, plus a fourth for the new script-runner job (attributed to #78 in changelog). Marketplaces stays hardcoded — outside #77 scope. +- **TLS verify by default** in `OpenMetadataClient` (#89). Mirror the `connectors/llm/openai_compat.py` pattern: `verify: bool | str = True` constructor parameter; drop the module-level `warnings.filterwarnings`. + +No DuckDB schema migration required — all touched columns (`table_registry.sync_schedule`, `script_registry.schedule/last_run/last_status`) already exist in v17. + +**Tech Stack:** Python 3.11+, FastAPI, Pydantic v2, DuckDB, httpx. Pytest for tests. + +**Out of scope (intentional):** +- Issue #68 (Stop hook output field) — no Stop hook source lives in this OSS repo as of HEAD; the referenced TODO.md no longer exists. Needs clarification from the issue author before implementation. +- Per-script concurrency beyond "skip if running" (no queue, no max-runtime detection). +- Operator-defined custom sidecar jobs (would land in `instance.yaml` per #77's "Future option"). + +--- + +## File Structure + +**New files:** +- `tests/test_run_due_scripts.py` — tests for the new `/api/scripts/run-due` endpoint and the `claim_for_run`/`record_run_result` repo methods. +- `tests/test_sync_filter.py` — tests for `filter_due_tables()` and `is_valid_schedule()`. + +**Modified files:** +- `src/scheduler.py` — add `is_valid_schedule(schedule) -> bool` and `filter_due_tables(table_configs, sync_state_repo) -> list[dict]`. +- `app/api/sync.py` — wire `filter_due_tables()` into `_run_sync()`. +- `app/api/admin.py` — Pydantic `field_validator` on `RegisterTableRequest.sync_schedule` and `UpdateTableRequest.sync_schedule` (reject malformed strings with 422). +- `src/repositories/notifications.py` — extend `ScriptRepository` with `claim_for_run(script_id)` and `record_run_result(script_id, status)`. +- `app/api/scripts.py` — add `POST /api/scripts/run-due` endpoint; Pydantic `field_validator` on `DeployScriptRequest.schedule`. +- `services/scheduler/__main__.py` — env-driven `JOBS` list with validation; add 4th `script-runner` job. +- `connectors/openmetadata/client.py` — add `verify` constructor param; drop module-level `warnings.filterwarnings`. +- `config/.env.template` — document the four new `SCHEDULER_*` env vars. +- `docs/DEPLOYMENT.md` — new "Scheduler tuning" subsection covering the env vars. +- `CHANGELOG.md` — entries under a new `[0.19.0]` section. +- `pyproject.toml` — bump `version` to `0.19.0`. + +**Untouched (intentionally):** +- `src/db.py` — schema unchanged. `script_registry.last_run` was always nullable; we just start writing to it. +- `tests/test_scheduler*.py` — keep as-is. `is_table_due` is the reusable primitive both `filter_due_tables` and the script runner build on. + +--- + +## Pre-flight + +- [ ] **Step P-1: Confirm worktree branch and clean state** + +```bash +git status +git branch --show-current +``` + +Expected: clean tree on `worktree-issues-68-77-78-79-89` (or whatever this worktree's branch is). + +- [ ] **Step P-2: Confirm test suite is green at HEAD** + +```bash +pytest tests/test_scheduler.py tests/test_sync_manifest.py tests/test_scripts_api.py -v 2>&1 | tail -30 +``` + +Expected: all green. If any are red at HEAD, stop and investigate before adding new tests. + +--- + +## Task 1: `src/scheduler.py` — add `is_valid_schedule` and `filter_due_tables` + +**Files:** +- Modify: `src/scheduler.py` (add two new functions at the end of the module) +- Create: `tests/test_sync_filter.py` + +**Why this first:** Both #79 (table sync) and #78 (scripts) reuse `is_valid_schedule` for Pydantic validation. `filter_due_tables` is the pure helper #79 wires into `_run_sync()`. Pure-function unit tests; no FastAPI / DuckDB plumbing yet. + +- [ ] **Step 1.1: Write the failing test file** + +Create `tests/test_sync_filter.py`: + +```python +"""Tests for the schedule-validity helper and the per-table due-filter.""" + +from datetime import datetime, timedelta, timezone + +import pytest + +from src.scheduler import filter_due_tables, is_valid_schedule + + +# ---------------- is_valid_schedule ----------------------------------------- + +@pytest.mark.parametrize("schedule", [ + "every 15m", + "every 1h", + "every 6h", + "daily 05:00", + "daily 07:00,13:00,18:00", +]) +def test_is_valid_schedule_accepts_documented_formats(schedule): + assert is_valid_schedule(schedule) is True + + +@pytest.mark.parametrize("schedule", [ + "", + "every", + "every 0m", # zero is not a positive interval + "every 15s", # seconds not supported + "daily", + "daily 25:00", # invalid hour + "daily 12:60", # invalid minute + "daily 12:00,", # trailing comma + "hourly", # unknown keyword + "every -5m", # negative +]) +def test_is_valid_schedule_rejects_malformed_strings(schedule): + assert is_valid_schedule(schedule) is False + + +def test_is_valid_schedule_treats_none_as_invalid(): + # None is "no schedule" — callers handle that case before validating. + # The validator is for non-null strings only. + assert is_valid_schedule(None) is False # type: ignore[arg-type] + + +# ---------------- filter_due_tables ----------------------------------------- + +class _FakeSyncStateRepo: + """Stub SyncStateRepository — returns last_sync per table_id.""" + + def __init__(self, last_syncs: dict[str, datetime | None]): + self._data = last_syncs + + def get_last_sync(self, table_id: str): + return self._data.get(table_id) + + +def _utc(year, month, day, hour=0, minute=0): + return datetime(year, month, day, hour, minute, tzinfo=timezone.utc) + + +def test_filter_due_tables_passes_through_unscheduled_tables(): + """Tables with sync_schedule=None are always due (opt-in feature).""" + configs = [ + {"id": "t1", "name": "t1", "sync_schedule": None}, + {"id": "t2", "name": "t2", "sync_schedule": ""}, + ] + repo = _FakeSyncStateRepo({}) + out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0)) + assert [c["id"] for c in out] == ["t1", "t2"] + + +def test_filter_due_tables_drops_table_within_interval(): + """A table on 'every 1h' synced 30m ago is NOT due.""" + configs = [{"id": "fast", "name": "fast", "sync_schedule": "every 1h"}] + repo = _FakeSyncStateRepo({"fast": _utc(2026, 5, 1, 9, 30)}) + out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0)) + assert out == [] + + +def test_filter_due_tables_keeps_table_past_interval(): + """A table on 'every 1h' synced 90m ago IS due.""" + configs = [{"id": "fast", "name": "fast", "sync_schedule": "every 1h"}] + repo = _FakeSyncStateRepo({"fast": _utc(2026, 5, 1, 8, 30)}) + out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0)) + assert [c["id"] for c in out] == ["fast"] + + +def test_filter_due_tables_keeps_never_synced_table(): + """No last_sync row → always due (matches is_table_due semantics).""" + configs = [{"id": "new", "name": "new", "sync_schedule": "every 1h"}] + repo = _FakeSyncStateRepo({}) # no entry at all + out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0)) + assert [c["id"] for c in out] == ["new"] + + +def test_filter_due_tables_treats_invalid_schedule_as_unscheduled(): + """Garbled sync_schedule: log + always sync (don't silently skip).""" + configs = [{"id": "bad", "name": "bad", "sync_schedule": "BOGUS"}] + repo = _FakeSyncStateRepo({"bad": _utc(2026, 5, 1, 9, 59)}) + out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0)) + assert [c["id"] for c in out] == ["bad"] + + +def test_filter_due_tables_mixed_due_and_skipped(): + configs = [ + {"id": "due", "name": "due", "sync_schedule": "every 30m"}, + {"id": "skipped", "name": "skipped", "sync_schedule": "every 30m"}, + {"id": "always", "name": "always", "sync_schedule": None}, + ] + repo = _FakeSyncStateRepo({ + "due": _utc(2026, 5, 1, 9, 0), # 60m ago → due + "skipped": _utc(2026, 5, 1, 9, 50), # 10m ago → skip + }) + out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0)) + assert sorted(c["id"] for c in out) == ["always", "due"] + + +def test_filter_due_tables_handles_naive_last_sync(): + """SyncStateRepository can return naive datetimes from older rows; helper + must coerce to UTC instead of crashing on tz-aware vs naive comparison.""" + configs = [{"id": "old", "name": "old", "sync_schedule": "every 1h"}] + naive_2h_ago = datetime(2026, 5, 1, 8, 0) # no tzinfo + repo = _FakeSyncStateRepo({"old": naive_2h_ago}) + out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0)) + assert [c["id"] for c in out] == ["old"] +``` + +- [ ] **Step 1.2: Run tests — expect ImportError** + +```bash +pytest tests/test_sync_filter.py -v 2>&1 | tail -10 +``` + +Expected: ImportError on `from src.scheduler import filter_due_tables, is_valid_schedule` — those symbols don't exist yet. + +- [ ] **Step 1.3: Implement `is_valid_schedule` and `filter_due_tables`** + +Append to `src/scheduler.py` (after `_parse_timestamp`): + +```python +def is_valid_schedule(schedule: Optional[str]) -> bool: + """Return True iff ``schedule`` parses as a documented schedule string. + + Accepted forms (mirroring the rest of this module): + - ``"every Nm"`` / ``"every Nh"`` with N a positive integer + - ``"daily HH:MM"`` (24-h, UTC) optionally comma-separated: + ``"daily 07:00,13:00"`` + + Anything else — including ``None``, empty string, or a parseable-looking + but out-of-range value (``"daily 25:00"``) — returns False. Pydantic + validators on the admin API call this to reject malformed input with + 422 instead of accepting it and silently no-op'ing later. + """ + if not schedule or not isinstance(schedule, str): + return False + interval = parse_interval_minutes(schedule) + if interval is not None: + return interval > 0 + match = DAILY_PATTERN.match(schedule) + if not match: + return False + return bool(_parse_daily_times(match.group(1))) + + +def filter_due_tables( + table_configs: list[dict], + sync_state_repo, + now: Optional[datetime] = None, +) -> list[dict]: + """Drop table configs whose ``sync_schedule`` says they are not due. + + Behaviour: + - ``sync_schedule`` is None / empty / not a valid string → table passes + through (no schedule = "sync on every tick", existing behaviour). + - Valid schedule + last_sync within the cadence → drop. + - Valid schedule + last_sync past cadence (or never) → keep. + - Invalid schedule string → log a warning and let the table through + (do NOT silently skip — operator surprise is worse than a redundant + sync). + + ``sync_state_repo`` is duck-typed: only ``get_last_sync(table_id)`` is + called, returning a ``datetime`` (tz-aware preferred, naive treated as + UTC) or ``None``. + """ + if now is None: + now = datetime.now(timezone.utc) + out: list[dict] = [] + for tc in table_configs: + schedule = tc.get("sync_schedule") + if not schedule: + out.append(tc) + continue + if not is_valid_schedule(schedule): + logger.warning( + "Table %s has malformed sync_schedule %r — syncing anyway " + "(fix the schedule string to suppress this message)", + tc.get("id") or tc.get("name"), + schedule, + ) + out.append(tc) + continue + last_sync = sync_state_repo.get_last_sync(tc.get("id") or tc.get("name")) + last_sync_iso: Optional[str] + if last_sync is None: + last_sync_iso = None + else: + if last_sync.tzinfo is None: + last_sync = last_sync.replace(tzinfo=timezone.utc) + last_sync_iso = last_sync.isoformat() + if is_table_due(schedule, last_sync_iso, now=now): + out.append(tc) + else: + logger.info( + "Table %s skipped: schedule=%r, last_sync=%s, not due yet", + tc.get("id") or tc.get("name"), + schedule, + last_sync_iso, + ) + return out +``` + +- [ ] **Step 1.4: Run tests — expect green** + +```bash +pytest tests/test_sync_filter.py -v 2>&1 | tail -25 +``` + +Expected: all green (parametrized cases included). + +- [ ] **Step 1.5: Commit** + +```bash +git add src/scheduler.py tests/test_sync_filter.py +git commit -m "feat(scheduler): add is_valid_schedule + filter_due_tables helpers (#79)" +``` + +--- + +## Task 2: Wire `filter_due_tables` into `_run_sync` + +**Files:** +- Modify: `app/api/sync.py:37-217` (the `_run_sync` function) +- Test: extend `tests/test_sync_filter.py` with one integration-style test that exercises `_run_sync`'s filter call site (mocking subprocess + orchestrator). + +**Why now:** With the helper green, the wiring is a 4-line change. Test stubs out the heavy machinery (subprocess, orchestrator) and asserts only the filter is invoked correctly. + +- [ ] **Step 2.1: Add the integration test** + +Append to `tests/test_sync_filter.py`: + +```python +# ---------------- _run_sync wiring ------------------------------------------ + +def test_run_sync_filters_local_tables_by_schedule(monkeypatch, tmp_path): + """`_run_sync(tables=None)` consults `filter_due_tables` and skips + tables that are not due. Manual override (`tables=[...]`) bypasses + the filter entirely.""" + from app.api import sync as sync_module + + # Stub get_data_source_type → 'keboola' so the keboola subprocess code + # path is taken (also matches the existing _run_sync shape). + monkeypatch.setattr( + sync_module, "_get_data_dir", lambda: tmp_path, + ) + import app.instance_config as instance_config + monkeypatch.setattr(instance_config, "get_data_source_type", lambda: "keboola") + + # Fake registry with one due + one skipped table. + fake_configs = [ + {"id": "due", "name": "due", "source_type": "keboola", + "sync_schedule": "every 30m", "query_mode": "local"}, + {"id": "skipped", "name": "skipped", "source_type": "keboola", + "sync_schedule": "every 30m", "query_mode": "local"}, + ] + + class _StubRegistry: + def __init__(self, conn): pass + def list_local(self, source_type=None): return list(fake_configs) + def get(self, table_id): + return next((c for c in fake_configs if c["id"] == table_id), None) + + monkeypatch.setattr( + "src.repositories.table_registry.TableRegistryRepository", + _StubRegistry, + ) + + # Fake sync_state: 'due' last synced 60m ago, 'skipped' 10m ago. + from datetime import datetime, timezone + last_syncs = { + "due": datetime(2026, 5, 1, 9, 0, tzinfo=timezone.utc), + "skipped": datetime(2026, 5, 1, 9, 50, tzinfo=timezone.utc), + } + + class _StubState: + def __init__(self, conn): pass + def get_last_sync(self, table_id): return last_syncs.get(table_id) + + monkeypatch.setattr( + "src.repositories.sync_state.SyncStateRepository", + _StubState, + ) + + # Freeze 'now' inside src.scheduler.filter_due_tables. We do this by + # monkeypatching filter_due_tables itself to inject `now=`. + from src import scheduler as _sched + real_filter = _sched.filter_due_tables + monkeypatch.setattr( + sync_module, "filter_due_tables", + lambda cfgs, repo: real_filter( + cfgs, repo, now=datetime(2026, 5, 1, 10, 0, tzinfo=timezone.utc), + ), + ) + + # Capture the configs that subprocess.run sees (via stdin payload). + captured = {} + + def _fake_run(cmd, input, capture_output, text, timeout, env, cwd): + import json as _json + captured["configs"] = _json.loads(input) + class _R: + returncode = 0 + stdout = "{}" + stderr = "" + return _R() + + monkeypatch.setattr(sync_module.subprocess, "run", _fake_run) + + # Stub orchestrator + profiler imports inside the function so we don't + # require a real DuckDB analytics file. + import src.orchestrator as _orch_mod + + class _StubOrch: + def rebuild(self): return {} + + monkeypatch.setattr(_orch_mod, "SyncOrchestrator", _StubOrch) + + # Run with tables=None → filter applies → only 'due' goes to subprocess. + sync_module._run_sync(tables=None) + assert [c["id"] for c in captured["configs"]] == ["due"] + + # Run with explicit override → filter is BYPASSED → both go through. + captured.clear() + sync_module._run_sync(tables=["due", "skipped"]) + assert sorted(c["id"] for c in captured["configs"]) == ["due", "skipped"] +``` + +- [ ] **Step 2.2: Run the test — expect FAIL** + +```bash +pytest tests/test_sync_filter.py::test_run_sync_filters_local_tables_by_schedule -v 2>&1 | tail -20 +``` + +Expected: AssertionError — `captured["configs"]` contains both tables in the first assertion (filter not yet wired in). + +- [ ] **Step 2.3: Wire `filter_due_tables` into `_run_sync`** + +In `app/api/sync.py`, add the import near the top of `_run_sync` (line 50ish): + +```python +from src.scheduler import filter_due_tables +from src.repositories.sync_state import SyncStateRepository +``` + +Replace lines 56-66 (the registry-read block) with: + +```python + # Read table configs in main process (has shared DuckDB connection) + sys_conn = get_system_db() + try: + repo = TableRegistryRepository(sys_conn) + if tables: + # Manual operator override — bypass schedule filter entirely + # so an admin saying "sync these specific tables now" wins. + all_configs = [repo.get(t) for t in tables] + table_configs = [c for c in all_configs if c is not None] + else: + table_configs = repo.list_local(source_type) if source_type else repo.list_local() + # #79: drop tables whose sync_schedule says they are not due. + # Tables without a schedule pass through (opt-in feature). + state_repo = SyncStateRepository(sys_conn) + table_configs = filter_due_tables(table_configs, state_repo) + finally: + sys_conn.close() +``` + +(Leave the auto-discovery block immediately after unchanged; it only fires when `table_configs` is empty after filtering, which is consistent with prior semantics.) + +- [ ] **Step 2.4: Run the wiring test + the existing sync test — expect green** + +```bash +pytest tests/test_sync_filter.py tests/test_sync_manifest.py -v 2>&1 | tail -25 +``` + +Expected: green. The manifest test exercises a different code path; if it regresses, the import probably broke something — re-verify the import block. + +- [ ] **Step 2.5: Commit** + +```bash +git add app/api/sync.py tests/test_sync_filter.py +git commit -m "feat(sync): honor table_registry.sync_schedule at trigger time (#79)" +``` + +--- + +## Task 3: Pydantic format validators for `sync_schedule` + +**Files:** +- Modify: `app/api/admin.py` — add `field_validator` on `RegisterTableRequest.sync_schedule` and `UpdateTableRequest.sync_schedule`. +- Test: extend `tests/test_admin_bq_register.py` (or a sibling, depending on what the codebase calls the admin-register test file). + +**Why:** Once #79 honours the field, malformed values become operator-visible bugs ("I set `sync_schedule='hourly'` but it never skips"). Reject at register/update time with a clear 422. + +- [ ] **Step 3.1: Locate the right test file** + +```bash +grep -l "RegisterTableRequest\|/register-table" tests/ -r 2>/dev/null | head -3 +``` + +Use whatever file matches. Plan continues assuming `tests/test_admin_bq_register.py` exists (per the file listing in Task 0); adapt path if not. + +- [ ] **Step 3.2: Write the failing tests** + +Append to `tests/test_admin_bq_register.py`: + +```python +# --- sync_schedule format validation (#79) ---------------------------------- + +import pytest +from pydantic import ValidationError + +from app.api.admin import RegisterTableRequest, UpdateTableRequest + + +@pytest.mark.parametrize("schedule", [ + "every 15m", + "every 1h", + "daily 05:00", + "daily 07:00,13:00,18:00", + None, # explicit None is allowed (no schedule = always sync) +]) +def test_register_request_accepts_valid_sync_schedule(schedule): + req = RegisterTableRequest(name="orders", sync_schedule=schedule) + assert req.sync_schedule == schedule + + +@pytest.mark.parametrize("schedule", [ + "hourly", + "every 0m", + "daily 25:00", + "every 5x", + " ", +]) +def test_register_request_rejects_malformed_sync_schedule(schedule): + with pytest.raises(ValidationError) as exc_info: + RegisterTableRequest(name="orders", sync_schedule=schedule) + assert "sync_schedule" in str(exc_info.value) + + +@pytest.mark.parametrize("schedule", [ + "every 30m", + "daily 08:00", + None, +]) +def test_update_request_accepts_valid_sync_schedule(schedule): + req = UpdateTableRequest(sync_schedule=schedule) + assert req.sync_schedule == schedule + + +def test_update_request_rejects_malformed_sync_schedule(): + with pytest.raises(ValidationError): + UpdateTableRequest(sync_schedule="weekly") +``` + +- [ ] **Step 3.3: Run — expect FAIL** + +```bash +pytest tests/test_admin_bq_register.py -v -k sync_schedule 2>&1 | tail -20 +``` + +Expected: failures because malformed strings are accepted today. + +- [ ] **Step 3.4: Add the validators to `app/api/admin.py`** + +In `app/api/admin.py`, add the import near the top (next to other `src` imports, around line 27): + +```python +from src.scheduler import is_valid_schedule +``` + +In the `RegisterTableRequest` class (line 644), add this validator alongside the existing ones: + +```python + @field_validator("sync_schedule", mode="before") + @classmethod + def _validate_sync_schedule(cls, v): + # None / "" / pure-whitespace → no schedule, accepted. + # Any non-empty string must parse — otherwise it would be persisted + # and silently ignored by the runtime evaluator. + if v in (None, ""): + return v + if isinstance(v, str) and not v.strip(): + return None + if not is_valid_schedule(v): + raise ValueError( + f"sync_schedule must be 'every Nm' / 'every Nh' / " + f"'daily HH:MM[,HH:MM,...]', got {v!r}" + ) + return v +``` + +In the `UpdateTableRequest` class (line 780), add the same validator. (Duplication is intentional — the two models have separate field declarations and Pydantic v2 validators don't inherit cleanly across unrelated `BaseModel` classes. DRY-ing into a mixin is overkill for two fields.) + +- [ ] **Step 3.5: Run — expect green** + +```bash +pytest tests/test_admin_bq_register.py -v -k sync_schedule 2>&1 | tail -20 +``` + +Expected: green. + +- [ ] **Step 3.6: Commit** + +```bash +git add app/api/admin.py tests/test_admin_bq_register.py +git commit -m "feat(admin): validate sync_schedule format on register/update (#79)" +``` + +--- + +## Task 4: Extend `ScriptRepository` with `claim_for_run` and `record_run_result` + +**Files:** +- Modify: `src/repositories/notifications.py` — add two methods to `ScriptRepository`. +- Create: `tests/test_run_due_scripts.py` (will grow across Tasks 4–6). + +**Why:** Concurrency is "skip if running". `claim_for_run` is the atomic UPDATE that flips a script from idle → `running` and returns whether the caller actually owns the slot. `record_run_result` writes the post-execution status. + +- [ ] **Step 4.1: Write failing tests** + +Create `tests/test_run_due_scripts.py`: + +```python +"""Tests for the scheduled-script runner — repo claim/release primitives, +the run-due endpoint, and Pydantic validation on DeployScriptRequest.""" + +from datetime import datetime, timezone + +import pytest + +from src.db import get_system_db +from src.repositories.notifications import ScriptRepository + + +@pytest.fixture() +def conn(tmp_path, monkeypatch): + """Fresh system.duckdb in a tmp dir — uses real schema, no mocks.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + state_dir = tmp_path / "state" + state_dir.mkdir(parents=True, exist_ok=True) + c = get_system_db() + yield c + c.close() + + +def _deploy(repo: ScriptRepository, script_id="s1", schedule="every 1h"): + repo.deploy(id=script_id, name=script_id, owner="u1", + schedule=schedule, source="print('hi')") + + +# ---------------- claim_for_run --------------------------------------------- + +def test_claim_for_run_succeeds_when_idle(conn): + repo = ScriptRepository(conn) + _deploy(repo) + assert repo.claim_for_run("s1") is True + row = repo.get("s1") + assert row["last_status"] == "running" + assert row["last_run"] is not None + + +def test_claim_for_run_fails_when_already_running(conn): + repo = ScriptRepository(conn) + _deploy(repo) + assert repo.claim_for_run("s1") is True + # Second claim should fail because last_status is still 'running'. + assert repo.claim_for_run("s1") is False + + +def test_claim_for_run_succeeds_after_completion(conn): + repo = ScriptRepository(conn) + _deploy(repo) + repo.claim_for_run("s1") + repo.record_run_result("s1", status="success") + # Now claimable again. + assert repo.claim_for_run("s1") is True + + +def test_claim_for_run_returns_false_for_unknown_script(conn): + repo = ScriptRepository(conn) + assert repo.claim_for_run("does-not-exist") is False + + +# ---------------- record_run_result ----------------------------------------- + +@pytest.mark.parametrize("status", ["success", "failure"]) +def test_record_run_result_writes_terminal_status(conn, status): + repo = ScriptRepository(conn) + _deploy(repo) + repo.claim_for_run("s1") + repo.record_run_result("s1", status=status) + row = repo.get("s1") + assert row["last_status"] == status + + +def test_record_run_result_rejects_running_as_terminal(conn): + """The 'running' string is reserved for claim_for_run; record_run_result + must reject it so a caller can't accidentally re-arm the running flag + instead of clearing it.""" + repo = ScriptRepository(conn) + _deploy(repo) + repo.claim_for_run("s1") + with pytest.raises(ValueError): + repo.record_run_result("s1", status="running") +``` + +- [ ] **Step 4.2: Run — expect FAIL** + +```bash +pytest tests/test_run_due_scripts.py -v 2>&1 | tail -25 +``` + +Expected: AttributeError on `claim_for_run` / `record_run_result`. + +- [ ] **Step 4.3: Add the methods to `ScriptRepository`** + +In `src/repositories/notifications.py`, after the existing `list_all` method (around line 105), add: + +```python + def claim_for_run(self, script_id: str) -> bool: + """Atomically set last_status='running' iff the script is idle. + + Returns True iff this caller is the new owner of the run slot. + Returns False if the script does not exist OR is already running. + + Implementation: UPDATE … WHERE last_status IS DISTINCT FROM 'running' + + RETURNING id. DuckDB supports IS DISTINCT FROM and RETURNING; if + zero rows come back, somebody else already owns the slot. + """ + now = datetime.now(timezone.utc) + result = self.conn.execute( + """UPDATE script_registry + SET last_status = 'running', last_run = ? + WHERE id = ? + AND (last_status IS NULL OR last_status != 'running') + RETURNING id""", + [now, script_id], + ).fetchone() + return result is not None + + def record_run_result(self, script_id: str, status: str) -> None: + """Write the terminal status of a finished run (clears 'running'). + + Accepts only 'success' or 'failure' — 'running' would re-arm the + flag instead of clearing it, defeating the purpose of the call. + """ + if status not in ("success", "failure"): + raise ValueError( + f"record_run_result: status must be 'success' or 'failure', " + f"got {status!r}" + ) + self.conn.execute( + "UPDATE script_registry SET last_status = ? WHERE id = ?", + [status, script_id], + ) +``` + +- [ ] **Step 4.4: Run — expect green** + +```bash +pytest tests/test_run_due_scripts.py -v 2>&1 | tail -25 +``` + +Expected: green. + +- [ ] **Step 4.5: Commit** + +```bash +git add src/repositories/notifications.py tests/test_run_due_scripts.py +git commit -m "feat(scripts): add claim_for_run + record_run_result to ScriptRepository (#78)" +``` + +--- + +## Task 5: `POST /api/scripts/run-due` endpoint + +**Files:** +- Modify: `app/api/scripts.py` — new endpoint + Pydantic validator on `DeployScriptRequest.schedule`. +- Test: extend `tests/test_run_due_scripts.py`. + +**Why:** This is the API surface the sidecar fires on every tick. It iterates `script_registry`, claims each due script, and queues execution in a `BackgroundTask` so the response returns immediately — the sidecar doesn't block on long-running scripts. + +- [ ] **Step 5.1: Write failing tests** + +Append to `tests/test_run_due_scripts.py`: + +```python +# ---------------- DeployScriptRequest.schedule validation ------------------- + +from pydantic import ValidationError + +from app.api.scripts import DeployScriptRequest + + +def test_deploy_request_accepts_valid_schedule(): + req = DeployScriptRequest(name="report", source="print(1)", schedule="every 1h") + assert req.schedule == "every 1h" + + +def test_deploy_request_accepts_no_schedule(): + req = DeployScriptRequest(name="report", source="print(1)") + assert req.schedule is None + + +def test_deploy_request_rejects_malformed_schedule(): + with pytest.raises(ValidationError): + DeployScriptRequest(name="report", source="print(1)", schedule="weekly") + + +# ---------------- /api/scripts/run-due endpoint ----------------------------- + +from fastapi.testclient import TestClient + +# Helper: mint a TestClient with admin auth bypass. The codebase uses +# `LOCAL_DEV_MODE=1` to short-circuit auth in tests; mirror existing test +# files (tests/test_scripts_api.py) for the canonical pattern. + +@pytest.fixture() +def client(monkeypatch, tmp_path): + monkeypatch.setenv("LOCAL_DEV_MODE", "1") + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + (tmp_path / "state").mkdir(parents=True, exist_ok=True) + from app.main import app + return TestClient(app) + + +def test_run_due_skips_scripts_without_schedule(client, monkeypatch): + """A script with schedule=NULL is never picked up by run-due (those + are run only via explicit POST /api/scripts/{id}/run).""" + monkeypatch.setattr( + "app.api.scripts._execute_script", + lambda src, name: {"name": name, "exit_code": 0, "stdout": "", "stderr": "", "truncated": False}, + ) + deploy = client.post( + "/api/scripts/deploy", + json={"name": "manual-only", "source": "print(1)"}, + ) + assert deploy.status_code == 201 + resp = client.post("/api/scripts/run-due") + assert resp.status_code == 200 + assert resp.json()["claimed"] == [] + + +def test_run_due_claims_due_scripts(client, monkeypatch): + """A script on 'every 1h' that has never run gets claimed and executed.""" + calls = [] + def _fake_exec(source, name): + calls.append(name) + return {"name": name, "exit_code": 0, "stdout": "", "stderr": "", "truncated": False} + monkeypatch.setattr("app.api.scripts._execute_script", _fake_exec) + deploy = client.post( + "/api/scripts/deploy", + json={"name": "report", "source": "print(1)", "schedule": "every 1h"}, + ) + script_id = deploy.json()["id"] + resp = client.post("/api/scripts/run-due") + assert resp.status_code == 200 + body = resp.json() + assert body["claimed"] == [script_id] + # BackgroundTasks runs synchronously inside TestClient, so the call + # has happened by now. + assert "report" in calls + + +def test_run_due_skips_scripts_already_running(client, monkeypatch): + """A script in 'running' state must not be re-claimed by a second + sidecar tick that arrives while the previous run is still going.""" + monkeypatch.setattr( + "app.api.scripts._execute_script", + # Simulate a slow run by NOT updating last_status — repo.claim_for_run + # already wrote 'running'; we leave it that way. + lambda src, name: {"name": name, "exit_code": 0, "stdout": "", "stderr": "", "truncated": False}, + ) + # Patch out record_run_result so the run never "completes". + monkeypatch.setattr( + "src.repositories.notifications.ScriptRepository.record_run_result", + lambda self, *a, **kw: None, + ) + deploy = client.post( + "/api/scripts/deploy", + json={"name": "long", "source": "print(1)", "schedule": "every 1h"}, + ) + script_id = deploy.json()["id"] + first = client.post("/api/scripts/run-due") + assert first.json()["claimed"] == [script_id] + second = client.post("/api/scripts/run-due") + assert second.json()["claimed"] == [] +``` + +- [ ] **Step 5.2: Run — expect FAIL** + +```bash +pytest tests/test_run_due_scripts.py -v -k "deploy_request or run_due" 2>&1 | tail -25 +``` + +Expected: ValidationError on the validator tests; 404/405 on the endpoint tests (route doesn't exist). + +- [ ] **Step 5.3: Add the validator and endpoint** + +In `app/api/scripts.py`, near the top: + +```python +from datetime import datetime, timezone +from fastapi import BackgroundTasks +from pydantic import field_validator + +from src.scheduler import is_valid_schedule, is_table_due +``` + +Replace the existing `DeployScriptRequest` (lines 24-27) with: + +```python +class DeployScriptRequest(BaseModel): + name: str + source: str + schedule: Optional[str] = None + + @field_validator("schedule", mode="before") + @classmethod + def _validate_schedule(cls, v): + if v in (None, ""): + return None + if isinstance(v, str) and not v.strip(): + return None + if not is_valid_schedule(v): + raise ValueError( + f"schedule must be 'every Nm' / 'every Nh' / " + f"'daily HH:MM[,HH:MM,...]', got {v!r}" + ) + return v +``` + +Add the endpoint at the end of the route definitions (after `undeploy_script`): + +```python +@router.post("/run-due") +async def run_due_scripts( + background_tasks: BackgroundTasks, + user: dict = Depends(require_admin), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Run every deployed script whose ``schedule`` says it is due. + + Iterates ``script_registry``, skips rows without a schedule (those run + only via explicit POST /{id}/run), evaluates ``is_table_due(schedule, + last_run)``, and atomically claims each due row via + ``ScriptRepository.claim_for_run``. Execution is queued as a + ``BackgroundTask`` so the response returns immediately — the sidecar + must not block waiting on a long-running script. + + Concurrency: ``claim_for_run`` flips ``last_status`` to ``'running'`` + inside the same UPDATE; a script already in that state is skipped on + subsequent ticks until the BackgroundTask writes a terminal status via + ``record_run_result``. There is no max-runtime detection in this PR — + if a BackgroundTask crashes without writing a terminal status, the + script stays stuck in ``'running'`` until an operator clears it + manually (``UPDATE script_registry SET last_status = NULL WHERE id = + ?``). Documenting this as an accepted v0 limitation; revisit if it + bites in practice. + """ + repo = ScriptRepository(conn) + claimed: list[str] = [] + for script in repo.list_all(): + schedule = script.get("schedule") + if not schedule: + continue + last_run = script.get("last_run") + last_run_iso = last_run.isoformat() if last_run else None + if not is_table_due(schedule, last_run_iso): + continue + if not repo.claim_for_run(script["id"]): + # Lost the race / already running — next tick will retry. + continue + claimed.append(script["id"]) + background_tasks.add_task( + _run_claimed_script, + script_id=script["id"], + source=script["source"], + name=script["name"], + ) + return {"claimed": claimed, "count": len(claimed)} + + +def _run_claimed_script(script_id: str, source: str, name: str) -> None: + """Execute a previously-claimed script and write the terminal status. + + Runs in a FastAPI BackgroundTask, so it owns its own DB connection + (the request-scoped conn is already gone by the time this fires). + Any exception writes 'failure' and re-raises so the BG handler still + surfaces the traceback in logs. + """ + from src.db import get_system_db + bg_conn = get_system_db() + try: + bg_repo = ScriptRepository(bg_conn) + try: + _execute_script(source, name) + bg_repo.record_run_result(script_id, status="success") + except Exception: + bg_repo.record_run_result(script_id, status="failure") + raise + finally: + bg_conn.close() +``` + +- [ ] **Step 5.4: Run — expect green** + +```bash +pytest tests/test_run_due_scripts.py -v 2>&1 | tail -25 +``` + +Expected: green. If the LOCAL_DEV_MODE auth bypass test fixture doesn't quite work in your repo, mirror whatever pattern `tests/test_scripts_api.py` uses for the same client. + +- [ ] **Step 5.5: Commit** + +```bash +git add app/api/scripts.py tests/test_run_due_scripts.py +git commit -m "feat(scripts): POST /api/scripts/run-due + format validator (#78)" +``` + +--- + +## Task 6: Env-driven sidecar JOBS + add `script-runner` job + +**Files:** +- Modify: `services/scheduler/__main__.py` — replace hardcoded `JOBS` list with an env-driven builder; add 4th job for scripts. +- Create: `tests/test_scheduler_sidecar.py` — small unit tests on the new builder. + +**Why now:** With the API surface in place (Tasks 2 + 5), the sidecar is the operational glue. Refactor to env-driven config (#77) and add the script tick (#78) in one pass — the touched lines overlap. + +- [ ] **Step 6.1: Write failing tests** + +Create `tests/test_scheduler_sidecar.py`: + +```python +"""Unit tests for the env-driven JOBS builder in services.scheduler.""" + +import pytest + + +def test_build_jobs_uses_documented_defaults(monkeypatch): + """No env overrides → default cadences.""" + for v in ( + "SCHEDULER_DATA_REFRESH_INTERVAL", + "SCHEDULER_HEALTH_CHECK_INTERVAL", + "SCHEDULER_TICK_SECONDS", + "SCHEDULER_SCRIPT_RUN_INTERVAL", + ): + monkeypatch.delenv(v, raising=False) + from services.scheduler.__main__ import build_jobs, resolved_tick_seconds + jobs = {name: schedule for name, schedule, *_ in build_jobs()} + assert jobs["data-refresh"] == "every 15m" + assert jobs["health-check"] == "every 5m" + assert jobs["script-runner"] == "every 1m" + assert jobs["marketplaces"] == "daily 03:00" + assert resolved_tick_seconds() == 30 + + +def test_build_jobs_honors_env_overrides(monkeypatch): + monkeypatch.setenv("SCHEDULER_DATA_REFRESH_INTERVAL", "1800") # 30m + monkeypatch.setenv("SCHEDULER_HEALTH_CHECK_INTERVAL", "60") # 1m + monkeypatch.setenv("SCHEDULER_SCRIPT_RUN_INTERVAL", "120") # 2m + monkeypatch.setenv("SCHEDULER_TICK_SECONDS", "10") + from services.scheduler.__main__ import build_jobs, resolved_tick_seconds + jobs = {name: schedule for name, schedule, *_ in build_jobs()} + assert jobs["data-refresh"] == "every 30m" + assert jobs["health-check"] == "every 1m" + assert jobs["script-runner"] == "every 2m" + assert resolved_tick_seconds() == 10 + + +@pytest.mark.parametrize("var", [ + "SCHEDULER_DATA_REFRESH_INTERVAL", + "SCHEDULER_HEALTH_CHECK_INTERVAL", + "SCHEDULER_TICK_SECONDS", + "SCHEDULER_SCRIPT_RUN_INTERVAL", +]) +@pytest.mark.parametrize("bad", ["0", "-5", "abc", ""]) +def test_build_jobs_rejects_invalid_env(monkeypatch, var, bad): + monkeypatch.setenv(var, bad) + from services.scheduler.__main__ import build_jobs + with pytest.raises(ValueError): + build_jobs() + + +def test_build_jobs_rejects_tick_larger_than_smallest_interval(monkeypatch): + """Tick must be <= the smallest job interval, otherwise jobs would + consistently miss their cadence by up to one tick.""" + monkeypatch.setenv("SCHEDULER_HEALTH_CHECK_INTERVAL", "60") + monkeypatch.setenv("SCHEDULER_TICK_SECONDS", "120") + from services.scheduler.__main__ import build_jobs + with pytest.raises(ValueError, match="tick"): + build_jobs() + + +def test_build_jobs_includes_run_due_endpoint(): + """The script-runner job must POST to /api/scripts/run-due.""" + from services.scheduler.__main__ import build_jobs + target = next(j for j in build_jobs() if j[0] == "script-runner") + name, schedule, endpoint, method, _timeout = target + assert endpoint == "/api/scripts/run-due" + assert method == "POST" +``` + +- [ ] **Step 6.2: Run — expect FAIL** + +```bash +pytest tests/test_scheduler_sidecar.py -v 2>&1 | tail -20 +``` + +Expected: ImportError on `build_jobs` / `resolved_tick_seconds`; or KeyError on `script-runner` (current JOBS list doesn't have it). + +- [ ] **Step 6.3: Refactor `services/scheduler/__main__.py`** + +Replace the hardcoded `JOBS` block (lines 72-89) and the `run()` function with the following. Keep everything above line 72 (imports, `_get_auth_token`, etc.) unchanged. + +```python +# --- Env parsing ------------------------------------------------------------ + +_DEFAULTS = { + "SCHEDULER_DATA_REFRESH_INTERVAL": 15 * 60, # seconds + "SCHEDULER_HEALTH_CHECK_INTERVAL": 5 * 60, + "SCHEDULER_SCRIPT_RUN_INTERVAL": 1 * 60, + "SCHEDULER_TICK_SECONDS": 30, +} + + +def _read_positive_int(name: str) -> int: + """Read an env var as a positive integer or fall back to the default.""" + raw = os.environ.get(name) + if raw is None or raw == "": + if name not in _DEFAULTS: + raise ValueError(f"Unknown scheduler env var: {name}") + return _DEFAULTS[name] + try: + value = int(raw) + except (TypeError, ValueError): + raise ValueError(f"{name}={raw!r} must be a positive integer (seconds)") + if value <= 0: + raise ValueError(f"{name}={value} must be > 0 (seconds)") + return value + + +def _seconds_to_schedule(seconds: int) -> str: + """Convert a seconds value to the closest 'every Nm' / 'every Nh' string.""" + if seconds % 3600 == 0 and seconds >= 3600: + return f"every {seconds // 3600}h" + minutes = max(1, seconds // 60) + return f"every {minutes}m" + + +def resolved_tick_seconds() -> int: + """Read + validate SCHEDULER_TICK_SECONDS in isolation (test helper).""" + return _read_positive_int("SCHEDULER_TICK_SECONDS") + + +def build_jobs() -> list[tuple[str, str, str, str, int]]: + """Build the JOBS list from env, applying defaults and validation. + + Tuple shape: (name, schedule_string, endpoint, method, http_timeout_sec). + Marketplaces stays hardcoded — promoting it to env is out of #77 scope. + """ + refresh = _read_positive_int("SCHEDULER_DATA_REFRESH_INTERVAL") + health = _read_positive_int("SCHEDULER_HEALTH_CHECK_INTERVAL") + scripts = _read_positive_int("SCHEDULER_SCRIPT_RUN_INTERVAL") + tick = _read_positive_int("SCHEDULER_TICK_SECONDS") + smallest = min(refresh, health, scripts) + if tick > smallest: + raise ValueError( + f"SCHEDULER_TICK_SECONDS={tick} must be <= the smallest job " + f"interval ({smallest}s) so jobs don't consistently miss their " + f"cadence by up to one tick" + ) + return [ + ("data-refresh", _seconds_to_schedule(refresh), "/api/sync/trigger", "POST", 120), + ("health-check", _seconds_to_schedule(health), "/api/health", "GET", 30), + ("script-runner", _seconds_to_schedule(scripts), "/api/scripts/run-due", "POST", 600), + ("marketplaces", "daily 03:00", "/api/marketplaces/sync-all", "POST", 900), + ] + + +_running = True + + +def _signal_handler(sig, frame): + global _running + logger.info(f"Received signal {sig}, shutting down...") + _running = False + + +def _call_api(endpoint: str, method: str, timeout_sec: int) -> bool: + """Call the main app API. Returns True on success.""" + url = f"{API_URL}{endpoint}" + headers = {} + token = _get_auth_token() + if token: + headers["Authorization"] = f"Bearer {token}" + try: + if method == "POST": + resp = httpx.post(url, headers=headers, timeout=timeout_sec) + else: + resp = httpx.get(url, headers=headers, timeout=timeout_sec) + if resp.status_code < 400: + logger.info(f"Job {endpoint}: {resp.status_code}") + return True + else: + logger.warning(f"Job {endpoint}: HTTP {resp.status_code} - {resp.text[:200]}") + return False + except Exception as e: + logger.error(f"Job {endpoint} failed: {e}") + return False + + +def run(): + signal.signal(signal.SIGTERM, _signal_handler) + signal.signal(signal.SIGINT, _signal_handler) + + jobs = build_jobs() + tick = resolved_tick_seconds() + logger.info( + "Scheduler started. API_URL=%s, %d jobs, tick=%ds. Schedules: %s", + API_URL, len(jobs), tick, + {name: schedule for name, schedule, *_ in jobs}, + ) + + last_run: dict[str, str | None] = {name: None for name, *_ in jobs} + + while _running: + now_iso = datetime.now(timezone.utc).isoformat() + for name, schedule, endpoint, method, timeout_sec in jobs: + if not is_table_due(schedule, last_run[name]): + continue + logger.info("Running job: %s (%s)", name, schedule) + ok = _call_api(endpoint, method, timeout_sec) + if ok: + last_run[name] = now_iso + time.sleep(tick) + + logger.info("Scheduler stopped.") + + +if __name__ == "__main__": + run() +``` + +(Delete the old `JOBS = [...]` literal and the old `run()` body — they're fully replaced.) + +- [ ] **Step 6.4: Run — expect green** + +```bash +pytest tests/test_scheduler_sidecar.py -v 2>&1 | tail -20 +``` + +Expected: green. + +- [ ] **Step 6.5: Commit** + +```bash +git add services/scheduler/__main__.py tests/test_scheduler_sidecar.py +git commit -m "feat(scheduler): env-driven JOBS + script-runner tick (#77, #78)" +``` + +--- + +## Task 7: OpenMetadata client — TLS verify by default + +**Files:** +- Modify: `connectors/openmetadata/client.py` +- Test: extend with a small tests file (or add to whatever existing test the connector has — search first). + +**Why:** `verify=False` ships JWT bearer tokens over an unauthenticated channel; the module-level `warnings.filterwarnings` mutates global state. Mirror the pattern in `connectors/llm/openai_compat.py` which already gets this right. + +- [ ] **Step 7.1: Locate existing OpenMetadata test file (if any)** + +```bash +ls tests/ | grep -i openmetadata +``` + +If empty, create `tests/test_openmetadata_client.py`. If a file exists, extend it. + +- [ ] **Step 7.2: Write failing tests** + +Create (or extend) `tests/test_openmetadata_client.py`: + +```python +"""Tests for OpenMetadataClient TLS handling — see #89. + +The previous version disabled TLS verification globally and suppressed the +"Unverified HTTPS request" warning at import time. Both behaviors are +fixed here. +""" + +import warnings +from unittest.mock import patch + + +def test_client_verifies_tls_by_default(): + from connectors.openmetadata.client import OpenMetadataClient + with patch("connectors.openmetadata.client.httpx.Client") as mock_client: + OpenMetadataClient(base_url="https://catalog.example.com", token="t") + kwargs = mock_client.call_args.kwargs + assert kwargs["verify"] is True + + +def test_client_accepts_explicit_verify_false(): + """Operators on internal CAs may opt out — but it must be explicit.""" + from connectors.openmetadata.client import OpenMetadataClient + with patch("connectors.openmetadata.client.httpx.Client") as mock_client: + OpenMetadataClient(base_url="https://catalog.example.com", token="t", verify=False) + assert mock_client.call_args.kwargs["verify"] is False + + +def test_client_accepts_custom_ca_bundle_path(): + from connectors.openmetadata.client import OpenMetadataClient + with patch("connectors.openmetadata.client.httpx.Client") as mock_client: + OpenMetadataClient( + base_url="https://catalog.example.com", + token="t", + verify="/etc/ssl/certs/internal-ca.pem", + ) + assert mock_client.call_args.kwargs["verify"] == "/etc/ssl/certs/internal-ca.pem" + + +def test_module_import_does_not_mutate_global_warnings_filter(): + """The previous version called warnings.filterwarnings('ignore', ...) + at import time, which suppresses urllib3 warnings for ALL httpx + clients in the process — not just OpenMetadata's. Drop it.""" + import importlib + pre_filters = list(warnings.filters) + import connectors.openmetadata.client as om + importlib.reload(om) + post_filters = list(warnings.filters) + # No new "ignore Unverified HTTPS request" filter should have been added. + new = [f for f in post_filters if f not in pre_filters] + for action, message, *_ in new: + if message is not None: + assert "Unverified HTTPS request" not in message.pattern +``` + +- [ ] **Step 7.3: Run — expect FAIL** + +```bash +pytest tests/test_openmetadata_client.py -v 2>&1 | tail -20 +``` + +Expected: failures — `verify=False` is hardcoded, and the module-level `warnings.filterwarnings` runs at import. + +- [ ] **Step 7.4: Fix the client** + +In `connectors/openmetadata/client.py`: + +Delete lines 14 (`import warnings`) and 18-19 (the `warnings.filterwarnings(...)` call and its comment). + +Replace the `__init__` signature (lines 34-59) with: + +```python + def __init__( + self, + base_url: str, + token: str, + timeout: int = 30, + verify: bool | str = True, + ): + """ + Initialize OpenMetadata API client. + + Args: + base_url: Base URL of OpenMetadata instance (e.g., "https://catalog.example.com") + token: JWT bearer token for authentication + timeout: HTTP request timeout in seconds + verify: TLS verification — True (default), False to disable + (e.g., for self-signed certificates on internal CAs), or a + path to a CA bundle. The previous version hardcoded False + globally and suppressed warnings — both removed in #89. + Operators with self-signed certs should pass an explicit + ``verify=False`` or a CA bundle path from their config. + """ + self.base_url = base_url.rstrip("/") + self.token = token + self.timeout = timeout + self._client = httpx.Client( + base_url=self.base_url, + headers={ + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + }, + timeout=timeout, + verify=verify, + ) +``` + +- [ ] **Step 7.5: Run — expect green** + +```bash +pytest tests/test_openmetadata_client.py -v 2>&1 | tail -20 +``` + +Expected: green. + +- [ ] **Step 7.6: Audit existing call sites** + +```bash +grep -rn "OpenMetadataClient(" --include="*.py" . +``` + +Any call site that previously relied on the implicit `verify=False` will now hit a TLS error if it talks to a self-signed instance. Update each call site to pass `verify=` explicitly from the config (e.g., reading `OPENMETADATA_VERIFY_SSL` from instance.yaml or env). If no internal config flag exists yet, add one to `instance.yaml.example` and surface it in `config/loader.py` so operators have a tuning knob. **List every changed call site in the commit message.** + +- [ ] **Step 7.7: Commit** + +```bash +git add connectors/openmetadata/client.py tests/test_openmetadata_client.py [...any caller files updated in 7.6] +git commit -m "fix(openmetadata): verify TLS by default; drop module-level warning filter (#89)" +``` + +--- + +## Task 8: Documentation + `.env.template` updates + +**Files:** +- Modify: `config/.env.template` +- Modify: `docs/DEPLOYMENT.md` + +**Why:** The new env vars are operator-facing surface — they need to be discoverable without spelunking source. + +- [ ] **Step 8.1: Add the env vars to `config/.env.template`** + +Append a new section to `config/.env.template`: + +```ini +# ── SCHEDULER (sidecar tuning) ────────────────────── +# All values are in seconds and must be positive integers. SCHEDULER_TICK_SECONDS +# must be <= the smallest job interval below. +# SCHEDULER_DATA_REFRESH_INTERVAL=900 # default 15 min — POST /api/sync/trigger +# SCHEDULER_HEALTH_CHECK_INTERVAL=300 # default 5 min — GET /api/health +# SCHEDULER_SCRIPT_RUN_INTERVAL=60 # default 1 min — POST /api/scripts/run-due +# SCHEDULER_TICK_SECONDS=30 # default 30 s — loop polling cadence +``` + +- [ ] **Step 8.2: Add a "Scheduler tuning" subsection to `docs/DEPLOYMENT.md`** + +Find the most appropriate location (probably near the existing TLS / Docker compose section) and insert: + +```markdown +### Scheduler tuning + +The scheduler sidecar (`services/scheduler/__main__.py`) fires periodic +HTTP calls against the main app. Job cadences are configurable via env +vars on the scheduler container: + +| Env var | Default | Purpose | +| ---------------------------------- | ------- | --------------------------------------------- | +| `SCHEDULER_DATA_REFRESH_INTERVAL` | `900` | seconds between `POST /api/sync/trigger` | +| `SCHEDULER_HEALTH_CHECK_INTERVAL` | `300` | seconds between `GET /api/health` | +| `SCHEDULER_SCRIPT_RUN_INTERVAL` | `60` | seconds between `POST /api/scripts/run-due` | +| `SCHEDULER_TICK_SECONDS` | `30` | loop polling cadence; must be ≤ smallest interval above | + +`/api/sync/trigger` walks `table_registry`; tables with a per-row +`sync_schedule` (`every Nm` / `every Nh` / `daily HH:MM[,...]`) are +filtered to only those due for sync since their last run. Tables without +a schedule continue to run on every tick. The marketplace job runs at +`daily 03:00` UTC and is not currently env-tunable. + +`/api/scripts/run-due` walks `script_registry` and runs each deployed +script whose `schedule` says it is due. Scripts in the `running` state +are skipped on subsequent ticks until the previous run writes a terminal +status. The endpoint requires admin auth (the sidecar's +`SCHEDULER_API_TOKEN` resolves to a synthetic Admin user). +``` + +- [ ] **Step 8.3: Commit** + +```bash +git add config/.env.template docs/DEPLOYMENT.md +git commit -m "docs: document scheduler env vars + per-table/script schedules (#77, #78, #79)" +``` + +--- + +## Task 9: CHANGELOG entries + release cut + +**Files:** +- Modify: `CHANGELOG.md` +- Modify: `pyproject.toml` + +**Why:** Per CLAUDE.md, every user-visible change requires a CHANGELOG entry in the same PR. This is one bundled PR covering four issues; release-cut goes here as the last commit (per user convention: release-cut belongs in the next behavior-change PR, not a standalone one). + +- [ ] **Step 9.1: Edit `CHANGELOG.md`** + +Replace the current top-of-file structure: + +```markdown +## [Unreleased] + +## [0.18.0] — 2026-04-29 +... +``` + +with: + +```markdown +## [Unreleased] + +## [0.19.0] — 2026-04-29 + +### Added +- `table_registry.sync_schedule` is now honored at runtime. `POST /api/sync/trigger` (called by the scheduler sidecar every 15 min by default) drops local tables whose schedule says they are not due. Tables without a schedule continue to sync on every tick (opt-in feature). Manual `POST /api/sync/trigger {"tables":[...]}` bypasses the schedule filter — operator override always wins. (#79) +- `script_registry.schedule` is now honored at runtime via the new endpoint `POST /api/scripts/run-due` (admin-only). The scheduler sidecar fires this every 60 s by default. Each due script is claimed atomically (`last_status='running'`), executed in a BackgroundTask, and the outcome written to `last_run` / `last_status`. Scripts already in `running` state are skipped — no concurrent runs of the same script. (#78) +- Four new env vars on the scheduler sidecar: `SCHEDULER_DATA_REFRESH_INTERVAL`, `SCHEDULER_HEALTH_CHECK_INTERVAL`, `SCHEDULER_SCRIPT_RUN_INTERVAL`, `SCHEDULER_TICK_SECONDS`. All accept positive integers (seconds); tick must be ≤ smallest job interval. Documented in `docs/DEPLOYMENT.md` → Scheduler tuning. (#77) +- `RegisterTableRequest.sync_schedule`, `UpdateTableRequest.sync_schedule`, and `DeployScriptRequest.schedule` now reject malformed strings with a Pydantic 422 (e.g. `"hourly"`, `"daily 25:00"`). The accepted forms are unchanged: `every Nm`, `every Nh`, `daily HH:MM[,HH:MM,...]`. (#78, #79) + +### Changed +- `OpenMetadataClient` now defaults to `verify=True` for TLS. The previous version hardcoded `verify=False` and suppressed urllib3's "Unverified HTTPS request" warning at import time (which leaked to every other httpx client in the process). Operators on internal CAs must pass `verify=False` or a CA bundle path explicitly. **Existing deployments on self-signed certificates without an explicit opt-out will start failing TLS verification — set `verify=False` at the call site, or supply a CA bundle, before upgrading.** (#89) + +### Internal +- `src/scheduler.py` now exports `is_valid_schedule(s)` and `filter_due_tables(configs, sync_state_repo)` for reuse across the sync filter, the script runner, and Pydantic validators. +- `ScriptRepository` gains `claim_for_run(script_id)` and `record_run_result(script_id, status)` — the atomic primitives for the scheduled-script execution path. +``` + +- [ ] **Step 9.2: Bump version** + +In `pyproject.toml`, change: + +```toml +version = "0.18.0" +``` + +to: + +```toml +version = "0.19.0" +``` + +- [ ] **Step 9.3: Commit + tag (tag pushed by maintainer post-merge)** + +```bash +git add CHANGELOG.md pyproject.toml +git commit -m "chore(release): cut 0.19.0 — scheduler re-wire + OpenMetadata TLS" +``` + +(Do NOT push a `v0.19.0` git tag from the worktree. Per the user's convention, the tag is created on the merge commit on `main` and a GitHub Release is opened to mirror it.) + +--- + +## Task 10: Final verification + +**Files:** none modified. + +- [ ] **Step 10.1: Run the full test suite** + +```bash +pytest tests/ -x 2>&1 | tail -40 +``` + +Expected: green. If any unrelated test fails, investigate before declaring done — possible interaction with the import-order changes in `app/api/sync.py` or the new `field_validator` in `app/api/admin.py`. + +- [ ] **Step 10.2: Smoke-test the import surface** + +```bash +python -c "from app.main import app; from services.scheduler.__main__ import build_jobs; print('jobs:', [j[0] for j in build_jobs()])" +``` + +Expected output: `jobs: ['data-refresh', 'health-check', 'script-runner', 'marketplaces']`. Any ImportError indicates a missing import added in this PR. + +- [ ] **Step 10.3: Open the PR** + +```bash +git push -u origin worktree-issues-68-77-78-79-89 +gh pr create --title "feat(scheduler): honor sync_schedule + script schedule; tune via env; OpenMetadata TLS" --body "$(cat <<'EOF' +## Summary + +Bundles four scheduler / security issues: + +- **#79** — `table_registry.sync_schedule` is now honored at runtime via an API-side filter inside `_run_sync()`. Tables without a schedule continue to sync on every tick; manual `POST /api/sync/trigger {"tables":[...]}` bypasses the filter. +- **#78** — New endpoint `POST /api/scripts/run-due` runs deployed scripts whose `schedule` says they are due. Atomic claim via `last_status='running'`; results written via BackgroundTask. +- **#77** — Sidecar JOBS list is now built from env (`SCHEDULER_*_INTERVAL`, `SCHEDULER_TICK_SECONDS`). Validation: positive ints, tick ≤ smallest interval. Adds a 4th `script-runner` job for #78. +- **#89** — `OpenMetadataClient` defaults to `verify=True`. Module-level `warnings.filterwarnings` removed. + +Issue **#68** is intentionally NOT in scope — the referenced Stop hook script does not live in this OSS repo as of HEAD; the issue needs clarification before implementation. + +## Test plan + +- [ ] `pytest tests/` passes +- [ ] Manual: register a table with `sync_schedule="every 1h"`, sync it, then trigger sync within the hour — confirm log line `Table X skipped: schedule=...` +- [ ] Manual: deploy a script with `schedule="every 1m"`, wait, confirm `last_run` and `last_status` populate +- [ ] Manual: set `SCHEDULER_TICK_SECONDS=99999` → scheduler container fails to start with the validation error +- [ ] Manual: any internal OpenMetadata caller now passes `verify=False` (or a CA bundle path) explicitly + +EOF +)" +``` + +--- + +## Self-review checklist (run before declaring plan-write done) + +- **Spec coverage:** #77 ✓ (Task 6), #78 ✓ (Tasks 4–6), #79 ✓ (Tasks 1–3), #89 ✓ (Task 7), #68 ✗ (intentionally out of scope, documented in plan header). All accepted. +- **Placeholder scan:** none of the "TBD / fill in / similar to" forbidden phrases. Code blocks present in every implementation step. +- **Type consistency:** `claim_for_run` / `record_run_result` referenced in Tasks 4 and 5 with matching signatures. `filter_due_tables` referenced in Tasks 1 and 2 with matching signature. `is_valid_schedule` referenced in Tasks 1, 3, 5 with consistent contract. `build_jobs` and `resolved_tick_seconds` defined and used in Task 6 only. +- **Schema migration:** no migration. Verified `table_registry.sync_schedule` and `script_registry.{schedule,last_run,last_status}` already exist in v17. diff --git a/pyproject.toml b/pyproject.toml index 4cd9931..5613e49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agnes-the-ai-analyst" -version = "0.18.0" +version = "0.19.0" description = "Agnes — AI Data Analyst platform for AI analytical systems" requires-python = ">=3.11,<3.14" license = "MIT" diff --git a/services/scheduler/__main__.py b/services/scheduler/__main__.py index 5822855..4f5a060 100644 --- a/services/scheduler/__main__.py +++ b/services/scheduler/__main__.py @@ -69,24 +69,83 @@ def _get_auth_token() -> str: return "" -# Schedule definitions: (name, schedule_string, endpoint, method, timeout_sec). -# All jobs are HTTP — see the module docstring for why nothing runs -# in-process anymore. ``daily 03:00`` for marketplaces matches the cadence -# the previous in-process job used; the endpoint is admin-only and -# idempotent (it iterates the registry and per-marketplace errors do not -# abort the run). -# -# timeout_sec: per-job override for the httpx call. Marketplaces gets a -# generous 15 min because the app handler iterates every registered -# marketplace under a single lock with up to 300s of git timeout per -# entry — at 120s (the default that data-refresh uses) a real-world -# registry of more than 2-3 slow repos times out the scheduler call, -# which then re-fires on the next 30s tick and queues a redundant sync. -JOBS = [ - ("data-refresh", "every 15m", "/api/sync/trigger", "POST", 120), - ("health-check", "every 5m", "/api/health", "GET", 30), - ("marketplaces", "daily 03:00", "/api/marketplaces/sync-all", "POST", 900), -] +# --- Env parsing ------------------------------------------------------------ + +_DEFAULTS = { + "SCHEDULER_DATA_REFRESH_INTERVAL": 15 * 60, # seconds + "SCHEDULER_HEALTH_CHECK_INTERVAL": 5 * 60, + "SCHEDULER_SCRIPT_RUN_INTERVAL": 1 * 60, + "SCHEDULER_TICK_SECONDS": 30, +} + + +def _read_positive_int(name: str) -> int: + """Read an env var as a positive integer or fall back to the default. + + Treats unset env (``None``) as "use default". Treats explicitly empty + string (``""``) as an operator typo and raises — silently defaulting + on a literal ``FOO=`` in the env_file would mask configuration bugs. + """ + raw = os.environ.get(name) + if raw is None: + if name not in _DEFAULTS: + raise ValueError(f"Unknown scheduler env var: {name}") + return _DEFAULTS[name] + if raw == "": + raise ValueError(f"{name}='' must be a positive integer (seconds)") + try: + value = int(raw) + except (TypeError, ValueError): + raise ValueError(f"{name}={raw!r} must be a positive integer (seconds)") + if value <= 0: + raise ValueError(f"{name}={value} must be > 0 (seconds)") + return value + + +def _seconds_to_schedule(seconds: int) -> str: + """Convert a seconds value to the closest 'every Nm' / 'every Nh' string. + + Uses ceiling division so a non-multiple-of-60 input never produces a + schedule that fires MORE often than the operator configured (90s → + 'every 2m', not 'every 1m'). Sub-minute inputs clamp to 'every 1m' + because the schedule grammar has minute-level resolution. + """ + if seconds % 3600 == 0 and seconds >= 3600: + return f"every {seconds // 3600}h" + # Ceiling division: -(-x // y) is the standard trick. + minutes = max(1, -(-seconds // 60)) + return f"every {minutes}m" + + +def resolved_tick_seconds() -> int: + """Read + validate SCHEDULER_TICK_SECONDS in isolation (test helper).""" + return _read_positive_int("SCHEDULER_TICK_SECONDS") + + +def build_jobs() -> list[tuple[str, str, str, str, int]]: + """Build the JOBS list from env, applying defaults and validation. + + Tuple shape: (name, schedule_string, endpoint, method, http_timeout_sec). + Marketplaces stays hardcoded — promoting it to env is out of #77 scope. + """ + refresh = _read_positive_int("SCHEDULER_DATA_REFRESH_INTERVAL") + health = _read_positive_int("SCHEDULER_HEALTH_CHECK_INTERVAL") + scripts = _read_positive_int("SCHEDULER_SCRIPT_RUN_INTERVAL") + tick = _read_positive_int("SCHEDULER_TICK_SECONDS") + smallest = min(refresh, health, scripts) + if tick > smallest: + raise ValueError( + f"SCHEDULER_TICK_SECONDS={tick} must be <= the smallest job " + f"interval ({smallest}s) so jobs don't consistently miss their " + f"cadence by up to one tick" + ) + return [ + ("data-refresh", _seconds_to_schedule(refresh), "/api/sync/trigger", "POST", 120), + ("health-check", _seconds_to_schedule(health), "/api/health", "GET", 30), + ("script-runner", _seconds_to_schedule(scripts), "/api/scripts/run-due", "POST", 600), + ("marketplaces", "daily 03:00", "/api/marketplaces/sync-all", "POST", 900), + ] + _running = True @@ -124,24 +183,26 @@ def run(): signal.signal(signal.SIGTERM, _signal_handler) signal.signal(signal.SIGINT, _signal_handler) - logger.info(f"Scheduler started. API_URL={API_URL}, {len(JOBS)} jobs configured.") + jobs = build_jobs() + tick = resolved_tick_seconds() + logger.info( + "Scheduler started. API_URL=%s, %d jobs, tick=%ds. Schedules: %s", + API_URL, len(jobs), tick, + {name: schedule for name, schedule, *_ in jobs}, + ) - # Track last successful run per job as ISO string — matches what - # src.scheduler.is_table_due expects. - last_run: dict[str, str | None] = {name: None for name, *_ in JOBS} + last_run: dict[str, str | None] = {name: None for name, *_ in jobs} while _running: now_iso = datetime.now(timezone.utc).isoformat() - for name, schedule, endpoint, method, timeout_sec in JOBS: + for name, schedule, endpoint, method, timeout_sec in jobs: if not is_table_due(schedule, last_run[name]): continue logger.info("Running job: %s (%s)", name, schedule) ok = _call_api(endpoint, method, timeout_sec) if ok: last_run[name] = now_iso - # 30s tick is plenty: interval jobs have minute-level resolution, - # daily jobs have a ~24 h retry window. - time.sleep(30) + time.sleep(tick) logger.info("Scheduler stopped.") diff --git a/src/catalog_export.py b/src/catalog_export.py index 76c645c..7de13a3 100644 --- a/src/catalog_export.py +++ b/src/catalog_export.py @@ -418,6 +418,7 @@ def main() -> None: om_config = instance_config.get("openmetadata", {}) catalog_url = om_config.get("url", "").strip() token = om_config.get("token", "").strip() + verify_ssl = om_config.get("verify_ssl", True) if not catalog_url or not token: logger.warning("OpenMetadata not configured (url or token missing) - skipping export") @@ -428,7 +429,9 @@ def main() -> None: # Initialize client try: - client = OpenMetadataClient(base_url=catalog_url, token=token) + client = OpenMetadataClient( + base_url=catalog_url, token=token, verify=verify_ssl, + ) except Exception as e: logger.warning(f"Failed to initialize OpenMetadata client: {e}") return diff --git a/src/repositories/notifications.py b/src/repositories/notifications.py index bf9a6f6..45b8ea3 100644 --- a/src/repositories/notifications.py +++ b/src/repositories/notifications.py @@ -103,3 +103,40 @@ class ScriptRepository: return [] columns = [desc[0] for desc in self.conn.description] return [dict(zip(columns, row)) for row in results] + + def claim_for_run(self, script_id: str) -> bool: + """Atomically set last_status='running' iff the script is idle. + + Returns True iff this caller is the new owner of the run slot. + Returns False if the script does not exist OR is already running. + + Implementation: UPDATE … WHERE last_status IS DISTINCT FROM 'running' + + RETURNING id. DuckDB supports IS DISTINCT FROM and RETURNING; if + zero rows come back, somebody else already owns the slot. + """ + now = datetime.now(timezone.utc) + result = self.conn.execute( + """UPDATE script_registry + SET last_status = 'running', last_run = ? + WHERE id = ? + AND last_status IS DISTINCT FROM 'running' + RETURNING id""", + [now, script_id], + ).fetchone() + return result is not None + + def record_run_result(self, script_id: str, status: str) -> None: + """Write the terminal status of a finished run (clears 'running'). + + Accepts only 'success' or 'failure' — 'running' would re-arm the + flag instead of clearing it, defeating the purpose of the call. + """ + if status not in ("success", "failure"): + raise ValueError( + f"record_run_result: status must be 'success' or 'failure', " + f"got {status!r}" + ) + self.conn.execute( + "UPDATE script_registry SET last_status = ? WHERE id = ?", + [status, script_id], + ) diff --git a/src/scheduler.py b/src/scheduler.py index 4f70913..7aea1aa 100644 --- a/src/scheduler.py +++ b/src/scheduler.py @@ -168,3 +168,90 @@ def _parse_timestamp(iso_string: str) -> Optional[datetime]: continue return None + + +def is_valid_schedule(schedule: Optional[str]) -> bool: + """Return True iff ``schedule`` parses as a documented schedule string. + + Accepted forms (mirroring the rest of this module): + - ``"every Nm"`` / ``"every Nh"`` with N a positive integer + - ``"daily HH:MM"`` (24-h, UTC) optionally comma-separated: + ``"daily 07:00,13:00"`` + + Anything else — including ``None``, empty string, or a parseable-looking + but out-of-range value (``"daily 25:00"``) — returns False. Pydantic + validators on the admin API call this to reject malformed input with + 422 instead of accepting it and silently no-op'ing later. + """ + if not schedule or not isinstance(schedule, str): + return False + interval = parse_interval_minutes(schedule) + if interval is not None: + return interval > 0 + match = DAILY_PATTERN.match(schedule) + if not match: + return False + return bool(_parse_daily_times(match.group(1))) + + +def filter_due_tables( + table_configs: list[dict], + sync_state_repo, + now: Optional[datetime] = None, +) -> list[dict]: + """Drop table configs whose ``sync_schedule`` says they are not due. + + Behaviour: + - ``sync_schedule`` is None / empty / not a valid string → table passes + through (no schedule = "sync on every tick", existing behaviour). + - Valid schedule + last_sync within the cadence → drop. + - Valid schedule + last_sync past cadence (or never) → keep. + - Invalid schedule string → log a warning and let the table through + (do NOT silently skip — operator surprise is worse than a redundant + sync). + + ``sync_state_repo`` is duck-typed: only ``get_last_sync(table_id)`` is + called, returning a ``datetime`` (tz-aware preferred, naive treated as + UTC) or ``None``. + """ + if now is None: + now = datetime.now(timezone.utc) + out: list[dict] = [] + for tc in table_configs: + # sync_state.table_id is populated from _meta.table_name by the + # orchestrator and equals table_registry.name (NOT id). When + # id != name (auto-discovered Keboola rows: id="in_c-crm_company", + # name="company") an id-keyed lookup misses every row and the + # filter degrades to "always sync" — defeating the schedule. The + # same pitfall is documented at app/api/sync.py:244-249. + table_id = tc.get("name") or tc.get("id") + schedule = tc.get("sync_schedule") + if not schedule: + out.append(tc) + continue + if not is_valid_schedule(schedule): + logger.warning( + "Table %s has malformed sync_schedule %r — syncing anyway " + "(fix the schedule string to suppress this message)", + table_id, + schedule, + ) + out.append(tc) + continue + last_sync = sync_state_repo.get_last_sync(table_id) + if last_sync is None: + last_sync_iso = None + else: + if last_sync.tzinfo is None: + last_sync = last_sync.replace(tzinfo=timezone.utc) + last_sync_iso = last_sync.isoformat() + if is_table_due(schedule, last_sync_iso, now=now): + out.append(tc) + else: + logger.info( + "Table %s skipped: schedule=%r, last_sync=%s, not due yet", + table_id, + schedule, + last_sync_iso, + ) + return out diff --git a/tests/helpers/factories.py b/tests/helpers/factories.py index f27bdaf..d0a5057 100644 --- a/tests/helpers/factories.py +++ b/tests/helpers/factories.py @@ -32,7 +32,7 @@ class TableRegistryFactory: _SOURCE_TYPES = ["keboola", "bigquery", "csv"] _QUERY_MODES = ["local", "remote"] - _SCHEDULES = ["0 * * * *", "0 6 * * *", "*/30 * * * *"] + _SCHEDULES = ["every 1h", "daily 06:00", "every 30m"] @staticmethod def build(**overrides) -> dict[str, Any]: diff --git a/tests/test_admin_bq_register.py b/tests/test_admin_bq_register.py index 5a32800..267e920 100644 --- a/tests/test_admin_bq_register.py +++ b/tests/test_admin_bq_register.py @@ -17,6 +17,9 @@ import json from unittest.mock import MagicMock, patch import pytest +from pydantic import ValidationError + +from app.api.admin import RegisterTableRequest, UpdateTableRequest def _auth(token): @@ -2042,3 +2045,46 @@ class TestRegisterTablePrecheckHandlerIsSync: "it in a threadpool; otherwise the synchronous bigquery.Client " "calls block the asyncio event loop." ) + + +# --- sync_schedule format validation (#79) ---------------------------------- + + +@pytest.mark.parametrize("schedule", [ + "every 15m", + "every 1h", + "daily 05:00", + "daily 07:00,13:00,18:00", + None, # explicit None is allowed (no schedule = always sync) +]) +def test_register_request_accepts_valid_sync_schedule(schedule): + req = RegisterTableRequest(name="orders", sync_schedule=schedule) + assert req.sync_schedule == schedule + + +@pytest.mark.parametrize("schedule", [ + "hourly", + "every 0m", + "daily 25:00", + "every 5x", + " ", +]) +def test_register_request_rejects_malformed_sync_schedule(schedule): + with pytest.raises(ValidationError) as exc_info: + RegisterTableRequest(name="orders", sync_schedule=schedule) + assert "sync_schedule" in str(exc_info.value) + + +@pytest.mark.parametrize("schedule", [ + "every 30m", + "daily 08:00", + None, +]) +def test_update_request_accepts_valid_sync_schedule(schedule): + req = UpdateTableRequest(sync_schedule=schedule) + assert req.sync_schedule == schedule + + +def test_update_request_rejects_malformed_sync_schedule(): + with pytest.raises(ValidationError): + UpdateTableRequest(sync_schedule="weekly") diff --git a/tests/test_admin_configure_api.py b/tests/test_admin_configure_api.py index e06ad9c..09a0bee 100644 --- a/tests/test_admin_configure_api.py +++ b/tests/test_admin_configure_api.py @@ -403,7 +403,7 @@ class TestRegisterTable: "bucket": "in.c-crm", "source_table": "full_table", "query_mode": "local", - "sync_schedule": "0 6 * * *", + "sync_schedule": "daily 06:00", "description": "Full configuration table", "profile_after_sync": True, }, diff --git a/tests/test_api_scripts.py b/tests/test_api_scripts.py index 7018419..1fa2e7d 100644 --- a/tests/test_api_scripts.py +++ b/tests/test_api_scripts.py @@ -86,7 +86,7 @@ class TestScriptsAPI: admin_headers = {"Authorization": f"Bearer {admin_token}"} resp = c.post("/api/scripts/deploy", json={ - "name": "calc", "source": "print(2+2)", "schedule": "0 8 * * MON", + "name": "calc", "source": "print(2+2)", "schedule": "daily 08:00", }, headers=admin_headers) script_id = resp.json()["id"] diff --git a/tests/test_openmetadata_client.py b/tests/test_openmetadata_client.py index 5e8fe62..64f0df3 100644 --- a/tests/test_openmetadata_client.py +++ b/tests/test_openmetadata_client.py @@ -2,6 +2,8 @@ Tests for OpenMetadata client """ +import warnings + import pytest import httpx from unittest.mock import Mock, patch, MagicMock @@ -155,3 +157,48 @@ def test_context_manager(): # Verify close() was called mock_client_instance.close.assert_called_once() + + +# --- TLS verify (#89) ------------------------------------------------------- + + +def test_client_verifies_tls_by_default(): + """Default `verify=True` — no more silent MITM exposure of the JWT.""" + with patch("connectors.openmetadata.client.httpx.Client") as mock_client: + OpenMetadataClient(base_url="https://catalog.example.com", token="t") + kwargs = mock_client.call_args.kwargs + assert kwargs["verify"] is True + + +def test_client_accepts_explicit_verify_false(): + """Operators on internal CAs may opt out — but it must be explicit.""" + with patch("connectors.openmetadata.client.httpx.Client") as mock_client: + OpenMetadataClient(base_url="https://catalog.example.com", token="t", verify=False) + assert mock_client.call_args.kwargs["verify"] is False + + +def test_client_accepts_custom_ca_bundle_path(): + """A path string passed to verify is forwarded to httpx untouched + (httpx then uses it as the trust store).""" + with patch("connectors.openmetadata.client.httpx.Client") as mock_client: + OpenMetadataClient( + base_url="https://catalog.example.com", + token="t", + verify="/etc/ssl/certs/internal-ca.pem", + ) + assert mock_client.call_args.kwargs["verify"] == "/etc/ssl/certs/internal-ca.pem" + + +def test_module_import_does_not_mutate_global_warnings_filter(): + """The previous version called warnings.filterwarnings('ignore', ...) + at import time, suppressing urllib3 warnings for ALL httpx clients in + the process. Drop the side effect.""" + import importlib + pre_filters = list(warnings.filters) + import connectors.openmetadata.client as om + importlib.reload(om) + post_filters = list(warnings.filters) + new = [f for f in post_filters if f not in pre_filters] + for action, message, *_ in new: + if message is not None: + assert "Unverified HTTPS request" not in message.pattern diff --git a/tests/test_repositories.py b/tests/test_repositories.py index 1c2edaa..b054bdc 100644 --- a/tests/test_repositories.py +++ b/tests/test_repositories.py @@ -247,10 +247,10 @@ class TestScriptRepository: from src.repositories.notifications import ScriptRepository repo = ScriptRepository(db_conn) repo.deploy("s1", name="sales_alert", owner="u1", - schedule="0 8 * * MON", source="print('hello')") + schedule="daily 08:00", source="print('hello')") script = repo.get("s1") assert script is not None - assert script["schedule"] == "0 8 * * MON" + assert script["schedule"] == "daily 08:00" def test_list_all(self, db_conn): from src.repositories.notifications import ScriptRepository diff --git a/tests/test_run_due_scripts.py b/tests/test_run_due_scripts.py new file mode 100644 index 0000000..499eeb7 --- /dev/null +++ b/tests/test_run_due_scripts.py @@ -0,0 +1,238 @@ +"""Tests for the scheduled-script runner — repo claim/release primitives, +the run-due endpoint, and Pydantic validation on DeployScriptRequest.""" + +from datetime import datetime, timezone + +import pytest +from pydantic import ValidationError + +from app.api.scripts import DeployScriptRequest +from src.db import get_system_db +from src.repositories.notifications import ScriptRepository + + +@pytest.fixture() +def conn(tmp_path, monkeypatch): + """Fresh system.duckdb in a tmp dir — uses real schema, no mocks.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + state_dir = tmp_path / "state" + state_dir.mkdir(parents=True, exist_ok=True) + c = get_system_db() + yield c + c.close() + + +def _deploy(repo: ScriptRepository, script_id="s1", schedule="every 1h"): + repo.deploy(id=script_id, name=script_id, owner="u1", + schedule=schedule, source="print('hi')") + + +# ---------------- claim_for_run --------------------------------------------- + +def test_claim_for_run_succeeds_when_idle(conn): + repo = ScriptRepository(conn) + _deploy(repo) + assert repo.claim_for_run("s1") is True + row = repo.get("s1") + assert row["last_status"] == "running" + assert row["last_run"] is not None + + +def test_claim_for_run_fails_when_already_running(conn): + repo = ScriptRepository(conn) + _deploy(repo) + assert repo.claim_for_run("s1") is True + # Second claim should fail because last_status is still 'running'. + assert repo.claim_for_run("s1") is False + + +def test_claim_for_run_succeeds_after_completion(conn): + repo = ScriptRepository(conn) + _deploy(repo) + repo.claim_for_run("s1") + repo.record_run_result("s1", status="success") + # Now claimable again. + assert repo.claim_for_run("s1") is True + + +def test_claim_for_run_returns_false_for_unknown_script(conn): + repo = ScriptRepository(conn) + assert repo.claim_for_run("does-not-exist") is False + + +# ---------------- record_run_result ----------------------------------------- + +@pytest.mark.parametrize("status", ["success", "failure"]) +def test_record_run_result_writes_terminal_status(conn, status): + repo = ScriptRepository(conn) + _deploy(repo) + repo.claim_for_run("s1") + repo.record_run_result("s1", status=status) + row = repo.get("s1") + assert row["last_status"] == status + + +def test_record_run_result_rejects_running_as_terminal(conn): + """The 'running' string is reserved for claim_for_run; record_run_result + must reject it so a caller can't accidentally re-arm the running flag + instead of clearing it.""" + repo = ScriptRepository(conn) + _deploy(repo) + repo.claim_for_run("s1") + with pytest.raises(ValueError): + repo.record_run_result("s1", status="running") + + +# ---------------- DeployScriptRequest.schedule validation ------------------- + +def test_deploy_request_accepts_valid_schedule(): + req = DeployScriptRequest(name="report", source="print(1)", schedule="every 1h") + assert req.schedule == "every 1h" + + +def test_deploy_request_accepts_no_schedule(): + req = DeployScriptRequest(name="report", source="print(1)") + assert req.schedule is None + + +def test_deploy_request_rejects_malformed_schedule(): + with pytest.raises(ValidationError): + DeployScriptRequest(name="report", source="print(1)", schedule="weekly") + + +# ---------------- /api/scripts/run-due endpoint ----------------------------- + +def _auth(token): + return {"Authorization": f"Bearer {token}"} + + +def test_run_due_skips_scripts_without_schedule(seeded_app, monkeypatch): + """A script with schedule=NULL is never picked up by run-due (those + are run only via explicit POST /api/scripts/{id}/run).""" + monkeypatch.setattr( + "app.api.scripts._execute_script", + lambda src, name: {"name": name, "exit_code": 0, "stdout": "", "stderr": "", "truncated": False}, + ) + c = seeded_app["client"] + token = seeded_app["admin_token"] + deploy = c.post( + "/api/scripts/deploy", + json={"name": "manual-only", "source": "print(1)"}, + headers=_auth(token), + ) + assert deploy.status_code == 201 + resp = c.post("/api/scripts/run-due", headers=_auth(token)) + assert resp.status_code == 200 + assert resp.json()["claimed"] == [] + + +def test_run_due_claims_due_scripts(seeded_app, monkeypatch): + """A script on 'every 1h' that has never run gets claimed and executed.""" + calls = [] + + def _fake_exec(source, name): + calls.append(name) + return {"name": name, "exit_code": 0, "stdout": "", "stderr": "", "truncated": False} + + monkeypatch.setattr("app.api.scripts._execute_script", _fake_exec) + c = seeded_app["client"] + token = seeded_app["admin_token"] + deploy = c.post( + "/api/scripts/deploy", + json={"name": "report", "source": "print(1)", "schedule": "every 1h"}, + headers=_auth(token), + ) + assert deploy.status_code == 201 + script_id = deploy.json()["id"] + resp = c.post("/api/scripts/run-due", headers=_auth(token)) + assert resp.status_code == 200 + body = resp.json() + assert body["claimed"] == [script_id] + # BackgroundTasks runs synchronously inside TestClient, so the call + # has happened by now. + assert "report" in calls + + +def test_run_due_records_failure_when_script_exits_nonzero(seeded_app, monkeypatch): + """`_execute_script` returns `{exit_code: N, ...}` for non-zero exits + + timeouts (only safety violations RAISE). `_run_claimed_script` must + inspect exit_code rather than treat "no exception" as success — see + Devin review BUG_0001.""" + monkeypatch.setattr( + "app.api.scripts._execute_script", + lambda src, name: { + "name": name, "exit_code": 1, + "stdout": "", "stderr": "boom", "truncated": False, + }, + ) + c = seeded_app["client"] + token = seeded_app["admin_token"] + deploy = c.post( + "/api/scripts/deploy", + json={"name": "broken", "source": "print(1)", "schedule": "every 1h"}, + headers=_auth(token), + ) + assert deploy.status_code == 201 + script_id = deploy.json()["id"] + resp = c.post("/api/scripts/run-due", headers=_auth(token)) + assert resp.json()["claimed"] == [script_id] + # BackgroundTasks runs synchronously inside TestClient — by now the + # terminal status must be 'failure', not 'success'. + listing = c.get("/api/scripts", headers=_auth(token)).json()["scripts"] + row = next(s for s in listing if s["id"] == script_id) + assert row["last_status"] == "failure", ( + f"non-zero exit_code must record 'failure', got {row['last_status']!r}" + ) + + +def test_run_due_records_success_when_script_exits_zero(seeded_app, monkeypatch): + """Mirror of the failure test — exit_code=0 must record 'success'.""" + monkeypatch.setattr( + "app.api.scripts._execute_script", + lambda src, name: { + "name": name, "exit_code": 0, + "stdout": "ok", "stderr": "", "truncated": False, + }, + ) + c = seeded_app["client"] + token = seeded_app["admin_token"] + deploy = c.post( + "/api/scripts/deploy", + json={"name": "good", "source": "print(1)", "schedule": "every 1h"}, + headers=_auth(token), + ) + assert deploy.status_code == 201 + script_id = deploy.json()["id"] + c.post("/api/scripts/run-due", headers=_auth(token)) + listing = c.get("/api/scripts", headers=_auth(token)).json()["scripts"] + row = next(s for s in listing if s["id"] == script_id) + assert row["last_status"] == "success" + + +def test_run_due_skips_scripts_already_running(seeded_app, monkeypatch): + """A script in 'running' state must not be re-claimed by a second + sidecar tick that arrives while the previous run is still going.""" + monkeypatch.setattr( + "app.api.scripts._execute_script", + # Simulate a slow run by NOT updating last_status — repo.claim_for_run + # already wrote 'running'; we leave it that way. + lambda src, name: {"name": name, "exit_code": 0, "stdout": "", "stderr": "", "truncated": False}, + ) + # Patch out record_run_result so the run never "completes". + monkeypatch.setattr( + "src.repositories.notifications.ScriptRepository.record_run_result", + lambda self, *a, **kw: None, + ) + c = seeded_app["client"] + token = seeded_app["admin_token"] + deploy = c.post( + "/api/scripts/deploy", + json={"name": "long", "source": "print(1)", "schedule": "every 1h"}, + headers=_auth(token), + ) + assert deploy.status_code == 201 + script_id = deploy.json()["id"] + first = c.post("/api/scripts/run-due", headers=_auth(token)) + assert first.json()["claimed"] == [script_id] + second = c.post("/api/scripts/run-due", headers=_auth(token)) + assert second.json()["claimed"] == [] diff --git a/tests/test_scheduler_sidecar.py b/tests/test_scheduler_sidecar.py new file mode 100644 index 0000000..c356921 --- /dev/null +++ b/tests/test_scheduler_sidecar.py @@ -0,0 +1,93 @@ +"""Unit tests for the env-driven JOBS builder in services.scheduler.""" + +import pytest + + +def test_build_jobs_uses_documented_defaults(monkeypatch): + """No env overrides → default cadences.""" + for v in ( + "SCHEDULER_DATA_REFRESH_INTERVAL", + "SCHEDULER_HEALTH_CHECK_INTERVAL", + "SCHEDULER_TICK_SECONDS", + "SCHEDULER_SCRIPT_RUN_INTERVAL", + ): + monkeypatch.delenv(v, raising=False) + from services.scheduler.__main__ import build_jobs, resolved_tick_seconds + jobs = {name: schedule for name, schedule, *_ in build_jobs()} + assert jobs["data-refresh"] == "every 15m" + assert jobs["health-check"] == "every 5m" + assert jobs["script-runner"] == "every 1m" + assert jobs["marketplaces"] == "daily 03:00" + assert resolved_tick_seconds() == 30 + + +def test_build_jobs_honors_env_overrides(monkeypatch): + monkeypatch.setenv("SCHEDULER_DATA_REFRESH_INTERVAL", "1800") # 30m + monkeypatch.setenv("SCHEDULER_HEALTH_CHECK_INTERVAL", "60") # 1m + monkeypatch.setenv("SCHEDULER_SCRIPT_RUN_INTERVAL", "120") # 2m + monkeypatch.setenv("SCHEDULER_TICK_SECONDS", "10") + from services.scheduler.__main__ import build_jobs, resolved_tick_seconds + jobs = {name: schedule for name, schedule, *_ in build_jobs()} + assert jobs["data-refresh"] == "every 30m" + assert jobs["health-check"] == "every 1m" + assert jobs["script-runner"] == "every 2m" + assert resolved_tick_seconds() == 10 + + +@pytest.mark.parametrize("var", [ + "SCHEDULER_DATA_REFRESH_INTERVAL", + "SCHEDULER_HEALTH_CHECK_INTERVAL", + "SCHEDULER_TICK_SECONDS", + "SCHEDULER_SCRIPT_RUN_INTERVAL", +]) +@pytest.mark.parametrize("bad", ["0", "-5", "abc", ""]) +def test_build_jobs_rejects_invalid_env(monkeypatch, var, bad): + monkeypatch.setenv(var, bad) + from services.scheduler.__main__ import build_jobs + with pytest.raises(ValueError): + build_jobs() + + +def test_build_jobs_rejects_tick_larger_than_smallest_interval(monkeypatch): + """Tick must be <= the smallest job interval, otherwise jobs would + consistently miss their cadence by up to one tick.""" + monkeypatch.setenv("SCHEDULER_HEALTH_CHECK_INTERVAL", "60") + monkeypatch.setenv("SCHEDULER_TICK_SECONDS", "120") + from services.scheduler.__main__ import build_jobs + with pytest.raises(ValueError, match="tick"): + build_jobs() + + +def test_build_jobs_includes_run_due_endpoint(): + """The script-runner job must POST to /api/scripts/run-due.""" + from services.scheduler.__main__ import build_jobs + target = next(j for j in build_jobs() if j[0] == "script-runner") + name, schedule, endpoint, method, _timeout = target + assert endpoint == "/api/scripts/run-due" + assert method == "POST" + + +@pytest.mark.parametrize("seconds,expected", [ + # Exact multiples of 60 → unchanged. + (60, "every 1m"), + (120, "every 2m"), + (900, "every 15m"), + # Exact multiples of 3600 → hour form. + (3600, "every 1h"), + (7200, "every 2h"), + # Non-multiples of 60 must round UP (ceiling), so the job never fires + # MORE often than the operator configured. Devin BUG_0001 on 1af2081. + (90, "every 2m"), # 90s asked → 120s scheduled, NOT 60s + (150, "every 3m"), + (61, "every 2m"), + (3601, "every 61m"), + # Sub-minute clamps to 1m (schedule grammar minute-grained). + (30, "every 1m"), + (1, "every 1m"), +]) +def test_seconds_to_schedule_rounds_up_not_down(seconds, expected): + from services.scheduler.__main__ import _seconds_to_schedule + assert _seconds_to_schedule(seconds) == expected, ( + f"_seconds_to_schedule({seconds}) must round UP — flooring would " + f"make jobs fire more often than the operator configured." + ) diff --git a/tests/test_scripts_api.py b/tests/test_scripts_api.py index 1e901c9..09dfa4f 100644 --- a/tests/test_scripts_api.py +++ b/tests/test_scripts_api.py @@ -53,15 +53,21 @@ class TestScriptsDeploy: token = seeded_app["admin_token"] resp = c.post( "/api/scripts/deploy", - json={"name": "scheduled", "source": "print('scheduled')", "schedule": "0 8 * * MON"}, + json={"name": "scheduled", "source": "print('scheduled')", "schedule": "daily 08:00"}, headers=_auth(token), ) assert resp.status_code == 201 data = resp.json() - assert data["schedule"] == "0 8 * * MON" + assert data["schedule"] == "daily 08:00" - def test_deploy_script_with_blocked_import_deploys_ok_but_run_fails(self, seeded_app): - """Deploy stores scripts as-is; safety validation happens at run time, not deploy time.""" + def test_deploy_script_with_blocked_import_rejected_at_deploy_time(self, seeded_app): + """Deploy validates the source against the safety blocklist BEFORE persisting. + + Previously deploys stored bad scripts as-is and rejected them at run + time, which combined with the new run-due endpoint created a + perpetual claim-fail-retry loop (Devin ANALYSIS_0004 on the #135 + review). Reject up front so bad scripts never land in the registry. + """ c = seeded_app["client"] admin_token = seeded_app["admin_token"] deploy_resp = c.post( @@ -69,12 +75,8 @@ class TestScriptsDeploy: json={"name": "bad_import", "source": "import os; print(os.getcwd())"}, headers=_auth(admin_token), ) - assert deploy_resp.status_code == 201 - script_id = deploy_resp.json()["id"] - - run_resp = c.post(f"/api/scripts/{script_id}/run", headers=_auth(admin_token)) - assert run_resp.status_code == 400 - assert "Blocked" in run_resp.json()["detail"] or "disallowed" in run_resp.json()["detail"] + assert deploy_resp.status_code == 400 + assert "Blocked" in deploy_resp.json()["detail"] or "disallowed" in deploy_resp.json()["detail"] def test_deploy_requires_auth(self, seeded_app): c = seeded_app["client"] diff --git a/tests/test_sync_filter.py b/tests/test_sync_filter.py new file mode 100644 index 0000000..c26e652 --- /dev/null +++ b/tests/test_sync_filter.py @@ -0,0 +1,314 @@ +"""Tests for the schedule-validity helper and the per-table due-filter.""" + +from datetime import datetime, timezone + +import pytest + +from src.scheduler import filter_due_tables, is_valid_schedule + + +# ---------------- is_valid_schedule ----------------------------------------- + +@pytest.mark.parametrize("schedule", [ + "every 15m", + "every 1h", + "every 6h", + "daily 05:00", + "daily 07:00,13:00,18:00", +]) +def test_is_valid_schedule_accepts_documented_formats(schedule): + assert is_valid_schedule(schedule) is True + + +@pytest.mark.parametrize("schedule", [ + "", + "every", + "every 0m", # zero is not a positive interval + "every 15s", # seconds not supported + "daily", + "daily 25:00", # invalid hour + "daily 12:60", # invalid minute + "daily 12:00,", # trailing comma + "hourly", # unknown keyword + "every -5m", # negative +]) +def test_is_valid_schedule_rejects_malformed_strings(schedule): + assert is_valid_schedule(schedule) is False + + +def test_is_valid_schedule_treats_none_as_invalid(): + # None is "no schedule" — callers handle that case before validating. + # The validator is for non-null strings only. + assert is_valid_schedule(None) is False # type: ignore[arg-type] + + +# ---------------- filter_due_tables ----------------------------------------- + +class _FakeSyncStateRepo: + """Stub SyncStateRepository — returns last_sync per table_id.""" + + def __init__(self, last_syncs: dict[str, datetime | None]): + self._data = last_syncs + + def get_last_sync(self, table_id: str): + return self._data.get(table_id) + + +def _utc(year, month, day, hour=0, minute=0): + return datetime(year, month, day, hour, minute, tzinfo=timezone.utc) + + +def test_filter_due_tables_passes_through_unscheduled_tables(): + """Tables with sync_schedule=None are always due (opt-in feature).""" + configs = [ + {"id": "t1", "name": "t1", "sync_schedule": None}, + {"id": "t2", "name": "t2", "sync_schedule": ""}, + ] + repo = _FakeSyncStateRepo({}) + out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0)) + assert [c["id"] for c in out] == ["t1", "t2"] + + +def test_filter_due_tables_drops_table_within_interval(): + """A table on 'every 1h' synced 30m ago is NOT due.""" + configs = [{"id": "fast", "name": "fast", "sync_schedule": "every 1h"}] + repo = _FakeSyncStateRepo({"fast": _utc(2026, 5, 1, 9, 30)}) + out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0)) + assert out == [] + + +def test_filter_due_tables_keeps_table_past_interval(): + """A table on 'every 1h' synced 90m ago IS due.""" + configs = [{"id": "fast", "name": "fast", "sync_schedule": "every 1h"}] + repo = _FakeSyncStateRepo({"fast": _utc(2026, 5, 1, 8, 30)}) + out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0)) + assert [c["id"] for c in out] == ["fast"] + + +def test_filter_due_tables_keeps_never_synced_table(): + """No last_sync row → always due (matches is_table_due semantics).""" + configs = [{"id": "new", "name": "new", "sync_schedule": "every 1h"}] + repo = _FakeSyncStateRepo({}) # no entry at all + out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0)) + assert [c["id"] for c in out] == ["new"] + + +def test_filter_due_tables_treats_invalid_schedule_as_unscheduled(): + """Garbled sync_schedule: log + always sync (don't silently skip).""" + configs = [{"id": "bad", "name": "bad", "sync_schedule": "BOGUS"}] + repo = _FakeSyncStateRepo({"bad": _utc(2026, 5, 1, 9, 59)}) + out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0)) + assert [c["id"] for c in out] == ["bad"] + + +def test_filter_due_tables_mixed_due_and_skipped(): + configs = [ + {"id": "due", "name": "due", "sync_schedule": "every 30m"}, + {"id": "skipped", "name": "skipped", "sync_schedule": "every 30m"}, + {"id": "always", "name": "always", "sync_schedule": None}, + ] + repo = _FakeSyncStateRepo({ + "due": _utc(2026, 5, 1, 9, 0), # 60m ago → due + "skipped": _utc(2026, 5, 1, 9, 50), # 10m ago → skip + }) + out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0)) + assert sorted(c["id"] for c in out) == ["always", "due"] + + +def test_filter_due_tables_handles_naive_last_sync(): + """SyncStateRepository can return naive datetimes from older rows; helper + must coerce to UTC instead of crashing on tz-aware vs naive comparison.""" + configs = [{"id": "old", "name": "old", "sync_schedule": "every 1h"}] + naive_2h_ago = datetime(2026, 5, 1, 8, 0) # no tzinfo + repo = _FakeSyncStateRepo({"old": naive_2h_ago}) + out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0)) + assert [c["id"] for c in out] == ["old"] + + +def test_filter_due_tables_keys_lookup_by_name_when_id_differs(): + """sync_state.table_id is populated from _meta.table_name (= registry + name), NOT registry id. When id != name (auto-discovered Keboola rows), + the helper must look up sync_state by NAME or it sees last_sync=None + for every row → degrades to "always sync" → schedule no-op. See + Devin review BUG_0002 + the comment in app/api/sync.py:244-249.""" + configs = [{ + "id": "in_c-crm_company", # auto-discovered shape + "name": "company", # what _meta.table_name records + "sync_schedule": "every 1h", + }] + # 30m ago — keyed by NAME, the lookup must hit and the table must skip. + repo = _FakeSyncStateRepo({"company": _utc(2026, 5, 1, 9, 30)}) + out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0)) + assert out == [], ( + "id-keyed lookup would have missed sync_state, treated table as " + "never-synced, and kept it. The fix keys by name." + ) + + +# ---------------- _run_sync wiring ------------------------------------------ + +def test_run_sync_filters_local_tables_by_schedule(monkeypatch, tmp_path): + """`_run_sync(tables=None)` consults `filter_due_tables` and skips + tables that are not due. Manual override (`tables=[...]`) bypasses + the filter entirely.""" + from app.api import sync as sync_module + + # Stub get_data_source_type → 'keboola' so the keboola subprocess code + # path is taken (also matches the existing _run_sync shape). + monkeypatch.setattr( + sync_module, "_get_data_dir", lambda: tmp_path, + ) + import app.instance_config as instance_config + monkeypatch.setattr(instance_config, "get_data_source_type", lambda: "keboola") + + # Fake registry with one due + one skipped table. + fake_configs = [ + {"id": "due", "name": "due", "source_type": "keboola", + "sync_schedule": "every 30m", "query_mode": "local"}, + {"id": "skipped", "name": "skipped", "source_type": "keboola", + "sync_schedule": "every 30m", "query_mode": "local"}, + ] + + class _StubRegistry: + def __init__(self, conn): pass + def list_local(self, source_type=None): return list(fake_configs) + def get(self, table_id): + return next((c for c in fake_configs if c["id"] == table_id), None) + + monkeypatch.setattr(sync_module, "TableRegistryRepository", _StubRegistry) + + # Stub get_system_db (imported locally inside _run_sync from src.db). + class _FakeConn: + def close(self): pass + import src.db as _db_mod + monkeypatch.setattr(_db_mod, "get_system_db", lambda: _FakeConn()) + + # Fake sync_state: 'due' last synced 60m ago, 'skipped' 10m ago. + from datetime import datetime, timezone + last_syncs = { + "due": datetime(2026, 5, 1, 9, 0, tzinfo=timezone.utc), + "skipped": datetime(2026, 5, 1, 9, 50, tzinfo=timezone.utc), + } + + class _StubState: + def __init__(self, conn): pass + def get_last_sync(self, table_id): return last_syncs.get(table_id) + + monkeypatch.setattr(sync_module, "SyncStateRepository", _StubState) + + # Freeze 'now' inside src.scheduler.filter_due_tables. We do this by + # monkeypatching filter_due_tables itself to inject `now=`. + from src import scheduler as _sched + real_filter = _sched.filter_due_tables + monkeypatch.setattr( + sync_module, "filter_due_tables", + lambda cfgs, repo: real_filter( + cfgs, repo, now=datetime(2026, 5, 1, 10, 0, tzinfo=timezone.utc), + ), + ) + + # Capture the configs that subprocess.run sees (via stdin payload). + captured = {} + + def _fake_run(cmd, input, capture_output, text, timeout, env, cwd): + import json as _json + captured["configs"] = _json.loads(input) + class _R: + returncode = 0 + stdout = "{}" + stderr = "" + return _R() + + monkeypatch.setattr(sync_module.subprocess, "run", _fake_run) + + # Stub orchestrator + profiler imports inside the function so we don't + # require a real DuckDB analytics file. + import src.orchestrator as _orch_mod + + class _StubOrch: + def rebuild(self): return {} + + monkeypatch.setattr(_orch_mod, "SyncOrchestrator", _StubOrch) + + # Run with tables=None → filter applies → only 'due' goes to subprocess. + sync_module._run_sync(tables=None) + assert [c["id"] for c in captured["configs"]] == ["due"] + + # Run with explicit override → filter is BYPASSED → both go through. + captured.clear() + sync_module._run_sync(tables=["due", "skipped"]) + assert sorted(c["id"] for c in captured["configs"]) == ["due", "skipped"] + + +def test_run_sync_does_not_auto_discover_when_filter_returns_empty(monkeypatch, tmp_path): + """Devin BUG_0001 on ebb8cc9: when registry HAS tables but filter returns + [] (nothing due), the `if not table_configs` guard must NOT fire + auto-discovery. Otherwise on Keboola instances with KEBOOLA_STORAGE_TOKEN + set, every tick re-discovers + re-reads the registry without the filter, + bypassing sync_schedule entirely.""" + from app.api import sync as sync_module + + monkeypatch.setattr(sync_module, "_get_data_dir", lambda: tmp_path) + import app.instance_config as instance_config + monkeypatch.setattr(instance_config, "get_data_source_type", lambda: "keboola") + # Critical: KEBOOLA_STORAGE_TOKEN must be set to make the bug reachable. + monkeypatch.setenv("KEBOOLA_STORAGE_TOKEN", "fake-token") + + # Registry has 1 table. Filter will return [] (synced 5m ago, schedule 1h). + fake_configs = [ + {"id": "fresh", "name": "fresh", "source_type": "keboola", + "sync_schedule": "every 1h", "query_mode": "local"}, + ] + + class _StubRegistry: + def __init__(self, conn): pass + def list_local(self, source_type=None): return list(fake_configs) + def get(self, table_id): + return next((c for c in fake_configs if c["id"] == table_id), None) + + monkeypatch.setattr(sync_module, "TableRegistryRepository", _StubRegistry) + + class _FakeConn: + def close(self): pass + import src.db as _db_mod + monkeypatch.setattr(_db_mod, "get_system_db", lambda: _FakeConn()) + + # 5m ago → not due under "every 1h" + from datetime import datetime, timezone + class _StubState: + def __init__(self, conn): pass + def get_last_sync(self, table_id): + return datetime(2026, 5, 1, 9, 55, tzinfo=timezone.utc) + monkeypatch.setattr(sync_module, "SyncStateRepository", _StubState) + + from src import scheduler as _sched + real_filter = _sched.filter_due_tables + monkeypatch.setattr( + sync_module, "filter_due_tables", + lambda cfgs, repo: real_filter( + cfgs, repo, now=datetime(2026, 5, 1, 10, 0, tzinfo=timezone.utc), + ), + ) + + # Sentinel: if auto-discovery fires, this counter increments. + discovery_calls = [] + + def _fake_discover(conn, who): + discovery_calls.append(who) + return {"registered": 0, "skipped": 0} + + import app.api.admin as _admin_mod + monkeypatch.setattr(_admin_mod, "_discover_and_register_tables", _fake_discover) + + # Subprocess + orchestrator stubs in case the function gets that far + # (it shouldn't — filter returned [], no work to do). + def _fake_run(cmd, **kw): + raise AssertionError("subprocess.run must not be called when filter returns empty") + monkeypatch.setattr(sync_module.subprocess, "run", _fake_run) + + sync_module._run_sync(tables=None) + + assert discovery_calls == [], ( + f"Auto-discovery must not fire when registry has tables but the " + f"filter excluded them all; got {discovery_calls!r} call(s)." + )