merge: resolve factories.py conflict — keep Faker factories + add Block D convenience methods
This commit is contained in:
commit
44317a86c6
6 changed files with 1121 additions and 64 deletions
|
|
@ -17,11 +17,6 @@ class UserFactory:
|
|||
|
||||
@staticmethod
|
||||
def build(role: str = "analyst", **overrides) -> dict[str, Any]:
|
||||
"""Build a user dict.
|
||||
|
||||
Returns keys: id, email, name, role.
|
||||
Pass keyword overrides to replace any field.
|
||||
"""
|
||||
data = {
|
||||
"id": str(uuid.uuid4()),
|
||||
"email": _fake.unique.email(),
|
||||
|
|
@ -41,11 +36,6 @@ class TableRegistryFactory:
|
|||
|
||||
@staticmethod
|
||||
def build(**overrides) -> dict[str, Any]:
|
||||
"""Build a table registry dict.
|
||||
|
||||
Returns keys: name, source_type, bucket, source_table,
|
||||
query_mode, sync_schedule, description.
|
||||
"""
|
||||
source_type = overrides.pop("source_type", _fake.random_element(TableRegistryFactory._SOURCE_TYPES))
|
||||
data = {
|
||||
"name": _fake.unique.slug().replace("-", "_"),
|
||||
|
|
@ -68,10 +58,6 @@ class KnowledgeItemFactory:
|
|||
|
||||
@staticmethod
|
||||
def build(**overrides) -> dict[str, Any]:
|
||||
"""Build a knowledge item dict.
|
||||
|
||||
Returns keys: title, content, category, tags.
|
||||
"""
|
||||
data = {
|
||||
"title": _fake.sentence(nb_words=6).rstrip("."),
|
||||
"content": _fake.paragraph(nb_sentences=4),
|
||||
|
|
@ -91,21 +77,10 @@ class WebhookEventFactory:
|
|||
issue_key: str | None = None,
|
||||
**overrides,
|
||||
) -> dict[str, Any]:
|
||||
"""Build a Jira webhook event payload dict.
|
||||
|
||||
Args:
|
||||
event_type: Jira webhook event name, e.g. 'jira:issue_created'.
|
||||
issue_key: Issue key like 'PROJ-123'. Generated if not provided.
|
||||
**overrides: Top-level keys to override in the payload.
|
||||
|
||||
Returns a dict matching the Jira webhook JSON structure.
|
||||
"""
|
||||
if issue_key is None:
|
||||
project = _fake.lexify("????").upper()
|
||||
issue_key = f"{project}-{_fake.random_int(1, 9999)}"
|
||||
|
||||
project_key = issue_key.split("-")[0]
|
||||
|
||||
payload: dict[str, Any] = {
|
||||
"webhookEvent": event_type,
|
||||
"timestamp": _fake.unix_time() * 1000,
|
||||
|
|
@ -115,55 +90,58 @@ class WebhookEventFactory:
|
|||
"self": f"https://jira.example.com/rest/api/2/issue/{issue_key}",
|
||||
"fields": {
|
||||
"summary": _fake.sentence(nb_words=8).rstrip("."),
|
||||
"status": {
|
||||
"name": _fake.random_element(["To Do", "In Progress", "Done"]),
|
||||
"id": str(_fake.random_int(1, 10)),
|
||||
},
|
||||
"issuetype": {
|
||||
"name": _fake.random_element(["Bug", "Story", "Task", "Epic"]),
|
||||
"id": str(_fake.random_int(1, 10)),
|
||||
},
|
||||
"priority": {
|
||||
"name": _fake.random_element(["Low", "Medium", "High", "Critical"]),
|
||||
},
|
||||
"assignee": {
|
||||
"displayName": _fake.name(),
|
||||
"emailAddress": _fake.email(),
|
||||
"accountId": _fake.uuid4(),
|
||||
},
|
||||
"reporter": {
|
||||
"displayName": _fake.name(),
|
||||
"emailAddress": _fake.email(),
|
||||
"accountId": _fake.uuid4(),
|
||||
},
|
||||
"project": {
|
||||
"key": project_key,
|
||||
"name": f"{project_key} Project",
|
||||
"id": str(_fake.random_int(10000, 99999)),
|
||||
},
|
||||
"status": {"name": _fake.random_element(["To Do", "In Progress", "Done"]), "id": str(_fake.random_int(1, 10))},
|
||||
"issuetype": {"name": _fake.random_element(["Bug", "Story", "Task", "Epic"]), "id": str(_fake.random_int(1, 10))},
|
||||
"priority": {"name": _fake.random_element(["Low", "Medium", "High", "Critical"])},
|
||||
"project": {"key": project_key, "name": f"{project_key} Project", "id": str(_fake.random_int(10000, 99999))},
|
||||
"created": _fake.iso8601(),
|
||||
"updated": _fake.iso8601(),
|
||||
"description": _fake.paragraph(nb_sentences=2),
|
||||
"labels": [_fake.word() for _ in range(_fake.random_int(0, 3))],
|
||||
},
|
||||
},
|
||||
"user": {
|
||||
"displayName": _fake.name(),
|
||||
"emailAddress": _fake.email(),
|
||||
"accountId": _fake.uuid4(),
|
||||
},
|
||||
"user": {"displayName": _fake.name(), "emailAddress": _fake.email()},
|
||||
}
|
||||
payload.update(overrides)
|
||||
return payload
|
||||
|
||||
@staticmethod
def sign_payload(payload: dict[str, Any], secret: str) -> str:
    """Compute the ``sha256=<hex>`` HMAC signature for a webhook payload.

    The payload is serialised to compact JSON with sorted keys, encoded to
    bytes, and signed with HMAC-SHA256 keyed by the encoded ``secret``.

    Args:
        payload: Webhook payload dict to serialise and sign.
        secret: Shared secret used as the HMAC key.

    Returns:
        The hex digest prefixed with ``sha256=``.
    """
    canonical = json.dumps(payload, separators=(",", ":"), sort_keys=True)
    digest = hmac.new(secret.encode(), canonical.encode(), hashlib.sha256)
    return "sha256=" + digest.hexdigest()
|
||||
|
||||
# Convenience methods used by Block D connector tests
|
||||
@staticmethod
def issue_updated(issue_key: str = "PROJ-123", webhook_secret: str = "") -> tuple[dict, bytes, str]:
    """Build a fixed ``jira:issue_updated`` event with its raw body and signature.

    Args:
        issue_key: Value placed in ``issue.key``.
        webhook_secret: Secret passed to ``WebhookEventFactory._sign``; when
            empty, no signature is computed.

    Returns:
        ``(event_data, payload, sig)`` where ``payload`` is the
        ``json.dumps`` encoding of ``event_data`` as bytes and ``sig`` is the
        signature string, or ``""`` when ``webhook_secret`` is empty.
    """
    fields = {"summary": "Test issue", "status": {"name": "Open"}, "issuetype": {"name": "Bug"}}
    event_data = {
        "webhookEvent": "jira:issue_updated",
        "issue": {"key": issue_key, "id": "10001", "fields": fields},
        "user": {"displayName": "Test User"},
    }
    payload = json.dumps(event_data).encode()
    if not webhook_secret:
        return event_data, payload, ""
    return event_data, payload, WebhookEventFactory._sign(payload, webhook_secret)
|
||||
|
||||
@staticmethod
def issue_deleted(issue_key: str = "PROJ-456", webhook_secret: str = "") -> tuple[dict, bytes, str]:
    """Build a fixed ``jira:issue_deleted`` event with its raw body and signature.

    Args:
        issue_key: Value placed in ``issue.key``.
        webhook_secret: Secret passed to ``WebhookEventFactory._sign``; when
            empty, no signature is computed.

    Returns:
        ``(event_data, payload, sig)`` where ``payload`` is the
        ``json.dumps`` encoding of ``event_data`` as bytes and ``sig`` is the
        signature string, or ``""`` when ``webhook_secret`` is empty.
    """
    issue = {"key": issue_key, "id": "10002", "fields": {"summary": "Deleted issue"}}
    event_data = {
        "webhookEvent": "jira:issue_deleted",
        "issue": issue,
    }
    payload = json.dumps(event_data).encode()
    if not webhook_secret:
        return event_data, payload, ""
    return event_data, payload, WebhookEventFactory._sign(payload, webhook_secret)
|
||||
|
||||
@staticmethod
def issue_created(issue_key: str = "PROJ-789", webhook_secret: str = "") -> tuple[dict, bytes, str]:
    """Build a fixed ``jira:issue_created`` event with its raw body and signature.

    Args:
        issue_key: Value placed in ``issue.key``.
        webhook_secret: Secret passed to ``WebhookEventFactory._sign``; when
            empty, no signature is computed.

    Returns:
        ``(event_data, payload, sig)`` where ``payload`` is the
        ``json.dumps`` encoding of ``event_data`` as bytes and ``sig`` is the
        signature string, or ``""`` when ``webhook_secret`` is empty.
    """
    fields = {"summary": "New issue", "status": {"name": "Open"}, "issuetype": {"name": "Story"}}
    event_data = {
        "webhookEvent": "jira:issue_created",
        "issue": {"key": issue_key, "id": "10003", "fields": fields},
    }
    payload = json.dumps(event_data).encode()
    if not webhook_secret:
        return event_data, payload, ""
    return event_data, payload, WebhookEventFactory._sign(payload, webhook_secret)
|
||||
|
||||
@staticmethod
|
||||
def _sign(payload: bytes, secret: str) -> str:
|
||||
mac = hmac.new(secret.encode("utf-8"), payload, hashlib.sha256).hexdigest()
|
||||
return f"sha256={mac}"
|
||||
|
|
|
|||
210
tests/test_bigquery_extractor_full.py
Normal file
210
tests/test_bigquery_extractor_full.py
Normal file
|
|
@ -0,0 +1,210 @@
|
|||
"""Full tests for the BigQuery extractor connector."""
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import duckdb
|
||||
import pytest
|
||||
|
||||
from tests.helpers.contract import validate_extract_contract
|
||||
|
||||
|
||||
@pytest.fixture
def output_dir(tmp_path):
    """Create ``extracts/bigquery`` under ``tmp_path`` and return its path as a string."""
    target = tmp_path.joinpath("extracts", "bigquery")
    target.mkdir(parents=True)
    return str(target)
|
||||
|
||||
|
||||
@pytest.fixture
def sample_configs():
    """Return two remote-mode BigQuery table configs (``orders`` and ``sessions``)."""
    orders = {
        "id": "proj.analytics.orders",
        "name": "orders",
        "source_type": "bigquery",
        "bucket": "analytics",
        "source_table": "orders",
        "query_mode": "remote",
        "description": "Order data from BQ",
    }
    sessions = {
        "id": "proj.analytics.sessions",
        "name": "sessions",
        "source_type": "bigquery",
        "bucket": "analytics",
        "source_table": "sessions",
        "query_mode": "remote",
        "description": "Session data from BQ",
    }
    return [orders, sessions]
|
||||
|
||||
|
||||
class _DuckDBProxy:
|
||||
"""Proxy around a real DuckDB connection that intercepts BigQuery extension SQL."""
|
||||
|
||||
def __init__(self, real_conn):
|
||||
self._real = real_conn
|
||||
|
||||
def execute(self, sql, *args, **kwargs):
|
||||
sql_upper = sql.strip().upper()
|
||||
if sql_upper.startswith("INSTALL BIGQUERY") or sql_upper.startswith("LOAD BIGQUERY"):
|
||||
return MagicMock()
|
||||
if "ATTACH" in sql_upper and "BIGQUERY" in sql_upper:
|
||||
return MagicMock()
|
||||
if sql_upper.startswith("DETACH BQ"):
|
||||
return MagicMock()
|
||||
# CREATE VIEW referencing bq.* -> create a dummy table instead
|
||||
if "FROM BQ." in sql_upper and "CREATE" in sql_upper:
|
||||
match = re.search(r'VIEW\s+"?(\w+)"?', sql, re.IGNORECASE)
|
||||
if match:
|
||||
view_name = match.group(1)
|
||||
self._real.execute(f'CREATE OR REPLACE TABLE "{view_name}" (dummy INTEGER)')
|
||||
return MagicMock()
|
||||
return self._real.execute(sql, *args, **kwargs)
|
||||
|
||||
def close(self):
|
||||
return self._real.close()
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self._real, name)
|
||||
|
||||
|
||||
def _proxy_connect(path=None, **kwargs):
    """Open a real DuckDB connection at ``path`` and wrap it in ``_DuckDBProxy``.

    Extra keyword arguments are accepted but not forwarded to ``duckdb.connect``.
    """
    return _DuckDBProxy(duckdb.connect(path))
