fix(query): code-review fixes — outer LIMIT wrap, dollar-quoting, parse-error fallback
Address code-reviewer findings on the bigquery_query() rewrite path:
1. Outer LIMIT wrap — bigquery_query() materialises BQ result into DuckDB
before fetchmany sees it (vs ATTACH-catalog Storage Read API streaming).
A user 'SELECT *' against a billion-row remote table would buffer the
entire result before request.limit applied. Wrap rewritten SQL in an
outer 'LIMIT N+1' so the cap pushes into the BQ job itself.
2. Dollar-quoted inner SQL — naive replace("'", "''") doubling missed
DuckDB backslash-escape sequences (\\, \\n, \\t, …). A predicate
like 'WHERE name = ''O\\'Brien''' was unsafe under the doubling
path. DuckDB $bqq_inner$ … $bqq_inner$ form takes the inner SQL
verbatim with no escapes whatsoever. Falls back to legacy doubling
if user SQL improbably contains the literal tag.
3. Parse-error fallback — when the rewritten path fails with a BQ-side
parse / validation error (DuckDB-only syntax like ::INT cast that
survives identifier rewrite but BQ refuses), retry the user's
original SQL via the legacy ATTACH-catalog path so the request still
succeeds. Mirrors the existing dry-run fallback contract.
4. CHANGELOG — delete duplicate CLI bullets that landed under
already-released [0.38.1] (file corruption from merge — entries are
correctly under [0.39.0]).
This commit is contained in:
parent
3b9f6b447d
commit
8e56d45c68
3 changed files with 233 additions and 52 deletions
31
CHANGELOG.md
31
CHANGELOG.md
|
|
@ -128,37 +128,6 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
|
|||
The dashboard-served setup payload (`app/web/setup_instructions.py`)
|
||||
already branches between the two automatically based on platform; the
|
||||
doc snippet now matches that behavior for manual flows.
|
||||
- **`agnes pull` chunked download for large parquets**: when the server
|
||||
advertises `accept-ranges: bytes` and a parquet exceeds
|
||||
`AGNES_PULL_CHUNK_THRESHOLD_BYTES` (default 50 MB), the CLI now splits
|
||||
the file into N parallel HTTP Range requests
|
||||
(`AGNES_PULL_CHUNK_PARALLELISM`, default 4, capped 1..16) and assembles
|
||||
the parts into the destination atomically. Targets the per-flow-shaped
|
||||
network (corp VPN with per-TCP-connection rate-limiting) where a single
|
||||
stream is throttled but N parallel streams over the same connection
|
||||
scale roughly linearly. Falls back to single-stream when the server
|
||||
responds 200 instead of 206 to a Range probe, when no
|
||||
`accept-ranges: bytes` is advertised, or when content is below the
|
||||
threshold — no behavior change in the small-file / non-cooperating-
|
||||
server cases.
|
||||
- **Persistent HTTP/2 client across `agnes pull`**: `stream_download` now
|
||||
routes through a process-wide pooled `httpx.Client` so N parquet
|
||||
downloads share a single TLS handshake; HTTP/2 multiplexing
|
||||
(when the optional `h2` package is installed) lets all chunk Range
|
||||
requests share one TCP connection. Gracefully falls back to HTTP/1.1
|
||||
pooling when `h2` is missing — no crash, just slightly less benefit.
|
||||
|
||||
### Added
|
||||
- New optional dependency `h2>=4.1.0` (HTTP/2 transport for httpx). Pure
|
||||
performance — `agnes pull` works on HTTP/1.1 if the install skips it.
|
||||
- **Textual progress fallback for non-TTY `agnes pull`**: when stderr is
|
||||
not a terminal (Claude Code SessionStart hook, CI runner, Docker log
|
||||
capture, …), `agnes pull --no-quiet` now emits a plain-text progress
|
||||
line per file at most every 10% or 30 s, plus a final completion line.
|
||||
Replaces the previous Rich-bar-on-pipe behavior that either suppressed
|
||||
output entirely or leaked ANSI escape sequences. TTY path unchanged
|
||||
(Rich progress bar with bytes / speed / ETA, aggregated per-file
|
||||
across chunked-download chunks).
|
||||
|
||||
## [0.38.0] — 2026-05-06
|
||||
|
||||
|
|
|
|||
105
app/api/query.py
105
app/api/query.py
|
|
@ -31,6 +31,37 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
router = APIRouter(prefix="/api/query", tags=["query"])
|
||||
|
||||
|
||||
# Heuristic: did the BQ-side execution of a `bigquery_query()`-rewritten
|
||||
# query reject the inner SQL? Errors we want to fall back on are upstream
|
||||
# parse / validation errors — DuckDB-only syntax that survives identifier
|
||||
# rewrite (`::INT` casts, `STRPTIME`, COALESCE arity differences) is
|
||||
# accepted by the wrapping DuckDB layer but BQ refuses with a parse
|
||||
# message that DuckDB surfaces back as a BinderException / RuntimeError
|
||||
# with the BQ message embedded. Forbidden / quota / network errors should
|
||||
# NOT trigger fallback (they would just fail again on the legacy path).
|
||||
_BQ_REWRITE_PARSE_ERROR_PATTERNS = (
|
||||
"Syntax error",
|
||||
"syntax error",
|
||||
"Unrecognized name",
|
||||
"unrecognized name",
|
||||
"Function not found",
|
||||
"function not found",
|
||||
"No matching signature",
|
||||
"Invalid cast",
|
||||
"invalid cast",
|
||||
)
|
||||
|
||||
|
||||
def _looks_like_bq_rewrite_parse_error(exc: BaseException) -> bool:
|
||||
"""Return True when ``exc`` is the BQ-rejected-inner-SQL flavour we
|
||||
want to fall back from. Conservative: matches against the exception
|
||||
message text only, no isinstance checks, so it works whether the
|
||||
DuckDB BQ extension wrapped the error as BinderException, IOException,
|
||||
or a plain Python Exception."""
|
||||
msg = str(exc)
|
||||
return any(pat in msg for pat in _BQ_REWRITE_PARSE_ERROR_PATTERNS)
|
||||
|
||||
# Issue #160 §4.3.1 — direct `bq.<dataset>.<source_table>` references in user
|
||||
# SQL. Catalog token accepts both `bq` (the unquoted DuckDB-style name) and
|
||||
# `"bq"` (quoted identifier). DuckDB resolves both to the same ATTACHed
|
||||
|
|
@ -204,9 +235,22 @@ def execute_query(
|
|||
request.sql, conn,
|
||||
)
|
||||
if did_rewrite:
|
||||
# Memory-safety: ``bigquery_query()`` materialises the entire
|
||||
# BQ result into DuckDB before fetchmany sees it (vs the
|
||||
# ATTACH-catalog Storage Read API path, which streams rows
|
||||
# lazily). Wrap the rewritten SQL in an outer ``LIMIT N+1``
|
||||
# so a `SELECT *` against a billion-row remote table doesn't
|
||||
# buffer the full table into the worker process — the cap
|
||||
# is pushed into the BQ job itself. Aliased subquery so the
|
||||
# outer LIMIT applies to the final rewritten result.
|
||||
execution_sql = (
|
||||
f"SELECT * FROM ({execution_sql}) AS _bqq_outer "
|
||||
f"LIMIT {request.limit + 1}"
|
||||
)
|
||||
logger.info(
|
||||
"query_rewrite_to_bigquery_query: user_id=%s — wrapped "
|
||||
"SQL in bigquery_query() for BQ predicate pushdown",
|
||||
"SQL in bigquery_query() with outer LIMIT for BQ "
|
||||
"predicate pushdown",
|
||||
user_id,
|
||||
)
|
||||
else:
|
||||
|
|
@ -216,8 +260,27 @@ def execute_query(
|
|||
user_id,
|
||||
)
|
||||
|
||||
# Open in read-only mode for extra safety
|
||||
result = analytics.execute(execution_sql).fetchmany(request.limit + 1)
|
||||
# Open in read-only mode for extra safety. If the rewritten
|
||||
# path errors (e.g. user SQL contained DuckDB-only syntax —
|
||||
# ``::INT`` casts, ``STRPTIME``, COALESCE arity differences —
|
||||
# that survives identifier rewrite but BQ refuses), fall back
|
||||
# to the original SQL via the legacy ATTACH-catalog path so
|
||||
# the request still succeeds (slower, but correct). Same
|
||||
# safety contract as the dry-run fallback in
|
||||
# ``_bq_quota_and_cap_guard``.
|
||||
try:
|
||||
result = analytics.execute(execution_sql).fetchmany(request.limit + 1)
|
||||
except Exception as exc:
|
||||
if did_rewrite and _looks_like_bq_rewrite_parse_error(exc):
|
||||
logger.warning(
|
||||
"query_rewrite_fallback: user_id=%s — bigquery_query() "
|
||||
"rewrite rejected by BQ (%s); retrying via "
|
||||
"ATTACH-catalog path",
|
||||
user_id, type(exc).__name__,
|
||||
)
|
||||
result = analytics.execute(request.sql).fetchmany(request.limit + 1)
|
||||
else:
|
||||
raise
|
||||
columns = [desc[0] for desc in analytics.description] if analytics.description else []
|
||||
truncated = len(result) > request.limit
|
||||
rows = result[:request.limit]
|
||||
|
|
@ -683,20 +746,28 @@ def _rewrite_user_sql_for_bigquery_query(
|
|||
# backticked paths resolve to the same project.
|
||||
inner_sql = _rewrite_bq_table_refs_to_native(user_sql, name_lookups, project)
|
||||
|
||||
# SQL string literal escaping: BQ accepts standard SQL doubled-quote
|
||||
# escaping inside single-quoted strings. We pass `inner_sql` as a
|
||||
# parameter to DuckDB's prepared statement at execute-time
|
||||
# (analytics.execute(sql, [project, inner_sql]) shape) — but the
|
||||
# current handler runs ``analytics.execute(rewritten_sql)`` with no
|
||||
# params, so we MUST embed the inner SQL safely. Use parameterised
|
||||
# form: emit ``bigquery_query(?, ?)`` and signal the params via a
|
||||
# sentinel? No — the handler treats SQL as opaque. Instead, embed
|
||||
# using single-quote doubling (standard SQL escape; both DuckDB and
|
||||
# BQ honor it).
|
||||
escaped_inner = inner_sql.replace("'", "''")
|
||||
rewritten = (
|
||||
f"SELECT * FROM bigquery_query('{project}', '{escaped_inner}')"
|
||||
)
|
||||
# Embed the inner SQL using DuckDB's dollar-quoted string literal form
|
||||
# (`$tag$ ... $tag$`). Naive `replace("'", "''")` doubling misses
|
||||
# backslash-escape sequences DuckDB's lexer recognises (`\\`, `\n`,
|
||||
# `\t`, …) — a predicate like `WHERE name = 'O\'Brien'` is unsafe
|
||||
# under doubling. Dollar-quoting takes the inner SQL verbatim with no
|
||||
# escape sequences whatsoever, so the user's exact bytes reach BQ.
|
||||
# Tag is a fixed conventional value; the absurdly unlikely collision
|
||||
# (user SQL containing the literal `$bqq_inner$`) falls back to the
|
||||
# legacy doubling path so the rewrite still proceeds — over-doubled
|
||||
# quotes are at worst a parse error caught by the handler's fallback
|
||||
# at the call site, not a silent bad result.
|
||||
DOLLAR_TAG = "$bqq_inner$"
|
||||
if DOLLAR_TAG in inner_sql:
|
||||
escaped_inner = inner_sql.replace("'", "''")
|
||||
rewritten = (
|
||||
f"SELECT * FROM bigquery_query('{project}', '{escaped_inner}')"
|
||||
)
|
||||
else:
|
||||
rewritten = (
|
||||
f"SELECT * FROM bigquery_query('{project}', "
|
||||
f"{DOLLAR_TAG}{inner_sql}{DOLLAR_TAG})"
|
||||
)
|
||||
return rewritten, True
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -107,10 +107,13 @@ def test_simple_select_where_against_one_bq_table_rewrites(seeded_registry, monk
|
|||
assert "test-prj" in rewritten
|
||||
# Inner SQL: bare name rewritten to backticked BQ-native path.
|
||||
assert "`test-prj.fin.ue`" in rewritten
|
||||
# WHERE predicate is preserved (single-quote-doubled for embedding
|
||||
# inside the outer string literal — standard SQL escaping that
|
||||
# both DuckDB and BQ honor).
|
||||
assert "event_date = ''2026-01-01''" in rewritten
|
||||
# Inner SQL is dollar-quoted (`$bqq_inner$ ... $bqq_inner$`), so
|
||||
# single quotes inside the WHERE predicate remain literal — no
|
||||
# doubling, no backslash escaping. Verifies the safer embedding form
|
||||
# introduced after the code review caught naive single-quote-only
|
||||
# escape doubling missing DuckDB backslash sequences.
|
||||
assert "$bqq_inner$" in rewritten
|
||||
assert "event_date = '2026-01-01'" in rewritten
|
||||
|
||||
|
||||
def test_direct_bq_path_rewrites(seeded_registry, monkeypatch):
|
||||
|
|
@ -442,3 +445,141 @@ def test_endpoint_passes_original_sql_when_no_bq_table(
|
|||
assert r.status_code == 200, r.json()
|
||||
assert captured["sql"] == user_sql
|
||||
assert "bigquery_query(" not in captured["sql"]
|
||||
|
||||
|
||||
def test_endpoint_wraps_rewritten_sql_with_outer_limit(
|
||||
seeded_app, stub_bq_for_endpoint, monkeypatch,
|
||||
):
|
||||
"""Memory-safety regression — when the rewriter fires, the handler
|
||||
MUST wrap the bigquery_query() call in an outer ``LIMIT N+1`` so a
|
||||
`SELECT *` against a billion-row remote table doesn't materialise the
|
||||
full result into the worker before fetchmany applies the cap.
|
||||
Code-review #2a fix.
|
||||
"""
|
||||
_register_bq_remote_row("ue", "fin", "ue")
|
||||
|
||||
captured = {"sql": None}
|
||||
|
||||
class _StubAnalytics:
|
||||
description = [("c0",)]
|
||||
def execute(self, sql, *args, **kwargs):
|
||||
captured["sql"] = sql
|
||||
class _R:
|
||||
def fetchmany(self, _n):
|
||||
return []
|
||||
return _R()
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
monkeypatch.setattr(
|
||||
"app.api.query.get_analytics_db_readonly",
|
||||
lambda: _StubAnalytics(),
|
||||
raising=False,
|
||||
)
|
||||
|
||||
c = seeded_app["client"]
|
||||
token = seeded_app["admin_token"]
|
||||
r = c.post(
|
||||
"/api/query",
|
||||
json={"sql": "SELECT * FROM ue", "limit": 100},
|
||||
headers=_auth(token),
|
||||
)
|
||||
assert r.status_code == 200, r.json()
|
||||
sent = captured["sql"]
|
||||
# The bigquery_query() wrap is present, AND the whole thing is wrapped
|
||||
# again with an outer LIMIT that includes the user-requested cap +1
|
||||
# (the +1 is the existing truncation-detection pattern).
|
||||
assert "bigquery_query(" in sent
|
||||
assert "_bqq_outer" in sent
|
||||
assert "LIMIT 101" in sent # request.limit (100) + 1
|
||||
|
||||
|
||||
def test_endpoint_falls_back_to_original_sql_on_bq_parse_error(
|
||||
seeded_app, stub_bq_for_endpoint, monkeypatch,
|
||||
):
|
||||
"""When the rewritten ``bigquery_query()`` path fails with a parse-
|
||||
level error (e.g. user SQL contained DuckDB-only syntax that BQ
|
||||
can't parse), the handler MUST retry with the original SQL via the
|
||||
ATTACH-catalog path so the user request still succeeds. Code-review
|
||||
#4 fix.
|
||||
"""
|
||||
_register_bq_remote_row("ue", "fin", "ue")
|
||||
|
||||
calls = {"sqls": []}
|
||||
|
||||
class _StubAnalytics:
|
||||
description = [("c0",)]
|
||||
def execute(self, sql, *args, **kwargs):
|
||||
calls["sqls"].append(sql)
|
||||
# First call (rewritten) raises a BQ-style parse error;
|
||||
# second call (original SQL fallback) returns rows.
|
||||
if "bigquery_query(" in sql:
|
||||
raise RuntimeError(
|
||||
"BinderException: Query execution failed: "
|
||||
"Syntax error: Unexpected token at [1:42]"
|
||||
)
|
||||
class _R:
|
||||
def fetchmany(self, _n):
|
||||
return [(1,)]
|
||||
return _R()
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
monkeypatch.setattr(
|
||||
"app.api.query.get_analytics_db_readonly",
|
||||
lambda: _StubAnalytics(),
|
||||
raising=False,
|
||||
)
|
||||
|
||||
c = seeded_app["client"]
|
||||
token = seeded_app["admin_token"]
|
||||
r = c.post(
|
||||
"/api/query",
|
||||
# DuckDB-only ::INT cast — survives identifier rewrite, BQ refuses.
|
||||
json={"sql": "SELECT (count(*))::INT FROM ue"},
|
||||
headers=_auth(token),
|
||||
)
|
||||
assert r.status_code == 200, r.json()
|
||||
# Two execute calls: 1) rewritten (raised) 2) fallback to original.
|
||||
assert len(calls["sqls"]) == 2
|
||||
assert "bigquery_query(" in calls["sqls"][0]
|
||||
assert calls["sqls"][1] == "SELECT (count(*))::INT FROM ue"
|
||||
|
||||
|
||||
def test_endpoint_does_not_fall_back_on_non_parse_errors(
|
||||
seeded_app, stub_bq_for_endpoint, monkeypatch,
|
||||
):
|
||||
"""Non-parse-error exceptions from the rewritten path (network,
|
||||
quota, forbidden, generic runtime) must propagate, NOT silently
|
||||
retry against the legacy path. Otherwise the legacy path would
|
||||
just fail again and the user sees a slow + double-failure.
|
||||
"""
|
||||
_register_bq_remote_row("ue", "fin", "ue")
|
||||
|
||||
calls = {"sqls": []}
|
||||
|
||||
class _StubAnalytics:
|
||||
description = [("c0",)]
|
||||
def execute(self, sql, *args, **kwargs):
|
||||
calls["sqls"].append(sql)
|
||||
raise RuntimeError("Network unreachable: BQ endpoint timed out")
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
monkeypatch.setattr(
|
||||
"app.api.query.get_analytics_db_readonly",
|
||||
lambda: _StubAnalytics(),
|
||||
raising=False,
|
||||
)
|
||||
|
||||
c = seeded_app["client"]
|
||||
token = seeded_app["admin_token"]
|
||||
r = c.post(
|
||||
"/api/query",
|
||||
json={"sql": "SELECT count(*) FROM ue"},
|
||||
headers=_auth(token),
|
||||
)
|
||||
# Generic 400 from the handler's outer except — body will surface
|
||||
# the runtime error message; we just need to confirm no fallback.
|
||||
assert r.status_code in (400, 500, 502)
|
||||
assert len(calls["sqls"]) == 1, "must not retry on non-parse error"
|
||||
|
|
|
|||
Loading…
Reference in a new issue