diff --git a/config/.env.template b/config/.env.template index e85e3f7..b604c10 100644 --- a/config/.env.template +++ b/config/.env.template @@ -20,3 +20,6 @@ GOOGLE_CLIENT_SECRET= # JIRA_WEBHOOK_SECRET= # JIRA_SLA_API_TOKEN= # ANTHROPIC_API_KEY= +# LLM API key for proxy routing (referenced as ${LLM_API_KEY} in instance.yaml) +# Provider and model configured in instance.yaml ai: section +# LLM_API_KEY= diff --git a/config/instance.yaml.example b/config/instance.yaml.example index 17d28a1..5c542d1 100644 --- a/config/instance.yaml.example +++ b/config/instance.yaml.example @@ -153,8 +153,57 @@ jira: cloud_id: "" # --- Corporate Memory AI (optional) --- +# Extracts shared knowledge from team members' CLAUDE.local.md files. +# Provider: "anthropic" (direct API) or "openai_compat" (LiteLLM, OpenRouter, Azure, etc.) ai: - anthropic_api_key: "${ANTHROPIC_API_KEY}" + provider: "anthropic" # or "openai_compat" + api_key: "${ANTHROPIC_API_KEY}" # or "${LLM_API_KEY}" for proxy + # base_url: "https://litellm.example.com" # required for openai_compat + model: "claude-haiku-4-5-20251001" # any model available on your provider + # --- Structured output quality control --- + # AI models can return JSON in three ways, each with different reliability: + # + # Layer 1 - "json_schema" (best): + # The provider enforces an exact schema. Every field, type, and structure + # is guaranteed. Available on: Anthropic, OpenAI, Claude via LiteLLM. + # + # Layer 2 - "json_object" (good): + # The provider guarantees valid JSON, but does not enforce a specific schema. + # Fields may be missing or have wrong types. Available on most providers. + # + # Layer 3 - "prompt" (acceptable): + # The AI is asked to respond in JSON via instructions in the prompt. + # No technical enforcement -- the model may still return invalid JSON. + # Works everywhere, but least reliable. + # + # "strict" = only Layer 1. Fail if provider doesn't support json_schema. + # Use when data quality is non-negotiable. + # "json" = Layer 1, fall back to Layer 2. No prompt-based fallback. + # Good balance of quality and compatibility. + # "auto" = All three layers as progressive fallback. Maximum compatibility. + # Use when you'd rather get imperfect data than no data. + structured_output: "auto" + +# Legacy format (still supported, equivalent to provider: "anthropic"): +# ai: +# anthropic_api_key: "${ANTHROPIC_API_KEY}" + +# Examples: +# --- LiteLLM proxy --- +# ai: +# provider: "openai_compat" +# base_url: "https://litellm.example.com" +# api_key: "${LLM_API_KEY}" +# model: "claude-haiku-4-5-20251001" +# structured_output: "strict" +# +# --- OpenRouter --- +# ai: +# provider: "openai_compat" +# base_url: "https://openrouter.ai/api/v1" +# api_key: "${OPENROUTER_API_KEY}" +# model: "anthropic/claude-3-haiku" +# structured_output: "auto" # --- User display (for Corporate Memory avatars) --- users: {} diff --git a/connectors/llm/__init__.py b/connectors/llm/__init__.py new file mode 100644 index 0000000..ea01dd9 --- /dev/null +++ b/connectors/llm/__init__.py @@ -0,0 +1,11 @@ +"""LLM connector module for structured extraction. + +Provides a provider-agnostic interface for extracting structured JSON +from language models. Supports Anthropic (native) and OpenAI-compatible +providers with automatic fallback strategies for structured output. +""" + +from .base import StructuredExtractor +from .factory import create_extractor + +__all__ = ["StructuredExtractor", "create_extractor"] diff --git a/connectors/llm/anthropic_provider.py b/connectors/llm/anthropic_provider.py new file mode 100644 index 0000000..df0e050 --- /dev/null +++ b/connectors/llm/anthropic_provider.py @@ -0,0 +1,152 @@ +"""Anthropic provider for structured JSON extraction. + +Uses the Anthropic API with native structured output (json_schema) +for reliable JSON extraction. Includes retry logic for transient errors. +""" + +import json +import logging +import time + +import anthropic + +from .exceptions import ( + LLMAuthError, + LLMFormatError, + LLMRateLimitError, + LLMRefusalError, + LLMTimeoutError, +) + +logger = logging.getLogger(__name__) + +# Retry configuration +MAX_RETRIES = 3 +INITIAL_BACKOFF_SECONDS = 2 +BACKOFF_MULTIPLIER = 2 + + +class AnthropicExtractor: + """Structured JSON extractor using the Anthropic API. + + Uses output_config with json_schema format for structured output. + Retries transient errors (rate limit, timeout, connection) with + exponential backoff. + """ + + def __init__(self, api_key: str, model: str) -> None: + """Initialize the Anthropic extractor. + + Args: + api_key: Anthropic API key. + model: Model identifier (e.g., "claude-haiku-4-5-20251001"). + """ + self._client = anthropic.Anthropic(api_key=api_key) + self._model = model + + def extract_json( + self, + prompt: str, + max_tokens: int, + json_schema: dict, + schema_name: str, + ) -> dict: + """Extract structured JSON using the Anthropic API. + + Args: + prompt: The extraction prompt to send to the model. + max_tokens: Maximum tokens in the response. + json_schema: JSON Schema that the response must conform to. + schema_name: Human-readable name for the schema. + + Returns: + Parsed JSON dictionary conforming to the provided schema. + + Raises: + LLMAuthError: Invalid API key. + LLMRateLimitError: Rate limited after all retries. + LLMTimeoutError: Timeout/connection error after all retries. + LLMFormatError: Response is not valid JSON. + LLMRefusalError: Model refused to respond. + """ + last_exception: Exception | None = None + + for attempt in range(1, MAX_RETRIES + 1): + try: + return self._attempt_extraction( + prompt, max_tokens, json_schema, schema_name, attempt, + ) + except LLMAuthError: + raise + except LLMRefusalError: + raise + except (LLMRateLimitError, LLMTimeoutError) as e: + last_exception = e + if attempt < MAX_RETRIES: + delay = INITIAL_BACKOFF_SECONDS * (BACKOFF_MULTIPLIER ** (attempt - 1)) + logger.warning( + "Transient error on attempt %d/%d for model %s, " + "retrying in %ds: %s", + attempt, MAX_RETRIES, self._model, delay, + type(e).__name__, + ) + time.sleep(delay) + + raise last_exception # type: ignore[misc] + + def _attempt_extraction( + self, + prompt: str, + max_tokens: int, + json_schema: dict, + schema_name: str, + attempt: int, + ) -> dict: + """Single extraction attempt against the Anthropic API.""" + logger.info( + "Anthropic extraction attempt %d/%d, model=%s, schema=%s", + attempt, MAX_RETRIES, self._model, schema_name, + ) + + try: + response = self._client.messages.create( + model=self._model, + max_tokens=max_tokens, + messages=[{"role": "user", "content": prompt}], + output_config={ + "format": { + "type": "json_schema", + "schema": json_schema, + }, + }, + ) + except anthropic.AuthenticationError as e: + raise LLMAuthError("Anthropic authentication failed (check API key)") from e + except anthropic.RateLimitError as e: + raise LLMRateLimitError("Anthropic rate limited") from e + except (anthropic.APITimeoutError, anthropic.APIConnectionError) as e: + raise LLMTimeoutError( + f"Anthropic connection error ({type(e).__name__})" + ) from e + + # Check for truncation - raise and let outer retry loop handle it + if response.stop_reason == "max_tokens": + raise LLMFormatError( + f"Response truncated (max_tokens) for schema {schema_name}" + ) + + # Check for refusal + if response.stop_reason == "end_turn" and not response.content: + raise LLMRefusalError( + f"Model refused to generate response for schema {schema_name}" + ) + + # Parse JSON from response + try: + text = response.content[0].text + return json.loads(text) + except (json.JSONDecodeError, IndexError, AttributeError) as e: + raise LLMFormatError( + f"Failed to parse Anthropic response as JSON for " + f"schema {schema_name} ({type(e).__name__})" + ) from e diff --git a/connectors/llm/base.py b/connectors/llm/base.py new file mode 100644 index 0000000..fe4de51 --- /dev/null +++ b/connectors/llm/base.py @@ -0,0 +1,41 @@ +"""Base protocol for structured extraction from LLMs.""" + +from typing import Protocol + + +class StructuredExtractor(Protocol): + """Protocol for structured JSON extraction from language models. + + This is a structured extraction interface, NOT a general LLM chat + interface. Implementations must return parsed JSON matching the + provided schema. + """ + + def extract_json( + self, + prompt: str, + max_tokens: int, + json_schema: dict, + schema_name: str, + ) -> dict: + """Extract structured JSON from a prompt. + + Args: + prompt: The extraction prompt to send to the model. + max_tokens: Maximum tokens in the response. + json_schema: JSON Schema that the response must conform to. + schema_name: Human-readable name for the schema (used in + logging and error messages). + + Returns: + Parsed JSON dictionary conforming to the provided schema. + + Raises: + LLMError: Base class for all LLM-related errors. + LLMAuthError: Invalid API key (permanent, do not retry). + LLMRateLimitError: Rate limited (transient, retry with backoff). + LLMTimeoutError: Timeout or connection error (transient, retry). + LLMFormatError: Invalid JSON or unexpected structure. + LLMRefusalError: Model refused due to safety filter. + """ + ... diff --git a/connectors/llm/exceptions.py b/connectors/llm/exceptions.py new file mode 100644 index 0000000..6c9606c --- /dev/null +++ b/connectors/llm/exceptions.py @@ -0,0 +1,55 @@ +"""Exception hierarchy for LLM connector errors. + +All exceptions inherit from LLMError so callers can catch the base +class for broad error handling or specific subclasses for targeted +recovery strategies. +""" + + +class LLMError(Exception): + """Base exception for all LLM-related errors.""" + + +class LLMAuthError(LLMError): + """Invalid API key or authentication failure. + + This is a permanent error - do not retry. + """ + + +class LLMRateLimitError(LLMError): + """Rate limited by the provider. + + This is a transient error - retry with exponential backoff. + """ + + +class LLMTimeoutError(LLMError): + """Timeout or connection error. + + This is a transient error - retry with exponential backoff. + """ + + +class LLMFormatError(LLMError): + """Invalid JSON or unexpected response structure. + + The model returned content that could not be parsed as valid JSON + or did not match the expected schema. + """ + + +class LLMUnsupportedError(LLMError): + """Provider does not support a required feature. + + For example, the provider does not support structured output + (json_schema response format) and the configuration does not + allow fallback strategies. + """ + + +class LLMRefusalError(LLMError): + """Model refused to generate a response. + + Typically triggered by safety filters or content policy violations. + """ diff --git a/connectors/llm/factory.py b/connectors/llm/factory.py new file mode 100644 index 0000000..c160dcc --- /dev/null +++ b/connectors/llm/factory.py @@ -0,0 +1,137 @@ +"""Factory for creating structured extractors from instance configuration. + +Reads the ai: section from instance.yaml (already resolved by config/loader.py) +and creates the appropriate StructuredExtractor implementation. +""" + +import logging +from urllib.parse import urlparse + +from .anthropic_provider import AnthropicExtractor +from .base import StructuredExtractor +from .openai_compat import OpenAICompatExtractor + +logger = logging.getLogger(__name__) + +# Default model when not specified in config +DEFAULT_MODEL = "claude-haiku-4-5-20251001" + +# Default structured output strategy +DEFAULT_STRUCTURED_OUTPUT = "auto" + + +def create_extractor(ai_config: dict) -> StructuredExtractor: + """Create a structured extractor from the ai: config section. + + Supports two configuration formats: + + New format (explicit provider): + ai: + provider: anthropic | openai_compat + api_key: ${ANTHROPIC_API_KEY} + model: claude-haiku-4-5-20251001 + base_url: https://api.example.com/v1 # required for openai_compat + structured_output: auto # strict | json | auto + + Legacy format (backward compatible): + ai: + anthropic_api_key: ${ANTHROPIC_API_KEY} + + Args: + ai_config: The ai: section dict from instance.yaml, + already resolved by config/loader.py. + + Returns: + A StructuredExtractor instance. + + Raises: + ValueError: If configuration is invalid or incomplete. + """ + if not ai_config or not isinstance(ai_config, dict): + raise ValueError( + "ai: section in instance.yaml must be a non-empty dict. " + "Example:\n ai:\n provider: anthropic\n api_key: ${ANTHROPIC_API_KEY}" + ) + + provider = ai_config.get("provider") + + # Legacy format detection: anthropic_api_key present, no provider + if not provider and "anthropic_api_key" in ai_config: + api_key = ai_config["anthropic_api_key"] + _validate_api_key(api_key) + model = ai_config.get("model", DEFAULT_MODEL) + logger.info( + "Creating AnthropicExtractor (legacy config), model=%s", model + ) + return AnthropicExtractor(api_key=api_key, model=model) + + if not provider: + raise ValueError( + "ai.provider is required in instance.yaml. " + "Supported: 'anthropic', 'openai_compat'. " + "Hint: use ${ENV_VAR} syntax for secrets." + ) + + api_key = ai_config.get("api_key", "") + _validate_api_key(api_key) + model = ai_config.get("model", DEFAULT_MODEL) + + if provider == "anthropic": + logger.info("Creating AnthropicExtractor, model=%s", model) + return AnthropicExtractor(api_key=api_key, model=model) + + elif provider == "openai_compat": + base_url = ai_config.get("base_url", "") + if not base_url: + raise ValueError( + "ai.base_url is required when provider is 'openai_compat'. " + "Example: base_url: https://api.openai.com/v1" + ) + structured_output = ai_config.get( + "structured_output", DEFAULT_STRUCTURED_OUTPUT, + ) + if structured_output not in ("strict", "json", "auto"): + raise ValueError( + f"ai.structured_output must be 'strict', 'json', or 'auto', " + f"got '{structured_output}'" + ) + + safe_url = _sanitize_url(base_url) + logger.info( + "Creating OpenAICompatExtractor, url=%s, model=%s, " + "structured_output=%s", + safe_url, model, structured_output, + ) + return OpenAICompatExtractor( + api_key=api_key, + base_url=base_url, + model=model, + structured_output=structured_output, + ) + + else: + raise ValueError( + f"Unknown ai.provider '{provider}'. " + f"Supported: 'anthropic', 'openai_compat'. " + f"Hint: use ${{ENV_VAR}} syntax for secrets." + ) + + +def _validate_api_key(api_key: str) -> None: + """Validate that an API key is present and non-empty. + + Raises: + ValueError: If api_key is empty or missing. + """ + if not api_key or not api_key.strip(): + raise ValueError( + "ai.api_key (or ai.anthropic_api_key) must not be empty. " + "Check that the corresponding environment variable is set " + "and referenced with ${ENV_VAR} syntax in instance.yaml." + ) + + +def _sanitize_url(url: str) -> str: + """Extract scheme://host from a URL for safe logging.""" + parsed = urlparse(url) + return f"{parsed.scheme}://{parsed.netloc}" diff --git a/connectors/llm/openai_compat.py b/connectors/llm/openai_compat.py new file mode 100644 index 0000000..d11621b --- /dev/null +++ b/connectors/llm/openai_compat.py @@ -0,0 +1,305 @@ +"""OpenAI-compatible provider for structured JSON extraction. + +Supports any OpenAI-compatible API endpoint with progressive fallback +for structured output: json_schema -> json_object -> prompt-based JSON. +""" + +import json +import logging +import re +import time +from urllib.parse import urlparse + +import openai + +from .exceptions import ( + LLMAuthError, + LLMFormatError, + LLMRateLimitError, + LLMRefusalError, + LLMTimeoutError, + LLMUnsupportedError, +) + +logger = logging.getLogger(__name__) + +# Retry configuration +MAX_RETRIES = 3 +INITIAL_BACKOFF_SECONDS = 2 +BACKOFF_MULTIPLIER = 2 + +# Regex to strip markdown code fences and extract JSON +_JSON_FENCE_PATTERN = re.compile(r"```(?:json)?\s*\n?(.*?)\n?\s*```", re.DOTALL) + + +def _sanitize_url(url: str) -> str: + """Extract scheme://host from a URL for safe logging. + + Never logs path, query params, or fragments which may contain + tokens or sensitive information. + """ + parsed = urlparse(url) + return f"{parsed.scheme}://{parsed.netloc}" + + +def _extract_json_from_text(text: str) -> dict: + """Parse JSON from potentially markdown-wrapped text. + + Tries direct parsing first, then strips markdown code fences, + then falls back to finding content between first { and last }. + + Raises: + LLMFormatError: If no valid JSON can be extracted. + """ + # Try direct parse first + stripped = text.strip() + try: + return json.loads(stripped) + except json.JSONDecodeError: + pass + + # Try stripping markdown code fences + fence_match = _JSON_FENCE_PATTERN.search(stripped) + if fence_match: + try: + return json.loads(fence_match.group(1).strip()) + except json.JSONDecodeError: + pass + + # Fallback: find JSON between first { and last } + first_brace = stripped.find("{") + last_brace = stripped.rfind("}") + if first_brace != -1 and last_brace > first_brace: + try: + return json.loads(stripped[first_brace:last_brace + 1]) + except json.JSONDecodeError: + pass + + raise LLMFormatError(f"Could not extract valid JSON from model response") + + +class OpenAICompatExtractor: + """Structured JSON extractor for OpenAI-compatible APIs. + + Supports progressive fallback for structured output based on the + configured strategy: + - "strict": json_schema only, raises LLMUnsupportedError if not supported + - "json": json_schema -> json_object fallback + - "auto": json_schema -> json_object -> prompt-based JSON (default) + """ + + def __init__( + self, + api_key: str, + base_url: str, + model: str, + structured_output: str = "auto", + ) -> None: + """Initialize the OpenAI-compatible extractor. + + Args: + api_key: API key for authentication. + base_url: Base URL of the OpenAI-compatible API. + model: Model identifier. + structured_output: Fallback strategy - "strict", "json", or "auto". + """ + self._client = openai.OpenAI(api_key=api_key, base_url=base_url) + self._model = model + self._structured_output = structured_output + self._safe_url = _sanitize_url(base_url) + + def extract_json( + self, + prompt: str, + max_tokens: int, + json_schema: dict, + schema_name: str, + ) -> dict: + """Extract structured JSON using an OpenAI-compatible API. + + Attempts structured output strategies in order of preference, + falling back as allowed by the configured strategy. + + Args: + prompt: The extraction prompt to send to the model. + max_tokens: Maximum tokens in the response. + json_schema: JSON Schema that the response must conform to. + schema_name: Human-readable name for the schema. + + Returns: + Parsed JSON dictionary conforming to the provided schema. + + Raises: + LLMAuthError: Invalid API key. + LLMRateLimitError: Rate limited after all retries. + LLMTimeoutError: Timeout/connection error after all retries. + LLMFormatError: Response is not valid JSON. + LLMRefusalError: Model refused to respond. + LLMUnsupportedError: Required feature not supported and no fallback allowed. + """ + strategies = self._get_strategies() + + for strategy in strategies: + try: + logger.info( + "OpenAI-compat extraction: url=%s, model=%s, strategy=%s, schema=%s", + self._safe_url, self._model, strategy, schema_name, + ) + return self._extract_with_strategy( + prompt, max_tokens, json_schema, schema_name, strategy, + ) + except LLMUnsupportedError: + logger.info( + "Strategy %s not supported at %s, trying next fallback", + strategy, self._safe_url, + ) + continue + + raise LLMUnsupportedError( + f"No supported structured output strategy for {self._safe_url} " + f"with configured mode '{self._structured_output}'" + ) + + def _get_strategies(self) -> list[str]: + """Get ordered list of strategies to try based on configuration.""" + if self._structured_output == "strict": + return ["json_schema"] + elif self._structured_output == "json": + return ["json_schema", "json_object"] + else: # "auto" + return ["json_schema", "json_object", "text"] + + def _extract_with_strategy( + self, + prompt: str, + max_tokens: int, + json_schema: dict, + schema_name: str, + strategy: str, + ) -> dict: + """Execute extraction with a specific structured output strategy.""" + last_exception: Exception | None = None + + for attempt in range(1, MAX_RETRIES + 1): + try: + return self._attempt_extraction( + prompt, max_tokens, json_schema, schema_name, + strategy, attempt, + ) + except LLMAuthError: + raise + except LLMRefusalError: + raise + except LLMUnsupportedError: + raise + except (LLMRateLimitError, LLMTimeoutError) as e: + last_exception = e + if attempt < MAX_RETRIES: + delay = INITIAL_BACKOFF_SECONDS * (BACKOFF_MULTIPLIER ** (attempt - 1)) + logger.warning( + "Transient error on attempt %d/%d for %s model %s, " + "retrying in %ds: %s", + attempt, MAX_RETRIES, self._safe_url, + self._model, delay, type(e).__name__, + ) + time.sleep(delay) + + raise last_exception # type: ignore[misc] + + def _attempt_extraction( + self, + prompt: str, + max_tokens: int, + json_schema: dict, + schema_name: str, + strategy: str, + attempt: int, + ) -> dict: + """Single extraction attempt with a specific strategy.""" + logger.info( + "OpenAI-compat attempt %d/%d, url=%s, model=%s, strategy=%s", + attempt, MAX_RETRIES, self._safe_url, self._model, strategy, + ) + + messages = [{"role": "user", "content": prompt}] + kwargs: dict = { + "model": self._model, + "max_tokens": max_tokens, + "messages": messages, + } + + if strategy == "json_schema": + kwargs["response_format"] = { + "type": "json_schema", + "json_schema": { + "name": schema_name, + "strict": True, + "schema": json_schema, + }, + } + elif strategy == "json_object": + kwargs["response_format"] = {"type": "json_object"} + elif strategy == "text": + # Append JSON instruction to prompt for text-based fallback + messages = [ + { + "role": "user", + "content": prompt + "\n\nIMPORTANT: Respond with valid JSON only, no markdown.", + }, + ] + kwargs["messages"] = messages + + try: + response = self._client.chat.completions.create(**kwargs) + except openai.AuthenticationError as e: + raise LLMAuthError( + f"OpenAI-compat authentication failed at {self._safe_url} (check API key)" + ) from e + except openai.RateLimitError as e: + raise LLMRateLimitError( + f"OpenAI-compat rate limited at {self._safe_url}" + ) from e + except (openai.APITimeoutError, openai.APIConnectionError) as e: + raise LLMTimeoutError( + f"OpenAI-compat connection error at {self._safe_url} ({type(e).__name__})" + ) from e + except openai.BadRequestError as e: + # json_schema format not supported by this endpoint + error_msg = str(e).lower() + if "response_format" in error_msg or "json_schema" in error_msg: + raise LLMUnsupportedError( + f"Structured output strategy '{strategy}' not supported " + f"at {self._safe_url}" + ) from e + raise LLMFormatError( + f"Bad request at {self._safe_url} ({type(e).__name__})" + ) from e + + choice = response.choices[0] + + # Check for truncation - raise and let outer retry loop handle it + if choice.finish_reason == "length": + raise LLMFormatError( + f"Response truncated (max_tokens) for schema {schema_name} " + f"at {self._safe_url}" + ) + + # Check for refusal + content = choice.message.content + if not content: + raise LLMRefusalError( + f"Model at {self._safe_url} refused to generate response " + f"for schema {schema_name}" + ) + + # Parse JSON from response + if strategy == "text": + return _extract_json_from_text(content) + + try: + return json.loads(content) + except json.JSONDecodeError as e: + raise LLMFormatError( + f"Failed to parse response as JSON for schema {schema_name} " + f"at {self._safe_url} ({type(e).__name__})" + ) from e diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 16e94f7..67cfe28 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -126,5 +126,6 @@ catalog: | `SENDGRID_API_KEY` | For password auth emails | | `TELEGRAM_BOT_TOKEN` | For Telegram notifications | | `ANTHROPIC_API_KEY` | For Corporate Memory AI | +| `LLM_API_KEY` | API key for LLM proxy (LiteLLM, OpenRouter, etc.) | | `JIRA_WEBHOOK_SECRET` | For Jira integration | | `CONFIG_DIR` | Override config directory path | diff --git a/docs/llm-routing.md b/docs/llm-routing.md new file mode 100644 index 0000000..fa0f222 --- /dev/null +++ b/docs/llm-routing.md @@ -0,0 +1,326 @@ +# Plan: Modular LLM Routing for Corporate Memory + +> Reviewed by: Claude Opus (author), Google Gemini, Claude Sonnet, OpenAI GPT-5.4 +> Feedback incorporated from all three external reviewers. + +## Context + +Corporate Memory is a feature that reads team members' local notes (CLAUDE.local.md), +sends them to a small AI model (Claude Haiku) for knowledge extraction, and builds +a shared knowledge base. Currently it's hardwired to call Anthropic's API directly. + +Different clients deploying this platform use different AI providers: + +| Client | AI Provider | Why | +|--------|------------|-----| +| Groupon | LiteLLM proxy | Corporate AI gateway, cost control, audit | +| Keboola | Direct Anthropic | Simple setup, single provider | +| Future client A | OpenRouter | Multi-model access, cost optimization | +| Future client B | Google Gemini | Existing Google Cloud relationship | + +**Problem**: The code only works with Anthropic. Adding a second client means duplicating +or rewriting the AI calling logic. + +**Solution**: Extract the AI calling logic into a modular connector that each instance +configures for its own provider. The connector lives in the open-source repo (code), +the configuration lives in the private instance repo (config). + +## Design Principles + +### 1. Structured Extraction, Not General AI + +This connector has one job: send a prompt, get back structured JSON. +It is NOT a general-purpose AI chat interface. The naming and interface reflect this: +`StructuredExtractor` (not "LLMProvider"), `extract_json()` (not "chat" or "generate"). + +This keeps the scope tight and the interface honest. If we need general AI capabilities +later, we build a separate abstraction. + +### 2. Instance Config Drives Provider Selection + +Each deployment configures its AI provider in `instance.yaml` (the same file that +already configures authentication, branding, data sources, and catalog integration). +Secrets use `${ENV_VAR}` references, resolved at load time by the existing config loader. + +The open-source code never knows which provider it's talking to. It receives a configured +extractor and calls `extract_json()`. + +### 3. Backward Compatibility + +Existing deployments using `ai.anthropic_api_key` in their config continue to work +without changes. The factory recognizes the legacy config shape and creates the +appropriate provider automatically. No migration step required for existing instances. + +### 4. Structured Output Strategy (Configurable) + +Not all AI providers support JSON schema enforcement equally. The connector supports +three levels, but the operator controls which are allowed: + +1. **JSON Schema mode** — provider enforces the exact schema (best quality) +2. **JSON Object mode** — provider guarantees valid JSON but no schema (good quality) +3. **Prompt-based JSON** — instructions in the prompt ask for JSON (acceptable quality) + +By default, all three layers are available as progressive fallback. But the operator +can restrict this in config: + +```yaml +ai: + provider: "openai_compat" + # --- Structured output quality control --- + # AI models can return JSON in three ways, each with different reliability: + # + # Layer 1 - "json_schema" (best): + # The provider enforces an exact schema. Every field, type, and structure + # is guaranteed. Available on: Anthropic, OpenAI, Claude via LiteLLM. + # + # Layer 2 - "json_object" (good): + # The provider guarantees valid JSON, but does not enforce a specific schema. + # Fields may be missing or have wrong types. Available on most providers. + # + # Layer 3 - "prompt" (acceptable): + # The AI is asked to respond in JSON via instructions in the prompt. + # No technical enforcement — the model may still return invalid JSON. + # Works everywhere, but least reliable. + # + # "strict" = only Layer 1. Fail if provider doesn't support json_schema. + # Use when data quality is non-negotiable. + # "json" = Layer 1, fall back to Layer 2. No prompt-based fallback. + # Good balance of quality and compatibility. + # "auto" = All three layers as progressive fallback. Maximum compatibility. + # Use when you'd rather get imperfect data than no data. + structured_output: "strict" +``` + +When set to `"strict"`, the connector will NOT fall back to weaker strategies. +If the provider doesn't support JSON schema, the extraction fails with a clear error. +This is the right choice when data quality is non-negotiable. + +### 5. Fail-Safe by Default + +- Missing config → Corporate Memory logs a warning and skips AI extraction (doesn't crash) +- AI call fails → item marked as "unsafe" (conservative, nothing leaks) +- Truncated response → detected and retried once +- Auth error → fails fast with clear message (don't retry forever) +- Rate limit → waits and retries with backoff + +### 6. Zero Secrets in Logs + +The connector NEVER logs: +- API keys, tokens, or any secret values +- Prompt content (may contain user notes with sensitive data) +- Response content (may contain extracted knowledge before sensitivity check) +- Full URLs with query parameters (may contain tokens) + +What IS logged: +- Provider type and model name +- Sanitized base URL (scheme + host only, no path/query) +- Structured output strategy selected +- Call duration (latency) +- Error classification (auth/rate_limit/timeout/format — never the error body) +- Whether fallback was triggered + +This is a hard rule, not a guideline. API keys and user content must never +appear in logs, stdout, stderr, or error messages propagated to callers. + +## Architecture + +### Where the code lives + +``` +OSS Repo (open-source, shared): + connectors/llm/ ← NEW: AI provider abstraction + base.py Interface definition + anthropic_provider.py Direct Anthropic API + openai_compat.py Any OpenAI-compatible proxy + factory.py Creates the right provider from config + + services/corporate_memory/ + collector.py ← MODIFIED: uses connector instead of direct API calls + +Instance Repo (private, per-client): + config/instance.yaml ← MODIFIED: new ai: section + .env ← MODIFIED: new LLM_API_KEY secret +``` + +### How it flows + +``` +instance.yaml (ai: section) + ↓ + Config loader resolves ${ENV_VAR} secrets + ↓ + Factory reads provider type, creates extractor + ↓ + Corporate Memory calls extractor.extract_json(prompt, schema) + ↓ + Extractor routes to the right API: + ├─ Anthropic SDK → api.anthropic.com/v1/messages + └─ OpenAI SDK → litellm.groupondev.com/v1/chat/completions + openrouter.ai/v1/chat/completions + any OpenAI-compatible endpoint +``` + +### Config examples + +**Groupon (LiteLLM proxy):** +```yaml +ai: + provider: "openai_compat" + base_url: "https://litellm.groupondev.com" + api_key: "${LLM_API_KEY}" + model: "claude-haiku-4-5-20251001" +``` + +**Keboola (direct Anthropic):** +```yaml +ai: + provider: "anthropic" + api_key: "${ANTHROPIC_API_KEY}" + model: "claude-haiku-4-5-20251001" +``` + +**Legacy (existing deployments, no changes needed):** +```yaml +ai: + anthropic_api_key: "${ANTHROPIC_API_KEY}" +``` + +## What We're Improving + +### A. From Hardwired to Pluggable + +**Before**: One provider, baked into the code. Changing provider = changing code. +**After**: Provider is a config choice. Switching from Anthropic to LiteLLM to OpenRouter +is a YAML change + secret rotation. No code touches needed. + +### B. From Fragile to Resilient + +**Before**: API error = entire collection run fails. No retries. +**After**: +- Transient errors (rate limits, timeouts, network) → automatic retry with backoff +- Permanent errors (bad API key, unsupported model) → fail fast, clear error message +- Truncated responses (model hit token limit) → detected, retried with note +- Model refuses request → logged, item skipped safely + +### C. From All-or-Nothing to Progressive Degradation + +**Before**: Structured output works or it doesn't. Binary. +**After**: Three fallback layers (schema → json_object → prompt-based). The connector +adapts to what each provider actually supports instead of assuming capabilities. + +### D. From Silent to Observable + +**Before**: No visibility into what the AI extraction does. +**After**: +- Which provider/model is being used (logged at startup) +- Which structured output strategy was selected (logged once) +- How long each call takes (logged per call) +- Whether fallback was triggered (logged as warning) +- Clear error classification in logs + +### E. From Coupled to Separated + +**Before**: AI provider choice is an engineering decision embedded in code. +**After**: AI provider choice is an operations decision in instance config. +Each client controls their own provider, model, and API gateway independently. + +## Error Handling Strategy + +| Error Type | What Happens | Why | +|-----------|-------------|-----| +| Missing `ai:` config | Corporate Memory skips AI extraction, logs warning | Don't crash the whole service | +| Invalid API key | Fail fast, log error, skip collection run | Don't waste retries on permanent failure | +| Rate limit (429) | Wait + retry with exponential backoff (3 attempts) | Transient, will resolve | +| Network timeout | Retry once, then fail | Might be transient | +| Truncated response | Detect via finish_reason, retry once | Model hit token limit | +| Model refusal | Log, mark item as unsafe | Conservative: don't share uncertain content | +| Invalid JSON response | Log, mark item as unsafe | Better to skip than crash | +| Structured output unsupported | Fall back to json_object, then prompt-based | Adapt to provider capabilities | + +## Scope Boundaries + +**In scope (v1):** +- Anthropic direct provider (existing behavior, tested) +- OpenAI-compatible proxy provider (LiteLLM, verified against Groupon proxy) +- Backward compatibility with existing `ai.anthropic_api_key` config +- Three-layer structured output fallback +- Custom error hierarchy (auth / rate limit / timeout / format) +- Retry with backoff for transient errors +- Corporate Memory collector integration + +**Explicitly NOT in scope (future):** +- Azure OpenAI, OpenRouter, Gemini — listed as "untested" until verified per-provider +- General-purpose AI chat/generation interface +- Streaming responses +- Multi-turn conversations +- Token usage tracking / cost monitoring (v2 consideration) +- Provider capability auto-detection at startup + +## Testing Strategy + +### Unit Tests (connector internals) +- Factory creates correct provider from each config shape +- Factory handles legacy `ai.anthropic_api_key` config +- Missing/invalid config raises clear errors +- Each provider formats API calls correctly (mocked SDK) +- Structured output fallback chain works +- Error classification (auth vs rate limit vs timeout) + +### Integration Tests (Corporate Memory behavior) +- Full collection run with mocked provider +- Skip when no files changed (hash check) +- Preserve existing item IDs across runs +- Sensitivity check runs only on new items +- Fail-closed on sensitivity check errors +- user_hashes.json written only after successful processing +- Graceful degradation when `ai:` config is missing + +### Manual Verification (before production) +- Dry-run against actual Groupon LiteLLM proxy +- Verify structured output works through proxy +- Verify sensitivity check works through proxy +- Full collection produces valid knowledge.json + +## Deployment + +The existing `deploy.sh` handles dependency installation from `requirements.txt`, +so no manual pip install is needed. The deployment sequence: + +1. Add `openai` to `requirements.txt` (OSS repo) +2. Update `collector.py` to use new connector (OSS repo) +3. Add `ai:` section to `instance.yaml` (instance repo) +4. Add `LLM_API_KEY` secret to GHA secrets and deploy.yml (instance repo) +5. Add `CONFIG_DIR` to the wrapper script `collect-knowledge` (OSS repo) +6. Push both repos → CI/CD deploys automatically +7. Verify via `--dry-run` on server + +**Rollback**: Revert both repos to previous commit. The legacy config path +means existing `ai.anthropic_api_key` still works if we need to roll back. + +## Files to Modify + +| File | Repo | Change | +|------|------|--------| +| `connectors/llm/` (5 new files) | OSS | New connector module | +| `services/corporate_memory/collector.py` | OSS | Use connector instead of direct API | +| `server/bin/collect-knowledge` | OSS | Add CONFIG_DIR | +| `requirements.txt` | OSS | Add `openai>=1.0.0` | +| `server/deploy.sh` | OSS | Add LLM_API_KEY to env propagation | +| `config/.env.template` | OSS | Document LLM_API_KEY | +| `config/instance.yaml.example` | OSS | Expanded ai: section with examples | +| `docs/CONFIGURATION.md` | OSS | Add AI provider docs | +| `tests/test_llm_connector.py` | OSS | New: connector tests | +| `tests/test_corporate_memory.py` | OSS | New/expanded: behavior tests | +| `config/instance.yaml` | Instance | Add ai: section for Groupon | +| `.github/workflows/deploy.yml` | Instance | Add LLM_API_KEY to .env | +| `env.example` | Instance | Document LLM_API_KEY | + +## Risk Assessment + +| Risk | Level | Mitigation | +|------|-------|-----------| +| LiteLLM structured output translation | Medium | Three-layer fallback + manual verification before deploy | +| Config migration breaks existing instances | Low | Backward compat shim for legacy config shape | +| New `openai` dependency conflicts | Low | Standard package, deploy.sh handles install | +| Corporate Memory regression | Medium | Expanded behavior tests covering all current logic | +| Systemd/wrapper script CONFIG_DIR | Low | Follows existing pattern from other services | diff --git a/requirements.txt b/requirements.txt index 914e03e..1ae6ad2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -55,6 +55,9 @@ sendgrid>=6.11.0 # anthropic - Claude API client for HAIKU-based knowledge extraction anthropic>=0.39.0 +# OpenAI-compatible API client for LLM proxy routing (LiteLLM, OpenRouter, etc.) +openai>=1.0.0 + # Sample data generation (development/testing) # faker - realistic synthetic data for demo datasets faker>=24.0.0 diff --git a/server/bin/collect-knowledge b/server/bin/collect-knowledge index d10494b..996dec2 100644 --- a/server/bin/collect-knowledge +++ b/server/bin/collect-knowledge @@ -27,5 +27,9 @@ if [[ -f "${REPO_DIR}/.env" ]]; then set +a fi +# Config directory for instance.yaml +export CONFIG_DIR="${APP_DIR}/instance/config" +export PYTHONPATH="${REPO_DIR}" + # Run the collector exec "$VENV_PYTHON" -m services.corporate_memory "$@" diff --git a/server/deploy.sh b/server/deploy.sh index dd9204e..ca5d445 100755 --- a/server/deploy.sh +++ b/server/deploy.sh @@ -313,7 +313,7 @@ log "Creating data sync .env file..." for var in TELEGRAM_BOT_TOKEN DESKTOP_JWT_SECRET SENDGRID_API_KEY \ JIRA_SLA_EMAIL JIRA_SLA_API_TOKEN JIRA_CLOUD_ID \ EMAIL_FROM_ADDRESS EMAIL_FROM_NAME ALLOWED_EMAILS \ - ANTHROPIC_API_KEY; do + ANTHROPIC_API_KEY LLM_API_KEY; do if [[ -n "${!var:-}" ]]; then echo "${var}=${!var}" fi diff --git a/services/corporate_memory/collector.py b/services/corporate_memory/collector.py index b0855c2..635c58a 100644 --- a/services/corporate_memory/collector.py +++ b/services/corporate_memory/collector.py @@ -19,7 +19,8 @@ from datetime import datetime, timezone from pathlib import Path from typing import Any -import anthropic +from connectors.llm import create_extractor +from connectors.llm.exceptions import LLMError from .prompts import CATALOG_REFRESH_PROMPT, SENSITIVITY_CHECK_PROMPT @@ -30,9 +31,6 @@ COLLECTION_LOG = CORPORATE_MEMORY_DIR / "collection.log" USER_HASHES_FILE = CORPORATE_MEMORY_DIR / "user_hashes.json" HOME_BASE = Path("/home") -# HAIKU model for cost-effective extraction -HAIKU_MODEL = "claude-haiku-4-5-20251001" - # Configure logging logging.basicConfig( level=logging.INFO, @@ -125,14 +123,6 @@ def _generate_id(content: str) -> str: return f"km_{h}" -def _get_claude_client() -> anthropic.Anthropic: - """Get Anthropic client with API key from environment.""" - api_key = os.environ.get("ANTHROPIC_API_KEY") - if not api_key: - raise ValueError("ANTHROPIC_API_KEY environment variable is required") - return anthropic.Anthropic(api_key=api_key) - - def _find_claude_local_files() -> list[tuple[str, Path]]: """Find all CLAUDE.local.md files in user home directories. @@ -309,7 +299,7 @@ def _process_catalog_response( return result -def check_sensitivity(client: anthropic.Anthropic, item: dict) -> bool: +def check_sensitivity(extractor, item: dict) -> bool: """Check if a knowledge item is safe to share. Returns True if safe, False if contains sensitive data. @@ -321,29 +311,20 @@ def check_sensitivity(client: anthropic.Anthropic, item: dict) -> bool: ) try: - response = client.messages.create( - model=HAIKU_MODEL, - max_tokens=256, - messages=[{"role": "user", "content": prompt}], - output_config={ - "format": { - "type": "json_schema", - "schema": SENSITIVITY_SCHEMA, - }, - }, + result = extractor.extract_json( + prompt, max_tokens=256, + json_schema=SENSITIVITY_SCHEMA, schema_name="sensitivity_check", ) - result = json.loads(response.content[0].text) - if not result.get("safe", False): reason = result.get("reason", "unknown") - logger.info(f"Filtered sensitive item: {item.get('title', 'untitled')} - {reason}") + logger.info("Filtered sensitive item id=%s - %s", item.get("id", "unknown"), reason) return False return True - except (json.JSONDecodeError, anthropic.APIError) as e: - logger.warning(f"Sensitivity check failed, assuming unsafe: {e}") + except LLMError as e: + logger.warning("Sensitivity check failed, assuming unsafe: %s", type(e).__name__) return False @@ -389,12 +370,19 @@ def collect_all(dry_run: bool = False) -> dict: stats["skipped"] = True return stats - # Step 2: Initialize client + # Step 2: Initialize AI extractor try: - client = _get_claude_client() - except ValueError as e: + from config.loader import load_instance_config + instance_config = load_instance_config() + ai_config = instance_config.get("ai") + if not ai_config: + logger.warning("No ai: section in instance.yaml, skipping catalog refresh") + stats["skipped"] = True + return stats + extractor = create_extractor(ai_config) + except (ValueError, FileNotFoundError) as e: stats["errors"].append(str(e)) - logger.error(str(e)) + logger.error("Failed to initialize AI extractor: %s", e) return stats # Step 3: Load existing catalog @@ -420,30 +408,17 @@ def collect_all(dry_run: bool = False) -> dict: ) try: - response = client.messages.create( - model=HAIKU_MODEL, - max_tokens=8192, - messages=[{"role": "user", "content": prompt}], - output_config={ - "format": { - "type": "json_schema", - "schema": CATALOG_SCHEMA, - }, - }, + response_data = extractor.extract_json( + prompt, max_tokens=8192, + json_schema=CATALOG_SCHEMA, schema_name="catalog_refresh", ) - - response_data = json.loads(response.content[0].text) response_items = response_data.get("items", []) stats["items_extracted"] = len(response_items) - logger.info(f"HAIKU returned {len(response_items)} catalog items") + logger.info(f"Extractor returned {len(response_items)} catalog items") - except json.JSONDecodeError as e: - logger.error(f"Failed to parse HAIKU response as JSON: {e}") - stats["errors"].append(f"JSON parse error: {e}") - return stats - except anthropic.APIError as e: - logger.error(f"HAIKU API error: {e}") - stats["errors"].append(f"API error: {e}") + except LLMError as e: + logger.error("LLM extraction error: %s", type(e).__name__) + stats["errors"].append(f"LLM error: {type(e).__name__}") return stats # Step 6: Process response - map to existing IDs @@ -460,10 +435,10 @@ def collect_all(dry_run: bool = False) -> dict: stats["items_preserved"] += 1 else: # New item - run sensitivity check - if check_sensitivity(client, item): + if check_sensitivity(extractor, item): final_items[item_id] = item stats["items_new"] += 1 - logger.info(f"Added new knowledge item: {item['title']}") + logger.info("Added new knowledge item: id=%s", item_id) else: stats["items_filtered"] += 1 diff --git a/services/corporate_memory/systemd/corporate-memory.service b/services/corporate_memory/systemd/corporate-memory.service index 8fd2758..04d7680 100644 --- a/services/corporate_memory/systemd/corporate-memory.service +++ b/services/corporate_memory/systemd/corporate-memory.service @@ -13,6 +13,8 @@ ExecStart=/usr/local/bin/collect-knowledge # Environment EnvironmentFile=/opt/data-analyst/.env EnvironmentFile=/opt/data-analyst/repo/.env +Environment=CONFIG_DIR=/opt/data-analyst/instance/config +Environment=PYTHONPATH=/opt/data-analyst/repo # Security hardening - root needed to read /home/*/CLAUDE.local.md ProtectSystem=strict diff --git a/tests/test_llm_connector.py b/tests/test_llm_connector.py new file mode 100644 index 0000000..29297d6 --- /dev/null +++ b/tests/test_llm_connector.py @@ -0,0 +1,1347 @@ +""" +Tests for LLM connector module and Corporate Memory collector integration. + +Covers: +- Factory (create_extractor) with various configs +- AnthropicExtractor (mock anthropic SDK) +- OpenAICompatExtractor (mock openai SDK) with fallback strategies +- Security (no secrets in logs) +- Corporate Memory collector using the LLM connector +""" + +import json +import logging +from pathlib import Path +from unittest.mock import MagicMock, patch + +import anthropic +import openai +import pytest + +from connectors.llm.anthropic_provider import AnthropicExtractor +from connectors.llm.exceptions import ( + LLMAuthError, + LLMFormatError, + LLMRateLimitError, + LLMRefusalError, + LLMTimeoutError, + LLMUnsupportedError, +) +from connectors.llm.factory import DEFAULT_MODEL, create_extractor +from connectors.llm.openai_compat import ( + OpenAICompatExtractor, + _extract_json_from_text, + _sanitize_url, +) + + +# --------------------------------------------------------------------------- +# Helpers: mock response builders +# --------------------------------------------------------------------------- + + +def _anthropic_response(text: str, stop_reason: str = "end_turn"): + """Build a mock Anthropic API response.""" + block = MagicMock() + block.text = text + response = MagicMock() + response.content = [block] + response.stop_reason = stop_reason + return response + + +def _openai_response(content: str | None, finish_reason: str = "stop"): + """Build a mock OpenAI chat completion response.""" + message = MagicMock() + message.content = content + choice = MagicMock() + choice.message = message + choice.finish_reason = finish_reason + response = MagicMock() + response.choices = [choice] + return response + + +# =================================================================== +# Factory tests +# =================================================================== + + +class TestCreateExtractor: + """Tests for connectors.llm.factory.create_extractor.""" + + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_anthropic_config(self, mock_client_cls): + """Anthropic provider config returns AnthropicExtractor.""" + config = {"provider": "anthropic", "api_key": "sk-ant-test123"} + ext = create_extractor(config) + assert isinstance(ext, AnthropicExtractor) + + @patch("connectors.llm.openai_compat.openai.OpenAI") + def test_openai_compat_config(self, mock_client_cls): + """openai_compat provider config returns OpenAICompatExtractor.""" + config = { + "provider": "openai_compat", + "api_key": "sk-test", + "base_url": "https://api.example.com/v1", + } + ext = create_extractor(config) + assert isinstance(ext, OpenAICompatExtractor) + + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_legacy_config(self, mock_client_cls): + """Legacy config with anthropic_api_key returns AnthropicExtractor.""" + config = {"anthropic_api_key": "sk-ant-legacy"} + ext = create_extractor(config) + assert isinstance(ext, AnthropicExtractor) + + def test_empty_config_raises(self): + """Empty config dict raises ValueError.""" + with pytest.raises(ValueError, match="non-empty dict"): + create_extractor({}) + + def test_none_config_raises(self): + """None config raises ValueError.""" + with pytest.raises(ValueError, match="non-empty dict"): + create_extractor(None) # type: ignore[arg-type] + + def test_missing_api_key_raises(self): + """Config with provider but empty api_key raises ValueError.""" + with pytest.raises(ValueError, match="must not be empty"): + create_extractor({"provider": "anthropic", "api_key": ""}) + + def test_missing_api_key_whitespace_raises(self): + """Config with whitespace-only api_key raises ValueError.""" + with pytest.raises(ValueError, match="must not be empty"): + create_extractor({"provider": "anthropic", "api_key": " "}) + + def test_openai_compat_missing_base_url_raises(self): + """openai_compat without base_url raises ValueError.""" + with pytest.raises(ValueError, match="base_url is required"): + create_extractor({ + "provider": "openai_compat", + "api_key": "sk-test", + }) + + def test_unknown_provider_raises(self): + """Unknown provider string raises ValueError.""" + with pytest.raises(ValueError, match="Unknown ai.provider"): + create_extractor({ + "provider": "gemini", + "api_key": "sk-test", + }) + + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_default_model(self, mock_client_cls): + """Default model is claude-haiku-4-5-20251001.""" + config = {"provider": "anthropic", "api_key": "sk-ant-test"} + ext = create_extractor(config) + assert ext._model == DEFAULT_MODEL + assert ext._model == "claude-haiku-4-5-20251001" + + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_custom_model(self, mock_client_cls): + """Custom model from config is used.""" + config = { + "provider": "anthropic", + "api_key": "sk-ant-test", + "model": "claude-sonnet-4-20250514", + } + ext = create_extractor(config) + assert ext._model == "claude-sonnet-4-20250514" + + def test_invalid_structured_output_raises(self): + """Invalid structured_output value raises ValueError.""" + with pytest.raises(ValueError, match="strict.*json.*auto"): + create_extractor({ + "provider": "openai_compat", + "api_key": "sk-test", + "base_url": "https://api.example.com/v1", + "structured_output": "whatever", + }) + + +# =================================================================== +# AnthropicExtractor tests +# =================================================================== + + +class TestAnthropicExtractor: + """Tests for connectors.llm.anthropic_provider.AnthropicExtractor.""" + + SCHEMA = {"type": "object", "properties": {"items": {"type": "array"}}} + + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_extract_json_success(self, mock_client_cls): + """Successful extraction returns parsed dict.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + + payload = {"items": [{"name": "test"}]} + mock_client.messages.create.return_value = _anthropic_response( + json.dumps(payload) + ) + + ext = AnthropicExtractor(api_key="sk-ant-test", model="claude-haiku-4-5-20251001") + result = ext.extract_json( + prompt="Extract items", + max_tokens=1024, + json_schema=self.SCHEMA, + schema_name="test_schema", + ) + + assert result == payload + mock_client.messages.create.assert_called_once() + + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_auth_error_raises_immediately(self, mock_client_cls): + """AuthenticationError raises LLMAuthError without retries.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + mock_client.messages.create.side_effect = anthropic.AuthenticationError( + message="bad key", + response=MagicMock(status_code=401), + body=None, + ) + + ext = AnthropicExtractor(api_key="sk-bad", model="claude-haiku-4-5-20251001") + with pytest.raises(LLMAuthError, match="authentication failed"): + ext.extract_json("test", 1024, self.SCHEMA, "test_schema") + + # Only one call - no retries for auth errors + assert mock_client.messages.create.call_count == 1 + + @patch("connectors.llm.anthropic_provider.time.sleep") + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_rate_limit_retries_then_succeeds(self, mock_client_cls, mock_sleep): + """RateLimitError retries and succeeds on second attempt.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + + payload = {"items": []} + mock_client.messages.create.side_effect = [ + anthropic.RateLimitError( + message="rate limited", + response=MagicMock(status_code=429), + body=None, + ), + _anthropic_response(json.dumps(payload)), + ] + + ext = AnthropicExtractor(api_key="sk-ant-test", model="claude-haiku-4-5-20251001") + result = ext.extract_json("test", 1024, self.SCHEMA, "test_schema") + + assert result == payload + assert mock_client.messages.create.call_count == 2 + mock_sleep.assert_called_once() + + @patch("connectors.llm.anthropic_provider.time.sleep") + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_rate_limit_exhausts_retries(self, mock_client_cls, mock_sleep): + """RateLimitError after max retries raises LLMRateLimitError.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + mock_client.messages.create.side_effect = anthropic.RateLimitError( + message="rate limited", + response=MagicMock(status_code=429), + body=None, + ) + + ext = AnthropicExtractor(api_key="sk-ant-test", model="claude-haiku-4-5-20251001") + with pytest.raises(LLMRateLimitError): + ext.extract_json("test", 1024, self.SCHEMA, "test_schema") + + assert mock_client.messages.create.call_count == 3 # MAX_RETRIES + + @patch("connectors.llm.anthropic_provider.time.sleep") + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_timeout_retries(self, mock_client_cls, mock_sleep): + """APITimeoutError retries with backoff then succeeds.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + + payload = {"items": []} + mock_client.messages.create.side_effect = [ + anthropic.APITimeoutError(request=MagicMock()), + _anthropic_response(json.dumps(payload)), + ] + + ext = AnthropicExtractor(api_key="sk-ant-test", model="claude-haiku-4-5-20251001") + result = ext.extract_json("test", 1024, self.SCHEMA, "test_schema") + + assert result == payload + mock_sleep.assert_called_once_with(2) # INITIAL_BACKOFF_SECONDS + + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_truncation_raises_format_error(self, mock_client_cls): + """stop_reason='max_tokens' raises LLMFormatError immediately (no recursion).""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + + truncated = _anthropic_response('{"items": [', stop_reason="max_tokens") + mock_client.messages.create.return_value = truncated + + ext = AnthropicExtractor(api_key="sk-ant-test", model="claude-haiku-4-5-20251001") + with pytest.raises(LLMFormatError, match="truncated"): + ext.extract_json("test", 1024, self.SCHEMA, "test_schema") + + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_invalid_json_raises_format_error(self, mock_client_cls): + """Non-JSON response raises LLMFormatError.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + mock_client.messages.create.return_value = _anthropic_response( + "This is not JSON at all" + ) + + ext = AnthropicExtractor(api_key="sk-ant-test", model="claude-haiku-4-5-20251001") + with pytest.raises(LLMFormatError, match="Failed to parse"): + ext.extract_json("test", 1024, self.SCHEMA, "test_schema") + + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_empty_content_raises_refusal(self, mock_client_cls): + """Empty content list with end_turn raises LLMRefusalError.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + + response = MagicMock() + response.content = [] + response.stop_reason = "end_turn" + mock_client.messages.create.return_value = response + + ext = AnthropicExtractor(api_key="sk-ant-test", model="claude-haiku-4-5-20251001") + with pytest.raises(LLMRefusalError, match="refused"): + ext.extract_json("test", 1024, self.SCHEMA, "test_schema") + + @patch("connectors.llm.anthropic_provider.time.sleep") + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_backoff_multiplier(self, mock_client_cls, mock_sleep): + """Exponential backoff doubles the delay on each retry.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + mock_client.messages.create.side_effect = anthropic.RateLimitError( + message="limited", + response=MagicMock(status_code=429), + body=None, + ) + + ext = AnthropicExtractor(api_key="sk-ant-test", model="claude-haiku-4-5-20251001") + with pytest.raises(LLMRateLimitError): + ext.extract_json("test", 1024, self.SCHEMA, "test_schema") + + # 3 attempts, 2 sleeps: delay(1)=2, delay(2)=4 + assert mock_sleep.call_count == 2 + mock_sleep.assert_any_call(2) + mock_sleep.assert_any_call(4) + + +# =================================================================== +# OpenAICompatExtractor tests +# =================================================================== + + +class TestOpenAICompatExtractor: + """Tests for connectors.llm.openai_compat.OpenAICompatExtractor.""" + + SCHEMA = {"type": "object", "properties": {"items": {"type": "array"}}} + BASE_URL = "https://api.example.com/v1" + + @patch("connectors.llm.openai_compat.openai.OpenAI") + def test_json_schema_success(self, mock_client_cls): + """json_schema strategy returns parsed dict on success.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + + payload = {"items": [{"id": 1}]} + mock_client.chat.completions.create.return_value = _openai_response( + json.dumps(payload) + ) + + ext = OpenAICompatExtractor( + api_key="sk-test", base_url=self.BASE_URL, + model="gpt-4o", structured_output="auto", + ) + result = ext.extract_json("Extract", 1024, self.SCHEMA, "test") + + assert result == payload + call_kwargs = mock_client.chat.completions.create.call_args[1] + assert call_kwargs["response_format"]["type"] == "json_schema" + + @patch("connectors.llm.openai_compat.openai.OpenAI") + def test_fallback_json_schema_to_json_object(self, mock_client_cls): + """Auto mode falls back from json_schema to json_object.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + + payload = {"items": []} + # First call (json_schema) fails with BadRequestError, second (json_object) succeeds + mock_client.chat.completions.create.side_effect = [ + openai.BadRequestError( + message="response_format json_schema not supported", + response=MagicMock(status_code=400), + body=None, + ), + _openai_response(json.dumps(payload)), + ] + + ext = OpenAICompatExtractor( + api_key="sk-test", base_url=self.BASE_URL, + model="local-model", structured_output="auto", + ) + result = ext.extract_json("Extract", 1024, self.SCHEMA, "test") + + assert result == payload + assert mock_client.chat.completions.create.call_count == 2 + + # Second call should use json_object format + second_call_kwargs = mock_client.chat.completions.create.call_args_list[1][1] + assert second_call_kwargs["response_format"]["type"] == "json_object" + + @patch("connectors.llm.openai_compat.openai.OpenAI") + def test_fallback_to_text_mode(self, mock_client_cls): + """Auto mode falls back to text when both json_schema and json_object fail.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + + payload = {"items": [{"x": 1}]} + mock_client.chat.completions.create.side_effect = [ + openai.BadRequestError( + message="response_format json_schema not supported", + response=MagicMock(status_code=400), + body=None, + ), + openai.BadRequestError( + message="response_format json_object not supported", + response=MagicMock(status_code=400), + body=None, + ), + _openai_response(json.dumps(payload)), + ] + + ext = OpenAICompatExtractor( + api_key="sk-test", base_url=self.BASE_URL, + model="local-model", structured_output="auto", + ) + result = ext.extract_json("Extract", 1024, self.SCHEMA, "test") + + assert result == payload + assert mock_client.chat.completions.create.call_count == 3 + + # Third call should NOT have response_format (text fallback) + third_call_kwargs = mock_client.chat.completions.create.call_args_list[2][1] + assert "response_format" not in third_call_kwargs + + @patch("connectors.llm.openai_compat.openai.OpenAI") + def test_strict_mode_raises_unsupported(self, mock_client_cls): + """strict mode raises LLMUnsupportedError when json_schema fails.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + mock_client.chat.completions.create.side_effect = openai.BadRequestError( + message="response_format json_schema not supported", + response=MagicMock(status_code=400), + body=None, + ) + + ext = OpenAICompatExtractor( + api_key="sk-test", base_url=self.BASE_URL, + model="local-model", structured_output="strict", + ) + with pytest.raises(LLMUnsupportedError, match="No supported structured output"): + ext.extract_json("Extract", 1024, self.SCHEMA, "test") + + @patch("connectors.llm.openai_compat.openai.OpenAI") + def test_json_mode_no_text_fallback(self, mock_client_cls): + """json mode tries json_schema + json_object but NOT text.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + mock_client.chat.completions.create.side_effect = openai.BadRequestError( + message="response_format json_schema not supported", + response=MagicMock(status_code=400), + body=None, + ) + + ext = OpenAICompatExtractor( + api_key="sk-test", base_url=self.BASE_URL, + model="local-model", structured_output="json", + ) + with pytest.raises(LLMUnsupportedError, match="No supported structured output"): + ext.extract_json("Extract", 1024, self.SCHEMA, "test") + + # json mode: json_schema -> LLMUnsupportedError (skip), json_object -> same + assert mock_client.chat.completions.create.call_count == 2 + + @patch("connectors.llm.openai_compat.openai.OpenAI") + def test_text_fallback_strips_markdown_fences(self, mock_client_cls): + """Text fallback strips markdown code fences from response.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + + payload = {"items": [{"v": 42}]} + fenced_json = f"```json\n{json.dumps(payload)}\n```" + + mock_client.chat.completions.create.side_effect = [ + openai.BadRequestError( + message="response_format json_schema not supported", + response=MagicMock(status_code=400), + body=None, + ), + openai.BadRequestError( + message="response_format json_object not supported", + response=MagicMock(status_code=400), + body=None, + ), + _openai_response(fenced_json), + ] + + ext = OpenAICompatExtractor( + api_key="sk-test", base_url=self.BASE_URL, + model="local-model", structured_output="auto", + ) + result = ext.extract_json("Extract", 1024, self.SCHEMA, "test") + + assert result == payload + + @patch("connectors.llm.openai_compat.openai.OpenAI") + def test_auth_error(self, mock_client_cls): + """AuthenticationError raises LLMAuthError without retries.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + mock_client.chat.completions.create.side_effect = openai.AuthenticationError( + message="invalid key", + response=MagicMock(status_code=401), + body=None, + ) + + ext = OpenAICompatExtractor( + api_key="sk-bad", base_url=self.BASE_URL, + model="gpt-4o", structured_output="auto", + ) + with pytest.raises(LLMAuthError, match="authentication failed"): + ext.extract_json("Extract", 1024, self.SCHEMA, "test") + + assert mock_client.chat.completions.create.call_count == 1 + + @patch("connectors.llm.openai_compat.openai.OpenAI") + def test_truncation_raises_format_error(self, mock_client_cls): + """finish_reason='length' raises LLMFormatError immediately (no recursion).""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + + mock_client.chat.completions.create.return_value = _openai_response( + '{"items": [', finish_reason="length", + ) + + ext = OpenAICompatExtractor( + api_key="sk-test", base_url=self.BASE_URL, + model="gpt-4o", structured_output="auto", + ) + with pytest.raises(LLMFormatError, match="truncated"): + ext.extract_json("Extract", 1024, self.SCHEMA, "test") + + @patch("connectors.llm.openai_compat.openai.OpenAI") + def test_empty_content_raises_refusal(self, mock_client_cls): + """Empty content raises LLMRefusalError.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + + mock_client.chat.completions.create.return_value = _openai_response( + None, finish_reason="stop" + ) + + ext = OpenAICompatExtractor( + api_key="sk-test", base_url=self.BASE_URL, + model="gpt-4o", structured_output="auto", + ) + with pytest.raises(LLMRefusalError, match="refused"): + ext.extract_json("Extract", 1024, self.SCHEMA, "test") + + @patch("connectors.llm.openai_compat.time.sleep") + @patch("connectors.llm.openai_compat.openai.OpenAI") + def test_rate_limit_retries(self, mock_client_cls, mock_sleep): + """RateLimitError retries with backoff then succeeds.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + + payload = {"items": []} + mock_client.chat.completions.create.side_effect = [ + openai.RateLimitError( + message="too many requests", + response=MagicMock(status_code=429), + body=None, + ), + _openai_response(json.dumps(payload)), + ] + + ext = OpenAICompatExtractor( + api_key="sk-test", base_url=self.BASE_URL, + model="gpt-4o", structured_output="auto", + ) + result = ext.extract_json("Extract", 1024, self.SCHEMA, "test") + + assert result == payload + mock_sleep.assert_called_once_with(2) + + @patch("connectors.llm.openai_compat.openai.OpenAI") + def test_bad_request_non_format_raises_format_error(self, mock_client_cls): + """BadRequestError not about response_format raises LLMFormatError.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + mock_client.chat.completions.create.side_effect = openai.BadRequestError( + message="invalid model parameter", + response=MagicMock(status_code=400), + body=None, + ) + + ext = OpenAICompatExtractor( + api_key="sk-test", base_url=self.BASE_URL, + model="gpt-4o", structured_output="auto", + ) + with pytest.raises(LLMFormatError, match="Bad request"): + ext.extract_json("Extract", 1024, self.SCHEMA, "test") + + +# =================================================================== +# URL sanitization tests +# =================================================================== + + +class TestURLSanitization: + """Tests for URL sanitization in logging.""" + + def test_sanitize_url_removes_path(self): + """Sanitized URL has no path component.""" + result = _sanitize_url("https://api.example.com/v1/chat/completions") + assert result == "https://api.example.com" + + def test_sanitize_url_removes_query(self): + """Sanitized URL has no query params.""" + result = _sanitize_url("https://api.example.com/v1?token=secret123") + assert result == "https://api.example.com" + + def test_sanitize_url_preserves_port(self): + """Sanitized URL preserves port number.""" + result = _sanitize_url("http://localhost:8080/v1") + assert result == "http://localhost:8080" + + +# =================================================================== +# _extract_json_from_text tests +# =================================================================== + + +class TestExtractJsonFromText: + """Tests for the text-based JSON extraction helper.""" + + def test_direct_json(self): + """Plain JSON parses directly.""" + result = _extract_json_from_text('{"key": "value"}') + assert result == {"key": "value"} + + def test_markdown_fence_json(self): + """JSON wrapped in ```json fences is extracted.""" + text = '```json\n{"key": "value"}\n```' + result = _extract_json_from_text(text) + assert result == {"key": "value"} + + def test_markdown_fence_no_lang(self): + """JSON wrapped in ``` fences (no language) is extracted.""" + text = '```\n{"key": "value"}\n```' + result = _extract_json_from_text(text) + assert result == {"key": "value"} + + def test_brace_extraction_fallback(self): + """Fallback: extract JSON between first { and last }.""" + text = 'Here is the result: {"key": "value"} -- done' + result = _extract_json_from_text(text) + assert result == {"key": "value"} + + def test_no_json_raises_format_error(self): + """No valid JSON raises LLMFormatError.""" + with pytest.raises(LLMFormatError, match="Could not extract valid JSON"): + _extract_json_from_text("This is just plain text without braces") + + def test_invalid_json_in_braces_raises(self): + """Malformed JSON in braces raises LLMFormatError.""" + with pytest.raises(LLMFormatError): + _extract_json_from_text("{not: valid json}") + + +# =================================================================== +# Security tests (no secrets in logs) +# =================================================================== + + +class TestSecurity: + """Verify that API keys, prompts, and responses never appear in log output.""" + + SECRET_KEY = "sk-ant-SUPER-SECRET-KEY-12345" + PROMPT_TEXT = "Extract the following secret data from documents" + RESPONSE_TEXT = '{"items": [{"classified": "top-secret-info"}]}' + + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_api_key_not_in_logs(self, mock_client_cls, caplog): + """API key must never appear in log messages.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + mock_client.messages.create.return_value = _anthropic_response(self.RESPONSE_TEXT) + + ext = AnthropicExtractor(api_key=self.SECRET_KEY, model="claude-haiku-4-5-20251001") + with caplog.at_level(logging.DEBUG, logger="connectors.llm"): + ext.extract_json( + self.PROMPT_TEXT, 1024, + {"type": "object"}, "test_schema", + ) + + full_log = caplog.text + assert self.SECRET_KEY not in full_log + + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_prompt_not_in_logs(self, mock_client_cls, caplog): + """Prompt content must never appear in log messages.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + mock_client.messages.create.return_value = _anthropic_response(self.RESPONSE_TEXT) + + ext = AnthropicExtractor(api_key=self.SECRET_KEY, model="claude-haiku-4-5-20251001") + with caplog.at_level(logging.DEBUG, logger="connectors.llm"): + ext.extract_json( + self.PROMPT_TEXT, 1024, + {"type": "object"}, "test_schema", + ) + + full_log = caplog.text + assert self.PROMPT_TEXT not in full_log + + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_response_not_in_logs(self, mock_client_cls, caplog): + """Response content must never appear in log messages.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + mock_client.messages.create.return_value = _anthropic_response(self.RESPONSE_TEXT) + + ext = AnthropicExtractor(api_key=self.SECRET_KEY, model="claude-haiku-4-5-20251001") + with caplog.at_level(logging.DEBUG, logger="connectors.llm"): + ext.extract_json( + self.PROMPT_TEXT, 1024, + {"type": "object"}, "test_schema", + ) + + full_log = caplog.text + assert "top-secret-info" not in full_log + assert self.RESPONSE_TEXT not in full_log + + @patch("connectors.llm.openai_compat.openai.OpenAI") + def test_openai_api_key_not_in_logs(self, mock_client_cls, caplog): + """OpenAI-compat API key must never appear in log messages.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + mock_client.chat.completions.create.return_value = _openai_response( + self.RESPONSE_TEXT + ) + + ext = OpenAICompatExtractor( + api_key=self.SECRET_KEY, + base_url="https://api.example.com/v1", + model="gpt-4o", + structured_output="auto", + ) + with caplog.at_level(logging.DEBUG, logger="connectors.llm"): + ext.extract_json( + self.PROMPT_TEXT, 1024, + {"type": "object"}, "test_schema", + ) + + full_log = caplog.text + assert self.SECRET_KEY not in full_log + + @patch("connectors.llm.openai_compat.openai.OpenAI") + def test_openai_url_path_not_in_logs(self, mock_client_cls, caplog): + """URL paths (may contain tokens) must not appear in logs.""" + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + mock_client.chat.completions.create.return_value = _openai_response( + '{"ok": true}' + ) + + sensitive_url = "https://api.example.com/v1/secret-path?token=abc123" + ext = OpenAICompatExtractor( + api_key="sk-test", + base_url=sensitive_url, + model="gpt-4o", + structured_output="auto", + ) + with caplog.at_level(logging.DEBUG, logger="connectors.llm"): + ext.extract_json("test", 1024, {"type": "object"}, "test_schema") + + full_log = caplog.text + assert "secret-path" not in full_log + assert "token=abc123" not in full_log + # But the host SHOULD be present (safe to log) + assert "api.example.com" in full_log + + +# =================================================================== +# Corporate Memory collector tests +# =================================================================== + + +class TestCorporateMemoryCollector: + """Tests for services.corporate_memory.collector integration with LLM connector.""" + + def test_collect_all_no_files_skips(self, tmp_path): + """collect_all skips when no CLAUDE.local.md files found.""" + from services.corporate_memory.collector import collect_all + + # Use an empty directory as HOME_BASE + empty_home = tmp_path / "empty_home" + empty_home.mkdir() + + with patch("services.corporate_memory.collector.HOME_BASE", empty_home): + stats = collect_all(dry_run=True) + + assert stats["skipped"] is True + assert stats["files_found"] == 0 + + def test_collect_all_no_changes_skips(self, tmp_path): + """collect_all skips when hashes match (no changes).""" + import hashlib as hl + + from services.corporate_memory.collector import collect_all + + # Set up a user directory with CLAUDE.local.md + home = tmp_path / "home" + home.mkdir() + user_dir = home / "alice" + user_dir.mkdir() + claude_file = user_dir / "CLAUDE.local.md" + claude_file.write_text("Some knowledge content") + + content_hash = hl.md5("Some knowledge content".encode()).hexdigest() + + # Stored hashes match current hashes -> no changes + with ( + patch("services.corporate_memory.collector.HOME_BASE", home), + patch( + "services.corporate_memory.collector._read_json", + return_value={"hashes": {"alice": content_hash}}, + ), + ): + stats = collect_all(dry_run=True) + + assert stats["skipped"] is True + assert stats["files_found"] == 1 + + def test_collect_all_with_changes(self, tmp_path): + """collect_all processes files when changes detected.""" + from services.corporate_memory.collector import collect_all + + home = tmp_path / "home" + home.mkdir() + user_dir = home / "bob" + user_dir.mkdir() + claude_file = user_dir / "CLAUDE.local.md" + claude_file.write_text("## Useful DuckDB trick\nUse QUALIFY for window filters") + + # Mock extractor + mock_extractor = MagicMock() + mock_extractor.extract_json.side_effect = [ + # First call: catalog refresh + { + "items": [ + { + "existing_id": None, + "title": "DuckDB QUALIFY clause", + "content": "Use QUALIFY for window function filtering", + "category": "data_analysis", + "tags": ["duckdb", "sql"], + "source_users": ["bob"], + }, + ], + }, + # Second call: sensitivity check + {"safe": True}, + ] + + with ( + patch("services.corporate_memory.collector.HOME_BASE", home), + patch("services.corporate_memory.collector._read_json", return_value={}), + patch("services.corporate_memory.collector._write_json"), + patch( + "config.loader.load_instance_config", + return_value={"ai": {"provider": "anthropic", "api_key": "sk-test"}}, + ), + patch( + "services.corporate_memory.collector.create_extractor", + return_value=mock_extractor, + ), + ): + stats = collect_all(dry_run=True) + + assert stats["skipped"] is False + assert stats["items_extracted"] == 1 + assert stats["items_new"] == 1 + assert stats["items_filtered"] == 0 + assert stats["errors"] == [] + + def test_collect_all_filters_sensitive_items(self, tmp_path): + """collect_all filters items that fail sensitivity check.""" + from services.corporate_memory.collector import collect_all + + home = tmp_path / "home" + home.mkdir() + user_dir = home / "carol" + user_dir.mkdir() + (user_dir / "CLAUDE.local.md").write_text("API key: sk-secret-123") + + mock_extractor = MagicMock() + mock_extractor.extract_json.side_effect = [ + # Catalog refresh + { + "items": [ + { + "existing_id": None, + "title": "API credentials", + "content": "Use sk-secret-123 for auth", + "category": "api_integration", + "tags": ["api", "auth"], + "source_users": ["carol"], + }, + ], + }, + # Sensitivity check: NOT safe + {"safe": False, "reason": "Contains API key"}, + ] + + with ( + patch("services.corporate_memory.collector.HOME_BASE", home), + patch("services.corporate_memory.collector._read_json", return_value={}), + patch("services.corporate_memory.collector._write_json"), + patch( + "config.loader.load_instance_config", + return_value={"ai": {"provider": "anthropic", "api_key": "sk-test"}}, + ), + patch( + "services.corporate_memory.collector.create_extractor", + return_value=mock_extractor, + ), + ): + stats = collect_all(dry_run=True) + + assert stats["items_extracted"] == 1 + assert stats["items_new"] == 0 + assert stats["items_filtered"] == 1 + + def test_collect_all_preserves_existing_items(self, tmp_path): + """Existing items (by ID) skip sensitivity check and are preserved.""" + from services.corporate_memory.collector import collect_all + + home = tmp_path / "home" + home.mkdir() + user_dir = home / "dave" + user_dir.mkdir() + (user_dir / "CLAUDE.local.md").write_text("Updated knowledge") + + existing = { + "items": { + "km_abc123": { + "id": "km_abc123", + "title": "Existing item", + "content": "This is already validated", + "category": "workflow", + "tags": ["existing"], + "source_users": ["dave"], + "extracted_at": "2026-01-01T00:00:00+00:00", + }, + }, + "metadata": {}, + } + + def read_json_side_effect(path): + path_str = str(path) + if "user_hashes" in path_str: + return {} # No stored hashes -> force change detection + if "knowledge" in path_str: + return existing + return {} + + mock_extractor = MagicMock() + # HAIKU returns the existing item (preserving ID) + mock_extractor.extract_json.return_value = { + "items": [ + { + "existing_id": "km_abc123", + "title": "Existing item (updated)", + "content": "This is already validated with updates", + "category": "workflow", + "tags": ["existing"], + "source_users": ["dave"], + }, + ], + } + + with ( + patch("services.corporate_memory.collector.HOME_BASE", home), + patch( + "services.corporate_memory.collector._read_json", + side_effect=read_json_side_effect, + ), + patch("services.corporate_memory.collector._write_json"), + patch( + "config.loader.load_instance_config", + return_value={"ai": {"provider": "anthropic", "api_key": "sk-test"}}, + ), + patch( + "services.corporate_memory.collector.create_extractor", + return_value=mock_extractor, + ), + ): + stats = collect_all(dry_run=True) + + assert stats["items_preserved"] == 1 + assert stats["items_new"] == 0 + # extract_json called ONCE (catalog refresh only, no sensitivity for existing) + assert mock_extractor.extract_json.call_count == 1 + + def test_collect_all_handles_llm_error(self, tmp_path): + """collect_all captures LLMError and returns it in stats.""" + from services.corporate_memory.collector import collect_all + + home = tmp_path / "home" + home.mkdir() + user_dir = home / "eve" + user_dir.mkdir() + (user_dir / "CLAUDE.local.md").write_text("Some content") + + mock_extractor = MagicMock() + mock_extractor.extract_json.side_effect = LLMRateLimitError("too many requests") + + with ( + patch("services.corporate_memory.collector.HOME_BASE", home), + patch("services.corporate_memory.collector._read_json", return_value={}), + patch( + "config.loader.load_instance_config", + return_value={"ai": {"provider": "anthropic", "api_key": "sk-test"}}, + ), + patch( + "services.corporate_memory.collector.create_extractor", + return_value=mock_extractor, + ), + ): + stats = collect_all(dry_run=True) + + assert len(stats["errors"]) == 1 + assert "LLM error" in stats["errors"][0] + + def test_collect_all_no_ai_config_skips(self, tmp_path): + """collect_all skips when instance.yaml has no ai: section.""" + from services.corporate_memory.collector import collect_all + + home = tmp_path / "home" + home.mkdir() + user_dir = home / "frank" + user_dir.mkdir() + (user_dir / "CLAUDE.local.md").write_text("Some content") + + with ( + patch("services.corporate_memory.collector.HOME_BASE", home), + patch("services.corporate_memory.collector._read_json", return_value={}), + patch( + "config.loader.load_instance_config", + return_value={"server": {"host": "example.com"}}, + ), + ): + stats = collect_all(dry_run=True) + + assert stats["skipped"] is True + + +# =================================================================== +# Corporate Memory collector - helper function tests +# =================================================================== + + +class TestCollectorHelpers: + """Tests for collector helper functions.""" + + def test_generate_id_deterministic(self): + """_generate_id returns consistent IDs for same content.""" + from services.corporate_memory.collector import _generate_id + + id1 = _generate_id("test content") + id2 = _generate_id("test content") + assert id1 == id2 + assert id1.startswith("km_") + assert len(id1) == 15 # "km_" + 12 hex chars + + def test_generate_id_different_for_different_content(self): + """_generate_id returns different IDs for different content.""" + from services.corporate_memory.collector import _generate_id + + id1 = _generate_id("content A") + id2 = _generate_id("content B") + assert id1 != id2 + + def test_format_existing_catalog_empty(self): + """Empty catalog formats as fresh message.""" + from services.corporate_memory.collector import _format_existing_catalog + + result = _format_existing_catalog({}) + assert "fresh catalog" in result.lower() or "No existing items" in result + + def test_format_existing_catalog_with_items(self): + """Catalog with items formats each item.""" + from services.corporate_memory.collector import _format_existing_catalog + + existing = { + "items": { + "km_abc": { + "title": "Test Item", + "content": "Test content", + "category": "workflow", + "tags": ["test"], + "source_users": ["alice"], + }, + }, + } + + result = _format_existing_catalog(existing) + assert "km_abc" in result + assert "Test Item" in result + assert "workflow" in result + assert "alice" in result + + def test_format_user_files(self): + """User files are formatted with username headers.""" + from services.corporate_memory.collector import _format_user_files + + user_files = { + "alice": ("Knowledge from alice", "hash_a"), + "bob": ("Knowledge from bob", "hash_b"), + } + + result = _format_user_files(user_files) + assert "### User: alice" in result + assert "### User: bob" in result + assert "Knowledge from alice" in result + assert "Knowledge from bob" in result + + def test_process_catalog_response_new_items(self): + """New items get generated IDs.""" + from services.corporate_memory.collector import _process_catalog_response + + items = [ + { + "existing_id": None, + "title": "New Knowledge", + "content": "Fresh insight", + "category": "data_analysis", + "tags": ["new"], + "source_users": ["alice"], + }, + ] + + result = _process_catalog_response(items, {"items": {}}) + + assert len(result) == 1 + item_id = list(result.keys())[0] + assert item_id.startswith("km_") + item = result[item_id] + assert item["title"] == "New Knowledge" + assert item["content"] == "Fresh insight" + assert "extracted_at" in item + assert "updated_at" in item + + def test_process_catalog_response_preserves_existing(self): + """Existing items keep their original ID and extracted_at.""" + from services.corporate_memory.collector import _process_catalog_response + + existing = { + "items": { + "km_existing": { + "title": "Old Title", + "content": "Old content", + "extracted_at": "2026-01-01T00:00:00+00:00", + }, + }, + } + + items = [ + { + "existing_id": "km_existing", + "title": "Updated Title", + "content": "Updated content", + "category": "workflow", + "tags": ["updated"], + "source_users": ["alice"], + }, + ] + + result = _process_catalog_response(items, existing) + + assert "km_existing" in result + assert result["km_existing"]["title"] == "Updated Title" + assert result["km_existing"]["extracted_at"] == "2026-01-01T00:00:00+00:00" + + def test_process_catalog_response_handles_collision(self): + """ID collision for new items is resolved.""" + from services.corporate_memory.collector import _process_catalog_response + + # Two items with identical title+content will produce same hash + items = [ + { + "existing_id": None, + "title": "Same", + "content": "Same", + "category": "workflow", + "tags": [], + "source_users": ["a"], + }, + { + "existing_id": None, + "title": "Same", + "content": "Same", + "category": "workflow", + "tags": [], + "source_users": ["b"], + }, + ] + + result = _process_catalog_response(items, {"items": {}}) + + # Both items should be present (collision resolved) + assert len(result) == 2 + + +# =================================================================== +# Corporate Memory - check_sensitivity tests +# =================================================================== + + +class TestCheckSensitivity: + """Tests for the sensitivity check function.""" + + def test_safe_item_returns_true(self): + """Safe items return True.""" + from services.corporate_memory.collector import check_sensitivity + + mock_extractor = MagicMock() + mock_extractor.extract_json.return_value = {"safe": True} + + item = { + "title": "SQL Tip", + "content": "Use GROUP BY for aggregation", + "tags": ["sql"], + } + + assert check_sensitivity(mock_extractor, item) is True + + def test_unsafe_item_returns_false(self): + """Unsafe items return False.""" + from services.corporate_memory.collector import check_sensitivity + + mock_extractor = MagicMock() + mock_extractor.extract_json.return_value = { + "safe": False, + "reason": "Contains API key", + } + + item = { + "title": "Auth setup", + "content": "Use key sk-12345", + "tags": ["auth"], + } + + assert check_sensitivity(mock_extractor, item) is False + + def test_llm_error_assumes_unsafe(self): + """LLMError during sensitivity check assumes item is unsafe.""" + from services.corporate_memory.collector import check_sensitivity + + mock_extractor = MagicMock() + mock_extractor.extract_json.side_effect = LLMRateLimitError("rate limited") + + item = {"title": "Test", "content": "Content", "tags": []} + + assert check_sensitivity(mock_extractor, item) is False + + def test_llm_format_error_assumes_unsafe(self): + """LLMFormatError during sensitivity check assumes item is unsafe.""" + from services.corporate_memory.collector import check_sensitivity + + mock_extractor = MagicMock() + mock_extractor.extract_json.side_effect = LLMFormatError("bad json") + + item = {"title": "Test", "content": "Content", "tags": []} + + assert check_sensitivity(mock_extractor, item) is False + + +# =================================================================== +# Integration: collector uses create_extractor +# =================================================================== + + +class TestCollectorExtractorIntegration: + """Verify collector properly initializes the LLM extractor.""" + + @patch("connectors.llm.anthropic_provider.anthropic.Anthropic") + def test_collector_creates_anthropic_extractor(self, mock_client_cls, tmp_path): + """Collector creates AnthropicExtractor from instance.yaml config.""" + from services.corporate_memory.collector import collect_all + + home = tmp_path / "home" + home.mkdir() + user_dir = home / "alice" + user_dir.mkdir() + (user_dir / "CLAUDE.local.md").write_text("Some knowledge") + + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + + catalog_response = _anthropic_response(json.dumps({"items": []})) + mock_client.messages.create.return_value = catalog_response + + with ( + patch("services.corporate_memory.collector.HOME_BASE", home), + patch("services.corporate_memory.collector._read_json", return_value={}), + patch("services.corporate_memory.collector._write_json"), + patch( + "config.loader.load_instance_config", + return_value={ + "ai": { + "provider": "anthropic", + "api_key": "sk-ant-integration-test", + "model": "claude-haiku-4-5-20251001", + }, + }, + ), + ): + stats = collect_all(dry_run=True) + + # Verify Anthropic client was initialized + mock_client_cls.assert_called_once_with(api_key="sk-ant-integration-test") + assert stats["items_extracted"] == 0 + assert stats["errors"] == [] + + def test_collector_handles_invalid_config(self, tmp_path): + """Collector returns error when config is invalid.""" + from services.corporate_memory.collector import collect_all + + home = tmp_path / "home" + home.mkdir() + user_dir = home / "alice" + user_dir.mkdir() + (user_dir / "CLAUDE.local.md").write_text("Some knowledge") + + with ( + patch("services.corporate_memory.collector.HOME_BASE", home), + patch("services.corporate_memory.collector._read_json", return_value={}), + patch( + "config.loader.load_instance_config", + return_value={"ai": {"provider": "anthropic", "api_key": ""}}, + ), + ): + stats = collect_all(dry_run=True) + + assert len(stats["errors"]) == 1 + assert "must not be empty" in stats["errors"][0]