diff --git a/config/instance.yaml.example b/config/instance.yaml.example index 5c542d1..a711738 100644 --- a/config/instance.yaml.example +++ b/config/instance.yaml.example @@ -205,7 +205,48 @@ ai: # model: "anthropic/claude-3-haiku" # structured_output: "auto" -# --- User display (for Corporate Memory avatars) --- +# --- Corporate Memory governance (optional) --- +# Controls how AI-extracted knowledge is reviewed and distributed. +# If not present, system operates in legacy mode (democratic wiki, no admin review). +# +# corporate_memory: +# # How knowledge reaches users: +# # "mandatory_only" — admin controls everything, no user voting +# # "admin_curated" — admin controls, users vote as feedback signal +# # "hybrid" — mandatory from admin + optional from user voting (default) +# distribution_mode: "hybrid" +# +# # How new AI-extracted items enter the system: +# # "review_queue" — nothing published without admin approval (default) +# # "auto_publish" — items go live immediately, admin intervenes retroactively +# # "threshold" — high-confidence auto-publish, low-confidence to review queue +# approval_mode: "review_queue" +# +# # Default review period for approved/mandatory items (months) +# review_period_months: 6 +# +# # Notify km_admins about new pending items +# notify_on_new_items: true + +# --- User groups for audience targeting (optional) --- +# Used with Corporate Memory governance to target mandatory knowledge to specific groups. +# +# groups: +# finance: +# label: "Finance & Analytics" +# members: ["analyst1@company.com", "analyst2@company.com"] +# engineering: +# label: "Engineering" +# members: ["dev1@company.com", "dev2@company.com"] + +# --- User display and permissions --- +# Corporate Memory avatars + optional km_admin flag for governance. 
+# users: +# admin@company.com: +# display_name: "Admin User" +# km_admin: true # Corporate Memory admin (approve/mandate knowledge) +# analyst@company.com: +# display_name: "Analyst User" users: {} # --- Username mapping (webapp email -> server username, only if different) --- diff --git a/docs/corporate-memory-governance.md b/docs/corporate-memory-governance.md new file mode 100644 index 0000000..a2d78dc --- /dev/null +++ b/docs/corporate-memory-governance.md @@ -0,0 +1,489 @@ +# Corporate Memory Governance — Design Document + +> Reviewed by: Google Gemini, Claude Sonnet 4.5, OpenAI GPT-5.4 +> Version: 2 (feedback incorporated from all three reviewers) + +## Problem + +Today's Corporate Memory is a democratic wiki: AI extracts knowledge, everyone +votes, each person picks what they want. This doesn't work for enterprise: + +- **No authority** — CEO can't mandate "everyone must know this" +- **No quality gate** — AI output goes live without human review +- **Depends on user activity** — if nobody votes, nothing gets distributed +- **No explanation** — users don't know WHY they're getting specific knowledge +- **No audit trail** — no record of who decided what +- **No expiry** — knowledge goes stale silently + +## Solution + +Add a governance layer where administrators curate and control knowledge +distribution. The system becomes self-operating: AI extracts, admins approve, +mandatory items distribute automatically to their target audience (all users by default). + +Everything is configurable per instance — each client picks the governance +model that fits their organization. + +This is **v1 admin curation** — a credible first step toward full enterprise +governance, with a clear path to richer audience targeting, attestation, and compliance +features in future versions. + +--- + +## Three Governance Modes (configurable) + +### Mode 1: "mandatory_only" + +CEO/admin has full control. Users receive what's mandated, nothing else. 
+ +``` +AI extracts → Review queue → Admin approves/rejects → Mandatory items → target users +``` + +- Users see the knowledge catalog in webapp (read-only, no voting) +- Each mandatory item has an explanation ("Why this matters") +- Users cannot add or remove items from their rules +- Users CAN flag items for correction ("Report issue" button) + +**Best for:** Compliance-heavy environments, small teams with strong leadership. + +### Mode 2: "admin_curated" + +Admin curates, users give feedback via voting (but votes don't distribute). + +``` +AI extracts → Review queue → Admin approves/rejects/mandates + ↓ + Mandatory items → target users (automatic) + Approved items → visible in catalog (users vote as feedback) +``` + +- Voting is a signal for admins: "people find this useful" +- Admin sees vote counts when deciding what to mandate +- Users see catalog with mandatory badge + vote buttons +- Distribution is always admin-driven + +**Best for:** Medium-sized teams where admin wants user input but retains control. + +### Mode 3: "hybrid" (default) + +Two distribution channels: mandatory (admin) + optional (user choice). + +``` +AI extracts → Review queue → Admin approves/rejects/mandates + ↓ + Mandatory items → target users (automatic) + Approved items → catalog → users upvote → personal rules +``` + +- Mandatory items go to target audience (no opt-out) +- Approved items are available for individual opt-in via voting (like today) +- Users get mandatory + their personal picks +- Best of both worlds + +**Best for:** Larger teams, diverse roles, balance of governance and autonomy. + +--- + +## Approval Workflow (configurable) + +### Option A: "review_queue" (default) + +New items from AI go to a pending queue. Nothing reaches users until +an admin reviews it. 
+ +``` +AI extraction → status: "pending" → Admin reviews → approve / reject / mandate +``` + +- Admin sees a queue of pending items in the webapp +- **Batch operations**: checkboxes + "Approve selected" / "Reject selected" buttons +- Can approve (visible in catalog), reject (hidden), or mandate (goes to target users) +- Can edit title/content before approving +- Can add "Why this matters" explanation for mandatory items +- Queue has filters: by category, by source user, by date, by AI confidence +- Keyboard shortcuts for fast review (j/k navigate, a/r/m = approve/reject/mandate) + +### Option B: "auto_publish" + +Items go live immediately (like today). Admin intervenes retroactively. + +``` +AI extraction → status: "approved" (auto) → Admin can veto or mandate later +``` + +- Less admin work, faster knowledge flow +- Risk: bad content visible until admin catches it +- Admin gets digest of new items (e.g., Telegram notification) +- Recommended only for trusted, small-team environments + +### Option C: "threshold" + +AI assigns a confidence score during extraction. High confidence = auto-publish, +low = review queue. + +``` +AI extraction → confidence > threshold? → auto-publish (approved) + confidence ≤ threshold? → review queue (pending) +``` + +- Admin only reviews borderline items +- Reduces review burden while maintaining quality gate +- Threshold configurable in instance.yaml +- Confidence score visible to admin in review queue (helps calibrate trust over time) +- **Implementation note**: requires adding a confidence assessment step to the + AI extraction prompt (new field in CATALOG_SCHEMA) + +--- + +## Admin Role: Per-User Flag + +No new role system. 
Existing `users:` section in instance.yaml gets a flag: + +```yaml +users: + ceo@company.com: + display_name: "Jan Novák" + km_admin: true + lead@company.com: + display_name: "Petra Dvořáková" + km_admin: true # multiple admins supported + analyst@company.com: + display_name: "Anna Kovářová" + # no km_admin = regular user +``` + +**Multiple admins** are supported. Conflict resolution: last write wins with +audit trail. No locking — concurrent admin actions are recorded, the most +recent state is authoritative. + +`km_admin: true` grants: +- Access to review queue in webapp +- Approve / reject / mandate buttons on items +- Batch operations (select multiple, act on all) +- Edit item content before publishing +- Add "Why this matters" explanation +- Set audience targeting for mandatory items +- View all items including pending and rejected +- View audit log +- Emergency revoke capability + +Regular users see: +- Approved and mandatory items only +- Mandatory items highlighted with explanation +- Voting buttons (when governance mode allows) +- "Report issue" button on any item +- Their personal rules list + +--- + +## Item Lifecycle + +``` + ┌─────────┐ + AI extracts ──→ │ PENDING │ (only admins see) + └────┬────┘ + │ + ┌──────────┼──────────┐ + ↓ ↓ ↓ + ┌────────┐ ┌─────────┐ ┌──────────┐ + │APPROVED│ │MANDATORY│ │ REJECTED │ + └───┬────┘ └────┬────┘ └──────────┘ + │ │ + │ ┌────┴────┐ + │ ↓ ↓ + │ ┌───────┐ ┌───────┐ + │ │REVOKED│ │EXPIRED│ + │ └───────┘ └───────┘ + ↓ + catalog + (opt-in) +``` + +**Statuses:** +- **pending** — new from AI, waiting for admin review +- **approved** — admin approved, visible in catalog, users can opt-in +- **mandatory** — admin mandated, distributed to target audience automatically +- **rejected** — admin rejected, not visible to anyone (kept for audit) +- **revoked** — was mandatory, emergency pulled by admin (removed from rules on next sync) +- **expired** — past its review date, moved to re-review queue + +**Allowed transitions:** +- 
pending → approved / mandatory / rejected +- approved → mandatory (promote) / rejected (remove) +- mandatory → approved (demote to optional) / revoked (emergency pull) +- rejected → approved (reinstate) +- revoked → approved / mandatory (re-enable after fix) +- expired → approved / mandatory (re-confirmed) / rejected (retire) + +**Edited items**: When an admin edits a mandatory item, the item enters +"needs_reapproval" state — it stays distributed but is flagged in admin +dashboard for review. This prevents silent content drift. + +--- + +## Audience Targeting (v1: simple groups) + +Mandatory items can target specific groups instead of all users: + +```yaml +# Audience values offered in the admin UI when mandating (pick one): +audience: "all" # everyone (default) +audience: "group:finance" # only finance team +audience: "group:engineering" # only engineering +``` + +Groups are defined in instance.yaml: + +```yaml +groups: + finance: + label: "Finance & Analytics" + members: ["analyst1@co.com", "analyst2@co.com"] + engineering: + label: "Engineering" + members: ["dev1@co.com", "dev2@co.com"] +``` + +This is intentionally simple for v1. Future versions can support +role-based targeting, department hierarchies, or LDAP/SSO group sync. + +--- + +## Audit Log + +Every admin action is recorded in an immutable append-only log: + +``` +/data/corporate-memory/audit.jsonl +``` + +Each line is a JSON object: +- `timestamp` — when the action happened +- `admin` — who performed it (email) +- `action` — what happened (approved, rejected, mandated, revoked, edited, etc.) 
+- `item_id` — which item +- `details` — action-specific (e.g., old status, new status, reason, audience) + +The audit log is: +- **Append-only** — never edited or truncated (compliance requirement) +- **Separate from knowledge.json** — survives resets +- **Viewable by km_admins** in webapp (filterable by date, admin, action) +- **Exportable** as CSV for compliance reporting + +--- + +## Knowledge Freshness & Expiry + +### Review dates + +When approving or mandating, admin can optionally set: +- `review_by` — date when item should be re-reviewed (default: 6 months) + +Items past their `review_by` date: +- Status changes to **expired** +- Appear in admin's "Needs re-review" queue +- If mandatory: stay distributed until admin acts (no surprise removals) +- Admin can re-confirm (resets review date) or retire (reject) + +### Stale detection + +System flags items that may be stale: +- Source CLAUDE.local.md files were changed/removed but item wasn't re-extracted +- Item hasn't been re-confirmed in > 12 months (configurable) +- Multiple users flagged "Report issue" on the item + +--- + +## Emergency Controls + +### Emergency Revoke + +Any km_admin can immediately revoke a mandatory item: +- Status changes to **revoked** +- Rules regenerated for all affected users immediately +- Item removed from `.claude_rules/` on next sync +- Audit log records: who revoked, when, why +- Revoked items visible in admin dashboard with "Revoked" badge + +### User "Report Issue" Button + +All users (not just admins) can flag any visible item: +- Button on every item in the catalog +- Opens text field for description ("Contains outdated info", "Incorrect SQL", etc.) 
+- Report goes to km_admins as notification +- Admin can review and act (edit, revoke, reject) +- Prevents situations where admin is unavailable and bad item stays + +--- + +## What Changes for Each Actor + +### For the Admin / CEO + +**Review Queue (new webapp section)** +- List of pending items with AI-extracted content, category, source users +- **Batch operations**: checkboxes + "Approve selected" / "Reject selected" +- Keyboard shortcuts for fast review +- Filters: category, source user, date, AI confidence +- For each item: Approve / Reject / Mandate buttons +- Mandate requires: "Why this matters" text + audience selection +- Edit button to refine content before publishing +- Dashboard: pending count, approved count, mandatory count, expired count + +**Audit & Reporting** +- Audit log viewer (filterable by date, admin, action) +- Export audit log as CSV +- Coverage stats: how many mandatory items, how many users have them + +**Notifications** +- After AI collection: "N new items awaiting review" +- When user reports issue on an item +- When items reach their review date + +### For the Regular User + +**Knowledge Catalog (redesigned)** +- Mandatory items section at top, highlighted with distinct badge +- Each mandatory item shows "Why this matters" explanation from admin +- "Report issue" button on every item +- Below: approved items (browsable, searchable, filterable) +- Voting buttons visible when governance mode allows +- Clean, read-focused UI — no admin clutter + +**Rules distribution** +- Mandatory items → automatically in `.claude/rules/` after next sync +- Optional items → user upvotes in hybrid mode (like today) +- Revoked items → automatically removed on next sync +- User doesn't need to do anything for mandatory knowledge — it just appears + +### For the AI (Haiku) + +Extraction logic stays the same with one addition for threshold mode: +- New optional field in CATALOG_SCHEMA: `confidence` (float 0-1) +- AI rates its confidence that each extracted 
item is valuable and accurate +- Used by threshold approval mode to auto-publish high-confidence items + +--- + +## Migration from Current System + +When upgrading from the current democratic wiki: + +1. **Existing knowledge.json items** get `status: "approved"` (not pending — + they already passed sensitivity check and may have votes) +2. **Existing votes** are preserved (work as before in hybrid/admin_curated modes) +3. **Existing rules** in `.claude_rules/` continue working +4. **No user disruption** — everything looks the same until admin starts curating +5. **Admin enables governance** by adding a `corporate_memory` section + (`distribution_mode`, `approval_mode`) to instance.yaml — until that section is present, system behaves exactly as today + +The migration is **non-breaking and gradual**. An instance can run in legacy +mode indefinitely. + +--- + +## Configuration in instance.yaml + +```yaml +corporate_memory: + # How knowledge reaches users + # "mandatory_only" — admin controls everything, no user voting + # "admin_curated" — admin controls, users vote as feedback signal + # "hybrid" — mandatory from admin + optional from user voting (default) + distribution_mode: "hybrid" + + # How new AI-extracted items enter the system + # "review_queue" — nothing published without admin approval (default) + # "auto_publish" — items go live immediately, admin intervenes retroactively + # "threshold" — high-confidence auto-publish, low-confidence to review queue + approval_mode: "review_queue" + + # For threshold mode: minimum AI confidence to auto-publish (0.0-1.0) + # auto_confidence_threshold: 0.8 + + # Default review period for approved/mandatory items (months) + # Items past this date appear in "Needs re-review" queue + review_period_months: 6 + + # Notify km_admins about new pending items (requires Telegram or email) + notify_on_new_items: true + +# User groups for audience targeting +groups: + finance: + label: "Finance & Analytics" + members: ["analyst1@company.com", "analyst2@company.com"] + engineering: 
+ label: "Engineering" + members: ["dev1@company.com", "dev2@company.com"] +``` + +--- + +## What We DON'T Change + +- AI extraction logic (collector.py) — stays the same (except optional confidence field) +- Sensitivity filtering — stays the same +- Hash-based change detection — stays the same +- CLAUDE.local.md input mechanism — stays the same +- LLM connector (connectors/llm/) — just built, stays the same +- sync_data.sh mechanism — stays the same +- Timer scheduling — stays the same + +We're adding a governance layer BETWEEN extraction and distribution. +The pipes stay the same, we're adding a valve. + +--- + +## Implementation Phases + +### Phase 1: Data Model + Audit + Admin API +- Add status, approved_by, mandatory_reason, audience, review_by fields to knowledge.json +- Create audit.jsonl (append-only log) +- New approval API endpoints in webapp (approve, reject, mandate, revoke, edit) +- Batch operations API (approve/reject multiple) +- km_admin flag in users config +- Collector writes new items as "pending" (when review_queue mode) +- Migration logic: existing items get status "approved" + +### Phase 2: Admin UI — Review Queue +- Review queue page with batch operations +- Approve/reject/mandate buttons with keyboard shortcuts +- Filters: category, source user, date, confidence +- Edit before publish + "Why this matters" text field +- Audience selection (all / specific group) +- Audit log viewer + +### Phase 3: User UI Redesign +- Mandatory items section at top with explanation +- "Report issue" button on all items +- Governance-mode-aware voting visibility +- Revoked items automatically hidden + +### Phase 4: Automatic Distribution + Notifications +- Mandatory items → regenerate rules for target users +- Revoked items → remove from rules on next regeneration +- Notification to admins: new pending items, user-reported issues, expiring items +- Expired items → "Needs re-review" queue + +### Phase 5: Configuration + Groups +- distribution_mode, approval_mode 
in instance.yaml +- Groups definition and audience targeting +- All three governance modes tested and working +- Threshold mode with AI confidence scoring + +--- + +## Future Considerations (not in v1) + +These were raised in review but are deferred for future versions: + +- **Attestation / acknowledgment** — users confirm they read mandatory items +- **Coverage dashboard** — which users have synced, who's behind +- **LDAP/SSO group sync** — automatic group membership from corporate directory +- **Multi-reviewer approval** — 4-eyes rule for mandatory items +- **Version history** — full diff history for edited items +- **Contradiction detection** — flag when two mandatory items conflict +- **Data retention policy** — automatic purge of rejected items after N months +- **GDPR compliance** — right to deletion for personal data in extracted items diff --git a/services/corporate_memory/collector.py b/services/corporate_memory/collector.py index 635c58a..fbe26a3 100644 --- a/services/corporate_memory/collector.py +++ b/services/corporate_memory/collector.py @@ -24,6 +24,12 @@ from connectors.llm.exceptions import LLMError from .prompts import CATALOG_REFRESH_PROMPT, SENSITIVITY_CHECK_PROMPT +# Fields preserved across re-collections when item already exists +GOVERNANCE_FIELDS = ( + "status", "approved_by", "approved_at", "mandatory_reason", + "audience", "review_by", "edited_by", "edited_at", +) + # Configuration CORPORATE_MEMORY_DIR = Path(os.environ.get("CORPORATE_MEMORY_DIR", "/data/corporate-memory")) KNOWLEDGE_FILE = CORPORATE_MEMORY_DIR / "knowledge.json" @@ -245,12 +251,18 @@ def _format_user_files(user_files: dict[str, tuple[str, str]]) -> str: def _process_catalog_response( response_items: list[dict], existing: dict, + initial_status: str = "approved", ) -> dict[str, dict]: """Map HAIKU's response back to real IDs, preserving existing ones. - For items with existing_id: keep that ID, update fields. 
+ For items with existing_id: keep that ID, update fields, preserve governance. For new items (existing_id is null): generate SHA256 ID from title+content. + Args: + response_items: Items returned by the LLM extractor. + existing: Current knowledge.json data (with "items" dict). + initial_status: Status to assign to new items ("approved" or "pending"). + Returns dict of items keyed by ID. """ existing_items = existing.get("items", {}) @@ -275,6 +287,9 @@ def _process_catalog_response( "extracted_at": old_item.get("extracted_at", now), "updated_at": now, } + # Preserve governance fields from old item + for field in GOVERNANCE_FIELDS: + result[existing_id][field] = old_item.get(field) else: # New item - generate ID from title+content content_hash = item["title"] + item["content"] @@ -294,6 +309,14 @@ def _process_catalog_response( "source_users": item["source_users"], "extracted_at": now, "updated_at": now, + "status": initial_status, + "approved_by": None, + "approved_at": None, + "mandatory_reason": None, + "audience": "all", + "review_by": None, + "edited_by": None, + "edited_at": None, } return result @@ -351,6 +374,7 @@ def collect_all(dry_run: bool = False) -> dict: "items_filtered": 0, "items_preserved": 0, "items_new": 0, + "items_pending": 0, "skipped": False, "errors": [], } @@ -385,6 +409,16 @@ def collect_all(dry_run: bool = False) -> dict: logger.error("Failed to initialize AI extractor: %s", e) return stats + # Determine initial status for new items based on approval mode + governance_config = instance_config.get("corporate_memory", {}) + approval_mode = governance_config.get("approval_mode", "review_queue") + if not governance_config: + initial_status = "approved" # Legacy mode: no governance config + elif approval_mode == "auto_publish": + initial_status = "approved" + else: + initial_status = "pending" # review_queue and threshold default to pending + # Step 3: Load existing catalog existing = _read_json(KNOWLEDGE_FILE) if not existing: @@ -422,7 
+456,7 @@ def collect_all(dry_run: bool = False) -> dict: return stats # Step 6: Process response - map to existing IDs - processed_items = _process_catalog_response(response_items, existing) + processed_items = _process_catalog_response(response_items, existing, initial_status=initial_status) # Step 7: Run sensitivity check on NEW items only # Items with IDs that existed before already passed the check @@ -442,6 +476,8 @@ def collect_all(dry_run: bool = False) -> dict: else: stats["items_filtered"] += 1 + stats["items_pending"] = sum(1 for item in final_items.values() if item.get("status") == "pending") + # Step 8: Build updated knowledge.json updated = { "items": final_items, @@ -547,6 +583,8 @@ def main() -> int: print(f" Items preserved: {stats['items_preserved']}") print(f" Items new: {stats['items_new']}") print(f" Items filtered (sensitive): {stats['items_filtered']}") + if stats.get("items_pending"): + print(f" Items pending review: {stats['items_pending']}") if stats["errors"]: print(f"\nErrors ({len(stats['errors'])}):") diff --git a/tests/test_corporate_memory_governance.py b/tests/test_corporate_memory_governance.py new file mode 100644 index 0000000..92623e8 --- /dev/null +++ b/tests/test_corporate_memory_governance.py @@ -0,0 +1,1365 @@ +""" +Tests for Corporate Memory Governance Phase 1. 
+ +Covers: +- Governance config loading and modes +- KM admin checks +- Status transition validation +- Admin actions: approve, reject, mandate, revoke, edit, batch +- Knowledge retrieval with governance filtering +- Voting restrictions under governance modes +- User rules generation (legacy, hybrid, mandatory_only) +- Audience group checks +- Migration of pre-governance items +- Audit log write/read/pagination/filtering +- Collector governance integration (initial status on new items) +""" + +import json +import logging +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +ADMIN_EMAIL = "admin@co.com" +USER_EMAIL = "user@co.com" +OUTSIDER_EMAIL = "outsider@co.com" + + +@pytest.fixture(autouse=True) +def _clear_module_caches(): + """Reset module-level caches before every test.""" + import webapp.corporate_memory_service as svc + + svc._governance_config_cache = None + svc._groups_cache = None + yield + svc._governance_config_cache = None + svc._groups_cache = None + + +def _base_instance_config( + *, + governance: dict | None = None, + users: dict | None = None, + groups: dict | None = None, +): + """Build a minimal instance config dict for mocking load_instance_config.""" + cfg = { + "instance": {"name": "test"}, + "auth": {"allowed_domain": "co.com", "webapp_secret_key": "s3cret"}, + "server": {"host": "127.0.0.1", "hostname": "test.local"}, + } + if governance is not None: + cfg["corporate_memory"] = governance + if users is not None: + cfg["users"] = users + if groups is not None: + cfg["groups"] = groups + return cfg + + +@pytest.fixture +def governance_config(): + """Standard governance block used by most tests.""" + return { + "distribution_mode": "hybrid", + "approval_mode": "review_queue", + 
"review_period_months": 6, + } + + +@pytest.fixture +def users_config(): + return { + ADMIN_EMAIL: {"display_name": "Admin User", "km_admin": True}, + USER_EMAIL: {"display_name": "Regular User"}, + } + + +@pytest.fixture +def groups_config(): + return { + "finance": {"label": "Finance Team", "members": [USER_EMAIL]}, + } + + +@pytest.fixture +def full_instance_config(governance_config, users_config, groups_config): + """Complete instance config with governance, users, and groups.""" + return _base_instance_config( + governance=governance_config, + users=users_config, + groups=groups_config, + ) + + +@pytest.fixture +def legacy_instance_config(users_config): + """Instance config WITHOUT corporate_memory section (legacy mode).""" + return _base_instance_config(users=users_config) + + +def _make_knowledge_data(items: dict | None = None) -> dict: + """Build a knowledge.json structure.""" + return { + "items": items or {}, + "metadata": {"last_collection": "2026-03-20T10:00:00+00:00"}, + } + + +def _make_item( + item_id: str = "km_abc123", + *, + title: str = "Test Rule", + content: str = "Do the thing correctly.", + status: str = "pending", + category: str = "data_analysis", + audience: str = "all", + **extra, +) -> dict: + """Build a single knowledge item dict.""" + item = { + "id": item_id, + "title": title, + "content": content, + "category": category, + "tags": ["test"], + "source_users": [USER_EMAIL], + "status": status, + "extracted_at": "2026-03-20T10:00:00+00:00", + "updated_at": "2026-03-20T10:00:00+00:00", + "approved_by": None, + "approved_at": None, + "mandatory_reason": None, + "audience": audience, + "review_by": None, + "edited_by": None, + "edited_at": None, + } + item.update(extra) + return item + + +# --------------------------------------------------------------------------- +# Helper: set up file-backed service state in tmp_path +# --------------------------------------------------------------------------- + + +@pytest.fixture +def 
service_env(tmp_path, full_instance_config): + """Patch module-level paths and config loader for the service module. + + Returns a dict with helper functions to read back the written files. + """ + knowledge_path = tmp_path / "knowledge.json" + votes_path = tmp_path / "votes.json" + audit_path = tmp_path / "audit.jsonl" + + def _setup( + knowledge: dict | None = None, + votes: dict | None = None, + instance_config: dict | None = None, + ): + if knowledge is not None: + knowledge_path.write_text(json.dumps(knowledge), encoding="utf-8") + if votes is not None: + votes_path.write_text(json.dumps(votes), encoding="utf-8") + + cfg = instance_config or full_instance_config + + patches = [ + patch("webapp.corporate_memory_service.KNOWLEDGE_FILE", knowledge_path), + patch("webapp.corporate_memory_service.VOTES_FILE", votes_path), + patch("webapp.corporate_memory_service.AUDIT_FILE", audit_path), + patch("webapp.corporate_memory_service.load_instance_config", return_value=cfg), + ] + for p in patches: + p.start() + + return patches + + def _read_knowledge(): + return json.loads(knowledge_path.read_text(encoding="utf-8")) + + def _read_votes(): + if votes_path.exists(): + return json.loads(votes_path.read_text(encoding="utf-8")) + return {} + + def _read_audit_lines(): + if not audit_path.exists(): + return [] + lines = audit_path.read_text(encoding="utf-8").strip().splitlines() + return [json.loads(line) for line in lines if line.strip()] + + ctx = { + "setup": _setup, + "read_knowledge": _read_knowledge, + "read_votes": _read_votes, + "read_audit": _read_audit_lines, + "knowledge_path": knowledge_path, + "votes_path": votes_path, + "audit_path": audit_path, + } + + yield ctx + + # Stop all patches + patch.stopall() + + +# =================================================================== +# TestGovernanceConfig +# =================================================================== + + +class TestGovernanceConfig: + """Tests for get_governance_mode / get_approval_mode 
helpers.""" + + def test_governance_mode_legacy(self, service_env, legacy_instance_config): + from webapp.corporate_memory_service import get_governance_mode + + service_env["setup"]( + knowledge=_make_knowledge_data(), + instance_config=legacy_instance_config, + ) + assert get_governance_mode() is None + + def test_governance_mode_hybrid(self, service_env, full_instance_config): + from webapp.corporate_memory_service import get_governance_mode + + service_env["setup"]( + knowledge=_make_knowledge_data(), + instance_config=full_instance_config, + ) + assert get_governance_mode() == "hybrid" + + def test_governance_mode_mandatory_only(self, service_env, users_config, groups_config): + from webapp.corporate_memory_service import get_governance_mode + + cfg = _base_instance_config( + governance={"distribution_mode": "mandatory_only"}, + users=users_config, + groups=groups_config, + ) + service_env["setup"](knowledge=_make_knowledge_data(), instance_config=cfg) + assert get_governance_mode() == "mandatory_only" + + def test_approval_mode_default(self, service_env, full_instance_config): + from webapp.corporate_memory_service import get_approval_mode + + service_env["setup"]( + knowledge=_make_knowledge_data(), + instance_config=full_instance_config, + ) + assert get_approval_mode() == "review_queue" + + def test_approval_mode_auto_publish(self, service_env, users_config, groups_config): + from webapp.corporate_memory_service import get_approval_mode + + cfg = _base_instance_config( + governance={ + "distribution_mode": "hybrid", + "approval_mode": "auto_publish", + }, + users=users_config, + groups=groups_config, + ) + service_env["setup"](knowledge=_make_knowledge_data(), instance_config=cfg) + assert get_approval_mode() == "auto_publish" + + +# =================================================================== +# TestKmAdmin +# =================================================================== + + +class TestKmAdmin: + """Tests for is_km_admin().""" + + def 
test_is_km_admin_true(self, service_env, full_instance_config): + from webapp.corporate_memory_service import is_km_admin + + service_env["setup"]( + knowledge=_make_knowledge_data(), + instance_config=full_instance_config, + ) + assert is_km_admin(ADMIN_EMAIL) is True + + def test_is_km_admin_false(self, service_env, full_instance_config): + from webapp.corporate_memory_service import is_km_admin + + service_env["setup"]( + knowledge=_make_knowledge_data(), + instance_config=full_instance_config, + ) + assert is_km_admin(USER_EMAIL) is False + + def test_is_km_admin_user_not_in_config(self, service_env, full_instance_config): + from webapp.corporate_memory_service import is_km_admin + + service_env["setup"]( + knowledge=_make_knowledge_data(), + instance_config=full_instance_config, + ) + assert is_km_admin(OUTSIDER_EMAIL) is False + + def test_is_km_admin_no_governance_config(self, service_env): + from webapp.corporate_memory_service import is_km_admin + + # Config with NO users section at all → everyone is False + cfg = _base_instance_config() + service_env["setup"]( + knowledge=_make_knowledge_data(), + instance_config=cfg, + ) + assert is_km_admin(USER_EMAIL) is False + assert is_km_admin(ADMIN_EMAIL) is False + + +# =================================================================== +# TestTransitionValidation +# =================================================================== + + +class TestTransitionValidation: + """Tests for _validate_transition().""" + + def test_pending_to_approved(self): + from webapp.corporate_memory_service import _validate_transition + + assert _validate_transition("pending", "approved") is True + + def test_pending_to_mandatory(self): + from webapp.corporate_memory_service import _validate_transition + + assert _validate_transition("pending", "mandatory") is True + + def test_pending_to_revoked(self): + from webapp.corporate_memory_service import _validate_transition + + assert _validate_transition("pending", "revoked") is False 
+ + def test_approved_to_mandatory(self): + from webapp.corporate_memory_service import _validate_transition + + assert _validate_transition("approved", "mandatory") is True + + def test_mandatory_to_revoked(self): + from webapp.corporate_memory_service import _validate_transition + + assert _validate_transition("mandatory", "revoked") is True + + def test_rejected_to_approved(self): + from webapp.corporate_memory_service import _validate_transition + + assert _validate_transition("rejected", "approved") is True + + def test_approved_to_pending(self): + from webapp.corporate_memory_service import _validate_transition + + assert _validate_transition("approved", "pending") is False + + +# =================================================================== +# TestApproveItem +# =================================================================== + + +class TestApproveItem: + """Tests for approve_item().""" + + def test_approve_success(self, service_env): + from webapp.corporate_memory_service import approve_item + + item = _make_item(status="pending") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + ok, msg = approve_item(ADMIN_EMAIL, "km_abc123") + assert ok is True + assert "approved" in msg.lower() + + data = service_env["read_knowledge"]() + approved = data["items"]["km_abc123"] + assert approved["status"] == "approved" + assert approved["approved_by"] == ADMIN_EMAIL + assert approved["approved_at"] is not None + assert approved["review_by"] is not None + assert approved["updated_at"] is not None + + def test_approve_wrong_status(self, service_env): + from webapp.corporate_memory_service import approve_item + + item = _make_item(status="revoked") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + ok, msg = approve_item(ADMIN_EMAIL, "km_abc123") + # revoked -> approved IS valid per VALID_TRANSITIONS + assert ok is True + + def test_approve_from_mandatory_invalid(self, service_env): + """mandatory -> 
approved IS valid (demoting from mandatory).""" + from webapp.corporate_memory_service import approve_item + + item = _make_item(status="mandatory") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + ok, msg = approve_item(ADMIN_EMAIL, "km_abc123") + assert ok is True + + def test_approve_item_not_found(self, service_env): + from webapp.corporate_memory_service import approve_item + + service_env["setup"](knowledge=_make_knowledge_data()) + + ok, msg = approve_item(ADMIN_EMAIL, "km_nonexistent") + assert ok is False + assert "not found" in msg.lower() + + def test_approve_not_admin(self, service_env): + from webapp.corporate_memory_service import approve_item + + item = _make_item(status="pending") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + ok, msg = approve_item(USER_EMAIL, "km_abc123") + assert ok is False + assert "permission" in msg.lower() + + def test_approve_writes_audit_log(self, service_env): + from webapp.corporate_memory_service import approve_item + + item = _make_item(status="pending") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + approve_item(ADMIN_EMAIL, "km_abc123") + + entries = service_env["read_audit"]() + assert len(entries) == 1 + entry = entries[0] + assert entry["admin"] == ADMIN_EMAIL + assert entry["action"] == "approved" + assert entry["item_id"] == "km_abc123" + assert entry["details"]["previous_status"] == "pending" + assert "timestamp" in entry + + +# =================================================================== +# TestRejectItem +# =================================================================== + + +class TestRejectItem: + """Tests for reject_item().""" + + def test_reject_success(self, service_env): + from webapp.corporate_memory_service import reject_item + + item = _make_item(status="pending") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + ok, msg = reject_item(ADMIN_EMAIL, "km_abc123") + 
assert ok is True + + data = service_env["read_knowledge"]() + assert data["items"]["km_abc123"]["status"] == "rejected" + assert data["items"]["km_abc123"]["rejected_by"] == ADMIN_EMAIL + + def test_reject_with_reason(self, service_env): + from webapp.corporate_memory_service import reject_item + + item = _make_item(status="pending") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + ok, msg = reject_item(ADMIN_EMAIL, "km_abc123", reason="Not relevant") + assert ok is True + + data = service_env["read_knowledge"]() + assert data["items"]["km_abc123"]["rejection_reason"] == "Not relevant" + + entries = service_env["read_audit"]() + assert entries[0]["details"]["reason"] == "Not relevant" + + +# =================================================================== +# TestMandateItem +# =================================================================== + + +class TestMandateItem: + """Tests for mandate_item().""" + + def test_mandate_success(self, service_env): + from webapp.corporate_memory_service import mandate_item + + item = _make_item(status="pending") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + with patch("webapp.corporate_memory_service._regenerate_rules_for_audience"): + ok, msg = mandate_item( + ADMIN_EMAIL, "km_abc123", + mandatory_reason="Company policy", + audience="group:finance", + ) + + assert ok is True + + data = service_env["read_knowledge"]() + item_data = data["items"]["km_abc123"] + assert item_data["status"] == "mandatory" + assert item_data["mandatory_reason"] == "Company policy" + assert item_data["audience"] == "group:finance" + assert item_data["approved_by"] == ADMIN_EMAIL + assert item_data["review_by"] is not None + + def test_mandate_missing_reason(self, service_env): + from webapp.corporate_memory_service import mandate_item + + item = _make_item(status="pending") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + ok, msg = 
mandate_item(ADMIN_EMAIL, "km_abc123", mandatory_reason="") + assert ok is False + assert "mandatory_reason" in msg.lower() + + def test_mandate_invalid_audience(self, service_env): + from webapp.corporate_memory_service import mandate_item + + item = _make_item(status="pending") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + ok, msg = mandate_item( + ADMIN_EMAIL, "km_abc123", + mandatory_reason="Reason", + audience="invalid_format", + ) + assert ok is False + assert "invalid audience" in msg.lower() + + def test_mandate_triggers_rule_regeneration(self, service_env): + from webapp.corporate_memory_service import mandate_item + + item = _make_item(status="pending") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + with patch( + "webapp.corporate_memory_service._regenerate_rules_for_audience" + ) as mock_regen: + mandate_item( + ADMIN_EMAIL, "km_abc123", + mandatory_reason="Policy", + audience="all", + ) + mock_regen.assert_called_once_with("all") + + +# =================================================================== +# TestRevokeItem +# =================================================================== + + +class TestRevokeItem: + """Tests for revoke_item().""" + + def test_revoke_success(self, service_env): + from webapp.corporate_memory_service import revoke_item + + item = _make_item(status="mandatory") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + with patch("webapp.corporate_memory_service.regenerate_all_user_rules"): + ok, msg = revoke_item(ADMIN_EMAIL, "km_abc123") + + assert ok is True + + data = service_env["read_knowledge"]() + assert data["items"]["km_abc123"]["status"] == "revoked" + assert data["items"]["km_abc123"]["revoked_by"] == ADMIN_EMAIL + + def test_revoke_not_mandatory(self, service_env): + from webapp.corporate_memory_service import revoke_item + + item = _make_item(status="approved") + 
service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + ok, msg = revoke_item(ADMIN_EMAIL, "km_abc123") + assert ok is False + assert "cannot transition" in msg.lower() + + def test_revoke_triggers_rule_regeneration(self, service_env): + from webapp.corporate_memory_service import revoke_item + + item = _make_item(status="mandatory") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + with patch( + "webapp.corporate_memory_service.regenerate_all_user_rules" + ) as mock_regen: + revoke_item(ADMIN_EMAIL, "km_abc123") + mock_regen.assert_called_once() + + +# =================================================================== +# TestEditItem +# =================================================================== + + +class TestEditItem: + """Tests for edit_item().""" + + def test_edit_title_only(self, service_env): + from webapp.corporate_memory_service import edit_item + + item = _make_item(status="approved") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + ok, msg = edit_item(ADMIN_EMAIL, "km_abc123", title="New Title") + assert ok is True + + data = service_env["read_knowledge"]() + assert data["items"]["km_abc123"]["title"] == "New Title" + assert data["items"]["km_abc123"]["content"] == "Do the thing correctly." + + def test_edit_content_only(self, service_env): + from webapp.corporate_memory_service import edit_item + + item = _make_item(status="approved") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + ok, msg = edit_item(ADMIN_EMAIL, "km_abc123", content="Updated content.") + assert ok is True + + data = service_env["read_knowledge"]() + assert data["items"]["km_abc123"]["content"] == "Updated content." 
+ assert data["items"]["km_abc123"]["title"] == "Test Rule" + + def test_edit_both(self, service_env): + from webapp.corporate_memory_service import edit_item + + item = _make_item(status="approved") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + ok, msg = edit_item( + ADMIN_EMAIL, "km_abc123", + title="New Title", + content="New content.", + ) + assert ok is True + + data = service_env["read_knowledge"]() + assert data["items"]["km_abc123"]["title"] == "New Title" + assert data["items"]["km_abc123"]["content"] == "New content." + assert data["items"]["km_abc123"]["edited_by"] == ADMIN_EMAIL + assert data["items"]["km_abc123"]["edited_at"] is not None + + def test_edit_nothing_provided(self, service_env): + from webapp.corporate_memory_service import edit_item + + item = _make_item(status="approved") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + ok, msg = edit_item(ADMIN_EMAIL, "km_abc123") + assert ok is False + assert "at least one" in msg.lower() + + def test_edit_writes_audit_with_old_new_values(self, service_env): + from webapp.corporate_memory_service import edit_item + + item = _make_item(status="approved", title="Old Title", content="Old content") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + edit_item( + ADMIN_EMAIL, "km_abc123", + title="New Title", + content="New content", + ) + + entries = service_env["read_audit"]() + assert len(entries) == 1 + details = entries[0]["details"] + assert details["old_title"] == "Old Title" + assert details["new_title"] == "New Title" + assert details["old_content"] == "Old content" + assert details["new_content"] == "New content" + + +# =================================================================== +# TestBatchAction +# =================================================================== + + +class TestBatchAction: + """Tests for batch_action().""" + + def test_batch_approve_all_success(self, service_env): + from 
webapp.corporate_memory_service import batch_action + + items = { + "km_001": _make_item("km_001", status="pending"), + "km_002": _make_item("km_002", status="pending"), + "km_003": _make_item("km_003", status="pending"), + } + service_env["setup"](knowledge=_make_knowledge_data(items)) + + result = batch_action(ADMIN_EMAIL, ["km_001", "km_002", "km_003"], "approve") + assert set(result["success"]) == {"km_001", "km_002", "km_003"} + assert result["failed"] == [] + + def test_batch_partial_failure(self, service_env): + from webapp.corporate_memory_service import batch_action + + items = { + "km_001": _make_item("km_001", status="pending"), + # km_002 does not exist + } + service_env["setup"](knowledge=_make_knowledge_data(items)) + + result = batch_action(ADMIN_EMAIL, ["km_001", "km_002"], "approve") + assert result["success"] == ["km_001"] + assert len(result["failed"]) == 1 + assert result["failed"][0]["id"] == "km_002" + + def test_batch_invalid_action(self, service_env): + from webapp.corporate_memory_service import batch_action + + service_env["setup"](knowledge=_make_knowledge_data()) + + result = batch_action(ADMIN_EMAIL, ["km_001"], "delete") + assert result["success"] == [] + assert len(result["failed"]) == 1 + assert "invalid action" in result["failed"][0]["error"].lower() + + +# =================================================================== +# TestGetKnowledge +# =================================================================== + + +class TestGetKnowledge: + """Tests for get_knowledge() with governance filtering.""" + + def test_legacy_mode_no_filtering(self, service_env, legacy_instance_config, users_config): + from webapp.corporate_memory_service import get_knowledge + + items = { + "km_001": _make_item("km_001", status="pending"), + "km_002": _make_item("km_002", status="approved"), + } + # Legacy config has users but NO corporate_memory section + cfg = _base_instance_config(users=users_config) + 
service_env["setup"](knowledge=_make_knowledge_data(items), instance_config=cfg) + + result = get_knowledge() + # Legacy mode: no status filtering, both items visible + assert result["total"] == 2 + + def test_governance_mode_filters_pending(self, service_env): + from webapp.corporate_memory_service import get_knowledge + + items = { + "km_001": _make_item("km_001", status="pending"), + "km_002": _make_item("km_002", status="approved"), + "km_003": _make_item("km_003", status="mandatory"), + } + service_env["setup"](knowledge=_make_knowledge_data(items)) + + result = get_knowledge() + # Default governance filtering: approved + mandatory only + assert result["total"] == 2 + ids = {i["id"] for i in result["items"]} + assert "km_001" not in ids + assert "km_002" in ids + assert "km_003" in ids + + def test_admin_can_see_pending(self, service_env): + from webapp.corporate_memory_service import get_knowledge + + items = { + "km_001": _make_item("km_001", status="pending"), + "km_002": _make_item("km_002", status="approved"), + } + service_env["setup"](knowledge=_make_knowledge_data(items)) + + result = get_knowledge(include_statuses={"pending", "approved", "mandatory"}) + assert result["total"] == 2 + + def test_mandatory_items_have_is_mandatory_flag(self, service_env): + """Mandatory items carry is_mandatory=True plus their reason; approved items carry is_mandatory=False.""" + from webapp.corporate_memory_service import get_knowledge + + items = { + "km_001": _make_item("km_001", status="mandatory", mandatory_reason="Policy"), + "km_002": _make_item("km_002", status="approved"), + } + service_env["setup"](knowledge=_make_knowledge_data(items)) + + result = get_knowledge() + # Guard against a vacuous pass: the loop below asserts nothing if no items come back + assert {i["id"] for i in result["items"]} == {"km_001", "km_002"} + for item in result["items"]: + if item["id"] == "km_001": + assert item["is_mandatory"] is True + assert item["mandatory_reason"] == "Policy" + else: + assert item["is_mandatory"] is False + + +# =================================================================== +# TestVoteGovernance +# =================================================================== + + +class TestVoteGovernance: + """Tests for 
vote() under different governance modes.""" + + def test_vote_disabled_mandatory_only(self, service_env, users_config, groups_config): + from webapp.corporate_memory_service import vote + + cfg = _base_instance_config( + governance={"distribution_mode": "mandatory_only"}, + users=users_config, + groups=groups_config, + ) + items = {"km_001": _make_item("km_001", status="approved")} + service_env["setup"](knowledge=_make_knowledge_data(items), instance_config=cfg) + + ok, msg = vote(USER_EMAIL, "km_001", 1) + assert ok is False + assert "disabled" in msg.lower() + + def test_vote_allowed_hybrid(self, service_env): + from webapp.corporate_memory_service import vote + + items = {"km_001": _make_item("km_001", status="approved")} + service_env["setup"](knowledge=_make_knowledge_data(items), votes={}) + + with patch("webapp.corporate_memory_service._regenerate_user_rules"): + ok, msg = vote(USER_EMAIL, "km_001", 1) + assert ok is True + + def test_vote_allowed_legacy(self, service_env, users_config): + from webapp.corporate_memory_service import vote + + cfg = _base_instance_config(users=users_config) + items = {"km_001": _make_item("km_001", status="approved")} + service_env["setup"]( + knowledge=_make_knowledge_data(items), + votes={}, + instance_config=cfg, + ) + + with patch("webapp.corporate_memory_service._regenerate_user_rules"): + ok, msg = vote(USER_EMAIL, "km_001", 1) + assert ok is True + + +# =================================================================== +# TestGetUserRules +# =================================================================== + + +class TestGetUserRules: + """Tests for get_user_rules() across governance modes.""" + + def test_legacy_mode_upvoted_only(self, service_env, users_config): + from webapp.corporate_memory_service import get_user_rules + + cfg = _base_instance_config(users=users_config) + items = { + "km_001": _make_item("km_001", status="approved"), + "km_002": _make_item("km_002", status="approved"), + } + votes = 
{USER_EMAIL: {"km_001": 1}} + service_env["setup"]( + knowledge=_make_knowledge_data(items), + votes=votes, + instance_config=cfg, + ) + + rules = get_user_rules(USER_EMAIL) + assert len(rules) == 1 + assert rules[0]["id"] == "km_001" + + def test_mandatory_included_for_all_users(self, service_env): + from webapp.corporate_memory_service import get_user_rules + + items = { + "km_001": _make_item("km_001", status="mandatory", audience="all"), + } + service_env["setup"](knowledge=_make_knowledge_data(items), votes={}) + + rules = get_user_rules(USER_EMAIL) + assert len(rules) == 1 + assert rules[0]["id"] == "km_001" + + def test_hybrid_mandatory_plus_upvoted(self, service_env): + from webapp.corporate_memory_service import get_user_rules + + items = { + "km_mand": _make_item("km_mand", status="mandatory", audience="all"), + "km_appr": _make_item("km_appr", status="approved"), + "km_pend": _make_item("km_pend", status="pending"), + } + votes = {USER_EMAIL: {"km_appr": 1, "km_pend": 1}} + service_env["setup"](knowledge=_make_knowledge_data(items), votes=votes) + + rules = get_user_rules(USER_EMAIL) + rule_ids = {r["id"] for r in rules} + # mandatory + approved upvoted; pending upvoted NOT included + assert "km_mand" in rule_ids + assert "km_appr" in rule_ids + assert "km_pend" not in rule_ids + + def test_mandatory_only_no_upvoted(self, service_env, users_config, groups_config): + from webapp.corporate_memory_service import get_user_rules + + cfg = _base_instance_config( + governance={"distribution_mode": "mandatory_only"}, + users=users_config, + groups=groups_config, + ) + items = { + "km_mand": _make_item("km_mand", status="mandatory", audience="all"), + "km_appr": _make_item("km_appr", status="approved"), + } + votes = {USER_EMAIL: {"km_appr": 1}} + service_env["setup"]( + knowledge=_make_knowledge_data(items), + votes=votes, + instance_config=cfg, + ) + + rules = get_user_rules(USER_EMAIL) + assert len(rules) == 1 + assert rules[0]["id"] == "km_mand" + + def 
test_audience_group_filtering(self, service_env): + from webapp.corporate_memory_service import get_user_rules + + items = { + "km_fin": _make_item("km_fin", status="mandatory", audience="group:finance"), + "km_all": _make_item("km_all", status="mandatory", audience="all"), + } + service_env["setup"](knowledge=_make_knowledge_data(items), votes={}) + + # USER_EMAIL is a finance group member + rules_user = get_user_rules(USER_EMAIL) + rule_ids_user = {r["id"] for r in rules_user} + assert "km_fin" in rule_ids_user + assert "km_all" in rule_ids_user + + # OUTSIDER_EMAIL is NOT a member + rules_outsider = get_user_rules(OUTSIDER_EMAIL) + rule_ids_outsider = {r["id"] for r in rules_outsider} + assert "km_fin" not in rule_ids_outsider + assert "km_all" in rule_ids_outsider + + +# =================================================================== +# TestCheckAudience +# =================================================================== + + +class TestCheckAudience: + """Tests for _check_audience().""" + + def test_audience_all(self, service_env): + from webapp.corporate_memory_service import _check_audience + + service_env["setup"](knowledge=_make_knowledge_data()) + + assert _check_audience({"audience": "all"}, USER_EMAIL) is True + + def test_audience_none(self, service_env): + from webapp.corporate_memory_service import _check_audience + + service_env["setup"](knowledge=_make_knowledge_data()) + + assert _check_audience({}, USER_EMAIL) is True + assert _check_audience({"audience": None}, USER_EMAIL) is True + + def test_audience_group_member(self, service_env): + from webapp.corporate_memory_service import _check_audience + + service_env["setup"](knowledge=_make_knowledge_data()) + + assert _check_audience({"audience": "group:finance"}, USER_EMAIL) is True + + def test_audience_group_not_member(self, service_env): + from webapp.corporate_memory_service import _check_audience + + service_env["setup"](knowledge=_make_knowledge_data()) + + assert 
_check_audience({"audience": "group:finance"}, OUTSIDER_EMAIL) is False + + def test_audience_group_not_found(self, service_env): + from webapp.corporate_memory_service import _check_audience + + service_env["setup"](knowledge=_make_knowledge_data()) + + assert _check_audience({"audience": "group:nonexistent"}, USER_EMAIL) is False + + +# =================================================================== +# TestMigration +# =================================================================== + + +class TestMigration: + """Tests for migrate_existing_items().""" + + def test_migrate_adds_status_to_items(self, service_env): + from webapp.corporate_memory_service import migrate_existing_items + + # Items without a "status" field (pre-governance) + items = { + "km_old1": { + "id": "km_old1", + "title": "Old Rule 1", + "content": "Content 1", + "category": "workflow", + "tags": [], + "source_users": [USER_EMAIL], + "extracted_at": "2026-03-01T00:00:00+00:00", + "updated_at": "2026-03-01T00:00:00+00:00", + }, + "km_old2": { + "id": "km_old2", + "title": "Old Rule 2", + "content": "Content 2", + "category": "workflow", + "tags": [], + "source_users": [USER_EMAIL], + "extracted_at": "2026-03-01T00:00:00+00:00", + "updated_at": "2026-03-01T00:00:00+00:00", + }, + } + service_env["setup"](knowledge=_make_knowledge_data(items)) + + count = migrate_existing_items() + assert count == 2 + + data = service_env["read_knowledge"]() + for item_id in ["km_old1", "km_old2"]: + assert data["items"][item_id]["status"] == "approved" + assert data["items"][item_id]["approved_by"] == "migration" + assert data["items"][item_id]["approved_at"] is not None + assert data["items"][item_id]["review_by"] is not None + + def test_migrate_idempotent(self, service_env): + from webapp.corporate_memory_service import migrate_existing_items + + items = { + "km_old": { + "id": "km_old", + "title": "Old Rule", + "content": "Content", + "category": "workflow", + "tags": [], + "source_users": [USER_EMAIL], 
+ "extracted_at": "2026-03-01T00:00:00+00:00", + "updated_at": "2026-03-01T00:00:00+00:00", + }, + } + service_env["setup"](knowledge=_make_knowledge_data(items)) + + count1 = migrate_existing_items() + assert count1 == 1 + + # Second run: items already have status → 0 migrated + count2 = migrate_existing_items() + assert count2 == 0 + + def test_migrate_writes_audit_entries(self, service_env): + from webapp.corporate_memory_service import migrate_existing_items + + items = { + "km_old": { + "id": "km_old", + "title": "Old Rule", + "content": "Content", + "category": "workflow", + "tags": [], + "source_users": [USER_EMAIL], + "extracted_at": "2026-03-01T00:00:00+00:00", + "updated_at": "2026-03-01T00:00:00+00:00", + }, + } + service_env["setup"](knowledge=_make_knowledge_data(items)) + + migrate_existing_items() + + entries = service_env["read_audit"]() + assert len(entries) == 1 + assert entries[0]["admin"] == "migration" + assert entries[0]["action"] == "migration_auto_approved" + assert entries[0]["item_id"] == "km_old" + + def test_migrate_preserves_existing_status(self, service_env): + from webapp.corporate_memory_service import migrate_existing_items + + items = { + "km_existing": _make_item("km_existing", status="mandatory"), + "km_new": { + "id": "km_new", + "title": "No Status", + "content": "Content", + "category": "workflow", + "tags": [], + "source_users": [USER_EMAIL], + "extracted_at": "2026-03-01T00:00:00+00:00", + "updated_at": "2026-03-01T00:00:00+00:00", + }, + } + service_env["setup"](knowledge=_make_knowledge_data(items)) + + count = migrate_existing_items() + assert count == 1 # only km_new was migrated + + data = service_env["read_knowledge"]() + assert data["items"]["km_existing"]["status"] == "mandatory" + assert data["items"]["km_new"]["status"] == "approved" + + +# =================================================================== +# TestAuditLog +# =================================================================== + + +class 
TestAuditLog: + """Tests for audit log writing and reading.""" + + def test_audit_log_written_on_approve(self, service_env): + from webapp.corporate_memory_service import approve_item + + item = _make_item(status="pending") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + approve_item(ADMIN_EMAIL, "km_abc123") + + entries = service_env["read_audit"]() + assert len(entries) == 1 + assert entries[0]["action"] == "approved" + + def test_audit_log_format(self, service_env): + from webapp.corporate_memory_service import approve_item + + item = _make_item(status="pending") + service_env["setup"](knowledge=_make_knowledge_data({"km_abc123": item})) + + approve_item(ADMIN_EMAIL, "km_abc123") + + entries = service_env["read_audit"]() + entry = entries[0] + assert isinstance(entry["timestamp"], str) + assert isinstance(entry["admin"], str) + assert isinstance(entry["action"], str) + assert isinstance(entry["item_id"], str) + assert isinstance(entry["details"], dict) + # Verify timestamp is ISO format (parseable) + datetime.fromisoformat(entry["timestamp"]) + + def test_get_audit_log_paginated(self, service_env): + """Three approvals produce three audit entries, split 2 + 1 across pages of size 2.""" + from webapp.corporate_memory_service import approve_item, get_audit_log + + items = { + "km_001": _make_item("km_001", status="pending"), + "km_002": _make_item("km_002", status="pending"), + "km_003": _make_item("km_003", status="pending"), + } + service_env["setup"](knowledge=_make_knowledge_data(items)) + + approve_item(ADMIN_EMAIL, "km_001") + approve_item(ADMIN_EMAIL, "km_002") + approve_item(ADMIN_EMAIL, "km_003") + + # Page 0, 2 per page + result = get_audit_log(page=0, per_page=2) + assert len(result["entries"]) == 2 + assert result["total"] == 3 + assert result["page"] == 0 + + # Page 1 + result = get_audit_log(page=1, per_page=2) + assert len(result["entries"]) == 1 + + def test_get_audit_log_filtered_by_action(self, service_env): + from webapp.corporate_memory_service import approve_item, get_audit_log, 
reject_item + + items = { + "km_001": _make_item("km_001", status="pending"), + "km_002": _make_item("km_002", status="pending"), + } + service_env["setup"](knowledge=_make_knowledge_data(items)) + + approve_item(ADMIN_EMAIL, "km_001") + reject_item(ADMIN_EMAIL, "km_002") + + result = get_audit_log(action="approved") + assert result["total"] == 1 + assert result["entries"][0]["action"] == "approved" + + result = get_audit_log(action="rejected") + assert result["total"] == 1 + assert result["entries"][0]["action"] == "rejected" + + +# =================================================================== +# TestCollectorGovernance +# =================================================================== + + +class TestCollectorGovernance: + """Tests for governance fields in the collector's _process_catalog_response.""" + + def test_new_items_get_pending_status(self): + from services.corporate_memory.collector import _process_catalog_response + + response_items = [ + { + "existing_id": None, + "title": "New Rule", + "content": "Do something", + "category": "workflow", + "tags": ["test"], + "source_users": [USER_EMAIL], + }, + ] + existing = {"items": {}} + + result = _process_catalog_response( + response_items, existing, initial_status="pending", + ) + + assert len(result) == 1 + item = list(result.values())[0] + assert item["status"] == "pending" + assert item["approved_by"] is None + assert item["audience"] == "all" + + def test_new_items_get_approved_status(self): + from services.corporate_memory.collector import _process_catalog_response + + response_items = [ + { + "existing_id": None, + "title": "New Rule", + "content": "Do something", + "category": "workflow", + "tags": ["test"], + "source_users": [USER_EMAIL], + }, + ] + existing = {"items": {}} + + result = _process_catalog_response( + response_items, existing, initial_status="approved", + ) + + item = list(result.values())[0] + assert item["status"] == "approved" + + def 
test_existing_items_preserve_governance_fields(self): + from services.corporate_memory.collector import _process_catalog_response + + existing = { + "items": { + "km_exist": { + "id": "km_exist", + "title": "Old Title", + "content": "Old content", + "category": "workflow", + "tags": ["old"], + "source_users": [USER_EMAIL], + "extracted_at": "2026-03-01T00:00:00+00:00", + "updated_at": "2026-03-01T00:00:00+00:00", + "status": "mandatory", + "approved_by": ADMIN_EMAIL, + "approved_at": "2026-03-10T00:00:00+00:00", + "mandatory_reason": "Company policy", + "audience": "group:finance", + "review_by": "2026-09-10T00:00:00+00:00", + "edited_by": None, + "edited_at": None, + }, + }, + } + + response_items = [ + { + "existing_id": "km_exist", + "title": "Updated Title", + "content": "Updated content", + "category": "workflow", + "tags": ["new"], + "source_users": [USER_EMAIL, ADMIN_EMAIL], + }, + ] + + result = _process_catalog_response( + response_items, existing, initial_status="pending", + ) + + item = result["km_exist"] + # Title/content updated by LLM + assert item["title"] == "Updated Title" + assert item["content"] == "Updated content" + # Governance fields preserved from existing item + assert item["status"] == "mandatory" + assert item["approved_by"] == ADMIN_EMAIL + assert item["mandatory_reason"] == "Company policy" + assert item["audience"] == "group:finance" + + def test_no_governance_config_legacy_behavior(self): + """Without governance config, initial_status is 'approved' (legacy).""" + from services.corporate_memory.collector import _process_catalog_response + + response_items = [ + { + "existing_id": None, + "title": "A Rule", + "content": "Content", + "category": "data_analysis", + "tags": [], + "source_users": [USER_EMAIL], + }, + ] + existing = {"items": {}} + + # Legacy mode: collector passes initial_status="approved" + result = _process_catalog_response( + response_items, existing, initial_status="approved", + ) + + item = list(result.values())[0] + 
assert item["status"] == "approved" + + def test_items_pending_stat_counted(self): + """New items built with initial_status='pending' can be counted as pending (only _process_catalog_response is exercised; collect_all is not called).""" + from services.corporate_memory.collector import _process_catalog_response + + response_items = [ + { + "existing_id": None, + "title": f"Rule {i}", + "content": f"Content {i}", + "category": "workflow", + "tags": [], + "source_users": [USER_EMAIL], + } + for i in range(3) + ] + existing = {"items": {}} + + result = _process_catalog_response( + response_items, existing, initial_status="pending", + ) + + pending_count = sum( + 1 for item in result.values() if item.get("status") == "pending" + ) + assert pending_count == 3 diff --git a/webapp/app.py b/webapp/app.py index 2a9ba80..f141f67 100644 --- a/webapp/app.py +++ b/webapp/app.py @@ -17,7 +17,7 @@ import yaml from flask import Flask, flash, jsonify, redirect, render_template, request, session, url_for -from .auth import admin_required, auth_bp, login_required +from .auth import admin_required, auth_bp, km_admin_required, login_required from .config import Config from .desktop_auth import require_desktop_auth from .notification_images import images_bp @@ -38,6 +38,17 @@ from .corporate_memory_service import ( get_user_stats as get_memory_user_stats, get_user_votes, vote as memory_vote, + is_km_admin, + get_governance_mode, + approve_item, + reject_item, + mandate_item, + revoke_item, + edit_item, + batch_action, + get_pending_queue, + get_audit_log, + migrate_existing_items, ) from .user_service import ( UserInfo, @@ -1381,12 +1392,19 @@ def register_routes(app: Flask) -> None: # Get initial page of knowledge knowledge = get_knowledge(page=0, per_page=20) + # Governance context for admin features + governance = { + "mode": get_governance_mode(), + "is_km_admin": is_km_admin(email) if email else False, + } + return render_template( "corporate_memory.html", stats=stats, user_stats=user_stats, user_votes=user_votes, knowledge=knowledge, + governance=governance, ) # 
───────────────────────────────────────────────────────────────── @@ -1419,6 +1437,12 @@ def register_routes(app: Flask) -> None: # Limit per_page to reasonable maximum per_page = min(per_page, 100) + # Admin status filter (only km_admins can filter by status) + status = request.args.get("status") + include_statuses = None + if status and is_km_admin(email): + include_statuses = {status} + result = get_knowledge( category=category, search=search, @@ -1427,6 +1451,7 @@ def register_routes(app: Flask) -> None: sort=sort, username=username, my_rules=my_rules, + include_statuses=include_statuses, ) return jsonify(result) @@ -1450,6 +1475,10 @@ def register_routes(app: Flask) -> None: @login_required def api_corporate_memory_vote(): """Vote on a knowledge item.""" + mode = get_governance_mode() + if mode == "mandatory_only": + return jsonify({"ok": False, "error": "Voting is disabled in mandatory-only mode"}), 400 + user = session.get("user", {}) email = user.get("email", "") username = get_webapp_username(email) @@ -1482,6 +1511,226 @@ def register_routes(app: Flask) -> None: votes = get_user_votes(username) return jsonify({"votes": votes}) + # ───────────────────────────────────────────────────────────────── + # Corporate Memory Admin API + # ───────────────────────────────────────────────────────────────── + + @app.route("/api/corporate-memory/admin/approve", methods=["POST"]) + @login_required + @km_admin_required + def corporate_memory_admin_approve(): + """Approve a pending knowledge item.""" + data = request.get_json(silent=True) or {} + if "item_id" not in data: + return jsonify({"ok": False, "error": "item_id is required"}), 400 + + email = session.get("user", {}).get("email", "") + try: + success, message = approve_item(email, data["item_id"]) + if not success: + return jsonify({"ok": False, "error": message}), 400 + return jsonify({"ok": True, "message": message}) + except Exception as e: + logger.exception("Error approving item") + return jsonify({"ok": 
@app.route("/api/corporate-memory/admin/mandate", methods=["POST"])
@login_required
@km_admin_required
def corporate_memory_admin_mandate():
    """Mark a knowledge item as mandatory.

    Expects a JSON object with ``item_id`` and a non-empty string
    ``mandatory_reason``. ``audience`` is optional; a missing or null
    value means "all".

    Returns:
        200 with ``{"ok": True, "message": ...}`` on success,
        400 for malformed input or when ``mandate_item`` reports failure,
        500 if ``mandate_item`` raises.
    """
    data = request.get_json(silent=True) or {}
    # A JSON array or scalar body has no .get(); treat it as an empty object
    # so the caller gets the normal "item_id is required" 400.
    if not isinstance(data, dict):
        data = {}
    if "item_id" not in data:
        return jsonify({"ok": False, "error": "item_id is required"}), 400

    # JSON may carry any type here; calling .strip() on a number would raise
    # outside the try block below, so reject non-strings up front.
    mandatory_reason = data.get("mandatory_reason", "")
    if not isinstance(mandatory_reason, str) or not mandatory_reason.strip():
        return jsonify({"ok": False, "error": "mandatory_reason is required"}), 400

    # An explicit null audience is treated the same as an absent one.
    audience = data.get("audience")
    if audience is None:
        audience = "all"
    elif not isinstance(audience, str):
        return jsonify({"ok": False, "error": "audience must be a string"}), 400

    email = session.get("user", {}).get("email", "")
    try:
        success, message = mandate_item(
            email,
            data["item_id"],
            mandatory_reason=mandatory_reason,
            audience=audience,
        )
        if not success:
            return jsonify({"ok": False, "error": message}), 400
        return jsonify({"ok": True, "message": message})
    except Exception as e:
        logger.exception("Error mandating item")
        return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/corporate-memory/admin/edit", methods=["POST"])
@login_required
@km_admin_required
def corporate_memory_admin_edit():
    """Update the title and/or content of a knowledge item.

    The JSON body must contain ``item_id`` and at least one of ``title``
    or ``content``. Failures reported by ``edit_item`` become a 400;
    exceptions become a 500.
    """
    payload = request.get_json(silent=True) or {}
    if "item_id" not in payload:
        return jsonify({"ok": False, "error": "item_id is required"}), 400

    new_title = payload.get("title")
    new_content = payload.get("content")
    if all(value is None for value in (new_title, new_content)):
        return jsonify({"ok": False, "error": "At least one of title or content must be provided"}), 400

    admin_email = session.get("user", {}).get("email", "")
    try:
        ok, outcome = edit_item(
            admin_email, payload["item_id"], title=new_title, content=new_content,
        )
        if ok:
            return jsonify({"ok": True, "message": outcome})
        return jsonify({"ok": False, "error": outcome}), 400
    except Exception as e:
        logger.exception("Error editing item")
        return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/corporate-memory/admin/audit")
@login_required
@km_admin_required
def corporate_memory_admin_audit():
    """Get the governance audit log.

    Query parameters:
        page: 0-indexed page number (negative values are clamped to 0).
        per_page: page size, clamped to the range 1..100 (default 50).
        admin: optional filter on the admin who performed the action.
        action: optional filter on the action type.

    Returns the dict produced by ``get_audit_log`` as JSON, or a 500
    JSON error if reading the log raises.
    """
    page = request.args.get("page", 0, type=int)
    per_page = request.args.get("per_page", 50, type=int)
    admin_filter = request.args.get("admin")
    action_filter = request.args.get("action")

    # These values end up as slice bounds; a negative page or a
    # non-positive per_page would select the wrong window rather than
    # fail, so clamp them here.
    page = max(page, 0)
    per_page = max(1, min(per_page, 100))

    try:
        result = get_audit_log(
            page=page,
            per_page=per_page,
            admin=admin_filter,
            action=action_filter,
        )
        return jsonify(result)
    except Exception as e:
        logger.exception("Error fetching audit log")
        return jsonify({"ok": False, "error": str(e)}), 500
def km_admin_required(f):
    """Decorator that restricts a route to Corporate Memory admins.

    The wrapped view runs only when a user is in the session and
    ``is_km_admin`` accepts that user's email. Otherwise paths under
    ``/api/`` get a JSON error (401 when not logged in, 403 when not an
    admin) and all other paths are redirected (to the login page, or to
    the dashboard with a flash message).
    """

    def _wants_json() -> bool:
        # API callers get JSON errors; browser pages get redirects.
        return request.path.startswith("/api/")

    @functools.wraps(f)
    def guarded(*args, **kwargs):
        if "user" not in session:
            if _wants_json():
                return jsonify({"error": "Authentication required"}), 401
            return redirect(url_for("auth.login"))

        # Imported at call time rather than at module import.
        from .corporate_memory_service import is_km_admin

        current_email = session.get("user", {}).get("email", "")
        if is_km_admin(current_email):
            return f(*args, **kwargs)

        if _wants_json():
            return jsonify({"error": "Corporate Memory admin access required"}), 403
        flash("Corporate Memory admin access required.", "error")
        return redirect(url_for("dashboard"))

    return guarded
def _load_governance_config() -> dict:
    """Load the corporate_memory section from instance config.

    A successfully loaded section is cached at module level and reused
    on later calls.

    Returns:
        The ``corporate_memory`` mapping, or an empty dict when the
        section is absent, is not a mapping, or the config cannot be
        loaded. An empty dict means legacy mode to the callers.
    """
    global _governance_config_cache
    if _governance_config_cache is not None:
        return _governance_config_cache

    try:
        config = load_instance_config()
        section = get_instance_value(
            config, "corporate_memory", default={},
        ) or {}
    except Exception:
        # An empty result switches governance off, so never do that
        # silently. The failure is deliberately NOT cached: a transient
        # load error must not pin the process in legacy mode until restart.
        logger.exception(
            "Failed to load corporate_memory config; using legacy mode for this call"
        )
        return {}

    if not isinstance(section, dict):
        # Callers use .get() on the result, so a scalar or list would crash them.
        logger.error(
            "corporate_memory config must be a mapping, got %s; ignoring it",
            type(section).__name__,
        )
        section = {}

    _governance_config_cache = section
    return _governance_config_cache
def _write_audit_log(admin: str, action: str, item_id: str, details: dict) -> None:
    """Append one JSON line to the audit log file.

    Best-effort: any failure (creating the directory, serializing the
    entry, or writing the file) is logged and swallowed, so a broken
    audit log never raises into the caller.

    Args:
        admin: Identifier of who performed the action.
        action: Action name recorded in the entry.
        item_id: ID of the knowledge item the action applied to.
        details: Extra JSON-serializable data stored under "details".
    """
    entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "admin": admin,
        "action": action,
        "item_id": item_id,
        "details": details,
    }

    try:
        # Directory creation belongs inside the guarded section so that a
        # permissions error is handled the same way as a failed write.
        AUDIT_FILE.parent.mkdir(parents=True, exist_ok=True)
        line = json.dumps(entry, ensure_ascii=False)
        with open(AUDIT_FILE, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except Exception as e:
        logger.error(f"Failed to write audit log: {e}")
def _default_review_by() -> str:
    """Return an ISO8601 UTC timestamp for now + the configured review period.

    The period comes from ``review_period_months`` in the governance
    config (default 6). A month is approximated as 30 days. A value that
    is missing, null, or not numeric falls back to 6 instead of raising.
    """
    default_months = 6
    gov = _load_governance_config()
    raw_months = gov.get("review_period_months", default_months)

    # YAML can yield None (empty value) or a quoted string here; both
    # would break the multiplication below.
    try:
        months = float(raw_months)
    except (TypeError, ValueError):
        logger.warning(
            "Invalid review_period_months %r; falling back to %d",
            raw_months, default_months,
        )
        months = float(default_months)

    review_date = datetime.now(timezone.utc) + timedelta(days=months * 30)
    return review_date.isoformat()
@@ -111,9 +264,26 @@ def get_knowledge( items_dict = data.get("items", {}) votes_data = _read_json(VOTES_FILE) + governance_mode = get_governance_mode() + + # Determine which statuses to include + if governance_mode is not None: + if include_statuses is not None: + allowed_statuses = include_statuses + else: + allowed_statuses = {"approved", "mandatory"} + else: + allowed_statuses = None # Legacy: no filtering + # Convert to list and calculate scores items = [] for item_id, item in items_dict.items(): + # Status filtering for governance mode + if allowed_statuses is not None: + item_status = item.get("status", "approved") + if item_status not in allowed_statuses: + continue + # Calculate upvotes and downvotes separately upvotes = 0 downvotes = 0 @@ -140,6 +310,11 @@ def get_knowledge( {"username": u, **get_user_display(u)} for u in item.get("source_users", []) ] + + # Add governance fields + item_copy["is_mandatory"] = item.get("status") == "mandatory" + item_copy["mandatory_reason"] = item.get("mandatory_reason") + items.append(item_copy) # Apply filters @@ -188,6 +363,7 @@ def get_stats() -> dict[str, Any]: Returns: Dict with contributor count, knowledge count, etc. + If governance is active, also includes status counts. 
def get_user_rules(username: str) -> list[dict]:
    """Collect the knowledge items that belong in a user's rules.

    Legacy mode (no governance config): every item the user has upvoted.
    Governance mode: every mandatory item whose audience check passes,
    plus, in "hybrid" mode only, the user's upvoted items whose status
    is "approved".

    Args:
        username: The username.

    Returns:
        List of knowledge items to sync, each item at most once.
    """
    knowledge = _read_json(KNOWLEDGE_FILE)
    catalog = knowledge.get("items", {})
    mode = get_governance_mode()

    if mode is None:
        # Legacy behavior: upvotes alone decide.
        legacy_votes = _read_json(VOTES_FILE).get(username, {})
        return [
            catalog[item_id]
            for item_id, value in legacy_votes.items()
            if value > 0 and item_id in catalog
        ]

    # NOTE(review): _check_audience tests membership of its second argument
    # in the group's "members" list, which the example config fills with
    # email addresses, while this function receives a username. Confirm
    # that the two are the same identifier for group-targeted items.
    selected: dict[str, dict] = {
        item_id: entry
        for item_id, entry in catalog.items()
        if entry.get("status") == "mandatory" and _check_audience(entry, username)
    }

    if mode == "hybrid":
        my_votes = _read_json(VOTES_FILE).get(username, {})
        for item_id, value in my_votes.items():
            if not (value > 0 and item_id in catalog and item_id not in selected):
                continue
            candidate = catalog[item_id]
            if candidate.get("status") == "approved":
                selected[item_id] = candidate

    return list(selected.values())
def approve_item(admin_email: str, item_id: str) -> tuple[bool, str]:
    """Approve a knowledge item.

    Sets the item's status to "approved", records who approved it and
    when, assigns a fresh review date, and writes an audit entry. When
    the item was previously mandatory, user rules are regenerated so the
    item stops being distributed as a mandatory rule.

    Args:
        admin_email: Email of the admin performing the action.
        item_id: The knowledge item ID to approve.

    Returns:
        Tuple of (success, error_or_success_message).
    """
    if not is_km_admin(admin_email):
        return False, "Permission denied: user is not a km_admin"

    knowledge_data = _read_json(KNOWLEDGE_FILE)
    items = knowledge_data.get("items", {})

    if item_id not in items:
        return False, f"Knowledge item {item_id} not found"

    item = items[item_id]
    current_status = item.get("status", "pending")

    if not _validate_transition(current_status, "approved"):
        return False, f"Cannot transition from '{current_status}' to 'approved'"

    now = datetime.now(timezone.utc).isoformat()
    item["status"] = "approved"
    item["approved_by"] = admin_email
    item["approved_at"] = now
    item["review_by"] = _default_review_by()
    item["updated_at"] = now

    _write_json(KNOWLEDGE_FILE, knowledge_data)
    _write_audit_log(admin_email, "approved", item_id, {
        "previous_status": current_status,
    })

    # Demoting a mandatory item changes what get_user_rules returns, so
    # the generated rules must be refreshed, as revoke_item already does.
    if current_status == "mandatory":
        regenerate_all_user_rules()

    logger.info(f"Item {item_id} approved by {admin_email}")
    return True, "Item approved"
def mandate_item(
    admin_email: str,
    item_id: str,
    mandatory_reason: str,
    audience: str = "all",
) -> tuple[bool, str]:
    """Mark a knowledge item as mandatory for a target audience.

    Args:
        admin_email: Email of the admin performing the action.
        item_id: The knowledge item ID to mandate.
        mandatory_reason: Required reason for mandating (non-empty string).
        audience: Target audience, "all" or "group:name". ``None`` is
            accepted and treated as "all".

    Returns:
        Tuple of (success, error_or_success_message). Invalid input is
        reported through the tuple, never by raising.
    """
    if not is_km_admin(admin_email):
        return False, "Permission denied: user is not a km_admin"

    # Values may arrive straight from request JSON, so check the type
    # before calling str methods on them.
    if not isinstance(mandatory_reason, str) or not mandatory_reason.strip():
        return False, "mandatory_reason is required and must be non-empty"

    # Callers that forward an optional field pass None for "not given".
    if audience is None:
        audience = "all"

    # Validate audience format
    if not isinstance(audience, str) or (
        audience != "all" and not audience.startswith("group:")
    ):
        return False, f"Invalid audience format: '{audience}'. Use 'all' or 'group:'"

    # Validate group exists if specified
    if audience.startswith("group:"):
        group_name = audience[len("group:"):]
        groups = _load_groups()
        if group_name not in groups:
            return False, f"Group '{group_name}' not found in config"

    knowledge_data = _read_json(KNOWLEDGE_FILE)
    items = knowledge_data.get("items", {})

    if item_id not in items:
        return False, f"Knowledge item {item_id} not found"

    item = items[item_id]
    current_status = item.get("status", "pending")

    if not _validate_transition(current_status, "mandatory"):
        return False, f"Cannot transition from '{current_status}' to 'mandatory'"

    reason_text = mandatory_reason.strip()
    now = datetime.now(timezone.utc).isoformat()
    item["status"] = "mandatory"
    item["mandatory_reason"] = reason_text
    item["audience"] = audience
    item["approved_by"] = admin_email
    item["approved_at"] = now
    item["review_by"] = _default_review_by()
    item["updated_at"] = now

    _write_json(KNOWLEDGE_FILE, knowledge_data)
    _write_audit_log(admin_email, "mandated", item_id, {
        "previous_status": current_status,
        "mandatory_reason": reason_text,
        "audience": audience,
    })

    # Regenerate rules for affected users
    _regenerate_rules_for_audience(audience)

    logger.info(f"Item {item_id} mandated by {admin_email} for audience={audience}")
    return True, "Item mandated"
+ """ + if not is_km_admin(admin_email): + return False, "Permission denied: user is not a km_admin" + + knowledge_data = _read_json(KNOWLEDGE_FILE) + items = knowledge_data.get("items", {}) + + if item_id not in items: + return False, f"Knowledge item {item_id} not found" + + item = items[item_id] + current_status = item.get("status", "pending") + + if not _validate_transition(current_status, "revoked"): + return False, f"Cannot transition from '{current_status}' to 'revoked'" + + now = datetime.now(timezone.utc).isoformat() + item["status"] = "revoked" + item["revoked_by"] = admin_email + item["revoked_at"] = now + if reason: + item["revocation_reason"] = reason + item["updated_at"] = now + + _write_json(KNOWLEDGE_FILE, knowledge_data) + _write_audit_log(admin_email, "revoked", item_id, { + "previous_status": current_status, + "reason": reason, + }) + + # Regenerate rules for ALL users to remove revoked item + regenerate_all_user_rules() + + logger.info(f"Item {item_id} revoked by {admin_email}") + return True, "Item revoked" + + +def edit_item( + admin_email: str, + item_id: str, + title: str | None = None, + content: str | None = None, +) -> tuple[bool, str]: + """Edit a knowledge item's title and/or content. + + At least one of title or content must be provided. + + Args: + admin_email: Email of the admin performing the edit. + item_id: The knowledge item ID to edit. + title: New title (or None to keep existing). + content: New content (or None to keep existing). + + Returns: + Tuple of (success, error_or_success_message). 
def batch_action(
    admin_email: str,
    item_ids: list[str],
    action: str,
    **kwargs: Any,
) -> dict:
    """Perform a governance action on multiple items.

    Not atomic: each item is processed independently and partial success
    is OK. An item whose action raises is recorded as failed and the
    remaining items are still processed.

    Args:
        admin_email: Email of the admin performing the action.
        item_ids: List of knowledge item IDs.
        action: One of "approve", "reject", "mandate".
        **kwargs: Optional arguments for the action. A value of ``None``
            is treated the same as an absent key.
            For "reject": reason.
            For "mandate": mandatory_reason (required), audience
            (default "all").

    Returns:
        Dict with "success" (list of IDs) and "failed" (list of {id, error}).
    """
    valid_actions = {"approve", "reject", "mandate"}
    # The isinstance check keeps an unhashable JSON value (list or dict)
    # from raising TypeError in the set lookup.
    if not isinstance(action, str) or action not in valid_actions:
        return {
            "success": [],
            "failed": [{"id": "N/A", "error": f"Invalid action: '{action}'. Must be one of {valid_actions}"}],
        }

    # Callers forward optional request fields as explicit None, which
    # kwargs.get(key, default) would return instead of the default.
    reason = kwargs.get("reason")
    mandatory_reason = kwargs.get("mandatory_reason") or ""
    audience = kwargs.get("audience") or "all"

    action_map = {
        "approve": lambda item_id: approve_item(admin_email, item_id),
        "reject": lambda item_id: reject_item(
            admin_email, item_id, reason=reason,
        ),
        "mandate": lambda item_id: mandate_item(
            admin_email,
            item_id,
            mandatory_reason=mandatory_reason,
            audience=audience,
        ),
    }

    action_fn = action_map[action]
    success: list[str] = []
    failed: list[dict] = []

    for item_id in item_ids:
        try:
            ok, message = action_fn(item_id)
        except Exception as exc:
            # Earlier items may already be persisted; report this one as
            # failed rather than discarding the whole batch result.
            logger.exception("Batch %s failed for item %s", action, item_id)
            failed.append({"id": item_id, "error": str(exc)})
            continue
        if ok:
            success.append(item_id)
        else:
            failed.append({"id": item_id, "error": message})

    return {"success": success, "failed": failed}
def migrate_existing_items() -> int:
    """Migrate existing knowledge items without a status field.

    Items lacking "status" are set to "approved" with migration
    metadata. Items that already have a status are skipped, so running
    this again migrates nothing further. The knowledge file is written
    before the audit entries, matching the order used by the other
    governance actions, so the audit log does not describe changes that
    were never persisted.

    Returns:
        Number of items migrated.
    """
    knowledge_data = _read_json(KNOWLEDGE_FILE)
    items = knowledge_data.get("items", {})

    pending_ids = [item_id for item_id, item in items.items() if "status" not in item]
    if not pending_ids:
        return 0

    now = datetime.now(timezone.utc).isoformat()
    # One review date for the whole batch, like the single shared "now".
    review_by = _default_review_by()

    for item_id in pending_ids:
        item = items[item_id]
        item["status"] = "approved"
        item["approved_by"] = "migration"
        item["approved_at"] = now
        item["review_by"] = review_by

    _write_json(KNOWLEDGE_FILE, knowledge_data)

    for item_id in pending_ids:
        _write_audit_log("migration", "migration_auto_approved", item_id, {
            "reason": "Pre-governance item auto-approved during migration",
        })

    count = len(pending_ids)
    logger.info(f"Migrated {count} items to approved status")
    return count