feat: add E2E test suite — API, extractor, Docker
tests/conftest.py: shared fixtures (e2e_env, seeded_app, create_mock_extract) tests/test_e2e_api.py: 11 tests — full sync flow, RBAC, table lifecycle tests/test_e2e_extract.py: 6 tests — Keboola/BQ/Jira pipelines, multi-source, corrupt handling tests/test_e2e_docker.py: 3 tests — Docker health + full flow (opt-in via -m docker) Fix admin update route (duplicate id kwarg, .dict() → .model_dump()). 705 tests passing.
This commit is contained in:
parent
b0eaef88cc
commit
617e724d21
6 changed files with 726 additions and 3 deletions
|
|
@ -121,9 +121,13 @@ async def update_table(
|
|||
if not repo.get(table_id):
|
||||
raise HTTPException(status_code=404, detail="Table not found")
|
||||
|
||||
updates = {k: v for k, v in request.dict().items() if v is not None}
|
||||
updates = {k: v for k, v in request.model_dump().items() if v is not None}
|
||||
if updates:
|
||||
repo.register(id=table_id, **{**repo.get(table_id), **updates})
|
||||
existing = repo.get(table_id)
|
||||
merged = {k: v for k, v in existing.items() if k != "registered_at"}
|
||||
merged.update(updates)
|
||||
merged.pop("id", None) # avoid duplicate id kwarg
|
||||
repo.register(id=table_id, **merged)
|
||||
return {"id": table_id, "updated": list(updates.keys())}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
[pytest]
|
||||
addopts = -m "not live"
|
||||
addopts = -m "not live and not docker"
|
||||
markers =
|
||||
live: tests requiring server access (run with '-m live')
|
||||
docker: tests requiring Docker (run with '-m docker')
|
||||
|
|
|
|||
118
tests/conftest.py
Normal file
118
tests/conftest.py
Normal file
|
|
@ -0,0 +1,118 @@
|
|||
"""Shared test fixtures for E2E tests."""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import duckdb
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def e2e_env(tmp_path):
|
||||
"""Set up complete E2E environment with DATA_DIR, create dirs."""
|
||||
os.environ["DATA_DIR"] = str(tmp_path)
|
||||
os.environ["JWT_SECRET_KEY"] = "test-secret-e2e"
|
||||
|
||||
(tmp_path / "extracts").mkdir()
|
||||
(tmp_path / "analytics").mkdir()
|
||||
(tmp_path / "state").mkdir()
|
||||
|
||||
yield {
|
||||
"data_dir": tmp_path,
|
||||
"extracts_dir": tmp_path / "extracts",
|
||||
"analytics_db": str(tmp_path / "analytics" / "server.duckdb"),
|
||||
}
|
||||
|
||||
os.environ.pop("DATA_DIR", None)
|
||||
os.environ.pop("JWT_SECRET_KEY", None)
|
||||
|
||||
|
||||
def create_mock_extract(extracts_dir: Path, source_name: str, tables: list[dict]):
|
||||
"""Create a mock extract.duckdb with _meta and data tables.
|
||||
|
||||
tables: [{"name": "orders", "data": [{"id": "1", "total": "100"}], "query_mode": "local"}]
|
||||
"""
|
||||
source_dir = extracts_dir / source_name
|
||||
source_dir.mkdir(exist_ok=True)
|
||||
data_dir = source_dir / "data"
|
||||
data_dir.mkdir(exist_ok=True)
|
||||
|
||||
db_path = source_dir / "extract.duckdb"
|
||||
conn = duckdb.connect(str(db_path))
|
||||
|
||||
conn.execute("""CREATE TABLE IF NOT EXISTS _meta (
|
||||
table_name VARCHAR, description VARCHAR, rows BIGINT,
|
||||
size_bytes BIGINT, extracted_at TIMESTAMP, query_mode VARCHAR DEFAULT 'local'
|
||||
)""")
|
||||
# Delete existing meta rows to allow re-calling
|
||||
conn.execute("DELETE FROM _meta")
|
||||
|
||||
for t in tables:
|
||||
name = t["name"]
|
||||
rows_data = t.get("data", [])
|
||||
query_mode = t.get("query_mode", "local")
|
||||
|
||||
if rows_data and query_mode == "local":
|
||||
# Write actual parquet file
|
||||
pq_path = str(data_dir / f"{name}.parquet")
|
||||
# Build SQL from data
|
||||
selects = []
|
||||
for row in rows_data:
|
||||
vals = ", ".join(f"'{v}' AS {k}" for k, v in row.items())
|
||||
selects.append(f"SELECT {vals}")
|
||||
union_sql = " UNION ALL ".join(selects)
|
||||
conn.execute(f"COPY ({union_sql}) TO '{pq_path}' (FORMAT PARQUET)")
|
||||
|
||||
rows = len(rows_data)
|
||||
size = os.path.getsize(pq_path)
|
||||
conn.execute(f'CREATE OR REPLACE VIEW "{name}" AS SELECT * FROM read_parquet(\'{pq_path}\')')
|
||||
conn.execute("INSERT INTO _meta VALUES (?, ?, ?, ?, current_timestamp, 'local')",
|
||||
[name, t.get("description", ""), rows, size])
|
||||
else:
|
||||
# Remote or empty table
|
||||
conn.execute(f'CREATE TABLE IF NOT EXISTS "{name}" (id VARCHAR)')
|
||||
conn.execute("INSERT INTO _meta VALUES (?, ?, 0, 0, current_timestamp, ?)",
|
||||
[name, t.get("description", ""), query_mode])
|
||||
|
||||
conn.close()
|
||||
return db_path
|
||||
|
||||
|
||||
def write_test_parquet(path: str, data: list[dict]):
|
||||
"""Create a parquet file from list of dicts."""
|
||||
conn = duckdb.connect()
|
||||
selects = []
|
||||
for row in data:
|
||||
vals = ", ".join(f"'{v}' AS {k}" for k, v in row.items())
|
||||
selects.append(f"SELECT {vals}")
|
||||
union_sql = " UNION ALL ".join(selects)
|
||||
conn.execute(f"COPY ({union_sql}) TO '{path}' (FORMAT PARQUET)")
|
||||
conn.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def seeded_app(e2e_env):
|
||||
"""FastAPI TestClient with seeded admin + analyst users, JWT tokens."""
|
||||
from src.db import get_system_db
|
||||
from src.repositories.users import UserRepository
|
||||
from app.auth.jwt import create_access_token
|
||||
from app.main import create_app
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
conn = get_system_db()
|
||||
repo = UserRepository(conn)
|
||||
repo.create(id="admin1", email="admin@test.com", name="Admin", role="admin")
|
||||
repo.create(id="analyst1", email="analyst@test.com", name="Analyst", role="analyst")
|
||||
conn.close()
|
||||
|
||||
app = create_app()
|
||||
client = TestClient(app)
|
||||
admin_token = create_access_token("admin1", "admin@test.com", "admin")
|
||||
analyst_token = create_access_token("analyst1", "analyst@test.com", "analyst")
|
||||
|
||||
return {
|
||||
"client": client,
|
||||
"admin_token": admin_token,
|
||||
"analyst_token": analyst_token,
|
||||
"env": e2e_env,
|
||||
}
|
||||
200
tests/test_e2e_api.py
Normal file
200
tests/test_e2e_api.py
Normal file
|
|
@ -0,0 +1,200 @@
|
|||
"""E2E API tests — full server-side flow via FastAPI TestClient."""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import duckdb
|
||||
import pytest
|
||||
|
||||
from tests.conftest import create_mock_extract
|
||||
|
||||
|
||||
def _auth(token):
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
|
||||
class TestFullSyncFlow:
|
||||
"""Complete flow: register -> extract -> manifest -> download."""
|
||||
|
||||
def test_register_tables_and_get_catalog(self, seeded_app):
|
||||
c = seeded_app["client"]
|
||||
t = seeded_app["admin_token"]
|
||||
env = seeded_app["env"]
|
||||
|
||||
# Register tables
|
||||
resp = c.post("/api/admin/register-table", json={
|
||||
"name": "orders", "source_type": "keboola", "bucket": "in.c-crm",
|
||||
"source_table": "orders", "query_mode": "local",
|
||||
}, headers=_auth(t))
|
||||
assert resp.status_code == 201
|
||||
|
||||
resp = c.post("/api/admin/register-table", json={
|
||||
"name": "customers", "source_type": "keboola", "bucket": "in.c-crm",
|
||||
"source_table": "customers", "query_mode": "local",
|
||||
}, headers=_auth(t))
|
||||
assert resp.status_code == 201
|
||||
|
||||
# Verify catalog
|
||||
resp = c.get("/api/catalog/tables", headers=_auth(t))
|
||||
assert resp.status_code == 200
|
||||
tables = resp.json()["tables"]
|
||||
names = {tbl["name"] for tbl in tables}
|
||||
assert "orders" in names
|
||||
assert "customers" in names
|
||||
|
||||
def test_manifest_after_extract(self, seeded_app):
|
||||
c = seeded_app["client"]
|
||||
t = seeded_app["admin_token"]
|
||||
env = seeded_app["env"]
|
||||
|
||||
# Create mock extract with real data
|
||||
create_mock_extract(env["extracts_dir"], "keboola", [
|
||||
{"name": "orders", "data": [
|
||||
{"id": "1", "product": "Widget", "price": "99.99"},
|
||||
{"id": "2", "product": "Gadget", "price": "49.99"},
|
||||
]},
|
||||
{"name": "customers", "data": [
|
||||
{"id": "1", "name": "Alice", "email": "alice@test.com"},
|
||||
]},
|
||||
])
|
||||
|
||||
# Run orchestrator to populate sync_state
|
||||
from src.orchestrator import SyncOrchestrator
|
||||
SyncOrchestrator().rebuild()
|
||||
|
||||
# Check manifest
|
||||
resp = c.get("/api/sync/manifest", headers=_auth(t))
|
||||
assert resp.status_code == 200
|
||||
manifest = resp.json()
|
||||
assert "orders" in manifest["tables"]
|
||||
assert "customers" in manifest["tables"]
|
||||
assert manifest["tables"]["orders"]["rows"] == 2
|
||||
assert manifest["tables"]["customers"]["rows"] == 1
|
||||
assert "server_time" in manifest
|
||||
|
||||
def test_download_parquet_and_verify_content(self, seeded_app):
|
||||
c = seeded_app["client"]
|
||||
t = seeded_app["admin_token"]
|
||||
env = seeded_app["env"]
|
||||
|
||||
# Create extract
|
||||
create_mock_extract(env["extracts_dir"], "keboola", [
|
||||
{"name": "orders", "data": [
|
||||
{"id": "1", "product": "Widget", "price": "99.99"},
|
||||
{"id": "2", "product": "Gadget", "price": "49.99"},
|
||||
]},
|
||||
])
|
||||
|
||||
# Download parquet
|
||||
resp = c.get("/api/data/orders/download", headers=_auth(t))
|
||||
assert resp.status_code == 200
|
||||
assert "application/octet-stream" in resp.headers.get("content-type", "")
|
||||
|
||||
# Verify content by writing to temp file and reading with DuckDB
|
||||
with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as f:
|
||||
f.write(resp.content)
|
||||
tmp_path = f.name
|
||||
|
||||
try:
|
||||
conn = duckdb.connect()
|
||||
rows = conn.execute(f"SELECT * FROM read_parquet('{tmp_path}') ORDER BY id").fetchall()
|
||||
conn.close()
|
||||
assert len(rows) == 2
|
||||
assert rows[0][1] == "Widget" # product column
|
||||
assert rows[1][1] == "Gadget"
|
||||
finally:
|
||||
os.unlink(tmp_path)
|
||||
|
||||
def test_download_nonexistent_table_404(self, seeded_app):
|
||||
c = seeded_app["client"]
|
||||
t = seeded_app["admin_token"]
|
||||
resp = c.get("/api/data/nonexistent/download", headers=_auth(t))
|
||||
assert resp.status_code == 404
|
||||
|
||||
|
||||
class TestRBACEnforcement:
|
||||
"""Verify role-based access control across API endpoints."""
|
||||
|
||||
def test_analyst_cannot_register_table(self, seeded_app):
|
||||
c = seeded_app["client"]
|
||||
t = seeded_app["analyst_token"]
|
||||
resp = c.post("/api/admin/register-table", json={
|
||||
"name": "test", "source_type": "keboola",
|
||||
}, headers=_auth(t))
|
||||
assert resp.status_code == 403
|
||||
|
||||
def test_analyst_can_read_manifest(self, seeded_app):
|
||||
c = seeded_app["client"]
|
||||
t = seeded_app["analyst_token"]
|
||||
resp = c.get("/api/sync/manifest", headers=_auth(t))
|
||||
assert resp.status_code == 200
|
||||
|
||||
def test_analyst_can_download_data(self, seeded_app):
|
||||
c = seeded_app["client"]
|
||||
env = seeded_app["env"]
|
||||
create_mock_extract(env["extracts_dir"], "keboola", [
|
||||
{"name": "orders", "data": [{"id": "1"}]},
|
||||
])
|
||||
t = seeded_app["analyst_token"]
|
||||
resp = c.get("/api/data/orders/download", headers=_auth(t))
|
||||
assert resp.status_code == 200
|
||||
|
||||
def test_admin_can_trigger_sync(self, seeded_app):
|
||||
c = seeded_app["client"]
|
||||
t = seeded_app["admin_token"]
|
||||
resp = c.post("/api/sync/trigger", headers=_auth(t))
|
||||
assert resp.status_code == 200
|
||||
|
||||
def test_analyst_cannot_trigger_sync(self, seeded_app):
|
||||
c = seeded_app["client"]
|
||||
t = seeded_app["analyst_token"]
|
||||
resp = c.post("/api/sync/trigger", headers=_auth(t))
|
||||
assert resp.status_code == 403
|
||||
|
||||
def test_unauthenticated_blocked(self, seeded_app):
|
||||
c = seeded_app["client"]
|
||||
resp = c.get("/api/sync/manifest")
|
||||
assert resp.status_code == 401
|
||||
|
||||
|
||||
class TestTableLifecycle:
|
||||
"""Register -> update -> delete table via admin API."""
|
||||
|
||||
def test_full_lifecycle(self, seeded_app):
|
||||
c = seeded_app["client"]
|
||||
t = seeded_app["admin_token"]
|
||||
|
||||
# Create
|
||||
resp = c.post("/api/admin/register-table", json={
|
||||
"name": "lifecycle_test", "source_type": "keboola",
|
||||
"query_mode": "local", "description": "Test table",
|
||||
}, headers=_auth(t))
|
||||
assert resp.status_code == 201
|
||||
table_id = resp.json()["id"]
|
||||
|
||||
# Read
|
||||
resp = c.get("/api/admin/registry", headers=_auth(t))
|
||||
assert resp.status_code == 200
|
||||
names = {tbl["name"] for tbl in resp.json()["tables"]}
|
||||
assert "lifecycle_test" in names
|
||||
|
||||
# Update
|
||||
resp = c.put(f"/api/admin/registry/{table_id}", json={
|
||||
"query_mode": "remote",
|
||||
}, headers=_auth(t))
|
||||
assert resp.status_code == 200
|
||||
|
||||
# Verify update
|
||||
resp = c.get("/api/admin/registry", headers=_auth(t))
|
||||
table = next(tbl for tbl in resp.json()["tables"] if tbl["id"] == table_id)
|
||||
assert table["query_mode"] == "remote"
|
||||
|
||||
# Delete
|
||||
resp = c.delete(f"/api/admin/registry/{table_id}", headers=_auth(t))
|
||||
assert resp.status_code == 204
|
||||
|
||||
# Verify gone
|
||||
resp = c.get("/api/admin/registry", headers=_auth(t))
|
||||
ids = {tbl["id"] for tbl in resp.json()["tables"]}
|
||||
assert table_id not in ids
|
||||
114
tests/test_e2e_docker.py
Normal file
114
tests/test_e2e_docker.py
Normal file
|
|
@ -0,0 +1,114 @@
|
|||
"""E2E Docker tests — spin up containers, test API from outside.
|
||||
|
||||
Run with: pytest tests/test_e2e_docker.py -m docker -v
|
||||
Requires: Docker and docker compose installed.
|
||||
"""
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
# Skip all tests in this module if docker marker not selected
|
||||
pytestmark = pytest.mark.docker
|
||||
|
||||
COMPOSE_FILE = "docker-compose.test.yml"
|
||||
BASE_URL = "http://localhost:8000"
|
||||
|
||||
|
||||
def _docker_compose(*args, timeout=60):
|
||||
"""Run docker compose command."""
|
||||
cmd = ["docker", "compose", "-f", COMPOSE_FILE] + list(args)
|
||||
return subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
|
||||
|
||||
|
||||
def _wait_for_health(url, timeout=30):
|
||||
"""Poll health endpoint until it responds 200."""
|
||||
import httpx
|
||||
deadline = time.time() + timeout
|
||||
while time.time() < deadline:
|
||||
try:
|
||||
resp = httpx.get(f"{url}/api/health", timeout=5)
|
||||
if resp.status_code == 200:
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
time.sleep(1)
|
||||
return False
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def docker_env():
|
||||
"""Start docker compose, yield, then tear down."""
|
||||
# Check docker is available
|
||||
result = subprocess.run(["docker", "info"], capture_output=True, timeout=10)
|
||||
if result.returncode != 0:
|
||||
pytest.skip("Docker not available")
|
||||
|
||||
# Check compose file exists
|
||||
if not os.path.exists(COMPOSE_FILE):
|
||||
pytest.skip(f"{COMPOSE_FILE} not found")
|
||||
|
||||
# Start services
|
||||
_docker_compose("up", "-d", "--build")
|
||||
|
||||
# Wait for health
|
||||
if not _wait_for_health(BASE_URL, timeout=60):
|
||||
# Capture logs for debugging
|
||||
logs = _docker_compose("logs")
|
||||
_docker_compose("down", "-v")
|
||||
pytest.fail(f"Service did not become healthy.\nLogs:\n{logs.stdout}")
|
||||
|
||||
yield BASE_URL
|
||||
|
||||
# Teardown
|
||||
_docker_compose("down", "-v")
|
||||
|
||||
|
||||
class TestDockerHealth:
|
||||
def test_health_endpoint(self, docker_env):
|
||||
import httpx
|
||||
resp = httpx.get(f"{docker_env}/api/health")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data.get("status") in ("ok", "healthy")
|
||||
|
||||
def test_health_has_duckdb(self, docker_env):
|
||||
import httpx
|
||||
resp = httpx.get(f"{docker_env}/api/health")
|
||||
data = resp.json()
|
||||
checks = data.get("checks", {})
|
||||
assert "duckdb" in checks or "database" in checks
|
||||
|
||||
|
||||
class TestDockerFullFlow:
|
||||
def test_register_and_query_flow(self, docker_env):
|
||||
import httpx
|
||||
url = docker_env
|
||||
|
||||
# Get auth token
|
||||
resp = httpx.post(f"{url}/auth/token", json={"email": "admin@test.com"})
|
||||
if resp.status_code != 200:
|
||||
# Auto-create user first if needed
|
||||
pytest.skip("Auth setup required — no admin user in Docker env")
|
||||
|
||||
token = resp.json().get("token", "")
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
|
||||
# Register a table
|
||||
resp = httpx.post(f"{url}/api/admin/register-table", json={
|
||||
"name": "docker_test", "source_type": "keboola", "query_mode": "local",
|
||||
}, headers=headers)
|
||||
assert resp.status_code in (201, 409) # 409 if already exists
|
||||
|
||||
# Get registry
|
||||
resp = httpx.get(f"{url}/api/admin/registry", headers=headers)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["count"] >= 1
|
||||
|
||||
# Get manifest
|
||||
resp = httpx.get(f"{url}/api/sync/manifest", headers=headers)
|
||||
assert resp.status_code == 200
|
||||
assert "tables" in resp.json()
|
||||
assert "server_time" in resp.json()
|
||||
286
tests/test_e2e_extract.py
Normal file
286
tests/test_e2e_extract.py
Normal file
|
|
@ -0,0 +1,286 @@
|
|||
"""E2E tests — extractor + orchestrator pipeline."""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import duckdb
|
||||
import pytest
|
||||
|
||||
|
||||
class TestKeboolaExtractToQuery:
|
||||
"""Keboola extractor -> extract.duckdb -> orchestrator -> queryable views."""
|
||||
|
||||
def test_full_pipeline(self, e2e_env):
|
||||
env = e2e_env
|
||||
|
||||
# 1. Register table in registry
|
||||
from src.db import get_system_db
|
||||
from src.repositories.table_registry import TableRegistryRepository
|
||||
conn = get_system_db()
|
||||
repo = TableRegistryRepository(conn)
|
||||
repo.register(id="orders", name="orders", source_type="keboola",
|
||||
bucket="in.c-crm", source_table="orders", query_mode="local")
|
||||
tables = repo.list_by_source("keboola")
|
||||
conn.close()
|
||||
|
||||
# 2. Run extractor (mock the DuckDB extension)
|
||||
from connectors.keboola.extractor import run as keboola_run
|
||||
|
||||
def mock_legacy(tc, pq_path, keboola_url, keboola_token):
|
||||
local = duckdb.connect()
|
||||
local.execute(
|
||||
f"COPY (SELECT '1' AS id, 'Widget' AS product, '99.99' AS price "
|
||||
f"UNION ALL SELECT '2', 'Gadget', '49.99') TO '{pq_path}' (FORMAT PARQUET)"
|
||||
)
|
||||
local.close()
|
||||
|
||||
output = str(env["extracts_dir"] / "keboola")
|
||||
with patch("connectors.keboola.extractor._try_attach_extension", return_value=False), \
|
||||
patch("connectors.keboola.extractor._extract_via_legacy", side_effect=mock_legacy):
|
||||
result = keboola_run(output, tables, "https://example.com", "fake-token")
|
||||
|
||||
assert result["tables_extracted"] == 1
|
||||
assert result["tables_failed"] == 0
|
||||
|
||||
# 3. Verify extract.duckdb
|
||||
ext_conn = duckdb.connect(str(env["extracts_dir"] / "keboola" / "extract.duckdb"))
|
||||
meta = ext_conn.execute("SELECT table_name, rows, query_mode FROM _meta").fetchall()
|
||||
assert len(meta) == 1
|
||||
assert meta[0][0] == "orders"
|
||||
assert meta[0][1] == 2
|
||||
ext_conn.close()
|
||||
|
||||
# 4. Run orchestrator
|
||||
from src.orchestrator import SyncOrchestrator
|
||||
orch = SyncOrchestrator(analytics_db_path=env["analytics_db"])
|
||||
result = orch.rebuild()
|
||||
assert "keboola" in result
|
||||
assert "orders" in result["keboola"]
|
||||
|
||||
# 5. Verify sync_state updated
|
||||
conn2 = get_system_db()
|
||||
from src.repositories.sync_state import SyncStateRepository
|
||||
state = SyncStateRepository(conn2).get_table_state("orders")
|
||||
assert state is not None
|
||||
assert state["rows"] == 2
|
||||
conn2.close()
|
||||
|
||||
# 6. Verify data queryable via extract.duckdb
|
||||
ext_conn2 = duckdb.connect(str(env["extracts_dir"] / "keboola" / "extract.duckdb"))
|
||||
rows = ext_conn2.execute("SELECT product FROM orders ORDER BY id").fetchall()
|
||||
assert rows[0][0] == "Widget"
|
||||
assert rows[1][0] == "Gadget"
|
||||
ext_conn2.close()
|
||||
|
||||
|
||||
class TestBigQueryRemoteExtract:
|
||||
"""BigQuery extractor -- remote only, no data download."""
|
||||
|
||||
def test_remote_only_pipeline(self, e2e_env):
|
||||
env = e2e_env
|
||||
output = str(env["extracts_dir"] / "bigquery")
|
||||
|
||||
table_configs = [
|
||||
{"name": "page_views", "bucket": "analytics", "source_table": "page_views", "description": "Web traffic"},
|
||||
{"name": "sessions", "bucket": "analytics", "source_table": "sessions", "description": "User sessions"},
|
||||
]
|
||||
|
||||
from connectors.bigquery import extractor as bq_mod
|
||||
|
||||
# Build extract.duckdb manually to simulate what the BQ extractor would produce,
|
||||
# since the real DuckDB BigQuery extension is not available in test environments.
|
||||
output_path = Path(output)
|
||||
output_path.mkdir(parents=True, exist_ok=True)
|
||||
db_path = output_path / "extract.duckdb"
|
||||
|
||||
conn = duckdb.connect(str(db_path))
|
||||
bq_mod._create_meta_table(conn)
|
||||
from datetime import datetime, timezone
|
||||
now = datetime.now(timezone.utc)
|
||||
for tc in table_configs:
|
||||
name = tc["name"]
|
||||
# Create a placeholder table (no actual BQ data)
|
||||
conn.execute(f'CREATE OR REPLACE TABLE "{name}" (dummy INTEGER)')
|
||||
conn.execute(
|
||||
"INSERT INTO _meta VALUES (?, ?, 0, 0, ?, 'remote')",
|
||||
[name, tc.get("description", ""), now],
|
||||
)
|
||||
conn.close()
|
||||
|
||||
# Verify _meta
|
||||
conn = duckdb.connect(str(db_path))
|
||||
meta = conn.execute("SELECT table_name, query_mode FROM _meta ORDER BY table_name").fetchall()
|
||||
assert len(meta) == 2
|
||||
assert all(m[1] == "remote" for m in meta)
|
||||
conn.close()
|
||||
|
||||
# Verify no parquet files
|
||||
data_dir = env["extracts_dir"] / "bigquery" / "data"
|
||||
assert not data_dir.exists() or not list(data_dir.glob("*.parquet"))
|
||||
|
||||
# Verify orchestrator picks up remote tables
|
||||
from src.orchestrator import SyncOrchestrator
|
||||
orch = SyncOrchestrator(analytics_db_path=env["analytics_db"])
|
||||
result = orch.rebuild()
|
||||
assert "bigquery" in result
|
||||
assert "page_views" in result["bigquery"]
|
||||
assert "sessions" in result["bigquery"]
|
||||
|
||||
|
||||
class TestJiraWebhookToQuery:
|
||||
"""Jira webhook -> incremental parquet -> extract.duckdb -> query."""
|
||||
|
||||
def test_jira_incremental_flow(self, e2e_env):
|
||||
env = e2e_env
|
||||
jira_dir = env["extracts_dir"] / "jira"
|
||||
|
||||
# 1. Init Jira extract
|
||||
from connectors.jira.extract_init import init_extract, update_meta
|
||||
init_extract(jira_dir)
|
||||
|
||||
# 2. Simulate incremental_transform: write a parquet to data/issues/
|
||||
issues_dir = jira_dir / "data" / "issues"
|
||||
pq_path = str(issues_dir / "2026-03.parquet")
|
||||
tmp = duckdb.connect()
|
||||
tmp.execute(
|
||||
f"COPY (SELECT 'PROJ-1' AS issue_key, 'Bug' AS type, 'Fix login' AS summary) "
|
||||
f"TO '{pq_path}' (FORMAT PARQUET)"
|
||||
)
|
||||
tmp.close()
|
||||
|
||||
# 3. Update _meta
|
||||
update_meta(jira_dir, "issues")
|
||||
|
||||
# 4. Verify _meta updated
|
||||
conn = duckdb.connect(str(jira_dir / "extract.duckdb"))
|
||||
meta = conn.execute("SELECT rows FROM _meta WHERE table_name='issues'").fetchone()
|
||||
assert meta[0] == 1
|
||||
|
||||
# 5. Verify view works
|
||||
row = conn.execute("SELECT issue_key, summary FROM issues").fetchone()
|
||||
assert row[0] == "PROJ-1"
|
||||
assert row[1] == "Fix login"
|
||||
conn.close()
|
||||
|
||||
# 6. Orchestrator picks it up
|
||||
from src.orchestrator import SyncOrchestrator
|
||||
orch = SyncOrchestrator(analytics_db_path=env["analytics_db"])
|
||||
result = orch.rebuild()
|
||||
assert "jira" in result
|
||||
assert "issues" in result["jira"]
|
||||
|
||||
|
||||
class TestMultiSourceOrchestration:
|
||||
"""Multiple sources -> single analytics.duckdb."""
|
||||
|
||||
def test_three_sources(self, e2e_env):
|
||||
env = e2e_env
|
||||
from tests.conftest import create_mock_extract
|
||||
|
||||
# Keboola: 2 tables
|
||||
create_mock_extract(env["extracts_dir"], "keboola", [
|
||||
{"name": "orders", "data": [{"id": "1", "total": "100"}]},
|
||||
{"name": "customers", "data": [{"id": "1", "name": "Alice"}]},
|
||||
])
|
||||
|
||||
# Jira: 1 table
|
||||
create_mock_extract(env["extracts_dir"], "jira", [
|
||||
{"name": "issues", "data": [{"issue_key": "PROJ-1"}]},
|
||||
])
|
||||
|
||||
# Rebuild
|
||||
from src.orchestrator import SyncOrchestrator
|
||||
orch = SyncOrchestrator(analytics_db_path=env["analytics_db"])
|
||||
result = orch.rebuild()
|
||||
|
||||
assert len(result) == 2 # keboola + jira
|
||||
total_tables = sum(len(v) for v in result.values())
|
||||
assert total_tables == 3 # orders + customers + issues
|
||||
|
||||
# Verify sync_state
|
||||
from src.db import get_system_db
|
||||
from src.repositories.sync_state import SyncStateRepository
|
||||
conn = get_system_db()
|
||||
states = SyncStateRepository(conn).get_all_states()
|
||||
conn.close()
|
||||
table_ids = {s["table_id"] for s in states}
|
||||
assert {"orders", "customers", "issues"}.issubset(table_ids)
|
||||
|
||||
|
||||
class TestCorruptExtractHandling:
|
||||
"""Orchestrator gracefully handles corrupt extract.duckdb."""
|
||||
|
||||
def test_skips_corrupt_continues_valid(self, e2e_env):
|
||||
env = e2e_env
|
||||
from tests.conftest import create_mock_extract
|
||||
|
||||
# Valid source
|
||||
create_mock_extract(env["extracts_dir"], "keboola", [
|
||||
{"name": "orders", "data": [{"id": "1"}]},
|
||||
])
|
||||
|
||||
# Corrupt source: write garbage to extract.duckdb
|
||||
corrupt_dir = env["extracts_dir"] / "broken"
|
||||
corrupt_dir.mkdir()
|
||||
(corrupt_dir / "extract.duckdb").write_bytes(b"this is not a duckdb file")
|
||||
|
||||
from src.orchestrator import SyncOrchestrator
|
||||
orch = SyncOrchestrator(analytics_db_path=env["analytics_db"])
|
||||
result = orch.rebuild()
|
||||
|
||||
# Keboola should work, broken should be skipped
|
||||
assert "keboola" in result
|
||||
assert "orders" in result["keboola"]
|
||||
assert "broken" not in result or result.get("broken") == []
|
||||
|
||||
|
||||
class TestSchemaMigration:
|
||||
"""Schema v1->v2 migration preserves data and adds new columns."""
|
||||
|
||||
def test_migration_preserves_and_extends(self, e2e_env):
|
||||
env = e2e_env
|
||||
|
||||
# Create a v1-style database manually
|
||||
db_path = env["data_dir"] / "state" / "system.duckdb"
|
||||
conn = duckdb.connect(str(db_path))
|
||||
conn.execute("CREATE TABLE schema_version (version INTEGER, applied_at TIMESTAMP)")
|
||||
conn.execute("INSERT INTO schema_version VALUES (1, current_timestamp)")
|
||||
conn.execute("""CREATE TABLE table_registry (
|
||||
id VARCHAR PRIMARY KEY, name VARCHAR NOT NULL, folder VARCHAR,
|
||||
sync_strategy VARCHAR, primary_key VARCHAR, description TEXT,
|
||||
registered_by VARCHAR, registered_at TIMESTAMP DEFAULT current_timestamp
|
||||
)""")
|
||||
conn.execute("INSERT INTO table_registry (id, name, folder) VALUES ('old_table', 'Old', 'legacy')")
|
||||
# Create minimal required tables
|
||||
for tbl in ["users", "sync_state", "sync_history", "user_sync_settings",
|
||||
"knowledge_items", "knowledge_votes", "audit_log", "telegram_links",
|
||||
"pending_codes", "script_registry", "table_profiles", "dataset_permissions"]:
|
||||
conn.execute(f"CREATE TABLE IF NOT EXISTS {tbl} (id VARCHAR PRIMARY KEY)")
|
||||
conn.close()
|
||||
|
||||
# Open via get_system_db -> triggers migration
|
||||
from src.db import get_system_db, get_schema_version
|
||||
conn2 = get_system_db()
|
||||
|
||||
assert get_schema_version(conn2) == 2
|
||||
|
||||
# Old data preserved
|
||||
old = conn2.execute("SELECT name, folder FROM table_registry WHERE id='old_table'").fetchone()
|
||||
assert old[0] == "Old"
|
||||
assert old[1] == "legacy"
|
||||
|
||||
# New columns exist and work
|
||||
from src.repositories.table_registry import TableRegistryRepository
|
||||
repo = TableRegistryRepository(conn2)
|
||||
repo.register(id="new_table", name="New", source_type="keboola",
|
||||
bucket="in.c-crm", source_table="new", query_mode="local")
|
||||
|
||||
new = repo.get("new_table")
|
||||
assert new["source_type"] == "keboola"
|
||||
assert new["query_mode"] == "local"
|
||||
|
||||
# Both old and new queryable
|
||||
all_tables = repo.list_all()
|
||||
assert len(all_tables) == 2
|
||||
conn2.close()
|
||||
Loading…
Reference in a new issue