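This squashes 13 commits from ma/staging plus a small docstring translation
into a single coherent unit. Three main workstreams (RBAC, marketplace,
security/ops hardening), followed by UI polish, vendor-neutralization, and
the translation.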
== RBAC v13 redesign ==
- Drops core.viewer/analyst/km_admin/admin hierarchy and the
internal_roles / group_mappings / user_role_grants / plugin_access tables.
- Replaced by user_group_members + resource_grants. Atomic v12→v13 backfill
wrapped in BEGIN/COMMIT; ROLLBACK leaves schema_version at 12 for retry.
- Two authorization primitives in app.auth.access:
require_admin — Admin-group god-mode
require_resource_access(rt, "{path}") — entity-scoped grants
Single DB lookup per request; no session cache; no implies BFS.
- /admin/access UI (single page) replaces /admin/role-mapping +
/admin/plugin-access. CLI `da admin group/grant *` replaces
`da admin role/mapping/grant-role/revoke-role/effective-roles`.
- ResourceType.TABLE is listing-only — admins can record table grants, but
runtime enforcement still flows through legacy dataset_permissions
(migration plan in docs/TODO-rbac-data-enforcement.md).
== Claude Code marketplace ==
- Aggregated /marketplace.zip + /marketplace.git/* (PAT-gated,
RBAC-filtered, content-addressed cache via dulwich).
- Admin god-mode dropped on the marketplace surface — admins curate
their own view via grants like everyone else.
- Bare-repo cache materializes per RBAC-filtered ETag; stale entries are
not pruned in this iteration (disclaimed in git_backend.py docstring).
== #81 #83 #44 security/ops hardening ==
- #81 Group A — orchestrator ATTACH allow-listing (extension/url/alias).
- #81 Group B — Keboola extractor 3-state exit codes:
0 success / 1 total fail / 2 PARTIAL fail
Sync API logs PARTIAL FAILURE alert on exit 2. Operators with binary
alerting must teach it the new partial signal.
- #81 Group C — schema v10 view_ownership; rejects silent overwrite
of a prior connector's view name on collision.
- #81 Group D — extractor-side identifier validation.
- #83 — Jira webhook fail-closed when JIRA_WEBHOOK_SECRET unset
+ path-traversal fix.
- #44 — entire /api/scripts/* surface is admin-only (planted-script +
sandbox-bypass risk closed).
== Web UI polish + deploy fix ==
- /admin/access: live grant-count badges (no stale snapshot revert),
shared-header CSS link added to /catalog and /admin/{tables,permissions},
per-resource-type colored stripes.
- docker-compose.host-mount.yml: bind,rbind so dual-disk hosts don't
silently shadow sub-mounts and write state to the wrong disk.
== OSS vendor-neutralization (waves 1+2) ==
- scripts/grpn/ → scripts/ops/. Customer-specific identifiers
(project IDs, internal hostnames, dev/prod VM IPs, brand names)
replaced with placeholders across code, docs, Terraform, Caddyfile,
OAuth probe, and planning docs. Downstream infra repos that copied
scripts/grpn/agnes-tls-rotate.sh or agnes-auto-upgrade.sh must
update the path.
== Translation ==
- src/repositories/user_groups.py::ensure_system docstring translated
from Czech to English for codebase consistency.
Co-authored-by: Mina Rustamyan <mina@keboola.com>
575 lines
23 KiB
Python
575 lines
23 KiB
Python
"""Tests for auth providers — password, email magic link, google OAuth."""
|
|
|
|
import os
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
|
|
@pytest.fixture
def client(tmp_path, monkeypatch):
    """Return a TestClient backed by a freshly seeded system database.

    ``DATA_DIR`` is pointed at pytest's ``tmp_path`` and ``JWT_SECRET_KEY`` is
    set before any application module is imported. Three users are seeded:

    * ``pw@test.com``    - created with a password hash for "testpass123"
    * ``setup@test.com`` - given a setup token plus a fresh creation timestamp
    * ``ml@test.com``    - created without a password hash or setup token
    """
    monkeypatch.setenv("DATA_DIR", str(tmp_path))
    monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!")

    # Application imports are deferred until the environment is patched.
    from datetime import datetime, timezone

    from app.main import create_app
    from src.db import get_system_db
    from src.repositories.users import UserRepository

    # Hash with argon2 when it is installed; otherwise fall back to a plain
    # sha256 hex digest so the fixture can still seed a password_hash value.
    try:
        from argon2 import PasswordHasher
    except ImportError:
        import hashlib

        pw_hash = hashlib.sha256(b"testpass123").hexdigest()
    else:
        pw_hash = PasswordHasher().hash("testpass123")

    conn = get_system_db()
    try:
        users = UserRepository(conn)

        # User with password
        users.create(
            id="pw1",
            email="pw@test.com",
            name="PW User",
            role="analyst",
            password_hash=pw_hash,
        )

        # User with setup token (and fresh created timestamp so the JSON
        # /setup endpoint's TTL check accepts it)
        users.create(id="setup1", email="setup@test.com", name="Setup User", role="analyst")
        users.update(
            id="setup1",
            setup_token="setup-token-123",
            setup_token_created=datetime.now(timezone.utc),
        )

        # User for magic link
        users.create(id="ml1", email="ml@test.com", name="ML User", role="analyst")
    finally:
        # Always release the connection, even if seeding fails part-way.
        conn.close()

    return TestClient(create_app())
|
|
|
|
|
|
class TestTokenEndpoint:
    """Exercise POST /auth/token, focusing on the password-bypass fix."""

    def test_token_empty_password_rejected_when_user_has_hash(self, client):
        """An empty password is refused with 401 when the user has a password_hash."""
        credentials = {"email": "pw@test.com", "password": ""}
        response = client.post("/auth/token", json=credentials)
        assert response.status_code == 401

    def test_token_missing_password_rejected_when_user_has_hash(self, client):
        """Leaving out the password field (defaults to '') is refused with 401."""
        credentials = {"email": "pw@test.com"}
        response = client.post("/auth/token", json=credentials)
        assert response.status_code == 401

    def test_token_wrong_password_rejected(self, client):
        """A non-matching password yields 401."""
        credentials = {"email": "pw@test.com", "password": "wrongpass"}
        response = client.post("/auth/token", json=credentials)
        assert response.status_code == 401

    def test_token_correct_password_succeeds(self, client):
        """The right password yields 200 with a token and the user's email."""
        credentials = {"email": "pw@test.com", "password": "testpass123"}
        response = client.post("/auth/token", json=credentials)
        assert response.status_code == 200

        body = response.json()
        assert "access_token" in body
        assert body["email"] == "pw@test.com"

    def test_token_no_password_hash_user_gets_token(self, client):
        """A user without password_hash (OAuth-only) is rejected at /auth/token.

        NOTE: despite the historical method name, the expected outcome is a
        401, not a token.
        """
        credentials = {"email": "ml@test.com"}
        response = client.post("/auth/token", json=credentials)
        assert response.status_code == 401

    def test_token_rejected_for_oauth_only_user(self, client):
        """An OAuth-only user gets 401 and a detail mentioning external authentication."""
        credentials = {"email": "ml@test.com"}
        response = client.post("/auth/token", json=credentials)
        assert response.status_code == 401
        detail = response.json()["detail"]
        assert "external authentication" in detail

    def test_token_unknown_user_rejected(self, client):
        """An email that was never seeded yields 401."""
        credentials = {"email": "nobody@test.com", "password": "anything"}
        response = client.post("/auth/token", json=credentials)
        assert response.status_code == 401
|
|
|
|
|
|
class TestPasswordAuth:
    """Exercise the /auth/password/login and /auth/password/setup endpoints."""

    def test_login_success(self, client):
        """Valid credentials return 200 and an access token."""
        payload = {"email": "pw@test.com", "password": "testpass123"}
        response = client.post("/auth/password/login", json=payload)
        assert response.status_code == 200
        assert "access_token" in response.json()

    def test_login_wrong_password(self, client):
        """A wrong password for a known user returns 401."""
        payload = {"email": "pw@test.com", "password": "wrongpass"}
        response = client.post("/auth/password/login", json=payload)
        assert response.status_code == 401

    def test_login_unknown_user(self, client):
        """An unknown email returns 401."""
        payload = {"email": "unknown@test.com", "password": "test"}
        response = client.post("/auth/password/login", json=payload)
        assert response.status_code == 401

    def test_setup_password(self, client):
        """The seeded setup token lets the user set a password and get a token."""
        payload = {
            "email": "setup@test.com",
            "token": "setup-token-123",
            "password": "newpass456",
        }
        response = client.post("/auth/password/setup", json=payload)
        assert response.status_code == 200
        assert "access_token" in response.json()

    def test_setup_wrong_token(self, client):
        """A setup token that does not match returns 400."""
        payload = {
            "email": "setup@test.com",
            "token": "wrong-token",
            "password": "newpass",
        }
        response = client.post("/auth/password/setup", json=payload)
        assert response.status_code == 400
|
|
|
|
|
|
class TestEmailAuth:
    """Exercise the email magic-link endpoints under /auth/email."""

    def test_send_link_registered(self, client):
        """A registered address gets 200 and the generic message."""
        response = client.post("/auth/email/send-link", json={"email": "ml@test.com"})
        assert response.status_code == 200
        # The message is the same for every address (anti-enumeration).
        message = response.json()["message"]
        assert "If this email" in message

    def test_send_link_unregistered(self, client):
        """An unregistered address gets the same 200 and generic message."""
        response = client.post("/auth/email/send-link", json={"email": "nobody@test.com"})
        assert response.status_code == 200
        message = response.json()["message"]
        assert "If this email" in message

    def test_verify_invalid_token(self, client):
        """Verifying with a bogus token returns 401."""
        payload = {"email": "ml@test.com", "token": "invalid"}
        response = client.post("/auth/email/verify", json=payload)
        assert response.status_code == 401
|
|
|
|
|
|
class TestGoogleOAuth:
    """Exercise the Google OAuth login entry point."""

    def test_google_login_not_configured(self, client):
        """Without GOOGLE_CLIENT_ID the endpoint redirects with an error in the target."""
        response = client.get("/auth/google/login", follow_redirects=False)
        # Either redirect status is accepted.
        assert response.status_code in (302, 307)
        redirect_target = response.headers.get("location", "")
        assert "error" in redirect_target
|
|
|
|
|
|
@pytest.mark.skip(reason="v12: _fetch_google_groups removed; group sync now uses ADC via app.auth.group_sync.fetch_user_groups. Rewrite for the new module.")
class TestGoogleGroupsFetch:
    """Unit tests for _fetch_google_groups.

    The helper is expected to tolerate every realistic failure mode
    (non-Workspace tenants answering 403, expired tokens answering 401,
    network errors raised by httpx) and never raise.
    """

    def test_parses_groups_from_success_response(self, monkeypatch):
        """A 200 response is turned into a list of {"id", "name"} dicts."""
        import asyncio
        from app.auth.providers import google as gp

        # searchTransitiveGroups returns {"memberships": [...]}, not
        # {"groups": [...]}. Each entry carries the group identity in
        # groupKey.id + displayName, matching the actual API response shape.
        first_membership = {
            "group": "groups/abc123",
            "groupKey": {"id": "team-eng@example.com"},
            "displayName": "Engineering",
        }
        second_membership = {
            "group": "groups/def456",
            "groupKey": {"id": "everyone@example.com"},
            # No displayName — falls back to id
        }
        fake_payload = {"memberships": [first_membership, second_membership]}

        class _OkResponse:
            status_code = 200
            text = ""

            def json(self):
                return fake_payload

        class _StubAsyncClient:
            def __init__(self, *a, **kw):
                pass

            async def __aenter__(self):
                return self

            async def __aexit__(self, *a):
                return False

            async def get(self, url, params=None, headers=None):
                return _OkResponse()

        monkeypatch.setattr(gp.httpx, "AsyncClient", _StubAsyncClient)

        fetched = asyncio.run(gp._fetch_google_groups("fake-token", "user@example.com"))
        expected = [
            {"id": "team-eng@example.com", "name": "Engineering"},
            {"id": "everyone@example.com", "name": "everyone@example.com"},
        ]
        assert fetched == expected

    def test_returns_empty_on_403(self, monkeypatch):
        """Cloud Identity not enabled (non-Workspace tenant) → 403 → [] + warning."""
        import asyncio
        from app.auth.providers import google as gp

        class _ForbiddenResponse:
            status_code = 403
            text = "Cloud Identity API has not been enabled"

        class _StubAsyncClient:
            def __init__(self, *a, **kw):
                pass

            async def __aenter__(self):
                return self

            async def __aexit__(self, *a):
                return False

            async def get(self, url, params=None, headers=None):
                return _ForbiddenResponse()

        monkeypatch.setattr(gp.httpx, "AsyncClient", _StubAsyncClient)

        fetched = asyncio.run(gp._fetch_google_groups("fake-token", "user@example.com"))
        assert fetched == []

    def test_returns_empty_on_exception(self, monkeypatch):
        """A network error raised inside httpx is swallowed rather than propagated."""
        import asyncio
        from app.auth.providers import google as gp

        class _ExplodingAsyncClient:
            def __init__(self, *a, **kw):
                pass

            async def __aenter__(self):
                return self

            async def __aexit__(self, *a):
                return False

            async def get(self, *a, **kw):
                raise RuntimeError("boom")

        monkeypatch.setattr(gp.httpx, "AsyncClient", _ExplodingAsyncClient)

        fetched = asyncio.run(gp._fetch_google_groups("fake-token", "user@example.com"))
        assert fetched == []
|
|
|
|
|
|
class TestLocalDevGroupsParser:
    """Unit tests for get_local_dev_groups().

    The parser is expected to tolerate every malformed input shape (typos,
    wrong type, missing id) and never raise. Bad input becomes [] plus a
    WARNING log so the dev mock can't break the dev flow.
    """

    def test_returns_empty_when_unset(self, monkeypatch):
        """No LOCAL_DEV_GROUPS variable at all yields []."""
        from app.auth.dependencies import get_local_dev_groups

        monkeypatch.delenv("LOCAL_DEV_GROUPS", raising=False)
        parsed = get_local_dev_groups()
        assert parsed == []

    def test_returns_empty_when_blank(self, monkeypatch):
        """A whitespace-only value yields []."""
        from app.auth.dependencies import get_local_dev_groups

        monkeypatch.setenv("LOCAL_DEV_GROUPS", "   ")
        parsed = get_local_dev_groups()
        assert parsed == []

    def test_parses_valid_json_array(self, monkeypatch):
        """A well-formed JSON array of objects is returned as-is."""
        from app.auth.dependencies import get_local_dev_groups

        raw_value = (
            '[{"id":"eng@x.com","name":"Engineering"},'
            '{"id":"admins@x.com","name":"Admins"}]'
        )
        monkeypatch.setenv("LOCAL_DEV_GROUPS", raw_value)

        expected = [
            {"id": "eng@x.com", "name": "Engineering"},
            {"id": "admins@x.com", "name": "Admins"},
        ]
        assert get_local_dev_groups() == expected

    def test_defaults_name_to_id(self, monkeypatch):
        """An item without a name gets its id as the name."""
        from app.auth.dependencies import get_local_dev_groups

        monkeypatch.setenv("LOCAL_DEV_GROUPS", '[{"id":"eng@x.com"}]')
        expected = [{"id": "eng@x.com", "name": "eng@x.com"}]
        assert get_local_dev_groups() == expected

    def test_preserves_extra_fields(self, monkeypatch):
        """Forward-compat: unknown fields such as roles/labels survive parsing,
        so future group-aware code can be exercised in dev without parser changes."""
        from app.auth.dependencies import get_local_dev_groups

        raw_value = '[{"id":"eng@x.com","name":"Eng","roles":["MEMBER","OWNER"]}]'
        monkeypatch.setenv("LOCAL_DEV_GROUPS", raw_value)

        parsed = get_local_dev_groups()
        expected = [
            {"id": "eng@x.com", "name": "Eng", "roles": ["MEMBER", "OWNER"]},
        ]
        assert parsed == expected

    def test_returns_empty_on_invalid_json(self, monkeypatch):
        """A value that is not JSON yields []."""
        from app.auth.dependencies import get_local_dev_groups

        monkeypatch.setenv("LOCAL_DEV_GROUPS", "not-json,foo")
        parsed = get_local_dev_groups()
        assert parsed == []

    def test_returns_empty_on_non_list(self, monkeypatch):
        """Valid JSON whose top level is not a list yields []."""
        from app.auth.dependencies import get_local_dev_groups

        monkeypatch.setenv("LOCAL_DEV_GROUPS", '{"id":"eng@x.com"}')
        parsed = get_local_dev_groups()
        assert parsed == []

    def test_skips_items_without_id(self, monkeypatch):
        """Bad items are dropped while valid siblings survive — a partial config
        still produces something useful instead of nuking the whole list."""
        from app.auth.dependencies import get_local_dev_groups

        raw_value = '[{"name":"no-id"},{"id":"eng@x.com","name":"Eng"},"string-not-object"]'
        monkeypatch.setenv("LOCAL_DEV_GROUPS", raw_value)

        expected = [{"id": "eng@x.com", "name": "Eng"}]
        assert get_local_dev_groups() == expected
|
|
|
|
|
|
@pytest.mark.skip(reason="v12: session.google_groups + /profile group rendering removed; profile now reads user_group_members. Rewrite to assert membership rows instead.")
class TestLocalDevGroupsInjection:
    """End-to-end check of the LOCAL_DEV_GROUPS mock.

    With LOCAL_DEV_MODE=1 + LOCAL_DEV_GROUPS, the seeded dev user's
    session.google_groups is expected to be populated on the first
    authenticated request so /profile renders the mocked groups.
    """

    @pytest.fixture
    def dev_client(self, tmp_path, monkeypatch):
        """TestClient for an app created in local-dev mode with one mocked group."""
        mocked_groups = '[{"id":"local-dev-engineers@example.com","name":"Local Dev Engineers"}]'

        monkeypatch.setenv("DATA_DIR", str(tmp_path))
        monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!")
        monkeypatch.setenv("SESSION_SECRET", "test-session-secret-32chars-minimum!!")
        monkeypatch.setenv("LOCAL_DEV_MODE", "1")
        monkeypatch.setenv("LOCAL_DEV_USER_EMAIL", "dev@localhost")
        monkeypatch.setenv("LOCAL_DEV_GROUPS", mocked_groups)

        from app.main import create_app

        app = create_app()
        return TestClient(app)

    def test_dev_user_sees_mocked_groups_on_profile(self, dev_client):
        """/profile shows the mocked group's id and name, not the empty state."""
        response = dev_client.get("/profile")
        assert response.status_code == 200

        page = response.text
        assert "local-dev-engineers@example.com" in page
        assert "Local Dev Engineers" in page
        assert "No Google groups available" not in page

    def test_empty_LOCAL_DEV_GROUPS_falls_back_to_empty_state(
        self, tmp_path, monkeypatch
    ):
        """With LOCAL_DEV_GROUPS unset, /profile renders the empty-state text."""
        monkeypatch.setenv("DATA_DIR", str(tmp_path))
        monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!")
        monkeypatch.setenv("LOCAL_DEV_MODE", "1")
        monkeypatch.delenv("LOCAL_DEV_GROUPS", raising=False)

        from app.main import create_app

        app = create_app()
        response = TestClient(app).get("/profile")
        assert response.status_code == 200
        assert "No Google groups available" in response.text
|
|
|
|
|
|
class TestLocalDevGroupsStartupValidation:
    """The startup banner reports on LOCAL_DEV_GROUPS, so a typo or malformed
    JSON is loud at boot rather than silent until the first authenticated
    request."""

    def _capture_startup_logs(self, tmp_path, monkeypatch, caplog, env_value):
        """Create the app in local-dev mode and return the captured log text.

        ``env_value`` is written to LOCAL_DEV_GROUPS; passing ``None`` removes
        the variable instead. Capture is scoped to the "app.main" logger at
        WARNING level.
        """
        import logging

        monkeypatch.setenv("DATA_DIR", str(tmp_path))
        monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!")
        monkeypatch.setenv("LOCAL_DEV_MODE", "1")

        if env_value is not None:
            monkeypatch.setenv("LOCAL_DEV_GROUPS", env_value)
        else:
            monkeypatch.delenv("LOCAL_DEV_GROUPS", raising=False)

        from app.main import create_app

        with caplog.at_level(logging.WARNING, logger="app.main"):
            create_app()

        return caplog.text

    def test_logs_count_and_ids_on_valid_input(self, tmp_path, monkeypatch, caplog):
        """Valid input logs the group count and every group id."""
        two_groups = '[{"id":"a@x.com","name":"A"},{"id":"b@x.com","name":"B"}]'
        captured = self._capture_startup_logs(tmp_path, monkeypatch, caplog, two_groups)

        assert "mocking 2 group(s)" in captured
        assert "a@x.com" in captured
        assert "b@x.com" in captured

    def test_warns_when_set_but_malformed(self, tmp_path, monkeypatch, caplog):
        """A set-but-unparseable value logs the 'no valid groups' message."""
        captured = self._capture_startup_logs(
            tmp_path, monkeypatch, caplog, "not-valid-json"
        )
        assert "produced no valid groups" in captured

    def test_logs_unset_explicitly(self, tmp_path, monkeypatch, caplog):
        """An unset variable is reported explicitly."""
        captured = self._capture_startup_logs(tmp_path, monkeypatch, caplog, None)
        assert "LOCAL_DEV_GROUPS is unset" in captured
|
|
|
|
|
|
class TestCookieAuth:
    """Check that web UI routes authenticate via the ``access_token`` cookie."""

    def test_web_ui_with_cookie(self, client):
        """Test that web UI routes accept JWT from cookie."""
        from app.auth.jwt import create_access_token
        from src.db import get_system_db
        from src.repositories.users import UserRepository

        # Look up the user seeded by the ``client`` fixture. The connection
        # is released in ``finally`` so a failed lookup cannot leak it.
        conn = get_system_db()
        try:
            user = UserRepository(conn).get_by_email("pw@test.com")
        finally:
            conn.close()

        # Fail with a clear message instead of a TypeError on user["id"].
        assert user is not None, "seeded user pw@test.com not found"

        token = create_access_token(user["id"], user["email"], user["role"])

        # Set cookie and access dashboard
        client.cookies.set("access_token", token)
        resp = client.get("/dashboard")

        # Should not be 401 — cookie auth works
        assert resp.status_code != 401
|
|
|
|
|
|
@pytest.mark.skip(reason="v12: callback writes user_group_members instead of users.groups JSON. Rewrite assertions for the new schema.")
class TestGoogleCallbackGroupSync:
    """Google OAuth callback populates users.groups from Workspace.

    These tests assert against the ``users.groups`` JSON column and are
    currently skipped (see the skip reason on the class).

    The real google.py module captures GOOGLE_CLIENT_ID/SECRET at import
    time and conditionally registers `oauth.google`. For tests we:

    1. Patch `is_available` so the callback's early-return guard doesn't fire
    2. Stub `oauth.google.authorize_access_token` with an AsyncMock
    3. Stub `fetch_user_groups` on its source module (app.auth.group_sync)
       to return a fixed list — no real Google traffic
    """

    @pytest.fixture
    def google_app(self, tmp_path, monkeypatch):
        """Build an app whose Google OAuth collaborators are all stubbed.

        Returns a dict with ``"client"`` (a TestClient that does not follow
        redirects) and ``"json"`` (the stdlib json module, used by the tests
        to decode the stored groups column).
        """
        import json as _json
        from unittest.mock import AsyncMock
        from types import SimpleNamespace

        monkeypatch.setenv("DATA_DIR", str(tmp_path))
        monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!")

        from app.main import create_app
        import app.auth.providers.google as g_mod

        # (1) bypass the is_available guard
        monkeypatch.setattr(g_mod, "is_available", lambda: True)

        # (2) fake oauth.google with async authorize_access_token
        fake_oauth_google = SimpleNamespace(
            authorize_access_token=AsyncMock(
                return_value={
                    "userinfo": {
                        "email": "tester@groupon.com",
                        "name": "Tester",
                    }
                }
            )
        )
        monkeypatch.setattr(g_mod.oauth, "google", fake_oauth_google, raising=False)

        # (3) fake fetch_user_groups. google_callback does
        # `from app.auth.group_sync import fetch_user_groups` inside the
        # function body, so patching the source module is enough.
        import app.auth.group_sync as gs_mod
        monkeypatch.setattr(
            gs_mod,
            "fetch_user_groups",
            lambda email: ["grp_a@groupon.com", "grp_b@groupon.com"],
        )

        app = create_app()
        client = TestClient(app, follow_redirects=False)
        return {"client": client, "json": _json}

    def test_callback_creates_user_with_groups(self, google_app):
        """First-time login → user row + groups populated + two user_groups rows."""
        c = google_app["client"]
        _json = google_app["json"]

        resp = c.get("/auth/google/callback?code=x&state=y")
        assert resp.status_code == 302
        assert resp.headers["location"] == "/dashboard"
        # access_token cookie set
        assert "access_token" in resp.cookies

        from src.db import get_system_db
        from src.repositories.users import UserRepository
        from src.repositories.user_groups import UserGroupsRepository

        conn = get_system_db()
        try:
            user = UserRepository(conn).get_by_email("tester@groupon.com")
            assert user is not None
            assert user["role"] == "analyst"
            assert _json.loads(user["groups"]) == [
                "grp_a@groupon.com",
                "grp_b@groupon.com",
            ]
            names = {g["name"] for g in UserGroupsRepository(conn).list_all()}
            assert "grp_a@groupon.com" in names
            assert "grp_b@groupon.com" in names
            # non-system flag
            row = UserGroupsRepository(conn).get_by_name("grp_a@groupon.com")
            assert row["is_system"] is False
            assert row["created_by"] == "system:google-sync"
        finally:
            conn.close()

    def test_callback_updates_groups_on_relogin(self, google_app, monkeypatch):
        """Second login with a different group set overwrites the first."""
        c = google_app["client"]
        _json = google_app["json"]

        # First login — default stub returns [a, b]
        c.get("/auth/google/callback?code=x&state=y")

        # Swap the mock to return a single, different group on the next call
        import app.auth.group_sync as gs_mod
        monkeypatch.setattr(
            gs_mod, "fetch_user_groups", lambda email: ["grp_c@groupon.com"]
        )

        c.get("/auth/google/callback?code=x&state=y")

        from src.db import get_system_db
        from src.repositories.users import UserRepository

        conn = get_system_db()
        try:
            user = UserRepository(conn).get_by_email("tester@groupon.com")
            assert _json.loads(user["groups"]) == ["grp_c@groupon.com"]
        finally:
            conn.close()

    def test_callback_fails_soft_on_group_sync_exception(self, google_app, monkeypatch):
        """An exception inside fetch_user_groups does not block the login."""
        c = google_app["client"]

        def raise_boom(email):
            raise RuntimeError("Google API is down")

        import app.auth.group_sync as gs_mod
        monkeypatch.setattr(gs_mod, "fetch_user_groups", raise_boom)

        resp = c.get("/auth/google/callback?code=x&state=y")
        # Login still proceeds, redirect to dashboard with token cookie
        assert resp.status_code == 302
        assert resp.headers["location"] == "/dashboard"
        assert "access_token" in resp.cookies

        from src.db import get_system_db
        from src.repositories.users import UserRepository

        conn = get_system_db()
        try:
            user = UserRepository(conn).get_by_email("tester@groupon.com")
            assert user is not None
            # groups stays NULL (no previous value either)
            assert user["groups"] is None
        finally:
            conn.close()

    def test_callback_empty_groups_does_not_overwrite_existing(self, google_app, monkeypatch):
        """fetch_user_groups returning [] means 'no data' — don't wipe existing
        groups on a transient failure masked as empty."""
        c = google_app["client"]
        _json = google_app["json"]

        # First login populates groups
        c.get("/auth/google/callback?code=x&state=y")

        # Second login: Google returns empty
        import app.auth.group_sync as gs_mod
        monkeypatch.setattr(gs_mod, "fetch_user_groups", lambda email: [])
        c.get("/auth/google/callback?code=x&state=y")

        from src.db import get_system_db
        from src.repositories.users import UserRepository

        conn = get_system_db()
        try:
            user = UserRepository(conn).get_by_email("tester@groupon.com")
            # Previous groups preserved
            assert _json.loads(user["groups"]) == [
                "grp_a@groupon.com",
                "grp_b@groupon.com",
            ]
        finally:
            conn.close()
|