diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..656ebed --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,45 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: "3.13" + + - name: Install uv + uses: astral-sh/setup-uv@v4 + + - name: Install dependencies + run: uv pip install --system -r requirements.txt + + - name: Run unit tests + run: python -m pytest tests/test_db.py tests/test_repositories.py tests/test_migration.py tests/test_permissions.py tests/test_api.py tests/test_api_scripts.py tests/test_cli.py -v + env: + JWT_SECRET_KEY: ci-test-secret-32chars-minimum!! + + docker-build: + runs-on: ubuntu-latest + needs: test + steps: + - uses: actions/checkout@v4 + + - name: Build Docker image + run: docker build -t data-analyst:test . + + - name: Run Docker health check + run: | + docker run -d --name test-app -p 8000:8000 \ + -e DATA_DIR=/data -e JWT_SECRET_KEY=ci-test-secret-32chars-minimum!! \ + data-analyst:test + sleep 5 + curl -f http://localhost:8000/api/health + docker stop test-app diff --git a/app/api/scripts.py b/app/api/scripts.py new file mode 100644 index 0000000..f888c8b --- /dev/null +++ b/app/api/scripts.py @@ -0,0 +1,161 @@ +"""Script management and execution endpoints.""" + +import os +import subprocess +import tempfile +import uuid +from pathlib import Path + +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel +from typing import Optional, List + +import duckdb + +from app.auth.dependencies import get_current_user, require_role, Role, _get_db +from src.repositories.notifications import ScriptRepository + +router = APIRouter(prefix="/api/scripts", tags=["scripts"]) + +SCRIPT_TIMEOUT = int(os.environ.get("SCRIPT_TIMEOUT", "300")) # 5 min default +SCRIPT_MAX_OUTPUT = int(os.environ.get("SCRIPT_MAX_OUTPUT", "65536")) # 64KB + + +class DeployScriptRequest(BaseModel): + name: str + source: str + schedule: Optional[str] = None + + +class RunScriptRequest(BaseModel): + name: Optional[str] = None + source: Optional[str] = None + + +class ScriptResponse(BaseModel): + id: str + name: str + schedule: Optional[str] + owner: Optional[str] + + +@router.get("") +async def list_scripts( + user: dict = Depends(get_current_user), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + repo = ScriptRepository(conn) + scripts = repo.list_all() + return {"scripts": scripts, "count": len(scripts)} + + +@router.post("/deploy", status_code=201) +async def deploy_script( + request: DeployScriptRequest, + user: dict = Depends(get_current_user), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Deploy a Python script to be run on the server (optionally on schedule).""" + repo = ScriptRepository(conn) + script_id = str(uuid.uuid4()) + repo.deploy( + id=script_id, + name=request.name, + owner=user["id"], + schedule=request.schedule, + source=request.source, + ) + return ScriptResponse( + id=script_id, name=request.name, + schedule=request.schedule, owner=user["id"], + ) + + +@router.post("/{script_id}/run") +async def run_deployed_script( + script_id: str, + user: dict = Depends(get_current_user), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Run a deployed script by ID.""" + repo = ScriptRepository(conn) + script = repo.get(script_id) + if not script: + raise HTTPException(status_code=404, detail="Script not found") + return _execute_script(script["source"], script["name"]) + + +@router.post("/run") +async def run_adhoc_script( + request: RunScriptRequest, + user: dict = Depends(get_current_user), +): + """Run an ad-hoc Python script (not deployed).""" + if not request.source: + raise HTTPException(status_code=400, detail="Script source required") + return _execute_script(request.source, request.name or "adhoc") + + +@router.delete("/{script_id}", status_code=204) +async def undeploy_script( + script_id: str, + user: dict = Depends(get_current_user), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + repo = ScriptRepository(conn) + if not repo.get(script_id): + raise HTTPException(status_code=404, detail="Script not found") + repo.undeploy(script_id) + + +def _execute_script(source: str, name: str) -> dict: + """Execute a Python script in a sandboxed subprocess.""" + # Safety checks + dangerous_imports = ["subprocess", "shutil", "ctypes", "importlib"] + for imp in dangerous_imports: + if f"import {imp}" in source or f"from {imp}" in source: + raise HTTPException( + status_code=400, + detail=f"Script contains disallowed import: {imp}", + ) + + data_dir = os.environ.get("DATA_DIR", "./data") + + with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: + f.write(source) + f.flush() + script_path = f.name + + try: + result = subprocess.run( + ["python", script_path], + capture_output=True, + text=True, + timeout=SCRIPT_TIMEOUT, + env={ + "PATH": os.environ.get("PATH", ""), + "DATA_DIR": data_dir, + "PYTHONPATH": os.getcwd(), + "HOME": "/tmp", + }, + cwd=os.getcwd(), + ) + stdout = result.stdout[:SCRIPT_MAX_OUTPUT] + stderr = result.stderr[:SCRIPT_MAX_OUTPUT] + return { + "name": name, + "exit_code": result.returncode, + "stdout": stdout, + "stderr": stderr, + "truncated": len(result.stdout) > SCRIPT_MAX_OUTPUT or len(result.stderr) > SCRIPT_MAX_OUTPUT, + } + except subprocess.TimeoutExpired: + return { + "name": name, + "exit_code": -1, + "stdout": "", + "stderr": f"Script timed out after {SCRIPT_TIMEOUT}s", + "truncated": False, + } + finally: + os.unlink(script_path) diff --git a/app/api/settings.py b/app/api/settings.py new file mode 100644 index 0000000..60fa34c --- /dev/null +++ b/app/api/settings.py @@ -0,0 +1,53 @@ +"""User sync settings endpoints.""" + +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel +from typing import Optional, List + +import duckdb + +from app.auth.dependencies import get_current_user, _get_db +from src.repositories.sync_settings import SyncSettingsRepository, DatasetPermissionRepository + +router = APIRouter(prefix="/api/settings", tags=["settings"]) + + +class DatasetSettingRequest(BaseModel): + dataset: str + enabled: bool + + +@router.get("") +async def get_settings( + user: dict = Depends(get_current_user), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Get current user's sync settings and permissions.""" + settings_repo = SyncSettingsRepository(conn) + perm_repo = DatasetPermissionRepository(conn) + + settings = settings_repo.get_user_settings(user["id"]) + permissions = perm_repo.get_user_permissions(user["id"]) + + return { + "user_id": user["id"], + "sync_settings": settings, + "permissions": permissions, + } + + +@router.put("/dataset") +async def update_dataset_setting( + request: DatasetSettingRequest, + user: dict = Depends(get_current_user), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Enable or disable a dataset for sync.""" + # Check permission + perm_repo = DatasetPermissionRepository(conn) + if not perm_repo.has_access(user["id"], request.dataset): + raise HTTPException(status_code=403, detail=f"No access to dataset '{request.dataset}'") + + settings_repo = SyncSettingsRepository(conn) + settings_repo.set_dataset_enabled(user["id"], request.dataset, request.enabled) + return {"dataset": request.dataset, "enabled": request.enabled} diff --git a/app/main.py b/app/main.py index 4456db9..0c10d87 100644 --- a/app/main.py +++ b/app/main.py @@ -11,6 +11,8 @@ from app.api.query import router as query_router from app.api.users import router as users_router from app.api.memory import router as memory_router from app.api.upload import router as upload_router +from app.api.scripts import router as scripts_router +from app.api.settings import router as settings_router def create_app() -> FastAPI: @@ -38,6 +40,8 @@ def create_app() -> FastAPI: app.include_router(users_router) app.include_router(memory_router) app.include_router(upload_router) + app.include_router(scripts_router) + app.include_router(settings_router) return app diff --git a/cli/skills/connectors.md b/cli/skills/connectors.md new file mode 100644 index 0000000..3b451a2 --- /dev/null +++ b/cli/skills/connectors.md @@ -0,0 +1,30 @@ +# Connectors — How to add a new data source + +## Existing Connectors +- **Keboola** (`connectors/keboola/`) — Keboola Storage API +- **BigQuery** (`connectors/bigquery/`) — Google BigQuery +- **Jira** (`connectors/jira/`) — Jira webhook + API + +## Adding a New Connector + +1. Create `connectors//adapter.py` implementing the `DataSource` ABC: + ```python + from src.data_sync import DataSource + + class MyDataSource(DataSource): + def sync_table(self, table_config, sync_state): ... + def discover_tables(self): ... + def get_column_metadata(self, table_id): ... + def get_source_name(self): ... + ``` + +2. The factory in `src/data_sync.py:create_data_source()` auto-discovers connectors. + Set `DATA_SOURCE=` in instance.yaml or .env. + +3. Add required env vars to `.env` and `config/.env.template`. + +4. Add tests to `tests/test__adapter.py`. + +## Configuration +Each connector reads credentials from environment variables. +Table definitions are in `docs/data_description.md` (YAML blocks). diff --git a/cli/skills/corporate-memory.md b/cli/skills/corporate-memory.md new file mode 100644 index 0000000..e91329d --- /dev/null +++ b/cli/skills/corporate-memory.md @@ -0,0 +1,30 @@ +# Corporate Memory — Knowledge sharing and governance + +## What It Is +Corporate memory collects knowledge from all analysts' CLAUDE.local.md files +and makes it available to everyone through a curated catalog. + +## How It Works +1. Analysts write insights in their CLAUDE.local.md +2. `da sync --upload-only` pushes content to server +3. Server processes with LLM (Haiku) to extract knowledge items +4. Items go through governance (pending → approved/mandatory) +5. Approved items are distributed as Claude rules + +## Governance Flow +- **pending**: New item, awaiting review +- **approved**: Available to all users +- **mandatory**: Force-pushed to all users' rules +- **rejected**: Not distributed + +## Admin Commands +```bash +# View pending items (via web UI or API) +da query "SELECT id, title, status FROM system.knowledge_items WHERE status='pending'" --remote + +# Approve/reject via API +curl -X PUT http://server:8000/api/memory//status?new_status=approved -H "Authorization: Bearer $TOKEN" +``` + +## Voting +Users can upvote/downvote knowledge items to surface the most useful ones. diff --git a/cli/skills/notifications.md b/cli/skills/notifications.md new file mode 100644 index 0000000..1568128 --- /dev/null +++ b/cli/skills/notifications.md @@ -0,0 +1,37 @@ +# Notifications — How notifications work + +## Architecture +1. User creates a Python script (locally or via Claude Code) +2. Script queries local DuckDB and produces output +3. Output is sent via Telegram bot or WebSocket gateway + +## Creating a Notification Script +```python +# user/scripts/sales_alert.py +"""Sales alert - checks daily revenue.""" +import duckdb + +conn = duckdb.connect('user/duckdb/analytics.duckdb', read_only=True) +result = conn.execute("SELECT sum(amount) as revenue FROM orders WHERE date = current_date").fetchone() +print(f"Today's revenue: ${result[0]:,.2f}") +``` + +## Running Locally +```bash +da scripts run sales_alert # runs on your machine +``` + +## Deploying to Server +```bash +da scripts deploy sales_alert --schedule "0 8 * * MON" # every Monday 8 AM +``` + +## Delivery Channels +- **Telegram**: Link via `da auth telegram-link` +- **Desktop app**: Via WebSocket gateway (automatic if connected) + +## Managing Scripts +```bash +da scripts list # all deployed scripts +da scripts undeploy # remove from server +``` diff --git a/cli/skills/security.md b/cli/skills/security.md new file mode 100644 index 0000000..9a36e5a --- /dev/null +++ b/cli/skills/security.md @@ -0,0 +1,37 @@ +# Security — RBAC, permissions, and audit + +## Roles +| Role | Permissions | +|------|-------------| +| `viewer` | Read catalog, view profiles, browse corporate memory | +| `analyst` | + sync data, run queries, vote, run/deploy scripts | +| `admin` | + manage users, approve knowledge, trigger sync | +| `km_admin` | + corporate memory governance | + +## Managing Users +```bash +da admin add-user user@company.com --role analyst +da admin list-users +da admin remove-user +``` + +## Dataset Permissions +Admins grant dataset access per user. Users can only sync datasets they have access to. + +## Audit Trail +Every API call is logged. Query with: +```bash +da query "SELECT * FROM system.audit_log ORDER BY timestamp DESC LIMIT 20" --remote +``` + +## Script Sandboxing +User scripts run in isolated subprocess with: +- Limited environment (no access to secrets) +- Timeout (default 5 min) +- Blocked imports (subprocess, shutil, ctypes) +- Stdout/stderr size cap (64KB) + +## JWT Tokens +- Issued on login, valid 30 days +- Contains: user_id, email, role +- Set JWT_SECRET_KEY in .env (min 32 chars) diff --git a/config/deploy.staging.yml b/config/deploy.staging.yml new file mode 100644 index 0000000..9d74e07 --- /dev/null +++ b/config/deploy.staging.yml @@ -0,0 +1,20 @@ +# Kamal staging deployment config +# Usage: kamal deploy -d staging + +servers: + web: + hosts: + - YOUR_STAGING_SERVER_IP + +accessories: + scheduler: + host: YOUR_STAGING_SERVER_IP + telegram-bot: + host: YOUR_STAGING_SERVER_IP + +proxy: + host: staging.data.your-domain.com + +env: + clear: + LOG_LEVEL: debug diff --git a/config/deploy.yml b/config/deploy.yml new file mode 100644 index 0000000..444c8b6 --- /dev/null +++ b/config/deploy.yml @@ -0,0 +1,67 @@ +# Kamal production deployment config +# Usage: kamal deploy + +service: data-analyst + +image: ghcr.io/keboola/data-analyst + +registry: + server: ghcr.io + username: + - KAMAL_REGISTRY_USERNAME + password: + - KAMAL_REGISTRY_PASSWORD + +servers: + web: + hosts: + - YOUR_SERVER_IP + cmd: uvicorn app.main:app --host 0.0.0.0 --port 8000 + options: + volume: + - /data:/data + +accessories: + scheduler: + image: ghcr.io/keboola/data-analyst + host: YOUR_SERVER_IP + cmd: python -m services.scheduler + env: + clear: + API_URL: http://data-analyst-web:8000 + secret: + - SCHEDULER_API_TOKEN + volumes: + - /data:/data + + telegram-bot: + image: ghcr.io/keboola/data-analyst + host: YOUR_SERVER_IP + cmd: python -m services.telegram_bot + env: + secret: + - TELEGRAM_BOT_TOKEN + volumes: + - /data:/data + +proxy: + ssl: true + host: data.your-domain.com + +healthcheck: + path: /api/health + port: 8000 + interval: 30 + +env: + clear: + DATA_DIR: /data + LOG_LEVEL: info + secret: + - JWT_SECRET_KEY + - KEBOOLA_STORAGE_TOKEN + - KEBOOLA_STACK_URL + - KEBOOLA_PROJECT_ID + - GOOGLE_CLIENT_ID + - GOOGLE_CLIENT_SECRET + - TELEGRAM_BOT_TOKEN diff --git a/src/repositories/sync_settings.py b/src/repositories/sync_settings.py new file mode 100644 index 0000000..2d45c30 --- /dev/null +++ b/src/repositories/sync_settings.py @@ -0,0 +1,87 @@ +"""Repository for user sync settings and dataset permissions.""" + +from datetime import datetime, timezone +from typing import Any, Optional, List, Dict + +import duckdb + + +class SyncSettingsRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def get_user_settings(self, user_id: str) -> List[Dict[str, Any]]: + results = self.conn.execute( + "SELECT * FROM user_sync_settings WHERE user_id = ? ORDER BY dataset", + [user_id], + ).fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] + + def set_dataset_enabled(self, user_id: str, dataset: str, enabled: bool) -> None: + now = datetime.now(timezone.utc) + self.conn.execute( + """INSERT INTO user_sync_settings (user_id, dataset, enabled, updated_at) + VALUES (?, ?, ?, ?) + ON CONFLICT (user_id, dataset) DO UPDATE SET enabled = excluded.enabled, updated_at = excluded.updated_at""", + [user_id, dataset, enabled, now], + ) + + def is_dataset_enabled(self, user_id: str, dataset: str) -> bool: + result = self.conn.execute( + "SELECT enabled FROM user_sync_settings WHERE user_id = ? AND dataset = ?", + [user_id, dataset], + ).fetchone() + return bool(result and result[0]) + + def get_enabled_datasets(self, user_id: str) -> List[str]: + results = self.conn.execute( + "SELECT dataset FROM user_sync_settings WHERE user_id = ? AND enabled = true", + [user_id], + ).fetchall() + return [r[0] for r in results] + + +class DatasetPermissionRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def grant(self, user_id: str, dataset: str, access: str = "read") -> None: + self.conn.execute( + """INSERT INTO dataset_permissions (user_id, dataset, access) + VALUES (?, ?, ?) + ON CONFLICT (user_id, dataset) DO UPDATE SET access = excluded.access""", + [user_id, dataset, access], + ) + + def revoke(self, user_id: str, dataset: str) -> None: + self.conn.execute( + "DELETE FROM dataset_permissions WHERE user_id = ? AND dataset = ?", + [user_id, dataset], + ) + + def has_access(self, user_id: str, dataset: str) -> bool: + result = self.conn.execute( + "SELECT access FROM dataset_permissions WHERE user_id = ? AND dataset = ?", + [user_id, dataset], + ).fetchone() + return result is not None and result[0] != "none" + + def get_user_permissions(self, user_id: str) -> List[Dict[str, Any]]: + results = self.conn.execute( + "SELECT * FROM dataset_permissions WHERE user_id = ? ORDER BY dataset", + [user_id], + ).fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + return [dict(zip(columns, row)) for row in results] + + def get_accessible_datasets(self, user_id: str) -> List[str]: + results = self.conn.execute( + "SELECT dataset FROM dataset_permissions WHERE user_id = ? AND access != 'none'", + [user_id], + ).fetchall() + return [r[0] for r in results] diff --git a/tests/test_api_scripts.py b/tests/test_api_scripts.py new file mode 100644 index 0000000..b9dcae7 --- /dev/null +++ b/tests/test_api_scripts.py @@ -0,0 +1,126 @@ +"""Tests for scripts and settings API endpoints.""" + +import os +import pytest +from fastapi.testclient import TestClient + + +@pytest.fixture +def client(tmp_path): + os.environ["DATA_DIR"] = str(tmp_path) + os.environ["JWT_SECRET_KEY"] = "test-secret-32chars-minimum!!!!!" + os.environ["SCRIPT_TIMEOUT"] = "10" + + from app.main import create_app + from src.db import get_system_db + from src.repositories.users import UserRepository + from src.repositories.sync_settings import DatasetPermissionRepository + from app.auth.jwt import create_access_token + + conn = get_system_db() + user_repo = UserRepository(conn) + user_repo.create(id="admin1", email="admin@acme.com", name="Admin", role="admin") + user_repo.create(id="analyst1", email="analyst@acme.com", name="Analyst", role="analyst") + + perm_repo = DatasetPermissionRepository(conn) + perm_repo.grant("analyst1", "sales", "read") + perm_repo.grant("analyst1", "support", "read") + conn.close() + + app = create_app() + test_client = TestClient(app) + admin_token = create_access_token("admin1", "admin@acme.com", "admin") + analyst_token = create_access_token("analyst1", "analyst@acme.com", "analyst") + + return test_client, admin_token, analyst_token + + +class TestScriptsAPI: + def test_list_scripts_empty(self, client): + c, _, analyst_token = client + resp = c.get("/api/scripts", headers={"Authorization": f"Bearer {analyst_token}"}) + assert resp.status_code == 200 + assert resp.json()["count"] == 0 + + def test_deploy_and_list(self, client): + c, _, analyst_token = client + headers = {"Authorization": f"Bearer {analyst_token}"} + + resp = c.post("/api/scripts/deploy", json={ + "name": "hello", "source": "print('hello world')", + }, headers=headers) + assert resp.status_code == 201 + script_id = resp.json()["id"] + + resp = c.get("/api/scripts", headers=headers) + assert resp.json()["count"] == 1 + + def test_run_script(self, client): + c, _, analyst_token = client + headers = {"Authorization": f"Bearer {analyst_token}"} + + resp = c.post("/api/scripts/run", json={ + "source": "print('hello from script')", "name": "test", + }, headers=headers) + assert resp.status_code == 200 + data = resp.json() + assert data["exit_code"] == 0 + assert "hello from script" in data["stdout"] + + def test_run_blocked_import(self, client): + c, _, analyst_token = client + headers = {"Authorization": f"Bearer {analyst_token}"} + + resp = c.post("/api/scripts/run", json={ + "source": "import subprocess; subprocess.run(['ls'])", "name": "bad", + }, headers=headers) + assert resp.status_code == 400 + assert "disallowed" in resp.json()["detail"] + + def test_deploy_run_undeploy(self, client): + c, _, analyst_token = client + headers = {"Authorization": f"Bearer {analyst_token}"} + + # Deploy + resp = c.post("/api/scripts/deploy", json={ + "name": "calc", "source": "print(2+2)", "schedule": "0 8 * * MON", + }, headers=headers) + script_id = resp.json()["id"] + + # Run + resp = c.post(f"/api/scripts/{script_id}/run", headers=headers) + assert resp.status_code == 200 + assert "4" in resp.json()["stdout"] + + # Undeploy + resp = c.delete(f"/api/scripts/{script_id}", headers=headers) + assert resp.status_code == 204 + + +class TestSettingsAPI: + def test_get_settings(self, client): + c, _, analyst_token = client + resp = c.get("/api/settings", headers={"Authorization": f"Bearer {analyst_token}"}) + assert resp.status_code == 200 + data = resp.json() + assert data["user_id"] == "analyst1" + assert len(data["permissions"]) == 2 + + def test_enable_dataset(self, client): + c, _, analyst_token = client + headers = {"Authorization": f"Bearer {analyst_token}"} + + resp = c.put("/api/settings/dataset", json={ + "dataset": "sales", "enabled": True, + }, headers=headers) + assert resp.status_code == 200 + assert resp.json()["enabled"] is True + + def test_enable_unauthorized_dataset(self, client): + c, _, analyst_token = client + headers = {"Authorization": f"Bearer {analyst_token}"} + + resp = c.put("/api/settings/dataset", json={ + "dataset": "hr_secret", "enabled": True, + }, headers=headers) + assert resp.status_code == 403 diff --git a/tests/test_permissions.py b/tests/test_permissions.py new file mode 100644 index 0000000..976d777 --- /dev/null +++ b/tests/test_permissions.py @@ -0,0 +1,80 @@ +"""Tests for sync settings, dataset permissions, and script execution.""" + +import os +import pytest + + +@pytest.fixture +def db_conn(tmp_path): + os.environ["DATA_DIR"] = str(tmp_path) + from src.db import get_system_db + conn = get_system_db() + yield conn + conn.close() + + +class TestSyncSettingsRepository: + def test_set_and_get(self, db_conn): + from src.repositories.sync_settings import SyncSettingsRepository + repo = SyncSettingsRepository(db_conn) + repo.set_dataset_enabled("u1", "sales", True) + repo.set_dataset_enabled("u1", "support", False) + settings = repo.get_user_settings("u1") + assert len(settings) == 2 + + def test_is_enabled(self, db_conn): + from src.repositories.sync_settings import SyncSettingsRepository + repo = SyncSettingsRepository(db_conn) + repo.set_dataset_enabled("u1", "sales", True) + assert repo.is_dataset_enabled("u1", "sales") is True + assert repo.is_dataset_enabled("u1", "support") is False + + def test_get_enabled_datasets(self, db_conn): + from src.repositories.sync_settings import SyncSettingsRepository + repo = SyncSettingsRepository(db_conn) + repo.set_dataset_enabled("u1", "sales", True) + repo.set_dataset_enabled("u1", "support", False) + repo.set_dataset_enabled("u1", "hr", True) + enabled = repo.get_enabled_datasets("u1") + assert set(enabled) == {"sales", "hr"} + + def test_toggle_dataset(self, db_conn): + from src.repositories.sync_settings import SyncSettingsRepository + repo = SyncSettingsRepository(db_conn) + repo.set_dataset_enabled("u1", "sales", True) + assert repo.is_dataset_enabled("u1", "sales") is True + repo.set_dataset_enabled("u1", "sales", False) + assert repo.is_dataset_enabled("u1", "sales") is False + + +class TestDatasetPermissionRepository: + def test_grant_and_check(self, db_conn): + from src.repositories.sync_settings import DatasetPermissionRepository + repo = DatasetPermissionRepository(db_conn) + repo.grant("u1", "sales", "read") + assert repo.has_access("u1", "sales") is True + assert repo.has_access("u1", "hr") is False + + def test_revoke(self, db_conn): + from src.repositories.sync_settings import DatasetPermissionRepository + repo = DatasetPermissionRepository(db_conn) + repo.grant("u1", "sales", "read") + repo.revoke("u1", "sales") + assert repo.has_access("u1", "sales") is False + + def test_get_accessible_datasets(self, db_conn): + from src.repositories.sync_settings import DatasetPermissionRepository + repo = DatasetPermissionRepository(db_conn) + repo.grant("u1", "sales", "read") + repo.grant("u1", "hr", "read") + repo.grant("u1", "finance", "none") + accessible = repo.get_accessible_datasets("u1") + assert set(accessible) == {"sales", "hr"} + + def test_get_user_permissions(self, db_conn): + from src.repositories.sync_settings import DatasetPermissionRepository + repo = DatasetPermissionRepository(db_conn) + repo.grant("u1", "sales", "read") + repo.grant("u1", "hr", "read") + perms = repo.get_user_permissions("u1") + assert len(perms) == 2