agnes-the-ai-analyst/app/api/bq_metadata_refresh.py
ZdenekSrotyr b6cdd68e8d feat(catalog): entity_type + validated where_examples + view-aware cost-guard + scheduler hygiene
Three behavioural improvements driven by the sub-agent end-to-end test
findings, plus scheduler tweaks to prevent the post-deploy contention
burst we measured.

CATALOG (catalog-side bugs the test agents tripped on):
  - new entity_type field per remote row (BASE TABLE / VIEW /
    MATERIALIZED VIEW). For views, rows + size_bytes return null
    instead of the misleading 0 that __TABLES__ reports.
  - where_examples now validates against the table's actual schema
    (cached known_columns from refresh). The pre-fix behavior
    blindly advertised `country_code = 'CZ'` on tables with no
    country_code column — the sub-agent tests reliably hit this on
    unit_economics.
  - new known_columns + entity_type columns on bq_metadata_cache;
    populated by bq_metadata_refresh.refresh_one from the same
    fetch_bq_columns_full call (no extra BQ roundtrip) plus a
    cheap INFORMATION_SCHEMA.TABLES lookup for table_type.

QUERY COST-GUARD:
  - remote_scan_too_large suggestion now names views explicitly:
    `Target(s) <ids> are VIEW or MATERIALIZED VIEW. BigQuery does
    not push LIMIT into the view body — SELECT * FROM <view>
    LIMIT 1 still runs the full underlying scan.` Programmatic
    consumers get a new view_targets field on the error detail.

SCHEDULER HYGIENE (the post-deploy 1-minute window where
concurrent parquet downloads dropped to ~1 MB/s):
  - SCHEDULER_STARTUP_GRACE_SECONDS (default 60) holds the first
    tick so the burst doesn't overlap cache_warmup writes.
  - SCHEDULER_BQ_METADATA_INITIAL_OFFSET_MAX_SECONDS (default 900)
    randomises bq-metadata-refresh's first-fire offset.

TESTS:
  - test_bq_metadata_cache_repo: entity_type + known_columns round-trip
  - test_v2_catalog_remote_metadata: where_examples validation, views
    return null rows/size_bytes, cold rows have empty examples
  - test_api_query_guardrail: VIEW-aware suggestion text + view_targets
  - test_connectors_bigquery_metadata: entity_type lookup mock + new
    fields in TableMetadata expectations
  - test_scheduler_sidecar: grace + jitter env-var resolution
2026-05-12 10:37:35 +02:00

315 lines
12 KiB
Python

