test: add E2E journey tests (J1-J8) covering full user flows

40 tests across 8 files covering bootstrap/auth, sync+query, hybrid
queries, RBAC+access-requests, Jira webhooks, corporate memory,
analyst uploads, and multi-source orchestration. Adds mock_extract_factory
and admin_user fixtures to conftest, and journey marker to pytest.ini.
This commit is contained in:
ZdenekSrotyr 2026-04-12 11:13:51 +02:00
parent c24205a1bf
commit 7967279181
10 changed files with 1149 additions and 0 deletions

View file

@ -3,3 +3,4 @@ addopts = -m "not live and not docker" --timeout=60 --strict-markers
markers =
live: tests requiring server access (run with '-m live')
docker: tests requiring Docker (run with '-m docker')
journey: end-to-end user journey tests

View file

@ -87,6 +87,31 @@ def write_test_parquet(path: str, data: list[dict]):
conn.close()
@pytest.fixture
def mock_extract_factory(e2e_env):
"""Factory fixture: returns callable that creates mock extract.duckdb files.
Usage:
mock_extract_factory(source_name, tables_list)
"""
def _factory(source_name: str, tables: list, remote_attach=None):
db_path = create_mock_extract(e2e_env["extracts_dir"], source_name, tables)
if remote_attach:
import duckdb as _duckdb
conn = _duckdb.connect(str(db_path))
conn.execute("""CREATE TABLE IF NOT EXISTS _remote_attach (
alias VARCHAR, extension VARCHAR, url VARCHAR, token_env VARCHAR
)""")
for row in remote_attach:
conn.execute(
"INSERT INTO _remote_attach VALUES (?, ?, ?, ?)",
[row["alias"], row["extension"], row["url"], row["token_env"]],
)
conn.close()
return db_path
return _factory
@pytest.fixture
def seeded_app(e2e_env):
"""FastAPI TestClient with seeded admin + analyst users, JWT tokens."""
@ -113,3 +138,9 @@ def seeded_app(e2e_env):
"analyst_token": analyst_token,
"env": e2e_env,
}
@pytest.fixture
def admin_user(seeded_app):
"""Return Authorization header dict for admin user."""
return {"Authorization": f"Bearer {seeded_app['admin_token']}"}

View file

