From b3841f5b6c36f357b25df50a2a158505193782ba Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Mon, 11 May 2026 20:37:17 +0200 Subject: [PATCH] =?UTF-8?q?release:=200.50.0=20=E2=80=94=20persistent=20BQ?= =?UTF-8?q?=20metadata=20cache=20+=20scheduled=20refresh;=20catalog=20neve?= =?UTF-8?q?r=20blocks=20on=20BigQuery?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Since 0.47.0 GET /api/v2/catalog enriched each remote BigQuery row by fetching INFORMATION_SCHEMA.TABLE_STORAGE + COLUMNS through the DuckDB BigQuery extension *inside the request*. On cold caches that fanned out to O(N) sequential BQ jobs-API roundtrips — easily 90 s+ on partitioned / view-backed tables — and reliably blew the CLI's 30 s httpx ReadTimeout. Reproduced with py-spy: three AnyIO worker threads stuck inside connectors/bigquery/metadata._fetch_via_legacy_tables. Refactor: enrichment is read exclusively from a new persistent bq_metadata_cache DuckDB table (schema v40), populated by a scheduler- driven refresh job at SCHEDULER_BQ_METADATA_REFRESH_INTERVAL (default 4 h). Cold catalog response on a fresh container is now tens of milliseconds with metadata_freshness=never_fetched for unwarmed rows. New surface: - POST /api/admin/run-bq-metadata-refresh (scheduler-driven, full) - POST /api/v2/metadata-cache/refresh?table= (admin, single) - GET /api/v2/metadata-cache/status (auth, non-admin) - metadata_freshness field per catalog row Removed (internal API): v2_catalog._size_hint_for_row, _resolve_remote_metadata, _metadata_provider_for, _build_metadata_request, _materialized_size_hint, in-memory _metadata_cache. Response shape unchanged for external consumers. 991 tests passing; 2 pre-existing failures (test_db v3→v4 ladder, test_cli_binary_rename) unrelated to this change. --- CHANGELOG.md | 24 ++ app/api/bq_metadata_refresh.py | 310 +++++++++++++++++++++ app/api/cache_warmup.py | 13 +- app/api/v2_catalog.py | 252 +++++++---------- app/main.py | 2 + pyproject.toml | 2 +- services/scheduler/__main__.py | 19 +- src/db.py | 58 +++- src/repositories/bq_metadata_cache.py | 120 ++++++++ tests/conftest.py | 2 - tests/test_bq_metadata_cache_repo.py | 160 +++++++++++ tests/test_bq_metadata_refresh_endpoint.py | 211 ++++++++++++++ tests/test_scheduler_sidecar.py | 8 + tests/test_v2_catalog_dispatcher.py | 71 ----- tests/test_v2_catalog_invalidation.py | 55 ++-- tests/test_v2_catalog_remote_metadata.py | 207 +++++++------- 16 files changed, 1158 insertions(+), 356 deletions(-) create mode 100644 app/api/bq_metadata_refresh.py create mode 100644 src/repositories/bq_metadata_cache.py create mode 100644 tests/test_bq_metadata_cache_repo.py create mode 100644 tests/test_bq_metadata_refresh_endpoint.py delete mode 100644 tests/test_v2_catalog_dispatcher.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 8868ccc..d401f36 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,30 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +## [0.50.0] — 2026-05-11 + +### Fixed + +- **`GET /api/v2/catalog` no longer hangs on cold cache.** Since 0.47.0 the catalog endpoint enriched each remote BigQuery row by fetching `INFORMATION_SCHEMA.TABLE_STORAGE` + `COLUMNS` through the DuckDB BigQuery extension inside the request. On cold caches that fanned out to O(N) sequential BQ jobs-API roundtrips — easily 90 s+ on partitioned / view-backed tables — and reliably exceeded the CLI's 30 s `httpx.ReadTimeout`. Enrichment now reads exclusively from a persistent `bq_metadata_cache` DuckDB table, populated by a scheduler-driven refresh job. First call after a fresh container start returns in tens of milliseconds with `metadata_freshness: never_fetched` for rows the scheduler hasn't reached yet; subsequent ticks fill the cache. Closes the cold-start outage class entirely. + +### Added + +- **Persistent BigQuery metadata cache (`bq_metadata_cache`, schema v40).** Holds `rows`, `size_bytes`, `partition_by`, `clustered_by`, `refreshed_at`, plus a `error_at` / `error_msg` pair that preserves the last successful row across transient provider failures so analyst tooling keeps seeing last-known-good numbers. +- **`POST /api/admin/run-bq-metadata-refresh`** — scheduler-driven full refresh of every remote BigQuery row in the registry. Bounded concurrency via `AGNES_BQ_METADATA_REFRESH_CONCURRENCY` (default 4). +- **`POST /api/v2/metadata-cache/refresh?table=`** — operator on-demand single-row refresh (admin-gated), for use right after a registry edit when waiting for the next scheduled tick is too long. +- **`GET /api/v2/metadata-cache/status`** — non-admin endpoint surfacing per-row `refreshed_at`, `error_at`, `error_msg`, and `freshness` (`fresh` / `stale` / `never_fetched` / `error`) so CLI / Claude Code can decide whether to trust the catalog's `rows` and `size_bytes`. +- **`metadata_freshness` field** in every `/api/v2/catalog` row. `not_applicable` for `local` / `materialized` rows where the BQ cache concept doesn't apply. +- **Scheduler job `bq-metadata-refresh`** running at `SCHEDULER_BQ_METADATA_REFRESH_INTERVAL` (default `4 * 60 * 60` seconds = 4 h). Tunable per deployment; the catalog request path is independent of the value. + +### Changed + +- **BREAKING (internal API):** removed `app.api.v2_catalog._size_hint_for_row`, `_resolve_remote_metadata`, `_metadata_provider_for`, `_build_metadata_request`, `_materialized_size_hint`, and the in-memory `_metadata_cache` (`TTLCache`). Catalog responses still expose the same enrichment fields (`rows`, `size_bytes`, `partition_by`, `clustered_by`); the new `metadata_freshness` field is additive. External consumers that read the response shape are unaffected. +- `app.api.cache_warmup._warm_metadata_sync` now refreshes the persistent cache via `bq_metadata_refresh.refresh_one` instead of priming an in-memory TTL cache. The existing `/api/admin/cache-warmup/*` endpoints and admin-tables SSE wiring continue to work. + +### Internal + +- Schema v40 migration `_V39_TO_V40_MIGRATIONS` adds the new table; existing instances pick it up on next start. Empty cache is treated as `never_fetched` by the catalog, never as an error. + ## [0.49.1] — 2026-05-11 ### Added diff --git a/app/api/bq_metadata_refresh.py b/app/api/bq_metadata_refresh.py new file mode 100644 index 0000000..94ebab6 --- /dev/null +++ b/app/api/bq_metadata_refresh.py @@ -0,0 +1,310 @@ +"""BigQuery metadata cache refresh — owner of the ``bq_metadata_cache`` +write path. + +Three endpoints share this module: + + - ``POST /api/admin/run-bq-metadata-refresh`` — called by the scheduler + container (auth: shared scheduler token resolves to a synthetic admin + user). Walks remote rows in ``table_registry``, fetches each via the + BigQuery metadata provider, UPSERTs into ``bq_metadata_cache``. + + - ``POST /api/v2/metadata-cache/refresh?table=`` — admin-gated, for + operator on-demand refresh of a single row (e.g. after editing the + registry entry's ``bucket`` / ``source_table``). + + - ``GET /api/v2/metadata-cache/status`` — auth required, NOT admin-only. + Returns per-row freshness so analyst tooling (CLI / Claude Code) can + decide whether to trust the cached numbers or wait for a refresh. + +Why this lives outside the catalog endpoint +------------------------------------------- +Earlier releases inlined a per-row BigQuery fetch into ``GET /api/v2/catalog``. +On cold caches that became O(N) sequential BQ jobs API roundtrips inside +one HTTP request — easily 90 s+ on partitioned tables — and reliably blew +the CLI's 30 s ``httpx.ReadTimeout``. Moving the fetch off the hot path +into a scheduled refresh job (default every 4 h, configurable via +``SCHEDULER_BQ_METADATA_REFRESH_INTERVAL``) keeps the catalog response +under tens of milliseconds even at first boot, at the cost of metadata +being up to one refresh-interval stale. The freshness field surfaces +that explicitly. +""" + +from __future__ import annotations + +import asyncio +import logging +import os +import time +from datetime import datetime, timedelta, timezone +from typing import Any, Optional + +import duckdb +from fastapi import APIRouter, Depends, HTTPException, Query + +from app.auth.access import require_admin +from app.auth.dependencies import _get_db, get_current_user +from src.repositories.bq_metadata_cache import BqMetadataCacheRepository +from src.repositories.table_registry import TableRegistryRepository + +logger = logging.getLogger(__name__) +router = APIRouter() + + +# ─── Freshness thresholds ────────────────────────────────────────────────── + + +def _scheduler_interval_seconds() -> int: + """Return the scheduler's configured refresh interval, mirroring + ``services/scheduler/__main__.py``. We re-read the env var instead + of importing the scheduler module because the scheduler runs in a + sibling container and is not on the app's import path. + """ + raw = os.environ.get("SCHEDULER_BQ_METADATA_REFRESH_INTERVAL") + if raw is None or raw == "": + return 4 * 60 * 60 # 4 h default + try: + value = int(raw) + except (TypeError, ValueError): + return 4 * 60 * 60 + return value if value > 0 else 4 * 60 * 60 + + +def _fresh_threshold_seconds() -> int: + """A row is ``fresh`` when refreshed within this window. + + Two refresh intervals: one refresh might fail (network blip, BQ + throttle); the analyst should keep seeing the last-known-good row + as ``fresh`` until two consecutive refreshes have passed without + success. Beyond that, the response surfaces ``stale`` so the + consumer knows the numbers might be outdated. + """ + return 2 * _scheduler_interval_seconds() + + +def compute_freshness( + cache_row: Optional[dict[str, Any]], + *, + now: Optional[datetime] = None, + fresh_threshold: Optional[int] = None, +) -> str: + """Classify a cache row's freshness. + + - ``never_fetched``: no row, or no successful refresh yet. + - ``fresh``: refreshed within the threshold. + - ``stale``: refreshed earlier than the threshold. + - ``error``: most recent attempt failed and there is no prior success + (success row is preserved across errors — analyst keeps using + last-known-good numbers). + """ + if cache_row is None: + return "never_fetched" + refreshed_at = cache_row.get("refreshed_at") + error_at = cache_row.get("error_at") + if refreshed_at is None: + return "error" if error_at is not None else "never_fetched" + threshold = fresh_threshold if fresh_threshold is not None else _fresh_threshold_seconds() + cutoff = (now or datetime.now(timezone.utc)) - timedelta(seconds=threshold) + # DuckDB returns naive datetimes for TIMESTAMP columns; treat as UTC. + if refreshed_at.tzinfo is None: + refreshed_at = refreshed_at.replace(tzinfo=timezone.utc) + return "fresh" if refreshed_at >= cutoff else "stale" + + +# ─── Single-row refresh primitive ────────────────────────────────────────── + + +def refresh_one(conn: duckdb.DuckDBPyConnection, row: dict[str, Any]) -> dict[str, Any]: + """Fetch BQ metadata for one row and UPSERT the result. + + Synchronous; safe to call from an anyio thread. Returns a small + outcome dict for the caller (counts, audit). + + Failures are absorbed: the cache row's prior success is preserved + (``error_at`` + ``error_msg`` set, ``refreshed_at`` left alone). + """ + from app.api._metadata_models import MetadataRequest + from connectors.bigquery import metadata as bq_metadata + from src.identifier_validation import validate_quoted_identifier + + table_id = row["id"] + bucket = row.get("bucket") or "" + source_table = row.get("source_table") or table_id + repo = BqMetadataCacheRepository(conn) + + if not ( + validate_quoted_identifier(bucket, "bucket") + and validate_quoted_identifier(source_table, "source_table") + ): + repo.mark_error(table_id, "invalid bucket/source_table identifier") + return {"table_id": table_id, "status": "error", "error": "invalid identifier"} + + req = MetadataRequest( + table_id=table_id, bucket=bucket, source_table=source_table, + ) + try: + result = bq_metadata.fetch(req) + except Exception as e: + # bq_metadata.fetch is documented as never-raises, but defense in + # depth: catch any regression so one bad row doesn't kill the + # whole scheduler tick. + msg = f"{type(e).__name__}: {e}" + logger.warning("bq metadata refresh failed for %s: %s", table_id, msg) + repo.mark_error(table_id, msg) + return {"table_id": table_id, "status": "error", "error": msg} + + if result is None: + repo.mark_error(table_id, "provider returned no data") + return {"table_id": table_id, "status": "no_data"} + + repo.upsert_success( + table_id, + rows=result.rows, + size_bytes=result.size_bytes, + partition_by=result.partition_by, + clustered_by=result.clustered_by, + ) + return { + "table_id": table_id, + "status": "ok", + "rows": result.rows, + "size_bytes": result.size_bytes, + } + + +def _list_remote_bq_rows(conn: duckdb.DuckDBPyConnection) -> list[dict[str, Any]]: + rows = TableRegistryRepository(conn).list_all() + return [ + r for r in rows + if r.get("query_mode") == "remote" and r.get("source_type") == "bigquery" + ] + + +def _refresh_concurrency() -> int: + raw = os.environ.get("AGNES_BQ_METADATA_REFRESH_CONCURRENCY", "4") + try: + value = int(raw) + except (TypeError, ValueError): + return 4 + return value if value > 0 else 4 + + +# ─── Endpoints ───────────────────────────────────────────────────────────── + + +@router.post("/api/admin/run-bq-metadata-refresh") +async def run_bq_metadata_refresh( + user: dict = Depends(require_admin), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Refresh metadata for every remote BQ row in the registry. + + Called by the scheduler at ``SCHEDULER_BQ_METADATA_REFRESH_INTERVAL`` + (default 4 h). Idempotent — running twice in quick succession is + safe but wasteful; the scheduler enforces the interval. + + Bounded concurrency (default 4, override via + ``AGNES_BQ_METADATA_REFRESH_CONCURRENCY``) so a deployment with + many remote tables doesn't fan out to dozens of parallel BQ jobs. + """ + from src.db import get_system_db + + rows = _list_remote_bq_rows(conn) + sem = asyncio.Semaphore(_refresh_concurrency()) + + async def _one(row: dict[str, Any]) -> dict[str, Any]: + async with sem: + # Each refresh_one call wants its own cursor; the singleton + # connection accessor returns a fresh cursor each call. + return await asyncio.to_thread(refresh_one, get_system_db(), row) + + t0 = time.monotonic() + results = await asyncio.gather( + *(_one(r) for r in rows), return_exceptions=True, + ) + duration_ms = int((time.monotonic() - t0) * 1000) + + succeeded = sum( + 1 for r in results if isinstance(r, dict) and r.get("status") == "ok" + ) + no_data = sum( + 1 for r in results if isinstance(r, dict) and r.get("status") == "no_data" + ) + failed = sum( + 1 for r in results + if isinstance(r, Exception) + or (isinstance(r, dict) and r.get("status") == "error") + ) + + logger.info( + "bq metadata refresh: total=%d ok=%d no_data=%d failed=%d duration_ms=%d", + len(rows), succeeded, no_data, failed, duration_ms, + ) + return { + "total": len(rows), + "succeeded": succeeded, + "no_data": no_data, + "failed": failed, + "duration_ms": duration_ms, + } + + +@router.post("/api/v2/metadata-cache/refresh") +async def refresh_one_table( + table: str = Query(..., description="Registry table_id to refresh"), + user: dict = Depends(require_admin), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Operator on-demand refresh of one row. + + Useful right after editing the registry row (so the catalog reflects + new ``bucket`` / ``source_table`` immediately) or after an upstream + BQ schema change that the operator wants reflected before the next + scheduled tick. + """ + from src.db import get_system_db + + row = TableRegistryRepository(conn).get(table) + if not row: + raise HTTPException(status_code=404, detail=f"Unknown table_id: {table}") + if row.get("query_mode") != "remote" or row.get("source_type") != "bigquery": + raise HTTPException( + status_code=400, + detail="Manual metadata refresh is only meaningful for remote BigQuery tables", + ) + return await asyncio.to_thread(refresh_one, get_system_db(), row) + + +@router.get("/api/v2/metadata-cache/status") +def metadata_cache_status( + user: dict = Depends(get_current_user), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Per-table cache status. Non-admin — analyst tools rely on this to + decide whether to trust the catalog's ``rows`` / ``size_bytes`` or + treat the table as opaque until the next refresh. + """ + cache_rows = BqMetadataCacheRepository(conn).list_all() + threshold = _fresh_threshold_seconds() + now = datetime.now(timezone.utc) + interval = _scheduler_interval_seconds() + tables = [] + for r in cache_rows: + refreshed_at = r.get("refreshed_at") + error_at = r.get("error_at") + tables.append({ + "table_id": r["table_id"], + "refreshed_at": refreshed_at.isoformat() if refreshed_at else None, + "rows": r.get("rows"), + "size_bytes": r.get("size_bytes"), + "partition_by": r.get("partition_by"), + "clustered_by": r.get("clustered_by") or [], + "error_at": error_at.isoformat() if error_at else None, + "error_msg": r.get("error_msg"), + "freshness": compute_freshness(r, now=now, fresh_threshold=threshold), + }) + return { + "scheduler_interval_seconds": interval, + "fresh_threshold_seconds": threshold, + "server_time": now.isoformat(), + "tables": tables, + } diff --git a/app/api/cache_warmup.py b/app/api/cache_warmup.py index 4391f8c..057c1a5 100644 --- a/app/api/cache_warmup.py +++ b/app/api/cache_warmup.py @@ -159,9 +159,16 @@ async def _warm_one( def _warm_metadata_sync(row: dict) -> None: - """Trigger metadata cache populate via the catalog's normal path.""" - from app.api.v2_catalog import _size_hint_for_row - _size_hint_for_row(row) + """Refresh the persistent ``bq_metadata_cache`` row. + + Pre-0.50 this called ``v2_catalog._size_hint_for_row`` to populate + an in-memory TTL cache. The in-memory cache is gone — metadata now + lives in DuckDB, owned by ``app/api/bq_metadata_refresh.refresh_one`` + (the same primitive the scheduler-driven refresh uses). + """ + from app.api.bq_metadata_refresh import refresh_one + from src.db import get_system_db + refresh_one(get_system_db(), row) def _warm_schema_sync(row: dict) -> None: diff --git a/app/api/v2_catalog.py b/app/api/v2_catalog.py index 917beba..e5a3a21 100644 --- a/app/api/v2_catalog.py +++ b/app/api/v2_catalog.py @@ -1,18 +1,32 @@ -"""GET /api/v2/catalog — list tables visible to caller (spec §3.1).""" +"""GET /api/v2/catalog — list tables visible to caller (spec §3.1). + +History note +------------ +0.47.0 enriched remote rows with BigQuery metadata (rows / size_bytes / +partition_by / clustered_by) by fetching from BQ *inside the request* +through a per-table TTL cache. On a cold cache that fanned out to O(N) +sequential BQ jobs API roundtrips and reliably exceeded the CLI's 30 s +``httpx.ReadTimeout`` against partitioned tables. This module now reads +those fields exclusively from the persistent ``bq_metadata_cache`` table +(populated by ``app/api/bq_metadata_refresh.py`` on a scheduler tick). +The request path never calls BQ. +""" from __future__ import annotations + from datetime import datetime, timezone from pathlib import Path -from fastapi import APIRouter, Depends -import duckdb +from typing import Any -from app.auth.dependencies import get_current_user, _get_db +import duckdb +from fastapi import APIRouter, Depends + +from app.api.v2_cache import TTLCache +from app.auth.dependencies import _get_db, get_current_user from app.utils import get_data_dir as _get_data_dir from src.rbac import can_access_table +from src.repositories.bq_metadata_cache import BqMetadataCacheRepository from src.repositories.table_registry import TableRegistryRepository -from app.api.v2_cache import TTLCache -from app.api._metadata_models import MetadataRequest, TableMetadata -from src.identifier_validation import validate_quoted_identifier router = APIRouter(prefix="/api/v2", tags=["v2"]) @@ -27,51 +41,6 @@ router = APIRouter(prefix="/api/v2", tags=["v2"]) _table_rows_cache = TTLCache(maxsize=1, ttl_seconds=300) _TABLE_ROWS_KEY = "all" -# Per-table cached TableMetadata. 15-min TTL — long enough to amortise -# across an analyst session, short enough that a freshly-registered -# remote table shows real numbers within a coffee break (the cache-bust -# path in `invalidate_for_table` accelerates this for the common admin- -# verifies-registration flow). -_metadata_cache = TTLCache(maxsize=512, ttl_seconds=900) - - -def _metadata_provider_for(source_type: str): - """Lazy-import dispatch for source-specific metadata providers. - - Lazy because connector modules are heavy (BQ extension, google-cloud - client, etc.) and a Keboola-only deployment shouldn't pay the BQ - import cost. Returns ``None`` for unknown source types — the caller - treats that as "no metadata enrichment available" and falls through. - """ - if source_type == "bigquery": - from connectors.bigquery import metadata as m - return m.fetch - if source_type == "keboola": - from connectors.keboola import metadata as m - return m.fetch - return None - - -def _build_metadata_request(row: dict) -> MetadataRequest | None: - """Construct a validated MetadataRequest from a registry row. - - Pre-validates the identifiers via `validate_quoted_identifier` before - constructing the request — providers can then interpolate - `req.bucket` / `req.source_table` into SQL/URL paths without - re-checking. Returns ``None`` when validation fails; provider is not - dispatched for that row. - """ - bucket = row.get("bucket") or "" - source_table = row.get("source_table") or row.get("id") or "" - if not bucket or not source_table: - return None - if not (validate_quoted_identifier(bucket, "bucket") - and validate_quoted_identifier(source_table, "source_table")): - return None - return MetadataRequest( - table_id=row["id"], bucket=bucket, source_table=source_table, - ) - def _flavor_for(source_type: str) -> str: return "bigquery" if source_type == "bigquery" else "duckdb" @@ -112,56 +81,11 @@ def _bucket_size(byte_count: int) -> str: return "very_large" -def _size_hint_for_row(row: dict) -> dict: - """Resolve the per-row metadata bundle the catalog response surfaces. - - Renamed from `_materialized_size_hint` (which always also handled - `local` rows; the old name was misleading). Returns a dict with up - to four keys: `rough_size_hint`, `rows`, `size_bytes`, `partition_by`, - `clustered_by`. Missing keys are reported as `null` in the response. - - Branches: - - `local` / `materialized` → existing on-disk parquet stat (cheap). - - `remote` → dispatch to the per-source-type provider; cache the - TableMetadata for 15 min. - """ - table_id = row["id"] - source_type = row.get("source_type") or "" - query_mode = row.get("query_mode") or "local" - - if query_mode in ("local", "materialized"): - return {"rough_size_hint": _materialized_parquet_size_bucket( - table_id, source_type, query_mode, - )} - - if query_mode != "remote": - return {"rough_size_hint": None} - - # Cache lookup (per-row TableMetadata). - cached = _metadata_cache.get(table_id) - if cached is None: - cached = _resolve_remote_metadata(row) - if cached is not None: - _metadata_cache.set(table_id, cached) - - if cached is None: - return {"rough_size_hint": None} - - return { - "rough_size_hint": _bucket_size(cached.size_bytes) if cached.size_bytes is not None else None, - "rows": cached.rows, - "size_bytes": cached.size_bytes, - "partition_by": cached.partition_by, - "clustered_by": cached.clustered_by, - } - - def _materialized_parquet_size_bucket( table_id: str, source_type: str, query_mode: str, ) -> str | None: """Size hint for rows whose data is on the server filesystem - (the old `_materialized_size_hint` body). Renamed for clarity now - that the new dispatcher is the entry point. + (``local`` or ``materialized``). Cheap ``Path.stat()``; never blocks. Layout matches the v2 extract.duckdb contract: ${DATA_DIR}/extracts//data/.parquet @@ -182,21 +106,64 @@ def _materialized_parquet_size_bucket( return None -def _resolve_remote_metadata(row: dict) -> "TableMetadata | None": - """Provider dispatch for a remote row. Returns None on any failure.""" +def _hint_for_row( + row: dict[str, Any], + bq_cache_index: dict[str, dict[str, Any]], +) -> dict[str, Any]: + """Resolve the per-row metadata bundle the catalog response surfaces. + + Branches: + - ``local`` / ``materialized`` → on-disk parquet ``stat()`` (cheap). + - ``remote`` (BigQuery) → pre-computed row from ``bq_metadata_cache``, + populated by the scheduler-driven refresh. Never touches BQ here. + + Always returns ``metadata_freshness`` (``fresh`` / ``stale`` / + ``never_fetched`` / ``error`` / ``not_applicable``) so AI consumers can + decide whether to trust ``rows`` / ``size_bytes`` or treat them as + advisory. + """ + table_id = row["id"] source_type = row.get("source_type") or "" - provider = _metadata_provider_for(source_type) - if provider is None: - return None - req = _build_metadata_request(row) - if req is None: - return None - try: - return provider(req) - except Exception: - # Defense in depth — providers are documented as never-raises, - # but a regression would otherwise 500 the whole catalog. - return None + query_mode = row.get("query_mode") or "local" + + if query_mode in ("local", "materialized"): + return { + "rough_size_hint": _materialized_parquet_size_bucket( + table_id, source_type, query_mode, + ), + "metadata_freshness": "not_applicable", + } + + if query_mode != "remote": + return { + "rough_size_hint": None, + "metadata_freshness": "not_applicable", + } + + # Remote: read from the persistent cache; never call BQ here. + from app.api.bq_metadata_refresh import compute_freshness + cache_row = bq_cache_index.get(table_id) + freshness = compute_freshness(cache_row) + + if cache_row is None: + return { + "rough_size_hint": None, + "rows": None, + "size_bytes": None, + "partition_by": None, + "clustered_by": [], + "metadata_freshness": freshness, + } + + size_bytes = cache_row.get("size_bytes") + return { + "rough_size_hint": _bucket_size(size_bytes) if size_bytes is not None else None, + "rows": cache_row.get("rows"), + "size_bytes": size_bytes, + "partition_by": cache_row.get("partition_by"), + "clustered_by": cache_row.get("clustered_by") or [], + "metadata_freshness": freshness, + } def invalidate_for_table(table_id: str) -> None: @@ -205,14 +172,14 @@ def invalidate_for_table(table_id: str) -> None: by the catalog module so admin.py doesn't need to know which caches exist. - Imports v2_schema and v2_sample lazily — keeps catalog tests from - pulling in BQ-extension imports they don't need. + The persistent ``bq_metadata_cache`` row is NOT invalidated here — + the scheduler-driven refresh owns that lifecycle. Admins who need + an immediate refresh after a registry edit should hit + ``POST /api/v2/metadata-cache/refresh?table=``. """ - import asyncio - from app.api import v2_schema, v2_sample + from app.api import v2_sample, v2_schema _table_rows_cache.clear() - _metadata_cache.invalidate(table_id) v2_schema._schema_cache.invalidate(table_id) # Sample cache key is `f"{table_id}|{n}"`; clearing the whole sample # cache is heavier than precise invalidation, but registry-change @@ -220,36 +187,6 @@ def invalidate_for_table(table_id: str) -> None: # adding a prefix-invalidation primitive to TTLCache. v2_sample._sample_cache.clear() - # Schedule a single-row re-warm so admins editing a registry row - # see fresh data within a couple of seconds rather than waiting for - # the next analyst to trigger a miss. Fire-and-forget; failures - # log + skip inside the coroutine. - try: - loop = asyncio.get_running_loop() - except RuntimeError: - loop = None - if loop is not None: - # Running inside an async context (production FastAPI path). - asyncio.create_task(_rewarm_one_row(table_id)) - # No running event loop (e.g. called from a sync test or a sync - # handler thread). Skip re-warm — the next live request will - # populate via miss. - - -async def _rewarm_one_row(table_id: str) -> None: - """Background single-row re-warm. Imports cache_warmup lazily to - avoid a circular import at module load (cache_warmup.py is created - in Task 10; until then, this function logs a warning and returns).""" - try: - from app.api.cache_warmup import warm_one_table - await warm_one_table(table_id) - except Exception: - import logging - logging.getLogger(__name__).warning( - "single-row re-warm failed for %s — next live request will populate", - table_id, - ) - def build_catalog(conn: duckdb.DuckDBPyConnection, user: dict) -> dict: rows = _table_rows_cache.get(_TABLE_ROWS_KEY) @@ -258,6 +195,12 @@ def build_catalog(conn: duckdb.DuckDBPyConnection, user: dict) -> dict: rows = repo.list_all() _table_rows_cache.set(_TABLE_ROWS_KEY, rows) + # One DB read for all remote-row metadata. Indexed by table_id so the + # per-row loop below stays O(N). + bq_cache_index: dict[str, dict[str, Any]] = { + r["table_id"]: r for r in BqMetadataCacheRepository(conn).list_all() + } + # RBAC is enforced fresh per request. Revoking a user's access to a # table takes effect on their next call to this endpoint, not after the # cache TTL expires. @@ -265,7 +208,7 @@ def build_catalog(conn: duckdb.DuckDBPyConnection, user: dict) -> dict: for r in rows: if not can_access_table(user, r["id"], conn): continue - hint = _size_hint_for_row(r) + hint = _hint_for_row(r, bq_cache_index) visible.append({ "id": r["id"], "name": r.get("name") or r["id"], @@ -279,7 +222,8 @@ def build_catalog(conn: duckdb.DuckDBPyConnection, user: dict) -> dict: "rows": hint.get("rows"), "size_bytes": hint.get("size_bytes"), "partition_by": hint.get("partition_by"), - "clustered_by": hint.get("clustered_by"), + "clustered_by": hint.get("clustered_by") or [], + "metadata_freshness": hint.get("metadata_freshness"), }) return { @@ -294,12 +238,6 @@ def catalog( conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): # Plain ``def`` so FastAPI auto-offloads to the anyio thread pool — - # build_catalog now calls `_size_hint_for_row` for every visible row, - # which does sync `Path.stat()` / `Path.exists()` on the data volume - # (local/materialized) or provider dispatch (remote). On local FS - # that's microseconds, but on a network-mounted DATA_DIR (NFS / CIFS / - # GCS-FUSE) those calls can block. Plain ``def`` means each request - # runs on its own thread; the event loop stays free for non-catalog - # traffic. Mirrors the Tier 1 conversion of /api/query, /api/v2/scan, - # /api/v2/sample, /api/v2/schema — Devin Review on PR #188. + # the request path is pure local I/O (DuckDB reads + filesystem + # stat()) and uses a sync DuckDB cursor. return build_catalog(conn, user) diff --git a/app/main.py b/app/main.py index 6367c2d..0ef02b2 100644 --- a/app/main.py +++ b/app/main.py @@ -117,6 +117,7 @@ from app.api.welcome import router as welcome_router from app.api.claude_md import router as claude_md_router from app.api.news import router as news_router from app.api.cache_warmup import router as cache_warmup_router +from app.api.bq_metadata_refresh import router as bq_metadata_refresh_router from app.marketplace_server.router import router as marketplace_server_router from app.marketplace_server.git_router import make_git_wsgi_app from app.web.router import router as web_router @@ -598,6 +599,7 @@ def create_app() -> FastAPI: app.include_router(claude_md_router) app.include_router(news_router) app.include_router(cache_warmup_router) + app.include_router(bq_metadata_refresh_router) app.include_router(marketplace_server_router) # Git smart-HTTP endpoint for Claude Code: /marketplace.git/* diff --git a/pyproject.toml b/pyproject.toml index 08cda40..be7ff7e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agnes-the-ai-analyst" -version = "0.49.1" +version = "0.50.0" description = "Agnes — AI Data Analyst platform for AI analytical systems" requires-python = ">=3.11,<3.14" license = "MIT" diff --git a/services/scheduler/__main__.py b/services/scheduler/__main__.py index f336d69..e045fc6 100644 --- a/services/scheduler/__main__.py +++ b/services/scheduler/__main__.py @@ -90,6 +90,14 @@ _DEFAULTS = { "SCHEDULER_VERIFICATION_DETECTOR_INTERVAL": 15 * 60, "SCHEDULER_USAGE_PROCESSOR_INTERVAL": 10 * 60, "SCHEDULER_CORPORATE_MEMORY_INTERVAL": 17 * 60, + # BigQuery metadata refresh: walks remote registry rows and updates the + # persistent ``bq_metadata_cache``. Default 4 h — long enough that the + # cumulative BQ jobs API cost stays negligible on a typical 10–50-table + # registry, short enough that operator-edited tables show real numbers + # within an analyst's working day. Hot reads of ``/api/v2/catalog`` go + # to DuckDB, never to BQ, so this can be tuned freely without touching + # request-path latency. + "SCHEDULER_BQ_METADATA_REFRESH_INTERVAL": 4 * 60 * 60, } @@ -149,8 +157,9 @@ def build_jobs() -> list[tuple[str, str, str, str, int]]: verify = _read_positive_int("SCHEDULER_VERIFICATION_DETECTOR_INTERVAL") usage = _read_positive_int("SCHEDULER_USAGE_PROCESSOR_INTERVAL") corpmem = _read_positive_int("SCHEDULER_CORPORATE_MEMORY_INTERVAL") + bqmeta = _read_positive_int("SCHEDULER_BQ_METADATA_REFRESH_INTERVAL") tick = _read_positive_int("SCHEDULER_TICK_SECONDS") - smallest = min(refresh, health, scripts, sess, verify, usage, corpmem) + smallest = min(refresh, health, scripts, sess, verify, usage, corpmem, bqmeta) if tick > smallest: raise ValueError( f"SCHEDULER_TICK_SECONDS={tick} must be <= the smallest job " @@ -193,6 +202,14 @@ def build_jobs() -> list[tuple[str, str, str, str, int]]: # to review_error so admin can retry. Cheap (one indexed # SELECT + N small UPDATEs); short timeout sufficient. ("store-reap-stuck-reviews", "every 15m", "/api/admin/run-reap-stuck-reviews", "POST", 60), + # BigQuery metadata refresh — keeps ``bq_metadata_cache`` warm so + # ``GET /api/v2/catalog`` never has to call BQ at request time. + # 30-min timeout is generous; on a 10-table dev registry the + # observed full refresh ran in ~7 min when two view-backed rows + # took 7 min each. Bounded concurrency + # (``AGNES_BQ_METADATA_REFRESH_CONCURRENCY``, default 4) caps the + # tail. + ("bq-metadata-refresh", _seconds_to_schedule(bqmeta), "/api/admin/run-bq-metadata-refresh", "POST", 1800), ] _running = True diff --git a/src/db.py b/src/db.py index 07f0dc2..6e27b6c 100644 --- a/src/db.py +++ b/src/db.py @@ -40,7 +40,7 @@ def _maybe_instrument(con, db_tag: str): _SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$") -SCHEMA_VERSION = 39 +SCHEMA_VERSION = 40 _SYSTEM_SCHEMA = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -640,6 +640,38 @@ CREATE INDEX IF NOT EXISTS idx_store_submissions_entity ON store_submissions(ent -- (reproduced with N=2 against 3 rows during /admin/store/submissions -- paging). Submissions table is admin-only and bounded by upload -- volume, so the index buys little; dropping it sidesteps the bug. + +-- v40: persistent metadata cache for remote sources (BigQuery initially). +-- Replaces the per-request, in-memory `_metadata_cache` in v2_catalog.py +-- that turned every cold-cache /api/v2/catalog into a sequence of N×3 BQ +-- jobs API calls (one TABLE_STORAGE + COLUMNS pair per remote row) — long +-- enough on view-backed or partitioned tables (>>30 s) to blow the CLI's +-- httpx 30 s read timeout. Now refresh is driven exclusively by the +-- scheduler (default every 4 h, `SCHEDULER_BQ_METADATA_REFRESH_INTERVAL`), +-- and the catalog endpoint just reads this table — no BQ at request time. +-- +-- Columns: +-- table_id — registry.id; PK and join key with table_registry. +-- rows / size_bytes / partition_by / clustered_by — last successful +-- provider result. NULL when the table has never +-- been fetched, or fetch failed before any success. +-- clustered_by stored as JSON array of column names. +-- refreshed_at — wall-clock of the last successful fetch. Used by +-- the catalog response to compute metadata_freshness +-- (`fresh` if < 2× scheduler interval old, `stale` +-- otherwise, `never_fetched` if NULL). +-- error_at / error_msg — last failure timestamp + redacted message. +-- NULL after the next successful refresh. +CREATE TABLE IF NOT EXISTS bq_metadata_cache ( + table_id VARCHAR PRIMARY KEY, + rows BIGINT, + size_bytes BIGINT, + partition_by VARCHAR, + clustered_by JSON, + refreshed_at TIMESTAMP, + error_at TIMESTAMP, + error_msg VARCHAR +); """ @@ -2489,6 +2521,27 @@ _V38_TO_V39_MIGRATIONS = [ ] +# v40: bq_metadata_cache table. Existing DBs get an empty table; the next +# scheduler tick (or app startup warmup) populates it. The catalog endpoint +# treats absence-of-row as `metadata_freshness: never_fetched` and returns +# NULL for the optional fields rather than failing — analyst tooling already +# tolerates NULL rows / size_bytes from the pre-0.47 contract. +_V39_TO_V40_MIGRATIONS = [ + """ + CREATE TABLE IF NOT EXISTS bq_metadata_cache ( + table_id VARCHAR PRIMARY KEY, + rows BIGINT, + size_bytes BIGINT, + partition_by VARCHAR, + clustered_by JSON, + refreshed_at TIMESTAMP, + error_at TIMESTAMP, + error_msg VARCHAR + ) + """, +] + + _V33_TO_V34_MIGRATIONS = [ # DuckDB blocks DROP COLUMN while indexes reference the table # ("Dependency Error: Cannot alter entry … because there are entries @@ -2882,6 +2935,9 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: if current < 39: for sql in _V38_TO_V39_MIGRATIONS: conn.execute(sql) + if current < 40: + for sql in _V39_TO_V40_MIGRATIONS: + conn.execute(sql) conn.execute( "UPDATE schema_version SET version = ?, applied_at = current_timestamp", [SCHEMA_VERSION], diff --git a/src/repositories/bq_metadata_cache.py b/src/repositories/bq_metadata_cache.py new file mode 100644 index 0000000..e4d85c4 --- /dev/null +++ b/src/repositories/bq_metadata_cache.py @@ -0,0 +1,120 @@ +"""Repository for the persistent BigQuery metadata cache. + +Backs the v40 ``bq_metadata_cache`` table. Reads are called from the +hot path (``/api/v2/catalog``); writes only from the scheduler-driven +refresh job in ``app/api/bq_metadata_refresh.py`` and from operator- +triggered single-row refreshes via ``/api/v2/metadata-cache/refresh``. + +clustered_by is stored as a JSON array of column-name strings and +returned to callers as a list (decoded here, never raw JSON). +""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone +from typing import Any, Optional + +import duckdb + + +def _decode_clustered_by(stored: Any) -> Optional[list[str]]: + if stored is None: + return None + if isinstance(stored, list): + return [str(x) for x in stored] + if isinstance(stored, str): + try: + parsed = json.loads(stored) + except json.JSONDecodeError: + return None + return [str(x) for x in parsed] if isinstance(parsed, list) else None + return None + + +def _row_to_dict(conn: duckdb.DuckDBPyConnection, row: tuple) -> dict[str, Any]: + columns = [desc[0] for desc in conn.description] + out: dict[str, Any] = dict(zip(columns, row)) + out["clustered_by"] = _decode_clustered_by(out.get("clustered_by")) + return out + + +class BqMetadataCacheRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def get(self, table_id: str) -> Optional[dict[str, Any]]: + result = self.conn.execute( + "SELECT * FROM bq_metadata_cache WHERE table_id = ?", + [table_id], + ).fetchone() + if not result: + return None + return _row_to_dict(self.conn, result) + + def list_all(self) -> list[dict[str, Any]]: + results = self.conn.execute( + "SELECT * FROM bq_metadata_cache ORDER BY table_id" + ).fetchall() + if not results: + return [] + columns = [desc[0] for desc in self.conn.description] + out: list[dict[str, Any]] = [] + for r in results: + row = dict(zip(columns, r)) + row["clustered_by"] = _decode_clustered_by(row.get("clustered_by")) + out.append(row) + return out + + def upsert_success( + self, + table_id: str, + *, + rows: Optional[int], + size_bytes: Optional[int], + partition_by: Optional[str], + clustered_by: Optional[list[str]], + ) -> None: + """Record a successful refresh. Clears any prior error_at/error_msg.""" + now = datetime.now(timezone.utc) + clustered_json = ( + json.dumps(list(clustered_by)) if clustered_by is not None else None + ) + self.conn.execute( + """INSERT INTO bq_metadata_cache + (table_id, rows, size_bytes, partition_by, clustered_by, + refreshed_at, error_at, error_msg) + VALUES (?, ?, ?, ?, ?, ?, NULL, NULL) + ON CONFLICT (table_id) DO UPDATE SET + rows = excluded.rows, + size_bytes = excluded.size_bytes, + partition_by = excluded.partition_by, + clustered_by = excluded.clustered_by, + refreshed_at = excluded.refreshed_at, + error_at = NULL, + error_msg = NULL""", + [table_id, rows, size_bytes, partition_by, clustered_json, now], + ) + + def mark_error(self, table_id: str, error_msg: str) -> None: + """Record a failed refresh. Preserves the prior success row (if any) + so analyst Claude keeps using last-known-good rows + size_bytes while + the next scheduled retry attempts to recover.""" + now = datetime.now(timezone.utc) + truncated = (error_msg or "")[:512] # bound storage + self.conn.execute( + """INSERT INTO bq_metadata_cache + (table_id, rows, size_bytes, partition_by, clustered_by, + refreshed_at, error_at, error_msg) + VALUES (?, NULL, NULL, NULL, NULL, NULL, ?, ?) + ON CONFLICT (table_id) DO UPDATE SET + error_at = excluded.error_at, + error_msg = excluded.error_msg""", + [table_id, now, truncated], + ) + + def delete(self, table_id: str) -> None: + """Drop a row — used by admin endpoints when a table is unregistered.""" + self.conn.execute( + "DELETE FROM bq_metadata_cache WHERE table_id = ?", [table_id] + ) diff --git a/tests/conftest.py b/tests/conftest.py index 8a8a366..d6eeb03 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -84,7 +84,6 @@ def _reset_module_caches(): try: from app.api import v2_catalog as _vc _vc._table_rows_cache.clear() - _vc._metadata_cache.clear() except (ImportError, AttributeError): pass try: @@ -96,7 +95,6 @@ def _reset_module_caches(): try: from app.api import v2_catalog as _vc _vc._table_rows_cache.clear() - _vc._metadata_cache.clear() except (ImportError, AttributeError): pass try: diff --git a/tests/test_bq_metadata_cache_repo.py b/tests/test_bq_metadata_cache_repo.py new file mode 100644 index 0000000..1b42a59 --- /dev/null +++ b/tests/test_bq_metadata_cache_repo.py @@ -0,0 +1,160 @@ +"""Repository + freshness tests for the persistent BQ metadata cache.""" + +from datetime import datetime, timedelta, timezone + +import pytest + +from src.repositories.bq_metadata_cache import BqMetadataCacheRepository + + +def test_upsert_success_inserts_then_updates(seeded_app): + from src.db import get_system_db + conn = get_system_db() + try: + repo = BqMetadataCacheRepository(conn) + repo.upsert_success( + "orders", rows=10, size_bytes=2048, + partition_by="event_date", clustered_by=["country"], + ) + row = repo.get("orders") + assert row is not None + assert row["rows"] == 10 + assert row["size_bytes"] == 2048 + assert row["partition_by"] == "event_date" + assert row["clustered_by"] == ["country"] + assert row["refreshed_at"] is not None + assert row["error_at"] is None + + # Update with new numbers; refreshed_at advances. + first_refresh = row["refreshed_at"] + repo.upsert_success( + "orders", rows=20, size_bytes=4096, + partition_by=None, clustered_by=[], + ) + row2 = repo.get("orders") + assert row2["rows"] == 20 + assert row2["partition_by"] is None + assert row2["clustered_by"] == [] + assert row2["refreshed_at"] >= first_refresh + finally: + conn.close() + + +def test_mark_error_preserves_prior_success(seeded_app): + """After a successful refresh, a subsequent failure must keep the + rows/size_bytes columns untouched — analyst Claude keeps using the + last-known-good numbers while the next scheduled retry attempts to + recover.""" + from src.db import get_system_db + conn = get_system_db() + try: + repo = BqMetadataCacheRepository(conn) + repo.upsert_success( + "orders", rows=100, size_bytes=1000, + partition_by=None, clustered_by=None, + ) + repo.mark_error("orders", "BQ timeout") + row = repo.get("orders") + assert row["rows"] == 100, "prior success must be preserved across error" + assert row["size_bytes"] == 1000 + assert row["error_at"] is not None + assert row["error_msg"] == "BQ timeout" + # Subsequent success clears the error. + repo.upsert_success( + "orders", rows=200, size_bytes=2000, + partition_by=None, clustered_by=None, + ) + row2 = repo.get("orders") + assert row2["rows"] == 200 + assert row2["error_at"] is None + assert row2["error_msg"] is None + finally: + conn.close() + + +def test_mark_error_truncates_long_messages(seeded_app): + from src.db import get_system_db + conn = get_system_db() + try: + repo = BqMetadataCacheRepository(conn) + repo.mark_error("orders", "x" * 2000) + row = repo.get("orders") + assert len(row["error_msg"]) == 512 + finally: + conn.close() + + +def test_list_all_orders_by_table_id(seeded_app): + from src.db import get_system_db + conn = get_system_db() + try: + repo = BqMetadataCacheRepository(conn) + repo.upsert_success( + "zeta", rows=1, size_bytes=1, partition_by=None, clustered_by=None, + ) + repo.upsert_success( + "alpha", rows=2, size_bytes=2, partition_by=None, clustered_by=None, + ) + rows = repo.list_all() + ids = [r["table_id"] for r in rows] + assert ids == sorted(ids) + finally: + conn.close() + + +def test_delete_removes_row(seeded_app): + from src.db import get_system_db + conn = get_system_db() + try: + repo = BqMetadataCacheRepository(conn) + repo.upsert_success( + "orders", rows=1, size_bytes=1, partition_by=None, clustered_by=None, + ) + repo.delete("orders") + assert repo.get("orders") is None + finally: + conn.close() + + +# ─── compute_freshness ──────────────────────────────────────────────────── + + +def test_freshness_never_fetched_for_missing_row(): + from app.api.bq_metadata_refresh import compute_freshness + assert compute_freshness(None) == "never_fetched" + + +def test_freshness_never_fetched_for_no_refresh_no_error(): + from app.api.bq_metadata_refresh import compute_freshness + row = {"refreshed_at": None, "error_at": None} + assert compute_freshness(row) == "never_fetched" + + +def test_freshness_error_when_only_error_present(): + from app.api.bq_metadata_refresh import compute_freshness + row = { + "refreshed_at": None, + "error_at": datetime.now(timezone.utc), + } + assert compute_freshness(row) == "error" + + +def test_freshness_fresh_within_threshold(): + from app.api.bq_metadata_refresh import compute_freshness + now = datetime.now(timezone.utc) + row = { + "refreshed_at": now - timedelta(seconds=60), + "error_at": None, + } + # 1-minute-old row with a 1-hour threshold ⇒ fresh. + assert compute_freshness(row, now=now, fresh_threshold=3600) == "fresh" + + +def test_freshness_stale_beyond_threshold(): + from app.api.bq_metadata_refresh import compute_freshness + now = datetime.now(timezone.utc) + row = { + "refreshed_at": now - timedelta(hours=10), + "error_at": None, + } + assert compute_freshness(row, now=now, fresh_threshold=3600) == "stale" diff --git a/tests/test_bq_metadata_refresh_endpoint.py b/tests/test_bq_metadata_refresh_endpoint.py new file mode 100644 index 0000000..e6c8d68 --- /dev/null +++ b/tests/test_bq_metadata_refresh_endpoint.py @@ -0,0 +1,211 @@ +"""End-to-end tests for the three bq_metadata_refresh endpoints.""" + +from unittest.mock import patch + +from app.api._metadata_models import TableMetadata + + +def _register_remote(seeded_app, table_id: str): + from src.db import get_system_db + from src.repositories.table_registry import TableRegistryRepository + conn = get_system_db() + try: + TableRegistryRepository(conn).register( + name=table_id, + id=table_id, + source_type="bigquery", + bucket="dwh_base", + source_table=table_id, + query_mode="remote", + ) + finally: + conn.close() + + +# ─── POST /api/admin/run-bq-metadata-refresh ────────────────────────────── + + +def test_run_refresh_walks_remote_rows_and_upserts(seeded_app): + from src.db import get_system_db + from src.repositories.bq_metadata_cache import BqMetadataCacheRepository + + _register_remote(seeded_app, "a_remote") + _register_remote(seeded_app, "b_remote") + + fake = TableMetadata( + rows=5, size_bytes=512, partition_by="d", clustered_by=["c"], + ) + + c = seeded_app["client"] + token = seeded_app["admin_token"] + with patch("connectors.bigquery.metadata.fetch", return_value=fake): + r = c.post( + "/api/admin/run-bq-metadata-refresh", + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 200, r.text + body = r.json() + assert body["total"] >= 2 + assert body["succeeded"] >= 2 + assert body["failed"] == 0 + + conn = get_system_db() + try: + repo = BqMetadataCacheRepository(conn) + for tid in ("a_remote", "b_remote"): + row = repo.get(tid) + assert row is not None + assert row["rows"] == 5 + assert row["size_bytes"] == 512 + assert row["partition_by"] == "d" + assert row["clustered_by"] == ["c"] + finally: + conn.close() + + +def test_run_refresh_marks_error_on_provider_failure(seeded_app): + from src.db import get_system_db + from src.repositories.bq_metadata_cache import BqMetadataCacheRepository + + _register_remote(seeded_app, "boom") + + c = seeded_app["client"] + token = seeded_app["admin_token"] + with patch( + "connectors.bigquery.metadata.fetch", + side_effect=RuntimeError("BQ throttle"), + ): + r = c.post( + "/api/admin/run-bq-metadata-refresh", + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 200 + body = r.json() + assert body["failed"] >= 1 + + conn = get_system_db() + try: + row = BqMetadataCacheRepository(conn).get("boom") + assert row is not None + assert row["error_at"] is not None + assert "BQ throttle" in (row["error_msg"] or "") + finally: + conn.close() + + +def test_run_refresh_requires_admin(seeded_app): + c = seeded_app["client"] + # No Authorization header → 401. + r = c.post("/api/admin/run-bq-metadata-refresh") + assert r.status_code == 401 + + +# ─── POST /api/v2/metadata-cache/refresh?table= ─────────────────────────── + + +def test_refresh_one_table_endpoint(seeded_app): + from src.db import get_system_db + from src.repositories.bq_metadata_cache import BqMetadataCacheRepository + + _register_remote(seeded_app, "single") + + c = seeded_app["client"] + token = seeded_app["admin_token"] + fake = TableMetadata(rows=99, size_bytes=999) + with patch("connectors.bigquery.metadata.fetch", return_value=fake): + r = c.post( + "/api/v2/metadata-cache/refresh?table=single", + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 200, r.text + assert r.json()["status"] == "ok" + + conn = get_system_db() + try: + row = BqMetadataCacheRepository(conn).get("single") + assert row["rows"] == 99 + finally: + conn.close() + + +def test_refresh_one_table_unknown_id_returns_404(seeded_app): + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/v2/metadata-cache/refresh?table=does_not_exist", + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 404 + + +def test_refresh_one_table_rejects_non_remote(seeded_app): + from src.db import get_system_db + from src.repositories.table_registry import TableRegistryRepository + + conn = get_system_db() + try: + TableRegistryRepository(conn).register( + name="local_t", + id="local_t", + source_type="keboola", + bucket="in.c-x", + source_table="t", + query_mode="local", + ) + finally: + conn.close() + + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/v2/metadata-cache/refresh?table=local_t", + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 400 + + +# ─── GET /api/v2/metadata-cache/status ──────────────────────────────────── + + +def test_status_endpoint_returns_per_row_freshness(seeded_app): + from src.db import get_system_db + from src.repositories.bq_metadata_cache import BqMetadataCacheRepository + + conn = get_system_db() + try: + BqMetadataCacheRepository(conn).upsert_success( + "orders", rows=1, size_bytes=1, partition_by=None, clustered_by=None, + ) + finally: + conn.close() + + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.get( + "/api/v2/metadata-cache/status", + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 200, r.text + body = r.json() + assert "scheduler_interval_seconds" in body + assert "fresh_threshold_seconds" in body + assert body["fresh_threshold_seconds"] == 2 * body["scheduler_interval_seconds"] + orders = next(t for t in body["tables"] if t["table_id"] == "orders") + assert orders["freshness"] == "fresh" + + +def test_status_endpoint_does_not_require_admin(seeded_app): + """Non-admin analyst tools (CLI, Claude Code) need this surface.""" + c = seeded_app["client"] + # No token at all → 401 (auth still required, just not admin). + r = c.get("/api/v2/metadata-cache/status") + assert r.status_code == 401 + # Any authenticated user works — seeded_app's admin_token is the + # easiest valid bearer; downgrade once the test harness exposes a + # plain-user token. + token = seeded_app["admin_token"] + r = c.get( + "/api/v2/metadata-cache/status", + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 200 diff --git a/tests/test_scheduler_sidecar.py b/tests/test_scheduler_sidecar.py index c356921..ffc33a1 100644 --- a/tests/test_scheduler_sidecar.py +++ b/tests/test_scheduler_sidecar.py @@ -18,9 +18,17 @@ def test_build_jobs_uses_documented_defaults(monkeypatch): assert jobs["health-check"] == "every 5m" assert jobs["script-runner"] == "every 1m" assert jobs["marketplaces"] == "daily 03:00" + assert jobs["bq-metadata-refresh"] == "every 4h" assert resolved_tick_seconds() == 30 +def test_build_jobs_honors_bq_metadata_env_override(monkeypatch): + monkeypatch.setenv("SCHEDULER_BQ_METADATA_REFRESH_INTERVAL", "7200") # 2h + from services.scheduler.__main__ import build_jobs + jobs = {name: schedule for name, schedule, *_ in build_jobs()} + assert jobs["bq-metadata-refresh"] == "every 2h" + + def test_build_jobs_honors_env_overrides(monkeypatch): monkeypatch.setenv("SCHEDULER_DATA_REFRESH_INTERVAL", "1800") # 30m monkeypatch.setenv("SCHEDULER_HEALTH_CHECK_INTERVAL", "60") # 1m diff --git a/tests/test_v2_catalog_dispatcher.py b/tests/test_v2_catalog_dispatcher.py deleted file mode 100644 index 607597d..0000000 --- a/tests/test_v2_catalog_dispatcher.py +++ /dev/null @@ -1,71 +0,0 @@ -"""Dispatch + identifier-validation gate for the source-agnostic -metadata providers.""" - -from app.api._metadata_models import MetadataRequest - - -def test_dispatcher_returns_bq_provider_for_bigquery(): - from app.api.v2_catalog import _metadata_provider_for - from connectors.bigquery import metadata as bq_meta - fn = _metadata_provider_for("bigquery") - assert fn is bq_meta.fetch - - -def test_dispatcher_returns_keboola_provider_for_keboola(): - from app.api.v2_catalog import _metadata_provider_for - from connectors.keboola import metadata as kb_meta - fn = _metadata_provider_for("keboola") - assert fn is kb_meta.fetch - - -def test_dispatcher_returns_none_for_unknown_source(): - from app.api.v2_catalog import _metadata_provider_for - assert _metadata_provider_for("jira") is None - assert _metadata_provider_for("") is None - assert _metadata_provider_for("snowflake") is None - - -def test_build_metadata_request_for_valid_row(): - from app.api.v2_catalog import _build_metadata_request - req = _build_metadata_request({ - "id": "orders", - "bucket": "dwh_base", - "source_table": "orders_2024", - }) - assert isinstance(req, MetadataRequest) - assert req.table_id == "orders" - assert req.bucket == "dwh_base" - assert req.source_table == "orders_2024" - - -def test_build_metadata_request_rejects_unsafe_bucket(): - from app.api.v2_catalog import _build_metadata_request - req = _build_metadata_request({ - "id": "x", - "bucket": "evil`; DROP--", - "source_table": "t", - }) - assert req is None - - -def test_build_metadata_request_falls_back_to_id_when_source_table_missing(): - """Some legacy Keboola registry rows have empty source_table; the row id - is the table name in that case (mirrors v2_schema:168 behavior).""" - from app.api.v2_catalog import _build_metadata_request - req = _build_metadata_request({ - "id": "orders", - "bucket": "in.c-crm", - "source_table": "", - }) - assert req is not None - assert req.source_table == "orders" - - -def test_stub_providers_return_none(): - """Providers don't have their real bodies yet — stubs return None - so the catalog endpoint stays 200 while we wire the rest.""" - from connectors.bigquery import metadata as bq_meta - from connectors.keboola import metadata as kb_meta - req = MetadataRequest(table_id="x", bucket="b", source_table="t") - assert bq_meta.fetch(req) is None - assert kb_meta.fetch(req) is None diff --git a/tests/test_v2_catalog_invalidation.py b/tests/test_v2_catalog_invalidation.py index fae5517..e8194b9 100644 --- a/tests/test_v2_catalog_invalidation.py +++ b/tests/test_v2_catalog_invalidation.py @@ -1,47 +1,58 @@ -"""Unified cache flush across all four catalog/schema/sample/metadata -caches on registry write.""" +"""Unified cache flush across the three in-memory catalog/schema/sample +caches on registry write. -from unittest.mock import patch +Post-0.50: the persistent ``bq_metadata_cache`` is intentionally NOT +invalidated here. That table's lifecycle is owned by the scheduler- +driven refresh — admins who need an immediate refresh after editing a +remote row hit ``POST /api/v2/metadata-cache/refresh?table=`` +explicitly. Auto-invalidation on every registry edit would re-introduce +the request-path BQ fan-out the refactor exists to avoid. +""" + +from src.db import get_system_db +from src.repositories.bq_metadata_cache import BqMetadataCacheRepository -def test_invalidate_flushes_all_four_caches(): +def test_invalidate_flushes_three_in_memory_caches(): from app.api import v2_catalog, v2_schema, v2_sample - from app.api._metadata_models import TableMetadata # Pre-populate. v2_catalog._table_rows_cache.set("all", ["fake_row"]) - v2_catalog._metadata_cache.set("orders", TableMetadata(rows=10)) v2_schema._schema_cache.set("orders", {"columns": []}) v2_sample._sample_cache.set("orders|10", [{"row": 1}]) v2_catalog.invalidate_for_table("orders") assert v2_catalog._table_rows_cache.get("all") is None - assert v2_catalog._metadata_cache.get("orders") is None assert v2_schema._schema_cache.get("orders") is None # Sample cache is cleared whole (we don't have prefix-invalidation). assert v2_sample._sample_cache.get("orders|10") is None -def test_invalidate_schedules_single_row_rewarm(monkeypatch): - """After the flush, a background re-warm task is scheduled for the - same table_id. Assert via patching create_task.""" - import asyncio +def test_invalidate_does_not_touch_persistent_bq_cache(): + """The persistent cache survives registry-row invalidations; only an + explicit ``POST /api/v2/metadata-cache/refresh`` (or the scheduled + refresh) should change it.""" from app.api import v2_catalog - scheduled = [] + conn = get_system_db() + try: + BqMetadataCacheRepository(conn).upsert_success( + "survives_invalidate", + rows=42, size_bytes=4096, partition_by=None, clustered_by=None, + ) + finally: + conn.close() - def fake_create_task(coro): - # Drain the coroutine so the test doesn't leak it. - coro.close() - scheduled.append(coro) - return None + v2_catalog.invalidate_for_table("survives_invalidate") - # Simulate a running event loop so the create_task branch is reached. - monkeypatch.setattr(asyncio, "get_running_loop", lambda: object()) - monkeypatch.setattr(asyncio, "create_task", fake_create_task) - v2_catalog.invalidate_for_table("orders") - assert len(scheduled) == 1 + conn = get_system_db() + try: + row = BqMetadataCacheRepository(conn).get("survives_invalidate") + finally: + conn.close() + assert row is not None + assert row["rows"] == 42 def test_register_table_invalidates(seeded_app): diff --git a/tests/test_v2_catalog_remote_metadata.py b/tests/test_v2_catalog_remote_metadata.py index c813bb6..6c9751a 100644 --- a/tests/test_v2_catalog_remote_metadata.py +++ b/tests/test_v2_catalog_remote_metadata.py @@ -1,10 +1,16 @@ """Catalog endpoint integration: per-table metadata enrichment for -remote rows.""" +remote rows. + +Post-0.50 the catalog endpoint reads enrichment fields exclusively from +the persistent ``bq_metadata_cache`` table (populated by the scheduler- +driven refresh in ``app/api/bq_metadata_refresh.py``). These tests +pre-seed cache rows and verify the catalog response shape; they do NOT +mock ``connectors.bigquery.metadata.fetch`` because that path is no +longer reachable from the catalog request. +""" from unittest.mock import patch -from app.api._metadata_models import TableMetadata - def _register_table(seeded_app, **kwargs): """Register a table into the test DB using TableRegistryRepository.""" @@ -13,42 +19,64 @@ def _register_table(seeded_app, **kwargs): conn = get_system_db() try: repo = TableRegistryRepository(conn) - # `name` defaults to `id` if not supplied name = kwargs.pop("name", kwargs.get("id")) repo.register(name=name, **kwargs) finally: conn.close() -def test_remote_row_includes_metadata_fields(seeded_app, monkeypatch): - """Catalog response for a query_mode='remote' BQ row carries the four - new fields populated by the provider.""" - # Reset catalog row cache so this test's registered table is visible. +def _seed_cache_row( + table_id: str, + *, + rows=None, + size_bytes=None, + partition_by=None, + clustered_by=None, +): + """Insert a successful refresh row into bq_metadata_cache.""" + from src.db import get_system_db + from src.repositories.bq_metadata_cache import BqMetadataCacheRepository + conn = get_system_db() + try: + BqMetadataCacheRepository(conn).upsert_success( + table_id, + rows=rows, + size_bytes=size_bytes, + partition_by=partition_by, + clustered_by=clustered_by, + ) + finally: + conn.close() + + +def _reset_catalog_caches(): from app.api import v2_catalog v2_catalog._table_rows_cache.clear() - v2_catalog._metadata_cache.clear() + + +def test_remote_row_includes_metadata_fields(seeded_app): + """Catalog response for a query_mode='remote' BQ row carries the four + enrichment fields read from the persistent cache.""" + _reset_catalog_caches() c = seeded_app["client"] token = seeded_app["admin_token"] - fake_meta = TableMetadata( - rows=10000, size_bytes=2_000_000, - partition_by="event_date", clustered_by=["country", "platform"], - ) - _register_table( seeded_app, id="orders", source_type="bigquery", bucket="dwh_base", source_table="orders_2024", query_mode="remote", ) + _seed_cache_row( + "orders", + rows=10000, size_bytes=2_000_000, + partition_by="event_date", clustered_by=["country", "platform"], + ) - with patch( - "connectors.bigquery.metadata.fetch", return_value=fake_meta, - ): - r = c.get( - "/api/v2/catalog", - headers={"Authorization": f"Bearer {token}"}, - ) + r = c.get( + "/api/v2/catalog", + headers={"Authorization": f"Bearer {token}"}, + ) assert r.status_code == 200, r.text tables = r.json()["tables"] orders = next(t for t in tables if t["id"] == "orders") @@ -56,15 +84,46 @@ def test_remote_row_includes_metadata_fields(seeded_app, monkeypatch): assert orders["size_bytes"] == 2_000_000 assert orders["partition_by"] == "event_date" assert orders["clustered_by"] == ["country", "platform"] - # Existing fields still present. assert orders["query_mode"] == "remote" + assert orders["metadata_freshness"] == "fresh" -def test_local_row_unaffected_by_provider_dispatch(seeded_app): - """query_mode='local' rows take the parquet-stat path; provider not called.""" - from app.api import v2_catalog - v2_catalog._table_rows_cache.clear() - v2_catalog._metadata_cache.clear() +def test_remote_row_with_no_cache_returns_null_fields(seeded_app): + """Catalog response for a remote row with no cache entry — first boot + before scheduler tick — returns null enrichment fields and + metadata_freshness='never_fetched'. MUST stay 200; MUST NOT call BQ.""" + _reset_catalog_caches() + + c = seeded_app["client"] + token = seeded_app["admin_token"] + _register_table( + seeded_app, + id="cold_t", source_type="bigquery", bucket="dwh_base", + source_table="cold_t", query_mode="remote", + ) + + # Patch the BQ provider so we can prove the request path never reaches it. + with patch("connectors.bigquery.metadata.fetch") as mock_fetch: + r = c.get( + "/api/v2/catalog", + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 200, r.text + mock_fetch.assert_not_called() + + tables = r.json()["tables"] + cold = next(t for t in tables if t["id"] == "cold_t") + assert cold["rows"] is None + assert cold["size_bytes"] is None + assert cold["partition_by"] is None + assert cold["clustered_by"] == [] + assert cold["metadata_freshness"] == "never_fetched" + + +def test_local_row_metadata_freshness_is_not_applicable(seeded_app): + """query_mode='local' rows take the parquet-stat path; the freshness + field signals that the BQ cache concept doesn't apply.""" + _reset_catalog_caches() c = seeded_app["client"] token = seeded_app["admin_token"] @@ -74,93 +133,47 @@ def test_local_row_unaffected_by_provider_dispatch(seeded_app): source_table="users", query_mode="local", ) - with patch("connectors.keboola.metadata.fetch") as mock_fetch: - r = c.get( - "/api/v2/catalog", - headers={"Authorization": f"Bearer {token}"}, - ) - assert r.status_code == 200, r.text - mock_fetch.assert_not_called() - - -def test_provider_failure_returns_null_metadata(seeded_app): - """Provider returns None → row appears with null new fields, not - a 500. Catalog endpoint must stay 200.""" - from app.api import v2_catalog - v2_catalog._table_rows_cache.clear() - v2_catalog._metadata_cache.clear() - - c = seeded_app["client"] - token = seeded_app["admin_token"] - _register_table( - seeded_app, - id="broken", source_type="bigquery", bucket="dwh_base", - source_table="broken_t", query_mode="remote", + r = c.get( + "/api/v2/catalog", + headers={"Authorization": f"Bearer {token}"}, ) - - with patch( - "connectors.bigquery.metadata.fetch", return_value=None, - ): - r = c.get( - "/api/v2/catalog", - headers={"Authorization": f"Bearer {token}"}, - ) assert r.status_code == 200, r.text tables = r.json()["tables"] - broken = next(t for t in tables if t["id"] == "broken") - assert broken["rows"] is None - assert broken["size_bytes"] is None - assert broken["partition_by"] is None - assert broken["clustered_by"] is None + users = next(t for t in tables if t["id"] == "users") + assert users["metadata_freshness"] == "not_applicable" def test_zero_size_bytes_reports_small_not_unknown(seeded_app): - """Devin Review #1 regression: `if cached.size_bytes:` is falsy when - `size_bytes == 0` (genuinely empty table) — that wrongly emitted - `rough_size_hint=None` ("unknown") instead of `"small"` (the bucket - `_bucket_size(0)` returns). - - Fix in `_size_hint_for_row`: distinguish "size known to be zero" from - "size is unknown" with `is not None`.""" - from app.api import v2_catalog - v2_catalog._table_rows_cache.clear() - v2_catalog._metadata_cache.clear() + """Devin Review #1 regression preserved across the refactor: a cache + row with size_bytes=0 must surface rough_size_hint='small', not None. + """ + _reset_catalog_caches() c = seeded_app["client"] token = seeded_app["admin_token"] - - fake_meta = TableMetadata( - rows=0, size_bytes=0, partition_by=None, clustered_by=[], - ) - _register_table( seeded_app, id="empty_t", source_type="bigquery", bucket="dwh_base", source_table="empty_t", query_mode="remote", ) + _seed_cache_row("empty_t", rows=0, size_bytes=0, clustered_by=[]) - with patch( - "connectors.bigquery.metadata.fetch", return_value=fake_meta, - ): - r = c.get( - "/api/v2/catalog", - headers={"Authorization": f"Bearer {token}"}, - ) + r = c.get( + "/api/v2/catalog", + headers={"Authorization": f"Bearer {token}"}, + ) assert r.status_code == 200, r.text tables = r.json()["tables"] empty = next(t for t in tables if t["id"] == "empty_t") - # The whole point of this test: 0 bytes is NOT "unknown". assert empty["size_bytes"] == 0 - assert empty["rough_size_hint"] == "small", ( - f"size_bytes=0 should bucket to 'small', got {empty['rough_size_hint']}" - ) + assert empty["rough_size_hint"] == "small" -def test_cache_hit_does_not_call_provider_twice(seeded_app): - """First call invokes provider; second within 15 min hits cache.""" - from app.api import v2_catalog - v2_catalog._table_rows_cache.clear() - v2_catalog._metadata_cache.clear() +def test_catalog_request_never_calls_bq(seeded_app): + """The whole point of the refactor: even with a cold cache and a + remote BQ row in the registry, GET /api/v2/catalog MUST NOT touch + the BQ provider. Regressing this re-introduces the >90 s hang.""" + _reset_catalog_caches() c = seeded_app["client"] token = seeded_app["admin_token"] @@ -170,10 +183,8 @@ def test_cache_hit_does_not_call_provider_twice(seeded_app): source_table="orders_2024", query_mode="remote", ) - fake_meta = TableMetadata(rows=1, size_bytes=2) - with patch( - "connectors.bigquery.metadata.fetch", return_value=fake_meta, - ) as mock_fetch: + with patch("connectors.bigquery.metadata.fetch") as mock_fetch: c.get("/api/v2/catalog", headers={"Authorization": f"Bearer {token}"}) c.get("/api/v2/catalog", headers={"Authorization": f"Bearer {token}"}) - assert mock_fetch.call_count == 1 + + mock_fetch.assert_not_called()