diff --git a/CHANGELOG.md b/CHANGELOG.md index 41a4863..4b5d694 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -128,37 +128,6 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C The dashboard-served setup payload (`app/web/setup_instructions.py`) already branches between the two automatically based on platform; the doc snippet now matches that behavior for manual flows. -- **`agnes pull` chunked download for large parquets**: when the server - advertises `accept-ranges: bytes` and a parquet exceeds - `AGNES_PULL_CHUNK_THRESHOLD_BYTES` (default 50 MB), the CLI now splits - the file into N parallel HTTP Range requests - (`AGNES_PULL_CHUNK_PARALLELISM`, default 4, capped 1..16) and assembles - the parts into the destination atomically. Targets the per-flow-shaped - network (corp VPN with per-TCP-connection rate-limiting) where a single - stream is throttled but N parallel streams over the same connection - scale roughly linearly. Falls back to single-stream when the server - responds 200 instead of 206 to a Range probe, when no - `accept-ranges: bytes` is advertised, or when content is below the - threshold — no behavior change in the small-file / non-cooperating- - server cases. -- **Persistent HTTP/2 client across `agnes pull`**: `stream_download` now - routes through a process-wide pooled `httpx.Client` so N parquet - downloads share a single TLS handshake; HTTP/2 multiplexing - (when the optional `h2` package is installed) lets all chunk Range - requests share one TCP connection. Gracefully falls back to HTTP/1.1 - pooling when `h2` is missing — no crash, just slightly less benefit. - -### Added -- New optional dependency `h2>=4.1.0` (HTTP/2 transport for httpx). Pure - performance — `agnes pull` works on HTTP/1.1 if the install skips it. -- **Textual progress fallback for non-TTY `agnes pull`**: when stderr is - not a terminal (Claude Code SessionStart hook, CI runner, Docker log - capture, …), `agnes pull --no-quiet` now emits a plain-text progress - line per file at most every 10% or 30 s, plus a final completion line. - Replaces the previous Rich-bar-on-pipe behavior that either suppressed - output entirely or leaked ANSI escape sequences. TTY path unchanged - (Rich progress bar with bytes / speed / ETA, aggregated per-file - across chunked-download chunks). ## [0.38.0] — 2026-05-06 diff --git a/app/api/query.py b/app/api/query.py index 473bd39..c5ec7a2 100644 --- a/app/api/query.py +++ b/app/api/query.py @@ -31,6 +31,37 @@ logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/query", tags=["query"]) + +# Heuristic: did the BQ-side execution of a `bigquery_query()`-rewritten +# query reject the inner SQL? Errors we want to fall back on are upstream +# parse / validation errors — DuckDB-only syntax that survives identifier +# rewrite (`::INT` casts, `STRPTIME`, COALESCE arity differences) is +# accepted by the wrapping DuckDB layer but BQ refuses with a parse +# message that DuckDB surfaces back as a BinderException / RuntimeError +# with the BQ message embedded. Forbidden / quota / network errors should +# NOT trigger fallback (they would just fail again on the legacy path). +_BQ_REWRITE_PARSE_ERROR_PATTERNS = ( + "Syntax error", + "syntax error", + "Unrecognized name", + "unrecognized name", + "Function not found", + "function not found", + "No matching signature", + "Invalid cast", + "invalid cast", +) + + +def _looks_like_bq_rewrite_parse_error(exc: BaseException) -> bool: + """Return True when ``exc`` is the BQ-rejected-inner-SQL flavour we + want to fall back from. Conservative: matches against the exception + message text only, no isinstance checks, so it works whether the + DuckDB BQ extension wrapped the error as BinderException, IOException, + or a plain Python Exception.""" + msg = str(exc) + return any(pat in msg for pat in _BQ_REWRITE_PARSE_ERROR_PATTERNS) + # Issue #160 §4.3.1 — direct `bq..` references in user # SQL. Catalog token accepts both `bq` (the unquoted DuckDB-style name) and # `"bq"` (quoted identifier). DuckDB resolves both to the same ATTACHed @@ -204,9 +235,22 @@ def execute_query( request.sql, conn, ) if did_rewrite: + # Memory-safety: ``bigquery_query()`` materialises the entire + # BQ result into DuckDB before fetchmany sees it (vs the + # ATTACH-catalog Storage Read API path, which streams rows + # lazily). Wrap the rewritten SQL in an outer ``LIMIT N+1`` + # so a `SELECT *` against a billion-row remote table doesn't + # buffer the full table into the worker process — the cap + # is pushed into the BQ job itself. Aliased subquery so the + # outer LIMIT applies to the final rewritten result. + execution_sql = ( + f"SELECT * FROM ({execution_sql}) AS _bqq_outer " + f"LIMIT {request.limit + 1}" + ) logger.info( "query_rewrite_to_bigquery_query: user_id=%s — wrapped " - "SQL in bigquery_query() for BQ predicate pushdown", + "SQL in bigquery_query() with outer LIMIT for BQ " + "predicate pushdown", user_id, ) else: @@ -216,8 +260,27 @@ def execute_query( user_id, ) - # Open in read-only mode for extra safety - result = analytics.execute(execution_sql).fetchmany(request.limit + 1) + # Open in read-only mode for extra safety. If the rewritten + # path errors (e.g. user SQL contained DuckDB-only syntax — + # ``::INT`` casts, ``STRPTIME``, COALESCE arity differences — + # that survives identifier rewrite but BQ refuses), fall back + # to the original SQL via the legacy ATTACH-catalog path so + # the request still succeeds (slower, but correct). Same + # safety contract as the dry-run fallback in + # ``_bq_quota_and_cap_guard``. + try: + result = analytics.execute(execution_sql).fetchmany(request.limit + 1) + except Exception as exc: + if did_rewrite and _looks_like_bq_rewrite_parse_error(exc): + logger.warning( + "query_rewrite_fallback: user_id=%s — bigquery_query() " + "rewrite rejected by BQ (%s); retrying via " + "ATTACH-catalog path", + user_id, type(exc).__name__, + ) + result = analytics.execute(request.sql).fetchmany(request.limit + 1) + else: + raise columns = [desc[0] for desc in analytics.description] if analytics.description else [] truncated = len(result) > request.limit rows = result[:request.limit] @@ -683,20 +746,28 @@ def _rewrite_user_sql_for_bigquery_query( # backticked paths resolve to the same project. inner_sql = _rewrite_bq_table_refs_to_native(user_sql, name_lookups, project) - # SQL string literal escaping: BQ accepts standard SQL doubled-quote - # escaping inside single-quoted strings. We pass `inner_sql` as a - # parameter to DuckDB's prepared statement at execute-time - # (analytics.execute(sql, [project, inner_sql]) shape) — but the - # current handler runs ``analytics.execute(rewritten_sql)`` with no - # params, so we MUST embed the inner SQL safely. Use parameterised - # form: emit ``bigquery_query(?, ?)`` and signal the params via a - # sentinel? No — the handler treats SQL as opaque. Instead, embed - # using single-quote doubling (standard SQL escape; both DuckDB and - # BQ honor it). - escaped_inner = inner_sql.replace("'", "''") - rewritten = ( - f"SELECT * FROM bigquery_query('{project}', '{escaped_inner}')" - ) + # Embed the inner SQL using DuckDB's dollar-quoted string literal form + # (`$tag$ ... $tag$`). Naive `replace("'", "''")` doubling misses + # backslash-escape sequences DuckDB's lexer recognises (`\\`, `\n`, + # `\t`, …) — a predicate like `WHERE name = 'O\'Brien'` is unsafe + # under doubling. Dollar-quoting takes the inner SQL verbatim with no + # escape sequences whatsoever, so the user's exact bytes reach BQ. + # Tag is a fixed conventional value; the absurdly unlikely collision + # (user SQL containing the literal `$bqq_inner$`) falls back to the + # legacy doubling path so the rewrite still proceeds — over-doubled + # quotes are at worst a parse error caught by the handler's fallback + # at the call site, not a silent bad result. + DOLLAR_TAG = "$bqq_inner$" + if DOLLAR_TAG in inner_sql: + escaped_inner = inner_sql.replace("'", "''") + rewritten = ( + f"SELECT * FROM bigquery_query('{project}', '{escaped_inner}')" + ) + else: + rewritten = ( + f"SELECT * FROM bigquery_query('{project}', " + f"{DOLLAR_TAG}{inner_sql}{DOLLAR_TAG})" + ) return rewritten, True diff --git a/tests/test_query_remote_rewrite.py b/tests/test_query_remote_rewrite.py index 7d78a3e..9147a32 100644 --- a/tests/test_query_remote_rewrite.py +++ b/tests/test_query_remote_rewrite.py @@ -107,10 +107,13 @@ def test_simple_select_where_against_one_bq_table_rewrites(seeded_registry, monk assert "test-prj" in rewritten # Inner SQL: bare name rewritten to backticked BQ-native path. assert "`test-prj.fin.ue`" in rewritten - # WHERE predicate is preserved (single-quote-doubled for embedding - # inside the outer string literal — standard SQL escaping that - # both DuckDB and BQ honor). - assert "event_date = ''2026-01-01''" in rewritten + # Inner SQL is dollar-quoted (`$bqq_inner$ ... $bqq_inner$`), so + # single quotes inside the WHERE predicate remain literal — no + # doubling, no backslash escaping. Verifies the safer embedding form + # introduced after the code review caught naive single-quote-only + # escape doubling missing DuckDB backslash sequences. + assert "$bqq_inner$" in rewritten + assert "event_date = '2026-01-01'" in rewritten def test_direct_bq_path_rewrites(seeded_registry, monkeypatch): @@ -442,3 +445,141 @@ def test_endpoint_passes_original_sql_when_no_bq_table( assert r.status_code == 200, r.json() assert captured["sql"] == user_sql assert "bigquery_query(" not in captured["sql"] + + +def test_endpoint_wraps_rewritten_sql_with_outer_limit( + seeded_app, stub_bq_for_endpoint, monkeypatch, +): + """Memory-safety regression — when the rewriter fires, the handler + MUST wrap the bigquery_query() call in an outer ``LIMIT N+1`` so a + `SELECT *` against a billion-row remote table doesn't materialise the + full result into the worker before fetchmany applies the cap. + Code-review #2a fix. + """ + _register_bq_remote_row("ue", "fin", "ue") + + captured = {"sql": None} + + class _StubAnalytics: + description = [("c0",)] + def execute(self, sql, *args, **kwargs): + captured["sql"] = sql + class _R: + def fetchmany(self, _n): + return [] + return _R() + def close(self): + pass + + monkeypatch.setattr( + "app.api.query.get_analytics_db_readonly", + lambda: _StubAnalytics(), + raising=False, + ) + + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/query", + json={"sql": "SELECT * FROM ue", "limit": 100}, + headers=_auth(token), + ) + assert r.status_code == 200, r.json() + sent = captured["sql"] + # The bigquery_query() wrap is present, AND the whole thing is wrapped + # again with an outer LIMIT that includes the user-requested cap +1 + # (the +1 is the existing truncation-detection pattern). + assert "bigquery_query(" in sent + assert "_bqq_outer" in sent + assert "LIMIT 101" in sent # request.limit (100) + 1 + + +def test_endpoint_falls_back_to_original_sql_on_bq_parse_error( + seeded_app, stub_bq_for_endpoint, monkeypatch, +): + """When the rewritten ``bigquery_query()`` path fails with a parse- + level error (e.g. user SQL contained DuckDB-only syntax that BQ + can't parse), the handler MUST retry with the original SQL via the + ATTACH-catalog path so the user request still succeeds. Code-review + #4 fix. + """ + _register_bq_remote_row("ue", "fin", "ue") + + calls = {"sqls": []} + + class _StubAnalytics: + description = [("c0",)] + def execute(self, sql, *args, **kwargs): + calls["sqls"].append(sql) + # First call (rewritten) raises a BQ-style parse error; + # second call (original SQL fallback) returns rows. + if "bigquery_query(" in sql: + raise RuntimeError( + "BinderException: Query execution failed: " + "Syntax error: Unexpected token at [1:42]" + ) + class _R: + def fetchmany(self, _n): + return [(1,)] + return _R() + def close(self): + pass + + monkeypatch.setattr( + "app.api.query.get_analytics_db_readonly", + lambda: _StubAnalytics(), + raising=False, + ) + + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/query", + # DuckDB-only ::INT cast — survives identifier rewrite, BQ refuses. + json={"sql": "SELECT (count(*))::INT FROM ue"}, + headers=_auth(token), + ) + assert r.status_code == 200, r.json() + # Two execute calls: 1) rewritten (raised) 2) fallback to original. + assert len(calls["sqls"]) == 2 + assert "bigquery_query(" in calls["sqls"][0] + assert calls["sqls"][1] == "SELECT (count(*))::INT FROM ue" + + +def test_endpoint_does_not_fall_back_on_non_parse_errors( + seeded_app, stub_bq_for_endpoint, monkeypatch, +): + """Non-parse-error exceptions from the rewritten path (network, + quota, forbidden, generic runtime) must propagate, NOT silently + retry against the legacy path. Otherwise the legacy path would + just fail again and the user sees a slow + double-failure. + """ + _register_bq_remote_row("ue", "fin", "ue") + + calls = {"sqls": []} + + class _StubAnalytics: + description = [("c0",)] + def execute(self, sql, *args, **kwargs): + calls["sqls"].append(sql) + raise RuntimeError("Network unreachable: BQ endpoint timed out") + def close(self): + pass + + monkeypatch.setattr( + "app.api.query.get_analytics_db_readonly", + lambda: _StubAnalytics(), + raising=False, + ) + + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/query", + json={"sql": "SELECT count(*) FROM ue"}, + headers=_auth(token), + ) + # Generic 400 from the handler's outer except — body will surface + # the runtime error message; we just need to confirm no fallback. + assert r.status_code in (400, 500, 502) + assert len(calls["sqls"]) == 1, "must not retry on non-parse error"