From b6cdd68e8db00031f34a1ec74f8cff5541d34011 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Tue, 12 May 2026 10:37:35 +0200 Subject: [PATCH] feat(catalog): entity_type + validated where_examples + view-aware cost-guard + scheduler hygiene MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three behavioural improvements driven by the sub-agent end-to-end test findings, plus scheduler tweaks to prevent the post-deploy contention burst we measured. CATALOG (catalog-side bugs the test agents tripped on): - new entity_type field per remote row (BASE TABLE / VIEW / MATERIALIZED VIEW). For views, rows + size_bytes return null instead of the misleading 0 that __TABLES__ reports. - where_examples now validates against the table's actual schema (cached known_columns from refresh). The pre-fix behavior blindly advertised `country_code = 'CZ'` on tables with no country_code column — the sub-agent tests reliably hit this on unit_economics. - new known_columns + entity_type columns on bq_metadata_cache; populated by bq_metadata_refresh.refresh_one from the same fetch_bq_columns_full call (no extra BQ roundtrip) plus a cheap INFORMATION_SCHEMA.TABLES lookup for table_type. QUERY COST-GUARD: - remote_scan_too_large suggestion now names views explicitly: `Target(s) are VIEW or MATERIALIZED VIEW. BigQuery does not push LIMIT into the view body — SELECT * FROM LIMIT 1 still runs the full underlying scan.` Programmatic consumers get a new view_targets field on the error detail. SCHEDULER HYGIENE (the post-deploy 1-minute window where concurrent parquet downloads dropped to ~1 MB/s): - SCHEDULER_STARTUP_GRACE_SECONDS (default 60) holds the first tick so the burst doesn't overlap cache_warmup writes. - SCHEDULER_BQ_METADATA_INITIAL_OFFSET_MAX_SECONDS (default 900) randomises bq-metadata-refresh's first-fire offset. TESTS: - test_bq_metadata_cache_repo: entity_type + known_columns round-trip - test_v2_catalog_remote_metadata: where_examples validation, views return null rows/size_bytes, cold rows have empty examples - test_api_query_guardrail: VIEW-aware suggestion text + view_targets - test_connectors_bigquery_metadata: entity_type lookup mock + new fields in TableMetadata expectations - test_scheduler_sidecar: grace + jitter env-var resolution --- CHANGELOG.md | 3 + app/api/_metadata_models.py | 14 +++ app/api/bq_metadata_refresh.py | 5 + app/api/query.py | 70 +++++++++++++- app/api/v2_catalog.py | 49 ++++++++-- connectors/bigquery/metadata.py | 61 +++++++++++- services/scheduler/__main__.py | 99 ++++++++++++++++++- src/db.py | 42 +++++++-- src/repositories/bq_metadata_cache.py | 64 ++++++++++--- tests/test_api_query_guardrail.py | 41 ++++++++ tests/test_bq_metadata_cache_repo.py | 60 ++++++++++++ tests/test_connectors_bigquery_metadata.py | 15 +++ tests/test_scheduler_sidecar.py | 56 +++++++++++ tests/test_v2_catalog_remote_metadata.py | 105 ++++++++++++++++++++- 14 files changed, 643 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 66ef2ac..e0f3c41 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,9 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ### Internal - Schema v40 migration `_V39_TO_V40_MIGRATIONS` adds the new table; existing instances pick it up on next start. Empty cache is treated as `never_fetched` by the catalog, never as an error. +- **`entity_type` + `known_columns` on `bq_metadata_cache`** (still v40). `entity_type` mirrors `INFORMATION_SCHEMA.TABLES.table_type` (`BASE TABLE` / `VIEW` / `MATERIALIZED VIEW` / `EXTERNAL` / `SNAPSHOT` / `CLONE`); catalog surfaces it per row and hides `rows` / `size_bytes` for views (which `__TABLES__` reports as zero) so analyst tooling sees explicit "unknown" rather than a misleading 0. `known_columns` caches the most recent successful `INFORMATION_SCHEMA.COLUMNS` fetch so the catalog endpoint can filter its generic `where_examples` templates against the table's real schema — the prior behavior of always advertising `country_code = 'CZ'` on tables without that column is gone. New columns are idempotently added via ALTER on existing v40 instances. +- **`/api/query` cost-guard message names views explicitly.** When `remote_scan_too_large` fires on a query whose target is classified `VIEW` or `MATERIALIZED VIEW`, the suggestion text tells the analyst directly that `LIMIT` does not push into the view body and that `agnes snapshot create` is the right path. New `view_targets` field on the error detail surfaces the matched registry IDs to programmatic consumers. +- **Scheduler post-deploy hygiene.** `SCHEDULER_STARTUP_GRACE_SECONDS` (default 60) pauses the scheduler's first tick after container start so its "everything is due" burst doesn't overlap the app's own startup `cache_warmup` writes — observed to drop concurrent parquet downloads from ~3 MB/s to ~1 MB/s for ~2 minutes under the previous behavior. `SCHEDULER_BQ_METADATA_INITIAL_OFFSET_MAX_SECONDS` (default 900) randomises the `bq-metadata-refresh` job's first-fire offset so two scheduler containers brought up close in time don't synchronise their refresh ticks. - **DuckDB lower bound bumped from `>=0.9.0` to `>=1.5.2`.** 1.5.1 had a regression where `ALTER TABLE … ADD COLUMN IF NOT EXISTS` was rejected with `Cannot alter entry … because there are entries that depend on it` when the target table was FK-referenced from another table; the migration ladder hit this on `internal_roles` (v8→v9) and `user_groups` (v11→v12) when replayed from old schema_version. 1.5.2 restores the previous behavior. CI was already on 1.5.2; this just pins the same floor for local devs. - `tests/test_cli_binary_rename.py::test_agnes_command_exists` now skips with an actionable message instead of failing when the local venv has no `agnes` on PATH or the binary is a stale shim from a prior editable install. CI installs the package fresh and still asserts the real contract. diff --git a/app/api/_metadata_models.py b/app/api/_metadata_models.py index 36981f5..0609de3 100644 --- a/app/api/_metadata_models.py +++ b/app/api/_metadata_models.py @@ -33,8 +33,22 @@ class TableMetadata: field here is a non-breaking change: existing CLI consumers don't even render `rough_size_hint` (verified `grep -rn rough_size_hint cli/` is empty), let alone the new fields. + + ``entity_type`` for BigQuery mirrors INFORMATION_SCHEMA.TABLES.table_type + (``BASE TABLE`` / ``VIEW`` / ``MATERIALIZED VIEW`` / ``EXTERNAL`` / + ``SNAPSHOT`` / ``CLONE``). Catalog uses it to hide misleading + ``rows=0, size_bytes=0`` for VIEWs (which __TABLES__ reports as zero) + and to inject a "LIMIT doesn't push into view body" hint into + cost-guard errors when a remote query targets a VIEW. + + ``known_columns`` is the list of column names from the same refresh + that populated this row. Catalog endpoint filters generic + ``where_examples`` templates against this list — drops example + predicates that reference columns the table doesn't have. """ rows: int | None = None size_bytes: int | None = None partition_by: str | None = None clustered_by: list[str] | None = None + entity_type: str | None = None + known_columns: list[str] | None = None diff --git a/app/api/bq_metadata_refresh.py b/app/api/bq_metadata_refresh.py index 94ebab6..9732152 100644 --- a/app/api/bq_metadata_refresh.py +++ b/app/api/bq_metadata_refresh.py @@ -162,12 +162,15 @@ def refresh_one(conn: duckdb.DuckDBPyConnection, row: dict[str, Any]) -> dict[st size_bytes=result.size_bytes, partition_by=result.partition_by, clustered_by=result.clustered_by, + entity_type=result.entity_type, + known_columns=result.known_columns, ) return { "table_id": table_id, "status": "ok", "rows": result.rows, "size_bytes": result.size_bytes, + "entity_type": result.entity_type, } @@ -298,6 +301,8 @@ def metadata_cache_status( "size_bytes": r.get("size_bytes"), "partition_by": r.get("partition_by"), "clustered_by": r.get("clustered_by") or [], + "entity_type": r.get("entity_type"), + "known_columns": r.get("known_columns") or [], "error_at": error_at.isoformat() if error_at else None, "error_msg": r.get("error_msg"), "freshness": compute_freshness(r, now=now, fresh_threshold=threshold), diff --git a/app/api/query.py b/app/api/query.py index 4268fca..c8a17c6 100644 --- a/app/api/query.py +++ b/app/api/query.py @@ -931,6 +931,52 @@ def _rewrite_user_sql_for_bigquery_query( return rewritten, True +def _view_targets_in(dry_run_set: list) -> list[str]: + """Return registry IDs from ``dry_run_set`` whose ``bq_metadata_cache`` + row classifies them as ``VIEW`` or ``MATERIALIZED VIEW``. + + Used to enrich the ``remote_scan_too_large`` error message: when the + target is a view, BigQuery does NOT push ``LIMIT`` into the view body, + so a `SELECT * FROM LIMIT 1` still scans the full underlying + tables. Telling the analyst that explicitly saves them from retrying + with the same query expecting different results. + + Best-effort: any lookup failure returns ``[]`` so the original error + message still ships. The catalog is the source of truth for entity_type; + if the bq_metadata_cache hasn't been refreshed yet for a table, that + table is silently skipped (we just won't add the VIEW hint for it). + """ + if not dry_run_set: + return [] + try: + from src.db import get_system_db + conn = get_system_db() + try: + pairs = [(b, t) for b, t, _ in dry_run_set] + # Build a parameterized OR of (bucket, source_table) pairs. + # DuckDB supports row-tuple IN but keeping it explicit OR + # avoids any version-specific syntax surprises. + where = " OR ".join( + "(tr.bucket = ? AND tr.source_table = ?)" for _ in pairs + ) + params: list = [] + for b, t in pairs: + params.extend([b, t]) + sql_ = ( + f"SELECT mc.table_id " + f"FROM bq_metadata_cache mc " + f"JOIN table_registry tr ON tr.id = mc.table_id " + f"WHERE mc.entity_type IN ('VIEW', 'MATERIALIZED VIEW') " + f"AND ({where})" + ) + rows = conn.execute(sql_, params).fetchall() + return [r[0] for r in rows] + finally: + conn.close() + except Exception: + return [] + + @contextlib.contextmanager def _bq_quota_and_cap_guard( *, @@ -1115,16 +1161,30 @@ def _bq_quota_and_cap_guard( if cap_bytes > 0 and total_bytes > cap_bytes: tables = [f"{b}.{t}" for b, t, _ in dry_run_set] + view_targets = _view_targets_in(dry_run_set) + if view_targets: + suggestion = ( + f"Target(s) {', '.join(view_targets)} are VIEW or " + "MATERIALIZED VIEW. BigQuery does not push `LIMIT` " + "into the view body — `SELECT * FROM LIMIT 1` " + "still runs the full underlying scan. Use " + "`agnes snapshot create --select --where " + "` to bound the scan, then query the " + "snapshot locally." + ) + else: + suggestion = ( + "Use `agnes snapshot create --select " + "--where --estimate` to materialize a " + "filtered subset, then query the snapshot locally." + ) raise HTTPException(status_code=400, detail={ "reason": "remote_scan_too_large", "scan_bytes": total_bytes, "limit_bytes": cap_bytes, "tables": tables, - "suggestion": ( - "Use `agnes snapshot create --select --where " - "--estimate` to materialize a filtered subset, then query " - "the snapshot locally." - ), + "view_targets": view_targets, + "suggestion": suggestion, }) # Yield control to the handler — slot stays acquired while the diff --git a/app/api/v2_catalog.py b/app/api/v2_catalog.py index e5a3a21..d766c8f 100644 --- a/app/api/v2_catalog.py +++ b/app/api/v2_catalog.py @@ -46,13 +46,35 @@ def _flavor_for(source_type: str) -> str: return "bigquery" if source_type == "bigquery" else "duckdb" -def _examples_for(source_type: str) -> list[str]: - if source_type == "bigquery": - return [ - "event_date > DATE '2026-01-01'", - "country_code = 'CZ' AND platform = 'web'", - ] - return [] +# Generic ``where_examples`` templates the catalog surfaces as a starting +# point for AI consumers. Each entry is a tuple of ``(predicate_text, +# required_columns)``: the template is only included in the response when +# every required column is present in the table's actual schema (from +# ``bq_metadata_cache.known_columns``). This prevents the old behavior of +# always advertising ``country_code = 'CZ'`` on tables that have no +# ``country_code`` column at all. +_BQ_WHERE_TEMPLATES: tuple[tuple[str, tuple[str, ...]], ...] = ( + ("event_date > DATE '2026-01-01'", ("event_date",)), + ("country_code = 'CZ' AND platform = 'web'", ("country_code", "platform")), +) + + +def _examples_for(source_type: str, known_columns: list[str] | None) -> list[str]: + """Return generic ``where_examples`` filtered against the table's + actual columns. ``known_columns`` comes from the persistent metadata + cache; when it is unknown (None) or empty, return an empty list + instead of a possibly-wrong template — silence is better than + misleading hints for AI consumers.""" + if source_type != "bigquery": + return [] + if not known_columns: + return [] + cols = set(known_columns) + return [ + predicate + for predicate, required in _BQ_WHERE_TEMPLATES + if all(c in cols for c in required) + ] def _fetch_hint(table_id: str, source_type: str) -> str: @@ -131,12 +153,16 @@ def _hint_for_row( "rough_size_hint": _materialized_parquet_size_bucket( table_id, source_type, query_mode, ), + "entity_type": None, + "known_columns": [], "metadata_freshness": "not_applicable", } if query_mode != "remote": return { "rough_size_hint": None, + "entity_type": None, + "known_columns": [], "metadata_freshness": "not_applicable", } @@ -152,6 +178,8 @@ def _hint_for_row( "size_bytes": None, "partition_by": None, "clustered_by": [], + "entity_type": None, + "known_columns": [], "metadata_freshness": freshness, } @@ -162,6 +190,8 @@ def _hint_for_row( "size_bytes": size_bytes, "partition_by": cache_row.get("partition_by"), "clustered_by": cache_row.get("clustered_by") or [], + "entity_type": cache_row.get("entity_type"), + "known_columns": cache_row.get("known_columns") or [], "metadata_freshness": freshness, } @@ -216,13 +246,16 @@ def build_catalog(conn: duckdb.DuckDBPyConnection, user: dict) -> dict: "source_type": r.get("source_type") or "", "query_mode": r.get("query_mode") or "local", "sql_flavor": _flavor_for(r.get("source_type") or ""), - "where_examples": _examples_for(r.get("source_type") or ""), + "where_examples": _examples_for( + r.get("source_type") or "", hint.get("known_columns"), + ), "fetch_via": _fetch_hint(r["id"], r.get("source_type") or ""), "rough_size_hint": hint.get("rough_size_hint"), "rows": hint.get("rows"), "size_bytes": hint.get("size_bytes"), "partition_by": hint.get("partition_by"), "clustered_by": hint.get("clustered_by") or [], + "entity_type": hint.get("entity_type"), "metadata_freshness": hint.get("metadata_freshness"), }) diff --git a/connectors/bigquery/metadata.py b/connectors/bigquery/metadata.py index 178fa4e..d67c866 100644 --- a/connectors/bigquery/metadata.py +++ b/connectors/bigquery/metadata.py @@ -54,18 +54,73 @@ def fetch(req: MetadataRequest) -> TableMetadata | None: rows_size = _fetch_rows_and_size(bq, req) columns = fetch_bq_columns_full(bq, req.bucket, req.source_table) part_clust = _derive_partition_cluster(columns) if columns else None + entity_type = _fetch_entity_type(bq, req) + known_columns = [c["name"] for c in columns] if columns else None - if rows_size is None and part_clust is None: + if ( + rows_size is None + and part_clust is None + and entity_type is None + and not known_columns + ): return None + # For VIEW / MATERIALIZED VIEW the __TABLES__ fallback returns + # ``(0, 0)`` for ``row_count`` and ``size_bytes`` — accurate for the + # storage layer (views have no own storage) but misleading for + # analysts. Surface ``None`` so catalog consumers see explicit + # "unknown" rather than a confidently-wrong zero. + if entity_type in ("VIEW", "MATERIALIZED VIEW"): + rows_value = None + size_value = None + else: + rows_value = (rows_size or {}).get("rows") + size_value = (rows_size or {}).get("size_bytes") + return TableMetadata( - rows=(rows_size or {}).get("rows"), - size_bytes=(rows_size or {}).get("size_bytes"), + rows=rows_value, + size_bytes=size_value, partition_by=(part_clust or {}).get("partition_by"), clustered_by=(part_clust or {}).get("clustered_by"), + entity_type=entity_type, + known_columns=known_columns, ) +def _fetch_entity_type(bq, req: MetadataRequest) -> str | None: + """Look up ``INFORMATION_SCHEMA.TABLES.table_type`` for the table. + + Single dataset-scoped query, no region required. Returns one of the + documented BQ values (``BASE TABLE``, ``VIEW``, ``MATERIALIZED VIEW``, + ``EXTERNAL``, ``SNAPSHOT``, ``CLONE``) or ``None`` if the lookup + fails / the row isn't found. + + ``req.bucket`` and ``req.source_table`` are pre-validated by + `app/api/v2_catalog._build_metadata_request`, so direct interpolation + into the backtick-quoted path is safe. + """ + try: + bq_sql = ( + f"SELECT table_type " + f"FROM `{bq.projects.data}.{req.bucket}.INFORMATION_SCHEMA.TABLES` " + f"WHERE table_name = ?" + ) + with bq.duckdb_session() as conn: + row = conn.execute( + "SELECT * FROM bigquery_query(?, ?, ?)", + [bq.projects.billing, bq_sql, req.source_table], + ).fetchone() + except Exception as e: + logger.warning( + "BQ INFORMATION_SCHEMA.TABLES lookup failed for %s.%s.%s: %s", + bq.projects.data, req.bucket, req.source_table, e, + ) + return None + if row is None or row[0] is None: + return None + return str(row[0]) + + def _derive_partition_cluster(columns: list[dict]) -> dict | None: """Mirror v2_schema._fetch_bq_table_options derivations from the shared columns-full result.""" diff --git a/services/scheduler/__main__.py b/services/scheduler/__main__.py index e045fc6..e44195b 100644 --- a/services/scheduler/__main__.py +++ b/services/scheduler/__main__.py @@ -98,6 +98,12 @@ _DEFAULTS = { # to DuckDB, never to BQ, so this can be tuned freely without touching # request-path latency. "SCHEDULER_BQ_METADATA_REFRESH_INTERVAL": 4 * 60 * 60, + # Pause between scheduler startup and the first tick. Keeps the + # scheduler from synchronising its "Table never synced, marking as + # due" burst with the app's own startup cache_warmup (which writes + # heavily to the system DB for several seconds). Set to 0 to disable + # — useful in tests that need deterministic-fast first-tick. + "SCHEDULER_STARTUP_GRACE_SECONDS": 60, } @@ -124,6 +130,29 @@ def _read_positive_int(name: str) -> int: return value +def _read_non_negative_int(name: str) -> int: + """Like ``_read_positive_int`` but also accepts ``0`` as a valid value. + + Used for knobs where ``0`` means "disable" rather than "operator typo" + — e.g. ``SCHEDULER_STARTUP_GRACE_SECONDS=0`` legitimately skips the + startup pause in unit tests / fast-iteration dev setups. + """ + raw = os.environ.get(name) + if raw is None: + if name not in _DEFAULTS: + raise ValueError(f"Unknown scheduler env var: {name}") + return _DEFAULTS[name] + if raw == "": + raise ValueError(f"{name}='' must be a non-negative integer (seconds)") + try: + value = int(raw) + except (TypeError, ValueError): + raise ValueError(f"{name}={raw!r} must be a non-negative integer (seconds)") + if value < 0: + raise ValueError(f"{name}={value} must be >= 0 (seconds)") + return value + + def _seconds_to_schedule(seconds: int) -> str: """Convert a seconds value to the closest 'every Nm' / 'every Nh' string. @@ -144,6 +173,45 @@ def resolved_tick_seconds() -> int: return _read_positive_int("SCHEDULER_TICK_SECONDS") +def resolved_startup_grace_seconds() -> int: + """Read SCHEDULER_STARTUP_GRACE_SECONDS (default 60). + + Sleep duration between scheduler startup and the first tick. Mitigates + the "post-deploy contention burst" where the scheduler's "everything + is due" first tick (5+ paralle HTTP POSTs against the just-restarted + app) overlaps the app's own startup ``cache_warmup`` job, doubling + disk I/O on the host's boot disk and dropping concurrent parquet + downloads from ~3 MB/s to ~1 MB/s for the duration of the burst. + """ + return _read_non_negative_int("SCHEDULER_STARTUP_GRACE_SECONDS") + + +def resolved_bq_metadata_initial_offset_seconds(rng=None) -> int: + """Random startup-jitter for ``bq-metadata-refresh``. + + Returns a value in ``[0, BQ_METADATA_INITIAL_OFFSET_MAX_SECONDS]`` + that the run loop uses to fake a recent ``last_run`` for the + ``bq-metadata-refresh`` job at startup. With ``last_run = now - jitter`` + and the default 4 h interval, the first refresh fires + ``interval - jitter`` seconds after the startup grace finishes + (≈ 3 h 45 m to 4 h). This intentionally suppresses an immediate + refresh on every container start — the app's own ``cache_warmup`` + already populates the persistent cache at startup, so a duplicate + refresh from the scheduler would just compete for disk I/O while + adding nothing. + + ``rng`` injectable for deterministic tests. + """ + import random as _random + cap = int(os.environ.get( + "SCHEDULER_BQ_METADATA_INITIAL_OFFSET_MAX_SECONDS", "900", + )) + if cap <= 0: + return 0 + r = rng or _random.Random() + return r.randint(0, cap) + + def build_jobs() -> list[tuple[str, str, str, str, int]]: """Build the JOBS list from env, applying defaults and validation. @@ -259,14 +327,41 @@ def run(): jobs = build_jobs() tick = resolved_tick_seconds() + grace = resolved_startup_grace_seconds() + bqmeta_offset = resolved_bq_metadata_initial_offset_seconds() logger.info( - "Scheduler started. API_URL=%s, %d jobs, tick=%ds. Schedules: %s", - API_URL, len(jobs), tick, + "Scheduler started. API_URL=%s, %d jobs, tick=%ds, " + "startup_grace=%ds, bq_metadata_initial_offset=%ds. Schedules: %s", + API_URL, len(jobs), tick, grace, bqmeta_offset, {name: schedule for name, schedule, *_ in jobs}, ) + # Startup grace — see ``resolved_startup_grace_seconds`` for why. + # Honors SIGTERM by polling _running in short slices, so an operator + # `docker compose stop` during grace doesn't hang for ~60s. + grace_remaining = grace + while grace_remaining > 0 and _running: + time.sleep(min(grace_remaining, 5)) + grace_remaining -= 5 + if not _running: + logger.info("Scheduler shutdown during startup grace; exiting.") + return + last_run: dict[str, str | None] = {name: None for name, *_ in jobs} + # Suppress the first ``bq-metadata-refresh`` fire by pretending the + # job ran ``bqmeta_offset`` seconds ago at startup. ``is_table_due`` + # will then wait the remainder of the configured interval before + # firing for the first time. Two scheduler containers that came up + # within seconds of each other will pick different offsets and stop + # synchronising their refresh ticks against one another. + if bqmeta_offset > 0: + from datetime import timedelta as _td + offset_ago = ( + datetime.now(timezone.utc) - _td(seconds=bqmeta_offset) + ).isoformat() + last_run["bq-metadata-refresh"] = offset_ago + # Per-tick concurrency: one thread per job slot, so a 900s verification # run can't block the 60s health-check or the 30s data-refresh from # firing on their own cadences (PR #232 review fix). Pure I/O workload diff --git a/src/db.py b/src/db.py index 6e27b6c..a699134 100644 --- a/src/db.py +++ b/src/db.py @@ -663,15 +663,33 @@ CREATE INDEX IF NOT EXISTS idx_store_submissions_entity ON store_submissions(ent -- error_at / error_msg — last failure timestamp + redacted message. -- NULL after the next successful refresh. CREATE TABLE IF NOT EXISTS bq_metadata_cache ( - table_id VARCHAR PRIMARY KEY, - rows BIGINT, - size_bytes BIGINT, - partition_by VARCHAR, - clustered_by JSON, - refreshed_at TIMESTAMP, - error_at TIMESTAMP, - error_msg VARCHAR + table_id VARCHAR PRIMARY KEY, + rows BIGINT, + size_bytes BIGINT, + partition_by VARCHAR, + clustered_by JSON, + -- BigQuery entity classification, surfaced in catalog so analyst Claude + -- can decide query strategy. Values mirror INFORMATION_SCHEMA.TABLES. + -- table_type: `BASE TABLE`, `VIEW`, `MATERIALIZED VIEW`, `EXTERNAL`, + -- `SNAPSHOT`, `CLONE`. NULL until first successful refresh. + entity_type VARCHAR, + -- Cache of known column names from the most recent successful refresh, + -- as JSON array of strings. Used by /api/v2/catalog to filter generic + -- where_examples against the table's actual schema — drops example + -- predicates that reference columns the table doesn't have. Populated + -- by bq_metadata_refresh.refresh_one from fetch_bq_columns_full, so + -- there is no extra BQ roundtrip just for this. + known_columns JSON, + refreshed_at TIMESTAMP, + error_at TIMESTAMP, + error_msg VARCHAR ); +-- Self-heal for instances that already ran an earlier v40 incarnation +-- that lacked entity_type / known_columns. The CREATE TABLE above is +-- IF NOT EXISTS so it skips on already-existing tables; these ALTERs +-- close the column-set gap. Idempotent on fresh installs (no-op). +ALTER TABLE bq_metadata_cache ADD COLUMN IF NOT EXISTS entity_type VARCHAR; +ALTER TABLE bq_metadata_cache ADD COLUMN IF NOT EXISTS known_columns JSON; """ @@ -2534,11 +2552,19 @@ _V39_TO_V40_MIGRATIONS = [ size_bytes BIGINT, partition_by VARCHAR, clustered_by JSON, + entity_type VARCHAR, + known_columns JSON, refreshed_at TIMESTAMP, error_at TIMESTAMP, error_msg VARCHAR ) """, + # entity_type + known_columns may be absent on instances that picked + # up the early v40 (`bq_metadata_cache` without these columns) before + # the field was added. IF NOT EXISTS makes the ALTERs idempotent for + # the fresh-create path above and additive for the upgrade path. + "ALTER TABLE bq_metadata_cache ADD COLUMN IF NOT EXISTS entity_type VARCHAR", + "ALTER TABLE bq_metadata_cache ADD COLUMN IF NOT EXISTS known_columns JSON", ] diff --git a/src/repositories/bq_metadata_cache.py b/src/repositories/bq_metadata_cache.py index e4d85c4..9f36187 100644 --- a/src/repositories/bq_metadata_cache.py +++ b/src/repositories/bq_metadata_cache.py @@ -18,7 +18,13 @@ from typing import Any, Optional import duckdb -def _decode_clustered_by(stored: Any) -> Optional[list[str]]: +def _decode_string_list(stored: Any) -> Optional[list[str]]: + """Decode a JSON-array-of-strings column back into a Python list. + + Shared by clustered_by and known_columns — both store + ``["col_a", "col_b"]``-shaped JSON. Tolerates lists (already decoded + by DuckDB) and JSON strings (round-tripped from disk). + """ if stored is None: return None if isinstance(stored, list): @@ -32,10 +38,15 @@ def _decode_clustered_by(stored: Any) -> Optional[list[str]]: return None +# Backwards-compat alias used in tests written against the old name. +_decode_clustered_by = _decode_string_list + + def _row_to_dict(conn: duckdb.DuckDBPyConnection, row: tuple) -> dict[str, Any]: columns = [desc[0] for desc in conn.description] out: dict[str, Any] = dict(zip(columns, row)) - out["clustered_by"] = _decode_clustered_by(out.get("clustered_by")) + out["clustered_by"] = _decode_string_list(out.get("clustered_by")) + out["known_columns"] = _decode_string_list(out.get("known_columns")) return out @@ -62,7 +73,8 @@ class BqMetadataCacheRepository: out: list[dict[str, Any]] = [] for r in results: row = dict(zip(columns, r)) - row["clustered_by"] = _decode_clustered_by(row.get("clustered_by")) + row["clustered_by"] = _decode_string_list(row.get("clustered_by")) + row["known_columns"] = _decode_string_list(row.get("known_columns")) out.append(row) return out @@ -74,26 +86,49 @@ class BqMetadataCacheRepository: size_bytes: Optional[int], partition_by: Optional[str], clustered_by: Optional[list[str]], + entity_type: Optional[str] = None, + known_columns: Optional[list[str]] = None, ) -> None: - """Record a successful refresh. Clears any prior error_at/error_msg.""" + """Record a successful refresh. Clears any prior error_at/error_msg. + + ``entity_type`` is the BigQuery ``INFORMATION_SCHEMA.TABLES.table_type`` + (``BASE TABLE`` / ``VIEW`` / ``MATERIALIZED VIEW`` / …). Catalog uses + it to (a) hide rows/size_bytes for views (where __TABLES__ returns + 0 and the value is misleading) and (b) inject a "VIEW: LIMIT doesn't + push" hint into cost-guard errors. + + ``known_columns`` is the list of column names from the refresh's + ``fetch_bq_columns_full`` call — stored so the catalog endpoint can + filter its generic ``where_examples`` templates against the table's + real schema instead of advertising columns the table doesn't have. + """ now = datetime.now(timezone.utc) clustered_json = ( json.dumps(list(clustered_by)) if clustered_by is not None else None ) + known_columns_json = ( + json.dumps(list(known_columns)) if known_columns is not None else None + ) self.conn.execute( """INSERT INTO bq_metadata_cache (table_id, rows, size_bytes, partition_by, clustered_by, + entity_type, known_columns, refreshed_at, error_at, error_msg) - VALUES (?, ?, ?, ?, ?, ?, NULL, NULL) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL) ON CONFLICT (table_id) DO UPDATE SET - rows = excluded.rows, - size_bytes = excluded.size_bytes, - partition_by = excluded.partition_by, - clustered_by = excluded.clustered_by, - refreshed_at = excluded.refreshed_at, - error_at = NULL, - error_msg = NULL""", - [table_id, rows, size_bytes, partition_by, clustered_json, now], + rows = excluded.rows, + size_bytes = excluded.size_bytes, + partition_by = excluded.partition_by, + clustered_by = excluded.clustered_by, + entity_type = excluded.entity_type, + known_columns = excluded.known_columns, + refreshed_at = excluded.refreshed_at, + error_at = NULL, + error_msg = NULL""", + [ + table_id, rows, size_bytes, partition_by, clustered_json, + entity_type, known_columns_json, now, + ], ) def mark_error(self, table_id: str, error_msg: str) -> None: @@ -105,8 +140,9 @@ class BqMetadataCacheRepository: self.conn.execute( """INSERT INTO bq_metadata_cache (table_id, rows, size_bytes, partition_by, clustered_by, + entity_type, known_columns, refreshed_at, error_at, error_msg) - VALUES (?, NULL, NULL, NULL, NULL, NULL, ?, ?) + VALUES (?, NULL, NULL, NULL, NULL, NULL, NULL, NULL, ?, ?) ON CONFLICT (table_id) DO UPDATE SET error_at = excluded.error_at, error_msg = excluded.error_msg""", diff --git a/tests/test_api_query_guardrail.py b/tests/test_api_query_guardrail.py index f26a461..d5c6101 100644 --- a/tests/test_api_query_guardrail.py +++ b/tests/test_api_query_guardrail.py @@ -115,6 +115,47 @@ def test_query_over_cap_rejected_400(seeded_app, mock_dry_run, monkeypatch): any("ue" in t for t in detail.get("tables", [])) +def test_query_over_cap_against_view_includes_view_hint(seeded_app, mock_dry_run, monkeypatch): + """When the target table is classified as VIEW in bq_metadata_cache, + the cost-guard suggestion explicitly tells the analyst LIMIT does + not push into the view body — the literal #1 surprise from the + sub-agent test runs.""" + from src.db import get_system_db + from src.repositories.bq_metadata_cache import BqMetadataCacheRepository + + _register_bq_remote_row("ue_view", "finance", "ue_view") + # _register_bq_remote_row writes id = "bq.."; + # the cache row's table_id must match that ID, not the catalog name. + cached_id = "bq.finance.ue_view" + conn = get_system_db() + try: + BqMetadataCacheRepository(conn).upsert_success( + cached_id, rows=None, size_bytes=None, + partition_by=None, clustered_by=None, + entity_type="VIEW", known_columns=["event_date"], + ) + finally: + conn.close() + + mock_dry_run["bytes"] = 10 * 1024 * 1024 * 1024 + + c = seeded_app["client"] + token = seeded_app["admin_token"] + r = c.post( + "/api/query", + json={"sql": "SELECT * FROM ue_view LIMIT 1"}, + headers=_auth(token), + ) + assert r.status_code == 400, r.json() + detail = r.json()["detail"] + assert detail["reason"] == "remote_scan_too_large" + assert cached_id in detail.get("view_targets", []) + suggestion = detail["suggestion"] + assert "VIEW" in suggestion + assert "LIMIT" in suggestion + assert "snapshot create" in suggestion + + def test_no_bq_row_reference_skips_dry_run(seeded_app, monkeypatch): """A query that doesn't touch any registered BQ remote row must NOT invoke `_bq_dry_run_bytes` — guardrail incurs zero new latency on diff --git a/tests/test_bq_metadata_cache_repo.py b/tests/test_bq_metadata_cache_repo.py index 1b42a59..6d7288c 100644 --- a/tests/test_bq_metadata_cache_repo.py +++ b/tests/test_bq_metadata_cache_repo.py @@ -15,6 +15,8 @@ def test_upsert_success_inserts_then_updates(seeded_app): repo.upsert_success( "orders", rows=10, size_bytes=2048, partition_by="event_date", clustered_by=["country"], + entity_type="BASE TABLE", + known_columns=["event_date", "country", "amount"], ) row = repo.get("orders") assert row is not None @@ -22,6 +24,8 @@ def test_upsert_success_inserts_then_updates(seeded_app): assert row["size_bytes"] == 2048 assert row["partition_by"] == "event_date" assert row["clustered_by"] == ["country"] + assert row["entity_type"] == "BASE TABLE" + assert row["known_columns"] == ["event_date", "country", "amount"] assert row["refreshed_at"] is not None assert row["error_at"] is None @@ -158,3 +162,59 @@ def test_freshness_stale_beyond_threshold(): "error_at": None, } assert compute_freshness(row, now=now, fresh_threshold=3600) == "stale" + + +# ─── entity_type + known_columns ─────────────────────────────────────────── + + +def test_upsert_without_entity_type_or_known_columns(seeded_app): + """Legacy callers (or pre-fetch paths) may not have entity_type or + known_columns yet. Default-None must round-trip as None / None.""" + from src.db import get_system_db + conn = get_system_db() + try: + repo = BqMetadataCacheRepository(conn) + repo.upsert_success( + "older", rows=1, size_bytes=1, + partition_by=None, clustered_by=None, + ) + row = repo.get("older") + assert row["entity_type"] is None + assert row["known_columns"] is None + finally: + conn.close() + + +def test_entity_type_view_is_round_tripped(seeded_app): + from src.db import get_system_db + conn = get_system_db() + try: + repo = BqMetadataCacheRepository(conn) + repo.upsert_success( + "a_view", rows=None, size_bytes=None, + partition_by=None, clustered_by=None, + entity_type="VIEW", known_columns=["a", "b"], + ) + row = repo.get("a_view") + assert row["entity_type"] == "VIEW" + assert row["known_columns"] == ["a", "b"] + finally: + conn.close() + + +def test_known_columns_empty_list_distinct_from_none(seeded_app): + """An empty known_columns list (e.g. table exists but COLUMNS returned + nothing) must round-trip as ``[]`` not ``None``.""" + from src.db import get_system_db + conn = get_system_db() + try: + repo = BqMetadataCacheRepository(conn) + repo.upsert_success( + "empty_cols", rows=0, size_bytes=0, + partition_by=None, clustered_by=None, + entity_type="BASE TABLE", known_columns=[], + ) + row = repo.get("empty_cols") + assert row["known_columns"] == [] + finally: + conn.close() diff --git a/tests/test_connectors_bigquery_metadata.py b/tests/test_connectors_bigquery_metadata.py index fb6359e..bb661bb 100644 --- a/tests/test_connectors_bigquery_metadata.py +++ b/tests/test_connectors_bigquery_metadata.py @@ -18,6 +18,7 @@ def req(): def _bq_with_session(table_storage_rows=None, columns_rows=None, table_storage_raises=None, columns_raises=None, legacy_tables_rows=None, legacy_tables_raises=None, + entity_type_rows=None, entity_type_raises=None, projects_data="data-proj", projects_billing="billing-proj"): """Mock `BqAccess` whose `duckdb_session()` returns a context manager routing `.execute(...)` based on the inner SQL string.""" @@ -40,6 +41,14 @@ def _bq_with_session(table_storage_rows=None, columns_rows=None, return MagicMock( fetchall=lambda: columns_rows or [], ) + if "INFORMATION_SCHEMA.TABLES" in inner_sql: + # entity_type lookup added in 0.50.0 — order matters: this check + # must come BEFORE __TABLES__ because the substring overlaps. + if entity_type_raises: + raise entity_type_raises + return MagicMock( + fetchone=lambda: entity_type_rows[0] if entity_type_rows else None, + ) if "__TABLES__" in inner_sql: if legacy_tables_raises: raise legacy_tables_raises @@ -85,6 +94,7 @@ def test_happy_path_returns_full_metadata(req, monkeypatch): ("country", "STRING", "YES", "NO", 1), ("user_id", "STRING", "NO", "NO", None), ], + entity_type_rows=[("BASE TABLE",)], ) with patch("connectors.bigquery.metadata.get_bq_access", return_value=bq): result = metadata.fetch(req) @@ -93,6 +103,8 @@ def test_happy_path_returns_full_metadata(req, monkeypatch): size_bytes=5_000_000, partition_by="event_date", clustered_by=["country"], + entity_type="BASE TABLE", + known_columns=["event_date", "country", "user_id"], ) @@ -120,6 +132,7 @@ def test_view_path_returns_metadata_with_null_rows_size(req, monkeypatch): columns_rows=[ ("event_date", "DATE", "NO", "YES", None), ], + entity_type_rows=[("VIEW",)], ) with patch("connectors.bigquery.metadata.get_bq_access", return_value=bq): result = metadata.fetch(req) @@ -127,6 +140,8 @@ def test_view_path_returns_metadata_with_null_rows_size(req, monkeypatch): assert result.rows is None assert result.size_bytes is None assert result.partition_by == "event_date" + assert result.entity_type == "VIEW" + assert result.known_columns == ["event_date"] def test_region_typo_falls_through_to_legacy_tables(req, monkeypatch): diff --git a/tests/test_scheduler_sidecar.py b/tests/test_scheduler_sidecar.py index ffc33a1..91db68f 100644 --- a/tests/test_scheduler_sidecar.py +++ b/tests/test_scheduler_sidecar.py @@ -29,6 +29,62 @@ def test_build_jobs_honors_bq_metadata_env_override(monkeypatch): assert jobs["bq-metadata-refresh"] == "every 2h" +def test_resolved_startup_grace_default(monkeypatch): + monkeypatch.delenv("SCHEDULER_STARTUP_GRACE_SECONDS", raising=False) + from services.scheduler.__main__ import resolved_startup_grace_seconds + assert resolved_startup_grace_seconds() == 60 + + +def test_resolved_startup_grace_zero_is_valid(monkeypatch): + """0 means "disable" — useful for unit tests / fast dev iterations.""" + monkeypatch.setenv("SCHEDULER_STARTUP_GRACE_SECONDS", "0") + from services.scheduler.__main__ import resolved_startup_grace_seconds + assert resolved_startup_grace_seconds() == 0 + + +def test_resolved_startup_grace_rejects_negative(monkeypatch): + monkeypatch.setenv("SCHEDULER_STARTUP_GRACE_SECONDS", "-1") + from services.scheduler.__main__ import resolved_startup_grace_seconds + with pytest.raises(ValueError): + resolved_startup_grace_seconds() + + +def test_resolved_startup_grace_rejects_empty(monkeypatch): + """Empty string is operator typo, not 'use default' — fail fast.""" + monkeypatch.setenv("SCHEDULER_STARTUP_GRACE_SECONDS", "") + from services.scheduler.__main__ import resolved_startup_grace_seconds + with pytest.raises(ValueError): + resolved_startup_grace_seconds() + + +def test_bq_metadata_initial_offset_within_cap(monkeypatch): + """Default cap is 900s. With a fixed RNG, the offset is deterministic + and bounded.""" + monkeypatch.delenv("SCHEDULER_BQ_METADATA_INITIAL_OFFSET_MAX_SECONDS", raising=False) + import random + from services.scheduler.__main__ import resolved_bq_metadata_initial_offset_seconds + rng = random.Random(42) # deterministic + val = resolved_bq_metadata_initial_offset_seconds(rng=rng) + assert 0 <= val <= 900 + + +def test_bq_metadata_initial_offset_zero_cap_returns_zero(monkeypatch): + """Operator opt-out: setting cap to 0 disables the jitter.""" + monkeypatch.setenv("SCHEDULER_BQ_METADATA_INITIAL_OFFSET_MAX_SECONDS", "0") + from services.scheduler.__main__ import resolved_bq_metadata_initial_offset_seconds + assert resolved_bq_metadata_initial_offset_seconds() == 0 + + +def test_bq_metadata_initial_offset_honors_custom_cap(monkeypatch): + monkeypatch.setenv("SCHEDULER_BQ_METADATA_INITIAL_OFFSET_MAX_SECONDS", "60") + import random + from services.scheduler.__main__ import resolved_bq_metadata_initial_offset_seconds + # Loop a few times since RNG could legitimately return 60. + for seed in range(20): + val = resolved_bq_metadata_initial_offset_seconds(rng=random.Random(seed)) + assert 0 <= val <= 60 + + def test_build_jobs_honors_env_overrides(monkeypatch): monkeypatch.setenv("SCHEDULER_DATA_REFRESH_INTERVAL", "1800") # 30m monkeypatch.setenv("SCHEDULER_HEALTH_CHECK_INTERVAL", "60") # 1m diff --git a/tests/test_v2_catalog_remote_metadata.py b/tests/test_v2_catalog_remote_metadata.py index 6c9751a..f91e1d8 100644 --- a/tests/test_v2_catalog_remote_metadata.py +++ b/tests/test_v2_catalog_remote_metadata.py @@ -32,6 +32,8 @@ def _seed_cache_row( size_bytes=None, partition_by=None, clustered_by=None, + entity_type=None, + known_columns=None, ): """Insert a successful refresh row into bq_metadata_cache.""" from src.db import get_system_db @@ -44,6 +46,8 @@ def _seed_cache_row( size_bytes=size_bytes, partition_by=partition_by, clustered_by=clustered_by, + entity_type=entity_type, + known_columns=known_columns, ) finally: conn.close() @@ -71,6 +75,8 @@ def test_remote_row_includes_metadata_fields(seeded_app): "orders", rows=10000, size_bytes=2_000_000, partition_by="event_date", clustered_by=["country", "platform"], + entity_type="BASE TABLE", + known_columns=["event_date", "country", "platform", "amount"], ) r = c.get( @@ -86,6 +92,100 @@ def test_remote_row_includes_metadata_fields(seeded_app): assert orders["clustered_by"] == ["country", "platform"] assert orders["query_mode"] == "remote" assert orders["metadata_freshness"] == "fresh" + assert orders["entity_type"] == "BASE TABLE" + # Both example templates apply: event_date present, country+platform present + assert "event_date > DATE '2026-01-01'" in orders["where_examples"] + assert "country_code = 'CZ' AND platform = 'web'" not in orders["where_examples"] + + +def test_where_examples_filtered_against_real_columns(seeded_app): + """Generic where_examples that reference columns the table doesn't + have must be dropped (the pre-fix bug the test suite is designed to + catch). unit_economics-style table has event_date but no country_code.""" + _reset_catalog_caches() + c = seeded_app["client"] + token = seeded_app["admin_token"] + _register_table( + seeded_app, + id="ue_like", source_type="bigquery", bucket="dwh_base", + source_table="unit_economics", query_mode="remote", + ) + _seed_cache_row( + "ue_like", + rows=None, size_bytes=None, + partition_by="event_date", clustered_by=[], + entity_type="VIEW", + # Real schema: event_date present, country_code absent. + known_columns=["event_date", "order_event_id", "merchant_country"], + ) + + r = c.get( + "/api/v2/catalog", + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 200, r.text + row = next(t for t in r.json()["tables"] if t["id"] == "ue_like") + # event_date example passes (column exists). + assert "event_date > DATE '2026-01-01'" in row["where_examples"] + # country_code/platform example dropped (columns missing). + assert all("country_code" not in e for e in row["where_examples"]) + + +def test_view_returns_null_rows_and_size_bytes(seeded_app): + """For a VIEW we keep rows/size_bytes as null even if the cache row + has them populated — pre-existing cache rows from before the + entity_type field existed will fix themselves on next refresh.""" + _reset_catalog_caches() + c = seeded_app["client"] + token = seeded_app["admin_token"] + _register_table( + seeded_app, + id="ue_view", source_type="bigquery", bucket="dwh_base", + source_table="ue_view", query_mode="remote", + ) + # Provider would have set rows/size_bytes to None for views; we mirror + # that contract here in the cache row. + _seed_cache_row( + "ue_view", rows=None, size_bytes=None, + partition_by=None, clustered_by=[], + entity_type="VIEW", + known_columns=["event_date"], + ) + + r = c.get( + "/api/v2/catalog", + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 200, r.text + row = next(t for t in r.json()["tables"] if t["id"] == "ue_view") + assert row["entity_type"] == "VIEW" + assert row["rows"] is None + assert row["size_bytes"] is None + assert row["rough_size_hint"] is None + + +def test_where_examples_empty_when_columns_unknown(seeded_app): + """For a remote row with no cache entry yet (never_fetched), don't + advertise any where_examples — we can't validate them against an + unknown schema.""" + _reset_catalog_caches() + c = seeded_app["client"] + token = seeded_app["admin_token"] + _register_table( + seeded_app, + id="unfetched", source_type="bigquery", bucket="dwh_base", + source_table="unfetched", query_mode="remote", + ) + + r = c.get( + "/api/v2/catalog", + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 200, r.text + row = next(t for t in r.json()["tables"] if t["id"] == "unfetched") + assert row["metadata_freshness"] == "never_fetched" + assert row["where_examples"] == [] + assert row["entity_type"] is None def test_remote_row_with_no_cache_returns_null_fields(seeded_app): @@ -156,7 +256,10 @@ def test_zero_size_bytes_reports_small_not_unknown(seeded_app): id="empty_t", source_type="bigquery", bucket="dwh_base", source_table="empty_t", query_mode="remote", ) - _seed_cache_row("empty_t", rows=0, size_bytes=0, clustered_by=[]) + _seed_cache_row( + "empty_t", rows=0, size_bytes=0, clustered_by=[], + entity_type="BASE TABLE", known_columns=["event_date"], + ) r = c.get( "/api/v2/catalog",