* feat(auth): v9 schema — unified role management foundation (WIP)
Tasks 1-5, 10 of the role-management-complete plan. Foundation only;
follow-up commits add REST API, CLI, UI, and tests.
Schema v9:
- user_role_grants table: direct user → internal_role mapping
(complementary to group_mappings). Drives PAT/headless auth and
persists across sessions. Source field tracks 'direct' vs auto-seed.
- internal_roles.implies (JSON): transitive role hierarchy. core.admin
implies core.km_admin → core.analyst → core.viewer. Resolver does BFS
expand at lookup time.
- internal_roles.is_core (BOOL): distinguishes seeded core.* hierarchy
from module-registered roles. UI renders them differently.
- v8→v9 migration: ADD COLUMN, CREATE TABLE, _seed_core_roles +
_backfill_users_role_to_grants, then NULL legacy users.role values.
DuckDB FK constraint blocks DROP COLUMN, so the column remains as a
deprecated artifact (UserRepository ignores it); the physical drop is deferred.
Resolver:
- Regex extended to allow dotted namespace (core.admin,
context_engineering.admin), max 64 chars total.
- expand_implies(role_keys, conn): BFS over implies JSON column.
- resolve_internal_roles signature gains optional user_id parameter;
unions group-mapping resolution with user_role_grants direct grants
before implies expansion.
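A minimal sketch of the BFS expansion described above, assuming the implies column decodes to a list of role keys per role. The in-memory `IMPLIES` dict and the name `expand_implies_sketch` are hypothetical stand-ins; the real expand_implies(role_keys, conn) reads the JSON column from the database.

```python
from collections import deque

# Hypothetical in-memory stand-in for the internal_roles.implies JSON column.
IMPLIES = {
    "core.admin": ["core.km_admin"],
    "core.km_admin": ["core.analyst"],
    "core.analyst": ["core.viewer"],
    "core.viewer": [],
}


def expand_implies_sketch(role_keys, implies=IMPLIES):
    """Return the sorted transitive closure of role_keys under implies."""
    seen = set(role_keys)
    queue = deque(role_keys)
    while queue:
        key = queue.popleft()
        for implied in implies.get(key, []):
            if implied not in seen:  # the seen-set also guards against cycles
                seen.add(implied)
                queue.append(implied)
    return sorted(seen)
```

Because the held keys are unioned first and expanded afterwards, a single core.admin grant is enough to satisfy a gate on any lower core.* level.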
require_internal_role:
- Two-path resolution: session cache (OAuth) → DB grants (PAT/headless
fallback). PAT clients now legitimately satisfy gates without the
OAuth round-trip, fixing the v8 limitation where every PAT-callable
admin endpoint needed require_role(Role.ADMIN) instead of
require_internal_role(...).
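The two-path check can be sketched roughly as follows. This is an illustration under assumptions, not the actual dependency: `has_internal_role_sketch` and the `load_granted_keys` callable (standing in for the user_role_grants lookup plus implies expansion) are hypothetical names.

```python
def has_internal_role_sketch(required, session, user, load_granted_keys):
    """Session cache first; on a miss, fall back to the user's DB grants."""
    # Path 1: OAuth sessions carry the role keys resolved at sign-in.
    if required in (session.get("internal_roles") or []):
        return True
    # Path 2: PAT/headless callers have no cached roles, so consult the DB.
    user_id = user.get("id")
    if user_id is None:
        return False  # closed by default: no identity, no grants
    return required in load_granted_keys(user_id)
```

A caller would turn a False result into an HTTP 403 that names the missing role.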
Backward-compat:
- require_role(Role.X) and require_admin become thin wrappers over
require_internal_role(f"core.{role}"). Implies hierarchy preserves the
legacy "at least this level" semantics automatically — no per-level
comparison code needed.
- src/rbac.py helpers (is_admin, has_role, get_user_role,
set_user_role, can_access_table, get_accessible_tables) all read from
the resolver via _get_internal_role_keys.
- UserRepository.create() and update() now mirror role changes into
user_role_grants via _grant_core_role helper. Preserves API while
making the new table the source of truth.
- UserRepository.delete() pre-deletes user_role_grants rows
(FK cascade — DuckDB doesn't auto-cascade).
- count_admins() reads user_role_grants ⨝ internal_roles instead of the
now-NULL users.role column.
First consumer:
- app/api/admin.py module-level docstring documents the v9 pattern for
future module authors. Existing require_role(Role.ADMIN) callsites
flow through the wrapper; no behavior change for OAuth callers, and
PAT callers gain access via direct grants.
Tests: full suite green (1396 passed, 6 skipped). Existing tests
exercise the new pathway transparently because UserRepository.create
auto-grants. New test_pat_caller_with_direct_grant_passes pins the
PAT-aware contract.
Schema: v9 (was v8). pyproject.toml + CHANGELOG bump deferred to the
final PR-prep commit.
* feat(auth): role management complete — REST API + CLI + UI + docs (v0.11.4)
Unifies the legacy users.role enum with the v8 internal-roles foundation under
one model with an implies hierarchy, ships an admin UI + REST API + CLI for
managing both group mappings and direct user grants, and makes
require_internal_role PAT-aware so that admin endpoints work uniformly across
OAuth and headless callers.
REST API (app/api/role_management.py, +496 LOC):
- 8 endpoints under /api/admin: internal-roles list, group-mappings CRUD,
users/{id}/role-grants CRUD, users/{id}/effective-roles debug.
- All gated by require_internal_role("core.admin"). Audit log on every
mutation (role_mapping.created/deleted, role_grant.created/deleted).
- Last-admin protection: refuse to delete the final core.admin grant
(mirrors users.py:count_admins protection).
- New UserRoleGrantsRepository in src/repositories/user_role_grants.py.
CLI (cli/commands/admin.py extension, +258 LOC):
- da admin role list / show <key>
- da admin mapping list / create <group-id> <role-key> / delete <id>
- da admin grant-role <email> <role-key>
- da admin revoke-role <email> <role-key>
- da admin effective-roles <email>
- Everything goes through typer + PAT auth, with a --json flag and tolerance for response-shape drift.
UI (admin_role_mapping.html + admin_user_detail.html + nav + user list):
- New page /admin/role-mapping: internal_roles read-only table +
group_mappings table with create/delete forms.
- New page /admin/users/{id}: core role single-select + capabilities
multi-checkbox + effective-roles debug (direct + group + expanded).
- Existing user list gets a "Detail" link to the new page.
- Nav link to /admin/role-mapping.
Tests: +85 new tests across 4 new files:
- test_schema_v9_migration.py (8): fresh install + v8→v9 backfill +
legacy column NULL semantics + unknown-role fallback + invariants.
- test_api_role_management.py (33): all 8 endpoints, happy + error
paths, audit-log assertions, last-admin protection.
- test_cli_admin_role.py (25 + 1 conditional): typer subcommands,
text + json output, PAT integration smoke.
- test_admin_role_mapping_ui.py (9) + test_admin_user_capabilities_ui.py (10):
page rendering, auth gating, form contracts, JS hooks.
Full suite: 1482 passed, 6 skipped (was 1396 → +86, no regressions).
Docs:
- docs/internal-roles.md complete rewrite: removed "no UI yet",
added hierarchy diagram, dual-path resolution, dotted-namespace
convention, admin workflow via UI/CLI/REST, refresh semantics
for group mappings vs direct grants, migration notes.
- CLAUDE.md schema v8 → v9.
- CHANGELOG.md [0.11.4] with a BREAKING marker for users.role NULL
semantics + complete Added/Changed/Removed/Internal sections.
- pyproject.toml: 0.11.3 → 0.11.4.
Sequencing: after this PR merges, Pabu rebases pabu/local-dev (PR #72)
onto main; its schema migrations shift from v9/v10/v11 to v10/v11/v12.
Implementation breakdown:
- Sequential (me): foundation tasks: schema v9, resolver, PAT-aware
require_internal_role, backward-compat wrappers, rbac refactor,
UserRepository auto-grant.
- Parallel sub-agents (3 worktrees, ~10 min): REST API, CLI, UI.
- Sequential (me): integration, docs/CHANGELOG/version, schema tests,
full-suite verification.
* fix(auth): address Devin review on PR #73 — three regressions
Three concrete bugs caught in Devin's PR review, all fixed in this commit.
1. **users.role hydration on read** (the big one):
v8→v9 migration NULLs users.role for every existing user, but a long
tail of read sites still inspect user["role"] directly:
- app/web/templates/_app_header.html:15 — admin nav gate
- app/web/templates/_app_header.html:36-37 — role badge in dropdown
- app/web/router.py:319-321 — UserInfo.is_admin/is_analyst/is_privileged
- app/web/router.py:489 — corporate memory is_km_admin
- app/api/catalog.py:54 — admin "see all tables" bypass
- app/api/sync.py:215 — admin "see all sync states" bypass
Without a fix, every existing admin loses the entire admin nav (and
API admin bypasses) immediately after upgrade — a serious regression.
Fix: new helper _hydrate_legacy_role() in app/auth/dependencies.py
maps the highest-level core.* grant back into user["role"] as the
legacy enum string. Called from get_current_user() on both auth paths
(LOCAL_DEV_MODE + JWT/PAT). Idempotent — skips when role is already
populated. Net effect: every pre-v9 callsite keeps working transparently
for both OAuth and PAT callers, with one extra DB round-trip per
authenticated request (same cost as the existing PAT-aware
require_internal_role fallback).
3 regression tests in tests/test_schema_v9_migration.py:
- test_hydration_recovers_role_from_user_role_grants
- test_hydration_returns_highest_grant (multi-grant → highest wins)
- test_hydration_falls_back_to_viewer_when_no_grants (safe fallback)
2. **CLI effective-roles TypeError**:
API returns direct/group as List[Dict] (RoleGrantResponse-shaped),
but the CLI did ', '.join(direct) which raises TypeError on dicts.
Tests masked it because mocks used bare string lists. Replaced
raw .join() with a _names() helper that extracts role_key from
each item, falling back to str() for legacy mock shapes.
3. **UI template field-name mismatch**:
admin_user_detail.html JS reads data.groups but the API serializes
the field as group (singular, per EffectiveRolesResponse pydantic).
Currently benign because the API always returns group:[], but the
field would silently disappear once the group-derived view is wired
up. Added data.group as the primary lookup, kept the legacy aliases
for shape-drift tolerance.
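The hydration mapping from item 1 can be sketched as below. It covers only the "highest core.* grant wins, viewer as fallback" rule; the function name, the `_LEGACY_ORDER` list, and the `granted_keys` argument are hypothetical, and the real _hydrate_legacy_role() resolves the grants from the database itself.

```python
# Legacy enum levels, lowest to highest (assumed from the core.* hierarchy).
_LEGACY_ORDER = ["viewer", "analyst", "km_admin", "admin"]


def hydrate_legacy_role_sketch(user, granted_keys):
    """Return a copy of user with "role" set to the highest core.* level held."""
    prefix = "core."
    levels = [
        key[len(prefix):]
        for key in granted_keys
        if key.startswith(prefix) and key[len(prefix):] in _LEGACY_ORDER
    ]
    # No core.* grant at all falls back to the least-privileged level.
    role = max(levels, key=_LEGACY_ORDER.index, default="viewer")
    return {**user, "role": role}
```

Module-registered keys such as ctx_admin are ignored here because they have no legacy enum equivalent.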
Full suite: 1485 passed (was 1482, +3 hydration tests), 6 skipped, no
regressions.
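For the CLI fix in item 2, a helper along these lines would tolerate both response shapes. This is a sketch under the assumption that each dict item carries a role_key field, as the commit text states; `names_sketch` is a hypothetical name, not the actual _names() helper.

```python
def names_sketch(items):
    """Extract role_key from dict-shaped items; fall back to str() otherwise."""
    return [
        str(item["role_key"]) if isinstance(item, dict) and "role_key" in item
        else str(item)
        for item in items
    ]


# Joining the raw API items fails because str.join only accepts strings.
api_items = [{"role_key": "core.admin"}, {"role_key": "ctx_admin"}]
try:
    ", ".join(api_items)
    raw_join_failed = False
except TypeError:
    raw_join_failed = True

joined = ", ".join(names_sketch(api_items))
```

Mocks that return the same dict shape as the API would have caught the original bug before review.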
* fix(auth): Devin review #2 + UX self-service + RBAC docs rename
Three threads landed in one commit because they share the same
auth/role surface and CHANGELOG entry.
Devin review #73 second round (2 actionable findings):
- _hydrate_legacy_role no longer short-circuits on truthy users.role.
The role-management endpoints (POST/DELETE /api/admin/users/{id}/
role-grants + the changeCoreRole UI flow) only mutate
user_role_grants — they don't update the legacy column. The early
return trusted that stale value, so a user downgraded via the new
REST/UI kept role="admin" in their dict on subsequent requests,
which fooled _is_admin_user_dict (src/rbac.py) and the catalog/sync
admin-bypass short-circuits into retaining elevated table access
even though require_internal_role correctly denied the API gates.
Always re-resolves now, making user_role_grants the single source
of truth on every authenticated request. Cost: one DB round-trip
per request — same as the existing PAT-aware fallback. Pinned by
test_hydration_ignores_stale_legacy_role_after_grant_revoke.
- Dev-bypass (app/auth/dependencies.py) and OAuth callback
(app/auth/providers/google.py) now pass user_id to
resolve_internal_roles so direct grants land in
session["internal_roles"] alongside group-mapped roles. Pre-fix,
every admin-gated request fell through to the per-request DB
fallback inside require_internal_role and the dev-bypass log line
read "resolved 0 internal role(s)" for an obviously-admin user.
test_session_internal_roles_populated updated to assert union.
User-visible UX (also addresses local-test feedback):
- HTTP 500 on /admin/users post-v8→v9 migration — UserResponse.role
is required str, but legacy users.role was NULL-ed by the
migration. _to_response in app/api/users.py now routes every dict
through _hydrate_legacy_role; same fix lifts the silent no-op of
last-admin protection in update_user/delete_user (the role-equality
short-circuits would skip the count_admins guard for migrated
admins). Three regression tests under TestAPIUsersPostMigration.
- /profile is now a real self-service detail page for *every*
signed-in user (not just admins). Three new server-side sections:
Effective roles (resolver output as chip cloud), Direct grants
(rows in user_role_grants with source label), Roles via groups
(which Cloud Identity / dev group grants which role for the
current user). Non-admins finally see *why* a feature is or isn't
accessible. Admins additionally see a deep-link to
/admin/users/{id} for editing their own grants.
- /admin/role-mapping group-id picker. New "Known groups" panel
above the create form: clickable chips for the calling admin's
own session.google_groups (tagged "your group") merged with
external_group_ids already used in existing mappings (tagged
"already mapped"). Click a chip → fills the form. Empty-state
copy points operators at LOCAL_DEV_GROUPS / Google sign-in
instead of leaving them to guess Cloud Identity opaque IDs from
memory.
Operational fixes:
- Scheduler log-noise: every cron tick produced a
POST /auth/token 401 because the auto-fetch fallback called the
endpoint with just an email (no password) and silently fell
through. Removed the broken path entirely. Operators set
SCHEDULER_API_TOKEN (long-lived PAT) in production; in
LOCAL_DEV_MODE the dev-bypass auto-authenticates the un-tokenized
request, so jobs continue to work.
Docs:
- docs/internal-roles.md → docs/RBAC.md (git mv preserves history).
Standard industry term, more discoverable for engineers grepping
for RBAC in a new repo. Restructured: Quickstart-by-role
(operator / end-user / module author), step-by-step
Module-author workflow with code examples (register key, gate
endpoint, declare implies, write contract test), naming pitfalls,
refresh semantics. CLAUDE.md gets a new
"Extensibility → RBAC" section pointing contributors at the doc
before they add gated endpoints. Cross-refs in app/api/admin.py
+ tests/test_role_resolver.py updated.
Tests: 293 in the auth/role/scheduler/UI test set passed, 0 regressions.
* fix(auth): Devin review #3 — login flows + RBAC docs
Two new findings on commit 7d1c048, both real and addressed.
Finding 1 (BUG, HTTP 500): every auth login flow loaded users via
UserRepository.get_by_email and passed user["role"] straight to
create_access_token, Pydantic response models, and _set_login_cookie
without going through _hydrate_legacy_role. Post-v9 the legacy column
is NULL for migrated users, and TokenResponse.role is a required str —
so POST /auth/token raised ValidationError → HTTP 500 for any v8-admin
trying to log in via password. Same root cause produced non-crashing
but semantically wrong JWTs (role: null) from Google OAuth, password
web flows, and email magic-link verification.
Fix: hydrate inline in every login flow before reading user["role"]:
- app/auth/router.py — POST /auth/token (the crash site)
- app/auth/providers/google.py — OAuth callback (was just stale JWT)
- app/auth/providers/password.py — 5 flows: JSON login, web login,
JSON setup, web reset confirm, web setup confirm
- app/auth/providers/email.py — centralized in _consume_token,
covers both /verify endpoints
New regression class TestAuthLoginFlowsPostMigration pins both the
no-crash and the correct-role contracts for all four legacy levels
(viewer/analyst/km_admin/admin) on POST /auth/token.
Finding 2 (DOCS): docs/RBAC.md showed register_internal_role() being
called with implies=[...], but the function signature is (key, *,
display_name, description, owner_module). A module author copying the
example would TypeError at import time. The implies field on
internal_roles IS honored at runtime by expand_implies, but the
registry-side write path (register_internal_role + InternalRoleSpec +
sync_registered_roles_to_db) doesn't exist yet — implies is currently
seeded only for the core.* hierarchy via _seed_core_roles in src/db.py.
Rewrote the Implies hierarchy and Module-author workflow sections to
document what's actually supported in 0.11.4 and what a future change
would need to add. The "for cross-module hierarchies, register each
level + grant both" pattern works today.
Tests: 322 in the auth/role/scheduler/UI/password test set passed,
0 regressions.
* fix(db): _seed_core_roles actually runs on every connect (Devin review #4)
Devin flagged that the docstring on `_seed_core_roles` promised per-connect
execution as a safety net for accidental DELETEs and in-code seed changes,
but the only call sites lived inside `if current < SCHEMA_VERSION:` — so
once a DB was on v9 the function never ran again, and the docstring lied.
Picked option (b) from the review (actually call it on every startup) over
option (a) (fix the docstring) because the safety net is genuinely useful:
- recovery from accidental admin DELETE on internal_roles,
- in-code _CORE_ROLES_SEED tweaks (display_name/description/implies)
ship without a manual SQL deploy,
- fresh installs and migrations stop needing their own seed call sites.
Tail call gated by `get_schema_version(conn) <= SCHEMA_VERSION` so the
future-version-is-noop rollback contract still holds — a v9 binary won't
touch a DB that's been upgraded past v9.
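The gated re-seed can be sketched as follows, using a plain dict in place of the internal_roles table. All names here (`ensure_core_roles_sketch`, `CORE_ROLES_SEED_SKETCH`, the `roles_table` dict) are hypothetical; the real code works against DuckDB through _seed_core_roles.

```python
SCHEMA_VERSION = 9

# Hypothetical stand-in for the in-code _CORE_ROLES_SEED data.
CORE_ROLES_SEED_SKETCH = {
    "core.admin": {"display_name": "Admin", "implies": ["core.km_admin"]},
    "core.viewer": {"display_name": "Viewer", "implies": []},
}


def ensure_core_roles_sketch(db_version, roles_table,
                             seed=CORE_ROLES_SEED_SKETCH,
                             binary_version=SCHEMA_VERSION):
    """Upsert the seed rows unless the DB is newer than this binary."""
    if db_version > binary_version:
        return False  # future-version DB: leave it untouched
    for key, fields in seed.items():
        # Re-create deleted rows and overwrite drifted seed fields.
        roles_table[key] = {**roles_table.get(key, {}), **fields}
    return True
```

Running this on every connect is what restores a deleted row or a changed display_name without a manual SQL step.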
Test coverage: new TestSeedCoreRolesSafetyNet class (3 tests) pins the
three contracts — deleted row re-seeds, mutated display_name re-syncs
from in-code seed, applied_at on schema_version doesn't churn on
already-current DBs. Existing TestMigrationSafety::test_future_version_is_noop
still passes (verified against the gating logic).
"""Tests for the internal-role registry, sync, resolver, and require dependency.

Schema v8 adds ``internal_roles`` and ``group_mappings``; the resolver in
``app.auth.role_resolver`` is the integration point between Cloud Identity
groups (external) and Agnes-defined capabilities (internal). End-to-end
exercise rides on LOCAL_DEV_MODE + LOCAL_DEV_GROUPS so we don't need to
mock Google OAuth.
"""

import os
import uuid

import pytest
from fastapi.testclient import TestClient


@pytest.fixture
def db_conn(tmp_path, monkeypatch):
    monkeypatch.setenv("DATA_DIR", str(tmp_path))
    from src.db import get_system_db
    conn = get_system_db()
    yield conn
    conn.close()


@pytest.fixture(autouse=True)
def _clear_role_registry():
    """Module-level _REGISTRY persists across tests in the same process —
    flush before AND after each test so registrations from one test don't
    leak into the next, regardless of which fixture ran first."""
    from app.auth.role_resolver import _clear_registry_for_tests
    _clear_registry_for_tests()
    yield
    _clear_registry_for_tests()


class TestRegisterInternalRole:
    def test_register_and_list(self):
        from app.auth.role_resolver import (
            register_internal_role, list_registered_roles,
        )
        register_internal_role(
            "context_admin",
            display_name="Context Admin",
            description="Manages the context engineering module.",
            owner_module="context_engineering",
        )
        register_internal_role("agent_operator", display_name="Agent Operator")
        keys = [s.key for s in list_registered_roles()]
        assert keys == ["agent_operator", "context_admin"]  # sorted

    def test_register_same_key_same_fields_is_idempotent(self):
        """Re-importing a module shouldn't blow up — same key + same fields no-ops."""
        from app.auth.role_resolver import (
            register_internal_role, list_registered_roles,
        )
        register_internal_role("x", display_name="X")
        register_internal_role("x", display_name="X")
        assert len(list_registered_roles()) == 1

    def test_register_same_key_different_fields_raises(self):
        """Two modules picking the same key would silently overwrite each
        other's metadata — refuse and force one of them to rename."""
        from app.auth.role_resolver import register_internal_role
        register_internal_role("x", display_name="X")
        with pytest.raises(ValueError, match="already registered"):
            register_internal_role("x", display_name="Different")

    @pytest.mark.parametrize("bad_key", [
        "Context_Admin",  # uppercase
        "1context",  # leading digit
        "context-admin",  # hyphen
        "",  # empty
        "context admin",  # space
        "x" * 65,  # too long
    ])
    def test_register_rejects_invalid_keys(self, bad_key):
        from app.auth.role_resolver import register_internal_role
        with pytest.raises(ValueError, match="Invalid internal role key"):
            register_internal_role(bad_key, display_name="X")


class TestSyncRegisteredRolesToDb:
    def test_inserts_new_roles(self, db_conn):
        from app.auth.role_resolver import (
            register_internal_role, sync_registered_roles_to_db,
        )
        from src.repositories.internal_roles import InternalRolesRepository
        register_internal_role("ctx_admin", display_name="Context Admin")
        sync_registered_roles_to_db(db_conn)
        row = InternalRolesRepository(db_conn).get_by_key("ctx_admin")
        assert row is not None
        assert row["display_name"] == "Context Admin"

    def test_sync_is_idempotent(self, db_conn):
        from app.auth.role_resolver import (
            register_internal_role, sync_registered_roles_to_db,
        )
        register_internal_role("ctx_admin", display_name="Context Admin")
        sync_registered_roles_to_db(db_conn)
        sync_registered_roles_to_db(db_conn)  # second call must not duplicate
        rows = db_conn.execute(
            "SELECT COUNT(*) FROM internal_roles WHERE key = 'ctx_admin'"
        ).fetchone()
        assert rows[0] == 1

    def test_sync_updates_drifted_metadata(self, db_conn):
        """Display name change in code should propagate to DB on next startup."""
        from app.auth.role_resolver import (
            register_internal_role, sync_registered_roles_to_db,
            _clear_registry_for_tests,
        )
        from src.repositories.internal_roles import InternalRolesRepository
        register_internal_role("ctx_admin", display_name="Old Name")
        sync_registered_roles_to_db(db_conn)
        # Simulate a code update: clear the registry and re-register with new name.
        _clear_registry_for_tests()
        register_internal_role("ctx_admin", display_name="New Name")
        sync_registered_roles_to_db(db_conn)
        row = InternalRolesRepository(db_conn).get_by_key("ctx_admin")
        assert row["display_name"] == "New Name"

    def test_sync_does_not_delete_unregistered_roles(self, db_conn):
        """A role disappearing from code (module unloaded) keeps its DB row +
        mappings until an admin explicitly removes it."""
        from app.auth.role_resolver import (
            register_internal_role, sync_registered_roles_to_db,
            _clear_registry_for_tests,
        )
        from src.repositories.internal_roles import InternalRolesRepository
        register_internal_role("legacy_role", display_name="Legacy")
        sync_registered_roles_to_db(db_conn)
        _clear_registry_for_tests()  # module no longer registers this role
        sync_registered_roles_to_db(db_conn)
        row = InternalRolesRepository(db_conn).get_by_key("legacy_role")
        assert row is not None  # still there


class TestResolveInternalRoles:
    def test_returns_empty_when_no_external_groups(self, db_conn):
        from app.auth.role_resolver import resolve_internal_roles
        assert resolve_internal_roles([], db_conn) == []

    def test_returns_empty_when_no_mappings(self, db_conn):
        from app.auth.role_resolver import resolve_internal_roles
        groups = [{"id": "engineers@x.com", "name": "Engineers"}]
        assert resolve_internal_roles(groups, db_conn) == []

    def test_resolves_single_mapping(self, db_conn):
        from app.auth.role_resolver import resolve_internal_roles
        from src.repositories.internal_roles import InternalRolesRepository
        from src.repositories.group_mappings import GroupMappingsRepository
        roles = InternalRolesRepository(db_conn)
        mappings = GroupMappingsRepository(db_conn)
        role_id = str(uuid.uuid4())
        roles.create(id=role_id, key="ctx_admin", display_name="Context Admin")
        mappings.create(
            id=str(uuid.uuid4()),
            external_group_id="engineers@x.com",
            internal_role_id=role_id,
            assigned_by="admin@x.com",
        )
        result = resolve_internal_roles(
            [{"id": "engineers@x.com", "name": "Engineers"}], db_conn,
        )
        assert result == ["ctx_admin"]

    def test_resolves_many_to_many(self, db_conn):
        """Multiple external groups, multiple roles, with overlap — output
        must be sorted + deduplicated."""
        from app.auth.role_resolver import resolve_internal_roles
        from src.repositories.internal_roles import InternalRolesRepository
        from src.repositories.group_mappings import GroupMappingsRepository
        roles = InternalRolesRepository(db_conn)
        mappings = GroupMappingsRepository(db_conn)
        ctx_id = str(uuid.uuid4())
        agent_id = str(uuid.uuid4())
        roles.create(id=ctx_id, key="ctx_admin", display_name="C")
        roles.create(id=agent_id, key="agent_operator", display_name="A")
        # engineers → ctx_admin AND agent_operator
        mappings.create(
            id=str(uuid.uuid4()), external_group_id="eng@x", internal_role_id=ctx_id,
        )
        mappings.create(
            id=str(uuid.uuid4()), external_group_id="eng@x", internal_role_id=agent_id,
        )
        # admins → ctx_admin (overlap with engineers)
        mappings.create(
            id=str(uuid.uuid4()), external_group_id="admins@x", internal_role_id=ctx_id,
        )
        result = resolve_internal_roles(
            [{"id": "eng@x", "name": "E"}, {"id": "admins@x", "name": "A"}],
            db_conn,
        )
        assert result == ["agent_operator", "ctx_admin"]  # sorted, deduped

    def test_ignores_malformed_external_group_entries(self, db_conn):
        """Defensive: a stray non-dict or missing-id entry shouldn't crash
        the resolver — those just get skipped."""
        from app.auth.role_resolver import resolve_internal_roles
        result = resolve_internal_roles(
            ["not-a-dict", {"name": "no-id"}, {"id": ""}],  # type: ignore[list-item]
            db_conn,
        )
        assert result == []


class TestRequireInternalRole:
|
|
"""End-to-end via LOCAL_DEV_MODE + LOCAL_DEV_GROUPS: dev user with a
|
|
mapped external group passes the gate; without the mapping, 403."""
|
|
|
|
@pytest.fixture
|
|
def dev_app_with_mapping(self, tmp_path, monkeypatch):
|
|
monkeypatch.setenv("DATA_DIR", str(tmp_path))
|
|
monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!")
|
|
monkeypatch.setenv("SESSION_SECRET", "test-session-secret-32chars-minimum!!")
|
|
monkeypatch.setenv("LOCAL_DEV_MODE", "1")
|
|
monkeypatch.setenv("LOCAL_DEV_USER_EMAIL", "dev@localhost")
|
|
monkeypatch.setenv(
|
|
"LOCAL_DEV_GROUPS",
|
|
'[{"id":"engineers@example.com","name":"Engineers"}]',
|
|
)
|
|
# Register a role + map external group → role BEFORE create_app() so
|
|
# the startup sync picks it up and the resolver finds the mapping on
|
|
# the first request.
|
|
from app.auth.role_resolver import register_internal_role
|
|
register_internal_role("ctx_admin", display_name="Context Admin")
|
|
|
|
from src.db import get_system_db
|
|
conn = get_system_db()
|
|
try:
|
|
from app.auth.role_resolver import sync_registered_roles_to_db
|
|
sync_registered_roles_to_db(conn)
|
|
from src.repositories.internal_roles import InternalRolesRepository
|
|
from src.repositories.group_mappings import GroupMappingsRepository
|
|
role = InternalRolesRepository(conn).get_by_key("ctx_admin")
|
|
GroupMappingsRepository(conn).create(
|
|
id=str(uuid.uuid4()),
|
|
external_group_id="engineers@example.com",
|
|
internal_role_id=role["id"],
|
|
assigned_by="setup",
|
|
)
|
|
finally:
|
|
conn.close()
|
|
|
|
from app.main import create_app
|
|
from fastapi import Depends, FastAPI
|
|
from app.auth.role_resolver import require_internal_role
|
|
|
|
app = create_app()
|
|
# Attach two probe endpoints — one gated by ctx_admin, one by a role
|
|
# the dev user does NOT hold.
|
|
@app.get("/_test/needs-ctx")
|
|
async def needs_ctx(user: dict = Depends(require_internal_role("ctx_admin"))):
|
|
return {"ok": True, "email": user["email"]}
|
|
|
|
@app.get("/_test/needs-other")
|
|
async def needs_other(user: dict = Depends(require_internal_role("never_granted"))):
|
|
return {"ok": True}
|
|
|
|
return TestClient(app)
|
|
|
|
def test_grants_access_when_mapped_role_present(self, dev_app_with_mapping):
|
|
resp = dev_app_with_mapping.get("/_test/needs-ctx")
|
|
assert resp.status_code == 200
|
|
assert resp.json() == {"ok": True, "email": "dev@localhost"}
|
|
|
|
def test_denies_access_when_role_missing(self, dev_app_with_mapping):
|
|
resp = dev_app_with_mapping.get("/_test/needs-other")
|
|
assert resp.status_code == 403
|
|
assert "never_granted" in resp.json()["detail"]
|
|
|
|
def test_session_internal_roles_populated(self, dev_app_with_mapping):
|
|
"""Direct session inspection — the resolver wrote the resolved role
|
|
keys into session.internal_roles, decoupled from any HTML template.
|
|
|
|
The cached set must include both sources after Devin review #73:
|
|
|
|
- **Group-mapped** ``ctx_admin`` (engineers@example.com → ctx_admin
|
|
via group_mappings).
|
|
- **Direct grants** for the dev user — seeded as admin by
|
|
app/main.py, so user_role_grants contains core.admin which
|
|
implies-expands to the full ``core.*`` hierarchy.
|
|
|
|
Pre-fix the dev-bypass and OAuth callback called
|
|
``resolve_internal_roles(groups, conn)`` *without* user_id, dropping
|
|
direct grants on the floor. Every admin-gated request then fell
|
|
through to the per-request DB fallback inside
|
|
``require_internal_role`` — functionally correct, but defeated the
|
|
session cache and surfaced confusing "dev-bypass resolved 0
|
|
internal role(s)" log lines for an obviously-admin user. The fix
|
|
passes user_id so the cache is authoritative on the first hit."""
|
|
# Hit any auth-required endpoint to trigger the resolver.
|
|
dev_app_with_mapping.get("/_test/needs-ctx")
|
|
from itsdangerous import TimestampSigner
|
|
import base64, json as _json
|
|
cookie = dev_app_with_mapping.cookies.get("session")
|
|
assert cookie, "session cookie missing"
|
|
signer = TimestampSigner(os.environ["SESSION_SECRET"])
|
|
unsigned = signer.unsign(cookie, max_age=14 * 24 * 3600)
|
|
payload = _json.loads(base64.b64decode(unsigned))
|
|
roles = set(payload.get("internal_roles") or [])
|
|
# Group-mapping path
|
|
assert "ctx_admin" in roles, (
|
|
"ctx_admin must come from the engineers@example.com group mapping"
|
|
)
|
|
# Direct-grant path (dev user is seeded as admin)
|
|
assert "core.admin" in roles, (
|
|
"core.admin must come from user_role_grants — Devin review #73 "
|
|
"regression: dev-bypass was previously dropping direct grants"
|
|
)
|
|
# Implies expansion runs after union
|
|
assert {"core.viewer", "core.analyst", "core.km_admin"}.issubset(roles), (
|
|
"implies expansion must include the full core.* hierarchy "
|
|
"below the held core.admin grant"
|
|
)
|
|
|
|
def test_stale_session_keeps_old_roles_after_mapping_change(self, dev_app_with_mapping):
|
|
"""KNOWN LIMITATION (documented in docs/RBAC.md → Resolution
|
|
timing): roles are resolved at sign-in only. If an admin revokes a
|
|
mapping mid-session, the user keeps the cached role keys until they
|
|
log out + back in. This test pins that behavior so any future cache
|
|
invalidation pathway (admin UI broadcast, deactivate-then-reactivate
|
|
side-effect) is a deliberate change, not an accident."""
|
|
# First request — dev-bypass populates session.internal_roles=["ctx_admin"].
|
|
resp1 = dev_app_with_mapping.get("/_test/needs-ctx")
|
|
assert resp1.status_code == 200
|
|
|
|
# Admin revokes the mapping out-of-band.
|
|
from src.db import get_system_db
|
|
from src.repositories.group_mappings import GroupMappingsRepository
|
|
from src.repositories.internal_roles import InternalRolesRepository
|
|
conn = get_system_db()
|
|
try:
|
|
role = InternalRolesRepository(conn).get_by_key("ctx_admin")
|
|
existing = GroupMappingsRepository(conn).list_by_role(role["id"])
|
|
for m in existing:
|
|
GroupMappingsRepository(conn).delete(m["id"])
|
|
finally:
|
|
conn.close()
|
|
|
|
# Second request — session still holds the cached role; gate still passes.
|
|
# The dev-bypass write-skip path (groups_changed=False AND
|
|
# internal_roles already in session) keeps the session value intact,
|
|
# mirroring the OAuth flow where session lives until logout.
|
|
resp2 = dev_app_with_mapping.get("/_test/needs-ctx")
|
|
assert resp2.status_code == 200, (
|
|
"Stale-session contract broken: revoking a mapping must NOT "
|
|
"drop access mid-session today. If this assertion starts "
|
|
"failing, decide deliberately whether you've added "
|
|
"invalidation (good — update the doc) or introduced a "
|
|
"regression that double-resolves on every request (bad)."
|
|
)
|
|
|
|
    def test_pat_caller_with_direct_grant_passes(self, db_conn, monkeypatch):
        """v9 PAT-aware path: a user with a direct user_role_grants row
        passes require_internal_role even without session.internal_roles.
        This is the new admin-CLI-via-PAT contract: without it, all admin
        endpoints would 403 to PAT clients after the require_admin
        wrappers route through require_internal_role('core.admin')."""
        from unittest.mock import MagicMock
        import asyncio
        from app.auth.role_resolver import require_internal_role
        from src.repositories.users import UserRepository
        from src.repositories.internal_roles import InternalRolesRepository

        # UserRepository.create(role="admin") auto-grants core.admin in v9,
        # so an explicit grant insert here would violate the UNIQUE
        # (user_id, internal_role_id) constraint. Just create the user and verify.
        user_id = str(uuid.uuid4())
        UserRepository(db_conn).create(
            id=user_id, email="admin-pat@example.com", name="Admin PAT",
            role="admin",
        )

        # PAT-shape request: session middleware ran (attribute exists) but
        # there is no internal_roles key. Gate must consult DB and grant access.
        request = MagicMock()
        request.session = {}

        check = require_internal_role("core.admin")
        result = asyncio.run(check(
            request=request, user={"id": user_id, "email": "admin-pat@example.com"},
        ))
        assert result["id"] == user_id

    def test_pat_caller_without_grant_gets_403(self):
        """PAT/headless callers carry no session.internal_roles, but v9
        require_internal_role falls back to user_role_grants in DB. A PAT
        client whose user has no matching grant must still hit 403, not
        slip through. Pins the closed-by-default behavior of the new
        two-path resolver."""
        from unittest.mock import MagicMock
        import asyncio
        from fastapi import HTTPException
        from app.auth.role_resolver import require_internal_role

        # PAT request shape: session middleware ran (session attribute exists)
        # but OAuth callback never fired, so internal_roles is absent.
        request = MagicMock()
        request.session = {}

        check = require_internal_role("ctx_admin")
        with pytest.raises(HTTPException) as exc_info:
            # No matching grant in DB either: the user dict has no "id" key,
            # so user_id is None and the DB lookup returns []. Gate must close.
            asyncio.run(check(request=request, user={"email": "pat@example.com"}))

        assert exc_info.value.status_code == 403
        assert "ctx_admin" in exc_info.value.detail

    def test_oauth_pipeline_groups_to_internal_roles(self, db_conn):
        """End-to-end data flow: fake _fetch_google_groups output (the
        only Cloud Identity touchpoint) → join against group_mappings →
        internal_roles list. The OAuth handshake itself isn't exercised
        here; its failure modes live in _fetch_google_groups, which
        has its own coverage. This test pins the resolver as the
        contract between 'whatever Google returned' and
        'session.internal_roles'."""
        from app.auth.role_resolver import (
            register_internal_role,
            sync_registered_roles_to_db,
            resolve_internal_roles,
        )
        from src.repositories.internal_roles import InternalRolesRepository
        from src.repositories.group_mappings import GroupMappingsRepository

        register_internal_role("ctx_admin", display_name="Context Admin")
        register_internal_role("agent_op", display_name="Agent Operator")
        sync_registered_roles_to_db(db_conn)

        ctx = InternalRolesRepository(db_conn).get_by_key("ctx_admin")
        agent = InternalRolesRepository(db_conn).get_by_key("agent_op")
        gm = GroupMappingsRepository(db_conn)
        gm.create(
            id=str(uuid.uuid4()),
            external_group_id="engineers@example.com",
            internal_role_id=ctx["id"],
        )
        gm.create(
            id=str(uuid.uuid4()),
            external_group_id="ops@example.com",
            internal_role_id=agent["id"],
        )

        # Simulate Google's response: two mapped groups + one unrelated.
        google_groups = [
            {"id": "engineers@example.com", "name": "Engineering"},
            {"id": "ops@example.com", "name": "Operations"},
            {"id": "marketing@example.com", "name": "Marketing"},  # unmapped
        ]
        result = resolve_internal_roles(google_groups, db_conn)
        assert result == ["agent_op", "ctx_admin"]  # sorted, deduped

    def test_dev_bypass_falls_back_to_empty_on_resolver_error(
        self, tmp_path, monkeypatch
    ):
        """If resolve_internal_roles raises mid-request (corrupted DB,
        schema mid-migration, transient lock), the dev-bypass path
        catches and writes []. Auth must never break on resolver
        infrastructure failures: the same defensive contract as the OAuth
        callback's try/except wrapper."""
        monkeypatch.setenv("DATA_DIR", str(tmp_path))
        monkeypatch.setenv("JWT_SECRET_KEY", "test-secret-32chars-minimum!!!!!")
        monkeypatch.setenv("SESSION_SECRET", "test-session-secret-32chars-minimum!!")
        monkeypatch.setenv("LOCAL_DEV_MODE", "1")
        monkeypatch.setenv("LOCAL_DEV_USER_EMAIL", "dev@localhost")
        monkeypatch.setenv(
            "LOCAL_DEV_GROUPS",
            '[{"id":"engineers@example.com","name":"Engineers"}]',
        )
        # Patch the symbol on the module so the lazy import inside the
        # dev-bypass branch picks up the broken stub on call.
        import app.auth.role_resolver as rr

        def boom(*_args, **_kwargs):
            raise RuntimeError("simulated resolver failure")

        monkeypatch.setattr(rr, "resolve_internal_roles", boom)

        from app.main import create_app
        from fastapi import Depends, FastAPI
        from app.auth.dependencies import get_current_user

        app = create_app()

        @app.get("/_test/probe")
        async def probe(user: dict = Depends(get_current_user)):
            return {"email": user["email"]}

        client = TestClient(app)
        # Auth still succeeds: a resolver failure must not 500/401 the request.
        resp = client.get("/_test/probe")
        assert resp.status_code == 200
        assert resp.json()["email"] == "dev@localhost"