agnes-the-ai-analyst/tests/test_bootstrap.py
ZdenekSrotyr 3205a8d300 fix: block /auth/token for OAuth-only users without password_hash
Users without a password_hash (Google OAuth / magic-link accounts) could
obtain a JWT by simply posting their email to /auth/token. Add an else
clause that rejects such requests with 401, directing them to their
configured auth provider. Update and extend tests accordingly.
2026-04-09 16:29:47 +02:00

133 lines
4.8 KiB
Python

"""Tests for bootstrap endpoint — first admin user creation."""
import os
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def fresh_client(tmp_path, monkeypatch):
"""Client with EMPTY database — no users."""
monkeypatch.setenv("DATA_DIR", str(tmp_path))
monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!")
from app.main import create_app
app = create_app()
return TestClient(app)
@pytest.fixture
def seeded_client(tmp_path, monkeypatch):
"""Client with one existing user."""
monkeypatch.setenv("DATA_DIR", str(tmp_path))
monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!")
from app.main import create_app
from src.db import get_system_db
from src.repositories.users import UserRepository
conn = get_system_db()
UserRepository(conn).create(id="existing", email="existing@test.com", name="E", role="admin")
conn.close()
return TestClient(create_app())
class TestBootstrap:
def test_bootstrap_on_empty_db(self, fresh_client):
"""First call creates admin and returns token."""
resp = fresh_client.post("/auth/bootstrap", json={
"email": "admin@test.com",
"name": "Admin",
})
assert resp.status_code == 200
data = resp.json()
assert data["email"] == "admin@test.com"
assert data["role"] == "admin"
assert "access_token" in data
def test_bootstrap_with_password(self, fresh_client):
"""Bootstrap with password sets password hash."""
resp = fresh_client.post("/auth/bootstrap", json={
"email": "admin@test.com",
"password": "securepass123",
})
assert resp.status_code == 200
# Token works
token = resp.json()["access_token"]
resp2 = fresh_client.get("/api/health")
assert resp2.status_code == 200
def test_bootstrap_disabled_when_users_exist(self, seeded_client):
"""Bootstrap fails with 403 when users already exist."""
resp = seeded_client.post("/auth/bootstrap", json={
"email": "hacker@evil.com",
})
assert resp.status_code == 403
assert "already exist" in resp.json()["detail"]
def test_bootstrap_then_login(self, fresh_client):
"""After bootstrap with password, /auth/token login works; without password it requires OAuth."""
# Bootstrap with a password
fresh_client.post("/auth/bootstrap", json={
"email": "admin@test.com",
"password": "adminpass123",
})
# Normal password login succeeds
resp = fresh_client.post("/auth/token", json={
"email": "admin@test.com",
"password": "adminpass123",
})
assert resp.status_code == 200
assert resp.json()["role"] == "admin"
def test_bootstrap_no_password_token_rejected(self, fresh_client):
"""After passwordless bootstrap, /auth/token must reject the user (OAuth-only flow)."""
fresh_client.post("/auth/bootstrap", json={
"email": "admin@test.com",
})
resp = fresh_client.post("/auth/token", json={
"email": "admin@test.com",
})
assert resp.status_code == 401
def test_bootstrap_second_call_fails(self, fresh_client):
"""Second bootstrap call fails — endpoint self-deactivates."""
fresh_client.post("/auth/bootstrap", json={"email": "admin@test.com"})
resp = fresh_client.post("/auth/bootstrap", json={"email": "second@test.com"})
assert resp.status_code == 403
def test_full_agent_flow(self, fresh_client):
"""Simulate full AI agent deployment flow."""
# 1. Health check (no auth)
resp = fresh_client.get("/api/health")
assert resp.status_code == 200
assert resp.json()["status"] == "healthy"
# 2. Bootstrap admin
resp = fresh_client.post("/auth/bootstrap", json={
"email": "agent@company.com", "name": "AI Agent",
})
assert resp.status_code == 200
token = resp.json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}
# 3. Check manifest (empty, no data yet)
resp = fresh_client.get("/api/sync/manifest", headers=headers)
assert resp.status_code == 200
assert len(resp.json()["tables"]) == 0
# 4. List users
resp = fresh_client.get("/api/users", headers=headers)
assert resp.status_code == 200
assert len(resp.json()) == 1
# 5. Add analyst user
resp = fresh_client.post("/api/users", json={
"email": "analyst@company.com", "name": "Analyst",
}, headers=headers)
assert resp.status_code == 201
# 6. Verify
resp = fresh_client.get("/api/health")
assert resp.json()["services"]["users"]["count"] == 2