agnes-the-ai-analyst/tests/test_sync_trigger_singleton.py
ZdenekSrotyr 117b6784ea
fix(sync+ops): defer-probe race, AGNES_TEMP_DIR chown, default-schedule env knob (#283)
* fix(sync+ops): defer-probe race, AGNES_TEMP_DIR chown, default-schedule env knob

Three sync-ops fixes surfaced during agnes-dev steady-state operation
after the v0.46→v0.54 cutover settled. None of them depend on each
other; they are bundled because they all live in the sync trigger /
agnes-auto-upgrade flow and were diagnosed from the same observation
window.

1. (fix) /api/sync/status race window. The trigger handler returned 200
   BEFORE the background task acquired _sync_lock. In that few-hundred-ms
   gap, an honest /api/sync/status call returned locked=false, so a
   host-side agnes-auto-upgrade.sh defer probe that fired in that
   window proceeded with 'docker compose up -d' and SIGKILLed the
   just-spawning extractor / materialized worker.

   Observed on agnes-dev: 3 mid-sync container kills in 30 min, each
   followed by a few-min outage and a partial sync. The WAL replay
   auto-recovery (PR #217) kept the system DB consistent through each
   kill, but the actual sync work was lost.

   Fix: handler stamps _recent_trigger_at; status endpoint returns
   locked=true for _TRIGGER_HOLD_SEC (=30s) after the most recent
   trigger, even if the background task hasn't yet acquired the lock.
   30s covers the schedule → spawn latency with margin; short enough
   not to indefinitely block auto-upgrade after a one-off trigger.
   Defense in depth: the real lock still gates the extractor subprocess.

2. (fix) scripts/ops/agnes-auto-upgrade.sh: post-upgrade chown loop
   now mkdir -p's /data/tmp before chown'ing, and includes it in the
   list of dirs that get the runtime UID:GID. /data/tmp is the default
   AGNES_TEMP_DIR set in docker-compose.yml — Snowflake-UNLOAD slice
   staging and CSV intermediates land here. Pre-fix the runtime user
   (uid 999) couldn't create /data/tmp under a root-owned data-disk
   root, so tempfiles silently fell back to the boot disk's overlayfs
   /tmp — defeating the whole point of routing slice staging onto the
   dedicated data volume.

3. (feat) AGNES_DEFAULT_SYNC_SCHEDULE env var sets the platform-wide
   fallback sync_schedule. Lets a deployment dial cadence down to
   'daily 03:00' (data freshness budget once-per-day) without having
   to PUT every registry row. Per-table sync_schedule still wins;
   literal 'every 1h' is the floor if neither is set — OSS-historical
   default unchanged.

Tests:
- test_sync_status_trigger_hold_window_reports_locked_after_trigger
- test_sync_status_trigger_hold_window_expires
- test_default_schedule_falls_through_env_then_every_1h (3 branches)
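
The three branches covered by the last test can be pictured with the sketch below. It assumes a hypothetical resolver named resolve_sync_schedule and an injectable `env` mapping; only the AGNES_DEFAULT_SYNC_SCHEDULE variable and the 'every 1h' / 'daily 03:00' values come from this message. Treating a blank env value as unset is also an assumption.

```python
import os

FLOOR_SCHEDULE = "every 1h"  # historical default named in the commit message


def resolve_sync_schedule(table_schedule, env=None):
    """Hypothetical resolver: per-table value, then env var, then the floor."""
    env = os.environ if env is None else env
    if table_schedule:
        # Branch 1: a per-table sync_schedule always wins.
        return table_schedule
    # Branch 2: platform-wide fallback from the environment.
    # Assumption: a blank or whitespace-only value counts as unset.
    env_default = (env.get("AGNES_DEFAULT_SYNC_SCHEDULE") or "").strip()
    # Branch 3: neither is set, so fall through to the literal default.
    return env_default or FLOOR_SCHEDULE
```

For example, with the variable set to 'daily 03:00', a table with no schedule of its own resolves to 'daily 03:00', while a table that sets 'every 1h' explicitly keeps 'every 1h'.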

* release: 0.54.3 — sync defer-probe race + AGNES_TEMP_DIR chown + default-schedule env knob

Last commit on the PR per CLAUDE.md hard rule. Patch bump (0.54.2 →
0.54.3) bundling three sync-ops fixes from agnes-dev steady-state
observation.

No DB migration; trigger-hold window is additive (anything that already
saw locked=true still does — the window EXTENDS the true period);
/data/tmp chown is no-op when already correct; AGNES_DEFAULT_SYNC_SCHEDULE
unset = every-1h default unchanged.
2026-05-13 09:44:20 +00:00


"""Process-wide singleton guard around POST /api/sync/trigger.

Without it, two near-simultaneous trigger calls each launch their own
extractor subprocess, both write `extract.duckdb`, fight for the file
lock, starve uvicorn, and Docker flips the container to `unhealthy`.

These tests cover the trigger handler's 409 fast-fail (the
operator-visible behavior) and the in-`_run_sync` defense-in-depth
(if something bypasses the handler).
"""
from unittest.mock import patch

import pytest

from app.api import sync as sync_module


@pytest.fixture(autouse=True)
def reset_sync_lock():
    """Make sure each test starts with a free lock, and never leaves one
    held even if an assertion fires mid-test. Also wipe the trigger-hold
    timestamp so an earlier test's ``_recent_trigger_at`` doesn't
    silently pin the next test's ``/api/sync/status`` at locked=True."""
    if sync_module._sync_lock.locked():
        sync_module._sync_lock.release()
    sync_module._recent_trigger_at = 0.0
    yield
    if sync_module._sync_lock.locked():
        sync_module._sync_lock.release()
    sync_module._recent_trigger_at = 0.0


def test_run_sync_skips_when_lock_held(capsys):
    """When `_sync_lock` is already held, `_run_sync` no-ops with a log
    line instead of starting a second extractor subprocess."""
    sync_module._sync_lock.acquire()
    # Patch the heavy parts so a successful path would otherwise execute.
    # Reaching any of them while the lock is held would be the bug.
    with patch("app.api.sync.subprocess.run") as run_mock, \
            patch("app.instance_config.get_data_source_type") as src_mock:
        sync_module._run_sync(tables=None)
    assert not run_mock.called, "extractor subprocess must not run when lock is held"
    assert not src_mock.called, "_run_sync must short-circuit before reading config"
    captured = capsys.readouterr()
    assert "another sync is already in flight" in captured.err


def test_run_sync_releases_lock_on_exception():
    """Even if the body throws, the lock must release so the next sync can
    run. Asserts the `finally:` covers all exit paths.

    `_run_sync` imports `get_data_source_type` lazily inside the body, so
    we patch the source module rather than the re-export in `app.api.sync`.
    """
    with patch(
        "app.instance_config.get_data_source_type",
        side_effect=RuntimeError("boom"),
    ):
        # Should not raise: `_run_sync` catches and logs.
        sync_module._run_sync(tables=None)
    assert not sync_module._sync_lock.locked()


def test_trigger_endpoint_returns_409_when_locked():
    """Handler-level fast-fail: when a sync is already running, the
    trigger endpoint returns 409 without scheduling a second background
    task."""
    from fastapi.testclient import TestClient
    from fastapi import FastAPI
    # Stand up a minimal app exposing only the sync router. Bypass auth
    # by overriding the require_admin dependency.
    from app.auth.access import require_admin

    app = FastAPI()
    app.include_router(sync_module.router)
    app.dependency_overrides[require_admin] = lambda: {"id": "test", "email": "t@e"}
    client = TestClient(app)
    sync_module._sync_lock.acquire()
    try:
        resp = client.post("/api/sync/trigger")
        assert resp.status_code == 409
        assert resp.json()["detail"] == "sync_already_in_progress"
    finally:
        sync_module._sync_lock.release()


def test_trigger_endpoint_succeeds_when_lock_free():
    """When the lock is free, the trigger endpoint schedules the
    background task and returns 200. TestClient runs background tasks
    before `post()` returns, so we patch `_run_sync` to a no-op and
    assert only the handler shape and that the task was scheduled once."""
    from fastapi.testclient import TestClient
    from fastapi import FastAPI
    from app.auth.access import require_admin

    app = FastAPI()
    app.include_router(sync_module.router)
    app.dependency_overrides[require_admin] = lambda: {"id": "test", "email": "t@e"}
    client = TestClient(app)
    with patch("app.api.sync._run_sync") as run_mock:
        resp = client.post("/api/sync/trigger")
    assert resp.status_code == 200
    body = resp.json()
    assert body["status"] == "triggered"
    # Background tasks run after the response is sent; TestClient waits
    # for them, so the mock has been called by the time post() returns.
    run_mock.assert_called_once()


# ---- body shape acceptance -------------------------------------------------


def _make_client():
    """Stand up a minimal FastAPI app exposing the sync router with auth
    bypassed. Returns the TestClient; callers patch `_run_sync`
    themselves."""
    from fastapi.testclient import TestClient
    from fastapi import FastAPI
    from app.auth.access import require_admin

    app = FastAPI()
    app.include_router(sync_module.router)
    app.dependency_overrides[require_admin] = lambda: {"id": "test", "email": "t@e"}
    return TestClient(app)


@pytest.mark.parametrize("body,expected_tables", [
    (None, None),                                        # no body
    ([], []),                                            # empty array
    (["kbc_job"], ["kbc_job"]),                          # bare array
    (["a", "b", "c"], ["a", "b", "c"]),
    ({"tables": None}, None),                            # explicit null
    ({"tables": []}, []),
    ({"tables": ["kbc_job"]}, ["kbc_job"]),              # object form
    ({"tables": ["a", "b"], "extra": "x"}, ["a", "b"]),  # extra keys ignored
])
def test_trigger_accepts_both_body_shapes(body, expected_tables):
    """Both ``["x", "y"]`` and ``{"tables": ["x", "y"]}`` (and `null` /
    no body) reach `_run_sync` with the same `tables` arg. Lets older
    clients (raw array) and newer ones (object matching the response
    payload shape) both work."""
    client = _make_client()
    with patch("app.api.sync._run_sync") as run_mock:
        if body is None:
            resp = client.post("/api/sync/trigger")
        else:
            resp = client.post("/api/sync/trigger", json=body)
    assert resp.status_code == 200, resp.text
    run_mock.assert_called_once_with(expected_tables)


@pytest.mark.parametrize("bad_body", [
    "kbc_job",              # bare string
    42,                     # number
    {"tables": "kbc_job"},  # tables as string, not array
    {"tables": [1, 2, 3]},  # tables entries not strings
    [1, 2, 3],              # array of ints
    [{"id": "x"}],          # array of objects
])
def test_trigger_rejects_malformed_bodies(bad_body):
    """Anything that isn't a list-of-strings, an object with a
    list-of-strings under `tables`, or null/missing returns 422 with a
    structured detail; it is never silently treated as 'sync everything'."""
    client = _make_client()
    with patch("app.api.sync._run_sync") as run_mock:
        resp = client.post("/api/sync/trigger", json=bad_body)
    assert resp.status_code == 422, resp.text
    assert not run_mock.called


# ---- /api/sync/status (auto-upgrade defer probe) --------------------------


def test_sync_status_unlocked_returns_locked_false():
    """Default state: no sync running → ``{"locked": false}``. No auth
    required (host-side cron probes from outside the auth boundary)."""
    from fastapi.testclient import TestClient
    from fastapi import FastAPI

    app = FastAPI()
    app.include_router(sync_module.router)
    client = TestClient(app)
    if sync_module._sync_lock.locked():
        sync_module._sync_lock.release()
    resp = client.get("/api/sync/status")
    assert resp.status_code == 200
    assert resp.json() == {"locked": False}


def test_sync_status_locked_returns_locked_true():
    """Held lock → ``{"locked": true}``. agnes-auto-upgrade.sh greps for
    `"locked":true` to defer the recreate, so the wire format must be
    exactly this shape."""
    from fastapi.testclient import TestClient
    from fastapi import FastAPI

    app = FastAPI()
    app.include_router(sync_module.router)
    client = TestClient(app)
    sync_module._sync_lock.acquire()
    try:
        resp = client.get("/api/sync/status")
        assert resp.status_code == 200
        body = resp.json()
        assert body == {"locked": True}
    finally:
        sync_module._sync_lock.release()


def test_sync_status_does_not_require_auth():
    """No `require_admin` / `get_current_user` dependency: the host's
    cron has no PAT and shouldn't need one for a status check."""
    from fastapi.testclient import TestClient
    from fastapi import FastAPI

    app = FastAPI()
    app.include_router(sync_module.router)
    client = TestClient(app)
    # No auth headers at all.
    resp = client.get("/api/sync/status")
    assert resp.status_code == 200


def test_sync_status_trigger_hold_window_reports_locked_after_trigger():
    """Race-protection: even when ``_sync_lock`` is NOT yet held (the
    background task hasn't acquired it), a recent ``_recent_trigger_at``
    timestamp within ``_TRIGGER_HOLD_SEC`` must report
    ``{"locked": true}``. Without this, an auto-upgrade defer probe
    firing in the few-hundred-ms window between the trigger handler's
    200 response and ``_run_sync`` acquiring ``_sync_lock`` would see
    locked=False and SIGKILL the spawning extractor."""
    import time as _time

    if sync_module._sync_lock.locked():
        sync_module._sync_lock.release()
    # Stamp a fresh trigger time, then immediately probe.
    sync_module._recent_trigger_at = _time.monotonic()
    try:
        from fastapi.testclient import TestClient
        from fastapi import FastAPI

        app = FastAPI()
        app.include_router(sync_module.router)
        client = TestClient(app)
        resp = client.get("/api/sync/status")
        assert resp.status_code == 200
        assert resp.json() == {"locked": True}, (
            "trigger-hold window not honored; auto-upgrade defer probe "
            "would race the background task acquisition"
        )
    finally:
        sync_module._recent_trigger_at = 0.0


def test_sync_status_trigger_hold_window_expires():
    """Trigger-hold reports locked only for ``_TRIGGER_HOLD_SEC``; a
    stale timestamp past the window must NOT pin the probe at True
    forever. Without expiry, a single trigger would block all
    auto-upgrades indefinitely."""
    import time as _time

    if sync_module._sync_lock.locked():
        sync_module._sync_lock.release()
    # Stamp a timestamp ``_TRIGGER_HOLD_SEC + 5`` seconds in the past.
    sync_module._recent_trigger_at = (
        _time.monotonic() - sync_module._TRIGGER_HOLD_SEC - 5
    )
    try:
        from fastapi.testclient import TestClient
        from fastapi import FastAPI

        app = FastAPI()
        app.include_router(sync_module.router)
        client = TestClient(app)
        resp = client.get("/api/sync/status")
        assert resp.json() == {"locked": False}
    finally:
        sync_module._recent_trigger_at = 0.0