agnes-the-ai-analyst/tests/test_auth_providers.py
ZdenekSrotyr 5f6bb7a4b2
fix(security+ops) + release(0.12.1): #82 #85 #87 hardening + cut 0.12.1 (#104)
* fix(security+ops): #82 #85 #87 — auth hardening, API validation, deploy posture

Security and operational hardening across three issue groups:

- M23: docker-compose.override.yml → docker-compose.dev.yml (BREAKING, prod foot-gun)
- C13: Container runs as non-root user 'agnes' (USER directive in Dockerfile)
- M21: Docker resource limits (mem_limit, cpus) on app + scheduler
- M22: Caddyfile security headers (X-Frame-Options, X-Content-Type-Options, Referrer-Policy, -Server)
- M17: /api/health split into minimal (unauth) + /api/health/detailed (auth) (BREAKING)
- M26: release.yml restricts build-and-push to main + workflow_dispatch; paths-ignore for docs

- C2: table_id traversal validation on /api/data/{table_id}/download
- M4: Upload streaming (chunk-read + temp file) instead of full-buffer; /local-md hashed filename

- C5: reset_token removed from POST /api/users/{id}/reset-password response
- C8: Startup WARNING when no user has password_hash (bootstrap window visible)
- M9: Audit log on failed web form login (mirrors /auth/token endpoint)
- M10: Atomic magic-link consume via compare-and-swap (CONSUMED: marker + DuckDB conflict catch)

