feat: add --register-bq and --stdin to da query for hybrid BQ+local queries
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
86bbb8fce4
commit
d605e7d95f
2 changed files with 101 additions and 4 deletions
|
|
@ -2,21 +2,59 @@
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from typing import List, Optional
|
||||||
|
|
||||||
import typer
|
import typer
|
||||||
|
|
||||||
|
|
||||||
def query_command(
|
def query_command(
|
||||||
sql: str = typer.Argument(..., help="SQL query to execute"),
|
sql: Optional[str] = typer.Argument(None, help="SQL query to execute (positional)"),
|
||||||
|
sql_opt: Optional[str] = typer.Option(None, "--sql", help="SQL query to execute (named option)"),
|
||||||
remote: bool = typer.Option(False, "--remote", help="Execute on server instead of locally"),
|
remote: bool = typer.Option(False, "--remote", help="Execute on server instead of locally"),
|
||||||
fmt: str = typer.Option("table", "--format", "-f", help="Output format: table, json, csv"),
|
fmt: str = typer.Option("table", "--format", "-f", help="Output format: table, json, csv"),
|
||||||
limit: int = typer.Option(1000, "--limit", help="Max rows to return"),
|
limit: int = typer.Option(1000, "--limit", help="Max rows to return"),
|
||||||
|
register_bq: Optional[List[str]] = typer.Option(
|
||||||
|
None,
|
||||||
|
"--register-bq",
|
||||||
|
help="Register a BigQuery result as a DuckDB view. Format: alias=BQ_SQL. Can be repeated.",
|
||||||
|
),
|
||||||
|
stdin: bool = typer.Option(False, "--stdin", help="Read SQL from stdin as JSON {\"sql\": \"...\"}"),
|
||||||
):
|
):
|
||||||
"""Execute SQL query against DuckDB."""
|
"""Execute SQL query against DuckDB."""
|
||||||
if remote:
|
# Resolve SQL from exactly one of: positional, --sql, or --stdin
|
||||||
_query_remote(sql, fmt, limit)
|
sources_provided = sum([
|
||||||
|
sql is not None,
|
||||||
|
sql_opt is not None,
|
||||||
|
stdin,
|
||||||
|
])
|
||||||
|
if sources_provided == 0:
|
||||||
|
typer.echo("Error: provide SQL as a positional argument, --sql option, or --stdin flag.", err=True)
|
||||||
|
raise typer.Exit(1)
|
||||||
|
if sources_provided > 1:
|
||||||
|
typer.echo("Error: only one of positional SQL, --sql, or --stdin may be used at a time.", err=True)
|
||||||
|
raise typer.Exit(1)
|
||||||
|
|
||||||
|
if stdin:
|
||||||
|
raw = sys.stdin.read()
|
||||||
|
try:
|
||||||
|
payload = json.loads(raw)
|
||||||
|
resolved_sql = payload["sql"]
|
||||||
|
except (json.JSONDecodeError, KeyError) as exc:
|
||||||
|
typer.echo(f"Error: failed to parse stdin JSON: {exc}", err=True)
|
||||||
|
raise typer.Exit(1)
|
||||||
|
elif sql_opt is not None:
|
||||||
|
resolved_sql = sql_opt
|
||||||
else:
|
else:
|
||||||
_query_local(sql, fmt, limit)
|
resolved_sql = sql
|
||||||
|
|
||||||
|
if register_bq:
|
||||||
|
_query_hybrid(resolved_sql, fmt, limit, register_bq)
|
||||||
|
elif remote:
|
||||||
|
_query_remote(resolved_sql, fmt, limit)
|
||||||
|
else:
|
||||||
|
_query_local(resolved_sql, fmt, limit)
|
||||||
|
|
||||||
|
|
||||||
def _query_local(sql: str, fmt: str, limit: int):
|
def _query_local(sql: str, fmt: str, limit: int):
|
||||||
|
|
@ -56,6 +94,58 @@ def _query_remote(sql: str, fmt: str, limit: int):
|
||||||
typer.echo(f"(truncated at {limit} rows)", err=True)
|
typer.echo(f"(truncated at {limit} rows)", err=True)
|
||||||
|
|
||||||
|
|
||||||
|
def _query_hybrid(sql: str, fmt: str, limit: int, register_bq_specs: List[str]):
|
||||||
|
"""Run a hybrid query: register BigQuery results as DuckDB views, then execute locally."""
|
||||||
|
import duckdb
|
||||||
|
from src.remote_query import RemoteQueryEngine, RemoteQueryError, load_config
|
||||||
|
|
||||||
|
local_dir = Path(os.environ.get("DA_LOCAL_DIR", "."))
|
||||||
|
db_path = local_dir / "user" / "duckdb" / "analytics.duckdb"
|
||||||
|
if not db_path.exists():
|
||||||
|
typer.echo("Local DuckDB not found. Run: da sync", err=True)
|
||||||
|
raise typer.Exit(1)
|
||||||
|
|
||||||
|
conn = duckdb.connect(str(db_path))
|
||||||
|
try:
|
||||||
|
config = load_config()
|
||||||
|
engine = RemoteQueryEngine(conn, **{k: v for k, v in config.items() if k in (
|
||||||
|
"max_bq_registration_rows", "max_memory_mb", "max_result_rows", "timeout_seconds"
|
||||||
|
)})
|
||||||
|
|
||||||
|
for spec in register_bq_specs:
|
||||||
|
if "=" not in spec:
|
||||||
|
typer.echo(
|
||||||
|
f"Error: --register-bq spec must be 'alias=BQ_SQL', got: {spec!r}",
|
||||||
|
err=True,
|
||||||
|
)
|
||||||
|
raise typer.Exit(1)
|
||||||
|
alias, bq_sql = spec.split("=", 1)
|
||||||
|
alias = alias.strip()
|
||||||
|
bq_sql = bq_sql.strip()
|
||||||
|
try:
|
||||||
|
info = engine.register_bq(alias, bq_sql)
|
||||||
|
typer.echo(
|
||||||
|
f"Registered BQ alias '{alias}': {info['rows']:,} rows, "
|
||||||
|
f"{info['memory_mb']:.1f} MiB",
|
||||||
|
err=True,
|
||||||
|
)
|
||||||
|
except RemoteQueryError as exc:
|
||||||
|
typer.echo(f"BQ registration failed for '{alias}': {exc}", err=True)
|
||||||
|
raise typer.Exit(1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = engine.execute(sql)
|
||||||
|
except RemoteQueryError as exc:
|
||||||
|
typer.echo(f"Query error: {exc}", err=True)
|
||||||
|
raise typer.Exit(1)
|
||||||
|
|
||||||
|
_output(result["columns"], result["rows"], fmt)
|
||||||
|
if result.get("truncated"):
|
||||||
|
typer.echo(f"(truncated at {result['row_count']} rows)", err=True)
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
def _output(columns: list, rows: list, fmt: str):
|
def _output(columns: list, rows: list, fmt: str):
|
||||||
if fmt == "json":
|
if fmt == "json":
|
||||||
output = [dict(zip(columns, row)) for row in rows]
|
output = [dict(zip(columns, row)) for row in rows]
|
||||||
|
|
|
||||||
|
|
@ -238,6 +238,13 @@ class TestAdminCommands:
|
||||||
assert result.exit_code == 1
|
assert result.exit_code == 1
|
||||||
|
|
||||||
|
|
||||||
|
class TestQueryHybrid:
|
||||||
|
def test_register_bq_flag_help(self):
|
||||||
|
result = runner.invoke(app, ["query", "--help"])
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert "register-bq" in result.output
|
||||||
|
|
||||||
|
|
||||||
class TestMetricsHelp:
|
class TestMetricsHelp:
|
||||||
def test_metrics_help(self):
|
def test_metrics_help(self):
|
||||||
result = runner.invoke(app, ["metrics", "--help"])
|
result = runner.invoke(app, ["metrics", "--help"])
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue