agnes-the-ai-analyst/cli/commands/memory_admin.py
ZdenekSrotyr 82c5d71d63
feat(memory): #62 — duplicate hints + tree-view + bulk-edit (#126)
Issue #62. Tree view with cross-axis filtering, duplicate-candidate hints (Jaccard score on entity overlap), bulk-edit endpoints (PATCH /api/memory/admin/{id} + POST /api/memory/admin/bulk-update), schema v17 (knowledge_item_relations), full CLI parity (da admin memory tree/edit/bulk-edit/duplicates list/resolve).
2026-04-29 13:55:15 +02:00

290 lines
11 KiB
Python

"""Admin commands for corporate memory — ``da admin memory ...``.
Mounted under the existing ``da admin`` Typer group. API/CLI parity is the
design rule (issue #62): every endpoint exposed by ``app/api/memory.py``'s
admin surface has a CLI counterpart here. Output defaults to a compact
human-readable form; the read-only commands (``tree``, ``stats`` and
``duplicates list``) accept ``--json`` for machine-friendly output.
"""
import json as _json
from typing import Optional
import typer
from cli.client import api_get, api_post, api_patch
# Root Typer group for the corporate-memory admin commands
# (``tree``, ``edit``, ``bulk-edit``, ``stats``).
memory_admin_app = typer.Typer(
    no_args_is_help=True,
    help="Corporate memory admin operations (requires admin role)",
)

# Nested group for the duplicate-candidate commands (``list`` / ``resolve``).
duplicates_app = typer.Typer(
    no_args_is_help=True,
    help="List and resolve duplicate-candidate hints",
)

# Expose the nested group as the ``duplicates`` sub-command of the root group.
memory_admin_app.add_typer(duplicates_app, name="duplicates")
def _fail(resp, what: str) -> None:
    """Report a failed API call on stderr and abort with exit code 1.

    The message is taken from the JSON body's ``detail`` field, then its
    ``error`` field, and finally the raw response text. Any problem while
    decoding or inspecting the body also falls back to the raw text.

    Args:
        resp: Response object exposing ``json()`` and ``text``.
        what: Short description of the attempted action, inserted after
            "Failed to".

    Raises:
        typer.Exit: Always, with exit code 1.
    """
    try:
        payload = resp.json()
        reason = payload.get("detail")
        if not reason:
            reason = payload.get("error")
        if not reason:
            reason = resp.text
    except Exception:
        # Best-effort: a non-JSON or non-object body must not hide the
        # original failure, so fall back to whatever text came back.
        reason = resp.text
    typer.echo(f"Failed to {what}: {reason}", err=True)
    raise typer.Exit(1)
# ----- tree -----
@memory_admin_app.command("tree")
def tree(
    axis: str = typer.Option("domain", "--axis", help="domain | category | tag | audience"),
    status_filter: Optional[str] = typer.Option(None, "--status", help="Filter by status"),
    source_type: Optional[str] = typer.Option(None, "--source-type", help="Filter by source_type"),
    audience: Optional[str] = typer.Option(None, "--audience", help="Filter by audience value"),
    q: Optional[str] = typer.Option(None, "-q", "--query", help="Substring filter on title/content"),
    has_duplicate: bool = typer.Option(False, "--has-duplicate", help="Only items with unresolved duplicate-candidates"),
    per_page: int = typer.Option(50, "--per-page", help="Groups per page"),
    page: int = typer.Option(1, "--page"),
    as_json: bool = typer.Option(False, "--json", help="Emit raw JSON"),
):
    """Group knowledge items by ``axis`` and apply chip filters.

    Issues ``GET /api/memory/tree`` with the axis, paging values and any
    filters that were supplied. With ``--json`` the response body is dumped
    as-is; otherwise a summary line is printed, followed by each group with
    a preview of up to five of its items.

    Exits with code 1 (via ``_fail``) when the API does not answer 200.
    """
    params: dict = {"axis": axis, "page": page, "per_page": per_page}
    # Only forward filters the user actually set; empty values are omitted.
    optional_filters = {
        "status_filter": status_filter,
        "source_type": source_type,
        "audience": audience,
        "q": q,
    }
    params.update({key: value for key, value in optional_filters.items() if value})
    if has_duplicate:
        params["has_duplicate"] = "true"

    resp = api_get("/api/memory/tree", params=params)
    if resp.status_code != 200:
        _fail(resp, "list tree")
    data = resp.json()
    if as_json:
        typer.echo(_json.dumps(data, indent=2))
        return

    preview_limit = 5
    typer.echo(f"axis={data['axis']} groups={data['total_groups']} items={data['total_items']}")
    for g in data.get("groups") or []:
        typer.echo(f" [{g['count']:>4}] {g['label']}")
        shown = (g.get("items") or [])[:preview_limit]
        for item in shown:
            # ``or`` rather than a ``.get`` default: a key that is present
            # but null must still render as a placeholder, and a non-string
            # id must not break the slice.
            title = item.get("title") or "(untitled)"
            short_id = str(item.get("id") or "")[:12]
            typer.echo(f" - {title} (id={short_id})")
        # Count what was really left out: the response may carry fewer items
        # than ``count``, in which case ``count - 5`` would under-report.
        hidden = g["count"] - len(shown)
        if hidden > 0:
            typer.echo(f" ... +{hidden} more")
