release: 0.47.0 — source-agnostic catalog metadata + cache discipline (#223)

## Summary

- Catalog enrichment for `query_mode='remote'` rows: `rows`, `size_bytes`, `partition_by`, `clustered_by` per table (BQ + Keboola providers).
- `/api/v2/schema/{id}` cache miss: 2 BQ jobs → 1 (-50%) via shared `fetch_bq_columns_full`.
- All four catalog/schema/sample/metadata caches flush on registry change; single-row re-warm scheduled.
- Automatic cache warmup at server startup (bounded concurrency, opt-out via `AGNES_SKIP_CACHE_WARMUP=1`).
- SSE-driven freshness toolbar on `/admin/tables` with progress bar, log, and per-row badge.
- New admin doc `docs/admin/query-modes.md` — single source of truth on `local` / `remote` / `materialized` choice.
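
The flush-on-registry-change behavior listed above can be pictured with a minimal sketch. It assumes each cache is a plain dict keyed by table id; `CACHES` and `flush_table` are hypothetical names used for illustration only and are not the contents of `app/api/v2_catalog.py`.

```python
from __future__ import annotations

from typing import Any

# Hypothetical stand-ins for the four caches, each keyed by table id
# (an assumption; the real cache keys and TTLs are not shown in this PR text).
CACHES: dict[str, dict[str, Any]] = {
    "catalog": {},
    "schema": {},
    "sample": {},
    "metadata": {},
}


def flush_table(table_id: str) -> list[str]:
    """Drop table_id from every cache; return the names of caches that held it."""
    flushed = []
    for name, cache in CACHES.items():
        if table_id in cache:
            del cache[table_id]
            flushed.append(name)
    return flushed


# Usage: simulate a registry write that touches the "orders" table.
CACHES["schema"]["orders"] = ["id", "amount"]
CACHES["metadata"]["orders"] = {"rows": 10}
CACHES["schema"]["customers"] = ["id"]
flushed_names = flush_table("orders")
```

Entries for other tables stay warm; only the edited row is dropped, which is what makes a single-row re-warm sufficient afterwards.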

Closes #155.
Closes #156.

## Test plan

- [x] 65+ targeted tests pass across 11 new test modules + 3 modified ones.
- [x] No DB migration; no wire-break; `MIN_COMPAT_CLI_VERSION` unchanged.
- [ ] Reviewer: register a remote BQ table via `/admin/tables`, observe the toolbar populates within ~2 s and the per-row badge transitions warming → fresh.
- [ ] Reviewer: trigger `Re-warm all`, verify SSE log scrolls and `cacheWarmupBar` progresses.
- [ ] Reviewer: edit a registered row's bucket, verify `agnes schema <id>` returns updated columns immediately (no 1-hour staleness).
- [ ] Reviewer: confirm `agnes admin register-table --query-mode remote` prints the new IAM-smoke-check hint.

## Notable design decisions

- Region-scoped BigQuery `INFORMATION_SCHEMA.TABLE_STORAGE` is the only valid source for size+rows (verified live 2026-05-07; a dataset-scoped variant doesn't exist). Region resolved from `instance.yaml.data_source.bigquery.location` → `bq.client().get_dataset(...)` → fall back to legacy `__TABLES__`.
- VIEW handling: TABLE_STORAGE returns no rows for views, so the lookup falls through to `__TABLES__` (also empty) and yields `TableMetadata(rows=None, size_bytes=None, partition_by=..., clustered_by=...)`. A null size signals analyst Claude to apply the existing CLAUDE.md guidance.
- `size_bytes` is `active_logical_bytes + long_term_logical_bytes` — full BQ scan reads both; reporting only active undercounts aged partitioned tables.
- Source-agnostic provider seam: per-source `connectors/<source>/metadata.py:fetch(MetadataRequest)`; dispatcher in `app/api/v2_catalog.py:_metadata_provider_for` lazily imports per source_type so a Keboola-only deployment doesn't pay the BQ-extension import cost.
- Warmup non-blocking: FastAPI `lifespan` schedules `asyncio.create_task(_warm_catalog_caches_bg)` before `yield`. Per-row failures isolated.
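
The size and VIEW decisions above can be sketched as follows. This is a minimal illustration, not the provider in `connectors/bigquery/metadata.py`: `metadata_from_storage_row` is a hypothetical helper, and the input dict is assumed to carry the `total_rows`, `active_logical_bytes`, and `long_term_logical_bytes` columns of one `TABLE_STORAGE` row.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TableMetadata:
    # Mirrors the four fields named in this PR; every field is optional.
    rows: int | None = None
    size_bytes: int | None = None
    partition_by: str | None = None
    clustered_by: list[str] | None = None


def metadata_from_storage_row(
    storage_row: dict[str, int] | None,
    partition_by: str | None = None,
    clustered_by: list[str] | None = None,
) -> TableMetadata:
    """Build metadata from one TABLE_STORAGE row; None means no row (e.g. a view)."""
    if storage_row is None:
        # Views: size and row count stay None, partition/cluster info is kept.
        return TableMetadata(partition_by=partition_by, clustered_by=clustered_by)
    # A full scan reads both active and long-term storage, so report the sum.
    size = storage_row["active_logical_bytes"] + storage_row["long_term_logical_bytes"]
    return TableMetadata(
        rows=storage_row["total_rows"],
        size_bytes=size,
        partition_by=partition_by,
        clustered_by=clustered_by,
    )


# Usage: a partitioned table with aged partitions, then a view.
table_meta = metadata_from_storage_row(
    {"total_rows": 1200, "active_logical_bytes": 4096, "long_term_logical_bytes": 8192},
    partition_by="event_date",
)
view_meta = metadata_from_storage_row(None)
```

Reporting only `active_logical_bytes` here would give 4096 for a table whose full scan reads 12288 bytes, which is the undercount the design note describes.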

## Out of scope

- Profile / column histograms / dimension cardinality for remote tables (separate issue).
- Onboarding nudge ("you have 0 remote tables, consider registering some BQ ones") — separate UX call.
- Provider plug-in registration via entry-points (the dispatch table is a hardcoded if-tree today; one line per future source).
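
For orientation, lazy per-source dispatch can be sketched as below. The PR describes the current dispatcher as a hardcoded if-tree; this sketch swaps in a table-driven variant purely for illustration. `PROVIDERS` and `provider_for` are hypothetical names, and the demo uses a standard-library module in place of a connector so the snippet runs on its own.

```python
from __future__ import annotations

import importlib
from typing import Callable

# source_type -> (module path, attribute). The two connector module paths come
# from this PR; the table-driven shape is an assumption for illustration.
PROVIDERS: dict[str, tuple[str, str]] = {
    "bigquery": ("connectors.bigquery.metadata", "fetch"),
    "keboola": ("connectors.keboola.metadata", "fetch"),
}


def provider_for(
    source_type: str,
    table: dict[str, tuple[str, str]] | None = None,
) -> Callable | None:
    """Import a provider module only when its source_type is actually requested."""
    entry = (PROVIDERS if table is None else table).get(source_type)
    if entry is None:
        return None
    module_path, attr = entry
    # The import happens here, not at module load, so unused sources cost nothing.
    return getattr(importlib.import_module(module_path), attr)


# Demo: a stdlib module stands in for a connector module.
demo_table = {"demo": ("math", "sqrt")}
demo_provider = provider_for("demo", demo_table)
```

With this shape, adding a source is one new table entry, which matches the "one line per future source" cost noted above.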

## Release

Bumps `pyproject.toml` 0.46.1 → 0.47.0 (main shipped 0.46.0 + 0.46.1 during this PR — see commit `d98976ec`). New CHANGELOG section under `## [0.47.0] — 2026-05-07`.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
ZdenekSrotyr 2026-05-07 18:33:55 +02:00 committed by GitHub
parent 751cc25327
commit aa5921da67
31 changed files with 6997 additions and 177 deletions

View file

@ -10,6 +10,88 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
## [Unreleased]
## [0.47.0] — 2026-05-07
Catalog metadata enrichment + cache discipline + automatic warmup.
Closes #155 + #156.
### Added
- **`/api/v2/catalog` returns four new optional fields per row** — `rows`,
`size_bytes`, `partition_by`, `clustered_by` — populated by per-source-type
metadata providers (`connectors/bigquery/metadata.py`,
`connectors/keboola/metadata.py`). For `query_mode='remote'` BigQuery rows,
`size_bytes` is `active_logical_bytes + long_term_logical_bytes` (a full
scan reads both); region resolved from `data_source.bigquery.location` →
`bq_client.get_dataset(...)` → fall back to legacy `__TABLES__`.
Existing CLI consumers reading only `rough_size_hint` are unaffected.
- **Automatic cache warmup at startup.** FastAPI startup event schedules
a background task that walks BQ remote rows and pre-populates
`_metadata_cache` + `_schema_cache` with bounded concurrency (default 4,
tunable via `AGNES_WARMUP_CONCURRENCY`). Doesn't block readiness;
per-row failures logged + skipped. Opt-out via `AGNES_SKIP_CACHE_WARMUP=1`.
- **Three new admin endpoints under `/api/admin/cache-warmup/*`:**
- `GET /status` — JSON snapshot of the latest run.
- `POST /run` — manual trigger, idempotent under concurrent invocation.
- `GET /stream` — Server-Sent Events with `start` / `row` / `complete`
events for live UI updates.
- **`/admin/tables` cache freshness panel.** Toolbar above the per-source-type
listings with progress bar + "Re-warm all" button + collapsible
terminal-style log fed by SSE (polling fallback at 3 s). Per-row badge
in the existing `col-status` column updates live (fresh / warming /
pending / error).
- **`docs/admin/query-modes.md`** — source-agnostic admin reference for
registering tables as `local` / `remote` / `materialized`. Decision
tree, per-source-type IAM + setup, three worked examples. Linked from
the `?` icon next to the `query_mode` field in the admin UI edit modal
and from the third post-register hint in `agnes admin register-table`.
- **`agnes admin register-table` post-register hint** for `query_mode=remote`:
points at `agnes query --remote "SELECT COUNT(*)..."` as the IAM smoke
check so a missing `dataViewer` / `jobUser` surfaces at registration
time, not 30 minutes later.
### Changed
- **`/api/v2/schema/{id}` cache miss now does 1 BQ job instead of 2.**
`connectors/bigquery/access.py:fetch_bq_columns_full` collapses what
used to be `_fetch_bq_schema` + `_fetch_bq_table_options` into a single
`INFORMATION_SCHEMA.COLUMNS` query (same view, same predicate, just a
combined SELECT list). The metadata provider's partition/cluster path
shares the same helper — zero SQL duplication across the two consumers.
- **All four catalog/schema/sample/metadata caches are flushed on registry
change.** `app/api/v2_catalog.py:invalidate_for_table` is wired into
`POST /api/admin/register-table`, `PUT /api/admin/registry/{id}`, and
`DELETE /api/admin/registry/{id}`. After a registry write, a single-row
re-warm task is scheduled in the background so the admin's verification
request hits warm caches within ~1 s instead of waiting for the next
analyst miss. Pre-fix none of the caches were invalidated — admin
registers a table, `agnes catalog` doesn't show the new row for up to
5 min; admin updates a row's bucket, `agnes schema` returns the OLD
column list for up to 1 hour.
- **`v2_schema.build_schema` split into RBAC-aware outer + RBAC-naive
inner (`build_schema_uncached`).** Live endpoint behavior unchanged;
warmup uses the inner entry point to populate `_schema_cache` without
a user context.
### Internal
- New shared dataclass module `app/api/_metadata_models.py` with
`MetadataRequest` (frozen) + `TableMetadata` for source-agnostic
provider input/output.
- New `connectors/keboola/storage_api.py:KeboolaStorageClient.get_table_info`
thin wrapper — keeps `_get` private to the module.
- New env vars (operator-facing tuning, no required setup change):
- `AGNES_SKIP_CACHE_WARMUP` — opt-out of startup warmup.
- `AGNES_WARMUP_CONCURRENCY` — default 4, max parallel BQ
INFORMATION_SCHEMA jobs during a warmup pass.
- New runtime dependency: `sse-starlette>=2.0` (Server-Sent Events
responses for the cache-warmup stream).
- Tests added: `test_metadata_models`, `test_v2_schema_columns_consolidation`,
`test_v2_catalog_dispatcher`, `test_connectors_bigquery_metadata`,
`test_connectors_keboola_metadata`, `test_v2_catalog_remote_metadata`,
`test_v2_catalog_invalidation`, `test_cache_warmup`,
`test_main_startup_warmup`, `test_admin_tables_warmup_ui`.
## [0.46.5] — 2026-05-07
### Fixed
@ -47,75 +129,85 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C
## [0.46.0] — 2026-05-07
Keboola cutover bundle: native parquet on the materialized sync,
auto-discover protection against admin overrides, sync-routing
correctness, plus a bunch of operational paper-cuts surfaced during
a fresh deploy on a Snowflake-backed Keboola project. **BREAKING**
for Keboola operators: schema bump to v26 migrates Keboola
`query_mode='local'` rows to `materialized` (auto-migration runs on
first start; same effective behavior, different internal path —
Storage API direct via `fileType=parquet` instead of the DuckDB
extension).
Catalog metadata enrichment + cache discipline + automatic warmup.
Closes #155 + #156.
### Added
- `AGNES_TEMP_DIR` env var (default in `docker-compose.yml`: `/data/tmp`) routes per-call extractor tempdirs (Snowflake-UNLOAD slice staging, CSV→parquet intermediates) off the container's overlayfs `/tmp` onto the data volume. Boot-disk overlayfs filled to 100% on agnes-dev during a multi-GiB sliced parquet export; the dedicated data disk had 15 GiB free at the time. Helper `connectors/keboola/storage_api.py:get_temp_root` mkdirs the target on first use; unset / empty / unwritable falls back to system `/tmp` for compat with OSS users on a single-disk host.
- `POST /api/admin/discover-and-register?dry_run=true` returns the planned mutations without writing — lists `would_register`, `drift` (existing rows whose registry coordinates differ from what discovery would write), and `invalid` ids. Useful for auditing before re-running auto-discovery on a registry that's already had admin overrides applied.
- `GET /api/sync/status` returns `{"locked": bool}` — public, no auth. Consumed by the host-side `agnes-auto-upgrade.sh` cron to decide whether to defer `docker compose up -d` until the running sync finishes. Cheap (single Lock check), no sensitive data.
### Fixed
- `app/api/admin.py`: `_discover_and_register_tables` no longer overwrites admin-corrected registry rows. Two drift flavours surfaced (and skipped):
- **same_id_diff_coords** — registry has a row at the same id but different `(bucket, source_table)`; admin migrated coordinates.
- **name_collision** — discovery's slugified id differs from any registry id, but the discovered `name` matches an existing row's `name` (case-insensitive). Real-world cause: the `kbc_job` row was registered manually with the right bucket; Keboola's discovery exposes it under a different stage prefix that slugs to a different id. Pre-fix, auto-discovery would have inserted a duplicate whose Storage API export-async 404s. Now classified as drift, surfaced with `registry_id` so an operator can reconcile.
- `app/api/admin.py`: bucket detection in auto-discovery now uses the Keboola API's authoritative `bucket_id` field directly (with id-string parsing only as a fallback). Pre-fix, parsing the id string was the primary path and a stripped stage prefix inserted 137 broken rows.
- `app/api/sync.py`: `POST /api/sync/trigger` with a `tables` payload now actually scopes the materialized pass too. Previously the targeted trigger only filtered the legacy extractor subprocess; `_run_materialized_pass` still iterated every materialized row in the registry, so an admin asking to re-sync `kbc_job` re-ran every other due materialized row alongside it. The pass now takes a `tables` arg and skips rows not in the target set with `reason="not_in_target"`. Both registry id and name match.
- `scripts/ops/agnes-auto-upgrade.sh`: defers `docker compose up -d` while a sync is in flight. Probes `GET /api/sync/status` with a 5s timeout; if the response carries `"locked":true`, exits 0 with a deferred-recreate log line and waits for the next 5-min cron tick. Connection failures (older app version without the endpoint, app crashed, etc.) fall through to the upgrade — being stuck on a wedged image is worse than interrupting a hypothetical sync.
- `connectors/keboola/extractor.py`: `materialize_query` per-call tempdir is now opened with `ignore_cleanup_errors=True`. Previously a worker death mid-write under disk-full state could leave a multi-GiB stale slice tree (12 GiB seen on agnes-dev) because `TemporaryDirectory.__exit__` itself raised, masking the original exception and skipping cleanup. Now cleanup is best-effort and always fires.
- `src/scheduler.py`: `is_valid_schedule` now accepts `every 0m` (interval = 0 = "always due"). Useful as a force-resync override on a row whose previous attempt errored without recording `last_sync` — the default `every 1h` would otherwise block the retry for an hour. Existing values reject as before.
- `app/api/sync.py`: `POST /api/sync/trigger` now accepts both `["table_id"]` (legacy) and `{"tables": ["table_id"]}` (mirrors response shape) request bodies, plus `null` / no body for "sync everything". Malformed shapes return HTTP 422 with a structured detail. No client breakage — the old wire format keeps working.
- **`/api/v2/catalog` returns four new optional fields per row** — `rows`,
`size_bytes`, `partition_by`, `clustered_by` — populated by per-source-type
metadata providers (`connectors/bigquery/metadata.py`,
`connectors/keboola/metadata.py`). For `query_mode='remote'` BigQuery rows,
`size_bytes` is `active_logical_bytes + long_term_logical_bytes` (a full
scan reads both); region resolved from `data_source.bigquery.location` →
`bq_client.get_dataset(...)` → fall back to legacy `__TABLES__`.
Existing CLI consumers reading only `rough_size_hint` are unaffected.
- **Automatic cache warmup at startup.** FastAPI startup event schedules
a background task that walks BQ remote rows and pre-populates
`_metadata_cache` + `_schema_cache` with bounded concurrency (default 4,
tunable via `AGNES_WARMUP_CONCURRENCY`). Doesn't block readiness;
per-row failures logged + skipped. Opt-out via `AGNES_SKIP_CACHE_WARMUP=1`.
- **Three new admin endpoints under `/api/admin/cache-warmup/*`:**
- `GET /status` — JSON snapshot of the latest run.
- `POST /run` — manual trigger, idempotent under concurrent invocation.
- `GET /stream` — Server-Sent Events with `start` / `row` / `complete`
events for live UI updates.
- **`/admin/tables` cache freshness panel.** Toolbar above the per-source-type
listings with progress bar + "Re-warm all" button + collapsible
terminal-style log fed by SSE (polling fallback at 3 s). Per-row badge
in the existing `col-status` column updates live (fresh / warming /
pending / error).
- **`docs/admin/query-modes.md`** — source-agnostic admin reference for
registering tables as `local` / `remote` / `materialized`. Decision
tree, per-source-type IAM + setup, three worked examples. Linked from
the `?` icon next to the `query_mode` field in the admin UI edit modal
and from the third post-register hint in `agnes admin register-table`.
- **`agnes admin register-table` post-register hint** for `query_mode=remote`:
points at `agnes query --remote "SELECT COUNT(*)..."` as the IAM smoke
check so a missing `dataViewer` / `jobUser` surfaces at registration
time, not 30 minutes later.
### Changed
- `connectors/keboola`: materialized sync now requests **parquet directly** from the Storage API (`POST /v2/storage/tables/{id}/export-async` with `fileType=parquet`) instead of CSV → DuckDB COPY → parquet. The extractor downloads the Snowflake-UNLOADed parquet, renames into place, and skips the DuckDB roundtrip entirely. Eliminates the OOM that hits multi-GB Keboola tables when `read_csv(..., all_varchar=true, max_line_size=64MB)` materializes the whole CSV in memory before COPY. Sliced exports (large tables that Snowflake UNLOAD writes as multiple files) are merged via `DuckDB COPY (SELECT * FROM read_parquet([...]))` — peak memory bounded to one parquet row group (~1 MiB) regardless of table size. Admin can pin the legacy CSV path with `source_query='{"file_type":"csv"}'`. Backward-compat alias `KeboolaStorageClient.export_table_to_csv` retained.
- `connectors/keboola/storage_api.py`: `download_file` gzip detection no longer treats unencrypted files as gzipped (previous heuristic would have corrupted parquet downloads at gunzip time). Name-suffix-only.
- **BREAKING for Keboola operators**: schema bump to **v26**. Existing `query_mode='local'` Keboola rows are migrated to `query_mode='materialized'` (NULL `source_query` = full-table export — same effective behavior as before). New `register-table --source-type keboola` and `discover-and-register --source-type keboola` default to `materialized`. The `local` mode for Keboola is gone — it ran the DuckDB extension's COPY through Keboola QueryService, which is unreliable on linked-bucket projects (extension v0.1.6 fixes the linked-bucket case but not yet in the community CDN; pre-fix, projects with the `block-shared-snowflake-access` flag couldn't see bucket schemas at all). BigQuery and Jira `local` rows are untouched. See `connectors/keboola/storage_api.py` + the v25→v26 migration in `src/db.py`.
- **Keboola extract path is now Storage API direct**, not the DuckDB extension. New `connectors/keboola/storage_api.py` talks to Keboola Storage API straight via `requests`:
- `POST /v2/storage/tables/{id}/export-async` to kick off the job (with optional `whereFilters` / `columns` / `changedSince` from the row's `source_query` JSON);
- `GET /v2/storage/jobs/{id}` polled with bounded exponential backoff until `success` or `error`;
- `GET /v2/storage/files/{id}?federationToken=1` to fetch a signed URL;
- `GET <signed_url>` (or per-slice URLs from a manifest for sliced exports) → CSV → DuckDB COPY → parquet.
No `os.chdir`, no boto3/azure-blob/google-cloud-storage SDKs, no extension binary on the data path. Thread-safe. Same path is used both by `materialize_query()` (admin-registered tables with optional filter spec) and by `_extract_via_legacy()` (per-table fallback inside the parallel batch extractor).
- **`source_query` shape for Keboola materialized rows is JSON**, not SQL — Storage API takes a structured filter object, not free-form SQL. Mirrors the BQ materialized path conceptually but with a different payload. Schema:
```json
{
"where_filters": [{"column": "date", "operator": "ge", "values": ["2026-04-01"]}],
"columns": ["id", "date", "amount"],
"changed_since": "2026-04-01T00:00:00",
"limit": 1000
}
```
All fields optional. Empty / NULL = full-table export. Operators per Keboola Apiary: `eq`, `ne`, `in`, `notIn`, `ge`, `gt`, `le`, `lt`. See `connectors/keboola/storage_api.py:ExportFilter`.
- `POST /api/sync/trigger` is now singleton per process. A second trigger that arrives while the previous sync is still running returns **HTTP 409** (`detail: sync_already_in_progress`) instead of scheduling a parallel `_run_sync`. The scheduler container's `data-refresh` job logs the 409 as a normal warning and waits for its next tick — no retry loop. Operator-visible: clients that hand-roll their own polling on `/api/sync/trigger` now need to handle 409. Why it matters: two concurrent extractor subprocesses both write `extract.duckdb`, fight for its file lock, starve uvicorn's worker pool, and Docker flips `agnes-app` to `unhealthy` long enough for `reverse_proxy`-fronted deploys to return 503 to external traffic until contention drains.
- Keboola legacy Storage-API fallback now runs in parallel across a process pool. When the DuckDB extension's per-table scan fails (e.g. on projects with the `block-shared-snowflake-access` feature flag where workspace roles can't see bucket schemas, see keboola/duckdb-extension#17), tables that fall back to the legacy `kbcstorage` client are now drained concurrently instead of one-at-a-time. The dominant per-table cost is the synchronous wait on the Keboola Storage export job (which scans Snowflake into a CSV and returns); fanning out across N workers cuts wall-clock proportionally for batches that hit the fallback. Default 8 workers, override with `AGNES_KEBOOLA_PARALLELISM` (set to `1` for sequential, useful when debugging or seeing Keboola-side rate-limiting). Project-level concurrency is bounded by the operator's `storage.jobsParallelism` limit (typically 10); the default 8 leaves headroom for other clients. Workers are processes (not threads) because `connectors/keboola/client.py:export_table` does `os.chdir(temp_dir)` to redirect kbcstorage's slice-file downloads into a per-call temp directory — `os.chdir` is process-global, so two threads racing on it land slice files in the wrong directory and the merge step fails with `[Errno 2] No such file or directory: '<job_id>.csv_X_Y_Z.csv'`. Process workers each have their own CWD.
- Extractor subprocess timeout bumped from 1800s to 3600s (configurable via `AGNES_EXTRACTOR_TIMEOUT_SEC`). On projects where the legacy Storage-API fallback is the only working path (extension blocked by `block-shared-snowflake-access`), 28+ tables × multi-minute Keboola export jobs routinely overran the 30-min cap before the parallel fallback even existed; with parallelization in place the run usually fits, but `kbc_telemetry`-class tables and large CRM snapshots can still push it over. The 1h ceiling matches the longest practically-reasonable Keboola export job before an operator should intervene.
- Extractor subprocess is now launched in its own process group (`subprocess.Popen(..., start_new_session=True)`) so a timeout can take down the whole tree — the extractor parent plus the ProcessPoolExecutor workers it spawned for parallel legacy fallback. Without this, a `subprocess.run(timeout=...)` SIGKILLed only the immediate child; the pool workers were reparented to PID 1 and continued holding open Keboola Storage export jobs, blocking the next sync cycle. On timeout the parent now SIGTERMs the group (10s grace), then SIGKILLs stragglers. The extractor's inline Python script installs a SIGTERM → `sys.exit(143)` handler so the `with ProcessPoolExecutor(...)` block runs its `__exit__` (`shutdown(wait=True)`) cleanly before the process dies.
### Fixed (cutover regressions, surfaced 2026-05-06)
- `agnes pull` no longer fails with `hash mismatch: expected … got …` for every Keboola local-mode table. `src/orchestrator.py:_update_sync_state` stored `md5(f"{mtime_ns}:{size}")[:12]` — a 12-char fingerprint of file metadata — while the CLI's post-download integrity check compares against the full 32-char content MD5 it computes via `cli/commands/sync.py:_md5_file`. Those could never match, so every `agnes pull` reported `Updated 0 tables` even when the server had data. Now the orchestrator stores the same content MD5 the materialized SQL path already used (`app/api/sync.py:_file_hash`).
- Latent `NameError: name '_sys' is not defined` in `app/api/sync.py:_run_sync` when the function fell into its outer `except Exception` before reaching the inner `import sys as _sys`. Hoisted the import to the top of the body so the error path stays loggable instead of trading the original failure for a misleading stack trace.
- Keboola sync now falls back to the legacy Storage-API client when the DuckDB Keboola extension's per-table scan fails, not just when the initial `ATTACH` fails. Two changes:
- `kbcstorage>=0.9.0` is promoted from optional to core dependency. The legacy fallback path in `connectors/keboola/extractor.py:_extract_via_legacy` has been there since the extension landed, but until now the bare `from kbcstorage.client import Client` would crash any default install with `ModuleNotFoundError`.
- `connectors/keboola/extractor.py:run` now wraps `_extract_via_extension` in a per-table try/except — on any per-table scan failure it retries via the legacy client. Previously, when `ATTACH` succeeded but the table-level `COPY (SELECT * FROM kbc."<bucket>"."<table>")` failed, the table was just marked failed with no retry.
Together these unblock deployments where the extension's bucket-schema scans return `Schema '..."in.c-..."' does not exist or not authorized` (keboola/duckdb-extension#17) while the upstream extension fix is in flight.
- `connectors/keboola/access.py:KeboolaAccess.__init__` and `connectors/keboola/extractor.py:_try_attach_extension` now strip a trailing slash from the Keboola stack URL before passing it to the DuckDB Keboola extension's `ATTACH`. The canonical Keboola URL form (`https://connection.<region>.keboola.com/`) failed there with a network error; bare-host form works. Operators no longer have to massage the value out of `KEBOOLA_STACK_URL` / `instance.yaml`.
- `src/profiler.py:TableInfo.__init__` makes `description` optional (defaults to `""`). Two call sites in `app/api/catalog.py` and `app/api/sync.py` instantiate `TableInfo(name=..., table_id=...)` without it; the previous required-arg signature crashed sync's profiler pass with `TableInfo.__init__() missing 1 required positional argument: 'description'`, leaving `[SYNC] Profiled 0 tables` after every run.
- `scripts/ops/agnes-auto-upgrade.sh` now `chown`s `${STATE_DIR}` (`/data/state` by default), `/data/extracts`, `/data/analytics` to the new image's runtime UID:GID before `docker compose up` when the image digest moves. Catches root → non-root UID transitions across upgrades — without it, the new image's first start `PermissionError`s on `.session_secret` / DuckDB. Reads the target uid:gid from `/etc/passwd` inside the image so the script stays honest if the runtime user ever moves off uid 999.
- **`/api/v2/schema/{id}` cache miss now does 1 BQ job instead of 2.**
`connectors/bigquery/access.py:fetch_bq_columns_full` collapses what
used to be `_fetch_bq_schema` + `_fetch_bq_table_options` into a single
`INFORMATION_SCHEMA.COLUMNS` query (same view, same predicate, just a
combined SELECT list). The metadata provider's partition/cluster path
shares the same helper — zero SQL duplication across the two consumers.
- **All four catalog/schema/sample/metadata caches are flushed on registry
change.** `app/api/v2_catalog.py:invalidate_for_table` is wired into
`POST /api/admin/register-table`, `PUT /api/admin/registry/{id}`, and
`DELETE /api/admin/registry/{id}`. After a registry write, a single-row
re-warm task is scheduled in the background so the admin's verification
request hits warm caches within ~1 s instead of waiting for the next
analyst miss. Pre-fix none of the caches were invalidated — admin
registers a table, `agnes catalog` doesn't show the new row for up to
5 min; admin updates a row's bucket, `agnes schema` returns the OLD
column list for up to 1 hour.
- **`v2_schema.build_schema` split into RBAC-aware outer + RBAC-naive
inner (`build_schema_uncached`).** Live endpoint behavior unchanged;
warmup uses the inner entry point to populate `_schema_cache` without
a user context.
### Internal
- `infra/modules/customer-instance` (tag `infra-v1.8.0`): `startup-script.sh.tpl` no longer overwrites operator-edited `AGNES_TAG` / `AGNES_TEMP_DIR` in `/opt/agnes/.env` on every boot. Reads the existing values when present and lets them win over the template-computed `$IMAGE_TAG`. Pre-fix, an in-place TF action that stopped/started the VM (e.g. `machine_type` change) would re-run the startup script and clobber any manually-pinned image tag — operators had to re-edit the file post-restart. Fresh provisions still get the TF-driven values; the `.env` file's existence is the disambiguator. To force a TF-driven reset, `rm /opt/agnes/.env` and reboot.
- New shared dataclass module `app/api/_metadata_models.py` with
`MetadataRequest` (frozen) + `TableMetadata` for source-agnostic
provider input/output.
- New `connectors/keboola/storage_api.py:KeboolaStorageClient.get_table_info`
thin wrapper — keeps `_get` private to the module.
- New env vars (operator-facing tuning, no required setup change):
- `AGNES_SKIP_CACHE_WARMUP` — opt-out of startup warmup.
- `AGNES_WARMUP_CONCURRENCY` — default 4, max parallel BQ
INFORMATION_SCHEMA jobs during a warmup pass.
- New runtime dependency: `sse-starlette>=2.0` (Server-Sent Events
responses for the cache-warmup stream).
- Tests added: `test_metadata_models`, `test_v2_schema_columns_consolidation`,
`test_v2_catalog_dispatcher`, `test_connectors_bigquery_metadata`,
`test_connectors_keboola_metadata`, `test_v2_catalog_remote_metadata`,
`test_v2_catalog_invalidation`, `test_cache_warmup`,
`test_main_startup_warmup`, `test_admin_tables_warmup_ui`.
## [0.45.0] — 2026-05-07

View file

@ -0,0 +1,40 @@
"""Shared data shapes for source-agnostic table-metadata providers.

Lives under `app/api/` because the primary consumer is
`app/api/v2_catalog.py`. Connector-side providers in `connectors/<source>/`
import upward into this module; the inverse layering would force
`v2_catalog.py` to depend on `connectors/__init__.py`, which is the
wrong direction.
"""

from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class MetadataRequest:
    """Narrow input passed to a metadata provider's `fetch()`.

    `bucket` and `source_table` are pre-validated by the dispatcher
    (`validate_quoted_identifier`) before construction, so the provider
    can interpolate them into SQL/URL paths without re-checking. Frozen
    so the (provider, request)-keyed cache lookup is stable.
    """

    table_id: str
    bucket: str
    source_table: str


@dataclass
class TableMetadata:
    """Source-agnostic metadata bundle. Every field optional — providers
    fill what they can cheaply get; callers tolerate `None`. Adding a new
    field here is a non-breaking change: existing CLI consumers don't
    even render `rough_size_hint` (verified `grep -rn rough_size_hint cli/`
    is empty), let alone the new fields.
    """

    rows: int | None = None
    size_bytes: int | None = None
    partition_by: str | None = None
    clustered_by: list[str] | None = None

View file

@ -2179,6 +2179,9 @@ def register_table(
params=_sanitize_for_audit(request.model_dump()),
)
from app.api.v2_catalog import invalidate_for_table
invalidate_for_table(table_id)
if not is_bigquery:
# Keboola / Jira / local rows are insert-only here. 201 Created — the
# decorator no longer carries a default status, so each branch is
@ -2512,6 +2515,9 @@ async def update_table(
if after.get("source_type") == "bigquery":
background.add_task(_materialize_bigquery_extract_bg)
from app.api.v2_catalog import invalidate_for_table
invalidate_for_table(table_id)
return {"id": table_id, "updated": list(updates.keys())}
@ -2607,6 +2613,9 @@ async def unregister_table(
}),
)
from app.api.v2_catalog import invalidate_for_table
invalidate_for_table(table_id)
if was_bigquery:
background.add_task(_materialize_bigquery_extract_bg)

app/api/cache_warmup.py (new file, 264 lines)

@ -0,0 +1,264 @@
"""Cache warmup framework — populates catalog/schema/metadata caches at
container startup so the first analyst hits warm caches.
Bounded concurrency (4 by default). Exposes:
- GET /api/admin/cache-warmup/status JSON snapshot
- POST /api/admin/cache-warmup/run manual trigger (idempotent)
- GET /api/admin/cache-warmup/stream Server-Sent Events
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Literal
from uuid import uuid4
from fastapi import APIRouter, Depends
from sse_starlette.sse import EventSourceResponse
from app.auth.access import require_admin
logger = logging.getLogger(__name__)
router = APIRouter()
@dataclass
class WarmupRowState:
table_id: str
status: Literal["pending", "warming", "fresh", "error"]
started_at: str | None = None
completed_at: str | None = None
duration_ms: int | None = None
error: str | None = None
last_warmed_at: str | None = None
@dataclass
class WarmupRunState:
run_id: str
trigger: Literal["startup", "manual", "registry_change"]
started_at: str
completed_at: str | None = None
total: int = 0
completed: int = 0
failed: int = 0
rows: dict[str, WarmupRowState] = field(default_factory=dict)
_subscribers: list[asyncio.Queue] = field(default_factory=list, repr=False)
WARMUP_STATE: WarmupRunState | None = None
_RUN_LOCK = asyncio.Lock()
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def maybe_schedule_startup_warmup() -> None:
"""Called from app/main.py FastAPI startup event."""
if os.environ.get("AGNES_SKIP_CACHE_WARMUP") == "1":
logger.info("cache warmup skipped (AGNES_SKIP_CACHE_WARMUP=1)")
return
try:
asyncio.create_task(_warm_catalog_caches_bg(trigger="startup"))
except RuntimeError:
logger.warning("no running event loop — startup warmup skipped")
async def _warm_catalog_caches_bg(
trigger: str = "startup", state: WarmupRunState | None = None,
) -> None:
"""Walk registry, warm metadata + schema caches for every remote row.
If `state` is provided, use it (caller has already published it on
WARMUP_STATE). Otherwise build a fresh state and assign WARMUP_STATE.
"""
global WARMUP_STATE
if state is None:
async with _RUN_LOCK:
# Re-check inside the lock: another caller might have started a run
# while we were waiting.
if WARMUP_STATE and WARMUP_STATE.completed_at is None:
return
state = WarmupRunState(
run_id=uuid4().hex[:8],
trigger=trigger,
started_at=_now_iso(),
)
WARMUP_STATE = state
run_id = state.run_id
rows = _list_remote_rows()
state.total = len(rows)
for r in rows:
state.rows[r["id"]] = WarmupRowState(
table_id=r["id"], status="pending",
)
_broadcast(state, {"event": "start", "data": {
"run_id": run_id, "trigger": trigger, "total": state.total,
}})
sem = asyncio.Semaphore(int(os.environ.get("AGNES_WARMUP_CONCURRENCY", "4")))
await asyncio.gather(
*(_warm_one(r, state, sem) for r in rows), return_exceptions=True,
)
state.completed_at = _now_iso()
_broadcast(state, {"event": "complete", "data": {
"run_id": run_id, "total": state.total,
"completed": state.completed, "failed": state.failed,
}})
logger.info(
"cache warmup complete: run_id=%s total=%d ok=%d fail=%d",
run_id, state.total, state.completed, state.failed,
)
def _list_remote_rows() -> list[dict]:
"""Snapshot of registry rows that need a warmup pass."""
from src.db import get_system_db
from src.repositories.table_registry import TableRegistryRepository
conn = get_system_db()
rows = TableRegistryRepository(conn).list_all()
return [
r for r in rows
if r.get("query_mode") == "remote" and r.get("source_type") == "bigquery"
]
async def _warm_one(
row: dict, state: WarmupRunState, sem: asyncio.Semaphore,
) -> None:
async with sem:
rs = state.rows[row["id"]]
rs.status = "warming"
rs.started_at = _now_iso()
_broadcast(state, {"event": "row", "data": asdict(rs)})
t0 = time.monotonic()
try:
await asyncio.to_thread(_warm_metadata_sync, row)
await asyncio.to_thread(_warm_schema_sync, row)
rs.status = "fresh"
rs.last_warmed_at = _now_iso()
state.completed += 1
except Exception as e:
rs.status = "error"
rs.error = str(e)
state.failed += 1
logger.warning("cache warmup row=%s failed: %s", row["id"], e)
finally:
rs.completed_at = _now_iso()
rs.duration_ms = int((time.monotonic() - t0) * 1000)
_broadcast(state, {"event": "row", "data": asdict(rs)})
def _warm_metadata_sync(row: dict) -> None:
"""Trigger metadata cache populate via the catalog's normal path."""
from app.api.v2_catalog import _size_hint_for_row
_size_hint_for_row(row)
def _warm_schema_sync(row: dict) -> None:
"""Trigger schema cache populate via build_schema_uncached."""
from app.api.v2_schema import build_schema_uncached
from connectors.bigquery.access import get_bq_access
from src.db import get_system_db
bq = get_bq_access()
build_schema_uncached(get_system_db(), row["id"], bq=bq, row=row)
async def warm_one_table(table_id: str) -> None:
"""Single-row re-warm — invoked by `invalidate_for_table` after a
registry change. Does NOT update WARMUP_STATE (small change shouldn't
overwrite the last full run's status); just refreshes the caches."""
from src.db import get_system_db
from src.repositories.table_registry import TableRegistryRepository
conn = get_system_db()
row = TableRegistryRepository(conn).get(table_id)
if not row or row.get("query_mode") != "remote":
return
try:
await asyncio.to_thread(_warm_metadata_sync, row)
await asyncio.to_thread(_warm_schema_sync, row)
except Exception as e:
logger.warning("single-row warmup failed for %s: %s", table_id, e)
def _broadcast(state: WarmupRunState, event: dict) -> None:
"""Send an event to every SSE subscriber. Dead queues are pruned."""
dead = []
for q in state._subscribers:
try:
q.put_nowait(event)
except asyncio.QueueFull:
dead.append(q)
for q in dead:
state._subscribers.remove(q)
def _serialize_state(state: WarmupRunState) -> dict:
return {
"run_id": state.run_id,
"trigger": state.trigger,
"started_at": state.started_at,
"completed_at": state.completed_at,
"total": state.total,
"completed": state.completed,
"failed": state.failed,
"rows": {tid: asdict(rs) for tid, rs in state.rows.items()},
}
# ─── Endpoints ────────────────────────────────────────────────────────
@router.get("/api/admin/cache-warmup/status")
async def warmup_status(user: dict = Depends(require_admin)):
if WARMUP_STATE is None:
return {"state": "never_run"}
return _serialize_state(WARMUP_STATE)
@router.post("/api/admin/cache-warmup/run")
async def warmup_run(user: dict = Depends(require_admin)):
global WARMUP_STATE
if WARMUP_STATE and WARMUP_STATE.completed_at is None:
return {"run_id": WARMUP_STATE.run_id, "status": "already_running"}
state = WarmupRunState(
run_id=uuid4().hex[:8],
trigger="manual",
started_at=_now_iso(),
)
WARMUP_STATE = state
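# Pass the trigger explicitly; the default ("startup") would otherwise be
# reported in the "start" SSE event for a manual run.
asyncio.create_task(_warm_catalog_caches_bg(trigger="manual", state=state))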
return {"run_id": state.run_id, "status": "started"}
@router.get("/api/admin/cache-warmup/stream")
async def warmup_stream(user: dict = Depends(require_admin)):
async def gen():
q: asyncio.Queue = asyncio.Queue(maxsize=256)
if WARMUP_STATE is None:
yield {"event": "idle", "data": json.dumps({"state": "never_run"})}
return
WARMUP_STATE._subscribers.append(q)
yield {"event": "snapshot", "data": json.dumps(_serialize_state(WARMUP_STATE))}
try:
while True:
ev = await asyncio.wait_for(q.get(), timeout=30.0)
yield {"event": ev["event"], "data": json.dumps(ev["data"])}
if ev["event"] == "complete":
return
except asyncio.TimeoutError:
return
finally:
if WARMUP_STATE and q in WARMUP_STATE._subscribers:
WARMUP_STATE._subscribers.remove(q)
return EventSourceResponse(gen())
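
The bounded-concurrency pattern used by `_warm_catalog_caches_bg` and `_warm_one` above can be sketched in isolation. This is a minimal sketch, not the module's code: `warm_all`, `warm_fn`, and the `peak` counter are hypothetical names introduced for illustration, and only `asyncio.Semaphore`, `asyncio.gather`, and `asyncio.to_thread` are taken from the diff.

```python
import asyncio


async def warm_all(table_ids, warm_fn, limit=4):
    """Run the blocking `warm_fn` once per id, at most `limit` at a time.

    Returns (results, peak): a per-id status dict and the highest number
    of warm_fn calls that were in flight together.
    """
    sem = asyncio.Semaphore(limit)
    results = {}
    in_flight = 0
    peak = 0

    async def one(table_id):
        nonlocal in_flight, peak
        async with sem:  # blocks here once `limit` rows are warming
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                # warm_fn is synchronous, so run it off the event loop.
                await asyncio.to_thread(warm_fn, table_id)
                results[table_id] = "fresh"
            except Exception as exc:
                # One failing row must not abort the other rows.
                results[table_id] = f"error: {exc}"
            finally:
                in_flight -= 1

    await asyncio.gather(*(one(t) for t in table_ids))
    return results, peak
```

Catching the exception inside `one` keeps the per-row status accurate, which is why the sketch does not rely on `return_exceptions=True` to report failures.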


@ -11,6 +11,8 @@ from app.utils import get_data_dir as _get_data_dir
from src.rbac import can_access_table
from src.repositories.table_registry import TableRegistryRepository
from app.api.v2_cache import TTLCache
from app.api._metadata_models import MetadataRequest, TableMetadata
from src.identifier_validation import validate_quoted_identifier
router = APIRouter(prefix="/api/v2", tags=["v2"])
@ -25,6 +27,51 @@ router = APIRouter(prefix="/api/v2", tags=["v2"])
_table_rows_cache = TTLCache(maxsize=1, ttl_seconds=300)
_TABLE_ROWS_KEY = "all"
# Per-table cached TableMetadata. 15-min TTL — long enough to amortise
# across an analyst session, short enough that a freshly-registered
# remote table shows real numbers within a coffee break (the cache-bust
# path in `invalidate_for_table` accelerates this for the common admin-
# verifies-registration flow).
_metadata_cache = TTLCache(maxsize=512, ttl_seconds=900)
def _metadata_provider_for(source_type: str):
"""Lazy-import dispatch for source-specific metadata providers.
Lazy because connector modules are heavy (BQ extension, google-cloud
client, etc.) and a Keboola-only deployment shouldn't pay the BQ
import cost. Returns ``None`` for unknown source types; the caller
treats that as "no metadata enrichment available" and falls through.
"""
if source_type == "bigquery":
from connectors.bigquery import metadata as m
return m.fetch
if source_type == "keboola":
from connectors.keboola import metadata as m
return m.fetch
return None
def _build_metadata_request(row: dict) -> MetadataRequest | None:
"""Construct a validated MetadataRequest from a registry row.
Pre-validates the identifiers via `validate_quoted_identifier` before
constructing the request, so providers can then interpolate
`req.bucket` / `req.source_table` into SQL/URL paths without
re-checking. Returns ``None`` when validation fails; provider is not
dispatched for that row.
"""
bucket = row.get("bucket") or ""
source_table = row.get("source_table") or row.get("id") or ""
if not bucket or not source_table:
return None
if not (validate_quoted_identifier(bucket, "bucket")
and validate_quoted_identifier(source_table, "source_table")):
return None
return MetadataRequest(
table_id=row["id"], bucket=bucket, source_table=source_table,
)
def _flavor_for(source_type: str) -> str:
return "bigquery" if source_type == "bigquery" else "duckdb"
@ -65,23 +112,67 @@ def _bucket_size(byte_count: int) -> str:
return "very_large"
def _materialized_size_hint(table_id: str, source_type: str, query_mode: str) -> str | None:
"""Return a rough size bucket for a row whose data is on the server's
local filesystem (any `query_mode` that produces a parquet: `local` and
`materialized`). Returns ``None`` for `remote` (size requires a BQ
INFORMATION_SCHEMA round-trip; tracked separately) and for tables whose
parquet hasn't been materialised yet so the AI gets ``null`` not a
misleading "small".
def _size_hint_for_row(row: dict) -> dict:
"""Resolve the per-row metadata bundle the catalog response surfaces.
Renamed from `_materialized_size_hint` (which always also handled
`local` rows; the old name was misleading). Returns a dict with up
to five keys: `rough_size_hint`, `rows`, `size_bytes`, `partition_by`,
`clustered_by`. Missing keys are reported as `null` in the response.
Branches:
- `local` / `materialized`: existing on-disk parquet stat (cheap).
- `remote`: dispatch to the per-source-type provider; cache the
TableMetadata for 15 min.
"""
table_id = row["id"]
source_type = row.get("source_type") or ""
query_mode = row.get("query_mode") or "local"
if query_mode in ("local", "materialized"):
return {"rough_size_hint": _materialized_parquet_size_bucket(
table_id, source_type, query_mode,
)}
if query_mode != "remote":
return {"rough_size_hint": None}
# Cache lookup (per-row TableMetadata).
cached = _metadata_cache.get(table_id)
if cached is None:
cached = _resolve_remote_metadata(row)
if cached is not None:
_metadata_cache.set(table_id, cached)
if cached is None:
return {"rough_size_hint": None}
return {
"rough_size_hint": _bucket_size(cached.size_bytes) if cached.size_bytes is not None else None,
"rows": cached.rows,
"size_bytes": cached.size_bytes,
"partition_by": cached.partition_by,
"clustered_by": cached.clustered_by,
}
def _materialized_parquet_size_bucket(
table_id: str, source_type: str, query_mode: str,
) -> str | None:
"""Size hint for rows whose data is on the server filesystem
(the old `_materialized_size_hint` body). Renamed for clarity now
that the new dispatcher is the entry point.
Layout matches the v2 extract.duckdb contract:
${DATA_DIR}/extracts/<source_type>/data/<table_id>.parquet
"""
if query_mode == "remote":
return None
if not source_type:
return None
try:
path = Path(_get_data_dir()) / "extracts" / source_type / "data" / f"{table_id}.parquet"
path = (
Path(_get_data_dir()) / "extracts" / source_type / "data"
/ f"{table_id}.parquet"
)
if not path.exists():
return None
return _bucket_size(path.stat().st_size)
@ -91,6 +182,75 @@ def _materialized_size_hint(table_id: str, source_type: str, query_mode: str) ->
return None
def _resolve_remote_metadata(row: dict) -> "TableMetadata | None":
"""Provider dispatch for a remote row. Returns None on any failure."""
source_type = row.get("source_type") or ""
provider = _metadata_provider_for(source_type)
if provider is None:
return None
req = _build_metadata_request(row)
if req is None:
return None
try:
return provider(req)
except Exception:
# Defense in depth — providers are documented as never-raises,
# but a regression would otherwise 500 the whole catalog.
return None
def invalidate_for_table(table_id: str) -> None:
"""Drop every per-table cache so the next /api/v2/* request reflects
the just-registered / updated / unregistered row immediately. Owned
by the catalog module so admin.py doesn't need to know which caches
exist.
Imports v2_schema and v2_sample lazily; this keeps catalog tests from
pulling in BQ-extension imports they don't need.
"""
import asyncio
from app.api import v2_schema, v2_sample
_table_rows_cache.clear()
_metadata_cache.invalidate(table_id)
v2_schema._schema_cache.invalidate(table_id)
# Sample cache key is `f"{table_id}|{n}"`; clearing the whole sample
# cache is heavier than precise invalidation, but registry-change
# frequency (handful per day on a typical instance) doesn't justify
# adding a prefix-invalidation primitive to TTLCache.
v2_sample._sample_cache.clear()
# Schedule a single-row re-warm so admins editing a registry row
# see fresh data within a couple of seconds rather than waiting for
# the next analyst to trigger a miss. Fire-and-forget; failures
# log + skip inside the coroutine.
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop is not None:
# Running inside an async context (production FastAPI path).
asyncio.create_task(_rewarm_one_row(table_id))
# No running event loop (e.g. called from a sync test or a sync
# handler thread). Skip re-warm — the next live request will
# populate via miss.
async def _rewarm_one_row(table_id: str) -> None:
"""Background single-row re-warm. Imports cache_warmup lazily to
avoid a circular import at module load. Any failure is logged and
skipped; the next live request repopulates the cache."""
try:
from app.api.cache_warmup import warm_one_table
await warm_one_table(table_id)
except Exception:
import logging
logging.getLogger(__name__).warning(
"single-row re-warm failed for %s — next live request will populate",
table_id,
)
def build_catalog(conn: duckdb.DuckDBPyConnection, user: dict) -> dict:
rows = _table_rows_cache.get(_TABLE_ROWS_KEY)
if rows is None:
@ -105,6 +265,7 @@ def build_catalog(conn: duckdb.DuckDBPyConnection, user: dict) -> dict:
for r in rows:
if not can_access_table(user, r["id"], conn):
continue
hint = _size_hint_for_row(r)
visible.append({
"id": r["id"],
"name": r.get("name") or r["id"],
@ -114,10 +275,11 @@ def build_catalog(conn: duckdb.DuckDBPyConnection, user: dict) -> dict:
"sql_flavor": _flavor_for(r.get("source_type") or ""),
"where_examples": _examples_for(r.get("source_type") or ""),
"fetch_via": _fetch_hint(r["id"], r.get("source_type") or ""),
"rough_size_hint": _materialized_size_hint(
r["id"], r.get("source_type") or "",
r.get("query_mode") or "local",
),
"rough_size_hint": hint.get("rough_size_hint"),
"rows": hint.get("rows"),
"size_bytes": hint.get("size_bytes"),
"partition_by": hint.get("partition_by"),
"clustered_by": hint.get("clustered_by"),
})
return {
@ -132,12 +294,12 @@ def catalog(
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
# Plain ``def`` so FastAPI auto-offloads to the anyio thread pool —
# build_catalog now calls `_materialized_size_hint` for every visible
# row, which does sync `Path.stat()` / `Path.exists()` on the data
# volume. On local FS that's microseconds, but on a network-mounted
# DATA_DIR (NFS / CIFS / GCS-FUSE) those calls can block. Plain ``def``
# means each request runs on its own thread; the event loop stays
# free for non-catalog traffic. Mirrors the Tier 1 conversion of
# /api/query, /api/v2/scan, /api/v2/sample, /api/v2/schema —
# Devin Review on PR #188.
# build_catalog now calls `_size_hint_for_row` for every visible row,
# which does sync `Path.stat()` / `Path.exists()` on the data volume
# (local/materialized) or provider dispatch (remote). On local FS
# that's microseconds, but on a network-mounted DATA_DIR (NFS / CIFS /
# GCS-FUSE) those calls can block. Plain ``def`` means each request
# runs on its own thread; the event loop stays free for non-catalog
# traffic. Mirrors the Tier 1 conversion of /api/query, /api/v2/scan,
# /api/v2/sample, /api/v2/schema — Devin Review on PR #188.
return build_catalog(conn, user)


@ -31,51 +31,34 @@ _BQ_DIALECT_HINTS = {
def _fetch_bq_schema(bq, dataset: str, table: str) -> list[dict]:
"""Fetch column list via INFORMATION_SCHEMA.COLUMNS using DuckDB BQ extension.
"""Fetch column list via the shared ``_fetch_bq_columns_full_impl`` helper.
`bq.duckdb_session()` provides a DuckDB conn with the bigquery extension
loaded + auth secret installed. SQL here is server-constructed (queries
INFORMATION_SCHEMA.COLUMNS with validated identifiers, no user-derived
fragments), so a BQ BadRequest means registry corruption, not user input;
it surfaces as `bq_upstream_error` (HTTP 502), same as `/sample`, opposite
of `/scan*`.
Pre-#155 this had its own INFORMATION_SCHEMA.COLUMNS query; consolidating
with ``_fetch_bq_table_options`` (now also delegating to the same shared
SQL) halves the BQ job count on cache miss. Returns the schema-endpoint
column shape: name / type / nullable / description.
Calls the raising variant so BQ exceptions reach ``translate_bq_error``
with their original type (Forbidden -> 502, BadRequest -> 400, etc.).
"""
from connectors.bigquery.access import translate_bq_error
from src.identifier_validation import validate_quoted_identifier
from connectors.bigquery.access import _fetch_bq_columns_full_impl, translate_bq_error, BqAccessError
# Surface "BQ not configured" as the structured 500 BqAccessError(not_configured)
# with hint, not the misleading 400 unsafe_identifier the empty-string sentinel
# would otherwise trigger from validate_quoted_identifier below. Devin BUG_0002.
if not bq.projects.data:
bq.client() # raises BqAccessError(not_configured); endpoint catches it
# Defense in depth (cf. v2_sample) — registry already validates these,
# but the v2 endpoints are downstream of admin REST writes that could
# bypass that gate. A backtick in `dataset` would otherwise break out
# of `…` quoting and execute arbitrary BQ SQL.
if not (validate_quoted_identifier(bq.projects.data, "BQ project")
and validate_quoted_identifier(dataset, "BQ dataset")
and validate_quoted_identifier(table, "BQ source_table")):
raise ValueError("unsafe BQ identifier in registry — refusing to query")
bq_sql = (
f"SELECT column_name, data_type, is_nullable "
f"FROM `{bq.projects.data}.{dataset}.INFORMATION_SCHEMA.COLUMNS` "
f"WHERE table_name = ? ORDER BY ordinal_position"
)
with bq.duckdb_session() as conn:
try:
rows = conn.execute(
"SELECT * FROM bigquery_query(?, ?, ?)",
[bq.projects.billing, bq_sql, table],
).fetchall()
rows = _fetch_bq_columns_full_impl(bq, dataset, table)
except (ValueError, BqAccessError):
# ValueError ("unsafe identifier") and BqAccessError propagate
# unchanged — the endpoint's existing handlers expect those types.
raise
except Exception as e:
# Any other BQ-side exception goes through translate_bq_error so
# the response status is classified correctly.
raise translate_bq_error(e, bq.projects, bad_request_status="upstream_error")
return [
{
"name": r[0],
"type": r[1],
"nullable": r[2] == "YES",
"name": r["name"],
"type": r["type"],
"nullable": r["nullable"],
"description": "",
}
for r in rows
@ -83,61 +66,27 @@ def _fetch_bq_schema(bq, dataset: str, table: str) -> list[dict]:
def _fetch_bq_table_options(bq, dataset: str, table: str) -> dict:
"""Best-effort fetch of partition/cluster info from INFORMATION_SCHEMA.COLUMNS.
"""Best-effort fetch of partition/cluster info via the shared
`fetch_bq_columns_full` helper.
BigQuery exposes partition + cluster metadata as per-column flags:
- `is_partitioning_column` ('YES' / 'NO'); at most one column per table
- `clustering_ordinal_position` (INT64, null for non-clustered columns;
otherwise 1, 2, ... in cluster-key order)
Returns `{}` on ANY failure (best-effort). The outer
`try/except Exception return {}` is a load-bearing contract: the
/schema endpoint must keep returning 200 with empty partition info even
when this query fails (e.g. on permissioned tables, on cross-project
misconfigurations). DO NOT route this through `translate_bq_error`;
that would convert errors to BqAccessError which the endpoint would 502
on. See tests/test_v2_schema.py::test_schema_returns_200_with_empty_
Returns ``{}`` on ANY failure (best-effort). Same load-bearing
contract as before: the /schema endpoint must keep returning 200
with empty partition info when this fails.
"""
from src.identifier_validation import validate_quoted_identifier
from connectors.bigquery.access import fetch_bq_columns_full
# Best-effort path: if BQ isn't configured (sentinel BqAccess), return
# empty partition info silently — operator gets schema (200) without
# failing on the missing config. The strict /schema path (_fetch_bq_schema)
# surfaces the not_configured error separately.
if not bq.projects.data:
return {}
if not (validate_quoted_identifier(bq.projects.data, "BQ project")
and validate_quoted_identifier(dataset, "BQ dataset")
and validate_quoted_identifier(table, "BQ source_table")):
return {} # Best-effort; refuse to query unsafe identifiers.
try:
with bq.duckdb_session() as conn:
bq_sql = (
f"SELECT column_name, is_partitioning_column, clustering_ordinal_position "
f"FROM `{bq.projects.data}.{dataset}.INFORMATION_SCHEMA.COLUMNS` "
f"WHERE table_name = ? "
f"ORDER BY clustering_ordinal_position NULLS LAST"
)
rows = conn.execute(
"SELECT * FROM bigquery_query(?, ?, ?)",
[bq.projects.billing, bq_sql, table],
).fetchall()
rows = fetch_bq_columns_full(bq, dataset, table)
if not rows:
return {}
partition_by = next(
(r[0] for r in rows if (r[1] or "").upper() == "YES"),
(r["name"] for r in rows if r["is_partitioning_column"]),
None,
)
clustered_by = [r[0] for r in rows if r[2] is not None]
clustered_rows = [r for r in rows if r["clustering_ordinal_position"] is not None]
clustered_rows.sort(key=lambda r: r["clustering_ordinal_position"])
clustered_by = [r["name"] for r in clustered_rows]
return {"partition_by": partition_by, "clustered_by": clustered_by}
except Exception as e:
logger.warning(
"BQ table options fetch failed for %s.%s.%s: %s",
bq.projects.data, dataset, table, e,
)
return {}
def build_schema(
@ -157,11 +106,35 @@ def build_schema(
if not can_access_table(user, table_id, conn):
raise PermissionError(table_id)
cache_key = f"{table_id}"
cached = _schema_cache.get(cache_key)
cached = _schema_cache.get(table_id)
if cached is not None:
return cached
return build_schema_uncached(conn, table_id, bq=bq, row=row)
def build_schema_uncached(
conn: duckdb.DuckDBPyConnection,
table_id: str,
*,
bq: BqAccess,
row: dict | None = None,
) -> dict:
"""Build the schema response and populate `_schema_cache`. **Skips
RBAC and cache-hit short-circuit**; call only from contexts where
those are unnecessary (warmup) or already enforced upstream
(`build_schema`).
Pass `row` from the upstream caller's `repo.get(table_id)` to avoid
a redundant DB round-trip; if not provided, `build_schema_uncached`
fetches it itself (the warmup-direct call site).
"""
if row is None:
repo = TableRegistryRepository(conn)
row = repo.get(table_id)
if not row:
raise NotFound(table_id)
source_type = row.get("source_type") or ""
if source_type == "bigquery":
dataset = row.get("bucket") or ""
@ -179,7 +152,6 @@ def build_schema(
}
else:
# Local source — read schema from the parquet via DuckDB
from pathlib import Path
from app.utils import get_data_dir
parquet = (
get_data_dir() / "extracts" / source_type / "data" / f"{table_id}.parquet"
@ -204,7 +176,7 @@ def build_schema(
"where_dialect_hints": {},
}
_schema_cache.set(cache_key, payload)
_schema_cache.set(table_id, payload)
return payload
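
The partition/cluster derivation in `_fetch_bq_table_options` above can be exercised without BigQuery. The sketch below assumes the shared helper returns one dict per column with the keys the diff reads (`name`, `is_partitioning_column`, `clustering_ordinal_position`); the function name `table_options` and the sample rows are hypothetical.

```python
def table_options(columns):
    """Derive partition and cluster info from per-column flag rows."""
    if not columns:
        return {}
    # At most one column per table carries the partitioning flag.
    partition_by = next(
        (c["name"] for c in columns if c["is_partitioning_column"]),
        None,
    )
    # Cluster keys are ordered by their ordinal position, not by the
    # column order in which the rows arrive.
    clustered = sorted(
        (c for c in columns if c["clustering_ordinal_position"] is not None),
        key=lambda c: c["clustering_ordinal_position"],
    )
    return {
        "partition_by": partition_by,
        "clustered_by": [c["name"] for c in clustered],
    }


# Hypothetical rows, listed in ordinal (not cluster) order.
sample_columns = [
    {"name": "event_date", "is_partitioning_column": True, "clustering_ordinal_position": None},
    {"name": "country", "is_partitioning_column": False, "clustering_ordinal_position": 2},
    {"name": "user_id", "is_partitioning_column": False, "clustering_ordinal_position": 1},
    {"name": "payload", "is_partitioning_column": False, "clustering_ordinal_position": None},
]
print(table_options(sample_columns))
```

The explicit sort matters because the shared query orders by `ordinal_position`, so cluster keys can arrive out of cluster-key order.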


@ -113,6 +113,7 @@ from app.api.store import router as store_router
from app.api.my_stack import router as my_stack_router
from app.api.welcome import router as welcome_router
from app.api.claude_md import router as claude_md_router
from app.api.cache_warmup import router as cache_warmup_router
from app.marketplace_server.router import router as marketplace_server_router
from app.marketplace_server.git_router import make_git_wsgi_app
from app.web.router import router as web_router
@ -147,6 +148,9 @@ async def lifespan(app):
except Exception as e:
logger.warning("failed to bump anyio thread pool capacity: %s", e)
from app.api.cache_warmup import maybe_schedule_startup_warmup
maybe_schedule_startup_warmup()
yield
from src.db import close_system_db
close_system_db()
@ -552,6 +556,7 @@ def create_app() -> FastAPI:
app.include_router(my_stack_router)
app.include_router(welcome_router)
app.include_router(claude_md_router)
app.include_router(cache_warmup_router)
app.include_router(marketplace_server_router)
# Git smart-HTTP endpoint for Claude Code: /marketplace.git/*


@ -871,6 +871,25 @@
<!-- ═══════════════ CONTENT ═══════════════ -->
<div class="content">
<section id="cacheWarmupCard" class="card" style="margin-bottom: 20px;">
<header class="card-header" style="display: flex; justify-content: space-between; align-items: center;">
<h2>Cache freshness</h2>
<button class="btn btn-secondary" id="cacheWarmupRunBtn" onclick="cacheWarmupRun()">
Re-warm all
</button>
</header>
<div class="card-body">
<div id="cacheWarmupProgress" style="margin-bottom: 8px;">
<span id="cacheWarmupSummary">Loading…</span>
</div>
<progress id="cacheWarmupBar" max="100" value="0" style="width: 100%; display: none;"></progress>
<details style="margin-top: 8px;">
<summary style="cursor: pointer; user-select: none;">Show log</summary>
<pre id="cacheWarmupLog" style="background: #0a0a0a; color: #dcdcdc; font-family: ui-monospace, Menlo, monospace; font-size: 12px; padding: 8px; max-height: 240px; overflow-y: auto; margin-top: 8px; border-radius: 4px;"></pre>
</details>
</div>
</section>
{# Phase D: tab-split scaffold. Per-connector tabs (BigQuery /
Keboola / Jira) replace the single mixed form. Each tab has its
own Register button + listing div + (later) form modals. The
@ -1080,7 +1099,9 @@
</div>
<div class="form-group">
<label class="form-label">How should analysts access this data?</label>
<label class="form-label">How should analysts access this data?
<a href="docs/admin/query-modes.md" target="_blank" title="When to use which mode" style="margin-left: 6px; text-decoration: none; cursor: help;">?</a>
</label>
<div style="display:flex; gap:12px; margin-top:6px;">
<label style="flex:1; padding:10px; border:1px solid var(--border); border-radius:8px; cursor:pointer;">
<input type="radio" name="editBqAccessMode" value="live" onchange="onEditBqAccessModeChange()">
@ -2880,6 +2901,175 @@
loadRegistry();
// ── Cache warmup toolbar (issue #155 / #156) ────────────────
let cacheWarmupSource = null;
function _cacheWarmupClearPollFallback() {
if (window._cacheWarmupPollInterval) {
clearInterval(window._cacheWarmupPollInterval);
window._cacheWarmupPollInterval = null;
}
}
function cacheWarmupInit() {
cacheWarmupRefreshSnapshot();
cacheWarmupOpenStream();
}
function cacheWarmupRefreshSnapshot() {
fetch('/api/admin/cache-warmup/status')
.then(function(r) { return r.json(); })
.then(function(state) { cacheWarmupRender(state); })
.catch(function() { /* silent */ });
}
function cacheWarmupOpenStream() {
try {
cacheWarmupSource = new EventSource('/api/admin/cache-warmup/stream');
cacheWarmupSource.addEventListener('start', cacheWarmupOnStart);
cacheWarmupSource.addEventListener('row', cacheWarmupOnRow);
cacheWarmupSource.addEventListener('complete', cacheWarmupOnComplete);
cacheWarmupSource.addEventListener('snapshot', function(e) {
_cacheWarmupClearPollFallback();
cacheWarmupRender(JSON.parse(e.data));
});
cacheWarmupSource.onerror = function() {
if (cacheWarmupSource) {
cacheWarmupSource.close();
cacheWarmupSource = null;
}
// Continuous polling fallback. Try to re-open SSE every 30 s in
// case the proxy / network heals. Only one polling interval at a
// time (prevent stacking on repeated errors).
if (!window._cacheWarmupPollInterval) {
window._cacheWarmupPollInterval = setInterval(
cacheWarmupRefreshSnapshot, 3000
);
setTimeout(function tryReconnect() {
if (cacheWarmupSource) return; // already reconnected
try {
clearInterval(window._cacheWarmupPollInterval);
window._cacheWarmupPollInterval = null;
cacheWarmupOpenStream(); // recursive — onerror retries again
} catch (e) {
window._cacheWarmupPollInterval = setInterval(
cacheWarmupRefreshSnapshot, 3000
);
setTimeout(tryReconnect, 30000);
}
}, 30000);
}
};
} catch (e) {
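// Keep the handle so _cacheWarmupClearPollFallback() can stop this poller.
window._cacheWarmupPollInterval = setInterval(cacheWarmupRefreshSnapshot, 3000);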
}
}
function cacheWarmupRender(state) {
var summary = document.getElementById('cacheWarmupSummary');
var bar = document.getElementById('cacheWarmupBar');
var btn = document.getElementById('cacheWarmupRunBtn');
if (!summary) return;
if (!state || state.state === 'never_run') {
summary.textContent = 'No cache warmup yet — click Re-warm all to start.';
bar.style.display = 'none';
btn.disabled = false;
return;
}
var inProgress = state.completed_at === null || state.completed_at === undefined;
var pct = state.total > 0 ? Math.round((state.completed * 100) / state.total) : 0;
summary.textContent = inProgress
? state.completed + ' / ' + state.total + ' fresh — running…'
: 'Last run: ' + state.completed + ' ok, ' + state.failed + ' errors';
bar.style.display = 'block';
bar.value = pct;
btn.disabled = inProgress;
if (state.rows) {
for (var tid in state.rows) {
cacheWarmupSetRowBadge(tid, state.rows[tid]);
}
}
}
function cacheWarmupOnStart(e) {
_cacheWarmupClearPollFallback();
var data = JSON.parse(e.data);
var log = document.getElementById('cacheWarmupLog');
log.textContent = '';
cacheWarmupAppendLog(
'[' + nowHHMMSS() + '] start trigger=' + data.trigger + ' total=' + data.total
);
cacheWarmupRefreshSnapshot();
}
function cacheWarmupOnRow(e) {
_cacheWarmupClearPollFallback();
var rs = JSON.parse(e.data);
cacheWarmupAppendLog(
'[' + nowHHMMSS() + '] ' + rs.status.padEnd(7) + ' ' + rs.table_id +
(rs.duration_ms ? ' (' + (rs.duration_ms / 1000).toFixed(1) + ' s)' : '') +
(rs.error ? ' ' + rs.error : '')
);
cacheWarmupSetRowBadge(rs.table_id, rs);
cacheWarmupRefreshSnapshot();
}
function cacheWarmupOnComplete(e) {
_cacheWarmupClearPollFallback();
var data = JSON.parse(e.data);
cacheWarmupAppendLog(
'[' + nowHHMMSS() + '] complete total=' + data.total +
' ok=' + data.completed + ' fail=' + data.failed
);
cacheWarmupRefreshSnapshot();
}
function cacheWarmupAppendLog(line) {
var log = document.getElementById('cacheWarmupLog');
if (!log) return;
log.textContent += line + '\n';
log.scrollTop = log.scrollHeight;
}
function cacheWarmupSetRowBadge(tableId, rs) {
document.querySelectorAll('tr').forEach(function(tr) {
var idCell = tr.querySelector('td.col-id');
if (!idCell || idCell.textContent.trim() !== tableId) return;
var statusCell = tr.querySelector('td.col-status');
if (!statusCell) return;
var color = {fresh: '#10B77F', warming: '#0073D1', pending: '#9CA3AF', error: '#EA580C'}[rs.status] || '#9CA3AF';
var label = rs.status === 'fresh' ? 'fresh' : rs.status;
// Build via DOM API so rs.error escapes safely into the title
// attribute (XSS guard — rs.error is server-derived, may contain
// quotes / angle brackets).
var span = document.createElement('span');
span.style.cssText =
'display:inline-block;padding:2px 6px;border-radius:3px;' +
'font-size:11px;background:' + color + ';color:white;';
if (rs.error) span.setAttribute('title', rs.error);
span.textContent = label;
statusCell.replaceChildren(span);
});
}
function nowHHMMSS() {
var d = new Date();
return d.toTimeString().slice(0, 8);
}
function cacheWarmupRun() {
var btn = document.getElementById('cacheWarmupRunBtn');
btn.disabled = true;
fetch('/api/admin/cache-warmup/run', {method: 'POST'})
.then(function(r) { return r.json(); })
.then(function() { /* SSE stream picks up the new run */ })
.catch(function() { btn.disabled = false; });
}
document.addEventListener('DOMContentLoaded', cacheWarmupInit);
</script>
{% include "_version_badge.html" %}
</body>


@ -231,6 +231,15 @@ def register_table(
f"`agnes admin grant create <group> table {name}` to "
f"make this visible in `agnes catalog` for non-admin users."
)
# Third hint: BQ-remote rows can fail at first analyst query if the
# SA lacks dataViewer/jobUser. Pointing at the smoke command
# surfaces the failure at registration time, not 30 minutes later.
if query_mode == "remote":
typer.echo(
f" Note: this is a remote-query table. Verify the SA can read it:\n"
f" agnes query --remote \"SELECT COUNT(*) FROM {name}\"\n"
f" If it 403s, see docs/admin/query-modes.md → \"BigQuery → IAM\"."
)
elif resp.status_code == 409:
typer.echo(f"Already exists: {name}")
else:


@ -610,6 +610,67 @@ class BqAccess:
yield conn
def _fetch_bq_columns_full_impl(bq, dataset: str, table: str) -> list[dict]:
"""Implementation that raises on BQ errors. Returns the column list
or raises the original BQ exception. Validates identifiers; raises
``ValueError`` on bad shape. Sentinel-config (``bq.projects.data == ""``)
surfaces via ``bq.client()`` raising ``BqAccessError(not_configured)``.
Used by callers that need typed exceptions for HTTP status
classification; currently only ``app/api/v2_schema._fetch_bq_schema``
via ``translate_bq_error``.
"""
from src.identifier_validation import validate_quoted_identifier
if not bq.projects.data:
bq.client() # raises BqAccessError(not_configured)
if not (validate_quoted_identifier(bq.projects.data, "BQ project")
and validate_quoted_identifier(dataset, "BQ dataset")
and validate_quoted_identifier(table, "BQ source_table")):
raise ValueError("unsafe BQ identifier in registry — refusing to query")
bq_sql = (
f"SELECT column_name, data_type, is_nullable, "
f" is_partitioning_column, clustering_ordinal_position "
f"FROM `{bq.projects.data}.{dataset}.INFORMATION_SCHEMA.COLUMNS` "
f"WHERE table_name = ? ORDER BY ordinal_position"
)
with bq.duckdb_session() as conn:
rows = conn.execute(
"SELECT * FROM bigquery_query(?, ?, ?)",
[bq.projects.billing, bq_sql, table],
).fetchall()
return [
{
"name": r[0],
"type": r[1],
"nullable": r[2] == "YES",
"is_partitioning_column": r[3] == "YES",
"clustering_ordinal_position": r[4],
}
for r in rows
]
def fetch_bq_columns_full(bq, dataset: str, table: str) -> list[dict] | None:
"""Best-effort wrapper around ``_fetch_bq_columns_full_impl`` — returns
``None`` on any failure (sentinel-unconfigured, unsafe identifier, BQ
query exception). Does NOT raise. For callers that don't need typed
exceptions (the metadata provider; the partition/cluster path of
v2_schema).
"""
try:
return _fetch_bq_columns_full_impl(bq, dataset, table)
except Exception as e:
logger.warning(
"BQ COLUMNS fetch failed for %s.%s.%s: %s",
bq.projects.data, dataset, table, e,
)
return None
@functools.cache
def get_bq_access() -> BqAccess:
"""Module-level FastAPI Depends target. Resolves projects from config and returns


@ -0,0 +1,196 @@
"""BigQuery metadata provider — populates `TableMetadata` for a remote
BQ-backed registry row.
Two queries (different INFORMATION_SCHEMA scopes: TABLE_STORAGE is
region-scoped, COLUMNS is dataset-scoped, so they can't be combined):
1. INFORMATION_SCHEMA.TABLE_STORAGE: total_rows + active+long_term
bytes. Region-portable per Google's docs; only valid via
`<project>.region-<region>.INFORMATION_SCHEMA.TABLE_STORAGE`
(verified live 2026-05-07; dataset-scoped TABLE_STORAGE doesn't
exist).
2. INFORMATION_SCHEMA.COLUMNS: partition_by + clustered_by. Reuses
the consolidated `fetch_bq_columns_full` helper that v2_schema also
calls; one shared shape, one round-trip.
Region resolution chain: `instance.yaml.data_source.bigquery.location`
→ `bq.client().get_dataset(...)` → fall back to legacy `__TABLES__`
(dataset-scoped, no region required).
VIEW handling: TABLE_STORAGE returns no rows for entries whose
`table_type='VIEW'`; the legacy `__TABLES__` fallback also doesn't list
views. The provider returns `TableMetadata(rows=None, size_bytes=None,
partition_by=<from COLUMNS>, clustered_by=<from COLUMNS>)`; analyst
Claude reads `null` size and applies the existing CLAUDE.md guidance.
`size_bytes` reports `active_logical_bytes + long_term_logical_bytes`
(a full BQ scan reads both; reporting only active undercounts aged
partitioned tables).
"""
from __future__ import annotations
import logging
from app.api._metadata_models import MetadataRequest, TableMetadata
from app.instance_config import get_value
from connectors.bigquery.access import (
BqAccessError, fetch_bq_columns_full, get_bq_access,
)
logger = logging.getLogger(__name__)
def fetch(req: MetadataRequest) -> TableMetadata | None:
try:
bq = get_bq_access()
except BqAccessError:
return None
if not bq.projects.data:
return None
rows_size = _fetch_rows_and_size(bq, req)
columns = fetch_bq_columns_full(bq, req.bucket, req.source_table)
part_clust = _derive_partition_cluster(columns) if columns else None
if rows_size is None and part_clust is None:
return None
return TableMetadata(
rows=(rows_size or {}).get("rows"),
size_bytes=(rows_size or {}).get("size_bytes"),
partition_by=(part_clust or {}).get("partition_by"),
clustered_by=(part_clust or {}).get("clustered_by"),
)
def _derive_partition_cluster(columns: list[dict]) -> dict | None:
"""Mirror v2_schema._fetch_bq_table_options derivations from the
shared columns-full result."""
if not columns:
return None
partition_by = next(
(c["name"] for c in columns if c["is_partitioning_column"]),
None,
)
clustered = sorted(
(c for c in columns if c["clustering_ordinal_position"] is not None),
key=lambda c: c["clustering_ordinal_position"],
)
clustered_by = [c["name"] for c in clustered]
return {"partition_by": partition_by, "clustered_by": clustered_by}
def _fetch_rows_and_size(bq, req: MetadataRequest) -> dict | None:
"""Resolve rows + size_bytes via TABLE_STORAGE → __TABLES__ fallthrough.
See module docstring + spec Open Question §1 for view-path nuance.
"""
location = _resolve_bq_location(bq, req)
if location:
result = _fetch_via_table_storage(bq, req, location)
if result is not None:
return result
# TABLE_STORAGE returned None despite having a location. Possible
# causes: a typo in `data_source.bigquery.location`, a multi-region
# dataset the operator misclassified, a VIEW, or a transient
# permission gap. Try __TABLES__ before giving up.
return _fetch_via_legacy_tables(bq, req)
def _resolve_bq_location(bq, req: MetadataRequest) -> str | None:
"""instance.yaml.location → REST get_dataset → None."""
cfg_location = (get_value("data_source", "bigquery", "location") or "").strip()
if cfg_location:
return cfg_location
try:
ds = bq.client().get_dataset(
f"{bq.projects.data}.{req.bucket}"
)
return ds.location
except Exception as e:
logger.warning(
"BQ dataset.get failed for %s.%s — falling back to __TABLES__: %s",
bq.projects.data, req.bucket, e,
)
return None
def _fetch_via_table_storage(bq, req: MetadataRequest, location: str) -> dict | None:
"""Region-scoped INFORMATION_SCHEMA.TABLE_STORAGE — preferred path.
`validate_quoted_identifier` accepts `us-central1`, `europe-west1`,
`EU`, `us` etc. (regex `^[a-zA-Z0-9_][a-zA-Z0-9_.\\-]{0,127}$`).
Refuses anything that could break out of the backtick-quoted path.
Returns None on no-row (table is a VIEW, or different region than
configured); the caller decides whether to fall through.
`size_bytes` is `active + long_term` logical bytes (a full BQ scan
reads both; reporting only active undercounts aged partitioned tables).
"""
from src.identifier_validation import validate_quoted_identifier
if not validate_quoted_identifier(location, "BQ region"):
return None
# `req.bucket` / `req.source_table` are pre-validated by the
# dispatcher; `location` is validated locally above because it
# originates from instance.yaml, not from the registry row.
try:
bq_sql = (
f"SELECT total_rows, "
f"IFNULL(active_logical_bytes, 0) + IFNULL(long_term_logical_bytes, 0) "
f"FROM `{bq.projects.data}.region-{location}.INFORMATION_SCHEMA.TABLE_STORAGE` "
f"WHERE table_schema = ? AND table_name = ?"
)
with bq.duckdb_session() as conn:
row = conn.execute(
"SELECT * FROM bigquery_query(?, ?, ?, ?)",
[bq.projects.billing, bq_sql, req.bucket, req.source_table],
).fetchone()
except Exception as e:
logger.warning(
"BQ TABLE_STORAGE fetch failed for %s.%s.%s: %s",
bq.projects.data, req.bucket, req.source_table, e,
)
return None
if row is None:
return None # VIEW or wrong region
rows_, size_bytes = row
return {
"rows": int(rows_) if rows_ is not None else None,
"size_bytes": int(size_bytes) if size_bytes is not None else None,
}
def _fetch_via_legacy_tables(bq, req: MetadataRequest) -> dict | None:
"""Last-resort dataset-scoped __TABLES__ — works without region."""
# `req.bucket` and `req.source_table` are pre-validated by
# `app/api/v2_catalog._build_metadata_request` via
# `validate_quoted_identifier` before MetadataRequest construction;
# safe to interpolate into the backtick-quoted path here.
try:
bq_sql = (
f"SELECT row_count, size_bytes "
f"FROM `{bq.projects.data}.{req.bucket}.__TABLES__` "
f"WHERE table_id = ?"
)
with bq.duckdb_session() as conn:
row = conn.execute(
"SELECT * FROM bigquery_query(?, ?, ?)",
[bq.projects.billing, bq_sql, req.source_table],
).fetchone()
except Exception as e:
logger.warning(
"BQ __TABLES__ fetch failed for %s.%s.%s: %s",
bq.projects.data, req.bucket, req.source_table, e,
)
return None
if row is None:
return None
rows_, size_bytes = row
return {
"rows": int(rows_) if rows_ is not None else None,
"size_bytes": int(size_bytes) if size_bytes is not None else None,
}


@ -0,0 +1,52 @@
"""Keboola metadata provider — populates `TableMetadata` for a Keboola
registry row via the Storage API.
Reads `KEBOOLA_STACK_URL` + `KEBOOLA_STORAGE_TOKEN` straight from the
environment, the same env-var fallback that `connectors/keboola/extractor.py`
and `connectors/keboola/client.py` already use, instead of constructing
`KeboolaClient(token=None, url=None)`, which raises ValueError when the
token is absent. **Does NOT introduce a third token-resolution helper.**
"""
from __future__ import annotations
import logging
import os
from app.api._metadata_models import MetadataRequest, TableMetadata
from connectors.keboola.storage_api import (
KeboolaStorageClient,
StorageApiError,
)
logger = logging.getLogger(__name__)
def fetch(req: MetadataRequest) -> TableMetadata | None:
"""Return Keboola Storage API metadata for the given table, or None.
Keboola has no BigQuery-style partition/cluster concept; primaryKey is
conceptually different (uniqueness, not physical layout), so
`partition_by` and `clustered_by` are left None.
"""
# Read credentials the same way KeboolaClient does — avoids constructing
# a KeboolaClient which raises ValueError when the token is absent.
url = os.environ.get("KEBOOLA_STACK_URL", "")
token = os.environ.get("KEBOOLA_STORAGE_TOKEN", "")
if not url or not token:
return None # not configured — same posture as BQ sentinel
table_id = f"{req.bucket}.{req.source_table}"
try:
storage = KeboolaStorageClient(url=url, token=token)
info = storage.get_table_info(table_id)
except (StorageApiError, ValueError) as e:
logger.warning("Keboola metadata fetch failed for %s: %s", table_id, e)
return None
return TableMetadata(
rows=info.get("rowsCount"),
size_bytes=info.get("dataSizeBytes"),
partition_by=None,
clustered_by=None,
)


@ -285,6 +285,16 @@ class KeboolaStorageClient:
via `wait_for_job` to find the file id when status='success'."""
return self._post(f"/tables/{table_id}/export-async", data=params)
def get_table_info(self, table_id: str) -> dict:
"""GET /v2/storage/tables/{table_id} — full table metadata.
Storage API guarantees `rowsCount` + `dataSizeBytes` on success.
Other fields (`columns`, `primaryKey`, ...) are present but not
consumed by the metadata provider today. Raises `StorageApiError`
on 4xx/5xx; the caller decides whether to soften to `None`.
"""
return self._get(f"/tables/{table_id}")
def wait_for_job(
self,
job_id: int,

docs/admin/query-modes.md

@ -0,0 +1,116 @@
# Query Modes — when to register a table as `local`, `remote`, or `materialized`
Source-agnostic guide to the three `query_mode` values Agnes supports. Pick the right mode at registration time and the analyst-side experience is fast, cost-aware, and predictable. Pick wrong and you'll either burn BQ scan budget on every query or spend hours waiting on syncs that didn't need to happen.
## TL;DR — decision tree
```
Is the table small (< 1 GB) and updated daily-or-slower?
└─ YES → query_mode: local (sync to laptop, query offline)
Is the table the result of an aggregate SQL the operator controls?
└─ YES → query_mode: materialized (server runs SQL → parquet, distributed)
Otherwise:
└─ query_mode: remote (data stays in upstream; analyst queries on demand)
```
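The tree can also be read as a first-match-wins function. The sketch below is illustrative only: `TableProfile`, `choose_query_mode`, and the reading of "< 1 GB" as 1 GiB and "daily-or-slower" as an update interval of at least 24 hours are assumptions of this example, not part of Agnes.

```python
from dataclasses import dataclass

# Assumption: the guide says "< 1 GB"; this sketch uses binary GiB.
GIB = 1024 ** 3


@dataclass
class TableProfile:
    """Hypothetical summary of a table, used only for this illustration."""
    size_bytes: int
    update_interval_hours: float  # how often the upstream data changes
    is_operator_aggregate: bool   # result of SQL the operator controls


def choose_query_mode(profile: TableProfile) -> str:
    """Walk the decision tree top to bottom and return the first match."""
    # Question 1: small and updated daily-or-slower -> sync it locally.
    if profile.size_bytes < GIB and profile.update_interval_hours >= 24:
        return "local"
    # Question 2: an operator-controlled aggregate -> materialize on the server.
    if profile.is_operator_aggregate:
        return "materialized"
    # Otherwise the data stays upstream and is queried on demand.
    return "remote"
```

Order matters: a small, slowly changing aggregate still lands on `local` because the size check runs first, mirroring the tree.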
## Three modes side-by-side
| Aspect | `local` | `materialized` | `remote` |
|---|---|---|---|
| Where the data lives | Analyst laptop (parquet) | Agnes server filesystem (parquet) | Upstream (BigQuery, Keboola, …) |
| Who runs the query | Analyst's local DuckDB | Analyst's local DuckDB | Upstream engine via DuckDB extension |
| Cost model | Free after sync | Free after each sync | Per-query scan cost on the analyst's first hit |
| Freshness | As fresh as last sync | As fresh as last scheduled run | Live |
| Scan limits | None (laptop disk) | None (server disk) | `bq_max_scan_bytes` cost gate (default 5 GiB) |
| Best for | Stable reference data, daily-updated facts | Aggregates, daily snapshots | Big tables, live data, residency-restricted |
## Per-source-type reference
### BigQuery — `query_mode: remote`
The most common use case for `remote`. Data stays in BQ; analysts query on demand via the Agnes server's service account.
**IAM:** the server's SA must have:
- `roles/bigquery.dataViewer` on the dataset (read access)
- `roles/bigquery.jobUser` on the *billing* project (run jobs)
If `data_source.bigquery.billing_project == data_source.bigquery.project`, set the SA's `serviceusage.services.use` permission too — the BQ extension can otherwise 403 USER_PROJECT_DENIED on the first query. The instance health check (`agnes diagnose`) surfaces this as an `info`-tier entry on `bq_config`.
**Register via UI:** `/admin/tables` → "Add table" → Source type `bigquery` → Mode `remote` → fill `dataset` (your BQ dataset name) + `source_table` (the BQ table id within that dataset).
**Register via CLI:**
```bash
agnes admin register-table sales_2024 \
--source-type bigquery \
--bucket dwh_base \
--source-table sales_2024 \
--query-mode remote
```
After registration, smoke-test the SA's access:
```bash
agnes query --remote "SELECT COUNT(*) FROM sales_2024"
```
A 403 here means the SA is missing `dataViewer` or `jobUser`; fix in IAM and re-test.
**Cost guardrail:** `bq_max_scan_bytes` (default 5 GiB) refuses queries whose pre-execution scan estimate exceeds the cap. Configurable in `/admin/server-config`. When an analyst hits the cap, the response includes a hint to use `agnes snapshot create --where '<predicate>'` to materialise a scoped subset locally.
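As a rough illustration of the gate's logic (not the server's actual code), the check compares a pre-execution estimate against the cap and attaches the snapshot hint on refusal. `check_scan_cap` and the message wording are hypothetical; only the 5 GiB default and the hint command come from this guide.

```python
from typing import Optional

GIB = 1024 ** 3
DEFAULT_MAX_SCAN_BYTES = 5 * GIB  # documented default for bq_max_scan_bytes


def check_scan_cap(
    estimated_bytes: int,
    max_scan_bytes: int = DEFAULT_MAX_SCAN_BYTES,
) -> Optional[str]:
    """Return None when the query may run, or a refusal message otherwise.

    Assumption: an estimate exactly equal to the cap is allowed, since the
    guide says queries are refused when the estimate *exceeds* the cap.
    """
    if estimated_bytes <= max_scan_bytes:
        return None
    est_gib = estimated_bytes / GIB
    cap_gib = max_scan_bytes / GIB
    return (
        f"Estimated scan {est_gib:.1f} GiB exceeds bq_max_scan_bytes "
        f"({cap_gib:.1f} GiB). Hint: agnes snapshot create --where '<predicate>'"
    )
```

Returning a message instead of raising keeps the sketch easy to test; a real endpoint would more likely map the refusal to an HTTP error.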
### BigQuery — `query_mode: materialized`
The server runs a scheduled SQL aggregate against BigQuery and writes the result to a parquet on the Agnes filesystem. Analysts get the parquet via `agnes pull` like any other local table.
**Register via CLI:**
```bash
agnes admin register-table monthly_kpis \
--source-type bigquery \
--bucket dwh_base \
--source-table monthly_kpis \
--query-mode materialized \
--query @path/to/monthly_kpis.sql \
--sync-schedule "daily 03:00"
```
**Cost guardrail:** `data_source.bigquery.max_bytes_per_materialize` (default 10 GiB; set `0` to disable) refuses materialise runs whose query plan exceeds the cap. Catches a typo'd `WHERE` clause that would otherwise scan a year of data.
### Keboola — `query_mode: local` (the production path)
The Agnes server's Keboola DuckDB extension downloads the table to a parquet on the server filesystem; `agnes pull` distributes it to analyst laptops.
**Setup:** `instance.yaml.data_source.type: keboola` + Storage API token via `KEBOOLA_STORAGE_TOKEN` env var (or whatever `instance.yaml.token_env` points at).
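The env-var indirection can be pictured as follows. This is a minimal sketch under stated assumptions: `resolve_keboola_token` is a hypothetical helper, and the flat `instance_cfg` mapping stands in for however the real config loader exposes `token_env`.

```python
import os
from typing import Mapping, Optional

DEFAULT_TOKEN_ENV = "KEBOOLA_STORAGE_TOKEN"


def resolve_keboola_token(
    instance_cfg: Mapping[str, str],
    env: Mapping[str, str] = os.environ,
) -> Optional[str]:
    """Look up the Storage API token via the configured env-var name.

    Assumption: `token_env` names an environment variable; when it is
    unset or empty, the default variable name is used instead.
    """
    env_name = instance_cfg.get("token_env") or DEFAULT_TOKEN_ENV
    token = env.get(env_name, "").strip()
    # An empty or whitespace-only value is treated as "not configured".
    return token or None
```

Passing `env` explicitly makes the lookup testable without touching the real process environment.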
**Register via CLI:**
```bash
agnes admin register-table users \
--source-type keboola \
--bucket in.c-crm \
--source-table users \
--query-mode local
```
**`query_mode: remote` for Keboola** is architecturally supported via the `_remote_attach` mechanism (the orchestrator can ATTACH the Keboola DuckDB extension on demand the same way it does for BQ), but **not in active deployment use today**. If you have an analyst workflow against a Keboola table that's too big to sync, file an issue — the architecture is in place but the registration UX hasn't been polished.
### Jira — `query_mode: local` only
Event-driven: webhooks update parquets incrementally. No `remote` or `materialized` mode for Jira today.
## Worked examples
**1. Big BigQuery fact table you query weekly:** `query_mode: remote`. SA needs `dataViewer` + `jobUser`. Analyst uses `agnes query --remote` for one-off aggregates and `agnes snapshot create` for cross-week joins.
**2. Daily Keboola dimension table:** `query_mode: local`. Synced once a day by the scheduler; analyst's `agnes pull` picks it up.
**3. Monthly KPI aggregate from a BQ data warehouse:** `query_mode: materialized` + `--sync-schedule "0 3 1 * *"` (3:00 on the 1st of each month). The server runs your aggregate SQL once a month; analysts get a parquet of the result.
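The schedule string in example 3 follows the standard five-field cron layout. The helper below is a hypothetical sketch that only labels the fields; it does not claim to be how Agnes parses `--sync-schedule`.

```python
from typing import Dict

# Standard five-field cron order.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expr: str) -> Dict[str, str]:
    """Label the five whitespace-separated fields of a cron expression."""
    parts = expr.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(
            f"expected {len(CRON_FIELDS)} cron fields, got {len(parts)}: {expr!r}"
        )
    return dict(zip(CRON_FIELDS, parts))


fields = split_cron("0 3 1 * *")
# minute=0, hour=3, day_of_month=1: 03:00 on the 1st of every month.
print(fields)
```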
## See also
- `docs/RBAC.md` — granting analysts access to a registered table.
- `config/instance.yaml.example` — the `data_source` config block.
- `agnes catalog --json` — inspect a registered table's mode + size hints.
- `agnes diagnose` — surface `bq_config` IAM issues and other health entries.


@ -1,6 +1,6 @@
[project]
name = "agnes-the-ai-analyst"
version = "0.46.5"
version = "0.47.0"
description = "Agnes — AI Data Analyst platform for AI analytical systems"
requires-python = ">=3.11,<3.14"
license = "MIT"
@ -76,6 +76,7 @@ dependencies = [
# directly via `requests` — no SDK dependency on the data-path side. The
# SDK stays for the metadata reads.
"kbcstorage>=0.9.0",
"sse-starlette>=2.0",
]
[project.optional-dependencies]


@ -81,7 +81,29 @@ def _reset_module_caches():
_q._quota_singleton = None
except ImportError:
pass
try:
from app.api import v2_catalog as _vc
_vc._table_rows_cache.clear()
_vc._metadata_cache.clear()
except (ImportError, AttributeError):
pass
try:
import app.api.cache_warmup as _cw
_cw.WARMUP_STATE = None
except (ImportError, AttributeError):
pass
yield
try:
from app.api import v2_catalog as _vc
_vc._table_rows_cache.clear()
_vc._metadata_cache.clear()
except (ImportError, AttributeError):
pass
try:
import app.api.cache_warmup as _cw
_cw.WARMUP_STATE = None
except (ImportError, AttributeError):
pass
@pytest.fixture


@ -0,0 +1,35 @@
"""Smoke test that /admin/tables HTML contains the cache toolbar markup,
the EventSource wiring, and the per-row col-status slot."""
def test_cache_toolbar_present(seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
r = c.get(
"/admin/tables", headers={"Authorization": f"Bearer {token}"},
)
assert r.status_code == 200, r.text
body = r.text
assert 'id="cacheWarmupCard"' in body
assert "Re-warm all" in body
assert "/api/admin/cache-warmup/stream" in body
assert "EventSource" in body
def test_query_mode_doc_link_present(seeded_app):
c = seeded_app["client"]
token = seeded_app["admin_token"]
r = c.get(
"/admin/tables", headers={"Authorization": f"Bearer {token}"},
)
assert r.status_code == 200
assert "query-modes" in r.text # link to docs/admin/query-modes.md or rendered URL
def test_col_status_th_present_in_renderer(seeded_app):
"""The renderRegistryListing JS still emits <th class='col-status'>
so the per-row badge slot exists."""
c = seeded_app["client"]
token = seeded_app["admin_token"]
r = c.get("/admin/tables", headers={"Authorization": f"Bearer {token}"})
assert 'col-status' in r.text

tests/test_cache_warmup.py

@ -0,0 +1,154 @@
"""Cache warmup framework — state, bg task, endpoints."""
import asyncio
from unittest.mock import patch
from app.api.cache_warmup import WarmupRunState
def test_warmup_run_state_starts_empty():
from app.api.cache_warmup import WARMUP_STATE
assert WARMUP_STATE is None or WARMUP_STATE.completed_at is not None
def test_warmup_skips_when_env_set(monkeypatch):
"""AGNES_SKIP_CACHE_WARMUP=1 → background warmup is a no-op."""
monkeypatch.setenv("AGNES_SKIP_CACHE_WARMUP", "1")
from app.api import cache_warmup
# When the env opt-out is set, maybe_schedule_startup_warmup must
# NOT call _warm_catalog_caches_bg.
with patch.object(cache_warmup, "_warm_catalog_caches_bg") as mock_bg:
cache_warmup.maybe_schedule_startup_warmup()
mock_bg.assert_not_called()
def test_warmup_runs_one_per_remote_row(monkeypatch):
"""`_warm_catalog_caches_bg` calls `_warm_one` once per remote row.
Uses asyncio.run rather than @pytest.mark.asyncio to match the
convention in this repo (see tests/test_selective_gzip.py).
"""
from app.api import cache_warmup
# Stub `_list_remote_rows` to return 3 remote BQ rows.
fake_rows = [
{"id": "r1", "query_mode": "remote", "source_type": "bigquery"},
{"id": "r2", "query_mode": "remote", "source_type": "bigquery"},
{"id": "r3", "query_mode": "remote", "source_type": "bigquery"},
]
warmed = []
async def fake_warm_one(row, state, sem):
warmed.append(row["id"])
monkeypatch.setattr(cache_warmup, "_list_remote_rows", lambda: fake_rows)
monkeypatch.setattr(cache_warmup, "_warm_one", fake_warm_one)
asyncio.run(cache_warmup._warm_catalog_caches_bg(trigger="manual"))
assert sorted(warmed) == ["r1", "r2", "r3"]
def test_status_endpoint_before_first_run(seeded_app, monkeypatch):
"""GET /status returns {state: never_run} before any warmup."""
from app.api import cache_warmup
monkeypatch.setattr(cache_warmup, "WARMUP_STATE", None)
c = seeded_app["client"]
token = seeded_app["admin_token"]
r = c.get(
"/api/admin/cache-warmup/status",
headers={"Authorization": f"Bearer {token}"},
)
assert r.status_code == 200
assert r.json() == {"state": "never_run"}
def test_run_endpoint_starts_warmup(seeded_app, monkeypatch):
"""POST /run schedules a warmup and returns 200."""
from app.api import cache_warmup
monkeypatch.setattr(cache_warmup, "WARMUP_STATE", None)
# Patch the actual warmup so the test doesn't run a real one.
monkeypatch.setattr(cache_warmup, "_warm_catalog_caches_bg",
lambda trigger="manual", state=None: _async_noop())
c = seeded_app["client"]
token = seeded_app["admin_token"]
r = c.post(
"/api/admin/cache-warmup/run",
headers={"Authorization": f"Bearer {token}"},
)
assert r.status_code == 200
def test_run_endpoint_returns_run_id_not_none(seeded_app, monkeypatch):
"""POST /run returns a non-null run_id even when the bg task hasn't
started running yet (no race between create_task and the handler return)."""
from app.api import cache_warmup
async def fake_bg(trigger="manual", state=None):
await asyncio.sleep(0.01) # don't actually warm
monkeypatch.setattr(cache_warmup, "WARMUP_STATE", None)
monkeypatch.setattr(cache_warmup, "_warm_catalog_caches_bg", fake_bg)
c = seeded_app["client"]
token = seeded_app["admin_token"]
r = c.post(
"/api/admin/cache-warmup/run",
headers={"Authorization": f"Bearer {token}"},
)
assert r.status_code == 200
body = r.json()
assert body["status"] == "started"
assert body["run_id"] is not None
assert len(body["run_id"]) == 8 # uuid4 hex prefix
def test_list_remote_rows_filters_to_bigquery_source_type(monkeypatch):
"""Devin Review #1 regression: `_list_remote_rows` previously returned
every `query_mode='remote'` row regardless of `source_type`. The downstream
`_warm_schema_sync` always calls `get_bq_access()`, so a non-BQ remote row
(hypothetical today, plausible as connectors expand) would crash the
warmup pass.
Fix: filter on `source_type == 'bigquery'` in `_list_remote_rows` so the
BQ-only warmup path only sees rows it can handle. Rows from other sources
are simply skipped; they'll grow their own warmup paths as needed."""
from app.api import cache_warmup
fake_rows = [
{"id": "bq_remote", "query_mode": "remote", "source_type": "bigquery"},
{"id": "kbc_remote", "query_mode": "remote", "source_type": "keboola"},
{"id": "bq_local", "query_mode": "local", "source_type": "bigquery"},
{"id": "future_remote", "query_mode": "remote", "source_type": "snowflake"},
{"id": "bq_remote2", "query_mode": "remote", "source_type": "bigquery"},
]
class FakeRepo:
def __init__(self, conn):
pass
def list_all(self):
return fake_rows
class FakeConn:
def close(self):
pass
monkeypatch.setattr(
"src.repositories.table_registry.TableRegistryRepository", FakeRepo,
)
monkeypatch.setattr(
"src.db.get_system_db", lambda: FakeConn(),
)
result = cache_warmup._list_remote_rows()
ids = sorted(r["id"] for r in result)
assert ids == ["bq_remote", "bq_remote2"], (
f"only remote+bigquery rows should be warmed, got {ids}"
)
async def _async_noop():
return None


@ -264,6 +264,37 @@ class TestUpdateTable:
assert result.exit_code == 1
class TestRegisterTableHints:
"""The CLI prints helpful follow-up hints after a successful
register-table call. v0.47 adds a third hint for query_mode=remote
pointing at the IAM verify-your-SA smoke check."""
def test_remote_register_emits_iam_verify_hint(self):
with patch("cli.commands.admin.api_post", return_value=_resp(201, {"id": "t"})):
result = runner.invoke(app, [
"admin", "register-table", "orders",
"--source-type", "bigquery",
"--bucket", "dwh_base",
"--source-table", "orders",
"--query-mode", "remote",
])
assert result.exit_code == 0
assert "agnes query --remote" in result.output
assert "query-modes.md" in result.output
def test_local_register_does_not_emit_remote_hint(self):
with patch("cli.commands.admin.api_post", return_value=_resp(201, {"id": "t"})):
result = runner.invoke(app, [
"admin", "register-table", "users",
"--source-type", "keboola",
"--bucket", "in.c-crm",
"--source-table", "users",
"--query-mode", "local",
])
assert result.exit_code == 0
assert "agnes query --remote" not in result.output
def test_admin_set_role_returns_hardfail():
"""v19: `agnes admin set-role` was removed. Calling it must hard-fail
with a non-zero exit code and a message pointing at the replacement


@ -0,0 +1,246 @@
"""BigQuery metadata provider — 5 paths from spec test plan:
happy / sentinel / VIEW / region-typo / both-paths-fail."""
from unittest.mock import MagicMock, patch
import pytest
from app.api._metadata_models import MetadataRequest, TableMetadata
@pytest.fixture
def req():
return MetadataRequest(
table_id="orders", bucket="dwh_base", source_table="orders_2024",
)
def _bq_with_session(table_storage_rows=None, columns_rows=None,
table_storage_raises=None, columns_raises=None,
legacy_tables_rows=None, legacy_tables_raises=None,
projects_data="data-proj", projects_billing="billing-proj"):
"""Mock `BqAccess` whose `duckdb_session()` returns a context manager
routing `.execute(...)` based on the inner SQL string."""
bq = MagicMock()
bq.projects.data = projects_data
bq.projects.billing = projects_billing
def execute(outer_sql, params):
inner_sql = params[1] if len(params) > 1 else ""
if "TABLE_STORAGE" in inner_sql:
if table_storage_raises:
raise table_storage_raises
return MagicMock(
fetchone=lambda: table_storage_rows[0] if table_storage_rows else None,
fetchall=lambda: table_storage_rows or [],
)
if "INFORMATION_SCHEMA.COLUMNS" in inner_sql:
if columns_raises:
raise columns_raises
return MagicMock(
fetchall=lambda: columns_rows or [],
)
if "__TABLES__" in inner_sql:
if legacy_tables_raises:
raise legacy_tables_raises
return MagicMock(
fetchone=lambda: legacy_tables_rows[0] if legacy_tables_rows else None,
)
raise AssertionError(f"unexpected SQL: {inner_sql[:80]}")
session = MagicMock()
session.execute.side_effect = execute
cm = MagicMock()
cm.__enter__.return_value = session
cm.__exit__.return_value = False
bq.duckdb_session.return_value = cm
return bq
def _location_get_value(*keys, default=None):
"""Mock for `app.instance_config.get_value` matching its multi-positional
signature. Returns 'us-central1' for the BQ location key, default otherwise.
Regression-anchored to Devin Review #1: the prior buggy single-string call
silently dropped the configured location; this fixture intentionally
requires the correct ('data_source', 'bigquery', 'location') tuple."""
if keys == ("data_source", "bigquery", "location"):
return "us-central1"
return default
def test_happy_path_returns_full_metadata(req, monkeypatch):
"""TABLE_STORAGE returns rows+size, COLUMNS returns partition+cluster."""
from connectors.bigquery import metadata
monkeypatch.setattr(
"connectors.bigquery.metadata.get_value",
_location_get_value,
raising=False,
)
bq = _bq_with_session(
table_storage_rows=[(1234567, 5_000_000)],
columns_rows=[
("event_date", "DATE", "NO", "YES", None),
("country", "STRING", "YES", "NO", 1),
("user_id", "STRING", "NO", "NO", None),
],
)
with patch("connectors.bigquery.metadata.get_bq_access", return_value=bq):
result = metadata.fetch(req)
assert result == TableMetadata(
rows=1234567,
size_bytes=5_000_000,
partition_by="event_date",
clustered_by=["country"],
)
def test_sentinel_unconfigured_returns_none_no_query(req):
"""`bq.projects.data == ''` → return None before any query."""
from connectors.bigquery import metadata
bq = _bq_with_session(projects_data="")
with patch("connectors.bigquery.metadata.get_bq_access", return_value=bq):
assert metadata.fetch(req) is None
bq.duckdb_session.assert_not_called()
def test_view_path_returns_metadata_with_null_rows_size(req, monkeypatch):
"""VIEW: TABLE_STORAGE empty + __TABLES__ empty → rows/size = None;
partition + cluster from COLUMNS still surface."""
from connectors.bigquery import metadata
monkeypatch.setattr(
"connectors.bigquery.metadata.get_value",
_location_get_value,
        raising=False,
    )
    bq = _bq_with_session(
        table_storage_rows=[],  # view → no row
        legacy_tables_rows=[],  # view also absent from __TABLES__
        columns_rows=[
            ("event_date", "DATE", "NO", "YES", None),
        ],
    )
    with patch("connectors.bigquery.metadata.get_bq_access", return_value=bq):
        result = metadata.fetch(req)
    assert result is not None
    assert result.rows is None
    assert result.size_bytes is None
    assert result.partition_by == "event_date"


def test_region_typo_falls_through_to_legacy_tables(req, monkeypatch):
    """TABLE_STORAGE raises (typo'd region) → fall through to __TABLES__."""
    from connectors.bigquery import metadata

    def typo_get_value(*keys, default=None):
        if keys == ("data_source", "bigquery", "location"):
            return "us-central"  # typo!
        return default

    monkeypatch.setattr(
        "connectors.bigquery.metadata.get_value",
        typo_get_value,
        raising=False,
    )
    bq = _bq_with_session(
        table_storage_raises=RuntimeError("Not found: ..."),
        legacy_tables_rows=[(100, 2048)],
        columns_rows=[("event_date", "DATE", "NO", "YES", None)],
    )
    with patch("connectors.bigquery.metadata.get_bq_access", return_value=bq):
        result = metadata.fetch(req)
    assert result is not None
    assert result.rows == 100
    assert result.size_bytes == 2048


def test_both_paths_fail_returns_metadata_with_partition_only(req, monkeypatch):
    """Both TABLE_STORAGE and __TABLES__ fail → rows/size None, partition still fills."""
    from connectors.bigquery import metadata

    monkeypatch.setattr(
        "connectors.bigquery.metadata.get_value",
        _location_get_value,
        raising=False,
    )
    bq = _bq_with_session(
        table_storage_raises=RuntimeError("BQ down"),
        legacy_tables_raises=RuntimeError("BQ still down"),
        columns_rows=[("event_date", "DATE", "NO", "YES", None)],
    )
    with patch("connectors.bigquery.metadata.get_bq_access", return_value=bq):
        result = metadata.fetch(req)
    assert result is not None
    assert result.rows is None
    assert result.size_bytes is None
    assert result.partition_by == "event_date"


def test_location_config_uses_multi_positional_get_value_args(req, monkeypatch):
    """Devin Review #1 regression: `get_value` was called with a single
    dot-separated string `'data_source.bigquery.location'`, but the function
    iterates over separate positional keys so the call always returned None
    and the BQ location config was never read.

    This test records every call to `get_value` and asserts that the location
    lookup goes through the correct multi-positional form
    (`'data_source', 'bigquery', 'location'`)."""
    from connectors.bigquery import metadata

    calls: list[tuple] = []

    def recording_get_value(*keys, default=None):
        calls.append(keys)
        if keys == ("data_source", "bigquery", "location"):
            return "europe-west1"
        return default

    monkeypatch.setattr(
        "connectors.bigquery.metadata.get_value",
        recording_get_value,
        raising=False,
    )
    captured: dict = {}

    def execute(outer_sql, params):
        if "TABLE_STORAGE" in (params[1] if len(params) > 1 else ""):
            captured["table_storage_sql"] = params[1]
            return MagicMock(fetchone=lambda: (5, 10))
        return MagicMock(fetchall=lambda: [], fetchone=lambda: None)

    bq = MagicMock()
    bq.projects.data = "data-proj"
    bq.projects.billing = "billing-proj"
    session = MagicMock()
    session.execute.side_effect = execute
    cm = MagicMock()
    cm.__enter__.return_value = session
    cm.__exit__.return_value = False
    bq.duckdb_session.return_value = cm
    with patch("connectors.bigquery.metadata.get_bq_access", return_value=bq):
        metadata.fetch(req)
    # The fix: `get_value("data_source", "bigquery", "location")` must appear.
    assert ("data_source", "bigquery", "location") in calls, (
        f"expected ('data_source','bigquery','location') tuple in get_value "
        f"calls, got: {calls}"
    )
    # And the configured location must reach the TABLE_STORAGE SQL — proving
    # the value was actually consumed, not just looked up.
    assert "region-europe-west1" in captured.get("table_storage_sql", ""), (
        f"location config was not propagated to BQ SQL: "
        f"{captured.get('table_storage_sql', '<no SQL captured>')}"
    )


def test_bq_access_error_returns_none(req):
    """get_bq_access() raises BqAccessError → return None gracefully."""
    from connectors.bigquery import metadata
    from connectors.bigquery.access import BqAccessError

    with patch(
        "connectors.bigquery.metadata.get_bq_access",
        side_effect=BqAccessError("not_configured", "not configured"),
    ):
        assert metadata.fetch(req) is None
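
The regression guarded by `test_location_config_uses_multi_positional_get_value_args` is easier to see with a toy lookup. The sketch below is a hypothetical stand-in for `get_value`, assuming it walks a nested mapping one positional key at a time; the `CONFIG` dict and its values are illustrative, and the real helper may differ.

```python
from typing import Any

# Hypothetical config shaped like instance.yaml; values are illustrative only.
CONFIG: dict[str, Any] = {
    "data_source": {"bigquery": {"location": "europe-west1"}},
}


def get_value(*keys: str, default: Any = None) -> Any:
    """Walk CONFIG one positional key at a time (assumed behaviour)."""
    node: Any = CONFIG
    for key in keys:
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node


# Dotted string: treated as ONE key that does not exist, so the default wins.
dotted = get_value("data_source.bigquery.location")
# Separate positional keys: each nesting level is resolved in turn.
positional = get_value("data_source", "bigquery", "location")
```

With a lookup of this shape the dotted call silently yields the default, which is why the test asserts on both the recorded key tuple and the region string that reaches the SQL.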

@ -0,0 +1,75 @@
"""Keboola metadata provider — happy + unconfigured + api-error paths."""
from unittest.mock import MagicMock, patch

import pytest

from app.api._metadata_models import MetadataRequest, TableMetadata


@pytest.fixture
def req():
    return MetadataRequest(
        table_id="orders", bucket="in.c-crm", source_table="orders",
    )


def test_happy_path_returns_populated_metadata(req, monkeypatch):
    from connectors.keboola import metadata

    # KeboolaClient(token=None, url=None) reads env vars; pretend they're set.
    monkeypatch.setenv("KEBOOLA_STACK_URL", "https://connection.keboola.com")
    monkeypatch.setenv("KEBOOLA_STORAGE_TOKEN", "tok")
    with patch("connectors.keboola.metadata.KeboolaStorageClient") as MockStorage:
        instance = MockStorage.return_value
        instance.get_table_info.return_value = {
            "rowsCount": 1234,
            "dataSizeBytes": 500_000,
            "primaryKey": ["id"],
        }
        result = metadata.fetch(req)
    assert result == TableMetadata(
        rows=1234,
        size_bytes=500_000,
        partition_by=None,
        clustered_by=None,
    )


def test_returns_none_when_unconfigured(req, monkeypatch):
    """No KEBOOLA_STACK_URL / KEBOOLA_STORAGE_TOKEN env → return None."""
    from connectors.keboola import metadata

    monkeypatch.delenv("KEBOOLA_STACK_URL", raising=False)
    monkeypatch.delenv("KEBOOLA_STORAGE_TOKEN", raising=False)
    assert metadata.fetch(req) is None


def test_returns_none_on_storage_api_error(req, monkeypatch):
    """`StorageApiError` from get_table_info → log + return None."""
    from connectors.keboola import metadata
    from connectors.keboola.storage_api import StorageApiError

    monkeypatch.setenv("KEBOOLA_STACK_URL", "https://x.keboola.com")
    monkeypatch.setenv("KEBOOLA_STORAGE_TOKEN", "tok")
    with patch("connectors.keboola.metadata.KeboolaStorageClient") as MockStorage:
        instance = MockStorage.return_value
        instance.get_table_info.side_effect = StorageApiError(
            "404 not found", status=404, body={},
        )
        assert metadata.fetch(req) is None


def test_table_id_uses_bucket_dot_source_table(req, monkeypatch):
    """Storage API path is `<bucket>.<source_table>`."""
    from connectors.keboola import metadata

    monkeypatch.setenv("KEBOOLA_STACK_URL", "https://x.keboola.com")
    monkeypatch.setenv("KEBOOLA_STORAGE_TOKEN", "tok")
    with patch("connectors.keboola.metadata.KeboolaStorageClient") as MockStorage:
        instance = MockStorage.return_value
        instance.get_table_info.return_value = {
            "rowsCount": 0, "dataSizeBytes": 0,
        }
        metadata.fetch(req)
        instance.get_table_info.assert_called_once_with("in.c-crm.orders")

@ -518,3 +518,45 @@ class TestParquetPath:
        }, dest)
        assert dest.read_bytes() == b"PAR1\x00\x00\x00binary"


# ---- get_table_info --------------------------------------------------------
class TestGetTableInfo:
    """`get_table_info` is a thin wrapper around the existing _get path
    so the metadata provider doesn't have to bleed `_get` out of the
    module (#155)."""

    def test_calls_storage_api_with_table_id(self, monkeypatch):
        from connectors.keboola.storage_api import KeboolaStorageClient

        captured = {}

        def fake_get(self, path, **kwargs):
            captured["path"] = path
            return {"rowsCount": 100, "dataSizeBytes": 4096}

        monkeypatch.setattr(KeboolaStorageClient, "_get", fake_get)
        client = KeboolaStorageClient(
            url="https://connection.keboola.com", token="tok"
        )
        info = client.get_table_info("in.c-orders.events")
        assert captured["path"] == "/tables/in.c-orders.events"
        assert info["rowsCount"] == 100
        assert info["dataSizeBytes"] == 4096

    def test_propagates_storage_api_error(self, monkeypatch):
        from connectors.keboola.storage_api import (
            KeboolaStorageClient, StorageApiError,
        )

        def fake_get(self, path, **kwargs):
            raise StorageApiError("404 not found", status=404, body={})

        monkeypatch.setattr(KeboolaStorageClient, "_get", fake_get)
        client = KeboolaStorageClient(url="https://x", token="tok")
        import pytest
        with pytest.raises(StorageApiError):
            client.get_table_info("missing.table")

@ -0,0 +1,31 @@
"""The FastAPI startup hook schedules cache warmup."""
from unittest.mock import patch


def test_startup_handler_calls_warmup_scheduler():
    """A startup handler in app.main calls maybe_schedule_startup_warmup."""
    from app.main import app

    # FastAPI startup events live on app.router.on_startup OR are
    # registered via lifespan. Either way, we should be able to verify
    # the scheduler is called.
    handlers = list(app.router.on_startup)
    handler_names = [getattr(h, "__name__", "?") for h in handlers]
    # Either: a named handler that calls warmup, OR a lifespan that does.
    has_warmup = any("warm" in n.lower() for n in handler_names)
    if not has_warmup:
        # Lifespan path — check for the lifespan fn
        lifespan = getattr(app.router, "lifespan_context", None)
        assert lifespan is not None, (
            "Expected a startup handler (or lifespan) that calls "
            "cache_warmup.maybe_schedule_startup_warmup. "
            f"Found on_startup: {handler_names}"
        )


def test_health_check_succeeds_immediately(seeded_app):
    """/api/health doesn't await warmup; readiness is fire-and-forget."""
    c = seeded_app["client"]
    r = c.get("/api/health")
    assert r.status_code == 200

@ -0,0 +1,39 @@
"""Sanity tests for the shared metadata dataclasses."""
from app.api._metadata_models import MetadataRequest, TableMetadata


def test_metadata_request_constructs():
    req = MetadataRequest(
        table_id="orders", bucket="dwh_base", source_table="orders_2024",
    )
    assert req.table_id == "orders"
    assert req.bucket == "dwh_base"
    assert req.source_table == "orders_2024"


def test_metadata_request_is_frozen():
    """Frozen so cache keys derived from a request are stable."""
    req = MetadataRequest(table_id="x", bucket="b", source_table="t")
    import dataclasses
    try:
        req.bucket = "other"
    except dataclasses.FrozenInstanceError:
        return
    raise AssertionError("MetadataRequest should be frozen")


def test_table_metadata_all_fields_optional():
    tm = TableMetadata()
    assert tm.rows is None
    assert tm.size_bytes is None
    assert tm.partition_by is None
    assert tm.clustered_by is None


def test_table_metadata_partial_population():
    tm = TableMetadata(rows=100, size_bytes=2048)
    assert tm.rows == 100
    assert tm.size_bytes == 2048
    assert tm.partition_by is None
    assert tm.clustered_by is None
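
For orientation, the two dataclasses these tests exercise might look roughly like the sketch below. The field names come from the tests; the type annotations, and the choice to leave `TableMetadata` unfrozen, are assumptions, since the diff shown here does not include `app/api/_metadata_models.py`.

```python
from dataclasses import FrozenInstanceError, dataclass
from typing import Optional


@dataclass(frozen=True)
class MetadataRequest:
    """Identifies one registry row; frozen so it is hashable and stable."""
    table_id: str
    bucket: str
    source_table: str


@dataclass
class TableMetadata:
    """Every field is optional: None means "unknown", not "zero"."""
    rows: Optional[int] = None
    size_bytes: Optional[int] = None
    partition_by: Optional[str] = None
    clustered_by: Optional[list[str]] = None


req = MetadataRequest(table_id="orders", bucket="dwh_base", source_table="orders_2024")
try:
    req.bucket = "other"  # assignment on a frozen dataclass raises
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Freezing the request also makes equal requests hash equally, which is what lets a value derived from it serve as a cache key.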

@ -0,0 +1,71 @@
"""Dispatch + identifier-validation gate for the source-agnostic
metadata providers."""
from app.api._metadata_models import MetadataRequest


def test_dispatcher_returns_bq_provider_for_bigquery():
    from app.api.v2_catalog import _metadata_provider_for
    from connectors.bigquery import metadata as bq_meta

    fn = _metadata_provider_for("bigquery")
    assert fn is bq_meta.fetch


def test_dispatcher_returns_keboola_provider_for_keboola():
    from app.api.v2_catalog import _metadata_provider_for
    from connectors.keboola import metadata as kb_meta

    fn = _metadata_provider_for("keboola")
    assert fn is kb_meta.fetch


def test_dispatcher_returns_none_for_unknown_source():
    from app.api.v2_catalog import _metadata_provider_for

    assert _metadata_provider_for("jira") is None
    assert _metadata_provider_for("") is None
    assert _metadata_provider_for("snowflake") is None


def test_build_metadata_request_for_valid_row():
    from app.api.v2_catalog import _build_metadata_request

    req = _build_metadata_request({
        "id": "orders",
        "bucket": "dwh_base",
        "source_table": "orders_2024",
    })
    assert isinstance(req, MetadataRequest)
    assert req.table_id == "orders"
    assert req.bucket == "dwh_base"
    assert req.source_table == "orders_2024"


def test_build_metadata_request_rejects_unsafe_bucket():
    from app.api.v2_catalog import _build_metadata_request

    req = _build_metadata_request({
        "id": "x",
        "bucket": "evil`; DROP--",
        "source_table": "t",
    })
    assert req is None


def test_build_metadata_request_falls_back_to_id_when_source_table_missing():
    """Some legacy Keboola registry rows have empty source_table; the row id
    is the table name in that case (mirrors v2_schema:168 behavior)."""
    from app.api.v2_catalog import _build_metadata_request

    req = _build_metadata_request({
        "id": "orders",
        "bucket": "in.c-crm",
        "source_table": "",
    })
    assert req is not None
    assert req.source_table == "orders"


def test_stub_providers_return_none():
    """Providers don't have their real bodies yet — stubs return None
    so the catalog endpoint stays 200 while we wire the rest."""
    from connectors.bigquery import metadata as bq_meta
    from connectors.keboola import metadata as kb_meta

    req = MetadataRequest(table_id="x", bucket="b", source_table="t")
    assert bq_meta.fetch(req) is None
    assert kb_meta.fetch(req) is None
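
The identifier gate and the `source_table` fallback tested above can be pictured with the following minimal sketch. `build_request`, `Request`, and the allow-list regex are hypothetical names and rules chosen for illustration; the project's `_build_metadata_request` may validate differently.

```python
import re
from typing import NamedTuple, Optional

# Assumed allow-list: letters, digits, underscore, dot and hyphen.
_SAFE_IDENT = re.compile(r"[A-Za-z0-9_.\-]+")


class Request(NamedTuple):
    table_id: str
    bucket: str
    source_table: str


def build_request(row: dict) -> Optional[Request]:
    """Return a Request, or None when an identifier is unsafe to interpolate."""
    # Legacy rows may carry an empty source_table; fall back to the row id.
    source_table = row.get("source_table") or row["id"]
    for ident in (row["bucket"], source_table):
        if not _SAFE_IDENT.fullmatch(ident):
            return None
    return Request(row["id"], row["bucket"], source_table)
```

Returning `None` instead of raising keeps the caller's decision simple: no request means no provider call, and the catalog row is served without metadata.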

@ -0,0 +1,99 @@
"""Unified cache flush across all four catalog/schema/sample/metadata
caches on registry write."""
from unittest.mock import patch


def test_invalidate_flushes_all_four_caches():
    from app.api import v2_catalog, v2_schema, v2_sample
    from app.api._metadata_models import TableMetadata

    # Pre-populate.
    v2_catalog._table_rows_cache.set("all", ["fake_row"])
    v2_catalog._metadata_cache.set("orders", TableMetadata(rows=10))
    v2_schema._schema_cache.set("orders", {"columns": []})
    v2_sample._sample_cache.set("orders|10", [{"row": 1}])
    v2_catalog.invalidate_for_table("orders")
    assert v2_catalog._table_rows_cache.get("all") is None
    assert v2_catalog._metadata_cache.get("orders") is None
    assert v2_schema._schema_cache.get("orders") is None
    # Sample cache is cleared whole (we don't have prefix-invalidation).
    assert v2_sample._sample_cache.get("orders|10") is None


def test_invalidate_schedules_single_row_rewarm(monkeypatch):
    """After the flush, a background re-warm task is scheduled for the
    same table_id. Assert via patching create_task."""
    import asyncio
    from app.api import v2_catalog

    scheduled = []

    def fake_create_task(coro):
        # Drain the coroutine so the test doesn't leak it.
        coro.close()
        scheduled.append(coro)
        return None

    # Simulate a running event loop so the create_task branch is reached.
    monkeypatch.setattr(asyncio, "get_running_loop", lambda: object())
    monkeypatch.setattr(asyncio, "create_task", fake_create_task)
    v2_catalog.invalidate_for_table("orders")
    assert len(scheduled) == 1


def test_register_table_invalidates(seeded_app):
    """Registering a table flushes the rows cache so the next catalog
    request reflects it without waiting for the 5-min TTL."""
    from app.api import v2_catalog

    v2_catalog._table_rows_cache.set("all", [])
    client = seeded_app["client"]
    token = seeded_app["admin_token"]
    headers = {"Authorization": f"Bearer {token}"}
    client.post("/api/admin/register-table", json={
        "name": "new_t",
        "source_type": "keboola",
        "bucket": "in.c-x",
        "source_table": "t",
        "query_mode": "local",
    }, headers=headers)
    assert v2_catalog._table_rows_cache.get("all") is None


def test_update_table_invalidates(seeded_app):
    from app.api import v2_catalog

    client = seeded_app["client"]
    token = seeded_app["admin_token"]
    headers = {"Authorization": f"Bearer {token}"}
    client.post("/api/admin/register-table", json={
        "name": "u_t",
        "source_type": "keboola",
        "bucket": "in.c-x",
        "source_table": "t",
        "query_mode": "local",
    }, headers=headers)
    v2_catalog._table_rows_cache.set("all", ["pre-update"])
    client.put("/api/admin/registry/u_t", json={"description": "new"}, headers=headers)
    assert v2_catalog._table_rows_cache.get("all") is None


def test_unregister_table_invalidates(seeded_app):
    from app.api import v2_catalog

    client = seeded_app["client"]
    token = seeded_app["admin_token"]
    headers = {"Authorization": f"Bearer {token}"}
    client.post("/api/admin/register-table", json={
        "name": "d_t",
        "source_type": "keboola",
        "bucket": "in.c-x",
        "source_table": "t",
        "query_mode": "local",
    }, headers=headers)
    v2_catalog._table_rows_cache.set("all", ["pre-delete"])
    client.delete("/api/admin/registry/d_t", headers=headers)
    assert v2_catalog._table_rows_cache.get("all") is None
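
The flush behaviour these tests pin down can be sketched with a small in-process TTL cache. Everything below is a hypothetical stand-in: the `TTLCache` class, the TTL values, and the choice between per-key delete and whole-cache clear are assumptions, and the re-warm scheduling step is left out.

```python
import time
from typing import Any, Optional


class TTLCache:
    """Tiny TTL cache exposing the set/get/clear surface the tests rely on."""

    def __init__(self, ttl_seconds: float) -> None:
        self._ttl = ttl_seconds
        self._items: dict[str, tuple[float, Any]] = {}

    def set(self, key: str, value: Any) -> None:
        self._items[key] = (time.monotonic() + self._ttl, value)

    def get(self, key: str, default: Optional[Any] = None) -> Any:
        entry = self._items.get(key)
        if entry is None or entry[0] < time.monotonic():
            self._items.pop(key, None)  # drop expired entries lazily
            return default
        return entry[1]

    def delete(self, key: str) -> None:
        self._items.pop(key, None)

    def clear(self) -> None:
        self._items.clear()


# TTL values are placeholders, not the project's settings.
table_rows_cache = TTLCache(300)
metadata_cache = TTLCache(900)
schema_cache = TTLCache(3600)
sample_cache = TTLCache(300)


def invalidate_for_table(table_id: str) -> None:
    """Flush every cache that could hold stale data for table_id."""
    table_rows_cache.clear()         # row list is stored under one key, so drop it whole
    metadata_cache.delete(table_id)
    schema_cache.delete(table_id)
    sample_cache.clear()             # keys look like "<id>|<n>"; no prefix delete, clear all
```

Clearing the sample cache wholesale trades a few extra cache misses for not having to track which row counts were sampled per table.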

@ -0,0 +1,179 @@
"""Catalog endpoint integration: per-table metadata enrichment for
remote rows."""
from unittest.mock import patch

from app.api._metadata_models import TableMetadata


def _register_table(seeded_app, **kwargs):
    """Register a table into the test DB using TableRegistryRepository."""
    from src.db import get_system_db
    from src.repositories.table_registry import TableRegistryRepository

    conn = get_system_db()
    try:
        repo = TableRegistryRepository(conn)
        # `name` defaults to `id` if not supplied
        name = kwargs.pop("name", kwargs.get("id"))
        repo.register(name=name, **kwargs)
    finally:
        conn.close()


def test_remote_row_includes_metadata_fields(seeded_app, monkeypatch):
    """Catalog response for a query_mode='remote' BQ row carries the four
    new fields populated by the provider."""
    # Reset catalog row cache so this test's registered table is visible.
    from app.api import v2_catalog

    v2_catalog._table_rows_cache.clear()
    v2_catalog._metadata_cache.clear()
    c = seeded_app["client"]
    token = seeded_app["admin_token"]
    fake_meta = TableMetadata(
        rows=10000, size_bytes=2_000_000,
        partition_by="event_date", clustered_by=["country", "platform"],
    )
    _register_table(
        seeded_app,
        id="orders", source_type="bigquery", bucket="dwh_base",
        source_table="orders_2024", query_mode="remote",
    )
    with patch(
        "connectors.bigquery.metadata.fetch", return_value=fake_meta,
    ):
        r = c.get(
            "/api/v2/catalog",
            headers={"Authorization": f"Bearer {token}"},
        )
    assert r.status_code == 200, r.text
    tables = r.json()["tables"]
    orders = next(t for t in tables if t["id"] == "orders")
    assert orders["rows"] == 10000
    assert orders["size_bytes"] == 2_000_000
    assert orders["partition_by"] == "event_date"
    assert orders["clustered_by"] == ["country", "platform"]
    # Existing fields still present.
    assert orders["query_mode"] == "remote"


def test_local_row_unaffected_by_provider_dispatch(seeded_app):
    """query_mode='local' rows take the parquet-stat path; provider not called."""
    from app.api import v2_catalog

    v2_catalog._table_rows_cache.clear()
    v2_catalog._metadata_cache.clear()
    c = seeded_app["client"]
    token = seeded_app["admin_token"]
    _register_table(
        seeded_app,
        id="users", source_type="keboola", bucket="in.c-crm",
        source_table="users", query_mode="local",
    )
    with patch("connectors.keboola.metadata.fetch") as mock_fetch:
        r = c.get(
            "/api/v2/catalog",
            headers={"Authorization": f"Bearer {token}"},
        )
    assert r.status_code == 200, r.text
    mock_fetch.assert_not_called()


def test_provider_failure_returns_null_metadata(seeded_app):
    """Provider returns None → row appears with null new fields, not
    a 500. Catalog endpoint must stay 200."""
    from app.api import v2_catalog

    v2_catalog._table_rows_cache.clear()
    v2_catalog._metadata_cache.clear()
    c = seeded_app["client"]
    token = seeded_app["admin_token"]
    _register_table(
        seeded_app,
        id="broken", source_type="bigquery", bucket="dwh_base",
        source_table="broken_t", query_mode="remote",
    )
    with patch(
        "connectors.bigquery.metadata.fetch", return_value=None,
    ):
        r = c.get(
            "/api/v2/catalog",
            headers={"Authorization": f"Bearer {token}"},
        )
    assert r.status_code == 200, r.text
    tables = r.json()["tables"]
    broken = next(t for t in tables if t["id"] == "broken")
    assert broken["rows"] is None
    assert broken["size_bytes"] is None
    assert broken["partition_by"] is None
    assert broken["clustered_by"] is None


def test_zero_size_bytes_reports_small_not_unknown(seeded_app):
    """Devin Review #1 regression: `if cached.size_bytes:` is falsy when
    `size_bytes == 0` (a genuinely empty table), which wrongly emitted
    `rough_size_hint=None` ("unknown") instead of `"small"` (the bucket
    that `_bucket_size(0)` returns).

    Fix in `_size_hint_for_row`: distinguish "size known to be zero" from
    "size is unknown" with `is not None`."""
    from app.api import v2_catalog

    v2_catalog._table_rows_cache.clear()
    v2_catalog._metadata_cache.clear()
    c = seeded_app["client"]
    token = seeded_app["admin_token"]
    fake_meta = TableMetadata(
        rows=0, size_bytes=0, partition_by=None, clustered_by=[],
    )
    _register_table(
        seeded_app,
        id="empty_t", source_type="bigquery", bucket="dwh_base",
        source_table="empty_t", query_mode="remote",
    )
    with patch(
        "connectors.bigquery.metadata.fetch", return_value=fake_meta,
    ):
        r = c.get(
            "/api/v2/catalog",
            headers={"Authorization": f"Bearer {token}"},
        )
    assert r.status_code == 200, r.text
    tables = r.json()["tables"]
    empty = next(t for t in tables if t["id"] == "empty_t")
    # The whole point of this test: 0 bytes is NOT "unknown".
    assert empty["size_bytes"] == 0
    assert empty["rough_size_hint"] == "small", (
        f"size_bytes=0 should bucket to 'small', got {empty['rough_size_hint']}"
    )


def test_cache_hit_does_not_call_provider_twice(seeded_app):
    """First call invokes provider; second within 15 min hits cache."""
    from app.api import v2_catalog

    v2_catalog._table_rows_cache.clear()
    v2_catalog._metadata_cache.clear()
    c = seeded_app["client"]
    token = seeded_app["admin_token"]
    _register_table(
        seeded_app,
        id="orders", source_type="bigquery", bucket="dwh_base",
        source_table="orders_2024", query_mode="remote",
    )
    fake_meta = TableMetadata(rows=1, size_bytes=2)
    with patch(
        "connectors.bigquery.metadata.fetch", return_value=fake_meta,
    ) as mock_fetch:
        c.get("/api/v2/catalog", headers={"Authorization": f"Bearer {token}"})
        c.get("/api/v2/catalog", headers={"Authorization": f"Bearer {token}"})
    assert mock_fetch.call_count == 1
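
The zero-size regression above comes down to truthiness versus an explicit `None` check. A minimal sketch follows, with invented bucket thresholds and hypothetical function names; only the `"small"` label and the `None`-means-unknown convention are taken from the test.

```python
from typing import Optional


def bucket_size(size_bytes: int) -> str:
    # Thresholds are invented for illustration only.
    if size_bytes < 100 * 1024 ** 2:
        return "small"
    if size_bytes < 10 * 1024 ** 3:
        return "medium"
    return "large"


def size_hint_truthy(size_bytes: Optional[int]) -> Optional[str]:
    # Buggy form: 0 is falsy, so an empty table is reported as unknown.
    return bucket_size(size_bytes) if size_bytes else None


def size_hint_fixed(size_bytes: Optional[int]) -> Optional[str]:
    # Fixed form: only None means unknown; 0 is a real measurement.
    return bucket_size(size_bytes) if size_bytes is not None else None
```

The same distinction applies to `rows`: a provider that reports `0` has answered the question, whereas `None` has not.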

@ -331,3 +331,67 @@ class TestBqAccessErrors:
        assert captured["billing_project"] == "billing-proj"
        # FROM clause uses data project (where INFORMATION_SCHEMA.COLUMNS lives)
        assert "`data-proj.ds.INFORMATION_SCHEMA.COLUMNS`" in captured["bq_sql"]


class TestBuildSchemaUncached:
    """The uncached entry point exists for warmup, which has no user
    context. RBAC + cache check live in `build_schema`; the BQ work +
    cache write live in `build_schema_uncached`."""

    def test_uncached_function_exists_and_does_not_take_user(self):
        """Signature: build_schema_uncached(conn, table_id, *, bq)"""
        from app.api.v2_schema import build_schema_uncached
        import inspect

        sig = inspect.signature(build_schema_uncached)
        params = list(sig.parameters)
        assert "user" not in params, (
            "build_schema_uncached should NOT require a user — that's "
            "the whole point of the split (warmup has no user)."
        )
        assert "table_id" in params
        assert "bq" in params

    def test_build_schema_delegates_to_uncached(self, monkeypatch):
        """build_schema should call build_schema_uncached after RBAC+cache check."""
        from app.api import v2_schema

        called_with = {}

        def fake_uncached(conn, table_id, *, bq, row=None):
            called_with["table_id"] = table_id
            called_with["row"] = row
            return {"table_id": table_id, "columns": []}

        monkeypatch.setattr(v2_schema, "build_schema_uncached", fake_uncached)
        # Bypass the cache + RBAC for this assertion — both are tested elsewhere.
        monkeypatch.setattr(v2_schema._schema_cache, "get", lambda k, default=None: None)
        monkeypatch.setattr(v2_schema, "can_access_table", lambda u, tid, c: True)
        # Synthetic registry row.
        from unittest.mock import MagicMock
        repo_mock = MagicMock()
        repo_mock.get.return_value = {"id": "x", "source_type": "bigquery"}
        monkeypatch.setattr(v2_schema, "TableRegistryRepository", lambda c: repo_mock)
        v2_schema.build_schema(
            conn=MagicMock(), user={"id": "u"}, table_id="x", bq=MagicMock(),
        )
        assert called_with["table_id"] == "x"

    def test_uncached_raises_notfound_for_unregistered_table(self):
        """Warmup-direct call against an unregistered id raises NotFound,
        not FileNotFoundError or other surprise."""
        from app.api.v2_schema import build_schema_uncached, NotFound
        from unittest.mock import MagicMock

        conn = MagicMock()
        repo_mock = MagicMock()
        repo_mock.get.return_value = None
        # Patch the repo lookup the same way the implementation imports it.
        import app.api.v2_schema as v2_schema_mod
        original = v2_schema_mod.TableRegistryRepository
        v2_schema_mod.TableRegistryRepository = lambda c: repo_mock
        try:
            with pytest.raises(NotFound):
                build_schema_uncached(conn, "nonexistent", bq=MagicMock())
        finally:
            v2_schema_mod.TableRegistryRepository = original

@ -0,0 +1,111 @@
"""Asserts that /api/v2/schema/{id} for a BQ row makes exactly ONE
bigquery_query() call on cache miss, down from two pre-#155.
Counts via a side-effect tracker on the mocked DuckDB session.
"""
from unittest.mock import MagicMock, patch

import pytest


def _mock_duckdb_session_returning(rows):
    """Build a context-manager mock that returns `rows` on .fetchall().
    Exposes `call_count` on the returned mock for assertion.
    """
    session = MagicMock()
    session.execute.return_value.fetchall.return_value = rows
    cm = MagicMock()
    cm.__enter__.return_value = session
    cm.__exit__.return_value = False
    return cm, session


def test_fetch_bq_columns_full_is_single_query():
    """The new shared helper makes exactly ONE call to bigquery_query."""
    from connectors.bigquery.access import fetch_bq_columns_full

    bq = MagicMock()
    bq.projects.data = "data-proj"
    bq.projects.billing = "billing-proj"
    cm, session = _mock_duckdb_session_returning([
        ("event_date", "DATE", "NO", "YES", None),
        ("country", "STRING", "YES", "NO", 1),
        ("user_id", "STRING", "NO", "NO", None),
    ])
    bq.duckdb_session.return_value = cm
    rows = fetch_bq_columns_full(bq, "dwh_base", "events")
    assert len(rows) == 3
    # Exactly one bigquery_query() call — no second round-trip.
    assert session.execute.call_count == 1
    first_call = session.execute.call_args_list[0]
    # Outer wrapper SQL is bigquery_query(?, ?, ?)
    assert "bigquery_query" in first_call.args[0]
    # Inner BQ SQL pulls all five columns we need at once.
    inner_sql = first_call.args[1][1]
    assert "column_name" in inner_sql
    assert "data_type" in inner_sql
    assert "is_nullable" in inner_sql
    assert "is_partitioning_column" in inner_sql
    assert "clustering_ordinal_position" in inner_sql


def test_fetch_bq_columns_full_returns_dicts():
    """Each row is a dict with the documented keys."""
    from connectors.bigquery.access import fetch_bq_columns_full

    bq = MagicMock()
    bq.projects.data = "data-proj"
    bq.projects.billing = "billing-proj"
    cm, _ = _mock_duckdb_session_returning([
        ("event_date", "DATE", "NO", "YES", None),
    ])
    bq.duckdb_session.return_value = cm
    rows = fetch_bq_columns_full(bq, "dwh_base", "events")
    assert rows == [{
        "name": "event_date",
        "type": "DATE",
        "nullable": False,
        "is_partitioning_column": True,
        "clustering_ordinal_position": None,
    }]


def test_fetch_bq_columns_full_returns_none_when_unconfigured():
    """Sentinel BqAccess (data project empty) → return None, no query."""
    from connectors.bigquery.access import fetch_bq_columns_full

    bq = MagicMock()
    bq.projects.data = ""  # sentinel
    rows = fetch_bq_columns_full(bq, "dwh_base", "events")
    assert rows is None
    bq.duckdb_session.assert_not_called()


def test_fetch_bq_columns_full_returns_none_on_unsafe_identifier():
    """Refuses to interpolate identifiers that fail validation."""
    from connectors.bigquery.access import fetch_bq_columns_full

    bq = MagicMock()
    bq.projects.data = "data-proj"
    rows = fetch_bq_columns_full(bq, "evil`; DROP--", "events")
    assert rows is None
    bq.duckdb_session.assert_not_called()


def test_fetch_bq_columns_full_returns_none_on_query_error():
    """BQ failure → log + None; never raises."""
    from connectors.bigquery.access import fetch_bq_columns_full

    bq = MagicMock()
    bq.projects.data = "data-proj"
    bq.projects.billing = "billing-proj"
    cm = MagicMock()
    cm.__enter__.return_value.execute.side_effect = RuntimeError("BQ down")
    cm.__exit__.return_value = False
    bq.duckdb_session.return_value = cm
    rows = fetch_bq_columns_full(bq, "dwh_base", "events")
    assert rows is None
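
The row shape asserted in `test_fetch_bq_columns_full_returns_dicts` suggests how one `INFORMATION_SCHEMA.COLUMNS` result can serve both the schema endpoint and the partition/clustering metadata. The sketch below only illustrates that mapping: `rows_to_columns` and `partition_and_clustering` are hypothetical helper names, and returning an empty list (rather than `None`) for an unclustered table is an assumption.

```python
from typing import Optional

# (column_name, data_type, is_nullable, is_partitioning_column, clustering_ordinal_position)
Row = tuple[str, str, str, str, Optional[int]]


def rows_to_columns(rows: list[Row]) -> list[dict]:
    """Convert raw tuples into the dict shape the test expects."""
    return [
        {
            "name": name,
            "type": data_type,
            "nullable": is_nullable == "YES",
            "is_partitioning_column": is_part == "YES",
            "clustering_ordinal_position": cluster_pos,
        }
        for name, data_type, is_nullable, is_part, cluster_pos in rows
    ]


def partition_and_clustering(columns: list[dict]) -> tuple[Optional[str], list[str]]:
    """Derive partition_by and clustered_by from the same column list."""
    partition_by = next(
        (c["name"] for c in columns if c["is_partitioning_column"]), None
    )
    clustered = sorted(
        (c for c in columns if c["clustering_ordinal_position"] is not None),
        key=lambda c: c["clustering_ordinal_position"],
    )
    return partition_by, [c["name"] for c in clustered]


columns = rows_to_columns([
    ("event_date", "DATE", "NO", "YES", None),
    ("country", "STRING", "YES", "NO", 1),
    ("user_id", "STRING", "NO", "NO", None),
])
partition_by, clustered_by = partition_and_clustering(columns)
```

Because both values fall out of the column list, no second query is needed for them, which matches the single-call assertion in the tests above.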