diff --git a/app/api/query.py b/app/api/query.py index 121cceb..207c27f 100644 --- a/app/api/query.py +++ b/app/api/query.py @@ -46,16 +46,23 @@ router = APIRouter(prefix="/api/query", tags=["query"]) # fallback there is a 2× latency tax on every typo (devil's-advocate R1 # finding #2). # -# Conservative pattern set: only ``Syntax error`` covers genuine -# parse-level dialect mismatch. ``Unrecognized name`` etc. surface for -# both bad-user-column AND DuckDB-only-name cases — the safe assumption -# is that user-column-typo is the more common case, so we don't fall -# back. If a deployment surfaces a real DuckDB-only-name regression, -# it's better caught as a BinderException with the original SQL in the -# logs than amplified via slow-path retry. +# Conservative pattern set: only the BQ-emitted ``Syntax error: `` +# (with trailing colon) covers genuine parse-level dialect mismatch. +# ``Unrecognized name`` etc. surface for both bad-user-column AND +# DuckDB-only-name cases — the safe assumption is that user-column-typo +# is the more common case, so we don't fall back. If a deployment +# surfaces a real DuckDB-only-name regression, it's better caught as +# a BinderException with the original SQL in the logs than amplified +# via slow-path retry. +# +# The trailing colon (devil's-advocate R2 finding #3) anchors the match +# against BQ's verbatim error format and avoids false positives where +# the literal substring `Syntax error` appears in a user's SQL string +# literal that DuckDB then echoes back in an unrelated error message +# (e.g. `WHERE log_msg = 'Syntax error in foo'` failing on quota). _BQ_REWRITE_PARSE_ERROR_PATTERNS = ( - "Syntax error", - "syntax error", + "Syntax error: ", + "syntax error: ", ) diff --git a/cli/client.py b/cli/client.py index d791e8c..257efa7 100644 --- a/cli/client.py +++ b/cli/client.py @@ -425,7 +425,17 @@ def _download_chunked( caller's `.tmp` and renamed atomically. 
""" target = Path(target_path) - tmp_path = Path(f"{target_path}.tmp") + # Per-process tmp + part suffixes (devil's-advocate R2 finding #2): + # if two `agnes pull` invocations target the same parquet + # concurrently (e.g. SessionStart hook + manual run, or two + # terminals), bare `.tmp` and `.partN` paths would + # collide — one process's part-write yanks the other's in-progress + # write, and the manifest hash check then fails spuriously. Including PID + # in the suffix makes each invocation's intermediate files + # disjoint; the final `os.replace` to the bare target is atomic, so + # the last writer wins and both processes succeed individually. + pid = os.getpid() + tmp_path = Path(f"{target_path}.{pid}.tmp") parallelism = max(1, parallelism) # Build chunks — last chunk takes the remainder. chunk_size = total_size // parallelism @@ -438,8 +448,8 @@ def _download_chunked( end = (start + chunk_size - 1) if i < parallelism - 1 else (total_size - 1) ranges.append((i, start, end)) - part_paths = [Path(f"{target_path}.part{i}") for i, _, _ in ranges] - # Pre-clean any leftovers from a prior run. + part_paths = [Path(f"{target_path}.{pid}.part{i}") for i, _, _ in ranges] + # Pre-clean leftovers from an earlier attempt by THIS process (same PID). for p in part_paths: p.unlink(missing_ok=True) @@ -510,7 +520,11 @@ def _download_single_stream( ) -> int: """Original single-stream path with retry. Used when chunking is disabled (small file, no range support, or fallback after 200-on-Range).""" - tmp_path = Path(f"{target_path}.tmp") + # Per-process tmp suffix — same rationale as `_download_chunked` + # (devil's-advocate R2 finding #2): concurrent `agnes pull` + # invocations against the same target dir must not yank each + # other's in-progress writes. 
+ tmp_path = Path(f"{target_path}.{os.getpid()}.tmp") last_exc: Optional[Exception] = None for attempt in range(_RETRY_ATTEMPTS + 1): try: @@ -568,13 +582,18 @@ def stream_download(path: str, target_path: str, progress_callback=None) -> int: # handshake amortized across N stream_download calls within the same # process, and HTTP/2 stream multiplexing across the chunk Range # requests within a single download. Falls back to a fresh per-call - # client if shared-client construction fails for any reason. + # client if shared-client construction fails (e.g. `h2` install + # broken at runtime). Devil's-advocate R2 finding #1: scope the + # try/except to *only* the shared-client construction — the actual + # download must NOT be retried under this except, otherwise hard + # failures (401/403/404/5xx) waste a full second download attempt + # and revoked-PAT cases don't fail-fast. try: client = _get_shared_client() - return _stream_download_via(client, path, target_path, progress_callback) except Exception: with get_client(timeout=300.0) as client: return _stream_download_via(client, path, target_path, progress_callback) + return _stream_download_via(client, path, target_path, progress_callback) def _stream_download_via(