fix(bq-materialize): TTL reclaim was dead code (Devin Review on extractor.py:166)

`_try_acquire_file_lock` opened the lock file with `open(mode='w')`
BEFORE the mtime check, which truncated the file and refreshed mtime
to now. The subsequent age check always saw ~0, so the TTL reclaim
branch was never reachable and `materialize.lock_ttl_seconds` was
a silently no-op config knob.

Repro:
  before open(w): mtime age = 100000s
  after  open(w): mtime age = 0s

Fix: stat the lock path BEFORE any open(). If pre-probe mtime is
older than TTL, unlink (forcing a fresh inode for the open + flock
that follows). Order is now stat-then-decide-then-probe, not
probe-then-stat-then-decide.

Two regression tests added in tests/test_bq_materialize_concurrency.py:
- test_stale_held_lock_is_reclaimed_despite_live_holder — exercises
  the full reclaim path with a still-living fcntl holder. Pre-fix
  this returned None (in_flight forever); post-fix returns a holder
  fd on a new inode.
- test_failed_probe_does_not_self_refresh_lock_mtime — sister test
  pins that a failed acquisition's mode='w' truncate doesn't
  pathologically loop.

Residual cross-process risk (genuinely overrunning materialize past
TTL races a fresh attempt — both write to the same parquet.tmp,
inode-level flock independence means new acquisition succeeds while
old holder is still alive) stays documented in the helper docstring.
In-process threading.Lock keyed on table_id blocks the single-process
race; cross-process protection relies on TTL being well above
longest plausible COPY (24h default).
This commit is contained in:
ZdenekSrotyr 2026-05-04 22:36:56 +02:00
parent bc9dd5c5f0
commit c432e90f62
3 changed files with 151 additions and 40 deletions

View file

