fix: devil's advocate R2 — narrow shared-client try, PID tmp suffix, Syntax error anchor
R2 adversarial review surfaced 3 issues, all addressed: #1 cli/client.py:572-577 outer try/except wrapped both _get_shared_client() AND the actual download. A 401/403/404/5xx from the server triggered a full second download attempt with a fresh client — wasted bandwidth on hard failures, no fail-fast on revoked PAT. Narrowed the try to only the shared-client construction; the download itself is no longer retried under the fallback except. #2 concurrent agnes pull invocations (e.g. SessionStart hook + manual run) collided on bare <target>.tmp / <target>.partN paths — one process's in-progress write got yanked by the other's cleanup, manifest hash check then failed spuriously. Per-process suffix (<target>.{pid}.tmp, <target>.{pid}.partN) makes intermediate files disjoint; the final os.replace to the bare target is atomic so last-writer-wins. #3 _looks_like_bq_rewrite_parse_error patterns 'Syntax error' could false-positive on a query like WHERE log_msg = 'Syntax error in foo' that fails for an unrelated reason (quota, network) and has the literal substring echoed in the error text. Anchored to 'Syntax error: ' (with trailing colon) — BQ always emits the colon in this error format, user SQL string literals normally don't.
This commit is contained in:
parent
e5645fd280
commit
aee585fac6
2 changed files with 41 additions and 15 deletions
|
|
@ -46,16 +46,23 @@ router = APIRouter(prefix="/api/query", tags=["query"])
|
||||||
# fallback there is a 2× latency tax on every typo (devil's-advocate R1
|
# fallback there is a 2× latency tax on every typo (devil's-advocate R1
|
||||||
# finding #2).
|
# finding #2).
|
||||||
#
|
#
|
||||||
# Conservative pattern set: only ``Syntax error`` covers genuine
|
# Conservative pattern set: only the BQ-emitted ``Syntax error: <detail>``
|
||||||
# parse-level dialect mismatch. ``Unrecognized name`` etc. surface for
|
# (with trailing colon) covers genuine parse-level dialect mismatch.
|
||||||
# both bad-user-column AND DuckDB-only-name cases — the safe assumption
|
# ``Unrecognized name`` etc. surface for both bad-user-column AND
|
||||||
# is that user-column-typo is the more common case, so we don't fall
|
# DuckDB-only-name cases — the safe assumption is that user-column-typo
|
||||||
# back. If a deployment surfaces a real DuckDB-only-name regression,
|
# is the more common case, so we don't fall back. If a deployment
|
||||||
# it's better caught as a BinderException with the original SQL in the
|
# surfaces a real DuckDB-only-name regression, it's better caught as
|
||||||
# logs than amplified via slow-path retry.
|
# a BinderException with the original SQL in the logs than amplified
|
||||||
|
# via slow-path retry.
|
||||||
|
#
|
||||||
|
# The trailing colon (devil's-advocate R2 finding #3) anchors the match
|
||||||
|
# against BQ's verbatim error format and avoids false positives where
|
||||||
|
# the literal substring `Syntax error` appears in a user's SQL string
|
||||||
|
# literal that DuckDB then echoes back in an unrelated error message
|
||||||
|
# (e.g. `WHERE log_msg = 'Syntax error in foo'` failing on quota).
|
||||||
_BQ_REWRITE_PARSE_ERROR_PATTERNS = (
|
_BQ_REWRITE_PARSE_ERROR_PATTERNS = (
|
||||||
"Syntax error",
|
"Syntax error: ",
|
||||||
"syntax error",
|
"syntax error: ",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -425,7 +425,17 @@ def _download_chunked(
|
||||||
caller's `<target>.tmp` and renamed atomically.
|
caller's `<target>.tmp` and renamed atomically.
|
||||||
"""
|
"""
|
||||||
target = Path(target_path)
|
target = Path(target_path)
|
||||||
tmp_path = Path(f"{target_path}.tmp")
|
# Per-process tmp + part suffixes (devil's-advocate R2 finding #2):
|
||||||
|
# if two `agnes pull` invocations target the same parquet
|
||||||
|
# concurrently (e.g. SessionStart hook + manual run, or two
|
||||||
|
# terminals), bare `<target>.tmp` and `<target>.partN` paths would
|
||||||
|
# collide — one process's part-write yanks the other's in-progress
|
||||||
|
# write, manifest hash check then fails spuriously. Including PID
|
||||||
|
# in the suffix makes each invocation's intermediate files
|
||||||
|
# disjoint; the final `os.replace` to the bare target is atomic so
|
||||||
|
# last-writer-wins, both processes succeed individually.
|
||||||
|
pid = os.getpid()
|
||||||
|
tmp_path = Path(f"{target_path}.{pid}.tmp")
|
||||||
parallelism = max(1, parallelism)
|
parallelism = max(1, parallelism)
|
||||||
# Build chunks — last chunk takes the remainder.
|
# Build chunks — last chunk takes the remainder.
|
||||||
chunk_size = total_size // parallelism
|
chunk_size = total_size // parallelism
|
||||||
|
|
@ -438,8 +448,8 @@ def _download_chunked(
|
||||||
end = (start + chunk_size - 1) if i < parallelism - 1 else (total_size - 1)
|
end = (start + chunk_size - 1) if i < parallelism - 1 else (total_size - 1)
|
||||||
ranges.append((i, start, end))
|
ranges.append((i, start, end))
|
||||||
|
|
||||||
part_paths = [Path(f"{target_path}.part{i}") for i, _, _ in ranges]
|
part_paths = [Path(f"{target_path}.{pid}.part{i}") for i, _, _ in ranges]
|
||||||
# Pre-clean any leftovers from a prior run.
|
# Pre-clean any leftovers from a prior run of THIS process.
|
||||||
for p in part_paths:
|
for p in part_paths:
|
||||||
p.unlink(missing_ok=True)
|
p.unlink(missing_ok=True)
|
||||||
|
|
||||||
|
|
@ -510,7 +520,11 @@ def _download_single_stream(
|
||||||
) -> int:
|
) -> int:
|
||||||
"""Original single-stream path with retry. Used when chunking is
|
"""Original single-stream path with retry. Used when chunking is
|
||||||
disabled (small file, no range support, or fallback after 200-on-Range)."""
|
disabled (small file, no range support, or fallback after 200-on-Range)."""
|
||||||
tmp_path = Path(f"{target_path}.tmp")
|
# Per-process tmp suffix — same rationale as `_download_chunked`
|
||||||
|
# (devil's-advocate R2 finding #2): concurrent `agnes pull`
|
||||||
|
# invocations against the same target dir must not yank each
|
||||||
|
# other's in-progress writes.
|
||||||
|
tmp_path = Path(f"{target_path}.{os.getpid()}.tmp")
|
||||||
last_exc: Optional[Exception] = None
|
last_exc: Optional[Exception] = None
|
||||||
for attempt in range(_RETRY_ATTEMPTS + 1):
|
for attempt in range(_RETRY_ATTEMPTS + 1):
|
||||||
try:
|
try:
|
||||||
|
|
@ -568,13 +582,18 @@ def stream_download(path: str, target_path: str, progress_callback=None) -> int:
|
||||||
# handshake amortized across N stream_download calls within the same
|
# handshake amortized across N stream_download calls within the same
|
||||||
# process, and HTTP/2 stream multiplexing across the chunk Range
|
# process, and HTTP/2 stream multiplexing across the chunk Range
|
||||||
# requests within a single download. Falls back to a fresh per-call
|
# requests within a single download. Falls back to a fresh per-call
|
||||||
# client if shared-client construction fails for any reason.
|
# client if shared-client construction fails (e.g. `h2` install
|
||||||
|
# broken at runtime). Devil's-advocate R2 finding #1: scope the
|
||||||
|
# try/except to *only* the shared-client construction — the actual
|
||||||
|
# download must NOT be retried under this except, otherwise hard
|
||||||
|
# failures (401/403/404/5xx) waste a full second download attempt
|
||||||
|
# and revoked-PAT cases don't fail-fast.
|
||||||
try:
|
try:
|
||||||
client = _get_shared_client()
|
client = _get_shared_client()
|
||||||
return _stream_download_via(client, path, target_path, progress_callback)
|
|
||||||
except Exception:
|
except Exception:
|
||||||
with get_client(timeout=300.0) as client:
|
with get_client(timeout=300.0) as client:
|
||||||
return _stream_download_via(client, path, target_path, progress_callback)
|
return _stream_download_via(client, path, target_path, progress_callback)
|
||||||
|
return _stream_download_via(client, path, target_path, progress_callback)
|
||||||
|
|
||||||
|
|
||||||
def _stream_download_via(
|
def _stream_download_via(
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue