Task 0.5 of clean-analyst-bootstrap. Greenfield rewrite — no fallback, no aliases. Existing dev environments lose their cached PAT and must re-authenticate. Env var renames (hard cutover): - DA_CONFIG_DIR -> AGNES_CONFIG_DIR - DA_SERVER -> AGNES_SERVER - DA_SERVER_URL -> AGNES_SERVER_URL (test-only stale ref, not in spec) - DA_NO_UPDATE_CHECK -> AGNES_NO_UPDATE_CHECK - DA_LOCAL_DIR -> AGNES_LOCAL_DIR - DA_TOKEN -> AGNES_TOKEN - DA_STREAM_RETRIES -> AGNES_STREAM_RETRIES Config dir rename: ~/.config/da/ -> ~/.config/agnes/ (across code, comments, docstrings, error messages, install templates, dev scripts). Stale `da X` references in CLI source (and adjacent app/, tests/): swept docstrings, comments, help text, and error messages where the verb survives the rewrite (init, pull, push, catalog, status, diagnose, auth, admin, skills, query, schema, describe, explore, disk-info, snapshot, login, logout, whoami, server, setup) and replaced `da X` with `agnes X`. Intentionally kept `da sync`, `da fetch`, `da analyst`, `da metrics` — those verbs are removed in later tasks; the legacy strings will be detected by `_LEGACY_STRINGS` (added in Task 2). Test fixes: - TestCLIVersion now asserts output starts with `agnes ` (was `da `). Test results: 2675 passed, 25 skipped (full pytest run, excluding 9 pre-existing test_db.py / test_user_management.py / test_e2e_extract.py / test_cli_binary_rename.py failures unrelated to this rename).
53 lines
1.8 KiB
Python
53 lines
1.8 KiB
Python
"""`agnes disk-info` — show snapshot dir disk usage (spec §4.3)."""
|
|
|
|
import json as json_lib
|
|
import os
|
|
import shutil
|
|
from pathlib import Path
|
|
import typer
|
|
|
|
# Typer sub-application for the disk-info command. The callback registered on
# it further down runs when the app is invoked without a subcommand.
# NOTE(review): presumably mounted as `agnes disk-info` by the main CLI (per
# the module docstring) — the mounting code is not in this file.
disk_info_app = typer.Typer(help="Show snapshot disk usage")
|
|
|
|
|
|
def _local_dir() -> Path:
    """Return the resolved local working directory.

    The location is read from the ``AGNES_LOCAL_DIR`` environment variable;
    when the variable is unset, the current directory (``"."``) is used.
    The result is passed through ``Path.resolve()``.
    """
    configured = os.environ.get("AGNES_LOCAL_DIR", ".")
    return Path(configured).resolve()
|
|
|
|
|
|
def _format_size(n: int) -> str:
    """Render a byte count as a human-readable string with one decimal.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (TB) is reached, so anything of 1024 TB or more is still expressed
    in TB (e.g. ``"2048.0 TB"``).

    Args:
        n: Size in bytes.

    Returns:
        A string such as ``"0.0 B"``, ``"1.5 KB"`` or ``"3.2 GB"``.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    size = float(n)
    idx = 0
    # Stop at the last unit instead of falling off the end of the table;
    # this replaces the original loop's unreachable trailing return.
    while size >= 1024 and idx < len(units) - 1:
        size /= 1024
        idx += 1
    return f"{size:.1f} {units[idx]}"
|
|
|
|
|
|
def _snapshot_quota_gb() -> int:
    """Return the snapshot cap in GB from ``AGNES_SNAPSHOT_QUOTA_GB`` (default 10).

    When the variable holds something ``int()`` cannot parse, print a one-line
    error to stderr and exit with status 1 rather than surfacing a raw
    ``ValueError`` traceback to the CLI user.
    """
    raw = os.environ.get("AGNES_SNAPSHOT_QUOTA_GB", "10")
    try:
        return int(raw)
    except ValueError:
        typer.echo(
            f"Invalid AGNES_SNAPSHOT_QUOTA_GB={raw!r}: expected a whole number of GB",
            err=True,
        )
        raise typer.Exit(code=1) from None


def _nearest_existing(path: Path) -> Path:
    """Return *path* itself, or its closest ancestor that exists on disk."""
    for candidate in (path, *path.parents):
        if candidate.exists():
            return candidate
    # No ancestor exists at all (e.g. an unmapped drive); hand the path back
    # and let the caller's filesystem call report the error.
    return path


# The docstring below is deliberately left as a single line: Typer displays it
# as the command's help text, so extra detail lives in comments instead.
#
# Output: with --json, one JSON object with the keys snapshots_dir,
# used_bytes, snapshot_count, free_bytes and quota_gb; otherwise four
# human-readable lines. Does nothing when a subcommand was invoked.
@disk_info_app.callback(invoke_without_command=True)
def disk_info(
    ctx: typer.Context,
    json: bool = typer.Option(False, "--json"),
):
    """Show snapshots disk usage."""
    if ctx.invoked_subcommand is not None:
        return

    # Validate configuration first so a bad value fails before the disk scan.
    quota_gb = _snapshot_quota_gb()

    snap_dir = _local_dir() / "user" / "snapshots"
    if snap_dir.exists():
        # Usage counts every file under the snapshot dir (recursively), while
        # the snapshot count only covers top-level *.parquet files.
        used = sum(p.stat().st_size for p in snap_dir.rglob("*") if p.is_file())
        count = sum(1 for _ in snap_dir.glob("*.parquet"))
    else:
        used = count = 0

    # Fall back to the closest existing ancestor for free-space stats when the
    # snapshot dir doesn't exist yet (first run, before any `da fetch`) — or
    # when the local dir itself has not been created. `0` would misleadingly
    # suggest the disk is full, and probing a missing path would raise.
    free = shutil.disk_usage(_nearest_existing(snap_dir)).free

    if json:
        payload = {
            "snapshots_dir": str(snap_dir),
            "used_bytes": used,
            "snapshot_count": count,
            "free_bytes": free,
            "quota_gb": quota_gb,
        }
        typer.echo(json_lib.dumps(payload))
        return

    typer.echo(f"Snapshots dir: {snap_dir}")
    typer.echo(f"Used by Agnes: {_format_size(used)} across {count} snapshots")
    typer.echo(f"Free disk: {_format_size(free)}")
    typer.echo(f"Configured cap: {quota_gb} GB (set AGNES_SNAPSHOT_QUOTA_GB to override)")
|