fix: devil's advocate R2 — narrow shared-client try, PID tmp suffix, Syntax error anchor

R2 adversarial review surfaced 3 issues, all addressed:

#1 cli/client.py:572-577 outer try/except wrapped both _get_shared_client()
AND the actual download. A 401/403/404/5xx from the server triggered a
full second download attempt with a fresh client — wasted bandwidth on
hard failures, no fail-fast on revoked PAT. Narrowed the try to only
the shared-client construction; the download itself is no longer
retried under the fallback except.

#2 concurrent agnes pull invocations (e.g. SessionStart hook + manual
run) collided on bare <target>.tmp / <target>.partN paths — one process's
in-progress write got yanked by the other's cleanup, and the manifest hash
check then failed spuriously. A per-process suffix (<target>.{pid}.tmp,
<target>.{pid}.partN) makes intermediate files disjoint; the final
os.replace to the bare target is atomic, so the last writer wins.

#3 The 'Syntax error' pattern used by _looks_like_bq_rewrite_parse_error
could false-positive on a query like WHERE log_msg = 'Syntax error in foo'
that fails for an unrelated reason (quota, network) and has the
literal substring echoed in the error text. Anchored to 'Syntax error: '
(with trailing colon) — BQ always emits the colon in this error
format, whereas user SQL string literals normally don't.
This commit is contained in:
ZdenekSrotyr 2026-05-06 13:57:29 +02:00
parent e5645fd280
commit aee585fac6
2 changed files with 41 additions and 15 deletions

View file

@ -46,16 +46,23 @@ router = APIRouter(prefix="/api/query", tags=["query"])
# fallback there is a 2× latency tax on every typo (devil's-advocate R1 # fallback there is a 2× latency tax on every typo (devil's-advocate R1
# finding #2). # finding #2).
# #
# Conservative pattern set: only ``Syntax error`` covers genuine # Conservative pattern set: only the BQ-emitted ``Syntax error: <detail>``
# parse-level dialect mismatch. ``Unrecognized name`` etc. surface for # (with trailing colon) covers genuine parse-level dialect mismatch.
# both bad-user-column AND DuckDB-only-name cases — the safe assumption # ``Unrecognized name`` etc. surface for both bad-user-column AND
# is that user-column-typo is the more common case, so we don't fall # DuckDB-only-name cases — the safe assumption is that user-column-typo
# back. If a deployment surfaces a real DuckDB-only-name regression, # is the more common case, so we don't fall back. If a deployment
# it's better caught as a BinderException with the original SQL in the # surfaces a real DuckDB-only-name regression, it's better caught as
# logs than amplified via slow-path retry. # a BinderException with the original SQL in the logs than amplified
# via slow-path retry.
#
# The trailing colon (devil's-advocate R2 finding #3) anchors the match
# against BQ's verbatim error format and avoids false positives where
# the literal substring `Syntax error` appears in a user's SQL string
# literal that DuckDB then echoes back in an unrelated error message
# (e.g. `WHERE log_msg = 'Syntax error in foo'` failing on quota).
_BQ_REWRITE_PARSE_ERROR_PATTERNS = ( _BQ_REWRITE_PARSE_ERROR_PATTERNS = (
"Syntax error", "Syntax error: ",
"syntax error", "syntax error: ",
) )

View file

@ -425,7 +425,17 @@ def _download_chunked(
caller's `<target>.tmp` and renamed atomically. caller's `<target>.tmp` and renamed atomically.
""" """
target = Path(target_path) target = Path(target_path)
tmp_path = Path(f"{target_path}.tmp") # Per-process tmp + part suffixes (devil's-advocate R2 finding #2):
# if two `agnes pull` invocations target the same parquet
# concurrently (e.g. SessionStart hook + manual run, or two
# terminals), bare `<target>.tmp` and `<target>.partN` paths would
# collide — one process's part-write yanks the other's in-progress
# write, manifest hash check then fails spuriously. Including PID
# in the suffix makes each invocation's intermediate files
# disjoint; the final `os.replace` to the bare target is atomic so
# last-writer-wins, both processes succeed individually.
pid = os.getpid()
tmp_path = Path(f"{target_path}.{pid}.tmp")
parallelism = max(1, parallelism) parallelism = max(1, parallelism)
# Build chunks — last chunk takes the remainder. # Build chunks — last chunk takes the remainder.
chunk_size = total_size // parallelism chunk_size = total_size // parallelism
@ -438,8 +448,8 @@ def _download_chunked(
end = (start + chunk_size - 1) if i < parallelism - 1 else (total_size - 1) end = (start + chunk_size - 1) if i < parallelism - 1 else (total_size - 1)
ranges.append((i, start, end)) ranges.append((i, start, end))
part_paths = [Path(f"{target_path}.part{i}") for i, _, _ in ranges] part_paths = [Path(f"{target_path}.{pid}.part{i}") for i, _, _ in ranges]
# Pre-clean any leftovers from a prior run. # Pre-clean any leftovers from a prior run of THIS process.
for p in part_paths: for p in part_paths:
p.unlink(missing_ok=True) p.unlink(missing_ok=True)
@ -510,7 +520,11 @@ def _download_single_stream(
) -> int: ) -> int:
"""Original single-stream path with retry. Used when chunking is """Original single-stream path with retry. Used when chunking is
disabled (small file, no range support, or fallback after 200-on-Range).""" disabled (small file, no range support, or fallback after 200-on-Range)."""
tmp_path = Path(f"{target_path}.tmp") # Per-process tmp suffix — same rationale as `_download_chunked`
# (devil's-advocate R2 finding #2): concurrent `agnes pull`
# invocations against the same target dir must not yank each
# other's in-progress writes.
tmp_path = Path(f"{target_path}.{os.getpid()}.tmp")
last_exc: Optional[Exception] = None last_exc: Optional[Exception] = None
for attempt in range(_RETRY_ATTEMPTS + 1): for attempt in range(_RETRY_ATTEMPTS + 1):
try: try:
@ -568,13 +582,18 @@ def stream_download(path: str, target_path: str, progress_callback=None) -> int:
# handshake amortized across N stream_download calls within the same # handshake amortized across N stream_download calls within the same
# process, and HTTP/2 stream multiplexing across the chunk Range # process, and HTTP/2 stream multiplexing across the chunk Range
# requests within a single download. Falls back to a fresh per-call # requests within a single download. Falls back to a fresh per-call
# client if shared-client construction fails for any reason. # client if shared-client construction fails (e.g. `h2` install
# broken at runtime). Devil's-advocate R2 finding #1: scope the
# try/except to *only* the shared-client construction — the actual
# download must NOT be retried under this except, otherwise hard
# failures (401/403/404/5xx) waste a full second download attempt
# and revoked-PAT cases don't fail-fast.
try: try:
client = _get_shared_client() client = _get_shared_client()
return _stream_download_via(client, path, target_path, progress_callback)
except Exception: except Exception:
with get_client(timeout=300.0) as client: with get_client(timeout=300.0) as client:
return _stream_download_via(client, path, target_path, progress_callback) return _stream_download_via(client, path, target_path, progress_callback)
return _stream_download_via(client, path, target_path, progress_callback)
def _stream_download_via( def _stream_download_via(