feat(bq-materialize): per-table mutex + file lock with TTL reclaim
Two layers of concurrency control. Layer 1 is a per-table_id threading.Lock keyed on table_id; Layer 2 is fcntl.flock on a sibling <id>.parquet.lock file. Overlapping calls for the same id raise MaterializeInFlightError, which the caller treats as 'skipped, in_flight' instead of a hard error. Stale file locks (mtime older than materialize.lock_ttl_seconds, default 86400) are reclaimed on the next attempt — covers the rare case where a holder was hard-killed before kernel-level flock release. Pre-fix, when a materialize ran longer than the scheduler tick interval (15 min), the next tick called materialize_query for the same id, hit the unconditional tmp_path.unlink() at function entry, and started a second COPY against the same path. Both writers interleaved bytes; the original COPY's read_parquet validation then failed with 'No magic bytes found at end of file'.
This commit is contained in:
parent
a2afcfe59a
commit
16eaf7a399
2 changed files with 410 additions and 81 deletions
|
|
@ -3,12 +3,14 @@
|
||||||
No data is downloaded. All queries go directly to BigQuery via DuckDB extension ATTACH.
|
No data is downloaded. All queries go directly to BigQuery via DuckDB extension ATTACH.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import fcntl
|
||||||
import hashlib
|
import hashlib
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
import threading
|
import threading
|
||||||
|
import time
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import List, Dict, Any, Optional
|
from typing import List, Dict, Any, Optional
|
||||||
|
|
@ -23,6 +25,109 @@ import duckdb
|
||||||
# not the per-source extract-file write, so we need a dedicated lock here.
|
# not the per-source extract-file write, so we need a dedicated lock here.
|
||||||
_INIT_EXTRACT_LOCK = threading.Lock()
|
_INIT_EXTRACT_LOCK = threading.Lock()
|
||||||
|
|
||||||
|
_LOCK_TTL_DEFAULT_SECONDS: int = 86400 # 24h — overridable via materialize.lock_ttl_seconds
|
||||||
|
|
||||||
|
|
||||||
|
class MaterializeInFlightError(Exception):
|
||||||
|
"""Raised when a per-table_id materialize is already running.
|
||||||
|
|
||||||
|
Caller (`_run_materialized_pass`) should treat this as a 'skipped,
|
||||||
|
in-flight' outcome — the in-flight worker will finish and write
|
||||||
|
sync_state on its own. Critically, this is NOT an error condition;
|
||||||
|
`state.set_error` MUST NOT be called for this exception or the
|
||||||
|
registry would surface a false-positive failure to the operator
|
||||||
|
every overlap."""
|
||||||
|
|
||||||
|
def __init__(self, table_id: str, layer: str = "process"):
|
||||||
|
self.table_id = table_id
|
||||||
|
self.layer = layer
|
||||||
|
super().__init__(
|
||||||
|
f"materialize for {table_id!r} already in flight ({layer} lock held)"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
_table_locks: dict[str, threading.Lock] = {}
|
||||||
|
_table_locks_registry: threading.Lock = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
|
def _get_table_lock(table_id: str) -> threading.Lock:
|
||||||
|
"""Return the process-wide mutex for a given table_id, creating it
|
||||||
|
on first reference. The registry mutex serializes the dict mutation
|
||||||
|
only — once the per-id Lock is returned, contention between callers
|
||||||
|
happens on that lock alone."""
|
||||||
|
with _table_locks_registry:
|
||||||
|
lock = _table_locks.get(table_id)
|
||||||
|
if lock is None:
|
||||||
|
lock = threading.Lock()
|
||||||
|
_table_locks[table_id] = lock
|
||||||
|
return lock
|
||||||
|
|
||||||
|
|
||||||
|
def _get_lock_ttl_seconds() -> int:
|
||||||
|
"""Read the configured stale-lock TTL with fallback to the default.
|
||||||
|
|
||||||
|
Operator override lives at instance.yaml `materialize.lock_ttl_seconds`
|
||||||
|
(also editable via /admin/server-config). Default 86400 s = 24 h
|
||||||
|
matches the upper bound of any healthy BQ COPY in practice — anything
|
||||||
|
longer is a stuck process or a hung BQ session, both of which warrant
|
||||||
|
reclaim on next attempt."""
|
||||||
|
try:
|
||||||
|
from app.instance_config import get_value
|
||||||
|
v = get_value(
|
||||||
|
"materialize", "lock_ttl_seconds",
|
||||||
|
default=_LOCK_TTL_DEFAULT_SECONDS,
|
||||||
|
)
|
||||||
|
n = int(v) if v is not None else _LOCK_TTL_DEFAULT_SECONDS
|
||||||
|
return n if n > 0 else _LOCK_TTL_DEFAULT_SECONDS
|
||||||
|
except Exception:
|
||||||
|
return _LOCK_TTL_DEFAULT_SECONDS
|
||||||
|
|
||||||
|
|
||||||
|
def _try_acquire_file_lock(lock_path: Path):
|
||||||
|
"""Try to acquire an advisory exclusive flock on `lock_path`. Returns
|
||||||
|
the open file object on success (caller must close to release); None
|
||||||
|
on conflict.
|
||||||
|
|
||||||
|
Stale-lock reclaim: if the lock_path exists and its mtime is older
|
||||||
|
than the configured TTL, log a warning and unlink before retrying.
|
||||||
|
A live holder still wins the second flock attempt (kernel-level
|
||||||
|
flock isn't tied to mtime), so the reclaim doesn't break correctness
|
||||||
|
— it just unblocks the case where a holder process was hard-killed
|
||||||
|
before the kernel released the lock."""
|
||||||
|
lock_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
def _try_open_and_flock():
|
||||||
|
f = open(lock_path, "w")
|
||||||
|
try:
|
||||||
|
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||||
|
return f
|
||||||
|
except BlockingIOError:
|
||||||
|
f.close()
|
||||||
|
return None
|
||||||
|
|
||||||
|
holder = _try_open_and_flock()
|
||||||
|
if holder is not None:
|
||||||
|
return holder
|
||||||
|
|
||||||
|
# Conflict. If the file is older than TTL, reclaim and retry once.
|
||||||
|
try:
|
||||||
|
age = time.time() - lock_path.stat().st_mtime
|
||||||
|
except FileNotFoundError:
|
||||||
|
return _try_open_and_flock()
|
||||||
|
|
||||||
|
if age > _get_lock_ttl_seconds():
|
||||||
|
logger.warning(
|
||||||
|
"Reclaiming stale materialize lock at %s (age %.1fs > TTL)",
|
||||||
|
lock_path, age,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
lock_path.unlink()
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
return _try_open_and_flock()
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
from connectors.bigquery.auth import get_metadata_token, BQMetadataAuthError
|
from connectors.bigquery.auth import get_metadata_token, BQMetadataAuthError
|
||||||
from app.instance_config import get_value
|
from app.instance_config import get_value
|
||||||
from src.sql_safe import (
|
from src.sql_safe import (
|
||||||
|
|
@ -395,6 +500,13 @@ def materialize_query(
|
||||||
Atomic write: result lands in `<id>.parquet.tmp` first, then
|
Atomic write: result lands in `<id>.parquet.tmp` first, then
|
||||||
`os.replace` swaps it in. A failed COPY leaves no partial file behind.
|
`os.replace` swaps it in. A failed COPY leaves no partial file behind.
|
||||||
|
|
||||||
|
Concurrency: per-``table_id`` in-process mutex + advisory file lock
|
||||||
|
on ``<table_id>.parquet.lock``. Overlapping calls for the same id
|
||||||
|
raise ``MaterializeInFlightError`` immediately so the caller can
|
||||||
|
skip cleanly without consuming the COPY budget twice. Stale file
|
||||||
|
locks (mtime > ``materialize.lock_ttl_seconds``, default 24 h) are
|
||||||
|
reclaimed automatically.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
table_id: Logical id from table_registry; becomes the parquet
|
table_id: Logical id from table_registry; becomes the parquet
|
||||||
filename. Must pass `validate_identifier()` so it can't
|
filename. Must pass `validate_identifier()` so it can't
|
||||||
|
|
@ -414,6 +526,8 @@ def materialize_query(
|
||||||
Raises:
|
Raises:
|
||||||
ValueError: if `table_id` is unsafe or `bq.projects.billing` fails
|
ValueError: if `table_id` is unsafe or `bq.projects.billing` fails
|
||||||
the GCP project_id grammar check.
|
the GCP project_id grammar check.
|
||||||
|
MaterializeInFlightError: if a concurrent call for the same table_id
|
||||||
|
is already in progress (in-process or cross-process).
|
||||||
MaterializeBudgetError: if `max_bytes > 0` and dry-run estimate exceeds it.
|
MaterializeBudgetError: if `max_bytes > 0` and dry-run estimate exceeds it.
|
||||||
BqAccessError: from `bq.duckdb_session()` (auth_failed / bq_lib_missing /
|
BqAccessError: from `bq.duckdb_session()` (auth_failed / bq_lib_missing /
|
||||||
not_configured) — caller catches and aggregates into the trigger
|
not_configured) — caller catches and aggregates into the trigger
|
||||||
|
|
@ -429,95 +543,114 @@ def materialize_query(
|
||||||
|
|
||||||
parquet_path = data_dir / f"{table_id}.parquet"
|
parquet_path = data_dir / f"{table_id}.parquet"
|
||||||
tmp_path = data_dir / f"{table_id}.parquet.tmp"
|
tmp_path = data_dir / f"{table_id}.parquet.tmp"
|
||||||
if tmp_path.exists():
|
lock_path = data_dir / f"{table_id}.parquet.lock"
|
||||||
tmp_path.unlink()
|
|
||||||
|
|
||||||
# Build the wrapped SQL once — both the cost guardrail dry-run and
|
proc_lock = _get_table_lock(table_id)
|
||||||
# the COPY operate on `sql` (the inner BQ SQL); only the COPY needs
|
if not proc_lock.acquire(blocking=False):
|
||||||
# the DuckDB-side bigquery_query() envelope.
|
raise MaterializeInFlightError(table_id, layer="process")
|
||||||
billing_project = bq.projects.billing
|
try:
|
||||||
wrapped_sql = _wrap_admin_sql_for_jobs_api(billing_project, sql)
|
file_lock = _try_acquire_file_lock(lock_path)
|
||||||
|
if file_lock is None:
|
||||||
if max_bytes is not None and max_bytes > 0:
|
raise MaterializeInFlightError(table_id, layer="file")
|
||||||
try:
|
try:
|
||||||
from app.api.v2_scan import _bq_dry_run_bytes # reuse main's impl
|
|
||||||
estimated = _bq_dry_run_bytes(bq, sql) # NB: pass inner SQL (BQ-native)
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(
|
|
||||||
"BQ dry-run failed for materialize cost guardrail (fail-open): %s. "
|
|
||||||
"Proceeding with COPY against `bigquery_query()` wrapping.",
|
|
||||||
e,
|
|
||||||
)
|
|
||||||
estimated = 0
|
|
||||||
if estimated > max_bytes:
|
|
||||||
raise MaterializeBudgetError(
|
|
||||||
f"dry-run estimate {estimated:,} bytes exceeds cap "
|
|
||||||
f"{max_bytes:,} for table {table_id!r}",
|
|
||||||
table_id=table_id,
|
|
||||||
current=estimated,
|
|
||||||
limit=max_bytes,
|
|
||||||
)
|
|
||||||
|
|
||||||
# COPY through a BqAccess-managed session. The session has the BQ
|
|
||||||
# extension loaded with a SECRET token; bigquery_query() reuses that
|
|
||||||
# auth path against the billing_project for the jobs API call.
|
|
||||||
with bq.duckdb_session() as conn:
|
|
||||||
attached = {
|
|
||||||
r[0] for r in conn.execute(
|
|
||||||
"SELECT database_name FROM duckdb_databases()"
|
|
||||||
).fetchall()
|
|
||||||
}
|
|
||||||
if "bq" not in attached:
|
|
||||||
conn.execute(
|
|
||||||
f"ATTACH 'project={bq.projects.data}' AS bq (TYPE bigquery, READ_ONLY)"
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
safe_path = _escape_sql_string_literal(str(tmp_path))
|
|
||||||
conn.execute(
|
|
||||||
f"COPY ({wrapped_sql}) TO '{safe_path}' (FORMAT PARQUET)"
|
|
||||||
)
|
|
||||||
rows = conn.execute(
|
|
||||||
f"SELECT count(*) FROM read_parquet('{safe_path}')"
|
|
||||||
).fetchone()[0]
|
|
||||||
except Exception:
|
|
||||||
if tmp_path.exists():
|
if tmp_path.exists():
|
||||||
tmp_path.unlink()
|
tmp_path.unlink()
|
||||||
raise
|
|
||||||
|
|
||||||
# Compute the parquet hash inline before the atomic swap. The caller used
|
# Build the wrapped SQL once — both the cost guardrail dry-run and
|
||||||
# to re-read the file in `_run_materialized_pass` to hash it via
|
# the COPY operate on `sql` (the inner BQ SQL); only the COPY needs
|
||||||
# `_file_hash`, but that's a synchronous full-read on the FastAPI worker
|
# the DuckDB-side bigquery_query() envelope.
|
||||||
# thread — a 10 GiB parquet means 50+ seconds of disk I/O blocking other
|
billing_project = bq.projects.billing
|
||||||
# requests. Hashing here keeps the open-file handle hot from the COPY
|
wrapped_sql = _wrap_admin_sql_for_jobs_api(billing_project, sql)
|
||||||
# round and removes the second read. Devil's-advocate review item.
|
|
||||||
h = hashlib.md5()
|
|
||||||
with open(tmp_path, "rb") as f:
|
|
||||||
for chunk in iter(lambda: f.read(8192), b""):
|
|
||||||
h.update(chunk)
|
|
||||||
parquet_hash = h.hexdigest()
|
|
||||||
|
|
||||||
size_bytes = tmp_path.stat().st_size
|
if max_bytes is not None and max_bytes > 0:
|
||||||
os.replace(tmp_path, parquet_path)
|
try:
|
||||||
|
from app.api.v2_scan import _bq_dry_run_bytes # reuse main's impl
|
||||||
|
estimated = _bq_dry_run_bytes(bq, sql) # NB: pass inner SQL (BQ-native)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(
|
||||||
|
"BQ dry-run failed for materialize cost guardrail (fail-open): %s. "
|
||||||
|
"Proceeding with COPY against `bigquery_query()` wrapping.",
|
||||||
|
e,
|
||||||
|
)
|
||||||
|
estimated = 0
|
||||||
|
if estimated > max_bytes:
|
||||||
|
raise MaterializeBudgetError(
|
||||||
|
f"dry-run estimate {estimated:,} bytes exceeds cap "
|
||||||
|
f"{max_bytes:,} for table {table_id!r}",
|
||||||
|
table_id=table_id,
|
||||||
|
current=estimated,
|
||||||
|
limit=max_bytes,
|
||||||
|
)
|
||||||
|
|
||||||
rows = int(rows)
|
# COPY through a BqAccess-managed session. The session has the BQ
|
||||||
if rows == 0:
|
# extension loaded with a SECRET token; bigquery_query() reuses that
|
||||||
# 0 rows is indistinguishable from "the SQL is wrong and nobody
|
# auth path against the billing_project for the jobs API call.
|
||||||
# noticed" — surface it loudly so operators see it in the scheduler
|
with bq.duckdb_session() as conn:
|
||||||
# log line and the per-row error aggregation. Caller decides whether
|
attached = {
|
||||||
# to alert.
|
r[0] for r in conn.execute(
|
||||||
logger.warning(
|
"SELECT database_name FROM duckdb_databases()"
|
||||||
"Materialized %s produced 0 rows — verify the SQL filter is "
|
).fetchall()
|
||||||
"intentional. Parquet written: %s",
|
}
|
||||||
table_id, parquet_path,
|
if "bq" not in attached:
|
||||||
)
|
conn.execute(
|
||||||
|
f"ATTACH 'project={bq.projects.data}' AS bq (TYPE bigquery, READ_ONLY)"
|
||||||
|
)
|
||||||
|
|
||||||
return {
|
try:
|
||||||
"rows": rows,
|
safe_path = _escape_sql_string_literal(str(tmp_path))
|
||||||
"size_bytes": size_bytes,
|
conn.execute(
|
||||||
"query_mode": "materialized",
|
f"COPY ({wrapped_sql}) TO '{safe_path}' (FORMAT PARQUET)"
|
||||||
"hash": parquet_hash,
|
)
|
||||||
}
|
rows = conn.execute(
|
||||||
|
f"SELECT count(*) FROM read_parquet('{safe_path}')"
|
||||||
|
).fetchone()[0]
|
||||||
|
except Exception:
|
||||||
|
if tmp_path.exists():
|
||||||
|
tmp_path.unlink()
|
||||||
|
raise
|
||||||
|
|
||||||
|
# Compute the parquet hash inline before the atomic swap. The caller used
|
||||||
|
# to re-read the file in `_run_materialized_pass` to hash it via
|
||||||
|
# `_file_hash`, but that's a synchronous full-read on the FastAPI worker
|
||||||
|
# thread — a 10 GiB parquet means 50+ seconds of disk I/O blocking other
|
||||||
|
# requests. Hashing here keeps the open-file handle hot from the COPY
|
||||||
|
# round and removes the second read. Devil's-advocate review item.
|
||||||
|
h = hashlib.md5()
|
||||||
|
with open(tmp_path, "rb") as f:
|
||||||
|
for chunk in iter(lambda: f.read(8192), b""):
|
||||||
|
h.update(chunk)
|
||||||
|
parquet_hash = h.hexdigest()
|
||||||
|
|
||||||
|
size_bytes = tmp_path.stat().st_size
|
||||||
|
os.replace(tmp_path, parquet_path)
|
||||||
|
|
||||||
|
rows = int(rows)
|
||||||
|
if rows == 0:
|
||||||
|
# 0 rows is indistinguishable from "the SQL is wrong and nobody
|
||||||
|
# noticed" — surface it loudly so operators see it in the scheduler
|
||||||
|
# log line and the per-row error aggregation. Caller decides whether
|
||||||
|
# to alert.
|
||||||
|
logger.warning(
|
||||||
|
"Materialized %s produced 0 rows — verify the SQL filter is "
|
||||||
|
"intentional. Parquet written: %s",
|
||||||
|
table_id, parquet_path,
|
||||||
|
)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"rows": rows,
|
||||||
|
"size_bytes": size_bytes,
|
||||||
|
"query_mode": "materialized",
|
||||||
|
"hash": parquet_hash,
|
||||||
|
}
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
file_lock.close() # releases flock
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
# Don't unlink lock_path — its mtime is the TTL signal for
|
||||||
|
# the next reclaim. Leaving it in place is intentional.
|
||||||
|
finally:
|
||||||
|
proc_lock.release()
|
||||||
|
|
||||||
|
|
||||||
def _resolve_bq_project_id() -> str:
|
def _resolve_bq_project_id() -> str:
|
||||||
|
|
|
||||||
196
tests/test_bq_materialize_concurrency.py
Normal file
196
tests/test_bq_materialize_concurrency.py
Normal file
|
|
@ -0,0 +1,196 @@
|
||||||
|
"""Per-table_id concurrency: in-process mutex + advisory file lock with
|
||||||
|
TTL reclaim. Two overlapping materialize_query calls for the same id
|
||||||
|
must NOT corrupt each other's parquet."""
|
||||||
|
from __future__ import annotations
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from connectors.bigquery.extractor import (
|
||||||
|
materialize_query,
|
||||||
|
MaterializeInFlightError,
|
||||||
|
_get_table_lock,
|
||||||
|
_LOCK_TTL_DEFAULT_SECONDS,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def reset_locks(monkeypatch):
|
||||||
|
# Tests must not share lock state across runs.
|
||||||
|
import connectors.bigquery.extractor as mod
|
||||||
|
monkeypatch.setattr(mod, "_table_locks", {})
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
def _slow_bq(stall_seconds: float = 1.0):
|
||||||
|
"""Build a fake BqAccess whose duckdb_session COPY blocks for
|
||||||
|
`stall_seconds` so we can race a second call against it."""
|
||||||
|
bq = MagicMock()
|
||||||
|
bq.projects.billing = "prj-billing"
|
||||||
|
bq.projects.data = "prj-data"
|
||||||
|
|
||||||
|
class _Session:
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
def __exit__(self, *a):
|
||||||
|
return False
|
||||||
|
def execute(self, sql):
|
||||||
|
if sql.startswith("SELECT database_name"):
|
||||||
|
class _R:
|
||||||
|
def fetchall(self):
|
||||||
|
return [("memory",)]
|
||||||
|
return _R()
|
||||||
|
if sql.startswith("ATTACH"):
|
||||||
|
return MagicMock()
|
||||||
|
if sql.startswith("COPY"):
|
||||||
|
# Simulate a long-running COPY by writing a stub parquet
|
||||||
|
# then sleeping so a second call can race us.
|
||||||
|
# Extract the path from the COPY statement.
|
||||||
|
import re
|
||||||
|
m = re.search(r"TO '([^']+)'", sql)
|
||||||
|
assert m
|
||||||
|
Path(m.group(1)).write_bytes(b"PARQUET_STUB_HEADER" + b"\x00" * 200)
|
||||||
|
time.sleep(stall_seconds)
|
||||||
|
return MagicMock()
|
||||||
|
if sql.startswith("SELECT count"):
|
||||||
|
class _R:
|
||||||
|
def fetchone(self):
|
||||||
|
return (42,)
|
||||||
|
return _R()
|
||||||
|
return MagicMock()
|
||||||
|
|
||||||
|
bq.duckdb_session.return_value = _Session()
|
||||||
|
return bq
|
||||||
|
|
||||||
|
|
||||||
|
def test_concurrent_calls_for_same_id_raise_in_flight(tmp_path):
|
||||||
|
bq = _slow_bq(stall_seconds=2.0)
|
||||||
|
|
||||||
|
out_dir = str(tmp_path)
|
||||||
|
captured: list = []
|
||||||
|
|
||||||
|
def runner(tag):
|
||||||
|
try:
|
||||||
|
r = materialize_query(
|
||||||
|
table_id="t1", sql="SELECT 1",
|
||||||
|
bq=bq, output_dir=out_dir, max_bytes=None,
|
||||||
|
)
|
||||||
|
captured.append(("ok", tag, r))
|
||||||
|
except MaterializeInFlightError as e:
|
||||||
|
captured.append(("in_flight", tag, str(e)))
|
||||||
|
except Exception as e:
|
||||||
|
captured.append(("err", tag, str(e)))
|
||||||
|
|
||||||
|
t1 = threading.Thread(target=runner, args=("first",))
|
||||||
|
t2 = threading.Thread(target=runner, args=("second",))
|
||||||
|
t1.start()
|
||||||
|
time.sleep(0.2) # let t1 acquire the lock
|
||||||
|
t2.start()
|
||||||
|
t1.join()
|
||||||
|
t2.join()
|
||||||
|
|
||||||
|
outcomes = [c[0] for c in captured]
|
||||||
|
assert outcomes.count("ok") == 1, f"expected exactly one success, got {captured}"
|
||||||
|
assert outcomes.count("in_flight") == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_sequential_calls_for_same_id_both_succeed(tmp_path):
|
||||||
|
bq = _slow_bq(stall_seconds=0.05)
|
||||||
|
|
||||||
|
out_dir = str(tmp_path)
|
||||||
|
r1 = materialize_query(
|
||||||
|
table_id="t1", sql="SELECT 1",
|
||||||
|
bq=bq, output_dir=out_dir, max_bytes=None,
|
||||||
|
)
|
||||||
|
r2 = materialize_query(
|
||||||
|
table_id="t1", sql="SELECT 1",
|
||||||
|
bq=bq, output_dir=out_dir, max_bytes=None,
|
||||||
|
)
|
||||||
|
assert r1["rows"] == 42
|
||||||
|
assert r2["rows"] == 42
|
||||||
|
|
||||||
|
|
||||||
|
def test_different_ids_run_in_parallel(tmp_path):
|
||||||
|
bq = _slow_bq(stall_seconds=1.0)
|
||||||
|
out_dir = str(tmp_path)
|
||||||
|
captured: list = []
|
||||||
|
|
||||||
|
def runner(tid):
|
||||||
|
try:
|
||||||
|
r = materialize_query(
|
||||||
|
table_id=tid, sql="SELECT 1",
|
||||||
|
bq=bq, output_dir=out_dir, max_bytes=None,
|
||||||
|
)
|
||||||
|
captured.append((tid, r["rows"]))
|
||||||
|
except Exception as e:
|
||||||
|
captured.append((tid, "ERROR"))
|
||||||
|
|
||||||
|
threads = [threading.Thread(target=runner, args=(f"tab_{i}",)) for i in range(3)]
|
||||||
|
start = time.time()
|
||||||
|
for t in threads: t.start()
|
||||||
|
for t in threads: t.join()
|
||||||
|
elapsed = time.time() - start
|
||||||
|
# If they were serialized, would take >= 3s. Parallel: ~1s.
|
||||||
|
assert elapsed < 2.0, f"expected parallel, elapsed={elapsed:.2f}s"
|
||||||
|
assert len(captured) == 3
|
||||||
|
assert all(c[1] == 42 for c in captured)
|
||||||
|
|
||||||
|
|
||||||
|
def test_stale_file_lock_is_reclaimed_after_ttl(tmp_path, monkeypatch):
|
||||||
|
"""Force a stale .lock file (mtime older than TTL) and verify a new
|
||||||
|
call reclaims it instead of raising MaterializeInFlightError."""
|
||||||
|
bq = _slow_bq(stall_seconds=0.05)
|
||||||
|
lock_path = Path(tmp_path) / "data" / "t1.parquet.lock"
|
||||||
|
lock_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
lock_path.write_text("")
|
||||||
|
|
||||||
|
# Set mtime to 25h ago (> default 24h TTL).
|
||||||
|
old_ts = time.time() - 25 * 3600
|
||||||
|
os.utime(lock_path, (old_ts, old_ts))
|
||||||
|
|
||||||
|
r = materialize_query(
|
||||||
|
table_id="t1", sql="SELECT 1",
|
||||||
|
bq=bq, output_dir=str(tmp_path), max_bytes=None,
|
||||||
|
)
|
||||||
|
assert r["rows"] == 42
|
||||||
|
|
||||||
|
|
||||||
|
def test_fresh_file_lock_blocks_with_in_flight_error(tmp_path, monkeypatch):
|
||||||
|
"""Force a fresh .lock file (mtime within TTL) and verify a new
|
||||||
|
call raises rather than reclaims."""
|
||||||
|
bq = _slow_bq(stall_seconds=0.05)
|
||||||
|
lock_path = Path(tmp_path) / "data" / "t1.parquet.lock"
|
||||||
|
lock_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
# Open the lock file and HOLD a fcntl exclusive lock so the materialize
|
||||||
|
# call's flock(LOCK_NB) sees a real conflicting lock — relying on
|
||||||
|
# mtime-only would let the test pass even if flock acquisition was
|
||||||
|
# broken.
|
||||||
|
import fcntl
|
||||||
|
holder = open(lock_path, "w")
|
||||||
|
fcntl.flock(holder.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||||
|
try:
|
||||||
|
with pytest.raises(MaterializeInFlightError):
|
||||||
|
materialize_query(
|
||||||
|
table_id="t1", sql="SELECT 1",
|
||||||
|
bq=bq, output_dir=str(tmp_path), max_bytes=None,
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
fcntl.flock(holder.fileno(), fcntl.LOCK_UN)
|
||||||
|
holder.close()
|
||||||
|
|
||||||
|
|
||||||
|
def test_lock_ttl_reads_from_instance_config(tmp_path, monkeypatch):
|
||||||
|
"""When `materialize.lock_ttl_seconds` is set in instance.yaml, that
|
||||||
|
value overrides the default."""
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"app.instance_config.get_value",
|
||||||
|
lambda *args, **kw: 60 if args == ("materialize", "lock_ttl_seconds") else kw.get("default"),
|
||||||
|
)
|
||||||
|
|
||||||
|
from connectors.bigquery.extractor import _get_lock_ttl_seconds
|
||||||
|
assert _get_lock_ttl_seconds() == 60
|
||||||
Loading…
Reference in a new issue