feat(bq-materialize): per-table mutex + file lock with TTL reclaim

Two layers of concurrency control. Layer 1 is an in-process
threading.Lock keyed on table_id; Layer 2 is fcntl.flock on a sibling
<id>.parquet.lock file. Overlapping calls for the same id raise
MaterializeInFlightError, which the caller treats as 'skipped,
in_flight' instead of a hard error. Stale file locks (mtime older
than materialize.lock_ttl_seconds, default 86400) are reclaimed on
the next attempt — covers the rare case where a holder was hard-killed
before kernel-level flock release.

Pre-fix, when a materialize ran longer than the scheduler tick interval
(15 min), the next tick called materialize_query for the same id, hit
the unconditional tmp_path.unlink() at function entry, and started a
second COPY against the same path. Both writers interleaved bytes;
the original COPY's read_parquet validation then failed with
'No magic bytes found at end of file'.
This commit is contained in:
ZdenekSrotyr 2026-05-04 17:40:21 +02:00
parent a2afcfe59a
commit 16eaf7a399
2 changed files with 410 additions and 81 deletions

View file

@ -3,12 +3,14 @@
No data is downloaded. All queries go directly to BigQuery via DuckDB extension ATTACH. No data is downloaded. All queries go directly to BigQuery via DuckDB extension ATTACH.
""" """
import fcntl
import hashlib import hashlib
import logging import logging
import os import os
import re import re
import shutil import shutil
import threading import threading
import time
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
@ -23,6 +25,109 @@ import duckdb
# not the per-source extract-file write, so we need a dedicated lock here. # not the per-source extract-file write, so we need a dedicated lock here.
_INIT_EXTRACT_LOCK = threading.Lock() _INIT_EXTRACT_LOCK = threading.Lock()
# Fallback age threshold, in seconds, after which a materialize lock file is
# considered stale (86400 s = 24 h). Used whenever the
# `materialize.lock_ttl_seconds` config value is missing, invalid, or not
# positive.
_LOCK_TTL_DEFAULT_SECONDS: int = 86400 # 24h — overridable via materialize.lock_ttl_seconds
class MaterializeInFlightError(Exception):
    """Signal that a materialize for the same table_id is already running.

    This is a skip signal, not a failure. The caller
    (`_run_materialized_pass`) should record a 'skipped, in-flight'
    outcome and let the worker that holds the lock finish and write
    sync_state on its own. `state.set_error` MUST NOT be called for this
    exception; doing so would surface a false-positive failure to the
    operator on every overlap.

    Attributes:
        table_id: id of the table whose materialize is already in flight.
        layer: name of the lock layer that reported the conflict; it is
            embedded verbatim in the message. Defaults to "process".
    """

    def __init__(self, table_id: str, layer: str = "process"):
        message = (
            f"materialize for {table_id!r} already in flight ({layer} lock held)"
        )
        super().__init__(message)
        self.table_id = table_id
        self.layer = layer
# Registry of per-table_id mutexes, plus the mutex that guards the registry.
_table_locks: dict[str, threading.Lock] = {}
_table_locks_registry: threading.Lock = threading.Lock()


def _get_table_lock(table_id: str) -> threading.Lock:
    """Look up, or lazily create, the process-wide mutex for `table_id`.

    The registry mutex is held only while the dict is read or mutated.
    Once a per-id lock has been handed out, callers contend on that lock
    alone. Repeated calls with the same id return the same lock object.
    """
    with _table_locks_registry:
        try:
            return _table_locks[table_id]
        except KeyError:
            created = _table_locks[table_id] = threading.Lock()
            return created
