From 617e724d210eaf78a3e26fe3ca594b34e2328ecf Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Tue, 31 Mar 2026 08:18:54 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20add=20E2E=20test=20suite=20=E2=80=94=20?= =?UTF-8?q?API,=20extractor,=20Docker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit tests/conftest.py: shared fixtures (e2e_env, seeded_app, create_mock_extract) tests/test_e2e_api.py: 11 tests — full sync flow, RBAC, table lifecycle tests/test_e2e_extract.py: 6 tests — Keboola/BQ/Jira pipelines, multi-source, corrupt handling tests/test_e2e_docker.py: 3 tests — Docker health + full flow (opt-in via -m docker) Fix admin update route (duplicate id kwarg, .dict() → .model_dump()). 705 tests passing. --- app/api/admin.py | 8 +- pytest.ini | 3 +- tests/conftest.py | 118 ++++++++++++++++ tests/test_e2e_api.py | 200 ++++++++++++++++++++++++++ tests/test_e2e_docker.py | 114 +++++++++++++++ tests/test_e2e_extract.py | 286 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 726 insertions(+), 3 deletions(-) create mode 100644 tests/conftest.py create mode 100644 tests/test_e2e_api.py create mode 100644 tests/test_e2e_docker.py create mode 100644 tests/test_e2e_extract.py diff --git a/app/api/admin.py b/app/api/admin.py index a5968bc..9fb46fc 100644 --- a/app/api/admin.py +++ b/app/api/admin.py @@ -121,9 +121,13 @@ async def update_table( if not repo.get(table_id): raise HTTPException(status_code=404, detail="Table not found") - updates = {k: v for k, v in request.dict().items() if v is not None} + updates = {k: v for k, v in request.model_dump().items() if v is not None} if updates: - repo.register(id=table_id, **{**repo.get(table_id), **updates}) + existing = repo.get(table_id) + merged = {k: v for k, v in existing.items() if k != "registered_at"} + merged.update(updates) + merged.pop("id", None) # avoid duplicate id kwarg + repo.register(id=table_id, **merged) return {"id": table_id, "updated": list(updates.keys())} diff --git a/pytest.ini b/pytest.ini index fbcda6f..3aa5f82 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,4 +1,5 @@ [pytest] -addopts = -m "not live" +addopts = -m "not live and not docker" markers = live: tests requiring server access (run with '-m live') + docker: tests requiring Docker (run with '-m docker') diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..e5443a9 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,118 @@ +"""Shared test fixtures for E2E tests.""" + +import os +from pathlib import Path + +import duckdb +import pytest + + +@pytest.fixture +def e2e_env(tmp_path): + """Set up complete E2E environment with DATA_DIR, create dirs.""" + os.environ["DATA_DIR"] = str(tmp_path) + os.environ["JWT_SECRET_KEY"] = "test-secret-e2e" + + (tmp_path / "extracts").mkdir() + (tmp_path / "analytics").mkdir() + (tmp_path / "state").mkdir() + + yield { + "data_dir": tmp_path, + "extracts_dir": tmp_path / "extracts", + "analytics_db": str(tmp_path / "analytics" / "server.duckdb"), + } + + os.environ.pop("DATA_DIR", None) + os.environ.pop("JWT_SECRET_KEY", None) + + +def create_mock_extract(extracts_dir: Path, source_name: str, tables: list[dict]): + """Create a mock extract.duckdb with _meta and data tables. + + tables: [{"name": "orders", "data": [{"id": "1", "total": "100"}], "query_mode": "local"}] + """ + source_dir = extracts_dir / source_name + source_dir.mkdir(exist_ok=True) + data_dir = source_dir / "data" + data_dir.mkdir(exist_ok=True) + + db_path = source_dir / "extract.duckdb" + conn = duckdb.connect(str(db_path)) + + conn.execute("""CREATE TABLE IF NOT EXISTS _meta ( + table_name VARCHAR, description VARCHAR, rows BIGINT, + size_bytes BIGINT, extracted_at TIMESTAMP, query_mode VARCHAR DEFAULT 'local' + )""") + # Delete existing meta rows to allow re-calling + conn.execute("DELETE FROM _meta") + + for t in tables: + name = t["name"] + rows_data = t.get("data", []) + query_mode = t.get("query_mode", "local") + + if rows_data and query_mode == "local": + # Write actual parquet file + pq_path = str(data_dir / f"{name}.parquet") + # Build SQL from data + selects = [] + for row in rows_data: + vals = ", ".join(f"'{v}' AS {k}" for k, v in row.items()) + selects.append(f"SELECT {vals}") + union_sql = " UNION ALL ".join(selects) + conn.execute(f"COPY ({union_sql}) TO '{pq_path}' (FORMAT PARQUET)") + + rows = len(rows_data) + size = os.path.getsize(pq_path) + conn.execute(f'CREATE OR REPLACE VIEW "{name}" AS SELECT * FROM read_parquet(\'{pq_path}\')') + conn.execute("INSERT INTO _meta VALUES (?, ?, ?, ?, current_timestamp, 'local')", + [name, t.get("description", ""), rows, size]) + else: + # Remote or empty table + conn.execute(f'CREATE TABLE IF NOT EXISTS "{name}" (id VARCHAR)') + conn.execute("INSERT INTO _meta VALUES (?, ?, 0, 0, current_timestamp, ?)", + [name, t.get("description", ""), query_mode]) + + conn.close() + return db_path + + +def write_test_parquet(path: str, data: list[dict]): + """Create a parquet file from list of dicts.""" + conn = duckdb.connect() + selects = [] + for row in data: + vals = ", ".join(f"'{v}' AS {k}" for k, v in row.items()) + selects.append(f"SELECT {vals}") + union_sql = " UNION ALL ".join(selects) + conn.execute(f"COPY ({union_sql}) TO '{path}' (FORMAT PARQUET)") + conn.close() + + +@pytest.fixture +def seeded_app(e2e_env): + """FastAPI TestClient with seeded admin + analyst users, JWT tokens.""" + from src.db import get_system_db + from src.repositories.users import UserRepository + from app.auth.jwt import create_access_token + from app.main import create_app + from fastapi.testclient import TestClient + + conn = get_system_db() + repo = UserRepository(conn) + repo.create(id="admin1", email="admin@test.com", name="Admin", role="admin") + repo.create(id="analyst1", email="analyst@test.com", name="Analyst", role="analyst") + conn.close() + + app = create_app() + client = TestClient(app) + admin_token = create_access_token("admin1", "admin@test.com", "admin") + analyst_token = create_access_token("analyst1", "analyst@test.com", "analyst") + + return { + "client": client, + "admin_token": admin_token, + "analyst_token": analyst_token, + "env": e2e_env, + } diff --git a/tests/test_e2e_api.py b/tests/test_e2e_api.py new file mode 100644 index 0000000..3864d73 --- /dev/null +++ b/tests/test_e2e_api.py @@ -0,0 +1,200 @@ +"""E2E API tests — full server-side flow via FastAPI TestClient.""" + +import os +import tempfile +from pathlib import Path + +import duckdb +import pytest + +from tests.conftest import create_mock_extract + + +def _auth(token): + return {"Authorization": f"Bearer {token}"} + + +class TestFullSyncFlow: + """Complete flow: register -> extract -> manifest -> download.""" + + def test_register_tables_and_get_catalog(self, seeded_app): + c = seeded_app["client"] + t = seeded_app["admin_token"] + env = seeded_app["env"] + + # Register tables + resp = c.post("/api/admin/register-table", json={ + "name": "orders", "source_type": "keboola", "bucket": "in.c-crm", + "source_table": "orders", "query_mode": "local", + }, headers=_auth(t)) + assert resp.status_code == 201 + + resp = c.post("/api/admin/register-table", json={ + "name": "customers", "source_type": "keboola", "bucket": "in.c-crm", + "source_table": "customers", "query_mode": "local", + }, headers=_auth(t)) + assert resp.status_code == 201 + + # Verify catalog + resp = c.get("/api/catalog/tables", headers=_auth(t)) + assert resp.status_code == 200 + tables = resp.json()["tables"] + names = {tbl["name"] for tbl in tables} + assert "orders" in names + assert "customers" in names + + def test_manifest_after_extract(self, seeded_app): + c = seeded_app["client"] + t = seeded_app["admin_token"] + env = seeded_app["env"] + + # Create mock extract with real data + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "orders", "data": [ + {"id": "1", "product": "Widget", "price": "99.99"}, + {"id": "2", "product": "Gadget", "price": "49.99"}, + ]}, + {"name": "customers", "data": [ + {"id": "1", "name": "Alice", "email": "alice@test.com"}, + ]}, + ]) + + # Run orchestrator to populate sync_state + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + # Check manifest + resp = c.get("/api/sync/manifest", headers=_auth(t)) + assert resp.status_code == 200 + manifest = resp.json() + assert "orders" in manifest["tables"] + assert "customers" in manifest["tables"] + assert manifest["tables"]["orders"]["rows"] == 2 + assert manifest["tables"]["customers"]["rows"] == 1 + assert "server_time" in manifest + + def test_download_parquet_and_verify_content(self, seeded_app): + c = seeded_app["client"] + t = seeded_app["admin_token"] + env = seeded_app["env"] + + # Create extract + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "orders", "data": [ + {"id": "1", "product": "Widget", "price": "99.99"}, + {"id": "2", "product": "Gadget", "price": "49.99"}, + ]}, + ]) + + # Download parquet + resp = c.get("/api/data/orders/download", headers=_auth(t)) + assert resp.status_code == 200 + assert "application/octet-stream" in resp.headers.get("content-type", "") + + # Verify content by writing to temp file and reading with DuckDB + with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as f: + f.write(resp.content) + tmp_path = f.name + + try: + conn = duckdb.connect() + rows = conn.execute(f"SELECT * FROM read_parquet('{tmp_path}') ORDER BY id").fetchall() + conn.close() + assert len(rows) == 2 + assert rows[0][1] == "Widget" # product column + assert rows[1][1] == "Gadget" + finally: + os.unlink(tmp_path) + + def test_download_nonexistent_table_404(self, seeded_app): + c = seeded_app["client"] + t = seeded_app["admin_token"] + resp = c.get("/api/data/nonexistent/download", headers=_auth(t)) + assert resp.status_code == 404 + + +class TestRBACEnforcement: + """Verify role-based access control across API endpoints.""" + + def test_analyst_cannot_register_table(self, seeded_app): + c = seeded_app["client"] + t = seeded_app["analyst_token"] + resp = c.post("/api/admin/register-table", json={ + "name": "test", "source_type": "keboola", + }, headers=_auth(t)) + assert resp.status_code == 403 + + def test_analyst_can_read_manifest(self, seeded_app): + c = seeded_app["client"] + t = seeded_app["analyst_token"] + resp = c.get("/api/sync/manifest", headers=_auth(t)) + assert resp.status_code == 200 + + def test_analyst_can_download_data(self, seeded_app): + c = seeded_app["client"] + env = seeded_app["env"] + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "orders", "data": [{"id": "1"}]}, + ]) + t = seeded_app["analyst_token"] + resp = c.get("/api/data/orders/download", headers=_auth(t)) + assert resp.status_code == 200 + + def test_admin_can_trigger_sync(self, seeded_app): + c = seeded_app["client"] + t = seeded_app["admin_token"] + resp = c.post("/api/sync/trigger", headers=_auth(t)) + assert resp.status_code == 200 + + def test_analyst_cannot_trigger_sync(self, seeded_app): + c = seeded_app["client"] + t = seeded_app["analyst_token"] + resp = c.post("/api/sync/trigger", headers=_auth(t)) + assert resp.status_code == 403 + + def test_unauthenticated_blocked(self, seeded_app): + c = seeded_app["client"] + resp = c.get("/api/sync/manifest") + assert resp.status_code == 401 + + +class TestTableLifecycle: + """Register -> update -> delete table via admin API.""" + + def test_full_lifecycle(self, seeded_app): + c = seeded_app["client"] + t = seeded_app["admin_token"] + + # Create + resp = c.post("/api/admin/register-table", json={ + "name": "lifecycle_test", "source_type": "keboola", + "query_mode": "local", "description": "Test table", + }, headers=_auth(t)) + assert resp.status_code == 201 + table_id = resp.json()["id"] + + # Read + resp = c.get("/api/admin/registry", headers=_auth(t)) + assert resp.status_code == 200 + names = {tbl["name"] for tbl in resp.json()["tables"]} + assert "lifecycle_test" in names + + # Update + resp = c.put(f"/api/admin/registry/{table_id}", json={ + "query_mode": "remote", + }, headers=_auth(t)) + assert resp.status_code == 200 + + # Verify update + resp = c.get("/api/admin/registry", headers=_auth(t)) + table = next(tbl for tbl in resp.json()["tables"] if tbl["id"] == table_id) + assert table["query_mode"] == "remote" + + # Delete + resp = c.delete(f"/api/admin/registry/{table_id}", headers=_auth(t)) + assert resp.status_code == 204 + + # Verify gone + resp = c.get("/api/admin/registry", headers=_auth(t)) + ids = {tbl["id"] for tbl in resp.json()["tables"]} + assert table_id not in ids diff --git a/tests/test_e2e_docker.py b/tests/test_e2e_docker.py new file mode 100644 index 0000000..0162a20 --- /dev/null +++ b/tests/test_e2e_docker.py @@ -0,0 +1,114 @@ +"""E2E Docker tests — spin up containers, test API from outside. + +Run with: pytest tests/test_e2e_docker.py -m docker -v +Requires: Docker and docker compose installed. +""" + +import os +import subprocess +import time + +import pytest + +# Skip all tests in this module if docker marker not selected +pytestmark = pytest.mark.docker + +COMPOSE_FILE = "docker-compose.test.yml" +BASE_URL = "http://localhost:8000" + + +def _docker_compose(*args, timeout=60): + """Run docker compose command.""" + cmd = ["docker", "compose", "-f", COMPOSE_FILE] + list(args) + return subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) + + +def _wait_for_health(url, timeout=30): + """Poll health endpoint until it responds 200.""" + import httpx + deadline = time.time() + timeout + while time.time() < deadline: + try: + resp = httpx.get(f"{url}/api/health", timeout=5) + if resp.status_code == 200: + return True + except Exception: + pass + time.sleep(1) + return False + + +@pytest.fixture(scope="module") +def docker_env(): + """Start docker compose, yield, then tear down.""" + # Check docker is available + result = subprocess.run(["docker", "info"], capture_output=True, timeout=10) + if result.returncode != 0: + pytest.skip("Docker not available") + + # Check compose file exists + if not os.path.exists(COMPOSE_FILE): + pytest.skip(f"{COMPOSE_FILE} not found") + + # Start services + _docker_compose("up", "-d", "--build") + + # Wait for health + if not _wait_for_health(BASE_URL, timeout=60): + # Capture logs for debugging + logs = _docker_compose("logs") + _docker_compose("down", "-v") + pytest.fail(f"Service did not become healthy.\nLogs:\n{logs.stdout}") + + yield BASE_URL + + # Teardown + _docker_compose("down", "-v") + + +class TestDockerHealth: + def test_health_endpoint(self, docker_env): + import httpx + resp = httpx.get(f"{docker_env}/api/health") + assert resp.status_code == 200 + data = resp.json() + assert data.get("status") in ("ok", "healthy") + + def test_health_has_duckdb(self, docker_env): + import httpx + resp = httpx.get(f"{docker_env}/api/health") + data = resp.json() + checks = data.get("checks", {}) + assert "duckdb" in checks or "database" in checks + + +class TestDockerFullFlow: + def test_register_and_query_flow(self, docker_env): + import httpx + url = docker_env + + # Get auth token + resp = httpx.post(f"{url}/auth/token", json={"email": "admin@test.com"}) + if resp.status_code != 200: + # Auto-create user first if needed + pytest.skip("Auth setup required — no admin user in Docker env") + + token = resp.json().get("token", "") + headers = {"Authorization": f"Bearer {token}"} + + # Register a table + resp = httpx.post(f"{url}/api/admin/register-table", json={ + "name": "docker_test", "source_type": "keboola", "query_mode": "local", + }, headers=headers) + assert resp.status_code in (201, 409) # 409 if already exists + + # Get registry + resp = httpx.get(f"{url}/api/admin/registry", headers=headers) + assert resp.status_code == 200 + assert resp.json()["count"] >= 1 + + # Get manifest + resp = httpx.get(f"{url}/api/sync/manifest", headers=headers) + assert resp.status_code == 200 + assert "tables" in resp.json() + assert "server_time" in resp.json() diff --git a/tests/test_e2e_extract.py b/tests/test_e2e_extract.py new file mode 100644 index 0000000..76b3b6f --- /dev/null +++ b/tests/test_e2e_extract.py @@ -0,0 +1,286 @@ +"""E2E tests — extractor + orchestrator pipeline.""" + +import os +from pathlib import Path +from unittest.mock import patch, MagicMock + +import duckdb +import pytest + + +class TestKeboolaExtractToQuery: + """Keboola extractor -> extract.duckdb -> orchestrator -> queryable views.""" + + def test_full_pipeline(self, e2e_env): + env = e2e_env + + # 1. Register table in registry + from src.db import get_system_db + from src.repositories.table_registry import TableRegistryRepository + conn = get_system_db() + repo = TableRegistryRepository(conn) + repo.register(id="orders", name="orders", source_type="keboola", + bucket="in.c-crm", source_table="orders", query_mode="local") + tables = repo.list_by_source("keboola") + conn.close() + + # 2. Run extractor (mock the DuckDB extension) + from connectors.keboola.extractor import run as keboola_run + + def mock_legacy(tc, pq_path, keboola_url, keboola_token): + local = duckdb.connect() + local.execute( + f"COPY (SELECT '1' AS id, 'Widget' AS product, '99.99' AS price " + f"UNION ALL SELECT '2', 'Gadget', '49.99') TO '{pq_path}' (FORMAT PARQUET)" + ) + local.close() + + output = str(env["extracts_dir"] / "keboola") + with patch("connectors.keboola.extractor._try_attach_extension", return_value=False), \ + patch("connectors.keboola.extractor._extract_via_legacy", side_effect=mock_legacy): + result = keboola_run(output, tables, "https://example.com", "fake-token") + + assert result["tables_extracted"] == 1 + assert result["tables_failed"] == 0 + + # 3. Verify extract.duckdb + ext_conn = duckdb.connect(str(env["extracts_dir"] / "keboola" / "extract.duckdb")) + meta = ext_conn.execute("SELECT table_name, rows, query_mode FROM _meta").fetchall() + assert len(meta) == 1 + assert meta[0][0] == "orders" + assert meta[0][1] == 2 + ext_conn.close() + + # 4. Run orchestrator + from src.orchestrator import SyncOrchestrator + orch = SyncOrchestrator(analytics_db_path=env["analytics_db"]) + result = orch.rebuild() + assert "keboola" in result + assert "orders" in result["keboola"] + + # 5. Verify sync_state updated + conn2 = get_system_db() + from src.repositories.sync_state import SyncStateRepository + state = SyncStateRepository(conn2).get_table_state("orders") + assert state is not None + assert state["rows"] == 2 + conn2.close() + + # 6. Verify data queryable via extract.duckdb + ext_conn2 = duckdb.connect(str(env["extracts_dir"] / "keboola" / "extract.duckdb")) + rows = ext_conn2.execute("SELECT product FROM orders ORDER BY id").fetchall() + assert rows[0][0] == "Widget" + assert rows[1][0] == "Gadget" + ext_conn2.close() + + +class TestBigQueryRemoteExtract: + """BigQuery extractor -- remote only, no data download.""" + + def test_remote_only_pipeline(self, e2e_env): + env = e2e_env + output = str(env["extracts_dir"] / "bigquery") + + table_configs = [ + {"name": "page_views", "bucket": "analytics", "source_table": "page_views", "description": "Web traffic"}, + {"name": "sessions", "bucket": "analytics", "source_table": "sessions", "description": "User sessions"}, + ] + + from connectors.bigquery import extractor as bq_mod + + # Build extract.duckdb manually to simulate what the BQ extractor would produce, + # since the real DuckDB BigQuery extension is not available in test environments. + output_path = Path(output) + output_path.mkdir(parents=True, exist_ok=True) + db_path = output_path / "extract.duckdb" + + conn = duckdb.connect(str(db_path)) + bq_mod._create_meta_table(conn) + from datetime import datetime, timezone + now = datetime.now(timezone.utc) + for tc in table_configs: + name = tc["name"] + # Create a placeholder table (no actual BQ data) + conn.execute(f'CREATE OR REPLACE TABLE "{name}" (dummy INTEGER)') + conn.execute( + "INSERT INTO _meta VALUES (?, ?, 0, 0, ?, 'remote')", + [name, tc.get("description", ""), now], + ) + conn.close() + + # Verify _meta + conn = duckdb.connect(str(db_path)) + meta = conn.execute("SELECT table_name, query_mode FROM _meta ORDER BY table_name").fetchall() + assert len(meta) == 2 + assert all(m[1] == "remote" for m in meta) + conn.close() + + # Verify no parquet files + data_dir = env["extracts_dir"] / "bigquery" / "data" + assert not data_dir.exists() or not list(data_dir.glob("*.parquet")) + + # Verify orchestrator picks up remote tables + from src.orchestrator import SyncOrchestrator + orch = SyncOrchestrator(analytics_db_path=env["analytics_db"]) + result = orch.rebuild() + assert "bigquery" in result + assert "page_views" in result["bigquery"] + assert "sessions" in result["bigquery"] + + +class TestJiraWebhookToQuery: + """Jira webhook -> incremental parquet -> extract.duckdb -> query.""" + + def test_jira_incremental_flow(self, e2e_env): + env = e2e_env + jira_dir = env["extracts_dir"] / "jira" + + # 1. Init Jira extract + from connectors.jira.extract_init import init_extract, update_meta + init_extract(jira_dir) + + # 2. Simulate incremental_transform: write a parquet to data/issues/ + issues_dir = jira_dir / "data" / "issues" + pq_path = str(issues_dir / "2026-03.parquet") + tmp = duckdb.connect() + tmp.execute( + f"COPY (SELECT 'PROJ-1' AS issue_key, 'Bug' AS type, 'Fix login' AS summary) " + f"TO '{pq_path}' (FORMAT PARQUET)" + ) + tmp.close() + + # 3. Update _meta + update_meta(jira_dir, "issues") + + # 4. Verify _meta updated + conn = duckdb.connect(str(jira_dir / "extract.duckdb")) + meta = conn.execute("SELECT rows FROM _meta WHERE table_name='issues'").fetchone() + assert meta[0] == 1 + + # 5. Verify view works + row = conn.execute("SELECT issue_key, summary FROM issues").fetchone() + assert row[0] == "PROJ-1" + assert row[1] == "Fix login" + conn.close() + + # 6. Orchestrator picks it up + from src.orchestrator import SyncOrchestrator + orch = SyncOrchestrator(analytics_db_path=env["analytics_db"]) + result = orch.rebuild() + assert "jira" in result + assert "issues" in result["jira"] + + +class TestMultiSourceOrchestration: + """Multiple sources -> single analytics.duckdb.""" + + def test_three_sources(self, e2e_env): + env = e2e_env + from tests.conftest import create_mock_extract + + # Keboola: 2 tables + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "orders", "data": [{"id": "1", "total": "100"}]}, + {"name": "customers", "data": [{"id": "1", "name": "Alice"}]}, + ]) + + # Jira: 1 table + create_mock_extract(env["extracts_dir"], "jira", [ + {"name": "issues", "data": [{"issue_key": "PROJ-1"}]}, + ]) + + # Rebuild + from src.orchestrator import SyncOrchestrator + orch = SyncOrchestrator(analytics_db_path=env["analytics_db"]) + result = orch.rebuild() + + assert len(result) == 2 # keboola + jira + total_tables = sum(len(v) for v in result.values()) + assert total_tables == 3 # orders + customers + issues + + # Verify sync_state + from src.db import get_system_db + from src.repositories.sync_state import SyncStateRepository + conn = get_system_db() + states = SyncStateRepository(conn).get_all_states() + conn.close() + table_ids = {s["table_id"] for s in states} + assert {"orders", "customers", "issues"}.issubset(table_ids) + + +class TestCorruptExtractHandling: + """Orchestrator gracefully handles corrupt extract.duckdb.""" + + def test_skips_corrupt_continues_valid(self, e2e_env): + env = e2e_env + from tests.conftest import create_mock_extract + + # Valid source + create_mock_extract(env["extracts_dir"], "keboola", [ + {"name": "orders", "data": [{"id": "1"}]}, + ]) + + # Corrupt source: write garbage to extract.duckdb + corrupt_dir = env["extracts_dir"] / "broken" + corrupt_dir.mkdir() + (corrupt_dir / "extract.duckdb").write_bytes(b"this is not a duckdb file") + + from src.orchestrator import SyncOrchestrator + orch = SyncOrchestrator(analytics_db_path=env["analytics_db"]) + result = orch.rebuild() + + # Keboola should work, broken should be skipped + assert "keboola" in result + assert "orders" in result["keboola"] + assert "broken" not in result or result.get("broken") == [] + + +class TestSchemaMigration: + """Schema v1->v2 migration preserves data and adds new columns.""" + + def test_migration_preserves_and_extends(self, e2e_env): + env = e2e_env + + # Create a v1-style database manually + db_path = env["data_dir"] / "state" / "system.duckdb" + conn = duckdb.connect(str(db_path)) + conn.execute("CREATE TABLE schema_version (version INTEGER, applied_at TIMESTAMP)") + conn.execute("INSERT INTO schema_version VALUES (1, current_timestamp)") + conn.execute("""CREATE TABLE table_registry ( + id VARCHAR PRIMARY KEY, name VARCHAR NOT NULL, folder VARCHAR, + sync_strategy VARCHAR, primary_key VARCHAR, description TEXT, + registered_by VARCHAR, registered_at TIMESTAMP DEFAULT current_timestamp + )""") + conn.execute("INSERT INTO table_registry (id, name, folder) VALUES ('old_table', 'Old', 'legacy')") + # Create minimal required tables + for tbl in ["users", "sync_state", "sync_history", "user_sync_settings", + "knowledge_items", "knowledge_votes", "audit_log", "telegram_links", + "pending_codes", "script_registry", "table_profiles", "dataset_permissions"]: + conn.execute(f"CREATE TABLE IF NOT EXISTS {tbl} (id VARCHAR PRIMARY KEY)") + conn.close() + + # Open via get_system_db -> triggers migration + from src.db import get_system_db, get_schema_version + conn2 = get_system_db() + + assert get_schema_version(conn2) == 2 + + # Old data preserved + old = conn2.execute("SELECT name, folder FROM table_registry WHERE id='old_table'").fetchone() + assert old[0] == "Old" + assert old[1] == "legacy" + + # New columns exist and work + from src.repositories.table_registry import TableRegistryRepository + repo = TableRegistryRepository(conn2) + repo.register(id="new_table", name="New", source_type="keboola", + bucket="in.c-crm", source_table="new", query_mode="local") + + new = repo.get("new_table") + assert new["source_type"] == "keboola" + assert new["query_mode"] == "local" + + # Both old and new queryable + all_tables = repo.list_all() + assert len(all_tables) == 2 + conn2.close()