feat(config): expose materialize.lock_ttl_seconds in server-config
New top-level 'materialize' section, single field (lock_ttl_seconds). Default 86400 (24h). Backs the file-lock TTL reclaim added in the per-table-mutex change. Editable via PUT /api/admin/server-config and the /admin/server-config UI.
This commit is contained in:
parent
3871d5320a
commit
6c0846fd17
4 changed files with 252 additions and 0 deletions
|
|
@ -146,6 +146,38 @@ def _validate_urls_in_patch(sections: Dict[str, Dict[str, Any]]) -> None:
|
||||||
_validate_url_not_private(value, field_name=".".join(path))
|
_validate_url_not_private(value, field_name=".".join(path))
|
||||||
|
|
||||||
|
|
||||||
|
_LOCK_TTL_MIN = 60
|
||||||
|
_LOCK_TTL_MAX = 7 * 24 * 3600 # 604800 — one week
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_materialize_section(sections: Dict[str, Dict[str, Any]]) -> None:
|
||||||
|
"""Validate the materialize section patch when present.
|
||||||
|
|
||||||
|
Checks field-level constraints that the Pydantic envelope can't enforce
|
||||||
|
(it only validates the outer shape, not nested leaf values).
|
||||||
|
"""
|
||||||
|
mat = sections.get("materialize")
|
||||||
|
if not isinstance(mat, dict):
|
||||||
|
return
|
||||||
|
ttl = mat.get("lock_ttl_seconds")
|
||||||
|
if ttl is None:
|
||||||
|
return
|
||||||
|
if not isinstance(ttl, int) or isinstance(ttl, bool):
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=422,
|
||||||
|
detail="materialize.lock_ttl_seconds must be an integer",
|
||||||
|
)
|
||||||
|
if ttl < _LOCK_TTL_MIN or ttl > _LOCK_TTL_MAX:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=422,
|
||||||
|
detail=(
|
||||||
|
f"materialize.lock_ttl_seconds must be between "
|
||||||
|
f"{_LOCK_TTL_MIN} and {_LOCK_TTL_MAX} "
|
||||||
|
f"(got {ttl})"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# --- Server-config (instance.yaml) editor -----------------------------------
|
# --- Server-config (instance.yaml) editor -----------------------------------
|
||||||
#
|
#
|
||||||
# The /admin/server-config UI POSTs a partial dict here keyed by section
|
# The /admin/server-config UI POSTs a partial dict here keyed by section
|
||||||
|
|
@ -175,6 +207,7 @@ _EDITABLE_SECTIONS: tuple[str, ...] = (
|
||||||
"openmetadata",
|
"openmetadata",
|
||||||
"desktop",
|
"desktop",
|
||||||
"corporate_memory",
|
"corporate_memory",
|
||||||
|
"materialize",
|
||||||
)
|
)
|
||||||
|
|
||||||
# "Danger-zone" sections — flipping these can lock operators out (auth.*) or
|
# "Danger-zone" sections — flipping these can lock operators out (auth.*) or
|
||||||
|
|
@ -585,6 +618,23 @@ _KNOWN_FIELDS: dict[str, dict[str, dict]] = {
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
# materialize — file-lock TTL for the concurrent-materialize safety net.
|
||||||
|
# A single field; more knobs may follow as the feature matures.
|
||||||
|
"materialize": {
|
||||||
|
"lock_ttl_seconds": {
|
||||||
|
"kind": "int",
|
||||||
|
"default": 86400,
|
||||||
|
"hint": (
|
||||||
|
"How long (seconds) before a stale materialize lock file is "
|
||||||
|
"reclaimed. The lock is a .parquet.lock sibling file; if the "
|
||||||
|
"holder process is hard-killed, the next attempt reclaims the "
|
||||||
|
"lock once the file's mtime is older than this TTL. "
|
||||||
|
"Default 86400 (24 h). Min 60, max 604800 (7 days). "
|
||||||
|
"Lower only if you know materializes never exceed the new value "
|
||||||
|
"and your host regularly hard-kills processes."
|
||||||
|
),
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
# Keys whose values must be redacted from the audit diff. We match
|
# Keys whose values must be redacted from the audit diff. We match
|
||||||
|
|
@ -913,6 +963,9 @@ async def update_server_config(
|
||||||
# the per-section patch (e.g. data_source.keboola.stack_url).
|
# the per-section patch (e.g. data_source.keboola.stack_url).
|
||||||
_validate_urls_in_patch(request.sections)
|
_validate_urls_in_patch(request.sections)
|
||||||
|
|
||||||
|
# Field-level constraints for sections whose values have documented ranges.
|
||||||
|
_validate_materialize_section(request.sections)
|
||||||
|
|
||||||
# Defense-in-depth: scrub redaction sentinels (`***` / `<empty>`) out of
|
# Defense-in-depth: scrub redaction sentinels (`***` / `<empty>`) out of
|
||||||
# secret-keyed leaves in the patch before they reach the deep-merge.
|
# secret-keyed leaves in the patch before they reach the deep-merge.
|
||||||
# The client form does the same scrub, but an API caller round-tripping
|
# The client form does the same scrub, but an API caller round-tripping
|
||||||
|
|
|
||||||
|
|
@ -218,6 +218,10 @@ const SECTION_META = {
|
||||||
title: "Corporate Memory",
|
title: "Corporate Memory",
|
||||||
help: "Optional governance for AI-extracted knowledge. When the section is unset, the system runs in legacy democratic-wiki mode with no admin review.",
|
help: "Optional governance for AI-extracted knowledge. When the section is unset, the system runs in legacy democratic-wiki mode with no admin review.",
|
||||||
},
|
},
|
||||||
|
materialize: {
|
||||||
|
title: "Materialize",
|
||||||
|
help: "Concurrency safety net for the materialize path. Controls the file-lock TTL used to detect and reclaim stale locks from hard-killed processes.",
|
||||||
|
},
|
||||||
};
|
};
|
||||||
const DANGER_SECTIONS = new Set(["auth", "server"]);
|
const DANGER_SECTIONS = new Set(["auth", "server"]);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -403,3 +403,19 @@ catalog:
|
||||||
# schema_cache_ttl_seconds: 3600 # /api/v2/schema/{table_id} cache lifetime (default: 1 h)
|
# schema_cache_ttl_seconds: 3600 # /api/v2/schema/{table_id} cache lifetime (default: 1 h)
|
||||||
# sample_cache_ttl_seconds: 3600 # /api/v2/sample/{table_id} cache lifetime (default: 1 h)
|
# sample_cache_ttl_seconds: 3600 # /api/v2/sample/{table_id} cache lifetime (default: 1 h)
|
||||||
# # Admins can force-refresh via POST /api/v2/sample/{id}?refresh=true
|
# # Admins can force-refresh via POST /api/v2/sample/{id}?refresh=true
|
||||||
|
|
||||||
|
# --- Materialize concurrency safety (optional) ---
|
||||||
|
# Concurrency safety net for the materialize path (BQ + Keboola). When
|
||||||
|
# two materialize attempts race for the same table_id, the second one
|
||||||
|
# raises MaterializeInFlightError and skips. The lock is held in a
|
||||||
|
# .parquet.lock sibling file; if a holder process is hard-killed before
|
||||||
|
# kernel-level flock release, the next attempt reclaims the lock once
|
||||||
|
# the file's mtime is older than this TTL.
|
||||||
|
#
|
||||||
|
# Default 86400 (24h) is generous on purpose — anything shorter risks
|
||||||
|
# a long-running COPY being interrupted by its own scheduler successor.
|
||||||
|
# Lower it only if you know your materialize never exceeds the new
|
||||||
|
# value AND your host has a habit of hard-killing processes.
|
||||||
|
# Min 60 (1 minute), max 604800 (7 days). Configurable via /admin/server-config UI.
|
||||||
|
# materialize:
|
||||||
|
# lock_ttl_seconds: 86400
|
||||||
|
|
|
||||||
179
tests/test_admin_server_config_materialize_section.py
Normal file
179
tests/test_admin_server_config_materialize_section.py
Normal file
|
|
@ -0,0 +1,179 @@
|
||||||
|
"""/api/admin/server-config exposes materialize.lock_ttl_seconds and
|
||||||
|
accepts updates. Default is 86400 (24h).
|
||||||
|
|
||||||
|
Fixture `seeded_app` is auto-discovered from `tests/conftest.py` —
|
||||||
|
DO NOT import. It returns a dict: `{"client": TestClient,
|
||||||
|
"admin_token": str, ...}`. Auth helper `_auth(token)` mirrors the
|
||||||
|
project's local pattern (also used in test_api_admin_materialized.py).
|
||||||
|
|
||||||
|
Behaviour contract:
|
||||||
|
- GET returns `materialize` section in `sections` (empty dict when no
|
||||||
|
override is set, since the endpoint surfaces every editable section).
|
||||||
|
- GET also exposes the known_fields registry entry for `materialize`
|
||||||
|
with `lock_ttl_seconds` spec (kind=int, default=86400).
|
||||||
|
- POST with a valid value persists it and GET returns the new value.
|
||||||
|
- POST with lock_ttl_seconds < 60 or > 604800 is rejected with 422.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
|
||||||
|
def _auth(token: str) -> dict:
|
||||||
|
return {"Authorization": f"Bearer {token}"}
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# GET — default state
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_returns_materialize_in_editable_sections(seeded_app):
|
||||||
|
"""materialize must appear in editable_sections."""
|
||||||
|
client = seeded_app["client"]
|
||||||
|
headers = _auth(seeded_app["admin_token"])
|
||||||
|
resp = client.get("/api/admin/server-config", headers=headers)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
body = resp.json()
|
||||||
|
assert "materialize" in body["editable_sections"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_returns_materialize_section_key(seeded_app):
|
||||||
|
"""materialize key appears in sections (empty dict when no override set)."""
|
||||||
|
client = seeded_app["client"]
|
||||||
|
headers = _auth(seeded_app["admin_token"])
|
||||||
|
resp = client.get("/api/admin/server-config", headers=headers)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
body = resp.json()
|
||||||
|
# The endpoint surfaces every editable section so the UI can render it.
|
||||||
|
assert "materialize" in body["sections"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_returns_materialize_known_fields(seeded_app):
|
||||||
|
"""known_fields must have a materialize.lock_ttl_seconds entry."""
|
||||||
|
client = seeded_app["client"]
|
||||||
|
headers = _auth(seeded_app["admin_token"])
|
||||||
|
resp = client.get("/api/admin/server-config", headers=headers)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
body = resp.json()
|
||||||
|
mat_fields = body.get("known_fields", {}).get("materialize", {})
|
||||||
|
assert "lock_ttl_seconds" in mat_fields, body.get("known_fields", {})
|
||||||
|
spec = mat_fields["lock_ttl_seconds"]
|
||||||
|
assert spec["kind"] == "int"
|
||||||
|
assert spec["default"] == 86400
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# POST — update and read back
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_put_updates_materialize_lock_ttl(seeded_app, tmp_path, monkeypatch):
|
||||||
|
"""POST with a valid value persists; GET reflects the new value."""
|
||||||
|
monkeypatch.setenv("DATA_DIR", str(tmp_path))
|
||||||
|
state = tmp_path / "state"
|
||||||
|
state.mkdir(parents=True, exist_ok=True)
|
||||||
|
import app.instance_config as ic
|
||||||
|
ic._instance_config = None
|
||||||
|
try:
|
||||||
|
client = seeded_app["client"]
|
||||||
|
headers = _auth(seeded_app["admin_token"])
|
||||||
|
resp = client.post(
|
||||||
|
"/api/admin/server-config",
|
||||||
|
json={"sections": {"materialize": {"lock_ttl_seconds": 3600}}},
|
||||||
|
headers=headers,
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200, resp.text
|
||||||
|
|
||||||
|
# Verify on disk.
|
||||||
|
loaded = yaml.safe_load((state / "instance.yaml").read_text())
|
||||||
|
assert loaded["materialize"]["lock_ttl_seconds"] == 3600
|
||||||
|
|
||||||
|
# Verify GET reflects the new value.
|
||||||
|
ic._instance_config = None
|
||||||
|
resp2 = client.get("/api/admin/server-config", headers=headers)
|
||||||
|
assert resp2.json()["sections"]["materialize"]["lock_ttl_seconds"] == 3600
|
||||||
|
finally:
|
||||||
|
ic._instance_config = None
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# POST — validation
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_invalid_lock_ttl_below_min_rejected(seeded_app):
|
||||||
|
"""lock_ttl_seconds < 60 is rejected with 422."""
|
||||||
|
client = seeded_app["client"]
|
||||||
|
headers = _auth(seeded_app["admin_token"])
|
||||||
|
resp = client.post(
|
||||||
|
"/api/admin/server-config",
|
||||||
|
json={"sections": {"materialize": {"lock_ttl_seconds": -5}}},
|
||||||
|
headers=headers,
|
||||||
|
)
|
||||||
|
assert resp.status_code == 422, resp.text
|
||||||
|
|
||||||
|
|
||||||
|
def test_invalid_lock_ttl_zero_rejected(seeded_app):
|
||||||
|
"""lock_ttl_seconds=0 is rejected with 422 (below the 60s floor)."""
|
||||||
|
client = seeded_app["client"]
|
||||||
|
headers = _auth(seeded_app["admin_token"])
|
||||||
|
resp = client.post(
|
||||||
|
"/api/admin/server-config",
|
||||||
|
json={"sections": {"materialize": {"lock_ttl_seconds": 0}}},
|
||||||
|
headers=headers,
|
||||||
|
)
|
||||||
|
assert resp.status_code == 422, resp.text
|
||||||
|
|
||||||
|
|
||||||
|
def test_invalid_lock_ttl_above_max_rejected(seeded_app):
|
||||||
|
"""lock_ttl_seconds > 604800 (1 week) is rejected with 422."""
|
||||||
|
client = seeded_app["client"]
|
||||||
|
headers = _auth(seeded_app["admin_token"])
|
||||||
|
resp = client.post(
|
||||||
|
"/api/admin/server-config",
|
||||||
|
json={"sections": {"materialize": {"lock_ttl_seconds": 604801}}},
|
||||||
|
headers=headers,
|
||||||
|
)
|
||||||
|
assert resp.status_code == 422, resp.text
|
||||||
|
|
||||||
|
|
||||||
|
def test_valid_lock_ttl_boundary_min_accepted(seeded_app, tmp_path, monkeypatch):
|
||||||
|
"""lock_ttl_seconds=60 (minimum) is accepted."""
|
||||||
|
monkeypatch.setenv("DATA_DIR", str(tmp_path))
|
||||||
|
state = tmp_path / "state"
|
||||||
|
state.mkdir(parents=True, exist_ok=True)
|
||||||
|
import app.instance_config as ic
|
||||||
|
ic._instance_config = None
|
||||||
|
try:
|
||||||
|
client = seeded_app["client"]
|
||||||
|
headers = _auth(seeded_app["admin_token"])
|
||||||
|
resp = client.post(
|
||||||
|
"/api/admin/server-config",
|
||||||
|
json={"sections": {"materialize": {"lock_ttl_seconds": 60}}},
|
||||||
|
headers=headers,
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200, resp.text
|
||||||
|
finally:
|
||||||
|
ic._instance_config = None
|
||||||
|
|
||||||
|
|
||||||
|
def test_valid_lock_ttl_boundary_max_accepted(seeded_app, tmp_path, monkeypatch):
|
||||||
|
"""lock_ttl_seconds=604800 (maximum) is accepted."""
|
||||||
|
monkeypatch.setenv("DATA_DIR", str(tmp_path))
|
||||||
|
state = tmp_path / "state"
|
||||||
|
state.mkdir(parents=True, exist_ok=True)
|
||||||
|
import app.instance_config as ic
|
||||||
|
ic._instance_config = None
|
||||||
|
try:
|
||||||
|
client = seeded_app["client"]
|
||||||
|
headers = _auth(seeded_app["admin_token"])
|
||||||
|
resp = client.post(
|
||||||
|
"/api/admin/server-config",
|
||||||
|
json={"sections": {"materialize": {"lock_ttl_seconds": 604800}}},
|
||||||
|
headers=headers,
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200, resp.text
|
||||||
|
finally:
|
||||||
|
ic._instance_config = None
|
||||||
Loading…
Reference in a new issue