* feat(rbac): drop dataset_permissions + access_requests + users.role + is_public; v19 migration
BREAKING. Unifies the data RBAC layer into the per-group resource_grants model.
Before this PR, the legacy data RBAC layer (dataset_permissions + is_public bypass)
was de facto inactive: is_public had no API/UI/CLI surface, and its default of
true meant that can_access_table always bypassed the check. Now every non-admin
access requires an explicit resource_grants(group, "table", id) row.
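The access rule can be sketched roughly as follows. This is a minimal illustration, not the code in app.auth.access: the Grant tuple, the in-memory sets, the ADMIN_GROUP constant, and the can_access_sketch name are hypothetical stand-ins for the real resource_grants table and group-membership lookups.

```python
from typing import NamedTuple, Set


class Grant(NamedTuple):
    """One resource_grants row: (group, resource_type, resource_id)."""
    group: str
    resource_type: str
    resource_id: str


# Assumed name of the admin system group, for illustration only.
ADMIN_GROUP = "Admin"


def can_access_sketch(user_groups: Set[str], resource_type: str,
                      resource_id: str, grants: Set[Grant]) -> bool:
    """Hypothetical stand-in for the real check; no implicit public access."""
    # Admin shortcut: members of the Admin group bypass grant lookups.
    if ADMIN_GROUP in user_groups:
        return True
    # Everyone else needs an explicit grant for one of their groups.
    return any(Grant(g, resource_type, resource_id) in grants for g in user_groups)


grants = {Grant("analysts", "table", "orders")}
analyst_ok = can_access_sketch({"analysts"}, "table", "orders", grants)
analyst_other = can_access_sketch({"analysts"}, "table", "customers", grants)
admin_ok = can_access_sketch({"Admin"}, "table", "customers", grants)
```

With no is_public fallback, a table without a matching grant is simply inaccessible to non-admins, which is the behavior change this commit makes.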
Schema v18 → v19 (src/db.py:_v18_to_v19_finalize):
- DROP TABLE dataset_permissions, access_requests
- DROP COLUMN users.role (NULL artifact since v13)
- DROP COLUMN table_registry.is_public
- Drops go through the table-rebuild idiom (rename → create new → INSERT … SELECT
  → drop old) because of DuckDB ALTER DROP COLUMN limitations on tables
  with historic FK constraints. The INSERT picks the intersection of columns,
  so test fixtures with a minimal pre-v19 schema migrate cleanly.
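A rough sketch of the table-rebuild idiom with a column-intersection copy. It deliberately uses the standard-library sqlite3 module instead of DuckDB so that it runs without extra dependencies; the rebuild_table_sketch helper and the example schemas are hypothetical and are not the code in _v18_to_v19_finalize.

```python
import sqlite3


def rebuild_table_sketch(conn, table, new_ddl):
    """Rebuild `table` from `new_ddl`, copying only columns both schemas share."""
    old = f"{table}__old"
    # 1. Rename the existing table out of the way.
    conn.execute(f"ALTER TABLE {table} RENAME TO {old}")
    # 2. Create the table again with the new schema.
    conn.execute(new_ddl)
    # 3. Copy the intersection of columns (row[1] of table_info is the name).
    old_cols = [r[1] for r in conn.execute(f"PRAGMA table_info({old})")]
    new_cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table})")]
    shared = ", ".join(c for c in new_cols if c in old_cols)
    conn.execute(f"INSERT INTO {table} ({shared}) SELECT {shared} FROM {old}")
    # 4. Drop the old copy.
    conn.execute(f"DROP TABLE {old}")


conn = sqlite3.connect(":memory:")
# A minimal "old" schema: it has the dropped column but lacks a newer one.
conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, email TEXT, role TEXT)")
conn.execute("INSERT INTO users VALUES ('u1', 'a@b.c', NULL)")
rebuild_table_sketch(
    conn,
    "users",
    "CREATE TABLE users (id TEXT PRIMARY KEY, email TEXT, active BOOLEAN DEFAULT 1)",
)
columns = [r[1] for r in conn.execute("PRAGMA table_info(users)")]
rows = conn.execute("SELECT id, email, active FROM users").fetchall()
```

Because only shared columns are copied, the old table may lack columns that the new schema defines; those fall back to their defaults, which is why a minimal fixture schema can still migrate.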
Runtime:
- src/rbac.py:can_access_table → delegates to app.auth.access.can_access
- DatasetPermissionRepository, AccessRequestRepository deleted
- AGNES_ENABLE_TABLE_GRANTS env gate in app/resource_types.py removed
  (TABLE is unconditionally enabled)
API drop:
- app/api/permissions.py, app/api/access_requests.py (entire files)
- /admin/permissions web route + admin_permissions.html
- "Request Access" modal in catalog.html + locked-row UI
- ~10 if user.get("role") != "admin" checks replaced (the admin shortcut
  lives inside can_access_table)
- /api/settings: drop permissions field from GET; PUT /api/settings/dataset
  gate switched to can_access(user_id, "table", dataset, conn)
Auth:
- app/auth/jwt.py:create_access_token: drop role parameter (the claim disappears
  from newly issued JWTs; old tokens remain valid, claim ignored)
- app/api/users.py: drop role from CreateUserRequest / UpdateUserRequest
  (admin promotion = explicit add to Admin group via memberships API)
- src/repositories/users.py: drop role from create() / update()
CLI:
- da admin set-role removed → hard-fails with the replacement command
- da admin add-user: --role flag removed
- da auth import-token: --role flag removed
- da auth whoami: drop "Role:" output
- cli/config.py:save_token: role parameter now optional, no longer written
  (back-compat with old token.json files preserved; the field is ignored)
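The token.json back-compat behavior could look roughly like this. It is only a sketch under assumptions: the save_token_sketch and load_token_sketch names and the access_token / email field names are hypothetical and may not match cli/config.py.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def save_token_sketch(path: Path, token: str, email: str,
                      role: Optional[str] = None) -> None:
    """Accept `role` for old callers, but never persist it."""
    path.write_text(json.dumps({"access_token": token, "email": email}))


def load_token_sketch(path: Path) -> dict:
    """Read token.json, ignoring a legacy `role` field if one is present."""
    data = json.loads(path.read_text())
    data.pop("role", None)
    return data


with tempfile.TemporaryDirectory() as tmp:
    token_file = Path(tmp) / "token.json"
    # A file written by an older CLI version still carries the role field.
    token_file.write_text(json.dumps(
        {"access_token": "old", "email": "a@b.c", "role": "admin"}))
    legacy = load_token_sketch(token_file)
    # New writes drop the field even when a caller still passes it.
    save_token_sketch(token_file, "new", "a@b.c", role="admin")
    fresh = json.loads(token_file.read_text())
```

Keeping the parameter optional means existing callers do not break, while the stored file stops carrying a value that no longer has any meaning.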
Tests:
- DELETE: test_permissions.py, test_permissions_api.py, test_access_requests_api.py
- REWRITE: test_access_control.py (resource_grants flow), test_rbac.py
  (can_access_table over resource_grants), test_journey_rbac.py
  (drop access-request flow), test_resource_types.py (drop env-gate
  tests, drop is_public from helpers), test_v2_*.py (drop role-based
  user dicts in favor of id-based + Admin group membership),
  test_settings_api.py (no permissions field, can_access gate)
- TRIVIAL: ~30 files: drop role="admin" arg from UserRepository.create
  and the 3rd positional role from create_access_token
- NEW: test_v18_to_v19 migration test (test_db.py),
  test_can_access_table_no_implicit_public (test_rbac.py),
  test_admin_set_role_returns_hardfail (test_cli_admin.py)
- OpenAPI snapshot regenerated
Docs:
- CHANGELOG: BREAKING entry under [Unreleased]
- CLAUDE.md: schema v18 → v19
- docs/architecture.md: schema table + RBAC section rewritten
- docs/auth-google-oauth.md: admin promotion via da admin break-glass
- cli/skills/security.md: completely rewritten for the group-based model
- docs/TODO-rbac-data-enforcement.md: deleted (TODO completed)
Test results: 2363 passed, 19 failed. The remaining failures are pre-existing
Windows-specific issues (fcntl, charset) unrelated to this PR,
verified with git stash pop.
Plan: ~/.claude/plans/floofy-coalescing-parnas.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* chore(release): cut 0.27.0
---------
Co-authored-by: Minas Arustamyan <arustamyan.minas@gmail.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: ZdenekSrotyr <zdenek.srotyr@keboola.com>
322 lines
12 KiB
Python
"""Tests for #11 — user management (active flag, safeguards, endpoints)."""

import os
import tempfile
import pytest

import duckdb

from src.db import _ensure_schema, get_schema_version


@pytest.fixture
def fresh_db(monkeypatch):
    with tempfile.TemporaryDirectory() as tmp:
        monkeypatch.setenv("DATA_DIR", tmp)
        # Reset cached system DB so we open a brand-new instance in tmp
        from src.db import close_system_db
        close_system_db()
        yield tmp
        close_system_db()


def test_schema_v5_adds_active_column(fresh_db):
    from src.db import get_system_db, close_system_db
    conn = get_system_db()
    try:
        cols = conn.execute("PRAGMA table_info(users)").fetchall()
        col_names = [c[1] for c in cols]
        assert "active" in col_names
        assert "deactivated_at" in col_names
        assert "deactivated_by" in col_names
        assert get_schema_version(conn) >= 5
    finally:
        conn.close()
        close_system_db()


def test_schema_v5_backfill_keeps_existing_users_active(fresh_db):
    """Simulate upgrading from v4: insert a user pre-migration, verify active=TRUE afterwards."""
    import uuid
    import duckdb as _duckdb
    from pathlib import Path

    # 1. Create a v4-era DB by hand.
    db_dir = Path(fresh_db) / "state"
    db_dir.mkdir(parents=True, exist_ok=True)
    db_path = db_dir / "system.duckdb"
    conn = _duckdb.connect(str(db_path))
    try:
        conn.execute("CREATE TABLE schema_version (version INTEGER NOT NULL, applied_at TIMESTAMP DEFAULT current_timestamp)")
        conn.execute("INSERT INTO schema_version (version) VALUES (4)")
        conn.execute("""CREATE TABLE users (
            id VARCHAR PRIMARY KEY, email VARCHAR UNIQUE NOT NULL,
            name VARCHAR, role VARCHAR DEFAULT 'analyst',
            password_hash VARCHAR, setup_token VARCHAR,
            setup_token_created TIMESTAMP, reset_token VARCHAR,
            reset_token_created TIMESTAMP,
            created_at TIMESTAMP DEFAULT current_timestamp, updated_at TIMESTAMP)""")
        uid = str(uuid.uuid4())
        conn.execute("INSERT INTO users (id, email, name, role) VALUES (?, 'pre@v4', 'Pre', 'admin')", [uid])
    finally:
        conn.close()

    # 2. Now let the app open it; the schema should migrate to v5 and backfill active=TRUE.
    from src.db import get_system_db, close_system_db, get_schema_version
    close_system_db()
    conn = get_system_db()
    try:
        assert get_schema_version(conn) >= 5
        row = conn.execute("SELECT email, active FROM users WHERE email = 'pre@v4'").fetchone()
        assert row is not None
        assert row[1] is True
    finally:
        conn.close()
        close_system_db()


def test_repository_update_accepts_active(fresh_db):
    import uuid
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    conn = get_system_db()
    try:
        repo = UserRepository(conn)
        uid = str(uuid.uuid4())
        repo.create(id=uid, email="a@b.c", name="A")
        repo.update(id=uid, active=False, deactivated_by="admin-uuid")
        row = repo.get_by_id(uid)
        assert row["active"] is False
        assert row["deactivated_by"] == "admin-uuid"
    finally:
        conn.close()
        close_system_db()


def test_repository_count_admins(fresh_db):
    """v12: count_admins counts users in the Admin system group, not users.role."""
    import uuid
    from src.db import SYSTEM_ADMIN_GROUP, get_system_db, close_system_db
    from src.repositories.user_group_members import UserGroupMembersRepository
    from src.repositories.users import UserRepository
    conn = get_system_db()
    try:
        repo = UserRepository(conn)
        assert repo.count_admins() == 0
        admin_gid = conn.execute(
            "SELECT id FROM user_groups WHERE name = ?", [SYSTEM_ADMIN_GROUP]
        ).fetchone()[0]
        admin_id = str(uuid.uuid4())
        repo.create(id=admin_id, email="a@b.c", name="A")
        UserGroupMembersRepository(conn).add_member(admin_id, admin_gid, source="system_seed")
        repo.create(id=str(uuid.uuid4()), email="b@b.c", name="B")
        assert repo.count_admins() == 1
    finally:
        conn.close()
        close_system_db()


from fastapi.testclient import TestClient


@pytest.fixture
def app_client(fresh_db, monkeypatch):
    monkeypatch.setenv("TESTING", "1")
    monkeypatch.setenv("JWT_SECRET_KEY", "test-jwt-secret-key-minimum-32-chars!!")
    from app.main import app
    return TestClient(app)


def _seed_admin(fresh_db):
    """Create an admin user (in Admin user_group) and return (id, bearer_token)."""
    import uuid
    from src.db import SYSTEM_ADMIN_GROUP, get_system_db
    from src.repositories.user_group_members import UserGroupMembersRepository
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token
    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="admin@test", name="Admin")
        admin_gid = conn.execute(
            "SELECT id FROM user_groups WHERE name = ?", [SYSTEM_ADMIN_GROUP]
        ).fetchone()[0]
        UserGroupMembersRepository(conn).add_member(uid, admin_gid, source="system_seed")
        token = create_access_token(user_id=uid, email="admin@test")
        return uid, token
    finally:
        conn.close()


def test_patch_user_updates_role(app_client, fresh_db):
    import uuid
    from src.db import get_system_db
    from src.repositories.users import UserRepository
    admin_id, token = _seed_admin(fresh_db)
    target_id = str(uuid.uuid4())
    conn = get_system_db()
    try:
        UserRepository(conn).create(id=target_id, email="x@test", name="X")
    finally:
        conn.close()

    resp = app_client.patch(
        f"/api/users/{target_id}",
        headers={"Authorization": f"Bearer {token}"},
        json={"role": "analyst", "name": "X2"},
    )
    assert resp.status_code == 200
    data = resp.json()
    # v12: role response is admin/user based on Admin group membership.
    # Patching role="analyst" is a no-op for the admin group → still "user".
    assert data["role"] == "user"
    assert data["name"] == "X2"


def test_cannot_self_deactivate(app_client, fresh_db):
    admin_id, token = _seed_admin(fresh_db)
    resp = app_client.patch(
        f"/api/users/{admin_id}",
        headers={"Authorization": f"Bearer {token}"},
        json={"active": False},
    )
    assert resp.status_code == 409
    assert "yourself" in resp.json()["detail"].lower()


def test_cannot_delete_last_admin(app_client, fresh_db):
    """Deleting the sole active admin must 409.
    Note: the endpoint checks self-delete first, which also triggers 409 here,
    so we accept either "yourself" or "last" wording; the point is the
    safeguard blocks deletion of the only admin."""
    admin_id, token = _seed_admin(fresh_db)
    # Create a non-admin so we have ≥2 users, but admin is still the only admin.
    resp = app_client.post(
        "/api/users",
        headers={"Authorization": f"Bearer {token}"},
        json={"email": "x@test", "name": "X", "role": "viewer"},
    )
    x_id = resp.json()["id"]
    # Try deleting the admin.
    resp = app_client.delete(
        f"/api/users/{admin_id}",
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 409
    detail = resp.json()["detail"].lower()
    assert "last" in detail or "yourself" in detail


def test_deactivated_user_cannot_authenticate(app_client, fresh_db):
    """A deactivated user's old JWT must be rejected."""
    import uuid
    from src.db import get_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token

    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@test", name="U")
        token = create_access_token(user_id=uid, email="u@test")
        UserRepository(conn).update(id=uid, active=False)
    finally:
        conn.close()

    resp = app_client.get(
        "/api/users",  # any authenticated endpoint
        headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
    )
    # Deactivated: must not succeed.
    assert resp.status_code in (401, 403)


def test_admin_users_page_renders_for_admin(app_client, fresh_db):
    admin_id, token = _seed_admin(fresh_db)
    resp = app_client.get(
        "/admin/users",
        headers={"Accept": "text/html"},
        cookies={"access_token": token},
    )
    assert resp.status_code == 200
    assert 'class="users-title">Users' in resp.text


def test_admin_users_page_denies_non_admin(app_client, fresh_db):
    import uuid
    from src.db import get_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token
    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="a@test", name="A")
        token = create_access_token(user_id=uid, email="a@test")
    finally:
        conn.close()
    resp = app_client.get(
        "/admin/users",
        headers={"Accept": "text/html"},
        cookies={"access_token": token},
        follow_redirects=False,
    )
    # HTML request to admin-only page → 302 (to /login) for non-admin per Phase 0, or 403.
    # Phase 0 is out of scope here so we accept 403 (current behaviour) or 302.
    assert resp.status_code in (302, 403)


def test_deactivated_admin_rejected_by_active_check(app_client, fresh_db):
    """Deactivating an admin must cause their token to be rejected as 401 (not succeed)."""
    import uuid
    from src.db import get_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token

    # Seed two admins so we can deactivate one without tripping the last-admin rule.
    admin_id, admin_token = _seed_admin(fresh_db)
    conn = get_system_db()
    try:
        other_uid = str(uuid.uuid4())
        UserRepository(conn).create(id=other_uid, email="other@test", name="Other")
        other_token = create_access_token(user_id=other_uid, email="other@test")
        # Directly deactivate the "other" admin via repository (bypass safeguard
        # because we already have 2 admins; this is just a state setup).
        UserRepository(conn).update(id=other_uid, active=False)
    finally:
        conn.close()

    resp = app_client.get(
        "/api/users",
        headers={"Authorization": f"Bearer {other_token}", "Accept": "application/json"},
    )
    assert resp.status_code == 401
    assert "deactivated" in resp.json().get("detail", "").lower()


def test_cannot_deactivate_last_admin(app_client, fresh_db):
    """v19: try to deactivate the last active admin → 409.
    Admin demotion is now done via group membership (DELETE /api/admin/users/{id}/memberships/{group_id}),
    but the deactivate path retains its own last-admin guard.
    """
    admin_id, token = _seed_admin(fresh_db)
    # Create a second non-admin user.
    resp = app_client.post(
        "/api/users",
        headers={"Authorization": f"Bearer {token}"},
        json={"email": "y@test", "name": "Y"},
    )
    assert resp.status_code == 201
    # Try to deactivate the only active admin → must fail.
    resp = app_client.patch(
        f"/api/users/{admin_id}",
        headers={"Authorization": f"Bearer {token}"},
        json={"active": False},
    )
    # The endpoint blocks deactivation for the last active admin BEFORE the
    # self-deactivate check (the user IS themselves, but the message says "last
    # active admin"). Either error is acceptable; both signal the constraint.
    assert resp.status_code == 409
    assert (
        "admin" in resp.json()["detail"].lower()
        or "yourself" in resp.json()["detail"].lower()
    )