feat(scheduler): re-wire sync_schedule + script.schedule; tune via env; OpenMetadata TLS (#135)

Bundles 4 issues:
- #79 — table_registry.sync_schedule honored at runtime (API-side filter + Pydantic validators)
- #78 — script_registry.schedule honored via new POST /api/scripts/run-due (atomic claim, BackgroundTask exec, deploy-time safety validation)
- #77 — sidecar JOBS env-driven (SCHEDULER_DATA_REFRESH_INTERVAL/HEALTH_CHECK_INTERVAL/SCRIPT_RUN_INTERVAL/TICK_SECONDS)
- #89 — OpenMetadataClient verify=True default (BREAKING for self-signed)

Cuts release 0.19.0. See CHANGELOG for full notes incl. Known Limitations.
This commit is contained in:
ZdenekSrotyr 2026-04-29 22:06:30 +02:00 committed by GitHub
parent 953bd9d250
commit b7a1795834
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 2839 additions and 65 deletions

View file

@ -10,13 +10,31 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
## [Unreleased]
### Changed
## [0.19.0] — 2026-04-29
- **BREAKING** `GET /marketplace/info` (admin-only debug endpoint) `name` field now returns the plugin's authoritative name from its `plugin.json` (e.g. `plug-x`) instead of the slug-prefixed form (`<slug>-<plug-x>`). The slug-prefixed form moved to a new `prefixed_name` field next to it; `original_name` is unchanged. Side-effect of the `/plugin` UI fix below — the synth marketplace.json's `name` field had to switch over for Claude Code's catalog lookup to work, and `/marketplace/info` mirrors that surface for consistency. Any downstream tooling that parsed the `name` field expecting the slug-prefixed format must now read `prefixed_name`.
### Added
- `table_registry.sync_schedule` is now honored at runtime. `POST /api/sync/trigger` (called by the scheduler sidecar every 15 min by default) drops local tables whose schedule says they are not due. Tables without a schedule continue to sync on every tick (opt-in feature). Manual `POST /api/sync/trigger {"tables":[...]}` bypasses the schedule filter — operator override always wins. (#79)
- `script_registry.schedule` is now honored at runtime via the new endpoint `POST /api/scripts/run-due` (admin-only). The scheduler sidecar fires this every 60 s by default. Each due script is claimed atomically (`last_status='running'`), executed in a BackgroundTask, and the outcome written to `last_run` / `last_status`. Scripts already in `running` state are skipped — no concurrent runs of the same script. (#78)
- Four new env vars on the scheduler sidecar: `SCHEDULER_DATA_REFRESH_INTERVAL`, `SCHEDULER_HEALTH_CHECK_INTERVAL`, `SCHEDULER_SCRIPT_RUN_INTERVAL`, `SCHEDULER_TICK_SECONDS`. All accept positive integers (seconds); tick must be ≤ smallest job interval. Documented in `docs/DEPLOYMENT.md` → Scheduler tuning. (#77)
- `RegisterTableRequest.sync_schedule`, `UpdateTableRequest.sync_schedule`, and `DeployScriptRequest.schedule` now reject malformed strings with a Pydantic 422 (e.g. `"hourly"`, `"daily 25:00"`). The accepted forms are unchanged: `every Nm`, `every Nh`, `daily HH:MM[,HH:MM,...]`. **Note**: cron expressions (`"0 8 * * MON"` etc.) were never honoured by the runtime evaluator — they used to round-trip through the API as a silent no-op, and now they get a loud 422 at register/deploy time. Operators using cron strings must convert to one of the supported forms. (#78, #79)
- New `verify_ssl` knob in the `openmetadata:` section of `instance.yaml` (default `true`). Operators on internal CAs / self-signed catalogs must set it explicitly. (#89)
### Changed
- **BREAKING** `POST /api/scripts/deploy` now validates the source against the safety blocklist BEFORE persisting (previously safety checks ran only at execution time). Scripts containing blocked imports / patterns return 400 from `/deploy` instead of being stored and failing every scheduler tick. Closes the claim-fail-retry loop where the new `/api/scripts/run-due` endpoint would re-claim and re-fail an unrunnable deployed script every minute. (#78)
- **BREAKING** `OpenMetadataClient` now defaults to `verify=True` for TLS. The previous version hardcoded `verify=False` and suppressed urllib3's "Unverified HTTPS request" warning at import time (which leaked to every other httpx client in the process). Existing deployments on self-signed certificates without an explicit opt-out will start failing TLS verification — set `verify_ssl: false` in the `openmetadata:` block of `instance.yaml`, or supply a CA bundle path, before upgrading. Both production call sites (`connectors/openmetadata/enricher.py`, `src/catalog_export.py`) read the new `verify_ssl` config knob and pass it through. (#89)
- **BREAKING** `GET /marketplace/info` (admin-only debug endpoint) `name` field now returns the plugin's authoritative name from its `plugin.json` (e.g. `plug-x`) instead of the slug-prefixed form (`<slug>-<plug-x>`). The slug-prefixed form moved to a new `prefixed_name` field next to it; `original_name` is unchanged. Side-effect of the `/plugin` UI fix below — the synth marketplace.json's `name` field had to switch over for Claude Code's catalog lookup to work, and `/marketplace/info` mirrors that surface for consistency. Any downstream tooling that parsed the `name` field expecting the slug-prefixed format must now read `prefixed_name`. (#133)
### Fixed
- **`/plugin` UI in Claude Code rendered "Plugin <X> not found in marketplace" in the Components panel** for every plugin Agnes served, even though agents/skills/commands loaded correctly under the plugin's own namespace. Root cause: the synthetic `.claude-plugin/marketplace.json` listed each plugin under a slug-prefixed `name` (`<slug>-<plugin>`) while the plugin's authoritative `.claude-plugin/plugin.json` kept the original name. Claude Code resolves the loaded plugin back to its catalog entry by `plugin.json` name, so the lookup missed every entry. The synth manifest now reads the plugin's authoritative name from `<plugin_dir>/.claude-plugin/plugin.json` (falling back to the upstream marketplace.json's `name` when the plugin manifest is absent or unreadable). The directory layout under `plugins/<slug>-<plugin>/...` keeps the prefix so two upstream marketplaces that ship a same-named plugin still get distinct on-disk paths in the ZIP / git tree — their catalog entries will then collide under the same `name`, which is the correct surface (admin RBAC decides which upstream wins, same as if a user added both upstream marketplaces directly to Claude Code). `/marketplace/info` now exposes `prefixed_name` alongside `name` so operators can still disambiguate cross-marketplace shadowing. (#133)
- **`/plugin` UI in Claude Code rendered "Plugin <X> not found in marketplace" in the Components panel** for every plugin Agnes served, even though agents/skills/commands loaded correctly under the plugin's own namespace. Root cause: the synthetic `.claude-plugin/marketplace.json` listed each plugin under a slug-prefixed `name` (`<slug>-<plugin>`) while the plugin's authoritative `.claude-plugin/plugin.json` kept the original name. Claude Code resolves the loaded plugin back to its catalog entry by `plugin.json` name, so the lookup missed every entry. The synth manifest now reads the plugin's authoritative name from `<plugin_dir>/.claude-plugin/plugin.json` (falling back to the upstream marketplace.json's `name` when the plugin manifest is absent or unreadable). The directory layout under `plugins/<slug>-<plugin>/...` keeps the prefix so two upstream marketplaces that ship a same-named plugin still get distinct on-disk paths in the ZIP / git tree — their catalog entries will then collide under the same `name`, which is the correct surface (admin RBAC decides which upstream wins, same as if a user added both upstream marketplaces directly to Claude Code). `/marketplace/info` now exposes `prefixed_name` alongside `name` so operators can still disambiguate cross-marketplace shadowing.
### Internal
- `src/scheduler.py` now exports `is_valid_schedule(s)` and `filter_due_tables(configs, sync_state_repo)` for reuse across the sync filter, the script runner, and Pydantic validators.
- `ScriptRepository` gains `claim_for_run(script_id)` and `record_run_result(script_id, status)` — the atomic primitives for the scheduled-script execution path. `claim_for_run` uses `UPDATE … WHERE last_status IS DISTINCT FROM 'running' RETURNING id` for race-free claim.
- `services/scheduler/__main__.py` JOBS list refactored to a `build_jobs()` factory that reads + validates env at startup.
### Known limitations
- **Stuck `last_status='running'`**: a scheduled script whose BackgroundTask crashes mid-run (process killed, OOM, gateway timeout) stays claimed forever. Recovery: `UPDATE script_registry SET last_status = NULL WHERE id = ?` from a DuckDB shell. Auto-recovery via max-runtime detection is intentionally out of scope for v0.19.0; revisit if it bites in practice.
- **Schedule quantization rounds up**: `SCHEDULER_*_INTERVAL` accepts seconds but the underlying schedule grammar is minute-grained. Non-multiples of 60 round UP to the next minute (90 s → `every 2m`, never `every 1m`) so a job never fires more often than the operator configured. Sub-minute values clamp to `every 1m`. Documented in `docs/DEPLOYMENT.md` → Scheduler tuning.
## [0.18.0] — 2026-04-29

View file

@ -25,6 +25,7 @@ from src.identifier_validation import (
is_safe_quoted_identifier as _is_safe_quoted_identifier,
)
from src.sql_safe import is_safe_project_id as _is_safe_project_id
from src.scheduler import is_valid_schedule
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/admin", tags=["admin"])
@ -677,6 +678,22 @@ class RegisterTableRequest(BaseModel):
)
return v
@field_validator("sync_schedule", mode="before")
@classmethod
def _validate_sync_schedule(cls, v):
# None / "" → no schedule, accepted.
# Any non-empty string (including pure whitespace) must parse as a
# valid schedule — otherwise it would be persisted and silently
# ignored by the runtime evaluator.
if v in (None, ""):
return v
if not is_valid_schedule(v):
raise ValueError(
f"sync_schedule must be 'every Nm' / 'every Nh' / "
f"'daily HH:MM[,HH:MM,...]', got {v!r}"
)
return v
def _validate_bigquery_register_payload(req: "RegisterTableRequest") -> None:
"""Enforce BQ-specific shape on a register/precheck request.
@ -794,6 +811,25 @@ class UpdateTableRequest(BaseModel):
def _coerce_primary_key(cls, v):
return _normalize_primary_key(v)
# Duplicated from RegisterTableRequest — Pydantic v2 validators don't
# inherit cleanly across unrelated BaseModel classes; a shared mixin
# would be overkill for two fields.
@field_validator("sync_schedule", mode="before")
@classmethod
def _validate_sync_schedule(cls, v):
# None / "" → no schedule, accepted.
# Any non-empty string (including pure whitespace) must parse as a
# valid schedule — otherwise it would be persisted and silently
# ignored by the runtime evaluator.
if v in (None, ""):
return v
if not is_valid_schedule(v):
raise ValueError(
f"sync_schedule must be 'every Nm' / 'every Nh' / "
f"'daily HH:MM[,HH:MM,...]', got {v!r}"
)
return v
class ConfigureRequest(BaseModel):
data_source: str # "keboola" | "bigquery" | "local"

View file

@ -5,15 +5,18 @@ import subprocess
import sys
import tempfile
import uuid
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from datetime import datetime, timezone
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from pydantic import BaseModel, field_validator
from typing import Optional
import duckdb
from app.auth.access import require_admin
from app.auth.dependencies import _get_db
from src.db import get_system_db
from src.repositories.notifications import ScriptRepository
from src.scheduler import is_valid_schedule, is_table_due
router = APIRouter(prefix="/api/scripts", tags=["scripts"])
@ -26,6 +29,22 @@ class DeployScriptRequest(BaseModel):
source: str
schedule: Optional[str] = None
@field_validator("schedule", mode="before")
@classmethod
def _validate_schedule(cls, v):
if v in (None, ""):
return None
# Pure-whitespace strings (" ") fall through to is_valid_schedule
# and reject — same convention as RegisterTableRequest.sync_schedule.
# We do NOT silently normalise whitespace to None; surfacing the
# caller's mistake at register time beats persisting an unusable value.
if not is_valid_schedule(v):
raise ValueError(
f"schedule must be 'every Nm' / 'every Nh' / "
f"'daily HH:MM[,HH:MM,...]', got {v!r}"
)
return v
class RunScriptRequest(BaseModel):
name: Optional[str] = None
@ -56,7 +75,14 @@ async def deploy_script(
user: dict = Depends(require_admin),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Deploy a Python script to be run on the server (optionally on schedule). Admin-only."""
"""Deploy a Python script to be run on the server (optionally on schedule). Admin-only.
Validates the source against the safety blocklist BEFORE persisting
closes the Devin claim-fail-retry loop where a script with blocked
patterns would land in script_registry, fail every scheduler tick, and
re-claim itself perpetually.
"""
_validate_script_source(request.source)
repo = ScriptRepository(conn)
script_id = str(uuid.uuid4())
repo.deploy(
@ -109,13 +135,91 @@ async def undeploy_script(
repo.undeploy(script_id)
def _execute_script(source: str, name: str) -> dict:
"""Execute a Python script in a sandboxed subprocess.
@router.post("/run-due")
async def run_due_scripts(
background_tasks: BackgroundTasks,
user: dict = Depends(require_admin),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Run every deployed script whose ``schedule`` says it is due.
The blocklist below is defense-in-depth, not a primary trust boundary.
The role gate on the route (admin-only) is the actual boundary; the
blocklist catches obvious mistakes, not a hostile admin."""
# Comprehensive safety checks — block dangerous patterns
Iterates ``script_registry``, skips rows without a schedule (those run
only via explicit POST /{id}/run), evaluates ``is_table_due(schedule,
last_run)``, and atomically claims each due row via
``ScriptRepository.claim_for_run``. Execution is queued as a
``BackgroundTask`` so the response returns immediately the sidecar
must not block waiting on a long-running script.
Concurrency: ``claim_for_run`` flips ``last_status`` to ``'running'``
inside the same UPDATE; a script already in that state is skipped on
subsequent ticks until the BackgroundTask writes a terminal status via
``record_run_result``. There is no max-runtime detection in this PR
if a BackgroundTask crashes without writing a terminal status, the
script stays stuck in ``'running'`` until an operator clears it
manually (``UPDATE script_registry SET last_status = NULL WHERE id =
?``). Documenting this as an accepted v0 limitation; revisit if it
bites in practice.
"""
repo = ScriptRepository(conn)
claimed: list[str] = []
for script in repo.list_all():
schedule = script.get("schedule")
if not schedule:
continue
last_run = script.get("last_run")
last_run_iso = last_run.isoformat() if last_run else None
if not is_table_due(schedule, last_run_iso):
continue
if not repo.claim_for_run(script["id"]):
# Lost the race / already running — next tick will retry.
continue
claimed.append(script["id"])
background_tasks.add_task(
_run_claimed_script,
script_id=script["id"],
source=script["source"],
name=script["name"],
)
return {"claimed": claimed, "count": len(claimed)}
def _run_claimed_script(script_id: str, source: str, name: str) -> None:
"""Execute a previously-claimed script and write the terminal status.
Runs in a FastAPI BackgroundTask, so it owns its own DB connection
(the request-scoped conn is already gone by the time this fires).
``_execute_script`` only raises on safety-check violations runtime
failures (non-zero exit code, ``subprocess.TimeoutExpired`` exit -1)
are returned in the result dict, so we must inspect ``exit_code`` to
decide success vs failure rather than treating "no exception" as
success. Any exception still writes 'failure' and re-raises so the BG
handler surfaces the traceback in logs.
"""
# Fresh connection for the background task — the request-scoped conn
# was returned to FastAPI by the time this fires.
bg_conn = get_system_db()
try:
bg_repo = ScriptRepository(bg_conn)
try:
result = _execute_script(source, name)
status = "success" if result.get("exit_code", 1) == 0 else "failure"
bg_repo.record_run_result(script_id, status=status)
except Exception:
bg_repo.record_run_result(script_id, status="failure")
raise
finally:
bg_conn.close()
def _validate_script_source(source: str) -> None:
"""Reject scripts containing blocked imports / patterns.
Raises HTTPException(400) on any violation. Called from BOTH the deploy
endpoint (so bad scripts never land in script_registry closes the
Devin claim-fail-retry loop where the scheduler would re-claim and
re-fail a deployed-but-unrunnable script every tick) and from
``_execute_script`` as defense-in-depth.
"""
blocked_patterns = [
# Direct imports of dangerous modules
"import subprocess", "from subprocess",
@ -194,6 +298,16 @@ def _execute_script(source: str, name: str) -> dict:
detail=f"Script contains disallowed pattern: {pattern.split('(')[0].strip()}",
)
def _execute_script(source: str, name: str) -> dict:
"""Execute a Python script in a sandboxed subprocess.
Defense-in-depth: re-runs ``_validate_script_source`` even though the
deploy endpoint already validates. A bad script reaching this path would
indicate a registry write that bypassed the deploy contract; reject it
rather than spawn a subprocess.
"""
_validate_script_source(source)
data_dir = os.environ.get("DATA_DIR", "./data")
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:

View file

@ -3,6 +3,7 @@
import hashlib
import logging
import os
import subprocess
import traceback
from datetime import datetime, timezone
from pathlib import Path
@ -19,6 +20,7 @@ from src.repositories.sync_state import SyncStateRepository
from src.repositories.sync_settings import SyncSettingsRepository, DatasetPermissionRepository
from src.repositories.table_registry import TableRegistryRepository
from src.rbac import can_access_table
from src.scheduler import filter_due_tables
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/sync", tags=["sync"])
@ -42,32 +44,50 @@ def _run_sync(tables: Optional[List[str]] = None):
This avoids DuckDB lock conflicts subprocess never opens system.duckdb.
"""
import json as _json
import subprocess
import sys
try:
from app.instance_config import get_data_source_type, get_value
from src.db import get_system_db
from src.repositories.table_registry import TableRegistryRepository
source_type = get_data_source_type()
data_dir = _get_data_dir()
# Read table configs in main process (has shared DuckDB connection)
sys_conn = get_system_db()
# Track whether the REGISTRY (not the post-filter list) was empty.
# Auto-discovery must only fire on a truly empty registry; if the
# filter returned [] because nothing was due, re-discovering would
# bypass the schedule entirely on Keboola instances. (Devin BUG_0001
# on ebb8cc9.)
registry_has_tables = False
try:
repo = TableRegistryRepository(sys_conn)
if tables:
# Manual operator override — bypass schedule filter entirely
# so an admin saying "sync these specific tables now" wins.
all_configs = [repo.get(t) for t in tables]
table_configs = [c for c in all_configs if c is not None]
registry_has_tables = bool(table_configs)
else:
table_configs = repo.list_local(source_type) if source_type else repo.list_local()
registry_has_tables = bool(table_configs)
# Without this filter, every scheduler tick would re-sync
# every table regardless of its sync_schedule cadence,
# making the field a no-op at trigger time. Tables with
# no schedule pass through unchanged (opt-in feature).
state_repo = SyncStateRepository(sys_conn)
table_configs = filter_due_tables(table_configs, state_repo)
finally:
sys_conn.close()
if not table_configs:
# Auto-discover tables on first sync when registry is empty
if source_type == "keboola" and os.environ.get("KEBOOLA_STORAGE_TOKEN"):
# Auto-discover tables on first sync when registry is empty.
# `not registry_has_tables` is the load-bearing guard — without
# it, "filter excluded everything" looks identical to "registry
# empty" and we'd re-discover + re-sync every tick regardless of
# sync_schedule.
if not registry_has_tables and source_type == "keboola" and os.environ.get("KEBOOLA_STORAGE_TOKEN"):
logger.info("No tables registered — running auto-discovery from Keboola")
try:
from app.api.admin import _discover_and_register_tables

View file

@ -45,6 +45,14 @@ SESSION_SECRET= # python -c "import secrets; print(secrets.token_he
# LOG_LEVEL=info # debug, info, warning, error
# CORS_ORIGINS=http://localhost:3000,http://localhost:8000
# ── SCHEDULER (sidecar tuning) ──────────────────────
# All values are in seconds and must be positive integers. SCHEDULER_TICK_SECONDS
# must be <= the smallest job interval below.
# SCHEDULER_DATA_REFRESH_INTERVAL=900 # default 15 min — POST /api/sync/trigger
# SCHEDULER_HEALTH_CHECK_INTERVAL=300 # default 5 min — GET /api/health
# SCHEDULER_SCRIPT_RUN_INTERVAL=60 # default 1 min — POST /api/scripts/run-due
# SCHEDULER_TICK_SECONDS=30 # default 30 s — loop polling cadence
# ── HTTPS / REVERSE PROXY ───────────────────────────
# Set these when the app runs behind a TLS terminator (Caddy, Cloudflare
# Tunnel, nginx, GCP LB, etc.). The app itself speaks plain HTTP on :8000;

View file

@ -130,6 +130,10 @@ data_source:
# url: "https://your-catalog.example.com"
# token: "${OPENMETADATA_TOKEN}" # JWT bearer token
# cache_ttl_seconds: 3600 # Cache TTL in seconds
# verify_ssl: true # set to false ONLY for internal
# # CAs / self-signed certs; defaults
# # to true. Setting false ships the
# # JWT over an unverified channel.
# --- Email delivery (optional, for magic link auth) ---
# Without SMTP, magic links are shown directly in browser (development mode).

View file

@ -11,13 +11,9 @@ Low-level HTTP wrapper for OpenMetadata REST API with these functions:
import json
import logging
from typing import Dict, List, Optional, Any
import warnings
import httpx
# Suppress SSL warnings for self-signed certificates
warnings.filterwarnings("ignore", message="Unverified HTTPS request")
logger = logging.getLogger(__name__)
@ -36,6 +32,7 @@ class OpenMetadataClient:
base_url: str,
token: str,
timeout: int = 30,
verify: bool | str = True,
):
"""
Initialize OpenMetadata API client.
@ -44,6 +41,12 @@ class OpenMetadataClient:
base_url: Base URL of OpenMetadata instance (e.g., "https://catalog.example.com")
token: JWT bearer token for authentication
timeout: HTTP request timeout in seconds
verify: TLS verification True (default), False to disable
(e.g., for self-signed certificates on internal CAs), or a
path to a CA bundle. The previous version hardcoded False
globally and suppressed warnings both removed in #89.
Operators with self-signed certs should pass an explicit
``verify=False`` or a CA bundle path from their config.
"""
self.base_url = base_url.rstrip("/")
self.token = token
@ -55,7 +58,7 @@ class OpenMetadataClient:
"Content-Type": "application/json",
},
timeout=timeout,
verify=False, # Allow self-signed certificates (internal networks)
verify=verify,
)
def get_table(self, fqn: str) -> Dict[str, Any]:

View file

@ -103,6 +103,7 @@ class CatalogEnricher:
url = om_config.get("url", "").strip()
token = om_config.get("token", "").strip()
cache_ttl = om_config.get("cache_ttl_seconds", 3600)
verify_ssl = om_config.get("verify_ssl", True)
if not url or not token:
logger.debug(
@ -111,7 +112,9 @@ class CatalogEnricher:
return
self._cache_ttl_seconds = cache_ttl
self._client = OpenMetadataClient(base_url=url, token=token)
self._client = OpenMetadataClient(
base_url=url, token=token, verify=verify_ssl,
)
self.enabled = True
logger.info(

View file

@ -157,6 +157,45 @@ Two health endpoints serve different audiences:
The Docker Compose `healthcheck` uses the minimal endpoint (`curl -sf http://localhost:8000/api/health`). For external monitoring tools (Datadog, Prometheus, UptimeRobot, etc.) that need service-level detail (DuckDB status, sync freshness, user count), point them at `/api/health/detailed` with an `Authorization: Bearer <token>` header. Any authenticated user can call it; a personal access token (`da admin create-pat`) works well for service accounts.
### Scheduler tuning
The scheduler sidecar (`services/scheduler/__main__.py`) fires periodic
HTTP calls against the main app. Job cadences are configurable via env
vars on the scheduler container:
| Env var | Default | Purpose |
| ---------------------------------- | ------- | --------------------------------------------- |
| `SCHEDULER_DATA_REFRESH_INTERVAL` | `900` | seconds between `POST /api/sync/trigger` |
| `SCHEDULER_HEALTH_CHECK_INTERVAL` | `300` | seconds between `GET /api/health` |
| `SCHEDULER_SCRIPT_RUN_INTERVAL` | `60` | seconds between `POST /api/scripts/run-due` |
| `SCHEDULER_TICK_SECONDS` | `30` | loop polling cadence; must be ≤ smallest interval above |
`/api/sync/trigger` walks `table_registry`; tables with a per-row
`sync_schedule` (`every Nm` / `every Nh` / `daily HH:MM[,...]`) are
filtered to only those due for sync since their last run. Tables without
a schedule continue to run on every tick. The marketplace job runs at
`daily 03:00` UTC and is not currently env-tunable.
`/api/scripts/run-due` walks `script_registry` and runs each deployed
script whose `schedule` says it is due. Scripts in the `running` state
are skipped on subsequent ticks until the previous run writes a terminal
status. The endpoint requires admin auth (the sidecar's
`SCHEDULER_API_TOKEN` resolves to a synthetic Admin user).
#### Caveats
- **Schedule quantization rounds up.** The schedule grammar has minute-
level resolution. Non-multiples of 60 seconds round UP to the next
minute (`SCHEDULER_DATA_REFRESH_INTERVAL=90` → `every 2m`, not `every 1m`)
so a job never fires more often than configured. Sub-minute values
clamp to `every 1m`. Use multiples of 60 for predictable cadence.
- **A crashed BackgroundTask can leave a script stuck in `last_status='running'`.**
The next sidecar tick will skip the stuck script forever. Recovery is
manual: open a DuckDB shell on `system.duckdb` and run
`UPDATE script_registry SET last_status = NULL WHERE id = '<id>';`
Auto-recovery via max-runtime detection is intentionally out of scope
for v0; revisit if it happens in practice.
## Which path should I pick?
| | Terraform | Docker Compose |

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,6 @@
[project]
name = "agnes-the-ai-analyst"
version = "0.18.0"
version = "0.19.0"
description = "Agnes — AI Data Analyst platform for AI analytical systems"
requires-python = ">=3.11,<3.14"
license = "MIT"

View file

@ -69,24 +69,83 @@ def _get_auth_token() -> str:
return ""
# Schedule definitions: (name, schedule_string, endpoint, method, timeout_sec).
# All jobs are HTTP — see the module docstring for why nothing runs
# in-process anymore. ``daily 03:00`` for marketplaces matches the cadence
# the previous in-process job used; the endpoint is admin-only and
# idempotent (it iterates the registry and per-marketplace errors do not
# abort the run).
#
# timeout_sec: per-job override for the httpx call. Marketplaces gets a
# generous 15 min because the app handler iterates every registered
# marketplace under a single lock with up to 300s of git timeout per
# entry — at 120s (the default that data-refresh uses) a real-world
# registry of more than 2-3 slow repos times out the scheduler call,
# which then re-fires on the next 30s tick and queues a redundant sync.
JOBS = [
("data-refresh", "every 15m", "/api/sync/trigger", "POST", 120),
("health-check", "every 5m", "/api/health", "GET", 30),
("marketplaces", "daily 03:00", "/api/marketplaces/sync-all", "POST", 900),
]
# --- Env parsing ------------------------------------------------------------
_DEFAULTS = {
"SCHEDULER_DATA_REFRESH_INTERVAL": 15 * 60, # seconds
"SCHEDULER_HEALTH_CHECK_INTERVAL": 5 * 60,
"SCHEDULER_SCRIPT_RUN_INTERVAL": 1 * 60,
"SCHEDULER_TICK_SECONDS": 30,
}
def _read_positive_int(name: str) -> int:
"""Read an env var as a positive integer or fall back to the default.
Treats unset env (``None``) as "use default". Treats explicitly empty
string (``""``) as an operator typo and raises silently defaulting
on a literal ``FOO=`` in the env_file would mask configuration bugs.
"""
raw = os.environ.get(name)
if raw is None:
if name not in _DEFAULTS:
raise ValueError(f"Unknown scheduler env var: {name}")
return _DEFAULTS[name]
if raw == "":
raise ValueError(f"{name}='' must be a positive integer (seconds)")
try:
value = int(raw)
except (TypeError, ValueError):
raise ValueError(f"{name}={raw!r} must be a positive integer (seconds)")
if value <= 0:
raise ValueError(f"{name}={value} must be > 0 (seconds)")
return value
def _seconds_to_schedule(seconds: int) -> str:
"""Convert a seconds value to the closest 'every Nm' / 'every Nh' string.
Uses ceiling division so a non-multiple-of-60 input never produces a
schedule that fires MORE often than the operator configured (90s
'every 2m', not 'every 1m'). Sub-minute inputs clamp to 'every 1m'
because the schedule grammar has minute-level resolution.
"""
if seconds % 3600 == 0 and seconds >= 3600:
return f"every {seconds // 3600}h"
# Ceiling division: -(-x // y) is the standard trick.
minutes = max(1, -(-seconds // 60))
return f"every {minutes}m"
def resolved_tick_seconds() -> int:
"""Read + validate SCHEDULER_TICK_SECONDS in isolation (test helper)."""
return _read_positive_int("SCHEDULER_TICK_SECONDS")
def build_jobs() -> list[tuple[str, str, str, str, int]]:
"""Build the JOBS list from env, applying defaults and validation.
Tuple shape: (name, schedule_string, endpoint, method, http_timeout_sec).
Marketplaces stays hardcoded promoting it to env is out of #77 scope.
"""
refresh = _read_positive_int("SCHEDULER_DATA_REFRESH_INTERVAL")
health = _read_positive_int("SCHEDULER_HEALTH_CHECK_INTERVAL")
scripts = _read_positive_int("SCHEDULER_SCRIPT_RUN_INTERVAL")
tick = _read_positive_int("SCHEDULER_TICK_SECONDS")
smallest = min(refresh, health, scripts)
if tick > smallest:
raise ValueError(
f"SCHEDULER_TICK_SECONDS={tick} must be <= the smallest job "
f"interval ({smallest}s) so jobs don't consistently miss their "
f"cadence by up to one tick"
)
return [
("data-refresh", _seconds_to_schedule(refresh), "/api/sync/trigger", "POST", 120),
("health-check", _seconds_to_schedule(health), "/api/health", "GET", 30),
("script-runner", _seconds_to_schedule(scripts), "/api/scripts/run-due", "POST", 600),
("marketplaces", "daily 03:00", "/api/marketplaces/sync-all", "POST", 900),
]
_running = True
@ -124,24 +183,26 @@ def run():
signal.signal(signal.SIGTERM, _signal_handler)
signal.signal(signal.SIGINT, _signal_handler)
logger.info(f"Scheduler started. API_URL={API_URL}, {len(JOBS)} jobs configured.")
jobs = build_jobs()
tick = resolved_tick_seconds()
logger.info(
"Scheduler started. API_URL=%s, %d jobs, tick=%ds. Schedules: %s",
API_URL, len(jobs), tick,
{name: schedule for name, schedule, *_ in jobs},
)
# Track last successful run per job as ISO string — matches what
# src.scheduler.is_table_due expects.
last_run: dict[str, str | None] = {name: None for name, *_ in JOBS}
last_run: dict[str, str | None] = {name: None for name, *_ in jobs}
while _running:
now_iso = datetime.now(timezone.utc).isoformat()
for name, schedule, endpoint, method, timeout_sec in JOBS:
for name, schedule, endpoint, method, timeout_sec in jobs:
if not is_table_due(schedule, last_run[name]):
continue
logger.info("Running job: %s (%s)", name, schedule)
ok = _call_api(endpoint, method, timeout_sec)
if ok:
last_run[name] = now_iso
# 30s tick is plenty: interval jobs have minute-level resolution,
# daily jobs have a ~24 h retry window.
time.sleep(30)
time.sleep(tick)
logger.info("Scheduler stopped.")

View file

@ -418,6 +418,7 @@ def main() -> None:
om_config = instance_config.get("openmetadata", {})
catalog_url = om_config.get("url", "").strip()
token = om_config.get("token", "").strip()
verify_ssl = om_config.get("verify_ssl", True)
if not catalog_url or not token:
logger.warning("OpenMetadata not configured (url or token missing) - skipping export")
@ -428,7 +429,9 @@ def main() -> None:
# Initialize client
try:
client = OpenMetadataClient(base_url=catalog_url, token=token)
client = OpenMetadataClient(
base_url=catalog_url, token=token, verify=verify_ssl,
)
except Exception as e:
logger.warning(f"Failed to initialize OpenMetadata client: {e}")
return

View file

@ -103,3 +103,40 @@ class ScriptRepository:
return []
columns = [desc[0] for desc in self.conn.description]
return [dict(zip(columns, row)) for row in results]
def claim_for_run(self, script_id: str) -> bool:
"""Atomically set last_status='running' iff the script is idle.
Returns True iff this caller is the new owner of the run slot.
Returns False if the script does not exist OR is already running.
Implementation: UPDATE WHERE last_status IS DISTINCT FROM 'running'
+ RETURNING id. DuckDB supports IS DISTINCT FROM and RETURNING; if
zero rows come back, somebody else already owns the slot.
"""
now = datetime.now(timezone.utc)
result = self.conn.execute(
"""UPDATE script_registry
SET last_status = 'running', last_run = ?
WHERE id = ?
AND last_status IS DISTINCT FROM 'running'
RETURNING id""",
[now, script_id],
).fetchone()
return result is not None
def record_run_result(self, script_id: str, status: str) -> None:
"""Write the terminal status of a finished run (clears 'running').
Accepts only 'success' or 'failure' 'running' would re-arm the
flag instead of clearing it, defeating the purpose of the call.
"""
if status not in ("success", "failure"):
raise ValueError(
f"record_run_result: status must be 'success' or 'failure', "
f"got {status!r}"
)
self.conn.execute(
"UPDATE script_registry SET last_status = ? WHERE id = ?",
[status, script_id],
)

View file

@ -168,3 +168,90 @@ def _parse_timestamp(iso_string: str) -> Optional[datetime]:
continue
return None
def is_valid_schedule(schedule: Optional[str]) -> bool:
"""Return True iff ``schedule`` parses as a documented schedule string.
Accepted forms (mirroring the rest of this module):
- ``"every Nm"`` / ``"every Nh"`` with N a positive integer
- ``"daily HH:MM"`` (24-h, UTC) optionally comma-separated:
``"daily 07:00,13:00"``
Anything else including ``None``, empty string, or a parseable-looking
but out-of-range value (``"daily 25:00"``) returns False. Pydantic
validators on the admin API call this to reject malformed input with
422 instead of accepting it and silently no-op'ing later.
"""
if not schedule or not isinstance(schedule, str):
return False
interval = parse_interval_minutes(schedule)
if interval is not None:
return interval > 0
match = DAILY_PATTERN.match(schedule)
if not match:
return False
return bool(_parse_daily_times(match.group(1)))
def filter_due_tables(
table_configs: list[dict],
sync_state_repo,
now: Optional[datetime] = None,
) -> list[dict]:
"""Drop table configs whose ``sync_schedule`` says they are not due.
Behaviour:
- ``sync_schedule`` is None / empty / not a valid string table passes
through (no schedule = "sync on every tick", existing behaviour).
- Valid schedule + last_sync within the cadence drop.
- Valid schedule + last_sync past cadence (or never) keep.
- Invalid schedule string log a warning and let the table through
(do NOT silently skip operator surprise is worse than a redundant
sync).
``sync_state_repo`` is duck-typed: only ``get_last_sync(table_id)`` is
called, returning a ``datetime`` (tz-aware preferred, naive treated as
UTC) or ``None``.
"""
if now is None:
now = datetime.now(timezone.utc)
out: list[dict] = []
for tc in table_configs:
# sync_state.table_id is populated from _meta.table_name by the
# orchestrator and equals table_registry.name (NOT id). When
# id != name (auto-discovered Keboola rows: id="in_c-crm_company",
# name="company") an id-keyed lookup misses every row and the
# filter degrades to "always sync" — defeating the schedule. The
# same pitfall is documented at app/api/sync.py:244-249.
table_id = tc.get("name") or tc.get("id")
schedule = tc.get("sync_schedule")
if not schedule:
out.append(tc)
continue
if not is_valid_schedule(schedule):
logger.warning(
"Table %s has malformed sync_schedule %r — syncing anyway "
"(fix the schedule string to suppress this message)",
table_id,
schedule,
)
out.append(tc)
continue
last_sync = sync_state_repo.get_last_sync(table_id)
if last_sync is None:
last_sync_iso = None
else:
if last_sync.tzinfo is None:
last_sync = last_sync.replace(tzinfo=timezone.utc)
last_sync_iso = last_sync.isoformat()
if is_table_due(schedule, last_sync_iso, now=now):
out.append(tc)
else:
logger.info(
"Table %s skipped: schedule=%r, last_sync=%s, not due yet",
table_id,
schedule,
last_sync_iso,
)
return out

View file

@ -32,7 +32,7 @@ class TableRegistryFactory:
_SOURCE_TYPES = ["keboola", "bigquery", "csv"]
_QUERY_MODES = ["local", "remote"]
_SCHEDULES = ["0 * * * *", "0 6 * * *", "*/30 * * * *"]
_SCHEDULES = ["every 1h", "daily 06:00", "every 30m"]
@staticmethod
def build(**overrides) -> dict[str, Any]:

View file

@ -17,6 +17,9 @@ import json
from unittest.mock import MagicMock, patch
import pytest
from pydantic import ValidationError
from app.api.admin import RegisterTableRequest, UpdateTableRequest
def _auth(token):
@ -2042,3 +2045,46 @@ class TestRegisterTablePrecheckHandlerIsSync:
"it in a threadpool; otherwise the synchronous bigquery.Client "
"calls block the asyncio event loop."
)
# --- sync_schedule format validation (#79) ----------------------------------
@pytest.mark.parametrize("schedule", [
"every 15m",
"every 1h",
"daily 05:00",
"daily 07:00,13:00,18:00",
None, # explicit None is allowed (no schedule = always sync)
])
def test_register_request_accepts_valid_sync_schedule(schedule):
req = RegisterTableRequest(name="orders", sync_schedule=schedule)
assert req.sync_schedule == schedule
@pytest.mark.parametrize("schedule", [
"hourly",
"every 0m",
"daily 25:00",
"every 5x",
" ",
])
def test_register_request_rejects_malformed_sync_schedule(schedule):
with pytest.raises(ValidationError) as exc_info:
RegisterTableRequest(name="orders", sync_schedule=schedule)
assert "sync_schedule" in str(exc_info.value)
@pytest.mark.parametrize("schedule", [
"every 30m",
"daily 08:00",
None,
])
def test_update_request_accepts_valid_sync_schedule(schedule):
req = UpdateTableRequest(sync_schedule=schedule)
assert req.sync_schedule == schedule
def test_update_request_rejects_malformed_sync_schedule():
with pytest.raises(ValidationError):
UpdateTableRequest(sync_schedule="weekly")

View file

@ -403,7 +403,7 @@ class TestRegisterTable:
"bucket": "in.c-crm",
"source_table": "full_table",
"query_mode": "local",
"sync_schedule": "0 6 * * *",
"sync_schedule": "daily 06:00",
"description": "Full configuration table",
"profile_after_sync": True,
},

View file

@ -86,7 +86,7 @@ class TestScriptsAPI:
admin_headers = {"Authorization": f"Bearer {admin_token}"}
resp = c.post("/api/scripts/deploy", json={
"name": "calc", "source": "print(2+2)", "schedule": "0 8 * * MON",
"name": "calc", "source": "print(2+2)", "schedule": "daily 08:00",
}, headers=admin_headers)
script_id = resp.json()["id"]

View file

@ -2,6 +2,8 @@
Tests for OpenMetadata client
"""
import warnings
import pytest
import httpx
from unittest.mock import Mock, patch, MagicMock
@ -155,3 +157,48 @@ def test_context_manager():
# Verify close() was called
mock_client_instance.close.assert_called_once()
# --- TLS verify (#89) -------------------------------------------------------
def test_client_verifies_tls_by_default():
"""Default `verify=True` — no more silent MITM exposure of the JWT."""
with patch("connectors.openmetadata.client.httpx.Client") as mock_client:
OpenMetadataClient(base_url="https://catalog.example.com", token="t")
kwargs = mock_client.call_args.kwargs
assert kwargs["verify"] is True
def test_client_accepts_explicit_verify_false():
"""Operators on internal CAs may opt out — but it must be explicit."""
with patch("connectors.openmetadata.client.httpx.Client") as mock_client:
OpenMetadataClient(base_url="https://catalog.example.com", token="t", verify=False)
assert mock_client.call_args.kwargs["verify"] is False
def test_client_accepts_custom_ca_bundle_path():
"""A path string passed to verify is forwarded to httpx untouched
(httpx then uses it as the trust store)."""
with patch("connectors.openmetadata.client.httpx.Client") as mock_client:
OpenMetadataClient(
base_url="https://catalog.example.com",
token="t",
verify="/etc/ssl/certs/internal-ca.pem",
)
assert mock_client.call_args.kwargs["verify"] == "/etc/ssl/certs/internal-ca.pem"
def test_module_import_does_not_mutate_global_warnings_filter():
"""The previous version called warnings.filterwarnings('ignore', ...)
at import time, suppressing urllib3 warnings for ALL httpx clients in
the process. Drop the side effect."""
import importlib
pre_filters = list(warnings.filters)
import connectors.openmetadata.client as om
importlib.reload(om)
post_filters = list(warnings.filters)
new = [f for f in post_filters if f not in pre_filters]
for action, message, *_ in new:
if message is not None:
assert "Unverified HTTPS request" not in message.pattern

View file

@ -247,10 +247,10 @@ class TestScriptRepository:
from src.repositories.notifications import ScriptRepository
repo = ScriptRepository(db_conn)
repo.deploy("s1", name="sales_alert", owner="u1",
schedule="0 8 * * MON", source="print('hello')")
schedule="daily 08:00", source="print('hello')")
script = repo.get("s1")
assert script is not None
assert script["schedule"] == "0 8 * * MON"
assert script["schedule"] == "daily 08:00"
def test_list_all(self, db_conn):
from src.repositories.notifications import ScriptRepository

View file

@ -0,0 +1,238 @@
"""Tests for the scheduled-script runner — repo claim/release primitives,
the run-due endpoint, and Pydantic validation on DeployScriptRequest."""
from datetime import datetime, timezone
import pytest
from pydantic import ValidationError
from app.api.scripts import DeployScriptRequest
from src.db import get_system_db
from src.repositories.notifications import ScriptRepository
@pytest.fixture()
def conn(tmp_path, monkeypatch):
"""Fresh system.duckdb in a tmp dir — uses real schema, no mocks."""
monkeypatch.setenv("DATA_DIR", str(tmp_path))
state_dir = tmp_path / "state"
state_dir.mkdir(parents=True, exist_ok=True)
c = get_system_db()
yield c
c.close()
def _deploy(repo: ScriptRepository, script_id="s1", schedule="every 1h"):
repo.deploy(id=script_id, name=script_id, owner="u1",
schedule=schedule, source="print('hi')")
# ---------------- claim_for_run ---------------------------------------------
def test_claim_for_run_succeeds_when_idle(conn):
repo = ScriptRepository(conn)
_deploy(repo)
assert repo.claim_for_run("s1") is True
row = repo.get("s1")
assert row["last_status"] == "running"
assert row["last_run"] is not None
def test_claim_for_run_fails_when_already_running(conn):
repo = ScriptRepository(conn)
_deploy(repo)
assert repo.claim_for_run("s1") is True
# Second claim should fail because last_status is still 'running'.
assert repo.claim_for_run("s1") is False
def test_claim_for_run_succeeds_after_completion(conn):
repo = ScriptRepository(conn)
_deploy(repo)
repo.claim_for_run("s1")
repo.record_run_result("s1", status="success")
# Now claimable again.
assert repo.claim_for_run("s1") is True
def test_claim_for_run_returns_false_for_unknown_script(conn):
repo = ScriptRepository(conn)
assert repo.claim_for_run("does-not-exist") is False
# ---------------- record_run_result -----------------------------------------
@pytest.mark.parametrize("status", ["success", "failure"])
def test_record_run_result_writes_terminal_status(conn, status):
repo = ScriptRepository(conn)
_deploy(repo)
repo.claim_for_run("s1")
repo.record_run_result("s1", status=status)
row = repo.get("s1")
assert row["last_status"] == status
def test_record_run_result_rejects_running_as_terminal(conn):
"""The 'running' string is reserved for claim_for_run; record_run_result
must reject it so a caller can't accidentally re-arm the running flag
instead of clearing it."""
repo = ScriptRepository(conn)
_deploy(repo)
repo.claim_for_run("s1")
with pytest.raises(ValueError):
repo.record_run_result("s1", status="running")
# ---------------- DeployScriptRequest.schedule validation -------------------
def test_deploy_request_accepts_valid_schedule():
req = DeployScriptRequest(name="report", source="print(1)", schedule="every 1h")
assert req.schedule == "every 1h"
def test_deploy_request_accepts_no_schedule():
req = DeployScriptRequest(name="report", source="print(1)")
assert req.schedule is None
def test_deploy_request_rejects_malformed_schedule():
with pytest.raises(ValidationError):
DeployScriptRequest(name="report", source="print(1)", schedule="weekly")
# ---------------- /api/scripts/run-due endpoint -----------------------------
def _auth(token):
return {"Authorization": f"Bearer {token}"}
def test_run_due_skips_scripts_without_schedule(seeded_app, monkeypatch):
"""A script with schedule=NULL is never picked up by run-due (those
are run only via explicit POST /api/scripts/{id}/run)."""
monkeypatch.setattr(
"app.api.scripts._execute_script",
lambda src, name: {"name": name, "exit_code": 0, "stdout": "", "stderr": "", "truncated": False},
)
c = seeded_app["client"]
token = seeded_app["admin_token"]
deploy = c.post(
"/api/scripts/deploy",
json={"name": "manual-only", "source": "print(1)"},
headers=_auth(token),
)
assert deploy.status_code == 201
resp = c.post("/api/scripts/run-due", headers=_auth(token))
assert resp.status_code == 200
assert resp.json()["claimed"] == []
def test_run_due_claims_due_scripts(seeded_app, monkeypatch):
"""A script on 'every 1h' that has never run gets claimed and executed."""
calls = []
def _fake_exec(source, name):
calls.append(name)
return {"name": name, "exit_code": 0, "stdout": "", "stderr": "", "truncated": False}
monkeypatch.setattr("app.api.scripts._execute_script", _fake_exec)
c = seeded_app["client"]
token = seeded_app["admin_token"]
deploy = c.post(
"/api/scripts/deploy",
json={"name": "report", "source": "print(1)", "schedule": "every 1h"},
headers=_auth(token),
)
assert deploy.status_code == 201
script_id = deploy.json()["id"]
resp = c.post("/api/scripts/run-due", headers=_auth(token))
assert resp.status_code == 200
body = resp.json()
assert body["claimed"] == [script_id]
# BackgroundTasks runs synchronously inside TestClient, so the call
# has happened by now.
assert "report" in calls
def test_run_due_records_failure_when_script_exits_nonzero(seeded_app, monkeypatch):
"""`_execute_script` returns `{exit_code: N, ...}` for non-zero exits +
timeouts (only safety violations RAISE). `_run_claimed_script` must
inspect exit_code rather than treat "no exception" as success see
Devin review BUG_0001."""
monkeypatch.setattr(
"app.api.scripts._execute_script",
lambda src, name: {
"name": name, "exit_code": 1,
"stdout": "", "stderr": "boom", "truncated": False,
},
)
c = seeded_app["client"]
token = seeded_app["admin_token"]
deploy = c.post(
"/api/scripts/deploy",
json={"name": "broken", "source": "print(1)", "schedule": "every 1h"},
headers=_auth(token),
)
assert deploy.status_code == 201
script_id = deploy.json()["id"]
resp = c.post("/api/scripts/run-due", headers=_auth(token))
assert resp.json()["claimed"] == [script_id]
# BackgroundTasks runs synchronously inside TestClient — by now the
# terminal status must be 'failure', not 'success'.
listing = c.get("/api/scripts", headers=_auth(token)).json()["scripts"]
row = next(s for s in listing if s["id"] == script_id)
assert row["last_status"] == "failure", (
f"non-zero exit_code must record 'failure', got {row['last_status']!r}"
)
def test_run_due_records_success_when_script_exits_zero(seeded_app, monkeypatch):
"""Mirror of the failure test — exit_code=0 must record 'success'."""
monkeypatch.setattr(
"app.api.scripts._execute_script",
lambda src, name: {
"name": name, "exit_code": 0,
"stdout": "ok", "stderr": "", "truncated": False,
},
)
c = seeded_app["client"]
token = seeded_app["admin_token"]
deploy = c.post(
"/api/scripts/deploy",
json={"name": "good", "source": "print(1)", "schedule": "every 1h"},
headers=_auth(token),
)
assert deploy.status_code == 201
script_id = deploy.json()["id"]
c.post("/api/scripts/run-due", headers=_auth(token))
listing = c.get("/api/scripts", headers=_auth(token)).json()["scripts"]
row = next(s for s in listing if s["id"] == script_id)
assert row["last_status"] == "success"
def test_run_due_skips_scripts_already_running(seeded_app, monkeypatch):
"""A script in 'running' state must not be re-claimed by a second
sidecar tick that arrives while the previous run is still going."""
monkeypatch.setattr(
"app.api.scripts._execute_script",
# Simulate a slow run by NOT updating last_status — repo.claim_for_run
# already wrote 'running'; we leave it that way.
lambda src, name: {"name": name, "exit_code": 0, "stdout": "", "stderr": "", "truncated": False},
)
# Patch out record_run_result so the run never "completes".
monkeypatch.setattr(
"src.repositories.notifications.ScriptRepository.record_run_result",
lambda self, *a, **kw: None,
)
c = seeded_app["client"]
token = seeded_app["admin_token"]
deploy = c.post(
"/api/scripts/deploy",
json={"name": "long", "source": "print(1)", "schedule": "every 1h"},
headers=_auth(token),
)
assert deploy.status_code == 201
script_id = deploy.json()["id"]
first = c.post("/api/scripts/run-due", headers=_auth(token))
assert first.json()["claimed"] == [script_id]
second = c.post("/api/scripts/run-due", headers=_auth(token))
assert second.json()["claimed"] == []

View file

@ -0,0 +1,93 @@
"""Unit tests for the env-driven JOBS builder in services.scheduler."""
import pytest
def test_build_jobs_uses_documented_defaults(monkeypatch):
"""No env overrides → default cadences."""
for v in (
"SCHEDULER_DATA_REFRESH_INTERVAL",
"SCHEDULER_HEALTH_CHECK_INTERVAL",
"SCHEDULER_TICK_SECONDS",
"SCHEDULER_SCRIPT_RUN_INTERVAL",
):
monkeypatch.delenv(v, raising=False)
from services.scheduler.__main__ import build_jobs, resolved_tick_seconds
jobs = {name: schedule for name, schedule, *_ in build_jobs()}
assert jobs["data-refresh"] == "every 15m"
assert jobs["health-check"] == "every 5m"
assert jobs["script-runner"] == "every 1m"
assert jobs["marketplaces"] == "daily 03:00"
assert resolved_tick_seconds() == 30
def test_build_jobs_honors_env_overrides(monkeypatch):
monkeypatch.setenv("SCHEDULER_DATA_REFRESH_INTERVAL", "1800") # 30m
monkeypatch.setenv("SCHEDULER_HEALTH_CHECK_INTERVAL", "60") # 1m
monkeypatch.setenv("SCHEDULER_SCRIPT_RUN_INTERVAL", "120") # 2m
monkeypatch.setenv("SCHEDULER_TICK_SECONDS", "10")
from services.scheduler.__main__ import build_jobs, resolved_tick_seconds
jobs = {name: schedule for name, schedule, *_ in build_jobs()}
assert jobs["data-refresh"] == "every 30m"
assert jobs["health-check"] == "every 1m"
assert jobs["script-runner"] == "every 2m"
assert resolved_tick_seconds() == 10
@pytest.mark.parametrize("var", [
"SCHEDULER_DATA_REFRESH_INTERVAL",
"SCHEDULER_HEALTH_CHECK_INTERVAL",
"SCHEDULER_TICK_SECONDS",
"SCHEDULER_SCRIPT_RUN_INTERVAL",
])
@pytest.mark.parametrize("bad", ["0", "-5", "abc", ""])
def test_build_jobs_rejects_invalid_env(monkeypatch, var, bad):
monkeypatch.setenv(var, bad)
from services.scheduler.__main__ import build_jobs
with pytest.raises(ValueError):
build_jobs()
def test_build_jobs_rejects_tick_larger_than_smallest_interval(monkeypatch):
"""Tick must be <= the smallest job interval, otherwise jobs would
consistently miss their cadence by up to one tick."""
monkeypatch.setenv("SCHEDULER_HEALTH_CHECK_INTERVAL", "60")
monkeypatch.setenv("SCHEDULER_TICK_SECONDS", "120")
from services.scheduler.__main__ import build_jobs
with pytest.raises(ValueError, match="tick"):
build_jobs()
def test_build_jobs_includes_run_due_endpoint():
"""The script-runner job must POST to /api/scripts/run-due."""
from services.scheduler.__main__ import build_jobs
target = next(j for j in build_jobs() if j[0] == "script-runner")
name, schedule, endpoint, method, _timeout = target
assert endpoint == "/api/scripts/run-due"
assert method == "POST"
@pytest.mark.parametrize("seconds,expected", [
# Exact multiples of 60 → unchanged.
(60, "every 1m"),
(120, "every 2m"),
(900, "every 15m"),
# Exact multiples of 3600 → hour form.
(3600, "every 1h"),
(7200, "every 2h"),
# Non-multiples of 60 must round UP (ceiling), so the job never fires
# MORE often than the operator configured. Devin BUG_0001 on 1af2081.
(90, "every 2m"), # 90s asked → 120s scheduled, NOT 60s
(150, "every 3m"),
(61, "every 2m"),
(3601, "every 61m"),
# Sub-minute clamps to 1m (schedule grammar minute-grained).
(30, "every 1m"),
(1, "every 1m"),
])
def test_seconds_to_schedule_rounds_up_not_down(seconds, expected):
from services.scheduler.__main__ import _seconds_to_schedule
assert _seconds_to_schedule(seconds) == expected, (
f"_seconds_to_schedule({seconds}) must round UP — flooring would "
f"make jobs fire more often than the operator configured."
)

View file

@ -53,15 +53,21 @@ class TestScriptsDeploy:
token = seeded_app["admin_token"]
resp = c.post(
"/api/scripts/deploy",
json={"name": "scheduled", "source": "print('scheduled')", "schedule": "0 8 * * MON"},
json={"name": "scheduled", "source": "print('scheduled')", "schedule": "daily 08:00"},
headers=_auth(token),
)
assert resp.status_code == 201
data = resp.json()
assert data["schedule"] == "0 8 * * MON"
assert data["schedule"] == "daily 08:00"
def test_deploy_script_with_blocked_import_deploys_ok_but_run_fails(self, seeded_app):
"""Deploy stores scripts as-is; safety validation happens at run time, not deploy time."""
def test_deploy_script_with_blocked_import_rejected_at_deploy_time(self, seeded_app):
"""Deploy validates the source against the safety blocklist BEFORE persisting.
Previously deploys stored bad scripts as-is and rejected them at run
time, which combined with the new run-due endpoint created a
perpetual claim-fail-retry loop (Devin ANALYSIS_0004 on the #135
review). Reject up front so bad scripts never land in the registry.
"""
c = seeded_app["client"]
admin_token = seeded_app["admin_token"]
deploy_resp = c.post(
@ -69,12 +75,8 @@ class TestScriptsDeploy:
json={"name": "bad_import", "source": "import os; print(os.getcwd())"},
headers=_auth(admin_token),
)
assert deploy_resp.status_code == 201
script_id = deploy_resp.json()["id"]
run_resp = c.post(f"/api/scripts/{script_id}/run", headers=_auth(admin_token))
assert run_resp.status_code == 400
assert "Blocked" in run_resp.json()["detail"] or "disallowed" in run_resp.json()["detail"]
assert deploy_resp.status_code == 400
assert "Blocked" in deploy_resp.json()["detail"] or "disallowed" in deploy_resp.json()["detail"]
def test_deploy_requires_auth(self, seeded_app):
c = seeded_app["client"]

314
tests/test_sync_filter.py Normal file
View file

@ -0,0 +1,314 @@
"""Tests for the schedule-validity helper and the per-table due-filter."""
from datetime import datetime, timezone
import pytest
from src.scheduler import filter_due_tables, is_valid_schedule
# ---------------- is_valid_schedule -----------------------------------------
@pytest.mark.parametrize("schedule", [
"every 15m",
"every 1h",
"every 6h",
"daily 05:00",
"daily 07:00,13:00,18:00",
])
def test_is_valid_schedule_accepts_documented_formats(schedule):
assert is_valid_schedule(schedule) is True
@pytest.mark.parametrize("schedule", [
"",
"every",
"every 0m", # zero is not a positive interval
"every 15s", # seconds not supported
"daily",
"daily 25:00", # invalid hour
"daily 12:60", # invalid minute
"daily 12:00,", # trailing comma
"hourly", # unknown keyword
"every -5m", # negative
])
def test_is_valid_schedule_rejects_malformed_strings(schedule):
assert is_valid_schedule(schedule) is False
def test_is_valid_schedule_treats_none_as_invalid():
# None is "no schedule" — callers handle that case before validating.
# The validator is for non-null strings only.
assert is_valid_schedule(None) is False # type: ignore[arg-type]
# ---------------- filter_due_tables -----------------------------------------
class _FakeSyncStateRepo:
"""Stub SyncStateRepository — returns last_sync per table_id."""
def __init__(self, last_syncs: dict[str, datetime | None]):
self._data = last_syncs
def get_last_sync(self, table_id: str):
return self._data.get(table_id)
def _utc(year, month, day, hour=0, minute=0):
return datetime(year, month, day, hour, minute, tzinfo=timezone.utc)
def test_filter_due_tables_passes_through_unscheduled_tables():
"""Tables with sync_schedule=None are always due (opt-in feature)."""
configs = [
{"id": "t1", "name": "t1", "sync_schedule": None},
{"id": "t2", "name": "t2", "sync_schedule": ""},
]
repo = _FakeSyncStateRepo({})
out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0))
assert [c["id"] for c in out] == ["t1", "t2"]
def test_filter_due_tables_drops_table_within_interval():
"""A table on 'every 1h' synced 30m ago is NOT due."""
configs = [{"id": "fast", "name": "fast", "sync_schedule": "every 1h"}]
repo = _FakeSyncStateRepo({"fast": _utc(2026, 5, 1, 9, 30)})
out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0))
assert out == []
def test_filter_due_tables_keeps_table_past_interval():
"""A table on 'every 1h' synced 90m ago IS due."""
configs = [{"id": "fast", "name": "fast", "sync_schedule": "every 1h"}]
repo = _FakeSyncStateRepo({"fast": _utc(2026, 5, 1, 8, 30)})
out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0))
assert [c["id"] for c in out] == ["fast"]
def test_filter_due_tables_keeps_never_synced_table():
"""No last_sync row → always due (matches is_table_due semantics)."""
configs = [{"id": "new", "name": "new", "sync_schedule": "every 1h"}]
repo = _FakeSyncStateRepo({}) # no entry at all
out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0))
assert [c["id"] for c in out] == ["new"]
def test_filter_due_tables_treats_invalid_schedule_as_unscheduled():
"""Garbled sync_schedule: log + always sync (don't silently skip)."""
configs = [{"id": "bad", "name": "bad", "sync_schedule": "BOGUS"}]
repo = _FakeSyncStateRepo({"bad": _utc(2026, 5, 1, 9, 59)})
out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0))
assert [c["id"] for c in out] == ["bad"]
def test_filter_due_tables_mixed_due_and_skipped():
configs = [
{"id": "due", "name": "due", "sync_schedule": "every 30m"},
{"id": "skipped", "name": "skipped", "sync_schedule": "every 30m"},
{"id": "always", "name": "always", "sync_schedule": None},
]
repo = _FakeSyncStateRepo({
"due": _utc(2026, 5, 1, 9, 0), # 60m ago → due
"skipped": _utc(2026, 5, 1, 9, 50), # 10m ago → skip
})
out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0))
assert sorted(c["id"] for c in out) == ["always", "due"]
def test_filter_due_tables_handles_naive_last_sync():
"""SyncStateRepository can return naive datetimes from older rows; helper
must coerce to UTC instead of crashing on tz-aware vs naive comparison."""
configs = [{"id": "old", "name": "old", "sync_schedule": "every 1h"}]
naive_2h_ago = datetime(2026, 5, 1, 8, 0) # no tzinfo
repo = _FakeSyncStateRepo({"old": naive_2h_ago})
out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0))
assert [c["id"] for c in out] == ["old"]
def test_filter_due_tables_keys_lookup_by_name_when_id_differs():
"""sync_state.table_id is populated from _meta.table_name (= registry
name), NOT registry id. When id != name (auto-discovered Keboola rows),
the helper must look up sync_state by NAME or it sees last_sync=None
for every row degrades to "always sync" schedule no-op. See
Devin review BUG_0002 + the comment in app/api/sync.py:244-249."""
configs = [{
"id": "in_c-crm_company", # auto-discovered shape
"name": "company", # what _meta.table_name records
"sync_schedule": "every 1h",
}]
# 30m ago — keyed by NAME, the lookup must hit and the table must skip.
repo = _FakeSyncStateRepo({"company": _utc(2026, 5, 1, 9, 30)})
out = filter_due_tables(configs, repo, now=_utc(2026, 5, 1, 10, 0))
assert out == [], (
"id-keyed lookup would have missed sync_state, treated table as "
"never-synced, and kept it. The fix keys by name."
)
# ---------------- _run_sync wiring ------------------------------------------
def test_run_sync_filters_local_tables_by_schedule(monkeypatch, tmp_path):
"""`_run_sync(tables=None)` consults `filter_due_tables` and skips
tables that are not due. Manual override (`tables=[...]`) bypasses
the filter entirely."""
from app.api import sync as sync_module
# Stub get_data_source_type → 'keboola' so the keboola subprocess code
# path is taken (also matches the existing _run_sync shape).
monkeypatch.setattr(
sync_module, "_get_data_dir", lambda: tmp_path,
)
import app.instance_config as instance_config
monkeypatch.setattr(instance_config, "get_data_source_type", lambda: "keboola")
# Fake registry with one due + one skipped table.
fake_configs = [
{"id": "due", "name": "due", "source_type": "keboola",
"sync_schedule": "every 30m", "query_mode": "local"},
{"id": "skipped", "name": "skipped", "source_type": "keboola",
"sync_schedule": "every 30m", "query_mode": "local"},
]
class _StubRegistry:
def __init__(self, conn): pass
def list_local(self, source_type=None): return list(fake_configs)
def get(self, table_id):
return next((c for c in fake_configs if c["id"] == table_id), None)
monkeypatch.setattr(sync_module, "TableRegistryRepository", _StubRegistry)
# Stub get_system_db (imported locally inside _run_sync from src.db).
class _FakeConn:
def close(self): pass
import src.db as _db_mod
monkeypatch.setattr(_db_mod, "get_system_db", lambda: _FakeConn())
# Fake sync_state: 'due' last synced 60m ago, 'skipped' 10m ago.
from datetime import datetime, timezone
last_syncs = {
"due": datetime(2026, 5, 1, 9, 0, tzinfo=timezone.utc),
"skipped": datetime(2026, 5, 1, 9, 50, tzinfo=timezone.utc),
}
class _StubState:
def __init__(self, conn): pass
def get_last_sync(self, table_id): return last_syncs.get(table_id)
monkeypatch.setattr(sync_module, "SyncStateRepository", _StubState)
# Freeze 'now' inside src.scheduler.filter_due_tables. We do this by
# monkeypatching filter_due_tables itself to inject `now=`.
from src import scheduler as _sched
real_filter = _sched.filter_due_tables
monkeypatch.setattr(
sync_module, "filter_due_tables",
lambda cfgs, repo: real_filter(
cfgs, repo, now=datetime(2026, 5, 1, 10, 0, tzinfo=timezone.utc),
),
)
# Capture the configs that subprocess.run sees (via stdin payload).
captured = {}
def _fake_run(cmd, input, capture_output, text, timeout, env, cwd):
import json as _json
captured["configs"] = _json.loads(input)
class _R:
returncode = 0
stdout = "{}"
stderr = ""
return _R()
monkeypatch.setattr(sync_module.subprocess, "run", _fake_run)
# Stub orchestrator + profiler imports inside the function so we don't
# require a real DuckDB analytics file.
import src.orchestrator as _orch_mod
class _StubOrch:
def rebuild(self): return {}
monkeypatch.setattr(_orch_mod, "SyncOrchestrator", _StubOrch)
# Run with tables=None → filter applies → only 'due' goes to subprocess.
sync_module._run_sync(tables=None)
assert [c["id"] for c in captured["configs"]] == ["due"]
# Run with explicit override → filter is BYPASSED → both go through.
captured.clear()
sync_module._run_sync(tables=["due", "skipped"])
assert sorted(c["id"] for c in captured["configs"]) == ["due", "skipped"]
def test_run_sync_does_not_auto_discover_when_filter_returns_empty(monkeypatch, tmp_path):
"""Devin BUG_0001 on ebb8cc9: when registry HAS tables but filter returns
[] (nothing due), the `if not table_configs` guard must NOT fire
auto-discovery. Otherwise on Keboola instances with KEBOOLA_STORAGE_TOKEN
set, every tick re-discovers + re-reads the registry without the filter,
bypassing sync_schedule entirely."""
from app.api import sync as sync_module
monkeypatch.setattr(sync_module, "_get_data_dir", lambda: tmp_path)
import app.instance_config as instance_config
monkeypatch.setattr(instance_config, "get_data_source_type", lambda: "keboola")
# Critical: KEBOOLA_STORAGE_TOKEN must be set to make the bug reachable.
monkeypatch.setenv("KEBOOLA_STORAGE_TOKEN", "fake-token")
# Registry has 1 table. Filter will return [] (synced 5m ago, schedule 1h).
fake_configs = [
{"id": "fresh", "name": "fresh", "source_type": "keboola",
"sync_schedule": "every 1h", "query_mode": "local"},
]
class _StubRegistry:
def __init__(self, conn): pass
def list_local(self, source_type=None): return list(fake_configs)
def get(self, table_id):
return next((c for c in fake_configs if c["id"] == table_id), None)
monkeypatch.setattr(sync_module, "TableRegistryRepository", _StubRegistry)
class _FakeConn:
def close(self): pass
import src.db as _db_mod
monkeypatch.setattr(_db_mod, "get_system_db", lambda: _FakeConn())
# 5m ago → not due under "every 1h"
from datetime import datetime, timezone
class _StubState:
def __init__(self, conn): pass
def get_last_sync(self, table_id):
return datetime(2026, 5, 1, 9, 55, tzinfo=timezone.utc)
monkeypatch.setattr(sync_module, "SyncStateRepository", _StubState)
from src import scheduler as _sched
real_filter = _sched.filter_due_tables
monkeypatch.setattr(
sync_module, "filter_due_tables",
lambda cfgs, repo: real_filter(
cfgs, repo, now=datetime(2026, 5, 1, 10, 0, tzinfo=timezone.utc),
),
)
# Sentinel: if auto-discovery fires, this counter increments.
discovery_calls = []
def _fake_discover(conn, who):
discovery_calls.append(who)
return {"registered": 0, "skipped": 0}
import app.api.admin as _admin_mod
monkeypatch.setattr(_admin_mod, "_discover_and_register_tables", _fake_discover)
# Subprocess + orchestrator stubs in case the function gets that far
# (it shouldn't — filter returned [], no work to do).
def _fake_run(cmd, **kw):
raise AssertionError("subprocess.run must not be called when filter returns empty")
monkeypatch.setattr(sync_module.subprocess, "run", _fake_run)
sync_module._run_sync(tables=None)
assert discovery_calls == [], (
f"Auto-discovery must not fire when registry has tables but the "
f"filter excluded them all; got {discovery_calls!r} call(s)."
)