# ----- edit -----
@memory_admin_app.command("edit")
def edit(
    item_id: str = typer.Argument(..., help="Knowledge item id"),
    title: Optional[str] = typer.Option(None, "--title"),
    content: Optional[str] = typer.Option(None, "--content"),
    category: Optional[str] = typer.Option(None, "--category"),
    domain: Optional[str] = typer.Option(None, "--domain"),
    audience: Optional[str] = typer.Option(None, "--audience"),
    add_tag: list[str] = typer.Option([], "--add-tag", help="Tag to add (repeatable)"),
    remove_tag: list[str] = typer.Option([], "--remove-tag", help="Tag to remove (repeatable)"),
    set_tags: Optional[str] = typer.Option(None, "--set-tags", help="Comma-separated replacement set (overrides add/remove)"),
):
    """Patch a knowledge item — partial update.

    The update is sent in up to two requests:

    1. ``PATCH /api/memory/admin/{item_id}`` for the full-replacement fields
       (``--title``, ``--content``, ``--category``, ``--domain``,
       ``--audience`` and ``--set-tags``).
    2. ``POST /api/memory/admin/bulk-update`` with a single-id list for
       ``--add-tag`` / ``--remove-tag``. This step is skipped when
       ``--set-tags`` is given, because the replacement set overrides it.

    The two requests are not atomic. If the tag step fails after the PATCH
    succeeded, the fields that were already applied are reported on stderr
    before exiting.

    Exit codes: 2 when no update option was passed, 1 on any API failure.
    """
    replacement_fields = {
        "title": title,
        "content": content,
        "category": category,
        "domain": domain,
        "audience": audience,
    }
    body: dict = {key: value for key, value in replacement_fields.items() if value is not None}
    # ``--set-tags`` is a hard replacement and goes through the PATCH path
    # alongside the other full-replacement fields.
    if set_tags is not None:
        body["tags"] = [t.strip() for t in set_tags.split(",") if t.strip()]

    # ``body`` is non-empty whenever ``--set-tags`` was passed (even as an
    # empty string), so it does not need its own term in this check.
    if not body and not add_tag and not remove_tag:
        typer.echo("Nothing to update — pass at least one --title/--content/--category/--domain/--audience/--add-tag/--remove-tag/--set-tags option.", err=True)
        raise typer.Exit(2)

    updated_fields: list[str] = []

    def _report_partial() -> None:
        """Tell the user which fields the earlier PATCH already changed."""
        if updated_fields:
            typer.echo(
                f"Note: {item_id} was already updated before this failure: "
                f"{', '.join(updated_fields)}",
                err=True,
            )

    if body:
        resp = api_patch(f"/api/memory/admin/{item_id}", json=body)
        if resp.status_code != 200:
            _fail(resp, "patch item")
        # ``or []`` guards against an explicit null in the response.
        updated_fields.extend(resp.json().get("updated") or [])

    # ``--add-tag`` / ``--remove-tag`` route through the bulk-update endpoint
    # so the merge-with-existing-tags step happens server-side. The previous
    # client-side GET-then-PATCH had a silent-data-loss bug when the item id
    # lived past page 1 of /api/memory: existing tags came back empty and got
    # overwritten by the partial set. The bulk-update endpoint accepts a
    # single-id array and supports tags_add / tags_remove natively.
    if (add_tag or remove_tag) and set_tags is None:
        updates: dict = {}
        if add_tag:
            updates["tags_add"] = list(add_tag)
        if remove_tag:
            updates["tags_remove"] = list(remove_tag)
        bulk_resp = api_post(
            "/api/memory/admin/bulk-update",
            json={"item_ids": [item_id], "updates": updates},
        )
        if bulk_resp.status_code != 200:
            _report_partial()
            _fail(bulk_resp, "update tags")
        body_b = bulk_resp.json()
        # The endpoint answers 200 even when an individual id failed, so the
        # per-item result lists have to be inspected as well.
        if item_id in (body_b.get("not_found") or []):
            _report_partial()
            typer.echo(f"Item not found: {item_id}", err=True)
            raise typer.Exit(1)
        if item_id in (body_b.get("errors") or {}):
            _report_partial()
            typer.echo(f"Failed to update tags: {body_b['errors'][item_id]}", err=True)
            raise typer.Exit(1)
        updated_fields.append("tags")

    typer.echo(f"Updated {item_id}: {', '.join(updated_fields)}")