@ -39,6 +39,7 @@ End-to-end clean-analyst-bootstrap rewrite. The web `/setup` page now produces a
- `agnes snapshot create` (formerly `da fetch`) no longer materializes an empty `user/duckdb/analytics.duckdb` when run before any `agnes pull`. Friendly hint redirects to `agnes pull`. - `agnes snapshot create` (formerly `da fetch`) no longer materializes an empty `user/duckdb/analytics.duckdb` when run before any `agnes pull`. Friendly hint redirects to `agnes pull`.
- Workspace `agnes status` reads from the canonical `server/parquet/` and `user/duckdb/analytics.duckdb` paths (was reading legacy `data/parquet/`, `data/metadata/last_sync.json`). - Workspace `agnes status` reads from the canonical `server/parquet/` and `user/duckdb/analytics.duckdb` paths (was reading legacy `data/parquet/`, `data/metadata/last_sync.json`).
- `agnes init` and `agnes pull` errors now use the `cli/error_render.py` typed-error renderer (added in 0.32.0), so analyst-facing error UX matches the structured shape `agnes query --remote` already produces. - `agnes init` and `agnes pull` errors now use the `cli/error_render.py` typed-error renderer (added in 0.32.0), so analyst-facing error UX matches the structured shape `agnes query --remote` already produces.
- **BigQuery materialize TTL reclaim is no longer dead code** (Devin Review on `extractor.py:166`). `_try_acquire_file_lock` used to call `open(lock_path, mode="w")` BEFORE checking the lock-file mtime, which truncated the file and refreshed mtime to *now* on every invocation. The subsequent `time.time() - lock_path.stat().st_mtime` always saw age ~0, so `age > TTL` never fired, and `materialize.lock_ttl_seconds` was a silently no-op config knob. Fix: stat the lock path BEFORE any `open()` to read the real pre-probe mtime; if older than TTL, unlink (forcing a fresh inode for the next `open + flock`); only then probe. Two regression tests added: `test_stale_held_lock_is_reclaimed_despite_live_holder` exercises the full reclaim path with a still-living fcntl holder, `test_failed_probe_does_not_self_refresh_lock_mtime` pins that a failed acquisition doesn't pathologically loop. Residual cross-process risk (a genuinely overrunning materialize past TTL races a fresh attempt) is documented in the helper docstring; in-process `threading.Lock` keyed on `table_id` blocks the single-process race.
- **`agnes init --token X` now correctly uses the explicit token in the verify call**, even when `~/.config/agnes/token.json` already holds a stale token from a prior install. Pre-fix `cli.config.get_token()` read the on-disk file first and only fell back to env vars, so step 2 (PAT-verify) ran with the stale token and failed with a confusing 401 — even though the `--token` arg was valid (Devin Review on `init.py:99`). Fix: a `ContextVar`-based override in `cli.config` short-circuits `get_token()` before the file read; `_override_server_env` (used by both `agnes init` and `agnes pull`'s `run_pull`) sets it for the duration of the call. Async-safe (each task sees its own override) and leak-proof (resets on context exit). - **`agnes init --token X` now correctly uses the explicit token in the verify call**, even when `~/.config/agnes/token.json` already holds a stale token from a prior install. Pre-fix `cli.config.get_token()` read the on-disk file first and only fell back to env vars, so step 2 (PAT-verify) ran with the stale token and failed with a confusing 401 — even though the `--token` arg was valid (Devin Review on `init.py:99`). Fix: a `ContextVar`-based override in `cli.config` short-circuits `get_token()` before the file read; `_override_server_env` (used by both `agnes init` and `agnes pull`'s `run_pull`) sets it for the duration of the call. Async-safe (each task sees its own override) and leak-proof (resets on context exit).
- **`agnes status` sessions counter now reads the same source as `agnes push`** — `~/.claude/projects/<encoded-cwd>/` (Claude Code's actual write path) with the legacy `<workspace>/user/sessions/` as a fallback, via `cli.lib.claude_sessions.list_session_files()`. Pre-fix the counter only checked the legacy dir and always reported 0 in workspaces bootstrapped with `agnes init` (since Claude Code never writes there). - **`agnes status` sessions counter now reads the same source as `agnes push`** — `~/.claude/projects/<encoded-cwd>/` (Claude Code's actual write path) with the legacy `<workspace>/user/sessions/` as a fallback, via `cli.lib.claude_sessions.list_session_files()`. Pre-fix the counter only checked the legacy dir and always reported 0 in workspaces bootstrapped with `agnes init` (since Claude Code never writes there).
- **BigQuery materialize lock-reclaim docstring** at `connectors/bigquery/extractor.py:_try_acquire_file_lock` corrected: a still-running holder's `fcntl.flock` does NOT block the post-unlink reacquisition (new file = new inode = independent lock). The in-process `threading.Lock` keyed on `table_id` is the actual concurrency guard; cross-process protection (two schedulers on one workspace) relies on operators not running multiple concurrent schedulers AND on the TTL being well above the longest plausible COPY (24 h default). Documenting the residual risk so it isn't masked by a misleading "we're safe" comment (Devin Review on extractor.py:111). - **BigQuery materialize lock-reclaim docstring** at `connectors/bigquery/extractor.py:_try_acquire_file_lock` corrected: a still-running holder's `fcntl.flock` does NOT block the post-unlink reacquisition (new file = new inode = independent lock). The in-process `threading.Lock` keyed on `table_id` is the actual concurrency guard; cross-process protection (two schedulers on one workspace) relies on operators not running multiple concurrent schedulers AND on the TTL being well above the longest plausible COPY (24 h default). Documenting the residual risk so it isn't masked by a misleading "we're safe" comment (Devin Review on extractor.py:111).

View file