def _get_lock_ttl_seconds() -> int:
    """Return the stale-lock TTL in seconds, falling back to the default.

    The operator override is read from the instance config key
    `materialize.lock_ttl_seconds` (instance.yaml, also editable via
    /admin/server-config). The default of 86400 s = 24 h is meant as an
    upper bound for any healthy BQ COPY; anything longer is treated as a
    stuck process or hung BQ session that warrants reclaim on the next
    attempt.

    `_LOCK_TTL_DEFAULT_SECONDS` is returned when the configured value is
    None or not positive, or when importing, reading or converting the
    value raises.
    """
    fallback = _LOCK_TTL_DEFAULT_SECONDS
    try:
        # Imported at call time, so the lookup resolves `get_value` on
        # every call rather than once at module import.
        from app.instance_config import get_value

        raw = get_value("materialize", "lock_ttl_seconds", default=fallback)
        if raw is None:
            return fallback
        seconds = int(raw)
    except Exception:
        # Deliberately fail-safe: a broken config must not break locking.
        return fallback
    if seconds > 0:
        return seconds
    return fallback
def _try_acquire_file_lock(lock_path: Path):
    """Try to take a non-blocking exclusive advisory flock on `lock_path`.

    Args:
        lock_path: path of the lock file. Its parent directory is created
            if it does not exist yet.

    Returns:
        The open file object that holds the flock on success (the caller
        must close it to release the lock), or None when the lock is held
        elsewhere and the lock file is not stale.

    Staleness is judged by the lock file's mtime, which is refreshed only
    when a lock is actually acquired. The file is opened in append mode
    on purpose: opening with "w" truncates it, and truncation updates
    mtime even when the flock attempt then fails. Every contended call
    would then reset the TTL clock and the stale check could never fire.

    Stale-lock reclaim: on conflict, if the mtime is older than
    `_get_lock_ttl_seconds()`, a warning is logged, the file is unlinked
    and acquisition is retried once.

    NOTE: the retry locks a freshly created file (a new inode). A holder
    that is still alive but has held the lock for longer than the TTL
    therefore no longer excludes the reclaimer. The TTL must exceed the
    longest legitimate materialize run.
    """
    lock_path.parent.mkdir(parents=True, exist_ok=True)

    def _try_open_and_flock():
        # "a" creates the file if needed but never truncates it, so a
        # failed attempt leaves the mtime (the TTL signal) untouched.
        handle = open(lock_path, "a")
        try:
            fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            handle.close()
            return None
        except BaseException:
            # Any other failure must not leak the open descriptor.
            handle.close()
            raise
        try:
            # Stamp the acquisition time. Going through the descriptor
            # touches the file that was actually locked, even if the path
            # has been replaced in the meantime.
            os.utime(handle.fileno())
        except (OSError, NotImplementedError) as exc:
            logger.warning(
                "Could not refresh mtime of materialize lock %s: %s",
                lock_path, exc,
            )
        return handle

    holder = _try_open_and_flock()
    if holder is not None:
        return holder

    # Conflict. If the file is older than TTL, reclaim and retry once.
    try:
        age = time.time() - lock_path.stat().st_mtime
    except FileNotFoundError:
        # The file vanished between the failed attempt and the stat
        # (e.g. another reclaimer unlinked it); just try again.
        return _try_open_and_flock()

    if age <= _get_lock_ttl_seconds():
        return None

    logger.warning(
        "Reclaiming stale materialize lock at %s (age %.1fs > TTL)",
        lock_path, age,
    )
    lock_path.unlink(missing_ok=True)
    return _try_open_and_flock()
