Add modular LLM connector for Corporate Memory

Replace hardwired Anthropic API calls with a pluggable provider system.
Each deployment configures its AI provider in instance.yaml — switching
between Anthropic, LiteLLM, OpenRouter, or any OpenAI-compatible proxy
is a config change, not a code change.

New connectors/llm/ module:
- StructuredExtractor Protocol with extract_json() interface
- AnthropicExtractor: direct Anthropic SDK with retry + backoff
- OpenAICompatExtractor: any OpenAI-compatible proxy with three-layer
  structured output fallback (json_schema -> json_object -> prompt)
- Configurable structured_output policy (strict/json/auto)
- Custom exception hierarchy (auth/rate_limit/timeout/format/refusal)
- Zero secrets in logs: no API keys, prompts, or responses logged

Reviewed by: Google Gemini, Claude Sonnet, OpenAI GPT-5.4.
Security audit passed with all critical findings resolved.
This commit is contained in:
Petr 2026-03-23 12:08:33 +01:00
parent 84d14da611
commit 95358448e6
16 changed files with 2467 additions and 56 deletions

View file

@ -20,3 +20,6 @@ GOOGLE_CLIENT_SECRET=
# JIRA_WEBHOOK_SECRET=
# JIRA_SLA_API_TOKEN=
# ANTHROPIC_API_KEY=
# LLM API key for proxy routing (referenced as ${LLM_API_KEY} in instance.yaml)
# Provider and model configured in instance.yaml ai: section
# LLM_API_KEY=

View file

@ -153,8 +153,57 @@ jira:
cloud_id: ""
# --- Corporate Memory AI (optional) ---
# Extracts shared knowledge from team members' CLAUDE.local.md files.
# Provider: "anthropic" (direct API) or "openai_compat" (LiteLLM, OpenRouter, Azure, etc.)
ai:
anthropic_api_key: "${ANTHROPIC_API_KEY}"
provider: "anthropic" # or "openai_compat"
api_key: "${ANTHROPIC_API_KEY}" # or "${LLM_API_KEY}" for proxy
# base_url: "https://litellm.example.com" # required for openai_compat
model: "claude-haiku-4-5-20251001" # any model available on your provider
# --- Structured output quality control ---
# AI models can return JSON in three ways, each with different reliability:
#
# Layer 1 - "json_schema" (best):
# The provider enforces an exact schema. Every field, type, and structure
# is guaranteed. Available on: Anthropic, OpenAI, Claude via LiteLLM.
#
# Layer 2 - "json_object" (good):
# The provider guarantees valid JSON, but does not enforce a specific schema.
# Fields may be missing or have wrong types. Available on most providers.
#
# Layer 3 - "prompt" (acceptable):
# The AI is asked to respond in JSON via instructions in the prompt.
# No technical enforcement -- the model may still return invalid JSON.
# Works everywhere, but least reliable.
#
# "strict" = only Layer 1. Fail if provider doesn't support json_schema.
# Use when data quality is non-negotiable.
# "json" = Layer 1, fall back to Layer 2. No prompt-based fallback.
# Good balance of quality and compatibility.
# "auto" = All three layers as progressive fallback. Maximum compatibility.
# Use when you'd rather get imperfect data than no data.
structured_output: "auto"
# Legacy format (still supported, equivalent to provider: "anthropic"):
# ai:
# anthropic_api_key: "${ANTHROPIC_API_KEY}"
# Examples:
# --- LiteLLM proxy ---
# ai:
# provider: "openai_compat"
# base_url: "https://litellm.example.com"
# api_key: "${LLM_API_KEY}"
# model: "claude-haiku-4-5-20251001"
# structured_output: "strict"
#
# --- OpenRouter ---
# ai:
# provider: "openai_compat"
# base_url: "https://openrouter.ai/api/v1"
# api_key: "${OPENROUTER_API_KEY}"
# model: "anthropic/claude-3-haiku"
# structured_output: "auto"
# --- User display (for Corporate Memory avatars) ---
users: {}

View file

@ -0,0 +1,11 @@
"""LLM connector module for structured extraction.
Provides a provider-agnostic interface for extracting structured JSON
from language models. Supports Anthropic (native) and OpenAI-compatible
providers with automatic fallback strategies for structured output.
"""
from .base import StructuredExtractor
from .factory import create_extractor
__all__ = ["StructuredExtractor", "create_extractor"]

View file

@ -0,0 +1,152 @@
"""Anthropic provider for structured JSON extraction.
Uses the Anthropic API with native structured output (json_schema)
for reliable JSON extraction. Includes retry logic for transient errors.
"""
import json
import logging
import time
import anthropic
from .exceptions import (
LLMAuthError,
LLMFormatError,
LLMRateLimitError,
LLMRefusalError,
LLMTimeoutError,
)
logger = logging.getLogger(__name__)
# Retry configuration
MAX_RETRIES = 3
INITIAL_BACKOFF_SECONDS = 2
BACKOFF_MULTIPLIER = 2
class AnthropicExtractor:
"""Structured JSON extractor using the Anthropic API.
Uses output_config with json_schema format for structured output.
Retries transient errors (rate limit, timeout, connection) with
exponential backoff.
"""
def __init__(self, api_key: str, model: str) -> None:
"""Initialize the Anthropic extractor.
Args:
api_key: Anthropic API key.
model: Model identifier (e.g., "claude-haiku-4-5-20251001").
"""
self._client = anthropic.Anthropic(api_key=api_key)
self._model = model
def extract_json(
self,
prompt: str,
max_tokens: int,
json_schema: dict,
schema_name: str,
) -> dict:
"""Extract structured JSON using the Anthropic API.
Args:
prompt: The extraction prompt to send to the model.
max_tokens: Maximum tokens in the response.
json_schema: JSON Schema that the response must conform to.
schema_name: Human-readable name for the schema.
Returns:
Parsed JSON dictionary conforming to the provided schema.
Raises:
LLMAuthError: Invalid API key.
LLMRateLimitError: Rate limited after all retries.
LLMTimeoutError: Timeout/connection error after all retries.
LLMFormatError: Response is not valid JSON.
LLMRefusalError: Model refused to respond.
"""
last_exception: Exception | None = None
for attempt in range(1, MAX_RETRIES + 1):
try:
return self._attempt_extraction(
prompt, max_tokens, json_schema, schema_name, attempt,
)
except LLMAuthError:
raise
except LLMRefusalError:
raise
except (LLMRateLimitError, LLMTimeoutError) as e:
last_exception = e
if attempt < MAX_RETRIES:
delay = INITIAL_BACKOFF_SECONDS * (BACKOFF_MULTIPLIER ** (attempt - 1))
logger.warning(
"Transient error on attempt %d/%d for model %s, "
"retrying in %ds: %s",
attempt, MAX_RETRIES, self._model, delay,
type(e).__name__,
)
time.sleep(delay)
raise last_exception # type: ignore[misc]
def _attempt_extraction(
self,
prompt: str,
max_tokens: int,
json_schema: dict,
schema_name: str,
attempt: int,
) -> dict:
"""Single extraction attempt against the Anthropic API."""
logger.info(
"Anthropic extraction attempt %d/%d, model=%s, schema=%s",
attempt, MAX_RETRIES, self._model, schema_name,
)
try:
response = self._client.messages.create(
model=self._model,
max_tokens=max_tokens,
messages=[{"role": "user", "content": prompt}],
output_config={
"format": {
"type": "json_schema",
"schema": json_schema,
},
},
)
except anthropic.AuthenticationError as e:
raise LLMAuthError("Anthropic authentication failed (check API key)") from e
except anthropic.RateLimitError as e:
raise LLMRateLimitError("Anthropic rate limited") from e
except (anthropic.APITimeoutError, anthropic.APIConnectionError) as e:
raise LLMTimeoutError(
f"Anthropic connection error ({type(e).__name__})"
) from e
# Check for truncation - raise and let outer retry loop handle it
if response.stop_reason == "max_tokens":
raise LLMFormatError(
f"Response truncated (max_tokens) for schema {schema_name}"
)
# Check for refusal
if response.stop_reason == "end_turn" and not response.content:
raise LLMRefusalError(
f"Model refused to generate response for schema {schema_name}"
)
# Parse JSON from response
try:
text = response.content[0].text
return json.loads(text)
except (json.JSONDecodeError, IndexError, AttributeError) as e:
raise LLMFormatError(
f"Failed to parse Anthropic response as JSON for "
f"schema {schema_name} ({type(e).__name__})"
) from e

41
connectors/llm/base.py Normal file
View file

@ -0,0 +1,41 @@
"""Base protocol for structured extraction from LLMs."""
from typing import Protocol
class StructuredExtractor(Protocol):
"""Protocol for structured JSON extraction from language models.
This is a structured extraction interface, NOT a general LLM chat
interface. Implementations must return parsed JSON matching the
provided schema.
"""
def extract_json(
self,
prompt: str,
max_tokens: int,
json_schema: dict,
schema_name: str,
) -> dict:
"""Extract structured JSON from a prompt.
Args:
prompt: The extraction prompt to send to the model.
max_tokens: Maximum tokens in the response.
json_schema: JSON Schema that the response must conform to.
schema_name: Human-readable name for the schema (used in
logging and error messages).
Returns:
Parsed JSON dictionary conforming to the provided schema.
Raises:
LLMError: Base class for all LLM-related errors.
LLMAuthError: Invalid API key (permanent, do not retry).
LLMRateLimitError: Rate limited (transient, retry with backoff).
LLMTimeoutError: Timeout or connection error (transient, retry).
LLMFormatError: Invalid JSON or unexpected structure.
LLMRefusalError: Model refused due to safety filter.
"""
...

View file

@ -0,0 +1,55 @@
"""Exception hierarchy for LLM connector errors.
All exceptions inherit from LLMError so callers can catch the base
class for broad error handling or specific subclasses for targeted
recovery strategies.
"""
class LLMError(Exception):
"""Base exception for all LLM-related errors."""
class LLMAuthError(LLMError):
"""Invalid API key or authentication failure.
This is a permanent error - do not retry.
"""
class LLMRateLimitError(LLMError):
"""Rate limited by the provider.
This is a transient error - retry with exponential backoff.
"""
class LLMTimeoutError(LLMError):
"""Timeout or connection error.
This is a transient error - retry with exponential backoff.
"""
class LLMFormatError(LLMError):
"""Invalid JSON or unexpected response structure.
The model returned content that could not be parsed as valid JSON
or did not match the expected schema.
"""
class LLMUnsupportedError(LLMError):
"""Provider does not support a required feature.
For example, the provider does not support structured output
(json_schema response format) and the configuration does not
allow fallback strategies.
"""
class LLMRefusalError(LLMError):
"""Model refused to generate a response.
Typically triggered by safety filters or content policy violations.
"""

137
connectors/llm/factory.py Normal file
View file

