diff --git a/pytest.ini b/pytest.ini
index 6104bb7..07a3e40 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -3,3 +3,4 @@ addopts = -m "not live and not docker" --timeout=60 --strict-markers
markers =
live: tests requiring server access (run with '-m live')
docker: tests requiring Docker (run with '-m docker')
+ journey: end-to-end user journey tests (select with '-m journey')
diff --git a/tests/conftest.py b/tests/conftest.py
index 0cc1da8..467796c 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -87,6 +87,31 @@ def write_test_parquet(path: str, data: list[dict]):
conn.close()
+@pytest.fixture
+def mock_extract_factory(e2e_env):
+ """Factory fixture: returns callable that creates mock extract.duckdb files.
+
+ Usage:
+ mock_extract_factory(source_name, tables_list, remote_attach=None)
+ """
+ def _factory(source_name: str, tables: list, remote_attach=None):
+ db_path = create_mock_extract(e2e_env["extracts_dir"], source_name, tables)
+ if remote_attach:
+ import duckdb as _duckdb
+ conn = _duckdb.connect(str(db_path))
+ conn.execute("""CREATE TABLE IF NOT EXISTS _remote_attach (
+ alias VARCHAR, extension VARCHAR, url VARCHAR, token_env VARCHAR
+ )""")
+ for row in remote_attach:
+ conn.execute(
+ "INSERT INTO _remote_attach VALUES (?, ?, ?, ?)",
+ [row["alias"], row["extension"], row["url"], row["token_env"]],
+ )
+ conn.close()
+ return db_path
+ return _factory
+
+
@pytest.fixture
def seeded_app(e2e_env):
"""FastAPI TestClient with seeded admin + analyst users, JWT tokens."""
@@ -113,3 +138,9 @@ def seeded_app(e2e_env):
"analyst_token": analyst_token,
"env": e2e_env,
}
+
+
+@pytest.fixture
+def admin_user(seeded_app):
+ """Return Authorization header dict for admin user."""
+ return {"Authorization": f"Bearer {seeded_app['admin_token']}"}
diff --git a/tests/test_journey_analyst.py b/tests/test_journey_analyst.py
new file mode 100644
index 0000000..01929c7
--- /dev/null
+++ b/tests/test_journey_analyst.py
@@ -0,0 +1,107 @@
+"""J7 — Analyst upload journey tests.
+
+Tests analyst file upload flows: session transcripts and artifacts
+(HTML, PNG), auth enforcement, and admin uploads — verifying stored files.
+"""
+
+import io
+import pytest
+
+
+def _auth(token: str) -> dict:
+ return {"Authorization": f"Bearer {token}"}
+
+
+@pytest.mark.journey
+class TestAnalystJourney:
+ def test_upload_session_transcript(self, seeded_app):
+ """Analyst can upload a session JSONL file and it is stored."""
+ c = seeded_app["client"]
+ analyst_h = _auth(seeded_app["analyst_token"])
+ env = seeded_app["env"]
+
+ content = b'{"role": "user", "content": "Show me total revenue"}\n{"role": "assistant", "content": "SELECT SUM(amount) FROM orders"}\n'
+ resp = c.post(
+ "/api/upload/sessions",
+ files={"file": ("session_20260101.jsonl", io.BytesIO(content), "application/jsonl")},
+ headers=analyst_h,
+ )
+ assert resp.status_code == 200
+ body = resp.json()
+ assert body["status"] == "ok"
+ assert body["filename"] == "session_20260101.jsonl"
+ assert body["size"] == len(content)
+
+ # Verify file physically stored
+ sessions_dir = env["data_dir"] / "user_sessions" / "analyst1"
+ stored = sessions_dir / "session_20260101.jsonl"
+ assert stored.exists()
+ assert stored.read_bytes() == content
+
+ def test_upload_artifact_html(self, seeded_app):
+ """Analyst can upload an HTML artifact and it is stored."""
+ c = seeded_app["client"]
+ analyst_h = _auth(seeded_app["analyst_token"])
+ env = seeded_app["env"]
+
+ content = b"<html><body><h1>Revenue Report</h1></body></html>"
+ resp = c.post(
+ "/api/upload/artifacts",
+ files={"file": ("report.html", io.BytesIO(content), "text/html")},
+ headers=analyst_h,
+ )
+ assert resp.status_code == 200
+ body = resp.json()
+ assert body["status"] == "ok"
+ assert body["filename"] == "report.html"
+
+ # Verify on disk
+ artifacts_dir = env["data_dir"] / "user_artifacts" / "analyst1"
+ stored = artifacts_dir / "report.html"
+ assert stored.exists()
+ assert stored.read_bytes() == content
+
+ def test_upload_artifact_png(self, seeded_app):
+ """Analyst can upload a PNG chart file."""
+ c = seeded_app["client"]
+ analyst_h = _auth(seeded_app["analyst_token"])
+
+ # Minimal valid-ish PNG header bytes
+ fake_png = b"\x89PNG\r\n\x1a\n" + b"\x00" * 100
+ resp = c.post(
+ "/api/upload/artifacts",
+ files={"file": ("chart.png", io.BytesIO(fake_png), "image/png")},
+ headers=analyst_h,
+ )
+ assert resp.status_code == 200
+ assert resp.json()["filename"] == "chart.png"
+
+ def test_upload_requires_auth(self, seeded_app):
+ """Upload endpoints require authentication."""
+ c = seeded_app["client"]
+
+ for url in ["/api/upload/sessions", "/api/upload/artifacts"]:
+ resp = c.post(
+ url,
+ files={"file": ("test.txt", io.BytesIO(b"hello"), "text/plain")},
+ )
+ assert resp.status_code == 401, f"Expected 401 for {url}"
+
+ def test_admin_can_also_upload(self, seeded_app):
+ """Admin user can also upload files (not analyst-exclusive)."""
+ c = seeded_app["client"]
+ admin_h = _auth(seeded_app["admin_token"])
+ env = seeded_app["env"]
+
+ content = b"admin session data"
+ resp = c.post(
+ "/api/upload/sessions",
+ files={"file": ("admin_session.jsonl", io.BytesIO(content), "application/jsonl")},
+ headers=admin_h,
+ )
+ assert resp.status_code == 200
+ assert resp.json()["status"] == "ok"
+
+ # Stored under admin user dir
+ admin_dir = env["data_dir"] / "user_sessions" / "admin1"
+ assert (admin_dir / "admin_session.jsonl").exists()
diff --git a/tests/test_journey_bootstrap_auth.py b/tests/test_journey_bootstrap_auth.py
new file mode 100644
index 0000000..c3506bc
--- /dev/null
+++ b/tests/test_journey_bootstrap_auth.py
@@ -0,0 +1,73 @@
+"""J1 — Bootstrap & Auth journey tests.
+
+Verifies that authentication works end-to-end: JWT access, redirect for
+unauthenticated requests, role enforcement, and the no-auth health endpoint.
+"""
+
+import pytest
+
+
+def _auth(token: str) -> dict:
+ return {"Authorization": f"Bearer {token}"}
+
+
+@pytest.mark.journey
+class TestBootstrapAuth:
+ def test_dashboard_accessible_with_admin_jwt(self, seeded_app):
+ """Admin with valid JWT can reach the dashboard (200 or 302 redirect)."""
+ c = seeded_app["client"]
+ t = seeded_app["admin_token"]
+
+ resp = c.get("/dashboard", headers=_auth(t), follow_redirects=False)
+ assert resp.status_code in (200, 302)
+
+ def test_dashboard_blocked_without_auth(self, seeded_app):
+ """Unauthenticated request to dashboard is rejected (401 or 302 to login)."""
+ c = seeded_app["client"]
+
+ resp = c.get("/dashboard", follow_redirects=False)
+ # Either 401 from API middleware or redirect to /login
+ assert resp.status_code in (401, 302)
+
+ def test_admin_can_list_registry(self, seeded_app, admin_user):
+ """Admin JWT gives access to /api/admin/registry."""
+ c = seeded_app["client"]
+
+ # First register a table so there's something to list
+ c.post(
+ "/api/admin/register-table",
+ json={"name": "journey_table", "source_type": "keboola", "query_mode": "local"},
+ headers=admin_user,
+ )
+
+ resp = c.get("/api/admin/registry", headers=admin_user)
+ assert resp.status_code == 200
+ data = resp.json()
+ assert "tables" in data
+ names = {t["name"] for t in data["tables"]}
+ assert "journey_table" in names
+
+ def test_analyst_cannot_access_admin_endpoints(self, seeded_app):
+ """Analyst JWT is forbidden from admin-only endpoints."""
+ c = seeded_app["client"]
+ analyst_headers = _auth(seeded_app["analyst_token"])
+
+ resp = c.get("/api/admin/registry", headers=analyst_headers)
+ assert resp.status_code == 403
+
+ resp = c.post(
+ "/api/admin/register-table",
+ json={"name": "hack_table"},
+ headers=analyst_headers,
+ )
+ assert resp.status_code == 403
+
+ def test_health_endpoint_requires_no_auth(self, seeded_app):
+ """Health check is always accessible without any token."""
+ c = seeded_app["client"]
+
+ resp = c.get("/api/health")
+ assert resp.status_code == 200
+ body = resp.json()
+ assert "status" in body
+ assert body["status"] in ("healthy", "degraded", "unhealthy")
diff --git a/tests/test_journey_hybrid.py b/tests/test_journey_hybrid.py
new file mode 100644
index 0000000..2ec1bba
--- /dev/null
+++ b/tests/test_journey_hybrid.py
@@ -0,0 +1,147 @@
+"""J3 — Hybrid query journey tests.
+
+Tests the local side of the hybrid query pattern. Since the BQ extension
+isn't available in test, BigQuery-like data is stood in for by ordinary
+local mock extracts (e.g. "bq_local") and only the local query path is
+validated: rebuild, aggregation, cross-source JOIN, non-SELECT rejection, limit.
+"""
+
+import pytest
+from unittest.mock import patch, MagicMock
+from tests.conftest import create_mock_extract
+
+
+def _auth(token: str) -> dict:
+ return {"Authorization": f"Bearer {token}"}
+
+
+@pytest.mark.journey
+class TestHybridQuery:
+ def test_local_extract_queryable_after_rebuild(self, seeded_app, mock_extract_factory):
+ """Local extract is queryable and forms the foundation for hybrid joins."""
+ c = seeded_app["client"]
+ t = seeded_app["admin_token"]
+ env = seeded_app["env"]
+
+ mock_extract_factory(
+ "local_src",
+ [
+ {
+ "name": "local_orders",
+ "data": [
+ {"order_id": "101", "date": "2026-01-01", "amount": "500"},
+ {"order_id": "102", "date": "2026-01-02", "amount": "300"},
+ ],
+ }
+ ],
+ )
+
+ from src.orchestrator import SyncOrchestrator
+ result = SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
+ assert "local_src" in result
+
+ resp = c.post(
+ "/api/query",
+ json={"sql": "SELECT order_id, amount FROM local_orders ORDER BY order_id"},
+ headers=_auth(t),
+ )
+ assert resp.status_code == 200
+ body = resp.json()
+ assert body["row_count"] == 2
+ assert "order_id" in body["columns"]
+
+ def test_query_with_aggregation(self, seeded_app, mock_extract_factory):
+ """Aggregation query on local extract works — simulates hybrid query result."""
+ c = seeded_app["client"]
+ t = seeded_app["admin_token"]
+ env = seeded_app["env"]
+
+ mock_extract_factory(
+ "bq_local",
+ [
+ {
+ "name": "traffic",
+ "data": [
+ {"date": "2026-01-01", "views": "1000"},
+ {"date": "2026-01-01", "views": "500"},
+ {"date": "2026-01-02", "views": "800"},
+ ],
+ }
+ ],
+ )
+
+ from src.orchestrator import SyncOrchestrator
+ SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
+
+ resp = c.post(
+ "/api/query",
+ json={"sql": "SELECT date, SUM(CAST(views AS INTEGER)) as total_views FROM traffic GROUP BY date ORDER BY date"},
+ headers=_auth(t),
+ )
+ assert resp.status_code == 200
+ body = resp.json()
+ assert body["row_count"] == 2
+
+ def test_join_two_local_views(self, seeded_app, mock_extract_factory):
+ """JOIN between two local views — analogous to hybrid join after BQ registration."""
+ c = seeded_app["client"]
+ t = seeded_app["admin_token"]
+ env = seeded_app["env"]
+
+ mock_extract_factory(
+ "source_a",
+ [{"name": "orders_a", "data": [{"id": "1", "user_id": "u1", "total": "100"}]}],
+ )
+ mock_extract_factory(
+ "source_b",
+ [{"name": "users_b", "data": [{"id": "u1", "name": "Alice"}]}],
+ )
+
+ from src.orchestrator import SyncOrchestrator
+ SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
+
+ sql = "SELECT o.id, o.total, u.name FROM orders_a o JOIN users_b u ON o.user_id = u.id"
+ resp = c.post("/api/query", json={"sql": sql}, headers=_auth(t))
+ assert resp.status_code == 200
+ body = resp.json()
+ assert body["row_count"] == 1
+ # Verify the join produced correct data
+ row = body["rows"][0]
+ assert "Alice" in row
+
+ def test_query_non_select_rejected(self, seeded_app):
+ """Non-SELECT queries are rejected — safety for hybrid endpoint too."""
+ c = seeded_app["client"]
+ t = seeded_app["admin_token"]
+
+ for bad in ["CREATE TABLE t AS SELECT 1", "ATTACH 'x.duckdb' AS x"]:
+ resp = c.post("/api/query", json={"sql": bad}, headers=_auth(t))
+ assert resp.status_code == 400
+
+ def test_query_with_limit_parameter(self, seeded_app, mock_extract_factory):
+ """limit parameter in query request is honoured."""
+ c = seeded_app["client"]
+ t = seeded_app["admin_token"]
+ env = seeded_app["env"]
+
+ mock_extract_factory(
+ "big_source",
+ [
+ {
+ "name": "big_table",
+ "data": [{"id": str(i), "val": "x"} for i in range(20)],
+ }
+ ],
+ )
+
+ from src.orchestrator import SyncOrchestrator
+ SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
+
+ resp = c.post(
+ "/api/query",
+ json={"sql": "SELECT * FROM big_table", "limit": 5},
+ headers=_auth(t),
+ )
+ assert resp.status_code == 200
+ body = resp.json()
+ assert body["row_count"] <= 5
diff --git a/tests/test_journey_jira.py b/tests/test_journey_jira.py
new file mode 100644
index 0000000..229937d
--- /dev/null
+++ b/tests/test_journey_jira.py
@@ -0,0 +1,136 @@
+"""J5 — Jira webhook journey tests.
+
+Tests the Jira webhook endpoint: valid HMAC signature accepted, invalid
+signature rejected, empty secret accepted, empty payload rejected, health check.
+"""
+
+import hashlib
+import hmac
+import json
+import pytest
+from unittest.mock import patch, MagicMock
+
+
+def _make_signature(payload: bytes, secret: str) -> str:
+ """Generate a valid HMAC-SHA256 signature for a payload."""
+ sig = hmac.new(secret.encode("utf-8"), payload, hashlib.sha256).hexdigest()
+ return f"sha256={sig}"
+
+
+SAMPLE_JIRA_EVENT = {
+ "webhookEvent": "jira:issue_updated",
+ "issue": {
+ "key": "PROJ-123",
+ "fields": {
+ "summary": "Test issue",
+ "status": {"name": "In Progress"},
+ },
+ },
+}
+
+
+@pytest.mark.journey
+class TestJiraWebhookJourney:
+ def test_webhook_health_check(self, seeded_app):
+ """Jira webhook health endpoint is always accessible."""
+ c = seeded_app["client"]
+ resp = c.get("/webhooks/jira/health")
+ assert resp.status_code == 200
+ body = resp.json()
+ assert "status" in body
+ assert body["status"] == "ok"
+
+ def test_webhook_with_no_secret_configured_accepted(self, seeded_app):
+ """With JIRA_WEBHOOK_SECRET empty the webhook is processed (note: _verify_signature is patched to True here, so the skip itself is not exercised)."""
+ c = seeded_app["client"]
+ payload = json.dumps(SAMPLE_JIRA_EVENT).encode()
+
+ with patch("connectors.jira.service._JiraConfig.JIRA_WEBHOOK_SECRET", ""), \
+ patch("app.api.jira_webhooks.Config") as mock_cfg:
+ mock_cfg.JIRA_WEBHOOK_SECRET = ""
+ mock_cfg.JIRA_DATA_DIR = MagicMock()
+ mock_cfg.JIRA_DATA_DIR.__truediv__ = lambda self, other: MagicMock(
+ __truediv__=lambda s, o: MagicMock(mkdir=MagicMock(), __truediv__=lambda s2, o2: MagicMock())
+ )
+
+ mock_service = MagicMock()
+ mock_service.is_configured.return_value = True
+ mock_service.process_webhook_event.return_value = True
+
+ with patch("app.api.jira_webhooks.get_jira_service", return_value=mock_service), \
+ patch("app.api.jira_webhooks._verify_signature", return_value=True), \
+ patch("app.api.jira_webhooks._log_webhook_event"):
+ resp = c.post(
+ "/webhooks/jira",
+ content=payload,
+ headers={"Content-Type": "application/json"},
+ )
+ assert resp.status_code == 200
+ assert resp.json()["status"] == "ok"
+
+ def test_webhook_with_valid_hmac_signature(self, seeded_app):
+ """POST with valid HMAC-SHA256 signature is accepted."""
+ c = seeded_app["client"]
+ secret = "test-jira-secret-xyz"
+ payload = json.dumps(SAMPLE_JIRA_EVENT).encode()
+ signature = _make_signature(payload, secret)
+
+ mock_service = MagicMock()
+ mock_service.is_configured.return_value = True
+ mock_service.process_webhook_event.return_value = True
+
+ with patch("app.api.jira_webhooks.Config") as mock_cfg, \
+ patch("app.api.jira_webhooks.get_jira_service", return_value=mock_service), \
+ patch("app.api.jira_webhooks._log_webhook_event"):
+ mock_cfg.JIRA_WEBHOOK_SECRET = secret
+ mock_cfg.JIRA_DATA_DIR = MagicMock()
+
+ resp = c.post(
+ "/webhooks/jira",
+ content=payload,
+ headers={
+ "Content-Type": "application/json",
+ "X-Hub-Signature-256": signature,
+ },
+ )
+ assert resp.status_code == 200
+ body = resp.json()
+ assert body["status"] == "ok"
+ assert body["event"] == "jira:issue_updated"
+
+ def test_webhook_with_invalid_signature_rejected(self, seeded_app):
+ """POST with wrong signature returns 401."""
+ c = seeded_app["client"]
+ secret = "real-secret"
+ payload = json.dumps(SAMPLE_JIRA_EVENT).encode()
+ bad_signature = "sha256=0000000000000000000000000000000000000000000000000000000000000000"
+
+ with patch("app.api.jira_webhooks.Config") as mock_cfg:
+ mock_cfg.JIRA_WEBHOOK_SECRET = secret
+ mock_cfg.JIRA_DATA_DIR = MagicMock()
+
+ resp = c.post(
+ "/webhooks/jira",
+ content=payload,
+ headers={
+ "Content-Type": "application/json",
+ "X-Hub-Signature-256": bad_signature,
+ },
+ )
+ assert resp.status_code == 401
+ assert "Invalid signature" in resp.json()["detail"]
+
+ def test_webhook_empty_payload_rejected(self, seeded_app):
+ """Empty body returns 400."""
+ c = seeded_app["client"]
+
+ with patch("app.api.jira_webhooks.Config") as mock_cfg, \
+ patch("app.api.jira_webhooks._verify_signature", return_value=True):
+ mock_cfg.JIRA_WEBHOOK_SECRET = ""
+
+ resp = c.post(
+ "/webhooks/jira",
+ content=b"",
+ headers={"Content-Type": "application/json"},
+ )
+ assert resp.status_code == 400
diff --git a/tests/test_journey_memory.py b/tests/test_journey_memory.py
new file mode 100644
index 0000000..301a7b8
--- /dev/null
+++ b/tests/test_journey_memory.py
@@ -0,0 +1,138 @@
+"""J6 — Corporate memory lifecycle journey tests.
+
+Full cycle: create knowledge item → list → vote → admin approve; also local-md upload, reject, vote validation, stats.
+"""
+
+import pytest
+
+
+def _auth(token: str) -> dict:
+ return {"Authorization": f"Bearer {token}"}
+
+
+@pytest.mark.journey
+class TestMemoryJourney:
+ def test_create_list_vote_approve(self, seeded_app):
+ """Full corporate memory lifecycle from creation to approval."""
+ c = seeded_app["client"]
+ admin_h = _auth(seeded_app["admin_token"])
+ analyst_h = _auth(seeded_app["analyst_token"])
+
+ # Step 1: Create knowledge item as analyst
+ resp = c.post(
+ "/api/memory",
+ json={
+ "title": "DuckDB query best practices",
+ "content": "Always use parameterised queries to avoid SQL injection.",
+ "category": "engineering",
+ "tags": ["duckdb", "security"],
+ },
+ headers=analyst_h,
+ )
+ assert resp.status_code == 201
+ item_id = resp.json()["id"]
+ assert resp.json()["status"] == "pending"
+
+ # Step 2: List items — should appear
+ resp = c.get("/api/memory", headers=analyst_h)
+ assert resp.status_code == 200
+ ids = [i["id"] for i in resp.json()["items"]]
+ assert item_id in ids
+
+ # Step 3: Analyst upvotes the item
+ resp = c.post(
+ f"/api/memory/{item_id}/vote",
+ json={"vote": 1},
+ headers=analyst_h,
+ )
+ assert resp.status_code == 200
+ assert resp.json()["upvotes"] >= 1
+
+ # Step 4: Admin approves
+ resp = c.post(
+ f"/api/memory/admin/approve?item_id={item_id}",
+ headers=admin_h,
+ )
+ assert resp.status_code == 200
+ assert resp.json()["status"] == "approved"
+
+ # Step 5: Verify status in listing
+ resp = c.get(f"/api/memory?status_filter=approved", headers=analyst_h)
+ assert resp.status_code == 200
+ approved_ids = [i["id"] for i in resp.json()["items"]]
+ assert item_id in approved_ids
+
+ def test_upload_local_md_creates_file(self, seeded_app):
+ """Uploading CLAUDE.local.md content returns status ok and the submitted size (on-disk storage is not checked here)."""
+ c = seeded_app["client"]
+ analyst_h = _auth(seeded_app["analyst_token"])
+
+ content = "# Local knowledge\n\nThis is my personal insight."
+ resp = c.post(
+ "/api/upload/local-md",
+ json={"content": content},
+ headers=analyst_h,
+ )
+ assert resp.status_code == 200
+ body = resp.json()
+ assert body["status"] == "ok"
+ assert body["size"] == len(content)
+
+ def test_admin_can_reject_item(self, seeded_app):
+ """Admin can reject a pending knowledge item."""
+ c = seeded_app["client"]
+ admin_h = _auth(seeded_app["admin_token"])
+
+ resp = c.post(
+ "/api/memory",
+ json={"title": "Bad info", "content": "Wrong thing", "category": "misc"},
+ headers=admin_h,
+ )
+ assert resp.status_code == 201
+ item_id = resp.json()["id"]
+
+ resp = c.post(
+ f"/api/memory/admin/reject?item_id={item_id}",
+ json={"reason": "Inaccurate"},
+ headers=admin_h,
+ )
+ assert resp.status_code == 200
+ assert resp.json()["status"] == "rejected"
+
+ def test_vote_invalid_value_rejected(self, seeded_app):
+ """Vote values other than 1 and -1 are rejected."""
+ c = seeded_app["client"]
+ analyst_h = _auth(seeded_app["analyst_token"])
+
+ resp = c.post(
+ "/api/memory",
+ json={"title": "Test item", "content": "Some content", "category": "test"},
+ headers=analyst_h,
+ )
+ item_id = resp.json()["id"]
+
+ resp = c.post(
+ f"/api/memory/{item_id}/vote",
+ json={"vote": 5},
+ headers=analyst_h,
+ )
+ assert resp.status_code == 400
+
+ def test_memory_stats_endpoint(self, seeded_app):
+ """Memory stats reflect created items."""
+ c = seeded_app["client"]
+ admin_h = _auth(seeded_app["admin_token"])
+
+ # Create an item
+ c.post(
+ "/api/memory",
+ json={"title": "Stats test", "content": "Content", "category": "engineering"},
+ headers=admin_h,
+ )
+
+ resp = c.get("/api/memory/stats", headers=admin_h)
+ assert resp.status_code == 200
+ body = resp.json()
+ assert body["total"] >= 1
+ assert "by_status" in body
+ assert "pending" in body["by_status"]
diff --git a/tests/test_journey_multisource.py b/tests/test_journey_multisource.py
new file mode 100644
index 0000000..0aed8c4
--- /dev/null
+++ b/tests/test_journey_multisource.py
@@ -0,0 +1,190 @@
+"""J8 — Multi-source journey tests.
+
+Creates multiple mock extracts from different sources, rebuilds the
+orchestrator, and verifies that views from all sources are queryable
+and visible in catalog/manifest.
+"""
+
+import pytest
+from tests.conftest import create_mock_extract
+
+
+def _auth(token: str) -> dict:
+ return {"Authorization": f"Bearer {token}"}
+
+
+@pytest.mark.journey
+class TestMultisourceJourney:
+ def test_two_sources_both_queryable(self, seeded_app, mock_extract_factory):
+ """Two separate source extracts are both available after rebuild."""
+ c = seeded_app["client"]
+ t = seeded_app["admin_token"]
+ env = seeded_app["env"]
+
+ # Source 1: CRM data
+ mock_extract_factory(
+ "crm_source",
+ [
+ {
+ "name": "crm_customers",
+ "data": [
+ {"id": "c1", "name": "Alice", "plan": "Pro"},
+ {"id": "c2", "name": "Bob", "plan": "Free"},
+ ],
+ }
+ ],
+ )
+
+ # Source 2: Finance data
+ mock_extract_factory(
+ "finance_source",
+ [
+ {
+ "name": "finance_invoices",
+ "data": [
+ {"id": "inv1", "customer_id": "c1", "amount": "500"},
+ {"id": "inv2", "customer_id": "c2", "amount": "50"},
+ ],
+ }
+ ],
+ )
+
+ # Rebuild
+ from src.orchestrator import SyncOrchestrator
+ result = SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
+ assert "crm_source" in result
+ assert "finance_source" in result
+
+ # Query source 1
+ resp = c.post(
+ "/api/query",
+ json={"sql": "SELECT id, name FROM crm_customers ORDER BY id"},
+ headers=_auth(t),
+ )
+ assert resp.status_code == 200
+ assert resp.json()["row_count"] == 2
+
+ # Query source 2
+ resp = c.post(
+ "/api/query",
+ json={"sql": "SELECT id, amount FROM finance_invoices ORDER BY id"},
+ headers=_auth(t),
+ )
+ assert resp.status_code == 200
+ assert resp.json()["row_count"] == 2
+
+ def test_multisource_manifest_shows_all_tables(self, seeded_app, mock_extract_factory):
+ """Manifest reflects tables from both sources."""
+ c = seeded_app["client"]
+ t = seeded_app["admin_token"]
+ env = seeded_app["env"]
+
+ mock_extract_factory(
+ "src_alpha",
+ [{"name": "alpha_events", "data": [{"id": "1", "type": "click"}]}],
+ )
+ mock_extract_factory(
+ "src_beta",
+ [{"name": "beta_metrics", "data": [{"id": "1", "value": "42"}]}],
+ )
+
+ from src.orchestrator import SyncOrchestrator
+ SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
+
+ resp = c.get("/api/sync/manifest", headers=_auth(t))
+ assert resp.status_code == 200
+ tables = resp.json()["tables"]
+ assert "alpha_events" in tables
+ assert "beta_metrics" in tables
+
+ def test_multisource_join_across_sources(self, seeded_app, mock_extract_factory):
+ """Can JOIN views from two different source extracts."""
+ c = seeded_app["client"]
+ t = seeded_app["admin_token"]
+ env = seeded_app["env"]
+
+ mock_extract_factory(
+ "users_src",
+ [{"name": "users", "data": [{"user_id": "u1", "username": "alice"}]}],
+ )
+ mock_extract_factory(
+ "orders_src",
+ [{"name": "purchases", "data": [{"order_id": "o1", "user_id": "u1", "total": "99"}]}],
+ )
+
+ from src.orchestrator import SyncOrchestrator
+ SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
+
+ sql = "SELECT u.username, p.total FROM users u JOIN purchases p ON u.user_id = p.user_id"
+ resp = c.post("/api/query", json={"sql": sql}, headers=_auth(t))
+ assert resp.status_code == 200
+ body = resp.json()
+ assert body["row_count"] == 1
+ row = body["rows"][0]
+ assert "alice" in row
+ assert "99" in row
+
+ def test_three_sources_catalog_count(self, seeded_app, mock_extract_factory):
+ """Three registered sources produce correct table counts in catalog."""
+ c = seeded_app["client"]
+ t = seeded_app["admin_token"]
+ env = seeded_app["env"]
+
+ for src, tbl in [("src1", "t1"), ("src2", "t2"), ("src3", "t3")]:
+ mock_extract_factory(
+ src,
+ [{"name": tbl, "data": [{"id": "1", "v": "x"}]}],
+ )
+ c.post(
+ "/api/admin/register-table",
+ json={"name": tbl, "source_type": "keboola", "query_mode": "local"},
+ headers=_auth(t),
+ )
+
+ from src.orchestrator import SyncOrchestrator
+ result = SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
+ # All three sources attached
+ assert len(result) == 3
+
+ # Catalog lists all three registered tables
+ resp = c.get("/api/catalog/tables", headers=_auth(t))
+ assert resp.status_code == 200
+ names = {tbl["name"] for tbl in resp.json()["tables"]}
+ assert {"t1", "t2", "t3"}.issubset(names)
+
+ def test_source_update_reflects_after_rebuild(self, seeded_app, mock_extract_factory):
+ """Updating a source extract and rebuilding shows new row counts."""
+ c = seeded_app["client"]
+ t = seeded_app["admin_token"]
+ env = seeded_app["env"]
+
+ # Initial extract with 1 row
+ mock_extract_factory(
+ "updatable_src",
+ [{"name": "live_data", "data": [{"id": "1", "val": "initial"}]}],
+ )
+
+ from src.orchestrator import SyncOrchestrator
+ SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
+
+ resp = c.get("/api/sync/manifest", headers=_auth(t))
+ assert resp.json()["tables"].get("live_data", {}).get("rows") == 1
+
+ # Overwrite extract with 3 rows
+ mock_extract_factory(
+ "updatable_src",
+ [
+ {
+ "name": "live_data",
+ "data": [
+ {"id": "1", "val": "updated_a"},
+ {"id": "2", "val": "updated_b"},
+ {"id": "3", "val": "updated_c"},
+ ],
+ }
+ ],
+ )
+ SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
+
+ resp = c.get("/api/sync/manifest", headers=_auth(t))
+ assert resp.json()["tables"]["live_data"]["rows"] == 3
diff --git a/tests/test_journey_rbac.py b/tests/test_journey_rbac.py
new file mode 100644
index 0000000..7ad161c
--- /dev/null
+++ b/tests/test_journey_rbac.py
@@ -0,0 +1,180 @@
+"""J4 — RBAC journey tests.
+
+Permission lifecycle: analyst queries a public table → admin grants → grant is
+listed → admin revokes → grant is gone → access request flow (incl. duplicates).
+"""
+
+import pytest
+from tests.conftest import create_mock_extract
+
+
+def _auth(token: str) -> dict:
+ return {"Authorization": f"Bearer {token}"}
+
+
+@pytest.mark.journey
+class TestRBACJourney:
+ def _setup_private_table(self, seeded_app, mock_extract_factory):
+ """Helper: register the "private_data" table (default visibility), create its extract, and rebuild."""
+ c = seeded_app["client"]
+ t = seeded_app["admin_token"]
+ env = seeded_app["env"]
+
+ # No is_public flag is sent, so the table keeps the API default visibility
+ # (is_public=True per the original note here) — it is "private" in name only.
+ resp = c.post(
+ "/api/admin/register-table",
+ json={
+ "name": "private_data",
+ "source_type": "keboola",
+ "query_mode": "local",
+ "description": "Private dataset",
+ },
+ headers=_auth(t),
+ )
+ assert resp.status_code == 201
+
+ mock_extract_factory(
+ "keboola",
+ [{"name": "private_data", "data": [{"id": "1", "secret": "top_secret"}]}],
+ )
+
+ from src.orchestrator import SyncOrchestrator
+ SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
+
+ def test_analyst_can_query_public_table(self, seeded_app, mock_extract_factory):
+ """Analyst can query public (default) tables without explicit permission."""
+ c = seeded_app["client"]
+ env = seeded_app["env"]
+
+ # Register + create data
+ c.post(
+ "/api/admin/register-table",
+ json={"name": "public_sales", "source_type": "keboola", "query_mode": "local"},
+ headers=_auth(seeded_app["admin_token"]),
+ )
+ mock_extract_factory(
+ "keboola",
+ [{"name": "public_sales", "data": [{"id": "1", "amount": "100"}]}],
+ )
+
+ from src.orchestrator import SyncOrchestrator
+ SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
+
+ resp = c.post(
+ "/api/query",
+ json={"sql": "SELECT * FROM public_sales"},
+ headers=_auth(seeded_app["analyst_token"]),
+ )
+ assert resp.status_code == 200
+
+ def test_admin_grants_permission_analyst_can_query(self, seeded_app, mock_extract_factory):
+ """Admin grants explicit permission → the grant appears in the analyst's permission list."""
+ c = seeded_app["client"]
+ t = seeded_app["admin_token"]
+ env = seeded_app["env"]
+
+ self._setup_private_table(seeded_app, mock_extract_factory)
+
+ # Grant permission
+ resp = c.post(
+ "/api/admin/permissions",
+ json={"user_id": "analyst1", "dataset": "private_data", "access": "read"},
+ headers=_auth(t),
+ )
+ assert resp.status_code == 201
+
+ # Verify permission recorded
+ resp = c.get("/api/admin/permissions/analyst1", headers=_auth(t))
+ assert resp.status_code == 200
+ datasets = [p["dataset"] for p in resp.json()["permissions"]]
+ assert "private_data" in datasets
+
+ def test_admin_revokes_permission(self, seeded_app, mock_extract_factory):
+ """After granting then revoking, permission is removed from record."""
+ c = seeded_app["client"]
+ t = seeded_app["admin_token"]
+ env = seeded_app["env"]
+
+ self._setup_private_table(seeded_app, mock_extract_factory)
+
+ # Grant
+ c.post(
+ "/api/admin/permissions",
+ json={"user_id": "analyst1", "dataset": "private_data", "access": "read"},
+ headers=_auth(t),
+ )
+
+ # Revoke — use request() because TestClient.delete() doesn't accept a body
+ import json as _json
+ resp = c.request(
+ "DELETE",
+ "/api/admin/permissions",
+ data=_json.dumps({"user_id": "analyst1", "dataset": "private_data", "access": "read"}),
+ headers={**_auth(t), "Content-Type": "application/json"},
+ )
+ assert resp.status_code == 200
+ assert resp.json()["revoked"] is True
+
+ # Permission should be gone
+ resp = c.get("/api/admin/permissions/analyst1", headers=_auth(t))
+ datasets = [p["dataset"] for p in resp.json()["permissions"]]
+ assert "private_data" not in datasets
+
+ def test_access_request_flow(self, seeded_app, mock_extract_factory):
+ """Analyst submits access request → admin approves → request is approved."""
+ c = seeded_app["client"]
+ t = seeded_app["admin_token"]
+ analyst_headers = _auth(seeded_app["analyst_token"])
+
+ self._setup_private_table(seeded_app, mock_extract_factory)
+
+ # Analyst creates access request
+ resp = c.post(
+ "/api/access-requests",
+ json={"table_id": "private_data", "reason": "I need this for analysis"},
+ headers=analyst_headers,
+ )
+ assert resp.status_code == 201
+ req_id = resp.json()["id"]
+ assert resp.json()["status"] == "pending"
+
+ # Admin sees pending request
+ resp = c.get("/api/access-requests/pending", headers=_auth(t))
+ assert resp.status_code == 200
+ pending_ids = [r["id"] for r in resp.json()["requests"]]
+ assert req_id in pending_ids
+
+ # Admin approves
+ resp = c.post(f"/api/access-requests/{req_id}/approve", headers=_auth(t))
+ assert resp.status_code == 200
+ assert resp.json()["status"] == "approved"
+
+ # Analyst's own requests show approved
+ resp = c.get("/api/access-requests/my", headers=analyst_headers)
+ assert resp.status_code == 200
+ statuses = {r["id"]: r["status"] for r in resp.json()["requests"]}
+ assert statuses.get(req_id) == "approved"
+
+ def test_duplicate_access_request_rejected(self, seeded_app, mock_extract_factory):
+ """Submitting a duplicate pending request returns 409."""
+ c = seeded_app["client"]
+ analyst_headers = _auth(seeded_app["analyst_token"])
+
+ self._setup_private_table(seeded_app, mock_extract_factory)
+
+ # First request
+ resp = c.post(
+ "/api/access-requests",
+ json={"table_id": "private_data"},
+ headers=analyst_headers,
+ )
+ assert resp.status_code == 201
+
+ # Duplicate
+ resp = c.post(
+ "/api/access-requests",
+ json={"table_id": "private_data"},
+ headers=analyst_headers,
+ )
+ assert resp.status_code == 409
diff --git a/tests/test_journey_sync_query.py b/tests/test_journey_sync_query.py
new file mode 100644
index 0000000..fc42e29
--- /dev/null
+++ b/tests/test_journey_sync_query.py
@@ -0,0 +1,146 @@
+"""J2 — Sync & Query journey tests.
+
+Complete flow: register table → create mock extract → rebuild orchestrator →
+query data via API → verify catalog listing.
+"""
+
+import pytest
+from tests.conftest import create_mock_extract
+
+
+def _auth(token: str) -> dict:  # Authorization header carrying `token` as a Bearer credential
+    return dict(Authorization=f"Bearer {token}")
+
+
+@pytest.mark.journey
+class TestSyncAndQuery:
+    """J2 — Sync & Query journey tests; every request is sent as the seeded admin user."""
+
+    @staticmethod
+    def _rebuild(env):
+        """Run SyncOrchestrator.rebuild() for env["analytics_db"] and return its result."""
+        # Lazy import; presumably the fixture env must be set up first — TODO confirm.
+        from src.orchestrator import SyncOrchestrator
+        return SyncOrchestrator(analytics_db_path=env["analytics_db"]).rebuild()
+
+    def test_register_create_rebuild_query(self, seeded_app, mock_extract_factory):
+        """Full flow: register → mock extract → rebuild → query rows."""
+        client = seeded_app["client"]
+        headers = _auth(seeded_app["admin_token"])
+
+        # Step 1: register table
+        resp = client.post(
+            "/api/admin/register-table",
+            json={
+                "name": "orders",
+                "source_type": "keboola",
+                "bucket": "in.c-crm",
+                "source_table": "orders",
+                "query_mode": "local",
+            },
+            headers=headers,
+        )
+        assert resp.status_code == 201
+
+        # Step 2: create mock extract
+        mock_extract_factory(
+            "keboola",
+            [
+                {
+                    "name": "orders",
+                    "data": [
+                        {"id": "1", "product": "Widget", "amount": "100"},
+                        {"id": "2", "product": "Gadget", "amount": "200"},
+                    ],
+                }
+            ],
+        )
+
+        # Step 3: rebuild orchestrator; the result is keyed by source, then lists its tables
+        result = self._rebuild(seeded_app["env"])
+        assert "keboola" in result
+        assert "orders" in result["keboola"]
+
+        # Step 4: query data
+        query = {"sql": "SELECT * FROM orders ORDER BY id"}
+        resp = client.post("/api/query", json=query, headers=headers)
+        assert resp.status_code == 200
+        body = resp.json()
+        assert body["row_count"] == 2
+        assert "id" in body["columns"]
+
+    def test_catalog_lists_registered_table(self, seeded_app):
+        """After registration, table appears in /api/catalog/tables."""
+        client = seeded_app["client"]
+        headers = _auth(seeded_app["admin_token"])
+
+        resp = client.post(
+            "/api/admin/register-table",
+            json={"name": "customers", "source_type": "keboola", "query_mode": "local"},
+            headers=headers,
+        )
+        # Surface a refused registration here rather than as a missing catalog entry.
+        assert resp.status_code == 201, resp.text
+
+        resp = client.get("/api/catalog/tables", headers=headers)
+        assert resp.status_code == 200
+        names = {tbl["name"] for tbl in resp.json()["tables"]}
+        assert "customers" in names
+
+    def test_query_blocked_keywords(self, seeded_app):
+        """DROP and other DDL/dangerous statements are blocked."""
+        client = seeded_app["client"]
+        headers = _auth(seeded_app["admin_token"])
+
+        for bad_sql in [
+            "DROP TABLE orders",
+            "INSERT INTO orders VALUES (1)",
+            "SELECT * FROM read_parquet('/tmp/x.parquet')",
+        ]:
+            resp = client.post("/api/query", json={"sql": bad_sql}, headers=headers)
+            assert resp.status_code == 400, f"Expected 400 for: {bad_sql}"
+
+    def test_manifest_reflects_synced_tables(self, seeded_app, mock_extract_factory):
+        """After rebuild, manifest includes synced table with correct row count."""
+        client = seeded_app["client"]
+        headers = _auth(seeded_app["admin_token"])
+
+        mock_extract_factory(
+            "keboola",
+            [
+                {
+                    "name": "products",
+                    "data": [
+                        {"id": "1", "name": "Alpha"},
+                        {"id": "2", "name": "Beta"},
+                        {"id": "3", "name": "Gamma"},
+                    ],
+                }
+            ],
+        )
+
+        self._rebuild(seeded_app["env"])
+
+        resp = client.get("/api/sync/manifest", headers=headers)
+        assert resp.status_code == 200
+        tables = resp.json()["tables"]
+        assert "products" in tables
+        assert tables["products"]["rows"] == 3
+
+    def test_query_empty_result(self, seeded_app, mock_extract_factory):
+        """A query whose filter matches no rows returns an empty result set."""
+        client = seeded_app["client"]
+        headers = _auth(seeded_app["admin_token"])
+
+        # The table is seeded with one row; the WHERE clause below does not match it.
+        mock_extract_factory(
+            "keboola",
+            [{"name": "empty_table", "data": [{"id": "1", "val": "x"}]}],
+        )
+
+        self._rebuild(seeded_app["env"])
+
+        query = {"sql": "SELECT * FROM empty_table WHERE id = 'nonexistent'"}
+        resp = client.post("/api/query", json=query, headers=headers)
+        assert resp.status_code == 200
+        assert resp.json()["row_count"] == 0