agnes-the-ai-analyst/tests/test_cli_auth.py
minasarustamyan d4ac84dd46
feat(rbac): drop dataset_permissions + users.role + is_public; v19 migration (#150)
* feat(rbac): drop dataset_permissions + access_requests + users.role + is_public; v19 migration

BREAKING. Unifies the data RBAC layer into the per-group resource_grants model.
Before this PR, the legacy data RBAC layer (dataset_permissions + is_public bypass)
was de facto inactive: is_public had no API/UI/CLI surface, and its default of true
meant that can_access_table always bypassed the check. Now every non-admin access
requires an explicit resource_grants(group, "table", id) row.
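
The rule above can be sketched in memory as follows. This is only an illustration, not the project's implementation: `Grant`, `can_access_sketch`, and the `analysts` group are hypothetical names, and the real `can_access` takes a user id and a database connection rather than sets.

```python
from typing import NamedTuple


class Grant(NamedTuple):
    """One resource_grants row: (group, resource_type, resource_id)."""
    group: str
    resource_type: str
    resource_id: str


def can_access_sketch(user_groups, resource_type, resource_id, grants):
    """Return True if the user is an admin or one of their groups holds a grant."""
    # Admin shortcut: membership in the Admin group skips the grant lookup.
    if "Admin" in user_groups:
        return True
    # No implicit public access: a matching row must exist for some group.
    return any(
        Grant(group, resource_type, resource_id) in grants
        for group in user_groups
    )


# Hypothetical data: one group with a grant on one table.
grants = {Grant("analysts", "table", "orders")}
```

With this data, a member of `analysts` can read `orders` but not any other table, while a member of `Admin` passes for every table without a grant row.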

Schema v18 → v19 (src/db.py:_v18_to_v19_finalize):
- DROP TABLE dataset_permissions, access_requests
- DROP COLUMN users.role (NULL artifact since v13)
- DROP COLUMN table_registry.is_public
- Drops go through the table-rebuild idiom (rename → create new → INSERT … SELECT
  → drop old) because of DuckDB ALTER DROP COLUMN limitations on tables
  with historic FK constraints. The INSERT picks the intersection of columns, so
  test fixtures with a minimal pre-v19 schema migrate cleanly.
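
A minimal sketch of the table-rebuild idiom described above. It uses the standard-library `sqlite3` module as a stand-in so that it is self-contained; the actual migration targets DuckDB, and `rebuild_table` plus the `description` column are hypothetical names chosen for illustration.

```python
import sqlite3


def rebuild_table(conn, table, new_columns):
    """Rebuild `table` so it has exactly `new_columns` ({name: SQL type})."""
    old = f"{table}__old"
    # 1. Rename the existing table out of the way.
    conn.execute(f"ALTER TABLE {table} RENAME TO {old}")
    # 2. Create the new table with the target schema.
    ddl = ", ".join(f"{name} {sql_type}" for name, sql_type in new_columns.items())
    conn.execute(f"CREATE TABLE {table} ({ddl})")
    # 3. Copy only the columns present in both the old and the new schema.
    old_columns = {row[1] for row in conn.execute(f"PRAGMA table_info({old})")}
    shared = [name for name in new_columns if name in old_columns]
    if shared:
        cols = ", ".join(shared)
        conn.execute(f"INSERT INTO {table} ({cols}) SELECT {cols} FROM {old}")
    # 4. Drop the old table.
    conn.execute(f"DROP TABLE {old}")


conn = sqlite3.connect(":memory:")
# Minimal pre-migration schema: has is_public, lacks description.
conn.execute("CREATE TABLE table_registry (id TEXT, is_public BOOLEAN)")
conn.execute("INSERT INTO table_registry VALUES ('orders', 1)")
rebuild_table(conn, "table_registry", {"id": "TEXT", "description": "TEXT"})

columns = [row[1] for row in conn.execute("PRAGMA table_info(table_registry)")]
rows = conn.execute("SELECT id, description FROM table_registry").fetchall()
```

Because only shared columns are copied, the dropped `is_public` column is silently left behind and the column missing from the old schema comes out as NULL, which is why a fixture with a reduced schema still migrates.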

Runtime:
- src/rbac.py:can_access_table → delegates to app.auth.access.can_access
- DatasetPermissionRepository, AccessRequestRepository deleted
- AGNES_ENABLE_TABLE_GRANTS env-gate in app/resource_types.py removed
  (TABLE is unconditionally enabled)

API drop:
- app/api/permissions.py, app/api/access_requests.py: entire files
- /admin/permissions web route + admin_permissions.html
- "Request Access" modal in catalog.html + locked-row UI
- ~10 if user.get("role") != "admin" checks replaced (the admin shortcut
  lives inside can_access_table)
- /api/settings: drop permissions field from GET; PUT /api/settings/dataset
  gate switched to can_access(user_id, "table", dataset, conn)

Auth:
- app/auth/jwt.py:create_access_token: drop role parameter (the claim disappears
  from newly issued JWTs; old tokens remain valid, claim ignored)
- app/api/users.py: drop role from CreateUserRequest / UpdateUserRequest
  (admin promotion = explicit add to Admin group via memberships API)
- src/repositories/users.py: drop role from create() / update()

CLI:
- da admin set-role removed → hard-fail with replacement command
- da admin add-user --role flag removed
- da auth import-token --role flag removed
- da auth whoami: drop "Role:" output
- cli/config.py:save_token: role parameter now optional, no longer written
  (back-compat with old token.json files preserved: the field is ignored)
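
The token.json back-compat behavior might look roughly like the sketch below. The helper names `save_token_sketch` and `load_token_sketch` are hypothetical and the real cli/config.py may differ; the only facts taken from this change are that `role` stays an optional parameter, is no longer written, and is ignored when an old file still contains it.

```python
import json
import tempfile
from pathlib import Path


def save_token_sketch(path, access_token, email, role=None):
    """Write token.json; `role` is accepted for old callers but never stored."""
    path.write_text(json.dumps({"access_token": access_token, "email": email}))


def load_token_sketch(path):
    """Read token.json, keeping only the fields the v19 CLI uses."""
    data = json.loads(path.read_text())
    # A legacy file may still contain "role"; it is dropped here, not rejected.
    return {"access_token": data["access_token"], "email": data["email"]}


with tempfile.TemporaryDirectory() as tmp:
    token_file = Path(tmp) / "token.json"
    # Legacy file as a pre-v19 CLI would have written it.
    token_file.write_text(json.dumps(
        {"access_token": "old", "email": "a@example.com", "role": "admin"}
    ))
    legacy = load_token_sketch(token_file)
    # An old caller still passing role= keeps working; the value is discarded.
    save_token_sketch(token_file, "new", "a@example.com", role="admin")
    rewritten = json.loads(token_file.read_text())
```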

Tests:
- DELETE: test_permissions.py, test_permissions_api.py, test_access_requests_api.py
- REWRITE: test_access_control.py (resource_grants flow), test_rbac.py
  (can_access_table over resource_grants), test_journey_rbac.py
  (drop access-request flow), test_resource_types.py (drop env-gate
  tests, drop is_public from helpers), test_v2_*.py (drop role-based
  user dicts in favor of id-based + Admin group membership),
  test_settings_api.py (no permissions field, can_access gate)
- TRIVIAL: ~30 files: drop role="admin" arg from UserRepository.create
  and the 3rd positional role from create_access_token
- NEW: test_v18_to_v19 migration test (test_db.py),
  test_can_access_table_no_implicit_public (test_rbac.py),
  test_admin_set_role_returns_hardfail (test_cli_admin.py)
- OpenAPI snapshot regenerated

Docs:
- CHANGELOG: BREAKING entry under [Unreleased]
- CLAUDE.md: schema v18 → v19
- docs/architecture.md: schema table + RBAC section rewritten
- docs/auth-google-oauth.md: admin promotion via da admin break-glass
- cli/skills/security.md: completely rewritten for the group-based model
- docs/TODO-rbac-data-enforcement.md: deleted (TODO done)

Test results: 2363 passed, 19 failed. The remaining failures are pre-existing
Windows-specific issues (fcntl, charset) unrelated to this PR,
verified with git stash pop.

Plan: ~/.claude/plans/floofy-coalescing-parnas.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore(release): cut 0.27.0

---------

Co-authored-by: Minas Arustamyan <arustamyan.minas@gmail.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: ZdenekSrotyr <zdenek.srotyr@keboola.com>
2026-04-30 22:02:16 +02:00

258 lines
10 KiB
Python

"""Tests for da auth login/logout/whoami commands."""
import json
import pytest
from unittest.mock import patch, MagicMock
from typer.testing import CliRunner

from cli.main import app

runner = CliRunner()


@pytest.fixture(autouse=True)
def tmp_config(tmp_path, monkeypatch):
    monkeypatch.setenv("DA_CONFIG_DIR", str(tmp_path / "config"))
    monkeypatch.setenv("DA_LOCAL_DIR", str(tmp_path / "local"))
    (tmp_path / "config").mkdir()
    (tmp_path / "local").mkdir()
    yield tmp_path


def _make_response(status_code=200, json_data=None, text=""):
    resp = MagicMock()
    resp.status_code = status_code
    resp.json.return_value = json_data or {}
    resp.text = text
    return resp


class TestAuthLogin:
    def test_login_success(self):
        """Login with valid credentials saves token and shows confirmation."""
        mock_resp = _make_response(200, {
            "access_token": "tok123",
            "email": "alice@example.com",
            "role": "user",
        })
        with patch("cli.commands.auth.api_post", return_value=mock_resp):
            with patch("cli.commands.auth.save_token") as mock_save:
                # Empty password (simulates magic-link / OAuth account): still 200 from server
                result = runner.invoke(app, ["auth", "login", "--email", "alice@example.com"], input="\n")
        assert result.exit_code == 0
        assert "alice@example.com" in result.output
        mock_save.assert_called_once_with("tok123", "alice@example.com")

    def test_login_invalid_credentials(self):
        """Login with bad credentials exits with error."""
        mock_resp = _make_response(401, {"detail": "Invalid credentials"})
        with patch("cli.commands.auth.api_post", return_value=mock_resp):
            result = runner.invoke(app, ["auth", "login", "--email", "bad@example.com"], input="\n")
        assert result.exit_code == 1
        assert "Login failed" in result.output

    def test_login_connection_error(self):
        """Login propagates connection errors cleanly."""
        with patch("cli.commands.auth.api_post", side_effect=Exception("Connection refused")):
            result = runner.invoke(app, ["auth", "login", "--email", "alice@example.com"], input="\n")
        assert result.exit_code == 1
        assert "Connection error" in result.output


class TestAuthLogout:
    def test_logout(self):
        """Logout clears token and confirms."""
        with patch("cli.commands.auth.clear_token") as mock_clear:
            result = runner.invoke(app, ["auth", "logout"])
        assert result.exit_code == 0
        assert "Logged out" in result.output
        mock_clear.assert_called_once()


class TestAuthImportToken:
    def _make_jwt(self, email="alice@example.com", typ="pat"):
        import jwt as pyjwt
        return pyjwt.encode(
            {"email": email, "typ": typ, "sub": "u-1"},
            "unused",
            algorithm="HS256",
        )

    def _mock_verify(self, status_code=200, json_data=None):
        """Build a patcher for cli.commands.auth.httpx.Client that returns a canned response."""
        resp = _make_response(status_code, json_data or {})
        mock_client = MagicMock()
        mock_client.__enter__.return_value = mock_client
        mock_client.__exit__.return_value = False
        mock_client.get.return_value = resp
        return patch("cli.commands.auth.httpx.Client", return_value=mock_client)

    def test_import_token_success_writes_canonical_format(self, tmp_path, monkeypatch):
        """Valid JWT + 200 from server -> canonical token.json on disk."""
        monkeypatch.setenv("DA_SERVER", "http://example.test")
        token = self._make_jwt(email="bob@example.com")
        with self._mock_verify(200):
            result = runner.invoke(app, ["auth", "import-token", "--token", token])
        assert result.exit_code == 0, result.output
        assert "bob@example.com" in result.output
        token_file = tmp_path / "config" / "token.json"
        assert token_file.exists()
        data = json.loads(token_file.read_text())
        # v19: token.json no longer carries a role label (auth derives admin
        # from group memberships server-side).
        assert data == {"access_token": token, "email": "bob@example.com"}

    def test_import_token_401_does_not_overwrite_existing(self, tmp_path, monkeypatch):
        """A 401 response aborts import and leaves the prior token file untouched."""
        monkeypatch.setenv("DA_SERVER", "http://example.test")
        existing = {"access_token": "keep-me", "email": "old@example.com"}
        token_file = tmp_path / "config" / "token.json"
        token_file.write_text(json.dumps(existing))
        token = self._make_jwt()
        with self._mock_verify(401, {"detail": "Token revoked"}):
            result = runner.invoke(app, ["auth", "import-token", "--token", token])
        assert result.exit_code == 1
        assert "Token rejected by server" in result.output
        assert "Token revoked" in result.output
        # Existing file must be intact.
        assert json.loads(token_file.read_text()) == existing

    def test_import_token_with_server_flag_persists_server_to_config_yaml(
        self, tmp_path, monkeypatch
    ):
        """Passing --server should write `server: URL` to ~/.config/da/config.yaml
        so the user never has to configure the server in a separate step."""
        # No DA_SERVER env var: rely entirely on the --server flag for persistence.
        monkeypatch.delenv("DA_SERVER", raising=False)
        token = self._make_jwt(email="dave@example.com")
        with self._mock_verify(200):
            result = runner.invoke(
                app,
                [
                    "auth", "import-token",
                    "--token", token,
                    "--server", "https://agnes.example.com",
                ],
            )
        assert result.exit_code == 0, result.output
        config_file = tmp_path / "config" / "config.yaml"
        assert config_file.exists(), "config.yaml must be written when --server is passed"
        import yaml
        cfg = yaml.safe_load(config_file.read_text())
        assert cfg.get("server") == "https://agnes.example.com"

    def test_import_token_claim_fallback_via_cli_email_override(self, tmp_path, monkeypatch):
        """Missing email claim -> refuse without --email, accept with it.
        v19 dropped the --role flag (token.json no longer carries role)."""
        import jwt as pyjwt
        monkeypatch.setenv("DA_SERVER", "http://example.test")
        # JWT without email claim: simulates a malformed or minimal token.
        token = pyjwt.encode({"sub": "u-1", "typ": "pat"}, "unused", algorithm="HS256")
        with self._mock_verify(200):
            fail_result = runner.invoke(app, ["auth", "import-token", "--token", token])
        assert fail_result.exit_code == 1
        assert "missing" in fail_result.output.lower()
        with self._mock_verify(200):
            ok_result = runner.invoke(
                app,
                [
                    "auth", "import-token",
                    "--token", token,
                    "--email", "carol@example.com",
                ],
            )
        assert ok_result.exit_code == 0, ok_result.output
        token_file = tmp_path / "config" / "token.json"
        data = json.loads(token_file.read_text())
        assert data == {"access_token": token, "email": "carol@example.com"}


class TestAuthWhoami:
    def test_whoami_no_token(self):
        """Whoami exits when no token is stored."""
        with patch("cli.commands.auth.get_token", return_value=None):
            result = runner.invoke(app, ["auth", "whoami"])
        assert result.exit_code == 1
        assert "Not logged in" in result.output

    def test_whoami_valid_token(self):
        """Whoami decodes JWT and shows user info. v19: no role claim."""
        import jwt as pyjwt
        token = pyjwt.encode(
            {"email": "alice@example.com"},
            "secret",
            algorithm="HS256",
        )
        with patch("cli.commands.auth.get_token", return_value=token):
            with patch("cli.commands.auth.get_server_url", return_value="http://localhost:8000"):
                result = runner.invoke(app, ["auth", "whoami"])
        assert result.exit_code == 0
        assert "alice@example.com" in result.output

    def test_whoami_invalid_token(self):
        """Whoami with garbled token exits with error."""
        with patch("cli.commands.auth.get_token", return_value="not.a.jwt"):
            result = runner.invoke(app, ["auth", "whoami"])
        # May succeed or fail depending on jwt decode; either way no traceback
        assert result.exit_code in (0, 1)


def test_da_login_sends_password(monkeypatch):
    import httpx
    from typer.testing import CliRunner
    from cli.commands import auth as auth_mod

    captured = {}

    def fake_post(path, json=None, **kwargs):
        captured["path"] = path
        captured["json"] = json
        return httpx.Response(200, json={
            "access_token": "tok", "email": "u@t", "role": "analyst",
            "user_id": "u1", "token_type": "bearer",
        })

    monkeypatch.setattr(auth_mod, "api_post", fake_post, raising=False)
    runner = CliRunner()
    # Provide email and password via stdin (typer prompts)
    result = runner.invoke(auth_mod.auth_app, ["login"], input="u@t\nhunter2\n")
    assert result.exit_code == 0, result.output
    assert captured["path"] == "/auth/token"
    assert captured["json"] == {"email": "u@t", "password": "hunter2"}


def test_da_auth_token_create_calls_api(monkeypatch):
    import httpx
    from typer.testing import CliRunner
    from cli.commands.auth import auth_app
    from cli.commands import tokens as tok_mod

    captured = {}

    def fake_post(path, json=None, **kwargs):
        captured["path"] = path
        captured["json"] = json
        return httpx.Response(201, json={
            "id": "abc", "name": json["name"], "prefix": "XXXXXXXX",
            "token": "raw-token-once",
            "expires_at": None, "created_at": "2026-04-21T00:00:00+00:00",
        })

    monkeypatch.setattr(tok_mod, "api_post", fake_post, raising=False)
    runner = CliRunner()
    result = runner.invoke(auth_app, ["token", "create", "--name", "laptop", "--ttl", "30d"])
    assert result.exit_code == 0, result.output
    assert captured["path"] == "/auth/tokens"
    assert captured["json"] == {"name": "laptop", "expires_in_days": 30}
    assert "raw-token-once" in result.output