@ -0,0 +1,137 @@
"""Factory for creating structured extractors from instance configuration.
Reads the ai: section from instance.yaml (already resolved by config/loader.py)
and creates the appropriate StructuredExtractor implementation.
"""
import logging
from urllib.parse import urlparse
from .anthropic_provider import AnthropicExtractor
from .base import StructuredExtractor
from .openai_compat import OpenAICompatExtractor
logger = logging.getLogger(__name__)
# Default model when not specified in config
DEFAULT_MODEL = "claude-haiku-4-5-20251001"
# Default structured output strategy
DEFAULT_STRUCTURED_OUTPUT = "auto"
def create_extractor(ai_config: dict) -> StructuredExtractor:
"""Create a structured extractor from the ai: config section.
Supports two configuration formats:
New format (explicit provider):
ai:
provider: anthropic | openai_compat
api_key: ${ANTHROPIC_API_KEY}
model: claude-haiku-4-5-20251001
base_url: https://api.example.com/v1 # required for openai_compat
structured_output: auto # strict | json | auto
Legacy format (backward compatible):
ai:
anthropic_api_key: ${ANTHROPIC_API_KEY}
Args:
ai_config: The ai: section dict from instance.yaml,
already resolved by config/loader.py.
Returns:
A StructuredExtractor instance.
Raises:
ValueError: If configuration is invalid or incomplete.
"""
if not ai_config or not isinstance(ai_config, dict):
raise ValueError(
"ai: section in instance.yaml must be a non-empty dict. "
"Example:\n ai:\n provider: anthropic\n api_key: ${ANTHROPIC_API_KEY}"
)
provider = ai_config.get("provider")
# Legacy format detection: anthropic_api_key present, no provider
if not provider and "anthropic_api_key" in ai_config:
api_key = ai_config["anthropic_api_key"]
_validate_api_key(api_key)
model = ai_config.get("model", DEFAULT_MODEL)
logger.info(
"Creating AnthropicExtractor (legacy config), model=%s", model
)
return AnthropicExtractor(api_key=api_key, model=model)
if not provider:
raise ValueError(
"ai.provider is required in instance.yaml. "
"Supported: 'anthropic', 'openai_compat'. "
"Hint: use ${ENV_VAR} syntax for secrets."
)
api_key = ai_config.get("api_key", "")
_validate_api_key(api_key)
model = ai_config.get("model", DEFAULT_MODEL)
if provider == "anthropic":
logger.info("Creating AnthropicExtractor, model=%s", model)
return AnthropicExtractor(api_key=api_key, model=model)
elif provider == "openai_compat":
base_url = ai_config.get("base_url", "")
if not base_url:
raise ValueError(
"ai.base_url is required when provider is 'openai_compat'. "
"Example: base_url: https://api.openai.com/v1"
)
structured_output = ai_config.get(
"structured_output", DEFAULT_STRUCTURED_OUTPUT,
)
if structured_output not in ("strict", "json", "auto"):
raise ValueError(
f"ai.structured_output must be 'strict', 'json', or 'auto', "
f"got '{structured_output}'"
)
safe_url = _sanitize_url(base_url)
logger.info(
"Creating OpenAICompatExtractor, url=%s, model=%s, "
"structured_output=%s",
safe_url, model, structured_output,
)
return OpenAICompatExtractor(
api_key=api_key,
base_url=base_url,
model=model,
structured_output=structured_output,
)
else:
raise ValueError(
f"Unknown ai.provider '{provider}'. "
f"Supported: 'anthropic', 'openai_compat'. "
f"Hint: use ${{ENV_VAR}} syntax for secrets."
)
def _validate_api_key(api_key: str) -> None:
"""Validate that an API key is present and non-empty.
Raises:
ValueError: If api_key is empty or missing.
"""
if not api_key or not api_key.strip():
raise ValueError(
"ai.api_key (or ai.anthropic_api_key) must not be empty. "
"Check that the corresponding environment variable is set "
"and referenced with ${ENV_VAR} syntax in instance.yaml."
)
def _sanitize_url(url: str) -> str:
"""Extract scheme://host from a URL for safe logging."""
parsed = urlparse(url)
return f"{parsed.scheme}://{parsed.netloc}"

View file

