fix(claude_md): RBAC-filter tables; align today with now (UTC)

- _list_tables now accepts a user param and delegates to
  get_accessible_tables: admins see all, non-admins see only tables
  covered by their resource_grants. Fixes silent leak of table names
  to unauthorised analysts.
- today derived from now.date() (UTC) instead of date.today()
  (server-local TZ), so today and now are always consistent.
- Updated test_render_override_tables_list to seed an admin user so
  RBAC filtering doesn't hide the table; added three new tests covering
  per-user table isolation, admin sees-all, and no-grants-empty.
This commit is contained in:
ZdenekSrotyr 2026-05-04 05:57:22 +02:00
parent a2157ee807
commit 93fdea3461
2 changed files with 130 additions and 10 deletions

View file

@ -18,7 +18,7 @@ See also: surfaced as the "Agent Workspace Prompt" admin editor at
from __future__ import annotations
import logging
from datetime import date, datetime, timezone
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
@ -53,13 +53,28 @@ def _load_default_template() -> str:
)
def _list_tables(conn: duckdb.DuckDBPyConnection) -> list[dict[str, Any]]:
def _list_tables(conn: duckdb.DuckDBPyConnection, *, user: dict) -> list[dict[str, Any]]:
"""Return registered tables filtered by the calling user's RBAC grants.
For admins, returns all tables. For non-admins, returns only tables the
user has explicit ``resource_grants(resource_type='table')`` access to.
"""
from src.rbac import get_accessible_tables
try:
rows = conn.execute(
"""SELECT name, description, query_mode
FROM table_registry
ORDER BY name"""
).fetchall()
allowed_ids = get_accessible_tables(user, conn) # None=admin, list=non-admin
if allowed_ids is None:
rows = conn.execute(
"SELECT name, description, query_mode FROM table_registry ORDER BY name"
).fetchall()
elif not allowed_ids:
return []
else:
placeholders = ",".join(["?"] * len(allowed_ids))
rows = conn.execute(
f"SELECT name, description, query_mode FROM table_registry "
f"WHERE id IN ({placeholders}) ORDER BY name",
allowed_ids,
).fetchall()
except duckdb.CatalogException:
return []
return [
@ -148,7 +163,7 @@ def build_claude_md_context(
},
"sync_interval": get_sync_interval(),
"data_source": {"type": get_data_source_type()},
"tables": _list_tables(conn),
"tables": _list_tables(conn, user=user),
"metrics": _metrics_summary(conn),
"marketplaces": _marketplaces_for_user(conn, user),
"user": {
@ -159,7 +174,7 @@ def build_claude_md_context(
"groups": user.get("groups") or [],
},
"now": now,
"today": date.today().isoformat(),
"today": now.date().isoformat(),
}

View file

@ -73,11 +73,17 @@ def test_render_uses_override_when_set(conn):
def test_render_override_tables_list(conn):
# Seed a table registry entry
# Seed a table registry entry and ensure the test user is an admin so
# RBAC filtering does not hide the table.
conn.execute(
"INSERT INTO table_registry (id, name, description, query_mode, source_type) "
"VALUES ('t1', 'orders', 'All orders', 'local', 'keboola')"
)
from src.repositories.users import UserRepository
from src.repositories.user_group_members import UserGroupMembersRepository
UserRepository(conn).create(id="u1", email="alice@example.com", name="Alice")
admin_gid = conn.execute("SELECT id FROM user_groups WHERE name='Admin'").fetchone()[0]
UserGroupMembersRepository(conn).add_member("u1", admin_gid, source="admin")
ClaudeMdTemplateRepository(conn).set(
"{% for t in tables %}- {{ t.name }}: {{ t.description }}{% endfor %}",
updated_by="admin@example.com",
@ -167,3 +173,102 @@ def test_render_raises_on_template_error(conn):
)
with pytest.raises(TemplateError):
render_claude_md(conn, user=_user(), server_url="https://example.com")
# ---------------------------------------------------------------------------
# RBAC-filtered tables — two users with different grants see different tables
# ---------------------------------------------------------------------------
def _make_user(conn, *, user_id: str, email: str) -> None:
from src.repositories.users import UserRepository
UserRepository(conn).create(id=user_id, email=email, name=email.split("@")[0])
def _make_group(conn, *, name: str) -> str:
from src.repositories.user_groups import UserGroupsRepository
return UserGroupsRepository(conn).create(name=name)["id"]
def _add_member(conn, *, user_id: str, group_id: str) -> None:
from src.repositories.user_group_members import UserGroupMembersRepository
UserGroupMembersRepository(conn).add_member(user_id, group_id, source="admin")
def _grant_table(conn, *, group_id: str, table_id: str) -> None:
from src.repositories.resource_grants import ResourceGrantsRepository
ResourceGrantsRepository(conn).create(
group_id=group_id, resource_type="table", resource_id=table_id
)
def test_render_tables_filtered_by_rbac(conn):
"""Non-admin users see only tables granted to their groups."""
# Seed two tables
conn.execute(
"INSERT INTO table_registry (id, name, description, query_mode, source_type) "
"VALUES ('t-a', 'orders', 'Order data', 'local', 'keboola')"
)
conn.execute(
"INSERT INTO table_registry (id, name, description, query_mode, source_type) "
"VALUES ('t-b', 'revenue', 'Revenue data', 'local', 'keboola')"
)
# Two users, two groups
_make_user(conn, user_id="ua", email="alice@example.com")
_make_user(conn, user_id="ub", email="bob@example.com")
gid_a = _make_group(conn, name="group-a")
gid_b = _make_group(conn, name="group-b")
_add_member(conn, user_id="ua", group_id=gid_a)
_add_member(conn, user_id="ub", group_id=gid_b)
# Grant: group-a → t-a, group-b → t-b
_grant_table(conn, group_id=gid_a, table_id="t-a")
_grant_table(conn, group_id=gid_b, table_id="t-b")
user_a = {"id": "ua", "email": "alice@example.com", "name": "Alice", "is_admin": False, "groups": []}
user_b = {"id": "ub", "email": "bob@example.com", "name": "Bob", "is_admin": False, "groups": []}
ctx_a = build_claude_md_context(conn, user=user_a, server_url="https://example.com")
table_names_a = {t["name"] for t in ctx_a["tables"]}
assert "orders" in table_names_a
assert "revenue" not in table_names_a
ctx_b = build_claude_md_context(conn, user=user_b, server_url="https://example.com")
table_names_b = {t["name"] for t in ctx_b["tables"]}
assert "revenue" in table_names_b
assert "orders" not in table_names_b
def test_render_tables_admin_sees_all(conn):
"""Admin users see all tables regardless of grants."""
conn.execute(
"INSERT INTO table_registry (id, name, description, query_mode, source_type) "
"VALUES ('t-x', 'alpha', 'Alpha table', 'local', 'keboola')"
)
conn.execute(
"INSERT INTO table_registry (id, name, description, query_mode, source_type) "
"VALUES ('t-y', 'beta', 'Beta table', 'local', 'keboola')"
)
# Admin user: member of the Admin system group
_make_user(conn, user_id="u-admin", email="admin@example.com")
admin_gid = conn.execute("SELECT id FROM user_groups WHERE name='Admin'").fetchone()[0]
_add_member(conn, user_id="u-admin", group_id=admin_gid)
user_admin = {"id": "u-admin", "email": "admin@example.com", "name": "Admin", "is_admin": True, "groups": []}
ctx = build_claude_md_context(conn, user=user_admin, server_url="https://example.com")
table_names = {t["name"] for t in ctx["tables"]}
assert "alpha" in table_names
assert "beta" in table_names
def test_render_tables_empty_for_user_with_no_grants(conn):
"""Non-admin with no grants sees no tables."""
conn.execute(
"INSERT INTO table_registry (id, name, description, query_mode, source_type) "
"VALUES ('t-z', 'secret', 'Secret table', 'local', 'keboola')"
)
_make_user(conn, user_id="u-none", email="none@example.com")
user_none = {"id": "u-none", "email": "none@example.com", "name": "None", "is_admin": False, "groups": []}
ctx = build_claude_md_context(conn, user=user_none, server_url="https://example.com")
assert ctx["tables"] == []