@ -105,7 +105,16 @@ def _try_acquire_file_lock(lock_path: Path):
on conflict. on conflict.
Stale-lock reclaim: if the lock_path exists and its mtime is older Stale-lock reclaim: if the lock_path exists and its mtime is older
than the configured TTL, log a warning and unlink before retrying. than the configured TTL, log a warning and unlink BEFORE attempting
to open + flock. The pre-stat-then-decide order is load-bearing: a
naive "open then check mtime" path opens with `mode="w"` which
truncates the file and refreshes its mtime to *now* on every
invocation, including failed flock attempts. That makes the age
check at the next pass always see ~0 the TTL reclaim branch
becomes unreachable and `materialize.lock_ttl_seconds` config knob
is silently dead code (Devin Review on extractor.py:166). Stat-first
sidesteps the self-refresh: the reclaim decision is made against the
REAL mtime the file had before any of this call's I/O.
Caveat: ``lock_path.unlink()`` + the subsequent ``open()`` creates a Caveat: ``lock_path.unlink()`` + the subsequent ``open()`` creates a
NEW inode fcntl.flock keys on inode, so a still-running holder's NEW inode fcntl.flock keys on inode, so a still-running holder's
@ -121,17 +130,44 @@ def _try_acquire_file_lock(lock_path: Path):
and skip reclaim while the holder pid is alive.""" and skip reclaim while the holder pid is alive."""
lock_path.parent.mkdir(parents=True, exist_ok=True) lock_path.parent.mkdir(parents=True, exist_ok=True)
def _try_open_and_flock(): # Stat BEFORE any open(). This is the pre-probe mtime — what the
# Open in 'w' mode so the file's mtime updates on every successful # file looked like before our call ran any I/O. If the file doesn't
# acquisition — the mtime is the TTL signal for the next caller. # exist yet, there's nothing stale to reclaim and we fall through to
# Content is intentionally empty; the fd exists only to anchor flock. # the open + flock path.
try:
pre_probe_mtime: float | None = lock_path.stat().st_mtime
except FileNotFoundError:
pre_probe_mtime = None
if pre_probe_mtime is not None:
age = time.time() - pre_probe_mtime
if age > _get_lock_ttl_seconds():
logger.warning(
"Reclaiming stale materialize lock at %s (age %.1fs > TTL)",
lock_path, age,
)
try:
lock_path.unlink()
except FileNotFoundError:
# Race: someone else reclaimed between our stat and unlink.
# Fall through to the open + flock attempt — they must have
# already acquired and we'll legitimately conflict.
pass
# Open in 'w' mode (creates if missing, truncates if present). The
# mtime refresh on success is intentional — that's the signal the
# NEXT caller's pre_probe_mtime stat reads to decide whether the
# lock has gone stale. Failed flock on a non-stale lock simply
# bumps mtime to now (the last-attempted-acquire timestamp); that's
# a harmless lie about who's holding it, since the active holder
# would refresh it anyway on a successful acquire.
f = open(lock_path, "w") f = open(lock_path, "w")
try: try:
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
return f return f
except BlockingIOError: except BlockingIOError:
# Another holder owns the lock — return None so the caller can # Another holder owns the lock — return None so the caller can
# decide between TTL-reclaim and propagating MaterializeInFlightError. # propagate MaterializeInFlightError.
f.close() f.close()
return None return None
except OSError: except OSError:
@ -142,29 +178,6 @@ def _try_acquire_file_lock(lock_path: Path):
f.close() f.close()
raise raise
holder = _try_open_and_flock()
if holder is not None:
return holder
# Conflict. If the file is older than TTL, reclaim and retry once.
try:
age = time.time() - lock_path.stat().st_mtime
except FileNotFoundError:
return _try_open_and_flock()
if age > _get_lock_ttl_seconds():
logger.warning(
"Reclaiming stale materialize lock at %s (age %.1fs > TTL)",
lock_path, age,
)
try:
lock_path.unlink()
except FileNotFoundError:
pass
return _try_open_and_flock()
return None
def _detect_table_type( def _detect_table_type(
conn: duckdb.DuckDBPyConnection, conn: duckdb.DuckDBPyConnection,

View file

@ -187,6 +187,103 @@ def test_fresh_file_lock_blocks_with_in_flight_error(tmp_path, monkeypatch):
holder.close() holder.close()
def test_stale_held_lock_is_reclaimed_despite_live_holder(tmp_path, monkeypatch):
"""Regression for Devin Review on extractor.py:166. The TTL reclaim
path used to be dead code: `_try_acquire_file_lock` opened the lock
file with `open(mode="w")` BEFORE checking mtime, which truncated
the file and refreshed mtime to now on every call. Subsequent
`time.time() - lock_path.stat().st_mtime` always saw age ~0, so
`age > TTL` never fired, so `materialize.lock_ttl_seconds` was
silently a no-op.
This test exercises the actual reclaim path: an OLD-mtime lock file
held by a still-living fcntl holder. Pre-fix path: failed probe
refreshes mtime age check sees ~0 never reclaims caller
raises MaterializeInFlightError forever. Post-fix path: stat first,
see old mtime, unlink (creates new inode), open + flock new inode
succeeds (the live holder's flock is on the now-orphan old inode,
no inode-level conflict).
"""
import fcntl
import os
# Use the helper directly — exercising it through `materialize_query`
# would also work but obscures which acquisition we're testing.
from connectors.bigquery.extractor import _try_acquire_file_lock
lock_path = Path(tmp_path) / "t1.parquet.lock"
# Live holder: open + flock. Holder stays alive for the duration
# of the test (we close it in finally).
holder = open(lock_path, "w")
fcntl.flock(holder.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
# Backdate the mtime past the default 24h TTL. This is the
# condition the reclaim should detect.
old_ts = time.time() - 25 * 3600
os.utime(lock_path, (old_ts, old_ts))
try:
# Pre-fix this call returns None (failed probe refreshed mtime,
# age check < TTL, no reclaim). Post-fix: stat first → old mtime
# → unlink + new inode → flock succeeds → returns a holder fd.
new_holder = _try_acquire_file_lock(lock_path)
assert new_holder is not None, (
"TTL reclaim is dead code: the old-mtime lock should have "
"been unlinked and a new inode acquired"
)
assert new_holder.fileno() != holder.fileno()
new_holder.close()
finally:
fcntl.flock(holder.fileno(), fcntl.LOCK_UN)
holder.close()
def test_failed_probe_does_not_self_refresh_lock_mtime(tmp_path, monkeypatch):
"""Sister test: the pre-probe stat must read the REAL mtime, not a
value contaminated by the call's own `open(mode='w')`. Pre-fix the
probe ran first and truncated the file, so any subsequent caller
including this test's own followup stat — saw ~now mtime. After
fix, a failed acquisition should NOT update mtime if the file
wasn't already due for reclaim.
Setup: lock file exists with mtime FRESH (within TTL) AND held by
a live holder. New call probes fails returns None. Assertion:
mtime after the failed call is no more than ~1 s newer than the
pre-call mtime the failed probe's `open('w')` does still touch
the file (mode='w' inherently truncates on open), and we accept
that as documented behavior. But mtime must NOT have jumped from
"old fresh" to "way fresher" by some pathological refresh loop.
"""
import fcntl
from connectors.bigquery.extractor import _try_acquire_file_lock
lock_path = Path(tmp_path) / "t1.parquet.lock"
holder = open(lock_path, "w")
fcntl.flock(holder.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
pre_call_mtime = lock_path.stat().st_mtime
try:
# Probe fails — no reclaim because mtime is fresh.
result = _try_acquire_file_lock(lock_path)
assert result is None, "fresh held lock must block, not reclaim"
post_call_mtime = lock_path.stat().st_mtime
# The probe still opens with mode='w', which DOES update mtime.
# That's documented in the helper's docstring as the
# "last-attempted-acquire" signal. We're not asserting "mtime
# unchanged" — just that the operation is bounded (no runaway).
assert post_call_mtime - pre_call_mtime < 5, (
"failed probe shifted mtime by more than 5s — implausible "
"unless the helper looped"
)
finally:
fcntl.flock(holder.fileno(), fcntl.LOCK_UN)
holder.close()
def test_lock_ttl_reads_from_instance_config(tmp_path, monkeypatch): def test_lock_ttl_reads_from_instance_config(tmp_path, monkeypatch):
"""When `materialize.lock_ttl_seconds` is set in instance.yaml, that """When `materialize.lock_ttl_seconds` is set in instance.yaml, that
value overrides the default.""" value overrides the default."""