From 6c0846fd171c0034c13461337724c3d64dccddf8 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 4 May 2026 18:52:54 +0200 Subject: [PATCH] feat(config): expose materialize.lock_ttl_seconds in server-config New top-level 'materialize' section, single field (lock_ttl_seconds). Default 86400 (24h). Backs the file-lock TTL reclaim added in the per-table-mutex change. Editable via PUT /api/admin/server-config and the /admin/server-config UI. --- app/api/admin.py | 53 ++++++ app/web/templates/admin_server_config.html | 4 + config/instance.yaml.example | 16 ++ ...admin_server_config_materialize_section.py | 179 ++++++++++++++++++ 4 files changed, 252 insertions(+) create mode 100644 tests/test_admin_server_config_materialize_section.py diff --git a/app/api/admin.py b/app/api/admin.py index 24cce49..427f9bd 100644 --- a/app/api/admin.py +++ b/app/api/admin.py @@ -146,6 +146,38 @@ def _validate_urls_in_patch(sections: Dict[str, Dict[str, Any]]) -> None: _validate_url_not_private(value, field_name=".".join(path)) +_LOCK_TTL_MIN = 60 +_LOCK_TTL_MAX = 7 * 24 * 3600 # 604800 — one week + + +def _validate_materialize_section(sections: Dict[str, Dict[str, Any]]) -> None: + """Validate the materialize section patch when present. + + Checks field-level constraints that the Pydantic envelope can't enforce + (it only validates the outer shape, not nested leaf values). + """ + mat = sections.get("materialize") + if not isinstance(mat, dict): + return + ttl = mat.get("lock_ttl_seconds") + if ttl is None: + return + if not isinstance(ttl, int) or isinstance(ttl, bool): + raise HTTPException( + status_code=422, + detail="materialize.lock_ttl_seconds must be an integer", + ) + if ttl < _LOCK_TTL_MIN or ttl > _LOCK_TTL_MAX: + raise HTTPException( + status_code=422, + detail=( + f"materialize.lock_ttl_seconds must be between " + f"{_LOCK_TTL_MIN} and {_LOCK_TTL_MAX} " + f"(got {ttl})" + ), + ) + + # --- Server-config (instance.yaml) editor ----------------------------------- # # The /admin/server-config UI POSTs a partial dict here keyed by section @@ -175,6 +207,7 @@ _EDITABLE_SECTIONS: tuple[str, ...] = ( "openmetadata", "desktop", "corporate_memory", + "materialize", ) # "Danger-zone" sections — flipping these can lock operators out (auth.*) or @@ -585,6 +618,23 @@ _KNOWN_FIELDS: dict[str, dict[str, dict]] = { ), }, }, + # materialize — file-lock TTL for the concurrent-materialize safety net. + # A single field; more knobs may follow as the feature matures. + "materialize": { + "lock_ttl_seconds": { + "kind": "int", + "default": 86400, + "hint": ( + "How long (seconds) before a stale materialize lock file is " + "reclaimed. The lock is a .parquet.lock sibling file; if the " + "holder process is hard-killed, the next attempt reclaims the " + "lock once the file's mtime is older than this TTL. " + "Default 86400 (24 h). Min 60, max 604800 (7 days). " + "Lower only if you know materializes never exceed the new value " + "and your host regularly hard-kills processes." + ), + }, + }, } # Keys whose values must be redacted from the audit diff. We match @@ -913,6 +963,9 @@ async def update_server_config( # the per-section patch (e.g. data_source.keboola.stack_url). _validate_urls_in_patch(request.sections) + # Field-level constraints for sections whose values have documented ranges. + _validate_materialize_section(request.sections) + # Defense-in-depth: scrub redaction sentinels (`***` / ``) out of # secret-keyed leaves in the patch before they reach the deep-merge. # The client form does the same scrub, but an API caller round-tripping diff --git a/app/web/templates/admin_server_config.html b/app/web/templates/admin_server_config.html index a3efa09..1a11615 100644 --- a/app/web/templates/admin_server_config.html +++ b/app/web/templates/admin_server_config.html @@ -218,6 +218,10 @@ const SECTION_META = { title: "Corporate Memory", help: "Optional governance for AI-extracted knowledge. When the section is unset, the system runs in legacy democratic-wiki mode with no admin review.", }, + materialize: { + title: "Materialize", + help: "Concurrency safety net for the materialize path. Controls the file-lock TTL used to detect and reclaim stale locks from hard-killed processes.", + }, }; const DANGER_SECTIONS = new Set(["auth", "server"]); diff --git a/config/instance.yaml.example b/config/instance.yaml.example index 9cae9b2..ec178e6 100644 --- a/config/instance.yaml.example +++ b/config/instance.yaml.example @@ -403,3 +403,19 @@ catalog: # schema_cache_ttl_seconds: 3600 # /api/v2/schema/{table_id} cache lifetime (default: 1 h) # sample_cache_ttl_seconds: 3600 # /api/v2/sample/{table_id} cache lifetime (default: 1 h) # # Admins can force-refresh via POST /api/v2/sample/{id}?refresh=true + +# --- Materialize concurrency safety (optional) --- +# Concurrency safety net for the materialize path (BQ + Keboola). When +# two materialize attempts race for the same table_id, the second one +# raises MaterializeInFlightError and skips. The lock is held in a +# .parquet.lock sibling file; if a holder process is hard-killed before +# kernel-level flock release, the next attempt reclaims the lock once +# the file's mtime is older than this TTL. +# +# Default 86400 (24h) is generous on purpose — anything shorter risks +# a long-running COPY being interrupted by its own scheduler successor. +# Lower it only if you know your materialize never exceeds the new +# value AND your host has a habit of hard-killing processes. +# Min 60 (1 minute), max 604800 (7 days). Configurable via /admin/server-config UI. +# materialize: +# lock_ttl_seconds: 86400 diff --git a/tests/test_admin_server_config_materialize_section.py b/tests/test_admin_server_config_materialize_section.py new file mode 100644 index 0000000..9d115c9 --- /dev/null +++ b/tests/test_admin_server_config_materialize_section.py @@ -0,0 +1,179 @@ +"""/api/admin/server-config exposes materialize.lock_ttl_seconds and +accepts updates. Default is 86400 (24h). + +Fixture `seeded_app` is auto-discovered from `tests/conftest.py` — +DO NOT import. It returns a dict: `{"client": TestClient, +"admin_token": str, ...}`. Auth helper `_auth(token)` mirrors the +project's local pattern (also used in test_api_admin_materialized.py). + +Behaviour contract: + - GET returns `materialize` section in `sections` (empty dict when no + override is set, since the endpoint surfaces every editable section). + - GET also exposes the known_fields registry entry for `materialize` + with `lock_ttl_seconds` spec (kind=int, default=86400). + - POST with a valid value persists it and GET returns the new value. + - POST with lock_ttl_seconds < 60 or > 604800 is rejected with 422. +""" +from __future__ import annotations + +import pytest +import yaml + + +def _auth(token: str) -> dict: + return {"Authorization": f"Bearer {token}"} + + +# --------------------------------------------------------------------------- +# GET — default state +# --------------------------------------------------------------------------- + + +def test_get_returns_materialize_in_editable_sections(seeded_app): + """materialize must appear in editable_sections.""" + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.get("/api/admin/server-config", headers=headers) + assert resp.status_code == 200 + body = resp.json() + assert "materialize" in body["editable_sections"] + + +def test_get_returns_materialize_section_key(seeded_app): + """materialize key appears in sections (empty dict when no override set).""" + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.get("/api/admin/server-config", headers=headers) + assert resp.status_code == 200 + body = resp.json() + # The endpoint surfaces every editable section so the UI can render it. + assert "materialize" in body["sections"] + + +def test_get_returns_materialize_known_fields(seeded_app): + """known_fields must have a materialize.lock_ttl_seconds entry.""" + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.get("/api/admin/server-config", headers=headers) + assert resp.status_code == 200 + body = resp.json() + mat_fields = body.get("known_fields", {}).get("materialize", {}) + assert "lock_ttl_seconds" in mat_fields, body.get("known_fields", {}) + spec = mat_fields["lock_ttl_seconds"] + assert spec["kind"] == "int" + assert spec["default"] == 86400 + + +# --------------------------------------------------------------------------- +# POST — update and read back +# --------------------------------------------------------------------------- + + +def test_put_updates_materialize_lock_ttl(seeded_app, tmp_path, monkeypatch): + """POST with a valid value persists; GET reflects the new value.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + state = tmp_path / "state" + state.mkdir(parents=True, exist_ok=True) + import app.instance_config as ic + ic._instance_config = None + try: + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.post( + "/api/admin/server-config", + json={"sections": {"materialize": {"lock_ttl_seconds": 3600}}}, + headers=headers, + ) + assert resp.status_code == 200, resp.text + + # Verify on disk. + loaded = yaml.safe_load((state / "instance.yaml").read_text()) + assert loaded["materialize"]["lock_ttl_seconds"] == 3600 + + # Verify GET reflects the new value. + ic._instance_config = None + resp2 = client.get("/api/admin/server-config", headers=headers) + assert resp2.json()["sections"]["materialize"]["lock_ttl_seconds"] == 3600 + finally: + ic._instance_config = None + + +# --------------------------------------------------------------------------- +# POST — validation +# --------------------------------------------------------------------------- + + +def test_invalid_lock_ttl_below_min_rejected(seeded_app): + """lock_ttl_seconds < 60 is rejected with 422.""" + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.post( + "/api/admin/server-config", + json={"sections": {"materialize": {"lock_ttl_seconds": -5}}}, + headers=headers, + ) + assert resp.status_code == 422, resp.text + + +def test_invalid_lock_ttl_zero_rejected(seeded_app): + """lock_ttl_seconds=0 is rejected with 422 (below the 60s floor).""" + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.post( + "/api/admin/server-config", + json={"sections": {"materialize": {"lock_ttl_seconds": 0}}}, + headers=headers, + ) + assert resp.status_code == 422, resp.text + + +def test_invalid_lock_ttl_above_max_rejected(seeded_app): + """lock_ttl_seconds > 604800 (1 week) is rejected with 422.""" + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.post( + "/api/admin/server-config", + json={"sections": {"materialize": {"lock_ttl_seconds": 604801}}}, + headers=headers, + ) + assert resp.status_code == 422, resp.text + + +def test_valid_lock_ttl_boundary_min_accepted(seeded_app, tmp_path, monkeypatch): + """lock_ttl_seconds=60 (minimum) is accepted.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + state = tmp_path / "state" + state.mkdir(parents=True, exist_ok=True) + import app.instance_config as ic + ic._instance_config = None + try: + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.post( + "/api/admin/server-config", + json={"sections": {"materialize": {"lock_ttl_seconds": 60}}}, + headers=headers, + ) + assert resp.status_code == 200, resp.text + finally: + ic._instance_config = None + + +def test_valid_lock_ttl_boundary_max_accepted(seeded_app, tmp_path, monkeypatch): + """lock_ttl_seconds=604800 (maximum) is accepted.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + state = tmp_path / "state" + state.mkdir(parents=True, exist_ok=True) + import app.instance_config as ic + ic._instance_config = None + try: + client = seeded_app["client"] + headers = _auth(seeded_app["admin_token"]) + resp = client.post( + "/api/admin/server-config", + json={"sections": {"materialize": {"lock_ttl_seconds": 604800}}}, + headers=headers, + ) + assert resp.status_code == 200, resp.text + finally: + ic._instance_config = None