agnes-the-ai-analyst/app/api/cache_warmup.py
ZdenekSrotyr aa5921da67
release: 0.47.0 — source-agnostic catalog metadata + cache discipline (#223)
## Summary

- Catalog enrichment for `query_mode='remote'` rows: `rows`, `size_bytes`, `partition_by`, `clustered_by` per table (BQ + Keboola providers).
- `/api/v2/schema/{id}` cache miss: 2 BQ jobs → 1 (-50%) via shared `fetch_bq_columns_full`.
- All four catalog/schema/sample/metadata caches flush on registry change; single-row re-warm scheduled.
- Automatic cache warmup at server startup (bounded concurrency, opt-out via `AGNES_SKIP_CACHE_WARMUP=1`).
- SSE-driven freshness toolbar on `/admin/tables` with progress bar, log, and per-row badge.
- New admin doc `docs/admin/query-modes.md` — single source of truth on `local` / `remote` / `materialized` choice.
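
The bounded-concurrency startup warmup listed above follows a common asyncio pattern: a semaphore caps the number of in-flight fetches, and each row catches its own exception so one failure does not abort the run. A minimal sketch of that pattern, assuming a hypothetical `warm_row` coroutine in place of the real metadata and schema fetches (`warm_row` and `warm_all` are illustrative names, not part of the codebase):

```python
import asyncio


async def warm_row(table_id: str) -> str:
    """Hypothetical stand-in for a per-table cache fetch."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    if table_id.startswith("bad"):
        raise RuntimeError(f"cannot warm {table_id}")
    return table_id


async def warm_all(table_ids: list[str], concurrency: int = 4) -> dict[str, str]:
    """Warm every table with at most `concurrency` fetches in flight."""
    sem = asyncio.Semaphore(concurrency)
    results: dict[str, str] = {}

    async def guarded(table_id: str) -> None:
        async with sem:
            try:
                await warm_row(table_id)
                results[table_id] = "fresh"
            except Exception:
                # A failing row is recorded, not propagated, so the
                # remaining rows still get warmed.
                results[table_id] = "error"

    await asyncio.gather(*(guarded(t) for t in table_ids))
    return results
```

Calling `asyncio.run(warm_all(["a", "b", "bad_c"], concurrency=2))` marks the first two ids as `"fresh"` and the third as `"error"`.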

Closes #155.
Closes #156.

## Test plan

- [x] 65+ targeted tests pass across 11 new test modules + 3 modified ones.
- [x] No DB migration; no wire-break; `MIN_COMPAT_CLI_VERSION` unchanged.
- [ ] Reviewer: register a remote BQ table via `/admin/tables`, observe the toolbar populates within ~2 s and the per-row badge transitions warming → fresh.
- [ ] Reviewer: trigger `Re-warm all`, verify SSE log scrolls and `cacheWarmupBar` progresses.
- [ ] Reviewer: edit a registered row's bucket, verify `agnes schema <id>` returns updated columns immediately (no 1-hour staleness).
- [ ] Reviewer: confirm `agnes admin register-table --query-mode remote` prints the new IAM-smoke-check hint.

## Notable design decisions

- BigQuery's region-qualified `INFORMATION_SCHEMA.TABLE_STORAGE` is the only valid scope for size and row counts (verified live 2026-05-07; a dataset-scoped variant doesn't exist). The region is resolved from `instance.yaml.data_source.bigquery.location`, then from `bq.client().get_dataset(...)`, falling back to legacy `__TABLES__`.
- VIEW handling: TABLE_STORAGE returns no rows for views, so the lookup falls through to `__TABLES__` (also empty) and yields `TableMetadata(rows=None, size_bytes=None, partition_by=..., clustered_by=...)`. A null size signals analyst Claude to apply the existing CLAUDE.md guidance.
- `size_bytes` is `active_logical_bytes + long_term_logical_bytes` — full BQ scan reads both; reporting only active undercounts aged partitioned tables.
- Source-agnostic provider seam: per-source `connectors/<source>/metadata.py:fetch(MetadataRequest)`; dispatcher in `app/api/v2_catalog.py:_metadata_provider_for` lazily imports per source_type so a Keboola-only deployment doesn't pay the BQ-extension import cost.
- Warmup non-blocking: FastAPI `lifespan` schedules `asyncio.create_task(_warm_catalog_caches_bg)` before `yield`. Per-row failures isolated.
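
The size and VIEW rules above can be illustrated with a small pure function. This is a sketch under assumptions: `metadata_from_storage_row` and its dict-shaped input are hypothetical, the field types on `TableMetadata` are guesses, and `total_rows` is assumed to be the row-count column; only the `TableMetadata` field names and the two `*_logical_bytes` columns come from the notes above.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TableMetadata:
    # Field names follow the PR description; the types are assumptions.
    rows: Optional[int]
    size_bytes: Optional[int]
    partition_by: Optional[str] = None
    clustered_by: Optional[List[str]] = None


def metadata_from_storage_row(
    storage_row: Optional[dict],
    partition_by: Optional[str] = None,
    clustered_by: Optional[List[str]] = None,
) -> TableMetadata:
    """Build metadata from one storage row (hypothetical helper).

    `storage_row` is None when no storage row exists, e.g. for a view.
    """
    if storage_row is None:
        # Views: size and rows stay null; layout hints are still reported.
        return TableMetadata(None, None, partition_by, clustered_by)
    # A full scan reads both active and long-term storage, so sum them.
    size = (
        storage_row["active_logical_bytes"]
        + storage_row["long_term_logical_bytes"]
    )
    return TableMetadata(
        storage_row["total_rows"], size, partition_by, clustered_by
    )
```

Summing both byte columns keeps the estimate honest for tables whose older partitions have aged into long-term storage.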

## Out of scope

- Profile / column histograms / dimension cardinality for remote tables (separate issue).
- Onboarding nudge ("you have 0 remote tables, consider registering some BQ ones") — separate UX call.
- Provider plug-in registration via entry-points (the dispatch table is a hardcoded if-tree today; one line per future source).

## Release

Bumps `pyproject.toml` 0.46.1 → 0.47.0 (main shipped 0.46.0 + 0.46.1 during this PR — see commit `d98976ec`). New CHANGELOG section under `## [0.47.0] — 2026-05-07`.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
2026-05-07 18:33:55 +02:00

264 lines
8.8 KiB
Python

"""Cache warmup framework — populates catalog/schema/metadata caches at
container startup so the first analyst hits warm caches.
Bounded concurrency (4 by default). Exposes:
- GET /api/admin/cache-warmup/status — JSON snapshot
- POST /api/admin/cache-warmup/run — manual trigger (idempotent)
- GET /api/admin/cache-warmup/stream — Server-Sent Events
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Literal
from uuid import uuid4
from fastapi import APIRouter, Depends
from sse_starlette.sse import EventSourceResponse
from app.auth.access import require_admin
logger = logging.getLogger(__name__)
router = APIRouter()


@dataclass
class WarmupRowState:
    table_id: str
    status: Literal["pending", "warming", "fresh", "error"]
    started_at: str | None = None
    completed_at: str | None = None
    duration_ms: int | None = None
    error: str | None = None
    last_warmed_at: str | None = None


@dataclass
class WarmupRunState:
    run_id: str
    trigger: Literal["startup", "manual", "registry_change"]
    started_at: str
    completed_at: str | None = None
    total: int = 0
    completed: int = 0
    failed: int = 0
    rows: dict[str, WarmupRowState] = field(default_factory=dict)
    _subscribers: list[asyncio.Queue] = field(default_factory=list, repr=False)


WARMUP_STATE: WarmupRunState | None = None
_RUN_LOCK = asyncio.Lock()


def _now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def maybe_schedule_startup_warmup() -> None:
    """Called from app/main.py FastAPI startup event."""
    if os.environ.get("AGNES_SKIP_CACHE_WARMUP") == "1":
        logger.info("cache warmup skipped (AGNES_SKIP_CACHE_WARMUP=1)")
        return
    try:
        asyncio.create_task(_warm_catalog_caches_bg(trigger="startup"))
    except RuntimeError:
        logger.warning("no running event loop — startup warmup skipped")


async def _warm_catalog_caches_bg(
    trigger: str = "startup", state: WarmupRunState | None = None,
) -> None:
    """Walk registry, warm metadata + schema caches for every remote row.

    If `state` is provided, use it (caller has already published it on
    WARMUP_STATE). Otherwise build a fresh state and assign WARMUP_STATE.
    """
    global WARMUP_STATE
    if state is None:
        async with _RUN_LOCK:
            # Re-check inside the lock: another caller might have started
            # a run while we were waiting.
            if WARMUP_STATE and WARMUP_STATE.completed_at is None:
                return
            state = WarmupRunState(
                run_id=uuid4().hex[:8],
                trigger=trigger,
                started_at=_now_iso(),
            )
            WARMUP_STATE = state
    run_id = state.run_id
    rows = _list_remote_rows()
    state.total = len(rows)
    for r in rows:
        state.rows[r["id"]] = WarmupRowState(
            table_id=r["id"], status="pending",
        )
    # Report the trigger recorded on the state: callers that pass a
    # pre-built state (e.g. the manual endpoint) leave the `trigger`
    # argument at its "startup" default.
    _broadcast(state, {"event": "start", "data": {
        "run_id": run_id, "trigger": state.trigger, "total": state.total,
    }})
    sem = asyncio.Semaphore(int(os.environ.get("AGNES_WARMUP_CONCURRENCY", "4")))
    await asyncio.gather(
        *(_warm_one(r, state, sem) for r in rows), return_exceptions=True,
    )
    state.completed_at = _now_iso()
    _broadcast(state, {"event": "complete", "data": {
        "run_id": run_id, "total": state.total,
        "completed": state.completed, "failed": state.failed,
    }})
    logger.info(
        "cache warmup complete: run_id=%s total=%d ok=%d fail=%d",
        run_id, state.total, state.completed, state.failed,
    )


def _list_remote_rows() -> list[dict]:
    """Snapshot of registry rows that need a warmup pass."""
    from src.db import get_system_db
    from src.repositories.table_registry import TableRegistryRepository

    conn = get_system_db()
    rows = TableRegistryRepository(conn).list_all()
    return [
        r for r in rows
        if r.get("query_mode") == "remote" and r.get("source_type") == "bigquery"
    ]


async def _warm_one(
    row: dict, state: WarmupRunState, sem: asyncio.Semaphore,
) -> None:
    async with sem:
        rs = state.rows[row["id"]]
        rs.status = "warming"
        rs.started_at = _now_iso()
        _broadcast(state, {"event": "row", "data": asdict(rs)})
        t0 = time.monotonic()
        try:
            await asyncio.to_thread(_warm_metadata_sync, row)
            await asyncio.to_thread(_warm_schema_sync, row)
            rs.status = "fresh"
            rs.last_warmed_at = _now_iso()
            state.completed += 1
        except Exception as e:
            rs.status = "error"
            rs.error = str(e)
            state.failed += 1
            logger.warning("cache warmup row=%s failed: %s", row["id"], e)
        finally:
            rs.completed_at = _now_iso()
            rs.duration_ms = int((time.monotonic() - t0) * 1000)
            _broadcast(state, {"event": "row", "data": asdict(rs)})


def _warm_metadata_sync(row: dict) -> None:
    """Trigger metadata cache populate via the catalog's normal path."""
    from app.api.v2_catalog import _size_hint_for_row

    _size_hint_for_row(row)


def _warm_schema_sync(row: dict) -> None:
    """Trigger schema cache populate via build_schema_uncached."""
    from app.api.v2_schema import build_schema_uncached
    from connectors.bigquery.access import get_bq_access
    from src.db import get_system_db

    bq = get_bq_access()
    build_schema_uncached(get_system_db(), row["id"], bq=bq, row=row)


async def warm_one_table(table_id: str) -> None:
    """Single-row re-warm, invoked by `invalidate_for_table` after a
    registry change. Does NOT update WARMUP_STATE (small change shouldn't
    overwrite the last full run's status); just refreshes the caches."""
    from src.db import get_system_db
    from src.repositories.table_registry import TableRegistryRepository

    conn = get_system_db()
    row = TableRegistryRepository(conn).get(table_id)
    if not row or row.get("query_mode") != "remote":
        return
    try:
        await asyncio.to_thread(_warm_metadata_sync, row)
        await asyncio.to_thread(_warm_schema_sync, row)
    except Exception as e:
        logger.warning("single-row warmup failed for %s: %s", table_id, e)


def _broadcast(state: WarmupRunState, event: dict) -> None:
    """Send an event to every SSE subscriber. Dead queues are pruned."""
    dead = []
    for q in state._subscribers:
        try:
            q.put_nowait(event)
        except asyncio.QueueFull:
            dead.append(q)
    for q in dead:
        state._subscribers.remove(q)


def _serialize_state(state: WarmupRunState) -> dict:
    return {
        "run_id": state.run_id,
        "trigger": state.trigger,
        "started_at": state.started_at,
        "completed_at": state.completed_at,
        "total": state.total,
        "completed": state.completed,
        "failed": state.failed,
        "rows": {tid: asdict(rs) for tid, rs in state.rows.items()},
    }


# ─── Endpoints ────────────────────────────────────────────────────────


@router.get("/api/admin/cache-warmup/status")
async def warmup_status(user: dict = Depends(require_admin)):
    if WARMUP_STATE is None:
        return {"state": "never_run"}
    return _serialize_state(WARMUP_STATE)


@router.post("/api/admin/cache-warmup/run")
async def warmup_run(user: dict = Depends(require_admin)):
    global WARMUP_STATE
    if WARMUP_STATE and WARMUP_STATE.completed_at is None:
        return {"run_id": WARMUP_STATE.run_id, "status": "already_running"}
    state = WarmupRunState(
        run_id=uuid4().hex[:8],
        trigger="manual",
        started_at=_now_iso(),
    )
    WARMUP_STATE = state
    asyncio.create_task(_warm_catalog_caches_bg(state=state))
    return {"run_id": state.run_id, "status": "started"}


@router.get("/api/admin/cache-warmup/stream")
async def warmup_stream(user: dict = Depends(require_admin)):
    async def gen():
        # Pin the run we subscribe to, so cleanup targets the same state
        # even if WARMUP_STATE is replaced by a newer run mid-stream.
        state = WARMUP_STATE
        if state is None:
            yield {"event": "idle", "data": json.dumps({"state": "never_run"})}
            return
        q: asyncio.Queue = asyncio.Queue(maxsize=256)
        state._subscribers.append(q)
        try:
            yield {"event": "snapshot", "data": json.dumps(_serialize_state(state))}
            while True:
                ev = await asyncio.wait_for(q.get(), timeout=30.0)
                yield {"event": ev["event"], "data": json.dumps(ev["data"])}
                if ev["event"] == "complete":
                    return
        except asyncio.TimeoutError:
            # No event for 30 s: close the stream and let the client reconnect.
            return
        finally:
            if q in state._subscribers:
                state._subscribers.remove(q)

    return EventSourceResponse(gen())