feat(config): expose materialize.lock_ttl_seconds in server-config

New top-level 'materialize' section, single field (lock_ttl_seconds).
Default 86400 (24h). Backs the file-lock TTL reclaim added in the
per-table-mutex change. Editable via PUT /api/admin/server-config and
the /admin/server-config UI.
This commit is contained in:
ZdenekSrotyr 2026-05-04 18:52:54 +02:00
parent 3871d5320a
commit 6c0846fd17
4 changed files with 252 additions and 0 deletions

View file

@ -146,6 +146,38 @@ def _validate_urls_in_patch(sections: Dict[str, Dict[str, Any]]) -> None:
_validate_url_not_private(value, field_name=".".join(path)) _validate_url_not_private(value, field_name=".".join(path))
_LOCK_TTL_MIN = 60
_LOCK_TTL_MAX = 7 * 24 * 3600 # 604800 — one week
def _validate_materialize_section(sections: Dict[str, Dict[str, Any]]) -> None:
"""Validate the materialize section patch when present.
Checks field-level constraints that the Pydantic envelope can't enforce
(it only validates the outer shape, not nested leaf values).
"""
mat = sections.get("materialize")
if not isinstance(mat, dict):
return
ttl = mat.get("lock_ttl_seconds")
if ttl is None:
return
if not isinstance(ttl, int) or isinstance(ttl, bool):
raise HTTPException(
status_code=422,
detail="materialize.lock_ttl_seconds must be an integer",
)
if ttl < _LOCK_TTL_MIN or ttl > _LOCK_TTL_MAX:
raise HTTPException(
status_code=422,
detail=(
f"materialize.lock_ttl_seconds must be between "
f"{_LOCK_TTL_MIN} and {_LOCK_TTL_MAX} "
f"(got {ttl})"
),
)
# --- Server-config (instance.yaml) editor ----------------------------------- # --- Server-config (instance.yaml) editor -----------------------------------
# #
# The /admin/server-config UI POSTs a partial dict here keyed by section # The /admin/server-config UI POSTs a partial dict here keyed by section
@ -175,6 +207,7 @@ _EDITABLE_SECTIONS: tuple[str, ...] = (
"openmetadata", "openmetadata",
"desktop", "desktop",
"corporate_memory", "corporate_memory",
"materialize",
) )
# "Danger-zone" sections — flipping these can lock operators out (auth.*) or # "Danger-zone" sections — flipping these can lock operators out (auth.*) or
@ -585,6 +618,23 @@ _KNOWN_FIELDS: dict[str, dict[str, dict]] = {
), ),
}, },
}, },
# materialize — file-lock TTL for the concurrent-materialize safety net.
# A single field; more knobs may follow as the feature matures.
"materialize": {
"lock_ttl_seconds": {
"kind": "int",
"default": 86400,
"hint": (
"How long (seconds) before a stale materialize lock file is "
"reclaimed. The lock is a .parquet.lock sibling file; if the "
"holder process is hard-killed, the next attempt reclaims the "
"lock once the file's mtime is older than this TTL. "
"Default 86400 (24 h). Min 60, max 604800 (7 days). "
"Lower only if you know materializes never exceed the new value "
"and your host regularly hard-kills processes."
),
},
},
} }
# Keys whose values must be redacted from the audit diff. We match # Keys whose values must be redacted from the audit diff. We match
@ -913,6 +963,9 @@ async def update_server_config(
# the per-section patch (e.g. data_source.keboola.stack_url). # the per-section patch (e.g. data_source.keboola.stack_url).
_validate_urls_in_patch(request.sections) _validate_urls_in_patch(request.sections)
# Field-level constraints for sections whose values have documented ranges.
_validate_materialize_section(request.sections)
# Defense-in-depth: scrub redaction sentinels (`***` / `<empty>`) out of # Defense-in-depth: scrub redaction sentinels (`***` / `<empty>`) out of
# secret-keyed leaves in the patch before they reach the deep-merge. # secret-keyed leaves in the patch before they reach the deep-merge.
# The client form does the same scrub, but an API caller round-tripping # The client form does the same scrub, but an API caller round-tripping

View file

@ -218,6 +218,10 @@ const SECTION_META = {
title: "Corporate Memory", title: "Corporate Memory",
help: "Optional governance for AI-extracted knowledge. When the section is unset, the system runs in legacy democratic-wiki mode with no admin review.", help: "Optional governance for AI-extracted knowledge. When the section is unset, the system runs in legacy democratic-wiki mode with no admin review.",
}, },
materialize: {
title: "Materialize",
help: "Concurrency safety net for the materialize path. Controls the file-lock TTL used to detect and reclaim stale locks from hard-killed processes.",
},
}; };
const DANGER_SECTIONS = new Set(["auth", "server"]); const DANGER_SECTIONS = new Set(["auth", "server"]);

