diff --git a/CHANGELOG.md b/CHANGELOG.md index 5027142..ab1b43f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,17 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +### Added + +- **BigQuery table registration via admin UI + CLI (issue #108 — Milestone 1).** Operators on a BigQuery instance can now register a BQ table or view as a remote DuckDB master view from `/admin/tables` or `da admin register-table`, without hand-editing `table_registry` or running the extractor by hand. The register modal branches on `data_source.type` server-side: BQ instances see Dataset / Source Table / View Name / Description / Folder / Sync Schedule; Keboola instances keep the discovery-driven flow. Submit runs `/api/admin/register-table/precheck` first (round-trips `bigquery.Client.get_table` to confirm the table exists and the SA can see it; surfaces row count + size + column count in the modal), then commits. The server validates BQ-specific shape (dataset / source_table / DuckDB-safe identifiers / GCP project_id grammar), forces `query_mode=remote` + `profile_after_sync=false`, and synchronously rebuilds `extract.duckdb` + master views with a 5s wall-clock budget — on overrun, the rebuild continues in a `BackgroundTask` and the API returns 202 with `{"status": "accepted", "view_name": ...}` instead of 200. View-name collisions (distinct from id collisions) return 409 to stop two callers from registering the same DuckDB view via different display names. `sync_schedule` is accepted and stored but not yet evaluated by the scheduler — see issue #79; addressed in Milestone 3 of #108. See `docs/DATA_SOURCES.md`. +- `POST /api/admin/register-table/precheck` — validation-only sibling of register-table. Returns `{"ok": true, "table": {rows, size_bytes, columns, …}}` for BQ rows after round-tripping `get_table`; surfaces NotFound → 404, Forbidden → 403, anything else → 400 with the GCP error verbatim. 
Also runs Pydantic validation for non-BQ source types so the CLI / UI gets a single endpoint shape. +- `--dry-run` flag on `da admin register-table` — calls `/precheck` and pretty-prints rows / bytes / columns; exits 0 on `ok`, 1 on validation or source-side error. +- Audit-log entries on every `register_table` / `update_table` / `unregister_table` mutation — closes the asymmetry where instance-config saves audited but registry mutations didn't (Decision 4 in #108). Secret-named fields in the request payload are masked as `***`; `description` is logged raw. + +### Fixed + +- `PUT /api/admin/registry/{id}` now preserves the original `registered_at` timestamp instead of resetting it to `now()` on every edit. `TableRegistryRepository.register` accepts `registered_at` as an optional kwarg; `update_table` re-passes the existing value from the row it just read. Closes #130. + ## [0.17.0] — 2026-04-29 ### Added @@ -23,14 +34,22 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C - **Scheduler `data-refresh` job 401-ed every 15 minutes** with `Missing or invalid Authorization header` because `SCHEDULER_API_TOKEN` was never propagated by `infra/modules/customer-instance/startup-script.sh.tpl`. The startup script now generates a 64-hex-char secret on first boot via `openssl rand -hex 32`, persists it across reboots by reading back from an existing `.env` (rotation requires explicit operator action — both containers must restart together), and writes it into `/opt/agnes/.env` alongside the other secrets. `app/main.py` seeds the matching synthetic user at startup so the very first cron tick has a valid actor to attribute audit-log entries to. Existing VMs need a one-time `sudo /opt/agnes/agnes-rotate-scheduler-token.sh` (or simply re-run the startup script via `terraform apply -replace='module.agnes.google_compute_instance.vm[""]'`); see migration note in this changelog or rerun the startup script manually. 
- **Non-root container couldn't write to host-bind-mounted `/data` after the v0.12.1 USER-agnes flip.** `infra/modules/customer-instance/startup-script.sh.tpl` now `chown -R 999:999 /data` after creating the persistent-disk subdirs (`state`, `analytics`, `extracts`). Without this, a freshly-attached PD is root-owned by default and `USER agnes` (uid 999) cannot open `/data/state/system.duckdb` for write — every authed request 500s with `IOException: Cannot open file ... Permission denied` while `/api/health` (which doesn't open the system DB) keeps returning 200, masking the failure from health-only monitoring. Regression first observed on `agnes-development` on 2026-04-29 after the auto-upgrade picked up `:stable` from the 0.12.1 release. **Existing VMs with PD-backed `/data` need a one-time host-side `sudo chown -R 999:999 /var/lib/docker/volumes/agnes_data/_data && sudo docker restart agnes-app-1 agnes-scheduler-1` to recover** — Terraform `metadata_startup_script` only runs on boot, so an apply alone does not retro-fix running VMs. - `Dockerfile` pins the `agnes` user to `uid:gid 999:999` explicitly (`useradd --uid 999`). Previously the uid was whatever Debian's `useradd --system` assigned next — happened to be 999 today, but a future base-image change picking 998 or 1000 would silently desync from the startup-script's `chown 999:999`, reintroducing the same incident. Pinning makes the contract grep-able from both sides. -- `scripts/smoke-test.sh` no longer silently SKIPs every authed check when `bootstrap` returns 403 (users exist) and `SMOKE_TOKEN` is not set — it now FAILs loudly. Also adds an unauthenticated DB-touching probe (`POST /auth/email/request`) before bootstrap, since `/api/health` deliberately doesn't open `system.duckdb` (kept cheap for LB probes) and so cannot detect filesystem/permission issues. The new probe catches the foundryai-development class of regression even on instances where bootstrap is closed. 
+- `scripts/smoke-test.sh` no longer silently SKIPs every authed check when `bootstrap` returns 403 (users exist) and `SMOKE_TOKEN` is not set — it now FAILs loudly. Also adds an unauthenticated DB-touching probe (`POST /auth/email/request`) before bootstrap, since `/api/health` deliberately doesn't open `system.duckdb` (kept cheap for LB probes) and so cannot detect filesystem/permission issues. The new probe catches a class of regression that bypasses health-only monitoring even on instances where bootstrap is closed. - Corporate memory pages (`/corporate-memory`, `/corporate-memory/admin`) now render the shared app header at full viewport width, matching the dashboard. Previously the `_app_header.html` include sat inside `.container-memory` (max-width: 1000px) and was cropped on wide viewports. - `release.yml` now publishes a `:dev-` + `:dev--latest` image when a fresh branch is pushed off `main` with no extra commits. Pre-fix, `paths-ignore` on the `push` event diffed the new ref against the default branch — a same-SHA branch had zero diff, every file matched paths-ignore, and the workflow was skipped, so a developer creating a personal branch off main to deploy main's exact state to their dev VM (which pins to `:dev--latest`) had to either commit something or trigger the workflow manually. The `build-and-push` job's `if` was also tightened to `main || workflow_dispatch` only, which prevented branch-push images regardless. Both fixed: added `create:` trigger (filtered to branch refs at the job level so tag creates don't double-build with `keboola-deploy.yml`), and broadened `build-and-push.if` to also publish on non-main branch pushes / branch creates. - Web header admin nav (All tokens, Marketplaces, Admin → Users / Groups / Resource access / Server config) is now visible to admin users again. 
Pre-fix, `_app_header.html` gated the admin block on `session.user.role == 'admin'`, but the v13 RBAC migration nulled `users.role` and moved admin authority onto `user_group_members` (Admin system group) — so the gate evaluated to false for everyone, including actual admins. `get_current_user` now injects `user["is_admin"]` (computed via `app.auth.access.is_user_admin`, the same call all server-side admin gates use), and the header reads `session.user.is_admin`. The role badge in the user-menu dropdown now reads "Admin" or hides — `users.role` is no longer surfaced in the UI. +- `admin_tables` register modal payload now matches the `RegisterTableRequest` API contract — drops the phantom `id` and `version` fields the modal used to send (the API silently dropped them), and renames `dataset` → `bucket` so the source-bucket actually persists. Pre-fix the operator's bucket / dataset edit looked saved but never made it past the wire. Edit + delete handlers in the same template were dropping the same fields and are also corrected. +- Discovery JS in `admin_tables` now handles the actual `{tables: [...]}` flat shape returned by `GET /api/admin/discover-tables`. Pre-fix the JS expected `{buckets: [...]}` (a shape the API never emitted) and silently rendered an empty discovery panel after the first call. +- **#108 review fixes for BigQuery register-table.** (a) The post-register materialize worker (BackgroundTask + 5s-timeout daemon thread) no longer captures the request-scoped DuckDB connection — it opens a fresh `get_system_db()` handle per run, so the request's `finally: conn.close()` no longer races the worker. (b) `connectors/bigquery/extractor.init_extract` is now serialized by a module-level `_INIT_EXTRACT_LOCK` so the timeout-fallback BackgroundTask cannot collide with the still-running daemon thread on the `extract.duckdb` swap. 
(c) `PUT /api/admin/registry/{id}` now runs the same BQ-shape validation as register when the merged record is a BigQuery row (or the patch flips it to BigQuery), returning 400/422 instead of silently persisting an unsafe `bucket` / `source_table` / project_id and breaking at the next rebuild. (d) `POST /api/admin/register-table` no longer carries a misleading `status_code=201` on the route decorator — the Keboola branch explicitly returns 201, the BigQuery branch returns 200 (sync) or 202 (timeout fallback), and OpenAPI now documents all three. +- **#108 round-4 review fix for BigQuery register-table.** `_validate_bigquery_register_payload` now applies the same raw-value rule to `bucket` and `source_table` as round-3 added for `name`. Pre-fix the helper validated `bucket.strip()` / `source_table.strip()` but `register_table` persisted the un-stripped value, so a `bucket=" my_dataset"` slipped through validation, got stored verbatim, and 500'd at the next rebuild when the BQ extractor spliced it into `ATTACH … AS bq_` and view DDL. The validator now rejects any `bucket` / `source_table` with leading/trailing whitespace and surfaces the offending raw value in the 400 detail. Applies identically to `POST /api/admin/register-table` and `POST /api/admin/register-table/precheck`. +- **#108 round-3 review fixes for BigQuery register-table.** (a) `_validate_bigquery_register_payload` now validates the **raw** view name (the value persisted to `table_registry.name` and read back by the BQ extractor), not a normalized `strip().lower().replace(" ", "_")` form. Pre-fix a name like `"my table"` passed validation (normalized `"my_table"` was safe), got stored verbatim, and then 500'd at the post-insert rebuild — defeating fast-fail-at-register. 
The validator now rejects any name with leading/trailing whitespace OR that fails the strict `^[a-zA-Z_][a-zA-Z0-9_]{0,63}$` check, and surfaces the offending raw value verbatim in the 400 response so the operator can retype with a corrected name. Server does NOT silently rewrite the input. Applies identically to `POST /api/admin/register-table` and `POST /api/admin/register-table/precheck`. (b) `_run_bigquery_materialize_with_timeout` now distinguishes worker-raised-within-budget (→ `{"status": "errors"}` → HTTP 500 with the exception in the body) from worker-still-running-at-timeout (→ `{"status": "timeout"}` → HTTP 202 + BackgroundTask retry). Pre-fix both outcomes mapped to "timeout" / 202, hiding the real failure for the budget window before the BG retry surfaced the same exception in the logs. (c) `register_table_precheck` is now a plain `def` (was `async def`) — the BQ branch makes synchronous `bigquery.Client(...)` / `client.get_table(...)` calls that would otherwise block the asyncio event loop on an async handler. Mirrors the same conversion already done for `register_table`. +- **#108 round-2 review fixes for BigQuery register-table.** (a) `POST /api/admin/register-table` is now a plain `def` (was `async def`) — the synchronous-materialize path waits on `threading.Event.wait()`, which blocks the asyncio event loop on an async handler and stalls every other request for up to the 5s budget. FastAPI runs sync handlers in a threadpool so the wait is harmless there. (b) `connectors/bigquery/extractor.rebuild_from_registry` now resolves `data_source.bigquery.project` via `app.instance_config.get_value` (deep-merge of static + writable overlay) instead of `config.loader.load_instance_config` (static only). Operators who set the project through `POST /api/admin/configure` got a silent rebuild failure pre-fix — validation passed (validation already used the overlay-aware read) but the rebuild reported "project missing" and the master view never appeared. 
(c) `register-table` now propagates `rebuild_from_registry` errors as **HTTP 500 with `{"status": "rebuild_failed", "errors": [...]}`** when the synchronous rebuild ran but reported an error (auth failure, missing project, unsafe identifier slipping the validator). Pre-fix those errors were silently logged and the API returned 200 ok. The BackgroundTask path now logs rebuild errors at ERROR level (was WARNING). (d) The admin tables UI's BigQuery register modal now splits precheck and register into two operator-driven clicks — Step 1 fires precheck and surfaces row count / size / column count in the modal AND swaps the primary button to "Register"; Step 2 fires the actual register call only when the operator clicks. Pre-fix the precheck and register fired in a single chained promise, so the operator never got to review the summary before the row was committed. (e) The Keboola register-modal payload now derives `source_table` from the discovered table's storage identifier (`t.id` minus the bucket prefix, e.g. `company` for `in.c-sfdc.company`) via a new hidden `regSourceTable` field. Pre-fix the JS sent `regTableName` (the human-friendly display name) as `source_table`; manual-entry callers fall back to the display name. (f) `da admin discover-and-register` accepts HTTP 200 / 201 / 202 as success (was 201 only); pre-fix every successful BigQuery row counted as an error because BQ register returns 200 (sync OK) or 202 (background) but never 201. ### Internal -- `release.yml` adds an `e2e-bind-mount` job that boots the freshly built image against a host-bind-mounted `/data` directory (instead of the named volume the existing `smoke-test` job uses). Docker initializes a fresh named volume by copying from the image's `/data` — which the Dockerfile chowns to `agnes:agnes` before flipping USER — so the named-volume path always works. 
The bind-mount path mirrors what GCE VMs run via `docker-compose.host-mount.yml`, and includes a negative assertion (write must fail on root-owned `/data` before the operator chown) plus a positive assertion (smoke passes after the chown). Locks in the contract that broke `agnes-development`: removing `chown 999:999` from `startup-script.sh.tpl` or changing the Dockerfile uid pin breaks CI. +- `_sanitize_for_audit` now masks against an explicit `_SECRET_FIELDS` allowlist instead of substring-scanning + maintaining a `primary_key` whitelist exception. New tests assert `not_actually_a_token` / `primary_key_hash` / `passwordless` flow through cleartext while known-secret fields (`keboola_token`, `client_secret`, `smtp_password`, `bot_token`) get masked. Operationally identical for the current registry payloads (no secret-bearing fields), but removes a class of false-positive / false-negative as the request body grows. +- `release.yml` adds an `e2e-bind-mount` job that boots the freshly built image against a host-bind-mounted `/data` directory (instead of the named volume the existing `smoke-test` job uses). Docker initializes a fresh named volume by copying from the image's `/data` — which the Dockerfile chowns to `agnes:agnes` before flipping USER — so the named-volume path always works. The bind-mount path mirrors what GCE VMs run via `docker-compose.host-mount.yml`, and includes a negative assertion (write must fail on root-owned `/data` before the operator chown) plus a positive assertion (smoke passes after the chown). Locks in the contract that broke a recent release: removing `chown 999:999` from `startup-script.sh.tpl` or changing the Dockerfile uid pin breaks CI. +- Extracted `bigquery.extractor.rebuild_from_registry()` from the `__main__` block of `connectors/bigquery/extractor.py` so the API can call it post-register without `runpy`-importing the module. The standalone CLI entrypoint (`python -m connectors.bigquery.extractor`) keeps working. 
## [0.16.0] — 2026-04-29 diff --git a/app/api/admin.py b/app/api/admin.py index 075882d..a0c53f3 100644 --- a/app/api/admin.py +++ b/app/api/admin.py @@ -11,7 +11,7 @@ import threading import uuid from pathlib import Path -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException from pydantic import BaseModel, Field, field_validator from typing import Optional, List, Dict, Any import duckdb @@ -20,6 +20,11 @@ from app.auth.access import require_admin from app.auth.dependencies import _get_db from src.repositories.table_registry import TableRegistryRepository from src.repositories.audit import AuditRepository +from src.identifier_validation import ( + is_safe_identifier as _is_safe_identifier, + is_safe_quoted_identifier as _is_safe_quoted_identifier, +) +from src.sql_safe import is_safe_project_id as _is_safe_project_id logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/admin", tags=["admin"]) @@ -573,6 +578,69 @@ async def update_server_config( # --- End server-config editor ----------------------------------------------- +# Source types accepted by /api/admin/register-table. Anything else is +# rejected with 422 — keeps a typo'd source_type from silently landing in +# table_registry (where it would later confuse the orchestrator scan). +_VALID_SOURCE_TYPES: tuple[str, ...] = ("keboola", "bigquery", "jira", "local") + +# Explicit allowlist of audit-payload keys whose values are credentials and +# must be masked. Substring-scan + ad-hoc whitelist (the previous shape) is +# fragile in two ways: +# 1. False positive: legit fields like `primary_key` get masked because +# they contain "key" — we then need a whitelist exception, which has +# to be kept in sync as new fields are added. +# 2. 
False negative: a brand-new credential field that doesn't contain +# one of the patterns (`auth_material`, `bearer`) silently leaks. +# The substring scan also over-masks harmless names: +# `primary_key_hash` and `not_actually_a_token` match "key" / +# "token" and get masked unnecessarily. +# Allowlist puts the burden on the developer adding a new secret-bearing +# field: they must add the literal key name here, which forces a code- +# review touch on the audit path. Audit the current Pydantic models +# (RegisterTableRequest / UpdateTableRequest / ConfigureRequest / +# ServerConfigUpdateRequest) when extending — the registry payloads don't +# currently carry credentials, but ConfigureRequest does (`keboola_token`) +# and could be routed through this sanitizer in the future. +_SECRET_FIELDS: frozenset = frozenset({ + # ConfigureRequest — POST /api/admin/configure carries Keboola creds. + "keboola_token", + # Generic names that have appeared in earlier iterations of admin + # request bodies and could resurface — keep them masked defensively. + "api_token", + "auth_token", + "bot_token", + "client_secret", + "google_client_secret", + "google_oauth_client_secret", + "password", + "smtp_password", + "webapp_secret_key", + "bot_secret", + # Marketplace PATs (private repos) — see src/marketplace.py. + "marketplace_token", + "marketplace_pat", +}) + + +def _sanitize_for_audit(payload: Dict[str, Any]) -> Dict[str, Any]: + """Mask credential-bearing fields in a request payload before audit_log. + + Uses an explicit `_SECRET_FIELDS` allowlist (case-insensitive) instead + of substring matching. The trade-off is that adding a new secret field + requires updating the set — but that's the *point*: the test suite + asserts `not_actually_a_token` does NOT get masked, so a substring- + based regression would surface immediately, and a missing entry for a + real new credential gets caught at code review of the audit path. 
+ """ + out: Dict[str, Any] = {} + for k, v in payload.items(): + if k.lower() in _SECRET_FIELDS: + out[k] = "***" if v not in (None, "") else "" + else: + out[k] = v + return out + + class RegisterTableRequest(BaseModel): name: str folder: Optional[str] = None @@ -595,6 +663,119 @@ class RegisterTableRequest(BaseModel): def _coerce_primary_key(cls, v): return _normalize_primary_key(v) + @field_validator("source_type", mode="before") + @classmethod + def _validate_source_type(cls, v): + # None is tolerated for backward compat with old CLI scripts that + # didn't set a source_type; the route resolves it later. Anything + # else must be in the canonical list. + if v in (None, ""): + return v + if v not in _VALID_SOURCE_TYPES: + raise ValueError( + f"source_type must be one of {sorted(_VALID_SOURCE_TYPES)}, got {v!r}" + ) + return v + + +def _validate_bigquery_register_payload(req: "RegisterTableRequest") -> None: + """Enforce BQ-specific shape on a register/precheck request. + + Mutates the model: forces ``query_mode='remote'`` and + ``profile_after_sync=False`` (per Decision 7 in #108) so a caller can't + accidentally enqueue a parquet profiling pass for a remote view that + has no local file. Raises HTTPException(422) for missing required + fields and HTTPException(400) for unsafe identifiers / bogus project_id. + """ + if not req.bucket or not req.bucket.strip(): + raise HTTPException( + status_code=422, + detail="bigquery: 'bucket' (BQ dataset) is required", + ) + if not req.source_table or not req.source_table.strip(): + raise HTTPException( + status_code=422, + detail="bigquery: 'source_table' is required", + ) + # No wildcard / sharded BQ tables in M1 (Decision 8). + if "*" in (req.source_table or "") or "*" in (req.bucket or ""): + raise HTTPException( + status_code=400, + detail="bigquery: wildcard / sharded tables are not supported (see #108 M3+)", + ) + # Strict identifier on the DuckDB view name. 
CRITICAL: validate the RAW + # name (the value that ``register_table`` actually persists to + # ``table_registry.name`` and which the BQ extractor reads back as the + # DuckDB view name at next rebuild). Earlier revisions normalized first + # (``strip().lower().replace(" ", "_")``) and then checked, which let + # names like ``"my table"`` pass here, get stored verbatim, and then + # blow up inside ``_init_extract`` at view-create time — defeating the + # whole point of fast-fail-at-register. We do NOT silently rewrite the + # operator's name; if they typed ``"my table"``, return 400 with a + # clear message and let them retype with a corrected name. + raw_name = req.name or "" + if raw_name.strip() != raw_name or not _is_safe_identifier(raw_name): + raise HTTPException( + status_code=400, + detail=( + f"bigquery: view name {raw_name!r} is unsafe — must match " + f"^[a-zA-Z_][a-zA-Z0-9_]{{0,63}}$ (DuckDB identifier rules) " + "with no leading/trailing whitespace" + ), + ) + # Same fast-fail rule as ``raw_name`` above: validate the RAW value the + # caller sent, not a stripped form. ``register_table`` persists ``bucket`` + # / ``source_table`` verbatim, and the BQ extractor splices them straight + # into the ``ATTACH … AS bq_`` and view DDL at next rebuild — so if only + # a stripped form were checked, a value with leading/trailing whitespace + # would pass validation here, get stored as-is, and explode inside DuckDB + # at view-create time. Surface the offending raw value in the 400 detail + # and let the operator retype. 
+ raw_bucket = req.bucket + if raw_bucket.strip() != raw_bucket or not _is_safe_quoted_identifier(raw_bucket): + raise HTTPException( + status_code=400, + detail=( + f"bigquery: dataset {raw_bucket!r} is unsafe (only [A-Za-z0-9_.-] " + "allowed, no leading/trailing whitespace)" + ), + ) + raw_source_table = req.source_table + if raw_source_table.strip() != raw_source_table or not _is_safe_quoted_identifier(raw_source_table): + raise HTTPException( + status_code=400, + detail=( + f"bigquery: source_table {raw_source_table!r} is unsafe (only " + "[A-Za-z0-9_.-] allowed, no leading/trailing whitespace)" + ), + ) + # Pull project from instance.yaml — single-project model in M1 + # (Decision: no per-table project field). Validate the format here so + # we surface a config issue at registration rather than at first + # rebuild, where the operator no longer has a request to look at. + from app.instance_config import get_value + project_id = get_value("data_source", "bigquery", "project", default="") + if not project_id: + raise HTTPException( + status_code=400, + detail=( + "bigquery: data_source.bigquery.project is not set in instance.yaml; " + "configure it via /admin/server-config or /api/admin/configure first" + ), + ) + if not _is_safe_project_id(project_id): + raise HTTPException( + status_code=400, + detail=( + f"bigquery: data_source.bigquery.project {project_id!r} is malformed — " + "must match GCP project_id grammar ^[a-z][a-z0-9-]{4,28}[a-z0-9]$" + ), + ) + # Force the BQ-required mode + flag (Decision 7). The orchestrator and + # extractor both assume remote; persisting `local` here would later create + # a profiling job against a non-existent parquet file. 
+ req.query_mode = "remote" + req.profile_after_sync = False + class UpdateTableRequest(BaseModel): name: Optional[str] = None @@ -661,13 +842,208 @@ async def list_registry( return {"tables": tables, "count": len(tables)} -@router.post("/register-table", status_code=201) -async def register_table( +# Wall-clock budget for the synchronous BQ materialization that runs after +# a successful BQ register. If the rebuild + view creation exceeds this, +# we hand the rest off to BackgroundTasks and return 202. 5s matches the +# UX contract in #108 ("Queryable as within seconds") — long enough +# to cover a healthy GCE round-trip, short enough that a hung GCE call +# doesn't park the request handler. +_BQ_SYNC_REGISTER_TIMEOUT_S: float = 5.0 + + +def _materialize_bigquery_extract() -> Dict[str, Any]: + """Re-build the BigQuery extract.duckdb + master views. + + Wrapper used by both the synchronous (in-band) and async (BackgroundTask) + code paths after a BQ register/update/delete. Imports kept inside the + function so non-BQ instances don't pay the import cost on app start. + + Opens a FRESH system DB connection rather than reusing the request-scoped + one. The request handler closes its connection in a `finally` after the + response, but BackgroundTask + the timeout-fallback daemon thread can + both outlive that close — they would then operate on a closed handle (or + one being torn down concurrently). A fresh handle is cheap (DuckDB is an + embedded engine) and isolates the worker's lifetime from the request's. + + Returns the rebuild result dict (``{"errors": [...], "tables_registered": + N, ...}``) so the synchronous caller can propagate failures to the + operator. Background-task callers go through + ``_materialize_bigquery_extract_bg``, which logs any reported errors at + ERROR level because the 202 response cannot carry them. 
+ """ + from connectors.bigquery import extractor as _bq_extractor + from src.db import get_system_db + from src.orchestrator import SyncOrchestrator + + fresh_conn = get_system_db() + try: + result = _bq_extractor.rebuild_from_registry(conn=fresh_conn) + SyncOrchestrator().rebuild() + return result or {} + finally: + try: + fresh_conn.close() + except Exception: + pass + + +def _materialize_bigquery_extract_bg() -> None: + """BackgroundTask wrapper around `_materialize_bigquery_extract`. + + BackgroundTasks discard return values, but `rebuild_from_registry` can + surface auth / config / identifier errors via the ``errors`` list. Log + those at ERROR level so the failure is loud in the operator's logs even + though the 202 response can't carry the detail (Decision 3 in #108: a + 202 is documented as "accepted, may not be queryable yet" — we don't + block on it but we shouldn't swallow it either). + """ + try: + result = _materialize_bigquery_extract() + except Exception: + logger.exception("BQ post-register background materialize crashed") + return + errors = (result or {}).get("errors") or [] + if errors: + logger.error( + "BQ post-register background materialize completed with %d error(s): %s", + len(errors), errors, + ) + + +def _run_bigquery_materialize_with_timeout( + background: BackgroundTasks, +) -> Dict[str, Any]: + """Try to materialize synchronously within the wall-clock budget. 
+ + Returns a dict with: + - ``status`` ∈ {"ok", "errors", "timeout"} — caller maps to HTTP code + - ``errors``: list of {table, error} surfaced by ``rebuild_from_registry`` + (only present on ``status="errors"``) + + Mapping by caller (`register_table`): + - "ok" → 200 (synchronous success) + - "errors" → 500 (rebuild ran but reported errors — propagate so + the operator knows the registry row exists but the + view wasn't created) + - "timeout" → 202 (rebuild still running on a BackgroundTask) + + The synchronous worker runs on a daemon thread (so a hung GCE call + can't park the request) that opens its OWN system DB connection (see + `_materialize_bigquery_extract`). Even though FastAPI now invokes the + sync route in a threadpool — and `done.wait()` no longer blocks the + event loop — we still off-load to a daemon so the wait is bounded + even if `rebuild_from_registry` ignores its own timeouts. + """ + import threading + + done = threading.Event() + err_holder: Dict[str, Any] = {} + result_holder: Dict[str, Any] = {} + + def _worker(): + try: + result_holder["result"] = _materialize_bigquery_extract() + except Exception as e: # pragma: no cover — logged below + err_holder["error"] = e + finally: + done.set() + + t = threading.Thread(target=_worker, daemon=True, name="bq-register-rebuild") + t.start() + finished = done.wait(_BQ_SYNC_REGISTER_TIMEOUT_S) + + if finished: + if "error" in err_holder: + # Worker finished within the wall-clock budget but raised. This + # is a HARD ERROR, not a timeout — surface it as such so the + # operator gets the actual exception in the 500 body instead + # of a misleading 202 + "still working in the background". + # Earlier revisions returned ``{"status": "timeout"}`` here, + # which the register handler then mapped to 202 + a retry + # BackgroundTask; that hid the real failure for `_BQ_SYNC_ + # REGISTER_TIMEOUT_S` seconds before the BG retry surfaced + # the same exception in the logs. 
+ exc = err_holder["error"] + logger.error( + "BQ post-register rebuild raised within budget: %r", + exc, + ) + return { + "status": "errors", + "errors": [{"error": f"{type(exc).__name__}: {exc}"}], + } + # Synchronous worker finished cleanly — but check whether + # `rebuild_from_registry` itself surfaced any errors (auth fail, + # missing project from the overlay, unsafe identifier slipping the + # validator, etc.). Without this, those errors got silently logged + # and the API claimed success. + result = result_holder.get("result") or {} + errors = result.get("errors") or [] + if errors: + logger.error( + "BQ post-register rebuild reported %d error(s): %s", + len(errors), errors, + ) + return {"status": "errors", "errors": errors} + return {"status": "ok"} + + # Timed out — let the worker keep running on its thread (already daemon) + # and also schedule a BackgroundTask so the orchestrator gets called via + # the supported FastAPI path. `_INIT_EXTRACT_LOCK` in the BQ extractor + # serializes the two file-swap calls so the slow daemon thread and the + # background task can't tear `extract.duckdb`; the orchestrator's own + # `_rebuild_lock` protects the master-view rebuild step downstream. 
+ logger.info( + "BQ post-register rebuild exceeded %ss budget — handing off to BackgroundTask", + _BQ_SYNC_REGISTER_TIMEOUT_S, + ) + background.add_task(_materialize_bigquery_extract_bg) + return {"status": "timeout"} + + +@router.post( + "/register-table", + responses={ + 200: {"description": "BigQuery row registered + materialized synchronously"}, + 201: {"description": "Non-BigQuery row registered (no post-insert materialize)"}, + 202: {"description": "BigQuery row registered; materialize continues in background"}, + 409: {"description": "Table id or view name already in use"}, + 500: {"description": "BigQuery row registered but post-insert rebuild failed"}, + }, +) +def register_table( request: RegisterTableRequest, + background: BackgroundTasks, user: dict = Depends(require_admin), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): - """Register a new table in the system.""" + """Register a new table in the system. + + Behavior by source_type: + - **bigquery**: validates BQ-specific shape (dataset / source_table / + identifier safety / project_id format), forces query_mode='remote' and + profile_after_sync=False, then synchronously rebuilds extract.duckdb + + master views with a wall-clock budget. Returns 200 with the view name + on success, 202 on budget overrun (rebuild continues in a + BackgroundTask), or 500 if the synchronous rebuild ran but reported + an error (e.g. auth failure, missing project, unsafe identifier). + - other source types: insert-only, no post-register hook. Returns 201. + + Defined as a plain ``def`` (not ``async def``) so FastAPI runs it in a + threadpool — the synchronous-materialize path waits on + ``threading.Event.wait()``, which would otherwise block the asyncio + event loop and stall every other request for up to + ``_BQ_SYNC_REGISTER_TIMEOUT_S``. ``Depends(...)``, ``BackgroundTasks``, and + ``JSONResponse`` all work the same in sync handlers; the rest of the + admin module mixes both styles already. 
+ + The route does NOT carry a default ``status_code`` — each branch returns + its own JSONResponse with the right code. A blanket ``status_code=201`` + on the decorator would mislead OpenAPI consumers about the BQ branch. + + Always: 409 on view-name collision against the existing registry, audit + log entry on success. + """ + from fastapi.responses import JSONResponse if not request.name or not request.name.strip(): raise HTTPException(status_code=422, detail="Table name cannot be empty") repo = TableRegistryRepository(conn) @@ -676,6 +1052,30 @@ async def register_table( if repo.get(table_id): raise HTTPException(status_code=409, detail=f"Table '{table_id}' already registered") + # View-name collision pre-check — distinct from id collision above. + # `id` is derived from `name`, but two callers could legally pick + # different display names that lower-case + slugify to the same view + # (e.g. "Orders v2" + "orders_v2"); the strict view-name uniqueness + # check stops that here, before the orchestrator surfaces it as a + # silent overwrite at next rebuild. + existing_by_name = next( + (r for r in repo.list_all() if (r.get("name") or "") == request.name), + None, + ) + if existing_by_name is not None: + raise HTTPException( + status_code=409, + detail=f"View name '{request.name}' is already in use by table id '{existing_by_name.get('id')}'", + ) + + # BQ rows go through the extra validation + post-insert materialization + # contract from issue #108. Other source types keep the legacy insert-only + # flow — Keboola materialization happens via the scheduled sync, Jira via + # webhook, local via a manual extractor run. 
+ is_bigquery = request.source_type == "bigquery" + if is_bigquery: + _validate_bigquery_register_payload(request) + repo.register( id=table_id, name=request.name, @@ -692,43 +1092,307 @@ async def register_table( profile_after_sync=request.profile_after_sync, ) - return {"id": table_id, "name": request.name, "status": "registered"} + # Audit entry — masked params; description kept raw (it's documentation). + AuditRepository(conn).log( + user_id=user.get("id"), + action="register_table", + resource=table_id, + params=_sanitize_for_audit(request.model_dump()), + ) + + if not is_bigquery: + # Keboola / Jira / local rows are insert-only here. 201 Created — the + # decorator no longer carries a default status, so each branch is + # explicit about its code (BQ branch overrides via JSONResponse). + return JSONResponse( + status_code=201, + content={"id": table_id, "name": request.name, "status": "registered"}, + ) + + # BQ post-register: rebuild extract + master views, with timeout fallback. + # Decision 1: 200 on synchronous success, 202 on timeout, 500 if the + # synchronous rebuild surfaced errors. Distinct from the 201 Keboola + # path above, so the BQ branch builds its own response. + outcome = _run_bigquery_materialize_with_timeout(background) + status = outcome.get("status") + if status == "ok": + return JSONResponse( + status_code=200, + content={ + "id": table_id, + "name": request.name, + "status": "ok", + "view_name": table_id, + }, + ) + if status == "errors": + # Registry insert succeeded but the post-insert rebuild reported + # errors — the row is in the registry but the master view was NOT + # created. Surface the failure verbatim so the operator can fix + # the underlying config (typically a missing + # `data_source.bigquery.project` in the overlay or auth that lacks + # bigquery.metadata.get on the dataset). 
The row stays in the + # registry; a re-run after fixing the config picks up the existing + # row and creates the view on the next register/update or + # scheduler tick. + return JSONResponse( + status_code=500, + content={ + "id": table_id, + "name": request.name, + "status": "rebuild_failed", + "view_name": table_id, + "errors": outcome.get("errors") or [], + "message": ( + "Registry row created but post-insert rebuild failed; " + "view is not queryable. See `errors` for details." + ), + }, + ) + # Default: timeout — rebuild continues on a BackgroundTask. + return JSONResponse( + status_code=202, + content={ + "id": table_id, + "name": request.name, + "status": "accepted", + "view_name": table_id, + "message": "Registration accepted; materializing in background", + }, + ) + + +class PrecheckResponse(BaseModel): + """Response model for /api/admin/register-table/precheck. + + Documented here so OpenAPI consumers know what to expect; the route + returns a plain dict for backwards compatibility with the rest of the + admin API which doesn't use response_model. + """ + ok: bool + table: Dict[str, Any] + + +@router.post("/register-table/precheck") +def register_table_precheck( + request: RegisterTableRequest, + user: dict = Depends(require_admin), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Validate a register-table payload + (BQ only) confirm the source table exists. + + No DB write. Used by the UI to surface row count + size + column count + in the modal before the operator clicks Register, and by the CLI's + ``--dry-run`` to print what *would* be registered without touching + state. Identical Pydantic validation to register-table; for BQ rows we + additionally make a ``bigquery.Client(project).get_table(...)`` call + and surface the GCP error verbatim. 
+ + Defined as a plain ``def`` (not ``async def``) so FastAPI runs it in a + threadpool — the BQ branch makes synchronous ``bigquery.Client(...)`` + /``client.get_table(...)`` calls, which would otherwise block the + asyncio event loop and stall every other request for the duration of + the BigQuery round-trip. Mirrors the same conversion done for + ``register_table`` (see the docstring on that route). ``Depends(...)`` works + identically in sync handlers. + """ + if not request.name or not request.name.strip(): + raise HTTPException(status_code=422, detail="Table name cannot be empty") + + if request.source_type != "bigquery": + # M1 only adds BQ-specific precheck. Other source types get a + # validation-only response so the CLI / UI can rely on the same + # endpoint shape across types. + return { + "ok": True, + "table": { + "name": request.name, + "source_type": request.source_type, + "bucket": request.bucket, + "source_table": request.source_table, + "rows": None, + "size_bytes": None, + "columns": [], + "note": "precheck for non-bigquery sources is validation-only in M1", + }, + } + + # BQ-specific shape validation (forces query_mode/profile_after_sync, + # checks identifier safety, validates project_id from instance.yaml). + _validate_bigquery_register_payload(request) + + # Round-trip the BQ tables API (``get_table``) to confirm the table exists and the SA can + # see it. Imports kept local to avoid pulling google-cloud-bigquery into + # the import chain on non-BQ instances. 
+ try: + from google.cloud import bigquery # noqa: PLC0415 + from google.api_core import exceptions as google_exc # noqa: PLC0415 + except ImportError as e: + raise HTTPException( + status_code=500, + detail=( + "google-cloud-bigquery not installed; install the bigquery " + f"extras to use BQ precheck ({e})" + ), + ) from e + + from app.instance_config import get_value + project_id = get_value("data_source", "bigquery", "project", default="") + dataset = (request.bucket or "").strip() + source_table = (request.source_table or "").strip() + fq = f"{project_id}.{dataset}.{source_table}" + + try: + client = bigquery.Client(project=project_id) + bq_table = client.get_table(fq) + except google_exc.NotFound as e: + raise HTTPException(status_code=404, detail=f"BigQuery table not found: {fq} ({e})") from e + except google_exc.Forbidden as e: + raise HTTPException( + status_code=403, + detail=( + f"BigQuery access denied for {fq}: {e}. " + "Service account needs bigquery.metadata.get on the dataset." + ), + ) from e + except Exception as e: + # Auth errors, transient 5xx, malformed table refs — surface as 400 + # so the operator gets the GCP error verbatim and can fix their + # config without us guessing the right HTTP code. 
+ raise HTTPException(status_code=400, detail=f"BigQuery precheck failed for {fq}: {e}") from e + + columns = [ + {"name": f.name, "type": f.field_type} + for f in (bq_table.schema or []) + ] + return { + "ok": True, + "table": { + "name": request.name, + "source_type": "bigquery", + "bucket": dataset, + "source_table": source_table, + "project_id": project_id, + "rows": int(bq_table.num_rows or 0), + "size_bytes": int(bq_table.num_bytes or 0), + "columns": columns, + "column_count": len(columns), + }, + } @router.put("/registry/{table_id}") async def update_table( table_id: str, request: UpdateTableRequest, + background: BackgroundTasks, user: dict = Depends(require_admin), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): - """Update a registered table's configuration.""" + """Update a registered table's configuration. + + For BQ rows, schedules a background rebuild so the master view picks + up changes (e.g. a renamed dataset) without waiting for the next + scheduled sync. + """ repo = TableRegistryRepository(conn) - if not repo.get(table_id): + existing = repo.get(table_id) + if not existing: raise HTTPException(status_code=404, detail="Table not found") updates = {k: v for k, v in request.model_dump().items() if v is not None} + # Run BQ-shape validation BEFORE persisting whenever the merged record + # would be a bigquery row (existing was BQ, or the patch flips it to BQ, + # or the patch touches BQ-relevant fields on an already-BQ row). Without + # this gate, an admin could PUT `bucket="evil\"; DROP --"` onto a BQ + # row and the next rebuild would silently fail at view-create time — + # surface the bad shape at PUT time instead. if updates: - existing = repo.get(table_id) - merged = {k: v for k, v in existing.items() if k != "registered_at"} + # Preserve the original `registered_at` across PUTs — `repo.register` + # now accepts it as an optional kwarg; without this the upsert would + # stamp a fresh `now()` on every edit (issue #130). 
+ merged = dict(existing) merged.update(updates) merged.pop("id", None) # avoid duplicate id kwarg + + if merged.get("source_type") == "bigquery": + # Reuse the register-time validator. It mutates the request to + # force query_mode='remote' / profile_after_sync=False — apply + # the same coercion to `merged` so the persisted row matches. + synthetic = RegisterTableRequest( + name=merged.get("name") or table_id, + bucket=merged.get("bucket"), + source_table=merged.get("source_table"), + source_type="bigquery", + query_mode=merged.get("query_mode") or "remote", + profile_after_sync=bool(merged.get("profile_after_sync") or False), + primary_key=merged.get("primary_key"), + description=merged.get("description"), + folder=merged.get("folder"), + sync_strategy=merged.get("sync_strategy") or "full_refresh", + sync_schedule=merged.get("sync_schedule"), + ) + _validate_bigquery_register_payload(synthetic) + merged["query_mode"] = synthetic.query_mode + merged["profile_after_sync"] = synthetic.profile_after_sync + repo.register(id=table_id, **merged) + + AuditRepository(conn).log( + user_id=user.get("id"), + action="update_table", + resource=table_id, + params=_sanitize_for_audit({"updated_fields": sorted(updates.keys()), **updates}), + ) + + # If we updated a BQ row (or one that's now BQ), refresh the extract in + # the background so the view picks up renames / column-list changes. + # Use the BG wrapper so any rebuild errors are logged at ERROR level + # instead of being silently dropped by BackgroundTasks (which discards + # return values). 
+ after = repo.get(table_id) or {} + if after.get("source_type") == "bigquery": + background.add_task(_materialize_bigquery_extract_bg) + return {"id": table_id, "updated": list(updates.keys())} @router.delete("/registry/{table_id}", status_code=204) async def unregister_table( table_id: str, + background: BackgroundTasks, user: dict = Depends(require_admin), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): - """Unregister a table from the system.""" + """Unregister a table from the system. + + For BQ rows, schedules a background rebuild so the dropped row's + master view is removed from analytics.duckdb (rather than hanging + around until the next scheduled sync). + """ repo = TableRegistryRepository(conn) - if not repo.get(table_id): + existing = repo.get(table_id) + if not existing: raise HTTPException(status_code=404, detail="Table not found") + + was_bigquery = existing.get("source_type") == "bigquery" repo.unregister(table_id) + AuditRepository(conn).log( + user_id=user.get("id"), + action="unregister_table", + resource=table_id, + params=_sanitize_for_audit({ + "name": existing.get("name"), + "source_type": existing.get("source_type"), + "bucket": existing.get("bucket"), + "source_table": existing.get("source_table"), + }), + ) + + if was_bigquery: + background.add_task(_materialize_bigquery_extract_bg) + @router.post("/configure") async def configure_instance( diff --git a/app/web/router.py b/app/web/router.py index 9278568..09ea1cb 100644 --- a/app/web/router.py +++ b/app/web/router.py @@ -617,9 +617,18 @@ async def admin_tables( conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): from src.repositories.table_registry import TableRegistryRepository + from app.instance_config import get_data_source_type repo = TableRegistryRepository(conn) tables = repo.list_all() - ctx = _build_context(request, user=user, registered_tables=tables) + # Branch the register-modal layout server-side so the JS doesn't have + # to round-trip /api/admin/server-config to 
learn the source type. + data_source_type = get_data_source_type() or "keboola" + ctx = _build_context( + request, + user=user, + registered_tables=tables, + data_source_type=data_source_type, + ) return templates.TemplateResponse(request, "admin_tables.html", ctx) diff --git a/app/web/templates/_app_header.html b/app/web/templates/_app_header.html index f1195dd..5f5a4d6 100644 --- a/app/web/templates/_app_header.html +++ b/app/web/templates/_app_header.html @@ -13,9 +13,8 @@ Dashboard Install CLI {% if session.user.is_admin %} - All tokens Marketplaces - {% set _admin_active = _path.startswith('/admin/users') or _path.startswith('/admin/groups') or _path.startswith('/admin/access') or _path.startswith('/admin/server-config') %} + {% set _admin_active = _path.startswith('/admin/tables') or _path.startswith('/admin/tokens') or _path.startswith('/admin/users') or _path.startswith('/admin/groups') or _path.startswith('/admin/access') or _path.startswith('/admin/server-config') %}