When admin registers a materialized BQ row with bucket+source_table but no source_query, the server generates 'SELECT * FROM `<project>.<ds>.<tbl>`' from instance.yaml's configured BQ project. Same fallback fires on PUT when flipping to materialized. The backtick rejection guard, which was appropriate for DuckDB-flavor source_query, is relaxed for materialized rows since the new wrapping path (Task 2) runs admin SQL through BQ jobs API which uses BQ-native syntax (backticks for dashed identifiers).
2711 lines
116 KiB
Python
2711 lines
116 KiB
Python
"""Admin endpoints — table discovery, registry management, instance configuration.
|
|
|
|
Every gate on this router uses ``require_admin`` from ``app.auth.access``,
|
|
which checks Admin user_group membership for both OAuth session and PAT
|
|
callers via the same ``_user_group_ids`` lookup.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import threading
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
|
|
from pydantic import BaseModel, Field, field_validator, model_validator
|
|
from typing import Optional, List, Dict, Any
|
|
import duckdb
|
|
|
|
from app.auth.access import require_admin
|
|
from app.auth.dependencies import _get_db
|
|
from src.repositories.table_registry import TableRegistryRepository
|
|
from src.repositories.audit import AuditRepository
|
|
from src.identifier_validation import (
|
|
is_safe_identifier as _is_safe_identifier,
|
|
is_safe_quoted_identifier as _is_safe_quoted_identifier,
|
|
)
|
|
from src.sql_safe import is_safe_project_id as _is_safe_project_id
|
|
from src.scheduler import is_valid_schedule
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter(prefix="/api/admin", tags=["admin"])
|
|
|
|
# Serializes the read-modify-write of state/instance.yaml across the two
# endpoints that mutate the overlay (POST /server-config and POST /configure).
# Without it, two admins saving concurrently would each read the same overlay
# snapshot, merge their disjoint patches, and the second os.replace would
# silently drop the first patch. Documented limitation: the lock only
# serializes writers inside a single FastAPI worker process; multi-worker
# deployments would need an OS-level file lock.
|
|
_overlay_write_lock = threading.Lock()
|
|
|
|
# SSRF protection: reject private/internal URLs for keboola_url
|
|
import ipaddress as _ipaddress
|
|
import socket as _socket
|
|
from urllib.parse import urlparse as _urlparse
|
|
|
|
|
|
def _validate_url_not_private(url: str, field_name: str = "url") -> None:
|
|
"""Raise 400 if the URL host points to a private/reserved network.
|
|
|
|
Uses DNS resolution + ipaddress checks instead of hostname regex,
|
|
which correctly handles all IPv4/IPv6 addresses including abbreviated
|
|
forms (fe80::1, ::1, etc.) and DNS rebinding (resolves at check time).
|
|
"""
|
|
try:
|
|
parsed = _urlparse(url)
|
|
except Exception:
|
|
raise HTTPException(status_code=400, detail=f"Invalid {field_name}: not a valid URL")
|
|
host = parsed.hostname or ""
|
|
if not host:
|
|
raise HTTPException(status_code=400, detail=f"Invalid {field_name}: missing hostname")
|
|
|
|
# Reject well-known dangerous hostnames before DNS resolution
|
|
if host.lower() in ("localhost", "localhost.localdomain"):
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid {field_name}: must not point to a private or reserved network",
|
|
)
|
|
|
|
# Resolve hostname to IP addresses and check each one
|
|
try:
|
|
addrinfos = _socket.getaddrinfo(host, None, proto=_socket.IPPROTO_TCP)
|
|
except Exception:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid {field_name}: could not resolve hostname",
|
|
)
|
|
|
|
for family, _type, _proto, _canonname, sockaddr in addrinfos:
|
|
ip_str = sockaddr[0]
|
|
try:
|
|
ip = _ipaddress.ip_address(ip_str)
|
|
except ValueError:
|
|
continue
|
|
if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved or ip.is_multicast:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid {field_name}: must not point to a private or reserved network",
|
|
)
|
|
|
|
|
|
def _normalize_primary_key(v):
|
|
"""Coerce a string primary_key to ``[v]`` for backward compatibility.
|
|
|
|
The 0.14.0 contract is ``Optional[List[str]]`` so composite primary keys
|
|
(e.g. session-grain tables keyed on ``(session_id, event_date)``) round-
|
|
trip cleanly. Pre-0.14.0 callers sent a single string; Pydantic v2
|
|
refuses to coerce, so without this validator a CLI script posting
|
|
``"primary_key": "session_id"`` would now hit a 422. Wrap a bare string
|
|
in a one-element list so old and new callers both work.
|
|
"""
|
|
if v is None:
|
|
return v
|
|
if isinstance(v, str):
|
|
return [v]
|
|
return v
|
|
|
|
|
|
# Patches to these section paths must pass _validate_url_not_private. The
|
|
# tuple is `(section, *intermediate_keys, leaf_key)` — same SSRF gate the
|
|
# /configure wizard applies to keboola_url, so an admin can't sneak
|
|
# http://169.254.169.254/ in via the server-config editor's data_source patch.
|
|
#
|
|
# Intentionally NOT included: ``("ai", "base_url")``. The openai_compat
|
|
# provider legitimately points at internal services (LiteLLM proxy on a
|
|
# private network, on-cluster vLLM endpoint, etc.) — see
|
|
# config/instance.yaml.example "LiteLLM proxy" example. SSRF blocking
|
|
# would break those valid setups. Operators with stricter posture should
|
|
# enforce the constraint upstream (firewall / egress proxy allowlist).
|
|
# Devin ANALYSIS_0001 on PR #141 5f649a4 review.
|
|
_URL_BEARING_FIELDS: tuple[tuple[str, ...], ...] = (
|
|
("data_source", "keboola", "stack_url"),
|
|
)
|
|
|
|
|
|
def _validate_urls_in_patch(sections: Dict[str, Dict[str, Any]]) -> None:
    """Run the SSRF check on every registered URL field present in a patch.

    Each entry of ``_URL_BEARING_FIELDS`` is a ``(section, *path, leaf)``
    tuple. The patch is walked along that path and
    ``_validate_url_not_private`` is called on the leaf when it holds a
    non-empty string. A missing section, a missing intermediate key, or a
    non-dict node along the way means the patch does not touch the field,
    so it is skipped without error.

    Raises:
        HTTPException: propagated from ``_validate_url_not_private``.
    """
    for field_path in _URL_BEARING_FIELDS:
        head = field_path[0]
        inner_keys = field_path[1:-1]
        leaf = field_path[-1]

        if head not in sections:
            continue

        # Descend through the intermediate keys; give up on the first gap.
        cursor: Any = sections[head]
        for step in inner_keys:
            if isinstance(cursor, dict) and step in cursor:
                cursor = cursor[step]
            else:
                cursor = None
                break

        if not isinstance(cursor, dict):
            continue

        candidate = cursor.get(leaf)
        if isinstance(candidate, str) and candidate:
            _validate_url_not_private(candidate, field_name=".".join(field_path))
|
|
|
|
|
|
# --- Server-config (instance.yaml) editor -----------------------------------
|
|
#
|
|
# The /admin/server-config UI POSTs a partial dict here keyed by section
|
|
# (instance, data_source, email, telegram, jira, theme, server, auth) with
|
|
# the field values to merge into instance.yaml. Each save:
|
|
# 1. Loads the current instance.yaml (writable overlay first, then static).
|
|
# 2. Deep-merges the patch on top.
|
|
# 3. Writes to DATA_DIR/state/instance.yaml (the writable overlay).
|
|
# 4. Writes one audit_log entry tagged `instance_config.update` containing
|
|
# a sanitized diff (secret-looking keys are masked).
|
|
# Hot-reload is OUT OF SCOPE for #91 — the response carries
|
|
# `restart_required=True` so the UI can show the banner.
|
|
|
|
# Sections an admin can mutate. Keep the list explicit so a typo'd section
|
|
# in the request body is rejected loudly instead of being silently merged
|
|
# into the YAML root and confusing future loads.
|
|
_EDITABLE_SECTIONS: tuple[str, ...] = (
|
|
"instance",
|
|
"data_source",
|
|
"email",
|
|
"telegram",
|
|
"jira",
|
|
"theme",
|
|
"server",
|
|
"auth",
|
|
"ai",
|
|
"openmetadata",
|
|
"desktop",
|
|
"corporate_memory",
|
|
)
|
|
|
|
# "Danger-zone" sections — flipping these can lock operators out (auth.*) or
|
|
# break OAuth callbacks (server.hostname/host). The UI shows a confirmation
|
|
# dialog before submitting them. The API accepts them; this list exists so
|
|
# the audit entry can flag the change as high-risk and the UI can surface
|
|
# the right warning copy.
|
|
_DANGER_SECTIONS: tuple[str, ...] = ("auth", "server")
|
|
|
|
# Known-but-optional config fields per section. The /admin/server-config UI
|
|
# uses this registry alongside the YAML payload to render fields the operator
|
|
# might want to set even though they're not currently in instance.yaml.
|
|
#
|
|
# Schema per field:
|
|
# {
|
|
# "kind": "string" | "secret" | "bool" | "int" | "select" | "object" | "array",
|
|
# "default": <type-appropriate default> (optional)
|
|
# "hint": "<one-line operator-facing help>"
|
|
# "options": [...] (only for kind="select")
|
|
# "fields": {<name>: <fieldspec>} (only for kind="object", nested fields)
|
|
# "item_kind": "string" | ... (only for kind="array", element type)
|
|
# "required": bool (defaults False; UI marks the label)
|
|
# }
|
|
#
|
|
# Subagents 2-4 will populate the bodies. The registry enables the UI to
|
|
# render missing-but-known fields with placeholders + hints rather than
|
|
# forcing the operator to discover them via the JSON-patch textarea or
|
|
# hitting a runtime error first. The smoke fixture below
|
|
# (data_source.bigquery.billing_project) proves the renderer wiring works
|
|
# end-to-end so subagents 2-4 only have to add registry entries — they
|
|
# don't need to touch admin_server_config.html.
|
|
_KNOWN_FIELDS: dict[str, dict[str, dict]] = {
|
|
"instance": {
|
|
# No commonly-missing instance-level fields. The example YAML's
|
|
# `name`/`subtitle` are always populated by `da setup` so they
|
|
# render via the populated path; nothing to surface here.
|
|
},
|
|
"data_source": {
|
|
"bigquery": {
|
|
"kind": "object",
|
|
"hint": "BigQuery connection knobs (read more in docs/DEPLOYMENT.md)",
|
|
"fields": {
|
|
"billing_project": {
|
|
"kind": "string",
|
|
"hint": (
|
|
"GCP project to bill BQ jobs against. Set when SA can read "
|
|
"the data project but cannot bill there (e.g. shared read-only "
|
|
"data project). Defaults to data_source.bigquery.project. "
|
|
"Mismatch → 403 USER_PROJECT_DENIED on every BQ call."
|
|
),
|
|
# Issue #160 §4.7.5: when this field is empty in the
|
|
# admin form, the JS template shows "(defaults to <project>)"
|
|
# as placeholder text — surfacing the access.py:339-340
|
|
# fallback rule directly in the UI without the operator
|
|
# having to read source. Path is walked against the
|
|
# `original` config payload from GET /api/admin/server-config.
|
|
"placeholder_from": ["data_source", "bigquery", "project"],
|
|
},
|
|
"max_bytes_per_materialize": {
|
|
"kind": "int",
|
|
"default": 10737418240,
|
|
"hint": (
|
|
"Cost guardrail for query_mode='materialized' BQ scans (dry-run "
|
|
"check before running). Bytes processed; exceeds → registration "
|
|
"or sync rejected. 0 disables the gate. Default 10737418240 = 10 GiB."
|
|
),
|
|
},
|
|
"bq_max_scan_bytes": {
|
|
"kind": "int",
|
|
"default": 5368709120,
|
|
"hint": (
|
|
"Cost guardrail for `da query --remote` against query_mode='remote' "
|
|
"BQ rows (dry-run check on the underlying SELECT before execute). "
|
|
"Bytes processed; exceeds → 400 remote_scan_too_large with a "
|
|
"`da fetch` suggestion. 0 disables the gate. Default 5368709120 = 5 GiB."
|
|
),
|
|
},
|
|
},
|
|
},
|
|
"keboola": {
|
|
"kind": "object",
|
|
"hint": "Keboola Storage API connection",
|
|
"fields": {
|
|
"stack_url": {
|
|
"kind": "string",
|
|
"hint": (
|
|
"e.g. https://connection.keboola.com (instance-specific stack URL). "
|
|
"Validated against private-IP allowlist on save (SSRF guard)."
|
|
),
|
|
},
|
|
"project_id": {
|
|
"kind": "string",
|
|
"hint": "Keboola project ID (numeric, but kept as string in YAML).",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
"email": {
|
|
# SMTP fields render via the populated path (always set when email
|
|
# is enabled); no commonly-missing optional knobs at this layer.
|
|
},
|
|
"telegram": {
|
|
# Rarely missing; leave empty.
|
|
},
|
|
"jira": {
|
|
# Webhook + REST credentials always present when Jira is configured.
|
|
},
|
|
"theme": {
|
|
# Cosmetic only; rarely missing.
|
|
},
|
|
"server": {
|
|
# TLS / hostname knobs are mostly env-side; nothing to surface here.
|
|
},
|
|
"auth": {
|
|
"allowed_domain": {
|
|
"kind": "string",
|
|
"hint": (
|
|
"Comma-separated list of allowed sign-in email domains (e.g. "
|
|
"'acme.com,acme-internal.com'). Single domain works too. Empty → no "
|
|
"domain restriction (any verified Google identity can sign in)."
|
|
),
|
|
},
|
|
},
|
|
"ai": {
|
|
"base_url": {
|
|
"kind": "string",
|
|
"hint": (
|
|
"Required for provider='openai_compat' (LiteLLM, OpenRouter, vLLM, etc.). "
|
|
"Ignored when provider='anthropic'. Examples: https://litellm.example.com, "
|
|
"https://openrouter.ai/api/v1."
|
|
),
|
|
},
|
|
"structured_output": {
|
|
"kind": "select",
|
|
"options": ["strict", "json", "auto"],
|
|
"default": "auto",
|
|
"hint": (
|
|
"JSON-schema enforcement strategy. strict=Layer 1 only "
|
|
"(Anthropic/OpenAI native, fail otherwise). json=Layer 1 + Layer 2 "
|
|
"fallback. auto=all three layers including prompt-based JSON (most "
|
|
"compatible, least strict)."
|
|
),
|
|
},
|
|
},
|
|
"openmetadata": {
|
|
"url": {
|
|
"kind": "string",
|
|
"hint": "Base URL of your OpenMetadata server (e.g. https://catalog.example.com).",
|
|
},
|
|
"token": {
|
|
"kind": "secret",
|
|
"hint": (
|
|
"JWT bearer token. Use ${OPENMETADATA_TOKEN} env-var reference "
|
|
"(don't paste secret directly)."
|
|
),
|
|
},
|
|
"cache_ttl_seconds": {
|
|
"kind": "int",
|
|
"default": 3600,
|
|
"hint": "How long to cache catalog responses in-process. Default 3600s (1h).",
|
|
},
|
|
"verify_ssl": {
|
|
"kind": "bool",
|
|
"default": True,
|
|
"hint": (
|
|
"TLS verification. Default true. Set false ONLY for internal CAs / "
|
|
"self-signed certs — sends the JWT over an unverified channel."
|
|
),
|
|
},
|
|
},
|
|
"desktop": {
|
|
"jwt_issuer": {
|
|
"kind": "string",
|
|
"default": "data-analyst",
|
|
"hint": "JWT iss claim. Match what the desktop app verifies.",
|
|
},
|
|
"jwt_secret": {
|
|
"kind": "secret",
|
|
"hint": "JWT signing secret. Use ${DESKTOP_JWT_SECRET} env-var reference.",
|
|
},
|
|
"url_scheme": {
|
|
"kind": "string",
|
|
"default": "data-analyst",
|
|
"hint": "Custom URL scheme registered by the desktop app (data-analyst://...).",
|
|
},
|
|
},
|
|
# corporate_memory governance — optional. When the section is missing
|
|
# from instance.yaml the system runs in legacy democratic-wiki mode
|
|
# (no admin review). Schema mirrors config/instance.yaml.example
|
|
# lines 224-317; renderer handles arbitrary depth + arrays + maps.
|
|
"corporate_memory": {
|
|
"distribution_mode": {
|
|
"kind": "select",
|
|
"options": ["mandatory_only", "admin_curated", "hybrid"],
|
|
"default": "hybrid",
|
|
"hint": (
|
|
"How knowledge reaches users. mandatory_only = admin-only; "
|
|
"admin_curated = admin + user voting as feedback; "
|
|
"hybrid = default (mandatory from admin + optional from user voting)."
|
|
),
|
|
},
|
|
"approval_mode": {
|
|
"kind": "select",
|
|
"options": ["review_queue", "auto_publish", "threshold"],
|
|
"default": "review_queue",
|
|
"hint": (
|
|
"How AI-extracted items enter the system. review_queue = admin "
|
|
"approval required (default); auto_publish = live immediately; "
|
|
"threshold = high-confidence auto, low-confidence to queue."
|
|
),
|
|
},
|
|
"review_period_months": {
|
|
"kind": "int",
|
|
"default": 6,
|
|
"hint": "How often approved/mandatory items are flagged for re-review (months).",
|
|
},
|
|
"notify_on_new_items": {
|
|
"kind": "bool",
|
|
"default": True,
|
|
"hint": "Notify km_admins when new pending items arrive.",
|
|
},
|
|
"sources": {
|
|
"kind": "object",
|
|
"hint": (
|
|
"Knowledge-source ingestion. Each source has its own enabled "
|
|
"flag + base confidence."
|
|
),
|
|
"fields": {
|
|
"claude_local_md": {
|
|
"kind": "object",
|
|
"fields": {
|
|
"enabled": {"kind": "bool", "default": True},
|
|
"confidence_base": {
|
|
"kind": "float",
|
|
"default": 0.50,
|
|
"hint": "Confidence assigned to extractions from CLAUDE.local.md (0-1).",
|
|
},
|
|
},
|
|
},
|
|
"session_transcripts": {
|
|
"kind": "object",
|
|
"fields": {
|
|
"enabled": {"kind": "bool", "default": True},
|
|
"confidence_base": {"kind": "float", "default": 0.60},
|
|
"max_turns_per_session": {
|
|
"kind": "int",
|
|
"default": 100,
|
|
"hint": "Truncate transcripts longer than this many turns.",
|
|
},
|
|
"detection_types": {
|
|
"kind": "array",
|
|
"item_kind": "string",
|
|
"default": [
|
|
"correction",
|
|
"confirmation",
|
|
"unprompted_definition",
|
|
],
|
|
"hint": (
|
|
"Which extraction patterns to detect. Each entry "
|
|
"is a detection-type tag."
|
|
),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
"extraction": {
|
|
"kind": "object",
|
|
"fields": {
|
|
"model": {
|
|
"kind": "string",
|
|
"default": "claude-haiku-4-5-20251001",
|
|
"hint": "LLM used to extract knowledge. Override for cost or quality.",
|
|
},
|
|
"sensitivity_check": {"kind": "bool", "default": True},
|
|
"contradiction_check": {"kind": "bool", "default": True},
|
|
},
|
|
},
|
|
"confidence": {
|
|
"kind": "object",
|
|
"hint": "Confidence scoring + decay rules.",
|
|
"fields": {
|
|
"base": {
|
|
"kind": "map",
|
|
"key_kind": "string",
|
|
"value_kind": "float",
|
|
"default": {
|
|
"user_verification.correction": 0.90,
|
|
"user_verification.unprompted_definition": 0.90,
|
|
"user_verification.confirmation": 0.60,
|
|
"admin_mandate": 1.00,
|
|
"claude_local_md": 0.50,
|
|
"session_transcript": 0.50,
|
|
},
|
|
"hint": (
|
|
"Base score per source/detection. Keys are 'source_type' "
|
|
"or 'source_type.detection_type' (the dot is data, not "
|
|
"nesting)."
|
|
),
|
|
},
|
|
"modifiers": {
|
|
# map<string, map<string, float>>. The renderer's structured
|
|
# editor for "map of objects with declared subfields" is a
|
|
# TODO (see admin_server_config.html); for now this falls
|
|
# back to a JSON textarea — admins editing it see the
|
|
# schema doc inline via the hint.
|
|
"kind": "map",
|
|
"key_kind": "string",
|
|
"value_kind": "object",
|
|
"value_fields": {}, # signals the JSON-textarea fallback
|
|
"hint": (
|
|
"Per-key modifier step sizes applied to base when "
|
|
"optional signals are present (3-level dotted paths). "
|
|
"Edit as a JSON object — outer keys mirror confidence.base "
|
|
"keys; inner objects map signal name to bonus float."
|
|
),
|
|
},
|
|
"decay": {
|
|
"kind": "object",
|
|
"fields": {
|
|
"mode": {
|
|
"kind": "select",
|
|
"options": ["linear", "exponential"],
|
|
"default": "exponential",
|
|
},
|
|
"half_life_months": {
|
|
"kind": "int",
|
|
"default": 12,
|
|
"hint": "Used when mode=exponential.",
|
|
},
|
|
"decay_rate_monthly": {
|
|
"kind": "float",
|
|
"default": 0.02,
|
|
"hint": "Used when mode=linear.",
|
|
},
|
|
"floor": {
|
|
"kind": "map",
|
|
"key_kind": "string",
|
|
"value_kind": "float",
|
|
"default": {
|
|
"admin_mandate": 0.50,
|
|
"user_verification": 0.40,
|
|
"default": 0.0,
|
|
},
|
|
"hint": (
|
|
"Per-source minimum confidence — items never decay "
|
|
"below this floor."
|
|
),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
"contradiction_detection": {
|
|
"kind": "object",
|
|
"fields": {
|
|
"enabled": {"kind": "bool", "default": True},
|
|
"max_candidates": {
|
|
"kind": "int",
|
|
"default": 10,
|
|
"hint": "Max contradiction candidates to evaluate per new item.",
|
|
},
|
|
},
|
|
},
|
|
"entity_resolution": {
|
|
"kind": "object",
|
|
"fields": {
|
|
"enabled": {"kind": "bool", "default": True},
|
|
"entities": {
|
|
"kind": "map",
|
|
"key_kind": "string",
|
|
"value_kind": "array",
|
|
"value_item_kind": "string",
|
|
"default": {
|
|
"metrics": ["churn", "MRR", "ARR", "NPS", "CAC", "LTV"],
|
|
"products": ["Platform", "API", "Dashboard"],
|
|
},
|
|
"hint": (
|
|
"Domain-entity vocabulary. Key = domain category; value = "
|
|
"canonical names list."
|
|
),
|
|
},
|
|
},
|
|
},
|
|
"domain_owners": {
|
|
"kind": "map",
|
|
"key_kind": "string",
|
|
"value_kind": "array",
|
|
"value_item_kind": "string",
|
|
"hint": (
|
|
"Per-domain admin emails. Key = domain name; value = email list."
|
|
),
|
|
},
|
|
"domains": {
|
|
"kind": "array",
|
|
"item_kind": "string",
|
|
"default": [
|
|
"finance",
|
|
"engineering",
|
|
"product",
|
|
"data",
|
|
"operations",
|
|
"infrastructure",
|
|
],
|
|
"hint": (
|
|
"Knowledge domains analysts can target. Each must match a key "
|
|
"in domain_owners."
|
|
),
|
|
},
|
|
},
|
|
}
|
|
|
|
# Keys whose values must be redacted from the audit diff. We match
|
|
# substring (case-insensitive) so `client_secret`, `api_token`,
|
|
# `webapp_secret_key`, `bot_token`, `password`, `smtp_password`, etc. all
|
|
# get masked even when nested.
|
|
_SECRET_KEY_PATTERNS: tuple[str, ...] = (
|
|
"secret",
|
|
"token",
|
|
"password",
|
|
"api_key",
|
|
)
|
|
|
|
|
|
def _is_secret_key(key: str) -> bool:
    """Return True when ``key`` names a credential that audit logs must mask.

    The match is a case-insensitive substring test against every entry of
    ``_SECRET_KEY_PATTERNS``.
    """
    lowered = key.lower()
    for fragment in _SECRET_KEY_PATTERNS:
        if fragment in lowered:
            return True
    return False
|
|
|
|
|
|
def _mask(value: Any) -> str:
|
|
"""Replacement value used in the audit diff for secret fields.
|
|
|
|
We deliberately do NOT preserve length or any hint about the secret —
|
|
the diff is read by other admins, and there's no operator value to
|
|
leaking "the new SMTP password is 16 chars". `***` is enough to show
|
|
that the field changed without exposing it.
|
|
"""
|
|
if value in (None, ""):
|
|
return "<empty>"
|
|
return "***"
|
|
|
|
|
|
# Sentinel values produced by `_mask`. Any patch leaf that arrives at a
|
|
# secret-keyed slot still bearing one of these strings means the caller
|
|
# round-tripped the GET payload (which redacts secret-keyed children inside
|
|
# nested objects) without changing the value — `_strip_redacted_sentinels`
|
|
# drops the leaf so deep-merge preserves whatever the overlay already had,
|
|
# rather than persisting the placeholder on top of the real secret.
|
|
_REDACTED_SENTINELS: frozenset = frozenset({"***", "<empty>"})
|
|
|
|
|
|
def _strip_redacted_sentinels(value: Any, key_hint: str = "") -> Any:
|
|
"""Recursively drop secret-keyed leaves whose value is a redaction sentinel.
|
|
|
|
Symmetric with `_redact`: the GET handler masks secret-keyed children
|
|
inside nested objects so the form never shows cleartext, and this
|
|
function is the write-side counterpart that ensures the placeholder
|
|
doesn't make a round-trip back into the overlay. Defense-in-depth
|
|
alongside the client-side `scrubRedactedSecrets` in
|
|
`admin_server_config.html` — an API caller (CLI / script) that forgets
|
|
to scrub still can't corrupt secrets via this endpoint.
|
|
"""
|
|
if isinstance(value, dict):
|
|
out: Dict[str, Any] = {}
|
|
for k, v in value.items():
|
|
if _is_secret_key(k) and isinstance(v, str) and v in _REDACTED_SENTINELS:
|
|
continue
|
|
out[k] = _strip_redacted_sentinels(v, k)
|
|
return out
|
|
if isinstance(value, list):
|
|
return [_strip_redacted_sentinels(item, key_hint) for item in value]
|
|
return value
|
|
|
|
|
|
def _redact(value: Any, key_hint: str = "") -> Any:
|
|
"""Recursively mask secret-looking fields in a config subtree.
|
|
|
|
`key_hint` is the parent key — used so a string value like
|
|
``"${KEBOOLA_TOKEN}"`` under ``token_env`` is masked even though the
|
|
value itself isn't a credential, because the key signals it points at
|
|
one.
|
|
"""
|
|
if isinstance(value, dict):
|
|
return {k: (_mask(v) if _is_secret_key(k) else _redact(v, k)) for k, v in value.items()}
|
|
if isinstance(value, list):
|
|
return [_redact(item, key_hint) for item in value]
|
|
if key_hint and _is_secret_key(key_hint):
|
|
return _mask(value)
|
|
return value
|
|
|
|
|
|
def _diff_dicts(before: dict, after: dict, path: str = "") -> List[Dict[str, Any]]:
|
|
"""Flat list of changed fields between two dicts.
|
|
|
|
Output: [{"path": "email.smtp_host", "before": "...", "after": "..."}].
|
|
Diff is computed on RAW values, then each row's `before`/`after` is
|
|
masked via `_mask` when the leaf key matches `_is_secret_key` — pre-
|
|
masking the inputs would collapse a secret rotation (e.g. password A
|
|
→ password B) into "no diff" because both sides redact to ``"***"``,
|
|
and the audit log would then silently fail to record one of the most
|
|
security-relevant changes. Compare raw, redact when emitting.
|
|
|
|
Recurses into a dict on either side (treating the missing side as
|
|
`{}`) so adding a brand-new section reports per-field paths
|
|
(`email.smtp_host`) rather than a single opaque `email` blob — that
|
|
keeps the audit row useful when an admin populates a section for the
|
|
first time.
|
|
"""
|
|
changes: List[Dict[str, Any]] = []
|
|
keys = set(before.keys()) | set(after.keys())
|
|
for key in sorted(keys):
|
|
new_path = f"{path}.{key}" if path else key
|
|
b_val = before.get(key)
|
|
a_val = after.get(key)
|
|
b_is_dict = isinstance(b_val, dict)
|
|
a_is_dict = isinstance(a_val, dict)
|
|
# Dict-vs-dict (or dict-vs-None) → recurse for per-field paths.
|
|
if b_is_dict and a_is_dict:
|
|
changes.extend(_diff_dicts(b_val, a_val, new_path))
|
|
elif b_is_dict and a_val is None:
|
|
changes.extend(_diff_dicts(b_val, {}, new_path))
|
|
elif a_is_dict and b_val is None:
|
|
changes.extend(_diff_dicts({}, a_val, new_path))
|
|
# Dict↔scalar shape change is recorded as a single replacement at
|
|
# the parent path. Recursing with `{}` would lose the scalar side
|
|
# entirely (admin sets `keboola: {…}` to `keboola: "disabled"` —
|
|
# auditor would see members removed but never the new value).
|
|
# The dict side may itself contain secret-keyed children (e.g.
|
|
# `keboola: {token_env: ${KEBOOLA_TOKEN}}` resolved to cleartext);
|
|
# `_redact` masks those children even when the parent key isn't
|
|
# secret-named, so the audit log doesn't leak ${ENV_VAR}-resolved
|
|
# values when a section is replaced wholesale.
|
|
elif b_is_dict != a_is_dict:
|
|
if _is_secret_key(key):
|
|
changes.append({
|
|
"path": new_path,
|
|
"before": _mask(b_val),
|
|
"after": _mask(a_val),
|
|
})
|
|
else:
|
|
changes.append({
|
|
"path": new_path,
|
|
"before": _redact(b_val, key) if b_is_dict else b_val,
|
|
"after": _redact(a_val, key) if a_is_dict else a_val,
|
|
})
|
|
elif b_val != a_val:
|
|
if _is_secret_key(key):
|
|
changes.append({
|
|
"path": new_path,
|
|
"before": _mask(b_val),
|
|
"after": _mask(a_val),
|
|
})
|
|
else:
|
|
changes.append({"path": new_path, "before": b_val, "after": a_val})
|
|
return changes
|
|
|
|
|
|
def _deep_merge(base: dict, patch: dict) -> dict:
|
|
"""Merge `patch` into `base` recursively, returning a new dict.
|
|
|
|
Patch values overwrite base values. Dict-into-dict recurses; everything
|
|
else (lists, scalars, None) is replaced wholesale — admin sets
|
|
``email: {smtp_port: 465}`` and we don't try to re-merge nested ports.
|
|
"""
|
|
out = dict(base)
|
|
for key, value in patch.items():
|
|
if isinstance(value, dict) and isinstance(out.get(key), dict):
|
|
out[key] = _deep_merge(out[key], value)
|
|
else:
|
|
out[key] = value
|
|
return out
|
|
|
|
|
|
def _load_current_instance_yaml() -> dict:
    """Return the editor's view of instance.yaml.

    Thin wrapper around ``app.instance_config.load_instance_config`` so the
    editor sees the same configuration as the rest of the running app.

    NOTE(review): the notes carried with this helper state that the loader
    deep-merges the static file with the writable overlay and caches the
    result, that readers (GET /server-config) rely on writers to invalidate
    that cache, and that writers must call ``reset_cache()`` *before* this
    read in a read-modify-write sequence. None of that is visible in this
    function; confirm against ``app.instance_config``.
    """
    # Resolved at call time rather than at module import.
    from app.instance_config import load_instance_config

    current_config = load_instance_config()
    return current_config
|
|
|
|
|
|
def _public_view(config: dict) -> dict:
    """Return a redacted deep copy of ``config`` for the admin UI form.

    The input is deep-copied and then passed through ``_redact``, so an admin
    can see *which* fields are populated without the cleartext reaching the
    rendered HTML / browser DevTools. The caller's ``config`` is not changed.
    """
    from copy import deepcopy

    snapshot = deepcopy(config)
    return _redact(snapshot)
|
|
|
|
|
|
class ServerConfigUpdateRequest(BaseModel):
    """Request body for POST /api/admin/server-config.

    The handler only accepts sections named in ``_EDITABLE_SECTIONS`` and
    answers 400 for anything else. It also answers 400 when the patch
    touches a danger-zone section (auth.*, server.*) and ``confirm_danger``
    is not true.
    """

    # Section name -> partial dict of values to merge into that section.
    sections: Dict[str, Dict[str, Any]] = Field(
        description="Per-section patch dict (e.g. {'instance': {'name': 'X'}})",
        default_factory=dict,
    )

    # Explicit opt-in required for edits to danger-zone sections.
    confirm_danger: bool = Field(
        description="Must be true to apply changes touching auth.* or server.*",
        default=False,
    )
|
|
|
|
|
|
# Optional BQ fields whose runtime defaults are documented but which used to
# be invisible in the editor when YAML omitted them. The data_source.bigquery
# subtree renders as a JSON textarea; a key that's absent from the GET
# payload literally cannot appear in the form for the operator to edit. We
# surface them with their documented defaults so the UI always shows them as
# editable knobs — see Phase J of the admin-tables-cleanup work.
#
#   - max_bytes_per_materialize: cost guardrail for `query_mode='materialized'`
#     (default 10 GiB; 0 disables; null falls through to the default).
#   - bq_max_scan_bytes: cost guardrail for `da query --remote` against
#     `query_mode='remote'` rows (default 5 GiB; 0 disables).
#
# billing_project is a related optional knob (it defaults to the data
# project; an explicit value is needed when the SA can read the data project
# but not bill against it), but it is deliberately NOT seeded below — see the
# note inside the dict.
|
|
_BQ_OPTIONAL_FIELD_DEFAULTS: Dict[str, Any] = {
|
|
# `billing_project` intentionally NOT seeded here. The empty-string
|
|
# default would inject `billing_project: ""` into every GET payload,
|
|
# which makes the JS `isUnset = (value === undefined)` check evaluate
|
|
# False — and the `(defaults to <project>)` placeholder feature
|
|
# (#160 §4.7.5) would never render. Leaving it absent keeps the
|
|
# field in the unset rendering path so placeholder_from fires.
|
|
# Devin Review iter #3 on PR #168.
|
|
"max_bytes_per_materialize": 10737418240,
|
|
"bq_max_scan_bytes": 5368709120,
|
|
}
|
|
|
|
|
|
def _ensure_bq_optional_fields(sections: Dict[str, Any]) -> None:
|
|
"""In-place: add missing BQ optional fields to data_source.bigquery so the
|
|
UI's JSON-textarea renders them as editable keys. Existing values are
|
|
preserved — only absent keys are populated with their documented default.
|
|
"""
|
|
ds = sections.get("data_source")
|
|
if not isinstance(ds, dict):
|
|
return
|
|
bq = ds.get("bigquery")
|
|
if not isinstance(bq, dict):
|
|
# No BQ subsection — leave alone. Non-BQ instances don't need these
|
|
# knobs, and creating an empty bigquery dict would be misleading.
|
|
return
|
|
for key, default in _BQ_OPTIONAL_FIELD_DEFAULTS.items():
|
|
bq.setdefault(key, default)
|
|
|
|
|
|
@router.get("/server-config")
async def get_server_config(
    user: dict = Depends(require_admin),
):
    """Return the current instance.yaml with secrets redacted.

    Used by the /admin/server-config UI to prefill its form. The redacted
    payload mirrors the actual file shape, so the UI does not need to know
    the schema: it iterates over the editable sections and renders whatever
    fields it finds.

    Response keys:
        sections: redacted config for every name in ``_EDITABLE_SECTIONS``
            (``{}`` when the file omits the section, so its header still
            renders), with the optional BigQuery knobs filled in.
        editable_sections / danger_sections / secret_key_patterns: list
            copies of the matching module-level tuples.
        known_fields: the ``_KNOWN_FIELDS`` registry, so the UI can render
            placeholders for fields the operator has not set yet (Phase J).
    """
    visible = _public_view(_load_current_instance_yaml())

    # Surface every editable section, even ones the file omits, so the
    # operator can populate them from scratch without manual JSON edits.
    sections = {}
    for name in _EDITABLE_SECTIONS:
        sections[name] = visible.get(name, {})

    # Always show the optional BQ knobs in the UI's JSON editor instead of
    # requiring the operator to know they exist (Phase J).
    _ensure_bq_optional_fields(sections)

    payload = {
        "sections": sections,
        "editable_sections": list(_EDITABLE_SECTIONS),
        "danger_sections": list(_DANGER_SECTIONS),
        "secret_key_patterns": list(_SECRET_KEY_PATTERNS),
        # Adding an entry to `_KNOWN_FIELDS` is enough for the UI to render
        # it; the renderer itself needs no change.
        "known_fields": _KNOWN_FIELDS,
    }
    return payload
|
|
|
|
|
|
@router.post("/server-config")
async def update_server_config(
    request: ServerConfigUpdateRequest,
    user: dict = Depends(require_admin),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Patch instance.yaml from the /admin/server-config editor.

    Accepts a partial patch keyed by section. Validates sections, refuses
    danger-zone edits without explicit confirmation, deep-merges into the
    current overlay, writes the file, and emits one audit entry per save
    with a sanitized diff. Returns ``restart_required=true`` so the UI can
    show the restart banner — hot-reload is a separate issue (see #91 Out
    of scope).
    """
    # Raises HTTPException: 422 for an empty patch or a non-object section,
    # 400 for unknown sections / unconfirmed danger-zone edits, 500 when the
    # on-disk overlay is unreadable or is not a YAML mapping.
    import yaml

    if not request.sections:
        raise HTTPException(status_code=422, detail="sections cannot be empty")

    # Reject unknown sections loudly. Without this, a typo like "thmee"
    # would silently land in the YAML root and the operator wouldn't see
    # their colour change apply.
    unknown = sorted(set(request.sections.keys()) - set(_EDITABLE_SECTIONS))
    if unknown:
        raise HTTPException(
            status_code=400,
            detail=f"unknown section(s): {', '.join(unknown)}. "
                   f"Editable: {', '.join(_EDITABLE_SECTIONS)}",
        )

    # Danger-zone gate. The UI shows a confirmation dialog before posting
    # with confirm_danger=true; an API caller (CLI/script) has to pass it
    # explicitly so they can't fat-finger a hostname change.
    danger_touched = sorted(set(request.sections.keys()) & set(_DANGER_SECTIONS))
    if danger_touched and not request.confirm_danger:
        raise HTTPException(
            status_code=400,
            detail=f"section(s) {', '.join(danger_touched)} require confirm_danger=true",
        )

    # SSRF protection — same gate the /configure wizard applies to
    # keboola_url, but here it covers any URL-bearing field reachable via
    # the per-section patch (e.g. data_source.keboola.stack_url).
    _validate_urls_in_patch(request.sections)

    # Defense-in-depth: scrub redaction sentinels (`***` / `<empty>`) out of
    # secret-keyed leaves in the patch before they reach the deep-merge.
    # The client form does the same scrub, but an API caller round-tripping
    # the GET payload could otherwise overwrite real overlay secrets with
    # the placeholder shown in the form.
    scrubbed_sections: Dict[str, Dict[str, Any]] = {
        section: _strip_redacted_sentinels(patch, section)
        for section, patch in request.sections.items()
    }

    # Serialize read-modify-write across concurrent admin saves. Without the
    # lock, two saves would each read the same overlay snapshot, merge their
    # disjoint patches, and the second os.replace would silently drop the
    # first patch. The lock spans the cache-invalidate → load → merge →
    # atomic-write sequence; the audit log sits outside since it operates on
    # local snapshots.
    from app.instance_config import reset_cache
    data_dir = Path(os.environ.get("DATA_DIR", "./data"))
    config_path = data_dir / "state" / "instance.yaml"
    config_path.parent.mkdir(parents=True, exist_ok=True)

    with _overlay_write_lock:
        # Drop the in-process cache so we read the latest on-disk state,
        # including any update that landed from a concurrent caller before
        # we acquired the lock.
        reset_cache()
        before = _load_current_instance_yaml()

        # Deep merge — section-by-section so we never accidentally delete a
        # sibling section the patch didn't touch. Use the redaction-scrubbed
        # patch so a round-tripped GET payload can't overwrite real secrets
        # with the `***` placeholder. `after` exists only to compute the
        # audit diff; it is never written to disk.
        after = dict(before)
        for section, patch in scrubbed_sections.items():
            if not isinstance(patch, dict):
                raise HTTPException(
                    status_code=422,
                    detail=f"section '{section}' must be an object, got {type(patch).__name__}",
                )
            if isinstance(after.get(section), dict):
                after[section] = _deep_merge(after[section], patch)
            else:
                after[section] = patch

        # Write only the sections the user actually patched in this request.
        # Two reasons:
        #   1. Persisting the full merged config (or every editable section)
        #      would snapshot non-editable static sections into the overlay,
        #      shadowing later operator updates to those sections in the
        #      static file (`_load_current_instance_yaml` merges static +
        #      overlay, overlay wins per leaf).
        #   2. The merged config has `${ENV_VAR}` placeholders RESOLVED to the
        #      runtime values by config.loader. Writing every editable section
        #      back would persist real cleartext secrets where the static file
        #      had only env-var references — turning `smtp_password:
        #      ${SMTP_PASSWORD}` into `smtp_password: hunter2` in the overlay.
        # By writing only the sections in `request.sections` we keep both the
        # static-evolution and the env-var-placeholder properties intact.
        overlay_payload: Dict[str, Any] = {}
        if config_path.exists():
            try:
                overlay_payload = yaml.safe_load(config_path.read_text()) or {}
            except Exception as e:
                # A corrupt overlay used to be silently replaced — that masked
                # disk corruption / partial writes / hand-edits and dropped
                # every previously-saved section on the next save. Refuse and
                # surface so the operator can investigate.
                logger.exception("server-config: refusing to overwrite corrupt overlay at %s", config_path)
                raise HTTPException(
                    status_code=500,
                    detail=f"refusing to overwrite corrupt overlay at {config_path} ({e}); "
                           "back up and remove the file, or fix it by hand",
                ) from e
            # safe_load happily returns a list or scalar for a file that is
            # valid YAML but not a mapping (e.g. a hand-edit gone wrong). The
            # per-section merge below needs a dict; treat anything else as a
            # corrupt overlay instead of crashing on `.get` with an opaque
            # AttributeError. Checked outside the try so this HTTPException
            # is not re-wrapped by the `except Exception` above.
            if not isinstance(overlay_payload, dict):
                logger.error(
                    "server-config: refusing to overwrite non-mapping overlay at %s (root is %s)",
                    config_path, type(overlay_payload).__name__,
                )
                raise HTTPException(
                    status_code=500,
                    detail=f"refusing to overwrite corrupt overlay at {config_path} "
                           f"(top-level YAML value is {type(overlay_payload).__name__}, "
                           "expected a mapping); back up and remove the file, or fix it by hand",
                )

        for section, patch in scrubbed_sections.items():
            # Unreachable after the `unknown` gate above; kept as a belt-and-
            # braces guard so a non-editable section can never be persisted.
            if section not in _EDITABLE_SECTIONS:
                continue
            # Deep-merge the patch into the existing OVERLAY slot, starting
            # from an empty dict when the overlay has nothing for this
            # section. Deliberately not seeded from `before`: that merged view
            # carries resolved env-var values (see reason 2 above). Merging
            # into the overlay slot preserves unrelated overlay keys the
            # operator didn't touch in this request.
            existing = overlay_payload.get(section)
            if not isinstance(existing, dict):
                existing = {}
            overlay_payload[section] = _deep_merge(existing, patch)

        # Atomic via tmp + os.replace so two concurrent admin saves can't
        # interleave bytes and produce corrupt YAML (especially harmful since
        # auth.* is editable here — half-written file → operator lockout).
        tmp_path = config_path.with_suffix(config_path.suffix + ".tmp")
        tmp_path.write_text(yaml.dump(overlay_payload, default_flow_style=False, sort_keys=False))
        os.replace(tmp_path, config_path)
        logger.info("server-config: wrote %d section(s) to %s",
                    len(request.sections), config_path)

        # Invalidate cached instance config so subsequent reads pick up the
        # change. Hot-reload of running modules (auth providers, SMTP client)
        # is out of scope — the restart banner tells the operator to bounce.
        reset_cache()

    # Audit entry — diff is computed on RAW values then `_diff_dicts`
    # redacts each row whose leaf key matches `_is_secret_key`. Pre-
    # masking the inputs would collapse a secret rotation into "no
    # diff" because both sides redact to ``***``, hiding the most
    # security-relevant changes from the audit log. We log even if no
    # fields changed so the operator's intent (touched the page, hit
    # save) is auditable.
    diff = _diff_dicts(before, after)
    AuditRepository(conn).log(
        user_id=user.get("id"),
        action="instance_config.update",
        resource="instance.yaml",
        params={
            "sections": sorted(request.sections.keys()),
            "danger_sections": danger_touched,
            "diff": diff,
            "diff_count": len(diff),
        },
    )

    return {
        "status": "ok",
        "restart_required": True,
        "sections_updated": sorted(request.sections.keys()),
        "diff_count": len(diff),
    }
|
|
|
|
|
|
# --- End server-config editor -----------------------------------------------
|
|
|
|
|
|
# Source types accepted by /api/admin/register-table. Anything else is
# rejected with 422 — keeps a typo'd source_type from silently landing in
# table_registry (where it would later confuse the orchestrator scan).
# Consumed by RegisterTableRequest's `source_type` field validator, which
# lets None / "" through and checks every other value against this tuple.
_VALID_SOURCE_TYPES: tuple[str, ...] = ("keboola", "bigquery", "jira", "local")
|
|
|
|
# Explicit list of audit-payload keys whose values are credentials and must
# be masked by `_sanitize_for_audit`. Entries are lowercase because the
# sanitizer lowercases each payload key before the membership test.
#
# The previous shape (substring scan plus an ad-hoc whitelist) was fragile
# in both directions:
#   1. Over-masking: legit fields like `primary_key` contain "key", and a
#      name like `not_actually_a_token` contains "token" — each needed a
#      whitelist exception that had to be kept in sync as fields were added.
#   2. Under-masking: a brand-new credential field whose name contains none
#      of the patterns (`auth_material`, `bearer`) would silently leak.
# An explicit list puts the burden on the developer adding a new secret-
# bearing field: they must add the literal key name here, which forces a
# code-review touch on the audit path. Audit the current Pydantic models
# (RegisterTableRequest / UpdateTableRequest / ConfigureRequest /
# ServerConfigUpdateRequest) when extending — the registry payloads don't
# currently carry credentials, but ConfigureRequest does (`keboola_token`)
# and could be routed through this sanitizer in the future.
_SECRET_FIELDS: frozenset[str] = frozenset({
    # ConfigureRequest — POST /api/admin/configure carries Keboola creds.
    "keboola_token",
    # Generic names that have appeared in earlier iterations of admin
    # request bodies and could resurface — keep them masked defensively.
    "api_token",
    "auth_token",
    "bot_token",
    "client_secret",
    "google_client_secret",
    "google_oauth_client_secret",
    "password",
    "smtp_password",
    "webapp_secret_key",
    "bot_secret",
    # Marketplace PATs (private repos) — see src/marketplace.py.
    "marketplace_token",
    "marketplace_pat",
})
|
|
|
|
|
|
def _sanitize_for_audit(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``payload`` with credential values masked for audit_log.

    A key is treated as secret when its lowercased name is a member of
    ``_SECRET_FIELDS`` — an exact-name match, never a substring match, so a
    key such as ``not_actually_a_token`` passes through untouched. Secret
    values become ``"***"``, or ``"<empty>"`` when the value is ``None`` or
    the empty string. Only top-level keys are inspected; all other values
    are copied by reference.
    """
    def _mask(value: Any) -> str:
        # Keep "a secret was supplied" distinguishable from "left blank"
        # without revealing anything about the secret itself.
        return "<empty>" if value in (None, "") else "***"

    return {
        key: (_mask(value) if key.lower() in _SECRET_FIELDS else value)
        for key, value in payload.items()
    }
|
|
|
|
|
|
# Error text for a `source_query` that uses BigQuery-native backtick
# identifiers where DuckDB-flavor SQL is expected. DuckDB uses double quotes
# for quoted identifiers — backticks are a BigQuery-native syntactic form
# DuckDB's parser does not honor, so a backtick-quoted query handed to
# DuckDB either parse-errors at COPY time or silently scans nothing.
#
# NOTE(review): this comment originally stated that both the BigQuery and
# Keboola materialize paths funnel `source_query` through DuckDB and that
# neither the RegisterTableRequest validator nor the merged-record path in
# `update_table` can persist a backtick query. RegisterTableRequest now
# exempts materialized rows from backtick rejection (their SQL is
# BigQuery-native), so that statement looks outdated — confirm which call
# sites still raise this message before relying on it.
_BACKTICK_REJECTION_MESSAGE = (
    "source_query uses BigQuery-native backtick identifiers (e.g. "
    "`project.dataset.table`), but the materialize path runs the SQL "
    "through DuckDB's BigQuery extension which uses DuckDB-flavor "
    "identifiers. Rewrite to DuckDB syntax: bq.\"dataset\".\"table\" "
    "(with the attached catalog alias `bq` plus double-quoted dataset/"
    "table). The instance is configured with the data project, so you "
    "don't need to repeat it in the FROM clause."
)
|
|
|
|
|
|
class RegisterTableRequest(BaseModel):
    # Request body for registering a table. (Kept as a `#` comment rather
    # than a class docstring so the generated schema description does not
    # change.) Field-level validators run first; the `mode="after"`
    # model validator then enforces the query_mode/source_query invariants.
    name: str
    folder: Optional[str] = None
    sync_strategy: str = Field(
        default="full_refresh",
        deprecated=True,
        description=(
            "DEPRECATED: catalog/profiler metadata only. No extractor reads "
            "this field; every sync is a full overwrite regardless of value. "
            "profiler.is_partitioned() consumes it for parquet-layout "
            "detection. Field stays for back-compat; will be removed in a "
            "future major release."
        ),
    )
    # Composite primary keys are real (session-grain MSA tables key on
    # `(session_id, event_date)`, browse rows on more). The frontend sends +
    # reads this as a list; backend stores it JSON-serialized in VARCHAR.
    # A bare string is accepted for backward compat — see _normalize_primary_key.
    primary_key: Optional[List[str]] = None
    description: Optional[str] = None
    source_type: Optional[str] = None
    bucket: Optional[str] = None
    source_table: Optional[str] = None
    # Backs query_mode='materialized'. Stored in table_registry.source_query
    # (schema v20) after the model validator below trims it; the trigger
    # pass runs it and writes the result to
    # /data/extracts/bigquery/data/<id>.parquet.
    source_query: Optional[str] = None
    query_mode: str = "local"
    sync_schedule: Optional[str] = None
    profile_after_sync: bool = Field(
        default=True,
        deprecated=True,
        description=(
            "DEPRECATED: not consumed by the runtime (Agent 1 finding "
            "2026-05-01). Profiler runs unconditionally on every synced "
            "table; this flag has no effect. Field stays for back-compat."
        ),
    )

    @model_validator(mode="after")
    def _check_mode_query_coherence(self):
        """Enforce query_mode ↔ source_query invariants up front so an admin
        can't persist a remote/local row carrying an orphan source_query.

        Rules (``source_query`` is considered empty when it is None or
        whitespace-only):

        - a non-empty ``source_query`` is rejected unless
          ``query_mode == 'materialized'``;
        - a materialized row with an empty ``source_query`` is rejected
          unless ``source_type == 'bigquery'`` — for BigQuery,
          ``_validate_bigquery_register_payload`` generates the query from
          bucket+source_table after this validator runs.

        On success ``source_query`` is normalised to its stripped form, or
        None when empty.

        Raises:
            ValueError: when either rule above is violated.
        """
        sq = (self.source_query or "").strip() or None
        is_materialized = self.query_mode == "materialized"

        if sq and not is_materialized:
            raise ValueError(
                "source_query is only valid when query_mode='materialized'"
            )
        # Non-BQ materialized rows must supply source_query explicitly — there
        # is no server-generate fallback for Keboola materialized.
        if is_materialized and not sq and self.source_type != "bigquery":
            raise ValueError(
                "query_mode='materialized' requires a non-empty source_query"
            )
        # No backtick check here. Materialized SQL is BigQuery-native and
        # MUST allow backticks for dashed identifiers (e.g.
        # `prj-org.dataset.table`), and a non-materialized row can never
        # reach this point with a source_query (rejected above). The former
        # "non-materialized + backtick" guard was therefore unreachable and
        # has been removed.

        # Normalise: stash the trimmed-or-None form so the persisted column
        # never carries surrounding whitespace or empty-string sentinels.
        self.source_query = sq
        return self

    @field_validator("primary_key", mode="before")
    @classmethod
    def _coerce_primary_key(cls, v):
        # Delegates to the shared normaliser so a bare string and a list are
        # both accepted on input.
        return _normalize_primary_key(v)

    @field_validator("source_type", mode="before")
    @classmethod
    def _validate_source_type(cls, v):
        # None is tolerated for backward compat with old CLI scripts that
        # didn't set a source_type; the route resolves it later. Anything
        # else must be in the canonical list.
        if v in (None, ""):
            return v
        if v not in _VALID_SOURCE_TYPES:
            raise ValueError(
                f"source_type must be one of {sorted(_VALID_SOURCE_TYPES)}, got {v!r}"
            )
        return v

    @field_validator("sync_schedule", mode="before")
    @classmethod
    def _validate_sync_schedule(cls, v):
        # None / "" → no schedule, accepted.
        # Any non-empty string (including pure whitespace) must parse as a
        # valid schedule — otherwise it would be persisted and silently
        # ignored by the runtime evaluator.
        if v in (None, ""):
            return v
        if not is_valid_schedule(v):
            raise ValueError(
                f"sync_schedule must be 'every Nm' / 'every Nh' / "
                f"'daily HH:MM[,HH:MM,...]', got {v!r}"
            )
        return v
|
|
|
|
|
|
def _generate_materialized_source_query(
    bucket: str, source_table: str, project_id: str,
) -> str:
    """Build the full-table-dump ``source_query`` for a materialized BQ row.

    Used when the admin supplies only a dataset (``bucket``) and a table.
    Each of the three parts is validated before being spliced into the
    backtick-quoted, BigQuery-native reference, in the order dataset →
    table → project; the first failure wins.

    Returns:
        ``SELECT * FROM `<project_id>.<bucket>.<source_table>```.

    Raises:
        HTTPException: 400 when any part fails its safety check.
    """
    # (validator, value, error detail) — evaluated lazily, top to bottom.
    checks = (
        (
            _is_safe_quoted_identifier,
            bucket,
            f"bigquery: dataset {bucket!r} is unsafe",
        ),
        (
            _is_safe_quoted_identifier,
            source_table,
            f"bigquery: source_table {source_table!r} is unsafe",
        ),
        (
            _is_safe_project_id,
            project_id,
            f"bigquery: data_source.bigquery.project {project_id!r} is malformed",
        ),
    )
    for is_safe, candidate, problem in checks:
        if not is_safe(candidate):
            raise HTTPException(status_code=400, detail=problem)

    qualified = ".".join((project_id, bucket, source_table))
    return f"SELECT * FROM `{qualified}`"
|
|
|
|
|
|
def _require_safe_bq_view_name(raw_name: str) -> None:
    """Raise HTTPException(400) unless ``raw_name`` is a safe DuckDB view name with no surrounding whitespace."""
    # Validate the RAW name — the value the caller sent — rather than a
    # normalized form. Earlier revisions normalized first
    # (``strip().lower().replace(" ", "_")``) and then checked, which let
    # names like ``"my table"`` pass, get stored verbatim, and blow up at
    # view-create time. We do NOT silently rewrite the operator's name;
    # return 400 with a clear message and let them retype.
    if raw_name.strip() != raw_name or not _is_safe_identifier(raw_name):
        raise HTTPException(
            status_code=400,
            detail=(
                f"bigquery: view name {raw_name!r} is unsafe — must match "
                f"^[a-zA-Z_][a-zA-Z0-9_]{{0,63}}$ (DuckDB identifier rules) "
                "with no leading/trailing whitespace"
            ),
        )


def _require_configured_bq_project() -> str:
    """Return ``data_source.bigquery.project`` from instance config; raise HTTPException(400) if unset or malformed."""
    # Single-project model in M1 (no per-table project field). Checked at
    # registration so a config issue surfaces while the operator still has
    # a request to look at, rather than at first rebuild.
    from app.instance_config import get_value

    project_id = get_value("data_source", "bigquery", "project", default="") or ""
    if not project_id:
        raise HTTPException(
            status_code=400,
            detail=(
                "bigquery: data_source.bigquery.project is not set in "
                "instance.yaml; configure it via /admin/server-config or "
                "/api/admin/configure first"
            ),
        )
    if not _is_safe_project_id(project_id):
        raise HTTPException(
            status_code=400,
            detail=(
                f"bigquery: data_source.bigquery.project {project_id!r} "
                "is malformed — must match GCP project_id grammar "
                "^[a-z][a-z0-9-]{4,28}[a-z0-9]$"
            ),
        )
    return project_id


def _validate_bigquery_register_payload(req: "RegisterTableRequest") -> None:
    """Enforce BQ-specific shape on a register/precheck request.

    Two BQ paths:

    - ``query_mode='materialized'`` — admin-registered SQL writes a parquet on
      schedule. The view name and the configured project are validated
      first. ``source_query`` is optional: when it is empty the server
      generates ``SELECT * FROM `<project>.<bucket>.<source_table>``` via
      ``_generate_materialized_source_query`` and stores it on
      ``req.source_query``; in that case ``bucket`` and ``source_table`` are
      both required. When ``source_query`` is supplied, ``bucket`` /
      ``source_table`` are not inspected.

    - any other ``query_mode`` — remote view over a single BQ table.
      Requires ``bucket`` (BQ dataset) + ``source_table``, rejects wildcard
      tables, validates name / dataset / table / project, then forces
      ``req.query_mode = 'remote'`` (Decision 7 in #108).

    ``profile_after_sync`` is deprecated and inert, so neither path sets it.

    Mutates ``req`` in place; returns None.

    Raises:
        HTTPException: 422 for missing required fields; 400 for unsafe
            identifiers, wildcard tables, or an unset / malformed
            ``data_source.bigquery.project``.
    """
    if req.query_mode == "materialized":
        _require_safe_bq_view_name(req.name or "")
        project_id = _require_configured_bq_project()

        if not (req.source_query and req.source_query.strip()):
            # Server-generate from bucket+source_table. Trivial full-table
            # dump path; admin only sets dataset+table and the server
            # builds BQ-native SQL from instance.yaml's configured project.
            if not (req.bucket and req.source_table):
                raise HTTPException(
                    status_code=422,
                    detail=(
                        "bigquery materialized requires either source_query "
                        "(custom SQL) or bucket+source_table (server-generates "
                        "the full-table-dump SQL)"
                    ),
                )
            req.source_query = _generate_materialized_source_query(
                req.bucket, req.source_table, project_id,
            )
        # Phase C: profile_after_sync is inert (deprecated Pydantic field);
        # the runtime profiles every synced table unconditionally, so it is
        # no longer force-set here as a "signal."
        return

    if not req.bucket or not req.bucket.strip():
        raise HTTPException(
            status_code=422,
            detail="bigquery: 'bucket' (BQ dataset) is required",
        )
    if not req.source_table or not req.source_table.strip():
        raise HTTPException(
            status_code=422,
            detail="bigquery: 'source_table' is required",
        )
    # No wildcard / sharded BQ tables in M1 (Decision 8).
    if "*" in (req.source_table or "") or "*" in (req.bucket or ""):
        raise HTTPException(
            status_code=400,
            detail="bigquery: wildcard / sharded tables are not supported (see #108 M3+)",
        )

    # Strict identifier on the DuckDB view name (raw value, fast-fail at
    # register time instead of at view-create time).
    _require_safe_bq_view_name(req.name or "")

    # Same fast-fail rule for dataset and table: validate the RAW value the
    # caller sent, not a stripped form. A value with leading/trailing
    # whitespace would otherwise be stored as-is and explode inside DuckDB
    # later. Surface the offending raw value in the 400 detail.
    raw_bucket = req.bucket
    if raw_bucket.strip() != raw_bucket or not _is_safe_quoted_identifier(raw_bucket):
        raise HTTPException(
            status_code=400,
            detail=(
                f"bigquery: dataset {raw_bucket!r} is unsafe (only [A-Za-z0-9_.-] "
                "allowed, no leading/trailing whitespace)"
            ),
        )
    raw_source_table = req.source_table
    if raw_source_table.strip() != raw_source_table or not _is_safe_quoted_identifier(raw_source_table):
        raise HTTPException(
            status_code=400,
            detail=(
                f"bigquery: source_table {raw_source_table!r} is unsafe (only "
                "[A-Za-z0-9_.-] allowed, no leading/trailing whitespace)"
            ),
        )

    # The project is only validated on this path; the return value is unused.
    _require_configured_bq_project()

    # Force the BQ-required mode (Decision 7). The orchestrator and
    # extractor both assume remote; persisting `local` here would later create
    # a profiling job against a non-existent parquet file.
    # Phase C: profile_after_sync is now inert (deprecated, not read by the
    # runtime); no longer force-set here.
    req.query_mode = "remote"
|
|
|
|
|
|
# Source types that don't depend on a `data_source.<name>.*` block — they
# get their data through a different ingestion path (e.g. Jira via
# webhooks). Registrations against these types are allowed regardless of
# the configured primary `data_source.type`:
# `_validate_source_type_configured` returns early for any member of this
# set, before it consults the instance config at all.
_SOURCE_TYPES_INDEPENDENT_OF_DATA_SOURCE: frozenset[str] = frozenset({
    "jira",
    "local",
})
|
|
|
|
|
|
def _validate_source_type_configured(source_type: Optional[str]) -> None:
    """Reject a register-table ``source_type`` this instance isn't set up for.

    Background: the route used to persist e.g. ``source_type='keboola'`` on
    a BQ-only instance. The row landed in the registry, but with no Keboola
    URL/token to ATTACH against it silently never synced, and nothing told
    the operator until the table was missing from `da catalog`.

    The request is accepted (function returns None) when any of these hold,
    checked in this order:

    1. ``source_type`` is falsy — legacy CLI scripts omit it and the route
       resolves it later against ``get_data_source_type()``;
    2. it is in ``_SOURCE_TYPES_INDEPENDENT_OF_DATA_SOURCE`` (Jira, local),
       which do not sit under ``data_source.*``;
    3. it equals the instance's primary ``data_source.type``;
    4. a truthy ``data_source.<source_type>`` value exists in the effective
       instance config (multi-source instances) — an empty dict, None or
       "" all count as "not configured";
    5. the primary type is ``'local'`` — the bootstrap default. Refusing
       here would block first-time setup, where the operator registers
       tables before the instance is fully configured.

    What remains is the explicit mismatch this validator targets, e.g. a
    ``type=bigquery`` instance receiving ``source_type='keboola'`` with no
    ``data_source.keboola.*`` block.

    Raises:
        HTTPException: 422 when none of the conditions above hold.
    """
    # Cases 1–2 need no config lookup, so bail out before importing it.
    if not source_type or source_type in _SOURCE_TYPES_INDEPENDENT_OF_DATA_SOURCE:
        return

    from app.instance_config import get_data_source_type, get_value

    primary = get_data_source_type()

    # Cases 3–5. `or` short-circuits, so the secondary-block lookup only
    # happens when the primary does not already match.
    if (
        source_type == primary
        or get_value("data_source", source_type, default=None)
        or primary == "local"
    ):
        return

    message = (
        f"source_type={source_type!r} is not configured on this instance. "
        f"The configured data source is {primary!r}. To enable "
        f"a secondary source, set data_source.{source_type}.* fields in "
        "instance.yaml or via /admin/server-config."
    )
    raise HTTPException(status_code=422, detail=message)
|
|
|
|
|
|
class UpdateTableRequest(BaseModel):
|
|
name: Optional[str] = None
|
|
sync_strategy: Optional[str] = Field(
|
|
default=None,
|
|
deprecated=True,
|
|
description=(
|
|
"DEPRECATED: catalog/profiler metadata only. See "
|
|
"RegisterTableRequest.sync_strategy."
|
|
),
|
|
)
|
|
primary_key: Optional[List[str]] = None
|
|
description: Optional[str] = None
|
|
source_type: Optional[str] = None
|
|
bucket: Optional[str] = None
|
|
source_table: Optional[str] = None
|
|
source_query: Optional[str] = None
|
|
query_mode: Optional[str] = None
|
|
sync_schedule: Optional[str] = None
|
|
profile_after_sync: Optional[bool] = Field(
|
|
default=None,
|
|
deprecated=True,
|
|
description=(
|
|
"DEPRECATED: not consumed by the runtime. See "
|
|
"RegisterTableRequest.profile_after_sync."
|
|
),
|
|
)
|
|
|
|
@model_validator(mode="after")
|
|
def _check_mode_query_coherence(self):
|
|
"""PUT semantics — only the fields explicitly in the body are
|
|
validated. The body is overlaid on the existing row at the handler
|
|
level (see ``update_table``), so omitted fields keep their stored
|
|
values and the synthetic ``RegisterTableRequest`` constructed against
|
|
the merged record runs the strict cross-field check before persist.
|
|
|
|
The only invariants enforceable from the PUT body alone:
|
|
|
|
- explicit ``source_query='SELECT ...'`` paired with ``query_mode``
|
|
that isn't materialized → coherent reject (the SQL would be dead);
|
|
- explicit ``source_query='SELECT ...'`` without any ``query_mode``
|
|
in the body → reject; the operator must commit to materialized;
|
|
- explicit empty/whitespace ``source_query=''`` paired with
|
|
``query_mode='materialized'`` → reject (operator clearly
|
|
mistyped — they sent the field).
|
|
|
|
Pre-fix this validator also rejected ``{"query_mode": "materialized",
|
|
"sync_schedule": "every 12h"}`` because ``source_query`` was None
|
|
— but that's the canonical "edit the schedule on a materialized
|
|
row" use-case from the Edit modal, which always sends
|
|
``query_mode`` to indicate intent. Devin BUG_0002 on PR #148
|
|
commit 2219255.
|
|
"""
|
|
if self.query_mode is None and self.source_query is None:
|
|
return self
|
|
|
|
sq_raw = self.source_query
|
|
sq = (sq_raw or "").strip() or None
|
|
|
|
# Operator explicitly sent source_query as empty/whitespace while
|
|
# claiming materialized — typo / bad form data, reject.
|
|
if (
|
|
self.query_mode == "materialized"
|
|
and sq_raw is not None
|
|
and not sq
|
|
):
|
|
raise ValueError(
|
|
"query_mode='materialized' requires a non-empty source_query"
|
|
)
|
|
|
|
# source_query only makes sense with materialized mode. Allow None
|
|
# (omitted) to flow through; only reject when explicitly set with
|
|
# the wrong mode.
|
|
if (
|
|
self.query_mode is not None
|
|
and self.query_mode != "materialized"
|
|
and sq
|
|
):
|
|
raise ValueError(
|
|
"source_query is only valid when query_mode='materialized'"
|
|
)
|
|
if self.query_mode is None and sq:
|
|
raise ValueError(
|
|
"source_query requires query_mode='materialized' to be set "
|
|
"in the same request"
|
|
)
|
|
|
|
# Normalise: drop whitespace-only strings to None so the persisted
|
|
# column is clean. Don't touch when source_query was None to begin
|
|
# with — that signals "PUT didn't touch this field, keep existing".
|
|
if sq_raw is not None:
|
|
self.source_query = sq
|
|
return self
|
|
|
|
@field_validator("primary_key", mode="before")
|
|
@classmethod
|
|
def _coerce_primary_key(cls, v):
|
|
return _normalize_primary_key(v)
|
|
|
|
# Duplicated from RegisterTableRequest — Pydantic v2 validators don't
|
|
# inherit cleanly across unrelated BaseModel classes; a shared mixin
|
|
# would be overkill for two fields.
|
|
@field_validator("sync_schedule", mode="before")
@classmethod
def _validate_sync_schedule(cls, v):
    """Reject ``sync_schedule`` values the schedule parser does not accept.

    ``None`` and the empty string mean "no schedule" and are returned
    untouched. Any other value — whitespace-only strings included — must
    satisfy ``is_valid_schedule``; otherwise it would be persisted and then
    silently ignored by the runtime evaluator.

    Raises:
        ValueError: when a non-empty value is not a valid schedule.
    """
    schedule_given = v not in (None, "")
    if schedule_given and not is_valid_schedule(v):
        problem = (
            "sync_schedule must be 'every Nm' / 'every Nh' / "
            f"'daily HH:MM[,HH:MM,...]', got {v!r}"
        )
        raise ValueError(problem)
    return v
|
|
|
|
|
|
class ConfigureRequest(BaseModel):
    """Request payload describing the instance's data-source settings.

    Only ``data_source`` is required; every other field is optional and
    defaults to ``None``.

    NOTE(review): presumably the body of the ``POST /configure`` endpoint
    mentioned at the top of this module — the handler is not visible in this
    section, confirm before relying on it.
    """

    # Selected source backend.
    data_source: str  # "keboola" | "bigquery" | "local"
    # Keboola connection settings (presumably only meaningful when
    # data_source == "keboola" — verify against the handler).
    keboola_token: Optional[str] = None
    keboola_url: Optional[str] = None
    # BigQuery connection settings (presumably only meaningful when
    # data_source == "bigquery" — verify against the handler).
    bigquery_project: Optional[str] = None
    bigquery_location: Optional[str] = None
    # Instance-level settings, independent of the chosen source.
    instance_name: Optional[str] = None
    allowed_domain: Optional[str] = None
|
|
|
|
|
|
@router.get("/discover-tables")
async def discover_tables(
    user: dict = Depends(require_admin),
    dataset: Optional[str] = None,
):
    """Discover available tables from the configured data source.

    For ``data_source.type='keboola'`` returns the full Storage API table
    list (single round-trip). For ``data_source.type='bigquery'``:

    - Without ``dataset``: list datasets in the configured project.
    - With ``dataset=name``: list tables (BASE TABLE + VIEW) in that dataset.

    Two-step shape avoids paying the per-dataset list_tables cost up-front
    on projects with hundreds of datasets — the UI populates the dataset
    dropdown first, then fetches tables only for the selected dataset.

    Both provider lookups are synchronous client calls. Because this handler
    is ``async def``, they are dispatched through ``run_in_threadpool`` so a
    slow upstream round-trip does not block the asyncio event loop (the same
    concern that made ``register_table`` a plain ``def``).

    Args:
        user: Admin principal resolved by ``require_admin`` (gate only).
        dataset: Optional BigQuery dataset name; ignored for Keboola.

    Returns:
        A dict with ``source`` plus either ``tables``/``count`` or, for the
        BigQuery dataset listing, ``datasets``/``count``. Unsupported source
        types get an empty ``tables`` list and an ``error`` message.

    Raises:
        HTTPException: passed through unchanged when raised by the BigQuery
            path; any other failure is wrapped in a 500.
    """
    # Local import, matching the lazy-import style used throughout this module.
    from fastapi.concurrency import run_in_threadpool

    def _discover_keboola() -> Dict[str, Any]:
        """Run the blocking Keboola discovery call (executed in the threadpool)."""
        from connectors.keboola.client import KeboolaClient
        from app.instance_config import get_value

        url = get_value("data_source", "keboola", "stack_url", default="")
        token_env = get_value("data_source", "keboola", "token_env", default="KEBOOLA_STORAGE_TOKEN")
        token = os.environ.get(token_env, "") if token_env else ""
        if not token:
            # Configured env var was empty/unset — fall back to the default name.
            token = os.environ.get("KEBOOLA_STORAGE_TOKEN", "")
        client = KeboolaClient(token=token, url=url)
        tables = client.discover_all_tables()
        return {"tables": tables, "count": len(tables), "source": "keboola"}

    try:
        from app.instance_config import get_data_source_type
        source_type = get_data_source_type()

        if source_type == "keboola":
            return await run_in_threadpool(_discover_keboola)

        if source_type == "bigquery":
            return await run_in_threadpool(_discover_bigquery, dataset=dataset)

        return {
            "tables": [],
            "count": 0,
            "source": source_type,
            "error": f"Discovery not implemented for source_type={source_type!r}",
        }
    except HTTPException:
        # Already carries the right status/detail (BigQuery path) — don't rewrap.
        raise
    except Exception as e:
        # Chain the cause so the original traceback survives in server logs.
        raise HTTPException(status_code=500, detail=f"Discovery failed: {e}") from e
|
|
|
|
|
|
def _discover_bigquery(dataset: Optional[str]) -> Dict[str, Any]:
    """Enumerate BigQuery datasets, or the tables of one dataset.

    With ``dataset=None`` the configured project's datasets are listed;
    otherwise the tables of the named dataset are listed. The client comes
    from ``get_bq_access().client()`` so config / auth / error translation
    matches the rest of the BQ surface (#138 facade). The table listing uses
    the same ``tables`` / ``count`` / ``source`` keys as the Keboola path so
    the UI doesn't have to branch.

    Raises:
        HTTPException: for access-setup failures (status from
            ``BqAccessError.HTTP_STATUS``, default 500) and for listing
            failures (translated status, default 502).
    """
    from connectors.bigquery.access import (
        get_bq_access,
        BqAccessError,
        translate_bq_error,
    )

    def _as_http_error(bq_err, fallback_status):
        # Shared JSON error shape for both failure stages.
        return HTTPException(
            status_code=BqAccessError.HTTP_STATUS.get(bq_err.kind, fallback_status),
            detail={"error": bq_err.message, "kind": bq_err.kind, "details": bq_err.details},
        )

    try:
        bq = get_bq_access()
        client = bq.client()
    except BqAccessError as e:
        raise _as_http_error(e, 500)

    try:
        if dataset is None:
            found = [
                {
                    "dataset_id": ds.dataset_id,
                    "full_id": f"{ds.project}.{ds.dataset_id}",
                }
                for ds in client.list_datasets()
            ]
            return {
                "datasets": sorted(found, key=lambda entry: entry["dataset_id"]),
                "count": len(found),
                "source": "bigquery",
            }

        # `list_tables` yields items exposing `table_id` + `table_type`
        # ('TABLE', 'VIEW', 'MATERIALIZED_VIEW', 'EXTERNAL', 'SNAPSHOT'). The UI
        # maps TABLE → "table" and VIEW/MATERIALIZED_VIEW → "view"; other types
        # are passed through raw so the operator can decide.
        listed = [
            {
                "table_id": item.table_id,
                "table_type": item.table_type,
                "full_id": f"{item.project}.{item.dataset_id}.{item.table_id}",
            }
            for item in client.list_tables(dataset)
        ]
        return {
            "tables": sorted(listed, key=lambda entry: entry["table_id"]),
            "count": len(listed),
            "source": "bigquery",
            "dataset": dataset,
        }
    except Exception as e:
        # `translate_bq_error` re-raises non-Google exceptions unchanged, so
        # wrap those in an HTTPException to keep the JSON-shape contract.
        try:
            err = translate_bq_error(e, bq.projects, bad_request_status="upstream_error")
        except Exception:
            raise HTTPException(status_code=502, detail=f"BQ discovery failed: {e}")
        raise _as_http_error(err, 502)
|
|
|
|
|
|
@router.get("/registry")
async def list_registry(
    user: dict = Depends(require_admin),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Return every registry row, annotated with its latest sync error.

    Each row gains a ``last_sync_error`` key so operators can see WHY a row
    isn't materializing without trawling scheduler logs. The value is the
    error string from ``sync_state`` when that row's status is ``'error'``
    with a non-empty message, and ``None`` otherwise.
    """
    tables = TableRegistryRepository(conn).list_all()

    # One batched query instead of a per-table lookup (no N+1 against
    # `sync_state`). By convention sync_state.table_id mirrors
    # table_registry.name, which is why the join key below is the row name.
    sync_errors: Dict[str, Optional[str]] = {}
    try:
        error_rows = conn.execute(
            "SELECT table_id, error FROM sync_state "
            "WHERE status = 'error' AND error IS NOT NULL AND error <> ''"
        ).fetchall()
        sync_errors = {row[0]: row[1] for row in error_rows}
    except Exception:
        # Best effort: an unreadable sync_state must not break the registry
        # listing — operators only lose the last_sync_error column this call.
        logger.exception("Failed to read sync_state errors for registry")

    for entry in tables:
        entry["last_sync_error"] = sync_errors.get(entry.get("name"))

    return {"tables": tables, "count": len(tables)}
|
|
|
|
|
|
# Wall-clock budget, in seconds, for the synchronous BQ materialization that
# runs after a successful BQ register. If the rebuild + view creation exceeds
# this, we hand the rest off to BackgroundTasks and return 202. 5s matches the
# UX contract in #108 ("Queryable as <view> within seconds") — long enough
# to cover a healthy GCE round-trip, short enough that a hung GCE call
# doesn't park the request handler.
_BQ_SYNC_REGISTER_TIMEOUT_S: float = 5.0
|
|
|
|
|
|
def _materialize_bigquery_extract() -> Dict[str, Any]:
    """Rebuild the BigQuery ``extract.duckdb`` and then the master views.

    Shared by the in-band (synchronous) and BackgroundTask code paths that
    run after a BQ register/update/delete. Imports stay inside the function
    so non-BQ instances don't pay the import cost on app start.

    A FRESH system DB connection is opened instead of reusing the
    request-scoped one: the BackgroundTask and the timeout-fallback daemon
    thread can both outlive the request, whose handler closes its own
    connection after the response. The fresh handle is always closed here.

    Returns:
        The dict returned by ``rebuild_from_registry`` (e.g. containing an
        ``errors`` list), or ``{}`` when that call returns something falsy.
    """
    from connectors.bigquery import extractor as _bq_extractor
    from src.db import get_system_db
    from src.orchestrator import SyncOrchestrator

    worker_conn = get_system_db()
    try:
        rebuild_report = _bq_extractor.rebuild_from_registry(conn=worker_conn)
        SyncOrchestrator().rebuild()
        if not rebuild_report:
            return {}
        return rebuild_report
    finally:
        # Best-effort close: a failure here must not mask the rebuild outcome.
        try:
            worker_conn.close()
        except Exception:
            pass
|
|
|
|
|
|
def _materialize_bigquery_extract_bg() -> None:
    """Run ``_materialize_bigquery_extract`` as a fire-and-forget task.

    BackgroundTasks discard return values, so anything the rebuild reports
    would otherwise vanish. A crash is logged with its traceback; a
    completed rebuild that carries a non-empty ``errors`` list is logged at
    ERROR level. Nothing is returned or re-raised (Decision 3 in #108: a 202
    means "accepted, may not be queryable yet" — we don't block on it but we
    shouldn't swallow the failure either).
    """
    try:
        outcome = _materialize_bigquery_extract()
    except Exception:
        logger.exception("BQ post-register background materialize crashed")
    else:
        reported = (outcome or {}).get("errors") or []
        if reported:
            logger.error(
                "BQ post-register background materialize completed with %d error(s): %s",
                len(reported), reported,
            )
|
|
|
|
|
|
def _run_bigquery_materialize_with_timeout(
    background: BackgroundTasks,
) -> Dict[str, Any]:
    """Try to materialize synchronously within the wall-clock budget.

    Returns a dict with:
      - ``status`` ∈ {"ok", "errors", "timeout"} — caller maps to HTTP code
      - ``errors``: list of error dicts surfaced by ``rebuild_from_registry``
        or synthesized from a raised exception (only present on
        ``status="errors"``)

    Mapping by caller (`register_table`):
      - "ok"      → 200 (synchronous success)
      - "errors"  → 500 (rebuild ran but reported errors — propagate so
                         the operator knows the registry row exists but the
                         view wasn't created)
      - "timeout" → 202 (rebuild still running on a BackgroundTask)

    The rebuild runs on a daemon thread that opens its OWN system DB
    connection (see `_materialize_bigquery_extract`), so the wait is bounded
    by ``_BQ_SYNC_REGISTER_TIMEOUT_S`` even if the rebuild itself hangs.

    If the budget expires, the worker is left running. Whatever it
    eventually produces (an exception or a non-empty ``errors`` list) is
    logged by the worker itself; previously that late outcome was stored and
    never reported. A lock-guarded handshake decides who owns reporting, so
    the outcome is logged exactly once: by this function when the worker
    finished in time, by the worker when the caller had already given up.

    Args:
        background: FastAPI task queue used for the timeout hand-off.
    """
    done = threading.Event()
    # Guards the completed/abandoned handshake between caller and worker.
    handoff_lock = threading.Lock()
    handoff = {"completed": False, "abandoned": False}
    err_holder: Dict[str, Any] = {}
    result_holder: Dict[str, Any] = {}

    def _log_late_outcome() -> None:
        """Report a worker outcome nobody is waiting for any more."""
        if "error" in err_holder:
            logger.error(
                "BQ post-register rebuild raised after the %ss budget: %r",
                _BQ_SYNC_REGISTER_TIMEOUT_S,
                err_holder["error"],
            )
            return
        late_errors = (result_holder.get("result") or {}).get("errors") or []
        if late_errors:
            logger.error(
                "BQ post-register rebuild finished after the %ss budget with %d error(s): %s",
                _BQ_SYNC_REGISTER_TIMEOUT_S, len(late_errors), late_errors,
            )

    def _worker():
        caller_gave_up = False
        try:
            result_holder["result"] = _materialize_bigquery_extract()
        except Exception as e:
            err_holder["error"] = e
        finally:
            # Publish completion and learn, atomically, whether the caller
            # already took the timeout path.
            with handoff_lock:
                handoff["completed"] = True
                caller_gave_up = handoff["abandoned"]
            done.set()
        if caller_gave_up:
            _log_late_outcome()

    t = threading.Thread(target=_worker, daemon=True, name="bq-register-rebuild")
    t.start()
    done.wait(_BQ_SYNC_REGISTER_TIMEOUT_S)
    with handoff_lock:
        # Read the flag under the lock: a worker that completed between the
        # wait timing out and this point still counts as finished.
        finished = handoff["completed"]
        if not finished:
            handoff["abandoned"] = True

    if finished:
        if "error" in err_holder:
            # Worker finished within the wall-clock budget but raised. This
            # is a HARD ERROR, not a timeout — surface it as such so the
            # operator gets the actual exception in the 500 body instead
            # of a misleading 202 + "still working in the background".
            exc = err_holder["error"]
            logger.error(
                "BQ post-register rebuild raised within budget: %r",
                exc,
            )
            return {
                "status": "errors",
                "errors": [{"error": f"{type(exc).__name__}: {exc}"}],
            }
        # Worker finished cleanly — but `rebuild_from_registry` may itself
        # have reported errors (auth fail, missing project from the overlay,
        # unsafe identifier slipping the validator, etc.). Propagate them
        # instead of claiming success.
        result = result_holder.get("result") or {}
        errors = result.get("errors") or []
        if errors:
            logger.error(
                "BQ post-register rebuild reported %d error(s): %s",
                len(errors), errors,
            )
            return {"status": "errors", "errors": errors}
        return {"status": "ok"}

    # Timed out — let the worker keep running on its thread (already daemon)
    # and also schedule a BackgroundTask so the orchestrator gets called via
    # the supported FastAPI path. `_INIT_EXTRACT_LOCK` in the BQ extractor
    # serializes the two file-swap calls so the slow daemon thread and the
    # background task can't tear `extract.duckdb`; the orchestrator's own
    # `_rebuild_lock` protects the master-view rebuild step downstream.
    logger.info(
        "BQ post-register rebuild exceeded %ss budget — handing off to BackgroundTask",
        _BQ_SYNC_REGISTER_TIMEOUT_S,
    )
    background.add_task(_materialize_bigquery_extract_bg)
    return {"status": "timeout"}
|
|
|
|
|
|
@router.post(
    "/register-table",
    responses={
        200: {"description": "BigQuery row registered + materialized synchronously"},
        201: {"description": "Non-BigQuery row registered (no post-insert materialize)"},
        202: {"description": "BigQuery row registered; materialize continues in background"},
        409: {"description": "Table id or view name already in use"},
        500: {"description": "BigQuery row registered but post-insert rebuild failed"},
    },
)
def register_table(
    request: RegisterTableRequest,
    background: BackgroundTasks,
    user: dict = Depends(require_admin),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Register a new table and, for remote BigQuery rows, make it queryable.

    Flow:
      1. 422 when the name is empty/whitespace; the table id is the name
         stripped, lower-cased, with spaces replaced by underscores.
      2. 409 when that id, or the exact display name, is already registered.
      3. ``_validate_source_type_configured`` rejects source types that are
         not configured on this instance.
      4. BigQuery rows additionally pass through
         ``_validate_bigquery_register_payload`` (issue #108 contract).
      5. The row is inserted and an audit entry is written.

    Response by branch:
      - non-BigQuery: 201, insert-only (no post-register hook).
      - BigQuery with ``query_mode='materialized'``: 201, no synchronous
        rebuild — the row is picked up on the next sync tick.
      - other BigQuery rows: extract + master views are rebuilt within a
        wall-clock budget → 200 on success, 500 if the rebuild reported
        errors (row stays registered, view not created), 202 when the
        budget is exceeded and the rebuild continues in the background.

    Defined as a plain ``def`` (not ``async def``) so FastAPI runs it in a
    threadpool — the synchronous-materialize path blocks on
    ``threading.Event.wait()``, which would otherwise stall the asyncio
    event loop for up to ``_BQ_SYNC_REGISTER_TIMEOUT_S``.

    The decorator deliberately carries no default ``status_code``: every
    branch returns its own ``JSONResponse`` with the appropriate code.
    """
    from fastapi.responses import JSONResponse

    if not (request.name and request.name.strip()):
        raise HTTPException(status_code=422, detail="Table name cannot be empty")
    repo = TableRegistryRepository(conn)
    table_id = request.name.strip().lower().replace(" ", "_")

    if repo.get(table_id):
        raise HTTPException(status_code=409, detail=f"Table '{table_id}' already registered")

    # View-name collision pre-check — separate from the id collision above.
    # Catching it here keeps the orchestrator from surfacing the clash as a
    # silent overwrite at the next rebuild.
    for row in repo.list_all():
        if (row.get("name") or "") == request.name:
            raise HTTPException(
                status_code=409,
                detail=f"View name '{request.name}' is already in use by table id '{row.get('id')}'",
            )

    # Surface a missing source configuration (no Keboola URL/token, no BQ
    # project, ...) at registration time rather than as a row that never syncs.
    _validate_source_type_configured(request.source_type)

    # BigQuery rows follow the issue #108 contract: extra validation now,
    # post-insert materialization below. Other source types stay insert-only
    # (Keboola via scheduled sync, Jira via webhook, local via manual run).
    is_bigquery = request.source_type == "bigquery"
    if is_bigquery:
        _validate_bigquery_register_payload(request)

    # Phase C: profile_after_sync is intentionally not forwarded — the field
    # is deprecated and inert at runtime; the DB column keeps its default.
    row_fields = dict(
        id=table_id,
        name=request.name,
        folder=request.folder,
        sync_strategy=request.sync_strategy,
        primary_key=request.primary_key,
        description=request.description,
        registered_by=user.get("email"),
        source_type=request.source_type,
        bucket=request.bucket,
        source_table=request.source_table,
        source_query=request.source_query,
        query_mode=request.query_mode,
        sync_schedule=request.sync_schedule,
    )
    repo.register(**row_fields)

    # Audit entry — params are masked; description stays raw (documentation).
    AuditRepository(conn).log(
        user_id=user.get("id"),
        action="register_table",
        resource=table_id,
        params=_sanitize_for_audit(request.model_dump()),
    )

    def _reply(http_status, row_status, **extra):
        # Every branch shares id/name/status; extras keep their given order.
        payload = {"id": table_id, "name": request.name, "status": row_status}
        payload.update(extra)
        return JSONResponse(status_code=http_status, content=payload)

    if not is_bigquery:
        # Keboola / Jira / local: insert-only, explicit 201 Created.
        return _reply(201, "registered")

    if request.query_mode == "materialized":
        # Materialized BQ rows are handled by the trigger pass on the next
        # scheduled tick (or POST /api/sync/trigger). No synchronous rebuild —
        # the COPY can scan multi-GB and would block the request.
        return _reply(
            201,
            "registered",
            view_name=table_id,
            message=(
                "Materialized — parquet will be written on the next sync "
                "tick. Trigger now via POST /api/sync/trigger."
            ),
        )

    # BQ post-register: rebuild extract + master views with timeout fallback.
    # Decision 1: 200 on synchronous success, 202 on timeout, 500 when the
    # synchronous rebuild surfaced errors.
    outcome = _run_bigquery_materialize_with_timeout(background)
    status = outcome.get("status")
    if status == "ok":
        return _reply(200, "ok", view_name=table_id)
    if status == "errors":
        # The registry insert succeeded but the master view was NOT created.
        # Surface the failure verbatim so the operator can fix the underlying
        # config (typically a missing `data_source.bigquery.project` in the
        # overlay, or auth lacking bigquery.metadata.get on the dataset). The
        # row stays registered; a later register/update or scheduler tick
        # creates the view once the config is fixed.
        return _reply(
            500,
            "rebuild_failed",
            view_name=table_id,
            errors=outcome.get("errors") or [],
            message=(
                "Registry row created but post-insert rebuild failed; "
                "view is not queryable. See `errors` for details."
            ),
        )
    # Anything else is the timeout case — rebuild continues on a BackgroundTask.
    return _reply(
        202,
        "accepted",
        view_name=table_id,
        message="Registration accepted; materializing in background",
    )
|
|
|
|
|
|
class PrecheckResponse(BaseModel):
    """Response model for /api/admin/register-table/precheck.

    Documented here so OpenAPI consumers know what to expect; the route
    returns a plain dict for backwards compatibility with the rest of the
    admin API which doesn't use response_model.
    """

    # Precheck verdict; the precheck route sets it to True on every
    # successful return (failures are raised as HTTP errors instead).
    ok: bool
    # Echo of the checked table: name / source_type plus, depending on the
    # branch, bucket, source_table, rows, size_bytes, columns and a note.
    table: Dict[str, Any]
|
|
|
|
|
|
@router.post("/register-table/precheck")
def register_table_precheck(
    request: RegisterTableRequest,
    user: dict = Depends(require_admin),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Dry-run a register-table payload; for BQ also confirm the table exists.

    Performs no DB write. The UI uses it to show row count + size + column
    count before the operator clicks Register, and the CLI's ``--dry-run``
    uses it to print what *would* be registered. The payload gets the same
    Pydantic validation as register-table. Branches:

    - non-BigQuery: validation-only response (rows/size are ``None``).
    - BigQuery ``query_mode='materialized'``: validation-only response after
      ``_validate_bigquery_register_payload``; no BigQuery call is made.
    - other BigQuery rows: ``bigquery.Client(project).get_table(...)`` is
      called and row count, byte size and column schema are returned.

    Plain ``def`` on purpose: FastAPI runs it in a threadpool, so the
    synchronous BigQuery client calls don't block the asyncio event loop
    (same reasoning as ``register_table``).

    Raises:
        HTTPException: 422 empty name; 500 when google-cloud-bigquery is not
            installed; 404 table not found; 403 access denied; 400 for any
            other BigQuery failure.
    """
    if not (request.name and request.name.strip()):
        raise HTTPException(status_code=422, detail="Table name cannot be empty")

    if request.source_type != "bigquery":
        # M1 only adds a BQ-specific precheck. Other source types get a
        # validation-only answer so CLI / UI can rely on one endpoint shape.
        passthrough = {
            "name": request.name,
            "source_type": request.source_type,
            "bucket": request.bucket,
            "source_table": request.source_table,
            "rows": None,
            "size_bytes": None,
            "columns": [],
            "note": "precheck for non-bigquery sources is validation-only in M1",
        }
        return {"ok": True, "table": passthrough}

    # BQ-specific shape validation (query_mode/profile_after_sync coercion,
    # identifier safety, project_id from instance.yaml).
    _validate_bigquery_register_payload(request)

    # Materialized rows: the SQL body is the contract, so skip the BigQuery
    # round-trip and answer validation-only. This keeps the CLI's
    # `--dry-run --query-mode materialized` path from crashing on an empty
    # fully-qualified name.
    if request.query_mode == "materialized":
        materialized_info = {
            "name": request.name,
            "source_type": "bigquery",
            "query_mode": "materialized",
            "source_query": request.source_query,
            "rows": None,
            "size_bytes": None,
            "columns": [],
            "note": (
                "materialized precheck is validation-only — the SQL is "
                "evaluated for cost on each scheduled materialize tick"
            ),
        }
        return {"ok": True, "table": materialized_info}

    # Confirm the table exists and the service account can see it. Imports
    # stay local so non-BQ instances never pull in google-cloud-bigquery.
    try:
        from google.cloud import bigquery  # noqa: PLC0415
        from google.api_core import exceptions as google_exc  # noqa: PLC0415
    except ImportError as e:
        raise HTTPException(
            status_code=500,
            detail=(
                "google-cloud-bigquery not installed; install the bigquery "
                f"extras to use BQ precheck ({e})"
            ),
        ) from e

    from app.instance_config import get_value
    project_id = get_value("data_source", "bigquery", "project", default="")
    dataset = (request.bucket or "").strip()
    source_table = (request.source_table or "").strip()
    fq = f"{project_id}.{dataset}.{source_table}"

    try:
        client = bigquery.Client(project=project_id)
        bq_table = client.get_table(fq)
    except google_exc.NotFound as e:
        raise HTTPException(status_code=404, detail=f"BigQuery table not found: {fq} ({e})") from e
    except google_exc.Forbidden as e:
        raise HTTPException(
            status_code=403,
            detail=(
                f"BigQuery access denied for {fq}: {e}. "
                "Service account needs bigquery.metadata.get on the dataset."
            ),
        ) from e
    except Exception as e:
        # Auth errors, transient 5xx, malformed table refs — reported as 400
        # with the GCP error verbatim so the operator can fix their config.
        raise HTTPException(status_code=400, detail=f"BigQuery precheck failed for {fq}: {e}") from e

    schema_fields = bq_table.schema or []
    columns = [{"name": field.name, "type": field.field_type} for field in schema_fields]
    table_info = {
        "name": request.name,
        "source_type": "bigquery",
        "bucket": dataset,
        "source_table": source_table,
        "project_id": project_id,
        "rows": int(bq_table.num_rows or 0),
        "size_bytes": int(bq_table.num_bytes or 0),
        "columns": columns,
        "column_count": len(columns),
    }
    return {"ok": True, "table": table_info}
|
|
|
|
|
|
@router.put("/registry/{table_id}")
async def update_table(
    table_id: str,
    request: UpdateTableRequest,
    background: BackgroundTasks,
    user: dict = Depends(require_admin),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Update a registered table's configuration.

    Only fields the request sets to a non-``None`` value are applied; they
    are merged over the existing row and the merged record is re-registered
    under the same id. When the merged record is a BigQuery row, it is first
    run through the register-time BigQuery validator.

    For BQ rows, schedules a background rebuild so the master view picks
    up changes (e.g. a renamed dataset) without waiting for the next
    scheduled sync.

    Returns:
        ``{"id": table_id, "updated": [<names of the applied fields>]}``.

    Raises:
        HTTPException: 404 when ``table_id`` is not registered; 422 when a
            non-BigQuery row would end up ``query_mode='materialized'``
            without a non-empty ``source_query``.
    """
    repo = TableRegistryRepository(conn)
    existing = repo.get(table_id)
    if not existing:
        raise HTTPException(status_code=404, detail="Table not found")

    # None means "field not sent" — such fields keep their stored value.
    updates = {k: v for k, v in request.model_dump().items() if v is not None}
    # Run BQ-shape validation BEFORE persisting whenever the merged record
    # would be a bigquery row (existing was BQ, or the patch flips it to BQ,
    # or the patch touches BQ-relevant fields on an already-BQ row). Without
    # this gate, an admin could PUT `bucket="evil\"; DROP --"` onto a BQ
    # row and the next rebuild would silently fail at view-create time —
    # surface the bad shape at PUT time instead.
    if updates:
        # Preserve the original `registered_at` across PUTs — `repo.register`
        # now accepts it as an optional kwarg; without this the upsert would
        # stamp a fresh `now()` on every edit (issue #130).
        merged = dict(existing)
        merged.update(updates)
        merged.pop("id", None)  # avoid duplicate id kwarg

        # When switching the merged record away from materialized mode, drop
        # the stale source_query — the request validator can't clear it via
        # the `if v is not None` filter above. Without this, a remote/local
        # row would carry an orphan source_query in the registry.
        if merged.get("query_mode") != "materialized":
            merged["source_query"] = None

        # Cross-source coherence: query_mode='materialized' requires a
        # non-empty source_query for ALL source types, not just BigQuery.
        # BQ rows without source_query can be server-generated from
        # bucket+source_table (handled by _validate_bigquery_register_payload
        # via the synthetic RegisterTableRequest below). Non-BQ rows (e.g.
        # Keboola) still require an explicit source_query at PUT time.
        if merged.get("query_mode") == "materialized":
            sq = merged.get("source_query")
            if not sq or not str(sq).strip():
                # BQ rows: let _validate_bigquery_register_payload generate
                # source_query from bucket+source_table (falls through below).
                # Non-BQ rows: no server-generate fallback; raise 422.
                if merged.get("source_type") != "bigquery":
                    raise HTTPException(
                        status_code=422,
                        detail=(
                            "query_mode='materialized' requires a non-empty "
                            "source_query. To revert to a non-materialized mode, "
                            "PATCH query_mode='local' (Keboola) or 'remote' "
                            "(BigQuery) and the stale source_query is cleared "
                            "automatically."
                        ),
                    )
        # Backtick guard removed for materialized rows: the Task 2 wrapping
        # path (connectors.bigquery.extractor.materialize_query) now runs
        # admin SQL through the BQ jobs API using BQ-native syntax, which
        # requires backticks for dashed project/dataset identifiers.
        # Non-materialized rows still reject backticks in the model validator.

        if merged.get("source_type") == "bigquery":
            # Reuse the register-time validator. It mutates the request to
            # force query_mode='remote' / profile_after_sync=False (or to
            # leave a materialized row alone) — apply the same coercion to
            # `merged` so the persisted row matches.
            synthetic = RegisterTableRequest(
                name=merged.get("name") or table_id,
                bucket=merged.get("bucket"),
                source_table=merged.get("source_table"),
                source_query=merged.get("source_query"),
                source_type="bigquery",
                query_mode=merged.get("query_mode") or "remote",
                profile_after_sync=bool(merged.get("profile_after_sync") or False),
                primary_key=merged.get("primary_key"),
                description=merged.get("description"),
                folder=merged.get("folder"),
                sync_strategy=merged.get("sync_strategy") or "full_refresh",
                sync_schedule=merged.get("sync_schedule"),
            )
            _validate_bigquery_register_payload(synthetic)
            # Copy back the fields the validator may have rewritten.
            merged["query_mode"] = synthetic.query_mode
            merged["profile_after_sync"] = synthetic.profile_after_sync
            merged["source_query"] = synthetic.source_query

        # Persist the merged record under the existing id.
        repo.register(id=table_id, **merged)

    # Audit entry: the changed field names plus their sanitized values.
    # NOTE(review): nesting was reconstructed from a whitespace-stripped
    # copy of this file — confirm this call sits outside `if updates:`.
    AuditRepository(conn).log(
        user_id=user.get("id"),
        action="update_table",
        resource=table_id,
        params=_sanitize_for_audit({"updated_fields": sorted(updates.keys()), **updates}),
    )

    # If we updated a BQ row (or one that's now BQ), refresh the extract in
    # the background so the view picks up renames / column-list changes.
    # Use the BG wrapper so any rebuild errors are logged at ERROR level
    # instead of being silently dropped by BackgroundTasks (which discards
    # return values).
    after = repo.get(table_id) or {}
    if after.get("source_type") == "bigquery":
        background.add_task(_materialize_bigquery_extract_bg)

    return {"id": table_id, "updated": list(updates.keys())}
|
|
|
|
|
|
@router.delete("/registry/{table_id}", status_code=204)
async def unregister_table(
    table_id: str,
    background: BackgroundTasks,
    user: dict = Depends(require_admin),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Unregister a table from the system.

    For BQ rows, schedules a background rebuild so the dropped row's
    master view is removed from analytics.duckdb (rather than hanging
    around until the next scheduled sync).

    For materialized rows, also removes the canonical parquet at
    `${DATA_DIR}/extracts/<source_type>/data/<name>.parquet` (keyed by the
    registry row's `name`, falling back to `table_id` when the name is
    empty) and clears the matching `sync_state` / `sync_history` rows.
    Without these cleanups, the manifest endpoint kept advertising the
    dropped table to `da sync` (sync_state-driven) (E2E sub-agent finding
    2026-05-01).

    Every cleanup step after the registry delete is best-effort: a failure
    is logged at WARNING level and never fails the DELETE, and one failed
    step does not prevent the remaining steps from running.

    Raises:
        HTTPException: 404 when no registry row exists for ``table_id``.
    """
    repo = TableRegistryRepository(conn)
    existing = repo.get(table_id)
    if not existing:
        raise HTTPException(status_code=404, detail="Table not found")

    # Snapshot everything we need from the row before it is deleted.
    was_bigquery = existing.get("source_type") == "bigquery"
    was_materialized = existing.get("query_mode") == "materialized"
    source_type = existing.get("source_type") or ""
    name = existing.get("name") or table_id

    repo.unregister(table_id)

    # Drop the canonical parquet for materialized rows. Path layout:
    # `${DATA_DIR}/extracts/<source_type>/data/<name>.parquet` — the
    # filename is keyed by `table_registry.name` (matches the sync_state
    # bookkeeping convention). Defensively remove the `.parquet.tmp`
    # sibling too in case a prior materialize crashed mid-COPY.
    #
    # Each candidate gets its own try/except so a failure on one file
    # (permission error, racing delete) still lets the other be removed.
    # The registry row is already gone, so failures are only logged.
    if was_materialized and source_type in ("bigquery", "keboola"):
        data_dir = Path(os.environ.get("DATA_DIR", "./data"))
        base = data_dir / "extracts" / source_type / "data"
        for candidate in (
            base / f"{name}.parquet",
            base / f"{name}.parquet.tmp",
        ):
            try:
                if candidate.exists():
                    candidate.unlink()
                    logger.info(
                        "Removed materialized parquet for unregistered table %s: %s",
                        table_id, candidate,
                    )
            except Exception as e:
                logger.warning(
                    "Failed to remove materialized parquet for %s: %s — registry row is "
                    "still dropped; clean up the file manually if it lingers",
                    table_id, e,
                )

    # Clear sync bookkeeping for any source/mode (a row that was synced at
    # any point — local/materialized — has a sync_state entry that the
    # manifest serves regardless of registry state). The two deletes are
    # attempted independently so a failure on one table does not leave the
    # other uncleaned.
    for statement in (
        "DELETE FROM sync_state WHERE table_id = ?",
        "DELETE FROM sync_history WHERE table_id = ?",
    ):
        try:
            conn.execute(statement, [name])
        except Exception as e:
            logger.warning(
                "Failed to clear sync_state for unregistered table %s: %s — "
                "manifest may still advertise the dropped row to da sync",
                table_id, e,
            )

    AuditRepository(conn).log(
        user_id=user.get("id"),
        action="unregister_table",
        resource=table_id,
        params=_sanitize_for_audit({
            "name": existing.get("name"),
            "source_type": existing.get("source_type"),
            "bucket": existing.get("bucket"),
            "source_table": existing.get("source_table"),
        }),
    )

    # Rebuild the BQ extract in the background so the dropped row's master
    # view disappears without waiting for the next scheduled sync.
    if was_bigquery:
        background.add_task(_materialize_bigquery_extract_bg)
|
|
|
|
|
|
@router.post("/configure")
async def configure_instance(
    request: ConfigureRequest,
    user: dict = Depends(require_admin),
):
    """Configure data source and instance settings via API.

    Writes config to instance.yaml and persists secrets to .env_overlay.
    AI agents and the /setup wizard use this instead of manual file editing.

    Steps, in order:
      1. Validate ``request.data_source`` and the credentials it requires
         (Keboola credentials are checked with a live ``test_connection``).
      2. Under ``_overlay_write_lock``, read the overlay at
         ``${DATA_DIR}/state/instance.yaml``, patch the sections this
         endpoint owns and write it back atomically.
      3. Merge any supplied secrets into ``${DATA_DIR}/state/.env_overlay``
         and export them into the current process environment.
      4. Reset the cached instance config.

    Returns:
        ``{"status": "ok", "data_source": ..., "connection": ...}``.

    Raises:
        HTTPException: 400 for an unknown data source, missing required
            fields or a failed Keboola connection test; 500 when the
            existing overlay file cannot be parsed.
    """
    import yaml

    if request.data_source not in ("keboola", "bigquery", "local"):
        raise HTTPException(status_code=400, detail="data_source must be 'keboola', 'bigquery', or 'local'")

    # Validate credentials if provided
    if request.data_source == "keboola":
        if not request.keboola_token or not request.keboola_url:
            raise HTTPException(status_code=400, detail="keboola_token and keboola_url are required for Keboola data source")
        _validate_url_not_private(request.keboola_url, field_name="keboola_url")
        try:
            from connectors.keboola.client import KeboolaClient
            client = KeboolaClient(token=request.keboola_token, url=request.keboola_url)
            client.test_connection()
        except Exception as e:
            # Log the real cause server-side; the client only gets a generic
            # message so connection internals are not echoed back.
            logger.error("Keboola connection validation failed: %s", e)
            raise HTTPException(status_code=400, detail="Keboola connection failed. Check your token and URL.") from e

    elif request.data_source == "bigquery":
        if not request.bigquery_project:
            raise HTTPException(status_code=400, detail="bigquery_project is required for BigQuery data source")

    # Write instance.yaml to DATA_DIR/state/ (writable Docker volume),
    # NOT to CONFIG_DIR which is mounted read-only in Docker.
    #
    # Narrow-overlay write strategy — must match `/api/admin/server-config`:
    #   1. Read overlay verbatim (do NOT fall back to static). Falling back
    #      would copy env-resolved cleartext secrets from the merged static
    #      file back into the overlay (e.g. `smtp_password: ${SMTP_PASSWORD}`
    #      → `smtp_password: hunter2`). The wizard only ever sets
    #      `instance`, `auth`, `data_source` here, so other sections must
    #      flow from the static file via `load_instance_config`'s deep-merge
    #      — they don't belong in the overlay at all.
    #   2. Patch only the sections this endpoint touches.
    #   3. Write the narrow overlay back atomically (tmp + os.replace).
    data_dir = Path(os.environ.get("DATA_DIR", "./data"))
    config_path = data_dir / "state" / "instance.yaml"

    # Same serialization + corrupt-overlay handling as POST /server-config.
    with _overlay_write_lock:
        overlay: dict = {}
        if config_path.exists():
            try:
                overlay = yaml.safe_load(config_path.read_text()) or {}
            except Exception as e:
                logger.exception("configure: refusing to overwrite corrupt overlay at %s", config_path)
                raise HTTPException(
                    status_code=500,
                    detail=f"refusing to overwrite corrupt overlay at {config_path} ({e}); "
                           "back up and remove the file, or fix it by hand",
                ) from e

        # Merge instance settings into the overlay only — never seed from the
        # env-resolved merged config.
        if request.instance_name:
            overlay.setdefault("instance", {})["name"] = request.instance_name

        if request.allowed_domain:
            overlay.setdefault("auth", {})["allowed_domain"] = request.allowed_domain

        # data_source is fully owned by this endpoint — replace wholesale.
        overlay["data_source"] = {"type": request.data_source}
        if request.data_source == "keboola":
            overlay["data_source"]["keboola"] = {
                "stack_url": request.keboola_url,
                "token_env": "KEBOOLA_STORAGE_TOKEN",
            }
        elif request.data_source == "bigquery":
            overlay["data_source"]["bigquery"] = {
                "project": request.bigquery_project,
                "location": request.bigquery_location or "us",
            }

        # Atomic write to writable data volume — same tmp + os.replace pattern
        # as the server-config editor so a concurrent save can't tear the file.
        config_path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = config_path.with_suffix(config_path.suffix + ".tmp")
        tmp_path.write_text(yaml.dump(overlay, default_flow_style=False, sort_keys=False))
        os.replace(tmp_path, config_path)
        logger.info("Wrote instance config to %s", config_path)

    # Persist secrets to .env_overlay (in data volume, never in git)
    secrets_to_persist = {}
    if request.keboola_token:
        secrets_to_persist["KEBOOLA_STORAGE_TOKEN"] = request.keboola_token
    if request.keboola_url:
        secrets_to_persist["KEBOOLA_STACK_URL"] = request.keboola_url

    if secrets_to_persist:
        overlay_path = data_dir / "state" / ".env_overlay"
        overlay_path.parent.mkdir(parents=True, exist_ok=True)

        # Merge with existing overlay
        existing_overlay = {}
        if overlay_path.exists():
            for line in overlay_path.read_text().splitlines():
                if "=" in line and not line.startswith("#"):
                    k, v = line.split("=", 1)
                    existing_overlay[k.strip()] = v.strip()
        existing_overlay.update(secrets_to_persist)

        payload = "\n".join(f"{k}={v}" for k, v in existing_overlay.items()) + "\n"

        # Create the temp file with mode 0o600 from the start so the secrets
        # are never readable by other users (write-then-chmod leaves a window
        # under a permissive umask), then swap it in atomically so a reader
        # can never observe a half-written file.
        tmp_env_path = overlay_path.with_name(overlay_path.name + ".tmp")
        fd = os.open(tmp_env_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as fh:
            fh.write(payload)
        try:
            # O_CREAT's mode is ignored when a stale temp file already
            # exists, so enforce the permission explicitly as well.
            tmp_env_path.chmod(0o600)
        except OSError:
            # Best-effort: some filesystems do not support chmod.
            pass
        os.replace(tmp_env_path, overlay_path)
        logger.info("Persisted %d secrets to .env_overlay", len(secrets_to_persist))

        # Inject into current process environment
        for k, v in secrets_to_persist.items():
            os.environ[k] = v

    # Invalidate cached instance config so next read picks up changes.
    # Use the public helper (matches `/api/admin/server-config`); reaching
    # into the private global silently breaks if the cache layout changes.
    from app.instance_config import reset_cache
    reset_cache()

    return {
        "status": "ok",
        "data_source": request.data_source,
        "connection": "verified" if request.data_source != "local" else "local",
    }
|
|
|
|
|
|
def _discover_and_register_tables(conn: duckdb.DuckDBPyConnection, user_email: str) -> dict:
    """Discover tables from configured source and register them. Shared logic for API and sync.

    Only the ``keboola`` data source is handled; for any other configured
    source the function returns zeroed counters without contacting anything.

    Args:
        conn: DuckDB connection handed to ``TableRegistryRepository``.
        user_email: Recorded as ``registered_by`` on every new registry row.

    Returns:
        A dict with ``registered``, ``skipped`` and ``errors`` counters,
        ``tables`` (ids of the newly registered tables) and ``source``.
    """
    from app.instance_config import get_data_source_type, get_value

    source_type = get_data_source_type()
    if source_type != "keboola":
        return {"registered": 0, "skipped": 0, "errors": 0, "tables": [], "source": source_type}

    from connectors.keboola.client import KeboolaClient
    # Read from data_source.keboola (matches what /api/admin/configure writes)
    url = get_value("data_source", "keboola", "stack_url", default="")
    token_env = get_value("data_source", "keboola", "token_env", default="KEBOOLA_STORAGE_TOKEN")
    token = os.environ.get(token_env, "") if token_env else ""
    if not token:
        # Fall back to the conventional variable when the configured one is
        # unset or empty.
        token = os.environ.get("KEBOOLA_STORAGE_TOKEN", "")

    client = KeboolaClient(token=token, url=url)
    discovered = client.discover_all_tables()

    repo = TableRegistryRepository(conn)
    registered = 0
    skipped = 0
    errors = 0
    table_names = []

    for table in discovered:
        # Read the raw id once. `or ""` also covers an explicit None value,
        # which would otherwise raise on `.strip()` outside the per-table
        # try/except and abort the whole discovery run.
        raw_id = str(table.get("id") or "")
        table_id = raw_id.strip().lower().replace(".", "_").replace(" ", "_")
        if not table_id:
            errors += 1
            continue

        if repo.get(table_id):
            skipped += 1
            continue

        try:
            # Parse bucket from table ID (format: in.c-bucket.table_name)
            parts = raw_id.split(".")
            bucket = parts[1] if len(parts) > 1 else ""
            source_table = parts[2] if len(parts) > 2 else table.get("name", "")

            repo.register(
                id=table_id,
                # Fall back to the id when the name is missing or empty so
                # the row never ends up with a blank display name.
                name=table.get("name") or table_id,
                source_type="keboola",
                bucket=bucket,
                source_table=source_table,
                query_mode="local",
                registered_by=user_email,
                description=f"Auto-discovered from Keboola: {raw_id}",
            )
            registered += 1
            table_names.append(table_id)
        except Exception as e:
            # One bad table must not stop the rest of the batch.
            logger.warning("Failed to register %s: %s", table_id, e)
            errors += 1

    return {
        "registered": registered,
        "skipped": skipped,
        "errors": errors,
        "tables": table_names,
        "source": "keboola",
    }
|
|
|
|
|
|
@router.post("/discover-and-register")
async def discover_and_register(
    user: dict = Depends(require_admin),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Run table discovery against the configured source and register the results.

    One-call combination of discover-tables and register-table; tables that
    are already registered are skipped. Used by the /setup wizard and AI
    agents.

    Returns:
        The summary dict produced by ``_discover_and_register_tables``.

    Raises:
        HTTPException: 500 carrying the underlying error text when
            discovery or registration fails.
    """
    try:
        # Attribute new rows to the caller's email, or "admin" when the
        # user record carries none.
        caller_email = user.get("email", "admin")
        return _discover_and_register_tables(conn, caller_email)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Discovery and registration failed: {exc}")
|