diff --git a/app/auth/router.py b/app/auth/router.py index acd581c..99a76ea 100644 --- a/app/auth/router.py +++ b/app/auth/router.py @@ -105,35 +105,51 @@ async def bootstrap( request: BootstrapRequest, conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): - """Create the first admin user. Only works when no users exist. + """Bootstrap the first admin account. - This endpoint allows an AI agent to bootstrap a fresh instance - without needing docker exec or SSH. It automatically deactivates - after the first user is created. + Allowed when no user has a password_hash yet. This covers: + (a) No users exist at all. + (b) Only seed users (created by SEED_ADMIN_EMAIL at startup) exist, which + have no password and cannot log in — bootstrap lets the operator + activate them with a password. + + If a user with the given email already exists (e.g. as a seed), this + endpoint sets its password_hash (or clears it, if no password was supplied — + useful for OAuth-only flows) and promotes it to admin. + + Deactivates as soon as any user has a password_hash. """ repo = UserRepository(conn) existing = repo.list_all() - if existing: + + # Bootstrap is locked once anyone has a password set. + users_with_password = [u for u in existing if u.get("password_hash")] + if users_with_password: raise HTTPException( status_code=403, - detail=f"Bootstrap disabled — {len(existing)} users already exist. Use /auth/token to login.", + detail=f"Bootstrap disabled — {len(users_with_password)} user(s) already have passwords set. Use /auth/password/login.", ) - user_id = str(uuid.uuid4()) - password_hash = None - if request.password: - password_hash = PasswordHasher().hash(request.password) + password_hash = PasswordHasher().hash(request.password) if request.password else None - repo.create( - id=user_id, - email=request.email, - name=request.name or request.email.split("@")[0], - role="admin", - password_hash=password_hash, - ) + # If a matching user already exists (e.g. seed), update it; else create fresh. + existing_user = next((u for u in existing if u.get("email") == request.email), None) + if existing_user: + user_id = existing_user["id"] + repo.update(id=user_id, password_hash=password_hash, role="admin") + _audit(user_id, "bootstrap_activated_seed") + else: + user_id = str(uuid.uuid4()) + repo.create( + id=user_id, + email=request.email, + name=request.name or request.email.split("@")[0], + role="admin", + password_hash=password_hash, + ) + _audit(user_id, "bootstrap_completed") token = create_access_token(user_id=user_id, email=request.email, role="admin") - _audit(user_id, "bootstrap_completed") return TokenResponse( access_token=token, user_id=user_id, diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py index 07ea4d9..65b904f 100644 --- a/tests/test_bootstrap.py +++ b/tests/test_bootstrap.py @@ -17,7 +17,7 @@ def fresh_client(tmp_path, monkeypatch): @pytest.fixture def seeded_client(tmp_path, monkeypatch): - """Client with one existing user.""" + """Client with one existing seed user (no password_hash — like SEED_ADMIN_EMAIL seeding).""" monkeypatch.setenv("DATA_DIR", str(tmp_path)) monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!") from app.main import create_app @@ -29,6 +29,27 @@ def seeded_client(tmp_path, monkeypatch): return TestClient(create_app()) +@pytest.fixture +def password_user_client(tmp_path, monkeypatch): + """Client with a user who already has a password set — bootstrap must be disabled.""" + from argon2 import PasswordHasher + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!") + from app.main import create_app + from src.db import get_system_db + from src.repositories.users import UserRepository + conn = get_system_db() + UserRepository(conn).create( + id="existing", + email="existing@test.com", + name="E", + role="admin", + password_hash=PasswordHasher().hash("pre-existing-pass"), + ) + conn.close() + return TestClient(create_app()) + + class TestBootstrap: def test_bootstrap_on_empty_db(self, fresh_client): """First call creates admin and returns token.""" @@ -55,13 +76,30 @@ class TestBootstrap: resp2 = fresh_client.get("/api/health") assert resp2.status_code == 200 - def test_bootstrap_disabled_when_users_exist(self, seeded_client): - """Bootstrap fails with 403 when users already exist.""" + def test_bootstrap_activates_seed_user(self, seeded_client): + """Bootstrap activates a password-less seed user (SEED_ADMIN_EMAIL scenario).""" resp = seeded_client.post("/auth/bootstrap", json={ + "email": "existing@test.com", + "password": "newpass123", + }) + assert resp.status_code == 200 + assert resp.json()["role"] == "admin" + + # Login now works + login = seeded_client.post("/auth/password/login", json={ + "email": "existing@test.com", + "password": "newpass123", + }) + assert login.status_code == 200 + + def test_bootstrap_disabled_when_password_user_exists(self, password_user_client): + """Bootstrap fails with 403 when any user already has a password set.""" + resp = password_user_client.post("/auth/bootstrap", json={ "email": "hacker@evil.com", + "password": "should-not-work", }) assert resp.status_code == 403 - assert "already exist" in resp.json()["detail"] + assert "already have passwords" in resp.json()["detail"] def test_bootstrap_then_login(self, fresh_client): """After bootstrap with password, /auth/token login works; without password it requires OAuth.""" @@ -90,11 +128,19 @@ class TestBootstrap: }) assert resp.status_code == 401 - def test_bootstrap_second_call_fails(self, fresh_client): - """Second bootstrap call fails — endpoint self-deactivates.""" - fresh_client.post("/auth/bootstrap", json={"email": "admin@test.com"}) + def test_bootstrap_second_call_fails_once_password_set(self, fresh_client): + """Endpoint self-deactivates once any user has a password.""" + # First call WITH password — locks bootstrap + fresh_client.post("/auth/bootstrap", json={ + "email": "admin@test.com", + "password": "realpass123", + }) - resp = fresh_client.post("/auth/bootstrap", json={"email": "second@test.com"}) + # Any subsequent bootstrap attempt fails + resp = fresh_client.post("/auth/bootstrap", json={ + "email": "second@test.com", + "password": "other-pass", + }) assert resp.status_code == 403 def test_full_agent_flow(self, fresh_client):