From 1074d5ec49d59fb100d022b825acbfda65fd1505 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Tue, 31 Mar 2026 12:33:31 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20implement=20data=20access=20control=20?= =?UTF-8?q?=E2=80=94=20table-level=20permissions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Schema v3: add is_public column to table_registry (default true). src/rbac.py: can_access_table() checks admin bypass, public flag, explicit permissions, wildcard bucket permissions. API enforcement: - manifest: filters tables by user access - download: 403 if no access - catalog: filters table list - query: validates referenced tables against allowed list New admin permissions API (/api/admin/permissions) for grant/revoke. 28 access control tests + 733 total tests passing. --- app/api/catalog.py | 6 + app/api/data.py | 9 +- app/api/permissions.py | 69 +++ app/api/query.py | 30 +- app/api/sync.py | 11 +- app/main.py | 2 + .../specs/2026-03-31-data-access-control.md | 202 +++++++ src/db.py | 10 +- src/rbac.py | 70 +++ src/repositories/table_registry.py | 10 +- tests/test_access_control.py | 554 ++++++++++++++++++ tests/test_db.py | 4 +- tests/test_e2e_api.py | 6 + tests/test_e2e_extract.py | 2 +- 14 files changed, 964 insertions(+), 21 deletions(-) create mode 100644 app/api/permissions.py create mode 100644 docs/superpowers/specs/2026-03-31-data-access-control.md create mode 100644 tests/test_access_control.py diff --git a/app/api/catalog.py b/app/api/catalog.py index 1a7945b..5f4e482 100644 --- a/app/api/catalog.py +++ b/app/api/catalog.py @@ -11,6 +11,7 @@ import yaml from app.auth.dependencies import get_current_user, _get_db from src.repositories.profiles import ProfileRepository +from src.rbac import can_access_table router = APIRouter(prefix="/api/catalog", tags=["catalog"]) @@ -52,6 +53,11 @@ async def list_catalog_tables( from src.repositories.table_registry import TableRegistryRepository repo = TableRegistryRepository(conn) all_tables = repo.list_all() + + # Filter by user's accessible tables (admin sees all) + if user.get("role") != "admin": + all_tables = [t for t in all_tables if can_access_table(user, t["id"], conn)] + tables = [ { "id": t["id"], diff --git a/app/api/data.py b/app/api/data.py index 2b1b615..e5b6bb6 100644 --- a/app/api/data.py +++ b/app/api/data.py @@ -5,8 +5,10 @@ from pathlib import Path from fastapi import APIRouter, Depends, HTTPException, Request from fastapi.responses import FileResponse +import duckdb -from app.auth.dependencies import get_current_user +from app.auth.dependencies import get_current_user, _get_db +from src.rbac import can_access_table router = APIRouter(prefix="/api/data", tags=["data"]) @@ -20,8 +22,13 @@ async def download_table( table_id: str, request: Request, user: dict = Depends(get_current_user), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): """Stream a parquet file for download. Supports ETag for caching.""" + # Check access FIRST + if not can_access_table(user, table_id, conn): + raise HTTPException(status_code=403, detail="Access denied to this table") + data_dir = _get_data_dir() # Search in extracts directory (v2 extract.duckdb architecture) diff --git a/app/api/permissions.py b/app/api/permissions.py new file mode 100644 index 0000000..bbf2cf7 --- /dev/null +++ b/app/api/permissions.py @@ -0,0 +1,69 @@ +"""Admin permissions API — grant/revoke dataset access.""" + +import logging +from typing import Optional, List + +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel +import duckdb + +from app.auth.dependencies import require_role, get_current_user, Role, _get_db +from src.repositories.sync_settings import DatasetPermissionRepository + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/admin/permissions", tags=["permissions"]) + + +class PermissionRequest(BaseModel): + user_id: str + dataset: str # table_id, bucket wildcard, or dataset group + access: str = "read" # "read" or "none" + + +@router.post("", status_code=201) +async def grant_permission( + request: PermissionRequest, + user: dict = Depends(require_role(Role.ADMIN)), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Grant a user access to a dataset/table.""" + repo = DatasetPermissionRepository(conn) + repo.grant(request.user_id, request.dataset, request.access) + return {"user_id": request.user_id, "dataset": request.dataset, "access": request.access} + + +@router.delete("") +async def revoke_permission( + request: PermissionRequest, + user: dict = Depends(require_role(Role.ADMIN)), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Revoke a user's access to a dataset/table.""" + repo = DatasetPermissionRepository(conn) + repo.revoke(request.user_id, request.dataset) + return {"revoked": True} + + +@router.get("/{user_id}") +async def get_user_permissions( + user_id: str, + user: dict = Depends(require_role(Role.ADMIN)), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """List all permissions for a user.""" + repo = DatasetPermissionRepository(conn) + permissions = repo.get_user_permissions(user_id) + return {"user_id": user_id, "permissions": permissions} + + +@router.get("") +async def list_all_permissions( + user: dict = Depends(require_role(Role.ADMIN)), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """List all dataset permissions.""" + results = conn.execute("SELECT * FROM dataset_permissions ORDER BY user_id, dataset").fetchall() + if not results: + return {"permissions": []} + columns = [desc[0] for desc in conn.description] + return {"permissions": [dict(zip(columns, row)) for row in results]} diff --git a/app/api/query.py b/app/api/query.py index 6716bbf..040385f 100644 --- a/app/api/query.py +++ b/app/api/query.py @@ -5,9 +5,11 @@ from pathlib import Path from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel +import duckdb -from app.auth.dependencies import get_current_user +from app.auth.dependencies import get_current_user, _get_db from src.db import get_analytics_db +from src.rbac import get_accessible_tables router = APIRouter(prefix="/api/query", tags=["query"]) @@ -28,6 +30,7 @@ class QueryResponse(BaseModel): async def execute_query( request: QueryRequest, user: dict = Depends(get_current_user), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): """Execute SQL against the server analytics DuckDB.""" sql_lower = request.sql.strip().lower() @@ -49,11 +52,26 @@ async def execute_query( if not sql_lower.startswith("select ") and not sql_lower.startswith("with "): raise HTTPException(status_code=400, detail="Query must start with SELECT or WITH") - conn = get_analytics_db() + # Get allowed tables for this user + allowed = get_accessible_tables(user, conn) + + analytics = get_analytics_db() try: + if allowed is not None: # None = admin, sees all + # Get all views in analytics DB + all_views = {row[0] for row in analytics.execute( + "SELECT table_name FROM information_schema.tables WHERE table_type='VIEW'" + ).fetchall()} + + # Check if query references any forbidden tables + forbidden = all_views - set(allowed) + for table in forbidden: + if table.lower() in sql_lower: + raise HTTPException(status_code=403, detail=f"Access denied to table '{table}'") + # Open in read-only mode for extra safety - result = conn.execute(request.sql).fetchmany(request.limit + 1) - columns = [desc[0] for desc in conn.description] if conn.description else [] + result = analytics.execute(request.sql).fetchmany(request.limit + 1) + columns = [desc[0] for desc in analytics.description] if analytics.description else [] truncated = len(result) > request.limit rows = result[:request.limit] # Convert to serializable types @@ -69,7 +87,9 @@ async def execute_query( row_count=len(serializable_rows), truncated=truncated, ) + except HTTPException: + raise except Exception as e: raise HTTPException(status_code=400, detail=f"Query error: {str(e)}") finally: - conn.close() + analytics.close() diff --git a/app/api/sync.py b/app/api/sync.py index 8c8f62f..5121836 100644 --- a/app/api/sync.py +++ b/app/api/sync.py @@ -15,6 +15,7 @@ import duckdb from app.auth.dependencies import get_current_user, require_role, Role, _get_db from src.repositories.sync_state import SyncStateRepository from src.repositories.sync_settings import SyncSettingsRepository, DatasetPermissionRepository +from src.rbac import can_access_table logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/sync", tags=["sync"]) @@ -97,20 +98,16 @@ async def sync_manifest( ): """Return hash-based manifest of all synced data, filtered per user.""" repo = SyncStateRepository(conn) - perm_repo = DatasetPermissionRepository(conn) all_states = repo.get_all_states() - # Filter by user's accessible datasets (admin sees all) - user_role = user.get("role", "viewer") - accessible = None - if user_role != "admin": - accessible = set(perm_repo.get_accessible_datasets(user["id"])) + # Filter by user's accessible tables (admin sees all) + if user.get("role") != "admin": + all_states = [s for s in all_states if can_access_table(user, s["table_id"], conn)] data_dir = _get_data_dir() tables = {} for state in all_states: table_id = state["table_id"] - # If user has limited access, filter tables (simplified: by table_id prefix) tables[table_id] = { "hash": state.get("hash", ""), "updated": state.get("last_sync").isoformat() if state.get("last_sync") else None, diff --git a/app/main.py b/app/main.py index b529a0d..db13e96 100644 --- a/app/main.py +++ b/app/main.py @@ -23,6 +23,7 @@ from app.api.settings import router as settings_router from app.api.catalog import router as catalog_router from app.api.telegram import router as telegram_router from app.api.admin import router as admin_router +from app.api.permissions import router as permissions_router from app.web.router import router as web_router logger = logging.getLogger(__name__) @@ -101,6 +102,7 @@ def create_app() -> FastAPI: app.include_router(catalog_router) app.include_router(telegram_router) app.include_router(admin_router) + app.include_router(permissions_router) # Web UI router (must be last — has catch-all routes) app.include_router(web_router) diff --git a/docs/superpowers/specs/2026-03-31-data-access-control.md b/docs/superpowers/specs/2026-03-31-data-access-control.md new file mode 100644 index 0000000..615c66b --- /dev/null +++ b/docs/superpowers/specs/2026-03-31-data-access-control.md @@ -0,0 +1,202 @@ +# Data Access Control — Spec + +**Date:** 2026-03-31 +**Status:** Draft + +## 1. Problem + +V novém systému (API místo rsync) nemáme ekvivalent rsync filtru. Každý přihlášený uživatel vidí a stáhne všechny tabulky. V produkci to řeší filesystem permissions + per-user rsync filter. + +## 2. Současný model (produkce, rsync) + +``` +Server: /data/src_data/parquet/ +├── crm/orders.parquet ← dataread group +├── crm/customers.parquet ← dataread group +├── private/salaries.parquet ← data-private group only +└── jira/issues/2026-03.parquet ← dataread group + +Analytik (sync_data.sh): +1. Webapp generuje ~/.sync_rsync_filter (include/exclude per tabulka) +2. rsync --filter="merge ~/.sync_rsync_filter" stáhne jen povolené +3. AI agent pracuje s lokálními soubory → vidí jen to co se stáhlo +``` + +Tři vrstvy: +- **Linux skupiny** (dataread, data-private) → hrubé řízení +- **Datasety** (opt-in v instance.yaml) → celé skupiny tabulek +- **Per-table subscription** (explicit mode) → jednotlivé tabulky + +## 3. Nový model (API) + +Princip zůstává: **uživatel vidí jen to, k čemu má explicitní přístup**. + +### 3.1 Datový model + +Stávající tabulka `dataset_permissions` v DuckDB: + +```sql +CREATE TABLE dataset_permissions ( + user_id VARCHAR NOT NULL, + dataset VARCHAR NOT NULL, -- table_id nebo dataset group name + access VARCHAR DEFAULT 'read', -- 'read', 'none' + PRIMARY KEY (user_id, dataset) +); +``` + +`dataset` může být: +- **Table ID** (`circle`, `chart_of_accounts`) — přístup k jedné tabulce +- **Wildcard/group** (`in.c-finance.*`) — přístup ke všem tabulkám v bucketu +- **Dataset name** (`jira`, `finance`) — pojmenovaná skupina z instance.yaml + +### 3.2 Pravidla přístupu + +``` +Admin → vidí vše (bypass permissions) +Ostatní → vidí jen tabulky kde: + 1. Existuje explicitní permission (dataset_permissions.access = 'read') + 2. NEBO tabulka patří do povoleného datasetu/bucketu + 3. NEBO je tabulka public (nový flag v table_registry) +``` + +### 3.3 Nový sloupec v table_registry + +```sql +ALTER TABLE table_registry ADD COLUMN is_public BOOLEAN DEFAULT true; +``` + +- `is_public = true` → každý přihlášený uživatel vidí (default, zpětně kompatibilní) +- `is_public = false` → vyžaduje explicitní permission + +## 4. Kde se kontroluje + +### 4.1 Manifest (`GET /api/sync/manifest`) + +```python +# Současný kód (NEFUNGUJE): +accessible = set(perm_repo.get_accessible_datasets(user["id"])) +# ... ale nikdy nefiltruje + +# Nový kód: +all_states = repo.get_all_states() +if user["role"] != "admin": + all_states = [s for s in all_states if _user_can_access(user, s["table_id"])] +``` + +### 4.2 Download (`GET /api/data/{table}/download`) + +```python +# Současný kód (ŽÁDNÁ KONTROLA): +return FileResponse(path=file_path) + +# Nový kód: +if not _user_can_access(user, table_id): + raise HTTPException(403, "Access denied") +return FileResponse(path=file_path) +``` + +### 4.3 Query (`POST /api/query`) + +```python +# Současný kód: otevře analytics.duckdb s VŠEMI views + +# Nový kód: vytvořit per-user filtered connection +# Varianta A: CREATE TEMP VIEW pro povolené tabulky +# Varianta B: Dynamicky generovat allowed list, validovat SQL against it +``` + +Query je nejtěžší — uživatel může napsat `SELECT * FROM salaries` a pokud view existuje v analytics.duckdb, data se vrátí. Řešení: + +**Varianta A — Filtered views (doporučeno):** +Per-request vytvoření in-memory DuckDB, ATTACH analytics.duckdb, vytvořit views jen pro povolené tabulky. Overhead ~10ms. + +**Varianta B — SQL validation:** +Parsovat SQL, extrahovat referenced tables, ověřit proti allowed list. Křehké (sub-queries, CTEs, aliasy). + +### 4.4 Catalog (`GET /api/catalog/tables`) + +```python +# Filtrovat jako manifest — uživatel vidí metadata jen povolených tabulek +if user["role"] != "admin": + tables = [t for t in tables if _user_can_access(user, t["id"])] +``` + +## 5. Shared helper + +```python +# src/rbac.py — rozšíření + +def can_access_table(user: dict, table_id: str) -> bool: + """Check if user can access a specific table.""" + # Admin bypass + if user.get("role") == "admin": + return True + + # Check if table is public + table = TableRegistryRepository(conn).get(table_id) + if table and table.get("is_public", True): + return True + + # Check explicit permission + user_id = user["id"] + if DatasetPermissionRepository(conn).has_access(user_id, table_id): + return True + + # Check wildcard/bucket permission (e.g., "in.c-finance.*") + bucket = table.get("bucket", "") if table else "" + if bucket and DatasetPermissionRepository(conn).has_access(user_id, f"{bucket}.*"): + return True + + return False +``` + +## 6. Admin API pro permissions + +``` +POST /api/admin/permissions — grant access +DELETE /api/admin/permissions — revoke access +GET /api/admin/permissions/{user_id} — list user's permissions +GET /api/admin/permissions — list all (admin only) + +POST body: {"user_id": "...", "dataset": "circle", "access": "read"} +``` + +## 7. Migrace + +### Pro existující instance: +1. Všechny stávající tabulky: `is_public = true` (zachová současné chování) +2. Admin nastaví `is_public = false` pro citlivé tabulky +3. Přidá explicitní permissions pro uživatele + +### Pro nové instance: +- Default `is_public = true` → otevřený model (jako teď) +- Admin může přepnout na uzavřený: `is_public = false` per tabulka + +## 8. CLI (`da sync`) + +``` +da sync + → GET /api/sync/manifest (vrátí jen povolené tabulky) + → pro každou tabulku: GET /api/data/{table}/download + → rebuild lokální DuckDB jen z povolených parquetů + → AI agent vidí jen to co se stáhlo +``` + +Identický princip jako rsync filter — ale filtr je server-side v API, ne v souboru. + +## 9. Co se NEMĚNÍ + +- Role hierarchy (viewer < analyst < km_admin < admin) +- Admin vidí vše +- JWT auth flow +- Orchestrator + extractory (server-side, vidí vše) +- Sync trigger (admin-only, stahuje vše na server) + +## 10. Implementační pořadí + +1. `is_public` sloupec v table_registry (schema v3) +2. `can_access_table()` helper v src/rbac.py +3. Filtrování v manifest + download + catalog +4. Admin permissions API +5. Query endpoint — filtered views +6. Testy diff --git a/src/db.py b/src/db.py index 75e6b2b..036063a 100644 --- a/src/db.py +++ b/src/db.py @@ -9,7 +9,7 @@ from pathlib import Path import duckdb -SCHEMA_VERSION = 2 +SCHEMA_VERSION = 3 _SYSTEM_SCHEMA = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -133,6 +133,7 @@ CREATE TABLE IF NOT EXISTS table_registry ( folder VARCHAR, description TEXT, registered_by VARCHAR, + is_public BOOLEAN DEFAULT true, registered_at TIMESTAMP DEFAULT current_timestamp ); @@ -180,6 +181,10 @@ _V1_TO_V2_MIGRATIONS = [ "ALTER TABLE table_registry ADD COLUMN IF NOT EXISTS profile_after_sync BOOLEAN DEFAULT true", ] +_V2_TO_V3_MIGRATIONS = [ + "ALTER TABLE table_registry ADD COLUMN IF NOT EXISTS is_public BOOLEAN DEFAULT true", +] + def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: """Create tables if they don't exist. Apply migrations if schema version changed.""" @@ -195,6 +200,9 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: if current < 2: for sql in _V1_TO_V2_MIGRATIONS: conn.execute(sql) + if current < 3: + for sql in _V2_TO_V3_MIGRATIONS: + conn.execute(sql) conn.execute( "UPDATE schema_version SET version = ?, applied_at = current_timestamp", [SCHEMA_VERSION], diff --git a/src/rbac.py b/src/rbac.py index a8d4819..79e407b 100644 --- a/src/rbac.py +++ b/src/rbac.py @@ -83,6 +83,76 @@ def has_dataset_access(email: str, dataset: str) -> bool: conn.close() +def can_access_table(user: dict, table_id: str, conn=None) -> bool: + """Check if user can access a specific table. + + Rules: + 1. Admin -> always True + 2. Table is_public=True -> always True + 3. Explicit permission in dataset_permissions -> True + 4. Wildcard bucket permission (e.g., 'in.c-finance.*') -> True + 5. Otherwise -> False + """ + if user.get("role") == "admin": + return True + + should_close = False + if conn is None: + conn = get_system_db() + should_close = True + + try: + from src.repositories.table_registry import TableRegistryRepository + from src.repositories.sync_settings import DatasetPermissionRepository + + # Check if table is public + table = TableRegistryRepository(conn).get(table_id) + if table and table.get("is_public", True): + return True + + user_id = user.get("id", "") + perm_repo = DatasetPermissionRepository(conn) + + # Check explicit permission + if perm_repo.has_access(user_id, table_id): + return True + + # Check wildcard bucket permission + bucket = table.get("bucket", "") if table else "" + if bucket and perm_repo.has_access(user_id, f"{bucket}.*"): + return True + + return False + finally: + if should_close: + conn.close() + + +def get_accessible_tables(user: dict, conn=None) -> list[str]: + """Get list of table IDs the user can access. Used for filtering.""" + if user.get("role") == "admin": + return None # None means "all" — admin bypass + + should_close = False + if conn is None: + conn = get_system_db() + should_close = True + + try: + from src.repositories.table_registry import TableRegistryRepository + repo = TableRegistryRepository(conn) + all_tables = repo.list_all() + + accessible = [] + for t in all_tables: + if can_access_table(user, t["id"], conn): + accessible.append(t["id"]) + return accessible + finally: + if should_close: + conn.close() + + def set_user_role(email: str, role: Role) -> bool: """Set role for a user. Returns True if successful.""" conn = get_system_db() diff --git a/src/repositories/table_registry.py b/src/repositories/table_registry.py index 8268d9a..523eb25 100644 --- a/src/repositories/table_registry.py +++ b/src/repositories/table_registry.py @@ -17,23 +17,25 @@ class TableRegistryRepository: source_type: Optional[str] = None, bucket: Optional[str] = None, source_table: Optional[str] = None, query_mode: str = "local", sync_schedule: Optional[str] = None, profile_after_sync: bool = True, + is_public: bool = True, ) -> None: now = datetime.now(timezone.utc) self.conn.execute( """INSERT INTO table_registry (id, name, folder, sync_strategy, primary_key, description, registered_by, registered_at, source_type, bucket, source_table, query_mode, - sync_schedule, profile_after_sync) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + sync_schedule, profile_after_sync, is_public) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (id) DO UPDATE SET name = excluded.name, folder = excluded.folder, sync_strategy = excluded.sync_strategy, primary_key = excluded.primary_key, description = excluded.description, registered_at = excluded.registered_at, source_type = excluded.source_type, bucket = excluded.bucket, source_table = excluded.source_table, query_mode = excluded.query_mode, - sync_schedule = excluded.sync_schedule, profile_after_sync = excluded.profile_after_sync""", + sync_schedule = excluded.sync_schedule, profile_after_sync = excluded.profile_after_sync, + is_public = excluded.is_public""", [id, name, folder, sync_strategy, primary_key, description, registered_by, now, - source_type, bucket, source_table, query_mode, sync_schedule, profile_after_sync], + source_type, bucket, source_table, query_mode, sync_schedule, profile_after_sync, is_public], ) def unregister(self, table_id: str) -> None: diff --git a/tests/test_access_control.py b/tests/test_access_control.py new file mode 100644 index 0000000..e223184 --- /dev/null +++ b/tests/test_access_control.py @@ -0,0 +1,554 @@ +"""E2E access control tests — verify data isolation between users.""" + +import os +import duckdb +import pytest +from tests.conftest import create_mock_extract + + +def _auth(token): + return {"Authorization": f"Bearer {token}"} + + +class TestPublicTablesAccessible: + """Default: is_public=True tables are accessible to everyone.""" + + def test_analyst_sees_public_tables_in_manifest(self, seeded_app): + """Analyst can see public tables in manifest.""" + c = seeded_app["client"] + env = seeded_app["env"] + + # Create extract with data + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "orders", "data": [{"id": "1"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + # Register table as public (default) + c.post("/api/admin/register-table", json={ + "name": "orders", "source_type": "keboola", + }, headers=_auth(seeded_app["admin_token"])) + + # Analyst should see it + resp = c.get("/api/sync/manifest", headers=_auth(seeded_app["analyst_token"])) + assert resp.status_code == 200 + + def test_analyst_can_download_public_table(self, seeded_app): + env = seeded_app["env"] + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "orders", "data": [{"id": "1"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + c = seeded_app["client"] + # Register table so access control recognizes it as public + c.post("/api/admin/register-table", json={ + "name": "orders", "source_type": "keboola", + }, headers=_auth(seeded_app["admin_token"])) + + resp = c.get("/api/data/orders/download", headers=_auth(seeded_app["analyst_token"])) + assert resp.status_code == 200 + + def test_admin_can_download_public_table(self, seeded_app): + env = seeded_app["env"] + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "orders", "data": [{"id": "1"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + c = seeded_app["client"] + resp = c.get("/api/data/orders/download", headers=_auth(seeded_app["admin_token"])) + assert resp.status_code == 200 + + def test_public_table_visible_in_catalog(self, seeded_app): + c = seeded_app["client"] + env = seeded_app["env"] + + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "orders", "data": [{"id": "1"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + c.post("/api/admin/register-table", json={ + "name": "orders", "source_type": "keboola", + }, headers=_auth(seeded_app["admin_token"])) + + resp = c.get("/api/catalog/tables", headers=_auth(seeded_app["analyst_token"])) + assert resp.status_code == 200 + names = {t["name"] for t in resp.json()["tables"]} + assert "orders" in names + + +class TestPrivateTablesRestricted: + """Tables with is_public=False require explicit permission.""" + + def test_analyst_cannot_see_private_table_in_manifest(self, seeded_app): + """Private table hidden from manifest for unauthorized user.""" + c = seeded_app["client"] + env = seeded_app["env"] + + # Create extract + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "salaries", "data": [{"id": "1", "amount": "100000"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + # Register as public first (default), then make private + c.post("/api/admin/register-table", json={ + "name": "salaries", "source_type": "keboola", + }, headers=_auth(seeded_app["admin_token"])) + + # Make private via direct DB update + from src.db import get_system_db + conn = get_system_db() + conn.execute("UPDATE table_registry SET is_public = false WHERE name = 'salaries'") + conn.close() + + # Analyst should NOT see it in manifest + resp = c.get("/api/sync/manifest", headers=_auth(seeded_app["analyst_token"])) + assert "salaries" not in resp.json().get("tables", {}) + + # Admin SHOULD see it + resp = c.get("/api/sync/manifest", headers=_auth(seeded_app["admin_token"])) + assert resp.status_code == 200 + # Admin sees all — salaries should not be filtered out + + def test_analyst_blocked_from_downloading_private_table(self, seeded_app): + c = seeded_app["client"] + env = seeded_app["env"] + + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "salaries", "data": [{"id": "1", "amount": "100000"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + # Make private + from src.db import get_system_db + conn = get_system_db() + conn.execute( + "INSERT INTO table_registry (id, name, is_public) VALUES ('salaries','salaries',false) " + "ON CONFLICT(id) DO UPDATE SET is_public=false" + ) + conn.close() + + resp = c.get("/api/data/salaries/download", headers=_auth(seeded_app["analyst_token"])) + assert resp.status_code == 403 + + def test_admin_can_download_private_table(self, seeded_app): + c = seeded_app["client"] + env = seeded_app["env"] + + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "salaries", "data": [{"id": "1", "amount": "100000"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + from src.db import get_system_db + conn = get_system_db() + conn.execute( + "INSERT INTO table_registry (id, name, is_public) VALUES ('salaries','salaries',false) " + "ON CONFLICT(id) DO UPDATE SET is_public=false" + ) + conn.close() + + resp = c.get("/api/data/salaries/download", headers=_auth(seeded_app["admin_token"])) + assert resp.status_code == 200 + + def test_mixed_public_private_manifest(self, seeded_app): + """Manifest shows public tables but hides private ones for analyst.""" + c = seeded_app["client"] + env = seeded_app["env"] + + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "orders", "data": [{"id": "1"}]}, + {"name": "salaries", "data": [{"id": "1", "amount": "100000"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + # Register both + c.post("/api/admin/register-table", json={ + "name": "orders", "source_type": "keboola", + }, headers=_auth(seeded_app["admin_token"])) + c.post("/api/admin/register-table", json={ + "name": "salaries", "source_type": "keboola", + }, headers=_auth(seeded_app["admin_token"])) + + # Make salaries private + from src.db import get_system_db + conn = get_system_db() + conn.execute("UPDATE table_registry SET is_public = false WHERE name = 'salaries'") + conn.close() + + # Analyst sees orders but not salaries + resp = c.get("/api/sync/manifest", headers=_auth(seeded_app["analyst_token"])) + tables = resp.json().get("tables", {}) + assert "orders" in tables + assert "salaries" not in tables + + +class TestExplicitPermissions: + """Granting explicit access to private tables.""" + + def test_grant_then_access(self, seeded_app): + c = seeded_app["client"] + env = seeded_app["env"] + + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "salaries", "data": [{"id": "1"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + # Make private + from src.db import get_system_db + conn = get_system_db() + conn.execute( + "INSERT INTO table_registry (id, name, is_public) VALUES ('salaries','salaries',false) " + "ON CONFLICT(id) DO UPDATE SET is_public=false" + ) + conn.close() + + # Analyst blocked + resp = c.get("/api/data/salaries/download", headers=_auth(seeded_app["analyst_token"])) + assert resp.status_code == 403 + + # Admin grants access + resp = c.post("/api/admin/permissions", json={ + "user_id": "analyst1", "dataset": "salaries", "access": "read", + }, headers=_auth(seeded_app["admin_token"])) + assert resp.status_code == 201 + + # Now analyst CAN download + resp = c.get("/api/data/salaries/download", headers=_auth(seeded_app["analyst_token"])) + assert resp.status_code == 200 + + def test_revoke_removes_access(self, seeded_app): + c = seeded_app["client"] + env = seeded_app["env"] + + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "salaries", "data": [{"id": "1"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + from src.db import get_system_db + conn = get_system_db() + conn.execute( + "INSERT INTO table_registry (id, name, is_public) VALUES ('salaries','salaries',false) " + "ON CONFLICT(id) DO UPDATE SET is_public=false" + ) + conn.close() + + # Grant + c.post("/api/admin/permissions", json={ + "user_id": "analyst1", "dataset": "salaries", "access": "read", + }, headers=_auth(seeded_app["admin_token"])) + + # Verify access + resp = c.get("/api/data/salaries/download", headers=_auth(seeded_app["analyst_token"])) + assert resp.status_code == 200 + + # Revoke + c.request("DELETE", "/api/admin/permissions", json={ + "user_id": "analyst1", "dataset": "salaries", + }, headers=_auth(seeded_app["admin_token"])) + + # Now blocked again + resp = c.get("/api/data/salaries/download", headers=_auth(seeded_app["analyst_token"])) + assert resp.status_code == 403 + + def test_grant_makes_private_table_visible_in_manifest(self, seeded_app): + """After granting access, analyst sees private table in manifest.""" + c = seeded_app["client"] + env = seeded_app["env"] + + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "salaries", "data": [{"id": "1"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + c.post("/api/admin/register-table", json={ + "name": "salaries", "source_type": "keboola", + }, headers=_auth(seeded_app["admin_token"])) + + from src.db import get_system_db + conn = get_system_db() + conn.execute("UPDATE table_registry SET is_public = false WHERE name = 'salaries'") + conn.close() + + # Not visible before grant + resp = c.get("/api/sync/manifest", headers=_auth(seeded_app["analyst_token"])) + assert "salaries" not in resp.json().get("tables", {}) + + # Grant access + c.post("/api/admin/permissions", json={ + "user_id": "analyst1", "dataset": "salaries", "access": "read", + }, headers=_auth(seeded_app["admin_token"])) + + # Now visible + resp = c.get("/api/sync/manifest", headers=_auth(seeded_app["analyst_token"])) + assert "salaries" in resp.json().get("tables", {}) + + +class TestCatalogFiltering: + """Catalog only shows accessible tables.""" + + def test_private_table_hidden_from_catalog(self, seeded_app): + c = seeded_app["client"] + + # Register public + private + c.post("/api/admin/register-table", json={"name": "public_table"}, headers=_auth(seeded_app["admin_token"])) + c.post("/api/admin/register-table", json={"name": "private_table"}, headers=_auth(seeded_app["admin_token"])) + + from src.db import get_system_db + conn = get_system_db() + conn.execute("UPDATE table_registry SET is_public = false WHERE name = 'private_table'") + conn.close() + + resp = c.get("/api/catalog/tables", headers=_auth(seeded_app["analyst_token"])) + names = {t["name"] for t in resp.json()["tables"]} + assert "public_table" in names + assert "private_table" not in names + + def test_admin_sees_all_in_catalog(self, seeded_app): + c = seeded_app["client"] + + c.post("/api/admin/register-table", json={"name": "public_table"}, headers=_auth(seeded_app["admin_token"])) + c.post("/api/admin/register-table", json={"name": "private_table"}, headers=_auth(seeded_app["admin_token"])) + + from src.db import get_system_db + conn = get_system_db() + conn.execute("UPDATE table_registry SET is_public = false WHERE name = 'private_table'") + conn.close() + + resp = c.get("/api/catalog/tables", headers=_auth(seeded_app["admin_token"])) + names = {t["name"] for t in resp.json()["tables"]} + assert "public_table" in names + assert "private_table" in names + + def test_granted_private_table_shown_in_catalog(self, seeded_app): + """After granting access, private table appears in catalog for that user.""" + c = seeded_app["client"] + + c.post("/api/admin/register-table", json={"name": "secret_data"}, headers=_auth(seeded_app["admin_token"])) + + from src.db import get_system_db + conn = get_system_db() + conn.execute("UPDATE table_registry SET is_public = false WHERE name = 'secret_data'") + conn.close() + + # Not visible before grant + resp = c.get("/api/catalog/tables", headers=_auth(seeded_app["analyst_token"])) + names = {t["name"] for t in resp.json()["tables"]} + assert "secret_data" not in names + + # Grant access + c.post("/api/admin/permissions", json={ + "user_id": "analyst1", "dataset": "secret_data", "access": "read", + }, headers=_auth(seeded_app["admin_token"])) + + # Now visible + resp = c.get("/api/catalog/tables", headers=_auth(seeded_app["analyst_token"])) + names = {t["name"] for t in resp.json()["tables"]} + assert "secret_data" in names + + +class TestPermissionsAPI: + """Admin permissions CRUD.""" + + def test_grant_and_list(self, seeded_app): + c = seeded_app["client"] + h = _auth(seeded_app["admin_token"]) + + resp = c.post("/api/admin/permissions", json={ + "user_id": "analyst1", "dataset": "secret_data", "access": "read", + }, headers=h) + assert resp.status_code == 201 + + resp = c.get("/api/admin/permissions/analyst1", headers=h) + assert resp.status_code == 200 + datasets = {p["dataset"] for p in resp.json()["permissions"]} + assert "secret_data" in datasets + + def test_analyst_cannot_manage_permissions(self, seeded_app): + c = seeded_app["client"] + resp = c.post("/api/admin/permissions", json={ + "user_id": "analyst1", "dataset": "anything", + }, headers=_auth(seeded_app["analyst_token"])) + assert resp.status_code == 403 + + def test_grant_multiple_datasets(self, seeded_app): + c = seeded_app["client"] + h = _auth(seeded_app["admin_token"]) + + for ds in ["sales", "hr", "finance"]: + resp = c.post("/api/admin/permissions", json={ + "user_id": "analyst1", "dataset": ds, "access": "read", + }, headers=h) + assert resp.status_code == 201 + + resp = c.get("/api/admin/permissions/analyst1", headers=h) + datasets = {p["dataset"] for p in resp.json()["permissions"]} + assert datasets == {"sales", "hr", "finance"} + + def test_revoke_via_delete(self, seeded_app): + c = seeded_app["client"] + h = _auth(seeded_app["admin_token"]) + + c.post("/api/admin/permissions", json={ + "user_id": "analyst1", "dataset": "secret_data", "access": "read", + }, headers=h) + + resp = c.request("DELETE", "/api/admin/permissions", json={ + "user_id": "analyst1", "dataset": "secret_data", + }, headers=h) + assert resp.status_code == 200 + + resp = c.get("/api/admin/permissions/analyst1", headers=h) + datasets = {p["dataset"] for p in resp.json()["permissions"]} + assert "secret_data" not in datasets + + def test_analyst_cannot_revoke_permissions(self, seeded_app): + c = seeded_app["client"] + resp = c.request("DELETE", "/api/admin/permissions", json={ + "user_id": "analyst1", "dataset": "anything", + }, headers=_auth(seeded_app["analyst_token"])) + assert resp.status_code == 403 + + +class TestQueryFiltering: + """Query endpoint respects access control.""" + + def test_analyst_blocked_from_querying_private_table(self, seeded_app): + c = seeded_app["client"] + env = seeded_app["env"] + + # Create extract with private data + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "salaries", "data": [{"id": "1", "amount": "100000"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + from src.db import get_system_db + conn = get_system_db() + conn.execute( + "INSERT INTO table_registry (id, name, is_public) VALUES ('salaries','salaries',false) " + "ON CONFLICT(id) DO UPDATE SET is_public=false" + ) + conn.close() + + resp = c.post("/api/query", json={"sql": "SELECT * FROM salaries"}, + headers=_auth(seeded_app["analyst_token"])) + assert resp.status_code == 403 + + def test_admin_can_query_private_table(self, seeded_app): + c = seeded_app["client"] + env = seeded_app["env"] + + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "salaries", "data": [{"id": "1", "amount": "100000"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + from src.db import get_system_db + conn = get_system_db() + conn.execute( + "INSERT INTO table_registry (id, name, is_public) VALUES ('salaries','salaries',false) " + "ON CONFLICT(id) DO UPDATE SET is_public=false" + ) + conn.close() + + resp = c.post("/api/query", json={"sql": "SELECT * FROM salaries"}, + headers=_auth(seeded_app["admin_token"])) + # Admin should not be blocked by access control + assert resp.status_code != 403 + + def test_analyst_can_query_public_table(self, seeded_app): + c = seeded_app["client"] + env = seeded_app["env"] + + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "orders", "data": [{"id": "1", "total": "99.99"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + # Register table so access control recognizes it as public + c.post("/api/admin/register-table", json={ + "name": "orders", "source_type": "keboola", + }, headers=_auth(seeded_app["admin_token"])) + + resp = c.post("/api/query", json={"sql": "SELECT * FROM orders"}, + headers=_auth(seeded_app["analyst_token"])) + # Public table should not be blocked + assert resp.status_code != 403 + + def test_granted_analyst_can_query_private_table(self, seeded_app): + c = seeded_app["client"] + env = seeded_app["env"] + + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "salaries", "data": [{"id": "1", "amount": "100000"}]}, + ]) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + from src.db import get_system_db + conn = get_system_db() + conn.execute( + "INSERT INTO table_registry (id, name, is_public) VALUES ('salaries','salaries',false) " + "ON CONFLICT(id) DO UPDATE SET is_public=false" + ) + conn.close() + + # Grant access + c.post("/api/admin/permissions", json={ + "user_id": "analyst1", "dataset": "salaries", "access": "read", + }, headers=_auth(seeded_app["admin_token"])) + + resp = c.post("/api/query", json={"sql": "SELECT * FROM salaries"}, + headers=_auth(seeded_app["analyst_token"])) + assert resp.status_code != 403 + + +class TestUnauthenticatedAccess: + """Endpoints require authentication.""" + + def test_manifest_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.get("/api/sync/manifest") + assert resp.status_code in (401, 403) + + def test_download_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.get("/api/data/orders/download") + assert resp.status_code in (401, 403) + + def test_catalog_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.get("/api/catalog/tables") + assert resp.status_code in (401, 403) + + def test_query_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.post("/api/query", json={"sql": "SELECT 1"}) + assert resp.status_code in (401, 403) + + def test_permissions_api_requires_auth(self, seeded_app): + c = seeded_app["client"] + resp = c.post("/api/admin/permissions", json={ + "user_id": "analyst1", "dataset": "anything", + }) + assert resp.status_code in (401, 403) diff --git a/tests/test_db.py b/tests/test_db.py index 3546be9..c51c56e 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -59,7 +59,7 @@ class TestGetSchemaVersion: conn = get_system_db() try: - assert get_schema_version(conn) == 2 + assert get_schema_version(conn) == 3 finally: conn.close() @@ -113,7 +113,7 @@ class TestV1ToV2Migration: from src.db import get_system_db, get_schema_version conn2 = get_system_db() try: - assert get_schema_version(conn2) == 2 + assert get_schema_version(conn2) == 3 # Verify old data preserved row = conn2.execute("SELECT name, folder FROM table_registry WHERE id='t1'").fetchone() assert row[0] == "Test" diff --git a/tests/test_e2e_api.py b/tests/test_e2e_api.py index 3864d73..5e217ca 100644 --- a/tests/test_e2e_api.py +++ b/tests/test_e2e_api.py @@ -136,6 +136,12 @@ class TestRBACEnforcement: create_mock_extract(env["extracts_dir"], "keboola", [ {"name": "orders", "data": [{"id": "1"}]}, ]) + # Register the table so RBAC can check is_public (defaults to True) + admin_t = seeded_app["admin_token"] + c.post("/api/admin/register-table", json={ + "name": "orders", "source_type": "keboola", "bucket": "in.c-crm", + "source_table": "orders", "query_mode": "local", + }, headers=_auth(admin_t)) t = seeded_app["analyst_token"] resp = c.get("/api/data/orders/download", headers=_auth(t)) assert resp.status_code == 200 diff --git a/tests/test_e2e_extract.py b/tests/test_e2e_extract.py index 76b3b6f..f490358 100644 --- a/tests/test_e2e_extract.py +++ b/tests/test_e2e_extract.py @@ -263,7 +263,7 @@ class TestSchemaMigration: from src.db import get_system_db, get_schema_version conn2 = get_system_db() - assert get_schema_version(conn2) == 2 + assert get_schema_version(conn2) == 3 # Old data preserved old = conn2.execute("SELECT name, folder FROM table_registry WHERE id='old_table'").fetchone()