fix(pull): re-download parquet when file missing despite matching hash
Pre-fix `agnes pull` decided what to download from sync_state hash
equality alone:
if server_hash != local_hash or tid not in local_tables or not server_hash:
to_download.append(tid)
If the recorded local hash matched server but the actual parquet had
been deleted from disk, the download was skipped. The next DuckDB
view rebuild then fails on a missing file. Repro: `rm
server/parquet/X.parquet && agnes pull` → 'Updated 0 tables', X
still missing.
Failure modes that produce hash-equal-but-file-missing:
- manual `rm` of a single parquet
- operator-side cleanup of `server/parquet/`
- two workspaces sharing one user's
`~/.config/agnes/sync_state.json` (TODO(workspace-scoped-sync-state)
in pull.py): one workspace writes its parquets, the other reads
sync_state and concludes 'I already have these'
- disk corruption / partial restore from backup
Fix: existence check runs alongside the hash compare. Missing file
forces a re-download regardless of hash equality. `parquet_dir` is
hoisted above the loop so the existence check is in scope when the
download set is built.
Tests: regression test for the hash-equal-but-missing-file case +
counterpart for the fast-path (hash-equal-and-file-present must
still skip).
This commit is contained in:
parent
103efb69f0
commit
976d0c7160
3 changed files with 134 additions and 2 deletions
|
|
@ -38,6 +38,7 @@ End-to-end clean-analyst-bootstrap rewrite. The web `/setup?role=analyst` page n
|
||||||
- `agnes snapshot create` (formerly `da fetch`) no longer materializes an empty `user/duckdb/analytics.duckdb` when run before any `agnes pull`. Friendly hint redirects to `agnes pull`.
|
- `agnes snapshot create` (formerly `da fetch`) no longer materializes an empty `user/duckdb/analytics.duckdb` when run before any `agnes pull`. Friendly hint redirects to `agnes pull`.
|
||||||
- Workspace `agnes status` reads from the canonical `server/parquet/` and `user/duckdb/analytics.duckdb` paths (was reading legacy `data/parquet/`, `data/metadata/last_sync.json`).
|
- Workspace `agnes status` reads from the canonical `server/parquet/` and `user/duckdb/analytics.duckdb` paths (was reading legacy `data/parquet/`, `data/metadata/last_sync.json`).
|
||||||
- `agnes init` and `agnes pull` errors now use the `cli/error_render.py` typed-error renderer (added in 0.32.0), so analyst-facing error UX matches the structured shape `agnes query --remote` already produces.
|
- `agnes init` and `agnes pull` errors now use the `cli/error_render.py` typed-error renderer (added in 0.32.0), so analyst-facing error UX matches the structured shape `agnes query --remote` already produces.
|
||||||
|
- **`agnes pull` now re-downloads parquets when the local file is missing, even if the recorded hash matches the server.** Pre-fix the download set was computed from `sync_state.json` hash equality alone — if the parquet had been deleted (manual `rm`, disk cleanup, a different workspace sharing the same global `~/.config/agnes/sync_state.json` writing one workspace's parquets while another reads sync_state and assumes "I already have these"), the hash-equal check would short-circuit the download and the next DuckDB view rebuild would fail on a missing file. Now the existence check on `<workspace>/server/parquet/<tid>.parquet` runs alongside the hash compare; missing file → forced re-download regardless of hash.
|
||||||
- **`agnes query --remote` no longer over-rejects narrow queries on partitioned/clustered BigQuery tables.** Closes #171. Pre-fix the `/api/query` cost guardrail dry-ran a synthetic `SELECT * FROM <table>` per registered remote-BQ row referenced by the user SQL, which forced BQ to estimate "full table scan" — column projection, predicate pushdown, and partition pruning were all ignored, producing scan-byte estimates up to ~30,000× larger than the actual query would scan. Narrow queries on big partitioned tables (the documented happy-path use case) were rejected with 400 `remote_scan_too_large` even when BQ's own dry-run reported single-digit MB. Now the guardrail rewrites the user SQL from DuckDB-flavor (bare registered names + `bq."<ds>"."<tbl>"`) to BQ-native (`` `<project>.<ds>.<tbl>` ``) and runs ONE dry-run on the EXACT user SQL — partition pruning, column projection, and predicate pushdown all engage. Cap check uses the real estimate. Fallback: if BQ rejects the rewritten SQL with `bq_bad_request` (DuckDB-only syntax that doesn't translate, e.g. `::INT` casts), the guardrail falls back to the pre-fix per-table SELECT * estimate so a non-portable query still gets bounded; non-parse errors (forbidden / upstream) propagate as 502. Helpers exported as `_rewrite_user_sql_for_bq_dry_run` (test seam).
|
- **`agnes query --remote` no longer over-rejects narrow queries on partitioned/clustered BigQuery tables.** Closes #171. Pre-fix the `/api/query` cost guardrail dry-ran a synthetic `SELECT * FROM <table>` per registered remote-BQ row referenced by the user SQL, which forced BQ to estimate "full table scan" — column projection, predicate pushdown, and partition pruning were all ignored, producing scan-byte estimates up to ~30,000× larger than the actual query would scan. Narrow queries on big partitioned tables (the documented happy-path use case) were rejected with 400 `remote_scan_too_large` even when BQ's own dry-run reported single-digit MB. Now the guardrail rewrites the user SQL from DuckDB-flavor (bare registered names + `bq."<ds>"."<tbl>"`) to BQ-native (`` `<project>.<ds>.<tbl>` ``) and runs ONE dry-run on the EXACT user SQL — partition pruning, column projection, and predicate pushdown all engage. Cap check uses the real estimate. Fallback: if BQ rejects the rewritten SQL with `bq_bad_request` (DuckDB-only syntax that doesn't translate, e.g. `::INT` casts), the guardrail falls back to the pre-fix per-table SELECT * estimate so a non-portable query still gets bounded; non-parse errors (forbidden / upstream) propagate as 502. Helpers exported as `_rewrite_user_sql_for_bq_dry_run` (test seam).
|
||||||
- **Windows: `agnes` CLI no longer crashes on cs-CZ / non-UTF-8 consoles.** Two failure modes addressed (originally reported in #172 against the pre-rename `da` CLI; ported and broadened here): (1) `agnes pull` and any other Rich-progress-bar codepath crashed with `UnicodeEncodeError` because cp1250 / cp1252 cannot encode Rich's Braille spinner glyphs — `cli/main.py` now reconfigures `sys.stdout` / `sys.stderr` to UTF-8 with `errors="replace"` at import time when `sys.platform == "win32"`. (2) `agnes skills list` and `agnes skills show` crashed with `UnicodeDecodeError` reading skill markdown that contains em-dashes / accents — every `Path.read_text()` / `Path.write_text()` / `open()` call site in `cli/` (including ones not touched by #172, since several files were renamed in the bootstrap rewrite) now passes `encoding="utf-8"` explicitly. Defensive: also covers JSON / YAML config files that were ASCII-only in practice but were one non-ASCII value away from the same failure mode.
|
- **Windows: `agnes` CLI no longer crashes on cs-CZ / non-UTF-8 consoles.** Two failure modes addressed (originally reported in #172 against the pre-rename `da` CLI; ported and broadened here): (1) `agnes pull` and any other Rich-progress-bar codepath crashed with `UnicodeEncodeError` because cp1250 / cp1252 cannot encode Rich's Braille spinner glyphs — `cli/main.py` now reconfigures `sys.stdout` / `sys.stderr` to UTF-8 with `errors="replace"` at import time when `sys.platform == "win32"`. (2) `agnes skills list` and `agnes skills show` crashed with `UnicodeDecodeError` reading skill markdown that contains em-dashes / accents — every `Path.read_text()` / `Path.write_text()` / `open()` call site in `cli/` (including ones not touched by #172, since several files were renamed in the bootstrap rewrite) now passes `encoding="utf-8"` explicitly. Defensive: also covers JSON / YAML config files that were ASCII-only in practice but were one non-ASCII value away from the same failure mode.
|
||||||
- `agnes snapshot create … --estimate` in a pre-init directory no longer leaks an httpx `ConnectError` traceback to stderr. The estimate-guard fix (3d587681) let `--estimate` reach `api_post_json`, but the existing `except V2ClientError` clause didn't catch transport-layer errors when no server was configured (defaulted to `http://localhost:8000`). Now also catches `httpx.HTTPError` and renders the friendly hint `Run \`agnes init …\` first`.
|
- `agnes snapshot create … --estimate` in a pre-init directory no longer leaks an httpx `ConnectError` traceback to stderr. The estimate-guard fix (3d587681) let `--estimate` reach `api_post_json`, but the existing `except V2ClientError` clause didn't catch transport-layer errors when no server was configured (defaulted to `http://localhost:8000`). Now also catches `httpx.HTTPError` and renders the friendly hint `Run \`agnes init …\` first`.
|
||||||
|
|
|
||||||
|
|
@ -136,15 +136,33 @@ def run_pull(
|
||||||
|
|
||||||
# 2. Compute the download set, skipping remote-mode tables (no
|
# 2. Compute the download set, skipping remote-mode tables (no
|
||||||
# parquet on the server) and unchanged hashes.
|
# parquet on the server) and unchanged hashes.
|
||||||
|
#
|
||||||
|
# The parquet-existence check is load-bearing: a stale `sync_state.json`
|
||||||
|
# entry (hash matches server) is NOT proof the file is on disk. The
|
||||||
|
# file can disappear between runs — manual rm, disk corruption, an
|
||||||
|
# operator nuking `server/parquet/` during cleanup, a different
|
||||||
|
# workspace sharing the same `~/.config/agnes/sync_state.json`
|
||||||
|
# (TODO(workspace-scoped-sync-state) below) writing one workspace's
|
||||||
|
# parquets while another reads sync_state and assumes "I already
|
||||||
|
# have these." Without the existence guard, `agnes pull` would skip
|
||||||
|
# the download and the downstream DuckDB view rebuild fails on a
|
||||||
|
# missing file. Hash-equal-but-file-missing → force re-download.
|
||||||
to_download: list[str] = []
|
to_download: list[str] = []
|
||||||
non_remote_total = 0
|
non_remote_total = 0
|
||||||
|
parquet_dir = workspace / "server" / "parquet"
|
||||||
for tid, info in server_tables.items():
|
for tid, info in server_tables.items():
|
||||||
if info.get("query_mode") == "remote":
|
if info.get("query_mode") == "remote":
|
||||||
continue
|
continue
|
||||||
non_remote_total += 1
|
non_remote_total += 1
|
||||||
local_hash = local_tables.get(tid, {}).get("hash", "")
|
local_hash = local_tables.get(tid, {}).get("hash", "")
|
||||||
server_hash = info.get("hash", "")
|
server_hash = info.get("hash", "")
|
||||||
if server_hash != local_hash or tid not in local_tables or not server_hash:
|
target = parquet_dir / f"{tid}.parquet"
|
||||||
|
if (
|
||||||
|
server_hash != local_hash
|
||||||
|
or tid not in local_tables
|
||||||
|
or not server_hash
|
||||||
|
or not target.exists()
|
||||||
|
):
|
||||||
to_download.append(tid)
|
to_download.append(tid)
|
||||||
result.parquets_total = non_remote_total
|
result.parquets_total = non_remote_total
|
||||||
|
|
||||||
|
|
@ -156,7 +174,6 @@ def run_pull(
|
||||||
|
|
||||||
# 4. Download parquets. Lazy mkdir: only create server/parquet/
|
# 4. Download parquets. Lazy mkdir: only create server/parquet/
|
||||||
# when we have at least one table to write into it.
|
# when we have at least one table to write into it.
|
||||||
parquet_dir = workspace / "server" / "parquet"
|
|
||||||
for tid in to_download:
|
for tid in to_download:
|
||||||
if not parquet_dir.exists():
|
if not parquet_dir.exists():
|
||||||
parquet_dir.mkdir(parents=True, exist_ok=True)
|
parquet_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
|
||||||
|
|
@ -96,6 +96,120 @@ def test_run_pull_with_one_table(tmp_path, monkeypatch):
|
||||||
assert result.tables_updated == 1
|
assert result.tables_updated == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_pull_redownloads_when_parquet_missing_despite_matching_hash(
|
||||||
|
tmp_path, monkeypatch,
|
||||||
|
):
|
||||||
|
"""Regression: hash-equal-but-file-missing must re-download.
|
||||||
|
|
||||||
|
Repro: analyst's `~/.config/agnes/sync_state.json` says the local
|
||||||
|
parquet is in sync with the server (hashes match), but the actual
|
||||||
|
`<workspace>/server/parquet/<tid>.parquet` file is gone — manual rm,
|
||||||
|
a different workspace sharing the same global sync_state, an
|
||||||
|
operator nuking server/parquet/, etc. Pre-fix `agnes pull` would
|
||||||
|
skip the download (hash matches) and the next DuckDB view rebuild
|
||||||
|
would fail on a missing file. Now the existence check forces a
|
||||||
|
re-download even when the hash equality says "you have this."
|
||||||
|
"""
|
||||||
|
canned_manifest = {
|
||||||
|
"tables": {"tbl1": {"hash": "abc", "rows": 0, "size_bytes": 0}}
|
||||||
|
}
|
||||||
|
canned_memory = {"mandatory": [], "approved": []}
|
||||||
|
parquet_bytes = b"PAR1" + b"\x00" * 1000 + b"PAR1"
|
||||||
|
|
||||||
|
def _api_get(path, *args, **kwargs):
|
||||||
|
resp = MagicMock()
|
||||||
|
resp.status_code = 200
|
||||||
|
if path == "/api/sync/manifest":
|
||||||
|
resp.json.return_value = canned_manifest
|
||||||
|
elif path == "/api/memory/bundle":
|
||||||
|
resp.json.return_value = canned_memory
|
||||||
|
resp.raise_for_status = lambda: None
|
||||||
|
return resp
|
||||||
|
|
||||||
|
download_calls = {"count": 0}
|
||||||
|
|
||||||
|
def _stream_download(path, target_path, progress_callback=None):
|
||||||
|
from pathlib import Path as _P
|
||||||
|
download_calls["count"] += 1
|
||||||
|
_P(target_path).write_bytes(parquet_bytes)
|
||||||
|
return len(parquet_bytes)
|
||||||
|
|
||||||
|
monkeypatch.setattr("cli.lib.pull.api_get", _api_get, raising=False)
|
||||||
|
monkeypatch.setattr("cli.lib.pull.stream_download", _stream_download, raising=False)
|
||||||
|
monkeypatch.setattr("cli.lib.pull._is_valid_parquet", lambda p: True, raising=False)
|
||||||
|
monkeypatch.setattr("cli.lib.pull._file_md5", lambda p: "abc", raising=False)
|
||||||
|
|
||||||
|
# Seed sync_state.json claiming we already have tbl1 with the matching hash —
|
||||||
|
# but DON'T put a parquet on disk. Pre-fix this combo would short-circuit
|
||||||
|
# the download.
|
||||||
|
from cli.config import save_sync_state
|
||||||
|
save_sync_state({
|
||||||
|
"tables": {"tbl1": {"hash": "abc", "rows": 0, "size_bytes": 0}},
|
||||||
|
"last_sync": "2026-01-01T00:00:00+00:00",
|
||||||
|
})
|
||||||
|
|
||||||
|
target_parquet = tmp_path / "server" / "parquet" / "tbl1.parquet"
|
||||||
|
assert not target_parquet.exists(), "fixture precondition: parquet absent"
|
||||||
|
|
||||||
|
result = run_pull(server_url="http://x", token="t", workspace=tmp_path)
|
||||||
|
|
||||||
|
assert download_calls["count"] == 1, (
|
||||||
|
"hash-equal-but-file-missing must trigger a re-download — "
|
||||||
|
f"got {download_calls['count']} download calls"
|
||||||
|
)
|
||||||
|
assert target_parquet.exists(), "parquet must be on disk after re-download"
|
||||||
|
assert result.tables_updated == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_pull_skips_download_when_hash_matches_and_file_present(
|
||||||
|
tmp_path, monkeypatch,
|
||||||
|
):
|
||||||
|
"""Counterpart: when sync_state agrees with server AND the parquet
|
||||||
|
actually exists, the download is correctly skipped — that's the
|
||||||
|
fast-path the existence check must NOT regress."""
|
||||||
|
canned_manifest = {
|
||||||
|
"tables": {"tbl1": {"hash": "abc", "rows": 0, "size_bytes": 0}}
|
||||||
|
}
|
||||||
|
canned_memory = {"mandatory": [], "approved": []}
|
||||||
|
|
||||||
|
def _api_get(path, *args, **kwargs):
|
||||||
|
resp = MagicMock()
|
||||||
|
resp.status_code = 200
|
||||||
|
if path == "/api/sync/manifest":
|
||||||
|
resp.json.return_value = canned_manifest
|
||||||
|
elif path == "/api/memory/bundle":
|
||||||
|
resp.json.return_value = canned_memory
|
||||||
|
resp.raise_for_status = lambda: None
|
||||||
|
return resp
|
||||||
|
|
||||||
|
download_calls = {"count": 0}
|
||||||
|
|
||||||
|
def _stream_download(path, target_path, progress_callback=None):
|
||||||
|
download_calls["count"] += 1
|
||||||
|
return 0
|
||||||
|
|
||||||
|
monkeypatch.setattr("cli.lib.pull.api_get", _api_get, raising=False)
|
||||||
|
monkeypatch.setattr("cli.lib.pull.stream_download", _stream_download, raising=False)
|
||||||
|
|
||||||
|
# Seed both sync_state AND the parquet on disk.
|
||||||
|
from cli.config import save_sync_state
|
||||||
|
save_sync_state({
|
||||||
|
"tables": {"tbl1": {"hash": "abc", "rows": 0, "size_bytes": 0}},
|
||||||
|
"last_sync": "2026-01-01T00:00:00+00:00",
|
||||||
|
})
|
||||||
|
parquet_dir = tmp_path / "server" / "parquet"
|
||||||
|
parquet_dir.mkdir(parents=True)
|
||||||
|
(parquet_dir / "tbl1.parquet").write_bytes(b"PAR1" + b"\x00" * 100 + b"PAR1")
|
||||||
|
|
||||||
|
result = run_pull(server_url="http://x", token="t", workspace=tmp_path)
|
||||||
|
|
||||||
|
assert download_calls["count"] == 0, (
|
||||||
|
"hash equal AND file present must skip the download — "
|
||||||
|
f"got {download_calls['count']} unwanted downloads"
|
||||||
|
)
|
||||||
|
assert result.tables_updated == 0
|
||||||
|
|
||||||
|
|
||||||
def test_run_pull_dry_run_writes_nothing(tmp_path, fake_server):
|
def test_run_pull_dry_run_writes_nothing(tmp_path, fake_server):
|
||||||
run_pull(server_url="http://x", token="t", workspace=tmp_path, dry_run=True)
|
run_pull(server_url="http://x", token="t", workspace=tmp_path, dry_run=True)
|
||||||
assert not (tmp_path / "server").exists()
|
assert not (tmp_path / "server").exists()
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue