feat(bq-materialize): per-table mutex + file lock with TTL reclaim
Two layers of concurrency control. Layer 1 is a per-table_id threading.Lock keyed on table_id; Layer 2 is fcntl.flock on a sibling <id>.parquet.lock file. Overlapping calls for the same id raise MaterializeInFlightError, which the caller treats as 'skipped, in_flight' instead of a hard error. Stale file locks (mtime older than materialize.lock_ttl_seconds, default 86400) are reclaimed on the next attempt — covers the rare case where a holder was hard-killed before kernel-level flock release. Pre-fix, when a materialize ran longer than the scheduler tick interval (15 min), the next tick called materialize_query for the same id, hit the unconditional tmp_path.unlink() at function entry, and started a second COPY against the same path. Both writers interleaved bytes; the original COPY's read_parquet validation then failed with 'No magic bytes found at end of file'.
This commit is contained in:
parent
a2afcfe59a
commit
16eaf7a399
2 changed files with 410 additions and 81 deletions
|
|
@ -3,12 +3,14 @@
|
|||
No data is downloaded. All queries go directly to BigQuery via DuckDB extension ATTACH.
|
||||
"""
|
||||
|
||||
import fcntl
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Any, Optional
|
||||
|
|
@ -23,6 +25,109 @@ import duckdb
|
|||
# not the per-source extract-file write, so we need a dedicated lock here.
|
||||
_INIT_EXTRACT_LOCK = threading.Lock()
|
||||
|
||||
_LOCK_TTL_DEFAULT_SECONDS: int = 86400 # 24h — overridable via materialize.lock_ttl_seconds
|
||||
|
||||
|
||||
class MaterializeInFlightError(Exception):
|
||||
"""Raised when a per-table_id materialize is already running.
|
||||
|
||||
Caller (`_run_materialized_pass`) should treat this as a 'skipped,
|
||||
in-flight' outcome — the in-flight worker will finish and write
|
||||
sync_state on its own. Critically, this is NOT an error condition;
|
||||
`state.set_error` MUST NOT be called for this exception or the
|
||||
registry would surface a false-positive failure to the operator
|
||||
every overlap."""
|
||||
|
||||
def __init__(self, table_id: str, layer: str = "process"):
|
||||
self.table_id = table_id
|
||||
self.layer = layer
|
||||
super().__init__(
|
||||
f"materialize for {table_id!r} already in flight ({layer} lock held)"
|
||||
)
|
||||
|
||||
|
||||
_table_locks: dict[str, threading.Lock] = {}
|
||||
_table_locks_registry: threading.Lock = threading.Lock()
|
||||
|
||||
|
||||
def _get_table_lock(table_id: str) -> threading.Lock:
|
||||
"""Return the process-wide mutex for a given table_id, creating it
|
||||
on first reference. The registry mutex serializes the dict mutation
|
||||
only — once the per-id Lock is returned, contention between callers
|
||||
happens on that lock alone."""
|
||||
with _table_locks_registry:
|
||||
lock = _table_locks.get(table_id)
|
||||
if lock is None:
|
||||
lock = threading.Lock()
|
||||
_table_locks[table_id] = lock
|
||||
return lock
|
||||
|
||||
|
||||
def _get_lock_ttl_seconds() -> int:
|
||||
"""Read the configured stale-lock TTL with fallback to the default.
|
||||
|
||||
Operator override lives at instance.yaml `materialize.lock_ttl_seconds`
|
||||
(also editable via /admin/server-config). Default 86400 s = 24 h
|
||||
matches the upper bound of any healthy BQ COPY in practice — anything
|
||||
longer is a stuck process or a hung BQ session, both of which warrant
|
||||
reclaim on next attempt."""
|
||||
try:
|
||||
from app.instance_config import get_value
|
||||
v = get_value(
|
||||
"materialize", "lock_ttl_seconds",
|
||||
default=_LOCK_TTL_DEFAULT_SECONDS,
|
||||
)
|
||||
n = int(v) if v is not None else _LOCK_TTL_DEFAULT_SECONDS
|
||||
return n if n > 0 else _LOCK_TTL_DEFAULT_SECONDS
|
||||
except Exception:
|
||||
return _LOCK_TTL_DEFAULT_SECONDS
|
||||
|
||||
|
||||
def _try_acquire_file_lock(lock_path: Path):
|
||||
"""Try to acquire an advisory exclusive flock on `lock_path`. Returns
|
||||
the open file object on success (caller must close to release); None
|
||||
on conflict.
|
||||
|
||||
Stale-lock reclaim: if the lock_path exists and its mtime is older
|
||||
than the configured TTL, log a warning and unlink before retrying.
|
||||
A live holder still wins the second flock attempt (kernel-level
|
||||
flock isn't tied to mtime), so the reclaim doesn't break correctness
|
||||
— it just unblocks the case where a holder process was hard-killed
|
||||
before the kernel released the lock."""
|
||||
lock_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def _try_open_and_flock():
|
||||
f = open(lock_path, "w")
|
||||
try:
|
||||
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
return f
|
||||
except BlockingIOError:
|
||||
f.close()
|
||||
return None
|
||||
|
||||
holder = _try_open_and_flock()
|
||||
if holder is not None:
|
||||
return holder
|
||||
|
||||
# Conflict. If the file is older than TTL, reclaim and retry once.
|
||||
try:
|
||||
age = time.time() - lock_path.stat().st_mtime
|
||||
except FileNotFoundError:
|
||||
return _try_open_and_flock()
|
||||
|
||||
if age > _get_lock_ttl_seconds():
|
||||
logger.warning(
|
||||
"Reclaiming stale materialize lock at %s (age %.1fs > TTL)",
|
||||
lock_path, age,
|
||||
)
|
||||
try:
|
||||
lock_path.unlink()
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
return _try_open_and_flock()
|
||||
|
||||
return None
|
||||
|
||||
from connectors.bigquery.auth import get_metadata_token, BQMetadataAuthError
|
||||
from app.instance_config import get_value
|
||||
from src.sql_safe import (
|
||||
|
|
@ -395,6 +500,13 @@ def materialize_query(
|
|||
Atomic write: result lands in `<id>.parquet.tmp` first, then
|
||||
`os.replace` swaps it in. A failed COPY leaves no partial file behind.
|
||||
|
||||
Concurrency: per-``table_id`` in-process mutex + advisory file lock
|
||||
on ``<table_id>.parquet.lock``. Overlapping calls for the same id
|
||||
raise ``MaterializeInFlightError`` immediately so the caller can
|
||||
skip cleanly without consuming the COPY budget twice. Stale file
|
||||
locks (mtime > ``materialize.lock_ttl_seconds``, default 24 h) are
|
||||
reclaimed automatically.
|
||||
|
||||
Args:
|
||||
table_id: Logical id from table_registry; becomes the parquet
|
||||
filename. Must pass `validate_identifier()` so it can't
|
||||
|
|
@ -414,6 +526,8 @@ def materialize_query(
|
|||
Raises:
|
||||
ValueError: if `table_id` is unsafe or `bq.projects.billing` fails
|
||||
the GCP project_id grammar check.
|
||||
MaterializeInFlightError: if a concurrent call for the same table_id
|
||||
is already in progress (in-process or cross-process).
|
||||
MaterializeBudgetError: if `max_bytes > 0` and dry-run estimate exceeds it.
|
||||
BqAccessError: from `bq.duckdb_session()` (auth_failed / bq_lib_missing /
|
||||
not_configured) — caller catches and aggregates into the trigger
|
||||
|
|
@ -429,6 +543,16 @@ def materialize_query(
|
|||
|
||||
parquet_path = data_dir / f"{table_id}.parquet"
|
||||
tmp_path = data_dir / f"{table_id}.parquet.tmp"
|
||||
lock_path = data_dir / f"{table_id}.parquet.lock"
|
||||
|
||||
proc_lock = _get_table_lock(table_id)
|
||||
if not proc_lock.acquire(blocking=False):
|
||||
raise MaterializeInFlightError(table_id, layer="process")
|
||||
try:
|
||||
file_lock = _try_acquire_file_lock(lock_path)
|
||||
if file_lock is None:
|
||||
raise MaterializeInFlightError(table_id, layer="file")
|
||||
try:
|
||||
if tmp_path.exists():
|
||||
tmp_path.unlink()
|
||||
|
||||
|
|
@ -518,6 +642,15 @@ def materialize_query(
|
|||
"query_mode": "materialized",
|
||||
"hash": parquet_hash,
|
||||
}
|
||||
finally:
|
||||
try:
|
||||
file_lock.close() # releases flock
|
||||
except Exception:
|
||||
pass
|
||||
# Don't unlink lock_path — its mtime is the TTL signal for
|
||||
# the next reclaim. Leaving it in place is intentional.
|
||||
finally:
|
||||
proc_lock.release()
|
||||
|
||||
|
||||
def _resolve_bq_project_id() -> str:
|
||||
|
|
|
|||
196
tests/test_bq_materialize_concurrency.py
Normal file
196
tests/test_bq_materialize_concurrency.py
Normal file
|
|
@ -0,0 +1,196 @@
|
|||
"""Per-table_id concurrency: in-process mutex + advisory file lock with
|
||||
TTL reclaim. Two overlapping materialize_query calls for the same id
|
||||
must NOT corrupt each other's parquet."""
|
||||
from __future__ import annotations
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from connectors.bigquery.extractor import (
|
||||
materialize_query,
|
||||
MaterializeInFlightError,
|
||||
_get_table_lock,
|
||||
_LOCK_TTL_DEFAULT_SECONDS,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def reset_locks(monkeypatch):
|
||||
# Tests must not share lock state across runs.
|
||||
import connectors.bigquery.extractor as mod
|
||||
monkeypatch.setattr(mod, "_table_locks", {})
|
||||
yield
|
||||
|
||||
|
||||
def _slow_bq(stall_seconds: float = 1.0):
|
||||
"""Build a fake BqAccess whose duckdb_session COPY blocks for
|
||||
`stall_seconds` so we can race a second call against it."""
|
||||
bq = MagicMock()
|
||||
bq.projects.billing = "prj-billing"
|
||||
bq.projects.data = "prj-data"
|
||||
|
||||
class _Session:
|
||||
def __enter__(self):
|
||||
return self
|
||||
def __exit__(self, *a):
|
||||
return False
|
||||
def execute(self, sql):
|
||||
if sql.startswith("SELECT database_name"):
|
||||
class _R:
|
||||
def fetchall(self):
|
||||
return [("memory",)]
|
||||
return _R()
|
||||
if sql.startswith("ATTACH"):
|
||||
return MagicMock()
|
||||
if sql.startswith("COPY"):
|
||||
# Simulate a long-running COPY by writing a stub parquet
|
||||
# then sleeping so a second call can race us.
|
||||
# Extract the path from the COPY statement.
|
||||
import re
|
||||
m = re.search(r"TO '([^']+)'", sql)
|
||||
assert m
|
||||
Path(m.group(1)).write_bytes(b"PARQUET_STUB_HEADER" + b"\x00" * 200)
|
||||
time.sleep(stall_seconds)
|
||||
return MagicMock()
|
||||
if sql.startswith("SELECT count"):
|
||||
class _R:
|
||||
def fetchone(self):
|
||||
return (42,)
|
||||
return _R()
|
||||
return MagicMock()
|
||||
|
||||
bq.duckdb_session.return_value = _Session()
|
||||
return bq
|
||||
|
||||
|
||||
def test_concurrent_calls_for_same_id_raise_in_flight(tmp_path):
|
||||
bq = _slow_bq(stall_seconds=2.0)
|
||||
|
||||
out_dir = str(tmp_path)
|
||||
captured: list = []
|
||||
|
||||
def runner(tag):
|
||||
try:
|
||||
r = materialize_query(
|
||||
table_id="t1", sql="SELECT 1",
|
||||
bq=bq, output_dir=out_dir, max_bytes=None,
|
||||
)
|
||||
captured.append(("ok", tag, r))
|
||||
except MaterializeInFlightError as e:
|
||||
captured.append(("in_flight", tag, str(e)))
|
||||
except Exception as e:
|
||||
captured.append(("err", tag, str(e)))
|
||||
|
||||
t1 = threading.Thread(target=runner, args=("first",))
|
||||
t2 = threading.Thread(target=runner, args=("second",))
|
||||
t1.start()
|
||||
time.sleep(0.2) # let t1 acquire the lock
|
||||
t2.start()
|
||||
t1.join()
|
||||
t2.join()
|
||||
|
||||
outcomes = [c[0] for c in captured]
|
||||
assert outcomes.count("ok") == 1, f"expected exactly one success, got {captured}"
|
||||
assert outcomes.count("in_flight") == 1
|
||||
|
||||
|
||||
def test_sequential_calls_for_same_id_both_succeed(tmp_path):
|
||||
bq = _slow_bq(stall_seconds=0.05)
|
||||
|
||||
out_dir = str(tmp_path)
|
||||
r1 = materialize_query(
|
||||
table_id="t1", sql="SELECT 1",
|
||||
bq=bq, output_dir=out_dir, max_bytes=None,
|
||||
)
|
||||
r2 = materialize_query(
|
||||
table_id="t1", sql="SELECT 1",
|
||||
bq=bq, output_dir=out_dir, max_bytes=None,
|
||||
)
|
||||
assert r1["rows"] == 42
|
||||
assert r2["rows"] == 42
|
||||
|
||||
|
||||
def test_different_ids_run_in_parallel(tmp_path):
|
||||
bq = _slow_bq(stall_seconds=1.0)
|
||||
out_dir = str(tmp_path)
|
||||
captured: list = []
|
||||
|
||||
def runner(tid):
|
||||
try:
|
||||
r = materialize_query(
|
||||
table_id=tid, sql="SELECT 1",
|
||||
bq=bq, output_dir=out_dir, max_bytes=None,
|
||||
)
|
||||
captured.append((tid, r["rows"]))
|
||||
except Exception as e:
|
||||
captured.append((tid, "ERROR"))
|
||||
|
||||
threads = [threading.Thread(target=runner, args=(f"tab_{i}",)) for i in range(3)]
|
||||
start = time.time()
|
||||
for t in threads: t.start()
|
||||
for t in threads: t.join()
|
||||
elapsed = time.time() - start
|
||||
# If they were serialized, would take >= 3s. Parallel: ~1s.
|
||||
assert elapsed < 2.0, f"expected parallel, elapsed={elapsed:.2f}s"
|
||||
assert len(captured) == 3
|
||||
assert all(c[1] == 42 for c in captured)
|
||||
|
||||
|
||||
def test_stale_file_lock_is_reclaimed_after_ttl(tmp_path, monkeypatch):
|
||||
"""Force a stale .lock file (mtime older than TTL) and verify a new
|
||||
call reclaims it instead of raising MaterializeInFlightError."""
|
||||
bq = _slow_bq(stall_seconds=0.05)
|
||||
lock_path = Path(tmp_path) / "data" / "t1.parquet.lock"
|
||||
lock_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
lock_path.write_text("")
|
||||
|
||||
# Set mtime to 25h ago (> default 24h TTL).
|
||||
old_ts = time.time() - 25 * 3600
|
||||
os.utime(lock_path, (old_ts, old_ts))
|
||||
|
||||
r = materialize_query(
|
||||
table_id="t1", sql="SELECT 1",
|
||||
bq=bq, output_dir=str(tmp_path), max_bytes=None,
|
||||
)
|
||||
assert r["rows"] == 42
|
||||
|
||||
|
||||
def test_fresh_file_lock_blocks_with_in_flight_error(tmp_path, monkeypatch):
|
||||
"""Force a fresh .lock file (mtime within TTL) and verify a new
|
||||
call raises rather than reclaims."""
|
||||
bq = _slow_bq(stall_seconds=0.05)
|
||||
lock_path = Path(tmp_path) / "data" / "t1.parquet.lock"
|
||||
lock_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Open the lock file and HOLD a fcntl exclusive lock so the materialize
|
||||
# call's flock(LOCK_NB) sees a real conflicting lock — relying on
|
||||
# mtime-only would let the test pass even if flock acquisition was
|
||||
# broken.
|
||||
import fcntl
|
||||
holder = open(lock_path, "w")
|
||||
fcntl.flock(holder.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
try:
|
||||
with pytest.raises(MaterializeInFlightError):
|
||||
materialize_query(
|
||||
table_id="t1", sql="SELECT 1",
|
||||
bq=bq, output_dir=str(tmp_path), max_bytes=None,
|
||||
)
|
||||
finally:
|
||||
fcntl.flock(holder.fileno(), fcntl.LOCK_UN)
|
||||
holder.close()
|
||||
|
||||
|
||||
def test_lock_ttl_reads_from_instance_config(tmp_path, monkeypatch):
|
||||
"""When `materialize.lock_ttl_seconds` is set in instance.yaml, that
|
||||
value overrides the default."""
|
||||
monkeypatch.setattr(
|
||||
"app.instance_config.get_value",
|
||||
lambda *args, **kw: 60 if args == ("materialize", "lock_ttl_seconds") else kw.get("default"),
|
||||
)
|
||||
|
||||
from connectors.bigquery.extractor import _get_lock_ttl_seconds
|
||||
assert _get_lock_ttl_seconds() == 60
|
||||
Loading…
Reference in a new issue