diff --git a/CHANGELOG.md b/CHANGELOG.md index 79d32c6..cac8bda 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,9 +10,116 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +## [0.45.0] — 2026-05-07 + +Operator-and-analyst quality bundle: a security fix for the optional +Telegram bot, two CLI gaps closed, and three rounds of UX polish on +`agnes diagnose` and `agnes pull` so non-TTY consumers (CI runners, +Claude Code SessionStart hooks, sub-agent watchdogs) get readable, +actionable signal. Closes #84, #164, #177, #178, #203, #204. + +### Security + +- **Telegram bot pairing-code RNG hardened (#84).** The pairing code + used to link a Telegram chat to an Agnes user is now generated via + `secrets.choice` (CSPRNG) rather than `random.choices`. Pre-fix an + attacker who scraped one issued code could recover the `random` + module's PRNG state and predict subsequent codes issued in the same + process — the fix neutralizes that class of attack + (`services/telegram_bot/storage.py:_generate_code`). +- **Telegram script runner refuses out-of-shape usernames (#84).** The + optional notification runner shells out via `sudo -u `. A + username controlled by an attacker — e.g. via tampering with + `telegram_users.json` — could otherwise carry sudo flags + (`-u`, `--shell=…`) or shell metacharacters. The runner now validates + the value against a POSIX-conservative regex + (`^[a-z_][a-z0-9._-]{0,31}$`) and returns `None` before invoking + `subprocess.run` if it doesn't match + (`services/telegram_bot/runner.py:_USERNAME_RE`). + +### Added + +- `agnes admin unregister-table ` — CLI wrapper for + `DELETE /api/admin/registry/{id}` (#177). Confirms before destructive + action; pass `--yes` to skip the prompt in scripts. The server-side + endpoint already does the parquet/`sync_state` cleanup; the CLI is a + thin client. +- `agnes admin update-table ` — CLI wrapper for + `PUT /api/admin/registry/{id}` (#177). 
Only the supplied flags go in + the body (`--name`, `--bucket`, `--source-table`, `--query-mode`, + `--query`, `--description`, `--sync-schedule`, `--source-type`); the + rest stay unchanged on the server. `--query` accepts `@path/to.sql` + for files. Calling with no flags errors (`No fields supplied`) + instead of silently no-opping. +- `agnes diagnose --include-schema` (#204). The default `agnes + diagnose` no longer surfaces the DB schema-version check — analysts + hitting the CLI rarely care about the integer, and it dominated the + agent-facing output. Pass `--include-schema` (or query + `/api/health/detailed?include=schema` directly) when verifying a + migration. +- **`info` severity tier in `/api/health/detailed`** (#178). Sits + between `ok` and `warning`: surfaces a non-trivial observation + worth reading without promoting the headline status to `degraded`. + See the module docstring at `app/api/health.py` for the full + severity ladder. The BQ billing-equals-data check is the first + consumer (was `warning` → now `info`). +- `AGNES_PULL_PROGRESS_INTERVAL_SECONDS` and + `AGNES_PULL_PROGRESS_INTERVAL_BYTES` env knobs for the textual + progress emitter (#203). Defaults are tighter than pre-fix (5 s / + 1 MiB vs the previous 30 s / 10%-of-total) so non-TTY consumers + see continuous output and don't trip dead-process watchdogs on + multi-GB parquets. Override either independently. + +### Changed + +- **`agnes pull` non-TTY progress is more chatty by default (#203).** + Previous cadence (30 s / 10%) produced one line every several + minutes on multi-GB parquets, long enough for Claude Code + sub-agent watchdogs to kill the pull as a hung process. New + defaults: emit when *any* of (10% boundary, 5 s elapsed, 1 MiB + bytes since last emit). The 10% boundary is unchanged so small + files still get the original visual rhythm. +- **`/api/health/detailed` no longer includes `db_schema` by default + (#204).** Pass `?include=schema` to opt back in. 
The aggregator + treats the schema check as "not asserted" when absent, so + unrelated services can still drive the headline. Operators using + the legacy entry should add the parameter to their probe + configuration. +- **BQ billing-project equals data-project surfaces as `info`, not + `warning` (#178).** Many valid single-project dev instances run + with billing == data; the message is informational. The `detail` + + `hint` strings are unchanged so the operator still gets the + USER_PROJECT_DENIED context if they're hitting it. Pre-fix, the + message alone promoted the overall headline to `degraded` even on + intentionally collapsed setups. +- `agnes init --force` now snapshots the prior `CLAUDE.md` to + `CLAUDE.md.bak.` before regenerating it (#164). Each + re-run produces a fresh backup; the prior backup is not clobbered. + A FS error on the backup path is logged but does not abort the + init (the existing-workspace gate still requires `--force`). + ### Internal -- `infra/modules/customer-instance` (tag `infra-v1.8.0`): `startup-script.sh.tpl` no longer overwrites operator-edited `AGNES_TAG` / `AGNES_TEMP_DIR` in `/opt/agnes/.env` on every boot. Reads the existing values when present and lets them win over the template-computed `$IMAGE_TAG`. Pre-fix, an in-place TF action that stopped/started the VM (e.g. `machine_type` change) would re-run the startup script and clobber any manually-pinned image tag — operators had to re-edit the file post-restart. Fresh provisions still get the TF-driven values; the `.env` file's existence is the disambiguator. To force a TF-driven reset, `rm /opt/agnes/.env` and reboot. +- New `cli.client.api_put` helper to mirror `api_get` / + `api_post` / `api_delete` / `api_patch` for the new + `update-table` command. +- Tests added: `tests/test_telegram_bot_runner.py`, + `tests/test_health_schema_gate.py`, plus extensions to + `test_telegram_storage`, `test_pull_progress`, `test_diagnose_billing`, + `test_cli_admin`, `test_cli_init`. 
+- `infra/modules/customer-instance` (tag `infra-v1.8.0`): + `startup-script.sh.tpl` no longer overwrites operator-edited + `AGNES_TAG` / `AGNES_TEMP_DIR` in `/opt/agnes/.env` on every boot. + Reads the existing values when present and lets them win over the + template-computed `$IMAGE_TAG`. Pre-fix, an in-place TF action that + stopped/started the VM (e.g. `machine_type` change) would re-run the + startup script and clobber any manually-pinned image tag — operators + had to re-edit the file post-restart. Fresh provisions still get the + TF-driven values; the `.env` file's existence is the disambiguator. + To force a TF-driven reset, `rm /opt/agnes/.env` and reboot. Folded + in from #214, which landed on main between 0.44.1 and this cut. + +## [0.44.1] — 2026-05-07 ### Fixed diff --git a/app/api/health.py b/app/api/health.py index 9b68f52..e937bbd 100644 --- a/app/api/health.py +++ b/app/api/health.py @@ -1,10 +1,29 @@ -"""Health check endpoint — structured diagnostics for AI agents.""" +"""Health check endpoint — structured diagnostics for AI agents. + +## Severity vocabulary + +Per-check `status` values, in order of escalation: + +- `ok` — nothing to surface. +- `info` — non-trivial observation worth showing the operator, but the + situation isn't broken. **Does not** promote the overall + status to `degraded` (issue #178). +- `unknown`— check couldn't run (missing dependency, FS error). Surfaced + but doesn't promote overall. +- `warning`— real issue, operator should look. Promotes overall to + `degraded`. +- `error` — critical. Promotes overall to `unhealthy`. + +Add an `info`-tier check by returning `{"status": "info", ...}` from the +check function. The aggregator at the bottom of `health_check_detailed` +treats `info` as non-promoting. 
+""" import os from datetime import datetime, timezone from pathlib import Path -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, Query import duckdb from app.auth.dependencies import _get_db, get_current_user @@ -66,15 +85,18 @@ def _check_bq_billing_project() -> dict | None: return {"status": "ok", "detail": "BigQuery project not configured"} if billing == data: + # Issue #178: this is informational, not a fault. Many valid + # single-project dev instances run with billing == data and the SA + # has `serviceusage.services.use`. Keep the message visible but + # don't promote the overall status to `degraded` for it. return { - "status": "warning", + "status": "info", "detail": "BigQuery billing project equals data project", "hint": ( - "Set data_source.bigquery.billing_project in instance.yaml to a " + "If the SA hits USER_PROJECT_DENIED 403, set " + "data_source.bigquery.billing_project in instance.yaml to a " "project the SA can bill against (typically your dev/billable " "project, distinct from a shared read-only data project). " - "Otherwise BQ calls 403 USER_PROJECT_DENIED whenever the SA " - "lacks serviceusage.services.use on the data project. " "Configurable via /admin/server-config UI." ), "billing_project": billing, @@ -230,9 +252,21 @@ async def health_check(): async def health_check_detailed( conn: duckdb.DuckDBPyConnection = Depends(_get_db), _user: dict = Depends(get_current_user), + include: str = Query( + "", + description=( + "Comma-separated list of optional checks to include. " + "Recognised values: `schema` (DB schema version against the " + "expected migration). The default response omits these because " + "they're rarely actionable on a healthy instance and add noise " + "to `agnes diagnose` output (issue #204). Pass `?include=schema` " + "to get the legacy behavior." + ), + ), ): """Structured health check with deployment metadata. 
Requires authentication.""" checks = {} + include_set = {p.strip() for p in include.split(",") if p.strip()} # DuckDB state try: @@ -241,8 +275,14 @@ async def health_check_detailed( except Exception as e: checks["duckdb_state"] = {"status": "error", "detail": str(e)} - # DB schema version check - checks["db_schema"] = _check_db_schema() + # DB schema version check — opt-in (issue #204). Operators who run a + # fresh release pinned to the same image as the running schema rarely + # care about this number; analysts hitting the endpoint via + # `agnes diagnose` see it as noise. Surface it on demand via + # `?include=schema` (the dashboard / admin UI passes this; default + # CLI does not). + if "schema" in include_set: + checks["db_schema"] = _check_db_schema() # Sync state summary try: @@ -292,6 +332,10 @@ async def health_check_detailed( except Exception as e: checks["session_pipeline"] = {"status": "unknown", "detail": str(e)} + # Aggregate to overall status. `info` and `unknown` surface in the + # response but never escalate the headline (issue #178). `warning` + # promotes to `degraded`; `error` (or a schema mismatch when the + # caller asked for it) promotes to `unhealthy`. overall = "healthy" for check in checks.values(): if check.get("status") == "error": @@ -299,8 +343,9 @@ async def health_check_detailed( break if check.get("status") == "warning": overall = "degraded" - # DB schema mismatch or unreachable also makes the overall status unhealthy - if checks.get("db_schema", {}).get("db_schema") != "ok": + # Schema mismatch only escalates when the caller asked for the check + # — otherwise the absent key is treated as "not asserted". 
+ if "db_schema" in checks and checks["db_schema"].get("db_schema") != "ok": overall = "unhealthy" return { diff --git a/cli/client.py b/cli/client.py index 54efb2d..5ed950c 100644 --- a/cli/client.py +++ b/cli/client.py @@ -387,6 +387,14 @@ def api_patch(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: raise _translate_transport_error(exc, context=f"PATCH {path}", timeout_s=timeout) from exc +def api_put(path: str, *, timeout: float = 30.0, **kwargs) -> httpx.Response: + try: + with get_client(timeout=timeout) as client: + return client.put(path, **kwargs) + except httpx.HTTPError as exc: + raise _translate_transport_error(exc, context=f"PUT {path}", timeout_s=timeout) from exc + + def _is_transient(exc: Exception) -> bool: """Worth retrying? Network blip or 5xx — yes. Auth / 4xx — no.""" if isinstance(exc, (httpx.ConnectError, httpx.ReadError, httpx.WriteError, diff --git a/cli/commands/admin.py b/cli/commands/admin.py index 4fed5bc..cf0f08e 100644 --- a/cli/commands/admin.py +++ b/cli/commands/admin.py @@ -4,7 +4,7 @@ import json import typer -from cli.client import api_get, api_post, api_delete, api_patch +from cli.client import api_get, api_post, api_delete, api_patch, api_put from cli.commands.admin_metrics import admin_metrics_app from cli.commands.admin_store import admin_store_app from cli.commands.memory_admin import memory_admin_app @@ -324,6 +324,139 @@ def list_tables(as_json: bool = typer.Option(False, "--json")): typer.echo(f" {t['name']:30s} src={t.get('source_type','?'):10s} mode={t.get('query_mode','?'):6s} bucket={t.get('bucket',''):20s}") +@admin_app.command("unregister-table") +def unregister_table( + table_id: str = typer.Argument(..., help="Table id to unregister"), + yes: bool = typer.Option( + False, "--yes", "-y", + help="Skip the confirmation prompt (for scripts).", + ), +): + """Unregister a table from the registry. + + Calls `DELETE /api/admin/registry/{table_id}`. 
The server unhooks the + master view, removes the canonical parquet for materialized rows, and + clears the matching `sync_state` row. Issue #177. + """ + if not yes: + typer.echo(f"About to unregister table: {table_id}") + if not typer.confirm("Continue?"): + typer.echo("Aborted.") + raise typer.Exit(0) + resp = api_delete(f"/api/admin/registry/{table_id}") + if resp.status_code == 204: + typer.echo(f"Unregistered: {table_id}") + return + if resp.status_code == 404: + typer.echo(f"Not registered: {table_id}", err=True) + raise typer.Exit(1) + try: + detail = resp.json().get("detail", resp.text) + except Exception: + detail = resp.text + typer.echo(f"Failed: {detail}", err=True) + raise typer.Exit(1) + + +@admin_app.command("update-table") +def update_table( + table_id: str = typer.Argument(..., help="Table id to update"), + name: str = typer.Option(None, "--name", help="New display name"), + bucket: str = typer.Option(None, "--bucket", help="New bucket / dataset"), + source_table: str = typer.Option( + None, "--source-table", help="New source table name" + ), + query_mode: str = typer.Option( + None, + "--query-mode", + help="New query mode: local | remote | materialized", + ), + query: str = typer.Option( + None, + "--query", + help=( + "New SQL body for query_mode='materialized' (BigQuery). " + "Inline SQL or `@path/to.sql` to read from disk. Use " + "`--query=` (empty value) to clear." + ), + ), + description: str = typer.Option( + None, "--description", help="New description" + ), + sync_schedule: str = typer.Option( + None, + "--sync-schedule", + help="New cron schedule (e.g. 'every 6h' / 'daily 03:00'); honored by materialized BQ rows", + ), + source_type: str = typer.Option( + None, + "--source-type", + help="Change source type. Rare — most edits keep this fixed.", + ), +): + """Update a registered table. + + Calls `PUT /api/admin/registry/{table_id}` with only the supplied + fields. Field omitted → unchanged. Issue #177. 
+ + For BQ rows, the server schedules a background rebuild so the master + view picks up the change without waiting for the next scheduled sync. + Switching `query_mode` away from `materialized` clears the stale + `source_query` automatically. + """ + from pathlib import Path + + payload: dict = {} + if name is not None: + payload["name"] = name + if bucket is not None: + payload["bucket"] = bucket + if source_table is not None: + payload["source_table"] = source_table + if query_mode is not None: + payload["query_mode"] = query_mode + if description is not None: + payload["description"] = description + if sync_schedule is not None: + payload["sync_schedule"] = sync_schedule + if source_type is not None: + payload["source_type"] = source_type + if query is not None: + if query.startswith("@"): + sql_path = Path(query[1:]) + if not sql_path.exists(): + typer.echo(f"Error: SQL file not found: {sql_path}", err=True) + raise typer.Exit(2) + payload["source_query"] = sql_path.read_text(encoding="utf-8").strip() + else: + payload["source_query"] = query.strip() + + if not payload: + typer.echo( + "No fields supplied. 
Pass at least one of --name, --bucket, " + "--source-table, --query-mode, --query, --description, " + "--sync-schedule, --source-type.", + err=True, + ) + raise typer.Exit(2) + + resp = api_put(f"/api/admin/registry/{table_id}", json=payload) + if resp.status_code == 200: + data = resp.json() + updated = data.get("updated") or sorted(payload.keys()) + typer.echo(f"Updated {table_id}: {', '.join(updated)}") + return + if resp.status_code == 404: + typer.echo(f"Not registered: {table_id}", err=True) + raise typer.Exit(1) + try: + detail = resp.json().get("detail", resp.text) + except Exception: + detail = resp.text + typer.echo(f"Failed: {detail}", err=True) + raise typer.Exit(1) + + @admin_app.command("metadata-show") def metadata_show( table_id: str = typer.Argument(..., help="Table ID to show metadata for"), diff --git a/cli/commands/diagnose.py b/cli/commands/diagnose.py index 95dc77c..9195c94 100644 --- a/cli/commands/diagnose.py +++ b/cli/commands/diagnose.py @@ -16,6 +16,16 @@ def diagnose( symptom: str = typer.Option(None, "--symptom", help="Describe the problem"), component: str = typer.Option(None, "--component", help="Check specific component"), as_json: bool = typer.Option(False, "--json", help="Output as JSON"), + include_schema: bool = typer.Option( + False, + "--include-schema", + help=( + "Include the DB schema-version check. Off by default since the " + "answer is rarely actionable on a healthy instance and shows up " + "as noise in the agent-facing output (issue #204). On when the " + "operator is verifying a migration." + ), + ), ): """Run comprehensive system diagnostics. AI-agent friendly output.""" # If a subcommand was invoked (e.g. 
`agnes diagnose system`), defer to it @@ -33,7 +43,8 @@ def diagnose( # Detailed health (auth required) for service-level checks try: - resp_d = api_get("/api/health/detailed") + params = {"include": "schema"} if include_schema else None + resp_d = api_get("/api/health/detailed", params=params) detailed = resp_d.json() for svc_name, svc_data in detailed.get("services", {}).items(): check = {"name": svc_name, "status": svc_data.get("status", "unknown")} @@ -45,7 +56,8 @@ def diagnose( except Exception as e: checks.append({"name": "api", "status": "error", "detail": str(e)}) - # Determine overall + # Determine overall — `info` and `unknown` surface in the per-check + # output but never promote the headline (issue #178). overall = "healthy" for c in checks: if c["status"] == "error": diff --git a/cli/commands/init.py b/cli/commands/init.py index 61d539c..c2b4b74 100644 --- a/cli/commands/init.py +++ b/cli/commands/init.py @@ -95,6 +95,29 @@ def init( }}), err=True) raise typer.Exit(1) + # On --force, snapshot the existing CLAUDE.md before regenerating it + # so an operator who edited it can recover their notes (issue #164). + # Backup name carries an ISO timestamp so multiple `--force` runs in + # the same workspace don't clobber each other. We write the backup + # *after* the existing-workspace gate above so the un-forced path is + # unchanged. + if claude_md.exists() and force: + try: + ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + backup_path = workspace / f"CLAUDE.md.bak.{ts}" + backup_path.write_bytes(claude_md.read_bytes()) + typer.echo(f"Backed up existing CLAUDE.md → {backup_path.name}") + except OSError as exc: + # FS error on the backup is annoying but shouldn't abort the + # init. Surface it so the operator knows their pre-existing + # CLAUDE.md is about to be overwritten without a recoverable + # copy on disk, then proceed. 
+ typer.echo( + f"Warning: could not write CLAUDE.md backup ({exc}); " + f"continuing with --force overwrite", + err=True, + ) + # ------------------------------------------------------------------ # Step 2: verify the PAT via /api/catalog/tables. # diff --git a/cli/lib/pull.py b/cli/lib/pull.py index 59d4db9..22a436d 100644 --- a/cli/lib/pull.py +++ b/cli/lib/pull.py @@ -65,6 +65,44 @@ class PullResult: _SAFE_ID_RE = re.compile(r"^[a-zA-Z0-9_\-]{1,128}$") +def _read_progress_interval_seconds() -> float: + """Seconds between forced progress emissions per file. Default 5 s. + + Tighter cadence than the original 30 s default keeps non-TTY consumers + (Claude Code sub-agent watchdogs, CI runners) from killing the process + on apparent silence during a slow chunk. Override via + `AGNES_PULL_PROGRESS_INTERVAL_SECONDS`. Issue #203. + """ + raw = os.environ.get("AGNES_PULL_PROGRESS_INTERVAL_SECONDS", "") + if raw: + try: + v = float(raw) + if v > 0: + return v + except ValueError: + pass + return 5.0 + + +def _read_progress_interval_bytes() -> int: + """Bytes between forced progress emissions per file. Default 1 MiB. + + Complements the time-based cadence so fast downloads also emit at a + reasonable rate (the original "every 10% of total" boundary went + unobserved on multi-GB parquets where 10% is tens of seconds of bytes). + Override via `AGNES_PULL_PROGRESS_INTERVAL_BYTES`. Issue #203. + """ + raw = os.environ.get("AGNES_PULL_PROGRESS_INTERVAL_BYTES", "") + if raw: + try: + v = int(raw) + if v > 0: + return v + except ValueError: + pass + return 1024 * 1024 + + class _TextualProgress: """Plain-text progress emitter for non-TTY stderr. @@ -74,9 +112,17 @@ class _TextualProgress: minutes on a multi-GB parquet) or emits raw ANSI noise. This class instead emits one terse line per file at sensible cadence. 
- Cadence policy: emit when *either*: + Cadence policy: emit when *any* of: - per-file bytes-downloaded crosses a 10%-of-total boundary, OR - - 30 s have elapsed since this file's last emission. + - more than ``AGNES_PULL_PROGRESS_INTERVAL_BYTES`` bytes (default + 1 MiB) since this file's last emission, OR + - more than ``AGNES_PULL_PROGRESS_INTERVAL_SECONDS`` (default 5 s) + since this file's last emission. + + The byte+second floor exists because sub-agent / CI watchdogs read + "no output for N seconds" as a hung process and kill it (issue #203); + the original 30 s / 10% policy was silent enough to trip those gates + on slow links. Always emits one final "done" line per file via `finish()` so the operator sees a confirmed completion even on tiny files. @@ -102,11 +148,14 @@ class _TextualProgress: self._total_files = total_files self._file_sizes = file_sizes self._lock = threading.Lock() + self._interval_seconds = _read_progress_interval_seconds() + self._interval_bytes = _read_progress_interval_bytes() # Per-file state. 
self._bytes: dict[str, int] = {tid: 0 for tid in file_sizes} self._started_at: dict[str, float] = {} self._last_emit_at: dict[str, float] = {} self._last_emit_pct: dict[str, int] = {} + self._last_emit_bytes: dict[str, int] = {} self._finished_idx: int = 0 # files whose `finish` line has been emitted def advance(self, tid: str, n: int) -> None: @@ -118,16 +167,23 @@ class _TextualProgress: self._started_at[tid] = now self._last_emit_at[tid] = now self._last_emit_pct[tid] = 0 + self._last_emit_bytes[tid] = 0 self._bytes[tid] = self._bytes.get(tid, 0) + n total = self._file_sizes.get(tid, 0) current = self._bytes[tid] pct = int((current * 100) / total) if total > 0 else 0 elapsed = now - self._last_emit_at[tid] + bytes_since_emit = current - self._last_emit_bytes.get(tid, 0) crossed_10 = pct >= self._last_emit_pct[tid] + 10 - if crossed_10 or elapsed >= 30.0: + if ( + crossed_10 + or elapsed >= self._interval_seconds + or bytes_since_emit >= self._interval_bytes + ): self._last_emit_at[tid] = now self._last_emit_pct[tid] = pct - (pct % 10) + self._last_emit_bytes[tid] = current self._emit_line(tid, current, total, now) def finish(self) -> None: diff --git a/pyproject.toml b/pyproject.toml index d53dddc..c15f721 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agnes-the-ai-analyst" -version = "0.44.1" +version = "0.45.0" description = "Agnes — AI Data Analyst platform for AI analytical systems" requires-python = ">=3.11,<3.14" license = "MIT" diff --git a/services/telegram_bot/runner.py b/services/telegram_bot/runner.py index ef89104..f761b59 100644 --- a/services/telegram_bot/runner.py +++ b/services/telegram_bot/runner.py @@ -7,6 +7,7 @@ Runs the script as the owning user via the notify-scripts helper. import json import logging +import re import subprocess from . 
import config @@ -15,12 +16,22 @@ logger = logging.getLogger(__name__) NOTIFY_SCRIPTS_BIN = "/usr/local/bin/notify-scripts" +# POSIX-conservative username shape: alphanumerics, dot, hyphen, underscore; +# must start with `[a-z_]` so the value can never be interpreted as a sudo +# flag (e.g. `-u`, `--shell`). Mirrors the `useradd` defaults. Anything +# outside this shape is refused before we hand it to `sudo -u`. +_USERNAME_RE = re.compile(r"^[a-z_][a-z0-9._-]{0,31}$") + def run_user_script(username: str, script_name: str) -> dict | None: """Run a notification script as the specified user and return parsed JSON output. Returns None on error, or the parsed JSON dict on success. """ + if not _USERNAME_RE.match(username): + logger.error(f"Refusing to run script: invalid username shape: {username!r}") + return None + if not script_name.endswith(".py"): logger.warning(f"Not a Python script: {script_name}") return None diff --git a/services/telegram_bot/storage.py b/services/telegram_bot/storage.py index 64150fc..03d1574 100644 --- a/services/telegram_bot/storage.py +++ b/services/telegram_bot/storage.py @@ -7,7 +7,7 @@ Thread-safe file operations with atomic writes. import json import logging import os -import random +import secrets import string import tempfile import time @@ -95,8 +95,14 @@ def get_user_status(username: str) -> dict | None: def _generate_code() -> str: - """Generate a random numeric verification code.""" - return "".join(random.choices(string.digits, k=config.CODE_LENGTH)) + """Generate a cryptographically random numeric verification code. + + Uses `secrets.choice` (CSPRNG) rather than `random.choices` because + pairing codes gate account linkage — a predictable PRNG output would + let an attacker who scrapes one code recover the RNG state and predict + others issued in the same process. 
+ """ + return "".join(secrets.choice(string.digits) for _ in range(config.CODE_LENGTH)) def _cleanup_expired(codes: dict) -> dict: diff --git a/tests/test_cli_admin.py b/tests/test_cli_admin.py index e8ecb2c..ea03fdb 100644 --- a/tests/test_cli_admin.py +++ b/tests/test_cli_admin.py @@ -156,6 +156,114 @@ class TestMetadataShow: assert result.exit_code == 1 +class TestUnregisterTable: + """Issue #177: `agnes admin unregister-table` wraps DELETE + /api/admin/registry/{id}. The server endpoint already does the + parquet/sync_state cleanup; the CLI is a thin client.""" + + def test_unregister_success(self): + with patch("cli.commands.admin.api_delete", return_value=_resp(204)): + result = runner.invoke( + app, ["admin", "unregister-table", "orders", "--yes"] + ) + assert result.exit_code == 0, result.output + assert "Unregistered: orders" in result.output + + def test_unregister_not_found(self): + with patch( + "cli.commands.admin.api_delete", + return_value=_resp(404, {"detail": "Table not found"}), + ): + result = runner.invoke( + app, ["admin", "unregister-table", "nope", "--yes"] + ) + assert result.exit_code == 1 + + def test_unregister_prompts_without_yes(self): + """Without --yes, the CLI confirms before destructive action.""" + with patch("cli.commands.admin.api_delete", return_value=_resp(204)) as d: + # Simulate operator typing "n" at the prompt. + result = runner.invoke( + app, ["admin", "unregister-table", "orders"], input="n\n" + ) + # Either Aborted (exit 0) or refuses entirely; either way the + # server must not have been called. + d.assert_not_called() + assert result.exit_code == 0 + + +class TestUpdateTable: + """Issue #177: `agnes admin update-table` wraps PUT + /api/admin/registry/{id}. 
Only fields the operator passes go in the + body — server-side merge keeps the rest unchanged.""" + + def test_update_only_supplied_fields_sent(self): + captured = {} + + def fake_put(path, **kwargs): + captured["path"] = path + captured["json"] = kwargs.get("json") + return _resp(200, {"id": "orders", "updated": ["bucket"]}) + + with patch("cli.commands.admin.api_put", side_effect=fake_put): + result = runner.invoke( + app, ["admin", "update-table", "orders", "--bucket", "out.c-prod"] + ) + assert result.exit_code == 0, result.output + assert captured["path"] == "/api/admin/registry/orders" + # description must NOT be in the body — operator didn't pass it. + assert captured["json"] == {"bucket": "out.c-prod"} + assert "Updated orders" in result.output + + def test_update_inline_query_for_materialized(self): + captured = {} + + def fake_put(path, **kwargs): + captured["json"] = kwargs.get("json") + return _resp(200, {"id": "rev", "updated": ["query_mode", "source_query"]}) + + with patch("cli.commands.admin.api_put", side_effect=fake_put): + result = runner.invoke(app, [ + "admin", "update-table", "rev", + "--query-mode", "materialized", + "--query", "SELECT 1", + ]) + assert result.exit_code == 0, result.output + assert captured["json"]["query_mode"] == "materialized" + assert captured["json"]["source_query"] == "SELECT 1" + + def test_update_query_at_file(self, tmp_path): + sql_file = tmp_path / "q.sql" + sql_file.write_text("SELECT * FROM orders\n") + captured = {} + + def fake_put(path, **kwargs): + captured["json"] = kwargs.get("json") + return _resp(200, {"id": "rev", "updated": ["source_query"]}) + + with patch("cli.commands.admin.api_put", side_effect=fake_put): + result = runner.invoke( + app, ["admin", "update-table", "rev", "--query", f"@{sql_file}"] + ) + assert result.exit_code == 0, result.output + assert captured["json"]["source_query"] == "SELECT * FROM orders" + + def test_update_no_fields_supplied_errors(self): + result = runner.invoke(app, 
["admin", "update-table", "orders"]) + assert result.exit_code == 2 + assert "No fields supplied" in (result.output + (result.stderr or "")) + + def test_update_table_not_found(self): + with patch( + "cli.commands.admin.api_put", + return_value=_resp(404, {"detail": "Table not found"}), + ): + result = runner.invoke( + app, ["admin", "update-table", "nope", "--bucket", "x"] + ) + assert result.exit_code == 1 + + def test_admin_set_role_returns_hardfail(): """v19: `agnes admin set-role` was removed. Calling it must hard-fail with a non-zero exit code and a message pointing at the replacement diff --git a/tests/test_cli_init.py b/tests/test_cli_init.py index ad11ea8..507a1ad 100644 --- a/tests/test_cli_init.py +++ b/tests/test_cli_init.py @@ -123,6 +123,39 @@ def test_init_force_preserves_local_md(tmp_path, monkeypatch): assert "my notes" in (tmp_path / ".claude" / "CLAUDE.local.md").read_text() +def test_init_force_backs_up_existing_claude_md(tmp_path, monkeypatch): + """Issue #164: --force overwrites CLAUDE.md, but the prior content + must be preserved as `CLAUDE.md.bak.` so an operator who + edited it can recover their notes. The backup carries an ISO + timestamp so re-running --force in the same workspace doesn't + clobber a prior backup. + """ + monkeypatch.setenv("AGNES_CONFIG_DIR", str(tmp_path / "_cfg")) + api_get = _make_api_get() + monkeypatch.setattr("cli.commands.init.api_get", api_get, raising=False) + monkeypatch.setattr("cli.lib.pull.api_get", api_get, raising=False) + + # Seed an existing CLAUDE.md the operator has edited. + (tmp_path / "CLAUDE.md").write_text( + "# AI Data Analyst\n\nMy custom edits — must survive reinit.\n" + ) + + r = runner.invoke(init_app, [ + "--server-url", "http://x", + "--token", "t", + "--workspace", str(tmp_path), + "--force", + ]) + assert r.exit_code == 0, r.output + + # Backup file: glob since the timestamp is dynamic. 
+ backups = list(tmp_path.glob("CLAUDE.md.bak.*")) + assert len(backups) == 1, [p.name for p in backups] + assert "must survive reinit" in backups[0].read_text() + # The summary line names the backup so the operator can find it. + assert "Backed up" in r.output, r.output + + def test_init_partial_state_friendly_exit(tmp_path, monkeypatch): """CLAUDE.md exists with marker but no settings.json -> friendly hint, exit 1.""" monkeypatch.setenv("AGNES_CONFIG_DIR", str(tmp_path / "_cfg")) diff --git a/tests/test_diagnose_billing.py b/tests/test_diagnose_billing.py index de75829..f5e8dc8 100644 --- a/tests/test_diagnose_billing.py +++ b/tests/test_diagnose_billing.py @@ -1,9 +1,11 @@ -"""Phase K — `agnes diagnose` warning when BQ billing_project == project. +"""Phase K — `agnes diagnose` info entry when BQ billing_project == project. Surfaces via /api/health/detailed (which `agnes diagnose` already consumes): when data_source.type == 'bigquery' and the resolved BqProjects.billing equals BqProjects.data, the response includes a `services.bq_config` entry with -status='warning' and a hint about the 403 USER_PROJECT_DENIED footgun. +status='info' (since #178 — was 'warning' before) and a hint about the 403 +USER_PROJECT_DENIED footgun. `info` keeps the message visible without +promoting the overall check to 'degraded' the way 'warning' did. """ import pytest @@ -46,7 +48,14 @@ def _reset_after(monkeypatch): def test_diagnose_warns_when_billing_equals_project(seeded_app, monkeypatch): - """BQ instance with billing_project missing (or equal to project) → warning.""" + """BQ instance with billing_project missing (or equal to project) → info. + + Pre-#178 this returned `warning`, which promoted the overall headline + to `degraded`. The check is informational — many valid single-project + dev instances run with billing == data — so it now returns `info` and + the headline stays `healthy` (issue #178). The detail message still + appears so operators can see it. 
+ """ _patch_instance_config(monkeypatch, { "data_source": { "type": "bigquery", @@ -65,10 +74,12 @@ def test_diagnose_warns_when_billing_equals_project(seeded_app, monkeypatch): bq_cfg = body.get("services", {}).get("bq_config") assert bq_cfg is not None, body - assert bq_cfg.get("status") == "warning", bq_cfg + assert bq_cfg.get("status") == "info", bq_cfg # Hint mentions the YAML field path so operators know what to fix. blob = (str(bq_cfg.get("detail", "")) + " " + str(bq_cfg.get("hint", ""))).lower() assert "billing_project" in blob, bq_cfg + # Info severity must not promote the headline to degraded. + assert body.get("status") == "healthy", body.get("status") def test_diagnose_clean_when_billing_differs(seeded_app, monkeypatch): diff --git a/tests/test_health_schema_gate.py b/tests/test_health_schema_gate.py new file mode 100644 index 0000000..58d9910 --- /dev/null +++ b/tests/test_health_schema_gate.py @@ -0,0 +1,90 @@ +"""Health-check schema-version check is opt-in (#204) and severity (#178). + +Two adjacent behaviors: + +- `GET /api/health/detailed` no longer includes `db_schema` by default. + Pass `?include=schema` to get it. Rationale: the schema version is + rarely actionable on a healthy instance and used to dominate the + agent-facing `agnes diagnose` output. + +- `info` severity entries appear in the response but never promote the + overall status to `degraded` (only `warning` does) or `unhealthy` + (only `error` does). This lets the BQ billing-equals-data check stay + visible without falsely tripping the headline. + +The `bq_config == info` assertion is in test_diagnose_billing.py; here +we cover the schema gate and a synthetic `info`-doesn't-promote case. 
+""" + +from __future__ import annotations + + +def _auth(token: str) -> dict: + return {"Authorization": f"Bearer {token}"} + + +def test_schema_check_omitted_by_default(seeded_app): + """Default response does not include `db_schema` (issue #204).""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.get("/api/health/detailed", headers=_auth(token)) + assert r.status_code == 200, r.text + assert "db_schema" not in r.json().get("services", {}) + + +def test_schema_check_present_when_include_schema(seeded_app): + """`?include=schema` returns the legacy entry verbatim.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.get( + "/api/health/detailed", + headers=_auth(token), + params={"include": "schema"}, + ) + assert r.status_code == 200, r.text + services = r.json().get("services", {}) + assert "db_schema" in services + # Healthy seeded test app must report ok against the current schema. + assert services["db_schema"].get("db_schema") == "ok", services["db_schema"] + + +def test_unrecognised_include_token_is_ignored(seeded_app): + """Unknown include tokens don't error or surface; forward-compatible.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.get( + "/api/health/detailed", + headers=_auth(token), + params={"include": "schema,bogus"}, + ) + assert r.status_code == 200, r.text + services = r.json().get("services", {}) + assert "db_schema" in services + assert "bogus" not in services + + +def test_info_severity_does_not_promote_overall(seeded_app, monkeypatch): + """Issue #178: a service returning `status: info` must NOT push the + headline to `degraded`. Only `warning`+ does that. + + We synthesize an `info` entry by patching `_check_session_pipeline` + (any of the lazy checks would do) so we exercise the aggregator + without depending on a particular check's natural state. 
+ """ + import app.api.health as health_mod + + def _fake_session_pipeline(_conn): + return {"status": "info", "detail": "synthetic info entry"} + + monkeypatch.setattr( + health_mod, "_check_session_pipeline", _fake_session_pipeline + ) + + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.get("/api/health/detailed", headers=_auth(token)) + assert r.status_code == 200, r.text + body = r.json() + assert body["services"]["session_pipeline"]["status"] == "info" + # The critical assertion — info must not promote the headline. + assert body["status"] == "healthy", body["status"] diff --git a/tests/test_pull_progress.py b/tests/test_pull_progress.py index fe31c72..59ee424 100644 --- a/tests/test_pull_progress.py +++ b/tests/test_pull_progress.py @@ -121,3 +121,65 @@ def test_textual_progress_emits_at_completion( or "done" in captured.err.lower() or "complete" in captured.err.lower() ) + + +class TestProgressIntervalKnobs: + """Issue #203: cadence is configurable via env vars so non-TTY + consumers (CI runners, sub-agent watchdogs) can tighten the floor + when the default is too quiet for their dead-process detector.""" + + def _stream(self): + return io.StringIO() + + def test_default_seconds_floor_is_5s(self, monkeypatch): + """Default cadence is 5 s (was 30 s pre-#203).""" + monkeypatch.delenv("AGNES_PULL_PROGRESS_INTERVAL_SECONDS", raising=False) + from cli.lib.pull import _read_progress_interval_seconds + assert _read_progress_interval_seconds() == 5.0 + + def test_default_bytes_floor_is_1mib(self, monkeypatch): + """Default cadence is 1 MiB; complements the time-based floor.""" + monkeypatch.delenv("AGNES_PULL_PROGRESS_INTERVAL_BYTES", raising=False) + from cli.lib.pull import _read_progress_interval_bytes + assert _read_progress_interval_bytes() == 1024 * 1024 + + def test_seconds_env_override(self, monkeypatch): + monkeypatch.setenv("AGNES_PULL_PROGRESS_INTERVAL_SECONDS", "0.5") + from cli.lib.pull import _read_progress_interval_seconds + 
 assert _read_progress_interval_seconds() == 0.5 + + def test_bytes_env_override(self, monkeypatch): + monkeypatch.setenv("AGNES_PULL_PROGRESS_INTERVAL_BYTES", "131072") + from cli.lib.pull import _read_progress_interval_bytes + assert _read_progress_interval_bytes() == 131072 + + def test_invalid_envs_fall_back_to_default(self, monkeypatch): + """Garbage input doesn't break the pull; it falls back to defaults.""" + monkeypatch.setenv("AGNES_PULL_PROGRESS_INTERVAL_SECONDS", "nope") + monkeypatch.setenv("AGNES_PULL_PROGRESS_INTERVAL_BYTES", "-1") + from cli.lib.pull import ( + _read_progress_interval_bytes, + _read_progress_interval_seconds, + ) + assert _read_progress_interval_seconds() == 5.0 + assert _read_progress_interval_bytes() == 1024 * 1024 + + def test_byte_floor_emits_more_often_than_pct_threshold(self, monkeypatch): + """A 100 MiB file with a 1 MiB byte cadence should emit far more + than the 10 progress lines that the 10%-of-total cadence alone + would produce. This was the operator complaint in #203: on + multi-GB parquets the 30 s / 10% policy produced one line every + ~3 minutes.""" + monkeypatch.setenv("AGNES_PULL_PROGRESS_INTERVAL_SECONDS", "9999") + monkeypatch.setenv("AGNES_PULL_PROGRESS_INTERVAL_BYTES", "1048576") + from cli.lib.pull import _TextualProgress + sink = self._stream() + total = 100 * 1024 * 1024 # 100 MiB + prog = _TextualProgress( + stream=sink, total_files=1, file_sizes={"tbl": total} + ) + chunk = 64 * 1024 # 64 KiB chunks → 1600 advances + for _ in range(total // chunk): + prog.advance("tbl", chunk) + prog.finish() + emitted = sink.getvalue().count("\n") + assert emitted >= 50, f"only {emitted} lines emitted; cadence too coarse" diff --git a/tests/test_telegram_bot_runner.py b/tests/test_telegram_bot_runner.py new file mode 100644 index 0000000..14092b9 --- /dev/null +++ b/tests/test_telegram_bot_runner.py @@ -0,0 +1,60 @@ +"""Tests for `services.telegram_bot.runner` username validation. + +Issue #84: the runner shells out via `sudo -u `.
Without an +input gate, a username controlled by an attacker (via tampering with +the linked-users JSON, or via an upstream caller that doesn't validate) +could carry sudo flags or shell metacharacters. Every value flowing +into `subprocess.run([..., "-u", username, ...])` must match a +POSIX-conservative shape; bad shapes are refused before the subprocess +fires. +""" + +from unittest.mock import patch + +from services.telegram_bot.runner import _USERNAME_RE, run_user_script + + +def test_username_regex_accepts_normal_usernames(): + for u in ("alice", "bob42", "data_ops", "svc-agnes", "_system"): + assert _USERNAME_RE.match(u), u + + +def test_username_regex_rejects_obvious_attacks(): + bad = [ + "-u", # sudo flag + "--shell=/bin/bash", # GNU long flag + "alice; rm -rf /", # shell metachar + "alice && id", + "alice|cat /etc/shadow", + "alice$IFS", + "1starts_with_digit", + "alice/with/slash", + "alice with space", + "", # empty + "a" * 33, # too long + ] + for u in bad: + assert not _USERNAME_RE.match(u), u + + +def test_run_user_script_refuses_bad_username_without_subprocess(): + """If validation refuses the username, subprocess.run must not fire. + + Pre-fix, a tampered telegram_users.json with `username = "-u root"` + would have sudo'd as root via flag injection. The fix has the runner + short-circuit to None before any subprocess call. 
+ """ + with patch("services.telegram_bot.runner.subprocess.run") as run_mock: + result = run_user_script("-u", "ok_script.py") + assert result is None + run_mock.assert_not_called() + + +def test_run_user_script_refuses_bad_script_name_without_subprocess(): + """Existing guard at L24 rejects non-.py scripts; verify it still does + after the new username gate so a valid username + bad script combo + doesn't slip through and run.""" + with patch("services.telegram_bot.runner.subprocess.run") as run_mock: + result = run_user_script("alice", "not_python.sh") + assert result is None + run_mock.assert_not_called() diff --git a/tests/test_telegram_storage.py b/tests/test_telegram_storage.py index accf308..1e02d9e 100644 --- a/tests/test_telegram_storage.py +++ b/tests/test_telegram_storage.py @@ -113,3 +113,29 @@ class TestVerificationCodes: result = verify_code("123456") assert result is None + + def test_code_uses_csprng_not_random_module(self, storage_paths): + """Issue #84: pairing-code RNG must not be derivable from + `random.seed`. Pre-fix the generator used `random.choices`, which + means an attacker who scrapes one code can recover the PRNG state + and predict subsequent codes issued in the same process. The fix + switched to `secrets.choice` (CSPRNG-backed); seeding the `random` + module must therefore have no effect on the produced codes. + """ + import random as _random + from services.telegram_bot.storage import _generate_code + + _random.seed(42) + first = _generate_code() + _random.seed(42) + second = _generate_code() + # If the generator still used `random`, seed(42) would force two + # identical sequences. With `secrets`, they're independent draws + # from the OS CSPRNG and equal only by astronomical coincidence + # — well below any reasonable test flake threshold for a + # length-CODE_LENGTH digit string (1 in 10**CODE_LENGTH). 
+ assert first != second, ( + f"Generator appears to use seedable PRNG (got identical " + f"codes {first!r} after re-seeding); fix #84 may have " + f"regressed." + )
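
Reviewer note on the two #84 hardening steps that the Telegram tests above exercise. This is a minimal sketch, not the code in `services/telegram_bot/storage.py` or `services/telegram_bot/runner.py`. The names `generate_code`, `is_safe_username` and `USERNAME_RE` are hypothetical, and `CODE_LENGTH = 6` is an assumed value. The regex is the one quoted in the changelog entry. The sketch uses `fullmatch` rather than `match`, which is a choice made here and not confirmed by the source.

```python
import re
import secrets
import string

# Assumed value for illustration; the real CODE_LENGTH is defined in the
# storage module and is not shown in this diff.
CODE_LENGTH = 6

# Pattern quoted in the changelog entry for #84.
USERNAME_RE = re.compile(r"^[a-z_][a-z0-9._-]{0,31}$")


def generate_code(length: int = CODE_LENGTH) -> str:
    """Return a numeric pairing code drawn from the OS CSPRNG.

    `secrets.choice` ignores `random.seed`, so re-seeding the `random`
    module cannot reproduce or predict the codes.
    """
    return "".join(secrets.choice(string.digits) for _ in range(length))


def is_safe_username(username: str) -> bool:
    """True only for usernames that cannot be parsed as a flag or shell syntax.

    The first character must be a lowercase letter or underscore, so a
    leading "-" (sudo flag injection) is rejected. `fullmatch` also rejects
    a trailing newline, which `match` with `$` would let through.
    """
    return USERNAME_RE.fullmatch(username) is not None
```

A caller would check `is_safe_username(username)` and return early before building the `subprocess.run` argument list, which is the behaviour `test_run_user_script_refuses_bad_username_without_subprocess` asserts.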