diff --git a/CHANGELOG.md b/CHANGELOG.md index 3af036b..2d98c83 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,88 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +## [0.47.0] — 2026-05-07 + +Catalog metadata enrichment + cache discipline + automatic warmup. +Closes #155 + #156. + +### Added + +- **`/api/v2/catalog` returns four new optional fields per row** — `rows`, + `size_bytes`, `partition_by`, `clustered_by` — populated by per-source-type + metadata providers (`connectors/bigquery/metadata.py`, + `connectors/keboola/metadata.py`). For `query_mode='remote'` BigQuery rows, + `size_bytes` is `active_logical_bytes + long_term_logical_bytes` (a full + scan reads both); region resolved from `data_source.bigquery.location` → + `bq_client.get_dataset(...)` → fall back to legacy `__TABLES__`. + Existing CLI consumers reading only `rough_size_hint` are unaffected. +- **Automatic cache warmup at startup.** FastAPI startup event schedules + a background task that walks BQ remote rows and pre-populates + `_metadata_cache` + `_schema_cache` with bounded concurrency (default 4, + tunable via `AGNES_WARMUP_CONCURRENCY`). Doesn't block readiness; + per-row failures logged + skipped. Opt-out via `AGNES_SKIP_CACHE_WARMUP=1`. +- **Three new admin endpoints under `/api/admin/cache-warmup/*`:** + - `GET /status` — JSON snapshot of the latest run. + - `POST /run` — manual trigger, idempotent under concurrent invocation. + - `GET /stream` — Server-Sent Events with `start` / `row` / `complete` + events for live UI updates. +- **`/admin/tables` cache freshness panel.** Toolbar above the per-source-type + listings with progress bar + "Re-warm all" button + collapsible + terminal-style log fed by SSE (polling fallback at 3 s). Per-row badge + in the existing `col-status` column updates live (fresh / warming / + pending / error). 
+- **`docs/admin/query-modes.md`** — source-agnostic admin reference for + registering tables as `local` / `remote` / `materialized`. Decision + tree, per-source-type IAM + setup, three worked examples. Linked from + the `?` icon next to the `query_mode` field in the admin UI edit modal + and from the third post-register hint in `agnes admin register-table`. +- **`agnes admin register-table` post-register hint** for `query_mode=remote`: + points at `agnes query --remote "SELECT COUNT(*)..."` as the IAM smoke + check so a missing `dataViewer` / `jobUser` surfaces at registration + time, not 30 minutes later. + +### Changed + +- **`/api/v2/schema/{id}` cache miss now does 1 BQ job instead of 2.** + `connectors/bigquery/access.py:fetch_bq_columns_full` collapses what + used to be `_fetch_bq_schema` + `_fetch_bq_table_options` into a single + `INFORMATION_SCHEMA.COLUMNS` query (same view, same predicate, just a + combined SELECT list). The metadata provider's partition/cluster path + shares the same helper — zero SQL duplication across the two consumers. +- **All four catalog/schema/sample/metadata caches are flushed on registry + change.** `app/api/v2_catalog.py:invalidate_for_table` is wired into + `POST /api/admin/register-table`, `PUT /api/admin/registry/{id}`, and + `DELETE /api/admin/registry/{id}`. After a registry write, a single-row + re-warm task is scheduled in the background so the admin's verification + request hits warm caches within ~1 s instead of waiting for the next + analyst miss. Pre-fix none of the caches were invalidated — admin + registers a table, `agnes catalog` doesn't show the new row for up to + 5 min; admin updates a row's bucket, `agnes schema` returns the OLD + column list for up to 1 hour. +- **`v2_schema.build_schema` split into RBAC-aware outer + RBAC-naive + inner (`build_schema_uncached`).** Live endpoint behavior unchanged; + warmup uses the inner entry point to populate `_schema_cache` without + a user context. 
+ +### Internal + +- New shared dataclass module `app/api/_metadata_models.py` with + `MetadataRequest` (frozen) + `TableMetadata` for source-agnostic + provider input/output. +- New `connectors/keboola/storage_api.py:KeboolaStorageClient.get_table_info` + thin wrapper — keeps `_get` private to the module. +- New env vars (operator-facing tuning, no required setup change): + - `AGNES_SKIP_CACHE_WARMUP` — opt-out of startup warmup. + - `AGNES_WARMUP_CONCURRENCY` — default 4, max parallel BQ + INFORMATION_SCHEMA jobs during a warmup pass. +- New runtime dependency: `sse-starlette>=2.0` (Server-Sent Events + responses for the cache-warmup stream). +- Tests added: `test_metadata_models`, `test_v2_schema_columns_consolidation`, + `test_v2_catalog_dispatcher`, `test_connectors_bigquery_metadata`, + `test_connectors_keboola_metadata`, `test_v2_catalog_remote_metadata`, + `test_v2_catalog_invalidation`, `test_cache_warmup`, + `test_main_startup_warmup`, `test_admin_tables_warmup_ui`. + ## [0.46.5] — 2026-05-07 ### Fixed @@ -47,75 +129,85 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [0.46.0] — 2026-05-07 -Keboola cutover bundle: native parquet on the materialized sync, -auto-discover protection against admin overrides, sync-routing -correctness, plus a bunch of operational paper-cuts surfaced during -a fresh deploy on a Snowflake-backed Keboola project. **BREAKING** -for Keboola operators: schema bump to v26 migrates Keboola -`query_mode='local'` rows to `materialized` (auto-migration runs on -first start; same effective behavior, different internal path — -Storage API direct via `fileType=parquet` instead of the DuckDB -extension). +Catalog metadata enrichment + cache discipline + automatic warmup. +Closes #155 + #156. 
### Added -- `AGNES_TEMP_DIR` env var (default in `docker-compose.yml`: `/data/tmp`) routes per-call extractor tempdirs (Snowflake-UNLOAD slice staging, CSV→parquet intermediates) off the container's overlayfs `/tmp` onto the data volume. Boot-disk overlayfs filled to 100% on agnes-dev during a multi-GiB sliced parquet export; the dedicated data disk had 15 GiB free at the time. Helper `connectors/keboola/storage_api.py:get_temp_root` mkdirs the target on first use; unset / empty / unwritable falls back to system `/tmp` for compat with OSS users on a single-disk host. -- `POST /api/admin/discover-and-register?dry_run=true` returns the planned mutations without writing — lists `would_register`, `drift` (existing rows whose registry coordinates differ from what discovery would write), and `invalid` ids. Useful for auditing before re-running auto-discovery on a registry that's already had admin overrides applied. -- `GET /api/sync/status` returns `{"locked": bool}` — public, no auth. Consumed by the host-side `agnes-auto-upgrade.sh` cron to decide whether to defer `docker compose up -d` until the running sync finishes. Cheap (single Lock check), no sensitive data. - -### Fixed - -- `app/api/admin.py`: `_discover_and_register_tables` no longer overwrites admin-corrected registry rows. Two drift flavours surfaced (and skipped): - - **same_id_diff_coords** — registry has a row at the same id but different `(bucket, source_table)`; admin migrated coordinates. - - **name_collision** — discovery's slugified id differs from any registry id, but the discovered `name` matches an existing row's `name` (case-insensitive). Real-world cause: the `kbc_job` row was registered manually with the right bucket; Keboola's discovery exposes it under a different stage prefix that slugs to a different id. Pre-fix, auto-discovery would have inserted a duplicate whose Storage API export-async 404s. Now classified as drift, surfaced with `registry_id` so an operator can reconcile. 
-- `app/api/admin.py`: bucket detection in auto-discovery now uses the Keboola API's authoritative `bucket_id` field directly (with id-string parsing only as a fallback). Pre-fix, parsing the id string was the primary path and a stripped stage prefix inserted 137 broken rows. -- `app/api/sync.py`: `POST /api/sync/trigger` with a `tables` payload now actually scopes the materialized pass too. Previously the targeted trigger only filtered the legacy extractor subprocess; `_run_materialized_pass` still iterated every materialized row in the registry, so an admin asking to re-sync `kbc_job` re-ran every other due materialized row alongside it. The pass now takes a `tables` arg and skips rows not in the target set with `reason="not_in_target"`. Both registry id and name match. -- `scripts/ops/agnes-auto-upgrade.sh`: defers `docker compose up -d` while a sync is in flight. Probes `GET /api/sync/status` with a 5s timeout; if the response carries `"locked":true`, exits 0 with a deferred-recreate log line and waits for the next 5-min cron tick. Connection failures (older app version without the endpoint, app crashed, etc.) fall through to the upgrade — being stuck on a wedged image is worse than interrupting a hypothetical sync. -- `connectors/keboola/extractor.py`: `materialize_query` per-call tempdir is now opened with `ignore_cleanup_errors=True`. Previously a worker death mid-write under disk-full state could leave a multi-GiB stale slice tree (12 GiB seen on agnes-dev) because `TemporaryDirectory.__exit__` itself raised, masking the original exception and skipping cleanup. Now cleanup is best-effort and always fires. -- `src/scheduler.py`: `is_valid_schedule` now accepts `every 0m` (interval = 0 = "always due"). Useful as a force-resync override on a row whose previous attempt errored without recording `last_sync` — the default `every 1h` would otherwise block the retry for an hour. Existing values reject as before. 
-- `app/api/sync.py`: `POST /api/sync/trigger` now accepts both `["table_id"]` (legacy) and `{"tables": ["table_id"]}` (mirrors response shape) request bodies, plus `null` / no body for "sync everything". Malformed shapes return HTTP 422 with a structured detail. No client breakage — the old wire format keeps working. +- **`/api/v2/catalog` returns four new optional fields per row** — `rows`, + `size_bytes`, `partition_by`, `clustered_by` — populated by per-source-type + metadata providers (`connectors/bigquery/metadata.py`, + `connectors/keboola/metadata.py`). For `query_mode='remote'` BigQuery rows, + `size_bytes` is `active_logical_bytes + long_term_logical_bytes` (a full + scan reads both); region resolved from `data_source.bigquery.location` → + `bq_client.get_dataset(...)` → fall back to legacy `__TABLES__`. + Existing CLI consumers reading only `rough_size_hint` are unaffected. +- **Automatic cache warmup at startup.** FastAPI startup event schedules + a background task that walks BQ remote rows and pre-populates + `_metadata_cache` + `_schema_cache` with bounded concurrency (default 4, + tunable via `AGNES_WARMUP_CONCURRENCY`). Doesn't block readiness; + per-row failures logged + skipped. Opt-out via `AGNES_SKIP_CACHE_WARMUP=1`. +- **Three new admin endpoints under `/api/admin/cache-warmup/*`:** + - `GET /status` — JSON snapshot of the latest run. + - `POST /run` — manual trigger, idempotent under concurrent invocation. + - `GET /stream` — Server-Sent Events with `start` / `row` / `complete` + events for live UI updates. +- **`/admin/tables` cache freshness panel.** Toolbar above the per-source-type + listings with progress bar + "Re-warm all" button + collapsible + terminal-style log fed by SSE (polling fallback at 3 s). Per-row badge + in the existing `col-status` column updates live (fresh / warming / + pending / error). +- **`docs/admin/query-modes.md`** — source-agnostic admin reference for + registering tables as `local` / `remote` / `materialized`. 
Decision + tree, per-source-type IAM + setup, three worked examples. Linked from + the `?` icon next to the `query_mode` field in the admin UI edit modal + and from the third post-register hint in `agnes admin register-table`. +- **`agnes admin register-table` post-register hint** for `query_mode=remote`: + points at `agnes query --remote "SELECT COUNT(*)..."` as the IAM smoke + check so a missing `dataViewer` / `jobUser` surfaces at registration + time, not 30 minutes later. ### Changed -- `connectors/keboola`: materialized sync now requests **parquet directly** from the Storage API (`POST /v2/storage/tables/{id}/export-async` with `fileType=parquet`) instead of CSV → DuckDB COPY → parquet. The extractor downloads the Snowflake-UNLOADed parquet, renames into place, and skips the DuckDB roundtrip entirely. Eliminates the OOM that hits multi-GB Keboola tables when `read_csv(..., all_varchar=true, max_line_size=64MB)` materializes the whole CSV in memory before COPY. Sliced exports (large tables that Snowflake UNLOAD writes as multiple files) are merged via `DuckDB COPY (SELECT * FROM read_parquet([...]))` — peak memory bounded to one parquet row group (~1 MiB) regardless of table size. Admin can pin the legacy CSV path with `source_query='{"file_type":"csv"}'`. Backward-compat alias `KeboolaStorageClient.export_table_to_csv` retained. -- `connectors/keboola/storage_api.py`: `download_file` gzip detection no longer treats unencrypted files as gzipped (previous heuristic would have corrupted parquet downloads at gunzip time). Name-suffix-only. -- **BREAKING for Keboola operators**: schema bump to **v26**. Existing `query_mode='local'` Keboola rows are migrated to `query_mode='materialized'` (NULL `source_query` = full-table export — same effective behavior as before). New `register-table --source-type keboola` and `discover-and-register --source-type keboola` default to `materialized`. 
The `local` mode for Keboola is gone — it ran the DuckDB extension's COPY through Keboola QueryService, which is unreliable on linked-bucket projects (extension v0.1.6 fixes the linked-bucket case but not yet in the community CDN; pre-fix, projects with the `block-shared-snowflake-access` flag couldn't see bucket schemas at all). BigQuery and Jira `local` rows are untouched. See `connectors/keboola/storage_api.py` + the v25→v26 migration in `src/db.py`. -- **Keboola extract path is now Storage API direct**, not the DuckDB extension. New `connectors/keboola/storage_api.py` talks to Keboola Storage API straight via `requests`: - - `POST /v2/storage/tables/{id}/export-async` to kick off the job (with optional `whereFilters` / `columns` / `changedSince` from the row's `source_query` JSON); - - `GET /v2/storage/jobs/{id}` polled with bounded exponential backoff until `success` or `error`; - - `GET /v2/storage/files/{id}?federationToken=1` to fetch a signed URL; - - `GET ` (or per-slice URLs from a manifest for sliced exports) → CSV → DuckDB COPY → parquet. - No `os.chdir`, no boto3/azure-blob/google-cloud-storage SDKs, no extension binary on the data path. Thread-safe. Same path is used both by `materialize_query()` (admin-registered tables with optional filter spec) and by `_extract_via_legacy()` (per-table fallback inside the parallel batch extractor). -- **`source_query` shape for Keboola materialized rows is JSON**, not SQL — Storage API takes a structured filter object, not free-form SQL. Mirrors the BQ materialized path conceptually but with a different payload. Schema: - ```json - { - "where_filters": [{"column": "date", "operator": "ge", "values": ["2026-04-01"]}], - "columns": ["id", "date", "amount"], - "changed_since": "2026-04-01T00:00:00", - "limit": 1000 - } - ``` - All fields optional. Empty / NULL = full-table export. Operators per Keboola Apiary: `eq`, `ne`, `in`, `notIn`, `ge`, `gt`, `le`, `lt`. See `connectors/keboola/storage_api.py:ExportFilter`. 
-- `POST /api/sync/trigger` is now singleton per process. A second trigger that arrives while the previous sync is still running returns **HTTP 409** (`detail: sync_already_in_progress`) instead of scheduling a parallel `_run_sync`. The scheduler container's `data-refresh` job logs the 409 as a normal warning and waits for its next tick — no retry loop. Operator-visible: clients that hand-roll their own polling on `/api/sync/trigger` now need to handle 409. Why it matters: two concurrent extractor subprocesses both write `extract.duckdb`, fight for its file lock, starve uvicorn's worker pool, and Docker flips `agnes-app` to `unhealthy` long enough for `reverse_proxy`-fronted deploys to return 503 to external traffic until contention drains. -- Keboola legacy Storage-API fallback now runs in parallel across a process pool. When the DuckDB extension's per-table scan fails (e.g. on projects with the `block-shared-snowflake-access` feature flag where workspace roles can't see bucket schemas, see keboola/duckdb-extension#17), tables that fall back to the legacy `kbcstorage` client are now drained concurrently instead of one-at-a-time. The dominant per-table cost is the synchronous wait on the Keboola Storage export job (which scans Snowflake into a CSV and returns); fanning out across N workers cuts wall-clock proportionally for batches that hit the fallback. Default 8 workers, override with `AGNES_KEBOOLA_PARALLELISM` (set to `1` for sequential, useful when debugging or seeing Keboola-side rate-limiting). Project-level concurrency is bounded by the operator's `storage.jobsParallelism` limit (typically 10); the default 8 leaves headroom for other clients. 
Workers are processes (not threads) because `connectors/keboola/client.py:export_table` does `os.chdir(temp_dir)` to redirect kbcstorage's slice-file downloads into a per-call temp directory — `os.chdir` is process-global, so two threads racing on it land slice files in the wrong directory and the merge step fails with `[Errno 2] No such file or directory: '.csv_X_Y_Z.csv'`. Process workers each have their own CWD. -- Extractor subprocess timeout bumped from 1800s to 3600s (configurable via `AGNES_EXTRACTOR_TIMEOUT_SEC`). On projects where the legacy Storage-API fallback is the only working path (extension blocked by `block-shared-snowflake-access`), 28+ tables × multi-minute Keboola export jobs routinely overran the 30-min cap before the parallel fallback even existed; with parallelization in place the run usually fits, but `kbc_telemetry`-class tables and large CRM snapshots can still push it over. The 1h ceiling matches the longest practically-reasonable Keboola export job before an operator should intervene. -- Extractor subprocess is now launched in its own process group (`subprocess.Popen(..., start_new_session=True)`) so a timeout can take down the whole tree — the extractor parent plus the ProcessPoolExecutor workers it spawned for parallel legacy fallback. Without this, a `subprocess.run(timeout=...)` SIGKILLed only the immediate child; the pool workers were reparented to PID 1 and continued holding open Keboola Storage export jobs, blocking the next sync cycle. On timeout the parent now SIGTERMs the group (10s grace), then SIGKILLs stragglers. The extractor's inline Python script installs a SIGTERM → `sys.exit(143)` handler so the `with ProcessPoolExecutor(...)` block runs its `__exit__` (`shutdown(wait=True)`) cleanly before the process dies. - -### Fixed (cutover regressions, surfaced 2026-05-06) - -- `agnes pull` no longer fails with `hash mismatch: expected … got …` for every Keboola local-mode table. 
`src/orchestrator.py:_update_sync_state` stored `md5(f"{mtime_ns}:{size}")[:12]` — a 12-char fingerprint of file metadata — while the CLI's post-download integrity check compares against the full 32-char content MD5 it computes via `cli/commands/sync.py:_md5_file`. Those could never match, so every `agnes pull` reported `Updated 0 tables` even when the server had data. Now the orchestrator stores the same content MD5 the materialized SQL path already used (`app/api/sync.py:_file_hash`). -- Latent `NameError: name '_sys' is not defined` in `app/api/sync.py:_run_sync` when the function fell into its outer `except Exception` before reaching the inner `import sys as _sys`. Hoisted the import to the top of the body so the error path stays loggable instead of trading the original failure for a misleading stack trace. -- Keboola sync now falls back to the legacy Storage-API client when the DuckDB Keboola extension's per-table scan fails, not just when the initial `ATTACH` fails. Two changes: - - `kbcstorage>=0.9.0` is promoted from optional to core dependency. The legacy fallback path in `connectors/keboola/extractor.py:_extract_via_legacy` has been there since the extension landed, but until now the bare `from kbcstorage.client import Client` would crash any default install with `ModuleNotFoundError`. - - `connectors/keboola/extractor.py:run` now wraps `_extract_via_extension` in a per-table try/except — on any per-table scan failure it retries via the legacy client. Previously, when `ATTACH` succeeded but the table-level `COPY (SELECT * FROM kbc.""."")` failed, the table was just marked failed with no retry. - Together these unblock deployments where the extension's bucket-schema scans return `Schema '..."in.c-..."' does not exist or not authorized` (keboola/duckdb-extension#17) while the upstream extension fix is in flight. 
-- `connectors/keboola/access.py:KeboolaAccess.__init__` and `connectors/keboola/extractor.py:_try_attach_extension` now strip a trailing slash from the Keboola stack URL before passing it to the DuckDB Keboola extension's `ATTACH`. The canonical Keboola URL form (`https://connection..keboola.com/`) failed there with a network error; bare-host form works. Operators no longer have to massage the value out of `KEBOOLA_STACK_URL` / `instance.yaml`. -- `src/profiler.py:TableInfo.__init__` makes `description` optional (defaults to `""`). Two call sites in `app/api/catalog.py` and `app/api/sync.py` instantiate `TableInfo(name=..., table_id=...)` without it; the previous required-arg signature crashed sync's profiler pass with `TableInfo.__init__() missing 1 required positional argument: 'description'`, leaving `[SYNC] Profiled 0 tables` after every run. -- `scripts/ops/agnes-auto-upgrade.sh` now `chown`s `${STATE_DIR}` (`/data/state` by default), `/data/extracts`, `/data/analytics` to the new image's runtime UID:GID before `docker compose up` when the image digest moves. Catches root → non-root UID transitions across upgrades — without it, the new image's first start `PermissionError`s on `.session_secret` / DuckDB. Reads the target uid:gid from `/etc/passwd` inside the image so the script stays honest if the runtime user ever moves off uid 999. +- **`/api/v2/schema/{id}` cache miss now does 1 BQ job instead of 2.** + `connectors/bigquery/access.py:fetch_bq_columns_full` collapses what + used to be `_fetch_bq_schema` + `_fetch_bq_table_options` into a single + `INFORMATION_SCHEMA.COLUMNS` query (same view, same predicate, just a + combined SELECT list). The metadata provider's partition/cluster path + shares the same helper — zero SQL duplication across the two consumers. 
+- **All four catalog/schema/sample/metadata caches are flushed on registry + change.** `app/api/v2_catalog.py:invalidate_for_table` is wired into + `POST /api/admin/register-table`, `PUT /api/admin/registry/{id}`, and + `DELETE /api/admin/registry/{id}`. After a registry write, a single-row + re-warm task is scheduled in the background so the admin's verification + request hits warm caches within ~1 s instead of waiting for the next + analyst miss. Pre-fix none of the caches were invalidated — admin + registers a table, `agnes catalog` doesn't show the new row for up to + 5 min; admin updates a row's bucket, `agnes schema` returns the OLD + column list for up to 1 hour. +- **`v2_schema.build_schema` split into RBAC-aware outer + RBAC-naive + inner (`build_schema_uncached`).** Live endpoint behavior unchanged; + warmup uses the inner entry point to populate `_schema_cache` without + a user context. ### Internal -- `infra/modules/customer-instance` (tag `infra-v1.8.0`): `startup-script.sh.tpl` no longer overwrites operator-edited `AGNES_TAG` / `AGNES_TEMP_DIR` in `/opt/agnes/.env` on every boot. Reads the existing values when present and lets them win over the template-computed `$IMAGE_TAG`. Pre-fix, an in-place TF action that stopped/started the VM (e.g. `machine_type` change) would re-run the startup script and clobber any manually-pinned image tag — operators had to re-edit the file post-restart. Fresh provisions still get the TF-driven values; the `.env` file's existence is the disambiguator. To force a TF-driven reset, `rm /opt/agnes/.env` and reboot. +- New shared dataclass module `app/api/_metadata_models.py` with + `MetadataRequest` (frozen) + `TableMetadata` for source-agnostic + provider input/output. +- New `connectors/keboola/storage_api.py:KeboolaStorageClient.get_table_info` + thin wrapper — keeps `_get` private to the module. +- New env vars (operator-facing tuning, no required setup change): + - `AGNES_SKIP_CACHE_WARMUP` — opt-out of startup warmup. 
+ - `AGNES_WARMUP_CONCURRENCY` — default 4, max parallel BQ + INFORMATION_SCHEMA jobs during a warmup pass. +- New runtime dependency: `sse-starlette>=2.0` (Server-Sent Events + responses for the cache-warmup stream). +- Tests added: `test_metadata_models`, `test_v2_schema_columns_consolidation`, + `test_v2_catalog_dispatcher`, `test_connectors_bigquery_metadata`, + `test_connectors_keboola_metadata`, `test_v2_catalog_remote_metadata`, + `test_v2_catalog_invalidation`, `test_cache_warmup`, + `test_main_startup_warmup`, `test_admin_tables_warmup_ui`. ## [0.45.0] — 2026-05-07 diff --git a/app/api/_metadata_models.py b/app/api/_metadata_models.py new file mode 100644 index 0000000..36981f5 --- /dev/null +++ b/app/api/_metadata_models.py @@ -0,0 +1,40 @@ +"""Shared data shapes for source-agnostic table-metadata providers. + +Lives under `app/api/` because the primary consumer is +`app/api/v2_catalog.py`. Connector-side providers in `connectors//` +import upward into this module — the inverse layering would force +`v2_catalog.py` to depend on `connectors/__init__.py`, which is the +wrong direction. +""" + +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class MetadataRequest: + """Narrow input passed to a metadata provider's `fetch()`. + + `bucket` and `source_table` are pre-validated by the dispatcher + (`validate_quoted_identifier`) before construction, so the provider + can interpolate them into SQL/URL paths without re-checking. Frozen + so the (provider, request)-keyed cache lookup is stable. + """ + table_id: str + bucket: str + source_table: str + + +@dataclass +class TableMetadata: + """Source-agnostic metadata bundle. Every field optional — providers + fill what they can cheaply get; callers tolerate `None`. Adding a new + field here is a non-breaking change: existing CLI consumers don't + even render `rough_size_hint` (verified `grep -rn rough_size_hint cli/` + is empty), let alone the new fields. 
+ """ + rows: int | None = None + size_bytes: int | None = None + partition_by: str | None = None + clustered_by: list[str] | None = None diff --git a/app/api/admin.py b/app/api/admin.py index 76f0ab1..8111cd3 100644 --- a/app/api/admin.py +++ b/app/api/admin.py @@ -2179,6 +2179,9 @@ def register_table( params=_sanitize_for_audit(request.model_dump()), ) + from app.api.v2_catalog import invalidate_for_table + invalidate_for_table(table_id) + if not is_bigquery: # Keboola / Jira / local rows are insert-only here. 201 Created — the # decorator no longer carries a default status, so each branch is @@ -2512,6 +2515,9 @@ async def update_table( if after.get("source_type") == "bigquery": background.add_task(_materialize_bigquery_extract_bg) + from app.api.v2_catalog import invalidate_for_table + invalidate_for_table(table_id) + return {"id": table_id, "updated": list(updates.keys())} @@ -2607,6 +2613,9 @@ async def unregister_table( }), ) + from app.api.v2_catalog import invalidate_for_table + invalidate_for_table(table_id) + if was_bigquery: background.add_task(_materialize_bigquery_extract_bg) diff --git a/app/api/cache_warmup.py b/app/api/cache_warmup.py new file mode 100644 index 0000000..4391f8c --- /dev/null +++ b/app/api/cache_warmup.py @@ -0,0 +1,264 @@ +"""Cache warmup framework — populates catalog/schema/metadata caches at +container startup so the first analyst hits warm caches. + +Bounded concurrency (4 by default). 
Exposes: + - GET /api/admin/cache-warmup/status — JSON snapshot + - POST /api/admin/cache-warmup/run — manual trigger (idempotent) + - GET /api/admin/cache-warmup/stream — Server-Sent Events +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import os +import time +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone +from typing import Literal +from uuid import uuid4 + +from fastapi import APIRouter, Depends +from sse_starlette.sse import EventSourceResponse + +from app.auth.access import require_admin + +logger = logging.getLogger(__name__) +router = APIRouter() + + +@dataclass +class WarmupRowState: + table_id: str + status: Literal["pending", "warming", "fresh", "error"] + started_at: str | None = None + completed_at: str | None = None + duration_ms: int | None = None + error: str | None = None + last_warmed_at: str | None = None + + +@dataclass +class WarmupRunState: + run_id: str + trigger: Literal["startup", "manual", "registry_change"] + started_at: str + completed_at: str | None = None + total: int = 0 + completed: int = 0 + failed: int = 0 + rows: dict[str, WarmupRowState] = field(default_factory=dict) + _subscribers: list[asyncio.Queue] = field(default_factory=list, repr=False) + + +WARMUP_STATE: WarmupRunState | None = None +_RUN_LOCK = asyncio.Lock() + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def maybe_schedule_startup_warmup() -> None: + """Called from app/main.py FastAPI startup event.""" + if os.environ.get("AGNES_SKIP_CACHE_WARMUP") == "1": + logger.info("cache warmup skipped (AGNES_SKIP_CACHE_WARMUP=1)") + return + try: + asyncio.create_task(_warm_catalog_caches_bg(trigger="startup")) + except RuntimeError: + logger.warning("no running event loop — startup warmup skipped") + + +async def _warm_catalog_caches_bg( + trigger: str = "startup", state: WarmupRunState | None = None, +) -> None: + """Walk registry, warm metadata + schema 
caches for every remote row. + + If `state` is provided, use it (caller has already published it on + WARMUP_STATE). Otherwise build a fresh state and assign WARMUP_STATE. + """ + global WARMUP_STATE + if state is None: + async with _RUN_LOCK: + # Re-check inside the lock: another caller may have started a run + # while we were waiting, so skip if one is still in progress. + if WARMUP_STATE and WARMUP_STATE.completed_at is None: + return + state = WarmupRunState( + run_id=uuid4().hex[:8], + trigger=trigger, + started_at=_now_iso(), + ) + WARMUP_STATE = state + + run_id = state.run_id + rows = _list_remote_rows() + state.total = len(rows) + for r in rows: + state.rows[r["id"]] = WarmupRowState( + table_id=r["id"], status="pending", + ) + _broadcast(state, {"event": "start", "data": { + "run_id": run_id, "trigger": state.trigger, "total": state.total, + }}) + + sem = asyncio.Semaphore(int(os.environ.get("AGNES_WARMUP_CONCURRENCY", "4"))) + await asyncio.gather( + *(_warm_one(r, state, sem) for r in rows), return_exceptions=True, + ) + + state.completed_at = _now_iso() + _broadcast(state, {"event": "complete", "data": { + "run_id": run_id, "total": state.total, + "completed": state.completed, "failed": state.failed, + }}) + logger.info( + "cache warmup complete: run_id=%s total=%d ok=%d fail=%d", + run_id, state.total, state.completed, state.failed, + ) + + +def _list_remote_rows() -> list[dict]: + """Snapshot of registry rows that need a warmup pass.""" + from src.db import get_system_db + from src.repositories.table_registry import TableRegistryRepository + conn = get_system_db() + rows = TableRegistryRepository(conn).list_all() + return [ + r for r in rows + if r.get("query_mode") == "remote" and r.get("source_type") == "bigquery" + ] + + +async def _warm_one( + row: dict, state: WarmupRunState, sem: asyncio.Semaphore, +) -> None: + async with sem: + rs = state.rows[row["id"]] + rs.status = "warming" + rs.started_at = _now_iso() + _broadcast(state, {"event": "row", "data": asdict(rs)}) + t0 =
time.monotonic() + try: + await asyncio.to_thread(_warm_metadata_sync, row) + await asyncio.to_thread(_warm_schema_sync, row) + rs.status = "fresh" + rs.last_warmed_at = _now_iso() + state.completed += 1 + except Exception as e: + rs.status = "error" + rs.error = str(e) + state.failed += 1 + logger.warning("cache warmup row=%s failed: %s", row["id"], e) + finally: + rs.completed_at = _now_iso() + rs.duration_ms = int((time.monotonic() - t0) * 1000) + _broadcast(state, {"event": "row", "data": asdict(rs)}) + + +def _warm_metadata_sync(row: dict) -> None: + """Trigger metadata cache populate via the catalog's normal path.""" + from app.api.v2_catalog import _size_hint_for_row + _size_hint_for_row(row) + + +def _warm_schema_sync(row: dict) -> None: + """Trigger schema cache populate via build_schema_uncached.""" + from app.api.v2_schema import build_schema_uncached + from connectors.bigquery.access import get_bq_access + from src.db import get_system_db + bq = get_bq_access() + build_schema_uncached(get_system_db(), row["id"], bq=bq, row=row) + + +async def warm_one_table(table_id: str) -> None: + """Single-row re-warm — invoked by `invalidate_for_table` after a + registry change. Does NOT update WARMUP_STATE (small change shouldn't + overwrite the last full run's status); just refreshes the caches.""" + from src.db import get_system_db + from src.repositories.table_registry import TableRegistryRepository + conn = get_system_db() + row = TableRegistryRepository(conn).get(table_id) + if not row or row.get("query_mode") != "remote": + return + try: + await asyncio.to_thread(_warm_metadata_sync, row) + await asyncio.to_thread(_warm_schema_sync, row) + except Exception as e: + logger.warning("single-row warmup failed for %s: %s", table_id, e) + + +def _broadcast(state: WarmupRunState, event: dict) -> None: + """Send an event to every SSE subscriber. 
Dead queues are pruned.""" + dead = [] + for q in state._subscribers: + try: + q.put_nowait(event) + except asyncio.QueueFull: + dead.append(q) + for q in dead: + state._subscribers.remove(q) + + +def _serialize_state(state: WarmupRunState) -> dict: + return { + "run_id": state.run_id, + "trigger": state.trigger, + "started_at": state.started_at, + "completed_at": state.completed_at, + "total": state.total, + "completed": state.completed, + "failed": state.failed, + "rows": {tid: asdict(rs) for tid, rs in state.rows.items()}, + } + + +# ─── Endpoints ──────────────────────────────────────────────────────── + + +@router.get("/api/admin/cache-warmup/status") +async def warmup_status(user: dict = Depends(require_admin)): + if WARMUP_STATE is None: + return {"state": "never_run"} + return _serialize_state(WARMUP_STATE) + + +@router.post("/api/admin/cache-warmup/run") +async def warmup_run(user: dict = Depends(require_admin)): + global WARMUP_STATE + if WARMUP_STATE and WARMUP_STATE.completed_at is None: + return {"run_id": WARMUP_STATE.run_id, "status": "already_running"} + state = WarmupRunState( + run_id=uuid4().hex[:8], + trigger="manual", + started_at=_now_iso(), + ) + WARMUP_STATE = state + asyncio.create_task(_warm_catalog_caches_bg(state=state)) + return {"run_id": state.run_id, "status": "started"} + + +@router.get("/api/admin/cache-warmup/stream") +async def warmup_stream(user: dict = Depends(require_admin)): + async def gen(): + q: asyncio.Queue = asyncio.Queue(maxsize=256) + if WARMUP_STATE is None: + yield {"event": "idle", "data": json.dumps({"state": "never_run"})} + return + WARMUP_STATE._subscribers.append(q) + yield {"event": "snapshot", "data": json.dumps(_serialize_state(WARMUP_STATE))} + try: + while True: + ev = await asyncio.wait_for(q.get(), timeout=30.0) + yield {"event": ev["event"], "data": json.dumps(ev["data"])} + if ev["event"] == "complete": + return + except asyncio.TimeoutError: + return + finally: + if WARMUP_STATE and q in 
WARMUP_STATE._subscribers: + WARMUP_STATE._subscribers.remove(q) + + return EventSourceResponse(gen()) diff --git a/app/api/v2_catalog.py b/app/api/v2_catalog.py index 43ade88..917beba 100644 --- a/app/api/v2_catalog.py +++ b/app/api/v2_catalog.py @@ -11,6 +11,8 @@ from app.utils import get_data_dir as _get_data_dir from src.rbac import can_access_table from src.repositories.table_registry import TableRegistryRepository from app.api.v2_cache import TTLCache +from app.api._metadata_models import MetadataRequest, TableMetadata +from src.identifier_validation import validate_quoted_identifier router = APIRouter(prefix="/api/v2", tags=["v2"]) @@ -25,6 +27,51 @@ router = APIRouter(prefix="/api/v2", tags=["v2"]) _table_rows_cache = TTLCache(maxsize=1, ttl_seconds=300) _TABLE_ROWS_KEY = "all" +# Per-table cached TableMetadata. 15-min TTL — long enough to amortise +# across an analyst session, short enough that a freshly-registered +# remote table shows real numbers within a coffee break (the cache-bust +# path in `invalidate_for_table` accelerates this for the common admin- +# verifies-registration flow). +_metadata_cache = TTLCache(maxsize=512, ttl_seconds=900) + + +def _metadata_provider_for(source_type: str): + """Lazy-import dispatch for source-specific metadata providers. + + Lazy because connector modules are heavy (BQ extension, google-cloud + client, etc.) and a Keboola-only deployment shouldn't pay the BQ + import cost. Returns ``None`` for unknown source types — the caller + treats that as "no metadata enrichment available" and falls through. + """ + if source_type == "bigquery": + from connectors.bigquery import metadata as m + return m.fetch + if source_type == "keboola": + from connectors.keboola import metadata as m + return m.fetch + return None + + +def _build_metadata_request(row: dict) -> MetadataRequest | None: + """Construct a validated MetadataRequest from a registry row. 
+ + Pre-validates the identifiers via `validate_quoted_identifier` before + constructing the request — providers can then interpolate + `req.bucket` / `req.source_table` into SQL/URL paths without + re-checking. Returns ``None`` when validation fails; provider is not + dispatched for that row. + """ + bucket = row.get("bucket") or "" + source_table = row.get("source_table") or row.get("id") or "" + if not bucket or not source_table: + return None + if not (validate_quoted_identifier(bucket, "bucket") + and validate_quoted_identifier(source_table, "source_table")): + return None + return MetadataRequest( + table_id=row["id"], bucket=bucket, source_table=source_table, + ) + def _flavor_for(source_type: str) -> str: return "bigquery" if source_type == "bigquery" else "duckdb" @@ -65,23 +112,67 @@ def _bucket_size(byte_count: int) -> str: return "very_large" -def _materialized_size_hint(table_id: str, source_type: str, query_mode: str) -> str | None: - """Return a rough size bucket for a row whose data is on the server's - local filesystem (any `query_mode` that produces a parquet — `local` and - `materialized`). Returns ``None`` for `remote` (size requires a BQ - INFORMATION_SCHEMA round-trip; tracked separately) and for tables whose - parquet hasn't been materialised yet so the AI gets ``null`` not a - misleading "small". +def _size_hint_for_row(row: dict) -> dict: + """Resolve the per-row metadata bundle the catalog response surfaces. + + Renamed from `_materialized_size_hint` (which always also handled + `local` rows; the old name was misleading). Returns a dict with up + to five keys: `rough_size_hint`, `rows`, `size_bytes`, `partition_by`, + `clustered_by`. Missing keys are reported as `null` in the response. + + Branches: + - `local` / `materialized` → existing on-disk parquet stat (cheap). + - `remote` → dispatch to the per-source-type provider; cache the + TableMetadata for 15 min. 
+ """ + table_id = row["id"] + source_type = row.get("source_type") or "" + query_mode = row.get("query_mode") or "local" + + if query_mode in ("local", "materialized"): + return {"rough_size_hint": _materialized_parquet_size_bucket( + table_id, source_type, query_mode, + )} + + if query_mode != "remote": + return {"rough_size_hint": None} + + # Cache lookup (per-row TableMetadata). + cached = _metadata_cache.get(table_id) + if cached is None: + cached = _resolve_remote_metadata(row) + if cached is not None: + _metadata_cache.set(table_id, cached) + + if cached is None: + return {"rough_size_hint": None} + + return { + "rough_size_hint": _bucket_size(cached.size_bytes) if cached.size_bytes is not None else None, + "rows": cached.rows, + "size_bytes": cached.size_bytes, + "partition_by": cached.partition_by, + "clustered_by": cached.clustered_by, + } + + +def _materialized_parquet_size_bucket( + table_id: str, source_type: str, query_mode: str, +) -> str | None: + """Size hint for rows whose data is on the server filesystem + (the old `_materialized_size_hint` body). Renamed for clarity now + that the new dispatcher is the entry point. Layout matches the v2 extract.duckdb contract: ${DATA_DIR}/extracts//data/.parquet """ - if query_mode == "remote": - return None if not source_type: return None try: - path = Path(_get_data_dir()) / "extracts" / source_type / "data" / f"{table_id}.parquet" + path = ( + Path(_get_data_dir()) / "extracts" / source_type / "data" + / f"{table_id}.parquet" + ) if not path.exists(): return None return _bucket_size(path.stat().st_size) @@ -91,6 +182,75 @@ def _materialized_size_hint(table_id: str, source_type: str, query_mode: str) -> return None +def _resolve_remote_metadata(row: dict) -> "TableMetadata | None": + """Provider dispatch for a remote row. 
Returns None on any failure.""" + source_type = row.get("source_type") or "" + provider = _metadata_provider_for(source_type) + if provider is None: + return None + req = _build_metadata_request(row) + if req is None: + return None + try: + return provider(req) + except Exception: + # Defense in depth — providers are documented as never-raises, + # but a regression would otherwise 500 the whole catalog. + return None + + +def invalidate_for_table(table_id: str) -> None: + """Drop every per-table cache so the next /api/v2/* request reflects + the just-registered / updated / unregistered row immediately. Owned + by the catalog module so admin.py doesn't need to know which caches + exist. + + Imports v2_schema and v2_sample lazily — keeps catalog tests from + pulling in BQ-extension imports they don't need. + """ + import asyncio + from app.api import v2_schema, v2_sample + + _table_rows_cache.clear() + _metadata_cache.invalidate(table_id) + v2_schema._schema_cache.invalidate(table_id) + # Sample cache key is `f"{table_id}|{n}"`; clearing the whole sample + # cache is heavier than precise invalidation, but registry-change + # frequency (handful per day on a typical instance) doesn't justify + # adding a prefix-invalidation primitive to TTLCache. + v2_sample._sample_cache.clear() + + # Schedule a single-row re-warm so admins editing a registry row + # see fresh data within a couple of seconds rather than waiting for + # the next analyst to trigger a miss. Fire-and-forget; failures + # log + skip inside the coroutine. + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None + if loop is not None: + # Running inside an async context (production FastAPI path). + asyncio.create_task(_rewarm_one_row(table_id)) + # No running event loop (e.g. called from a sync test or a sync + # handler thread). Skip re-warm — the next live request will + # populate via miss. + + +async def _rewarm_one_row(table_id: str) -> None: + """Background single-row re-warm. 
Imports cache_warmup lazily to + avoid a circular import at module load. Any failure is logged and + skipped; the next live request repopulates the cache on miss.""" + try: + from app.api.cache_warmup import warm_one_table + await warm_one_table(table_id) + except Exception: + import logging + logging.getLogger(__name__).warning( + "single-row re-warm failed for %s — next live request will populate", + table_id, + ) + + def build_catalog(conn: duckdb.DuckDBPyConnection, user: dict) -> dict: rows = _table_rows_cache.get(_TABLE_ROWS_KEY) if rows is None: @@ -105,6 +265,7 @@ def build_catalog(conn: duckdb.DuckDBPyConnection, user: dict) -> dict: for r in rows: if not can_access_table(user, r["id"], conn): continue + hint = _size_hint_for_row(r) visible.append({ "id": r["id"], "name": r.get("name") or r["id"], @@ -114,10 +275,11 @@ def build_catalog(conn: duckdb.DuckDBPyConnection, user: dict) -> dict: "sql_flavor": _flavor_for(r.get("source_type") or ""), "where_examples": _examples_for(r.get("source_type") or ""), "fetch_via": _fetch_hint(r["id"], r.get("source_type") or ""), - "rough_size_hint": _materialized_size_hint( - r["id"], r.get("source_type") or "", - r.get("query_mode") or "local", - ), + "rough_size_hint": hint.get("rough_size_hint"), + "rows": hint.get("rows"), + "size_bytes": hint.get("size_bytes"), + "partition_by": hint.get("partition_by"), + "clustered_by": hint.get("clustered_by"), }) return { @@ -132,12 +294,12 @@ def catalog( conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): # Plain ``def`` so FastAPI auto-offloads to the anyio thread pool — - # build_catalog now calls `_materialized_size_hint` for every visible - # row, which does sync `Path.stat()` / `Path.exists()` on the data - # volume. On local FS that's microseconds, but on a network-mounted - # DATA_DIR (NFS / CIFS / GCS-FUSE) those calls can block. Plain ``def`` - # means each request runs on its own thread; the event loop stays - # free for non-catalog traffic. 
Mirrors the Tier 1 conversion of - # /api/query, /api/v2/scan, /api/v2/sample, /api/v2/schema — - # Devin Review on PR #188. + # build_catalog now calls `_size_hint_for_row` for every visible row, + # which does sync `Path.stat()` / `Path.exists()` on the data volume + # (local/materialized) or provider dispatch (remote). On local FS + # that's microseconds, but on a network-mounted DATA_DIR (NFS / CIFS / + # GCS-FUSE) those calls can block. Plain ``def`` means each request + # runs on its own thread; the event loop stays free for non-catalog + # traffic. Mirrors the Tier 1 conversion of /api/query, /api/v2/scan, + # /api/v2/sample, /api/v2/schema — Devin Review on PR #188. return build_catalog(conn, user) diff --git a/app/api/v2_schema.py b/app/api/v2_schema.py index a53c2a4..1a35529 100644 --- a/app/api/v2_schema.py +++ b/app/api/v2_schema.py @@ -31,51 +31,34 @@ _BQ_DIALECT_HINTS = { def _fetch_bq_schema(bq, dataset: str, table: str) -> list[dict]: - """Fetch column list via INFORMATION_SCHEMA.COLUMNS using DuckDB BQ extension. + """Fetch column list via the shared ``_fetch_bq_columns_full_impl`` helper. - `bq.duckdb_session()` provides a DuckDB conn with the bigquery extension - loaded + auth secret installed. SQL here is server-constructed (queries - INFORMATION_SCHEMA.COLUMNS with validated identifiers, no user-derived - fragments), so a BQ BadRequest means registry corruption, not user input - → surfaces as `bq_upstream_error` (HTTP 502), same as `/sample`, opposite - of `/scan*`. + + Pre-#155 this had its own INFORMATION_SCHEMA.COLUMNS query; consolidating + with ``_fetch_bq_table_options`` (now also delegating to the same shared + SQL) halves the BQ job count on cache miss. Returns the schema-endpoint + column shape: name / type / nullable / description. + + Calls the raising variant so BQ exceptions reach ``translate_bq_error`` + with their original type (Forbidden → 502; BadRequest → 502 here via `bad_request_status`). 
""" - from connectors.bigquery.access import translate_bq_error - from src.identifier_validation import validate_quoted_identifier + from connectors.bigquery.access import _fetch_bq_columns_full_impl, translate_bq_error, BqAccessError - # Surface "BQ not configured" as the structured 500 BqAccessError(not_configured) - # with hint, not the misleading 400 unsafe_identifier the empty-string sentinel - # would otherwise trigger from validate_quoted_identifier below. Devin BUG_0002. - if not bq.projects.data: - bq.client() # raises BqAccessError(not_configured); endpoint catches it + try: + rows = _fetch_bq_columns_full_impl(bq, dataset, table) + except (ValueError, BqAccessError): + # ValueError ("unsafe identifier") and BqAccessError propagate + # unchanged — the endpoint's existing handlers expect those types. + raise + except Exception as e: + # Any other BQ-side exception goes through translate_bq_error so + # the response status is classified correctly. + raise translate_bq_error(e, bq.projects, bad_request_status="upstream_error") - # Defense in depth (cf. v2_sample) — registry already validates these, - # but the v2 endpoints are downstream of admin REST writes that could - # bypass that gate. A backtick in `dataset` would otherwise break out - # of `…` quoting and execute arbitrary BQ SQL. - if not (validate_quoted_identifier(bq.projects.data, "BQ project") - and validate_quoted_identifier(dataset, "BQ dataset") - and validate_quoted_identifier(table, "BQ source_table")): - raise ValueError("unsafe BQ identifier in registry — refusing to query") - - bq_sql = ( - f"SELECT column_name, data_type, is_nullable " - f"FROM `{bq.projects.data}.{dataset}.INFORMATION_SCHEMA.COLUMNS` " - f"WHERE table_name = ? 
ORDER BY ordinal_position" - ) - with bq.duckdb_session() as conn: - try: - rows = conn.execute( - "SELECT * FROM bigquery_query(?, ?, ?)", - [bq.projects.billing, bq_sql, table], - ).fetchall() - except Exception as e: - raise translate_bq_error(e, bq.projects, bad_request_status="upstream_error") return [ { - "name": r[0], - "type": r[1], - "nullable": r[2] == "YES", + "name": r["name"], + "type": r["type"], + "nullable": r["nullable"], "description": "", } for r in rows @@ -83,61 +66,27 @@ def _fetch_bq_schema(bq, dataset: str, table: str) -> list[dict]: def _fetch_bq_table_options(bq, dataset: str, table: str) -> dict: - """Best-effort fetch of partition/cluster info from INFORMATION_SCHEMA.COLUMNS. + """Best-effort fetch of partition/cluster info via the shared + `fetch_bq_columns_full` helper. - BigQuery exposes partition + cluster metadata as per-column flags: - - `is_partitioning_column` ('YES' / 'NO') — at most one column per table - - `clustering_ordinal_position` (INT64, null for non-clustered columns; - otherwise 1, 2, ... in cluster-key order) - - Returns `{}` on ANY failure (best-effort). The outer - `try/except Exception → return {}` is a load-bearing contract: the - /schema endpoint must keep returning 200 with empty partition info even - when this query fails (e.g. on permissioned tables, on cross-project - misconfigurations). DO NOT route this through `translate_bq_error` — - that would convert errors to BqAccessError which the endpoint would 502 - on. See tests/test_v2_schema.py::test_schema_returns_200_with_empty_… + Returns ``{}`` on ANY failure (best-effort). Same load-bearing + contract as before: the /schema endpoint must keep returning 200 + with empty partition info when this fails. 
""" - from src.identifier_validation import validate_quoted_identifier + from connectors.bigquery.access import fetch_bq_columns_full - # Best-effort path: if BQ isn't configured (sentinel BqAccess), return - # empty partition info silently — operator gets schema (200) without - # failing on the missing config. The strict /schema path (_fetch_bq_schema) - # surfaces the not_configured error separately. - if not bq.projects.data: + rows = fetch_bq_columns_full(bq, dataset, table) + if not rows: return {} - if not (validate_quoted_identifier(bq.projects.data, "BQ project") - and validate_quoted_identifier(dataset, "BQ dataset") - and validate_quoted_identifier(table, "BQ source_table")): - return {} # Best-effort; refuse to query unsafe identifiers. - - try: - with bq.duckdb_session() as conn: - bq_sql = ( - f"SELECT column_name, is_partitioning_column, clustering_ordinal_position " - f"FROM `{bq.projects.data}.{dataset}.INFORMATION_SCHEMA.COLUMNS` " - f"WHERE table_name = ? " - f"ORDER BY clustering_ordinal_position NULLS LAST" - ) - rows = conn.execute( - "SELECT * FROM bigquery_query(?, ?, ?)", - [bq.projects.billing, bq_sql, table], - ).fetchall() - if not rows: - return {} - partition_by = next( - (r[0] for r in rows if (r[1] or "").upper() == "YES"), - None, - ) - clustered_by = [r[0] for r in rows if r[2] is not None] - return {"partition_by": partition_by, "clustered_by": clustered_by} - except Exception as e: - logger.warning( - "BQ table options fetch failed for %s.%s.%s: %s", - bq.projects.data, dataset, table, e, - ) - return {} + partition_by = next( + (r["name"] for r in rows if r["is_partitioning_column"]), + None, + ) + clustered_rows = [r for r in rows if r["clustering_ordinal_position"] is not None] + clustered_rows.sort(key=lambda r: r["clustering_ordinal_position"]) + clustered_by = [r["name"] for r in clustered_rows] + return {"partition_by": partition_by, "clustered_by": clustered_by} def build_schema( @@ -157,11 +106,35 @@ def build_schema( if 
not can_access_table(user, table_id, conn): raise PermissionError(table_id) - cache_key = f"{table_id}" - cached = _schema_cache.get(cache_key) + cached = _schema_cache.get(table_id) if cached is not None: return cached + return build_schema_uncached(conn, table_id, bq=bq, row=row) + + +def build_schema_uncached( + conn: duckdb.DuckDBPyConnection, + table_id: str, + *, + bq: BqAccess, + row: dict | None = None, +) -> dict: + """Build the schema response and populate `_schema_cache`. **Skips + RBAC and cache-hit short-circuit** — call only from contexts where + those are unnecessary (warmup) or already enforced upstream + (`build_schema`). + + Pass `row` from the upstream caller's `repo.get(table_id)` to avoid + a redundant DB round-trip; if not provided, `build_schema_uncached` + fetches it itself (the warmup-direct call site). + """ + if row is None: + repo = TableRegistryRepository(conn) + row = repo.get(table_id) + if not row: + raise NotFound(table_id) + source_type = row.get("source_type") or "" if source_type == "bigquery": dataset = row.get("bucket") or "" @@ -179,7 +152,6 @@ def build_schema( } else: # Local source — read schema from the parquet via DuckDB - from pathlib import Path from app.utils import get_data_dir parquet = ( get_data_dir() / "extracts" / source_type / "data" / f"{table_id}.parquet" @@ -204,7 +176,7 @@ def build_schema( "where_dialect_hints": {}, } - _schema_cache.set(cache_key, payload) + _schema_cache.set(table_id, payload) return payload diff --git a/app/main.py b/app/main.py index f3b6213..513e8ca 100644 --- a/app/main.py +++ b/app/main.py @@ -113,6 +113,7 @@ from app.api.store import router as store_router from app.api.my_stack import router as my_stack_router from app.api.welcome import router as welcome_router from app.api.claude_md import router as claude_md_router +from app.api.cache_warmup import router as cache_warmup_router from app.marketplace_server.router import router as marketplace_server_router from 
app.marketplace_server.git_router import make_git_wsgi_app from app.web.router import router as web_router @@ -147,6 +148,9 @@ async def lifespan(app): except Exception as e: logger.warning("failed to bump anyio thread pool capacity: %s", e) + from app.api.cache_warmup import maybe_schedule_startup_warmup + maybe_schedule_startup_warmup() + yield from src.db import close_system_db close_system_db() @@ -552,6 +556,7 @@ def create_app() -> FastAPI: app.include_router(my_stack_router) app.include_router(welcome_router) app.include_router(claude_md_router) + app.include_router(cache_warmup_router) app.include_router(marketplace_server_router) # Git smart-HTTP endpoint for Claude Code: /marketplace.git/* diff --git a/app/web/templates/admin_tables.html b/app/web/templates/admin_tables.html index 2870489..9143f00 100644 --- a/app/web/templates/admin_tables.html +++ b/app/web/templates/admin_tables.html @@ -871,6 +871,25 @@
+
+
+

Cache freshness

+ +
+
+
+ Loading… +
+ +
+ Show log +

+                
+
+
+ {# Phase D: tab-split scaffold. Per-connector tabs (BigQuery / Keboola / Jira) replace the single mixed form. Each tab has its own Register button + listing div + (later) form modals. The @@ -1080,7 +1099,9 @@
- +
+ so the per-row badge slot exists.""" + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.get("/admin/tables", headers={"Authorization": f"Bearer {token}"}) + assert 'class="col-status"' in r.text +``` + +- [ ] **Step 14.2: Run test to verify it fails** + +```bash +cd /tmp/agnes-metadata && python -m pytest tests/test_admin_tables_warmup_ui.py -v +``` +Expected: 2 of 3 fail (toolbar absent, EventSource not yet wired). Third probably passes. + +- [ ] **Step 14.3: Add the cache toolbar `
` to `admin_tables.html`** + +Edit `app/web/templates/admin_tables.html`. Locate the section between the page header and the per-source-type table listings — search for `bqTableListing` to find the area. Insert the new card just before the listings: + +```html +
+
+

Cache freshness

+ +
+
+
+ Loading… +
+ +
+ Show log +

+        
+
+
+``` + +- [ ] **Step 14.4: Add the JS for live updates** + +Inside the existing `