diff --git a/connectors/bigquery/extractor.py b/connectors/bigquery/extractor.py index 7b158a0..44ef386 100644 --- a/connectors/bigquery/extractor.py +++ b/connectors/bigquery/extractor.py @@ -17,6 +17,15 @@ from typing import List, Dict, Any, Optional import duckdb +from connectors.bigquery.auth import get_metadata_token, BQMetadataAuthError +from src.sql_safe import ( + validate_identifier as _validate_identifier, + validate_project_id as _validate_project_id, +) +from src.identifier_validation import validate_identifier, validate_quoted_identifier + +logger = logging.getLogger(__name__) + # Serializes the body of `init_extract` across threads so two concurrent # materialize calls (e.g. the synchronous timeout-fallback BackgroundTask # kicking in while the original daemon thread is still running) can't both @@ -46,6 +55,10 @@ class MaterializeInFlightError(Exception): ) +# Unbounded by design — each registered table_id gets one Lock for the +# process lifetime. Per-Lock cost is ~56 bytes; a deployment with even +# 10k registered tables holds <1 MB. No cleanup logic — clean would +# need ref-counting and risks freeing a Lock currently held by a worker. _table_locks: dict[str, threading.Lock] = {} _table_locks_registry: threading.Lock = threading.Lock() @@ -72,6 +85,9 @@ def _get_lock_ttl_seconds() -> int: longer is a stuck process or a hung BQ session, both of which warrant reclaim on next attempt.""" try: + # Deferred import: keeps the connectors module importable in + # contexts where the app layer isn't bootstrapped (e.g. unit tests + # that exercise extractor helpers without the FastAPI app). from app.instance_config import get_value v = get_value( "materialize", "lock_ttl_seconds", @@ -97,13 +113,25 @@ def _try_acquire_file_lock(lock_path: Path): lock_path.parent.mkdir(parents=True, exist_ok=True) def _try_open_and_flock(): + # Open in 'w' mode so the file's mtime updates on every successful + # acquisition — the mtime is the TTL signal for the next caller. + # Content is intentionally empty; the fd exists only to anchor flock. f = open(lock_path, "w") try: fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) return f except BlockingIOError: + # Another holder owns the lock — return None so the caller can + # decide between TTL-reclaim and propagating MaterializeInFlightError. f.close() return None + except OSError: + # Anything else (read-only fs, unsupported, fd exhaustion) is a + # platform / config error, not a contention signal. Close the fd + # and re-raise so the caller (and operator) sees the real failure + # instead of a silent leak. + f.close() + raise holder = _try_open_and_flock() if holder is not None: @@ -128,16 +156,6 @@ def _try_acquire_file_lock(lock_path: Path): return None -from connectors.bigquery.auth import get_metadata_token, BQMetadataAuthError -from app.instance_config import get_value -from src.sql_safe import ( - validate_identifier as _validate_identifier, - validate_project_id as _validate_project_id, -) -from src.identifier_validation import validate_identifier, validate_quoted_identifier - -logger = logging.getLogger(__name__) - def _detect_table_type( conn: duckdb.DuckDBPyConnection, diff --git a/tests/test_bq_materialize_concurrency.py b/tests/test_bq_materialize_concurrency.py index 372600e..815661c 100644 --- a/tests/test_bq_materialize_concurrency.py +++ b/tests/test_bq_materialize_concurrency.py @@ -141,8 +141,11 @@ def test_different_ids_run_in_parallel(tmp_path): def test_stale_file_lock_is_reclaimed_after_ttl(tmp_path, monkeypatch): - """Force a stale .lock file (mtime older than TTL) and verify a new - call reclaims it instead of raising MaterializeInFlightError.""" + """Verify a stale, unheld .lock file (old mtime, no live flock holder) does NOT + cause `MaterializeInFlightError`. The reclaim branch in `_try_acquire_file_lock` + is technically not reached here (the first `_try_open_and_flock` succeeds because + nobody holds the lock), but exercising the in-flight-by-mtime-only mistake is what + this test guards against.""" bq = _slow_bq(stall_seconds=0.05) lock_path = Path(tmp_path) / "data" / "t1.parquet.lock" lock_path.parent.mkdir(parents=True, exist_ok=True) @@ -187,6 +190,11 @@ def test_fresh_file_lock_blocks_with_in_flight_error(tmp_path, monkeypatch): def test_lock_ttl_reads_from_instance_config(tmp_path, monkeypatch): """When `materialize.lock_ttl_seconds` is set in instance.yaml, that value overrides the default.""" + # Patches `app.instance_config.get_value` directly. This works because + # `_get_lock_ttl_seconds` re-imports `get_value` on every call (see + # extractor.py for the deferred-import rationale). If a future change + # hoists the import to module-level, this patch must change to target + # `connectors.bigquery.extractor.get_value` instead. monkeypatch.setattr( "app.instance_config.get_value", lambda *args, **kw: 60 if args == ("materialize", "lock_ttl_seconds") else kw.get("default"),