From 8309141705ef6f1c79bdcd6431727af55d719e9f Mon Sep 17 00:00:00 2001
From: ZdenekSrotyr
Date: Mon, 4 May 2026 18:32:30 +0200
Subject: [PATCH] feat(cli): agnes snapshot create (folded from da fetch); friendly exit if no DuckDB

---
 cli/commands/snapshot.py          | 168 +++++++++++++++++++++++++++++-
 tests/test_cli_snapshot_create.py |  31 ++++++
 2 files changed, 197 insertions(+), 2 deletions(-)
 create mode 100644 tests/test_cli_snapshot_create.py

diff --git a/cli/commands/snapshot.py b/cli/commands/snapshot.py
index 89cd11d..f56d427 100644
--- a/cli/commands/snapshot.py
+++ b/cli/commands/snapshot.py
@@ -1,4 +1,4 @@
-"""`agnes snapshot list/refresh/drop/prune` (spec §4.2)."""
+"""`agnes snapshot list/create/refresh/drop/prune` (spec §4.2)."""
 from __future__ import annotations
 
 import hashlib
@@ -15,7 +15,7 @@ from cli.snapshot_meta import (
     list_snapshots, read_meta, write_meta, delete_snapshot,
     snapshot_lock, SnapshotMeta,
 )
-from cli.v2_client import api_post_arrow, V2ClientError
+from cli.v2_client import api_post_arrow, api_post_json, V2ClientError
 
 snapshot_app = typer.Typer(help="Manage local snapshots")
 
