feat(bq-materialize): per-table mutex + file lock with TTL reclaim

Two layers of concurrency control. Layer 1 is a per-table_id
threading.Lock keyed on table_id; Layer 2 is fcntl.flock on a sibling
<id>.parquet.lock file. Overlapping calls for the same id raise
MaterializeInFlightError, which the caller treats as 'skipped,
in_flight' instead of a hard error. Stale file locks (mtime older
than materialize.lock_ttl_seconds, default 86400) are reclaimed on
the next attempt — covers the rare case where a holder was hard-killed
before kernel-level flock release.

Pre-fix, when a materialize ran longer than the scheduler tick interval
(15 min), the next tick called materialize_query for the same id, hit
the unconditional tmp_path.unlink() at function entry, and started a
second COPY against the same path. Both writers interleaved bytes;
the original COPY's read_parquet validation then failed with
'No magic bytes found at end of file'.
This commit is contained in:
ZdenekSrotyr 2026-05-04 17:40:21 +02:00
parent a2afcfe59a
commit 16eaf7a399
2 changed files with 410 additions and 81 deletions

View file

@ -3,12 +3,14 @@
No data is downloaded. All queries go directly to BigQuery via DuckDB extension ATTACH.
"""
import fcntl
import hashlib
import logging
import os
import re
import shutil
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Any, Optional
@ -23,6 +25,109 @@ import duckdb
# not the per-source extract-file write, so we need a dedicated lock here.
_INIT_EXTRACT_LOCK = threading.Lock()
_LOCK_TTL_DEFAULT_SECONDS: int = 86400 # 24h — overridable via materialize.lock_ttl_seconds
class MaterializeInFlightError(Exception):
"""Raised when a per-table_id materialize is already running.
Caller (`_run_materialized_pass`) should treat this as a 'skipped,
in-flight' outcome — the in-flight worker will finish and write
sync_state on its own. Critically, this is NOT an error condition;
`state.set_error` MUST NOT be called for this exception or the
registry would surface a false-positive failure to the operator
every overlap."""
def __init__(self, table_id: str, layer: str = "process"):
self.table_id = table_id
self.layer = layer
super().__init__(
f"materialize for {table_id!r} already in flight ({layer} lock held)"
)
_table_locks: dict[str, threading.Lock] = {}
_table_locks_registry: threading.Lock = threading.Lock()
def _get_table_lock(table_id: str) -> threading.Lock:
"""Return the process-wide mutex for a given table_id, creating it
on first reference. The registry mutex serializes the dict mutation
only once the per-id Lock is returned, contention between callers
happens on that lock alone."""
with _table_locks_registry:
lock = _table_locks.get(table_id)
if lock is None:
lock = threading.Lock()
_table_locks[table_id] = lock
return lock
def _get_lock_ttl_seconds() -> int:
"""Read the configured stale-lock TTL with fallback to the default.
Operator override lives at instance.yaml `materialize.lock_ttl_seconds`
(also editable via /admin/server-config). Default 86400 s = 24 h
matches the upper bound of any healthy BQ COPY in practice anything
longer is a stuck process or a hung BQ session, both of which warrant
reclaim on next attempt."""
try:
from app.instance_config import get_value
v = get_value(
"materialize", "lock_ttl_seconds",
default=_LOCK_TTL_DEFAULT_SECONDS,
)
n = int(v) if v is not None else _LOCK_TTL_DEFAULT_SECONDS
return n if n > 0 else _LOCK_TTL_DEFAULT_SECONDS
except Exception:
return _LOCK_TTL_DEFAULT_SECONDS
def _try_acquire_file_lock(lock_path: Path):
"""Try to acquire an advisory exclusive flock on `lock_path`. Returns
the open file object on success (caller must close to release); None
on conflict.
Stale-lock reclaim: if the lock_path exists and its mtime is older
than the configured TTL, log a warning and unlink before retrying.
A live holder still wins the second flock attempt (kernel-level
flock isn't tied to mtime), so the reclaim doesn't break correctness
it just unblocks the case where a holder process was hard-killed
before the kernel released the lock."""
lock_path.parent.mkdir(parents=True, exist_ok=True)
def _try_open_and_flock():
f = open(lock_path, "w")
try:
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
return f
except BlockingIOError:
f.close()
return None
holder = _try_open_and_flock()
if holder is not None:
return holder
# Conflict. If the file is older than TTL, reclaim and retry once.
try:
age = time.time() - lock_path.stat().st_mtime
except FileNotFoundError:
return _try_open_and_flock()
if age > _get_lock_ttl_seconds():
logger.warning(
"Reclaiming stale materialize lock at %s (age %.1fs > TTL)",
lock_path, age,
)
try:
lock_path.unlink()
except FileNotFoundError:
pass
return _try_open_and_flock()
return None
from connectors.bigquery.auth import get_metadata_token, BQMetadataAuthError
from app.instance_config import get_value
from src.sql_safe import (
@ -395,6 +500,13 @@ def materialize_query(
Atomic write: result lands in `<id>.parquet.tmp` first, then
`os.replace` swaps it in. A failed COPY leaves no partial file behind.
Concurrency: per-``table_id`` in-process mutex + advisory file lock
on ``<table_id>.parquet.lock``. Overlapping calls for the same id
raise ``MaterializeInFlightError`` immediately so the caller can
skip cleanly without consuming the COPY budget twice. Stale file
locks (mtime > ``materialize.lock_ttl_seconds``, default 24 h) are
reclaimed automatically.
Args:
table_id: Logical id from table_registry; becomes the parquet
filename. Must pass `validate_identifier()` so it can't
@ -414,6 +526,8 @@ def materialize_query(
Raises:
ValueError: if `table_id` is unsafe or `bq.projects.billing` fails
the GCP project_id grammar check.
MaterializeInFlightError: if a concurrent call for the same table_id
is already in progress (in-process or cross-process).
MaterializeBudgetError: if `max_bytes > 0` and dry-run estimate exceeds it.
BqAccessError: from `bq.duckdb_session()` (auth_failed / bq_lib_missing /
not_configured) caller catches and aggregates into the trigger
@ -429,6 +543,16 @@ def materialize_query(
parquet_path = data_dir / f"{table_id}.parquet"
tmp_path = data_dir / f"{table_id}.parquet.tmp"
lock_path = data_dir / f"{table_id}.parquet.lock"
proc_lock = _get_table_lock(table_id)
if not proc_lock.acquire(blocking=False):
raise MaterializeInFlightError(table_id, layer="process")
try:
file_lock = _try_acquire_file_lock(lock_path)
if file_lock is None:
raise MaterializeInFlightError(table_id, layer="file")
try:
if tmp_path.exists():
tmp_path.unlink()
@ -518,6 +642,15 @@ def materialize_query(
"query_mode": "materialized",
"hash": parquet_hash,
}
finally:
try:
file_lock.close() # releases flock
except Exception:
pass
# Don't unlink lock_path — its mtime is the TTL signal for
# the next reclaim. Leaving it in place is intentional.
finally:
proc_lock.release()
def _resolve_bq_project_id() -> str:

View file

@ -0,0 +1,196 @@
"""Per-table_id concurrency: in-process mutex + advisory file lock with
TTL reclaim. Two overlapping materialize_query calls for the same id
must NOT corrupt each other's parquet."""
from __future__ import annotations
import os
import threading
import time
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from connectors.bigquery.extractor import (
materialize_query,
MaterializeInFlightError,
_get_table_lock,
_LOCK_TTL_DEFAULT_SECONDS,
)
@pytest.fixture(autouse=True)
def reset_locks(monkeypatch):
# Tests must not share lock state across runs.
import connectors.bigquery.extractor as mod
monkeypatch.setattr(mod, "_table_locks", {})
yield
def _slow_bq(stall_seconds: float = 1.0):
"""Build a fake BqAccess whose duckdb_session COPY blocks for
`stall_seconds` so we can race a second call against it."""
bq = MagicMock()
bq.projects.billing = "prj-billing"
bq.projects.data = "prj-data"
class _Session:
def __enter__(self):
return self
def __exit__(self, *a):
return False
def execute(self, sql):
if sql.startswith("SELECT database_name"):
class _R:
def fetchall(self):
return [("memory",)]
return _R()
if sql.startswith("ATTACH"):
return MagicMock()
if sql.startswith("COPY"):
# Simulate a long-running COPY by writing a stub parquet
# then sleeping so a second call can race us.
# Extract the path from the COPY statement.
import re
m = re.search(r"TO '([^']+)'", sql)
assert m
Path(m.group(1)).write_bytes(b"PARQUET_STUB_HEADER" + b"\x00" * 200)
time.sleep(stall_seconds)
return MagicMock()
if sql.startswith("SELECT count"):
class _R:
def fetchone(self):
return (42,)
return _R()
return MagicMock()
bq.duckdb_session.return_value = _Session()
return bq
def test_concurrent_calls_for_same_id_raise_in_flight(tmp_path):
bq = _slow_bq(stall_seconds=2.0)
out_dir = str(tmp_path)
captured: list = []
def runner(tag):
try:
r = materialize_query(
table_id="t1", sql="SELECT 1",
bq=bq, output_dir=out_dir, max_bytes=None,
)
captured.append(("ok", tag, r))
except MaterializeInFlightError as e:
captured.append(("in_flight", tag, str(e)))
except Exception as e:
captured.append(("err", tag, str(e)))
t1 = threading.Thread(target=runner, args=("first",))
t2 = threading.Thread(target=runner, args=("second",))
t1.start()
time.sleep(0.2) # let t1 acquire the lock
t2.start()
t1.join()
t2.join()
outcomes = [c[0] for c in captured]
assert outcomes.count("ok") == 1, f"expected exactly one success, got {captured}"
assert outcomes.count("in_flight") == 1
def test_sequential_calls_for_same_id_both_succeed(tmp_path):
bq = _slow_bq(stall_seconds=0.05)
out_dir = str(tmp_path)
r1 = materialize_query(
table_id="t1", sql="SELECT 1",
bq=bq, output_dir=out_dir, max_bytes=None,
)
r2 = materialize_query(
table_id="t1", sql="SELECT 1",
bq=bq, output_dir=out_dir, max_bytes=None,
)
assert r1["rows"] == 42
assert r2["rows"] == 42
def test_different_ids_run_in_parallel(tmp_path):
bq = _slow_bq(stall_seconds=1.0)
out_dir = str(tmp_path)
captured: list = []
def runner(tid):
try:
r = materialize_query(
table_id=tid, sql="SELECT 1",
bq=bq, output_dir=out_dir, max_bytes=None,
)
captured.append((tid, r["rows"]))
except Exception as e:
captured.append((tid, "ERROR"))
threads = [threading.Thread(target=runner, args=(f"tab_{i}",)) for i in range(3)]
start = time.time()
for t in threads: t.start()
for t in threads: t.join()
elapsed = time.time() - start
# If they were serialized, would take >= 3s. Parallel: ~1s.
assert elapsed < 2.0, f"expected parallel, elapsed={elapsed:.2f}s"
assert len(captured) == 3
assert all(c[1] == 42 for c in captured)
def test_stale_file_lock_is_reclaimed_after_ttl(tmp_path, monkeypatch):
"""Force a stale .lock file (mtime older than TTL) and verify a new
call reclaims it instead of raising MaterializeInFlightError."""
bq = _slow_bq(stall_seconds=0.05)
lock_path = Path(tmp_path) / "data" / "t1.parquet.lock"
lock_path.parent.mkdir(parents=True, exist_ok=True)
lock_path.write_text("")
# Set mtime to 25h ago (> default 24h TTL).
old_ts = time.time() - 25 * 3600
os.utime(lock_path, (old_ts, old_ts))
r = materialize_query(
table_id="t1", sql="SELECT 1",
bq=bq, output_dir=str(tmp_path), max_bytes=None,
)
assert r["rows"] == 42
def test_fresh_file_lock_blocks_with_in_flight_error(tmp_path, monkeypatch):
"""Force a fresh .lock file (mtime within TTL) and verify a new
call raises rather than reclaims."""
bq = _slow_bq(stall_seconds=0.05)
lock_path = Path(tmp_path) / "data" / "t1.parquet.lock"
lock_path.parent.mkdir(parents=True, exist_ok=True)
# Open the lock file and HOLD a fcntl exclusive lock so the materialize
# call's flock(LOCK_NB) sees a real conflicting lock — relying on
# mtime-only would let the test pass even if flock acquisition was
# broken.
import fcntl
holder = open(lock_path, "w")
fcntl.flock(holder.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
try:
with pytest.raises(MaterializeInFlightError):
materialize_query(
table_id="t1", sql="SELECT 1",
bq=bq, output_dir=str(tmp_path), max_bytes=None,
)
finally:
fcntl.flock(holder.fileno(), fcntl.LOCK_UN)
holder.close()
def test_lock_ttl_reads_from_instance_config(tmp_path, monkeypatch):
"""When `materialize.lock_ttl_seconds` is set in instance.yaml, that
value overrides the default."""
monkeypatch.setattr(
"app.instance_config.get_value",
lambda *args, **kw: 60 if args == ("materialize", "lock_ttl_seconds") else kw.get("default"),
)
from connectors.bigquery.extractor import _get_lock_ttl_seconds
assert _get_lock_ttl_seconds() == 60