from connectors.bigquery.auth import get_metadata_token, BQMetadataAuthError from connectors.bigquery.auth import get_metadata_token, BQMetadataAuthError
from app.instance_config import get_value from app.instance_config import get_value
from src.sql_safe import ( from src.sql_safe import (
@ -395,6 +500,13 @@ def materialize_query(
Atomic write: result lands in `<id>.parquet.tmp` first, then Atomic write: result lands in `<id>.parquet.tmp` first, then
`os.replace` swaps it in. A failed COPY leaves no partial file behind. `os.replace` swaps it in. A failed COPY leaves no partial file behind.
Concurrency: per-``table_id`` in-process mutex + advisory file lock
on ``<table_id>.parquet.lock``. Overlapping calls for the same id
raise ``MaterializeInFlightError`` immediately so the caller can
skip cleanly without consuming the COPY budget twice. Stale file
locks (mtime > ``materialize.lock_ttl_seconds``, default 24 h) are
reclaimed automatically.
Args: Args:
table_id: Logical id from table_registry; becomes the parquet table_id: Logical id from table_registry; becomes the parquet
filename. Must pass `validate_identifier()` so it can't filename. Must pass `validate_identifier()` so it can't
@ -414,6 +526,8 @@ def materialize_query(
Raises: Raises:
ValueError: if `table_id` is unsafe or `bq.projects.billing` fails ValueError: if `table_id` is unsafe or `bq.projects.billing` fails
the GCP project_id grammar check. the GCP project_id grammar check.
MaterializeInFlightError: if a concurrent call for the same table_id
is already in progress (in-process or cross-process).
MaterializeBudgetError: if `max_bytes > 0` and dry-run estimate exceeds it. MaterializeBudgetError: if `max_bytes > 0` and dry-run estimate exceeds it.
BqAccessError: from `bq.duckdb_session()` (auth_failed / bq_lib_missing / BqAccessError: from `bq.duckdb_session()` (auth_failed / bq_lib_missing /
not_configured) caller catches and aggregates into the trigger not_configured) caller catches and aggregates into the trigger
@ -429,95 +543,114 @@ def materialize_query(
parquet_path = data_dir / f"{table_id}.parquet" parquet_path = data_dir / f"{table_id}.parquet"
tmp_path = data_dir / f"{table_id}.parquet.tmp" tmp_path = data_dir / f"{table_id}.parquet.tmp"
if tmp_path.exists(): lock_path = data_dir / f"{table_id}.parquet.lock"
tmp_path.unlink()
# Build the wrapped SQL once — both the cost guardrail dry-run and proc_lock = _get_table_lock(table_id)
# the COPY operate on `sql` (the inner BQ SQL); only the COPY needs if not proc_lock.acquire(blocking=False):
# the DuckDB-side bigquery_query() envelope. raise MaterializeInFlightError(table_id, layer="process")
billing_project = bq.projects.billing try:
wrapped_sql = _wrap_admin_sql_for_jobs_api(billing_project, sql) file_lock = _try_acquire_file_lock(lock_path)
if file_lock is None:
if max_bytes is not None and max_bytes > 0: raise MaterializeInFlightError(table_id, layer="file")
try: try:
from app.api.v2_scan import _bq_dry_run_bytes # reuse main's impl
estimated = _bq_dry_run_bytes(bq, sql) # NB: pass inner SQL (BQ-native)
except Exception as e:
logger.warning(
"BQ dry-run failed for materialize cost guardrail (fail-open): %s. "
"Proceeding with COPY against `bigquery_query()` wrapping.",
e,
)
estimated = 0
if estimated > max_bytes:
raise MaterializeBudgetError(
f"dry-run estimate {estimated:,} bytes exceeds cap "
f"{max_bytes:,} for table {table_id!r}",
table_id=table_id,
current=estimated,
limit=max_bytes,
)
# COPY through a BqAccess-managed session. The session has the BQ
# extension loaded with a SECRET token; bigquery_query() reuses that
# auth path against the billing_project for the jobs API call.
with bq.duckdb_session() as conn:
attached = {
r[0] for r in conn.execute(
"SELECT database_name FROM duckdb_databases()"
).fetchall()
}
if "bq" not in attached:
conn.execute(
f"ATTACH 'project={bq.projects.data}' AS bq (TYPE bigquery, READ_ONLY)"
)
try:
safe_path = _escape_sql_string_literal(str(tmp_path))
conn.execute(
f"COPY ({wrapped_sql}) TO '{safe_path}' (FORMAT PARQUET)"
)
rows = conn.execute(
f"SELECT count(*) FROM read_parquet('{safe_path}')"
).fetchone()[0]
except Exception:
if tmp_path.exists(): if tmp_path.exists():
tmp_path.unlink() tmp_path.unlink()
raise
# Compute the parquet hash inline before the atomic swap. The caller used # Build the wrapped SQL once — both the cost guardrail dry-run and
# to re-read the file in `_run_materialized_pass` to hash it via # the COPY operate on `sql` (the inner BQ SQL); only the COPY needs
# `_file_hash`, but that's a synchronous full-read on the FastAPI worker # the DuckDB-side bigquery_query() envelope.
# thread — a 10 GiB parquet means 50+ seconds of disk I/O blocking other billing_project = bq.projects.billing
# requests. Hashing here keeps the open-file handle hot from the COPY wrapped_sql = _wrap_admin_sql_for_jobs_api(billing_project, sql)
# round and removes the second read. Devil's-advocate review item.
h = hashlib.md5()
with open(tmp_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
parquet_hash = h.hexdigest()
size_bytes = tmp_path.stat().st_size if max_bytes is not None and max_bytes > 0:
os.replace(tmp_path, parquet_path) try:
from app.api.v2_scan import _bq_dry_run_bytes # reuse main's impl
estimated = _bq_dry_run_bytes(bq, sql) # NB: pass inner SQL (BQ-native)
except Exception as e:
logger.warning(
"BQ dry-run failed for materialize cost guardrail (fail-open): %s. "
"Proceeding with COPY against `bigquery_query()` wrapping.",
e,
)
estimated = 0
if estimated > max_bytes:
raise MaterializeBudgetError(
f"dry-run estimate {estimated:,} bytes exceeds cap "
f"{max_bytes:,} for table {table_id!r}",
table_id=table_id,
current=estimated,
limit=max_bytes,
)
rows = int(rows) # COPY through a BqAccess-managed session. The session has the BQ
if rows == 0: # extension loaded with a SECRET token; bigquery_query() reuses that
# 0 rows is indistinguishable from "the SQL is wrong and nobody # auth path against the billing_project for the jobs API call.
# noticed" — surface it loudly so operators see it in the scheduler with bq.duckdb_session() as conn:
# log line and the per-row error aggregation. Caller decides whether attached = {
# to alert. r[0] for r in conn.execute(
logger.warning( "SELECT database_name FROM duckdb_databases()"
"Materialized %s produced 0 rows — verify the SQL filter is " ).fetchall()
"intentional. Parquet written: %s", }
table_id, parquet_path, if "bq" not in attached:
) conn.execute(
f"ATTACH 'project={bq.projects.data}' AS bq (TYPE bigquery, READ_ONLY)"
)
return { try:
"rows": rows, safe_path = _escape_sql_string_literal(str(tmp_path))
"size_bytes": size_bytes, conn.execute(
"query_mode": "materialized", f"COPY ({wrapped_sql}) TO '{safe_path}' (FORMAT PARQUET)"
"hash": parquet_hash, )
} rows = conn.execute(
f"SELECT count(*) FROM read_parquet('{safe_path}')"
).fetchone()[0]
except Exception:
if tmp_path.exists():
tmp_path.unlink()
raise
# Compute the parquet hash inline before the atomic swap. The caller used
# to re-read the file in `_run_materialized_pass` to hash it via
# `_file_hash`, but that's a synchronous full-read on the FastAPI worker
# thread — a 10 GiB parquet means 50+ seconds of disk I/O blocking other
# requests. Hashing here keeps the open-file handle hot from the COPY
# round and removes the second read. Devil's-advocate review item.
h = hashlib.md5()
with open(tmp_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
parquet_hash = h.hexdigest()
size_bytes = tmp_path.stat().st_size
os.replace(tmp_path, parquet_path)
rows = int(rows)
if rows == 0:
# 0 rows is indistinguishable from "the SQL is wrong and nobody
# noticed" — surface it loudly so operators see it in the scheduler
# log line and the per-row error aggregation. Caller decides whether
# to alert.
logger.warning(
"Materialized %s produced 0 rows — verify the SQL filter is "
"intentional. Parquet written: %s",
table_id, parquet_path,
)
return {
"rows": rows,
"size_bytes": size_bytes,
"query_mode": "materialized",
"hash": parquet_hash,
}
finally:
try:
file_lock.close() # releases flock
except Exception:
pass
# Don't unlink lock_path — its mtime is the TTL signal for
# the next reclaim. Leaving it in place is intentional.
finally:
proc_lock.release()
def _resolve_bq_project_id() -> str: def _resolve_bq_project_id() -> str:

View file

@ -0,0 +1,196 @@
"""Per-table_id concurrency: in-process mutex + advisory file lock with
TTL reclaim. Two overlapping materialize_query calls for the same id
must NOT corrupt each other's parquet."""
from __future__ import annotations
import os
import threading
import time
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from connectors.bigquery.extractor import (
materialize_query,
MaterializeInFlightError,
_get_table_lock,
_LOCK_TTL_DEFAULT_SECONDS,
)
@pytest.fixture(autouse=True)
def reset_locks(monkeypatch):
    """Give every test a fresh, empty per-table lock registry."""
    import connectors.bigquery.extractor as extractor_module

    # Swapping in a new dict keeps lock state from leaking between tests.
    monkeypatch.setattr(extractor_module, "_table_locks", {})
    yield
def _slow_bq(stall_seconds: float = 1.0):
    """Return a fake BqAccess whose session stalls on COPY.

    The fake's `duckdb_session()` hands back one shared session object.
    Its `execute` writes a stub file to the COPY target and then sleeps
    for `stall_seconds`, so a second call can race the first.
    """
    import re

    copy_target = re.compile(r"TO '([^']+)'")

    class _DatabaseRows:
        def fetchall(self):
            return [("memory",)]

    class _CountRow:
        def fetchone(self):
            return (42,)

    class _Session:
        def __enter__(self):
            return self

        def __exit__(self, *a):
            return False

        def execute(self, sql):
            if sql.startswith("SELECT database_name"):
                return _DatabaseRows()
            if sql.startswith("SELECT count"):
                return _CountRow()
            if sql.startswith("COPY"):
                # Simulate a long-running COPY: drop a stub parquet at the
                # path named in the statement, then stall.
                found = copy_target.search(sql)
                assert found
                Path(found.group(1)).write_bytes(b"PARQUET_STUB_HEADER" + b"\x00" * 200)
                time.sleep(stall_seconds)
            # ATTACH, COPY and anything unrecognized yield a generic mock.
            return MagicMock()

    bq = MagicMock()
    bq.projects.billing = "prj-billing"
    bq.projects.data = "prj-data"
    bq.duckdb_session.return_value = _Session()
    return bq
def test_concurrent_calls_for_same_id_raise_in_flight(tmp_path):
    """Two overlapping calls for one id: exactly one succeeds and the
    other reports MaterializeInFlightError."""
    bq = _slow_bq(stall_seconds=2.0)
    out_dir = str(tmp_path)
    captured: list = []

    def runner(tag):
        try:
            result = materialize_query(
                table_id="t1", sql="SELECT 1",
                bq=bq, output_dir=out_dir, max_bytes=None,
            )
        except MaterializeInFlightError as exc:
            captured.append(("in_flight", tag, str(exc)))
        except Exception as exc:
            captured.append(("err", tag, str(exc)))
        else:
            captured.append(("ok", tag, result))

    first = threading.Thread(target=runner, args=("first",))
    second = threading.Thread(target=runner, args=("second",))
    first.start()
    time.sleep(0.2)  # give the first call time to take the lock
    second.start()
    for worker in (first, second):
        worker.join()

    outcomes = [entry[0] for entry in captured]
    assert outcomes.count("ok") == 1, f"expected exactly one success, got {captured}"
    assert outcomes.count("in_flight") == 1
def test_sequential_calls_for_same_id_both_succeed(tmp_path):
    """Back-to-back calls for one id do not block each other: the locks
    taken by the first call are released before the second starts."""
    bq = _slow_bq(stall_seconds=0.05)
    out_dir = str(tmp_path)

    results = [
        materialize_query(
            table_id="t1", sql="SELECT 1",
            bq=bq, output_dir=out_dir, max_bytes=None,
        )
        for _ in range(2)
    ]

    assert results[0]["rows"] == 42
    assert results[1]["rows"] == 42
def test_different_ids_run_in_parallel(tmp_path):
    """Calls for distinct ids must not serialize behind one another."""
    bq = _slow_bq(stall_seconds=1.0)
    out_dir = str(tmp_path)
    captured: list = []

    def runner(tid):
        try:
            result = materialize_query(
                table_id=tid, sql="SELECT 1",
                bq=bq, output_dir=out_dir, max_bytes=None,
            )
        except Exception:
            captured.append((tid, "ERROR"))
        else:
            captured.append((tid, result["rows"]))

    workers = [
        threading.Thread(target=runner, args=(f"tab_{i}",))
        for i in range(3)
    ]
    start = time.time()
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    elapsed = time.time() - start

    # Three 1 s stalls take >= 3 s if serialized, about 1 s if parallel.
    assert elapsed < 2.0, f"expected parallel, elapsed={elapsed:.2f}s"
    assert len(captured) == 3
    assert all(entry[1] == 42 for entry in captured)
def test_stale_file_lock_is_reclaimed_after_ttl(tmp_path, monkeypatch):
    """A leftover .lock file with an mtime older than the TTL must not
    make a new call raise MaterializeInFlightError.

    NOTE(review): nothing holds a flock on the file here, so the first
    acquisition attempt succeeds on its own. This test does not drive the
    reclaim (unlink-and-retry) branch.
    """
    bq = _slow_bq(stall_seconds=0.05)
    data_dir = Path(tmp_path) / "data"
    data_dir.mkdir(parents=True, exist_ok=True)
    lock_path = data_dir / "t1.parquet.lock"
    lock_path.write_text("")

    # Back-date the file by 25 h, past the 24 h default TTL.
    stale_ts = time.time() - 25 * 3600
    os.utime(lock_path, (stale_ts, stale_ts))

    result = materialize_query(
        table_id="t1", sql="SELECT 1",
        bq=bq, output_dir=str(tmp_path), max_bytes=None,
    )
    assert result["rows"] == 42
def test_fresh_file_lock_blocks_with_in_flight_error(tmp_path, monkeypatch):
    """With a fresh .lock file that is genuinely flock-ed, a new call must
    raise instead of reclaiming."""
    import fcntl

    bq = _slow_bq(stall_seconds=0.05)
    data_dir = Path(tmp_path) / "data"
    data_dir.mkdir(parents=True, exist_ok=True)
    lock_path = data_dir / "t1.parquet.lock"

    # Hold a real exclusive flock so the call's flock(LOCK_NB) meets an
    # actual conflict. A file that is merely present with a recent mtime
    # would let this test pass even if flock acquisition were broken.
    with open(lock_path, "w") as holder:
        fcntl.flock(holder.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        try:
            with pytest.raises(MaterializeInFlightError):
                materialize_query(
                    table_id="t1", sql="SELECT 1",
                    bq=bq, output_dir=str(tmp_path), max_bytes=None,
                )
        finally:
            fcntl.flock(holder.fileno(), fcntl.LOCK_UN)
def test_lock_ttl_reads_from_instance_config(tmp_path, monkeypatch):
    """A configured `materialize.lock_ttl_seconds` value takes precedence
    over the built-in default."""

    def fake_get_value(*args, **kw):
        # Answer only the TTL key; every other lookup gets its default.
        if args == ("materialize", "lock_ttl_seconds"):
            return 60
        return kw.get("default")

    monkeypatch.setattr("app.instance_config.get_value", fake_get_value)

    from connectors.bigquery.extractor import _get_lock_ttl_seconds

    assert _get_lock_ttl_seconds() == 60