From 16eaf7a3991e859bc12ec5bc51ca9583030771c3 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 4 May 2026 17:40:21 +0200 Subject: [PATCH] feat(bq-materialize): per-table mutex + file lock with TTL reclaim MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two layers of concurrency control. Layer 1 is a per-table_id threading.Lock keyed on table_id; Layer 2 is fcntl.flock on a sibling .parquet.lock file. Overlapping calls for the same id raise MaterializeInFlightError, which the caller treats as 'skipped, in_flight' instead of a hard error. Stale file locks (mtime older than materialize.lock_ttl_seconds, default 86400) are reclaimed on the next attempt — covers the rare case where a holder was hard-killed before kernel-level flock release. Pre-fix, when a materialize ran longer than the scheduler tick interval (15 min), the next tick called materialize_query for the same id, hit the unconditional tmp_path.unlink() at function entry, and started a second COPY against the same path. Both writers interleaved bytes; the original COPY's read_parquet validation then failed with 'No magic bytes found at end of file'. --- connectors/bigquery/extractor.py | 295 ++++++++++++++++------- tests/test_bq_materialize_concurrency.py | 196 +++++++++++++++ 2 files changed, 410 insertions(+), 81 deletions(-) create mode 100644 tests/test_bq_materialize_concurrency.py diff --git a/connectors/bigquery/extractor.py b/connectors/bigquery/extractor.py index e8dcebf..7b158a0 100644 --- a/connectors/bigquery/extractor.py +++ b/connectors/bigquery/extractor.py @@ -3,12 +3,14 @@ No data is downloaded. All queries go directly to BigQuery via DuckDB extension ATTACH. """ +import fcntl import hashlib import logging import os import re import shutil import threading +import time from datetime import datetime, timezone from pathlib import Path from typing import List, Dict, Any, Optional @@ -23,6 +25,109 @@ import duckdb # not the per-source extract-file write, so we need a dedicated lock here. _INIT_EXTRACT_LOCK = threading.Lock() +_LOCK_TTL_DEFAULT_SECONDS: int = 86400 # 24h — overridable via materialize.lock_ttl_seconds + + +class MaterializeInFlightError(Exception): + """Raised when a per-table_id materialize is already running. + + Caller (`_run_materialized_pass`) should treat this as a 'skipped, + in-flight' outcome — the in-flight worker will finish and write + sync_state on its own. Critically, this is NOT an error condition; + `state.set_error` MUST NOT be called for this exception or the + registry would surface a false-positive failure to the operator + every overlap.""" + + def __init__(self, table_id: str, layer: str = "process"): + self.table_id = table_id + self.layer = layer + super().__init__( + f"materialize for {table_id!r} already in flight ({layer} lock held)" + ) + + +_table_locks: dict[str, threading.Lock] = {} +_table_locks_registry: threading.Lock = threading.Lock() + + +def _get_table_lock(table_id: str) -> threading.Lock: + """Return the process-wide mutex for a given table_id, creating it + on first reference. The registry mutex serializes the dict mutation + only — once the per-id Lock is returned, contention between callers + happens on that lock alone.""" + with _table_locks_registry: + lock = _table_locks.get(table_id) + if lock is None: + lock = threading.Lock() + _table_locks[table_id] = lock + return lock + + +def _get_lock_ttl_seconds() -> int: + """Read the configured stale-lock TTL with fallback to the default. + + Operator override lives at instance.yaml `materialize.lock_ttl_seconds` + (also editable via /admin/server-config). Default 86400 s = 24 h + matches the upper bound of any healthy BQ COPY in practice — anything + longer is a stuck process or a hung BQ session, both of which warrant + reclaim on next attempt.""" + try: + from app.instance_config import get_value + v = get_value( + "materialize", "lock_ttl_seconds", + default=_LOCK_TTL_DEFAULT_SECONDS, + ) + n = int(v) if v is not None else _LOCK_TTL_DEFAULT_SECONDS + return n if n > 0 else _LOCK_TTL_DEFAULT_SECONDS + except Exception: + return _LOCK_TTL_DEFAULT_SECONDS + + +def _try_acquire_file_lock(lock_path: Path): + """Try to acquire an advisory exclusive flock on `lock_path`. Returns + the open file object on success (caller must close to release); None + on conflict. + + Stale-lock reclaim: if the lock_path exists and its mtime is older + than the configured TTL, log a warning and unlink before retrying. + A live holder still wins the second flock attempt (kernel-level + flock isn't tied to mtime), so the reclaim doesn't break correctness + — it just unblocks the case where a holder process was hard-killed + before the kernel released the lock.""" + lock_path.parent.mkdir(parents=True, exist_ok=True) + + def _try_open_and_flock(): + f = open(lock_path, "w") + try: + fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + return f + except BlockingIOError: + f.close() + return None + + holder = _try_open_and_flock() + if holder is not None: + return holder + + # Conflict. If the file is older than TTL, reclaim and retry once. + try: + age = time.time() - lock_path.stat().st_mtime + except FileNotFoundError: + return _try_open_and_flock() + + if age > _get_lock_ttl_seconds(): + logger.warning( + "Reclaiming stale materialize lock at %s (age %.1fs > TTL)", + lock_path, age, + ) + try: + lock_path.unlink() + except FileNotFoundError: + pass + return _try_open_and_flock() + + return None + from connectors.bigquery.auth import get_metadata_token, BQMetadataAuthError from app.instance_config import get_value from src.sql_safe import ( @@ -395,6 +500,13 @@ def materialize_query( Atomic write: result lands in `.parquet.tmp` first, then `os.replace` swaps it in. A failed COPY leaves no partial file behind. + Concurrency: per-``table_id`` in-process mutex + advisory file lock + on ``.parquet.lock``. Overlapping calls for the same id + raise ``MaterializeInFlightError`` immediately so the caller can + skip cleanly without consuming the COPY budget twice. Stale file + locks (mtime > ``materialize.lock_ttl_seconds``, default 24 h) are + reclaimed automatically. + Args: table_id: Logical id from table_registry; becomes the parquet filename. Must pass `validate_identifier()` so it can't @@ -414,6 +526,8 @@ def materialize_query( Raises: ValueError: if `table_id` is unsafe or `bq.projects.billing` fails the GCP project_id grammar check. + MaterializeInFlightError: if a concurrent call for the same table_id + is already in progress (in-process or cross-process). MaterializeBudgetError: if `max_bytes > 0` and dry-run estimate exceeds it. BqAccessError: from `bq.duckdb_session()` (auth_failed / bq_lib_missing / not_configured) — caller catches and aggregates into the trigger @@ -429,95 +543,114 @@ def materialize_query( parquet_path = data_dir / f"{table_id}.parquet" tmp_path = data_dir / f"{table_id}.parquet.tmp" - if tmp_path.exists(): - tmp_path.unlink() + lock_path = data_dir / f"{table_id}.parquet.lock" - # Build the wrapped SQL once — both the cost guardrail dry-run and - # the COPY operate on `sql` (the inner BQ SQL); only the COPY needs - # the DuckDB-side bigquery_query() envelope. - billing_project = bq.projects.billing - wrapped_sql = _wrap_admin_sql_for_jobs_api(billing_project, sql) - - if max_bytes is not None and max_bytes > 0: + proc_lock = _get_table_lock(table_id) + if not proc_lock.acquire(blocking=False): + raise MaterializeInFlightError(table_id, layer="process") + try: + file_lock = _try_acquire_file_lock(lock_path) + if file_lock is None: + raise MaterializeInFlightError(table_id, layer="file") try: - from app.api.v2_scan import _bq_dry_run_bytes # reuse main's impl - estimated = _bq_dry_run_bytes(bq, sql) # NB: pass inner SQL (BQ-native) - except Exception as e: - logger.warning( - "BQ dry-run failed for materialize cost guardrail (fail-open): %s. " - "Proceeding with COPY against `bigquery_query()` wrapping.", - e, - ) - estimated = 0 - if estimated > max_bytes: - raise MaterializeBudgetError( - f"dry-run estimate {estimated:,} bytes exceeds cap " - f"{max_bytes:,} for table {table_id!r}", - table_id=table_id, - current=estimated, - limit=max_bytes, - ) - - # COPY through a BqAccess-managed session. The session has the BQ - # extension loaded with a SECRET token; bigquery_query() reuses that - # auth path against the billing_project for the jobs API call. - with bq.duckdb_session() as conn: - attached = { - r[0] for r in conn.execute( - "SELECT database_name FROM duckdb_databases()" - ).fetchall() - } - if "bq" not in attached: - conn.execute( - f"ATTACH 'project={bq.projects.data}' AS bq (TYPE bigquery, READ_ONLY)" - ) - - try: - safe_path = _escape_sql_string_literal(str(tmp_path)) - conn.execute( - f"COPY ({wrapped_sql}) TO '{safe_path}' (FORMAT PARQUET)" - ) - rows = conn.execute( - f"SELECT count(*) FROM read_parquet('{safe_path}')" - ).fetchone()[0] - except Exception: if tmp_path.exists(): tmp_path.unlink() - raise - # Compute the parquet hash inline before the atomic swap. The caller used - # to re-read the file in `_run_materialized_pass` to hash it via - # `_file_hash`, but that's a synchronous full-read on the FastAPI worker - # thread — a 10 GiB parquet means 50+ seconds of disk I/O blocking other - # requests. Hashing here keeps the open-file handle hot from the COPY - # round and removes the second read. Devil's-advocate review item. - h = hashlib.md5() - with open(tmp_path, "rb") as f: - for chunk in iter(lambda: f.read(8192), b""): - h.update(chunk) - parquet_hash = h.hexdigest() + # Build the wrapped SQL once — both the cost guardrail dry-run and + # the COPY operate on `sql` (the inner BQ SQL); only the COPY needs + # the DuckDB-side bigquery_query() envelope. + billing_project = bq.projects.billing + wrapped_sql = _wrap_admin_sql_for_jobs_api(billing_project, sql) - size_bytes = tmp_path.stat().st_size - os.replace(tmp_path, parquet_path) + if max_bytes is not None and max_bytes > 0: + try: + from app.api.v2_scan import _bq_dry_run_bytes # reuse main's impl + estimated = _bq_dry_run_bytes(bq, sql) # NB: pass inner SQL (BQ-native) + except Exception as e: + logger.warning( + "BQ dry-run failed for materialize cost guardrail (fail-open): %s. " + "Proceeding with COPY against `bigquery_query()` wrapping.", + e, + ) + estimated = 0 + if estimated > max_bytes: + raise MaterializeBudgetError( + f"dry-run estimate {estimated:,} bytes exceeds cap " + f"{max_bytes:,} for table {table_id!r}", + table_id=table_id, + current=estimated, + limit=max_bytes, + ) - rows = int(rows) - if rows == 0: - # 0 rows is indistinguishable from "the SQL is wrong and nobody - # noticed" — surface it loudly so operators see it in the scheduler - # log line and the per-row error aggregation. Caller decides whether - # to alert. - logger.warning( - "Materialized %s produced 0 rows — verify the SQL filter is " - "intentional. Parquet written: %s", - table_id, parquet_path, - ) + # COPY through a BqAccess-managed session. The session has the BQ + # extension loaded with a SECRET token; bigquery_query() reuses that + # auth path against the billing_project for the jobs API call. + with bq.duckdb_session() as conn: + attached = { + r[0] for r in conn.execute( + "SELECT database_name FROM duckdb_databases()" + ).fetchall() + } + if "bq" not in attached: + conn.execute( + f"ATTACH 'project={bq.projects.data}' AS bq (TYPE bigquery, READ_ONLY)" + ) - return { - "rows": rows, - "size_bytes": size_bytes, - "query_mode": "materialized", - "hash": parquet_hash, - } + try: + safe_path = _escape_sql_string_literal(str(tmp_path)) + conn.execute( + f"COPY ({wrapped_sql}) TO '{safe_path}' (FORMAT PARQUET)" + ) + rows = conn.execute( + f"SELECT count(*) FROM read_parquet('{safe_path}')" + ).fetchone()[0] + except Exception: + if tmp_path.exists(): + tmp_path.unlink() + raise + + # Compute the parquet hash inline before the atomic swap. The caller used + # to re-read the file in `_run_materialized_pass` to hash it via + # `_file_hash`, but that's a synchronous full-read on the FastAPI worker + # thread — a 10 GiB parquet means 50+ seconds of disk I/O blocking other + # requests. Hashing here keeps the open-file handle hot from the COPY + # round and removes the second read. Devil's-advocate review item. + h = hashlib.md5() + with open(tmp_path, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + h.update(chunk) + parquet_hash = h.hexdigest() + + size_bytes = tmp_path.stat().st_size + os.replace(tmp_path, parquet_path) + + rows = int(rows) + if rows == 0: + # 0 rows is indistinguishable from "the SQL is wrong and nobody + # noticed" — surface it loudly so operators see it in the scheduler + # log line and the per-row error aggregation. Caller decides whether + # to alert. + logger.warning( + "Materialized %s produced 0 rows — verify the SQL filter is " + "intentional. Parquet written: %s", + table_id, parquet_path, + ) + + return { + "rows": rows, + "size_bytes": size_bytes, + "query_mode": "materialized", + "hash": parquet_hash, + } + finally: + try: + file_lock.close() # releases flock + except Exception: + pass + # Don't unlink lock_path — its mtime is the TTL signal for + # the next reclaim. Leaving it in place is intentional. + finally: + proc_lock.release() def _resolve_bq_project_id() -> str: diff --git a/tests/test_bq_materialize_concurrency.py b/tests/test_bq_materialize_concurrency.py new file mode 100644 index 0000000..372600e --- /dev/null +++ b/tests/test_bq_materialize_concurrency.py @@ -0,0 +1,196 @@ +"""Per-table_id concurrency: in-process mutex + advisory file lock with +TTL reclaim. Two overlapping materialize_query calls for the same id +must NOT corrupt each other's parquet.""" +from __future__ import annotations +import os +import threading +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from connectors.bigquery.extractor import ( + materialize_query, + MaterializeInFlightError, + _get_table_lock, + _LOCK_TTL_DEFAULT_SECONDS, +) + + +@pytest.fixture(autouse=True) +def reset_locks(monkeypatch): + # Tests must not share lock state across runs. + import connectors.bigquery.extractor as mod + monkeypatch.setattr(mod, "_table_locks", {}) + yield + + +def _slow_bq(stall_seconds: float = 1.0): + """Build a fake BqAccess whose duckdb_session COPY blocks for + `stall_seconds` so we can race a second call against it.""" + bq = MagicMock() + bq.projects.billing = "prj-billing" + bq.projects.data = "prj-data" + + class _Session: + def __enter__(self): + return self + def __exit__(self, *a): + return False + def execute(self, sql): + if sql.startswith("SELECT database_name"): + class _R: + def fetchall(self): + return [("memory",)] + return _R() + if sql.startswith("ATTACH"): + return MagicMock() + if sql.startswith("COPY"): + # Simulate a long-running COPY by writing a stub parquet + # then sleeping so a second call can race us. + # Extract the path from the COPY statement. + import re + m = re.search(r"TO '([^']+)'", sql) + assert m + Path(m.group(1)).write_bytes(b"PARQUET_STUB_HEADER" + b"\x00" * 200) + time.sleep(stall_seconds) + return MagicMock() + if sql.startswith("SELECT count"): + class _R: + def fetchone(self): + return (42,) + return _R() + return MagicMock() + + bq.duckdb_session.return_value = _Session() + return bq + + +def test_concurrent_calls_for_same_id_raise_in_flight(tmp_path): + bq = _slow_bq(stall_seconds=2.0) + + out_dir = str(tmp_path) + captured: list = [] + + def runner(tag): + try: + r = materialize_query( + table_id="t1", sql="SELECT 1", + bq=bq, output_dir=out_dir, max_bytes=None, + ) + captured.append(("ok", tag, r)) + except MaterializeInFlightError as e: + captured.append(("in_flight", tag, str(e))) + except Exception as e: + captured.append(("err", tag, str(e))) + + t1 = threading.Thread(target=runner, args=("first",)) + t2 = threading.Thread(target=runner, args=("second",)) + t1.start() + time.sleep(0.2) # let t1 acquire the lock + t2.start() + t1.join() + t2.join() + + outcomes = [c[0] for c in captured] + assert outcomes.count("ok") == 1, f"expected exactly one success, got {captured}" + assert outcomes.count("in_flight") == 1 + + +def test_sequential_calls_for_same_id_both_succeed(tmp_path): + bq = _slow_bq(stall_seconds=0.05) + + out_dir = str(tmp_path) + r1 = materialize_query( + table_id="t1", sql="SELECT 1", + bq=bq, output_dir=out_dir, max_bytes=None, + ) + r2 = materialize_query( + table_id="t1", sql="SELECT 1", + bq=bq, output_dir=out_dir, max_bytes=None, + ) + assert r1["rows"] == 42 + assert r2["rows"] == 42 + + +def test_different_ids_run_in_parallel(tmp_path): + bq = _slow_bq(stall_seconds=1.0) + out_dir = str(tmp_path) + captured: list = [] + + def runner(tid): + try: + r = materialize_query( + table_id=tid, sql="SELECT 1", + bq=bq, output_dir=out_dir, max_bytes=None, + ) + captured.append((tid, r["rows"])) + except Exception as e: + captured.append((tid, "ERROR")) + + threads = [threading.Thread(target=runner, args=(f"tab_{i}",)) for i in range(3)] + start = time.time() + for t in threads: t.start() + for t in threads: t.join() + elapsed = time.time() - start + # If they were serialized, would take >= 3s. Parallel: ~1s. + assert elapsed < 2.0, f"expected parallel, elapsed={elapsed:.2f}s" + assert len(captured) == 3 + assert all(c[1] == 42 for c in captured) + + +def test_stale_file_lock_is_reclaimed_after_ttl(tmp_path, monkeypatch): + """Force a stale .lock file (mtime older than TTL) and verify a new + call reclaims it instead of raising MaterializeInFlightError.""" + bq = _slow_bq(stall_seconds=0.05) + lock_path = Path(tmp_path) / "data" / "t1.parquet.lock" + lock_path.parent.mkdir(parents=True, exist_ok=True) + lock_path.write_text("") + + # Set mtime to 25h ago (> default 24h TTL). + old_ts = time.time() - 25 * 3600 + os.utime(lock_path, (old_ts, old_ts)) + + r = materialize_query( + table_id="t1", sql="SELECT 1", + bq=bq, output_dir=str(tmp_path), max_bytes=None, + ) + assert r["rows"] == 42 + + +def test_fresh_file_lock_blocks_with_in_flight_error(tmp_path, monkeypatch): + """Force a fresh .lock file (mtime within TTL) and verify a new + call raises rather than reclaims.""" + bq = _slow_bq(stall_seconds=0.05) + lock_path = Path(tmp_path) / "data" / "t1.parquet.lock" + lock_path.parent.mkdir(parents=True, exist_ok=True) + + # Open the lock file and HOLD a fcntl exclusive lock so the materialize + # call's flock(LOCK_NB) sees a real conflicting lock — relying on + # mtime-only would let the test pass even if flock acquisition was + # broken. + import fcntl + holder = open(lock_path, "w") + fcntl.flock(holder.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + try: + with pytest.raises(MaterializeInFlightError): + materialize_query( + table_id="t1", sql="SELECT 1", + bq=bq, output_dir=str(tmp_path), max_bytes=None, + ) + finally: + fcntl.flock(holder.fileno(), fcntl.LOCK_UN) + holder.close() + + +def test_lock_ttl_reads_from_instance_config(tmp_path, monkeypatch): + """When `materialize.lock_ttl_seconds` is set in instance.yaml, that + value overrides the default.""" + monkeypatch.setattr( + "app.instance_config.get_value", + lambda *args, **kw: 60 if args == ("materialize", "lock_ttl_seconds") else kw.get("default"), + ) + + from connectors.bigquery.extractor import _get_lock_ttl_seconds + assert _get_lock_ttl_seconds() == 60