@@ -174,6 +174,170 @@ def prune_cmd(
         typer.echo("(no matches)")
 
 
+def _print_estimate(d: dict) -> None:
+    """Echo the four estimate figures from a scan-estimate response dict."""
+    # `dict.get(k, default)` returns `default` only when k is missing; if k
+    # maps to None (server returns None for non-BQ tables) the default doesn't
+    # kick in. `or 0` covers both cases.
+    typer.echo(f" estimated_scan_bytes: {(d.get('estimated_scan_bytes') or 0):>15,} bytes")
+    typer.echo(f" estimated_result_rows: {(d.get('estimated_result_rows') or 0):>15,}")
+    typer.echo(f" estimated_result_bytes: {(d.get('estimated_result_bytes') or 0):>15,} bytes")
+    typer.echo(f" bq_cost_estimate_usd: $ {(d.get('bq_cost_estimate_usd') or 0):.4f}")
+
+
+def _exit_code_for(e: V2ClientError) -> int:
+    """Map the HTTP status of a V2 API error to a CLI exit code."""
+    status = e.status_code
+    if status == 400:
+        # Validator rejections and every other 400 share exit code 2.
+        return 2
+    if status == 401:
+        return 7
+    if status in (403, 404):
+        # An unknown table (404) is treated as RBAC-equivalent to 403.
+        return 8
+    if status == 429:
+        return 3
+    if status >= 500:
+        return 5
+    return 9
+
+
+@snapshot_app.command("create")
+def create_cmd(
+    table_id: str = typer.Argument(...),
+    select: str = typer.Option(None, "--select", help="Comma-separated column list"),
+    where: str = typer.Option(None, "--where", help="WHERE predicate (BQ flavor for remote tables)"),
+    limit: int = typer.Option(None, "--limit"),
+    order_by: str = typer.Option(None, "--order-by", help="Comma-separated"),
+    as_name: str = typer.Option(None, "--as", help="Local snapshot name (default: the table id)"),
+    estimate: bool = typer.Option(False, "--estimate", help="Run dry-run only, do not fetch"),
+    no_estimate: bool = typer.Option(False, "--no-estimate", help="Skip the pre-fetch estimate"),
+    force: bool = typer.Option(False, "--force", help="Overwrite existing snapshot of the same name"),
+):
+    """Create a snapshot — fetch a filtered subset of a remote table locally."""
+    name = as_name or table_id
+    # Snapshot name lands in DuckDB CREATE VIEW as a quoted identifier; a `"`
+    # in the name would break out and enable arbitrary SQL execution against
+    # the user's local analytics.duckdb. Validate up-front with the same
+    # regex used elsewhere for safe identifiers.
+    import re as _re
+    if not _re.match(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$", name):
+        typer.echo(
+            f"Error: snapshot name {name!r} is not a safe identifier. "
+            f"Use letters, digits, and underscores; must start with a letter "
+            f"or underscore; max 64 characters.",
+            err=True,
+        )
+        raise typer.Exit(2)
+
+    # Guard: refuse to create snapshots before `agnes pull` has bootstrapped
+    # the local DuckDB. Otherwise we'd open an empty DB and confuse later
+    # `agnes pull` runs.
+    local_db = _local_dir() / "user" / "duckdb" / "analytics.duckdb"
+    if not local_db.exists():
+        typer.echo("Local DuckDB not found. Run: agnes pull first.", err=True)
+        raise typer.Exit(1)
+
+    snap_dir = _local_dir() / "user" / "snapshots"
+    snap_dir.mkdir(parents=True, exist_ok=True)
+
+    # Build request (`is not None` so an explicit `--limit 0` is not dropped)
+    req = {"table_id": table_id}
+    if select:
+        req["select"] = [c.strip() for c in select.split(",") if c.strip()]
+    if where:
+        req["where"] = where
+    if limit is not None:
+        req["limit"] = int(limit)
+    if order_by:
+        req["order_by"] = [c.strip() for c in order_by.split(",") if c.strip()]
+
+    # Estimate (always shown unless --no-estimate). The `--estimate` early
+    # exit is OUTSIDE this block — `--estimate` is a cost-safety mechanism
+    # ("dry-run only, do not fetch") whose guarantee must hold even when
+    # the user also passes `--no-estimate` (silly combo; treat as dry-run
+    # because the fetch-blocking semantics dominate).
+    est = None
+    if not no_estimate:
+        try:
+            est = api_post_json("/api/v2/scan/estimate", req)
+        except V2ClientError as e:
+            typer.echo(f"Error: estimate failed: {e}", err=True)
+            raise typer.Exit(_exit_code_for(e))
+        typer.echo(f"Estimate for {table_id}:")
+        _print_estimate(est)
+    if estimate:
+        return
+
+    # Cheap existence pre-check (outside the lock) so we don't waste a BQ
+    # scan on an obviously-redundant fetch. Authoritative re-check happens
+    # under the lock below — necessary because between this check and the
+    # write a concurrent `agnes snapshot create --as same_name` could create
+    # the file.
+    existing = None if force else read_meta(snap_dir, name)
+    if existing is not None:
+        typer.echo(
+            f"Error: snapshot {name!r} already exists "
+            f"(fetched {existing.fetched_at}, {existing.rows:,} rows). "
+            f"Pass --force to overwrite, or 'agnes snapshot refresh {name}' to update in place.",
+            err=True,
+        )
+        raise typer.Exit(6)
+
+    # Fetch
+    try:
+        table = api_post_arrow("/api/v2/scan", req)
+    except V2ClientError as e:
+        typer.echo(f"Error: fetch failed: {e}", err=True)
+        raise typer.Exit(_exit_code_for(e))
+
+    # Install under flock — re-check existence here to close the TOCTOU
+    # window between the early check above and this write.
+    parquet_path = snap_dir / f"{name}.parquet"
+    with snapshot_lock(snap_dir):
+        existing = None if force else read_meta(snap_dir, name)
+        if existing is not None:
+            typer.echo(
+                f"Error: snapshot {name!r} was created by a concurrent "
+                f"`agnes snapshot create` (fetched {existing.fetched_at}, "
+                f"{existing.rows:,} rows). Pass --force to overwrite.",
+                err=True,
+            )
+            raise typer.Exit(6)
+        pq.write_table(table, parquet_path)
+        # Register view in user analytics.duckdb (already verified to exist
+        # above — we still pass parents=True because the directory may have
+        # been deleted between the guard and here in pathological cases).
+        local_db.parent.mkdir(parents=True, exist_ok=True)
+        conn = duckdb.connect(str(local_db))
+        try:
+            safe_path = str(parquet_path).replace("'", "''")
+            conn.execute(
+                f"CREATE OR REPLACE VIEW \"{name}\" AS SELECT * FROM read_parquet('{safe_path}')"
+            )
+        finally:
+            conn.close()
+
+        # Hash only the first 1,000,000 bytes (read just that much) + write meta
+        with parquet_path.open("rb") as fh:
+            result_hash = hashlib.md5(fh.read(1_000_000)).hexdigest()
+        now = datetime.now(timezone.utc).isoformat()
+        meta = SnapshotMeta(
+            name=name, table_id=table_id,
+            select=req.get("select"), where=req.get("where"),
+            limit=req.get("limit"), order_by=req.get("order_by"),
+            fetched_at=now, effective_as_of=now,
+            rows=int(table.num_rows),
+            bytes_local=parquet_path.stat().st_size,
+            estimated_scan_bytes_at_fetch=int((est or {}).get("estimated_scan_bytes") or 0),
+            result_hash_md5=result_hash,
+        )
+        write_meta(snap_dir, meta)
+
+    typer.echo(f"Fetched {table.num_rows:,} rows -> {name}")
+
+
 def _parse_duration(s: str) -> timedelta:
     s = s.strip().lower()
     if s.endswith("d"):
diff --git a/tests/test_cli_snapshot_create.py b/tests/test_cli_snapshot_create.py
new file mode 100644
index 0000000..194e7c8
--- /dev/null
+++ b/tests/test_cli_snapshot_create.py
@@ -0,0 +1,31 @@
+"""Tests for `agnes snapshot create` (folded from `da fetch`)."""
+
+from typer.testing import CliRunner
+
+from cli.commands.snapshot import snapshot_app
+
+
+def test_snapshot_create_help():
+    runner = CliRunner()
+    result = runner.invoke(snapshot_app, ["create", "--help"])
+    assert result.exit_code == 0
+    for flag in [
+        "--select",
+        "--where",
+        "--limit",
+        "--order-by",
+        "--as",
+        "--estimate",
+        "--no-estimate",
+        "--force",
+    ]:
+        assert flag in result.output
+
+
+def test_snapshot_create_no_duckdb_friendly_exit(tmp_path, monkeypatch):
+    monkeypatch.setenv("AGNES_LOCAL_DIR", str(tmp_path))
+    runner = CliRunner()
+    result = runner.invoke(snapshot_app, ["create", "any_table", "--as", "x", "--estimate"])
+    assert result.exit_code == 1
+    out = result.output + (result.stderr or "")
+    assert "Run: agnes pull" in out