@ -0,0 +1,107 @@
"""J7 — Analyst upload journey tests.
Tests analyst file upload flows: session transcripts, artifacts,
and local markdown verifying files are stored correctly.
"""
import io
import pytest
def _auth(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
@pytest.mark.journey
class TestAnalystJourney:
def test_upload_session_transcript(self, seeded_app):
"""Analyst can upload a session JSONL file and it is stored."""
c = seeded_app["client"]
analyst_h = _auth(seeded_app["analyst_token"])
env = seeded_app["env"]
content = b'{"role": "user", "content": "Show me total revenue"}\n{"role": "assistant", "content": "SELECT SUM(amount) FROM orders"}\n'
resp = c.post(
"/api/upload/sessions",
files={"file": ("session_20260101.jsonl", io.BytesIO(content), "application/jsonl")},
headers=analyst_h,
)
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "ok"
assert body["filename"] == "session_20260101.jsonl"
assert body["size"] == len(content)
# Verify file physically stored
sessions_dir = env["data_dir"] / "user_sessions" / "analyst1"
stored = sessions_dir / "session_20260101.jsonl"
assert stored.exists()
assert stored.read_bytes() == content
def test_upload_artifact_html(self, seeded_app):
"""Analyst can upload an HTML artifact and it is stored."""
c = seeded_app["client"]
analyst_h = _auth(seeded_app["analyst_token"])
env = seeded_app["env"]
content = b"<html><body><h1>Revenue Report</h1></body></html>"
resp = c.post(
"/api/upload/artifacts",
files={"file": ("report.html", io.BytesIO(content), "text/html")},
headers=analyst_h,
)
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "ok"
assert body["filename"] == "report.html"
# Verify on disk
artifacts_dir = env["data_dir"] / "user_artifacts" / "analyst1"
stored = artifacts_dir / "report.html"
assert stored.exists()
assert stored.read_bytes() == content
def test_upload_artifact_png(self, seeded_app):
"""Analyst can upload a PNG chart file."""
c = seeded_app["client"]
analyst_h = _auth(seeded_app["analyst_token"])
# Minimal valid-ish PNG header bytes
fake_png = b"\x89PNG\r\n\x1a\n" + b"\x00" * 100
resp = c.post(
"/api/upload/artifacts",
files={"file": ("chart.png", io.BytesIO(fake_png), "image/png")},
headers=analyst_h,
)
assert resp.status_code == 200
assert resp.json()["filename"] == "chart.png"
def test_upload_requires_auth(self, seeded_app):
"""Upload endpoints require authentication."""
c = seeded_app["client"]
for url in ["/api/upload/sessions", "/api/upload/artifacts"]:
resp = c.post(
url,
files={"file": ("test.txt", io.BytesIO(b"hello"), "text/plain")},
)
assert resp.status_code == 401, f"Expected 401 for {url}"
def test_admin_can_also_upload(self, seeded_app):
"""Admin user can also upload files (not analyst-exclusive)."""
c = seeded_app["client"]
admin_h = _auth(seeded_app["admin_token"])
env = seeded_app["env"]
content = b"admin session data"
resp = c.post(
"/api/upload/sessions",
files={"file": ("admin_session.jsonl", io.BytesIO(content), "application/jsonl")},
headers=admin_h,
)
assert resp.status_code == 200
assert resp.json()["status"] == "ok"
# Stored under admin user dir
admin_dir = env["data_dir"] / "user_sessions" / "admin1"
assert (admin_dir / "admin_session.jsonl").exists()

View file

@ -0,0 +1,73 @@
"""J1 — Bootstrap & Auth journey tests.
Verifies that authentication works end-to-end: JWT access, redirect for
unauthenticated requests, role enforcement, and the no-auth health endpoint.
"""
import pytest
def _auth(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
@pytest.mark.journey
class TestBootstrapAuth:
def test_dashboard_accessible_with_admin_jwt(self, seeded_app):
"""Admin with valid JWT can reach the dashboard (200 or 302 redirect)."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
resp = c.get("/dashboard", headers=_auth(t), follow_redirects=False)
assert resp.status_code in (200, 302)
def test_dashboard_blocked_without_auth(self, seeded_app):
"""Unauthenticated request to dashboard is rejected (401 or 302 to login)."""
c = seeded_app["client"]
resp = c.get("/dashboard", follow_redirects=False)
# Either 401 from API middleware or redirect to /login
assert resp.status_code in (401, 302)
def test_admin_can_list_registry(self, seeded_app, admin_user):
"""Admin JWT gives access to /api/admin/registry."""
c = seeded_app["client"]
# First register a table so there's something to list
c.post(
"/api/admin/register-table",
json={"name": "journey_table", "source_type": "keboola", "query_mode": "local"},
headers=admin_user,
)
resp = c.get("/api/admin/registry", headers=admin_user)
assert resp.status_code == 200
data = resp.json()
assert "tables" in data
names = {t["name"] for t in data["tables"]}
assert "journey_table" in names
def test_analyst_cannot_access_admin_endpoints(self, seeded_app):
"""Analyst JWT is forbidden from admin-only endpoints."""
c = seeded_app["client"]
analyst_headers = _auth(seeded_app["analyst_token"])
resp = c.get("/api/admin/registry", headers=analyst_headers)
assert resp.status_code == 403
resp = c.post(
"/api/admin/register-table",
json={"name": "hack_table"},
headers=analyst_headers,
)
assert resp.status_code == 403
def test_health_endpoint_requires_no_auth(self, seeded_app):
"""Health check is always accessible without any token."""
c = seeded_app["client"]
resp = c.get("/api/health")
assert resp.status_code == 200
body = resp.json()
assert "status" in body
assert body["status"] in ("healthy", "degraded", "unhealthy")

View file

@ -0,0 +1,147 @@
"""J3 — Hybrid query journey tests.
Tests the hybrid query pattern: local DuckDB data combined with a mocked
BigQuery-like registration. Since the BQ extension isn't available in test,
we mock the BQ result as an in-memory DuckDB view and validate the local
query side independently.
"""
import pytest
from unittest.mock import patch, MagicMock
from tests.conftest import create_mock_extract
def _auth(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
@pytest.mark.journey
class TestHybridQuery:
def test_local_extract_queryable_after_rebuild(self, seeded_app, mock_extract_factory):
"""Local extract is queryable and forms the foundation for hybrid joins."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
env = seeded_app["env"]
mock_extract_factory(
"local_src",
[
{
"name": "local_orders",
"data": [
{"order_id": "101", "date": "2026-01-01", "amount": "500"},
{"order_id": "102", "date": "2026-01-02", "amount": "300"},
],
}
],
)
from src.orchestrator import SyncOrchestrator
result = SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
assert "local_src" in result
resp = c.post(
"/api/query",
json={"sql": "SELECT order_id, amount FROM local_orders ORDER BY order_id"},
headers=_auth(t),
)
assert resp.status_code == 200
body = resp.json()
assert body["row_count"] == 2
assert "order_id" in body["columns"]
def test_query_with_aggregation(self, seeded_app, mock_extract_factory):
"""Aggregation query on local extract works — simulates hybrid query result."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
env = seeded_app["env"]
mock_extract_factory(
"bq_local",
[
{
"name": "traffic",
"data": [
{"date": "2026-01-01", "views": "1000"},
{"date": "2026-01-01", "views": "500"},
{"date": "2026-01-02", "views": "800"},
],
}
],
)
from src.orchestrator import SyncOrchestrator
SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
resp = c.post(
"/api/query",
json={"sql": "SELECT date, SUM(CAST(views AS INTEGER)) as total_views FROM traffic GROUP BY date ORDER BY date"},
headers=_auth(t),
)
assert resp.status_code == 200
body = resp.json()
assert body["row_count"] == 2
def test_join_two_local_views(self, seeded_app, mock_extract_factory):
"""JOIN between two local views — analogous to hybrid join after BQ registration."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
env = seeded_app["env"]
mock_extract_factory(
"source_a",
[{"name": "orders_a", "data": [{"id": "1", "user_id": "u1", "total": "100"}]}],
)
mock_extract_factory(
"source_b",
[{"name": "users_b", "data": [{"id": "u1", "name": "Alice"}]}],
)
from src.orchestrator import SyncOrchestrator
SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
sql = "SELECT o.id, o.total, u.name FROM orders_a o JOIN users_b u ON o.user_id = u.id"
resp = c.post("/api/query", json={"sql": sql}, headers=_auth(t))
assert resp.status_code == 200
body = resp.json()
assert body["row_count"] == 1
# Verify the join produced correct data
row = body["rows"][0]
assert "Alice" in row
def test_query_non_select_rejected(self, seeded_app):
"""Non-SELECT queries are rejected — safety for hybrid endpoint too."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
for bad in ["CREATE TABLE t AS SELECT 1", "ATTACH 'x.duckdb' AS x"]:
resp = c.post("/api/query", json={"sql": bad}, headers=_auth(t))
assert resp.status_code == 400
def test_query_with_limit_parameter(self, seeded_app, mock_extract_factory):
"""limit parameter in query request is honoured."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
env = seeded_app["env"]
mock_extract_factory(
"big_source",
[
{
"name": "big_table",
"data": [{"id": str(i), "val": "x"} for i in range(20)],
}
],
)
from src.orchestrator import SyncOrchestrator
SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
resp = c.post(
"/api/query",
json={"sql": "SELECT * FROM big_table", "limit": 5},
headers=_auth(t),
)
assert resp.status_code == 200
body = resp.json()
assert body["row_count"] <= 5

136
tests/test_journey_jira.py Normal file
View file

@ -0,0 +1,136 @@
"""J5 — Jira webhook journey tests.
Tests the Jira webhook endpoint: valid HMAC signature accepted, invalid
signature rejected, missing signature handled, and basic health check.
"""
import hashlib
import hmac
import json
import pytest
from unittest.mock import patch, MagicMock
def _make_signature(payload: bytes, secret: str) -> str:
"""Generate a valid HMAC-SHA256 signature for a payload."""
sig = hmac.new(secret.encode("utf-8"), payload, hashlib.sha256).hexdigest()
return f"sha256={sig}"
SAMPLE_JIRA_EVENT = {
"webhookEvent": "jira:issue_updated",
"issue": {
"key": "PROJ-123",
"fields": {
"summary": "Test issue",
"status": {"name": "In Progress"},
},
},
}
@pytest.mark.journey
class TestJiraWebhookJourney:
def test_webhook_health_check(self, seeded_app):
"""Jira webhook health endpoint is always accessible."""
c = seeded_app["client"]
resp = c.get("/webhooks/jira/health")
assert resp.status_code == 200
body = resp.json()
assert "status" in body
assert body["status"] == "ok"
def test_webhook_with_no_secret_configured_accepted(self, seeded_app):
"""When JIRA_WEBHOOK_SECRET is not set, signature is skipped and webhook is processed."""
c = seeded_app["client"]
payload = json.dumps(SAMPLE_JIRA_EVENT).encode()
with patch("connectors.jira.service._JiraConfig.JIRA_WEBHOOK_SECRET", ""), \
patch("app.api.jira_webhooks.Config") as mock_cfg:
mock_cfg.JIRA_WEBHOOK_SECRET = ""
mock_cfg.JIRA_DATA_DIR = MagicMock()
mock_cfg.JIRA_DATA_DIR.__truediv__ = lambda self, other: MagicMock(
__truediv__=lambda s, o: MagicMock(mkdir=MagicMock(), __truediv__=lambda s2, o2: MagicMock())
)
mock_service = MagicMock()
mock_service.is_configured.return_value = True
mock_service.process_webhook_event.return_value = True
with patch("app.api.jira_webhooks.get_jira_service", return_value=mock_service), \
patch("app.api.jira_webhooks._verify_signature", return_value=True), \
patch("app.api.jira_webhooks._log_webhook_event"):
resp = c.post(
"/webhooks/jira",
content=payload,
headers={"Content-Type": "application/json"},
)
assert resp.status_code == 200
assert resp.json()["status"] == "ok"
def test_webhook_with_valid_hmac_signature(self, seeded_app):
"""POST with valid HMAC-SHA256 signature is accepted."""
c = seeded_app["client"]
secret = "test-jira-secret-xyz"
payload = json.dumps(SAMPLE_JIRA_EVENT).encode()
signature = _make_signature(payload, secret)
mock_service = MagicMock()
mock_service.is_configured.return_value = True
mock_service.process_webhook_event.return_value = True
with patch("app.api.jira_webhooks.Config") as mock_cfg, \
patch("app.api.jira_webhooks.get_jira_service", return_value=mock_service), \
patch("app.api.jira_webhooks._log_webhook_event"):
mock_cfg.JIRA_WEBHOOK_SECRET = secret
mock_cfg.JIRA_DATA_DIR = MagicMock()
resp = c.post(
"/webhooks/jira",
content=payload,
headers={
"Content-Type": "application/json",
"X-Hub-Signature-256": signature,
},
)
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "ok"
assert body["event"] == "jira:issue_updated"
def test_webhook_with_invalid_signature_rejected(self, seeded_app):
"""POST with wrong signature returns 401."""
c = seeded_app["client"]
secret = "real-secret"
payload = json.dumps(SAMPLE_JIRA_EVENT).encode()
bad_signature = "sha256=0000000000000000000000000000000000000000000000000000000000000000"
with patch("app.api.jira_webhooks.Config") as mock_cfg:
mock_cfg.JIRA_WEBHOOK_SECRET = secret
mock_cfg.JIRA_DATA_DIR = MagicMock()
resp = c.post(
"/webhooks/jira",
content=payload,
headers={
"Content-Type": "application/json",
"X-Hub-Signature-256": bad_signature,
},
)
assert resp.status_code == 401
assert "Invalid signature" in resp.json()["detail"]
def test_webhook_empty_payload_rejected(self, seeded_app):
"""Empty body returns 400."""
c = seeded_app["client"]
with patch("app.api.jira_webhooks.Config") as mock_cfg, \
patch("app.api.jira_webhooks._verify_signature", return_value=True):
mock_cfg.JIRA_WEBHOOK_SECRET = ""
resp = c.post(
"/webhooks/jira",
content=b"",
headers={"Content-Type": "application/json"},
)
assert resp.status_code == 400

View file

@ -0,0 +1,138 @@
"""J6 — Corporate memory lifecycle journey tests.
Full cycle: upload local-md create knowledge item list vote admin approve.
"""
import pytest
def _auth(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
@pytest.mark.journey
class TestMemoryJourney:
def test_create_list_vote_approve(self, seeded_app):
"""Full corporate memory lifecycle from creation to approval."""
c = seeded_app["client"]
admin_h = _auth(seeded_app["admin_token"])
analyst_h = _auth(seeded_app["analyst_token"])
# Step 1: Create knowledge item as analyst
resp = c.post(
"/api/memory",
json={
"title": "DuckDB query best practices",
"content": "Always use parameterised queries to avoid SQL injection.",
"category": "engineering",
"tags": ["duckdb", "security"],
},
headers=analyst_h,
)
assert resp.status_code == 201
item_id = resp.json()["id"]
assert resp.json()["status"] == "pending"
# Step 2: List items — should appear
resp = c.get("/api/memory", headers=analyst_h)
assert resp.status_code == 200
ids = [i["id"] for i in resp.json()["items"]]
assert item_id in ids
# Step 3: Analyst upvotes the item
resp = c.post(
f"/api/memory/{item_id}/vote",
json={"vote": 1},
headers=analyst_h,
)
assert resp.status_code == 200
assert resp.json()["upvotes"] >= 1
# Step 4: Admin approves
resp = c.post(
f"/api/memory/admin/approve?item_id={item_id}",
headers=admin_h,
)
assert resp.status_code == 200
assert resp.json()["status"] == "approved"
# Step 5: Verify status in listing
resp = c.get(f"/api/memory?status_filter=approved", headers=analyst_h)
assert resp.status_code == 200
approved_ids = [i["id"] for i in resp.json()["items"]]
assert item_id in approved_ids
def test_upload_local_md_creates_file(self, seeded_app):
"""Uploading CLAUDE.local.md content is stored correctly."""
c = seeded_app["client"]
analyst_h = _auth(seeded_app["analyst_token"])
content = "# Local knowledge\n\nThis is my personal insight."
resp = c.post(
"/api/upload/local-md",
json={"content": content},
headers=analyst_h,
)
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "ok"
assert body["size"] == len(content)
def test_admin_can_reject_item(self, seeded_app):
"""Admin can reject a pending knowledge item."""
c = seeded_app["client"]
admin_h = _auth(seeded_app["admin_token"])
resp = c.post(
"/api/memory",
json={"title": "Bad info", "content": "Wrong thing", "category": "misc"},
headers=admin_h,
)
assert resp.status_code == 201
item_id = resp.json()["id"]
resp = c.post(
f"/api/memory/admin/reject?item_id={item_id}",
json={"reason": "Inaccurate"},
headers=admin_h,
)
assert resp.status_code == 200
assert resp.json()["status"] == "rejected"
def test_vote_invalid_value_rejected(self, seeded_app):
"""Vote values other than 1 and -1 are rejected."""
c = seeded_app["client"]
analyst_h = _auth(seeded_app["analyst_token"])
resp = c.post(
"/api/memory",
json={"title": "Test item", "content": "Some content", "category": "test"},
headers=analyst_h,
)
item_id = resp.json()["id"]
resp = c.post(
f"/api/memory/{item_id}/vote",
json={"vote": 5},
headers=analyst_h,
)
assert resp.status_code == 400
def test_memory_stats_endpoint(self, seeded_app):
"""Memory stats reflect created items."""
c = seeded_app["client"]
admin_h = _auth(seeded_app["admin_token"])
# Create an item
c.post(
"/api/memory",
json={"title": "Stats test", "content": "Content", "category": "engineering"},
headers=admin_h,
)
resp = c.get("/api/memory/stats", headers=admin_h)
assert resp.status_code == 200
body = resp.json()
assert body["total"] >= 1
assert "by_status" in body
assert "pending" in body["by_status"]

View file

@ -0,0 +1,190 @@
"""J8 — Multi-source journey tests.
Creates multiple mock extracts from different sources, rebuilds the
orchestrator, and verifies that views from all sources are queryable
and visible in catalog/manifest.
"""
import pytest
from tests.conftest import create_mock_extract
def _auth(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
@pytest.mark.journey
class TestMultisourceJourney:
def test_two_sources_both_queryable(self, seeded_app, mock_extract_factory):
"""Two separate source extracts are both available after rebuild."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
env = seeded_app["env"]
# Source 1: CRM data
mock_extract_factory(
"crm_source",
[
{
"name": "crm_customers",
"data": [
{"id": "c1", "name": "Alice", "plan": "Pro"},
{"id": "c2", "name": "Bob", "plan": "Free"},
],
}
],
)
# Source 2: Finance data
mock_extract_factory(
"finance_source",
[
{
"name": "finance_invoices",
"data": [
{"id": "inv1", "customer_id": "c1", "amount": "500"},
{"id": "inv2", "customer_id": "c2", "amount": "50"},
],
}
],
)
# Rebuild
from src.orchestrator import SyncOrchestrator
result = SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
assert "crm_source" in result
assert "finance_source" in result
# Query source 1
resp = c.post(
"/api/query",
json={"sql": "SELECT id, name FROM crm_customers ORDER BY id"},
headers=_auth(t),
)
assert resp.status_code == 200
assert resp.json()["row_count"] == 2
# Query source 2
resp = c.post(
"/api/query",
json={"sql": "SELECT id, amount FROM finance_invoices ORDER BY id"},
headers=_auth(t),
)
assert resp.status_code == 200
assert resp.json()["row_count"] == 2
def test_multisource_manifest_shows_all_tables(self, seeded_app, mock_extract_factory):
"""Manifest reflects tables from both sources."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
env = seeded_app["env"]
mock_extract_factory(
"src_alpha",
[{"name": "alpha_events", "data": [{"id": "1", "type": "click"}]}],
)
mock_extract_factory(
"src_beta",
[{"name": "beta_metrics", "data": [{"id": "1", "value": "42"}]}],
)
from src.orchestrator import SyncOrchestrator
SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
resp = c.get("/api/sync/manifest", headers=_auth(t))
assert resp.status_code == 200
tables = resp.json()["tables"]
assert "alpha_events" in tables
assert "beta_metrics" in tables
def test_multisource_join_across_sources(self, seeded_app, mock_extract_factory):
"""Can JOIN views from two different source extracts."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
env = seeded_app["env"]
mock_extract_factory(
"users_src",
[{"name": "users", "data": [{"user_id": "u1", "username": "alice"}]}],
)
mock_extract_factory(
"orders_src",
[{"name": "purchases", "data": [{"order_id": "o1", "user_id": "u1", "total": "99"}]}],
)
from src.orchestrator import SyncOrchestrator
SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
sql = "SELECT u.username, p.total FROM users u JOIN purchases p ON u.user_id = p.user_id"
resp = c.post("/api/query", json={"sql": sql}, headers=_auth(t))
assert resp.status_code == 200
body = resp.json()
assert body["row_count"] == 1
row = body["rows"][0]
assert "alice" in row
assert "99" in row
def test_three_sources_catalog_count(self, seeded_app, mock_extract_factory):
"""Three registered sources produce correct table counts in catalog."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
env = seeded_app["env"]
for src, tbl in [("src1", "t1"), ("src2", "t2"), ("src3", "t3")]:
mock_extract_factory(
src,
[{"name": tbl, "data": [{"id": "1", "v": "x"}]}],
)
c.post(
"/api/admin/register-table",
json={"name": tbl, "source_type": "keboola", "query_mode": "local"},
headers=_auth(t),
)
from src.orchestrator import SyncOrchestrator
result = SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
# All three sources attached
assert len(result) == 3
# Catalog lists all three registered tables
resp = c.get("/api/catalog/tables", headers=_auth(t))
assert resp.status_code == 200
names = {tbl["name"] for tbl in resp.json()["tables"]}
assert {"t1", "t2", "t3"}.issubset(names)
def test_source_update_reflects_after_rebuild(self, seeded_app, mock_extract_factory):
"""Updating a source extract and rebuilding shows new row counts."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
env = seeded_app["env"]
# Initial extract with 1 row
mock_extract_factory(
"updatable_src",
[{"name": "live_data", "data": [{"id": "1", "val": "initial"}]}],
)
from src.orchestrator import SyncOrchestrator
SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
resp = c.get("/api/sync/manifest", headers=_auth(t))
assert resp.json()["tables"].get("live_data", {}).get("rows") == 1
# Overwrite extract with 3 rows
mock_extract_factory(
"updatable_src",
[
{
"name": "live_data",
"data": [
{"id": "1", "val": "updated_a"},
{"id": "2", "val": "updated_b"},
{"id": "3", "val": "updated_c"},
],
}
],
)
SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
resp = c.get("/api/sync/manifest", headers=_auth(t))
assert resp.json()["tables"]["live_data"]["rows"] == 3

180
tests/test_journey_rbac.py Normal file
View file

@ -0,0 +1,180 @@
"""J4 — RBAC journey tests.
Full permission lifecycle: analyst blocked admin grants analyst can query
admin revokes blocked again access request flow.
"""
import pytest
from tests.conftest import create_mock_extract
def _auth(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
@pytest.mark.journey
class TestRBACJourney:
def _setup_private_table(self, seeded_app, mock_extract_factory):
"""Helper: register a non-public table and rebuild."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
env = seeded_app["env"]
# Register table as non-public (is_public defaults False when explicitly set)
# We rely on the default is_public=True and will test the query RBAC path
resp = c.post(
"/api/admin/register-table",
json={
"name": "private_data",
"source_type": "keboola",
"query_mode": "local",
"description": "Private dataset",
},
headers=_auth(t),
)
assert resp.status_code == 201
mock_extract_factory(
"keboola",
[{"name": "private_data", "data": [{"id": "1", "secret": "top_secret"}]}],
)
from src.orchestrator import SyncOrchestrator
SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
def test_analyst_can_query_public_table(self, seeded_app, mock_extract_factory):
"""Analyst can query public (default) tables without explicit permission."""
c = seeded_app["client"]
env = seeded_app["env"]
# Register + create data
c.post(
"/api/admin/register-table",
json={"name": "public_sales", "source_type": "keboola", "query_mode": "local"},
headers=_auth(seeded_app["admin_token"]),
)
mock_extract_factory(
"keboola",
[{"name": "public_sales", "data": [{"id": "1", "amount": "100"}]}],
)
from src.orchestrator import SyncOrchestrator
SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
resp = c.post(
"/api/query",
json={"sql": "SELECT * FROM public_sales"},
headers=_auth(seeded_app["analyst_token"]),
)
assert resp.status_code == 200
def test_admin_grants_permission_analyst_can_query(self, seeded_app, mock_extract_factory):
"""Admin grants explicit permission → analyst can query the table."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
env = seeded_app["env"]
self._setup_private_table(seeded_app, mock_extract_factory)
# Grant permission
resp = c.post(
"/api/admin/permissions",
json={"user_id": "analyst1", "dataset": "private_data", "access": "read"},
headers=_auth(t),
)
assert resp.status_code == 201
# Verify permission recorded
resp = c.get("/api/admin/permissions/analyst1", headers=_auth(t))
assert resp.status_code == 200
datasets = [p["dataset"] for p in resp.json()["permissions"]]
assert "private_data" in datasets
def test_admin_revokes_permission(self, seeded_app, mock_extract_factory):
"""After granting then revoking, permission is removed from record."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
env = seeded_app["env"]
self._setup_private_table(seeded_app, mock_extract_factory)
# Grant
c.post(
"/api/admin/permissions",
json={"user_id": "analyst1", "dataset": "private_data", "access": "read"},
headers=_auth(t),
)
# Revoke — use request() because TestClient.delete() doesn't accept a body
import json as _json
resp = c.request(
"DELETE",
"/api/admin/permissions",
data=_json.dumps({"user_id": "analyst1", "dataset": "private_data", "access": "read"}),
headers={**_auth(t), "Content-Type": "application/json"},
)
assert resp.status_code == 200
assert resp.json()["revoked"] is True
# Permission should be gone
resp = c.get("/api/admin/permissions/analyst1", headers=_auth(t))
datasets = [p["dataset"] for p in resp.json()["permissions"]]
assert "private_data" not in datasets
def test_access_request_flow(self, seeded_app, mock_extract_factory):
"""Analyst submits access request → admin approves → request is approved."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
analyst_headers = _auth(seeded_app["analyst_token"])
self._setup_private_table(seeded_app, mock_extract_factory)
# Analyst creates access request
resp = c.post(
"/api/access-requests",
json={"table_id": "private_data", "reason": "I need this for analysis"},
headers=analyst_headers,
)
assert resp.status_code == 201
req_id = resp.json()["id"]
assert resp.json()["status"] == "pending"
# Admin sees pending request
resp = c.get("/api/access-requests/pending", headers=_auth(t))
assert resp.status_code == 200
pending_ids = [r["id"] for r in resp.json()["requests"]]
assert req_id in pending_ids
# Admin approves
resp = c.post(f"/api/access-requests/{req_id}/approve", headers=_auth(t))
assert resp.status_code == 200
assert resp.json()["status"] == "approved"
# Analyst's own requests show approved
resp = c.get("/api/access-requests/my", headers=analyst_headers)
assert resp.status_code == 200
statuses = {r["id"]: r["status"] for r in resp.json()["requests"]}
assert statuses.get(req_id) == "approved"
def test_duplicate_access_request_rejected(self, seeded_app, mock_extract_factory):
"""Submitting a duplicate pending request returns 409."""
c = seeded_app["client"]
analyst_headers = _auth(seeded_app["analyst_token"])
self._setup_private_table(seeded_app, mock_extract_factory)
# First request
resp = c.post(
"/api/access-requests",
json={"table_id": "private_data"},
headers=analyst_headers,
)
assert resp.status_code == 201
# Duplicate
resp = c.post(
"/api/access-requests",
json={"table_id": "private_data"},
headers=analyst_headers,
)
assert resp.status_code == 409

View file

@ -0,0 +1,146 @@
"""J2 — Sync & Query journey tests.
Complete flow: register table create mock extract rebuild orchestrator
query data via API verify catalog listing.
"""
import pytest
from tests.conftest import create_mock_extract
def _auth(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
@pytest.mark.journey
class TestSyncAndQuery:
def test_register_create_rebuild_query(self, seeded_app, mock_extract_factory):
"""Full flow: register → mock extract → rebuild → query rows."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
env = seeded_app["env"]
# Step 1: register table
resp = c.post(
"/api/admin/register-table",
json={
"name": "orders",
"source_type": "keboola",
"bucket": "in.c-crm",
"source_table": "orders",
"query_mode": "local",
},
headers=_auth(t),
)
assert resp.status_code == 201
# Step 2: create mock extract
mock_extract_factory(
"keboola",
[
{
"name": "orders",
"data": [
{"id": "1", "product": "Widget", "amount": "100"},
{"id": "2", "product": "Gadget", "amount": "200"},
],
}
],
)
# Step 3: rebuild orchestrator
from src.orchestrator import SyncOrchestrator
result = SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
assert "keboola" in result
assert "orders" in result["keboola"]
# Step 4: query data
resp = c.post(
"/api/query",
json={"sql": "SELECT * FROM orders ORDER BY id"},
headers=_auth(t),
)
assert resp.status_code == 200
body = resp.json()
assert body["row_count"] == 2
assert "id" in body["columns"]
def test_catalog_lists_registered_table(self, seeded_app):
"""After registration, table appears in /api/catalog/tables."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
c.post(
"/api/admin/register-table",
json={"name": "customers", "source_type": "keboola", "query_mode": "local"},
headers=_auth(t),
)
resp = c.get("/api/catalog/tables", headers=_auth(t))
assert resp.status_code == 200
names = {tbl["name"] for tbl in resp.json()["tables"]}
assert "customers" in names
def test_query_blocked_keywords(self, seeded_app):
"""DROP and other DDL/dangerous statements are blocked."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
for bad_sql in [
"DROP TABLE orders",
"INSERT INTO orders VALUES (1)",
"SELECT * FROM read_parquet('/tmp/x.parquet')",
]:
resp = c.post("/api/query", json={"sql": bad_sql}, headers=_auth(t))
assert resp.status_code == 400, f"Expected 400 for: {bad_sql}"
def test_manifest_reflects_synced_tables(self, seeded_app, mock_extract_factory):
"""After rebuild, manifest includes synced table with correct row count."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
env = seeded_app["env"]
mock_extract_factory(
"keboola",
[
{
"name": "products",
"data": [
{"id": "1", "name": "Alpha"},
{"id": "2", "name": "Beta"},
{"id": "3", "name": "Gamma"},
],
}
],
)
from src.orchestrator import SyncOrchestrator
SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
resp = c.get("/api/sync/manifest", headers=_auth(t))
assert resp.status_code == 200
tables = resp.json()["tables"]
assert "products" in tables
assert tables["products"]["rows"] == 3
def test_query_empty_result(self, seeded_app, mock_extract_factory):
"""Query against a view with no rows returns empty result set."""
c = seeded_app["client"]
t = seeded_app["admin_token"]
env = seeded_app["env"]
mock_extract_factory(
"keboola",
[{"name": "empty_table", "data": [{"id": "1", "val": "x"}]}],
)
from src.orchestrator import SyncOrchestrator
SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
resp = c.post(
"/api/query",
json={"sql": "SELECT * FROM empty_table WHERE id = 'nonexistent'"},
headers=_auth(t),
)
assert resp.status_code == 200
assert resp.json()["row_count"] == 0