diff --git a/tests/helpers/factories.py b/tests/helpers/factories.py index 208d321..6bc5ff8 100644 --- a/tests/helpers/factories.py +++ b/tests/helpers/factories.py @@ -17,11 +17,6 @@ class UserFactory: @staticmethod def build(role: str = "analyst", **overrides) -> dict[str, Any]: - """Build a user dict. - - Returns keys: id, email, name, role. - Pass keyword overrides to replace any field. - """ data = { "id": str(uuid.uuid4()), "email": _fake.unique.email(), @@ -41,11 +36,6 @@ class TableRegistryFactory: @staticmethod def build(**overrides) -> dict[str, Any]: - """Build a table registry dict. - - Returns keys: name, source_type, bucket, source_table, - query_mode, sync_schedule, description. - """ source_type = overrides.pop("source_type", _fake.random_element(TableRegistryFactory._SOURCE_TYPES)) data = { "name": _fake.unique.slug().replace("-", "_"), @@ -68,10 +58,6 @@ class KnowledgeItemFactory: @staticmethod def build(**overrides) -> dict[str, Any]: - """Build a knowledge item dict. - - Returns keys: title, content, category, tags. - """ data = { "title": _fake.sentence(nb_words=6).rstrip("."), "content": _fake.paragraph(nb_sentences=4), @@ -91,21 +77,10 @@ class WebhookEventFactory: issue_key: str | None = None, **overrides, ) -> dict[str, Any]: - """Build a Jira webhook event payload dict. - - Args: - event_type: Jira webhook event name, e.g. 'jira:issue_created'. - issue_key: Issue key like 'PROJ-123'. Generated if not provided. - **overrides: Top-level keys to override in the payload. - - Returns a dict matching the Jira webhook JSON structure. 
- """ if issue_key is None: project = _fake.lexify("????").upper() issue_key = f"{project}-{_fake.random_int(1, 9999)}" - project_key = issue_key.split("-")[0] - payload: dict[str, Any] = { "webhookEvent": event_type, "timestamp": _fake.unix_time() * 1000, @@ -115,55 +90,58 @@ class WebhookEventFactory: "self": f"https://jira.example.com/rest/api/2/issue/{issue_key}", "fields": { "summary": _fake.sentence(nb_words=8).rstrip("."), - "status": { - "name": _fake.random_element(["To Do", "In Progress", "Done"]), - "id": str(_fake.random_int(1, 10)), - }, - "issuetype": { - "name": _fake.random_element(["Bug", "Story", "Task", "Epic"]), - "id": str(_fake.random_int(1, 10)), - }, - "priority": { - "name": _fake.random_element(["Low", "Medium", "High", "Critical"]), - }, - "assignee": { - "displayName": _fake.name(), - "emailAddress": _fake.email(), - "accountId": _fake.uuid4(), - }, - "reporter": { - "displayName": _fake.name(), - "emailAddress": _fake.email(), - "accountId": _fake.uuid4(), - }, - "project": { - "key": project_key, - "name": f"{project_key} Project", - "id": str(_fake.random_int(10000, 99999)), - }, + "status": {"name": _fake.random_element(["To Do", "In Progress", "Done"]), "id": str(_fake.random_int(1, 10))}, + "issuetype": {"name": _fake.random_element(["Bug", "Story", "Task", "Epic"]), "id": str(_fake.random_int(1, 10))}, + "priority": {"name": _fake.random_element(["Low", "Medium", "High", "Critical"])}, + "project": {"key": issue_key.split("-")[0], "name": f"{issue_key.split('-')[0]} Project", "id": str(_fake.random_int(10000, 99999))}, "created": _fake.iso8601(), "updated": _fake.iso8601(), - "description": _fake.paragraph(nb_sentences=2), - "labels": [_fake.word() for _ in range(_fake.random_int(0, 3))], }, }, - "user": { - "displayName": _fake.name(), - "emailAddress": _fake.email(), - "accountId": _fake.uuid4(), - }, + "user": {"displayName": _fake.name(), "emailAddress": _fake.email()}, } payload.update(overrides) return payload @staticmethod def 
sign_payload(payload: dict[str, Any], secret: str) -> str: - """Return HMAC-SHA256 signature string for a webhook payload. - - The signature is computed over the JSON-serialised payload (compact, - sorted keys) and returned as a hex digest, matching the common Jira - webhook signature scheme: 'sha256='. - """ body = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode() sig = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() return f"sha256={sig}" + + # Convenience methods used by Block D connector tests + @staticmethod + def issue_updated(issue_key: str = "PROJ-123", webhook_secret: str = "") -> tuple[dict, bytes, str]: + event_data = { + "webhookEvent": "jira:issue_updated", + "issue": {"key": issue_key, "id": "10001", "fields": {"summary": "Test issue", "status": {"name": "Open"}, "issuetype": {"name": "Bug"}}}, + "user": {"displayName": "Test User"}, + } + payload = json.dumps(event_data).encode() + sig = WebhookEventFactory._sign(payload, webhook_secret) if webhook_secret else "" + return event_data, payload, sig + + @staticmethod + def issue_deleted(issue_key: str = "PROJ-456", webhook_secret: str = "") -> tuple[dict, bytes, str]: + event_data = { + "webhookEvent": "jira:issue_deleted", + "issue": {"key": issue_key, "id": "10002", "fields": {"summary": "Deleted issue"}}, + } + payload = json.dumps(event_data).encode() + sig = WebhookEventFactory._sign(payload, webhook_secret) if webhook_secret else "" + return event_data, payload, sig + + @staticmethod + def issue_created(issue_key: str = "PROJ-789", webhook_secret: str = "") -> tuple[dict, bytes, str]: + event_data = { + "webhookEvent": "jira:issue_created", + "issue": {"key": issue_key, "id": "10003", "fields": {"summary": "New issue", "status": {"name": "Open"}, "issuetype": {"name": "Story"}}}, + } + payload = json.dumps(event_data).encode() + sig = WebhookEventFactory._sign(payload, webhook_secret) if webhook_secret else "" + return event_data, payload, sig + + @staticmethod + def 
_sign(payload: bytes, secret: str) -> str: + mac = hmac.new(secret.encode("utf-8"), payload, hashlib.sha256).hexdigest() + return f"sha256={mac}" diff --git a/tests/test_bigquery_extractor_full.py b/tests/test_bigquery_extractor_full.py new file mode 100644 index 0000000..5e6513e --- /dev/null +++ b/tests/test_bigquery_extractor_full.py @@ -0,0 +1,210 @@ +"""Full tests for the BigQuery extractor connector.""" + +import re +from pathlib import Path +from unittest.mock import MagicMock, patch + +import duckdb +import pytest + +from tests.helpers.contract import validate_extract_contract + + +@pytest.fixture +def output_dir(tmp_path): + d = tmp_path / "extracts" / "bigquery" + d.mkdir(parents=True) + return str(d) + + +@pytest.fixture +def sample_configs(): + return [ + { + "id": "proj.analytics.orders", + "name": "orders", + "source_type": "bigquery", + "bucket": "analytics", + "source_table": "orders", + "query_mode": "remote", + "description": "Order data from BQ", + }, + { + "id": "proj.analytics.sessions", + "name": "sessions", + "source_type": "bigquery", + "bucket": "analytics", + "source_table": "sessions", + "query_mode": "remote", + "description": "Session data from BQ", + }, + ] + + +class _DuckDBProxy: + """Proxy around a real DuckDB connection that intercepts BigQuery extension SQL.""" + + def __init__(self, real_conn): + self._real = real_conn + + def execute(self, sql, *args, **kwargs): + sql_upper = sql.strip().upper() + if sql_upper.startswith("INSTALL BIGQUERY") or sql_upper.startswith("LOAD BIGQUERY"): + return MagicMock() + if "ATTACH" in sql_upper and "BIGQUERY" in sql_upper: + return MagicMock() + if sql_upper.startswith("DETACH BQ"): + return MagicMock() + # CREATE VIEW referencing bq.* -> create a dummy table instead + if "FROM BQ." 
in sql_upper and "CREATE" in sql_upper: + match = re.search(r'VIEW\s+"?(\w+)"?', sql, re.IGNORECASE) + if match: + view_name = match.group(1) + self._real.execute(f'CREATE OR REPLACE TABLE "{view_name}" (dummy INTEGER)') + return MagicMock() + return self._real.execute(sql, *args, **kwargs) + + def close(self): + return self._real.close() + + def __getattr__(self, name): + return getattr(self._real, name) + + +def _proxy_connect(path=None, **kwargs): + real_conn = duckdb.connect(path) + return _DuckDBProxy(real_conn) + + +class TestBigQueryExtractorFull: + def test_init_extract_creates_contract_compliant_db(self, output_dir, sample_configs): + """init_extract() creates extract.duckdb that passes contract validation.""" + with patch("connectors.bigquery.extractor.duckdb") as mock_mod: + mock_mod.connect = _proxy_connect + from connectors.bigquery.extractor import init_extract + result = init_extract(output_dir, "my-gcp-project", sample_configs) + + assert result["tables_registered"] == 2 + assert result["errors"] == [] + + db_path = str(Path(output_dir) / "extract.duckdb") + validate_extract_contract(db_path) + + def test_remote_attach_table_has_correct_values(self, output_dir, sample_configs): + """_remote_attach row must have alias=bq, extension=bigquery, url=project=, token_env=''.""" + with patch("connectors.bigquery.extractor.duckdb") as mock_mod: + mock_mod.connect = _proxy_connect + from connectors.bigquery.extractor import init_extract + init_extract(output_dir, "acme-project", sample_configs) + + conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb"), read_only=True) + ra = conn.execute("SELECT alias, extension, url, token_env FROM _remote_attach").fetchone() + conn.close() + + assert ra[0] == "bq" + assert ra[1] == "bigquery" + assert ra[2] == "project=acme-project" + assert ra[3] == "" # BigQuery uses GOOGLE_APPLICATION_CREDENTIALS, not token_env + + def test_all_tables_have_remote_query_mode(self, output_dir, sample_configs): + """All BigQuery 
tables must have query_mode='remote' in _meta.""" + with patch("connectors.bigquery.extractor.duckdb") as mock_mod: + mock_mod.connect = _proxy_connect + from connectors.bigquery.extractor import init_extract + init_extract(output_dir, "my-project", sample_configs) + + conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb"), read_only=True) + modes = conn.execute("SELECT DISTINCT query_mode FROM _meta").fetchall() + conn.close() + + assert len(modes) == 1 + assert modes[0][0] == "remote" + + def test_no_data_directory_created(self, output_dir, sample_configs): + """BigQuery is remote-only — no data/ directory should be created.""" + with patch("connectors.bigquery.extractor.duckdb") as mock_mod: + mock_mod.connect = _proxy_connect + from connectors.bigquery.extractor import init_extract + init_extract(output_dir, "my-project", sample_configs) + + assert not (Path(output_dir) / "data").exists() + + def test_meta_table_schema(self, output_dir): + """_meta table must have the exact contract-required columns.""" + from connectors.bigquery.extractor import _create_meta_table + + db_path = Path(output_dir) / "schema_check.duckdb" + conn = duckdb.connect(str(db_path)) + _create_meta_table(conn) + cols = conn.execute( + "SELECT column_name FROM information_schema.columns " + "WHERE table_name='_meta' ORDER BY ordinal_position" + ).fetchall() + conn.close() + + assert [c[0] for c in cols] == [ + "table_name", "description", "rows", "size_bytes", "extracted_at", "query_mode" + ] + + def test_remote_attach_table_schema(self, output_dir): + """_remote_attach table must have the exact contract-required columns.""" + from connectors.bigquery.extractor import _create_remote_attach_table + + db_path = Path(output_dir) / "ra_check.duckdb" + conn = duckdb.connect(str(db_path)) + _create_remote_attach_table(conn, "test-project") + cols = conn.execute( + "SELECT column_name FROM information_schema.columns " + "WHERE table_name='_remote_attach' ORDER BY ordinal_position" + 
).fetchall() + conn.close() + + assert [c[0] for c in cols] == ["alias", "extension", "url", "token_env"] + + def test_table_registration_failure_records_error(self, output_dir): + """A failed table registration records the error but continues others.""" + configs = [ + {"name": "good", "bucket": "ds", "source_table": "good", "query_mode": "remote", "description": ""}, + {"name": "bad", "bucket": "ds", "source_table": "bad", "query_mode": "remote", "description": ""}, + ] + + call_count = [0] + + class FailingProxy(_DuckDBProxy): + def execute(self, sql, *args, **kwargs): + sql_upper = sql.strip().upper() + # Intercept: fail view creation for 'bad' + if "FROM BQ." in sql_upper and "CREATE" in sql_upper and '"bad"' in sql.lower(): + call_count[0] += 1 + raise Exception("Table not found in BigQuery") + return super().execute(sql, *args, **kwargs) + + def failing_connect(path=None, **kwargs): + real_conn = duckdb.connect(path) + return FailingProxy(real_conn) + + with patch("connectors.bigquery.extractor.duckdb") as mock_mod: + mock_mod.connect = failing_connect + from connectors.bigquery.extractor import init_extract + result = init_extract(output_dir, "my-project", configs) + + assert result["tables_registered"] == 1 + assert len(result["errors"]) == 1 + assert result["errors"][0]["table"] == "bad" + + def test_empty_table_list(self, output_dir): + """init_extract with no tables still creates a valid (empty) extract.duckdb.""" + with patch("connectors.bigquery.extractor.duckdb") as mock_mod: + mock_mod.connect = _proxy_connect + from connectors.bigquery.extractor import init_extract + result = init_extract(output_dir, "my-project", []) + + assert result["tables_registered"] == 0 + assert result["errors"] == [] + + db_path = Path(output_dir) / "extract.duckdb" + assert db_path.exists() + conn = duckdb.connect(str(db_path), read_only=True) + count = conn.execute("SELECT count(*) FROM _meta").fetchone()[0] + conn.close() + assert count == 0 diff --git 
a/tests/test_jira_incremental.py b/tests/test_jira_incremental.py new file mode 100644 index 0000000..4bcc6d9 --- /dev/null +++ b/tests/test_jira_incremental.py @@ -0,0 +1,185 @@ +"""Tests for incremental Jira parquet transform (upsert_dataframe and friends).""" + +from pathlib import Path +from unittest.mock import patch + +import duckdb +import pandas as pd +import pytest + +from connectors.jira.incremental_transform import ( + load_parquet_month, + save_parquet_month, + upsert_dataframe, +) + + +# Minimal schema compatible with ISSUES_SCHEMA for testing purposes +_SIMPLE_SCHEMA = { + "issue_key": "string", + "summary": "string", +} + + +@pytest.fixture +def parquet_dir(tmp_path): + d = tmp_path / "parquet_data" + d.mkdir() + return d + + +def _make_df(rows: list[dict]) -> pd.DataFrame: + return pd.DataFrame(rows) + + +class TestUpsertDataframe: + def test_insert_into_empty(self): + """Upserting into None/empty creates a new DataFrame.""" + new_records = [{"issue_key": "PROJ-1", "summary": "Bug A"}] + result = upsert_dataframe(None, new_records, "issue_key", "PROJ-1") + assert len(result) == 1 + assert result.iloc[0]["issue_key"] == "PROJ-1" + + def test_insert_new_issue(self): + """Upserting a new issue_key adds a new row.""" + existing = _make_df([{"issue_key": "PROJ-1", "summary": "Existing"}]) + new_records = [{"issue_key": "PROJ-2", "summary": "New issue"}] + result = upsert_dataframe(existing, new_records, "issue_key", "PROJ-2") + assert len(result) == 2 + keys = set(result["issue_key"].tolist()) + assert keys == {"PROJ-1", "PROJ-2"} + + def test_update_existing_issue(self): + """Upserting an existing issue_key replaces the old row.""" + existing = _make_df([ + {"issue_key": "PROJ-1", "summary": "Old summary"}, + {"issue_key": "PROJ-2", "summary": "Other issue"}, + ]) + new_records = [{"issue_key": "PROJ-1", "summary": "Updated summary"}] + result = upsert_dataframe(existing, new_records, "issue_key", "PROJ-1") + assert len(result) == 2 + proj1 = 
result[result["issue_key"] == "PROJ-1"] + assert proj1.iloc[0]["summary"] == "Updated summary" + + def test_delete_issue(self): + """Upserting with empty records removes the issue (deletion case).""" + existing = _make_df([ + {"issue_key": "PROJ-1", "summary": "To be deleted"}, + {"issue_key": "PROJ-2", "summary": "Keep this"}, + ]) + result = upsert_dataframe(existing, [], "issue_key", "PROJ-1") + assert len(result) == 1 + assert result.iloc[0]["issue_key"] == "PROJ-2" + + def test_upsert_empty_existing_df(self): + """Upserting into an empty (non-None) DataFrame works correctly.""" + existing = pd.DataFrame(columns=["issue_key", "summary"]) + new_records = [{"issue_key": "PROJ-5", "summary": "First issue"}] + result = upsert_dataframe(existing, new_records, "issue_key", "PROJ-5") + assert len(result) == 1 + assert result.iloc[0]["issue_key"] == "PROJ-5" + + def test_upsert_multiple_records_same_issue(self): + """Multiple records for the same issue_key are all replaced.""" + existing = _make_df([ + {"issue_key": "PROJ-1", "summary": "Comment 1"}, + {"issue_key": "PROJ-1", "summary": "Comment 2"}, + {"issue_key": "PROJ-2", "summary": "Other"}, + ]) + new_records = [{"issue_key": "PROJ-1", "summary": "Updated comment"}] + result = upsert_dataframe(existing, new_records, "issue_key", "PROJ-1") + proj1_rows = result[result["issue_key"] == "PROJ-1"] + assert len(proj1_rows) == 1 # Only the updated record + assert proj1_rows.iloc[0]["summary"] == "Updated comment" + + +class TestParquetMonthlyPartitioning: + def test_save_and_load_parquet(self, parquet_dir): + """save_parquet_month writes and load_parquet_month reads correctly.""" + df = _make_df([ + {"issue_key": "PROJ-1", "summary": "Test issue"}, + ]) + save_parquet_month(df, _SIMPLE_SCHEMA, parquet_dir, "2026-04") + loaded = load_parquet_month(parquet_dir, "2026-04") + assert loaded is not None + assert len(loaded) == 1 + assert loaded.iloc[0]["issue_key"] == "PROJ-1" + + def test_load_nonexistent_returns_none(self, 
parquet_dir): + """load_parquet_month returns None if the file doesn't exist.""" + result = load_parquet_month(parquet_dir, "2099-01") + assert result is None + + def test_save_empty_df_removes_file(self, parquet_dir): + """save_parquet_month with empty df removes existing parquet file.""" + # First write a file + df = _make_df([{"issue_key": "PROJ-1", "summary": "Test"}]) + save_parquet_month(df, _SIMPLE_SCHEMA, parquet_dir, "2026-01") + assert (parquet_dir / "2026-01.parquet").exists() + + # Save empty df — file should be removed + empty = pd.DataFrame() + save_parquet_month(empty, _SIMPLE_SCHEMA, parquet_dir, "2026-01") + assert not (parquet_dir / "2026-01.parquet").exists() + + def test_separate_months_independent_files(self, parquet_dir): + """Different month_keys write to separate parquet files.""" + df_april = _make_df([{"issue_key": "PROJ-A", "summary": "April issue"}]) + df_may = _make_df([{"issue_key": "PROJ-B", "summary": "May issue"}]) + + save_parquet_month(df_april, _SIMPLE_SCHEMA, parquet_dir, "2026-04") + save_parquet_month(df_may, _SIMPLE_SCHEMA, parquet_dir, "2026-05") + + assert (parquet_dir / "2026-04.parquet").exists() + assert (parquet_dir / "2026-05.parquet").exists() + + april_loaded = load_parquet_month(parquet_dir, "2026-04") + may_loaded = load_parquet_month(parquet_dir, "2026-05") + + assert april_loaded.iloc[0]["issue_key"] == "PROJ-A" + assert may_loaded.iloc[0]["issue_key"] == "PROJ-B" + + def test_parquet_readable_by_duckdb(self, parquet_dir): + """Parquet files written by save_parquet_month are readable by DuckDB.""" + df = _make_df([ + {"issue_key": "PROJ-1", "summary": "DuckDB readable"}, + {"issue_key": "PROJ-2", "summary": "Also readable"}, + ]) + save_parquet_month(df, _SIMPLE_SCHEMA, parquet_dir, "2026-04") + + pq_file = str(parquet_dir / "2026-04.parquet") + conn = duckdb.connect() + rows = conn.execute(f"SELECT count(*) FROM read_parquet('{pq_file}')").fetchone() + conn.close() + assert rows[0] == 2 + + def 
test_upsert_round_trip_with_real_parquet(self, parquet_dir): + """Full upsert round trip: write, load, upsert, save, verify.""" + # Initial write + initial = _make_df([ + {"issue_key": "PROJ-1", "summary": "Original"}, + {"issue_key": "PROJ-2", "summary": "Keep"}, + ]) + save_parquet_month(initial, _SIMPLE_SCHEMA, parquet_dir, "2026-04") + + # Load existing + existing = load_parquet_month(parquet_dir, "2026-04") + + # Upsert update for PROJ-1 + updated = upsert_dataframe( + existing, + [{"issue_key": "PROJ-1", "summary": "Updated"}], + "issue_key", + "PROJ-1", + ) + + # Save back + save_parquet_month(updated, _SIMPLE_SCHEMA, parquet_dir, "2026-04") + + # Reload and verify + final = load_parquet_month(parquet_dir, "2026-04") + assert len(final) == 2 + proj1 = final[final["issue_key"] == "PROJ-1"] + assert proj1.iloc[0]["summary"] == "Updated" + proj2 = final[final["issue_key"] == "PROJ-2"] + assert proj2.iloc[0]["summary"] == "Keep" diff --git a/tests/test_jira_service_full.py b/tests/test_jira_service_full.py new file mode 100644 index 0000000..0d4e405 --- /dev/null +++ b/tests/test_jira_service_full.py @@ -0,0 +1,182 @@ +"""Full tests for the Jira service (JiraService.process_webhook_event and friends).""" + +import json +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from tests.helpers.factories import WebhookEventFactory + + +@pytest.fixture +def jira_env(tmp_path, monkeypatch): + """Set up a Jira environment with required dirs and env vars.""" + data_dir = tmp_path / "jira_data" + data_dir.mkdir() + (data_dir / "issues").mkdir() + + monkeypatch.setenv("JIRA_DOMAIN", "mycompany.atlassian.net") + monkeypatch.setenv("JIRA_EMAIL", "bot@mycompany.com") + monkeypatch.setenv("JIRA_API_TOKEN", "test-token-xyz") + monkeypatch.setenv("JIRA_DATA_DIR", str(data_dir)) + monkeypatch.setenv("JIRA_WEBHOOK_SECRET", "webhook-secret-123") + + return data_dir + + +def _make_jira_service(jira_env): + """Create a fresh JiraService with test 
configuration.""" + from connectors.jira import service as svc + svc.Config.JIRA_DOMAIN = "mycompany.atlassian.net" + svc.Config.JIRA_EMAIL = "bot@mycompany.com" + svc.Config.JIRA_API_TOKEN = "test-token-xyz" + svc.Config.JIRA_DATA_DIR = jira_env + svc.Config.JIRA_WEBHOOK_SECRET = "webhook-secret-123" + svc._jira_service = None + return svc.JiraService() + + +def _fake_issue_data(issue_key: str = "TEST-1") -> dict: + return { + "key": issue_key, + "id": "10001", + "fields": { + "summary": "Test issue summary", + "status": {"name": "Open"}, + "issuetype": {"name": "Bug"}, + "attachment": [], + "comment": {"comments": []}, + }, + } + + +class TestJiraServiceWebhookProcessing: + def test_process_issue_updated_calls_fetch_and_save(self, jira_env): + """process_webhook_event for issue_updated fetches fresh data from API.""" + service = _make_jira_service(jira_env) + event_data, _, _ = WebhookEventFactory.issue_updated("PROJ-100") + issue_data = _fake_issue_data("PROJ-100") + + with patch.object(service, "fetch_issue", return_value=issue_data), \ + patch.object(service, "fetch_remote_links", return_value=[]), \ + patch.object(service, "fetch_sla_fields", return_value=None), \ + patch.object(service, "download_all_attachments", return_value=[]), \ + patch("connectors.jira.service.trigger_incremental_transform", return_value=True): + result = service.process_webhook_event(event_data) + + assert result is True + saved_file = jira_env / "issues" / "PROJ-100.json" + assert saved_file.exists() + with open(saved_file) as f: + saved = json.load(f) + assert saved["key"] == "PROJ-100" + + def test_process_issue_deleted_marks_file(self, jira_env): + """process_webhook_event for issue_deleted marks existing JSON with _deleted_at.""" + service = _make_jira_service(jira_env) + + # Pre-create the issue JSON + issue_file = jira_env / "issues" / "PROJ-200.json" + issue_file.write_text(json.dumps({"key": "PROJ-200", "fields": {}})) + + event_data, _, _ = 
WebhookEventFactory.issue_deleted("PROJ-200") + + with patch("connectors.jira.service.trigger_incremental_transform", return_value=True): + result = service.process_webhook_event(event_data) + + assert result is True + with open(issue_file) as f: + saved = json.load(f) + assert "_deleted_at" in saved + + def test_process_missing_issue_key_returns_false(self, jira_env): + """Webhook event without issue key returns False.""" + service = _make_jira_service(jira_env) + result = service.process_webhook_event({"webhookEvent": "jira:issue_updated"}) + assert result is False + + def test_process_uses_embedded_data_when_fetch_fails(self, jira_env): + """Falls back to embedded issue data in webhook payload when API fetch fails.""" + service = _make_jira_service(jira_env) + event_data = { + "webhookEvent": "jira:issue_updated", + "issue": { + "key": "PROJ-300", + "id": "10003", + "fields": { + "summary": "Embedded issue", + "attachment": [], + "comment": {"comments": []}, + }, + }, + } + + with patch.object(service, "fetch_issue", return_value=None), \ + patch.object(service, "fetch_remote_links", return_value=[]), \ + patch.object(service, "fetch_sla_fields", return_value=None), \ + patch.object(service, "download_all_attachments", return_value=[]), \ + patch("connectors.jira.service.trigger_incremental_transform", return_value=True): + result = service.process_webhook_event(event_data) + + assert result is True + saved_file = jira_env / "issues" / "PROJ-300.json" + assert saved_file.exists() + + def test_deletion_of_nonexistent_issue_returns_true(self, jira_env): + """Deleting an issue that has no local file returns True (idempotent).""" + service = _make_jira_service(jira_env) + event_data, _, _ = WebhookEventFactory.issue_deleted("PROJ-NOEXIST") + + result = service.process_webhook_event(event_data) + assert result is True + + def test_fetch_issue_returns_none_on_404(self, jira_env): + """fetch_issue returns None when Jira returns 404.""" + import httpx + + service = 
_make_jira_service(jira_env) + + mock_response = MagicMock() + mock_response.status_code = 404 + mock_client = MagicMock() + mock_client.get.return_value = mock_response + mock_client.__enter__ = lambda s: mock_client + mock_client.__exit__ = MagicMock(return_value=False) + + with patch("connectors.jira.service.httpx.Client", return_value=mock_client): + result = service.fetch_issue("PROJ-MISSING") + + assert result is None + + def test_fetch_issue_returns_data_on_200(self, jira_env): + """fetch_issue returns parsed JSON on HTTP 200.""" + service = _make_jira_service(jira_env) + issue_data = _fake_issue_data("PROJ-42") + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = issue_data + mock_client = MagicMock() + mock_client.get.return_value = mock_response + mock_client.__enter__ = lambda s: mock_client + mock_client.__exit__ = MagicMock(return_value=False) + + with patch("connectors.jira.service.httpx.Client", return_value=mock_client): + result = service.fetch_issue("PROJ-42") + + assert result is not None + assert result["key"] == "PROJ-42" + + def test_webhook_event_factory_signature_verification(self): + """WebhookEventFactory produces correct HMAC-SHA256 signatures.""" + import hashlib + import hmac + + secret = "test-secret" + event_data, payload, sig = WebhookEventFactory.issue_updated("TEST-1", secret) + + expected_mac = hmac.new( + secret.encode("utf-8"), payload, hashlib.sha256 + ).hexdigest() + assert sig == f"sha256={expected_mac}" diff --git a/tests/test_keboola_extractor_full.py b/tests/test_keboola_extractor_full.py new file mode 100644 index 0000000..fe15503 --- /dev/null +++ b/tests/test_keboola_extractor_full.py @@ -0,0 +1,229 @@ +"""Full tests for the Keboola extractor connector.""" + +from pathlib import Path +from unittest.mock import patch + +import duckdb +import pytest + +from tests.conftest import create_mock_extract +from tests.helpers.contract import validate_extract_contract + + 
+@pytest.fixture +def output_dir(tmp_path): + d = tmp_path / "extracts" / "keboola" + d.mkdir(parents=True) + return str(d) + + +@pytest.fixture +def extracts_dir(tmp_path): + d = tmp_path / "extracts" + d.mkdir(parents=True) + return d + + +@pytest.fixture +def sample_local_configs(): + return [ + { + "id": "in.c-finance.orders", + "name": "orders", + "source_type": "keboola", + "bucket": "in.c-finance", + "source_table": "orders", + "query_mode": "local", + "description": "Finance orders", + }, + { + "id": "in.c-finance.customers", + "name": "customers", + "source_type": "keboola", + "bucket": "in.c-finance", + "source_table": "customers", + "query_mode": "local", + "description": "Customer data", + }, + ] + + +def _mock_attach(conn, url, token): + """Mock extension attach: creates kbc alias so views can be created.""" + conn.execute("ATTACH ':memory:' AS kbc") + return True + + +def _write_parquet(pq_path, data_sql="SELECT 1 AS id, 'test' AS name"): + local_conn = duckdb.connect() + local_conn.execute(f"COPY ({data_sql}) TO '{pq_path}' (FORMAT PARQUET)") + local_conn.close() + + +class TestKeboolaExtractorFull: + def test_run_with_extension_creates_contract_compliant_db(self, output_dir, sample_local_configs): + """run() with extension produces extract.duckdb that passes contract validation.""" + from connectors.keboola.extractor import run + + def write_pq(conn, tc, pq_path): + _write_parquet(pq_path, "SELECT 1 AS id, 'Alice' AS name") + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \ + patch("connectors.keboola.extractor._extract_via_extension", side_effect=write_pq): + result = run(output_dir, sample_local_configs, "https://kbc.example.com", "token-abc") + + assert result["tables_extracted"] == 2 + assert result["tables_failed"] == 0 + assert result["errors"] == [] + + db_path = str(Path(output_dir) / "extract.duckdb") + validate_extract_contract(db_path) + + def test_run_fallback_to_legacy_client(self, 
output_dir): + """When DuckDB extension unavailable, falls back to legacy client.""" + from connectors.keboola.extractor import run + + configs = [{"name": "t", "id": "in.c-test.t", "query_mode": "local", "description": ""}] + + def mock_legacy(tc, pq_path, url, token): + _write_parquet(pq_path, "SELECT 99 AS value") + + with patch("connectors.keboola.extractor._try_attach_extension", return_value=False), \ + patch("connectors.keboola.extractor._extract_via_legacy", side_effect=mock_legacy): + result = run(output_dir, configs, "https://kbc.example.com", "token-abc") + + assert result["tables_extracted"] == 1 + assert result["tables_failed"] == 0 + # Verify data is actually readable (parquet stores integers as int, not str) + conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb"), read_only=True) + row = conn.execute("SELECT value FROM t").fetchone() + conn.close() + assert row[0] == 99 + + def test_meta_table_schema_correct(self, output_dir): + """_meta table must have exactly the required columns in the right order.""" + from connectors.keboola.extractor import run + + configs = [{"name": "t", "query_mode": "local", "description": "desc"}] + + def write_pq(conn, tc, pq_path): + _write_parquet(pq_path, "SELECT 1 AS x") + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \ + patch("connectors.keboola.extractor._extract_via_extension", side_effect=write_pq): + run(output_dir, configs, "https://kbc.example.com", "token-abc") + + conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb"), read_only=True) + cols = conn.execute( + "SELECT column_name FROM information_schema.columns " + "WHERE table_name='_meta' ORDER BY ordinal_position" + ).fetchall() + conn.close() + assert [c[0] for c in cols] == [ + "table_name", "description", "rows", "size_bytes", "extracted_at", "query_mode" + ] + + def test_remote_attach_table_created_for_remote_tables(self, output_dir): + """_remote_attach table is created when any table 
has query_mode='remote'.""" + from connectors.keboola.extractor import run + + def mock_attach_with_schema(conn, url, token): + conn.execute("ATTACH ':memory:' AS kbc") + conn.execute('CREATE SCHEMA kbc."in.c-events"') + conn.execute('CREATE TABLE kbc."in.c-events"."big_table" (id VARCHAR)') + return True + + configs = [{ + "name": "big_table", + "bucket": "in.c-events", + "source_table": "big_table", + "query_mode": "remote", + "description": "Large remote table", + }] + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=mock_attach_with_schema): + result = run(output_dir, configs, "https://kbc.example.com", "token-abc") + + assert result["tables_extracted"] == 1 + + conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb"), read_only=True) + ra = conn.execute("SELECT alias, extension, url, token_env FROM _remote_attach").fetchone() + conn.close() + + assert ra[0] == "kbc" + assert ra[1] == "keboola" + assert ra[2] == "https://kbc.example.com" + assert ra[3] == "KEBOOLA_STORAGE_TOKEN" + + def test_remote_tables_not_downloaded(self, output_dir): + """Remote tables have no parquet file — they are views pointing to kbc.""" + from connectors.keboola.extractor import run + + def mock_attach_with_schema(conn, url, token): + conn.execute("ATTACH ':memory:' AS kbc") + conn.execute('CREATE SCHEMA kbc."in.c-big"') + conn.execute('CREATE TABLE kbc."in.c-big"."events" (id VARCHAR)') + return True + + configs = [{ + "name": "events", + "bucket": "in.c-big", + "source_table": "events", + "query_mode": "remote", + "description": "", + }] + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=mock_attach_with_schema): + run(output_dir, configs, "https://kbc.example.com", "token-abc") + + # No parquet file for remote table + assert not (Path(output_dir) / "data" / "events.parquet").exists() + + # _meta has remote query_mode + conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb"), read_only=True) + row = 
conn.execute("SELECT query_mode FROM _meta WHERE table_name='events'").fetchone() + conn.close() + assert row[0] == "remote" + + def test_partial_failure_continues(self, output_dir, sample_local_configs): + """A single table failure should not abort remaining tables.""" + from connectors.keboola.extractor import run + + call_count = [0] + + def failing_first(conn, tc, pq_path): + call_count[0] += 1 + if call_count[0] == 1: + raise RuntimeError("Connection reset") + _write_parquet(pq_path, "SELECT 1 AS id") + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \ + patch("connectors.keboola.extractor._extract_via_extension", side_effect=failing_first): + result = run(output_dir, sample_local_configs, "https://kbc.example.com", "token-abc") + + assert result["tables_extracted"] == 1 + assert result["tables_failed"] == 1 + assert len(result["errors"]) == 1 + assert "Connection reset" in result["errors"][0]["error"] + + def test_create_mock_extract_contract(self, extracts_dir): + """create_mock_extract helper produces contract-compliant extract.duckdb.""" + db_path = create_mock_extract(extracts_dir, "keboola", [ + {"name": "orders", "data": [{"id": "1", "amount": "100"}], "query_mode": "local"}, + ]) + validate_extract_contract(str(db_path)) + + def test_data_directory_created(self, output_dir, sample_local_configs): + """data/ subdirectory is created alongside extract.duckdb.""" + from connectors.keboola.extractor import run + + def write_pq(conn, tc, pq_path): + _write_parquet(pq_path, "SELECT 1 AS id") + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), \ + patch("connectors.keboola.extractor._extract_via_extension", side_effect=write_pq): + run(output_dir, sample_local_configs, "https://kbc.example.com", "token-abc") + + assert (Path(output_dir) / "data").is_dir() + assert (Path(output_dir) / "data" / "orders.parquet").exists() + assert (Path(output_dir) / "data" / 
"customers.parquet").exists() diff --git a/tests/test_llm_providers_full.py b/tests/test_llm_providers_full.py new file mode 100644 index 0000000..21570f7 --- /dev/null +++ b/tests/test_llm_providers_full.py @@ -0,0 +1,273 @@ +"""Full tests for LLM provider factory and extractors.""" + +import json +from unittest.mock import MagicMock, patch + +import anthropic +import openai +import pytest + +from connectors.llm.anthropic_provider import AnthropicExtractor +from connectors.llm.exceptions import ( + LLMAuthError, + LLMFormatError, + LLMRateLimitError, + LLMRefusalError, + LLMTimeoutError, + LLMUnsupportedError, +) +from connectors.llm.factory import DEFAULT_MODEL, create_extractor +from connectors.llm.openai_compat import OpenAICompatExtractor, _extract_json_from_text + + +# --------------------------------------------------------------------------- +# Mock response helpers +# --------------------------------------------------------------------------- + + +def _anthropic_response(text: str, stop_reason: str = "end_turn"): + block = MagicMock() + block.text = text + resp = MagicMock() + resp.content = [block] + resp.stop_reason = stop_reason + return resp + + +def _openai_response(content: str | None, finish_reason: str = "stop"): + message = MagicMock() + message.content = content + choice = MagicMock() + choice.message = message + choice.finish_reason = finish_reason + resp = MagicMock() + resp.choices = [choice] + return resp + + +_SCHEMA = {"type": "object", "properties": {"value": {"type": "string"}}} + + +# --------------------------------------------------------------------------- +# Factory tests +# --------------------------------------------------------------------------- + + +class TestCreateExtractor: + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_anthropic_provider_returns_anthropic_extractor(self, _mock): + config = {"provider": "anthropic", "api_key": "sk-ant-test"} + ext = create_extractor(config) + assert 
isinstance(ext, AnthropicExtractor) + + @patch("connectors.llm.openai_compat.openai.OpenAI") + @patch("connectors.llm.openai_compat.httpx.Client") + def test_openai_compat_provider_returns_openai_extractor(self, _mock_http, _mock_oai): + config = { + "provider": "openai_compat", + "api_key": "sk-test", + "base_url": "https://api.openai.com/v1", + } + ext = create_extractor(config) + assert isinstance(ext, OpenAICompatExtractor) + + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_legacy_anthropic_key_format(self, _mock): + """anthropic_api_key (legacy format) still creates AnthropicExtractor.""" + config = {"anthropic_api_key": "sk-ant-legacy"} + ext = create_extractor(config) + assert isinstance(ext, AnthropicExtractor) + + def test_missing_provider_raises_value_error(self): + with pytest.raises(ValueError, match="ai.provider is required"): + create_extractor({"api_key": "sk-test"}) + + def test_empty_config_raises_value_error(self): + with pytest.raises(ValueError): + create_extractor({}) + + def test_unknown_provider_raises_value_error(self): + with pytest.raises(ValueError, match="Unknown ai.provider"): + create_extractor({"provider": "cohere", "api_key": "sk-test"}) + + @patch("connectors.llm.openai_compat.openai.OpenAI") + @patch("connectors.llm.openai_compat.httpx.Client") + def test_openai_compat_missing_base_url_raises(self, _mock_http, _mock_oai): + with pytest.raises(ValueError, match="base_url is required"): + create_extractor({"provider": "openai_compat", "api_key": "sk-test"}) + + def test_empty_api_key_raises_value_error(self): + with pytest.raises(ValueError, match="api_key"): + create_extractor({"provider": "anthropic", "api_key": ""}) + + +# --------------------------------------------------------------------------- +# AnthropicExtractor tests +# --------------------------------------------------------------------------- + + +class TestAnthropicExtractor: + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + 
def test_extract_json_success(self, mock_cls): + """extract_json returns parsed dict on successful API call.""" + mock_client = MagicMock() + mock_cls.return_value = mock_client + mock_client.messages.create.return_value = _anthropic_response('{"value": "hello"}') + + ext = AnthropicExtractor(api_key="sk-ant-test", model="claude-haiku-4-5-20251001") + result = ext.extract_json("prompt", 1000, _SCHEMA, "test_schema") + + assert result == {"value": "hello"} + + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_auth_error_raises_immediately(self, mock_cls): + """AuthenticationError is raised immediately without retry.""" + mock_client = MagicMock() + mock_cls.return_value = mock_client + mock_client.messages.create.side_effect = anthropic.AuthenticationError( + message="Invalid key", response=MagicMock(), body={} + ) + + ext = AnthropicExtractor(api_key="bad-key", model="claude-haiku-4-5-20251001") + with pytest.raises(LLMAuthError): + ext.extract_json("prompt", 1000, _SCHEMA, "test_schema") + + # Should only be called once — no retry + assert mock_client.messages.create.call_count == 1 + + @patch("connectors.llm.anthropic_provider.time.sleep") + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_rate_limit_retries_and_raises(self, mock_cls, mock_sleep): + """RateLimitError is retried MAX_RETRIES times then raises LLMRateLimitError.""" + from connectors.llm.anthropic_provider import MAX_RETRIES + + mock_client = MagicMock() + mock_cls.return_value = mock_client + mock_client.messages.create.side_effect = anthropic.RateLimitError( + message="Rate limited", response=MagicMock(), body={} + ) + + ext = AnthropicExtractor(api_key="sk-ant-test", model="claude-haiku-4-5-20251001") + with pytest.raises(LLMRateLimitError): + ext.extract_json("prompt", 1000, _SCHEMA, "test_schema") + + assert mock_client.messages.create.call_count == MAX_RETRIES + + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def 
test_truncated_response_raises_format_error(self, mock_cls): + """max_tokens stop_reason raises LLMFormatError.""" + mock_client = MagicMock() + mock_cls.return_value = mock_client + mock_client.messages.create.return_value = _anthropic_response( + '{"partial":', stop_reason="max_tokens" + ) + + ext = AnthropicExtractor(api_key="sk-ant-test", model="claude-haiku-4-5-20251001") + with pytest.raises(LLMFormatError, match="truncated"): + ext.extract_json("prompt", 10, _SCHEMA, "test_schema") + + +# --------------------------------------------------------------------------- +# OpenAICompatExtractor tests +# --------------------------------------------------------------------------- + + +class TestOpenAICompatExtractor: + def _make_extractor(self, structured_output: str = "auto") -> OpenAICompatExtractor: + with patch("connectors.llm.openai_compat.openai.OpenAI"), \ + patch("connectors.llm.openai_compat.httpx.Client"): + return OpenAICompatExtractor( + api_key="sk-test", + base_url="https://api.example.com/v1", + model="gpt-4o-mini", + structured_output=structured_output, + ) + + def test_extract_json_success_json_schema(self): + """extract_json succeeds with json_schema strategy.""" + ext = self._make_extractor() + ext._client = MagicMock() + ext._client.chat.completions.create.return_value = _openai_response('{"value": "ok"}') + + result = ext.extract_json("prompt", 1000, _SCHEMA, "test") + assert result == {"value": "ok"} + + def test_strategy_cascade_falls_back_on_bad_request(self): + """json_schema unsupported -> falls back to json_object strategy (auto mode).""" + ext = self._make_extractor(structured_output="auto") + ext._client = MagicMock() + + # First call (json_schema) raises BadRequestError about response_format + bad_request_error = openai.BadRequestError( + message="response_format json_schema not supported", + response=MagicMock(status_code=400), + body={"error": {"message": "response_format json_schema not supported"}}, + ) + success_response = 
_openai_response('{"value": "fallback"}') + ext._client.chat.completions.create.side_effect = [bad_request_error, success_response] + + result = ext.extract_json("prompt", 1000, _SCHEMA, "test") + assert result == {"value": "fallback"} + + def test_auth_error_raises_immediately(self): + """AuthenticationError is not retried.""" + ext = self._make_extractor() + ext._client = MagicMock() + ext._client.chat.completions.create.side_effect = openai.AuthenticationError( + message="Invalid key", + response=MagicMock(status_code=401), + body={}, + ) + + with pytest.raises(LLMAuthError): + ext.extract_json("prompt", 1000, _SCHEMA, "test") + + assert ext._client.chat.completions.create.call_count == 1 + + def test_strict_mode_raises_unsupported(self): + """strict mode does not fall back; raises LLMUnsupportedError.""" + ext = self._make_extractor(structured_output="strict") + ext._client = MagicMock() + ext._client.chat.completions.create.side_effect = openai.BadRequestError( + message="json_schema not supported", + response=MagicMock(status_code=400), + body={"error": {"message": "json_schema not supported"}}, + ) + + with pytest.raises(LLMUnsupportedError): + ext.extract_json("prompt", 1000, _SCHEMA, "test") + + def test_refusal_raises_immediately(self): + """Empty content (refusal) raises LLMRefusalError.""" + ext = self._make_extractor() + ext._client = MagicMock() + ext._client.chat.completions.create.return_value = _openai_response(None) + + with pytest.raises(LLMRefusalError): + ext.extract_json("prompt", 1000, _SCHEMA, "test") + + +# --------------------------------------------------------------------------- +# _extract_json_from_text tests +# --------------------------------------------------------------------------- + + +class TestExtractJsonFromText: + def test_direct_json_parse(self): + assert _extract_json_from_text('{"key": "val"}') == {"key": "val"} + + def test_strips_markdown_code_fence(self): + text = '```json\n{"key": "fenced"}\n```' + assert 
_extract_json_from_text(text) == {"key": "fenced"} + + def test_strips_plain_code_fence(self): + text = "```\n{\"key\": \"plain\"}\n```" + assert _extract_json_from_text(text) == {"key": "plain"} + + def test_brace_extraction_fallback(self): + text = "Here is the JSON: {\"key\": \"brace\"} and some trailing text." + assert _extract_json_from_text(text) == {"key": "brace"} + + def test_raises_format_error_on_invalid(self): + with pytest.raises(LLMFormatError): + _extract_json_from_text("This is definitely not JSON at all.")