diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d3eff8..f18a5cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,15 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +## [0.54.3] — 2026-05-13 + +### Added +- `AGNES_DEFAULT_SYNC_SCHEDULE` env var (consumed by `app/api/sync.py:_run_materialized_pass`) sets the platform-wide fallback `sync_schedule` for registry rows that don't pin their own value. Lets a deployment dial cadence down to `daily 03:00` without having to PUT every row. Per-table `sync_schedule` still wins; literal `every 1h` is the final fallback if neither is set (matches OSS-historical behaviour). + +### Fixed +- `GET /api/sync/status` no longer reports `locked=false` during the window of a few hundred milliseconds between the trigger handler's 200 response and the background task's `_sync_lock.acquire()`. The handler now stamps `_recent_trigger_at`, and the status endpoint returns `locked=true` for `_TRIGGER_HOLD_SEC` (=30s) after the most recent trigger. Pre-fix, a host-side `agnes-auto-upgrade.sh` defer probe firing in that window saw an honest `locked=false` and proceeded with `docker compose up -d`, SIGKILLing the just-spawning extractor / materialized worker. Observed on agnes-dev: 3 mid-sync container kills in 30 min until the trigger-hold window closed the gap. +- `scripts/ops/agnes-auto-upgrade.sh`: the post-upgrade chown loop now includes `/data/tmp` (the default `AGNES_TEMP_DIR` set in `docker-compose.yml`) and runs `mkdir -p` on it first. Pre-fix, the runtime user (`uid 999`) couldn't create `/data/tmp` under a root-owned data-disk root, so tempfiles silently fell back to the boot disk's overlayfs `/tmp` — defeating the whole point of routing slice staging onto the dedicated data volume. 
+ ## [0.54.2] — 2026-05-13 ### Added diff --git a/app/api/sync.py b/app/api/sync.py index 9e7ba97..cb9c486 100644 --- a/app/api/sync.py +++ b/app/api/sync.py @@ -41,6 +41,26 @@ router = APIRouter(prefix="/api/sync", tags=["sync"]) # something bypasses the handler). _sync_lock = threading.Lock() +# Race-protection: the trigger handler returns 200 BEFORE the background task +# acquires ``_sync_lock``. In that ~few-hundred-ms gap, ``/api/sync/status`` +# would honestly report ``locked=False`` — and the host-side +# ``agnes-auto-upgrade.sh`` defer probe (which polls this endpoint) would +# proceed with ``docker compose up -d`` and SIGKILL the still-spawning +# extractor / materialized worker. Mid-sync container kill is the exact +# class of corruption the WAL replay auto-recovery is meant to be a +# safety net for, not a routine occurrence. +# +# Fix: stamp the trigger time alongside the lock. ``/api/sync/status`` also +# returns ``locked=True`` for ``_TRIGGER_HOLD_SEC`` seconds after the most +# recent trigger, even if the background task hasn't yet acquired the lock. +# The window is short enough that an operator-issued ``/api/sync/trigger`` +# followed by an immediate ``GET /api/sync/status`` is consistent +# (locked=True), but long enough to cover the schedule → background-task +# spawn latency. Defense in depth: the real lock still gates the +# extractor subprocess. +_TRIGGER_HOLD_SEC = 30 +_recent_trigger_at: float = 0.0 # monotonic clock; 0 = never triggered + def _file_hash(path: Path) -> str: if not path.exists(): @@ -164,7 +184,17 @@ def _run_materialized_pass( last = state.get_last_sync(ref_name) last_iso = last.isoformat() if last else None - schedule = row.get("sync_schedule") or "every 1h" + # Per-table schedule wins; fall through to AGNES_DEFAULT_SYNC_SCHEDULE + # (operator override), then to ``every 1h`` (OSS-historical default). 
+ # The env knob lets a deployment dial down the platform-wide refresh + # cadence without having to PUT every registry row — useful when + # data freshness budget is "once per day" and the hourly default + # over-fetches. + schedule = ( + row.get("sync_schedule") + or os.environ.get("AGNES_DEFAULT_SYNC_SCHEDULE", "").strip() + or "every 1h" + ) if not is_table_due(schedule, last_iso): summary["skipped"].append({"table": ref_name, "reason": "due_check"}) continue @@ -785,10 +815,19 @@ async def sync_status(): Returns: ``{"locked": bool}`` — True if `_sync_lock` is currently held by - a `_run_sync` invocation. The host script defers the upgrade - when this is True and retries on the next 5-min cron tick. + a `_run_sync` invocation, OR a sync was triggered within the + last ``_TRIGGER_HOLD_SEC`` seconds (so the FastAPI background + task hasn't yet acquired the lock). Without the trigger-hold + window, an auto-upgrade probe firing in the gap between the + trigger handler's 200 response and the background task's + ``_sync_lock.acquire()`` would see ``locked=False`` and proceed + with ``up -d`` — killing the just-spawning extractor. """ - return {"locked": _sync_lock.locked()} + locked = _sync_lock.locked() + if not locked and _recent_trigger_at: + # Monotonic deadline; clock skew / DST jumps don't matter. + locked = (time.monotonic() - _recent_trigger_at) < _TRIGGER_HOLD_SEC + return {"locked": locked} # ---- Trigger ---- @@ -870,6 +909,12 @@ async def trigger_sync( detail="sync_already_in_progress", ) _t0 = time.monotonic() + # Stamp the trigger time so `/api/sync/status` reports locked=True + # for the next ``_TRIGGER_HOLD_SEC`` even though the background + # task hasn't yet acquired ``_sync_lock``. Closes the race window + # the host-side ``agnes-auto-upgrade.sh`` defer probe was hitting. 
+ global _recent_trigger_at + _recent_trigger_at = _t0 background_tasks.add_task(_run_sync, tables) try: from src.db import get_system_db diff --git a/pyproject.toml b/pyproject.toml index a839283..3fdae81 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agnes-the-ai-analyst" -version = "0.54.2" +version = "0.54.3" description = "Agnes — AI Data Analyst platform for AI analytical systems" requires-python = ">=3.11,<3.14" license = "MIT" diff --git a/scripts/ops/agnes-auto-upgrade.sh b/scripts/ops/agnes-auto-upgrade.sh index 8d0d262..91f00be 100755 --- a/scripts/ops/agnes-auto-upgrade.sh +++ b/scripts/ops/agnes-auto-upgrade.sh @@ -176,7 +176,15 @@ if [ "$BEFORE" != "$AFTER" ] || [ "$CONFIG_BEFORE" != "$CONFIG_AFTER" ]; then IMAGE_UIDGID=$(docker run --rm --entrypoint cat "$IMAGE" /etc/passwd 2>/dev/null \ | awk -F: -v u="${IMAGE_USER%%:*}" '$1==u || $3==u {print $3":"$4; exit}') if [ -n "$IMAGE_UIDGID" ]; then - for d in "$STATE_DIR" /data/extracts /data/analytics; do + # /data/tmp is the default ``AGNES_TEMP_DIR`` from + # docker-compose.yml — Snowflake-UNLOAD slice staging + # and CSV intermediates land here. Without an explicit + # mkdir + chown, the runtime user can't create it + # under a root-owned ``/data`` (the data disk's root + # comes up root-owned on first mount). ``mkdir -p`` + # is idempotent so existing dirs survive. 
+ mkdir -p /data/tmp 2>/dev/null || true + for d in "$STATE_DIR" /data/extracts /data/analytics /data/tmp; do [ -d "$d" ] && chown -R "$IMAGE_UIDGID" "$d" 2>/dev/null || true done fi diff --git a/tests/test_run_materialized_pass_in_flight_skip.py b/tests/test_run_materialized_pass_in_flight_skip.py index 4f4a702..a7e59a5 100644 --- a/tests/test_run_materialized_pass_in_flight_skip.py +++ b/tests/test_run_materialized_pass_in_flight_skip.py @@ -41,6 +41,60 @@ def fake_registry_with_one_materialized(monkeypatch, tmp_path): return state +def test_default_schedule_falls_through_env_then_every_1h( + monkeypatch, fake_registry_with_one_materialized, +): + """Per-table ``sync_schedule=None`` → fall through to + ``AGNES_DEFAULT_SYNC_SCHEDULE`` env (operator deployment override) → + fall through to literal ``every 1h`` (OSS-historical default). + Test the THREE branches: + + 1. Per-table schedule wins over env. + 2. Env wins when per-table is None. + 3. ``every 1h`` is the floor — env unset + per-table None. + + Branch (2) is the operator knob for ``daily 03:00`` deployments + (data freshness budget once-per-day; the hourly default + over-fetches Snowflake on every Keboola export-async cycle).""" + captured = {} + + def fake_is_due(schedule, last_iso, now=None): + captured["schedule"] = schedule + return False # short-circuit the dispatcher + + monkeypatch.setattr("app.api.sync.is_table_due", fake_is_due) + + # Case 3: env unset, per-table None → "every 1h" + monkeypatch.delenv("AGNES_DEFAULT_SYNC_SCHEDULE", raising=False) + _run_materialized_pass(MagicMock(), MagicMock()) + assert captured["schedule"] == "every 1h", captured + + # Case 2: env set, per-table None → env value + monkeypatch.setenv("AGNES_DEFAULT_SYNC_SCHEDULE", "daily 03:00") + _run_materialized_pass(MagicMock(), MagicMock()) + assert captured["schedule"] == "daily 03:00", captured + + # Case 1: per-table schedule wins over env. (Mutate fixture's row.) 
+ fake_registry_with_one_materialized # ensure fixture is loaded + import app.api.sync as _sm + # The fixture's _Repo.list_all returns a captured list; reach into + # its closure isn't easy. Easier: monkeypatch list_all directly. + pinned_rows = [{ + "id": "in_flight_t", "name": "in_flight_t", + "query_mode": "materialized", "source_type": "bigquery", + "source_query": "SELECT 1", + "sync_schedule": "every 30m", # explicit per-table + }] + + class _RepoWithSched: + def __init__(self, conn): pass + def list_all(self): return pinned_rows + + monkeypatch.setattr(_sm, "TableRegistryRepository", _RepoWithSched) + _run_materialized_pass(MagicMock(), MagicMock()) + assert captured["schedule"] == "every 30m", captured + + def test_in_flight_recorded_as_skipped_not_error(fake_registry_with_one_materialized): state = fake_registry_with_one_materialized diff --git a/tests/test_sync_trigger_singleton.py b/tests/test_sync_trigger_singleton.py index 0298a4e..70a4886 100644 --- a/tests/test_sync_trigger_singleton.py +++ b/tests/test_sync_trigger_singleton.py @@ -18,12 +18,16 @@ from app.api import sync as sync_module @pytest.fixture(autouse=True) def reset_sync_lock(): """Make sure each test starts with a free lock — and never leaves one - held even if an assertion fires mid-test.""" + held even if an assertion fires mid-test. Also wipe the trigger-hold + timestamp so an earlier test's ``_recent_trigger_at`` doesn't + silently pin the next test's ``/api/sync/status`` at locked=True.""" if sync_module._sync_lock.locked(): sync_module._sync_lock.release() + sync_module._recent_trigger_at = 0.0 yield if sync_module._sync_lock.locked(): sync_module._sync_lock.release() + sync_module._recent_trigger_at = 0.0 def test_run_sync_skips_when_lock_held(capsys): @@ -222,3 +226,56 @@ def test_sync_status_does_not_require_auth(): # No auth headers at all. 
resp = client.get("/api/sync/status") assert resp.status_code == 200 + + +def test_sync_status_trigger_hold_window_reports_locked_after_trigger(): + """Race-protection: even when ``_sync_lock`` is NOT yet held (the + background task hasn't acquired it), a recent ``_recent_trigger_at`` + timestamp within ``_TRIGGER_HOLD_SEC`` must report + ``{"locked": true}``. Without this, an auto-upgrade defer probe + firing in the few-hundred-ms window between the trigger handler's + 200 response and ``_run_sync.acquire()`` would see locked=False + and SIGKILL the spawning extractor.""" + import time as _time + if sync_module._sync_lock.locked(): + sync_module._sync_lock.release() + # Stamp a fresh trigger time, then immediately probe. + sync_module._recent_trigger_at = _time.monotonic() + try: + from fastapi.testclient import TestClient + from fastapi import FastAPI + app = FastAPI() + app.include_router(sync_module.router) + client = TestClient(app) + resp = client.get("/api/sync/status") + assert resp.status_code == 200 + assert resp.json() == {"locked": True}, ( + "trigger-hold window not honored — auto-upgrade defer probe " + "would race the background task acquisition" + ) + finally: + sync_module._recent_trigger_at = 0.0 + + +def test_sync_status_trigger_hold_window_expires(): + """Trigger-hold reports locked only for ``_TRIGGER_HOLD_SEC`` — a + stale timestamp past the window must NOT pin the probe at True + forever. Without expiry, a single trigger would block all + auto-upgrades indefinitely.""" + if sync_module._sync_lock.locked(): + sync_module._sync_lock.release() + # Stamp a timestamp ``_TRIGGER_HOLD_SEC + 5`` seconds in the past. 
+ import time as _time + sync_module._recent_trigger_at = ( + _time.monotonic() - sync_module._TRIGGER_HOLD_SEC - 5 + ) + try: + from fastapi.testclient import TestClient + from fastapi import FastAPI + app = FastAPI() + app.include_router(sync_module.router) + client = TestClient(app) + resp = client.get("/api/sync/status") + assert resp.json() == {"locked": False} + finally: + sync_module._recent_trigger_at = 0.0