|
||||
|
||||
|
||||
class TestBigQueryExtractorFull:
    """Checks of the BigQuery extractor run against a proxied DuckDB connection."""

    @staticmethod
    def _init_extract_patched(connect, output_dir, project_id, table_configs):
        """Call ``init_extract`` with the extractor's ``duckdb.connect`` replaced by ``connect``."""
        with patch("connectors.bigquery.extractor.duckdb") as mock_mod:
            mock_mod.connect = connect
            from connectors.bigquery.extractor import init_extract

            return init_extract(output_dir, project_id, table_configs)

    def test_init_extract_creates_contract_compliant_db(self, output_dir, sample_configs):
        """Both sample tables register cleanly and extract.duckdb passes contract validation."""
        outcome = self._init_extract_patched(_proxy_connect, output_dir, "my-gcp-project", sample_configs)

        assert outcome["tables_registered"] == 2
        assert outcome["errors"] == []

        validate_extract_contract(str(Path(output_dir) / "extract.duckdb"))

    def test_remote_attach_table_has_correct_values(self, output_dir, sample_configs):
        """_remote_attach holds alias=bq, extension=bigquery, url=project=<id>, token_env=''."""
        self._init_extract_patched(_proxy_connect, output_dir, "acme-project", sample_configs)

        conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb"), read_only=True)
        ra = conn.execute("SELECT alias, extension, url, token_env FROM _remote_attach").fetchone()
        conn.close()

        alias, extension, url, token_env = ra
        assert alias == "bq"
        assert extension == "bigquery"
        assert url == "project=acme-project"
        assert token_env == ""  # BigQuery uses GOOGLE_APPLICATION_CREDENTIALS, not token_env

    def test_all_tables_have_remote_query_mode(self, output_dir, sample_configs):
        """The only distinct query_mode recorded in _meta is 'remote'."""
        self._init_extract_patched(_proxy_connect, output_dir, "my-project", sample_configs)

        conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb"), read_only=True)
        modes = conn.execute("SELECT DISTINCT query_mode FROM _meta").fetchall()
        conn.close()

        assert len(modes) == 1
        (only_mode,) = modes
        assert only_mode[0] == "remote"

    def test_no_data_directory_created(self, output_dir, sample_configs):
        """Running init_extract leaves no data/ directory under the output dir."""
        self._init_extract_patched(_proxy_connect, output_dir, "my-project", sample_configs)

        data_dir = Path(output_dir) / "data"
        assert not data_dir.exists()

    def test_meta_table_schema(self, output_dir):
        """_create_meta_table produces exactly the expected columns, in order."""
        from connectors.bigquery.extractor import _create_meta_table

        conn = duckdb.connect(str(Path(output_dir) / "schema_check.duckdb"))
        _create_meta_table(conn)
        cols = conn.execute(
            "SELECT column_name FROM information_schema.columns "
            "WHERE table_name='_meta' ORDER BY ordinal_position"
        ).fetchall()
        conn.close()

        column_names = [row[0] for row in cols]
        assert column_names == [
            "table_name", "description", "rows", "size_bytes", "extracted_at", "query_mode"
        ]

    def test_remote_attach_table_schema(self, output_dir):
        """_create_remote_attach_table produces exactly the expected columns, in order."""
        from connectors.bigquery.extractor import _create_remote_attach_table

        conn = duckdb.connect(str(Path(output_dir) / "ra_check.duckdb"))
        _create_remote_attach_table(conn, "test-project")
        cols = conn.execute(
            "SELECT column_name FROM information_schema.columns "
            "WHERE table_name='_remote_attach' ORDER BY ordinal_position"
        ).fetchall()
        conn.close()

        column_names = [row[0] for row in cols]
        assert column_names == ["alias", "extension", "url", "token_env"]

    def test_table_registration_failure_records_error(self, output_dir):
        """One failing table is reported in errors while the other still registers."""
        configs = [
            {"name": "good", "bucket": "ds", "source_table": "good", "query_mode": "remote", "description": ""},
            {"name": "bad", "bucket": "ds", "source_table": "bad", "query_mode": "remote", "description": ""},
        ]

        call_count = [0]

        class FailingProxy(_DuckDBProxy):
            def execute(self, sql, *args, **kwargs):
                statement = sql.strip().upper()
                creates_bq_view = "FROM BQ." in statement and "CREATE" in statement
                # Fail view creation only for the table named "bad".
                if creates_bq_view and '"bad"' in sql.lower():
                    call_count[0] += 1
                    raise Exception("Table not found in BigQuery")
                return super().execute(sql, *args, **kwargs)

        def failing_connect(path=None, **kwargs):
            return FailingProxy(duckdb.connect(path))

        outcome = self._init_extract_patched(failing_connect, output_dir, "my-project", configs)

        assert outcome["tables_registered"] == 1
        assert len(outcome["errors"]) == 1
        assert outcome["errors"][0]["table"] == "bad"

    def test_empty_table_list(self, output_dir):
        """With no tables, extract.duckdb is still created and _meta is empty."""
        outcome = self._init_extract_patched(_proxy_connect, output_dir, "my-project", [])

        assert outcome["tables_registered"] == 0
        assert outcome["errors"] == []

        db_path = Path(output_dir) / "extract.duckdb"
        assert db_path.exists()
        conn = duckdb.connect(str(db_path), read_only=True)
        count = conn.execute("SELECT count(*) FROM _meta").fetchone()[0]
        conn.close()
        assert count == 0
