"""Tests for auth providers — password, email magic link, google OAuth.""" import os import pytest from fastapi.testclient import TestClient @pytest.fixture def client(tmp_path, monkeypatch): monkeypatch.setenv("DATA_DIR", str(tmp_path)) monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!") from app.main import create_app from src.db import get_system_db from src.repositories.users import UserRepository conn = get_system_db() ur = UserRepository(conn) # User with password try: from argon2 import PasswordHasher ph = PasswordHasher() pw_hash = ph.hash("testpass123") except ImportError: import hashlib pw_hash = hashlib.sha256(b"testpass123").hexdigest() ur.create(id="pw1", email="pw@test.com", name="PW User", password_hash=pw_hash) # User with setup token (and fresh created timestamp so the JSON /setup # endpoint's TTL check accepts it) from datetime import datetime, timezone ur.create(id="setup1", email="setup@test.com", name="Setup User") ur.update(id="setup1", setup_token="setup-token-123", setup_token_created=datetime.now(timezone.utc)) # User for magic link ur.create(id="ml1", email="ml@test.com", name="ML User") conn.close() app = create_app() return TestClient(app) class TestTokenEndpoint: """Tests for /auth/token — password bypass fix.""" def test_token_empty_password_rejected_when_user_has_hash(self, client): """Empty password must be rejected when user has password_hash.""" resp = client.post("/auth/token", json={"email": "pw@test.com", "password": ""}) assert resp.status_code == 401 def test_token_missing_password_rejected_when_user_has_hash(self, client): """Omitting password field (defaults to '') must be rejected when user has password_hash.""" resp = client.post("/auth/token", json={"email": "pw@test.com"}) assert resp.status_code == 401 def test_token_wrong_password_rejected(self, client): """Wrong password must be rejected with 401.""" resp = client.post("/auth/token", json={"email": "pw@test.com", "password": "wrongpass"}) assert resp.status_code == 401 def test_token_correct_password_succeeds(self, client): """Correct password must issue a token.""" resp = client.post("/auth/token", json={"email": "pw@test.com", "password": "testpass123"}) assert resp.status_code == 200 data = resp.json() assert "access_token" in data assert data["email"] == "pw@test.com" def test_token_no_password_hash_user_gets_token(self, client): """User without password_hash (OAuth-only) must be rejected at /auth/token.""" resp = client.post("/auth/token", json={"email": "ml@test.com"}) assert resp.status_code == 401 def test_token_rejected_for_oauth_only_user(self, client): """OAuth-only user (no password_hash) must not receive a token via /auth/token.""" resp = client.post("/auth/token", json={"email": "ml@test.com"}) assert resp.status_code == 401 assert "external authentication" in resp.json()["detail"] def test_token_unknown_user_rejected(self, client): """Unknown email must return 401.""" resp = client.post("/auth/token", json={"email": "nobody@test.com", "password": "anything"}) assert resp.status_code == 401 class TestPasswordAuth: def test_login_success(self, client): resp = client.post("/auth/password/login", json={ "email": "pw@test.com", "password": "testpass123", }) assert resp.status_code == 200 assert "access_token" in resp.json() def test_login_wrong_password(self, client): resp = client.post("/auth/password/login", json={ "email": "pw@test.com", "password": "wrongpass", }) assert resp.status_code == 401 def test_login_unknown_user(self, client): resp = client.post("/auth/password/login", json={ "email": "unknown@test.com", "password": "test", }) assert resp.status_code == 401 def test_setup_password(self, client): resp = client.post("/auth/password/setup", json={ "email": "setup@test.com", "token": "setup-token-123", "password": "newpass456", }) assert resp.status_code == 200 assert "access_token" in resp.json() def test_setup_wrong_token(self, client): resp = client.post("/auth/password/setup", json={ "email": "setup@test.com", "token": "wrong-token", "password": "newpass", }) assert resp.status_code == 400 class TestEmailAuth: def test_send_link_registered(self, client): resp = client.post("/auth/email/send-link", json={"email": "ml@test.com"}) assert resp.status_code == 200 # Always returns same message (anti-enumeration) assert "If this email" in resp.json()["message"] def test_send_link_unregistered(self, client): resp = client.post("/auth/email/send-link", json={"email": "nobody@test.com"}) assert resp.status_code == 200 assert "If this email" in resp.json()["message"] def test_verify_invalid_token(self, client): resp = client.post("/auth/email/verify", json={ "email": "ml@test.com", "token": "invalid", }) assert resp.status_code == 401 def test_concurrent_verify_only_one_wins(self, client): """Two concurrent magic-link verifies — exactly one must succeed (M10).""" from concurrent.futures import ThreadPoolExecutor, as_completed from src.db import get_system_db from src.repositories.users import UserRepository # Create a user and set a magic-link token conn = get_system_db() repo = UserRepository(conn) repo.create(id="ml-user-1", email="concurrent@test.com", name="Test") token = "tok_concurrent_test_12345" from datetime import datetime, timezone repo.update(id="ml-user-1", reset_token=token, reset_token_created=datetime.now(timezone.utc)) conn.close() results = [] barrier = __import__("threading").Barrier(2, timeout=5) def verify(): barrier.wait() # ensure both threads hit the endpoint simultaneously resp = client.post("/auth/email/verify", json={ "email": "concurrent@test.com", "token": token, }) results.append(resp.status_code) with ThreadPoolExecutor(max_workers=2) as pool: futures = [pool.submit(verify) for _ in range(2)] # Collect results (re-raise any exceptions) for f in as_completed(futures): f.result() # Exactly one must succeed (200), the other must fail (401) successes = results.count(200) failures = results.count(401) assert successes == 1, f"Expected exactly 1 success, got {successes} (results: {results})" assert failures == 1, f"Expected exactly 1 failure, got {failures} (results: {results})" class TestGoogleOAuth: def test_google_login_not_configured(self, client): """Without GOOGLE_CLIENT_ID, should redirect to login with error.""" resp = client.get("/auth/google/login", follow_redirects=False) assert resp.status_code == 302 or resp.status_code == 307 assert "error" in resp.headers.get("location", "") @pytest.mark.skip(reason="v12: _fetch_google_groups removed; group sync now uses ADC via app.auth.group_sync.fetch_user_groups. Rewrite for the new module.") class TestGoogleGroupsFetch: """Unit tests for _fetch_google_groups — the helper must be tolerant of every realistic failure mode (non-Workspace tenants return 403, expired tokens return 401, network errors bubble from httpx) and never raise.""" def test_parses_groups_from_success_response(self, monkeypatch): import asyncio from app.auth.providers import google as gp # searchTransitiveGroups returns {"memberships": [...]}, not {"groups": [...]}. # Each item carries the group identity in groupKey.id + displayName, # matching the actual API response shape. fake_payload = { "memberships": [ { "group": "groups/abc123", "groupKey": {"id": "team-eng@example.com"}, "displayName": "Engineering", }, { "group": "groups/def456", "groupKey": {"id": "everyone@example.com"}, # No displayName — falls back to id }, ], } class _Resp: status_code = 200 text = "" def json(self): return fake_payload class _FakeClient: def __init__(self, *a, **kw): pass async def __aenter__(self): return self async def __aexit__(self, *a): return False async def get(self, url, params=None, headers=None): return _Resp() monkeypatch.setattr(gp.httpx, "AsyncClient", _FakeClient) groups = asyncio.run(gp._fetch_google_groups("fake-token", "user@example.com")) assert groups == [ {"id": "team-eng@example.com", "name": "Engineering"}, {"id": "everyone@example.com", "name": "everyone@example.com"}, ] def test_returns_empty_on_403(self, monkeypatch): """Cloud Identity not enabled (non-Workspace tenant) → 403 → [] + warning.""" import asyncio from app.auth.providers import google as gp class _Resp: status_code = 403 text = "Cloud Identity API has not been enabled" class _FakeClient: def __init__(self, *a, **kw): pass async def __aenter__(self): return self async def __aexit__(self, *a): return False async def get(self, url, params=None, headers=None): return _Resp() monkeypatch.setattr(gp.httpx, "AsyncClient", _FakeClient) groups = asyncio.run(gp._fetch_google_groups("fake-token", "user@example.com")) assert groups == [] def test_returns_empty_on_exception(self, monkeypatch): """Network error inside httpx must be swallowed, not propagated.""" import asyncio from app.auth.providers import google as gp class _FakeClient: def __init__(self, *a, **kw): pass async def __aenter__(self): return self async def __aexit__(self, *a): return False async def get(self, *a, **kw): raise RuntimeError("boom") monkeypatch.setattr(gp.httpx, "AsyncClient", _FakeClient) groups = asyncio.run(gp._fetch_google_groups("fake-token", "user@example.com")) assert groups == [] class TestLocalDevGroupsParser: """Unit tests for get_local_dev_groups() — must tolerate every malformed input shape (typos, wrong type, missing id) and never raise. Bad input becomes [] + a WARNING log so the dev mock can't break the dev flow.""" def test_returns_empty_when_unset(self, monkeypatch): from app.auth.dependencies import get_local_dev_groups monkeypatch.delenv("LOCAL_DEV_GROUPS", raising=False) assert get_local_dev_groups() == [] def test_returns_empty_when_blank(self, monkeypatch): from app.auth.dependencies import get_local_dev_groups monkeypatch.setenv("LOCAL_DEV_GROUPS", " ") assert get_local_dev_groups() == [] def test_parses_valid_json_array(self, monkeypatch): from app.auth.dependencies import get_local_dev_groups monkeypatch.setenv( "LOCAL_DEV_GROUPS", '[{"id":"eng@x.com","name":"Engineering"},' '{"id":"admins@x.com","name":"Admins"}]', ) assert get_local_dev_groups() == [ {"id": "eng@x.com", "name": "Engineering"}, {"id": "admins@x.com", "name": "Admins"}, ] def test_defaults_name_to_id(self, monkeypatch): from app.auth.dependencies import get_local_dev_groups monkeypatch.setenv("LOCAL_DEV_GROUPS", '[{"id":"eng@x.com"}]') assert get_local_dev_groups() == [{"id": "eng@x.com", "name": "eng@x.com"}] def test_preserves_extra_fields(self, monkeypatch): """Forward-compat: unknown fields like roles/labels survive parsing so future group-aware code can be exercised in dev without parser changes.""" from app.auth.dependencies import get_local_dev_groups monkeypatch.setenv( "LOCAL_DEV_GROUPS", '[{"id":"eng@x.com","name":"Eng","roles":["MEMBER","OWNER"]}]', ) result = get_local_dev_groups() assert result == [ {"id": "eng@x.com", "name": "Eng", "roles": ["MEMBER", "OWNER"]}, ] def test_returns_empty_on_invalid_json(self, monkeypatch): from app.auth.dependencies import get_local_dev_groups monkeypatch.setenv("LOCAL_DEV_GROUPS", "not-json,foo") assert get_local_dev_groups() == [] def test_returns_empty_on_non_list(self, monkeypatch): from app.auth.dependencies import get_local_dev_groups monkeypatch.setenv("LOCAL_DEV_GROUPS", '{"id":"eng@x.com"}') assert get_local_dev_groups() == [] def test_skips_items_without_id(self, monkeypatch): """Bad items are dropped, valid siblings survive — partial config still produces something useful instead of nuking the whole list.""" from app.auth.dependencies import get_local_dev_groups monkeypatch.setenv( "LOCAL_DEV_GROUPS", '[{"name":"no-id"},{"id":"eng@x.com","name":"Eng"},"string-not-object"]', ) assert get_local_dev_groups() == [{"id": "eng@x.com", "name": "Eng"}] @pytest.mark.skip(reason="v12: session.google_groups + /me/profile group rendering removed; profile now reads user_group_members. Rewrite to assert membership rows instead.") class TestLocalDevGroupsInjection: """End-to-end: with LOCAL_DEV_MODE=1 + LOCAL_DEV_GROUPS, the seeded dev user's session.google_groups gets populated on first authenticated request so /me/profile renders the mocked groups.""" @pytest.fixture def dev_client(self, tmp_path, monkeypatch): monkeypatch.setenv("DATA_DIR", str(tmp_path)) monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!") monkeypatch.setenv("SESSION_SECRET", "test-session-secret-32chars-minimum!!") monkeypatch.setenv("LOCAL_DEV_MODE", "1") monkeypatch.setenv("LOCAL_DEV_USER_EMAIL", "dev@localhost") monkeypatch.setenv( "LOCAL_DEV_GROUPS", '[{"id":"local-dev-engineers@example.com","name":"Local Dev Engineers"}]', ) from app.main import create_app return TestClient(create_app()) def test_dev_user_sees_mocked_groups_on_profile(self, dev_client): resp = dev_client.get("/me/profile") assert resp.status_code == 200 body = resp.text assert "local-dev-engineers@example.com" in body assert "Local Dev Engineers" in body assert "No Google groups available" not in body def test_empty_LOCAL_DEV_GROUPS_falls_back_to_empty_state( self, tmp_path, monkeypatch ): monkeypatch.setenv("DATA_DIR", str(tmp_path)) monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!") monkeypatch.setenv("LOCAL_DEV_MODE", "1") monkeypatch.delenv("LOCAL_DEV_GROUPS", raising=False) from app.main import create_app client = TestClient(create_app()) resp = client.get("/me/profile") assert resp.status_code == 200 assert "No Google groups available" in resp.text class TestLocalDevGroupsStartupValidation: """Startup banner reports on LOCAL_DEV_GROUPS so a typo or malformed JSON is loud at boot, not silent until the first authenticated request.""" def _capture_startup_logs(self, tmp_path, monkeypatch, caplog, env_value): import logging monkeypatch.setenv("DATA_DIR", str(tmp_path)) monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!") monkeypatch.setenv("LOCAL_DEV_MODE", "1") if env_value is None: monkeypatch.delenv("LOCAL_DEV_GROUPS", raising=False) else: monkeypatch.setenv("LOCAL_DEV_GROUPS", env_value) from app.main import create_app with caplog.at_level(logging.WARNING, logger="app.main"): create_app() return caplog.text def test_logs_count_and_ids_on_valid_input(self, tmp_path, monkeypatch, caplog): text = self._capture_startup_logs( tmp_path, monkeypatch, caplog, '[{"id":"a@x.com","name":"A"},{"id":"b@x.com","name":"B"}]', ) assert "mocking 2 group(s)" in text assert "a@x.com" in text assert "b@x.com" in text def test_warns_when_set_but_malformed(self, tmp_path, monkeypatch, caplog): text = self._capture_startup_logs( tmp_path, monkeypatch, caplog, "not-valid-json", ) assert "produced no valid groups" in text def test_logs_unset_explicitly(self, tmp_path, monkeypatch, caplog): text = self._capture_startup_logs(tmp_path, monkeypatch, caplog, None) assert "LOCAL_DEV_GROUPS is unset" in text class TestCookieAuth: def test_web_ui_with_cookie(self, client): """Test that web UI routes accept JWT from cookie.""" from app.auth.jwt import create_access_token from src.db import get_system_db from src.repositories.users import UserRepository conn = get_system_db() ur = UserRepository(conn) # Use existing user user = ur.get_by_email("pw@test.com") conn.close() token = create_access_token(user["id"], user["email"]) # Set cookie and access dashboard client.cookies.set("access_token", token) resp = client.get("/dashboard") # Should not be 401 — cookie auth works assert resp.status_code != 401 @pytest.mark.skip(reason="v12: callback writes user_group_members instead of users.groups JSON. Rewrite assertions for the new schema.") class TestGoogleCallbackGroupSync: """Google OAuth callback populates users.groups from Workspace. The real google.py module captures GOOGLE_CLIENT_ID/SECRET at import time and conditionally registers `oauth.google`. For tests we: 1. Patch `is_available` so the callback's early-return guard doesn't fire 2. Stub `oauth.google.authorize_access_token` with an AsyncMock 3. Stub `fetch_user_groups` at the import site (app.auth.providers.google) to return a fixed list — no real Google traffic """ @pytest.fixture def google_app(self, tmp_path, monkeypatch): import json as _json from unittest.mock import AsyncMock from types import SimpleNamespace monkeypatch.setenv("DATA_DIR", str(tmp_path)) monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!") from app.main import create_app import app.auth.providers.google as g_mod # (1) bypass the is_available guard monkeypatch.setattr(g_mod, "is_available", lambda: True) # (2) fake oauth.google with async authorize_access_token fake_oauth_google = SimpleNamespace( authorize_access_token=AsyncMock( return_value={ "userinfo": { "email": "tester@groupon.com", "name": "Tester", } } ) ) monkeypatch.setattr(g_mod.oauth, "google", fake_oauth_google, raising=False) # (3) fake fetch_user_groups — also patches the import inside # google_callback because it does `from app.auth.group_sync import fetch_user_groups` # inside the function body, so patching the source module is enough. import app.auth.group_sync as gs_mod monkeypatch.setattr( gs_mod, "fetch_user_groups", lambda email: ["grp_a@groupon.com", "grp_b@groupon.com"], ) app = create_app() client = TestClient(app, follow_redirects=False) return {"client": client, "json": _json} def test_callback_creates_user_with_groups(self, google_app): """First-time login → user row + groups populated + two user_groups rows.""" c = google_app["client"] _json = google_app["json"] resp = c.get("/auth/google/callback?code=x&state=y") assert resp.status_code == 302 assert resp.headers["location"] == "/dashboard" # access_token cookie set assert "access_token" in resp.cookies from src.db import get_system_db from src.repositories.users import UserRepository from src.repositories.user_groups import UserGroupsRepository conn = get_system_db() try: user = UserRepository(conn).get_by_email("tester@groupon.com") assert user is not None assert user["role"] == "analyst" assert _json.loads(user["groups"]) == [ "grp_a@groupon.com", "grp_b@groupon.com", ] names = {g["name"] for g in UserGroupsRepository(conn).list_all()} assert "grp_a@groupon.com" in names assert "grp_b@groupon.com" in names # non-system flag row = UserGroupsRepository(conn).get_by_name("grp_a@groupon.com") assert row["is_system"] is False assert row["created_by"] == "system:google-sync" finally: conn.close() def test_callback_updates_groups_on_relogin(self, google_app, monkeypatch): """Second login with a different group set overwrites the first.""" c = google_app["client"] _json = google_app["json"] # First login — default stub returns [a, b] c.get("/auth/google/callback?code=x&state=y") # Swap the mock to return a single, different group on the next call import app.auth.group_sync as gs_mod monkeypatch.setattr( gs_mod, "fetch_user_groups", lambda email: ["grp_c@groupon.com"] ) c.get("/auth/google/callback?code=x&state=y") from src.db import get_system_db from src.repositories.users import UserRepository conn = get_system_db() try: user = UserRepository(conn).get_by_email("tester@groupon.com") assert _json.loads(user["groups"]) == ["grp_c@groupon.com"] finally: conn.close() def test_callback_fails_soft_on_group_sync_exception(self, google_app, monkeypatch): """An exception inside fetch_user_groups does not block the login.""" c = google_app["client"] _json = google_app["json"] def raise_boom(email): raise RuntimeError("Google API is down") import app.auth.group_sync as gs_mod monkeypatch.setattr(gs_mod, "fetch_user_groups", raise_boom) resp = c.get("/auth/google/callback?code=x&state=y") # Login still proceeds, redirect to dashboard with token cookie assert resp.status_code == 302 assert resp.headers["location"] == "/dashboard" assert "access_token" in resp.cookies from src.db import get_system_db from src.repositories.users import UserRepository conn = get_system_db() try: user = UserRepository(conn).get_by_email("tester@groupon.com") assert user is not None # groups stays NULL (no previous value either) assert user["groups"] is None finally: conn.close() def test_callback_empty_groups_does_not_overwrite_existing(self, google_app, monkeypatch): """fetch_user_groups returning [] means 'no data' — don't wipe existing groups on a transient failure masked as empty.""" c = google_app["client"] _json = google_app["json"] # First login populates groups c.get("/auth/google/callback?code=x&state=y") # Second login: Google returns empty import app.auth.group_sync as gs_mod monkeypatch.setattr(gs_mod, "fetch_user_groups", lambda email: []) c.get("/auth/google/callback?code=x&state=y") from src.db import get_system_db from src.repositories.users import UserRepository conn = get_system_db() try: user = UserRepository(conn).get_by_email("tester@groupon.com") # Previous groups preserved assert _json.loads(user["groups"]) == [ "grp_a@groupon.com", "grp_b@groupon.com", ] finally: conn.close() class TestEmailMagicLinkTTL: """Tests for email magic link token expiry and replay prevention.""" def test_expired_magic_link_rejected(self, client): """A magic link token older than MAGIC_LINK_EXPIRY must be rejected.""" from src.db import get_system_db from src.repositories.users import UserRepository from datetime import datetime, timezone, timedelta conn = get_system_db() repo = UserRepository(conn) repo.create(id="expired-user", email="expired@test.com", name="Expired") # Set token with old timestamp (beyond 1-hour TTL) old_time = datetime.now(timezone.utc) - timedelta(hours=2) repo.update(id="expired-user", reset_token="expired-token-123", reset_token_created=old_time) conn.close() resp = client.post("/auth/email/verify", json={ "email": "expired@test.com", "token": "expired-token-123", }) assert resp.status_code == 401 def test_token_reuse_prevented(self, client): """A consumed magic link token cannot be used again.""" from src.db import get_system_db from src.repositories.users import UserRepository from datetime import datetime, timezone conn = get_system_db() repo = UserRepository(conn) repo.create(id="reuse-user", email="reuse@test.com", name="Reuse") token = "reusable-token-456" repo.update(id="reuse-user", reset_token=token, reset_token_created=datetime.now(timezone.utc)) conn.close() # First use should succeed resp1 = client.post("/auth/email/verify", json={ "email": "reuse@test.com", "token": token, }) assert resp1.status_code == 200 # Second use must fail resp2 = client.post("/auth/email/verify", json={ "email": "reuse@test.com", "token": token, }) assert resp2.status_code == 401 def test_invalid_signature_token_rejected(self, client): """A token that doesn't match any stored value must be rejected.""" from src.db import get_system_db from src.repositories.users import UserRepository from datetime import datetime, timezone conn = get_system_db() repo = UserRepository(conn) repo.create(id="sig-user", email="sig@test.com", name="Sig") repo.update(id="sig-user", reset_token="real-token-789", reset_token_created=datetime.now(timezone.utc)) conn.close() resp = client.post("/auth/email/verify", json={ "email": "sig@test.com", "token": "wrong-token-xyz", }) assert resp.status_code == 401 @pytest.mark.skip(reason="Authlib OAuth internals require complex async mock; group sync is tested via unit tests and integration. Full E2E OAuth flow needs real Google credentials or dedicated mock infrastructure.") class TestGoogleOAuthFullFlow: """Tests for Google OAuth callback with mocked token exchange and group sync. These tests require mocking authlib's internal OAuth client which involves async Starlette session middleware. The group sync logic is covered by unit tests for fetch_user_groups and the existing TestGoogleCallbackGroupSync. """ def test_google_callback_creates_new_user(self, tmp_path, monkeypatch): """Google OAuth callback must create a new user if not found.""" pass def test_google_callback_syncs_group_memberships(self, tmp_path, monkeypatch): """Google OAuth callback must sync Workspace groups into user_group_members.""" pass def test_google_callback_existing_user_not_duplicated(self, tmp_path, monkeypatch): """Re-login via Google OAuth must not duplicate the user.""" pass def test_google_callback_api_error_handled(self, tmp_path, monkeypatch): """Google OAuth callback must handle API errors gracefully.""" pass