# ----- bulk-edit -----
@memory_admin_app.command("bulk-edit")
def bulk_edit(
    ids: str = typer.Option(..., "--ids", help="Comma-separated knowledge item ids"),
    category: Optional[str] = typer.Option(None, "--category"),
    domain: Optional[str] = typer.Option(None, "--domain"),
    audience: Optional[str] = typer.Option(None, "--audience"),
    add_tag: list[str] = typer.Option([], "--add-tag"),
    remove_tag: list[str] = typer.Option([], "--remove-tag"),
):
    """Apply the same updates to many items in one call.

    Splits ``--ids`` on commas (blank entries are dropped), collects the
    requested mutations and posts them to
    ``POST /api/memory/admin/bulk-update``. Prints a one-line summary of the
    updated / not-found / error counts, followed by one line per item error.

    Exit codes: 2 when no ids or no mutation option were given, 1 (via
    ``_fail``) when the API does not answer 200.
    """
    item_ids = [s.strip() for s in ids.split(",") if s.strip()]
    if not item_ids:
        typer.echo("No --ids provided.", err=True)
        raise typer.Exit(2)

    scalar_fields = {"category": category, "domain": domain, "audience": audience}
    updates: dict = {key: value for key, value in scalar_fields.items() if value is not None}
    if add_tag:
        updates["tags_add"] = list(add_tag)
    if remove_tag:
        updates["tags_remove"] = list(remove_tag)
    if not updates:
        typer.echo("Nothing to update — pass at least one mutation option.", err=True)
        raise typer.Exit(2)

    resp = api_post(
        "/api/memory/admin/bulk-update",
        json={"item_ids": item_ids, "updates": updates},
    )
    if resp.status_code != 200:
        _fail(resp, "bulk-update")

    body = resp.json()
    # ``or`` instead of a ``.get`` default so that an explicit null in the
    # response is treated like a missing key instead of crashing ``len()``.
    updated = body.get("updated") or []
    not_found = body.get("not_found") or []
    errors = body.get("errors") or {}
    typer.echo(
        f"Updated: {len(updated)} "
        f"Not found: {len(not_found)} "
        f"Errors: {len(errors)}"
    )
    for item_id, msg in errors.items():
        typer.echo(f" ! {item_id}: {msg}")