@ -0,0 +1,305 @@
"""OpenAI-compatible provider for structured JSON extraction.
Supports any OpenAI-compatible API endpoint with progressive fallback
for structured output: json_schema -> json_object -> prompt-based JSON.
"""
import json
import logging
import re
import time
from urllib.parse import urlparse
import openai
from .exceptions import (
LLMAuthError,
LLMFormatError,
LLMRateLimitError,
LLMRefusalError,
LLMTimeoutError,
LLMUnsupportedError,
)
logger = logging.getLogger(__name__)
# Retry configuration
MAX_RETRIES = 3
INITIAL_BACKOFF_SECONDS = 2
BACKOFF_MULTIPLIER = 2
# Regex to strip markdown code fences and extract JSON
_JSON_FENCE_PATTERN = re.compile(r"```(?:json)?\s*\n?(.*?)\n?\s*```", re.DOTALL)
def _sanitize_url(url: str) -> str:
"""Extract scheme://host from a URL for safe logging.
Never logs path, query params, or fragments which may contain
tokens or sensitive information.
"""
parsed = urlparse(url)
return f"{parsed.scheme}://{parsed.netloc}"
def _extract_json_from_text(text: str) -> dict:
"""Parse JSON from potentially markdown-wrapped text.
Tries direct parsing first, then strips markdown code fences,
then falls back to finding content between first { and last }.
Raises:
LLMFormatError: If no valid JSON can be extracted.
"""
# Try direct parse first
stripped = text.strip()
try:
return json.loads(stripped)
except json.JSONDecodeError:
pass
# Try stripping markdown code fences
fence_match = _JSON_FENCE_PATTERN.search(stripped)
if fence_match:
try:
return json.loads(fence_match.group(1).strip())
except json.JSONDecodeError:
pass
# Fallback: find JSON between first { and last }
first_brace = stripped.find("{")
last_brace = stripped.rfind("}")
if first_brace != -1 and last_brace > first_brace:
try:
return json.loads(stripped[first_brace:last_brace + 1])
except json.JSONDecodeError:
pass
raise LLMFormatError(f"Could not extract valid JSON from model response")
class OpenAICompatExtractor:
"""Structured JSON extractor for OpenAI-compatible APIs.
Supports progressive fallback for structured output based on the
configured strategy:
- "strict": json_schema only, raises LLMUnsupportedError if not supported
- "json": json_schema -> json_object fallback
- "auto": json_schema -> json_object -> prompt-based JSON (default)
"""
def __init__(
self,
api_key: str,
base_url: str,
model: str,
structured_output: str = "auto",
) -> None:
"""Initialize the OpenAI-compatible extractor.
Args:
api_key: API key for authentication.
base_url: Base URL of the OpenAI-compatible API.
model: Model identifier.
structured_output: Fallback strategy - "strict", "json", or "auto".
"""
self._client = openai.OpenAI(api_key=api_key, base_url=base_url)
self._model = model
self._structured_output = structured_output
self._safe_url = _sanitize_url(base_url)
def extract_json(
self,
prompt: str,
max_tokens: int,
json_schema: dict,
schema_name: str,
) -> dict:
"""Extract structured JSON using an OpenAI-compatible API.
Attempts structured output strategies in order of preference,
falling back as allowed by the configured strategy.
Args:
prompt: The extraction prompt to send to the model.
max_tokens: Maximum tokens in the response.
json_schema: JSON Schema that the response must conform to.
schema_name: Human-readable name for the schema.
Returns:
Parsed JSON dictionary conforming to the provided schema.
Raises:
LLMAuthError: Invalid API key.
LLMRateLimitError: Rate limited after all retries.
LLMTimeoutError: Timeout/connection error after all retries.
LLMFormatError: Response is not valid JSON.
LLMRefusalError: Model refused to respond.
LLMUnsupportedError: Required feature not supported and no fallback allowed.
"""
strategies = self._get_strategies()
for strategy in strategies:
try:
logger.info(
"OpenAI-compat extraction: url=%s, model=%s, strategy=%s, schema=%s",
self._safe_url, self._model, strategy, schema_name,
)
return self._extract_with_strategy(
prompt, max_tokens, json_schema, schema_name, strategy,
)
except LLMUnsupportedError:
logger.info(
"Strategy %s not supported at %s, trying next fallback",
strategy, self._safe_url,
)
continue
raise LLMUnsupportedError(
f"No supported structured output strategy for {self._safe_url} "
f"with configured mode '{self._structured_output}'"
)
def _get_strategies(self) -> list[str]:
"""Get ordered list of strategies to try based on configuration."""
if self._structured_output == "strict":
return ["json_schema"]
elif self._structured_output == "json":
return ["json_schema", "json_object"]
else: # "auto"
return ["json_schema", "json_object", "text"]
def _extract_with_strategy(
self,
prompt: str,
max_tokens: int,
json_schema: dict,
schema_name: str,
strategy: str,
) -> dict:
"""Execute extraction with a specific structured output strategy."""
last_exception: Exception | None = None
for attempt in range(1, MAX_RETRIES + 1):
try:
return self._attempt_extraction(
prompt, max_tokens, json_schema, schema_name,
strategy, attempt,
)
except LLMAuthError:
raise
except LLMRefusalError:
raise
except LLMUnsupportedError:
raise
except (LLMRateLimitError, LLMTimeoutError) as e:
last_exception = e
if attempt < MAX_RETRIES:
delay = INITIAL_BACKOFF_SECONDS * (BACKOFF_MULTIPLIER ** (attempt - 1))
logger.warning(
"Transient error on attempt %d/%d for %s model %s, "
"retrying in %ds: %s",
attempt, MAX_RETRIES, self._safe_url,
self._model, delay, type(e).__name__,
)
time.sleep(delay)
raise last_exception # type: ignore[misc]
def _attempt_extraction(
self,
prompt: str,
max_tokens: int,
json_schema: dict,
schema_name: str,
strategy: str,
attempt: int,
) -> dict:
"""Single extraction attempt with a specific strategy."""
logger.info(
"OpenAI-compat attempt %d/%d, url=%s, model=%s, strategy=%s",
attempt, MAX_RETRIES, self._safe_url, self._model, strategy,
)
messages = [{"role": "user", "content": prompt}]
kwargs: dict = {
"model": self._model,
"max_tokens": max_tokens,
"messages": messages,
}
if strategy == "json_schema":
kwargs["response_format"] = {
"type": "json_schema",
"json_schema": {
"name": schema_name,
"strict": True,
"schema": json_schema,
},
}
elif strategy == "json_object":
kwargs["response_format"] = {"type": "json_object"}
elif strategy == "text":
# Append JSON instruction to prompt for text-based fallback
messages = [
{
"role": "user",
"content": prompt + "\n\nIMPORTANT: Respond with valid JSON only, no markdown.",
},
]
kwargs["messages"] = messages
try:
response = self._client.chat.completions.create(**kwargs)
except openai.AuthenticationError as e:
raise LLMAuthError(
f"OpenAI-compat authentication failed at {self._safe_url} (check API key)"
) from e
except openai.RateLimitError as e:
raise LLMRateLimitError(
f"OpenAI-compat rate limited at {self._safe_url}"
) from e
except (openai.APITimeoutError, openai.APIConnectionError) as e:
raise LLMTimeoutError(
f"OpenAI-compat connection error at {self._safe_url} ({type(e).__name__})"
) from e
except openai.BadRequestError as e:
# json_schema format not supported by this endpoint
error_msg = str(e).lower()
if "response_format" in error_msg or "json_schema" in error_msg:
raise LLMUnsupportedError(
f"Structured output strategy '{strategy}' not supported "
f"at {self._safe_url}"
) from e
raise LLMFormatError(
f"Bad request at {self._safe_url} ({type(e).__name__})"
) from e
choice = response.choices[0]
# Check for truncation - raise and let outer retry loop handle it
if choice.finish_reason == "length":
raise LLMFormatError(
f"Response truncated (max_tokens) for schema {schema_name} "
f"at {self._safe_url}"
)
# Check for refusal
content = choice.message.content
if not content:
raise LLMRefusalError(
f"Model at {self._safe_url} refused to generate response "
f"for schema {schema_name}"
)
# Parse JSON from response
if strategy == "text":
return _extract_json_from_text(content)
try:
return json.loads(content)
except json.JSONDecodeError as e:
raise LLMFormatError(
f"Failed to parse response as JSON for schema {schema_name} "
f"at {self._safe_url} ({type(e).__name__})"
) from e

