agnes-the-ai-analyst/cli/commands/query.py
ZdenekSrotyr 57482be263 feat(cli): #160 shared structured error renderer for BQ-typed responses
The reporter (#160) saw `USER_PROJECT_DENIED` raw in the CLI because
all three CLI error-rendering paths flatten typed BqAccessError /
guardrail / RBAC dicts to a truncated single-line string, hiding the
structured `hint` field that explains how to fix the misconfig.

Fix: shared `cli/error_render.py:render_error(status_code, body)` that
recognizes the canonical typed shapes and pretty-prints them. Falls
back to truncated-and-flattened form for unrecognized bodies, so the
renderer never makes worse-than-status-quo output.

Recognized shapes:
- {detail: {kind: ..., hint?, billing_project?, data_project?}}
  — typed BqAccessError responses from /api/v2/scan, /sample, /schema,
  /api/query (when /api/query escalates a BQ failure)
- {detail: {reason: 'remote_scan_too_large', scan_bytes, limit_bytes,
  tables, suggestion}} — new /api/query cost-guardrail rejection
- {detail: {reason: 'bq_path_not_registered'/'bq_path_access_denied',
  path, hint?, registered_as?}} — new /api/query RBAC patch
- {detail: '...'} — string detail (legacy endpoints)

Wired through 3 CLI paths:
- cli/v2_client.py: V2ClientError.__str__ delegates to render_error;
  pre-truncation removed from V2ClientError.message (was hiding hints
  past 200 chars).
- cli/commands/query.py:_query_remote: parse JSON body, call renderer
  on error.
- cli/commands/query.py:_query_hybrid: catch RemoteQueryError, build
  synthetic `{detail: {kind: error_type, **details}}` payload, render.

tests/test_cli_query.py:test_remote_query_failure: assertion updated
from `"Query failed"` (no longer printed) to `HTTP 400` + `bad SQL`
(what the renderer surfaces for string detail).

Sample output for cross_project_forbidden:

  Error: cross_project_forbidden (HTTP 502)
    billing_project: (empty)
    data_project: prj-example-data-001
    message: USER_PROJECT_DENIED on bigquery.googleapis.com
    hint: Set data_source.bigquery.billing_project in
        /admin/server-config to a project where the SA has
        serviceusage.services.use, or grant the SA that role on the
        data project.

19 tests pass — 10 from T4a now GREEN + 3 prior cli_query tests still
green + 6 ancillary.
2026-05-04 10:31:35 +02:00

199 lines
7.4 KiB
Python

"""Query commands — da query."""
import json
import os
import sys
from pathlib import Path
from typing import List, Optional
import typer
def query_command(
sql: Optional[str] = typer.Argument(None, help="SQL query to execute (positional)"),
sql_opt: Optional[str] = typer.Option(None, "--sql", help="SQL query to execute (named option)"),
remote: bool = typer.Option(False, "--remote", help="Execute on server instead of locally"),
fmt: str = typer.Option("table", "--format", "-f", help="Output format: table, json, csv"),
limit: int = typer.Option(1000, "--limit", help="Max rows to return"),
register_bq: Optional[List[str]] = typer.Option(
None,
"--register-bq",
help="Register a BigQuery result as a DuckDB view. Format: alias=BQ_SQL. Can be repeated.",
),
stdin: bool = typer.Option(False, "--stdin", help="Read SQL from stdin as JSON {\"sql\": \"...\"}"),
):
"""Execute SQL query against DuckDB."""
# Resolve SQL from exactly one of: positional, --sql, or --stdin
sources_provided = sum([
sql is not None,
sql_opt is not None,
stdin,
])
if sources_provided == 0:
typer.echo("Error: provide SQL as a positional argument, --sql option, or --stdin flag.", err=True)
raise typer.Exit(1)
if sources_provided > 1:
typer.echo("Error: only one of positional SQL, --sql, or --stdin may be used at a time.", err=True)
raise typer.Exit(1)
if stdin:
raw = sys.stdin.read()
try:
payload = json.loads(raw)
resolved_sql = payload["sql"]
# Extract register_bq from stdin JSON
stdin_bq = payload.get("register_bq", {})
if stdin_bq and isinstance(stdin_bq, dict):
register_bq = [f"{k}={v}" for k, v in stdin_bq.items()]
except (json.JSONDecodeError, KeyError) as exc:
typer.echo(f"Error: failed to parse stdin JSON: {exc}", err=True)
raise typer.Exit(1)
elif sql_opt is not None:
resolved_sql = sql_opt
else:
resolved_sql = sql
if register_bq:
_query_hybrid(resolved_sql, fmt, limit, register_bq)
elif remote:
_query_remote(resolved_sql, fmt, limit)
else:
_query_local(resolved_sql, fmt, limit)
def _query_local(sql: str, fmt: str, limit: int):
"""Run query against local DuckDB."""
import duckdb
local_dir = Path(os.environ.get("DA_LOCAL_DIR", "."))
db_path = local_dir / "user" / "duckdb" / "analytics.duckdb"
if not db_path.exists():
typer.echo("Local DuckDB not found. Run: da sync", err=True)
raise typer.Exit(1)
conn = duckdb.connect(str(db_path), read_only=True)
try:
result = conn.execute(sql).fetchmany(limit)
columns = [desc[0] for desc in conn.description] if conn.description else []
_output(columns, result, fmt)
except Exception as e:
typer.echo(f"Query error: {e}", err=True)
raise typer.Exit(1)
finally:
conn.close()
def _query_remote(sql: str, fmt: str, limit: int):
"""Run query against server DuckDB via API."""
from cli.client import api_post
from cli.error_render import render_error
resp = api_post("/api/query", json={"sql": sql, "limit": limit})
if resp.status_code != 200:
# Parse JSON body if possible, fall back to text. The shared
# renderer pretty-prints typed BQ errors (cross_project_forbidden,
# remote_scan_too_large, bq_path_not_registered) instead of
# flattening the structured detail to a single truncated line.
try:
body = resp.json()
except Exception:
body = resp.text
typer.echo(render_error(resp.status_code, body), err=True)
raise typer.Exit(1)
data = resp.json()
_output(data["columns"], data["rows"], fmt)
if data.get("truncated"):
typer.echo(f"(truncated at {limit} rows)", err=True)
def _query_hybrid(sql: str, fmt: str, limit: int, register_bq_specs: List[str]):
"""Run a hybrid query: register BigQuery results as DuckDB views, then execute locally."""
import duckdb
from src.remote_query import RemoteQueryEngine, RemoteQueryError, load_config
local_dir = Path(os.environ.get("DA_LOCAL_DIR", "."))
db_path = local_dir / "user" / "duckdb" / "analytics.duckdb"
if not db_path.exists():
typer.echo("Local DuckDB not found. Run: da sync", err=True)
raise typer.Exit(1)
conn = duckdb.connect(str(db_path), read_only=True)
try:
config = load_config()
engine_kwargs = {k: v for k, v in config.items() if k in (
"max_bq_registration_rows", "max_memory_mb", "max_result_rows", "timeout_seconds"
)}
# CLI --limit flag overrides config max_result_rows
engine_kwargs["max_result_rows"] = limit
engine = RemoteQueryEngine(conn, **engine_kwargs)
for spec in register_bq_specs:
if "=" not in spec:
typer.echo(
f"Error: --register-bq spec must be 'alias=BQ_SQL', got: {spec!r}",
err=True,
)
raise typer.Exit(1)
alias, bq_sql = spec.split("=", 1)
alias = alias.strip()
bq_sql = bq_sql.strip()
try:
info = engine.register_bq(alias, bq_sql)
typer.echo(
f"Registered BQ alias '{alias}': {info['rows']:,} rows, "
f"{info['memory_mb']:.1f} MiB",
err=True,
)
except RemoteQueryError as exc:
# Use the shared renderer so typed BqAccessError details
# (carried via RemoteQueryError.details) surface as a
# multi-line block with the operator-facing hint.
from cli.error_render import render_error
synthetic = {"detail": {
"kind": exc.error_type,
"alias": alias,
"message": str(exc),
**(exc.details or {}),
}}
typer.echo(render_error(400, synthetic), err=True)
raise typer.Exit(1)
try:
result = engine.execute(sql)
except RemoteQueryError as exc:
from cli.error_render import render_error
synthetic = {"detail": {
"kind": exc.error_type,
"message": str(exc),
**(exc.details or {}),
}}
typer.echo(render_error(400, synthetic), err=True)
raise typer.Exit(1)
_output(result["columns"], result["rows"], fmt)
if result.get("truncated"):
typer.echo(f"(truncated at {result['row_count']} rows)", err=True)
finally:
conn.close()
def _output(columns: list, rows: list, fmt: str):
if fmt == "json":
output = [dict(zip(columns, row)) for row in rows]
typer.echo(json.dumps(output, indent=2, default=str))
elif fmt == "csv":
typer.echo(",".join(columns))
for row in rows:
typer.echo(",".join(str(v) if v is not None else "" for v in row))
else:
# Table format using rich
from rich.console import Console
from rich.table import Table
console = Console()
table = Table()
for col in columns:
table.add_column(col)
for row in rows:
table.add_row(*(str(v) if v is not None else "" for v in row))
console.print(table)