diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e635bd..5d36214 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,24 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C +## [0.11.3] — 2026-04-26 + +Authorization-foundation release — adds the internal-roles layer between Cloud Identity groups and per-module capability checks. Schema v8 migration; no admin UI yet (follow-up). + +### Added + +- **Internal roles + group mapping (foundation).** Schema v8 adds two tables: `internal_roles` (app-defined capabilities like `context_admin`, `agent_operator`, registered by Agnes modules at import time) and `group_mappings` (many-to-many bindings of Cloud Identity group IDs to internal role keys, managed by admins). New `app.auth.role_resolver` module exposes `register_internal_role(...)` for module authors, `sync_registered_roles_to_db(...)` (run once at startup, idempotent), `resolve_internal_roles(external_groups, conn)` (called at sign-in, writes resolved keys into `session["internal_roles"]`), and a `require_internal_role("…")` FastAPI dependency factory for permission checks. Resolution runs at sign-in (Google OAuth callback + dev-bypass — populates on first request and whenever external groups change, mirroring the OAuth callback's always-write semantics). No DB hit per request. Refresh requires re-login, same semantics as `session.google_groups`. **No admin UI yet** — mapping rows must be created via the repository directly until the management UI ships in a follow-up. PAT/headless clients carry no session and therefore cannot pass `require_internal_role` gates by design — `require_internal_role` distinguishes "signed-in but missing role" from "no session at all" and surfaces a PAT-specific 403 detail in the second case so an API consumer hitting the wall sees what to fix. See `docs/internal-roles.md` → *PAT and headless requests*. + +### Changed + +- `docs/internal-roles.md` documents `Admin → Users → deactivate then reactivate` as the supported "force re-resolve now" lever for users you can't get to log out (long-lived sessions, automated clients) — invalidates the existing session and forces a fresh sign-in on the next request. + +### Internal + +- INFO-level audit log on every successful resolve (OAuth callback + dev-bypass) so a "wrong role" complaint is debuggable from the log alone — admin can correlate "user X claims they lost access" with the resolver output without replaying the request. +- Startup warning when `SESSION_SECRET` is shorter than 32 chars, matching the existing `JWT_SECRET_KEY` gate. Both HMAC surfaces sign trust-laden state (`session.internal_roles`, `session.google_groups`, JWTs) — keeping the two gates consistent so a weak secret gets surfaced at boot, not after a quiet downgrade. +- `_clear_registry_for_tests()` now refuses to run unless `TESTING=1` so a stray import path in production can't drop the registered capabilities. + ## [0.11.2] — 2026-04-26 Dev-experience patch release — make `LOCAL_DEV_MODE` realistic enough to actually exercise group-aware code paths on `localhost`, and consolidate scattered dev-onboarding instructions into a single `docs/local-development.md`. @@ -116,6 +134,7 @@ First tagged semver release. The `version = "2.x"` strings that appeared in earl - Test suite expanded to 1357+ tests (4 layers — unit, integration, web smoke, journey). +[0.11.3]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.11.3 [0.11.2]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.11.2 [0.11.1]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.11.1 [0.11.0]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.11.0 diff --git a/CLAUDE.md b/CLAUDE.md index 8343c9d..2645cb1 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -232,7 +232,7 @@ Module sets `lifecycle { ignore_changes = [metadata_startup_script] }` on `googl ## Key Implementation Details ### DuckDB Schema (src/db.py) -- Schema v7 with auto-migration from v1→v2→v3→v4→v5→v6→v7 (v5 adds `users.active`, v6 adds `personal_access_tokens`, v7 adds `personal_access_tokens.last_used_ip`) +- Schema v8 with auto-migration v1→…→v8 (v5 adds `users.active`, v6 adds `personal_access_tokens`, v7 adds `personal_access_tokens.last_used_ip`, v8 adds `internal_roles` + `group_mappings`) - `table_registry`: id, name, source_type, bucket, source_table, query_mode, sync_schedule, etc. - `sync_state`, `sync_history`: track extraction progress - `users`, `dataset_permissions`, `audit_log`: auth + RBAC diff --git a/app/auth/dependencies.py b/app/auth/dependencies.py index 7bcb726..3fd7644 100644 --- a/app/auth/dependencies.py +++ b/app/auth/dependencies.py @@ -145,12 +145,37 @@ async def get_current_user( if request is not None and hasattr(request, "session"): target_groups = get_local_dev_groups() current = request.session.get("google_groups") + groups_changed = False if target_groups and current != target_groups: request.session["google_groups"] = target_groups + groups_changed = True elif not target_groups and current: # Clear stale groups if the operator unsets LOCAL_DEV_GROUPS # mid-session — matches production's "always-write" semantics. request.session["google_groups"] = [] + groups_changed = True + # Populate internal_roles whenever it would otherwise be missing + # — first request after sign-in or any time groups changed. This + # mirrors the OAuth callback's unconditional write so a dev + # request never reaches require_internal_role with the key + # absent. Skipping when role list is already cached + groups + # didn't change keeps the per-request cost at a session lookup. + if groups_changed or "internal_roles" not in request.session: + try: + from app.auth.role_resolver import resolve_internal_roles + resolved = resolve_internal_roles(target_groups, conn) + request.session["internal_roles"] = resolved + logger.info( + "dev-bypass resolved %d internal role(s) for %s: %s", + len(resolved), + user.get("email", ""), + resolved or "", + ) + except Exception as e: + logger.warning( + "dev-bypass: resolve_internal_roles failed: %s", e, + ) + request.session["internal_roles"] = [] return user # Fall through to normal auth if seed missing — surfaces the bug instead of hiding it. diff --git a/app/auth/providers/google.py b/app/auth/providers/google.py index b0a92ed..cce0dcf 100644 --- a/app/auth/providers/google.py +++ b/app/auth/providers/google.py @@ -193,6 +193,33 @@ async def google_callback(request: Request): else: request.session["google_groups"] = [] + # Resolve external groups into internal role keys at sign-in. Cached + # on the session for the lifetime of this login — refresh requires + # re-login, same as the google_groups list itself. + try: + from app.auth.role_resolver import resolve_internal_roles + from src.db import get_system_db + conn = get_system_db() + try: + resolved = resolve_internal_roles( + request.session.get("google_groups", []) or [], + conn, + ) + finally: + conn.close() + request.session["internal_roles"] = resolved + # INFO-level audit so a wrong-role complaint is debuggable from + # the log alone — admin can correlate "user X claims they lost + # access" with the resolver output without replaying the request. + logger.info( + "Resolved %d internal role(s) for %s: %s", + len(resolved), email, resolved or "", + ) + except Exception as e: + # Resolver errors must not break login — fall back to no roles. + logger.warning("Failed to resolve internal_roles for %s: %s", email, e) + request.session["internal_roles"] = [] + # Issue JWT jwt_token = create_access_token(user["id"], user["email"], user["role"]) diff --git a/app/auth/role_resolver.py b/app/auth/role_resolver.py new file mode 100644 index 0000000..fce6a7c --- /dev/null +++ b/app/auth/role_resolver.py @@ -0,0 +1,226 @@ +"""Internal-role registry, resolver, and FastAPI dependency factory. + +## Lifecycle + +1. **Module import** — each Agnes module declares its internal roles via + ``register_internal_role(...)``. The registry is module-level state, so + the registration happens once per process. +2. **App startup** — ``sync_registered_roles_to_db(conn)`` inserts any + newly-registered keys into ``internal_roles`` and refreshes the metadata + (display_name, description, owner_module) on existing rows. Idempotent. +3. **Sign-in** — ``resolve_internal_roles(external_groups, conn)`` joins + ``session.google_groups`` against ``group_mappings`` and writes the + resulting role-key list into ``session["internal_roles"]``. +4. **Request handling** — ``require_internal_role("context_admin")`` reads + the cached list off the session; no DB hit per request. + +## Refresh semantics + +Resolution happens at sign-in, so a user with a stale session keeps stale +roles after an admin changes a mapping. ``Logout → sign in again`` is the +only refresh path today — the same semantics as Google's group cache and +the existing ``session.google_groups`` flow. +""" + +from __future__ import annotations + +import logging +import os +import re +import uuid +from dataclasses import dataclass +from typing import Optional + +import duckdb +from fastapi import Depends, HTTPException, Request, status + +from app.auth.dependencies import get_current_user +from src.repositories.group_mappings import GroupMappingsRepository +from src.repositories.internal_roles import InternalRolesRepository + +logger = logging.getLogger(__name__) + + +_ROLE_KEY_RE = re.compile(r"^[a-z][a-z0-9_]{0,63}$") + + +@dataclass(frozen=True) +class InternalRoleSpec: + """Module-side declaration of an internal role. + + Mirrors the persisted shape minus ``id`` (assigned at sync time) and + timestamps. Frozen so a stray mutation can't desync registry from DB. + """ + key: str + display_name: str + description: str = "" + owner_module: Optional[str] = None + + +# Module-level registry. Populated by register_internal_role() at import time; +# drained by sync_registered_roles_to_db() at app startup. Kept module-level +# (not class-state) because role registration is conceptually per-process. +_REGISTRY: dict[str, InternalRoleSpec] = {} + + +def register_internal_role( + key: str, + *, + display_name: str, + description: str = "", + owner_module: Optional[str] = None, +) -> None: + """Declare an internal role at module-import time. + + ``key`` is the immutable identifier referenced from code (e.g. + ``"context_admin"``); must match ``[a-z][a-z0-9_]{0,63}``. Calling twice + with the same key + same fields is a no-op (re-import safe). Calling + twice with conflicting fields raises ``ValueError`` — that almost always + means two modules picked the same key, which would leave admins unable + to tell which capability they're granting in the mapping UI. + """ + if not _ROLE_KEY_RE.match(key): + raise ValueError( + f"Invalid internal role key {key!r}: must be lower_snake_case, " + f"start with a letter, max 64 chars." + ) + spec = InternalRoleSpec( + key=key, + display_name=display_name, + description=description, + owner_module=owner_module, + ) + existing = _REGISTRY.get(key) + if existing is not None and existing != spec: + raise ValueError( + f"Internal role {key!r} already registered with different fields " + f"(existing={existing}, new={spec}). Pick a unique key." + ) + _REGISTRY[key] = spec + + +def list_registered_roles() -> list[InternalRoleSpec]: + """Snapshot of the current registry — sorted by key for stable output.""" + return sorted(_REGISTRY.values(), key=lambda s: s.key) + + +def _clear_registry_for_tests() -> None: + """Reset the module-level registry. Tests only — never call from app code. + + Refuses to run unless ``TESTING=1`` so a stray import-path in production + can't accidentally drop the registered capabilities. Pytest sets this + via conftest / pytest.ini; production never does. + """ + if os.environ.get("TESTING", "").lower() not in ("1", "true"): + raise RuntimeError( + "_clear_registry_for_tests() called outside of TESTING — " + "this drops every registered internal role and is never safe " + "in app code. Set TESTING=1 if you really mean this.", + ) + _REGISTRY.clear() + + +def sync_registered_roles_to_db(conn: duckdb.DuckDBPyConnection) -> None: + """Reconcile registered roles into ``internal_roles``. Idempotent. + + Inserts new keys, updates display_name/description/owner_module for + existing keys when they've changed. Never deletes — a role disappearing + from code may just mean the module was unloaded; the DB row keeps the + mappings safe until an admin explicitly removes it. + """ + repo = InternalRolesRepository(conn) + inserted = 0 + updated = 0 + for spec in _REGISTRY.values(): + existing = repo.get_by_key(spec.key) + if existing is None: + repo.create( + id=str(uuid.uuid4()), + key=spec.key, + display_name=spec.display_name, + description=spec.description, + owner_module=spec.owner_module, + ) + inserted += 1 + else: + drift = ( + existing.get("display_name") != spec.display_name + or (existing.get("description") or "") != spec.description + or (existing.get("owner_module") or None) != spec.owner_module + ) + if drift: + repo.update( + id=existing["id"], + display_name=spec.display_name, + description=spec.description, + owner_module=spec.owner_module, + ) + updated += 1 + if inserted or updated: + logger.info( + "internal_roles sync: %d inserted, %d updated, %d total registered", + inserted, updated, len(_REGISTRY), + ) + + +def resolve_internal_roles( + external_groups: list[dict], + conn: duckdb.DuckDBPyConnection, +) -> list[str]: + """Map ``session.google_groups`` to internal role keys via ``group_mappings``. + + Pure read of the mapping table — never mutates state. Returns a sorted, + de-duplicated list of role keys. Empty list when no external groups are + supplied or none of them are mapped. + """ + ids = [g["id"] for g in external_groups if isinstance(g, dict) and g.get("id")] + if not ids: + return [] + return GroupMappingsRepository(conn).resolve_role_keys(ids) + + +def require_internal_role(role_key: str): + """FastAPI dependency factory: 403 unless the user holds ``role_key``. + + Reads ``session["internal_roles"]`` populated at sign-in; no DB hit. + The ``user`` dependency runs first so we still 401 unauthenticated + requests with the standard message before checking role membership. + + PAT/headless callers carry no session-resolved roles (the OAuth callback + is the only writer of ``session["internal_roles"]``), so any + ``require_internal_role`` gate returns 403 for them by design — see + ``docs/internal-roles.md`` → *PAT and headless requests*. The 403 detail + distinguishes the two failure modes so an API consumer hitting it via + a token sees an actionable message instead of a generic "missing role". + """ + async def _check( + request: Request, + user: dict = Depends(get_current_user), + ) -> dict: + # "internal_roles in session" is the marker that this request went + # through a sign-in flow (OAuth callback or dev-bypass). Bearer-token + # callers leave the key absent — distinguish those from "signed in, + # but lacks this specific role" so the API consumer sees what to fix. + has_session_roles = ( + hasattr(request, "session") + and "internal_roles" in request.session + ) + roles = ( + request.session.get("internal_roles") or [] + if has_session_roles else [] + ) + if role_key not in roles: + if not has_session_roles: + detail = ( + f"Requires internal role '{role_key}'. This endpoint needs " + f"an interactive (OAuth) session — Bearer/PAT tokens do not " + f"carry session-resolved roles by design." + ) + else: + detail = f"Requires internal role '{role_key}'" + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=detail, + ) + return user + return _check diff --git a/app/main.py b/app/main.py index 52a63a7..f86a709 100644 --- a/app/main.py +++ b/app/main.py @@ -109,6 +109,16 @@ def create_app() -> FastAPI: # Session middleware (required for OAuth state) from app.secrets import get_session_secret session_secret = get_session_secret() + if len(session_secret) < 32: + # Same gate JWT applies (app/auth/jwt.py:_get_secret_key) — keeps the + # two HMAC surfaces consistent. session_internal_roles + google_groups + # are trusted off the cookie signature; a weak SESSION_SECRET means + # those gates are weak too. + import warnings as _warnings + _warnings.warn( + f"SESSION_SECRET is {len(session_secret)} chars — minimum 32 recommended", + UserWarning, stacklevel=2, + ) app.add_middleware(SessionMiddleware, secret_key=session_secret) # CORS for CLI and external clients @@ -210,6 +220,29 @@ def create_app() -> FastAPI: except Exception as e: logger.warning(f"Could not seed admin: {e}") + # Sync internal-role registry into DB. Modules call register_internal_role() + # at import time; this hook reconciles the registry into the internal_roles + # table so the mapping UI has something to show. Idempotent — safe to run + # on every startup. + try: + from app.auth.role_resolver import ( + sync_registered_roles_to_db, list_registered_roles, + ) + from src.db import get_system_db + conn = get_system_db() + try: + sync_registered_roles_to_db(conn) + finally: + conn.close() + registered = list_registered_roles() + if registered: + logger.info( + "internal_roles registered: %s", + ", ".join(s.key for s in registered), + ) + except Exception as e: + logger.warning("internal_roles sync failed at startup: %s", e) + # Static files static_dir = Path(__file__).parent / "web" / "static" if static_dir.exists(): diff --git a/docs/internal-roles.md b/docs/internal-roles.md new file mode 100644 index 0000000..6b88c32 --- /dev/null +++ b/docs/internal-roles.md @@ -0,0 +1,121 @@ +# Internal roles + external group mapping + +Two-layer authorization model for Agnes: + +- **External groups** — Cloud Identity / Google Workspace groups, pulled at sign-in into `session.google_groups`. Owned by the organization; Agnes only reads them. See `docs/auth-groups.md`. +- **Internal roles** — Agnes-defined capabilities (e.g. `context_admin`, `agent_operator`, `dataset_finance_reader`). Owned by Agnes. Registered in code by module authors, persisted in the `internal_roles` table. +- **Group mappings** — admin-managed many-to-many table binding external group IDs to internal role keys. The resolver joins this table at sign-in and writes the resolved role keys into `session["internal_roles"]`. + +Permission checks read off the session — no DB hit per request. + +## When to use which + +| You want to gate on … | Use … | +|---|---| +| "Is this user signed in at all?" | `Depends(get_current_user)` | +| "Coarse global role" (admin / analyst / viewer) | `Depends(require_admin)` / `Depends(require_role(Role.ANALYST))` — `users.role` column | +| "Specific module capability" | `Depends(require_internal_role("context_admin"))` — this doc | + +`users.role` stays the coarse gate for "may enter the building"; internal roles are the fine-grained per-module capabilities layered on top. + +## Module-author workflow (registering a role) + +In your module's import path (e.g. `services/context_engineering/__init__.py`): + +```python +from app.auth.role_resolver import register_internal_role + +register_internal_role( + "context_admin", + display_name="Context Engineering Admin", + description="Manages prompt templates and retrieval settings.", + owner_module="context_engineering", +) +``` + +Constraints on `key`: +- lower_snake_case, starts with a letter, ≤ 64 chars (`^[a-z][a-z0-9_]{0,63}$`) +- **immutable** — referenced from code; renaming would silently break every existing mapping. Pick carefully. +- registering the same key twice with the **same** fields is a no-op (re-import safe); registering with **different** fields raises `ValueError`. If two modules collide, one of them must rename. + +`register_internal_role` only populates the in-process registry. The startup hook in `app/main.py` calls `sync_registered_roles_to_db(conn)` to reconcile the registry into the `internal_roles` table: +- **Inserts** keys that don't exist yet +- **Updates** `display_name` / `description` / `owner_module` when they've drifted from code +- **Never deletes** — a role disappearing from code (module unloaded) keeps its DB row and any mappings until an admin explicitly removes it + +## Admin workflow (mapping external → internal) + +Until the management UI ships, mappings are created via repository directly: + +```python +from src.db import get_system_db +from src.repositories.group_mappings import GroupMappingsRepository +from src.repositories.internal_roles import InternalRolesRepository +import uuid + +conn = get_system_db() +role = InternalRolesRepository(conn).get_by_key("context_admin") +GroupMappingsRepository(conn).create( + id=str(uuid.uuid4()), + external_group_id="engineering@example.com", # Cloud Identity group ID + internal_role_id=role["id"], + assigned_by="admin@example.com", +) +conn.close() +``` + +After the mapping is created, affected users must **sign out and back in** for the resolver to pick it up — same refresh semantics as Google's group cache. + +If you can't get the user to log out (long-lived session, automated client), `Admin → Users → deactivate then reactivate` invalidates the existing session and forces a fresh sign-in on the next request. That is the supported "force re-resolve now" lever — there's no per-user role-cache invalidation API today. + +## Permission check (callsite) + +```python +from fastapi import Depends +from app.auth.role_resolver import require_internal_role + +@router.post("/context/templates") +async def update_template( + body: TemplateUpdate, + user: dict = Depends(require_internal_role("context_admin")), +): + ... +``` + +The dependency reads `session["internal_roles"]` (populated at sign-in); a missing role returns `403 Requires internal role 'context_admin'`. Unauthenticated requests still get `401` from the upstream `get_current_user` dependency. + +## Local development + +`LOCAL_DEV_GROUPS` mocks `session.google_groups` (see `docs/auth-groups.md` → *Local-dev mock*). The dev-bypass branch in `app/auth/dependencies.py` re-runs the resolver every time the mocked groups change, so editing `LOCAL_DEV_GROUPS` + hitting any auth-required endpoint refreshes `session["internal_roles"]` on the next request — no need to bounce the app. + +Typical dev setup: + +```bash +export LOCAL_DEV_MODE=1 +export LOCAL_DEV_GROUPS='[{"id":"engineering@example.com","name":"Engineering"}]' +# Register the role + create the mapping (one-shot script or manual SQL), +# then hit any protected endpoint — dev user now holds context_admin. +``` + +## PAT and headless requests + +Internal roles are **session-scoped only**. Personal Access Tokens (PAT) and other Bearer-token clients carry a JWT that proves identity but not a signed session cookie, so `session["internal_roles"]` is never populated for them. Concretely: any endpoint protected by `Depends(require_internal_role(…))` will return `403` for a PAT client even when the corresponding user's external groups would map to that role through a browser sign-in. + +This is intentional, not a bug — the same constraint already applies to `session.google_groups`, and PAT-issued JWTs deliberately don't snapshot that list (the user's group memberships can change after the token was issued without any way to re-sign the token). Two practical implications: + +- **Don't gate PAT-callable endpoints with `require_internal_role`.** Use `users.role` (`require_admin` / `require_role(Role.ANALYST)`) for the coarse check, or check the JWT claims directly. Internal roles fit OAuth-flow consumers (the web UI) and the dev bypass. +- **If you need a CLI/script to act with elevated capability**, the current escape valves are: (a) issue the PAT to a user whose `users.role` already covers it, (b) call the endpoint through the OAuth flow from a browser session, or (c) wait for the planned `da admin grant-role` CLI helper (see *Future work*) which will store an explicit per-user grant outside the group-mapping flow. + +## Resolution timing + +Resolver runs at sign-in only: +- Google OAuth callback (`app/auth/providers/google.py`) — after `_fetch_google_groups`, before issuing the JWT +- Dev-bypass branch (`app/auth/dependencies.py`) — when `LOCAL_DEV_GROUPS` value changes for a session + +Per-request reads are off `session["internal_roles"]` only; no DB hit. Trade-off: a user with a stale session keeps stale roles until they log out + back in. Same as `session.google_groups`. Cheaper than per-request DB lookup; matches the existing mental model. + +## Future work (not in this PR) + +- Admin UI under `/admin/role-mapping` — list registered roles + their current mappings, add/remove mappings, surface drift between registry and DB. +- Audit-log entries for mapping create/delete (write into `audit_log` with `action="role_mapping.created"` / `"role_mapping.deleted"`, `resource=f"mapping:{id}"`). +- Optional CLI helper `da admin grant-role ` for ad-hoc grants without going through external groups. diff --git a/pyproject.toml b/pyproject.toml index 0b2da7e..447f055 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agnes-the-ai-analyst" -version = "0.11.2" +version = "0.11.3" description = "Agnes — AI Data Analyst platform for AI analytical systems" requires-python = ">=3.11,<3.14" license = "MIT" diff --git a/src/db.py b/src/db.py index b051875..3b74db2 100644 --- a/src/db.py +++ b/src/db.py @@ -16,7 +16,7 @@ logger = logging.getLogger(__name__) _SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$") -SCHEMA_VERSION = 7 +SCHEMA_VERSION = 8 _SYSTEM_SCHEMA = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -221,6 +221,34 @@ CREATE TABLE IF NOT EXISTS personal_access_tokens ( last_used_ip VARCHAR, revoked_at TIMESTAMP ); + +-- Internal roles: app-defined capabilities (e.g. 'context_admin', 'agent_operator'). +-- `key` is the immutable lower_snake_case identifier referenced from code; modules +-- self-register their roles on import and the startup hook syncs the registry to +-- this table. Admins map external Cloud Identity groups onto these roles via +-- group_mappings — they don't create roles in the UI. +CREATE TABLE IF NOT EXISTS internal_roles ( + id VARCHAR PRIMARY KEY, + key VARCHAR UNIQUE NOT NULL, + display_name VARCHAR NOT NULL, + description TEXT, + owner_module VARCHAR, + created_at TIMESTAMP NOT NULL DEFAULT current_timestamp, + updated_at TIMESTAMP NOT NULL DEFAULT current_timestamp +); + +-- External-to-internal group mapping: which Cloud Identity groups grant which +-- internal role. Many-to-many. The resolver joins this table at sign-in and +-- writes the resulting role keys into session.internal_roles for cheap lookup +-- on subsequent requests. +CREATE TABLE IF NOT EXISTS group_mappings ( + id VARCHAR PRIMARY KEY, + external_group_id VARCHAR NOT NULL, + internal_role_id VARCHAR NOT NULL REFERENCES internal_roles(id), + assigned_at TIMESTAMP NOT NULL DEFAULT current_timestamp, + assigned_by VARCHAR, + UNIQUE (external_group_id, internal_role_id) +); """ @@ -432,6 +460,30 @@ _V6_TO_V7_MIGRATIONS = [ "ALTER TABLE personal_access_tokens ADD COLUMN IF NOT EXISTS last_used_ip VARCHAR", ] +_V7_TO_V8_MIGRATIONS = [ + """ + CREATE TABLE IF NOT EXISTS internal_roles ( + id VARCHAR PRIMARY KEY, + key VARCHAR UNIQUE NOT NULL, + display_name VARCHAR NOT NULL, + description TEXT, + owner_module VARCHAR, + created_at TIMESTAMP NOT NULL DEFAULT current_timestamp, + updated_at TIMESTAMP NOT NULL DEFAULT current_timestamp + ) + """, + """ + CREATE TABLE IF NOT EXISTS group_mappings ( + id VARCHAR PRIMARY KEY, + external_group_id VARCHAR NOT NULL, + internal_role_id VARCHAR NOT NULL REFERENCES internal_roles(id), + assigned_at TIMESTAMP NOT NULL DEFAULT current_timestamp, + assigned_by VARCHAR, + UNIQUE (external_group_id, internal_role_id) + ) + """, +] + _V3_TO_V4_MIGRATIONS = [ """ CREATE TABLE IF NOT EXISTS metric_definitions ( @@ -522,6 +574,9 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: if current < 7: for sql in _V6_TO_V7_MIGRATIONS: conn.execute(sql) + if current < 8: + for sql in _V7_TO_V8_MIGRATIONS: + conn.execute(sql) conn.execute( "UPDATE schema_version SET version = ?, applied_at = current_timestamp", [SCHEMA_VERSION], diff --git a/src/repositories/group_mappings.py b/src/repositories/group_mappings.py new file mode 100644 index 0000000..582ef47 --- /dev/null +++ b/src/repositories/group_mappings.py @@ -0,0 +1,104 @@ +"""Repository for external→internal group mappings. + +Each row binds a Cloud Identity group ID (as it appears in +``session.google_groups[*].id``) to an internal role. Many-to-many: a single +external group may grant several internal roles, and a single internal role +may be granted by several external groups. The resolver in +``app.auth.role_resolver`` reads this table at sign-in. +""" + +from typing import Any, Dict, List, Optional + +import duckdb + + +class GroupMappingsRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def _row_to_dict(self, row) -> Optional[Dict[str, Any]]: + if not row: + return None + columns = [desc[0] for desc in self.conn.description] + return dict(zip(columns, row)) + + def get_by_id(self, mapping_id: str) -> Optional[Dict[str, Any]]: + result = self.conn.execute( + "SELECT * FROM group_mappings WHERE id = ?", [mapping_id] + ).fetchone() + return self._row_to_dict(result) + + def list_all(self) -> List[Dict[str, Any]]: + """All mappings, joined with the role's `key` for display purposes.""" + results = self.conn.execute( + """SELECT m.*, r.key AS internal_role_key, r.display_name AS internal_role_display_name + FROM group_mappings m + JOIN internal_roles r ON r.id = m.internal_role_id + ORDER BY m.external_group_id, r.key""" + ).fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] + + def list_by_external_group(self, external_group_id: str) -> List[Dict[str, Any]]: + results = self.conn.execute( + """SELECT m.*, r.key AS internal_role_key + FROM group_mappings m + JOIN internal_roles r ON r.id = m.internal_role_id + WHERE m.external_group_id = ? + ORDER BY r.key""", + [external_group_id], + ).fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] + + def list_by_role(self, internal_role_id: str) -> List[Dict[str, Any]]: + results = self.conn.execute( + "SELECT * FROM group_mappings WHERE internal_role_id = ? " + "ORDER BY external_group_id", + [internal_role_id], + ).fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] + + def create( + self, + id: str, + external_group_id: str, + internal_role_id: str, + assigned_by: Optional[str] = None, + ) -> None: + self.conn.execute( + """INSERT INTO group_mappings + (id, external_group_id, internal_role_id, assigned_by) + VALUES (?, ?, ?, ?)""", + [id, external_group_id, internal_role_id, assigned_by], + ) + + def delete(self, mapping_id: str) -> None: + self.conn.execute("DELETE FROM group_mappings WHERE id = ?", [mapping_id]) + + def resolve_role_keys(self, external_group_ids: List[str]) -> List[str]: + """Map external group IDs to the set of internal role keys they grant. + + Returns a sorted, de-duplicated list — empty when ``external_group_ids`` + is empty or none of them are mapped. The resolver in + ``app.auth.role_resolver`` calls this on every sign-in. + """ + if not external_group_ids: + return [] + placeholders = ",".join(["?"] * len(external_group_ids)) + rows = self.conn.execute( + f"""SELECT DISTINCT r.key + FROM group_mappings m + JOIN internal_roles r ON r.id = m.internal_role_id + WHERE m.external_group_id IN ({placeholders}) + ORDER BY r.key""", + external_group_ids, + ).fetchall() + return [row[0] for row in rows] diff --git a/src/repositories/internal_roles.py b/src/repositories/internal_roles.py new file mode 100644 index 0000000..1632117 --- /dev/null +++ b/src/repositories/internal_roles.py @@ -0,0 +1,82 @@ +"""Repository for internal roles (app-defined capabilities). + +Internal roles are registered by Agnes modules at import-time via +``app.auth.role_resolver.register_internal_role`` and synced into this table +on startup. Admins map external Cloud Identity groups onto these roles via +``GroupMappingsRepository`` — they don't create roles in the UI. +""" + +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +import duckdb + + +class InternalRolesRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def _row_to_dict(self, row) -> Optional[Dict[str, Any]]: + if not row: + return None + columns = [desc[0] for desc in self.conn.description] + return dict(zip(columns, row)) + + def get_by_id(self, role_id: str) -> Optional[Dict[str, Any]]: + result = self.conn.execute( + "SELECT * FROM internal_roles WHERE id = ?", [role_id] + ).fetchone() + return self._row_to_dict(result) + + def get_by_key(self, key: str) -> Optional[Dict[str, Any]]: + result = self.conn.execute( + "SELECT * FROM internal_roles WHERE key = ?", [key] + ).fetchone() + return self._row_to_dict(result) + + def list_all(self) -> List[Dict[str, Any]]: + results = self.conn.execute( + "SELECT * FROM internal_roles ORDER BY key" + ).fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] + + def create( + self, + id: str, + key: str, + display_name: str, + description: Optional[str] = None, + owner_module: Optional[str] = None, + ) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO internal_roles + (id, key, display_name, description, owner_module, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + [id, key, display_name, description, owner_module, now, now], + ) + + def update(self, id: str, **kwargs) -> None: + # `key` is intentionally NOT in the allowlist — it's the immutable + # identifier referenced from code; a rename would silently break + # every group mapping pointing at it. Re-register under a new key + # and have an admin re-map. + allowed = {"display_name", "description", "owner_module"} + updates = {k: v for k, v in kwargs.items() if k in allowed} + if not updates: + return + updates["updated_at"] = datetime.now(timezone.utc) + set_clause = ", ".join(f"{k} = ?" for k in updates) + values = list(updates.values()) + [id] + self.conn.execute( + f"UPDATE internal_roles SET {set_clause} WHERE id = ?", values + ) + + def delete(self, role_id: str) -> None: + # Caller should delete dependent group_mappings first (no ON DELETE + # CASCADE on the FK) — surfaces dangling references instead of + # silently dropping mappings. + self.conn.execute("DELETE FROM internal_roles WHERE id = ?", [role_id]) diff --git a/tests/test_role_resolver.py b/tests/test_role_resolver.py new file mode 100644 index 0000000..1cbb541 --- /dev/null +++ b/tests/test_role_resolver.py @@ -0,0 +1,435 @@ +"""Tests for the internal-role registry, sync, resolver, and require dependency. + +Schema v8 adds ``internal_roles`` and ``group_mappings``; the resolver in +``app.auth.role_resolver`` is the integration point between Cloud Identity +groups (external) and Agnes-defined capabilities (internal). End-to-end +exercise rides on LOCAL_DEV_MODE + LOCAL_DEV_GROUPS so we don't need to +mock Google OAuth. +""" + +import os +import uuid + +import pytest +from fastapi.testclient import TestClient + + +@pytest.fixture +def db_conn(tmp_path, monkeypatch): + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + from src.db import get_system_db + conn = get_system_db() + yield conn + conn.close() + + +@pytest.fixture(autouse=True) +def _clear_role_registry(): + """Module-level _REGISTRY persists across tests in the same process — + flush before AND after each test so registrations from one test don't + leak into the next, regardless of which fixture ran first.""" + from app.auth.role_resolver import _clear_registry_for_tests + _clear_registry_for_tests() + yield + _clear_registry_for_tests() + + +class TestRegisterInternalRole: + def test_register_and_list(self): + from app.auth.role_resolver import ( + register_internal_role, list_registered_roles, + ) + register_internal_role( + "context_admin", + display_name="Context Admin", + description="Manages the context engineering module.", + owner_module="context_engineering", + ) + register_internal_role("agent_operator", display_name="Agent Operator") + keys = [s.key for s in list_registered_roles()] + assert keys == ["agent_operator", "context_admin"] # sorted + + def test_register_same_key_same_fields_is_idempotent(self): + """Re-importing a module shouldn't blow up — same key + same fields no-ops.""" + from app.auth.role_resolver import ( + register_internal_role, list_registered_roles, + ) + register_internal_role("x", display_name="X") + register_internal_role("x", display_name="X") + assert len(list_registered_roles()) == 1 + + def test_register_same_key_different_fields_raises(self): + """Two modules picking the same key would silently overwrite each + other's metadata — refuse and force one of them to rename.""" + from app.auth.role_resolver import register_internal_role + register_internal_role("x", display_name="X") + with pytest.raises(ValueError, match="already registered"): + register_internal_role("x", display_name="Different") + + @pytest.mark.parametrize("bad_key", [ + "Context_Admin", # uppercase + "1context", # leading digit + "context-admin", # hyphen + "", # empty + "context admin", # space + "x" * 65, # too long + ]) + def test_register_rejects_invalid_keys(self, bad_key): + from app.auth.role_resolver import register_internal_role + with pytest.raises(ValueError, match="Invalid internal role key"): + register_internal_role(bad_key, display_name="X") + + +class TestSyncRegisteredRolesToDb: + def test_inserts_new_roles(self, db_conn): + from app.auth.role_resolver import ( + register_internal_role, sync_registered_roles_to_db, + ) + from src.repositories.internal_roles import InternalRolesRepository + register_internal_role("ctx_admin", display_name="Context Admin") + sync_registered_roles_to_db(db_conn) + row = InternalRolesRepository(db_conn).get_by_key("ctx_admin") + assert row is not None + assert row["display_name"] == "Context Admin" + + def test_sync_is_idempotent(self, db_conn): + from app.auth.role_resolver import ( + register_internal_role, sync_registered_roles_to_db, + ) + register_internal_role("ctx_admin", display_name="Context Admin") + sync_registered_roles_to_db(db_conn) + sync_registered_roles_to_db(db_conn) # second call must not duplicate + rows = db_conn.execute( + "SELECT COUNT(*) FROM internal_roles WHERE key = 'ctx_admin'" + ).fetchone() + assert rows[0] == 1 + + def test_sync_updates_drifted_metadata(self, db_conn): + """Display name change in code should propagate to DB on next startup.""" + from app.auth.role_resolver import ( + register_internal_role, sync_registered_roles_to_db, + _clear_registry_for_tests, + ) + from src.repositories.internal_roles import InternalRolesRepository + register_internal_role("ctx_admin", display_name="Old Name") + sync_registered_roles_to_db(db_conn) + # Simulate a code update: clear the registry and re-register with new name. + _clear_registry_for_tests() + register_internal_role("ctx_admin", display_name="New Name") + sync_registered_roles_to_db(db_conn) + row = InternalRolesRepository(db_conn).get_by_key("ctx_admin") + assert row["display_name"] == "New Name" + + def test_sync_does_not_delete_unregistered_roles(self, db_conn): + """A role disappearing from code (module unloaded) keeps its DB row + + mappings until an admin explicitly removes it.""" + from app.auth.role_resolver import ( + register_internal_role, sync_registered_roles_to_db, + _clear_registry_for_tests, + ) + from src.repositories.internal_roles import InternalRolesRepository + register_internal_role("legacy_role", display_name="Legacy") + sync_registered_roles_to_db(db_conn) + _clear_registry_for_tests() # module no longer registers this role + sync_registered_roles_to_db(db_conn) + row = InternalRolesRepository(db_conn).get_by_key("legacy_role") + assert row is not None # still there + + +class TestResolveInternalRoles: + def test_returns_empty_when_no_external_groups(self, db_conn): + from app.auth.role_resolver import resolve_internal_roles + assert resolve_internal_roles([], db_conn) == [] + + def test_returns_empty_when_no_mappings(self, db_conn): + from app.auth.role_resolver import resolve_internal_roles + groups = [{"id": "engineers@x.com", "name": "Engineers"}] + assert resolve_internal_roles(groups, db_conn) == [] + + def test_resolves_single_mapping(self, db_conn): + from app.auth.role_resolver import resolve_internal_roles + from src.repositories.internal_roles import InternalRolesRepository + from src.repositories.group_mappings import GroupMappingsRepository + roles = InternalRolesRepository(db_conn) + mappings = GroupMappingsRepository(db_conn) + role_id = str(uuid.uuid4()) + roles.create(id=role_id, key="ctx_admin", display_name="Context Admin") + mappings.create( + id=str(uuid.uuid4()), + external_group_id="engineers@x.com", + internal_role_id=role_id, + assigned_by="admin@x.com", + ) + result = resolve_internal_roles( + [{"id": "engineers@x.com", "name": "Engineers"}], db_conn, + ) + assert result == ["ctx_admin"] + + def test_resolves_many_to_many(self, db_conn): + """Multiple external groups, multiple roles, with overlap — output + must be sorted + deduplicated.""" + from app.auth.role_resolver import resolve_internal_roles + from src.repositories.internal_roles import InternalRolesRepository + from src.repositories.group_mappings import GroupMappingsRepository + roles = InternalRolesRepository(db_conn) + mappings = GroupMappingsRepository(db_conn) + ctx_id = str(uuid.uuid4()) + agent_id = str(uuid.uuid4()) + roles.create(id=ctx_id, key="ctx_admin", display_name="C") + roles.create(id=agent_id, key="agent_operator", display_name="A") + # engineers → ctx_admin AND agent_operator + mappings.create( + id=str(uuid.uuid4()), external_group_id="eng@x", internal_role_id=ctx_id, + ) + mappings.create( + id=str(uuid.uuid4()), external_group_id="eng@x", internal_role_id=agent_id, + ) + # admins → ctx_admin (overlap with engineers) + mappings.create( + id=str(uuid.uuid4()), external_group_id="admins@x", internal_role_id=ctx_id, + ) + result = resolve_internal_roles( + [{"id": "eng@x", "name": "E"}, {"id": "admins@x", "name": "A"}], + db_conn, + ) + assert result == ["agent_operator", "ctx_admin"] # sorted, deduped + + def test_ignores_malformed_external_group_entries(self, db_conn): + """Defensive: a stray non-dict or missing-id entry shouldn't crash + the resolver — those just get skipped.""" + from app.auth.role_resolver import resolve_internal_roles + result = resolve_internal_roles( + ["not-a-dict", {"name": "no-id"}, {"id": ""}], # type: ignore[list-item] + db_conn, + ) + assert result == [] + + +class TestRequireInternalRole: + """End-to-end via LOCAL_DEV_MODE + LOCAL_DEV_GROUPS: dev user with a + mapped external group passes the gate; without the mapping, 403.""" + + @pytest.fixture + def dev_app_with_mapping(self, tmp_path, monkeypatch): + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!") + monkeypatch.setenv("SESSION_SECRET", "test-session-secret-32chars-minimum!!") + monkeypatch.setenv("LOCAL_DEV_MODE", "1") + monkeypatch.setenv("LOCAL_DEV_USER_EMAIL", "dev@localhost") + monkeypatch.setenv( + "LOCAL_DEV_GROUPS", + '[{"id":"engineers@example.com","name":"Engineers"}]', + ) + # Register a role + map external group → role BEFORE create_app() so + # the startup sync picks it up and the resolver finds the mapping on + # the first request. + from app.auth.role_resolver import register_internal_role + register_internal_role("ctx_admin", display_name="Context Admin") + + from src.db import get_system_db + conn = get_system_db() + try: + from app.auth.role_resolver import sync_registered_roles_to_db + sync_registered_roles_to_db(conn) + from src.repositories.internal_roles import InternalRolesRepository + from src.repositories.group_mappings import GroupMappingsRepository + role = InternalRolesRepository(conn).get_by_key("ctx_admin") + GroupMappingsRepository(conn).create( + id=str(uuid.uuid4()), + external_group_id="engineers@example.com", + internal_role_id=role["id"], + assigned_by="setup", + ) + finally: + conn.close() + + from app.main import create_app + from fastapi import Depends, FastAPI + from app.auth.role_resolver import require_internal_role + + app = create_app() + # Attach two probe endpoints — one gated by ctx_admin, one by a role + # the dev user does NOT hold. + @app.get("/_test/needs-ctx") + async def needs_ctx(user: dict = Depends(require_internal_role("ctx_admin"))): + return {"ok": True, "email": user["email"]} + + @app.get("/_test/needs-other") + async def needs_other(user: dict = Depends(require_internal_role("never_granted"))): + return {"ok": True} + + return TestClient(app) + + def test_grants_access_when_mapped_role_present(self, dev_app_with_mapping): + resp = dev_app_with_mapping.get("/_test/needs-ctx") + assert resp.status_code == 200 + assert resp.json() == {"ok": True, "email": "dev@localhost"} + + def test_denies_access_when_role_missing(self, dev_app_with_mapping): + resp = dev_app_with_mapping.get("/_test/needs-other") + assert resp.status_code == 403 + assert "never_granted" in resp.json()["detail"] + + def test_session_internal_roles_populated(self, dev_app_with_mapping): + """Direct session inspection — the resolver wrote the resolved role + keys into session.internal_roles, decoupled from any HTML template.""" + # Hit any auth-required endpoint to trigger the resolver. + dev_app_with_mapping.get("/_test/needs-ctx") + from itsdangerous import TimestampSigner + import base64, json as _json + cookie = dev_app_with_mapping.cookies.get("session") + assert cookie, "session cookie missing" + signer = TimestampSigner(os.environ["SESSION_SECRET"]) + unsigned = signer.unsign(cookie, max_age=14 * 24 * 3600) + payload = _json.loads(base64.b64decode(unsigned)) + assert payload.get("internal_roles") == ["ctx_admin"] + + def test_stale_session_keeps_old_roles_after_mapping_change(self, dev_app_with_mapping): + """KNOWN LIMITATION (documented in docs/internal-roles.md → Resolution + timing): roles are resolved at sign-in only. If an admin revokes a + mapping mid-session, the user keeps the cached role keys until they + log out + back in. This test pins that behavior so any future cache + invalidation pathway (admin UI broadcast, deactivate-then-reactivate + side-effect) is a deliberate change, not an accident.""" + # First request — dev-bypass populates session.internal_roles=["ctx_admin"]. + resp1 = dev_app_with_mapping.get("/_test/needs-ctx") + assert resp1.status_code == 200 + + # Admin revokes the mapping out-of-band. + from src.db import get_system_db + from src.repositories.group_mappings import GroupMappingsRepository + from src.repositories.internal_roles import InternalRolesRepository + conn = get_system_db() + try: + role = InternalRolesRepository(conn).get_by_key("ctx_admin") + existing = GroupMappingsRepository(conn).list_by_role(role["id"]) + for m in existing: + GroupMappingsRepository(conn).delete(m["id"]) + finally: + conn.close() + + # Second request — session still holds the cached role; gate still passes. + # The dev-bypass write-skip path (groups_changed=False AND + # internal_roles already in session) keeps the session value intact, + # mirroring the OAuth flow where session lives until logout. + resp2 = dev_app_with_mapping.get("/_test/needs-ctx") + assert resp2.status_code == 200, ( + "Stale-session contract broken: revoking a mapping must NOT " + "drop access mid-session today. If this assertion starts " + "failing, decide deliberately whether you've added " + "invalidation (good — update the doc) or introduced a " + "regression that double-resolves on every request (bad)." + ) + + def test_pat_caller_gets_pat_specific_403_detail(self): + """Bearer/PAT requests don't carry session-resolved roles + (session middleware exists but the OAuth callback is the only + writer of session.internal_roles). require_internal_role must + fail closed AND surface a PAT-specific message so an API + consumer hitting the wall sees what to fix instead of a + generic 'missing role' from a token they thought was admin.""" + from unittest.mock import MagicMock + import asyncio + from fastapi import HTTPException + from app.auth.role_resolver import require_internal_role + + # PAT request shape: session middleware ran (session attribute exists), + # but OAuth callback never fired (no "internal_roles" key in dict). + request = MagicMock() + request.session = {} # empty — no "internal_roles" key + + check = require_internal_role("ctx_admin") + with pytest.raises(HTTPException) as exc_info: + asyncio.run(check(request=request, user={"email": "pat@example.com"})) + + assert exc_info.value.status_code == 403 + # The detail spells out the PAT/Bearer caveat, not just the missing role. + detail = exc_info.value.detail + assert "ctx_admin" in detail + assert "Bearer" in detail or "PAT" in detail + assert "session" in detail.lower() + + def test_oauth_pipeline_groups_to_internal_roles(self, db_conn): + """End-to-end data flow: fake _fetch_google_groups output (the + only Cloud Identity touchpoint) → join against group_mappings → + internal_roles list. The OAuth handshake itself isn't exercised + here — its failure modes live in _fetch_google_groups, which + has its own coverage. This test pins the resolver as the + contract between 'whatever Google returned' and + 'session.internal_roles'.""" + from app.auth.role_resolver import ( + register_internal_role, + sync_registered_roles_to_db, + resolve_internal_roles, + ) + from src.repositories.internal_roles import InternalRolesRepository + from src.repositories.group_mappings import GroupMappingsRepository + + register_internal_role("ctx_admin", display_name="Context Admin") + register_internal_role("agent_op", display_name="Agent Operator") + sync_registered_roles_to_db(db_conn) + + ctx = InternalRolesRepository(db_conn).get_by_key("ctx_admin") + agent = InternalRolesRepository(db_conn).get_by_key("agent_op") + gm = GroupMappingsRepository(db_conn) + gm.create( + id=str(uuid.uuid4()), + external_group_id="engineers@example.com", + internal_role_id=ctx["id"], + ) + gm.create( + id=str(uuid.uuid4()), + external_group_id="ops@example.com", + internal_role_id=agent["id"], + ) + + # Simulate Google's response: two mapped groups + one unrelated. + google_groups = [ + {"id": "engineers@example.com", "name": "Engineering"}, + {"id": "ops@example.com", "name": "Operations"}, + {"id": "marketing@example.com", "name": "Marketing"}, # unmapped + ] + result = resolve_internal_roles(google_groups, db_conn) + assert result == ["agent_op", "ctx_admin"] # sorted, deduped + + def test_dev_bypass_falls_back_to_empty_on_resolver_error( + self, tmp_path, monkeypatch + ): + """If resolve_internal_roles raises mid-request (corrupted DB, + schema mid-migration, transient lock), the dev-bypass path + catches and writes []. Auth must never break on resolver + infrastructure failures — same defensive contract as the OAuth + callback's try/except wrapper.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!") + monkeypatch.setenv("SESSION_SECRET", "test-session-secret-32chars-minimum!!") + monkeypatch.setenv("LOCAL_DEV_MODE", "1") + monkeypatch.setenv("LOCAL_DEV_USER_EMAIL", "dev@localhost") + monkeypatch.setenv( + "LOCAL_DEV_GROUPS", + '[{"id":"engineers@example.com","name":"Engineers"}]', + ) + # Patch the symbol on the module so the lazy import inside the + # dev-bypass branch picks up the broken stub on call. + import app.auth.role_resolver as rr + + def boom(*_args, **_kwargs): + raise RuntimeError("simulated resolver failure") + + monkeypatch.setattr(rr, "resolve_internal_roles", boom) + + from app.main import create_app + from fastapi import Depends, FastAPI + from app.auth.dependencies import get_current_user + + app = create_app() + + @app.get("/_test/probe") + async def probe(user: dict = Depends(get_current_user)): + return {"email": user["email"]} + + client = TestClient(app) + # Auth still succeeds — resolver failure must not 500/401 the request. + resp = client.get("/_test/probe") + assert resp.status_code == 200 + assert resp.json()["email"] == "dev@localhost"