Also: SSRF protection on /api/admin/configure (#46), memory stats SQL aggregation (#90)

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>

* fix(review): SSRF 169.254.x.x + IPv6 multicast; M10 marker cleanup safety

Review fixes:
- Add 169.254.0.0/16 (link-local, cloud metadata) to SSRF regex — was
  missing, allowing requests to AWS/GCP/Azure metadata endpoints
- Add ff[0-9a-f]{2}: (IPv6 multicast) to SSRF regex
- M10: wrap Step 3 (CONSUMED marker cleanup) in try-except with
  warning log — prevents unhandled exception if DB write fails after
  successful token consumption
- Add test for 169.254.169.254 SSRF rejection

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>

* fix(review): SSRF IPv6 bypass, CLI health endpoint, upload FD leak

Address Devin Review findings on PR #104:

1. SSRF IPv6 bypass: Replace hostname regex with DNS resolution +
   ipaddress module checks. The old regex patterns like `fe80:` only
   matched up to the first colon, missing real IPv6 addresses like
   `fe80::1`, `fc00::1`, `ff02::1`. The new approach resolves the
   hostname via getaddrinfo and checks each resulting IP against
   ipaddress.is_private/is_loopback/is_link_local/is_reserved/is_multicast.

2. CLI commands broken: `da setup test-connection`, `da setup verify`,
   `da diagnose`, `da status` all called /api/health expecting the old
   format (status=="healthy", services dict). Now they call
   /api/health/detailed for service-level checks (with graceful fallback
   to the minimal endpoint when auth is not configured).

3. Temp file handle leak: _stream_to_temp returns an open
   NamedTemporaryFile; callers now close it before shutil.move() to
   prevent FD leaks until GC.

Also adds IPv6 SSRF test cases (loopback, link-local, unique-local,
multicast) with mocked DNS resolution for test environment independence.

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>

* fix(review): download regex blocks hyphenated IDs; document health split

Address Devin Review round-3 findings on PR #104:

1. _SAFE_IDENTIFIER regex blocked hyphenated table IDs: The download
   endpoint used the strict SQL-identifier regex which does not allow
   dots or hyphens, but Keboola table IDs like in.c-crm.orders
   contain both. Switched to _SAFE_QUOTED_IDENTIFIER which allows dots
   and hyphens while still blocking path-traversal chars (/, .., \)
   and quote/control characters. Added test for hyphenated/dotted IDs.

2. Documented health endpoint split in DEPLOYMENT.md: Added Health
   checks & external monitoring section explaining both endpoints
   (minimal unauth /api/health vs authenticated /api/health/detailed)
   and how to wire external monitoring tools to the detailed endpoint
   with a PAT.

Generated with [Devin](https://cli.devin.ai/docs)

Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>

* release(0.12.1): cut hotfix for snapshot integrity + #82/#85/#87 hardening

* fix(security): apply CAS pattern to password reset confirm (#82/M10 follow-up)

Devin review on the rebased PR flagged the asymmetry: magic-link verify
got the atomic compare-and-swap pattern in the original M10 fix, but
password reset confirm at /auth/password/reset/confirm was still using
read-validate-clear. Two concurrent POSTs with the same valid reset
token could both succeed in setting different new passwords (last-write-
wins). Lower severity than the magic-link race because the attacker
would need the reset token AND to race the legitimate user, but the
asymmetry was a polish gap.

Mirrors app/auth/providers/email.py::_consume_token CAS exactly: write
unique CONSUMED:<random> marker via UPDATE...WHERE token=old_token, then
SELECT to verify our marker won, then proceed. Only the winner clears
the marker and applies the password change.

New regression test_concurrent_reset_only_one_wins in
tests/test_password_flows.py::TestResetConfirm pins the contract: two
ThreadPoolExecutor workers + Barrier hit /reset/confirm with the same
token; exactly one gets 302 (password applied), the other gets 200 with
'Invalid or expired'. Sanity-checked against the pre-CAS code — both
POSTs got 302 (race confirmed).

---------

Co-authored-by: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
2026-04-28 19:57:30 +02:00

612 lines
25 KiB
Python

"""Tests for auth providers — password, email magic link, google OAuth."""
import os
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def client(tmp_path, monkeypatch):
monkeypatch.setenv("DATA_DIR", str(tmp_path))
monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!")
from app.main import create_app
from src.db import get_system_db
from src.repositories.users import UserRepository
conn = get_system_db()
ur = UserRepository(conn)
# User with password
try:
from argon2 import PasswordHasher
ph = PasswordHasher()
pw_hash = ph.hash("testpass123")
except ImportError:
import hashlib
pw_hash = hashlib.sha256(b"testpass123").hexdigest()
ur.create(id="pw1", email="pw@test.com", name="PW User", role="analyst", password_hash=pw_hash)
# User with setup token (and fresh created timestamp so the JSON /setup
# endpoint's TTL check accepts it)
from datetime import datetime, timezone
ur.create(id="setup1", email="setup@test.com", name="Setup User", role="analyst")
ur.update(id="setup1", setup_token="setup-token-123",
setup_token_created=datetime.now(timezone.utc))
# User for magic link
ur.create(id="ml1", email="ml@test.com", name="ML User", role="analyst")
conn.close()
app = create_app()
return TestClient(app)
class TestTokenEndpoint:
"""Tests for /auth/token — password bypass fix."""
def test_token_empty_password_rejected_when_user_has_hash(self, client):
"""Empty password must be rejected when user has password_hash."""
resp = client.post("/auth/token", json={"email": "pw@test.com", "password": ""})
assert resp.status_code == 401
def test_token_missing_password_rejected_when_user_has_hash(self, client):
"""Omitting password field (defaults to '') must be rejected when user has password_hash."""
resp = client.post("/auth/token", json={"email": "pw@test.com"})
assert resp.status_code == 401
def test_token_wrong_password_rejected(self, client):
"""Wrong password must be rejected with 401."""
resp = client.post("/auth/token", json={"email": "pw@test.com", "password": "wrongpass"})
assert resp.status_code == 401
def test_token_correct_password_succeeds(self, client):
"""Correct password must issue a token."""
resp = client.post("/auth/token", json={"email": "pw@test.com", "password": "testpass123"})
assert resp.status_code == 200
data = resp.json()
assert "access_token" in data
assert data["email"] == "pw@test.com"
def test_token_no_password_hash_user_gets_token(self, client):
"""User without password_hash (OAuth-only) must be rejected at /auth/token."""
resp = client.post("/auth/token", json={"email": "ml@test.com"})
assert resp.status_code == 401
def test_token_rejected_for_oauth_only_user(self, client):
"""OAuth-only user (no password_hash) must not receive a token via /auth/token."""
resp = client.post("/auth/token", json={"email": "ml@test.com"})
assert resp.status_code == 401
assert "external authentication" in resp.json()["detail"]
def test_token_unknown_user_rejected(self, client):
"""Unknown email must return 401."""
resp = client.post("/auth/token", json={"email": "nobody@test.com", "password": "anything"})
assert resp.status_code == 401
class TestPasswordAuth:
def test_login_success(self, client):
resp = client.post("/auth/password/login", json={
"email": "pw@test.com", "password": "testpass123",
})
assert resp.status_code == 200
assert "access_token" in resp.json()
def test_login_wrong_password(self, client):
resp = client.post("/auth/password/login", json={
"email": "pw@test.com", "password": "wrongpass",
})
assert resp.status_code == 401
def test_login_unknown_user(self, client):
resp = client.post("/auth/password/login", json={
"email": "unknown@test.com", "password": "test",
})
assert resp.status_code == 401
def test_setup_password(self, client):
resp = client.post("/auth/password/setup", json={
"email": "setup@test.com", "token": "setup-token-123", "password": "newpass456",
})
assert resp.status_code == 200
assert "access_token" in resp.json()
def test_setup_wrong_token(self, client):
resp = client.post("/auth/password/setup", json={
"email": "setup@test.com", "token": "wrong-token", "password": "newpass",
})
assert resp.status_code == 400
class TestEmailAuth:
def test_send_link_registered(self, client):
resp = client.post("/auth/email/send-link", json={"email": "ml@test.com"})
assert resp.status_code == 200
# Always returns same message (anti-enumeration)
assert "If this email" in resp.json()["message"]
def test_send_link_unregistered(self, client):
resp = client.post("/auth/email/send-link", json={"email": "nobody@test.com"})
assert resp.status_code == 200
assert "If this email" in resp.json()["message"]
def test_verify_invalid_token(self, client):
resp = client.post("/auth/email/verify", json={
"email": "ml@test.com", "token": "invalid",
})
assert resp.status_code == 401
def test_concurrent_verify_only_one_wins(self, client):
"""Two concurrent magic-link verifies — exactly one must succeed (M10)."""
from concurrent.futures import ThreadPoolExecutor, as_completed
from src.db import get_system_db
from src.repositories.users import UserRepository
# Create a user and set a magic-link token
conn = get_system_db()
repo = UserRepository(conn)
repo.create(id="ml-user-1", email="concurrent@test.com", name="Test", role="viewer")
token = "tok_concurrent_test_12345"
from datetime import datetime, timezone
repo.update(id="ml-user-1", reset_token=token, reset_token_created=datetime.now(timezone.utc))
conn.close()
results = []
barrier = __import__("threading").Barrier(2, timeout=5)
def verify():
barrier.wait() # ensure both threads hit the endpoint simultaneously
resp = client.post("/auth/email/verify", json={
"email": "concurrent@test.com", "token": token,
})
results.append(resp.status_code)
with ThreadPoolExecutor(max_workers=2) as pool:
futures = [pool.submit(verify) for _ in range(2)]
# Collect results (re-raise any exceptions)
for f in as_completed(futures):
f.result()
# Exactly one must succeed (200), the other must fail (401)
successes = results.count(200)
failures = results.count(401)
assert successes == 1, f"Expected exactly 1 success, got {successes} (results: {results})"
assert failures == 1, f"Expected exactly 1 failure, got {failures} (results: {results})"
class TestGoogleOAuth:
def test_google_login_not_configured(self, client):
"""Without GOOGLE_CLIENT_ID, should redirect to login with error."""
resp = client.get("/auth/google/login", follow_redirects=False)
assert resp.status_code == 302 or resp.status_code == 307
assert "error" in resp.headers.get("location", "")
@pytest.mark.skip(reason="v12: _fetch_google_groups removed; group sync now uses ADC via app.auth.group_sync.fetch_user_groups. Rewrite for the new module.")
class TestGoogleGroupsFetch:
"""Unit tests for _fetch_google_groups — the helper must be tolerant of
every realistic failure mode (non-Workspace tenants return 403, expired
tokens return 401, network errors bubble from httpx) and never raise."""
def test_parses_groups_from_success_response(self, monkeypatch):
import asyncio
from app.auth.providers import google as gp
# searchTransitiveGroups returns {"memberships": [...]}, not {"groups": [...]}.
# Each item carries the group identity in groupKey.id + displayName,
# matching the actual API response shape.
fake_payload = {
"memberships": [
{
"group": "groups/abc123",
"groupKey": {"id": "team-eng@example.com"},
"displayName": "Engineering",
},
{
"group": "groups/def456",
"groupKey": {"id": "everyone@example.com"},
# No displayName — falls back to id
},
],
}
class _Resp:
status_code = 200
text = ""
def json(self):
return fake_payload
class _FakeClient:
def __init__(self, *a, **kw):
pass
async def __aenter__(self):
return self
async def __aexit__(self, *a):
return False
async def get(self, url, params=None, headers=None):
return _Resp()
monkeypatch.setattr(gp.httpx, "AsyncClient", _FakeClient)
groups = asyncio.run(gp._fetch_google_groups("fake-token", "user@example.com"))
assert groups == [
{"id": "team-eng@example.com", "name": "Engineering"},
{"id": "everyone@example.com", "name": "everyone@example.com"},
]
def test_returns_empty_on_403(self, monkeypatch):
"""Cloud Identity not enabled (non-Workspace tenant) → 403 → [] + warning."""
import asyncio
from app.auth.providers import google as gp
class _Resp:
status_code = 403
text = "Cloud Identity API has not been enabled"
class _FakeClient:
def __init__(self, *a, **kw): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): return False
async def get(self, url, params=None, headers=None):
return _Resp()
monkeypatch.setattr(gp.httpx, "AsyncClient", _FakeClient)
groups = asyncio.run(gp._fetch_google_groups("fake-token", "user@example.com"))
assert groups == []
def test_returns_empty_on_exception(self, monkeypatch):
"""Network error inside httpx must be swallowed, not propagated."""
import asyncio
from app.auth.providers import google as gp
class _FakeClient:
def __init__(self, *a, **kw): pass
async def __aenter__(self): return self
async def __aexit__(self, *a): return False
async def get(self, *a, **kw):
raise RuntimeError("boom")
monkeypatch.setattr(gp.httpx, "AsyncClient", _FakeClient)
groups = asyncio.run(gp._fetch_google_groups("fake-token", "user@example.com"))
assert groups == []
class TestLocalDevGroupsParser:
"""Unit tests for get_local_dev_groups() — must tolerate every malformed
input shape (typos, wrong type, missing id) and never raise. Bad input
becomes [] + a WARNING log so the dev mock can't break the dev flow."""
def test_returns_empty_when_unset(self, monkeypatch):
from app.auth.dependencies import get_local_dev_groups
monkeypatch.delenv("LOCAL_DEV_GROUPS", raising=False)
assert get_local_dev_groups() == []
def test_returns_empty_when_blank(self, monkeypatch):
from app.auth.dependencies import get_local_dev_groups
monkeypatch.setenv("LOCAL_DEV_GROUPS", " ")
assert get_local_dev_groups() == []
def test_parses_valid_json_array(self, monkeypatch):
from app.auth.dependencies import get_local_dev_groups
monkeypatch.setenv(
"LOCAL_DEV_GROUPS",
'[{"id":"eng@x.com","name":"Engineering"},'
'{"id":"admins@x.com","name":"Admins"}]',
)
assert get_local_dev_groups() == [
{"id": "eng@x.com", "name": "Engineering"},
{"id": "admins@x.com", "name": "Admins"},
]
def test_defaults_name_to_id(self, monkeypatch):
from app.auth.dependencies import get_local_dev_groups
monkeypatch.setenv("LOCAL_DEV_GROUPS", '[{"id":"eng@x.com"}]')
assert get_local_dev_groups() == [{"id": "eng@x.com", "name": "eng@x.com"}]
def test_preserves_extra_fields(self, monkeypatch):
"""Forward-compat: unknown fields like roles/labels survive parsing
so future group-aware code can be exercised in dev without parser changes."""
from app.auth.dependencies import get_local_dev_groups
monkeypatch.setenv(
"LOCAL_DEV_GROUPS",
'[{"id":"eng@x.com","name":"Eng","roles":["MEMBER","OWNER"]}]',
)
result = get_local_dev_groups()
assert result == [
{"id": "eng@x.com", "name": "Eng", "roles": ["MEMBER", "OWNER"]},
]
def test_returns_empty_on_invalid_json(self, monkeypatch):
from app.auth.dependencies import get_local_dev_groups
monkeypatch.setenv("LOCAL_DEV_GROUPS", "not-json,foo")
assert get_local_dev_groups() == []
def test_returns_empty_on_non_list(self, monkeypatch):
from app.auth.dependencies import get_local_dev_groups
monkeypatch.setenv("LOCAL_DEV_GROUPS", '{"id":"eng@x.com"}')
assert get_local_dev_groups() == []
def test_skips_items_without_id(self, monkeypatch):
"""Bad items are dropped, valid siblings survive — partial config
still produces something useful instead of nuking the whole list."""
from app.auth.dependencies import get_local_dev_groups
monkeypatch.setenv(
"LOCAL_DEV_GROUPS",
'[{"name":"no-id"},{"id":"eng@x.com","name":"Eng"},"string-not-object"]',
)
assert get_local_dev_groups() == [{"id": "eng@x.com", "name": "Eng"}]
@pytest.mark.skip(reason="v12: session.google_groups + /profile group rendering removed; profile now reads user_group_members. Rewrite to assert membership rows instead.")
class TestLocalDevGroupsInjection:
"""End-to-end: with LOCAL_DEV_MODE=1 + LOCAL_DEV_GROUPS, the seeded dev
user's session.google_groups gets populated on first authenticated request
so /profile renders the mocked groups."""
@pytest.fixture
def dev_client(self, tmp_path, monkeypatch):
monkeypatch.setenv("DATA_DIR", str(tmp_path))
monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!")
monkeypatch.setenv("SESSION_SECRET", "test-session-secret-32chars-minimum!!")
monkeypatch.setenv("LOCAL_DEV_MODE", "1")
monkeypatch.setenv("LOCAL_DEV_USER_EMAIL", "dev@localhost")
monkeypatch.setenv(
"LOCAL_DEV_GROUPS",
'[{"id":"local-dev-engineers@example.com","name":"Local Dev Engineers"}]',
)
from app.main import create_app
return TestClient(create_app())
def test_dev_user_sees_mocked_groups_on_profile(self, dev_client):
resp = dev_client.get("/profile")
assert resp.status_code == 200
body = resp.text
assert "local-dev-engineers@example.com" in body
assert "Local Dev Engineers" in body
assert "No Google groups available" not in body
def test_empty_LOCAL_DEV_GROUPS_falls_back_to_empty_state(
self, tmp_path, monkeypatch
):
monkeypatch.setenv("DATA_DIR", str(tmp_path))
monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!")
monkeypatch.setenv("LOCAL_DEV_MODE", "1")
monkeypatch.delenv("LOCAL_DEV_GROUPS", raising=False)
from app.main import create_app
client = TestClient(create_app())
resp = client.get("/profile")
assert resp.status_code == 200
assert "No Google groups available" in resp.text
class TestLocalDevGroupsStartupValidation:
"""Startup banner reports on LOCAL_DEV_GROUPS so a typo or malformed JSON
is loud at boot, not silent until the first authenticated request."""
def _capture_startup_logs(self, tmp_path, monkeypatch, caplog, env_value):
import logging
monkeypatch.setenv("DATA_DIR", str(tmp_path))
monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!")
monkeypatch.setenv("LOCAL_DEV_MODE", "1")
if env_value is None:
monkeypatch.delenv("LOCAL_DEV_GROUPS", raising=False)
else:
monkeypatch.setenv("LOCAL_DEV_GROUPS", env_value)
from app.main import create_app
with caplog.at_level(logging.WARNING, logger="app.main"):
create_app()
return caplog.text
def test_logs_count_and_ids_on_valid_input(self, tmp_path, monkeypatch, caplog):
text = self._capture_startup_logs(
tmp_path, monkeypatch, caplog,
'[{"id":"a@x.com","name":"A"},{"id":"b@x.com","name":"B"}]',
)
assert "mocking 2 group(s)" in text
assert "a@x.com" in text
assert "b@x.com" in text
def test_warns_when_set_but_malformed(self, tmp_path, monkeypatch, caplog):
text = self._capture_startup_logs(
tmp_path, monkeypatch, caplog, "not-valid-json",
)
assert "produced no valid groups" in text
def test_logs_unset_explicitly(self, tmp_path, monkeypatch, caplog):
text = self._capture_startup_logs(tmp_path, monkeypatch, caplog, None)
assert "LOCAL_DEV_GROUPS is unset" in text
class TestCookieAuth:
def test_web_ui_with_cookie(self, client):
"""Test that web UI routes accept JWT from cookie."""
from app.auth.jwt import create_access_token
from src.db import get_system_db
from src.repositories.users import UserRepository
conn = get_system_db()
ur = UserRepository(conn)
# Use existing user
user = ur.get_by_email("pw@test.com")
conn.close()
token = create_access_token(user["id"], user["email"], user["role"])
# Set cookie and access dashboard
client.cookies.set("access_token", token)
resp = client.get("/dashboard")
# Should not be 401 — cookie auth works
assert resp.status_code != 401
@pytest.mark.skip(reason="v12: callback writes user_group_members instead of users.groups JSON. Rewrite assertions for the new schema.")
class TestGoogleCallbackGroupSync:
"""Google OAuth callback populates users.groups from Workspace.
The real google.py module captures GOOGLE_CLIENT_ID/SECRET at import
time and conditionally registers `oauth.google`. For tests we:
1. Patch `is_available` so the callback's early-return guard doesn't fire
2. Stub `oauth.google.authorize_access_token` with an AsyncMock
3. Stub `fetch_user_groups` at the import site (app.auth.providers.google)
to return a fixed list — no real Google traffic
"""
@pytest.fixture
def google_app(self, tmp_path, monkeypatch):
import json as _json
from unittest.mock import AsyncMock
from types import SimpleNamespace
monkeypatch.setenv("DATA_DIR", str(tmp_path))
monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!")
from app.main import create_app
import app.auth.providers.google as g_mod
# (1) bypass the is_available guard
monkeypatch.setattr(g_mod, "is_available", lambda: True)
# (2) fake oauth.google with async authorize_access_token
fake_oauth_google = SimpleNamespace(
authorize_access_token=AsyncMock(
return_value={
"userinfo": {
"email": "tester@groupon.com",
"name": "Tester",
}
}
)
)
monkeypatch.setattr(g_mod.oauth, "google", fake_oauth_google, raising=False)
# (3) fake fetch_user_groups — also patches the import inside
# google_callback because it does `from app.auth.group_sync import fetch_user_groups`
# inside the function body, so patching the source module is enough.
import app.auth.group_sync as gs_mod
monkeypatch.setattr(
gs_mod,
"fetch_user_groups",
lambda email: ["grp_a@groupon.com", "grp_b@groupon.com"],
)
app = create_app()
client = TestClient(app, follow_redirects=False)
return {"client": client, "json": _json}
def test_callback_creates_user_with_groups(self, google_app):
"""First-time login → user row + groups populated + two user_groups rows."""
c = google_app["client"]
_json = google_app["json"]
resp = c.get("/auth/google/callback?code=x&state=y")
assert resp.status_code == 302
assert resp.headers["location"] == "/dashboard"
# access_token cookie set
assert "access_token" in resp.cookies
from src.db import get_system_db
from src.repositories.users import UserRepository
from src.repositories.user_groups import UserGroupsRepository
conn = get_system_db()
try:
user = UserRepository(conn).get_by_email("tester@groupon.com")
assert user is not None
assert user["role"] == "analyst"
assert _json.loads(user["groups"]) == [
"grp_a@groupon.com",
"grp_b@groupon.com",
]
names = {g["name"] for g in UserGroupsRepository(conn).list_all()}
assert "grp_a@groupon.com" in names
assert "grp_b@groupon.com" in names
# non-system flag
row = UserGroupsRepository(conn).get_by_name("grp_a@groupon.com")
assert row["is_system"] is False
assert row["created_by"] == "system:google-sync"
finally:
conn.close()
def test_callback_updates_groups_on_relogin(self, google_app, monkeypatch):
"""Second login with a different group set overwrites the first."""
c = google_app["client"]
_json = google_app["json"]
# First login — default stub returns [a, b]
c.get("/auth/google/callback?code=x&state=y")
# Swap the mock to return a single, different group on the next call
import app.auth.group_sync as gs_mod
monkeypatch.setattr(
gs_mod, "fetch_user_groups", lambda email: ["grp_c@groupon.com"]
)
c.get("/auth/google/callback?code=x&state=y")
from src.db import get_system_db
from src.repositories.users import UserRepository
conn = get_system_db()
try:
user = UserRepository(conn).get_by_email("tester@groupon.com")
assert _json.loads(user["groups"]) == ["grp_c@groupon.com"]
finally:
conn.close()
def test_callback_fails_soft_on_group_sync_exception(self, google_app, monkeypatch):
"""An exception inside fetch_user_groups does not block the login."""
c = google_app["client"]
_json = google_app["json"]
def raise_boom(email):
raise RuntimeError("Google API is down")
import app.auth.group_sync as gs_mod
monkeypatch.setattr(gs_mod, "fetch_user_groups", raise_boom)
resp = c.get("/auth/google/callback?code=x&state=y")
# Login still proceeds, redirect to dashboard with token cookie
assert resp.status_code == 302
assert resp.headers["location"] == "/dashboard"
assert "access_token" in resp.cookies
from src.db import get_system_db
from src.repositories.users import UserRepository
conn = get_system_db()
try:
user = UserRepository(conn).get_by_email("tester@groupon.com")
assert user is not None
# groups stays NULL (no previous value either)
assert user["groups"] is None
finally:
conn.close()
def test_callback_empty_groups_does_not_overwrite_existing(self, google_app, monkeypatch):
"""fetch_user_groups returning [] means 'no data' — don't wipe existing
groups on a transient failure masked as empty."""
c = google_app["client"]
_json = google_app["json"]
# First login populates groups
c.get("/auth/google/callback?code=x&state=y")
# Second login: Google returns empty
import app.auth.group_sync as gs_mod
monkeypatch.setattr(gs_mod, "fetch_user_groups", lambda email: [])
c.get("/auth/google/callback?code=x&state=y")
from src.db import get_system_db
from src.repositories.users import UserRepository
conn = get_system_db()
try:
user = UserRepository(conn).get_by_email("tester@groupon.com")
# Previous groups preserved
assert _json.loads(user["groups"]) == [
"grp_a@groupon.com",
"grp_b@groupon.com",
]
finally:
conn.close()