test: add shared test infrastructure (fixtures, factories, assertions, mocks)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
ZdenekSrotyr 2026-04-12 11:05:35 +02:00
parent 51f60bbf91
commit 510608813c
6 changed files with 426 additions and 0 deletions

View file

@ -46,6 +46,7 @@ dependencies = [
dev = [
"pytest>=9.0.0",
"pytest-timeout>=2.0.0",
"pytest-xdist>=3.0.0",
"faker>=24.0.0",
"anthropic>=0.30.0",
"openai>=1.30.0",
@ -65,6 +66,7 @@ packages = ["app", "src", "connectors", "cli", "services", "config"]
dev-dependencies = [
"pytest>=9.0.0",
"pytest-timeout>=2.0.0",
"pytest-xdist>=3.0.0",
"faker>=24.0.0",
"anthropic>=0.30.0",
"openai>=1.30.0",

View file

@ -3,3 +3,5 @@ addopts = -m "not live and not docker" --timeout=60 --strict-markers
markers =
live: tests requiring server access (run with '-m live')
docker: tests requiring Docker (run with '-m docker')
integration: FastAPI TestClient API integration tests
journey: end-to-end user flow tests spanning multiple components

View file

@ -113,3 +113,48 @@ def seeded_app(e2e_env):
"analyst_token": analyst_token,
"env": e2e_env,
}
@pytest.fixture
def mock_extract_factory(e2e_env):
    """Factory fixture for creating mock extract.duckdb files.

    Returns a callable ``factory(source_name, tables, remote_attach=None)``:

    - source_name: str name of the connector source directory
    - tables: list[dict] in the same format as ``create_mock_extract``
    - remote_attach: list[dict] | None rows for the ``_remote_attach`` table,
      each dict with keys: alias, extension, url, token_env

    The callable returns whatever ``create_mock_extract`` returned for the
    database path. When ``remote_attach`` is empty or None the database is
    left exactly as ``create_mock_extract`` produced it.
    """

    def _factory(source_name: str, tables: list[dict], remote_attach=None):
        db_path = create_mock_extract(e2e_env["extracts_dir"], source_name, tables)
        if not remote_attach:
            return db_path

        conn = duckdb.connect(str(db_path))
        try:
            conn.execute(
                """CREATE TABLE IF NOT EXISTS _remote_attach (
                    alias VARCHAR,
                    extension VARCHAR,
                    url VARCHAR,
                    token_env VARCHAR
                )"""
            )
            for row in remote_attach:
                conn.execute(
                    "INSERT INTO _remote_attach VALUES (?, ?, ?, ?)",
                    [row["alias"], row["extension"], row["url"], row["token_env"]],
                )
        finally:
            # Close even when a row is malformed (e.g. a missing key) so the
            # connection is not left open on the database file.
            conn.close()
        return db_path

    return _factory
@pytest.fixture
def analyst_user(seeded_app):
    """Return an Authorization header dict built from ``seeded_app["analyst_token"]``."""
    return {"Authorization": f"Bearer {seeded_app['analyst_token']}"}
@pytest.fixture
def admin_user(seeded_app):
    """Return an Authorization header dict built from ``seeded_app["admin_token"]``."""
    return {"Authorization": f"Bearer {seeded_app['admin_token']}"}

View file

@ -0,0 +1,75 @@
"""Reusable assertion helpers for the test suite."""
from pathlib import Path
import duckdb
def assert_api_error(response, expected_status: int, detail_contains: str = "") -> None:
    """Assert that an API response is an error with the expected status code.

    Args:
        response: httpx / TestClient response object (anything exposing
            ``status_code``, ``text`` and ``json()``).
        expected_status: Expected HTTP status code (e.g. 400, 404, 422).
        detail_contains: If non-empty, assert the response JSON 'detail'
            field contains this substring (case-sensitive).

    Raises:
        AssertionError: If the status code differs, or if ``detail_contains``
            is given and is not found in the stringified 'detail' field. A
            body that is not valid JSON, or is JSON but not an object, is
            treated as having an empty detail.
    """
    assert response.status_code == expected_status, (
        f"Expected status {expected_status}, got {response.status_code}. "
        f"Response body: {response.text}"
    )
    if not detail_contains:
        return

    try:
        body = response.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass: body is not JSON.
        body = None

    # Only a JSON object can carry a 'detail' key; a top-level list or scalar
    # must fail the assertion below rather than crash on ``.get``.
    detail = body.get("detail", "") if isinstance(body, dict) else ""
    # str() also covers list-valued details (e.g. lists of error dicts).
    detail_str = str(detail)
    assert detail_contains in detail_str, (
        f"Expected detail to contain {detail_contains!r}, got: {detail_str!r}"
    )
def assert_parquet_readable(path: str | Path, min_rows: int = 0) -> None:
    """Assert that a parquet file is readable and contains at least min_rows rows.

    Args:
        path: Filesystem path to the parquet file.
        min_rows: Minimum number of rows expected. The default of 0 only
            checks that the file can be read; an empty file passes.

    Raises:
        AssertionError: If the row count is below ``min_rows``.
    """
    path = str(path)
    conn = duckdb.connect()
    try:
        # Bind the path as a parameter instead of formatting it into the SQL
        # text, so paths containing a single quote do not break the query.
        result = conn.execute(
            "SELECT COUNT(*) FROM read_parquet(?)", [path]
        ).fetchone()
        assert result is not None, f"Could not read parquet file: {path}"
        row_count = result[0]
        assert row_count >= min_rows, (
            f"Parquet file {path!r} has {row_count} rows, expected >= {min_rows}"
        )
    finally:
        conn.close()
def assert_duckdb_table_exists(db_path: str | Path, table_name: str) -> None:
    """Assert that ``information_schema.tables`` in a DuckDB file lists ``table_name``.

    Args:
        db_path: Filesystem path to the DuckDB database file; opened read-only.
        table_name: Name of the table or view to look for.

    Raises:
        AssertionError: If no matching entry is found.
    """
    db_file = str(db_path)
    lookup_sql = "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = ?"

    conn = duckdb.connect(db_file, read_only=True)
    try:
        match = conn.execute(lookup_sql, [table_name]).fetchone()
    finally:
        conn.close()

    found = match is not None and match[0] > 0
    assert found, (
        f"Table or view {table_name!r} does not exist in DuckDB database {db_file!r}"
    )

169
tests/helpers/factories.py Normal file
View file

@ -0,0 +1,169 @@
"""Faker-based test data factories with deterministic seed."""
import hashlib
import hmac
import json
import uuid
from typing import Any
from faker import Faker
# Seed Faker with a fixed value so Faker-generated values are reproducible.
Faker.seed(42)
# Module-level Faker instance used by the factories in this file.
_fake = Faker()
class UserFactory:
    """Factory for user dicts matching UserRepository.create() signature."""

    @staticmethod
    def build(role: str = "analyst", **overrides) -> dict[str, Any]:
        """Build a user dict.

        Args:
            role: Value stored under the ``role`` key.
            **overrides: Keyword overrides that replace (or add) any field.

        Returns:
            Dict with keys: id, email, name, role (plus any extra overrides).
        """
        data = {
            # Drawn from the seeded Faker instance (not uuid.uuid4()) so the
            # id is as reproducible as the other generated fields.
            "id": _fake.uuid4(),
            "email": _fake.unique.email(),
            "name": _fake.name(),
            "role": role,
        }
        data.update(overrides)
        return data
class TableRegistryFactory:
    """Factory for table_registry entry dicts."""

    _SOURCE_TYPES = ["keboola", "bigquery", "csv"]
    _QUERY_MODES = ["local", "remote"]
    _SCHEDULES = ["0 * * * *", "0 6 * * *", "*/30 * * * *"]

    @staticmethod
    def build(**overrides) -> dict[str, Any]:
        """Build a table registry dict.

        Returns keys: name, source_type, bucket, source_table,
        query_mode, sync_schedule, description. Any keyword override
        replaces the generated value for that key.
        """
        factory = TableRegistryFactory
        # A random source type is always drawn, even when the caller supplies
        # one, so the sequence of Faker draws does not depend on overrides.
        random_source = _fake.random_element(factory._SOURCE_TYPES)
        chosen_source = overrides.pop("source_type", random_source)

        entry: dict[str, Any] = {}
        entry["name"] = _fake.unique.slug().replace("-", "_")
        entry["source_type"] = chosen_source
        entry["bucket"] = f"in.c-{_fake.word()}"
        entry["source_table"] = _fake.word() + "_data"
        entry["query_mode"] = _fake.random_element(factory._QUERY_MODES)
        entry["sync_schedule"] = _fake.random_element(factory._SCHEDULES)
        entry["description"] = _fake.sentence()
        return {**entry, **overrides}
class KnowledgeItemFactory:
    """Factory for knowledge item dicts."""

    _CATEGORIES = ["business", "technical", "process", "metrics"]

    @staticmethod
    def build(**overrides) -> dict[str, Any]:
        """Build a knowledge item dict.

        Returns keys: title, content, category, tags. Keyword overrides
        replace the generated value for the matching key.
        """
        # Values are drawn in the same order as the returned keys.
        title = _fake.sentence(nb_words=6).rstrip(".")
        content = _fake.paragraph(nb_sentences=4)
        category = _fake.random_element(KnowledgeItemFactory._CATEGORIES)
        tag_count = _fake.random_int(1, 4)
        tags = [_fake.word() for _ in range(tag_count)]

        item = dict(title=title, content=content, category=category, tags=tags)
        item.update(overrides)
        return item
class WebhookEventFactory:
    """Factory for webhook event payloads."""

    @staticmethod
    def build_jira_event(
        event_type: str = "jira:issue_updated",
        issue_key: str | None = None,
        **overrides,
    ) -> dict[str, Any]:
        """Build a Jira-style webhook event payload dict.

        Args:
            event_type: Value stored under ``webhookEvent``,
                e.g. 'jira:issue_created'.
            issue_key: Issue key like 'PROJ-123'. Generated if not provided.
            **overrides: Top-level keys to override in the payload.

        Returns:
            Dict with top-level keys webhookEvent, timestamp, issue and user
            (plus any overrides).
        """

        def _person() -> dict[str, Any]:
            # One fake identity; draws name, email and account id in that order.
            return {
                "displayName": _fake.name(),
                "emailAddress": _fake.email(),
                "accountId": _fake.uuid4(),
            }

        def _named_with_id(names: list[str]) -> dict[str, Any]:
            # A {name, id} pair as used for status and issue type.
            return {
                "name": _fake.random_element(names),
                "id": str(_fake.random_int(1, 10)),
            }

        if issue_key is None:
            prefix = _fake.lexify("????").upper()
            issue_key = f"{prefix}-{_fake.random_int(1, 9999)}"
        # Everything before the first '-' is treated as the project key.
        project_key = issue_key.partition("-")[0]

        # The statements below keep the Faker draw order equal to the key
        # order of the resulting payload.
        timestamp = _fake.unix_time() * 1000
        issue_id = str(_fake.random_int(10000, 99999))

        fields: dict[str, Any] = {}
        fields["summary"] = _fake.sentence(nb_words=8).rstrip(".")
        fields["status"] = _named_with_id(["To Do", "In Progress", "Done"])
        fields["issuetype"] = _named_with_id(["Bug", "Story", "Task", "Epic"])
        fields["priority"] = {
            "name": _fake.random_element(["Low", "Medium", "High", "Critical"]),
        }
        fields["assignee"] = _person()
        fields["reporter"] = _person()
        fields["project"] = {
            "key": project_key,
            "name": f"{project_key} Project",
            "id": str(_fake.random_int(10000, 99999)),
        }
        fields["created"] = _fake.iso8601()
        fields["updated"] = _fake.iso8601()
        fields["description"] = _fake.paragraph(nb_sentences=2)
        label_count = _fake.random_int(0, 3)
        fields["labels"] = [_fake.word() for _ in range(label_count)]

        issue = {
            "id": issue_id,
            "key": issue_key,
            "self": f"https://jira.example.com/rest/api/2/issue/{issue_key}",
            "fields": fields,
        }
        event: dict[str, Any] = {
            "webhookEvent": event_type,
            "timestamp": timestamp,
            "issue": issue,
            "user": _person(),
        }
        event.update(overrides)
        return event

    @staticmethod
    def sign_payload(payload: dict[str, Any], secret: str) -> str:
        """Return an HMAC-SHA256 signature string for a webhook payload.

        The payload is serialised as compact JSON with sorted keys, signed
        with ``secret``, and returned in the form 'sha256=<hex digest>'.
        """
        canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
        mac = hmac.new(secret.encode(), canonical.encode(), hashlib.sha256)
        return f"sha256={mac.hexdigest()}"

133
tests/helpers/mocks.py Normal file
View file

@ -0,0 +1,133 @@
"""Mock classes for unit and integration tests."""
from __future__ import annotations
import json
from typing import Any
from unittest.mock import MagicMock
class MockLLMProvider:
    """Mock LLM provider that returns pre-configured responses.

    Usage::

        provider = MockLLMProvider(responses=[{"key": "value"}, {"other": "result"}])
        result = provider.extract_json("some prompt")     # returns {"key": "value"}
        result = provider.extract_json("another prompt")  # returns {"other": "result"}
        # After exhausting responses, returns last item repeatedly.

    With ``responses`` omitted, ``None`` or empty, every call returns ``{}``.
    """

    def __init__(self, responses: list[Any] | None = None) -> None:
        # An empty list would make extract_json index into nothing, so it is
        # treated like None. The list is copied so later mutation of the
        # caller's list does not change the configured responses.
        self._responses: list[Any] = list(responses) if responses else [{}]
        self._call_count = 0

    def extract_json(self, *args, **kwargs) -> Any:
        """Return the next configured response, repeating the last one once exhausted.

        All positional and keyword arguments are accepted and ignored.
        """
        idx = min(self._call_count, len(self._responses) - 1)
        result = self._responses[idx]
        self._call_count += 1
        return result

    def complete(self, *args, **kwargs) -> str:
        """Return the next configured response as a JSON string."""
        return json.dumps(self.extract_json(*args, **kwargs))

    @property
    def call_count(self) -> int:
        """Number of times extract_json / complete was called."""
        return self._call_count

    def reset(self) -> None:
        """Reset the call counter so responses are served from the start again."""
        self._call_count = 0
class MockHTTPResponse:
    """Minimal stand-in for an HTTP response object.

    Exposes ``.status_code``, ``.text``, ``.json()`` and
    ``.raise_for_status()`` so code using those members can run without a
    real HTTP server.

    Usage::

        response = MockHTTPResponse(200, json_data={"id": 1}, text='{"id": 1}')
        response.json()              # {"id": 1}
        response.raise_for_status()  # no-op below 400
        response.status_code         # 200

        error = MockHTTPResponse(404, json_data={"detail": "not found"})
        error.raise_for_status()     # raises RuntimeError
    """

    def __init__(
        self,
        status_code: int = 200,
        json_data: Any = None,
        text: str = "",
    ) -> None:
        self.status_code = status_code
        self._json_data = json_data
        # An explicit non-empty text wins; otherwise the text is derived from
        # the JSON data, or left empty when there is none.
        if text:
            self.text = text
        elif json_data is not None:
            self.text = json.dumps(json_data)
        else:
            self.text = ""

    def json(self) -> Any:
        """Return the configured JSON data, or raise ValueError if none was given."""
        payload = self._json_data
        if payload is not None:
            return payload
        raise ValueError("No JSON data configured for this MockHTTPResponse")

    def raise_for_status(self) -> None:
        """Raise RuntimeError when the status code is 400 or above."""
        if self.status_code < 400:
            return
        raise RuntimeError(
            f"HTTP error {self.status_code}: {self.text}"
        )
def mock_duckdb_connection(tables: dict[str, list[tuple]] | None = None) -> MagicMock:
    """Return a MagicMock that mimics a DuckDB connection.

    Args:
        tables: Mapping of SQL pattern -> list of row tuples. After
            ``execute(sql)``, ``fetchall()`` returns the rows of the first
            pattern (in dict order) that occurs in ``sql`` as a substring and
            ``fetchone()`` returns the first of those rows. If no pattern
            matches, fetchall returns [] and fetchone None.

    The returned mock exposes:

    - ``.execute(sql, params=None)`` -> returns the mock itself (chainable)
    - ``.fetchall()`` -> matching rows or []
    - ``.fetchone()`` -> first matching row or None
    - ``.close()`` -> no-op returning None

    Because the object is a real ``MagicMock``, calls are recorded and the
    usual mock assertions (``conn.execute.assert_called_with(...)``) work.

    Example::

        conn = mock_duckdb_connection({"SELECT * FROM users": [("alice", "admin")]})
        conn.execute("SELECT * FROM users").fetchall()  # [("alice", "admin")]
    """
    patterns = tables or {}
    conn = MagicMock(name="duckdb_connection")
    # Rows selected by the most recent execute(); rebound (never mutated in
    # place) so lists handed out by an earlier fetchall() stay intact.
    current_rows: list = []

    def _execute(sql: str, params: Any = None) -> MagicMock:
        nonlocal current_rows
        current_rows = next(
            (list(rows) for pattern, rows in patterns.items() if pattern in sql),
            [],
        )
        return conn

    def _fetchall() -> list:
        return current_rows

    def _fetchone() -> Any:
        return current_rows[0] if current_rows else None

    conn.execute.side_effect = _execute
    conn.fetchall.side_effect = _fetchall
    conn.fetchone.side_effect = _fetchone
    conn.close.return_value = None
    return conn