agnes-the-ai-analyst/cli/commands/admin.py
Petr Simecek 83ced81966
feat(auth): unified role management — UI + REST API + CLI + schema v9 (v0.11.4) (#73)
* feat(auth): v9 schema — unified role management foundation (WIP)

Tasks 1-5, 10 of the role-management-complete plan. Foundation only,
follow-up commits add REST API, CLI, UI, and tests.

Schema v9:
- user_role_grants table: direct user → internal_role mapping
  (complementary to group_mappings). Drives PAT/headless auth and
  persists across sessions. Source field tracks 'direct' vs auto-seed.
- internal_roles.implies (JSON): transitive role hierarchy. core.admin
  implies core.km_admin → core.analyst → core.viewer. Resolver does BFS
  expand at lookup time.
- internal_roles.is_core (BOOL): distinguishes seeded core.* hierarchy
  from module-registered roles. UI renders them differently.
- v8→v9 migration: ADD COLUMN, CREATE TABLE, _seed_core_roles +
  _backfill_users_role_to_grants, then NULL legacy users.role values.
  DuckDB FK constraint blocks DROP COLUMN, so the column stays as a
  deprecated artifact (UserRepository ignores it); physical drop deferred.

Resolver:
- Regex extended to allow dotted namespace (core.admin,
  context_engineering.admin), max 64 chars total.
- expand_implies(role_keys, conn): BFS over implies JSON column.
- resolve_internal_roles signature gains optional user_id parameter;
  unions group-mapping resolution with user_role_grants direct grants
  before implies expansion.
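
A minimal sketch of the resolver pieces listed above, under stated
assumptions: the regex is a hypothetical stand-in for the real pattern, and
this expand_implies takes a plain dict in place of the DB connection so the
BFS can run without DuckDB.

```python
import re
from collections import deque

# Hypothetical pattern: lowercase dotted segments; the real regex may differ.
ROLE_KEY_RE = re.compile(r"[a-z][a-z0-9_]*(\.[a-z][a-z0-9_]*)*")
MAX_KEY_LEN = 64  # "max 64 chars total" per the note above


def is_valid_role_key(key: str) -> bool:
    """Accept plain or dotted keys such as core.admin, up to 64 characters."""
    return len(key) <= MAX_KEY_LEN and ROLE_KEY_RE.fullmatch(key) is not None


def expand_implies(role_keys, implies_map):
    """Return role_keys plus everything they transitively imply (BFS)."""
    seen = set(role_keys)
    queue = deque(role_keys)
    while queue:
        current = queue.popleft()
        for implied in implies_map.get(current, []):
            if implied not in seen:  # also guards against cycles
                seen.add(implied)
                queue.append(implied)
    return seen


# Stand-in for the internal_roles.implies JSON column (assumed shape).
CORE_IMPLIES = {
    "core.admin": ["core.km_admin"],
    "core.km_admin": ["core.analyst"],
    "core.analyst": ["core.viewer"],
    "core.viewer": [],
}

print(sorted(expand_implies({"core.km_admin"}, CORE_IMPLIES)))
```

In this sketch, direct grants and group-mapped roles would be unioned first
and passed in together as role_keys, matching the "before implies expansion"
ordering described above.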

require_internal_role:
- Two-path resolution: session cache (OAuth) → DB grants (PAT/headless
  fallback). PAT clients now legitimately satisfy gates without the
  OAuth round-trip, fixing the v8 limitation where every PAT-callable
  admin endpoint needed require_role(Role.ADMIN) instead of
  require_internal_role(...).
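
The two-path check can be illustrated roughly as below. This is a sketch,
not the actual dependency: has_internal_role and load_db_roles are
hypothetical names, and load_db_roles is assumed to return keys that are
already implies-expanded.

```python
from typing import Callable, Iterable, Optional


def has_internal_role(
    required: str,
    session: dict,
    user_id: Optional[str],
    load_db_roles: Callable[[str], Iterable[str]],
) -> bool:
    """Check the session cache first, then fall back to DB grants."""
    # Path 1: OAuth sessions carry roles resolved at login.
    cached = session.get("internal_roles") or []
    if required in cached:
        return True
    # Path 2: PAT/headless callers have no session cache, so ask the DB.
    if user_id is None:
        return False
    return required in set(load_db_roles(user_id))
```

The DB lookup is injected as a callable so the fallback path can be
exercised in tests without a database.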

Backward-compat:
- require_role(Role.X) and require_admin become thin wrappers over
  require_internal_role(f"core.{role}"). Implies hierarchy preserves the
  legacy "at least this level" semantics automatically — no per-level
  comparison code needed.
- src/rbac.py helpers (is_admin, has_role, get_user_role,
  set_user_role, can_access_table, get_accessible_tables) all read from
  the resolver via _get_internal_role_keys.
- UserRepository.create() and update() now mirror role changes into
  user_role_grants via _grant_core_role helper. Preserves API while
  making the new table the source of truth.
- UserRepository.delete() pre-deletes user_role_grants rows
  (FK cascade — DuckDB doesn't auto-cascade).
- count_admins() reads user_role_grants ⨝ internal_roles instead of the
  now-NULL users.role column.
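
To illustrate why the wrappers need no per-level comparison, here is a
small sketch with plain boolean functions instead of the real FastAPI
dependencies. The Role enum values and the IMPLIES dict are assumptions
modelled on the hierarchy described above.

```python
from enum import Enum


class Role(str, Enum):
    VIEWER = "viewer"
    ANALYST = "analyst"
    KM_ADMIN = "km_admin"
    ADMIN = "admin"


# Stand-in for the seeded core.* implies chain.
IMPLIES = {
    "core.admin": ["core.km_admin"],
    "core.km_admin": ["core.analyst"],
    "core.analyst": ["core.viewer"],
}


def expand(keys):
    """Transitively expand granted keys through IMPLIES."""
    out, stack = set(), list(keys)
    while stack:
        key = stack.pop()
        if key not in out:
            out.add(key)
            stack.extend(IMPLIES.get(key, []))
    return out


def require_internal_role(key, granted):
    """True when the key is held directly or via an implied chain."""
    return key in expand(granted)


def require_role(role, granted):
    """Legacy wrapper: 'at least this level' falls out of the expansion."""
    return require_internal_role(f"core.{role.value}", granted)
```

Because an admin grant expands to every lower level, a legacy
require_role(Role.ANALYST) check passes for admins without any ordering
logic in the wrapper itself.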

First consumer:
- app/api/admin.py module-level docstring documents the v9 pattern for
  future module authors. Existing require_role(Role.ADMIN) callsites
  flow through the wrapper; no behavior change for OAuth callers, and
  PAT callers gain access via direct grants.

Tests: full suite green (1396 passed, 6 skipped). Existing tests
exercise the new pathway transparently because UserRepository.create
auto-grants. New test_pat_caller_with_direct_grant_passes pins the
PAT-aware contract.

Schema: v9 (was v8). pyproject.toml + CHANGELOG bump deferred to the
final PR-prep commit.

* feat(auth): role management complete — REST API + CLI + UI + docs (v0.11.4)

Unifies the legacy users.role enum with the v8 internal-roles foundation
under a single model with an implies hierarchy, adds an admin UI + REST
API + CLI for managing both group mappings and direct user grants, and
makes require_internal_role PAT-aware so that admin endpoints behave
uniformly across OAuth and headless callers.

REST API (app/api/role_management.py, +496 LOC):
- 8 endpoints under /api/admin: internal-roles list, group-mappings CRUD,
  users/{id}/role-grants CRUD, users/{id}/effective-roles debug.
- All gated by require_internal_role("core.admin"). Audit-log entry on
  every mutation (role_mapping.created/deleted, role_grant.created/deleted).
- Last-admin protection: refuse to delete the final core.admin grant
  (mirrors users.py:count_admins protection).
- New UserRoleGrantsRepository in src/repositories/user_role_grants.py.
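
The last-admin protection could look roughly like this. It is a sketch over
an in-memory list; LastAdminError and delete_grant are hypothetical names,
and the real endpoint works against user_role_grants in the database.

```python
class LastAdminError(Exception):
    """Raised when a delete would remove the final core.admin grant."""


def delete_grant(grants: list, grant_id: str) -> list:
    """Return grants without grant_id, refusing to drop the last core.admin."""
    target = next((g for g in grants if g["id"] == grant_id), None)
    if target is None:
        raise KeyError(grant_id)
    if target["role_key"] == "core.admin":
        admin_count = sum(1 for g in grants if g["role_key"] == "core.admin")
        if admin_count == 1:
            raise LastAdminError("cannot delete the final core.admin grant")
    return [g for g in grants if g["id"] != grant_id]
```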

CLI (cli/commands/admin.py extension, +258 LOC):
- da admin role list / show <key>
- da admin mapping list / create <group-id> <role-key> / delete <id>
- da admin grant-role <email> <role-key>
- da admin revoke-role <email> <role-key>
- da admin effective-roles <email>
- Everything goes through typer + PAT auth, with a --json flag and
  tolerance for response-shape variations.
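
The response-shape tolerance amounts to accepting either a bare list or a
wrapped object, plus an alternate field name for the role key. A minimal
sketch, with unwrap_list and normalize_role_key as hypothetical helper
names (the CLI does this inline):

```python
def unwrap_list(data, key):
    """Return the list payload whether the API sent a bare list or {key: [...]}."""
    if isinstance(data, dict) and key in data:
        return data[key]
    return data


def normalize_role_key(row: dict) -> dict:
    """Copy the row and expose internal_role_key under role_key if needed."""
    out = dict(row)
    if "role_key" not in out and "internal_role_key" in out:
        out["role_key"] = out["internal_role_key"]
    return out
```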

UI (admin_role_mapping.html + admin_user_detail.html + nav + user list):
- New page /admin/role-mapping: internal_roles read-only table +
  group_mappings table with create/delete forms.
- New page /admin/users/{id}: core role single-select + capabilities
  multi-checkbox + effective-roles debug (direct + group + expanded).
- The existing user list gets a "Detail" link to the new page.
- Nav link to /admin/role-mapping.

Tests: +85 new tests across 4 new files:
- test_schema_v9_migration.py (8): fresh install + v8→v9 backfill +
  legacy column NULL semantics + unknown-role fallback + invariants.
- test_api_role_management.py (33): all 8 endpoints, happy + error
  paths, audit-log assertions, last-admin protection.
- test_cli_admin_role.py (25 + 1 conditional): typer subcommands,
  text + json output, PAT integration smoke.
- test_admin_role_mapping_ui.py (9) + test_admin_user_capabilities_ui.py (10):
  page rendering, auth gating, form contracts, JS hooks.
Full suite: 1482 passed, 6 skipped (was 1396 → +86, no regressions).

Docs:
- docs/internal-roles.md complete rewrite: removed "no UI yet",
  added hierarchy diagram, dual-path resolution, dotted-namespace
  convention, admin workflow via UI/CLI/REST, refresh semantics
  for group mappings vs direct grants, migration notes.
- CLAUDE.md schema v8 → v9.
- CHANGELOG.md [0.11.4] with BREAKING marker for users.role NULL
  semantics + complete Added/Changed/Removed/Internal sections.
- pyproject.toml: 0.11.3 → 0.11.4.

Sequencing: after this PR merges, Pabu rebases pabu/local-dev (PR #72)
onto main; its schema migrations shift from v9/v10/v11 to v10/v11/v12.

Implementation breakdown:
- Sequential (me): foundation tasks: schema v9, resolver, PAT-aware
  require_internal_role, backward-compat wrappers, rbac refactor,
  UserRepository auto-grant.
- Parallel sub-agents (3 worktrees, ~10 min): REST API, CLI, UI.
- Sequential (me): integration, docs/CHANGELOG/version, schema tests,
  full-suite verification.

* fix(auth): address Devin review on PR #73 — three regressions

Three concrete bugs caught in Devin's PR review, all fixed in this commit.

1. **users.role hydration on read** (the big one):
   v8→v9 migration NULLs users.role for every existing user, but a long
   tail of read sites still inspect user["role"] directly:
   - app/web/templates/_app_header.html:15 — admin nav gate
   - app/web/templates/_app_header.html:36-37 — role badge in dropdown
   - app/web/router.py:319-321 — UserInfo.is_admin/is_analyst/is_privileged
   - app/web/router.py:489 — corporate memory is_km_admin
   - app/api/catalog.py:54 — admin "see all tables" bypass
   - app/api/sync.py:215 — admin "see all sync states" bypass

   Without a fix, every existing admin loses the entire admin nav (and
   API admin bypasses) immediately after upgrade — a serious regression.

   Fix: new helper _hydrate_legacy_role() in app/auth/dependencies.py
   maps the highest-level core.* grant back into user["role"] as the
   legacy enum string. Called from get_current_user() on both auth paths
   (LOCAL_DEV_MODE + JWT/PAT). Idempotent — skips when role is already
   populated. Net effect: every pre-v9 callsite keeps working transparently
   for both OAuth and PAT callers, with one extra DB round-trip per
   authenticated request (same cost as the existing PAT-aware
   require_internal_role fallback).

   3 regression tests in tests/test_schema_v9_migration.py:
   - test_hydration_recovers_role_from_user_role_grants
   - test_hydration_returns_highest_grant (multi-grant → highest wins)
   - test_hydration_falls_back_to_viewer_when_no_grants (safe fallback)

2. **CLI effective-roles TypeError**:
   API returns direct/group as List[Dict] (RoleGrantResponse-shaped),
   but the CLI did ', '.join(direct) which raises TypeError on dicts.
   Tests masked it because mocks used bare string lists. Replaced
   raw .join() with a _names() helper that extracts role_key from
   each item, falling back to str() for legacy mock shapes.

3. **UI template field-name mismatch**:
   admin_user_detail.html JS reads data.groups but the API serializes
   the field as group (singular, per EffectiveRolesResponse pydantic).
   Currently benign because the API always returns group:[], but the
   field would silently disappear once the group-derived view is wired
   up. Added data.group as the primary lookup, kept the legacy aliases
   for shape-drift tolerance.
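
The hydration step from item 1 can be sketched as follows. This is an
illustration only: hydrate_legacy_role here takes the grant keys as an
argument instead of querying user_role_grants, it always recomputes the
role (it does not model the skip-when-populated check), and the level
ordering is taken from the core.* hierarchy described earlier.

```python
# Legacy enum strings ordered from lowest to highest privilege.
LEGACY_LEVELS = ["viewer", "analyst", "km_admin", "admin"]


def hydrate_legacy_role(user: dict, grant_keys) -> dict:
    """Fill user["role"] from the highest core.* grant; default to viewer."""
    held = {k.split(".", 1)[1] for k in grant_keys if k.startswith("core.")}
    ranked = [level for level in LEGACY_LEVELS if level in held]
    hydrated = dict(user)  # leave the caller's dict untouched
    hydrated["role"] = ranked[-1] if ranked else "viewer"
    return hydrated
```

Module-registered keys such as context_engineering.admin are ignored here
on purpose, since only core.* grants map back onto the legacy enum.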

Full suite: 1485 passed (was 1482, +3 hydration tests), 6 skipped, no
regressions.

* fix(auth): Devin review #2 + UX self-service + RBAC docs rename

Three threads landed in one commit because they share the same
auth/role surface and CHANGELOG entry.

Devin review #73 second round (2 actionable findings):

- _hydrate_legacy_role no longer short-circuits on truthy users.role.
  The role-management endpoints (POST/DELETE /api/admin/users/{id}/
  role-grants + the changeCoreRole UI flow) only mutate
  user_role_grants — they don't update the legacy column. The early
  return trusted that stale value, so a user downgraded via the new
  REST/UI kept role="admin" in their dict on subsequent requests,
  which fooled _is_admin_user_dict (src/rbac.py) and the catalog/sync
  admin-bypass short-circuits into retaining elevated table access
  even though require_internal_role correctly denied the API gates.
  Always re-resolves now, making user_role_grants the single source
  of truth on every authenticated request. Cost: one DB round-trip
  per request — same as the existing PAT-aware fallback. Pinned by
  test_hydration_ignores_stale_legacy_role_after_grant_revoke.

- Dev-bypass (app/auth/dependencies.py) and OAuth callback
  (app/auth/providers/google.py) now pass user_id to
  resolve_internal_roles so direct grants land in
  session["internal_roles"] alongside group-mapped roles. Pre-fix,
  every admin-gated request fell through to the per-request DB
  fallback inside require_internal_role and the dev-bypass log line
  read "resolved 0 internal role(s)" for an obviously-admin user.
  test_session_internal_roles_populated updated to assert union.

User-visible UX (also addresses local-test feedback):

- HTTP 500 on /admin/users post-v8→v9 migration — UserResponse.role
  is required str, but legacy users.role was NULL-ed by the
  migration. _to_response in app/api/users.py now routes every dict
  through _hydrate_legacy_role; same fix lifts the silent no-op of
  last-admin protection in update_user/delete_user (the role-equality
  short-circuits would skip the count_admins guard for migrated
  admins). Three regression tests under TestAPIUsersPostMigration.

- /profile is now a real self-service detail page for *every*
  signed-in user (not just admins). Three new server-side sections:
  Effective roles (resolver output as chip cloud), Direct grants
  (rows in user_role_grants with source label), Roles via groups
  (which Cloud Identity / dev group grants which role for the
  current user). Non-admins finally see *why* a feature is or isn't
  accessible. Admins additionally see a deep-link to
  /admin/users/{id} for editing their own grants.

- /admin/role-mapping group-id picker. New "Known groups" panel
  above the create form: clickable chips for the calling admin's
  own session.google_groups (tagged "your group") merged with
  external_group_ids already used in existing mappings (tagged
  "already mapped"). Click a chip → fills the form. Empty-state
  copy points operators at LOCAL_DEV_GROUPS / Google sign-in
  instead of leaving them to guess Cloud Identity opaque IDs from
  memory.

Operational fixes:

- Scheduler log-noise: every cron tick produced a
  POST /auth/token 401 because the auto-fetch fallback called the
  endpoint with just an email (no password) and silently fell
  through. Removed the broken path entirely. Operators set
  SCHEDULER_API_TOKEN (long-lived PAT) in production; in
  LOCAL_DEV_MODE the dev-bypass auto-authenticates the un-tokenized
  request, so jobs continue to work.

Docs:

- docs/internal-roles.md → docs/RBAC.md (git mv preserves history).
  Standard industry term, more discoverable for engineers grepping
  for RBAC in a new repo. Restructured: Quickstart-by-role
  (operator / end-user / module author), step-by-step
  Module-author workflow with code examples (register key, gate
  endpoint, declare implies, write contract test), naming pitfalls,
  refresh semantics. CLAUDE.md gets a new
  "Extensibility → RBAC" section pointing contributors at the doc
  before they add gated endpoints. Cross-refs in app/api/admin.py
  + tests/test_role_resolver.py updated.

Tests: 293 in the auth/role/scheduler/UI test set passed, 0 regressions.

* fix(auth): Devin review #3 — login flows + RBAC docs

Two new findings on commit 7d1c048, both real and addressed.

Finding 1 (BUG, HTTP 500): every auth login flow loaded users via
UserRepository.get_by_email and passed user["role"] straight to
create_access_token, Pydantic response models, and _set_login_cookie
without going through _hydrate_legacy_role. Post-v9 the legacy column
is NULL for migrated users, and TokenResponse.role is a required str —
so POST /auth/token raised ValidationError → HTTP 500 for any v8-admin
trying to log in via password. Same root cause produced non-crashing
but semantically wrong JWTs (role: null) from Google OAuth, password
web flows, and email magic-link verification.

Fix: hydrate inline in every login flow before reading user["role"]:
- app/auth/router.py — POST /auth/token (the crash site)
- app/auth/providers/google.py — OAuth callback (was just stale JWT)
- app/auth/providers/password.py — 5 flows: JSON login, web login,
  JSON setup, web reset confirm, web setup confirm
- app/auth/providers/email.py — centralized in _consume_token,
  covers both /verify endpoints

New regression class TestAuthLoginFlowsPostMigration pins both the
no-crash and the correct-role contracts for all four legacy levels
(viewer/analyst/km_admin/admin) on POST /auth/token.

Finding 2 (DOCS): docs/RBAC.md showed register_internal_role() being
called with implies=[...], but the function signature is (key, *,
display_name, description, owner_module). A module author copying the
example would TypeError at import time. The implies field on
internal_roles IS honored at runtime by expand_implies, but the
registry-side write path (register_internal_role + InternalRoleSpec +
sync_registered_roles_to_db) doesn't exist yet — implies is currently
seeded only for the core.* hierarchy via _seed_core_roles in src/db.py.

Rewrote the Implies hierarchy and Module-author workflow sections to
document what's actually supported in 0.11.4 and what a future change
would need to add. The "for cross-module hierarchies, register each
level + grant both" pattern works today.

Tests: 322 in the auth/role/scheduler/UI/password test set passed,
0 regressions.

* fix(db): _seed_core_roles actually runs on every connect (Devin review #4)

Devin flagged that the docstring on `_seed_core_roles` promised per-connect
execution as a safety net for accidental DELETEs and in-code seed changes,
but the only call sites lived inside `if current < SCHEMA_VERSION:` — so
once a DB was on v9 the function never ran again, and the docstring lied.

Picked option (b) from the review (actually call it on every startup) over
option (a) (fix the docstring) because the safety net is genuinely useful:
- recovery from accidental admin DELETE on internal_roles,
- in-code _CORE_ROLES_SEED tweaks (display_name/description/implies)
  ship without a manual SQL deploy,
- fresh installs and migrations stop needing their own seed call sites.

Tail call gated by `get_schema_version(conn) <= SCHEMA_VERSION` so the
future-version-is-noop rollback contract still holds — a v9 binary won't
touch a DB that's been upgraded past v9.
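
A rough sketch of the gated tail call, using a dict in place of the DuckDB
connection. CORE_ROLES_SEED, seed_core_roles and ensure_schema are
hypothetical stand-ins for _CORE_ROLES_SEED, _seed_core_roles and the
connect-time schema check; migrations are elided.

```python
SCHEMA_VERSION = 9

# Hypothetical in-code seed; the real seed carries more fields.
CORE_ROLES_SEED = {
    "core.admin": {"display_name": "Admin"},
    "core.viewer": {"display_name": "Viewer"},
}


def seed_core_roles(db: dict) -> None:
    """Idempotent upsert of the seeded rows into db["internal_roles"]."""
    for key, fields in CORE_ROLES_SEED.items():
        db["internal_roles"].setdefault(key, {}).update(fields)


def ensure_schema(db: dict) -> None:
    """Run pending migrations, then re-seed unless the DB is from the future."""
    if db["schema_version"] < SCHEMA_VERSION:
        db["schema_version"] = SCHEMA_VERSION  # migrations elided
    # Tail call: runs on every connect, but never touches a newer DB.
    if db["schema_version"] <= SCHEMA_VERSION:
        seed_core_roles(db)
```

With this shape a deleted or mutated core row is repaired on the next
connect, while a DB already upgraded past the binary's version is left
alone.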

Test coverage: new TestSeedCoreRolesSafetyNet class (3 tests) pins the
three contracts — deleted row re-seeds, mutated display_name re-syncs
from in-code seed, applied_at on schema_version doesn't churn on
already-current DBs. Existing TestMigrationSafety::test_future_version_is_noop
still passes (verified against the gating logic).
2026-04-27 02:23:01 +02:00

604 lines
22 KiB
Python

"""Admin commands — da admin."""
import json

import typer

from cli.client import api_get, api_post, api_delete, api_patch

admin_app = typer.Typer(help="Admin operations (requires admin role)")


@admin_app.command("add-user")
def add_user(
    email: str = typer.Argument(..., help="User email"),
    name: str = typer.Option("", help="User display name"),
    role: str = typer.Option("analyst", help="Role: viewer, analyst, admin, km_admin"),
):
    """Add a new user."""
    resp = api_post("/api/users", json={"email": email, "name": name or email.split("@")[0], "role": role})
    if resp.status_code == 201:
        data = resp.json()
        typer.echo(f"Created user: {data['email']} (id: {data['id']}, role: {data['role']})")
    else:
        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
        raise typer.Exit(1)


@admin_app.command("list-users")
def list_users(as_json: bool = typer.Option(False, "--json")):
    """List all users."""
    resp = api_get("/api/users")
    if resp.status_code != 200:
        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
        raise typer.Exit(1)
    users = resp.json()
    if as_json:
        typer.echo(json.dumps(users, indent=2))
    else:
        for u in users:
            status_str = "active" if u.get("active", True) else "DEACTIVATED"
            typer.echo(
                f" {u['email']:30s} role={u['role']:10s} {status_str:12s} id={u['id'][:8]}"
            )


@admin_app.command("remove-user")
def remove_user(user_id: str = typer.Argument(..., help="User ID to remove")):
    """Remove a user."""
    resp = api_delete(f"/api/users/{user_id}")
    if resp.status_code == 204:
        typer.echo("User removed.")
    else:
        typer.echo(f"Failed: {resp.text}", err=True)
        raise typer.Exit(1)


@admin_app.command("register-table")
def register_table(
    name: str = typer.Argument(..., help="Table display name"),
    source_type: str = typer.Option("keboola", help="Source type"),
    bucket: str = typer.Option("", help="Source bucket/dataset"),
    source_table: str = typer.Option("", help="Source table name"),
    query_mode: str = typer.Option("local", help="Query mode: local or remote"),
    description: str = typer.Option("", help="Table description"),
):
    """Register a single table."""
    resp = api_post("/api/admin/register-table", json={
        "name": name,
        "source_type": source_type,
        "bucket": bucket,
        "source_table": source_table or name,
        "query_mode": query_mode,
        "description": description,
    })
    if resp.status_code == 201:
        typer.echo(f"Registered: {name}")
    elif resp.status_code == 409:
        typer.echo(f"Already exists: {name}")
    else:
        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
        raise typer.Exit(1)


@admin_app.command("discover-and-register")
def discover_and_register(
    source_type: str = typer.Option("keboola", help="Source type"),
    token: str = typer.Option(None, help="Keboola Storage API token"),
    url: str = typer.Option(None, help="Keboola stack URL"),
    dry_run: bool = typer.Option(False, "--dry-run", help="Show what would be registered"),
    as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
):
    """Discover all tables from source and register them."""
    import httpx
    import os

    kbc_token = token or os.environ.get("KEBOOLA_STORAGE_TOKEN", "")
    kbc_url = url or os.environ.get("KEBOOLA_STACK_URL", "")
    if not kbc_token or not kbc_url:
        typer.echo("Need KEBOOLA_STORAGE_TOKEN and KEBOOLA_STACK_URL (env or --token/--url)", err=True)
        raise typer.Exit(1)
    typer.echo(f"Discovering tables from {kbc_url}...")
    resp = httpx.get(f"{kbc_url.rstrip('/')}/v2/storage/tables",
                     headers={"X-StorageApi-Token": kbc_token}, timeout=30)
    resp.raise_for_status()
    tables = resp.json()
    typer.echo(f"Found {len(tables)} tables")
    if as_json and dry_run:
        typer.echo(json.dumps([{"id": t["id"], "name": t["name"],
                                "bucket": t.get("bucket", {}).get("id", ""),
                                "rows": t.get("rowsCount", 0)} for t in tables], indent=2))
        return
    registered = 0
    skipped = 0
    errors = 0
    for t in tables:
        table_id = t["id"]
        name = t["name"]
        bucket_id = t.get("bucket", {}).get("id", "")
        if dry_run:
            typer.echo(f" [DRY RUN] {name:30s} bucket={bucket_id:20s} rows={t.get('rowsCount', 0):>10,}")
            continue
        resp = api_post("/api/admin/register-table", json={
            "name": name,
            "source_type": source_type,
            "bucket": bucket_id,
            "source_table": name,
            "query_mode": "local",
            "description": f"Auto-discovered from {source_type}",
        })
        if resp.status_code == 201:
            registered += 1
            typer.echo(f"{name}")
        elif resp.status_code == 409:
            skipped += 1
        else:
            errors += 1
            typer.echo(f"{name}: {resp.json().get('detail', resp.text)}")
    if not dry_run:
        typer.echo(f"\nDone: {registered} registered, {skipped} already existed, {errors} errors")


@admin_app.command("list-tables")
def list_tables(as_json: bool = typer.Option(False, "--json")):
    """List registered tables."""
    resp = api_get("/api/admin/registry")
    if resp.status_code != 200:
        typer.echo(f"Failed: {resp.text}", err=True)
        raise typer.Exit(1)
    data = resp.json()
    if as_json:
        typer.echo(json.dumps(data, indent=2))
    else:
        typer.echo(f"Registered tables: {data['count']}")
        for t in data["tables"]:
            typer.echo(f" {t['name']:30s} src={t.get('source_type','?'):10s} mode={t.get('query_mode','?'):6s} bucket={t.get('bucket',''):20s}")


@admin_app.command("metadata-show")
def metadata_show(
    table_id: str = typer.Argument(..., help="Table ID to show metadata for"),
    as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
):
    """Show column metadata for a table."""
    resp = api_get(f"/api/admin/metadata/{table_id}")
    if resp.status_code != 200:
        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
        raise typer.Exit(1)
    data = resp.json()
    if as_json:
        typer.echo(json.dumps(data, indent=2))
    else:
        columns = data.get("columns", [])
        if not columns:
            typer.echo(f"No column metadata for table: {table_id}")
            return
        typer.echo(f"Column metadata for table: {table_id} ({len(columns)} columns)")
        typer.echo(f" {'COLUMN':<30s} {'BASETYPE':<12s} {'CONFIDENCE':<12s} DESCRIPTION")
        typer.echo(" " + "-" * 80)
        for col in columns:
            typer.echo(
                f" {col['column_name']:<30s} {col.get('basetype') or '':^12s} "
                f"{col.get('confidence') or '':^12s} {col.get('description') or ''}"
            )


@admin_app.command("metadata-apply")
def metadata_apply(
    proposal_path: str = typer.Argument(..., help="Path to proposal JSON file"),
    push_to_source: bool = typer.Option(False, "--push-to-source", help="Push metadata to Keboola after import"),
    dry_run: bool = typer.Option(False, "--dry-run", help="Show what would change without applying"),
):
    """Apply a metadata proposal JSON to DuckDB."""
    import os

    if not os.path.exists(proposal_path):
        typer.echo(f"Proposal file not found: {proposal_path}", err=True)
        raise typer.Exit(1)
    with open(proposal_path, "r", encoding="utf-8") as f:
        proposal = json.load(f)
    tables = proposal.get("tables", {})
    total = sum(len(t.get("columns", {})) for t in tables.values())
    if dry_run:
        typer.echo(f"[DRY RUN] Would import {total} column(s) from {len(tables)} table(s):")
        for table_id, table_data in tables.items():
            columns = table_data.get("columns", {})
            for col_name, col_data in columns.items():
                typer.echo(
                    f" {table_id}.{col_name}: basetype={col_data.get('basetype')} "
                    f"description={col_data.get('description')}"
                )
        return

    from src.repositories.column_metadata import ColumnMetadataRepository
    from src.db import get_system_db

    conn = get_system_db()
    try:
        repo = ColumnMetadataRepository(conn)
        count = repo.import_proposal(proposal_path)
        typer.echo(f"Imported {count} column(s) from proposal.")
    finally:
        conn.close()
    if push_to_source:
        for table_id in tables:
            resp = api_post(f"/api/admin/metadata/{table_id}/push")
            if resp.status_code == 200:
                typer.echo(f"Pushed metadata for {table_id} to source.")
            else:
                typer.echo(f"Failed to push {table_id}: {resp.json().get('detail', resp.text)}", err=True)


# ---- User management (#11) ----


def _resolve_user_id(ref: str) -> str:
    """Accept either a UUID or an email; look up email → id via list."""
    if "@" not in ref:
        return ref
    resp = api_get("/api/users")
    if resp.status_code != 200:
        typer.echo(f"Could not list users: {resp.text}", err=True)
        raise typer.Exit(1)
    for u in resp.json():
        if u.get("email") == ref:
            return u["id"]
    typer.echo(f"User not found: {ref}", err=True)
    raise typer.Exit(1)


def _print_user_result(resp, ok_msg: str) -> None:
    if resp.status_code in (200, 204):
        typer.echo(ok_msg)
    else:
        try:
            detail = resp.json().get("detail", resp.text)
        except Exception:
            detail = resp.text
        typer.echo(f"Failed: {detail}", err=True)
        raise typer.Exit(1)


@admin_app.command("set-role")
def set_role(
    user_ref: str = typer.Argument(..., help="User id or email"),
    role: str = typer.Argument(..., help="viewer | analyst | km_admin | admin"),
):
    """Set a user's role."""
    uid = _resolve_user_id(user_ref)
    resp = api_patch(f"/api/users/{uid}", json={"role": role})
    # Separator restored between the user reference and the new role.
    _print_user_result(resp, f"Updated role for {user_ref} -> {role}")


@admin_app.command("deactivate")
def deactivate(user_ref: str = typer.Argument(..., help="User id or email")):
    """Deactivate a user (blocks login, existing tokens also rejected)."""
    uid = _resolve_user_id(user_ref)
    resp = api_post(f"/api/users/{uid}/deactivate")
    _print_user_result(resp, f"Deactivated {user_ref}")


@admin_app.command("activate")
def activate(user_ref: str = typer.Argument(..., help="User id or email")):
    """Re-activate a deactivated user."""
    uid = _resolve_user_id(user_ref)
    resp = api_post(f"/api/users/{uid}/activate")
    _print_user_result(resp, f"Activated {user_ref}")


@admin_app.command("reset-password")
def reset_password(user_ref: str = typer.Argument(..., help="User id or email")):
    """Generate a reset token (emailed if SMTP/SendGrid configured)."""
    uid = _resolve_user_id(user_ref)
    resp = api_post(f"/api/users/{uid}/reset-password")
    if resp.status_code == 200:
        data = resp.json()
        typer.echo(f"Reset token: {data['reset_token']}")
        typer.echo(f"Email sent: {data['email_sent']}")
    else:
        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
        raise typer.Exit(1)


@admin_app.command("set-password")
def set_password(
    user_ref: str = typer.Argument(..., help="User id or email"),
    password: str = typer.Option(
        ..., prompt=True, hide_input=True, confirmation_prompt=True,
        help="New password (hidden input)",
    ),
):
    """Set a user's password directly (force-reset flow)."""
    uid = _resolve_user_id(user_ref)
    resp = api_post(f"/api/users/{uid}/set-password", json={"password": password})
    if resp.status_code == 204:
        typer.echo(f"Password set for {user_ref}")
    else:
        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
        raise typer.Exit(1)


# ---- Role management (v9 — internal_roles + group_mappings + user_role_grants) ----
#
# Calls the role-management REST API under /api/admin (see app/api/role_management.py).
# All endpoints require core.admin; PAT auth is supported uniformly via the v9
# require_internal_role two-path resolver.

role_app = typer.Typer(help="Internal-role browsing (read-only)")
mapping_app = typer.Typer(help="External group → internal role mapping CRUD")
admin_app.add_typer(role_app, name="role")
admin_app.add_typer(mapping_app, name="mapping")


def _fail(resp, prefix: str = "Failed") -> None:
    """Print API failure detail and raise typer.Exit(1)."""
    try:
        detail = resp.json().get("detail", resp.text)
    except Exception:
        detail = resp.text
    typer.echo(f"{prefix}: {detail}", err=True)
    raise typer.Exit(1)


def _print_rows(rows: list, columns: list[tuple[str, str, int]]) -> None:
    """Render a list of dicts as a fixed-width table.

    columns: list of (key, header, width) — order matches the column display.
    """
    header = " " + " ".join(f"{h:<{w}s}" for _, h, w in columns)
    typer.echo(header)
    typer.echo(" " + "-" * (len(header) - 2))
    for row in rows:
        cells = []
        for key, _, width in columns:
            val = row.get(key)
            cells.append(f"{(str(val) if val is not None else ''):<{width}s}")
        typer.echo(" " + " ".join(cells))


@role_app.command("list")
def role_list(as_json: bool = typer.Option(False, "--json", help="Output as JSON")):
    """List all internal roles (registered capability keys)."""
    resp = api_get("/api/admin/internal-roles")
    if resp.status_code != 200:
        _fail(resp)
    data = resp.json()
    # Endpoint may return a list or {roles: [...]} — accept either shape.
    roles = data["roles"] if isinstance(data, dict) and "roles" in data else data
    if as_json:
        typer.echo(json.dumps(roles, indent=2))
        return
    if not roles:
        typer.echo("No internal roles registered.")
        return
    typer.echo(f"Internal roles: {len(roles)}")
    _print_rows(roles, [
        ("key", "KEY", 30),
        ("display_name", "DISPLAY NAME", 28),
        ("owner_module", "OWNER", 16),
        ("is_core", "CORE", 5),
    ])


@role_app.command("show")
def role_show(
    role_key: str = typer.Argument(..., help="Role key, e.g. core.admin"),
    as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
):
    """Show a single role detail with mapping + grant counts."""
    # The list endpoint is the canonical reader; iterate to find the key.
    resp = api_get("/api/admin/internal-roles")
    if resp.status_code != 200:
        _fail(resp)
    data = resp.json()
    roles = data["roles"] if isinstance(data, dict) and "roles" in data else data
    role = next((r for r in roles if r.get("key") == role_key), None)
    if role is None:
        typer.echo(f"Role not found: {role_key}", err=True)
        raise typer.Exit(1)
    mappings_resp = api_get("/api/admin/group-mappings")
    if mappings_resp.status_code != 200:
        _fail(mappings_resp)
    mdata = mappings_resp.json()
    mappings = mdata["mappings"] if isinstance(mdata, dict) and "mappings" in mdata else mdata
    matching_mappings = [
        m for m in mappings
        if m.get("role_key") == role_key or m.get("internal_role_key") == role_key
    ]
    # Grants are exposed per-user in the API contract; we summarize what's
    # cheaply visible here (matched mappings) and leave per-user grants to
    # `da admin effective-roles <email>`.
    payload = {
        "role": role,
        "mapping_count": len(matching_mappings),
        "mappings": matching_mappings,
    }
    if as_json:
        typer.echo(json.dumps(payload, indent=2))
        return
    typer.echo(f"Role: {role.get('key')}")
    typer.echo(f" display_name : {role.get('display_name', '')}")
    typer.echo(f" description : {role.get('description', '') or ''}")
    typer.echo(f" owner_module : {role.get('owner_module', '') or ''}")
    typer.echo(f" is_core : {bool(role.get('is_core'))}")
    implies = role.get("implies")
    if isinstance(implies, str):
        try:
            implies = json.loads(implies)
        except (TypeError, ValueError):
            implies = []
    typer.echo(f" implies : {', '.join(implies) if implies else '(none)'}")
    typer.echo(f" mappings : {len(matching_mappings)}")


@mapping_app.command("list")
def mapping_list(as_json: bool = typer.Option(False, "--json", help="Output as JSON")):
    """List all external-group → internal-role mappings."""
    resp = api_get("/api/admin/group-mappings")
    if resp.status_code != 200:
        _fail(resp)
    data = resp.json()
    mappings = data["mappings"] if isinstance(data, dict) and "mappings" in data else data
    if as_json:
        typer.echo(json.dumps(mappings, indent=2))
        return
    if not mappings:
        typer.echo("No group mappings configured.")
        return
    # Normalize role_key (some API shapes nest it under `internal_role_key`).
    for m in mappings:
        if "role_key" not in m and "internal_role_key" in m:
            m["role_key"] = m["internal_role_key"]
    typer.echo(f"Group mappings: {len(mappings)}")
    _print_rows(mappings, [
        ("external_group_id", "EXTERNAL GROUP", 40),
        ("role_key", "ROLE KEY", 28),
        ("assigned_by", "ASSIGNED BY", 24),
        ("id", "MAPPING ID", 36),
    ])


@mapping_app.command("create")
def mapping_create(
    external_group_id: str = typer.Argument(..., help="Cloud Identity group ID"),
    role_key: str = typer.Argument(..., help="Internal role key, e.g. core.admin"),
    as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
):
    """Map an external group to an internal role."""
    resp = api_post(
        "/api/admin/group-mappings",
        json={"external_group_id": external_group_id, "role_key": role_key},
    )
    if resp.status_code not in (200, 201):
        _fail(resp)
    created = resp.json()
    if as_json:
        typer.echo(json.dumps(created, indent=2))
        return
    # Separator restored between the group ID and the role key.
    typer.echo(
        f"Created mapping: {created.get('external_group_id')} -> "
        f"{created.get('role_key') or created.get('internal_role_key')} "
        f"(id={created.get('id')})"
    )


@mapping_app.command("delete")
def mapping_delete(
    mapping_id: str = typer.Argument(..., help="Mapping ID to delete"),
):
    """Delete a group mapping by ID."""
    resp = api_delete(f"/api/admin/group-mappings/{mapping_id}")
    if resp.status_code in (200, 204):
        typer.echo(f"Deleted mapping {mapping_id}")
        return
    _fail(resp)


@admin_app.command("grant-role")
def grant_role(
    user_email: str = typer.Argument(..., help="User email"),
    role_key: str = typer.Argument(..., help="Internal role key, e.g. core.admin"),
    as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
):
    """Grant an internal role directly to a user (PAT-friendly flow)."""
    uid = _resolve_user_id(user_email)
    resp = api_post(
        f"/api/admin/users/{uid}/role-grants",
        json={"role_key": role_key},
    )
    if resp.status_code not in (200, 201):
        _fail(resp)
    granted = resp.json()
    if as_json:
        typer.echo(json.dumps(granted, indent=2))
        return
    typer.echo(
        f"Granted {role_key} to {user_email} (grant_id={granted.get('id')})"
    )


@admin_app.command("revoke-role")
def revoke_role(
    user_email: str = typer.Argument(..., help="User email"),
    role_key: str = typer.Argument(..., help="Internal role key to revoke"),
):
    """Revoke a previously-granted internal role from a user."""
    uid = _resolve_user_id(user_email)
    list_resp = api_get(f"/api/admin/users/{uid}/role-grants")
    if list_resp.status_code != 200:
        _fail(list_resp, prefix="Failed to list grants")
    data = list_resp.json()
    grants = data["grants"] if isinstance(data, dict) and "grants" in data else data
    matching = [
        g for g in grants
        if g.get("role_key") == role_key or g.get("internal_role_key") == role_key
    ]
    if not matching:
        typer.echo(
            f"No active grant for {user_email} with role_key={role_key}", err=True,
        )
        raise typer.Exit(1)
    grant_id = matching[0].get("id")
    if not grant_id:
        typer.echo(f"Grant row missing id: {matching[0]!r}", err=True)
        raise typer.Exit(1)
    del_resp = api_delete(f"/api/admin/users/{uid}/role-grants/{grant_id}")
    if del_resp.status_code in (200, 204):
        typer.echo(f"Revoked {role_key} from {user_email}")
        return
    _fail(del_resp)


@admin_app.command("effective-roles")
def effective_roles(
    user_email: str = typer.Argument(..., help="User email"),
    as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
):
    """Show the user's effective roles (direct + group + expanded)."""
    uid = _resolve_user_id(user_email)
    resp = api_get(f"/api/admin/users/{uid}/effective-roles")
    if resp.status_code != 200:
        _fail(resp)
    data = resp.json()
    if as_json:
        typer.echo(json.dumps(data, indent=2))
        return
    typer.echo(f"Effective roles for {user_email}:")
    direct = data.get("direct") or data.get("direct_roles") or []
    group = data.get("group") or data.get("group_roles") or []
    expanded = data.get("expanded") or data.get("effective") or data.get("effective_roles") or []

    # API response shape: direct/group are List[Dict] (RoleGrantResponse-like
    # with role_key + grant metadata), expanded is List[str]. Render uniformly
    # by extracting role_key from dicts and falling back to str() for legacy
    # mock shapes that yield bare strings.
    def _names(items):
        return ", ".join(
            (it.get("role_key") or it.get("key") or str(it))
            if isinstance(it, dict) else str(it)
            for it in items
        )

    typer.echo(f" direct : {_names(direct) if direct else '(none)'}")
    typer.echo(f" group : {_names(group) if group else '(none)'}")
    typer.echo(f" expanded : {', '.join(expanded) if expanded else '(none)'}")