From 510608813cb0251d776da8e33eb1b2a703d5d3d1 Mon Sep 17 00:00:00 2001
From: ZdenekSrotyr
Date: Sun, 12 Apr 2026 11:05:35 +0200
Subject: [PATCH] test: add shared test infrastructure (fixtures, factories, assertions, mocks)

Co-Authored-By: Claude Sonnet 4.6
---
 pyproject.toml              |   2 +
 pytest.ini                  |   2 +
 tests/conftest.py           |  48 ++++++++++
 tests/helpers/assertions.py |  77 ++++++++++++++++
 tests/helpers/factories.py  | 168 ++++++++++++++++++++++++++++++++++++
 tests/helpers/mocks.py      | 133 ++++++++++++++++++++++++++++
 6 files changed, 430 insertions(+)
 create mode 100644 tests/helpers/assertions.py
 create mode 100644 tests/helpers/factories.py
 create mode 100644 tests/helpers/mocks.py

diff --git a/pyproject.toml b/pyproject.toml
index 314cae5..7b8f52f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -46,6 +46,7 @@ dependencies = [
 dev = [
     "pytest>=9.0.0",
     "pytest-timeout>=2.0.0",
+    "pytest-xdist>=3.0.0",
     "faker>=24.0.0",
     "anthropic>=0.30.0",
     "openai>=1.30.0",
@@ -65,6 +66,7 @@ packages = ["app", "src", "connectors", "cli", "services", "config"]
 dev-dependencies = [
     "pytest>=9.0.0",
     "pytest-timeout>=2.0.0",
+    "pytest-xdist>=3.0.0",
     "faker>=24.0.0",
     "anthropic>=0.30.0",
     "openai>=1.30.0",
diff --git a/pytest.ini b/pytest.ini
index 6104bb7..2db8938 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -3,3 +3,5 @@ addopts = -m "not live and not docker" --timeout=60 --strict-markers
 markers =
     live: tests requiring server access (run with '-m live')
     docker: tests requiring Docker (run with '-m docker')
+    integration: FastAPI TestClient API integration tests
+    journey: end-to-end user flow tests spanning multiple components
diff --git a/tests/conftest.py b/tests/conftest.py
index 0cc1da8..294edd0 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -113,3 +113,51 @@ def seeded_app(e2e_env):
         "analyst_token": analyst_token,
         "env": e2e_env,
     }
+
+
+@pytest.fixture
+def mock_extract_factory(e2e_env):
+    """Factory fixture for creating mock extract.duckdb files.
+
+    Returns a callable: factory(source_name, tables, remote_attach=None)
+    - source_name: str — name of the connector source directory
+    - tables: list[dict] — same format as create_mock_extract
+    - remote_attach: list[dict] | None — rows for _remote_attach table,
+      each dict with keys: alias, extension, url, token_env
+    """
+    def _factory(source_name: str, tables: list[dict], remote_attach=None):
+        db_path = create_mock_extract(e2e_env["extracts_dir"], source_name, tables)
+        if remote_attach:
+            conn = duckdb.connect(str(db_path))
+            try:
+                conn.execute("""CREATE TABLE IF NOT EXISTS _remote_attach (
+                    alias VARCHAR,
+                    extension VARCHAR,
+                    url VARCHAR,
+                    token_env VARCHAR
+                )""")
+                for row in remote_attach:
+                    conn.execute(
+                        "INSERT INTO _remote_attach VALUES (?, ?, ?, ?)",
+                        [row["alias"], row["extension"], row["url"], row["token_env"]],
+                    )
+            finally:
+                # Always release the file handle, even if a row is missing a key.
+                conn.close()
+        return db_path
+
+    return _factory
+
+
+@pytest.fixture
+def analyst_user(seeded_app):
+    """Convenience fixture returning analyst auth headers dict."""
+    token = seeded_app["analyst_token"]
+    return {"Authorization": f"Bearer {token}"}
+
+
+@pytest.fixture
+def admin_user(seeded_app):
+    """Convenience fixture returning admin auth headers dict."""
+    token = seeded_app["admin_token"]
+    return {"Authorization": f"Bearer {token}"}
diff --git a/tests/helpers/assertions.py b/tests/helpers/assertions.py
new file mode 100644
index 0000000..4da1157
--- /dev/null
+++ b/tests/helpers/assertions.py
@@ -0,0 +1,77 @@
+"""Reusable assertion helpers for the test suite."""
+
+from pathlib import Path
+
+import duckdb
+
+
+def assert_api_error(response, expected_status: int, detail_contains: str = "") -> None:
+    """Assert that an API response is an error with the expected status code.
+
+    Args:
+        response: httpx / TestClient response object.
+        expected_status: Expected HTTP status code (e.g. 400, 404, 422).
+        detail_contains: If non-empty, assert the response JSON 'detail'
+            field contains this substring (case-sensitive). A list-valued
+            detail (FastAPI validation errors) is matched against its str().
+    """
+    assert response.status_code == expected_status, (
+        f"Expected status {expected_status}, got {response.status_code}. "
+        f"Response body: {response.text}"
+    )
+    if detail_contains:
+        try:
+            body = response.json()
+        except ValueError:
+            # Non-JSON body: fall through so the assertion below reports it.
+            body = {}
+        # Only JSON objects carry a 'detail' key; a top-level list or scalar
+        # would otherwise raise AttributeError instead of a readable failure.
+        detail = body.get("detail", "") if isinstance(body, dict) else ""
+        detail_str = str(detail)
+        assert detail_contains in detail_str, (
+            f"Expected detail to contain {detail_contains!r}, got: {detail_str!r}"
+        )
+
+
+def assert_parquet_readable(path: str | Path, min_rows: int = 0) -> None:
+    """Assert that a parquet file is readable and contains at least min_rows rows.
+
+    Args:
+        path: Filesystem path to the parquet file.
+        min_rows: Minimum number of rows required (default 0 accepts an
+            empty file).
+    """
+    path = str(path)
+    conn = duckdb.connect()
+    try:
+        # Bind the path as a parameter so quotes in it cannot break the SQL.
+        result = conn.execute("SELECT COUNT(*) FROM read_parquet(?)", [path]).fetchone()
+        assert result is not None, f"Could not read parquet file: {path}"
+        row_count = result[0]
+        assert row_count >= min_rows, (
+            f"Parquet file {path!r} has {row_count} rows, expected >= {min_rows}"
+        )
+    finally:
+        conn.close()
+
+
+def assert_duckdb_table_exists(db_path: str | Path, table_name: str) -> None:
+    """Assert that a table (or view) with the given name exists in a DuckDB file.
+
+    Args:
+        db_path: Filesystem path to the DuckDB database file.
+        table_name: Name of the table or view to check.
+    """
+    db_path = str(db_path)
+    conn = duckdb.connect(db_path, read_only=True)
+    try:
+        result = conn.execute(
+            "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = ?",
+            [table_name],
+        ).fetchone()
+        assert result is not None and result[0] > 0, (
+            f"Table or view {table_name!r} does not exist in DuckDB database {db_path!r}"
+        )
+    finally:
+        conn.close()
diff --git a/tests/helpers/factories.py b/tests/helpers/factories.py
new file mode 100644
index 0000000..208d321
--- /dev/null
+++ b/tests/helpers/factories.py
@@ -0,0 +1,168 @@
+"""Faker-based test data factories with deterministic seed."""
+
+import hashlib
+import hmac
+import json
+import uuid
+from typing import Any
+
+from faker import Faker
+
+Faker.seed(42)
+_fake = Faker()
+
+
+class UserFactory:
+    """Factory for user dicts matching UserRepository.create() signature."""
+
+    @staticmethod
+    def build(role: str = "analyst", **overrides) -> dict[str, Any]:
+        """Build a user dict.
+
+        Returns keys: id, email, name, role.
+        Pass keyword overrides to replace any field.
+        """
+        data = {
+            "id": str(uuid.uuid4()),
+            "email": _fake.unique.email(),
+            "name": _fake.name(),
+            "role": role,
+        }
+        data.update(overrides)
+        return data
+
+
+class TableRegistryFactory:
+    """Factory for table_registry entry dicts."""
+
+    _SOURCE_TYPES = ["keboola", "bigquery", "csv"]
+    _QUERY_MODES = ["local", "remote"]
+    _SCHEDULES = ["0 * * * *", "0 6 * * *", "*/30 * * * *"]
+
+    @staticmethod
+    def build(**overrides) -> dict[str, Any]:
+        """Build a table registry dict.
+
+        Returns keys: name, source_type, bucket, source_table,
+        query_mode, sync_schedule, description.
+        """
+        source_type = overrides.pop("source_type", _fake.random_element(TableRegistryFactory._SOURCE_TYPES))
+        data = {
+            "name": _fake.unique.slug().replace("-", "_"),
+            "source_type": source_type,
+            "bucket": f"in.c-{_fake.word()}",
+            "source_table": _fake.word() + "_data",
+            "query_mode": _fake.random_element(TableRegistryFactory._QUERY_MODES),
+            "sync_schedule": _fake.random_element(TableRegistryFactory._SCHEDULES),
+            "description": _fake.sentence(),
+        }
+        data.update(overrides)
+        return data
+
+
+class KnowledgeItemFactory:
+    """Factory for knowledge item dicts."""
+
+    _CATEGORIES = ["business", "technical", "process", "metrics"]
+
+    @staticmethod
+    def build(**overrides) -> dict[str, Any]:
+        """Build a knowledge item dict.
+
+        Returns keys: title, content, category, tags.
+        """
+        data = {
+            "title": _fake.sentence(nb_words=6).rstrip("."),
+            "content": _fake.paragraph(nb_sentences=4),
+            "category": _fake.random_element(KnowledgeItemFactory._CATEGORIES),
+            "tags": [_fake.word() for _ in range(_fake.random_int(1, 4))],
+        }
+        data.update(overrides)
+        return data
+
+
+class WebhookEventFactory:
+    """Factory for webhook event payloads."""
+
+    @staticmethod
+    def build_jira_event(
+        event_type: str = "jira:issue_updated",
+        issue_key: str | None = None,
+        **overrides,
+    ) -> dict[str, Any]:
+        """Build a Jira webhook event payload dict.
+
+        Args:
+            event_type: Jira webhook event name, e.g. 'jira:issue_created'.
+            issue_key: Issue key like 'PROJ-123'. Generated if not provided.
+            **overrides: Top-level keys to override in the payload.
+
+        Returns a dict matching the Jira webhook JSON structure.
+        """
+        if issue_key is None:
+            project = _fake.lexify("????").upper()
+            issue_key = f"{project}-{_fake.random_int(1, 9999)}"
+
+        project_key = issue_key.split("-")[0]
+
+        payload: dict[str, Any] = {
+            "webhookEvent": event_type,
+            "timestamp": _fake.unix_time() * 1000,
+            "issue": {
+                "id": str(_fake.random_int(10000, 99999)),
+                "key": issue_key,
+                "self": f"https://jira.example.com/rest/api/2/issue/{issue_key}",
+                "fields": {
+                    "summary": _fake.sentence(nb_words=8).rstrip("."),
+                    "status": {
+                        "name": _fake.random_element(["To Do", "In Progress", "Done"]),
+                        "id": str(_fake.random_int(1, 10)),
+                    },
+                    "issuetype": {
+                        "name": _fake.random_element(["Bug", "Story", "Task", "Epic"]),
+                        "id": str(_fake.random_int(1, 10)),
+                    },
+                    "priority": {
+                        "name": _fake.random_element(["Low", "Medium", "High", "Critical"]),
+                    },
+                    "assignee": {
+                        "displayName": _fake.name(),
+                        "emailAddress": _fake.email(),
+                        "accountId": _fake.uuid4(),
+                    },
+                    "reporter": {
+                        "displayName": _fake.name(),
+                        "emailAddress": _fake.email(),
+                        "accountId": _fake.uuid4(),
+                    },
+                    "project": {
+                        "key": project_key,
+                        "name": f"{project_key} Project",
+                        "id": str(_fake.random_int(10000, 99999)),
+                    },
+                    "created": _fake.iso8601(),
+                    "updated": _fake.iso8601(),
+                    "description": _fake.paragraph(nb_sentences=2),
+                    "labels": [_fake.word() for _ in range(_fake.random_int(0, 3))],
+                },
+            },
+            "user": {
+                "displayName": _fake.name(),
+                "emailAddress": _fake.email(),
+                "accountId": _fake.uuid4(),
+            },
+        }
+        payload.update(overrides)
+        return payload
+
+    @staticmethod
+    def sign_payload(payload: dict[str, Any], secret: str) -> str:
+        """Return HMAC-SHA256 signature string for a webhook payload.
+
+        The signature is computed over the JSON-serialised payload (compact,
+        sorted keys) and returned as 'sha256=' followed by the hex digest,
+        matching the common Jira webhook signature scheme.
+        """
+        body = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()
+        sig = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
+        return f"sha256={sig}"
diff --git a/tests/helpers/mocks.py b/tests/helpers/mocks.py
new file mode 100644
index 0000000..1bc4e8a
--- /dev/null
+++ b/tests/helpers/mocks.py
@@ -0,0 +1,133 @@
+"""Mock classes for unit and integration tests."""
+
+from __future__ import annotations
+
+import json
+from typing import Any
+from unittest.mock import MagicMock
+
+
+class MockLLMProvider:
+    """Mock LLM provider that returns pre-configured responses.
+
+    Usage::
+
+        provider = MockLLMProvider(responses=[{"key": "value"}, {"other": "result"}])
+        result = provider.extract_json("some prompt")  # returns {"key": "value"}
+        result = provider.extract_json("another prompt")  # returns {"other": "result"}
+        # After exhausting responses, returns last item repeatedly.
+    """
+
+    def __init__(self, responses: list[Any] | None = None) -> None:
+        # None and [] both fall back to a single empty-dict response; an empty
+        # list would otherwise make extract_json() raise IndexError.
+        self._responses: list[Any] = responses if responses else [{}]
+        self._call_count = 0
+
+    def extract_json(self, *args, **kwargs) -> Any:
+        """Return the next configured response, repeating the last one when exhausted."""
+        idx = min(self._call_count, len(self._responses) - 1)
+        result = self._responses[idx]
+        self._call_count += 1
+        return result
+
+    def complete(self, *args, **kwargs) -> str:
+        """Return the next configured response as a JSON string."""
+        return json.dumps(self.extract_json(*args, **kwargs))
+
+    @property
+    def call_count(self) -> int:
+        """Number of times extract_json / complete was called."""
+        return self._call_count
+
+    def reset(self) -> None:
+        """Reset the call counter."""
+        self._call_count = 0
+
+
+class MockHTTPResponse:
+    """Mock httpx-compatible HTTP response.
+
+    Mimics the interface used by httpx.Response / requests.Response so that
+    code that calls `.json()`, `.text`, `.status_code`, and
+    `.raise_for_status()` works without a real HTTP server.
+
+    Usage::
+
+        response = MockHTTPResponse(200, json_data={"id": 1}, text='{"id": 1}')
+        response.json()  # {"id": 1}
+        response.raise_for_status()  # no-op for 2xx
+        response.status_code  # 200
+
+        error = MockHTTPResponse(404, json_data={"detail": "not found"})
+        error.raise_for_status()  # raises RuntimeError
+    """
+
+    def __init__(
+        self,
+        status_code: int = 200,
+        json_data: Any = None,
+        text: str = "",
+    ) -> None:
+        self.status_code = status_code
+        self._json_data = json_data
+        self.text = text or (json.dumps(json_data) if json_data is not None else "")
+
+    def json(self) -> Any:
+        """Return the configured JSON data."""
+        if self._json_data is None:
+            raise ValueError("No JSON data configured for this MockHTTPResponse")
+        return self._json_data
+
+    def raise_for_status(self) -> None:
+        """Raise RuntimeError for 4xx/5xx status codes (mirrors httpx behaviour)."""
+        if self.status_code >= 400:
+            raise RuntimeError(
+                f"HTTP error {self.status_code}: {self.text}"
+            )
+
+
+def mock_duckdb_connection(tables: dict[str, list[tuple]] | None = None) -> MagicMock:
+    """Return a MagicMock that mimics a DuckDB connection.
+
+    Args:
+        tables: Mapping of SQL pattern → list-of-tuples results that
+            ``fetchall()`` should return when the executed SQL contains the
+            key as a substring. ``fetchone()`` returns the first tuple (or
+            None). If no key matches, fetchall returns [] and fetchone None.
+            When several keys match, the first one in mapping order wins.
+
+    The returned mock exposes:
+    - ``.execute(sql, params=None)`` — returns the mock itself (chainable)
+    - ``.fetchall()`` — returns matching rows or []
+    - ``.fetchone()`` — returns first matching row or None (does not
+      advance a cursor; repeated calls return the same row)
+    - ``.close()`` — no-op
+
+    Because it is a real MagicMock, calls are recorded and can be checked
+    with e.g. ``conn.execute.assert_called_once()``.
+
+    Example::
+
+        conn = mock_duckdb_connection({"SELECT * FROM users": [("alice", "admin")]})
+        conn.execute("SELECT * FROM users").fetchall()  # [("alice", "admin")]
+    """
+    tables = tables or {}
+    conn = MagicMock(name="duckdb_connection")
+    last_rows: list[tuple] = []
+
+    def _execute(sql: str, params: Any = None) -> MagicMock:
+        # Rows of the first pattern contained in the SQL become the result set.
+        last_rows.clear()
+        for pattern, rows in tables.items():
+            if pattern in sql:
+                last_rows.extend(rows)
+                break
+        return conn
+
+    conn.execute.side_effect = _execute
+    # Hand out a copy so mutating a fetched list cannot affect later fetches.
+    conn.fetchall.side_effect = lambda: list(last_rows)
+    conn.fetchone.side_effect = lambda: last_rows[0] if last_rows else None
+    conn.close.return_value = None
+    return conn