|
||||
185
tests/test_jira_incremental.py
Normal file
185
tests/test_jira_incremental.py
Normal file
|
|
@ -0,0 +1,185 @@
|
|||
"""Tests for incremental Jira parquet transform (upsert_dataframe and friends)."""
|
||||
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import duckdb
|
||||
import pandas as pd
|
||||
import pytest
|
||||
|
||||
from connectors.jira.incremental_transform import (
|
||||
load_parquet_month,
|
||||
save_parquet_month,
|
||||
upsert_dataframe,
|
||||
)
|
||||
|
||||
|
||||
# Minimal schema compatible with ISSUES_SCHEMA for testing purposes
|
||||
_SIMPLE_SCHEMA = {
|
||||
"issue_key": "string",
|
||||
"summary": "string",
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
def parquet_dir(tmp_path):
    """Create an empty ``parquet_data`` directory under ``tmp_path`` and return its path."""
    target = tmp_path.joinpath("parquet_data")
    target.mkdir()
    return target
|
||||
|
||||
|
||||
def _make_df(rows: list[dict]) -> pd.DataFrame:
|
||||
return pd.DataFrame(rows)
|
||||
|
||||
|
||||
class TestUpsertDataframe:
    """Insert, update and delete behaviour of ``upsert_dataframe`` keyed on ``issue_key``."""

    def test_insert_into_empty(self):
        """With ``None`` as the existing frame, the new record is the only row."""
        incoming = [{"issue_key": "PROJ-1", "summary": "Bug A"}]
        merged = upsert_dataframe(None, incoming, "issue_key", "PROJ-1")
        assert len(merged) == 1
        assert merged.iloc[0]["issue_key"] == "PROJ-1"

    def test_insert_new_issue(self):
        """A record with an unseen issue_key is added alongside the existing row."""
        current = _make_df([{"issue_key": "PROJ-1", "summary": "Existing"}])
        incoming = [{"issue_key": "PROJ-2", "summary": "New issue"}]
        merged = upsert_dataframe(current, incoming, "issue_key", "PROJ-2")
        assert len(merged) == 2
        assert set(merged["issue_key"].tolist()) == {"PROJ-1", "PROJ-2"}

    def test_update_existing_issue(self):
        """A record with a known issue_key replaces that row and keeps the others."""
        current = _make_df([
            {"issue_key": "PROJ-1", "summary": "Old summary"},
            {"issue_key": "PROJ-2", "summary": "Other issue"},
        ])
        incoming = [{"issue_key": "PROJ-1", "summary": "Updated summary"}]
        merged = upsert_dataframe(current, incoming, "issue_key", "PROJ-1")
        assert len(merged) == 2
        matching = merged[merged["issue_key"] == "PROJ-1"]
        assert matching.iloc[0]["summary"] == "Updated summary"

    def test_delete_issue(self):
        """An empty record list for a key removes that key's row (deletion case)."""
        current = _make_df([
            {"issue_key": "PROJ-1", "summary": "To be deleted"},
            {"issue_key": "PROJ-2", "summary": "Keep this"},
        ])
        merged = upsert_dataframe(current, [], "issue_key", "PROJ-1")
        assert len(merged) == 1
        assert merged.iloc[0]["issue_key"] == "PROJ-2"

    def test_upsert_empty_existing_df(self):
        """An empty (non-None) existing frame accepts the new record."""
        current = pd.DataFrame(columns=["issue_key", "summary"])
        incoming = [{"issue_key": "PROJ-5", "summary": "First issue"}]
        merged = upsert_dataframe(current, incoming, "issue_key", "PROJ-5")
        assert len(merged) == 1
        assert merged.iloc[0]["issue_key"] == "PROJ-5"

    def test_upsert_multiple_records_same_issue(self):
        """Every existing row for the key is replaced by the single new record."""
        current = _make_df([
            {"issue_key": "PROJ-1", "summary": "Comment 1"},
            {"issue_key": "PROJ-1", "summary": "Comment 2"},
            {"issue_key": "PROJ-2", "summary": "Other"},
        ])
        incoming = [{"issue_key": "PROJ-1", "summary": "Updated comment"}]
        merged = upsert_dataframe(current, incoming, "issue_key", "PROJ-1")
        matching = merged[merged["issue_key"] == "PROJ-1"]
        assert len(matching) == 1  # Only the updated record
        assert matching.iloc[0]["summary"] == "Updated comment"
|
||||
|
||||
|
||||
class TestParquetMonthlyPartitioning:
    """Round-trip checks for ``save_parquet_month`` / ``load_parquet_month``."""

    def test_save_and_load_parquet(self, parquet_dir):
        """A frame saved under a month key loads back with the same single row."""
        month = "2026-04"
        frame = _make_df([{"issue_key": "PROJ-1", "summary": "Test issue"}])
        save_parquet_month(frame, _SIMPLE_SCHEMA, parquet_dir, month)
        loaded = load_parquet_month(parquet_dir, month)
        assert loaded is not None
        assert len(loaded) == 1
        assert loaded.iloc[0]["issue_key"] == "PROJ-1"

    def test_load_nonexistent_returns_none(self, parquet_dir):
        """Loading a month that was never written yields None."""
        assert load_parquet_month(parquet_dir, "2099-01") is None

    def test_save_empty_df_removes_file(self, parquet_dir):
        """Saving an empty frame over an existing month deletes its parquet file."""
        month_file = parquet_dir / "2026-01.parquet"

        # Write a real file first so there is something to remove.
        populated = _make_df([{"issue_key": "PROJ-1", "summary": "Test"}])
        save_parquet_month(populated, _SIMPLE_SCHEMA, parquet_dir, "2026-01")
        assert month_file.exists()

        # Saving an empty frame should remove it.
        save_parquet_month(pd.DataFrame(), _SIMPLE_SCHEMA, parquet_dir, "2026-01")
        assert not month_file.exists()

    def test_separate_months_independent_files(self, parquet_dir):
        """Each month key gets its own parquet file with its own contents."""
        april = _make_df([{"issue_key": "PROJ-A", "summary": "April issue"}])
        may = _make_df([{"issue_key": "PROJ-B", "summary": "May issue"}])

        save_parquet_month(april, _SIMPLE_SCHEMA, parquet_dir, "2026-04")
        save_parquet_month(may, _SIMPLE_SCHEMA, parquet_dir, "2026-05")

        assert (parquet_dir / "2026-04.parquet").exists()
        assert (parquet_dir / "2026-05.parquet").exists()

        april_loaded = load_parquet_month(parquet_dir, "2026-04")
        may_loaded = load_parquet_month(parquet_dir, "2026-05")

        assert april_loaded.iloc[0]["issue_key"] == "PROJ-A"
        assert may_loaded.iloc[0]["issue_key"] == "PROJ-B"

    def test_parquet_readable_by_duckdb(self, parquet_dir):
        """DuckDB's read_parquet sees both rows of a saved month file."""
        two_rows = _make_df([
            {"issue_key": "PROJ-1", "summary": "DuckDB readable"},
            {"issue_key": "PROJ-2", "summary": "Also readable"},
        ])
        save_parquet_month(two_rows, _SIMPLE_SCHEMA, parquet_dir, "2026-04")

        pq_file = str(parquet_dir / "2026-04.parquet")
        reader = duckdb.connect()
        rows = reader.execute(f"SELECT count(*) FROM read_parquet('{pq_file}')").fetchone()
        reader.close()
        assert rows[0] == 2

    def test_upsert_round_trip_with_real_parquet(self, parquet_dir):
        """Write, load, upsert one key, save, reload: the update sticks and the other row survives."""
        month = "2026-04"

        # Seed the month with two issues.
        seed = _make_df([
            {"issue_key": "PROJ-1", "summary": "Original"},
            {"issue_key": "PROJ-2", "summary": "Keep"},
        ])
        save_parquet_month(seed, _SIMPLE_SCHEMA, parquet_dir, month)

        # Load, replace PROJ-1, and write back.
        on_disk = load_parquet_month(parquet_dir, month)
        replacement = [{"issue_key": "PROJ-1", "summary": "Updated"}]
        merged = upsert_dataframe(on_disk, replacement, "issue_key", "PROJ-1")
        save_parquet_month(merged, _SIMPLE_SCHEMA, parquet_dir, month)

        # Reload and verify both rows.
        final = load_parquet_month(parquet_dir, month)
        assert len(final) == 2
        first = final[final["issue_key"] == "PROJ-1"]
        assert first.iloc[0]["summary"] == "Updated"
        second = final[final["issue_key"] == "PROJ-2"]
        assert second.iloc[0]["summary"] == "Keep"
|
||||
182
tests/test_jira_service_full.py
Normal file
182
tests/test_jira_service_full.py
Normal file
|
|
@ -0,0 +1,182 @@
|
|||
"""Full tests for the Jira service (JiraService.process_webhook_event and friends)."""
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from tests.helpers.factories import WebhookEventFactory
|
||||
|
||||
|
||||
@pytest.fixture
def jira_env(tmp_path, monkeypatch):
    """Create a temporary Jira data directory and export the Jira env vars.

    Makes ``jira_data/issues`` under ``tmp_path``, sets JIRA_DOMAIN,
    JIRA_EMAIL, JIRA_API_TOKEN, JIRA_DATA_DIR and JIRA_WEBHOOK_SECRET through
    ``monkeypatch``, and returns the ``jira_data`` path.
    """
    data_dir = tmp_path / "jira_data"
    data_dir.mkdir()
    data_dir.joinpath("issues").mkdir()

    settings = {
        "JIRA_DOMAIN": "mycompany.atlassian.net",
        "JIRA_EMAIL": "bot@mycompany.com",
        "JIRA_API_TOKEN": "test-token-xyz",
        "JIRA_DATA_DIR": str(data_dir),
        "JIRA_WEBHOOK_SECRET": "webhook-secret-123",
    }
    for variable, value in settings.items():
        monkeypatch.setenv(variable, value)

    return data_dir
|
||||
|
||||
|
||||
def _make_jira_service(jira_env):
    """Return a new ``JiraService`` after overwriting ``service.Config`` with test values.

    Also resets the module-level ``_jira_service`` to None before constructing
    the service. ``jira_env`` is stored as ``Config.JIRA_DATA_DIR``.
    """
    from connectors.jira import service as svc

    config = svc.Config
    config.JIRA_DOMAIN = "mycompany.atlassian.net"
    config.JIRA_EMAIL = "bot@mycompany.com"
    config.JIRA_API_TOKEN = "test-token-xyz"
    config.JIRA_DATA_DIR = jira_env
    config.JIRA_WEBHOOK_SECRET = "webhook-secret-123"

    svc._jira_service = None
    return svc.JiraService()
|
||||
|
||||
|
||||
def _fake_issue_data(issue_key: str = "TEST-1") -> dict:
|
||||
return {
|
||||
"key": issue_key,
|
||||
"id": "10001",
|
||||
"fields": {
|
||||
"summary": "Test issue summary",
|
||||
"status": {"name": "Open"},
|
||||
"issuetype": {"name": "Bug"},
|
||||
"attachment": [],
|
||||
"comment": {"comments": []},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class TestJiraServiceWebhookProcessing:
    """Webhook handling and issue fetching on ``JiraService`` with outbound calls stubbed."""

    @staticmethod
    def _process_with_stubbed_api(service, event_data, fetched_issue):
        """Run ``process_webhook_event`` with the service's outbound calls patched.

        ``fetch_issue`` returns ``fetched_issue``; the remote-link, SLA and
        attachment helpers return empty values, and
        ``trigger_incremental_transform`` returns True.
        """
        with patch.object(service, "fetch_issue", return_value=fetched_issue), \
                patch.object(service, "fetch_remote_links", return_value=[]), \
                patch.object(service, "fetch_sla_fields", return_value=None), \
                patch.object(service, "download_all_attachments", return_value=[]), \
                patch("connectors.jira.service.trigger_incremental_transform", return_value=True):
            return service.process_webhook_event(event_data)

    @staticmethod
    def _client_returning(response):
        """Build a mock usable as a context manager whose ``get`` returns ``response``."""
        client = MagicMock()
        client.get.return_value = response
        client.__enter__.return_value = client
        client.__exit__.return_value = False
        return client

    def test_process_issue_updated_calls_fetch_and_save(self, jira_env):
        """An issue_updated event with a successful fetch writes issues/<key>.json."""
        service = _make_jira_service(jira_env)
        event_data, _, _ = WebhookEventFactory.issue_updated("PROJ-100")
        issue_data = _fake_issue_data("PROJ-100")

        result = self._process_with_stubbed_api(service, event_data, issue_data)

        assert result is True
        saved_file = jira_env / "issues" / "PROJ-100.json"
        assert saved_file.exists()
        with open(saved_file) as handle:
            stored = json.load(handle)
        assert stored["key"] == "PROJ-100"

    def test_process_issue_deleted_marks_file(self, jira_env):
        """An issue_deleted event adds ``_deleted_at`` to the existing issue JSON."""
        service = _make_jira_service(jira_env)

        # The issue file must already exist for the deletion to mark it.
        issue_file = jira_env / "issues" / "PROJ-200.json"
        issue_file.write_text(json.dumps({"key": "PROJ-200", "fields": {}}))

        event_data, _, _ = WebhookEventFactory.issue_deleted("PROJ-200")

        with patch("connectors.jira.service.trigger_incremental_transform", return_value=True):
            result = service.process_webhook_event(event_data)

        assert result is True
        with open(issue_file) as handle:
            stored = json.load(handle)
        assert "_deleted_at" in stored

    def test_process_missing_issue_key_returns_false(self, jira_env):
        """An event without an ``issue`` entry is rejected with False."""
        service = _make_jira_service(jira_env)
        event_without_issue = {"webhookEvent": "jira:issue_updated"}
        assert service.process_webhook_event(event_without_issue) is False

    def test_process_uses_embedded_data_when_fetch_fails(self, jira_env):
        """When ``fetch_issue`` returns None the issue file is still written."""
        service = _make_jira_service(jira_env)
        embedded_issue = {
            "key": "PROJ-300",
            "id": "10003",
            "fields": {
                "summary": "Embedded issue",
                "attachment": [],
                "comment": {"comments": []},
            },
        }
        event_data = {
            "webhookEvent": "jira:issue_updated",
            "issue": embedded_issue,
        }

        result = self._process_with_stubbed_api(service, event_data, None)

        assert result is True
        saved_file = jira_env / "issues" / "PROJ-300.json"
        assert saved_file.exists()

    def test_deletion_of_nonexistent_issue_returns_true(self, jira_env):
        """Deleting an issue with no local file still returns True."""
        service = _make_jira_service(jira_env)
        event_data, _, _ = WebhookEventFactory.issue_deleted("PROJ-NOEXIST")

        assert service.process_webhook_event(event_data) is True

    def test_fetch_issue_returns_none_on_404(self, jira_env):
        """``fetch_issue`` yields None when the HTTP client reports status 404."""
        import httpx

        service = _make_jira_service(jira_env)

        not_found = MagicMock()
        not_found.status_code = 404
        mock_client = self._client_returning(not_found)

        with patch("connectors.jira.service.httpx.Client", return_value=mock_client):
            result = service.fetch_issue("PROJ-MISSING")

        assert result is None

    def test_fetch_issue_returns_data_on_200(self, jira_env):
        """``fetch_issue`` returns the response JSON when the status is 200."""
        service = _make_jira_service(jira_env)
        issue_data = _fake_issue_data("PROJ-42")

        ok_response = MagicMock()
        ok_response.status_code = 200
        ok_response.json.return_value = issue_data
        mock_client = self._client_returning(ok_response)

        with patch("connectors.jira.service.httpx.Client", return_value=mock_client):
            result = service.fetch_issue("PROJ-42")

        assert result is not None
        assert result["key"] == "PROJ-42"

    def test_webhook_event_factory_signature_verification(self):
        """The factory's signature equals an independently computed HMAC-SHA256 of the payload."""
        import hashlib
        import hmac

        secret = "test-secret"
        event_data, payload, sig = WebhookEventFactory.issue_updated("TEST-1", secret)

        key = secret.encode("utf-8")
        expected_mac = hmac.new(key, payload, hashlib.sha256).hexdigest()
        assert sig == f"sha256={expected_mac}"
|
||||
229
tests/test_keboola_extractor_full.py
Normal file
229
tests/test_keboola_extractor_full.py
Normal file
|
|
@ -0,0 +1,229 @@
|
|||
"""Full tests for the Keboola extractor connector."""
|
||||
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import duckdb
|
||||
import pytest
|
||||
|
||||
from tests.conftest import create_mock_extract
|
||||
from tests.helpers.contract import validate_extract_contract
|
||||
|
||||
|
||||
@pytest.fixture
def output_dir(tmp_path):
    """Create ``extracts/keboola`` under ``tmp_path`` and return its path as a string."""
    target = tmp_path.joinpath("extracts", "keboola")
    target.mkdir(parents=True)
    return str(target)
|
||||
|
||||
|
||||
@pytest.fixture
def extracts_dir(tmp_path):
    """Ensure ``tmp_path/extracts`` exists and return it as a ``Path``.

    ``exist_ok=True`` matters when a test also requests ``output_dir``: that
    fixture creates ``tmp_path/extracts/keboola`` (and therefore ``extracts``)
    first, and a plain ``mkdir`` here would then raise ``FileExistsError``.
    """
    d = tmp_path / "extracts"
    d.mkdir(parents=True, exist_ok=True)
    return d
|
||||
|
||||
|
||||
@pytest.fixture
def sample_local_configs():
    """Return two local-mode Keboola table configs (``orders`` and ``customers``)."""
    orders = {
        "id": "in.c-finance.orders",
        "name": "orders",
        "source_type": "keboola",
        "bucket": "in.c-finance",
        "source_table": "orders",
        "query_mode": "local",
        "description": "Finance orders",
    }
    customers = {
        "id": "in.c-finance.customers",
        "name": "customers",
        "source_type": "keboola",
        "bucket": "in.c-finance",
        "source_table": "customers",
        "query_mode": "local",
        "description": "Customer data",
    }
    return [orders, customers]
|
||||
|
||||
|
||||
def _mock_attach(conn, url, token):
|
||||
"""Mock extension attach: creates kbc alias so views can be created."""
|
||||
conn.execute("ATTACH ':memory:' AS kbc")
|
||||
return True
|
||||
|
||||
|
||||
def _write_parquet(pq_path, data_sql="SELECT 1 AS id, 'test' AS name"):
    """Write the result of *data_sql* to a Parquet file at *pq_path*.

    Args:
        pq_path: Destination file path (anything ``str()`` turns into a path).
        data_sql: SELECT statement whose result set becomes the file content.

    A throwaway in-memory DuckDB connection runs the COPY; it is closed even
    when the statement fails.
    """
    # Double single quotes so a path containing "'" cannot end the SQL literal early.
    target = str(pq_path).replace("'", "''")
    local_conn = duckdb.connect()
    try:
        local_conn.execute(f"COPY ({data_sql}) TO '{target}' (FORMAT PARQUET)")
    finally:
        local_conn.close()
|
||||
|
||||
|
||||
class TestKeboolaExtractorFull:
    """Tests of ``connectors.keboola.extractor.run`` with the attach/extract steps patched out."""

    @staticmethod
    def _query_extract(output_dir, sql):
        """Run *sql* read-only against ``<output_dir>/extract.duckdb`` and return all rows.

        The connection is closed in ``finally`` so a failing query does not
        leave the database file open.
        """
        conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb"), read_only=True)
        try:
            return conn.execute(sql).fetchall()
        finally:
            conn.close()

    @staticmethod
    def _extension_writer(data_sql):
        """Return a replacement for ``_extract_via_extension`` that writes *data_sql* as parquet."""

        def write_pq(conn, tc, pq_path):
            _write_parquet(pq_path, data_sql)

        return write_pq

    def test_run_with_extension_creates_contract_compliant_db(self, output_dir, sample_local_configs):
        """run() with extension produces extract.duckdb that passes contract validation."""
        from connectors.keboola.extractor import run

        write_pq = self._extension_writer("SELECT 1 AS id, 'Alice' AS name")

        with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \
                patch("connectors.keboola.extractor._extract_via_extension", side_effect=write_pq):
            result = run(output_dir, sample_local_configs, "https://kbc.example.com", "token-abc")

        assert result["tables_extracted"] == 2
        assert result["tables_failed"] == 0
        assert result["errors"] == []

        db_path = str(Path(output_dir) / "extract.duckdb")
        validate_extract_contract(db_path)

    def test_run_fallback_to_legacy_client(self, output_dir):
        """When DuckDB extension unavailable, falls back to legacy client."""
        from connectors.keboola.extractor import run

        configs = [{"name": "t", "id": "in.c-test.t", "query_mode": "local", "description": ""}]

        def mock_legacy(tc, pq_path, url, token):
            _write_parquet(pq_path, "SELECT 99 AS value")

        with patch("connectors.keboola.extractor._try_attach_extension", return_value=False), \
                patch("connectors.keboola.extractor._extract_via_legacy", side_effect=mock_legacy):
            result = run(output_dir, configs, "https://kbc.example.com", "token-abc")

        assert result["tables_extracted"] == 1
        assert result["tables_failed"] == 0

        # Verify data is actually readable (parquet stores integers as int, not str)
        rows = self._query_extract(output_dir, "SELECT value FROM t")
        assert rows, "expected at least one row in table t"
        assert rows[0][0] == 99

    def test_meta_table_schema_correct(self, output_dir):
        """_meta table must have exactly the required columns in the right order."""
        from connectors.keboola.extractor import run

        configs = [{"name": "t", "query_mode": "local", "description": "desc"}]
        write_pq = self._extension_writer("SELECT 1 AS x")

        with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \
                patch("connectors.keboola.extractor._extract_via_extension", side_effect=write_pq):
            run(output_dir, configs, "https://kbc.example.com", "token-abc")

        cols = self._query_extract(
            output_dir,
            "SELECT column_name FROM information_schema.columns "
            "WHERE table_name='_meta' ORDER BY ordinal_position",
        )
        assert [c[0] for c in cols] == [
            "table_name", "description", "rows", "size_bytes", "extracted_at", "query_mode"
        ]

    def test_remote_attach_table_created_for_remote_tables(self, output_dir):
        """_remote_attach table is created when any table has query_mode='remote'."""
        from connectors.keboola.extractor import run

        def mock_attach_with_schema(conn, url, token):
            # Provide the schema/table the remote view is expected to point at.
            conn.execute("ATTACH ':memory:' AS kbc")
            conn.execute('CREATE SCHEMA kbc."in.c-events"')
            conn.execute('CREATE TABLE kbc."in.c-events"."big_table" (id VARCHAR)')
            return True

        configs = [{
            "name": "big_table",
            "bucket": "in.c-events",
            "source_table": "big_table",
            "query_mode": "remote",
            "description": "Large remote table",
        }]

        with patch("connectors.keboola.extractor._try_attach_extension", side_effect=mock_attach_with_schema):
            result = run(output_dir, configs, "https://kbc.example.com", "token-abc")

        assert result["tables_extracted"] == 1

        rows = self._query_extract(
            output_dir, "SELECT alias, extension, url, token_env FROM _remote_attach"
        )
        assert rows, "expected a row in _remote_attach"
        ra = rows[0]

        assert ra[0] == "kbc"
        assert ra[1] == "keboola"
        assert ra[2] == "https://kbc.example.com"
        assert ra[3] == "KEBOOLA_STORAGE_TOKEN"

    def test_remote_tables_not_downloaded(self, output_dir):
        """Remote tables have no parquet file — they are views pointing to kbc."""
        from connectors.keboola.extractor import run

        def mock_attach_with_schema(conn, url, token):
            conn.execute("ATTACH ':memory:' AS kbc")
            conn.execute('CREATE SCHEMA kbc."in.c-big"')
            conn.execute('CREATE TABLE kbc."in.c-big"."events" (id VARCHAR)')
            return True

        configs = [{
            "name": "events",
            "bucket": "in.c-big",
            "source_table": "events",
            "query_mode": "remote",
            "description": "",
        }]

        with patch("connectors.keboola.extractor._try_attach_extension", side_effect=mock_attach_with_schema):
            run(output_dir, configs, "https://kbc.example.com", "token-abc")

        # No parquet file for remote table
        assert not (Path(output_dir) / "data" / "events.parquet").exists()

        # _meta has remote query_mode
        rows = self._query_extract(
            output_dir, "SELECT query_mode FROM _meta WHERE table_name='events'"
        )
        assert rows, "expected a _meta row for table events"
        assert rows[0][0] == "remote"

    def test_partial_failure_continues(self, output_dir, sample_local_configs):
        """A single table failure should not abort remaining tables."""
        from connectors.keboola.extractor import run

        call_count = 0

        def failing_first(conn, tc, pq_path):
            # Fail only the first extraction; later calls write a parquet file.
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                raise RuntimeError("Connection reset")
            _write_parquet(pq_path, "SELECT 1 AS id")

        with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \
                patch("connectors.keboola.extractor._extract_via_extension", side_effect=failing_first):
            result = run(output_dir, sample_local_configs, "https://kbc.example.com", "token-abc")

        assert result["tables_extracted"] == 1
        assert result["tables_failed"] == 1
        assert len(result["errors"]) == 1
        assert "Connection reset" in result["errors"][0]["error"]

    def test_create_mock_extract_contract(self, extracts_dir):
        """create_mock_extract helper produces contract-compliant extract.duckdb."""
        db_path = create_mock_extract(extracts_dir, "keboola", [
            {"name": "orders", "data": [{"id": "1", "amount": "100"}], "query_mode": "local"},
        ])
        validate_extract_contract(str(db_path))

    def test_data_directory_created(self, output_dir, sample_local_configs):
        """data/ subdirectory is created alongside extract.duckdb."""
        from connectors.keboola.extractor import run

        write_pq = self._extension_writer("SELECT 1 AS id")

        with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \
                patch("connectors.keboola.extractor._extract_via_extension", side_effect=write_pq):
            run(output_dir, sample_local_configs, "https://kbc.example.com", "token-abc")

        assert (Path(output_dir) / "data").is_dir()
        assert (Path(output_dir) / "data" / "orders.parquet").exists()
        assert (Path(output_dir) / "data" / "customers.parquet").exists()
|
||||
273
tests/test_llm_providers_full.py
Normal file
273
tests/test_llm_providers_full.py
Normal file
|
|
@ -0,0 +1,273 @@
|
|||
"""Full tests for LLM provider factory and extractors."""
|
||||
|
||||
import json
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import anthropic
|
||||
import openai
|
||||
import pytest
|
||||
|
||||
from connectors.llm.anthropic_provider import AnthropicExtractor
|
||||
from connectors.llm.exceptions import (
|
||||
LLMAuthError,
|
||||
LLMFormatError,
|
||||
LLMRateLimitError,
|
||||
LLMRefusalError,
|
||||
LLMTimeoutError,
|
||||
LLMUnsupportedError,
|
||||
)
|
||||
from connectors.llm.factory import DEFAULT_MODEL, create_extractor
|
||||
from connectors.llm.openai_compat import OpenAICompatExtractor, _extract_json_from_text
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Mock response helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _anthropic_response(text: str, stop_reason: str = "end_turn"):
|
||||
block = MagicMock()
|
||||
block.text = text
|
||||
resp = MagicMock()
|
||||
resp.content = [block]
|
||||
resp.stop_reason = stop_reason
|
||||
return resp
|
||||
|
||||
|
||||
def _openai_response(content: str | None, finish_reason: str = "stop"):
|
||||
message = MagicMock()
|
||||
message.content = content
|
||||
choice = MagicMock()
|
||||
choice.message = message
|
||||
choice.finish_reason = finish_reason
|
||||
resp = MagicMock()
|
||||
resp.choices = [choice]
|
||||
return resp
|
||||
|
||||
|
||||
# Minimal JSON schema (an object with one string property, "value") passed to
# extract_json by the extractor tests in this module.
_SCHEMA = {"type": "object", "properties": {"value": {"type": "string"}}}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Factory tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCreateExtractor:
    """Provider selection and configuration validation in ``create_extractor``."""

    @patch("connectors.llm.anthropic_provider.anthropic.Anthropic")
    def test_anthropic_provider_returns_anthropic_extractor(self, _anthropic_cls):
        """provider='anthropic' produces an AnthropicExtractor."""
        extractor = create_extractor({"provider": "anthropic", "api_key": "sk-ant-test"})
        assert isinstance(extractor, AnthropicExtractor)

    @patch("connectors.llm.openai_compat.openai.OpenAI")
    @patch("connectors.llm.openai_compat.httpx.Client")
    def test_openai_compat_provider_returns_openai_extractor(self, _http_cls, _openai_cls):
        """provider='openai_compat' with a base_url produces an OpenAICompatExtractor."""
        settings = {
            "provider": "openai_compat",
            "api_key": "sk-test",
            "base_url": "https://api.openai.com/v1",
        }
        extractor = create_extractor(settings)
        assert isinstance(extractor, OpenAICompatExtractor)

    @patch("connectors.llm.anthropic_provider.anthropic.Anthropic")
    def test_legacy_anthropic_key_format(self, _anthropic_cls):
        """anthropic_api_key (legacy format) still creates AnthropicExtractor."""
        extractor = create_extractor({"anthropic_api_key": "sk-ant-legacy"})
        assert isinstance(extractor, AnthropicExtractor)

    def test_missing_provider_raises_value_error(self):
        """A config without a provider is rejected with a message naming ai.provider."""
        settings = {"api_key": "sk-test"}
        with pytest.raises(ValueError, match="ai.provider is required"):
            create_extractor(settings)

    def test_empty_config_raises_value_error(self):
        """An empty config is rejected with ValueError."""
        settings = {}
        with pytest.raises(ValueError):
            create_extractor(settings)

    def test_unknown_provider_raises_value_error(self):
        """An unrecognised provider name is rejected."""
        settings = {"provider": "cohere", "api_key": "sk-test"}
        with pytest.raises(ValueError, match="Unknown ai.provider"):
            create_extractor(settings)

    @patch("connectors.llm.openai_compat.openai.OpenAI")
    @patch("connectors.llm.openai_compat.httpx.Client")
    def test_openai_compat_missing_base_url_raises(self, _http_cls, _openai_cls):
        """openai_compat without base_url is rejected."""
        settings = {"provider": "openai_compat", "api_key": "sk-test"}
        with pytest.raises(ValueError, match="base_url is required"):
            create_extractor(settings)

    def test_empty_api_key_raises_value_error(self):
        """An empty api_key is rejected with a message mentioning api_key."""
        settings = {"provider": "anthropic", "api_key": ""}
        with pytest.raises(ValueError, match="api_key"):
            create_extractor(settings)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AnthropicExtractor tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAnthropicExtractor:
    """Behaviour of ``AnthropicExtractor.extract_json`` against a mocked Anthropic client."""

    @patch("connectors.llm.anthropic_provider.anthropic.Anthropic")
    def test_extract_json_success(self, mock_cls):
        """A well-formed JSON reply is returned as the parsed dict."""
        mock_cls.return_value = api = MagicMock()
        api.messages.create.return_value = _anthropic_response('{"value": "hello"}')

        extractor = AnthropicExtractor(api_key="sk-ant-test", model="claude-haiku-4-5-20251001")
        parsed = extractor.extract_json("prompt", 1000, _SCHEMA, "test_schema")

        assert parsed == {"value": "hello"}

    @patch("connectors.llm.anthropic_provider.anthropic.Anthropic")
    def test_auth_error_raises_immediately(self, mock_cls):
        """An authentication failure surfaces as LLMAuthError after a single call."""
        mock_cls.return_value = api = MagicMock()
        auth_failure = anthropic.AuthenticationError(
            message="Invalid key", response=MagicMock(), body={}
        )
        api.messages.create.side_effect = auth_failure

        extractor = AnthropicExtractor(api_key="bad-key", model="claude-haiku-4-5-20251001")
        with pytest.raises(LLMAuthError):
            extractor.extract_json("prompt", 1000, _SCHEMA, "test_schema")

        # Exactly one API call: the error must not trigger a retry.
        assert api.messages.create.call_count == 1

    @patch("connectors.llm.anthropic_provider.time.sleep")
    @patch("connectors.llm.anthropic_provider.anthropic.Anthropic")
    def test_rate_limit_retries_and_raises(self, mock_cls, _sleep):
        """Persistent rate limiting makes MAX_RETRIES calls, then raises LLMRateLimitError."""
        from connectors.llm.anthropic_provider import MAX_RETRIES

        mock_cls.return_value = api = MagicMock()
        throttled = anthropic.RateLimitError(
            message="Rate limited", response=MagicMock(), body={}
        )
        api.messages.create.side_effect = throttled

        extractor = AnthropicExtractor(api_key="sk-ant-test", model="claude-haiku-4-5-20251001")
        with pytest.raises(LLMRateLimitError):
            extractor.extract_json("prompt", 1000, _SCHEMA, "test_schema")

        assert api.messages.create.call_count == MAX_RETRIES

    @patch("connectors.llm.anthropic_provider.anthropic.Anthropic")
    def test_truncated_response_raises_format_error(self, mock_cls):
        """A reply with stop_reason 'max_tokens' raises LLMFormatError mentioning truncation."""
        mock_cls.return_value = api = MagicMock()
        cut_off = _anthropic_response('{"partial":', stop_reason="max_tokens")
        api.messages.create.return_value = cut_off

        extractor = AnthropicExtractor(api_key="sk-ant-test", model="claude-haiku-4-5-20251001")
        with pytest.raises(LLMFormatError, match="truncated"):
            extractor.extract_json("prompt", 10, _SCHEMA, "test_schema")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# OpenAICompatExtractor tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestOpenAICompatExtractor:
    """Behaviour of ``OpenAICompatExtractor.extract_json`` against a mocked client."""

    def _make_extractor(self, structured_output: str = "auto") -> OpenAICompatExtractor:
        """Construct an extractor while the OpenAI and httpx client classes are patched."""
        ctor_kwargs = {
            "api_key": "sk-test",
            "base_url": "https://api.example.com/v1",
            "model": "gpt-4o-mini",
            "structured_output": structured_output,
        }
        with patch("connectors.llm.openai_compat.openai.OpenAI"), \
                patch("connectors.llm.openai_compat.httpx.Client"):
            return OpenAICompatExtractor(**ctor_kwargs)

    def test_extract_json_success_json_schema(self):
        """extract_json succeeds with json_schema strategy."""
        extractor = self._make_extractor()
        extractor._client = MagicMock()
        completions = extractor._client.chat.completions
        completions.create.return_value = _openai_response('{"value": "ok"}')

        parsed = extractor.extract_json("prompt", 1000, _SCHEMA, "test")
        assert parsed == {"value": "ok"}

    def test_strategy_cascade_falls_back_on_bad_request(self):
        """json_schema unsupported -> falls back to json_object strategy (auto mode)."""
        extractor = self._make_extractor(structured_output="auto")
        extractor._client = MagicMock()

        # Scripted outcomes: the first call fails with a response_format
        # BadRequestError, the second returns a valid reply.
        rejected = openai.BadRequestError(
            message="response_format json_schema not supported",
            response=MagicMock(status_code=400),
            body={"error": {"message": "response_format json_schema not supported"}},
        )
        accepted = _openai_response('{"value": "fallback"}')
        extractor._client.chat.completions.create.side_effect = [rejected, accepted]

        parsed = extractor.extract_json("prompt", 1000, _SCHEMA, "test")
        assert parsed == {"value": "fallback"}

    def test_auth_error_raises_immediately(self):
        """AuthenticationError is not retried."""
        extractor = self._make_extractor()
        extractor._client = MagicMock()
        create_call = extractor._client.chat.completions.create
        create_call.side_effect = openai.AuthenticationError(
            message="Invalid key",
            response=MagicMock(status_code=401),
            body={},
        )

        with pytest.raises(LLMAuthError):
            extractor.extract_json("prompt", 1000, _SCHEMA, "test")

        assert create_call.call_count == 1

    def test_strict_mode_raises_unsupported(self):
        """strict mode does not fall back; raises LLMUnsupportedError."""
        extractor = self._make_extractor(structured_output="strict")
        extractor._client = MagicMock()
        unsupported = openai.BadRequestError(
            message="json_schema not supported",
            response=MagicMock(status_code=400),
            body={"error": {"message": "json_schema not supported"}},
        )
        extractor._client.chat.completions.create.side_effect = unsupported

        with pytest.raises(LLMUnsupportedError):
            extractor.extract_json("prompt", 1000, _SCHEMA, "test")

    def test_refusal_raises_immediately(self):
        """Empty content (refusal) raises LLMRefusalError."""
        extractor = self._make_extractor()
        extractor._client = MagicMock()
        no_content = _openai_response(None)
        extractor._client.chat.completions.create.return_value = no_content

        with pytest.raises(LLMRefusalError):
            extractor.extract_json("prompt", 1000, _SCHEMA, "test")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _extract_json_from_text tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExtractJsonFromText:
    """Input shapes accepted (and rejected) by ``_extract_json_from_text``."""

    def test_direct_json_parse(self):
        """Plain JSON text is parsed as-is."""
        parsed = _extract_json_from_text('{"key": "val"}')
        assert parsed == {"key": "val"}

    def test_strips_markdown_code_fence(self):
        """JSON wrapped in a json-tagged code fence is parsed."""
        fenced = '```json\n{"key": "fenced"}\n```'
        parsed = _extract_json_from_text(fenced)
        assert parsed == {"key": "fenced"}

    def test_strips_plain_code_fence(self):
        """JSON wrapped in an untagged code fence is parsed."""
        fenced = "```\n{\"key\": \"plain\"}\n```"
        parsed = _extract_json_from_text(fenced)
        assert parsed == {"key": "plain"}

    def test_brace_extraction_fallback(self):
        """A JSON object embedded in surrounding prose is parsed."""
        noisy = "Here is the JSON: {\"key\": \"brace\"} and some trailing text."
        parsed = _extract_json_from_text(noisy)
        assert parsed == {"key": "brace"}

    def test_raises_format_error_on_invalid(self):
        """Text containing no JSON raises LLMFormatError."""
        not_json = "This is definitely not JSON at all."
        with pytest.raises(LLMFormatError):
            _extract_json_from_text(not_json)
|
||||
Loading…
Reference in a new issue