"""BigQuery metadata cache refresh — owner of the ``bq_metadata_cache``
write path.
Three endpoints share this module:
- ``POST /api/admin/run-bq-metadata-refresh`` — called by the scheduler
container (auth: shared scheduler token resolves to a synthetic admin
user). Walks remote rows in ``table_registry``, fetches each via the
BigQuery metadata provider, UPSERTs into ``bq_metadata_cache``.
- ``POST /api/v2/metadata-cache/refresh?table=<id>`` — admin-gated, for
operator on-demand refresh of a single row (e.g. after editing the
registry entry's ``bucket`` / ``source_table``).
- ``GET /api/v2/metadata-cache/status`` — auth required, NOT admin-only.
Returns per-row freshness so analyst tooling (CLI / Claude Code) can
decide whether to trust the cached numbers or wait for a refresh.
Why this lives outside the catalog endpoint
-------------------------------------------
Earlier releases inlined a per-row BigQuery fetch into ``GET /api/v2/catalog``.
On cold caches that became O(N) sequential BQ jobs API roundtrips inside
one HTTP request — easily 90 s+ on partitioned tables — and reliably blew
the CLI's 30 s ``httpx.ReadTimeout``. Moving the fetch off the hot path
into a scheduled refresh job (default every 4 h, configurable via
``SCHEDULER_BQ_METADATA_REFRESH_INTERVAL``) keeps the catalog response
under tens of milliseconds even at first boot, at the cost of metadata
being up to one refresh-interval stale. The freshness field surfaces
that explicitly.
"""
from __future__ import annotations
import asyncio
import logging
import os
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
import duckdb
from fastapi import APIRouter, Depends, HTTPException, Query
from app.auth.access import require_admin
from app.auth.dependencies import _get_db, get_current_user
from src.repositories.bq_metadata_cache import BqMetadataCacheRepository
from src.repositories.table_registry import TableRegistryRepository
logger = logging.getLogger(__name__)
router = APIRouter()
# ─── Freshness thresholds ──────────────────────────────────────────────────
def _scheduler_interval_seconds() -> int:
"""Return the scheduler's configured refresh interval, mirroring
``services/scheduler/__main__.py``. We re-read the env var instead
of importing the scheduler module because the scheduler runs in a
sibling container and is not on the app's import path.
"""
raw = os.environ.get("SCHEDULER_BQ_METADATA_REFRESH_INTERVAL")
if raw is None or raw == "":
return 4 * 60 * 60 # 4 h default
try:
value = int(raw)
except (TypeError, ValueError):
return 4 * 60 * 60
return value if value > 0 else 4 * 60 * 60
def _fresh_threshold_seconds() -> int:
"""A row is ``fresh`` when refreshed within this window.
Two refresh intervals: one refresh might fail (network blip, BQ
throttle); the analyst should keep seeing the last-known-good row
as ``fresh`` until two consecutive refreshes have passed without
success. Beyond that, the response surfaces ``stale`` so the
consumer knows the numbers might be outdated.
"""
return 2 * _scheduler_interval_seconds()
def compute_freshness(
cache_row: Optional[dict[str, Any]],
*,
now: Optional[datetime] = None,
fresh_threshold: Optional[int] = None,
) -> str:
"""Classify a cache row's freshness.
- ``never_fetched``: no row, or no successful refresh yet.
- ``fresh``: refreshed within the threshold.
- ``stale``: refreshed earlier than the threshold.
- ``error``: most recent attempt failed and there is no prior success
(success row is preserved across errors — analyst keeps using
last-known-good numbers).
"""
if cache_row is None:
return "never_fetched"
refreshed_at = cache_row.get("refreshed_at")
error_at = cache_row.get("error_at")
if refreshed_at is None:
return "error" if error_at is not None else "never_fetched"
threshold = fresh_threshold if fresh_threshold is not None else _fresh_threshold_seconds()
cutoff = (now or datetime.now(timezone.utc)) - timedelta(seconds=threshold)
# DuckDB returns naive datetimes for TIMESTAMP columns; treat as UTC.
if refreshed_at.tzinfo is None:
refreshed_at = refreshed_at.replace(tzinfo=timezone.utc)
return "fresh" if refreshed_at >= cutoff else "stale"
# ─── Single-row refresh primitive ──────────────────────────────────────────
def refresh_one(conn: duckdb.DuckDBPyConnection, row: dict[str, Any]) -> dict[str, Any]:
"""Fetch BQ metadata for one row and UPSERT the result.
Synchronous; safe to call from an anyio thread. Returns a small
outcome dict for the caller (counts, audit).
Failures are absorbed: the cache row's prior success is preserved
(``error_at`` + ``error_msg`` set, ``refreshed_at`` left alone).
"""
from app.api._metadata_models import MetadataRequest
from connectors.bigquery import metadata as bq_metadata
from src.identifier_validation import validate_quoted_identifier
table_id = row["id"]
bucket = row.get("bucket") or ""
source_table = row.get("source_table") or table_id
repo = BqMetadataCacheRepository(conn)
if not (
validate_quoted_identifier(bucket, "bucket")
and validate_quoted_identifier(source_table, "source_table")
):
repo.mark_error(table_id, "invalid bucket/source_table identifier")
return {"table_id": table_id, "status": "error", "error": "invalid identifier"}
req = MetadataRequest(
table_id=table_id, bucket=bucket, source_table=source_table,
)
try:
result = bq_metadata.fetch(req)
except Exception as e:
# bq_metadata.fetch is documented as never-raises, but defense in
# depth: catch any regression so one bad row doesn't kill the
# whole scheduler tick.
msg = f"{type(e).__name__}: {e}"
logger.warning("bq metadata refresh failed for %s: %s", table_id, msg)
repo.mark_error(table_id, msg)
return {"table_id": table_id, "status": "error", "error": msg}
if result is None:
repo.mark_error(table_id, "provider returned no data")
return {"table_id": table_id, "status": "no_data"}
repo.upsert_success(
table_id,
rows=result.rows,
size_bytes=result.size_bytes,
partition_by=result.partition_by,
clustered_by=result.clustered_by,
entity_type=result.entity_type,
known_columns=result.known_columns,
)
return {
"table_id": table_id,
"status": "ok",
"rows": result.rows,
"size_bytes": result.size_bytes,
"entity_type": result.entity_type,
}
def _list_remote_bq_rows(conn: duckdb.DuckDBPyConnection) -> list[dict[str, Any]]:
rows = TableRegistryRepository(conn).list_all()
return [
r for r in rows
if r.get("query_mode") == "remote" and r.get("source_type") == "bigquery"
]
def _refresh_concurrency() -> int:
raw = os.environ.get("AGNES_BQ_METADATA_REFRESH_CONCURRENCY", "4")
try:
value = int(raw)
except (TypeError, ValueError):
return 4
return value if value > 0 else 4
# ─── Endpoints ─────────────────────────────────────────────────────────────
@router.post("/api/admin/run-bq-metadata-refresh")
async def run_bq_metadata_refresh(
user: dict = Depends(require_admin),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Refresh metadata for every remote BQ row in the registry.
Called by the scheduler at ``SCHEDULER_BQ_METADATA_REFRESH_INTERVAL``
(default 4 h). Idempotent — running twice in quick succession is
safe but wasteful; the scheduler enforces the interval.
Bounded concurrency (default 4, override via
``AGNES_BQ_METADATA_REFRESH_CONCURRENCY``) so a deployment with
many remote tables doesn't fan out to dozens of parallel BQ jobs.
"""
from src.db import get_system_db
rows = _list_remote_bq_rows(conn)
sem = asyncio.Semaphore(_refresh_concurrency())
async def _one(row: dict[str, Any]) -> dict[str, Any]:
async with sem:
# Each refresh_one call wants its own cursor; the singleton
# connection accessor returns a fresh cursor each call.
return await asyncio.to_thread(refresh_one, get_system_db(), row)
t0 = time.monotonic()
results = await asyncio.gather(
*(_one(r) for r in rows), return_exceptions=True,
)
duration_ms = int((time.monotonic() - t0) * 1000)
succeeded = sum(
1 for r in results if isinstance(r, dict) and r.get("status") == "ok"
)
no_data = sum(
1 for r in results if isinstance(r, dict) and r.get("status") == "no_data"
)
failed = sum(
1 for r in results
if isinstance(r, Exception)
or (isinstance(r, dict) and r.get("status") == "error")
)
logger.info(
"bq metadata refresh: total=%d ok=%d no_data=%d failed=%d duration_ms=%d",
len(rows), succeeded, no_data, failed, duration_ms,
)
return {
"total": len(rows),
"succeeded": succeeded,
"no_data": no_data,
"failed": failed,
"duration_ms": duration_ms,
}
@router.post("/api/v2/metadata-cache/refresh")
async def refresh_one_table(
table: str = Query(..., description="Registry table_id to refresh"),
user: dict = Depends(require_admin),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Operator on-demand refresh of one row.
Useful right after editing the registry row (so the catalog reflects
new ``bucket`` / ``source_table`` immediately) or after an upstream
BQ schema change that the operator wants reflected before the next
scheduled tick.
"""
from src.db import get_system_db
row = TableRegistryRepository(conn).get(table)
if not row:
raise HTTPException(status_code=404, detail=f"Unknown table_id: {table}")
if row.get("query_mode") != "remote" or row.get("source_type") != "bigquery":
raise HTTPException(
status_code=400,
detail="Manual metadata refresh is only meaningful for remote BigQuery tables",
)
return await asyncio.to_thread(refresh_one, get_system_db(), row)
@router.get("/api/v2/metadata-cache/status")
def metadata_cache_status(
user: dict = Depends(get_current_user),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Per-table cache status. Non-admin — analyst tools rely on this to
decide whether to trust the catalog's ``rows`` / ``size_bytes`` or
treat the table as opaque until the next refresh.
"""
cache_rows = BqMetadataCacheRepository(conn).list_all()
threshold = _fresh_threshold_seconds()
now = datetime.now(timezone.utc)
interval = _scheduler_interval_seconds()
tables = []
for r in cache_rows:
refreshed_at = r.get("refreshed_at")
error_at = r.get("error_at")
tables.append({
"table_id": r["table_id"],
"refreshed_at": refreshed_at.isoformat() if refreshed_at else None,
"rows": r.get("rows"),
"size_bytes": r.get("size_bytes"),
"partition_by": r.get("partition_by"),
"clustered_by": r.get("clustered_by") or [],
"entity_type": r.get("entity_type"),
"known_columns": r.get("known_columns") or [],
"error_at": error_at.isoformat() if error_at else None,
"error_msg": r.get("error_msg"),
"freshness": compute_freshness(r, now=now, fresh_threshold=threshold),
})
return {
"scheduler_interval_seconds": interval,
"fresh_threshold_seconds": threshold,
"server_time": now.isoformat(),
"tables": tables,
}