Merge: /auth/bootstrap seed-user fix

This commit is contained in:
ZdenekSrotyr 2026-04-21 20:01:38 +02:00
commit 0643437ab8
2 changed files with 88 additions and 26 deletions

View file

@ -105,25 +105,41 @@ async def bootstrap(
request: BootstrapRequest, request: BootstrapRequest,
conn: duckdb.DuckDBPyConnection = Depends(_get_db), conn: duckdb.DuckDBPyConnection = Depends(_get_db),
): ):
"""Create the first admin user. Only works when no users exist. """Bootstrap the first admin account.
This endpoint allows an AI agent to bootstrap a fresh instance Allowed when no user has a password_hash yet. This covers:
without needing docker exec or SSH. It automatically deactivates (a) No users exist at all.
after the first user is created. (b) Only seed users (created by SEED_ADMIN_EMAIL at startup) exist, which
have no password and cannot log in bootstrap lets the operator
activate them with a password.
If a user with the given email already exists (e.g. as a seed), this
endpoint sets its password_hash (or clears it, if no password was supplied
useful for OAuth-only flows) and promotes it to admin.
Deactivates as soon as any user has a password_hash.
""" """
repo = UserRepository(conn) repo = UserRepository(conn)
existing = repo.list_all() existing = repo.list_all()
if existing:
# Bootstrap is locked once anyone has a password set.
users_with_password = [u for u in existing if u.get("password_hash")]
if users_with_password:
raise HTTPException( raise HTTPException(
status_code=403, status_code=403,
detail=f"Bootstrap disabled — {len(existing)} users already exist. Use /auth/token to login.", detail=f"Bootstrap disabled — {len(users_with_password)} user(s) already have passwords set. Use /auth/password/login.",
) )
user_id = str(uuid.uuid4()) password_hash = PasswordHasher().hash(request.password) if request.password else None
password_hash = None
if request.password:
password_hash = PasswordHasher().hash(request.password)
# If a matching user already exists (e.g. seed), update it; else create fresh.
existing_user = next((u for u in existing if u.get("email") == request.email), None)
if existing_user:
user_id = existing_user["id"]
repo.update(id=user_id, password_hash=password_hash, role="admin")
_audit(user_id, "bootstrap_activated_seed")
else:
user_id = str(uuid.uuid4())
repo.create( repo.create(
id=user_id, id=user_id,
email=request.email, email=request.email,
@ -131,9 +147,9 @@ async def bootstrap(
role="admin", role="admin",
password_hash=password_hash, password_hash=password_hash,
) )
_audit(user_id, "bootstrap_completed")
token = create_access_token(user_id=user_id, email=request.email, role="admin") token = create_access_token(user_id=user_id, email=request.email, role="admin")
_audit(user_id, "bootstrap_completed")
return TokenResponse( return TokenResponse(
access_token=token, access_token=token,
user_id=user_id, user_id=user_id,

View file

@ -17,7 +17,7 @@ def fresh_client(tmp_path, monkeypatch):
@pytest.fixture @pytest.fixture
def seeded_client(tmp_path, monkeypatch): def seeded_client(tmp_path, monkeypatch):
"""Client with one existing user.""" """Client with one existing seed user (no password_hash — like SEED_ADMIN_EMAIL seeding)."""
monkeypatch.setenv("DATA_DIR", str(tmp_path)) monkeypatch.setenv("DATA_DIR", str(tmp_path))
monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!") monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!")
from app.main import create_app from app.main import create_app
@ -29,6 +29,27 @@ def seeded_client(tmp_path, monkeypatch):
return TestClient(create_app()) return TestClient(create_app())
@pytest.fixture
def password_user_client(tmp_path, monkeypatch):
"""Client with a user who already has a password set — bootstrap must be disabled."""
from argon2 import PasswordHasher
monkeypatch.setenv("DATA_DIR", str(tmp_path))
monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!")
from app.main import create_app
from src.db import get_system_db
from src.repositories.users import UserRepository
conn = get_system_db()
UserRepository(conn).create(
id="existing",
email="existing@test.com",
name="E",
role="admin",
password_hash=PasswordHasher().hash("pre-existing-pass"),
)
conn.close()
return TestClient(create_app())
class TestBootstrap: class TestBootstrap:
def test_bootstrap_on_empty_db(self, fresh_client): def test_bootstrap_on_empty_db(self, fresh_client):
"""First call creates admin and returns token.""" """First call creates admin and returns token."""
@ -55,13 +76,30 @@ class TestBootstrap:
resp2 = fresh_client.get("/api/health") resp2 = fresh_client.get("/api/health")
assert resp2.status_code == 200 assert resp2.status_code == 200
def test_bootstrap_disabled_when_users_exist(self, seeded_client): def test_bootstrap_activates_seed_user(self, seeded_client):
"""Bootstrap fails with 403 when users already exist.""" """Bootstrap activates a password-less seed user (SEED_ADMIN_EMAIL scenario)."""
resp = seeded_client.post("/auth/bootstrap", json={ resp = seeded_client.post("/auth/bootstrap", json={
"email": "existing@test.com",
"password": "newpass123",
})
assert resp.status_code == 200
assert resp.json()["role"] == "admin"
# Login now works
login = seeded_client.post("/auth/password/login", json={
"email": "existing@test.com",
"password": "newpass123",
})
assert login.status_code == 200
def test_bootstrap_disabled_when_password_user_exists(self, password_user_client):
"""Bootstrap fails with 403 when any user already has a password set."""
resp = password_user_client.post("/auth/bootstrap", json={
"email": "hacker@evil.com", "email": "hacker@evil.com",
"password": "should-not-work",
}) })
assert resp.status_code == 403 assert resp.status_code == 403
assert "already exist" in resp.json()["detail"] assert "already have passwords" in resp.json()["detail"]
def test_bootstrap_then_login(self, fresh_client): def test_bootstrap_then_login(self, fresh_client):
"""After bootstrap with password, /auth/token login works; without password it requires OAuth.""" """After bootstrap with password, /auth/token login works; without password it requires OAuth."""
@ -90,11 +128,19 @@ class TestBootstrap:
}) })
assert resp.status_code == 401 assert resp.status_code == 401
def test_bootstrap_second_call_fails(self, fresh_client): def test_bootstrap_second_call_fails_once_password_set(self, fresh_client):
"""Second bootstrap call fails — endpoint self-deactivates.""" """Endpoint self-deactivates once any user has a password."""
fresh_client.post("/auth/bootstrap", json={"email": "admin@test.com"}) # First call WITH password — locks bootstrap
fresh_client.post("/auth/bootstrap", json={
"email": "admin@test.com",
"password": "realpass123",
})
resp = fresh_client.post("/auth/bootstrap", json={"email": "second@test.com"}) # Any subsequent bootstrap attempt fails
resp = fresh_client.post("/auth/bootstrap", json={
"email": "second@test.com",
"password": "other-pass",
})
assert resp.status_code == 403 assert resp.status_code == 403
def test_full_agent_flow(self, fresh_client): def test_full_agent_flow(self, fresh_client):