fix: devil's advocate R1 — chunked probe, parse-error heuristic narrow, pool settings refresh, content-length sanity, multi-project skip

R1 adversarial review surfaced 5 issues, all addressed:

#1 Chunked download was silently disabled in non-Caddy deployments (HEAD on
   a GET-only FastAPI route returns 405). _probe_range_support now falls
   back to GET with Range: bytes=0-0 when HEAD fails. This works against
   both Caddy file_server (HEAD-friendly) and dev FastAPI direct (GET-only).

#2 The parse-error fallback heuristic was too broad: it matched on
   Unrecognized name / Function not found / No matching signature /
   Invalid cast, which BQ surfaces for ordinary user-column typos. That
   triggered a slow ATTACH-catalog retry on every typo (2× latency tax).
   Narrowed to just 'Syntax error' / 'syntax error', which are the genuine
   DuckDB-vs-BQ dialect mismatch markers.

#3 apply_bq_session_settings was only run on fresh-built pool entries, not
   on reuse. An operator's /admin/server-config change to
   bq_query_timeout_ms wouldn't propagate to long-lived pooled sessions
   until restart. Fixed: re-apply on every pool acquire (idempotent +
   fail-soft).

#4 Content-length sanity bound: a misconfigured proxy returning a wildly
   inflated Content-Length would cause overlapping chunked Range requests
   against the actual file → corrupt assembled output (caught by the
   manifest hash check, but only after wasted bandwidth). Cap at 100 GiB;
   above that, drop to single-stream.

#5 The rewriter assumed every BQ row resolves under the single
   bq.projects.data project. A bucket containing '.' suggests a
   project-qualified bucket (multi-project deployment); the rewriter would
   silently target the wrong project. Conservative skip with regression
   test.

parent 8e56d45c68
commit e5645fd280
4 changed files with 193 additions and 19 deletions

@@ -33,23 +33,29 @@ router = APIRouter(prefix="/api/query", tags=["query"])
 
 
 # Heuristic: did the BQ-side execution of a `bigquery_query()`-rewritten
-# query reject the inner SQL? Errors we want to fall back on are upstream
-# parse / validation errors — DuckDB-only syntax that survives identifier
-# rewrite (`::INT` casts, `STRPTIME`, COALESCE arity differences) is
-# accepted by the wrapping DuckDB layer but BQ refuses with a parse
-# message that DuckDB surfaces back as a BinderException / RuntimeError
-# with the BQ message embedded. Forbidden / quota / network errors should
-# NOT trigger fallback (they would just fail again on the legacy path).
+# query reject the inner SQL because of a **DuckDB-vs-BQ dialect mismatch**
+# specifically? We want to fall back ONLY on cases where the same SQL
+# would have worked under the legacy DuckDB ATTACH-catalog path —
+# DuckDB-only syntax (``::INT`` casts, ``STRPTIME``, COALESCE arity quirks)
+# that BQ's parser rejects.
+#
+# We DO NOT want to fall back on user-data errors that BQ would reject in
+# either path (unknown column name, wrong function signature, invalid cast
+# of literal user input). For those, the legacy ATTACH path would issue
+# the same query and fail the same way — just 50-100× slower. Triggering
+# fallback there is a 2× latency tax on every typo (devil's-advocate R1
+# finding #2).
+#
+# Conservative pattern set: only ``Syntax error`` covers genuine
+# parse-level dialect mismatch. ``Unrecognized name`` etc. surface for
+# both bad-user-column AND DuckDB-only-name cases — the safe assumption
+# is that user-column-typo is the more common case, so we don't fall
+# back. If a deployment surfaces a real DuckDB-only-name regression,
+# it's better caught as a BinderException with the original SQL in the
+# logs than amplified via slow-path retry.
 _BQ_REWRITE_PARSE_ERROR_PATTERNS = (
     "Syntax error",
     "syntax error",
-    "Unrecognized name",
-    "unrecognized name",
-    "Function not found",
-    "function not found",
-    "No matching signature",
-    "Invalid cast",
-    "invalid cast",
 )
 
 
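
The narrowed heuristic in the hunk above amounts to a plain substring check. Below is a minimal sketch of how such a check might be applied; the helper name `is_dialect_parse_error` is hypothetical and not part of this commit, and the "Syntax error" message text is illustrative rather than a captured BigQuery log. Only the two patterns and the "Unrecognized name" message come from the diff.

```python
# Hypothetical helper (not from the commit); only the two patterns mirror the diff.
PARSE_ERROR_PATTERNS = ("Syntax error", "syntax error")


def is_dialect_parse_error(message: str) -> bool:
    """Return True only for parse-level errors that justify the slow-path retry."""
    return any(pattern in message for pattern in PARSE_ERROR_PATTERNS)


# Message shape taken from the regression test in this commit.
typo = "BinderException: Query execution failed: Unrecognized name: bad_col at [1:8]"
# Illustrative text only: an assumption about what a dialect mismatch looks like.
dialect = "BinderException: Query execution failed: Syntax error: Unexpected ':' at [1:17]"

print(is_dialect_parse_error(typo))     # False: propagate the error, no retry
print(is_dialect_parse_error(dialect))  # True: retry on the legacy path
```

Under this sketch a column typo fails once on the fast path and is reported directly, which is the behaviour finding #2 asks for.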
@@ -677,6 +683,17 @@ def _rewrite_user_sql_for_bigquery_query(
         # surface anything user-visible.
         return user_sql, False
 
+    # Multi-project guard (devil's-advocate R1 finding #5): the rewriter
+    # assumes every BQ-remote table resolves under the single
+    # `bq.projects.data` project. The current registry schema doesn't
+    # store `source_project` per row, so `bucket` is the only place a
+    # cross-project leak could hide. A bucket containing `.` (e.g.
+    # `other_prj.dataset`) suggests the operator encoded a project
+    # prefix into the bucket name — wrapping that under our single
+    # project would silently target the wrong project. Conservative
+    # skip: any BQ row whose bucket contains `.` aborts the rewrite,
+    # falling through to the legacy ATTACH-catalog path which uses
+    # whatever resolution the operator's _remote_attach configured.
     for r in bq_rows:
         if (r.get("query_mode") or "") != "remote":
             continue
@@ -685,6 +702,11 @@ def _rewrite_user_sql_for_bigquery_query(
         name = r.get("name")
         if not (bucket and source_table and name):
             continue
+        if "." in str(bucket):
+            # Project-qualified bucket — can't safely wrap under our
+            # single-project assumption. Bail out completely so we don't
+            # mix rewritten and non-rewritten BQ paths in one query.
+            return user_sql, False
         pattern = r'\b' + re.escape(str(name).lower()) + r'\b'
         if re.search(pattern, sql_lower):
             key = (bucket.lower(), source_table.lower())

@@ -322,14 +322,55 @@ def _probe_range_support(client: httpx.Client, path: str) -> tuple[int, bool]:
     Never raises; transport errors during the probe are treated as
     "no chunking, try the GET instead and let it surface the failure
     in the normal retry loop".
+
+    Probe order: HEAD first (cheap, idempotent), then GET-with-tiny-range
+    fallback. The HEAD path covers Caddy's `file_server` (which advertises
+    HEAD) and Caddy's `reverse_proxy` (which forwards HEAD upstream). The
+    GET-fallback covers the dev `docker compose up` deployment where
+    requests go straight to FastAPI's GET-only `/api/data/{tid}/download`
+    route — FastAPI returns **405 Method Not Allowed** to a HEAD on a
+    GET-only route, which without this fallback would silently disable
+    chunked download for every dev / non-TLS install. The GET-with-Range
+    probe asks for 1 byte so the server response is bounded; we discard
+    the body and read only the headers + status code.
     """
     try:
         resp = client.head(path)
-        if getattr(resp, "status_code", 200) >= 400:
-            return (0, False)
-        size = int(resp.headers.get("content-length", "0") or 0)
-        accepts = (resp.headers.get("accept-ranges", "").lower() == "bytes")
-        return (size, accepts)
+        status = getattr(resp, "status_code", 200)
+        if status < 400:
+            size = int(resp.headers.get("content-length", "0") or 0)
+            accepts = (resp.headers.get("accept-ranges", "").lower() == "bytes")
+            if size > 0:
+                return (size, accepts)
+        # HEAD failed (405 from GET-only route is the common case in
+        # non-Caddy deployments) or returned 0-length — fall through to
+        # the tiny-Range GET probe.
+    except Exception:
+        pass
+    try:
+        with client.stream("GET", path, headers={"Range": "bytes=0-0"}) as resp:
+            status = getattr(resp, "status_code", 0)
+            if status not in (200, 206):
+                return (0, False)
+            # Drain the 1-byte body so the connection is reusable.
+            for _ in resp.iter_bytes():
+                pass
+            # Content-Range on a 206 response carries the total: `bytes 0-0/12345`.
+            # On a 200 response the server didn't honor Range — content-length is the total.
+            if status == 206:
+                cr = resp.headers.get("content-range", "")
+                if "/" in cr:
+                    try:
+                        total = int(cr.rsplit("/", 1)[1])
+                        return (total, True)
+                    except ValueError:
+                        return (0, False)
+                return (0, False)
+            # status == 200 → server ignored Range; we can read content-length but
+            # accept-ranges is False (or missing) so the caller will not chunk.
+            size = int(resp.headers.get("content-length", "0") or 0)
+            accepts = (resp.headers.get("accept-ranges", "").lower() == "bytes")
+            return (size, accepts)
     except Exception:
         return (0, False)
 
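
The 206 branch of the probe reads the file's total size from the `Content-Range` header. The following is a small standalone sketch of that parsing step, assuming a hypothetical helper named `total_from_content_range` (the commit inlines this logic rather than factoring it out). It also covers the `bytes 0-0/*` form, which HTTP permits when the server does not know the complete length.

```python
def total_from_content_range(value: str) -> int:
    """Extract the total size from a header like 'bytes 0-0/12345'.

    Returns 0 when the header is missing, malformed, or reports an
    unknown total ('*'), so the caller can decline to chunk.
    """
    if "/" not in value:
        return 0
    tail = value.rsplit("/", 1)[1].strip()
    try:
        return int(tail)
    except ValueError:  # e.g. '*' for an unknown complete length
        return 0


print(total_from_content_range("bytes 0-0/12345"))  # 12345
print(total_from_content_range("bytes 0-0/*"))      # 0
print(total_from_content_range(""))                 # 0
```

Returning 0 for every unparseable case matches the probe's contract in the hunk above, where `(0, False)` means "do not chunk".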
@@ -552,6 +593,22 @@ def _stream_download_via(
     if parallelism > 1:
         total_size, accepts_ranges = _probe_range_support(client, path)
 
+        # Sanity bound on the advertised total size (devil's-advocate R1
+        # finding #4): a misconfigured proxy or buggy server returning a
+        # wildly inflated `Content-Length` would make us split into huge
+        # `Range: bytes=N-M` requests; the server then clamps each to actual
+        # bytes available, and we end up with overlapping bytes from the
+        # start of the file in every part → corrupt assembled output (caught
+        # later by manifest hash check, but only after wasted bandwidth).
+        # 100 GiB is the operational ceiling for any single materialized
+        # parquet on a typical Agnes deployment; values above suggest a
+        # server / proxy bug rather than a legitimate huge file. Drop to
+        # single-stream (which can't be confused by overlapping chunks).
+        SANE_MAX_TOTAL = 100 * 1024**3  # 100 GiB
+        if total_size > SANE_MAX_TOTAL:
+            total_size = 0
+            accepts_ranges = False
+
     use_chunked = (
         parallelism > 1
         and accepts_ranges

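
To make the sanity bound concrete, here is a hedged sketch of how an advertised total could be turned into byte ranges, with the 100 GiB cap forcing a single-stream fallback. The function `plan_ranges` and its return convention (an empty list meaning "do not chunk") are assumptions for illustration; the commit's actual chunk planner is not shown in this diff.

```python
SANE_MAX_TOTAL = 100 * 1024**3  # 100 GiB, the same bound as in the hunk above


def plan_ranges(total_size: int, parts: int) -> list[tuple[int, int]]:
    """Split [0, total_size) into inclusive (start, end) byte ranges.

    An empty list means "fall back to a single-stream download":
    unknown size, a single part, or an implausibly large advertised total.
    """
    if parts < 2 or total_size <= 0 or total_size > SANE_MAX_TOTAL:
        return []
    chunk = -(-total_size // parts)  # ceiling division
    return [
        (start, min(start + chunk, total_size) - 1)
        for start in range(0, total_size, chunk)
    ]


print(plan_ranges(10, 3))               # [(0, 3), (4, 7), (8, 9)]
print(plan_ranges(200 * 1024**3, 4))    # [] -> single-stream
```

The ranges are contiguous and non-overlapping only when `total_size` is accurate, which is why an implausible value is rejected before any range is issued.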
@@ -435,6 +435,22 @@ def _default_duckdb_session_factory(projects: BqProjects):
                     except Exception:
                         pass
                     continue
+                # Re-apply session settings (`bq_query_timeout_ms`, …) on
+                # every reuse so an operator's `/admin/server-config` change
+                # propagates to pooled entries without requiring container
+                # restart. Without this, a long-lived pool entry keeps the
+                # value baked in at first build forever (devil's-advocate
+                # R1 finding #3). `apply_bq_session_settings` is idempotent
+                # and fail-soft — re-running on every acquire is cheap.
+                try:
+                    apply_bq_session_settings(entry)
+                except Exception:
+                    # apply_bq_session_settings is documented as never
+                    # raising for legitimate "extension doesn't recognise
+                    # setting" cases (it only logs). Defensive guard for
+                    # any unforeseen failure mode — keep the entry, the
+                    # caller's actual query may still succeed.
+                    pass
                 conn = entry
                 break
 

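
The re-apply-on-acquire pattern from the hunk above can be illustrated with a toy pool. This is a sketch under stated assumptions: `SessionPool`, its `build` and `apply_settings` callables, and the use of plain dicts as stand-in connections are all hypothetical and do not reflect the real pool in `_default_duckdb_session_factory`.

```python
from collections import deque


class SessionPool:
    """Toy pool that re-applies settings on every acquire, fresh or reused."""

    def __init__(self, build, apply_settings):
        self._build = build            # creates a new connection
        self._apply = apply_settings   # idempotent settings hook
        self._idle = deque()

    def acquire(self):
        conn = self._idle.popleft() if self._idle else self._build()
        try:
            self._apply(conn)          # runs for reused entries too
        except Exception:
            pass                       # fail-soft: keep the entry usable
        return conn

    def release(self, conn):
        self._idle.append(conn)


# Stand-in for operator-editable server config.
settings = {"bq_query_timeout_ms": 1000}
pool = SessionPool(build=dict, apply_settings=lambda c: c.update(settings))

first = pool.acquire()
pool.release(first)
settings["bq_query_timeout_ms"] = 5000   # operator changes the config
second = pool.acquire()                  # same entry, refreshed setting
print(second is first, second["bq_query_timeout_ms"])  # True 5000
```

Because the hook runs on every acquire, it has to be cheap and idempotent, which is the property the commit relies on for `apply_bq_session_settings`.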
@@ -546,6 +546,85 @@ def test_endpoint_falls_back_to_original_sql_on_bq_parse_error(
     assert calls["sqls"][1] == "SELECT (count(*))::INT FROM ue"
 
 
+def test_rewriter_skips_when_bq_row_bucket_contains_dot(
+    seeded_registry, monkeypatch,
+):
+    """Devil's-advocate R1 finding #5: a BQ row whose `bucket` contains
+    `.` suggests the operator encoded a project prefix in the bucket
+    name. Wrapping under our single-project assumption could silently
+    target the wrong project. Rewriter must skip in that case (fall
+    through to ATTACH-catalog path which respects the operator's
+    `_remote_attach` configuration).
+    """
+    from app.api.query import _rewrite_user_sql_for_bigquery_query
+    _register_bq_remote(
+        seeded_registry,
+        table_id="bq.other-prj.dataset.ue",
+        name="ue",
+        # Project-qualified bucket — the multi-project red flag.
+        bucket="other-prj.dataset",
+        source_table="ue",
+    )
+    _set_bq_project(monkeypatch, "test-prj")
+
+    rewritten, did_rewrite = _rewrite_user_sql_for_bigquery_query(
+        "SELECT count(*) FROM ue",
+        seeded_registry,
+    )
+    # Skip — original SQL returned, no rewrite.
+    assert did_rewrite is False
+    assert rewritten == "SELECT count(*) FROM ue"
+
+
+def test_fallback_does_not_trigger_on_user_column_typo(
+    seeded_app, stub_bq_for_endpoint, monkeypatch,
+):
+    """Devil's-advocate R1 finding #2: previously the fallback
+    heuristic matched `Unrecognized name`, which BQ surfaces for both
+    DuckDB-only-name AND user-column-typo cases. The user-typo case
+    triggered re-running the original SQL through the slow ATTACH-
+    catalog path (90+ s) → 2× latency tax on every typo.
+
+    Post-fix: heuristic only matches `Syntax error`. A BQ-side
+    `Unrecognized name: bad_col` should propagate as-is, NOT trigger
+    a fallback retry.
+    """
+    _register_bq_remote_row("ue", "fin", "ue")
+
+    calls = {"sqls": []}
+
+    class _StubAnalytics:
+        description = [("c0",)]
+        def execute(self, sql, *args, **kwargs):
+            calls["sqls"].append(sql)
+            raise RuntimeError(
+                "BinderException: Query execution failed: "
+                "Unrecognized name: bad_col at [1:8]"
+            )
+        def close(self):
+            pass
+
+    monkeypatch.setattr(
+        "app.api.query.get_analytics_db_readonly",
+        lambda: _StubAnalytics(),
+        raising=False,
+    )
+
+    c = seeded_app["client"]
+    token = seeded_app["admin_token"]
+    r = c.post(
+        "/api/query",
+        json={"sql": "SELECT bad_col FROM ue"},
+        headers=_auth(token),
+    )
+    # Error propagates; fallback NOT triggered (only one execute call).
+    assert r.status_code in (400, 500, 502)
+    assert len(calls["sqls"]) == 1, (
+        "user column typo must NOT trigger fallback retry"
+    )
+    assert "bigquery_query(" in calls["sqls"][0]
+
+
 def test_endpoint_does_not_fall_back_on_non_parse_errors(
     seeded_app, stub_bq_for_endpoint, monkeypatch,
 ):