# ----- stats -----
@memory_admin_app.command("stats")
def stats(as_json: bool = typer.Option(False, "--json")):
    """Print knowledge-base aggregations from ``GET /api/memory/stats``.

    With ``--json`` the response body is dumped unchanged. Otherwise the
    total is printed, followed by one section per breakdown (status, domain,
    source_type, tag, audience); the tag section is cut to its first ten
    entries.

    Exits with code 1 (via ``_fail``) when the API does not answer 200.
    """
    resp = api_get("/api/memory/stats")
    if resp.status_code != 200:
        _fail(resp, "load stats")
    data = resp.json()
    if as_json:
        typer.echo(_json.dumps(data, indent=2))
        return

    typer.echo(f"Total: {data.get('total', 0)}")

    # (heading, response key, label column width, max rows or None for all)
    sections = (
        ("By status:", "by_status", 12, None),
        ("By domain:", "by_domain", 12, None),
        ("By source_type:", "by_source_type", 20, None),
        ("By tag (top 10):", "by_tag", 20, 10),
        ("By audience:", "by_audience", 20, None),
    )
    for heading, key, width, max_rows in sections:
        typer.echo(heading)
        rows = list((data.get(key) or {}).items())[:max_rows]
        for label, count in rows:
            typer.echo(f" {label:>{width}}: {count}")
# ----- duplicates -----
@duplicates_app.command("list")
def duplicates_list(
    resolved: Optional[bool] = typer.Option(
        None,
        "--resolved/--unresolved",
        help="Filter by resolution state (omit both flags to fetch all)",
    ),
    limit: int = typer.Option(100, "--limit"),
    as_json: bool = typer.Option(False, "--json"),
):
    """List duplicate-candidate relations.

    Calls ``GET /api/memory/admin/duplicate-candidates`` and prints one line
    per relation: the score, then the two item ids (first 12 characters)
    with their titles (first 60 characters). With ``--json`` the response
    body is dumped unchanged.

    Exits with code 1 (via ``_fail``) when the API does not answer 200.
    """

    def _clip(value, width: int) -> str:
        """Return ``value`` as text cut to ``width`` chars, or "?" if None."""
        return "?" if value is None else str(value)[:width]

    params: dict = {"limit": limit}
    # Only forward ``resolved`` when the user passed --resolved/--unresolved
    # explicitly; with neither flag the parameter is left out of the request.
    # NOTE(review): what the server returns for an omitted ``resolved`` (all
    # relations vs. unresolved only) is not visible from this module — the
    # option help promises "all"; confirm against the API handler.
    if resolved is not None:
        params["resolved"] = "true" if resolved else "false"

    resp = api_get("/api/memory/admin/duplicate-candidates", params=params)
    if resp.status_code != 200:
        _fail(resp, "list duplicates")
    data = resp.json()
    if as_json:
        typer.echo(_json.dumps(data, indent=2))
        return

    relations = data.get("relations") or []
    typer.echo(f"Duplicate candidates: {len(relations)}")
    for r in relations:
        # Either side may be null or absent in the payload.
        a = r.get("item_a") or {}
        b = r.get("item_b") or {}
        score = r.get("score")
        score_str = f"{score:.2f}" if isinstance(score, (int, float)) else "?"
        # ``_clip`` keeps a null/missing id or title from crashing the slice.
        typer.echo(
            f" [{score_str}] {_clip(r.get('item_a_id'), 12)}={_clip(a.get('title'), 60)!r} "
            f"<-> {_clip(r.get('item_b_id'), 12)}={_clip(b.get('title'), 60)!r}"
        )
@duplicates_app.command("resolve")
def duplicates_resolve(
    item_a_id: str = typer.Argument(...),
    item_b_id: str = typer.Argument(...),
    as_: str = typer.Option(..., "--as", help="duplicate | different | dismissed"),
):
    """Resolve a duplicate-candidate pair.

    Posts to ``/api/memory/admin/duplicate-candidates/resolve`` with the two
    item ids as query parameters and the chosen resolution in the JSON body.
    The ``--as`` value is forwarded without client-side checks.

    Exits with code 1 (via ``_fail``) when the API does not answer 200.
    """
    pair_query = {"item_a_id": item_a_id, "item_b_id": item_b_id}
    payload = {"resolution": as_}
    resp = api_post(
        "/api/memory/admin/duplicate-candidates/resolve",
        params=pair_query,
        json=payload,
    )
    if resp.status_code != 200:
        _fail(resp, "resolve duplicate")
    confirmation = f"Resolved {item_a_id} <-> {item_b_id} as '{as_}'."
    typer.echo(confirmation)