From 6c36b2697948aa9c647e7b8ce45cd6cb7be27fbd Mon Sep 17 00:00:00 2001 From: Petr Simecek Date: Sun, 26 Apr 2026 23:49:10 +0200 Subject: [PATCH] =?UTF-8?q?release(0.11.3):=20internal=20roles=20+=20exter?= =?UTF-8?q?nal=E2=86=92internal=20group=20mapping=20(foundation)=20(#71)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(auth): internal roles + external→internal group mapping (foundation) Two-layer authorization model: external Cloud Identity groups (org-managed) get mapped onto internal Agnes-defined capabilities (app-managed) via an admin-curated many-to-many table. Per-request permission checks read off the session — no DB hit. Refresh requires re-login. Schema v8 — new tables: - internal_roles (id, key UNIQUE, display_name, description, owner_module, …) — app-defined capabilities like 'context_admin'. Modules self-register at import; the startup hook syncs the registry into this table (idempotent). - group_mappings (id, external_group_id, internal_role_id FK, …) — admin-managed bindings, UNIQUE(external_group_id, internal_role_id). app/auth/role_resolver.py — new module: - register_internal_role(key, display_name, description, owner_module) Module-author entry point. lower_snake_case key, immutable, validated. Same key + same fields = no-op (re-import safe); same key + different fields = ValueError so two modules can't silently overwrite each other. - sync_registered_roles_to_db(conn) — startup reconciliation. Inserts new keys, updates drifted metadata, never deletes (preserves mappings). - resolve_internal_roles(external_groups, conn) — joins group_mappings. Sorted, deduplicated role-key list. Plugged into google_callback + dev-bypass branch in get_current_user. - require_internal_role('key') — FastAPI dependency factory; reads session.internal_roles; 403 with explicit message when missing. Resolution runs at sign-in only (Google callback + LOCAL_DEV_GROUPS change in dev-bypass) — same semantics as session.google_groups. No admin UI yet; mappings created via repository directly until follow-up PR ships UI. 21 new tests in tests/test_role_resolver.py: register/list, idempotency, collision detection, key-format validation; sync insert/update/no-delete; resolve empty/single/many-to-many/malformed-input; e2e via LOCAL_DEV_GROUPS — gated endpoint allowed/denied + direct session-cookie inspection. Full sweep: 178/178 passed across auth + db + repo tests. (Two pre-existing test_catalog_export.py failures verified unrelated.) * fix(auth): polish review feedback — first-request dev populate + PAT doc Two follow-ups from a code-reviewer pass on the foundation commit before opening the PR: - Dev-bypass populates session["internal_roles"] on the first request after sign-in, not just when external groups change. The previous guard only resolved when groups_changed=True, which left a hole for the LOCAL_DEV_GROUPS=`""` (explicit empty) flow: target=[], current=None, neither write branch fires, internal_roles stays unset, and require_internal_role then 403s with no roles to check against. The OAuth callback writes session["internal_roles"] unconditionally on sign-in (even []); dev-bypass now matches that semantics. Adds a single-pass populate gated on the key being absent from the session, so subsequent same-state requests still no-op (cheap session lookup, no resolver call). - Document that internal roles are session-scoped and PAT/headless clients will get 403 from any require_internal_role(...) endpoint. Same constraint already applies to session.google_groups (PAT JWTs deliberately don't snapshot group memberships — they could change after issuance with no way to re-sign), but the doc didn't surface this — an operator pointing a CLI at a role-gated endpoint would see 403 with no clue why. New "PAT and headless requests" section spells out the constraint, the rationale, and the three escape valves (use users.role for the gate; route through OAuth; wait for the planned `da admin grant-role` CLI helper). 54 auth tests still pass locally (21 role-resolver + 33 existing auth-provider). * release(0.11.3): cut release for the internal-roles foundation Bumps pyproject.toml 0.11.2 → 0.11.3 and renames CHANGELOG's [Unreleased] section to [0.11.3] — 2026-04-26 (with a fresh empty [Unreleased] skeleton appended). Adds the matching [0.11.3] link reference at the bottom of CHANGELOG so the section heading renders as a hyperlink to the GitHub release page once the tag lands. The bullet itself is unchanged content; the rephrasing of "dev-bypass when external groups change" → "dev-bypass — populates on first request and whenever external groups change, mirroring the OAuth callback's always-write semantics" reflects the polish committed in d590579, plus the appended PAT/headless caveat pointing at the doc section that landed in the same polish pass. * fix(auth): address review feedback from Pavel — PAT-specific 403, audit logs, hardening Round-2 polish over the internal-roles foundation, addressing Pavel's review on PR #71. No behavior change for the happy path; tightens the safety rails and makes the failure modes self-explanatory. User-visible: - require_internal_role now distinguishes "no session" (Bearer/PAT caller) from "signed in but missing role" and surfaces a PAT-specific 403 detail in the first case ("This endpoint needs an interactive (OAuth) session — Bearer/PAT tokens do not carry session-resolved roles by design"). - docs/internal-roles.md documents deactivate+reactivate as the supported "force re-resolve now" lever for users that can't be made to log out. Internal hardening: - INFO-level audit log on every successful resolve (OAuth callback + dev-bypass) so a wrong-role complaint is debuggable from the log alone. - Startup warning when SESSION_SECRET is shorter than 32 chars, matching the existing JWT_SECRET_KEY gate — both HMAC surfaces sign trust-laden state (session.internal_roles, session.google_groups, JWTs). - _clear_registry_for_tests() now refuses to run unless TESTING=1 so a stray import path in production can't drop the registered capabilities. Tests: - 4 new tests in tests/test_role_resolver.py covering: stale-session contract after a mid-session mapping revoke (pin the documented limitation), PAT 403 detail wording, OAuth pipeline data flow from external groups to internal_roles, and the dev-bypass empty-list fallback when the resolver raises. CHANGELOG.md updated under [0.11.3] (### Changed + ### Internal). CLAUDE.md schema doc bumped from v7 to v8. --------- Co-authored-by: Claude --- CHANGELOG.md | 19 ++ CLAUDE.md | 2 +- app/auth/dependencies.py | 25 ++ app/auth/providers/google.py | 27 ++ app/auth/role_resolver.py | 226 +++++++++++++++ app/main.py | 33 +++ docs/internal-roles.md | 121 ++++++++ pyproject.toml | 2 +- src/db.py | 57 +++- src/repositories/group_mappings.py | 104 +++++++ src/repositories/internal_roles.py | 82 ++++++ tests/test_role_resolver.py | 435 +++++++++++++++++++++++++++++ 12 files changed, 1130 insertions(+), 3 deletions(-) create mode 100644 app/auth/role_resolver.py create mode 100644 docs/internal-roles.md create mode 100644 src/repositories/group_mappings.py create mode 100644 src/repositories/internal_roles.py create mode 100644 tests/test_role_resolver.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e635bd..5d36214 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,24 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C +## [0.11.3] — 2026-04-26 + +Authorization-foundation release — adds the internal-roles layer between Cloud Identity groups and per-module capability checks. Schema v8 migration; no admin UI yet (follow-up). + +### Added + +- **Internal roles + group mapping (foundation).** Schema v8 adds two tables: `internal_roles` (app-defined capabilities like `context_admin`, `agent_operator`, registered by Agnes modules at import time) and `group_mappings` (many-to-many bindings of Cloud Identity group IDs to internal role keys, managed by admins). New `app.auth.role_resolver` module exposes `register_internal_role(...)` for module authors, `sync_registered_roles_to_db(...)` (run once at startup, idempotent), `resolve_internal_roles(external_groups, conn)` (called at sign-in, writes resolved keys into `session["internal_roles"]`), and a `require_internal_role("…")` FastAPI dependency factory for permission checks. Resolution runs at sign-in (Google OAuth callback + dev-bypass — populates on first request and whenever external groups change, mirroring the OAuth callback's always-write semantics). No DB hit per request. Refresh requires re-login, same semantics as `session.google_groups`. **No admin UI yet** — mapping rows must be created via the repository directly until the management UI ships in a follow-up. PAT/headless clients carry no session and therefore cannot pass `require_internal_role` gates by design — `require_internal_role` distinguishes "signed-in but missing role" from "no session at all" and surfaces a PAT-specific 403 detail in the second case so an API consumer hitting the wall sees what to fix. See `docs/internal-roles.md` → *PAT and headless requests*. + +### Changed + +- `docs/internal-roles.md` documents `Admin → Users → deactivate then reactivate` as the supported "force re-resolve now" lever for users you can't get to log out (long-lived sessions, automated clients) — invalidates the existing session and forces a fresh sign-in on the next request. + +### Internal + +- INFO-level audit log on every successful resolve (OAuth callback + dev-bypass) so a "wrong role" complaint is debuggable from the log alone — admin can correlate "user X claims they lost access" with the resolver output without replaying the request. +- Startup warning when `SESSION_SECRET` is shorter than 32 chars, matching the existing `JWT_SECRET_KEY` gate. Both HMAC surfaces sign trust-laden state (`session.internal_roles`, `session.google_groups`, JWTs) — keeping the two gates consistent so a weak secret gets surfaced at boot, not after a quiet downgrade. +- `_clear_registry_for_tests()` now refuses to run unless `TESTING=1` so a stray import path in production can't drop the registered capabilities. + ## [0.11.2] — 2026-04-26 Dev-experience patch release — make `LOCAL_DEV_MODE` realistic enough to actually exercise group-aware code paths on `localhost`, and consolidate scattered dev-onboarding instructions into a single `docs/local-development.md`. @@ -116,6 +134,7 @@ First tagged semver release. The `version = "2.x"` strings that appeared in earl - Test suite expanded to 1357+ tests (4 layers — unit, integration, web smoke, journey). +[0.11.3]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.11.3 [0.11.2]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.11.2 [0.11.1]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.11.1 [0.11.0]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.11.0 diff --git a/CLAUDE.md b/CLAUDE.md index 8343c9d..2645cb1 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -232,7 +232,7 @@ Module sets `lifecycle { ignore_changes = [metadata_startup_script] }` on `googl ## Key Implementation Details ### DuckDB Schema (src/db.py) -- Schema v7 with auto-migration from v1→v2→v3→v4→v5→v6→v7 (v5 adds `users.active`, v6 adds `personal_access_tokens`, v7 adds `personal_access_tokens.last_used_ip`) +- Schema v8 with auto-migration v1→…→v8 (v5 adds `users.active`, v6 adds `personal_access_tokens`, v7 adds `personal_access_tokens.last_used_ip`, v8 adds `internal_roles` + `group_mappings`) - `table_registry`: id, name, source_type, bucket, source_table, query_mode, sync_schedule, etc. - `sync_state`, `sync_history`: track extraction progress - `users`, `dataset_permissions`, `audit_log`: auth + RBAC diff --git a/app/auth/dependencies.py b/app/auth/dependencies.py index 7bcb726..3fd7644 100644 --- a/app/auth/dependencies.py +++ b/app/auth/dependencies.py @@ -145,12 +145,37 @@ async def get_current_user( if request is not None and hasattr(request, "session"): target_groups = get_local_dev_groups() current = request.session.get("google_groups") + groups_changed = False if target_groups and current != target_groups: request.session["google_groups"] = target_groups + groups_changed = True elif not target_groups and current: # Clear stale groups if the operator unsets LOCAL_DEV_GROUPS # mid-session — matches production's "always-write" semantics. request.session["google_groups"] = [] + groups_changed = True + # Populate internal_roles whenever it would otherwise be missing + # — first request after sign-in or any time groups changed. This + # mirrors the OAuth callback's unconditional write so a dev + # request never reaches require_internal_role with the key + # absent. Skipping when role list is already cached + groups + # didn't change keeps the per-request cost at a session lookup. + if groups_changed or "internal_roles" not in request.session: + try: + from app.auth.role_resolver import resolve_internal_roles + resolved = resolve_internal_roles(target_groups, conn) + request.session["internal_roles"] = resolved + logger.info( + "dev-bypass resolved %d internal role(s) for %s: %s", + len(resolved), + user.get("email", ""), + resolved or "", + ) + except Exception as e: + logger.warning( + "dev-bypass: resolve_internal_roles failed: %s", e, + ) + request.session["internal_roles"] = [] return user # Fall through to normal auth if seed missing — surfaces the bug instead of hiding it. diff --git a/app/auth/providers/google.py b/app/auth/providers/google.py index b0a92ed..cce0dcf 100644 --- a/app/auth/providers/google.py +++ b/app/auth/providers/google.py @@ -193,6 +193,33 @@ async def google_callback(request: Request): else: request.session["google_groups"] = [] + # Resolve external groups into internal role keys at sign-in. Cached + # on the session for the lifetime of this login — refresh requires + # re-login, same as the google_groups list itself. + try: + from app.auth.role_resolver import resolve_internal_roles + from src.db import get_system_db + conn = get_system_db() + try: + resolved = resolve_internal_roles( + request.session.get("google_groups", []) or [], + conn, + ) + finally: + conn.close() + request.session["internal_roles"] = resolved + # INFO-level audit so a wrong-role complaint is debuggable from + # the log alone — admin can correlate "user X claims they lost + # access" with the resolver output without replaying the request. + logger.info( + "Resolved %d internal role(s) for %s: %s", + len(resolved), email, resolved or "", + ) + except Exception as e: + # Resolver errors must not break login — fall back to no roles. + logger.warning("Failed to resolve internal_roles for %s: %s", email, e) + request.session["internal_roles"] = [] + # Issue JWT jwt_token = create_access_token(user["id"], user["email"], user["role"]) diff --git a/app/auth/role_resolver.py b/app/auth/role_resolver.py new file mode 100644 index 0000000..fce6a7c --- /dev/null +++ b/app/auth/role_resolver.py @@ -0,0 +1,226 @@ +"""Internal-role registry, resolver, and FastAPI dependency factory. + +## Lifecycle + +1. **Module import** — each Agnes module declares its internal roles via + ``register_internal_role(...)``. The registry is module-level state, so + the registration happens once per process. +2. **App startup** — ``sync_registered_roles_to_db(conn)`` inserts any + newly-registered keys into ``internal_roles`` and refreshes the metadata + (display_name, description, owner_module) on existing rows. Idempotent. +3. **Sign-in** — ``resolve_internal_roles(external_groups, conn)`` joins + ``session.google_groups`` against ``group_mappings`` and writes the + resulting role-key list into ``session["internal_roles"]``. +4. **Request handling** — ``require_internal_role("context_admin")`` reads + the cached list off the session; no DB hit per request. + +## Refresh semantics + +Resolution happens at sign-in, so a user with a stale session keeps stale +roles after an admin changes a mapping. ``Logout → sign in again`` is the +only refresh path today — the same semantics as Google's group cache and +the existing ``session.google_groups`` flow. +""" + +from __future__ import annotations + +import logging +import os +import re +import uuid +from dataclasses import dataclass +from typing import Optional + +import duckdb +from fastapi import Depends, HTTPException, Request, status + +from app.auth.dependencies import get_current_user +from src.repositories.group_mappings import GroupMappingsRepository +from src.repositories.internal_roles import InternalRolesRepository + +logger = logging.getLogger(__name__) + + +_ROLE_KEY_RE = re.compile(r"^[a-z][a-z0-9_]{0,63}$") + + +@dataclass(frozen=True) +class InternalRoleSpec: + """Module-side declaration of an internal role. + + Mirrors the persisted shape minus ``id`` (assigned at sync time) and + timestamps. Frozen so a stray mutation can't desync registry from DB. + """ + key: str + display_name: str + description: str = "" + owner_module: Optional[str] = None + + +# Module-level registry. Populated by register_internal_role() at import time; +# drained by sync_registered_roles_to_db() at app startup. Kept module-level +# (not class-state) because role registration is conceptually per-process. +_REGISTRY: dict[str, InternalRoleSpec] = {} + + +def register_internal_role( + key: str, + *, + display_name: str, + description: str = "", + owner_module: Optional[str] = None, +) -> None: + """Declare an internal role at module-import time. + + ``key`` is the immutable identifier referenced from code (e.g. + ``"context_admin"``); must match ``[a-z][a-z0-9_]{0,63}``. Calling twice + with the same key + same fields is a no-op (re-import safe). Calling + twice with conflicting fields raises ``ValueError`` — that almost always + means two modules picked the same key, which would leave admins unable + to tell which capability they're granting in the mapping UI. + """ + if not _ROLE_KEY_RE.match(key): + raise ValueError( + f"Invalid internal role key {key!r}: must be lower_snake_case, " + f"start with a letter, max 64 chars." + ) + spec = InternalRoleSpec( + key=key, + display_name=display_name, + description=description, + owner_module=owner_module, + ) + existing = _REGISTRY.get(key) + if existing is not None and existing != spec: + raise ValueError( + f"Internal role {key!r} already registered with different fields " + f"(existing={existing}, new={spec}). Pick a unique key." + ) + _REGISTRY[key] = spec + + +def list_registered_roles() -> list[InternalRoleSpec]: + """Snapshot of the current registry — sorted by key for stable output.""" + return sorted(_REGISTRY.values(), key=lambda s: s.key) + + +def _clear_registry_for_tests() -> None: + """Reset the module-level registry. Tests only — never call from app code. + + Refuses to run unless ``TESTING=1`` so a stray import-path in production + can't accidentally drop the registered capabilities. Pytest sets this + via conftest / pytest.ini; production never does. + """ + if os.environ.get("TESTING", "").lower() not in ("1", "true"): + raise RuntimeError( + "_clear_registry_for_tests() called outside of TESTING — " + "this drops every registered internal role and is never safe " + "in app code. Set TESTING=1 if you really mean this.", + ) + _REGISTRY.clear() + + +def sync_registered_roles_to_db(conn: duckdb.DuckDBPyConnection) -> None: + """Reconcile registered roles into ``internal_roles``. Idempotent. + + Inserts new keys, updates display_name/description/owner_module for + existing keys when they've changed. Never deletes — a role disappearing + from code may just mean the module was unloaded; the DB row keeps the + mappings safe until an admin explicitly removes it. + """ + repo = InternalRolesRepository(conn) + inserted = 0 + updated = 0 + for spec in _REGISTRY.values(): + existing = repo.get_by_key(spec.key) + if existing is None: + repo.create( + id=str(uuid.uuid4()), + key=spec.key, + display_name=spec.display_name, + description=spec.description, + owner_module=spec.owner_module, + ) + inserted += 1 + else: + drift = ( + existing.get("display_name") != spec.display_name + or (existing.get("description") or "") != spec.description + or (existing.get("owner_module") or None) != spec.owner_module + ) + if drift: + repo.update( + id=existing["id"], + display_name=spec.display_name, + description=spec.description, + owner_module=spec.owner_module, + ) + updated += 1 + if inserted or updated: + logger.info( + "internal_roles sync: %d inserted, %d updated, %d total registered", + inserted, updated, len(_REGISTRY), + ) + + +def resolve_internal_roles( + external_groups: list[dict], + conn: duckdb.DuckDBPyConnection, +) -> list[str]: + """Map ``session.google_groups`` to internal role keys via ``group_mappings``. + + Pure read of the mapping table — never mutates state. Returns a sorted, + de-duplicated list of role keys. Empty list when no external groups are + supplied or none of them are mapped. + """ + ids = [g["id"] for g in external_groups if isinstance(g, dict) and g.get("id")] + if not ids: + return [] + return GroupMappingsRepository(conn).resolve_role_keys(ids) + + +def require_internal_role(role_key: str): + """FastAPI dependency factory: 403 unless the user holds ``role_key``. + + Reads ``session["internal_roles"]`` populated at sign-in; no DB hit. + The ``user`` dependency runs first so we still 401 unauthenticated + requests with the standard message before checking role membership. + + PAT/headless callers carry no session-resolved roles (the OAuth callback + is the only writer of ``session["internal_roles"]``), so any + ``require_internal_role`` gate returns 403 for them by design — see + ``docs/internal-roles.md`` → *PAT and headless requests*. The 403 detail + distinguishes the two failure modes so an API consumer hitting it via + a token sees an actionable message instead of a generic "missing role". + """ + async def _check( + request: Request, + user: dict = Depends(get_current_user), + ) -> dict: + # "internal_roles in session" is the marker that this request went + # through a sign-in flow (OAuth callback or dev-bypass). Bearer-token + # callers leave the key absent — distinguish those from "signed in, + # but lacks this specific role" so the API consumer sees what to fix. + has_session_roles = ( + hasattr(request, "session") + and "internal_roles" in request.session + ) + roles = ( + request.session.get("internal_roles") or [] + if has_session_roles else [] + ) + if role_key not in roles: + if not has_session_roles: + detail = ( + f"Requires internal role '{role_key}'. This endpoint needs " + f"an interactive (OAuth) session — Bearer/PAT tokens do not " + f"carry session-resolved roles by design." + ) + else: + detail = f"Requires internal role '{role_key}'" + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=detail, + ) + return user + return _check diff --git a/app/main.py b/app/main.py index 52a63a7..f86a709 100644 --- a/app/main.py +++ b/app/main.py @@ -109,6 +109,16 @@ def create_app() -> FastAPI: # Session middleware (required for OAuth state) from app.secrets import get_session_secret session_secret = get_session_secret() + if len(session_secret) < 32: + # Same gate JWT applies (app/auth/jwt.py:_get_secret_key) — keeps the + # two HMAC surfaces consistent. session_internal_roles + google_groups + # are trusted off the cookie signature; a weak SESSION_SECRET means + # those gates are weak too. + import warnings as _warnings + _warnings.warn( + f"SESSION_SECRET is {len(session_secret)} chars — minimum 32 recommended", + UserWarning, stacklevel=2, + ) app.add_middleware(SessionMiddleware, secret_key=session_secret) # CORS for CLI and external clients @@ -210,6 +220,29 @@ def create_app() -> FastAPI: except Exception as e: logger.warning(f"Could not seed admin: {e}") + # Sync internal-role registry into DB. Modules call register_internal_role() + # at import time; this hook reconciles the registry into the internal_roles + # table so the mapping UI has something to show. Idempotent — safe to run + # on every startup. + try: + from app.auth.role_resolver import ( + sync_registered_roles_to_db, list_registered_roles, + ) + from src.db import get_system_db + conn = get_system_db() + try: + sync_registered_roles_to_db(conn) + finally: + conn.close() + registered = list_registered_roles() + if registered: + logger.info( + "internal_roles registered: %s", + ", ".join(s.key for s in registered), + ) + except Exception as e: + logger.warning("internal_roles sync failed at startup: %s", e) + # Static files static_dir = Path(__file__).parent / "web" / "static" if static_dir.exists(): diff --git a/docs/internal-roles.md b/docs/internal-roles.md new file mode 100644 index 0000000..6b88c32 --- /dev/null +++ b/docs/internal-roles.md @@ -0,0 +1,121 @@ +# Internal roles + external group mapping + +Two-layer authorization model for Agnes: + +- **External groups** — Cloud Identity / Google Workspace groups, pulled at sign-in into `session.google_groups`. Owned by the organization; Agnes only reads them. See `docs/auth-groups.md`. +- **Internal roles** — Agnes-defined capabilities (e.g. `context_admin`, `agent_operator`, `dataset_finance_reader`). Owned by Agnes. Registered in code by module authors, persisted in the `internal_roles` table. +- **Group mappings** — admin-managed many-to-many table binding external group IDs to internal role keys. The resolver joins this table at sign-in and writes the resolved role keys into `session["internal_roles"]`. + +Permission checks read off the session — no DB hit per request. + +## When to use which + +| You want to gate on … | Use … | +|---|---| +| "Is this user signed in at all?" | `Depends(get_current_user)` | +| "Coarse global role" (admin / analyst / viewer) | `Depends(require_admin)` / `Depends(require_role(Role.ANALYST))` — `users.role` column | +| "Specific module capability" | `Depends(require_internal_role("context_admin"))` — this doc | + +`users.role` stays the coarse gate for "may enter the building"; internal roles are the fine-grained per-module capabilities layered on top. + +## Module-author workflow (registering a role) + +In your module's import path (e.g. `services/context_engineering/__init__.py`): + +```python +from app.auth.role_resolver import register_internal_role + +register_internal_role( + "context_admin", + display_name="Context Engineering Admin", + description="Manages prompt templates and retrieval settings.", + owner_module="context_engineering", +) +``` + +Constraints on `key`: +- lower_snake_case, starts with a letter, ≤ 64 chars (`^[a-z][a-z0-9_]{0,63}$`) +- **immutable** — referenced from code; renaming would silently break every existing mapping. Pick carefully. +- registering the same key twice with the **same** fields is a no-op (re-import safe); registering with **different** fields raises `ValueError`. If two modules collide, one of them must rename. + +`register_internal_role` only populates the in-process registry. The startup hook in `app/main.py` calls `sync_registered_roles_to_db(conn)` to reconcile the registry into the `internal_roles` table: +- **Inserts** keys that don't exist yet +- **Updates** `display_name` / `description` / `owner_module` when they've drifted from code +- **Never deletes** — a role disappearing from code (module unloaded) keeps its DB row and any mappings until an admin explicitly removes it + +## Admin workflow (mapping external → internal) + +Until the management UI ships, mappings are created via repository directly: + +```python +from src.db import get_system_db +from src.repositories.group_mappings import GroupMappingsRepository +from src.repositories.internal_roles import InternalRolesRepository +import uuid + +conn = get_system_db() +role = InternalRolesRepository(conn).get_by_key("context_admin") +GroupMappingsRepository(conn).create( + id=str(uuid.uuid4()), + external_group_id="engineering@example.com", # Cloud Identity group ID + internal_role_id=role["id"], + assigned_by="admin@example.com", +) +conn.close() +``` + +After the mapping is created, affected users must **sign out and back in** for the resolver to pick it up — same refresh semantics as Google's group cache. + +If you can't get the user to log out (long-lived session, automated client), `Admin → Users → deactivate then reactivate` invalidates the existing session and forces a fresh sign-in on the next request. That is the supported "force re-resolve now" lever — there's no per-user role-cache invalidation API today. + +## Permission check (callsite) + +```python +from fastapi import Depends +from app.auth.role_resolver import require_internal_role + +@router.post("/context/templates") +async def update_template( + body: TemplateUpdate, + user: dict = Depends(require_internal_role("context_admin")), +): + ... +``` + +The dependency reads `session["internal_roles"]` (populated at sign-in); a missing role returns `403 Requires internal role 'context_admin'`. Unauthenticated requests still get `401` from the upstream `get_current_user` dependency. + +## Local development + +`LOCAL_DEV_GROUPS` mocks `session.google_groups` (see `docs/auth-groups.md` → *Local-dev mock*). The dev-bypass branch in `app/auth/dependencies.py` re-runs the resolver every time the mocked groups change, so editing `LOCAL_DEV_GROUPS` + hitting any auth-required endpoint refreshes `session["internal_roles"]` on the next request — no need to bounce the app. + +Typical dev setup: + +```bash +export LOCAL_DEV_MODE=1 +export LOCAL_DEV_GROUPS='[{"id":"engineering@example.com","name":"Engineering"}]' +# Register the role + create the mapping (one-shot script or manual SQL), +# then hit any protected endpoint — dev user now holds context_admin. +``` + +## PAT and headless requests + +Internal roles are **session-scoped only**. Personal Access Tokens (PAT) and other Bearer-token clients carry a JWT that proves identity but not a signed session cookie, so `session["internal_roles"]` is never populated for them. Concretely: any endpoint protected by `Depends(require_internal_role(…))` will return `403` for a PAT client even when the corresponding user's external groups would map to that role through a browser sign-in. + +This is intentional, not a bug — the same constraint already applies to `session.google_groups`, and PAT-issued JWTs deliberately don't snapshot that list (the user's group memberships can change after the token was issued without any way to re-sign the token). Two practical implications: + +- **Don't gate PAT-callable endpoints with `require_internal_role`.** Use `users.role` (`require_admin` / `require_role(Role.ANALYST)`) for the coarse check, or check the JWT claims directly. Internal roles fit OAuth-flow consumers (the web UI) and the dev bypass. +- **If you need a CLI/script to act with elevated capability**, the current escape valves are: (a) issue the PAT to a user whose `users.role` already covers it, (b) call the endpoint through the OAuth flow from a browser session, or (c) wait for the planned `da admin grant-role` CLI helper (see *Future work*) which will store an explicit per-user grant outside the group-mapping flow. + +## Resolution timing + +Resolver runs at sign-in only: +- Google OAuth callback (`app/auth/providers/google.py`) — after `_fetch_google_groups`, before issuing the JWT +- Dev-bypass branch (`app/auth/dependencies.py`) — when `LOCAL_DEV_GROUPS` value changes for a session + +Per-request reads are off `session["internal_roles"]` only; no DB hit. Trade-off: a user with a stale session keeps stale roles until they log out + back in. Same as `session.google_groups`. Cheaper than per-request DB lookup; matches the existing mental model. + +## Future work (not in this PR) + +- Admin UI under `/admin/role-mapping` — list registered roles + their current mappings, add/remove mappings, surface drift between registry and DB. +- Audit-log entries for mapping create/delete (write into `audit_log` with `action="role_mapping.created"` / `"role_mapping.deleted"`, `resource=f"mapping:{id}"`). +- Optional CLI helper `da admin grant-role ` for ad-hoc grants without going through external groups. diff --git a/pyproject.toml b/pyproject.toml index 0b2da7e..447f055 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agnes-the-ai-analyst" -version = "0.11.2" +version = "0.11.3" description = "Agnes — AI Data Analyst platform for AI analytical systems" requires-python = ">=3.11,<3.14" license = "MIT" diff --git a/src/db.py b/src/db.py index b051875..3b74db2 100644 --- a/src/db.py +++ b/src/db.py @@ -16,7 +16,7 @@ logger = logging.getLogger(__name__) _SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$") -SCHEMA_VERSION = 7 +SCHEMA_VERSION = 8 _SYSTEM_SCHEMA = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -221,6 +221,34 @@ CREATE TABLE IF NOT EXISTS personal_access_tokens ( last_used_ip VARCHAR, revoked_at TIMESTAMP ); + +-- Internal roles: app-defined capabilities (e.g. 'context_admin', 'agent_operator'). +-- `key` is the immutable lower_snake_case identifier referenced from code; modules +-- self-register their roles on import and the startup hook syncs the registry to +-- this table. Admins map external Cloud Identity groups onto these roles via +-- group_mappings — they don't create roles in the UI. +CREATE TABLE IF NOT EXISTS internal_roles ( + id VARCHAR PRIMARY KEY, + key VARCHAR UNIQUE NOT NULL, + display_name VARCHAR NOT NULL, + description TEXT, + owner_module VARCHAR, + created_at TIMESTAMP NOT NULL DEFAULT current_timestamp, + updated_at TIMESTAMP NOT NULL DEFAULT current_timestamp +); + +-- External-to-internal group mapping: which Cloud Identity groups grant which +-- internal role. Many-to-many. The resolver joins this table at sign-in and +-- writes the resulting role keys into session.internal_roles for cheap lookup +-- on subsequent requests. +CREATE TABLE IF NOT EXISTS group_mappings ( + id VARCHAR PRIMARY KEY, + external_group_id VARCHAR NOT NULL, + internal_role_id VARCHAR NOT NULL REFERENCES internal_roles(id), + assigned_at TIMESTAMP NOT NULL DEFAULT current_timestamp, + assigned_by VARCHAR, + UNIQUE (external_group_id, internal_role_id) +); """ @@ -432,6 +460,30 @@ _V6_TO_V7_MIGRATIONS = [ "ALTER TABLE personal_access_tokens ADD COLUMN IF NOT EXISTS last_used_ip VARCHAR", ] +_V7_TO_V8_MIGRATIONS = [ + """ + CREATE TABLE IF NOT EXISTS internal_roles ( + id VARCHAR PRIMARY KEY, + key VARCHAR UNIQUE NOT NULL, + display_name VARCHAR NOT NULL, + description TEXT, + owner_module VARCHAR, + created_at TIMESTAMP NOT NULL DEFAULT current_timestamp, + updated_at TIMESTAMP NOT NULL DEFAULT current_timestamp + ) + """, + """ + CREATE TABLE IF NOT EXISTS group_mappings ( + id VARCHAR PRIMARY KEY, + external_group_id VARCHAR NOT NULL, + internal_role_id VARCHAR NOT NULL REFERENCES internal_roles(id), + assigned_at TIMESTAMP NOT NULL DEFAULT current_timestamp, + assigned_by VARCHAR, + UNIQUE (external_group_id, internal_role_id) + ) + """, +] + _V3_TO_V4_MIGRATIONS = [ """ CREATE TABLE IF NOT EXISTS metric_definitions ( @@ -522,6 +574,9 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: if current < 7: for sql in _V6_TO_V7_MIGRATIONS: conn.execute(sql) + if current < 8: + for sql in _V7_TO_V8_MIGRATIONS: + conn.execute(sql) conn.execute( "UPDATE schema_version SET version = ?, applied_at = current_timestamp", [SCHEMA_VERSION], diff --git a/src/repositories/group_mappings.py b/src/repositories/group_mappings.py new file mode 100644 index 0000000..582ef47 --- /dev/null +++ b/src/repositories/group_mappings.py @@ -0,0 +1,104 @@ +"""Repository for external→internal group mappings. + +Each row binds a Cloud Identity group ID (as it appears in +``session.google_groups[*].id``) to an internal role. Many-to-many: a single +external group may grant several internal roles, and a single internal role +may be granted by several external groups. The resolver in +``app.auth.role_resolver`` reads this table at sign-in. +""" + +from typing import Any, Dict, List, Optional + +import duckdb + + +class GroupMappingsRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def _row_to_dict(self, row) -> Optional[Dict[str, Any]]: + if not row: + return None + columns = [desc[0] for desc in self.conn.description] + return dict(zip(columns, row)) + + def get_by_id(self, mapping_id: str) -> Optional[Dict[str, Any]]: + result = self.conn.execute( + "SELECT * FROM group_mappings WHERE id = ?", [mapping_id] + ).fetchone() + return self._row_to_dict(result) + + def list_all(self) -> List[Dict[str, Any]]: + """All mappings, joined with the role's `key` for display purposes.""" + results = self.conn.execute( + """SELECT m.*, r.key AS internal_role_key, r.display_name AS internal_role_display_name + FROM group_mappings m + JOIN internal_roles r ON r.id = m.internal_role_id + ORDER BY m.external_group_id, r.key""" + ).fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] + + def list_by_external_group(self, external_group_id: str) -> List[Dict[str, Any]]: + results = self.conn.execute( + """SELECT m.*, r.key AS internal_role_key + FROM group_mappings m + JOIN internal_roles r ON r.id = m.internal_role_id + WHERE m.external_group_id = ? + ORDER BY r.key""", + [external_group_id], + ).fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] + + def list_by_role(self, internal_role_id: str) -> List[Dict[str, Any]]: + results = self.conn.execute( + "SELECT * FROM group_mappings WHERE internal_role_id = ? " + "ORDER BY external_group_id", + [internal_role_id], + ).fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] + + def create( + self, + id: str, + external_group_id: str, + internal_role_id: str, + assigned_by: Optional[str] = None, + ) -> None: + self.conn.execute( + """INSERT INTO group_mappings + (id, external_group_id, internal_role_id, assigned_by) + VALUES (?, ?, ?, ?)""", + [id, external_group_id, internal_role_id, assigned_by], + ) + + def delete(self, mapping_id: str) -> None: + self.conn.execute("DELETE FROM group_mappings WHERE id = ?", [mapping_id]) + + def resolve_role_keys(self, external_group_ids: List[str]) -> List[str]: + """Map external group IDs to the set of internal role keys they grant. + + Returns a sorted, de-duplicated list — empty when ``external_group_ids`` + is empty or none of them are mapped. The resolver in + ``app.auth.role_resolver`` calls this on every sign-in. + """ + if not external_group_ids: + return [] + placeholders = ",".join(["?"] * len(external_group_ids)) + rows = self.conn.execute( + f"""SELECT DISTINCT r.key + FROM group_mappings m + JOIN internal_roles r ON r.id = m.internal_role_id + WHERE m.external_group_id IN ({placeholders}) + ORDER BY r.key""", + external_group_ids, + ).fetchall() + return [row[0] for row in rows] diff --git a/src/repositories/internal_roles.py b/src/repositories/internal_roles.py new file mode 100644 index 0000000..1632117 --- /dev/null +++ b/src/repositories/internal_roles.py @@ -0,0 +1,82 @@ +"""Repository for internal roles (app-defined capabilities). + +Internal roles are registered by Agnes modules at import-time via +``app.auth.role_resolver.register_internal_role`` and synced into this table +on startup. Admins map external Cloud Identity groups onto these roles via +``GroupMappingsRepository`` — they don't create roles in the UI. +""" + +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +import duckdb + + +class InternalRolesRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def _row_to_dict(self, row) -> Optional[Dict[str, Any]]: + if not row: + return None + columns = [desc[0] for desc in self.conn.description] + return dict(zip(columns, row)) + + def get_by_id(self, role_id: str) -> Optional[Dict[str, Any]]: + result = self.conn.execute( + "SELECT * FROM internal_roles WHERE id = ?", [role_id] + ).fetchone() + return self._row_to_dict(result) + + def get_by_key(self, key: str) -> Optional[Dict[str, Any]]: + result = self.conn.execute( + "SELECT * FROM internal_roles WHERE key = ?", [key] + ).fetchone() + return self._row_to_dict(result) + + def list_all(self) -> List[Dict[str, Any]]: + results = self.conn.execute( + "SELECT * FROM internal_roles ORDER BY key" + ).fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] + + def create( + self, + id: str, + key: str, + display_name: str, + description: Optional[str] = None, + owner_module: Optional[str] = None, + ) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO internal_roles + (id, key, display_name, description, owner_module, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + [id, key, display_name, description, owner_module, now, now], + ) + + def update(self, id: str, **kwargs) -> None: + # `key` is intentionally NOT in the allowlist — it's the immutable + # identifier referenced from code; a rename would silently break + # every group mapping pointing at it. Re-register under a new key + # and have an admin re-map. + allowed = {"display_name", "description", "owner_module"} + updates = {k: v for k, v in kwargs.items() if k in allowed} + if not updates: + return + updates["updated_at"] = datetime.now(timezone.utc) + set_clause = ", ".join(f"{k} = ?" for k in updates) + values = list(updates.values()) + [id] + self.conn.execute( + f"UPDATE internal_roles SET {set_clause} WHERE id = ?", values + ) + + def delete(self, role_id: str) -> None: + # Caller should delete dependent group_mappings first (no ON DELETE + # CASCADE on the FK) — surfaces dangling references instead of + # silently dropping mappings. + self.conn.execute("DELETE FROM internal_roles WHERE id = ?", [role_id]) diff --git a/tests/test_role_resolver.py b/tests/test_role_resolver.py new file mode 100644 index 0000000..1cbb541 --- /dev/null +++ b/tests/test_role_resolver.py @@ -0,0 +1,435 @@ +"""Tests for the internal-role registry, sync, resolver, and require dependency. + +Schema v8 adds ``internal_roles`` and ``group_mappings``; the resolver in +``app.auth.role_resolver`` is the integration point between Cloud Identity +groups (external) and Agnes-defined capabilities (internal). End-to-end +exercise rides on LOCAL_DEV_MODE + LOCAL_DEV_GROUPS so we don't need to +mock Google OAuth. +""" + +import os +import uuid + +import pytest +from fastapi.testclient import TestClient + + +@pytest.fixture +def db_conn(tmp_path, monkeypatch): + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + from src.db import get_system_db + conn = get_system_db() + yield conn + conn.close() + + +@pytest.fixture(autouse=True) +def _clear_role_registry(): + """Module-level _REGISTRY persists across tests in the same process — + flush before AND after each test so registrations from one test don't + leak into the next, regardless of which fixture ran first.""" + from app.auth.role_resolver import _clear_registry_for_tests + _clear_registry_for_tests() + yield + _clear_registry_for_tests() + + +class TestRegisterInternalRole: + def test_register_and_list(self): + from app.auth.role_resolver import ( + register_internal_role, list_registered_roles, + ) + register_internal_role( + "context_admin", + display_name="Context Admin", + description="Manages the context engineering module.", + owner_module="context_engineering", + ) + register_internal_role("agent_operator", display_name="Agent Operator") + keys = [s.key for s in list_registered_roles()] + assert keys == ["agent_operator", "context_admin"] # sorted + + def test_register_same_key_same_fields_is_idempotent(self): + """Re-importing a module shouldn't blow up — same key + same fields no-ops.""" + from app.auth.role_resolver import ( + register_internal_role, list_registered_roles, + ) + register_internal_role("x", display_name="X") + register_internal_role("x", display_name="X") + assert len(list_registered_roles()) == 1 + + def test_register_same_key_different_fields_raises(self): + """Two modules picking the same key would silently overwrite each + other's metadata — refuse and force one of them to rename.""" + from app.auth.role_resolver import register_internal_role + register_internal_role("x", display_name="X") + with pytest.raises(ValueError, match="already registered"): + register_internal_role("x", display_name="Different") + + @pytest.mark.parametrize("bad_key", [ + "Context_Admin", # uppercase + "1context", # leading digit + "context-admin", # hyphen + "", # empty + "context admin", # space + "x" * 65, # too long + ]) + def test_register_rejects_invalid_keys(self, bad_key): + from app.auth.role_resolver import register_internal_role + with pytest.raises(ValueError, match="Invalid internal role key"): + register_internal_role(bad_key, display_name="X") + + +class TestSyncRegisteredRolesToDb: + def test_inserts_new_roles(self, db_conn): + from app.auth.role_resolver import ( + register_internal_role, sync_registered_roles_to_db, + ) + from src.repositories.internal_roles import InternalRolesRepository + register_internal_role("ctx_admin", display_name="Context Admin") + sync_registered_roles_to_db(db_conn) + row = InternalRolesRepository(db_conn).get_by_key("ctx_admin") + assert row is not None + assert row["display_name"] == "Context Admin" + + def test_sync_is_idempotent(self, db_conn): + from app.auth.role_resolver import ( + register_internal_role, sync_registered_roles_to_db, + ) + register_internal_role("ctx_admin", display_name="Context Admin") + sync_registered_roles_to_db(db_conn) + sync_registered_roles_to_db(db_conn) # second call must not duplicate + rows = db_conn.execute( + "SELECT COUNT(*) FROM internal_roles WHERE key = 'ctx_admin'" + ).fetchone() + assert rows[0] == 1 + + def test_sync_updates_drifted_metadata(self, db_conn): + """Display name change in code should propagate to DB on next startup.""" + from app.auth.role_resolver import ( + register_internal_role, sync_registered_roles_to_db, + _clear_registry_for_tests, + ) + from src.repositories.internal_roles import InternalRolesRepository + register_internal_role("ctx_admin", display_name="Old Name") + sync_registered_roles_to_db(db_conn) + # Simulate a code update: clear the registry and re-register with new name. + _clear_registry_for_tests() + register_internal_role("ctx_admin", display_name="New Name") + sync_registered_roles_to_db(db_conn) + row = InternalRolesRepository(db_conn).get_by_key("ctx_admin") + assert row["display_name"] == "New Name" + + def test_sync_does_not_delete_unregistered_roles(self, db_conn): + """A role disappearing from code (module unloaded) keeps its DB row + + mappings until an admin explicitly removes it.""" + from app.auth.role_resolver import ( + register_internal_role, sync_registered_roles_to_db, + _clear_registry_for_tests, + ) + from src.repositories.internal_roles import InternalRolesRepository + register_internal_role("legacy_role", display_name="Legacy") + sync_registered_roles_to_db(db_conn) + _clear_registry_for_tests() # module no longer registers this role + sync_registered_roles_to_db(db_conn) + row = InternalRolesRepository(db_conn).get_by_key("legacy_role") + assert row is not None # still there + + +class TestResolveInternalRoles: + def test_returns_empty_when_no_external_groups(self, db_conn): + from app.auth.role_resolver import resolve_internal_roles + assert resolve_internal_roles([], db_conn) == [] + + def test_returns_empty_when_no_mappings(self, db_conn): + from app.auth.role_resolver import resolve_internal_roles + groups = [{"id": "engineers@x.com", "name": "Engineers"}] + assert resolve_internal_roles(groups, db_conn) == [] + + def test_resolves_single_mapping(self, db_conn): + from app.auth.role_resolver import resolve_internal_roles + from src.repositories.internal_roles import InternalRolesRepository + from src.repositories.group_mappings import GroupMappingsRepository + roles = InternalRolesRepository(db_conn) + mappings = GroupMappingsRepository(db_conn) + role_id = str(uuid.uuid4()) + roles.create(id=role_id, key="ctx_admin", display_name="Context Admin") + mappings.create( + id=str(uuid.uuid4()), + external_group_id="engineers@x.com", + internal_role_id=role_id, + assigned_by="admin@x.com", + ) + result = resolve_internal_roles( + [{"id": "engineers@x.com", "name": "Engineers"}], db_conn, + ) + assert result == ["ctx_admin"] + + def test_resolves_many_to_many(self, db_conn): + """Multiple external groups, multiple roles, with overlap — output + must be sorted + deduplicated.""" + from app.auth.role_resolver import resolve_internal_roles + from src.repositories.internal_roles import InternalRolesRepository + from src.repositories.group_mappings import GroupMappingsRepository + roles = InternalRolesRepository(db_conn) + mappings = GroupMappingsRepository(db_conn) + ctx_id = str(uuid.uuid4()) + agent_id = str(uuid.uuid4()) + roles.create(id=ctx_id, key="ctx_admin", display_name="C") + roles.create(id=agent_id, key="agent_operator", display_name="A") + # engineers → ctx_admin AND agent_operator + mappings.create( + id=str(uuid.uuid4()), external_group_id="eng@x", internal_role_id=ctx_id, + ) + mappings.create( + id=str(uuid.uuid4()), external_group_id="eng@x", internal_role_id=agent_id, + ) + # admins → ctx_admin (overlap with engineers) + mappings.create( + id=str(uuid.uuid4()), external_group_id="admins@x", internal_role_id=ctx_id, + ) + result = resolve_internal_roles( + [{"id": "eng@x", "name": "E"}, {"id": "admins@x", "name": "A"}], + db_conn, + ) + assert result == ["agent_operator", "ctx_admin"] # sorted, deduped + + def test_ignores_malformed_external_group_entries(self, db_conn): + """Defensive: a stray non-dict or missing-id entry shouldn't crash + the resolver — those just get skipped.""" + from app.auth.role_resolver import resolve_internal_roles + result = resolve_internal_roles( + ["not-a-dict", {"name": "no-id"}, {"id": ""}], # type: ignore[list-item] + db_conn, + ) + assert result == [] + + +class TestRequireInternalRole: + """End-to-end via LOCAL_DEV_MODE + LOCAL_DEV_GROUPS: dev user with a + mapped external group passes the gate; without the mapping, 403.""" + + @pytest.fixture + def dev_app_with_mapping(self, tmp_path, monkeypatch): + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!") + monkeypatch.setenv("SESSION_SECRET", "test-session-secret-32chars-minimum!!") + monkeypatch.setenv("LOCAL_DEV_MODE", "1") + monkeypatch.setenv("LOCAL_DEV_USER_EMAIL", "dev@localhost") + monkeypatch.setenv( + "LOCAL_DEV_GROUPS", + '[{"id":"engineers@example.com","name":"Engineers"}]', + ) + # Register a role + map external group → role BEFORE create_app() so + # the startup sync picks it up and the resolver finds the mapping on + # the first request. + from app.auth.role_resolver import register_internal_role + register_internal_role("ctx_admin", display_name="Context Admin") + + from src.db import get_system_db + conn = get_system_db() + try: + from app.auth.role_resolver import sync_registered_roles_to_db + sync_registered_roles_to_db(conn) + from src.repositories.internal_roles import InternalRolesRepository + from src.repositories.group_mappings import GroupMappingsRepository + role = InternalRolesRepository(conn).get_by_key("ctx_admin") + GroupMappingsRepository(conn).create( + id=str(uuid.uuid4()), + external_group_id="engineers@example.com", + internal_role_id=role["id"], + assigned_by="setup", + ) + finally: + conn.close() + + from app.main import create_app + from fastapi import Depends, FastAPI + from app.auth.role_resolver import require_internal_role + + app = create_app() + # Attach two probe endpoints — one gated by ctx_admin, one by a role + # the dev user does NOT hold. + @app.get("/_test/needs-ctx") + async def needs_ctx(user: dict = Depends(require_internal_role("ctx_admin"))): + return {"ok": True, "email": user["email"]} + + @app.get("/_test/needs-other") + async def needs_other(user: dict = Depends(require_internal_role("never_granted"))): + return {"ok": True} + + return TestClient(app) + + def test_grants_access_when_mapped_role_present(self, dev_app_with_mapping): + resp = dev_app_with_mapping.get("/_test/needs-ctx") + assert resp.status_code == 200 + assert resp.json() == {"ok": True, "email": "dev@localhost"} + + def test_denies_access_when_role_missing(self, dev_app_with_mapping): + resp = dev_app_with_mapping.get("/_test/needs-other") + assert resp.status_code == 403 + assert "never_granted" in resp.json()["detail"] + + def test_session_internal_roles_populated(self, dev_app_with_mapping): + """Direct session inspection — the resolver wrote the resolved role + keys into session.internal_roles, decoupled from any HTML template.""" + # Hit any auth-required endpoint to trigger the resolver. + dev_app_with_mapping.get("/_test/needs-ctx") + from itsdangerous import TimestampSigner + import base64, json as _json + cookie = dev_app_with_mapping.cookies.get("session") + assert cookie, "session cookie missing" + signer = TimestampSigner(os.environ["SESSION_SECRET"]) + unsigned = signer.unsign(cookie, max_age=14 * 24 * 3600) + payload = _json.loads(base64.b64decode(unsigned)) + assert payload.get("internal_roles") == ["ctx_admin"] + + def test_stale_session_keeps_old_roles_after_mapping_change(self, dev_app_with_mapping): + """KNOWN LIMITATION (documented in docs/internal-roles.md → Resolution + timing): roles are resolved at sign-in only. If an admin revokes a + mapping mid-session, the user keeps the cached role keys until they + log out + back in. This test pins that behavior so any future cache + invalidation pathway (admin UI broadcast, deactivate-then-reactivate + side-effect) is a deliberate change, not an accident.""" + # First request — dev-bypass populates session.internal_roles=["ctx_admin"]. + resp1 = dev_app_with_mapping.get("/_test/needs-ctx") + assert resp1.status_code == 200 + + # Admin revokes the mapping out-of-band. + from src.db import get_system_db + from src.repositories.group_mappings import GroupMappingsRepository + from src.repositories.internal_roles import InternalRolesRepository + conn = get_system_db() + try: + role = InternalRolesRepository(conn).get_by_key("ctx_admin") + existing = GroupMappingsRepository(conn).list_by_role(role["id"]) + for m in existing: + GroupMappingsRepository(conn).delete(m["id"]) + finally: + conn.close() + + # Second request — session still holds the cached role; gate still passes. + # The dev-bypass write-skip path (groups_changed=False AND + # internal_roles already in session) keeps the session value intact, + # mirroring the OAuth flow where session lives until logout. + resp2 = dev_app_with_mapping.get("/_test/needs-ctx") + assert resp2.status_code == 200, ( + "Stale-session contract broken: revoking a mapping must NOT " + "drop access mid-session today. If this assertion starts " + "failing, decide deliberately whether you've added " + "invalidation (good — update the doc) or introduced a " + "regression that double-resolves on every request (bad)." + ) + + def test_pat_caller_gets_pat_specific_403_detail(self): + """Bearer/PAT requests don't carry session-resolved roles + (session middleware exists but the OAuth callback is the only + writer of session.internal_roles). require_internal_role must + fail closed AND surface a PAT-specific message so an API + consumer hitting the wall sees what to fix instead of a + generic 'missing role' from a token they thought was admin.""" + from unittest.mock import MagicMock + import asyncio + from fastapi import HTTPException + from app.auth.role_resolver import require_internal_role + + # PAT request shape: session middleware ran (session attribute exists), + # but OAuth callback never fired (no "internal_roles" key in dict). + request = MagicMock() + request.session = {} # empty — no "internal_roles" key + + check = require_internal_role("ctx_admin") + with pytest.raises(HTTPException) as exc_info: + asyncio.run(check(request=request, user={"email": "pat@example.com"})) + + assert exc_info.value.status_code == 403 + # The detail spells out the PAT/Bearer caveat, not just the missing role. + detail = exc_info.value.detail + assert "ctx_admin" in detail + assert "Bearer" in detail or "PAT" in detail + assert "session" in detail.lower() + + def test_oauth_pipeline_groups_to_internal_roles(self, db_conn): + """End-to-end data flow: fake _fetch_google_groups output (the + only Cloud Identity touchpoint) → join against group_mappings → + internal_roles list. The OAuth handshake itself isn't exercised + here — its failure modes live in _fetch_google_groups, which + has its own coverage. This test pins the resolver as the + contract between 'whatever Google returned' and + 'session.internal_roles'.""" + from app.auth.role_resolver import ( + register_internal_role, + sync_registered_roles_to_db, + resolve_internal_roles, + ) + from src.repositories.internal_roles import InternalRolesRepository + from src.repositories.group_mappings import GroupMappingsRepository + + register_internal_role("ctx_admin", display_name="Context Admin") + register_internal_role("agent_op", display_name="Agent Operator") + sync_registered_roles_to_db(db_conn) + + ctx = InternalRolesRepository(db_conn).get_by_key("ctx_admin") + agent = InternalRolesRepository(db_conn).get_by_key("agent_op") + gm = GroupMappingsRepository(db_conn) + gm.create( + id=str(uuid.uuid4()), + external_group_id="engineers@example.com", + internal_role_id=ctx["id"], + ) + gm.create( + id=str(uuid.uuid4()), + external_group_id="ops@example.com", + internal_role_id=agent["id"], + ) + + # Simulate Google's response: two mapped groups + one unrelated. + google_groups = [ + {"id": "engineers@example.com", "name": "Engineering"}, + {"id": "ops@example.com", "name": "Operations"}, + {"id": "marketing@example.com", "name": "Marketing"}, # unmapped + ] + result = resolve_internal_roles(google_groups, db_conn) + assert result == ["agent_op", "ctx_admin"] # sorted, deduped + + def test_dev_bypass_falls_back_to_empty_on_resolver_error( + self, tmp_path, monkeypatch + ): + """If resolve_internal_roles raises mid-request (corrupted DB, + schema mid-migration, transient lock), the dev-bypass path + catches and writes []. Auth must never break on resolver + infrastructure failures — same defensive contract as the OAuth + callback's try/except wrapper.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!") + monkeypatch.setenv("SESSION_SECRET", "test-session-secret-32chars-minimum!!") + monkeypatch.setenv("LOCAL_DEV_MODE", "1") + monkeypatch.setenv("LOCAL_DEV_USER_EMAIL", "dev@localhost") + monkeypatch.setenv( + "LOCAL_DEV_GROUPS", + '[{"id":"engineers@example.com","name":"Engineers"}]', + ) + # Patch the symbol on the module so the lazy import inside the + # dev-bypass branch picks up the broken stub on call. + import app.auth.role_resolver as rr + + def boom(*_args, **_kwargs): + raise RuntimeError("simulated resolver failure") + + monkeypatch.setattr(rr, "resolve_internal_roles", boom) + + from app.main import create_app + from fastapi import Depends, FastAPI + from app.auth.dependencies import get_current_user + + app = create_app() + + @app.get("/_test/probe") + async def probe(user: dict = Depends(get_current_user)): + return {"email": user["email"]} + + client = TestClient(app) + # Auth still succeeds — resolver failure must not 500/401 the request. + resp = client.get("/_test/probe") + assert resp.status_code == 200 + assert resp.json()["email"] == "dev@localhost"