View file

@ -126,5 +126,6 @@ catalog:
| `SENDGRID_API_KEY` | For password auth emails |
| `TELEGRAM_BOT_TOKEN` | For Telegram notifications |
| `ANTHROPIC_API_KEY` | For Corporate Memory AI |
| `LLM_API_KEY` | API key for LLM proxy (LiteLLM, OpenRouter, etc.) |
| `JIRA_WEBHOOK_SECRET` | For Jira integration |
| `CONFIG_DIR` | Override config directory path |

326
docs/llm-routing.md Normal file
View file

@ -0,0 +1,326 @@
# Plan: Modular LLM Routing for Corporate Memory
> Reviewed by: Claude Opus (author), Google Gemini, Claude Sonnet, OpenAI GPT-5.4
> Feedback incorporated from all three external reviewers.
## Context
Corporate Memory is a feature that reads team members' local notes (CLAUDE.local.md),
sends them to a small AI model (Claude Haiku) for knowledge extraction, and builds
a shared knowledge base. Currently it's hardwired to call Anthropic's API directly.
Different clients deploying this platform use different AI providers:
| Client | AI Provider | Why |
|--------|------------|-----|
| Groupon | LiteLLM proxy | Corporate AI gateway, cost control, audit |
| Keboola | Direct Anthropic | Simple setup, single provider |
| Future client A | OpenRouter | Multi-model access, cost optimization |
| Future client B | Google Gemini | Existing Google Cloud relationship |
**Problem**: The code only works with Anthropic. Adding a second client means duplicating
or rewriting the AI calling logic.
**Solution**: Extract the AI calling logic into a modular connector that each instance
configures for its own provider. The connector lives in the open-source repo (code),
the configuration lives in the private instance repo (config).
## Design Principles
### 1. Structured Extraction, Not General AI
This connector has one job: send a prompt, get back structured JSON.
It is NOT a general-purpose AI chat interface. The naming and interface reflect this:
`StructuredExtractor` (not "LLMProvider"), `extract_json()` (not "chat" or "generate").
This keeps the scope tight and the interface honest. If we need general AI capabilities
later, we build a separate abstraction.
### 2. Instance Config Drives Provider Selection
Each deployment configures its AI provider in `instance.yaml` (the same file that
already configures authentication, branding, data sources, and catalog integration).
Secrets use `${ENV_VAR}` references, resolved at load time by the existing config loader.
The open-source code never knows which provider it's talking to. It receives a configured
extractor and calls `extract_json()`.
### 3. Backward Compatibility
Existing deployments using `ai.anthropic_api_key` in their config continue to work
without changes. The factory recognizes the legacy config shape and creates the
appropriate provider automatically. No migration step required for existing instances.
### 4. Structured Output Strategy (Configurable)
Not all AI providers support JSON schema enforcement equally. The connector supports
three levels, but the operator controls which are allowed:
1. **JSON Schema mode** — provider enforces the exact schema (best quality)
2. **JSON Object mode** — provider guarantees valid JSON but no schema (good quality)
3. **Prompt-based JSON** — instructions in the prompt ask for JSON (acceptable quality)
By default, all three layers are available as progressive fallback. But the operator
can restrict this in config:
```yaml
ai:
provider: "openai_compat"
# --- Structured output quality control ---
# AI models can return JSON in three ways, each with different reliability:
#
# Layer 1 - "json_schema" (best):
# The provider enforces an exact schema. Every field, type, and structure
# is guaranteed. Available on: Anthropic, OpenAI, Claude via LiteLLM.
#
# Layer 2 - "json_object" (good):
# The provider guarantees valid JSON, but does not enforce a specific schema.
# Fields may be missing or have wrong types. Available on most providers.
#
# Layer 3 - "prompt" (acceptable):
# The AI is asked to respond in JSON via instructions in the prompt.
# No technical enforcement — the model may still return invalid JSON.
# Works everywhere, but least reliable.
#
# "strict" = only Layer 1. Fail if provider doesn't support json_schema.
# Use when data quality is non-negotiable.
# "json" = Layer 1, fall back to Layer 2. No prompt-based fallback.
# Good balance of quality and compatibility.
# "auto" = All three layers as progressive fallback. Maximum compatibility.
# Use when you'd rather get imperfect data than no data.
structured_output: "strict"
```
When set to `"strict"`, the connector will NOT fall back to weaker strategies.
If the provider doesn't support JSON schema, the extraction fails with a clear error.
This is the right choice when data quality is non-negotiable.
### 5. Fail-Safe by Default
- Missing config → Corporate Memory logs a warning and skips AI extraction (doesn't crash)
- AI call fails → item marked as "unsafe" (conservative, nothing leaks)
- Truncated response → detected and retried once
- Auth error → fails fast with clear message (don't retry forever)
- Rate limit → waits and retries with backoff
### 6. Zero Secrets in Logs
The connector NEVER logs:
- API keys, tokens, or any secret values
- Prompt content (may contain user notes with sensitive data)
- Response content (may contain extracted knowledge before sensitivity check)
- Full URLs with query parameters (may contain tokens)
What IS logged:
- Provider type and model name
- Sanitized base URL (scheme + host only, no path/query)
- Structured output strategy selected
- Call duration (latency)
- Error classification (auth/rate_limit/timeout/format — never the error body)
- Whether fallback was triggered
This is a hard rule, not a guideline. API keys and user content must never
appear in logs, stdout, stderr, or error messages propagated to callers.
## Architecture
### Where the code lives
```
OSS Repo (open-source, shared):
connectors/llm/ ← NEW: AI provider abstraction
base.py Interface definition
anthropic_provider.py Direct Anthropic API
openai_compat.py Any OpenAI-compatible proxy
factory.py Creates the right provider from config
services/corporate_memory/
collector.py ← MODIFIED: uses connector instead of direct API calls
Instance Repo (private, per-client):
config/instance.yaml ← MODIFIED: new ai: section
.env ← MODIFIED: new LLM_API_KEY secret
```
### How it flows
```
instance.yaml (ai: section)
Config loader resolves ${ENV_VAR} secrets
Factory reads provider type, creates extractor
Corporate Memory calls extractor.extract_json(prompt, schema)
Extractor routes to the right API:
├─ Anthropic SDK → api.anthropic.com/v1/messages
└─ OpenAI SDK → litellm.groupondev.com/v1/chat/completions
openrouter.ai/v1/chat/completions
any OpenAI-compatible endpoint
```
### Config examples
**Groupon (LiteLLM proxy):**
```yaml
ai:
provider: "openai_compat"
base_url: "https://litellm.groupondev.com"
api_key: "${LLM_API_KEY}"
model: "claude-haiku-4-5-20251001"
```
**Keboola (direct Anthropic):**
```yaml
ai:
provider: "anthropic"
api_key: "${ANTHROPIC_API_KEY}"
model: "claude-haiku-4-5-20251001"
```
**Legacy (existing deployments, no changes needed):**
```yaml
ai:
anthropic_api_key: "${ANTHROPIC_API_KEY}"
```
## What We're Improving
### A. From Hardwired to Pluggable
**Before**: One provider, baked into the code. Changing provider = changing code.
**After**: Provider is a config choice. Switching from Anthropic to LiteLLM to OpenRouter
is a YAML change + secret rotation. No code touches needed.
### B. From Fragile to Resilient
**Before**: API error = entire collection run fails. No retries.
**After**:
- Transient errors (rate limits, timeouts, network) → automatic retry with backoff
- Permanent errors (bad API key, unsupported model) → fail fast, clear error message
- Truncated responses (model hit token limit) → detected, retried with note
- Model refuses request → logged, item skipped safely
### C. From All-or-Nothing to Progressive Degradation
**Before**: Structured output works or it doesn't. Binary.
**After**: Three fallback layers (schema → json_object → prompt-based). The connector
adapts to what each provider actually supports instead of assuming capabilities.
### D. From Silent to Observable
**Before**: No visibility into what the AI extraction does.
**After**:
- Which provider/model is being used (logged at startup)
- Which structured output strategy was selected (logged once)
- How long each call takes (logged per call)
- Whether fallback was triggered (logged as warning)
- Clear error classification in logs
### E. From Coupled to Separated
**Before**: AI provider choice is an engineering decision embedded in code.
**After**: AI provider choice is an operations decision in instance config.
Each client controls their own provider, model, and API gateway independently.
## Error Handling Strategy
| Error Type | What Happens | Why |
|-----------|-------------|-----|
| Missing `ai:` config | Corporate Memory skips AI extraction, logs warning | Don't crash the whole service |
| Invalid API key | Fail fast, log error, skip collection run | Don't waste retries on permanent failure |
| Rate limit (429) | Wait + retry with exponential backoff (3 attempts) | Transient, will resolve |
| Network timeout | Retry once, then fail | Might be transient |
| Truncated response | Detect via finish_reason, retry once | Model hit token limit |
| Model refusal | Log, mark item as unsafe | Conservative: don't share uncertain content |
| Invalid JSON response | Log, mark item as unsafe | Better to skip than crash |
| Structured output unsupported | Fall back to json_object, then prompt-based | Adapt to provider capabilities |
## Scope Boundaries
**In scope (v1):**
- Anthropic direct provider (existing behavior, tested)
- OpenAI-compatible proxy provider (LiteLLM, verified against Groupon proxy)
- Backward compatibility with existing `ai.anthropic_api_key` config
- Three-layer structured output fallback
- Custom error hierarchy (auth / rate limit / timeout / format)
- Retry with backoff for transient errors
- Corporate Memory collector integration
**Explicitly NOT in scope (future):**
- Azure OpenAI, OpenRouter, Gemini — listed as "untested" until verified per-provider
- General-purpose AI chat/generation interface
- Streaming responses
- Multi-turn conversations
- Token usage tracking / cost monitoring (v2 consideration)
- Provider capability auto-detection at startup
## Testing Strategy
### Unit Tests (connector internals)
- Factory creates correct provider from each config shape
- Factory handles legacy `ai.anthropic_api_key` config
- Missing/invalid config raises clear errors
- Each provider formats API calls correctly (mocked SDK)
- Structured output fallback chain works
- Error classification (auth vs rate limit vs timeout)
### Integration Tests (Corporate Memory behavior)
- Full collection run with mocked provider
- Skip when no files changed (hash check)
- Preserve existing item IDs across runs
- Sensitivity check runs only on new items
- Fail-closed on sensitivity check errors
- user_hashes.json written only after successful processing
- Graceful degradation when `ai:` config is missing
### Manual Verification (before production)
- Dry-run against actual Groupon LiteLLM proxy
- Verify structured output works through proxy
- Verify sensitivity check works through proxy
- Full collection produces valid knowledge.json
## Deployment
The existing `deploy.sh` handles dependency installation from `requirements.txt`,
so no manual pip install is needed. The deployment sequence:
1. Add `openai` to `requirements.txt` (OSS repo)
2. Update `collector.py` to use new connector (OSS repo)
3. Add `ai:` section to `instance.yaml` (instance repo)
4. Add `LLM_API_KEY` secret to GHA secrets and deploy.yml (instance repo)
5. Add `CONFIG_DIR` to the wrapper script `collect-knowledge` (OSS repo)
6. Push both repos → CI/CD deploys automatically
7. Verify via `--dry-run` on server
**Rollback**: Revert both repos to previous commit. The legacy config path
means existing `ai.anthropic_api_key` still works if we need to roll back.
## Files to Modify
| File | Repo | Change |
|------|------|--------|
| `connectors/llm/` (5 new files) | OSS | New connector module |
| `services/corporate_memory/collector.py` | OSS | Use connector instead of direct API |
| `server/bin/collect-knowledge` | OSS | Add CONFIG_DIR |
| `requirements.txt` | OSS | Add `openai>=1.0.0` |
| `server/deploy.sh` | OSS | Add LLM_API_KEY to env propagation |
| `config/.env.template` | OSS | Document LLM_API_KEY |
| `config/instance.yaml.example` | OSS | Expanded ai: section with examples |
| `docs/CONFIGURATION.md` | OSS | Add AI provider docs |
| `tests/test_llm_connector.py` | OSS | New: connector tests |
| `tests/test_corporate_memory.py` | OSS | New/expanded: behavior tests |
| `config/instance.yaml` | Instance | Add ai: section for Groupon |
| `.github/workflows/deploy.yml` | Instance | Add LLM_API_KEY to .env |
| `env.example` | Instance | Document LLM_API_KEY |
## Risk Assessment
| Risk | Level | Mitigation |
|------|-------|-----------|
| LiteLLM structured output translation | Medium | Three-layer fallback + manual verification before deploy |
| Config migration breaks existing instances | Low | Backward compat shim for legacy config shape |
| New `openai` dependency conflicts | Low | Standard package, deploy.sh handles install |
| Corporate Memory regression | Medium | Expanded behavior tests covering all current logic |
| Systemd/wrapper script CONFIG_DIR | Low | Follows existing pattern from other services |

View file

@ -55,6 +55,9 @@ sendgrid>=6.11.0
# anthropic - Claude API client for HAIKU-based knowledge extraction
anthropic>=0.39.0
# OpenAI-compatible API client for LLM proxy routing (LiteLLM, OpenRouter, etc.)
openai>=1.0.0
# Sample data generation (development/testing)
# faker - realistic synthetic data for demo datasets
faker>=24.0.0

View file

@ -27,5 +27,9 @@ if [[ -f "${REPO_DIR}/.env" ]]; then
set +a
fi
# Config directory for instance.yaml
export CONFIG_DIR="${APP_DIR}/instance/config"
export PYTHONPATH="${REPO_DIR}"
# Run the collector
exec "$VENV_PYTHON" -m services.corporate_memory "$@"

View file

@ -313,7 +313,7 @@ log "Creating data sync .env file..."
for var in TELEGRAM_BOT_TOKEN DESKTOP_JWT_SECRET SENDGRID_API_KEY \
JIRA_SLA_EMAIL JIRA_SLA_API_TOKEN JIRA_CLOUD_ID \
EMAIL_FROM_ADDRESS EMAIL_FROM_NAME ALLOWED_EMAILS \
ANTHROPIC_API_KEY; do
ANTHROPIC_API_KEY LLM_API_KEY; do
if [[ -n "${!var:-}" ]]; then
echo "${var}=${!var}"
fi

View file

@ -19,7 +19,8 @@ from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import anthropic
from connectors.llm import create_extractor
from connectors.llm.exceptions import LLMError
from .prompts import CATALOG_REFRESH_PROMPT, SENSITIVITY_CHECK_PROMPT
@ -30,9 +31,6 @@ COLLECTION_LOG = CORPORATE_MEMORY_DIR / "collection.log"
USER_HASHES_FILE = CORPORATE_MEMORY_DIR / "user_hashes.json"
HOME_BASE = Path("/home")
# HAIKU model for cost-effective extraction
HAIKU_MODEL = "claude-haiku-4-5-20251001"
# Configure logging
logging.basicConfig(
level=logging.INFO,
@ -125,14 +123,6 @@ def _generate_id(content: str) -> str:
return f"km_{h}"
def _get_claude_client() -> anthropic.Anthropic:
"""Get Anthropic client with API key from environment."""
api_key = os.environ.get("ANTHROPIC_API_KEY")
if not api_key:
raise ValueError("ANTHROPIC_API_KEY environment variable is required")
return anthropic.Anthropic(api_key=api_key)
def _find_claude_local_files() -> list[tuple[str, Path]]:
"""Find all CLAUDE.local.md files in user home directories.
@ -309,7 +299,7 @@ def _process_catalog_response(
return result
def check_sensitivity(client: anthropic.Anthropic, item: dict) -> bool:
def check_sensitivity(extractor, item: dict) -> bool:
"""Check if a knowledge item is safe to share.
Returns True if safe, False if contains sensitive data.
@ -321,29 +311,20 @@ def check_sensitivity(client: anthropic.Anthropic, item: dict) -> bool:
)
try:
response = client.messages.create(
model=HAIKU_MODEL,
max_tokens=256,
messages=[{"role": "user", "content": prompt}],
output_config={
"format": {
"type": "json_schema",
"schema": SENSITIVITY_SCHEMA,
},
},
result = extractor.extract_json(
prompt, max_tokens=256,
json_schema=SENSITIVITY_SCHEMA, schema_name="sensitivity_check",
)
result = json.loads(response.content[0].text)
if not result.get("safe", False):
reason = result.get("reason", "unknown")
logger.info(f"Filtered sensitive item: {item.get('title', 'untitled')} - {reason}")
logger.info("Filtered sensitive item id=%s - %s", item.get("id", "unknown"), reason)
return False
return True
except (json.JSONDecodeError, anthropic.APIError) as e:
logger.warning(f"Sensitivity check failed, assuming unsafe: {e}")
except LLMError as e:
logger.warning("Sensitivity check failed, assuming unsafe: %s", type(e).__name__)
return False
@ -389,12 +370,19 @@ def collect_all(dry_run: bool = False) -> dict:
stats["skipped"] = True
return stats
# Step 2: Initialize client
# Step 2: Initialize AI extractor
try:
client = _get_claude_client()
except ValueError as e:
from config.loader import load_instance_config
instance_config = load_instance_config()
ai_config = instance_config.get("ai")
if not ai_config:
logger.warning("No ai: section in instance.yaml, skipping catalog refresh")
stats["skipped"] = True
return stats
extractor = create_extractor(ai_config)
except (ValueError, FileNotFoundError) as e:
stats["errors"].append(str(e))
logger.error(str(e))
logger.error("Failed to initialize AI extractor: %s", e)
return stats
# Step 3: Load existing catalog
@ -420,30 +408,17 @@ def collect_all(dry_run: bool = False) -> dict:
)
try:
response = client.messages.create(
model=HAIKU_MODEL,
max_tokens=8192,
messages=[{"role": "user", "content": prompt}],
output_config={
"format": {
"type": "json_schema",
"schema": CATALOG_SCHEMA,
},
},
response_data = extractor.extract_json(
prompt, max_tokens=8192,
json_schema=CATALOG_SCHEMA, schema_name="catalog_refresh",
)
response_data = json.loads(response.content[0].text)
response_items = response_data.get("items", [])
stats["items_extracted"] = len(response_items)
logger.info(f"HAIKU returned {len(response_items)} catalog items")
logger.info(f"Extractor returned {len(response_items)} catalog items")
except json.JSONDecodeError as e:
logger.error(f"Failed to parse HAIKU response as JSON: {e}")
stats["errors"].append(f"JSON parse error: {e}")
return stats
except anthropic.APIError as e:
logger.error(f"HAIKU API error: {e}")
stats["errors"].append(f"API error: {e}")
except LLMError as e:
logger.error("LLM extraction error: %s", type(e).__name__)
stats["errors"].append(f"LLM error: {type(e).__name__}")
return stats
# Step 6: Process response - map to existing IDs
@ -460,10 +435,10 @@ def collect_all(dry_run: bool = False) -> dict:
stats["items_preserved"] += 1
else:
# New item - run sensitivity check
if check_sensitivity(client, item):
if check_sensitivity(extractor, item):
final_items[item_id] = item
stats["items_new"] += 1
logger.info(f"Added new knowledge item: {item['title']}")
logger.info("Added new knowledge item: id=%s", item_id)
else:
stats["items_filtered"] += 1

View file

@ -13,6 +13,8 @@ ExecStart=/usr/local/bin/collect-knowledge
# Environment
EnvironmentFile=/opt/data-analyst/.env
EnvironmentFile=/opt/data-analyst/repo/.env
Environment=CONFIG_DIR=/opt/data-analyst/instance/config
Environment=PYTHONPATH=/opt/data-analyst/repo
# Security hardening - root needed to read /home/*/CLAUDE.local.md
ProtectSystem=strict

1347
tests/test_llm_connector.py Normal file

File diff suppressed because it is too large Load diff