From d605e7d95f9498739b27946ef98e0159978e42f3 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Sat, 11 Apr 2026 11:09:11 +0200 Subject: [PATCH] feat: add --register-bq and --stdin to da query for hybrid BQ+local queries Co-Authored-By: Claude Sonnet 4.6 --- cli/commands/query.py | 98 +++++++++++++++++++++++++++++++++++++++++-- tests/test_cli.py | 7 ++++ 2 files changed, 101 insertions(+), 4 deletions(-) diff --git a/cli/commands/query.py b/cli/commands/query.py index 461d68f..022c6c5 100644 --- a/cli/commands/query.py +++ b/cli/commands/query.py @@ -2,21 +2,59 @@ import json import os +import sys from pathlib import Path +from typing import List, Optional import typer + def query_command( - sql: str = typer.Argument(..., help="SQL query to execute"), + sql: Optional[str] = typer.Argument(None, help="SQL query to execute (positional)"), + sql_opt: Optional[str] = typer.Option(None, "--sql", help="SQL query to execute (named option)"), remote: bool = typer.Option(False, "--remote", help="Execute on server instead of locally"), fmt: str = typer.Option("table", "--format", "-f", help="Output format: table, json, csv"), limit: int = typer.Option(1000, "--limit", help="Max rows to return"), + register_bq: Optional[List[str]] = typer.Option( + None, + "--register-bq", + help="Register a BigQuery result as a DuckDB view. Format: alias=BQ_SQL. Can be repeated.", + ), + stdin: bool = typer.Option(False, "--stdin", help="Read SQL from stdin as JSON {\"sql\": \"...\"}"), ): """Execute SQL query against DuckDB.""" - if remote: - _query_remote(sql, fmt, limit) + # Resolve SQL from exactly one of: positional, --sql, or --stdin + sources_provided = sum([ + sql is not None, + sql_opt is not None, + stdin, + ]) + if sources_provided == 0: + typer.echo("Error: provide SQL as a positional argument, --sql option, or --stdin flag.", err=True) + raise typer.Exit(1) + if sources_provided > 1: + typer.echo("Error: only one of positional SQL, --sql, or --stdin may be used at a time.", err=True) + raise typer.Exit(1) + + if stdin: + raw = sys.stdin.read() + try: + payload = json.loads(raw) + resolved_sql = payload["sql"] + except (json.JSONDecodeError, KeyError) as exc: + typer.echo(f"Error: failed to parse stdin JSON: {exc}", err=True) + raise typer.Exit(1) + elif sql_opt is not None: + resolved_sql = sql_opt else: - _query_local(sql, fmt, limit) + resolved_sql = sql + + if register_bq: + _query_hybrid(resolved_sql, fmt, limit, register_bq) + elif remote: + _query_remote(resolved_sql, fmt, limit) + else: + _query_local(resolved_sql, fmt, limit) def _query_local(sql: str, fmt: str, limit: int): @@ -56,6 +94,58 @@ def _query_remote(sql: str, fmt: str, limit: int): typer.echo(f"(truncated at {limit} rows)", err=True) +def _query_hybrid(sql: str, fmt: str, limit: int, register_bq_specs: List[str]): + """Run a hybrid query: register BigQuery results as DuckDB views, then execute locally.""" + import duckdb + from src.remote_query import RemoteQueryEngine, RemoteQueryError, load_config + + local_dir = Path(os.environ.get("DA_LOCAL_DIR", ".")) + db_path = local_dir / "user" / "duckdb" / "analytics.duckdb" + if not db_path.exists(): + typer.echo("Local DuckDB not found. Run: da sync", err=True) + raise typer.Exit(1) + + conn = duckdb.connect(str(db_path)) + try: + config = load_config() + engine = RemoteQueryEngine(conn, **{k: v for k, v in config.items() if k in ( + "max_bq_registration_rows", "max_memory_mb", "max_result_rows", "timeout_seconds" + )}) + + for spec in register_bq_specs: + if "=" not in spec: + typer.echo( + f"Error: --register-bq spec must be 'alias=BQ_SQL', got: {spec!r}", + err=True, + ) + raise typer.Exit(1) + alias, bq_sql = spec.split("=", 1) + alias = alias.strip() + bq_sql = bq_sql.strip() + try: + info = engine.register_bq(alias, bq_sql) + typer.echo( + f"Registered BQ alias '{alias}': {info['rows']:,} rows, " + f"{info['memory_mb']:.1f} MiB", + err=True, + ) + except RemoteQueryError as exc: + typer.echo(f"BQ registration failed for '{alias}': {exc}", err=True) + raise typer.Exit(1) + + try: + result = engine.execute(sql) + except RemoteQueryError as exc: + typer.echo(f"Query error: {exc}", err=True) + raise typer.Exit(1) + + _output(result["columns"], result["rows"], fmt) + if result.get("truncated"): + typer.echo(f"(truncated at {result['row_count']} rows)", err=True) + finally: + conn.close() + + def _output(columns: list, rows: list, fmt: str): if fmt == "json": output = [dict(zip(columns, row)) for row in rows] diff --git a/tests/test_cli.py b/tests/test_cli.py index 9393f56..74c3db1 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -238,6 +238,13 @@ class TestAdminCommands: assert result.exit_code == 1 +class TestQueryHybrid: + def test_register_bq_flag_help(self): + result = runner.invoke(app, ["query", "--help"]) + assert result.exit_code == 0 + assert "register-bq" in result.output + + class TestMetricsHelp: def test_metrics_help(self): result = runner.invoke(app, ["metrics", "--help"])