View file

@ -403,3 +403,19 @@ catalog:
# schema_cache_ttl_seconds: 3600 # /api/v2/schema/{table_id} cache lifetime (default: 1 h) # schema_cache_ttl_seconds: 3600 # /api/v2/schema/{table_id} cache lifetime (default: 1 h)
# sample_cache_ttl_seconds: 3600 # /api/v2/sample/{table_id} cache lifetime (default: 1 h) # sample_cache_ttl_seconds: 3600 # /api/v2/sample/{table_id} cache lifetime (default: 1 h)
# # Admins can force-refresh via POST /api/v2/sample/{id}?refresh=true # # Admins can force-refresh via POST /api/v2/sample/{id}?refresh=true
# --- Materialize concurrency safety (optional) ---
# Concurrency safety net for the materialize path (BQ + Keboola). When
# two materialize attempts race for the same table_id, the second one
# raises MaterializeInFlightError and skips. The lock is held in a
# .parquet.lock sibling file; if a holder process is hard-killed before
# kernel-level flock release, the next attempt reclaims the lock once
# the file's mtime is older than this TTL.
#
# Default 86400 (24h) is generous on purpose — anything shorter risks
# a long-running COPY being interrupted by its own scheduler successor.
# Lower it only if you know your materialize never exceeds the new
# value AND your host has a habit of hard-killing processes.
# Min 60 (1 minute), max 604800 (7 days). Configurable via /admin/server-config UI.
# materialize:
# lock_ttl_seconds: 86400

View file

@ -0,0 +1,179 @@
"""/api/admin/server-config exposes materialize.lock_ttl_seconds and
accepts updates. Default is 86400 (24h).
Fixture `seeded_app` is auto-discovered from `tests/conftest.py`
DO NOT import. It returns a dict: `{"client": TestClient,
"admin_token": str, ...}`. Auth helper `_auth(token)` mirrors the
project's local pattern (also used in test_api_admin_materialized.py).
Behaviour contract:
- GET returns `materialize` section in `sections` (empty dict when no
override is set, since the endpoint surfaces every editable section).
- GET also exposes the known_fields registry entry for `materialize`
with `lock_ttl_seconds` spec (kind=int, default=86400).
- POST with a valid value persists it and GET returns the new value.
- POST with lock_ttl_seconds < 60 or > 604800 is rejected with 422.
"""
from __future__ import annotations
import pytest
import yaml
def _auth(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
# ---------------------------------------------------------------------------
# GET — default state
# ---------------------------------------------------------------------------
def test_get_returns_materialize_in_editable_sections(seeded_app):
"""materialize must appear in editable_sections."""
client = seeded_app["client"]
headers = _auth(seeded_app["admin_token"])
resp = client.get("/api/admin/server-config", headers=headers)
assert resp.status_code == 200
body = resp.json()
assert "materialize" in body["editable_sections"]
def test_get_returns_materialize_section_key(seeded_app):
"""materialize key appears in sections (empty dict when no override set)."""
client = seeded_app["client"]
headers = _auth(seeded_app["admin_token"])
resp = client.get("/api/admin/server-config", headers=headers)
assert resp.status_code == 200
body = resp.json()
# The endpoint surfaces every editable section so the UI can render it.
assert "materialize" in body["sections"]
def test_get_returns_materialize_known_fields(seeded_app):
"""known_fields must have a materialize.lock_ttl_seconds entry."""
client = seeded_app["client"]
headers = _auth(seeded_app["admin_token"])
resp = client.get("/api/admin/server-config", headers=headers)
assert resp.status_code == 200
body = resp.json()
mat_fields = body.get("known_fields", {}).get("materialize", {})
assert "lock_ttl_seconds" in mat_fields, body.get("known_fields", {})
spec = mat_fields["lock_ttl_seconds"]
assert spec["kind"] == "int"
assert spec["default"] == 86400
# ---------------------------------------------------------------------------
# POST — update and read back
# ---------------------------------------------------------------------------
def test_put_updates_materialize_lock_ttl(seeded_app, tmp_path, monkeypatch):
"""POST with a valid value persists; GET reflects the new value."""
monkeypatch.setenv("DATA_DIR", str(tmp_path))
state = tmp_path / "state"
state.mkdir(parents=True, exist_ok=True)
import app.instance_config as ic
ic._instance_config = None
try:
client = seeded_app["client"]
headers = _auth(seeded_app["admin_token"])
resp = client.post(
"/api/admin/server-config",
json={"sections": {"materialize": {"lock_ttl_seconds": 3600}}},
headers=headers,
)
assert resp.status_code == 200, resp.text
# Verify on disk.
loaded = yaml.safe_load((state / "instance.yaml").read_text())
assert loaded["materialize"]["lock_ttl_seconds"] == 3600
# Verify GET reflects the new value.
ic._instance_config = None
resp2 = client.get("/api/admin/server-config", headers=headers)
assert resp2.json()["sections"]["materialize"]["lock_ttl_seconds"] == 3600
finally:
ic._instance_config = None
# ---------------------------------------------------------------------------
# POST — validation
# ---------------------------------------------------------------------------
def test_invalid_lock_ttl_below_min_rejected(seeded_app):
"""lock_ttl_seconds < 60 is rejected with 422."""
client = seeded_app["client"]
headers = _auth(seeded_app["admin_token"])
resp = client.post(
"/api/admin/server-config",
json={"sections": {"materialize": {"lock_ttl_seconds": -5}}},
headers=headers,
)
assert resp.status_code == 422, resp.text
def test_invalid_lock_ttl_zero_rejected(seeded_app):
"""lock_ttl_seconds=0 is rejected with 422 (below the 60s floor)."""
client = seeded_app["client"]
headers = _auth(seeded_app["admin_token"])
resp = client.post(
"/api/admin/server-config",
json={"sections": {"materialize": {"lock_ttl_seconds": 0}}},
headers=headers,
)
assert resp.status_code == 422, resp.text
def test_invalid_lock_ttl_above_max_rejected(seeded_app):
"""lock_ttl_seconds > 604800 (1 week) is rejected with 422."""
client = seeded_app["client"]
headers = _auth(seeded_app["admin_token"])
resp = client.post(
"/api/admin/server-config",
json={"sections": {"materialize": {"lock_ttl_seconds": 604801}}},
headers=headers,
)
assert resp.status_code == 422, resp.text
def test_valid_lock_ttl_boundary_min_accepted(seeded_app, tmp_path, monkeypatch):
"""lock_ttl_seconds=60 (minimum) is accepted."""
monkeypatch.setenv("DATA_DIR", str(tmp_path))
state = tmp_path / "state"
state.mkdir(parents=True, exist_ok=True)
import app.instance_config as ic
ic._instance_config = None
try:
client = seeded_app["client"]
headers = _auth(seeded_app["admin_token"])
resp = client.post(
"/api/admin/server-config",
json={"sections": {"materialize": {"lock_ttl_seconds": 60}}},
headers=headers,
)
assert resp.status_code == 200, resp.text
finally:
ic._instance_config = None
def test_valid_lock_ttl_boundary_max_accepted(seeded_app, tmp_path, monkeypatch):
"""lock_ttl_seconds=604800 (maximum) is accepted."""
monkeypatch.setenv("DATA_DIR", str(tmp_path))
state = tmp_path / "state"
state.mkdir(parents=True, exist_ok=True)
import app.instance_config as ic
ic._instance_config = None
try:
client = seeded_app["client"]
headers = _auth(seeded_app["admin_token"])
resp = client.post(
"/api/admin/server-config",
json={"sections": {"materialize": {"lock_ttl_seconds": 604800}}},
headers=headers,
)
assert resp.status_code == 200, resp.text
finally:
ic._instance_config = None