agnes-the-ai-analyst/app/api/admin.py
ZdenekSrotyr 85d3810535 feat(materialized): query_mode='materialized' for BigQuery + Keboola — admin SELECT → parquet → analyst
Closes the 'admin pre-stages a curated table/view for analysts' use case end-to-end across both supported source connectors.

Backend (BigQuery + Keboola, schema v20):
  - schema v20 adds source_query TEXT to table_registry (renumbered from v19 after main's #150 RBAC migration also bumped to v19)
  - connectors/bigquery/extractor.py adds materialize_query(table_id, sql, *, bq, output_dir, max_bytes=...) — BqAccess session, dry-run cost guardrail (default 10 GiB, configurable via data_source.bigquery.max_bytes_per_materialize), idempotent ATTACH, rows/bytes/md5 metadata for sync_state
  - connectors/keboola/access.py — new KeboolaAccess facade (parallel of BqAccess) wrapping ATTACH 'keboola://...' AS kbc
  - connectors/keboola/extractor.py adds materialize_query — same shape, no dry-run analog (Keboola Storage API has different cost model); legacy bucket-download path skips query_mode='materialized' rows
  - app/api/sync.py:_run_materialized_pass dispatches by source_type to the right materialize_query
  - app/api/admin.py: RegisterTableRequest accepts source_query; model_validator coheres mode↔source_query↔bucket; PUT preserves omitted fields; deprecation marks (Field(deprecated=True)) on sync_strategy + profile_after_sync (no extractor reads them; profile_after_sync becomes inert — bug from earlier work where /api/sync/trigger never honored the flag); _BQ_OPTIONAL_FIELD_DEFAULTS injects defaults into GET /server-config payload

Operator + CLI surface:
  - da admin register-table --query / --query-mode materialized
  - scripts/smoke-test-materialized-bq.sh — end-to-end smoke for operators

Tests (incl. spike + integration + regression):
  - test_db_migration_v20, test_table_registry_source_query
  - test_bq_materialize, test_bq_cost_guardrail, test_bq_init_extract_skips
  - test_keboola_access, test_keboola_extension_query_passthrough (lock-in for the DuckDB extension capability), test_keboola_materialize, test_keboola_init_extract_skips, test_keboola_materialized_e2e (skipped without KBC_TEST_* creds)
  - test_sync_trigger_materialized, test_sync_trigger_keboola_materialized
  - test_api_admin_materialized, test_cli_admin_materialized
  - test_admin_bq_register, test_admin_discover_bigquery, test_admin_keboola_materialized, test_admin_phase_c_deprecation, test_admin_put_preservation, test_materialized_e2e

Cost: BQ uses bigquery_query() (jobs API, view-aware) — works on tables, views, materialized views uniformly. Keboola uses ATTACH+COPY parquet through the DuckDB extension.
2026-05-01 20:25:56 +02:00

2429 lines
102 KiB
Python

"""Admin endpoints — table discovery, registry management, instance configuration.
Every gate on this router uses ``require_admin`` from ``app.auth.access``,
which checks Admin user_group membership for both OAuth session and PAT
callers via the same ``_user_group_ids`` lookup.
"""
import logging
import os
import threading
import uuid
from pathlib import Path
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Optional, List, Dict, Any
import duckdb
from app.auth.access import require_admin
from app.auth.dependencies import _get_db
from src.repositories.table_registry import TableRegistryRepository
from src.repositories.audit import AuditRepository
from src.identifier_validation import (
is_safe_identifier as _is_safe_identifier,
is_safe_quoted_identifier as _is_safe_quoted_identifier,
)
from src.sql_safe import is_safe_project_id as _is_safe_project_id
from src.scheduler import is_valid_schedule
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/admin", tags=["admin"])
# Serializes the read-modify-write of state/instance.yaml across the two
# endpoints that mutate the overlay (POST /server-config and POST /configure).
# Without it, two admins saving concurrently would each read the same overlay
# snapshot, merge their disjoint patches, and the second os.replace would silently
# drop the first patch. Single-process FastAPI workers; multi-worker deployments
# would need an OS-level file lock — documented limitation.
_overlay_write_lock = threading.Lock()
# SSRF protection: reject private/internal URLs for keboola_url
import ipaddress as _ipaddress
import socket as _socket
from urllib.parse import urlparse as _urlparse
def _validate_url_not_private(url: str, field_name: str = "url") -> None:
"""Raise 400 if the URL host points to a private/reserved network.
Uses DNS resolution + ipaddress checks instead of hostname regex,
which correctly handles all IPv4/IPv6 addresses including abbreviated
forms (fe80::1, ::1, etc.) and DNS rebinding (resolves at check time).
"""
try:
parsed = _urlparse(url)
except Exception:
raise HTTPException(status_code=400, detail=f"Invalid {field_name}: not a valid URL")
host = parsed.hostname or ""
if not host:
raise HTTPException(status_code=400, detail=f"Invalid {field_name}: missing hostname")
# Reject well-known dangerous hostnames before DNS resolution
if host.lower() in ("localhost", "localhost.localdomain"):
raise HTTPException(
status_code=400,
detail=f"Invalid {field_name}: must not point to a private or reserved network",
)
# Resolve hostname to IP addresses and check each one
try:
addrinfos = _socket.getaddrinfo(host, None, proto=_socket.IPPROTO_TCP)
except Exception:
raise HTTPException(
status_code=400,
detail=f"Invalid {field_name}: could not resolve hostname",
)
for family, _type, _proto, _canonname, sockaddr in addrinfos:
ip_str = sockaddr[0]
try:
ip = _ipaddress.ip_address(ip_str)
except ValueError:
continue
if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved or ip.is_multicast:
raise HTTPException(
status_code=400,
detail=f"Invalid {field_name}: must not point to a private or reserved network",
)
def _normalize_primary_key(v):
"""Coerce a string primary_key to ``[v]`` for backward compatibility.
The 0.14.0 contract is ``Optional[List[str]]`` so composite primary keys
(e.g. session-grain tables keyed on ``(session_id, event_date)``) round-
trip cleanly. Pre-0.14.0 callers sent a single string; Pydantic v2
refuses to coerce, so without this validator a CLI script posting
``"primary_key": "session_id"`` would now hit a 422. Wrap a bare string
in a one-element list so old and new callers both work.
"""
if v is None:
return v
if isinstance(v, str):
return [v]
return v
# Patches to these section paths must pass _validate_url_not_private. The
# tuple is `(section, *intermediate_keys, leaf_key)` — same SSRF gate the
# /configure wizard applies to keboola_url, so an admin can't sneak
# http://169.254.169.254/ in via the server-config editor's data_source patch.
#
# Intentionally NOT included: ``("ai", "base_url")``. The openai_compat
# provider legitimately points at internal services (LiteLLM proxy on a
# private network, on-cluster vLLM endpoint, etc.) — see
# config/instance.yaml.example "LiteLLM proxy" example. SSRF blocking
# would break those valid setups. Operators with stricter posture should
# enforce the constraint upstream (firewall / egress proxy allowlist).
# Devin ANALYSIS_0001 on PR #141 5f649a4 review.
_URL_BEARING_FIELDS: tuple[tuple[str, ...], ...] = (
("data_source", "keboola", "stack_url"),
)
def _validate_urls_in_patch(sections: Dict[str, Dict[str, Any]]) -> None:
"""Apply SSRF protection to every URL-bearing field present in the patch.
Walks each registered ``(section, *path, leaf)`` against the incoming
patch and runs ``_validate_url_not_private`` on any string value found.
Missing intermediate keys / non-dict nodes are silently skipped — the
patch hasn't touched that field, no validation needed.
"""
for path in _URL_BEARING_FIELDS:
section = path[0]
if section not in sections:
continue
node: Any = sections[section]
for key in path[1:-1]:
if not isinstance(node, dict) or key not in node:
node = None
break
node = node[key]
if isinstance(node, dict):
value = node.get(path[-1])
if isinstance(value, str) and value:
_validate_url_not_private(value, field_name=".".join(path))
# --- Server-config (instance.yaml) editor -----------------------------------
#
# The /admin/server-config UI POSTs a partial dict here keyed by section
# (instance, data_source, email, telegram, jira, theme, server, auth) with
# the field values to merge into instance.yaml. Each save:
# 1. Loads the current instance.yaml (writable overlay first, then static).
# 2. Deep-merges the patch on top.
# 3. Writes to DATA_DIR/state/instance.yaml (the writable overlay).
# 4. Writes one audit_log entry tagged `instance_config.update` containing
# a sanitized diff (secret-looking keys are masked).
# Hot-reload is OUT OF SCOPE for #91 — the response carries
# `restart_required=True` so the UI can show the banner.
# Sections an admin can mutate. Keep the list explicit so a typo'd section
# in the request body is rejected loudly instead of being silently merged
# into the YAML root and confusing future loads.
_EDITABLE_SECTIONS: tuple[str, ...] = (
"instance",
"data_source",
"email",
"telegram",
"jira",
"theme",
"server",
"auth",
"ai",
"openmetadata",
"desktop",
"corporate_memory",
)
# "Danger-zone" sections — flipping these can lock operators out (auth.*) or
# break OAuth callbacks (server.hostname/host). The UI shows a confirmation
# dialog before submitting them. The API accepts them; this list exists so
# the audit entry can flag the change as high-risk and the UI can surface
# the right warning copy.
_DANGER_SECTIONS: tuple[str, ...] = ("auth", "server")
# Known-but-optional config fields per section. The /admin/server-config UI
# uses this registry alongside the YAML payload to render fields the operator
# might want to set even though they're not currently in instance.yaml.
#
# Schema per field:
# {
# "kind": "string" | "secret" | "bool" | "int" | "select" | "object" | "array",
# "default": <type-appropriate default> (optional)
# "hint": "<one-line operator-facing help>"
# "options": [...] (only for kind="select")
# "fields": {<name>: <fieldspec>} (only for kind="object", nested fields)
# "item_kind": "string" | ... (only for kind="array", element type)
# "required": bool (defaults False; UI marks the label)
# }
#
# Subagents 2-4 will populate the bodies. The registry enables the UI to
# render missing-but-known fields with placeholders + hints rather than
# forcing the operator to discover them via the JSON-patch textarea or
# hitting a runtime error first. The smoke fixture below
# (data_source.bigquery.billing_project) proves the renderer wiring works
# end-to-end so subagents 2-4 only have to add registry entries — they
# don't need to touch admin_server_config.html.
_KNOWN_FIELDS: dict[str, dict[str, dict]] = {
"instance": {
# No commonly-missing instance-level fields. The example YAML's
# `name`/`subtitle` are always populated by `da setup` so they
# render via the populated path; nothing to surface here.
},
"data_source": {
"bigquery": {
"kind": "object",
"hint": "BigQuery connection knobs (read more in docs/DEPLOYMENT.md)",
"fields": {
"billing_project": {
"kind": "string",
"hint": (
"GCP project to bill BQ jobs against. Set when SA can read "
"the data project but cannot bill there (e.g. shared read-only "
"data project). Defaults to data_source.bigquery.project. "
"Mismatch → 403 USER_PROJECT_DENIED on every BQ call."
),
},
"legacy_wrap_views": {
"kind": "bool",
"default": False,
"hint": (
"When true, registered VIEWs and MATERIALIZED_VIEWs get a DuckDB "
"master view via bigquery_query() (jobs API) so analysts can "
"SELECT * FROM viewname directly. When false (default), views are "
"catalog-only — analysts use `da fetch` or `da query --remote`. "
"ON for view-heavy deployments where most views are small enough "
"to materialize without 'Response too large' (issue #101)."
),
},
"max_bytes_per_materialize": {
"kind": "int",
"default": 10737418240,
"hint": (
"Cost guardrail for query_mode='materialized' BQ scans (dry-run "
"check before running). Bytes processed; exceeds → registration "
"or sync rejected. 0 disables the gate. Default 10737418240 = 10 GiB."
),
},
},
},
"keboola": {
"kind": "object",
"hint": "Keboola Storage API connection",
"fields": {
"stack_url": {
"kind": "string",
"hint": (
"e.g. https://connection.keboola.com (instance-specific stack URL). "
"Validated against private-IP allowlist on save (SSRF guard)."
),
},
"project_id": {
"kind": "string",
"hint": "Keboola project ID (numeric, but kept as string in YAML).",
},
},
},
},
"email": {
# SMTP fields render via the populated path (always set when email
# is enabled); no commonly-missing optional knobs at this layer.
},
"telegram": {
# Rarely missing; leave empty.
},
"jira": {
# Webhook + REST credentials always present when Jira is configured.
},
"theme": {
# Cosmetic only; rarely missing.
},
"server": {
# TLS / hostname knobs are mostly env-side; nothing to surface here.
},
"auth": {
"allowed_domain": {
"kind": "string",
"hint": (
"Comma-separated list of allowed sign-in email domains (e.g. "
"'acme.com,acme-internal.com'). Single domain works too. Empty → no "
"domain restriction (any verified Google identity can sign in)."
),
},
},
"ai": {
"base_url": {
"kind": "string",
"hint": (
"Required for provider='openai_compat' (LiteLLM, OpenRouter, vLLM, etc.). "
"Ignored when provider='anthropic'. Examples: https://litellm.example.com, "
"https://openrouter.ai/api/v1."
),
},
"structured_output": {
"kind": "select",
"options": ["strict", "json", "auto"],
"default": "auto",
"hint": (
"JSON-schema enforcement strategy. strict=Layer 1 only "
"(Anthropic/OpenAI native, fail otherwise). json=Layer 1 + Layer 2 "
"fallback. auto=all three layers including prompt-based JSON (most "
"compatible, least strict)."
),
},
},
"openmetadata": {
"url": {
"kind": "string",
"hint": "Base URL of your OpenMetadata server (e.g. https://catalog.example.com).",
},
"token": {
"kind": "secret",
"hint": (
"JWT bearer token. Use ${OPENMETADATA_TOKEN} env-var reference "
"(don't paste secret directly)."
),
},
"cache_ttl_seconds": {
"kind": "int",
"default": 3600,
"hint": "How long to cache catalog responses in-process. Default 3600s (1h).",
},
"verify_ssl": {
"kind": "bool",
"default": True,
"hint": (
"TLS verification. Default true. Set false ONLY for internal CAs / "
"self-signed certs — sends the JWT over an unverified channel."
),
},
},
"desktop": {
"jwt_issuer": {
"kind": "string",
"default": "data-analyst",
"hint": "JWT iss claim. Match what the desktop app verifies.",
},
"jwt_secret": {
"kind": "secret",
"hint": "JWT signing secret. Use ${DESKTOP_JWT_SECRET} env-var reference.",
},
"url_scheme": {
"kind": "string",
"default": "data-analyst",
"hint": "Custom URL scheme registered by the desktop app (data-analyst://...).",
},
},
# corporate_memory governance — optional. When the section is missing
# from instance.yaml the system runs in legacy democratic-wiki mode
# (no admin review). Schema mirrors config/instance.yaml.example
# lines 224-317; renderer handles arbitrary depth + arrays + maps.
"corporate_memory": {
"distribution_mode": {
"kind": "select",
"options": ["mandatory_only", "admin_curated", "hybrid"],
"default": "hybrid",
"hint": (
"How knowledge reaches users. mandatory_only = admin-only; "
"admin_curated = admin + user voting as feedback; "
"hybrid = default (mandatory from admin + optional from user voting)."
),
},
"approval_mode": {
"kind": "select",
"options": ["review_queue", "auto_publish", "threshold"],
"default": "review_queue",
"hint": (
"How AI-extracted items enter the system. review_queue = admin "
"approval required (default); auto_publish = live immediately; "
"threshold = high-confidence auto, low-confidence to queue."
),
},
"review_period_months": {
"kind": "int",
"default": 6,
"hint": "How often approved/mandatory items are flagged for re-review (months).",
},
"notify_on_new_items": {
"kind": "bool",
"default": True,
"hint": "Notify km_admins when new pending items arrive.",
},
"sources": {
"kind": "object",
"hint": (
"Knowledge-source ingestion. Each source has its own enabled "
"flag + base confidence."
),
"fields": {
"claude_local_md": {
"kind": "object",
"fields": {
"enabled": {"kind": "bool", "default": True},
"confidence_base": {
"kind": "float",
"default": 0.50,
"hint": "Confidence assigned to extractions from CLAUDE.local.md (0-1).",
},
},
},
"session_transcripts": {
"kind": "object",
"fields": {
"enabled": {"kind": "bool", "default": True},
"confidence_base": {"kind": "float", "default": 0.60},
"max_turns_per_session": {
"kind": "int",
"default": 100,
"hint": "Truncate transcripts longer than this many turns.",
},
"detection_types": {
"kind": "array",
"item_kind": "string",
"default": [
"correction",
"confirmation",
"unprompted_definition",
],
"hint": (
"Which extraction patterns to detect. Each entry "
"is a detection-type tag."
),
},
},
},
},
},
"extraction": {
"kind": "object",
"fields": {
"model": {
"kind": "string",
"default": "claude-haiku-4-5-20251001",
"hint": "LLM used to extract knowledge. Override for cost or quality.",
},
"sensitivity_check": {"kind": "bool", "default": True},
"contradiction_check": {"kind": "bool", "default": True},
},
},
"confidence": {
"kind": "object",
"hint": "Confidence scoring + decay rules.",
"fields": {
"base": {
"kind": "map",
"key_kind": "string",
"value_kind": "float",
"default": {
"user_verification.correction": 0.90,
"user_verification.unprompted_definition": 0.90,
"user_verification.confirmation": 0.60,
"admin_mandate": 1.00,
"claude_local_md": 0.50,
"session_transcript": 0.50,
},
"hint": (
"Base score per source/detection. Keys are 'source_type' "
"or 'source_type.detection_type' (the dot is data, not "
"nesting)."
),
},
"modifiers": {
# map<string, map<string, float>>. The renderer's structured
# editor for "map of objects with declared subfields" is a
# TODO (see admin_server_config.html); for now this falls
# back to a JSON textarea — admins editing it see the
# schema doc inline via the hint.
"kind": "map",
"key_kind": "string",
"value_kind": "object",
"value_fields": {}, # signals the JSON-textarea fallback
"hint": (
"Per-key modifier step sizes applied to base when "
"optional signals are present (3-level dotted paths). "
"Edit as a JSON object — outer keys mirror confidence.base "
"keys; inner objects map signal name to bonus float."
),
},
"decay": {
"kind": "object",
"fields": {
"mode": {
"kind": "select",
"options": ["linear", "exponential"],
"default": "exponential",
},
"half_life_months": {
"kind": "int",
"default": 12,
"hint": "Used when mode=exponential.",
},
"decay_rate_monthly": {
"kind": "float",
"default": 0.02,
"hint": "Used when mode=linear.",
},
"floor": {
"kind": "map",
"key_kind": "string",
"value_kind": "float",
"default": {
"admin_mandate": 0.50,
"user_verification": 0.40,
"default": 0.0,
},
"hint": (
"Per-source minimum confidence — items never decay "
"below this floor."
),
},
},
},
},
},
"contradiction_detection": {
"kind": "object",
"fields": {
"enabled": {"kind": "bool", "default": True},
"max_candidates": {
"kind": "int",
"default": 10,
"hint": "Max contradiction candidates to evaluate per new item.",
},
},
},
"entity_resolution": {
"kind": "object",
"fields": {
"enabled": {"kind": "bool", "default": True},
"entities": {
"kind": "map",
"key_kind": "string",
"value_kind": "array",
"value_item_kind": "string",
"default": {
"metrics": ["churn", "MRR", "ARR", "NPS", "CAC", "LTV"],
"products": ["Platform", "API", "Dashboard"],
},
"hint": (
"Domain-entity vocabulary. Key = domain category; value = "
"canonical names list."
),
},
},
},
"domain_owners": {
"kind": "map",
"key_kind": "string",
"value_kind": "array",
"value_item_kind": "string",
"hint": (
"Per-domain admin emails. Key = domain name; value = email list."
),
},
"domains": {
"kind": "array",
"item_kind": "string",
"default": [
"finance",
"engineering",
"product",
"data",
"operations",
"infrastructure",
],
"hint": (
"Knowledge domains analysts can target. Each must match a key "
"in domain_owners."
),
},
},
}
# Keys whose values must be redacted from the audit diff. We match
# substring (case-insensitive) so `client_secret`, `api_token`,
# `webapp_secret_key`, `bot_token`, `password`, `smtp_password`, etc. all
# get masked even when nested.
_SECRET_KEY_PATTERNS: tuple[str, ...] = (
"secret",
"token",
"password",
"api_key",
)
def _is_secret_key(key: str) -> bool:
"""True if a config key holds a credential and should be masked in audit logs."""
k = key.lower()
return any(pat in k for pat in _SECRET_KEY_PATTERNS)
def _mask(value: Any) -> str:
"""Replacement value used in the audit diff for secret fields.
We deliberately do NOT preserve length or any hint about the secret —
the diff is read by other admins, and there's no operator value to
leaking "the new SMTP password is 16 chars". `***` is enough to show
that the field changed without exposing it.
"""
if value in (None, ""):
return "<empty>"
return "***"
# Sentinel values produced by `_mask`. Any patch leaf that arrives at a
# secret-keyed slot still bearing one of these strings means the caller
# round-tripped the GET payload (which redacts secret-keyed children inside
# nested objects) without changing the value — `_strip_redacted_sentinels`
# drops the leaf so deep-merge preserves whatever the overlay already had,
# rather than persisting the placeholder on top of the real secret.
_REDACTED_SENTINELS: frozenset = frozenset({"***", "<empty>"})
def _strip_redacted_sentinels(value: Any, key_hint: str = "") -> Any:
"""Recursively drop secret-keyed leaves whose value is a redaction sentinel.
Symmetric with `_redact`: the GET handler masks secret-keyed children
inside nested objects so the form never shows cleartext, and this
function is the write-side counterpart that ensures the placeholder
doesn't make a round-trip back into the overlay. Defense-in-depth
alongside the client-side `scrubRedactedSecrets` in
`admin_server_config.html` — an API caller (CLI / script) that forgets
to scrub still can't corrupt secrets via this endpoint.
"""
if isinstance(value, dict):
out: Dict[str, Any] = {}
for k, v in value.items():
if _is_secret_key(k) and isinstance(v, str) and v in _REDACTED_SENTINELS:
continue
out[k] = _strip_redacted_sentinels(v, k)
return out
if isinstance(value, list):
return [_strip_redacted_sentinels(item, key_hint) for item in value]
return value
def _redact(value: Any, key_hint: str = "") -> Any:
"""Recursively mask secret-looking fields in a config subtree.
`key_hint` is the parent key — used so a string value like
``"${KEBOOLA_TOKEN}"`` under ``token_env`` is masked even though the
value itself isn't a credential, because the key signals it points at
one.
"""
if isinstance(value, dict):
return {k: (_mask(v) if _is_secret_key(k) else _redact(v, k)) for k, v in value.items()}
if isinstance(value, list):
return [_redact(item, key_hint) for item in value]
if key_hint and _is_secret_key(key_hint):
return _mask(value)
return value
def _diff_dicts(before: dict, after: dict, path: str = "") -> List[Dict[str, Any]]:
"""Flat list of changed fields between two dicts.
Output: [{"path": "email.smtp_host", "before": "...", "after": "..."}].
Diff is computed on RAW values, then each row's `before`/`after` is
masked via `_mask` when the leaf key matches `_is_secret_key` — pre-
masking the inputs would collapse a secret rotation (e.g. password A
→ password B) into "no diff" because both sides redact to ``"***"``,
and the audit log would then silently fail to record one of the most
security-relevant changes. Compare raw, redact when emitting.
Recurses into a dict on either side (treating the missing side as
`{}`) so adding a brand-new section reports per-field paths
(`email.smtp_host`) rather than a single opaque `email` blob — that
keeps the audit row useful when an admin populates a section for the
first time.
"""
changes: List[Dict[str, Any]] = []
keys = set(before.keys()) | set(after.keys())
for key in sorted(keys):
new_path = f"{path}.{key}" if path else key
b_val = before.get(key)
a_val = after.get(key)
b_is_dict = isinstance(b_val, dict)
a_is_dict = isinstance(a_val, dict)
# Dict-vs-dict (or dict-vs-None) → recurse for per-field paths.
if b_is_dict and a_is_dict:
changes.extend(_diff_dicts(b_val, a_val, new_path))
elif b_is_dict and a_val is None:
changes.extend(_diff_dicts(b_val, {}, new_path))
elif a_is_dict and b_val is None:
changes.extend(_diff_dicts({}, a_val, new_path))
# Dict↔scalar shape change is recorded as a single replacement at
# the parent path. Recursing with `{}` would lose the scalar side
# entirely (admin sets `keboola: {…}` to `keboola: "disabled"` —
# auditor would see members removed but never the new value).
# The dict side may itself contain secret-keyed children (e.g.
# `keboola: {token_env: ${KEBOOLA_TOKEN}}` resolved to cleartext);
# `_redact` masks those children even when the parent key isn't
# secret-named, so the audit log doesn't leak ${ENV_VAR}-resolved
# values when a section is replaced wholesale.
elif b_is_dict != a_is_dict:
if _is_secret_key(key):
changes.append({
"path": new_path,
"before": _mask(b_val),
"after": _mask(a_val),
})
else:
changes.append({
"path": new_path,
"before": _redact(b_val, key) if b_is_dict else b_val,
"after": _redact(a_val, key) if a_is_dict else a_val,
})
elif b_val != a_val:
if _is_secret_key(key):
changes.append({
"path": new_path,
"before": _mask(b_val),
"after": _mask(a_val),
})
else:
changes.append({"path": new_path, "before": b_val, "after": a_val})
return changes
def _deep_merge(base: dict, patch: dict) -> dict:
"""Merge `patch` into `base` recursively, returning a new dict.
Patch values overwrite base values. Dict-into-dict recurses; everything
else (lists, scalars, None) is replaced wholesale — admin sets
``email: {smtp_port: 465}`` and we don't try to re-merge nested ports.
"""
out = dict(base)
for key, value in patch.items():
if isinstance(value, dict) and isinstance(out.get(key), dict):
out[key] = _deep_merge(out[key], value)
else:
out[key] = value
return out
def _load_current_instance_yaml() -> dict:
"""Return the editor's view of instance.yaml — deep-merge of static +
overlay via ``app.instance_config.load_instance_config``.
Readers (GET /server-config) hit the cache and trust that writers
invalidate. Writers must call ``reset_cache()`` explicitly *before*
the read so they see the latest disk state in the read-modify-write
sequence. The shared helper is the authoritative source so the editor
never sees a different view than the rest of the running app.
"""
from app.instance_config import load_instance_config
return load_instance_config()
def _public_view(config: dict) -> dict:
"""Return a config dict safe to render in the admin UI form.
Deep-copies and redacts secret-looking fields so an admin can see
*which* fields are populated without the cleartext leaking into the
rendered HTML / browser DevTools.
"""
import copy
return _redact(copy.deepcopy(config))
class ServerConfigUpdateRequest(BaseModel):
"""Patch payload for POST /api/admin/server-config.
Only the sections listed in `_EDITABLE_SECTIONS` are accepted; anything
else is rejected with 400. `confirm_danger` must be true if the patch
touches any danger-zone section (auth.*, server.*).
"""
sections: Dict[str, Dict[str, Any]] = Field(
default_factory=dict,
description="Per-section patch dict (e.g. {'instance': {'name': 'X'}})",
)
confirm_danger: bool = Field(
default=False,
description="Must be true to apply changes touching auth.* or server.*",
)
# Optional BQ fields whose runtime defaults are documented but which used to
# be invisible in the editor when YAML omitted them. The data_source.bigquery
# subtree renders as a JSON textarea; a key that's absent from the GET
# payload literally cannot appear in the form for the operator to edit. We
# surface them with their documented defaults so the UI always shows them as
# editable knobs — see Phase J of the admin-tables-cleanup work.
#
# - billing_project: defaults to data project; explicit value needed when
# the SA can read the data project but not bill against it.
# - legacy_wrap_views: default False; toggling ON wraps BQ views via
# `bigquery_query()` so analysts can `SELECT *` directly.
# - max_bytes_per_materialize: cost guardrail for `query_mode='materialized'`
# (default 10 GiB; 0 disables; null falls through to the default).
_BQ_OPTIONAL_FIELD_DEFAULTS: Dict[str, Any] = {
"billing_project": "",
"legacy_wrap_views": False,
"max_bytes_per_materialize": 10737418240,
}
def _ensure_bq_optional_fields(sections: Dict[str, Any]) -> None:
"""In-place: add missing BQ optional fields to data_source.bigquery so the
UI's JSON-textarea renders them as editable keys. Existing values are
preserved — only absent keys are populated with their documented default.
"""
ds = sections.get("data_source")
if not isinstance(ds, dict):
return
bq = ds.get("bigquery")
if not isinstance(bq, dict):
# No BQ subsection — leave alone. Non-BQ instances don't need these
# knobs, and creating an empty bigquery dict would be misleading.
return
for key, default in _BQ_OPTIONAL_FIELD_DEFAULTS.items():
bq.setdefault(key, default)
@router.get("/server-config")
async def get_server_config(
user: dict = Depends(require_admin),
):
"""Return the current instance.yaml with secrets redacted.
Used by the /admin/server-config UI to prefill its form. The redacted
payload mirrors the actual file shape, so the UI doesn't need to know
the schema — it iterates over the editable sections and renders the
fields it finds. Empty sections still show in the response so the form
knows to render their headers.
"""
config = _load_current_instance_yaml()
redacted = _public_view(config)
# Surface every editable section so the UI renders them even when the
# file omits them — operator can populate from scratch without manual
# JSON edits.
sections = {section: redacted.get(section, {}) for section in _EDITABLE_SECTIONS}
# Always surface the optional BQ knobs so the operator sees them in the
# UI's JSON editor instead of having to know they exist (Phase J).
_ensure_bq_optional_fields(sections)
return {
"sections": sections,
"editable_sections": list(_EDITABLE_SECTIONS),
"danger_sections": list(_DANGER_SECTIONS),
"secret_key_patterns": list(_SECRET_KEY_PATTERNS),
# Known-but-optional fields per section so the UI can render
# placeholders for fields the operator hasn't set yet (Phase J).
# Subagents 2-4 populate the bodies; the renderer ships now so the
# mechanism is wired end-to-end and adding entries is purely a
# data-edit in `_KNOWN_FIELDS` above.
"known_fields": _KNOWN_FIELDS,
}
@router.post("/server-config")
async def update_server_config(
request: ServerConfigUpdateRequest,
user: dict = Depends(require_admin),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Patch instance.yaml from the /admin/server-config editor.
Accepts a partial patch keyed by section. Validates sections, refuses
danger-zone edits without explicit confirmation, deep-merges into the
current overlay, writes the file, and emits one audit entry per save
with a sanitized diff. Returns ``restart_required=true`` so the UI can
show the restart banner — hot-reload is a separate issue (see #91 Out
of scope).
"""
import yaml
if not request.sections:
raise HTTPException(status_code=422, detail="sections cannot be empty")
# Reject unknown sections loudly. Without this, a typo like "thmee"
# would silently land in the YAML root and the operator wouldn't see
# their colour change apply.
unknown = sorted(set(request.sections.keys()) - set(_EDITABLE_SECTIONS))
if unknown:
raise HTTPException(
status_code=400,
detail=f"unknown section(s): {', '.join(unknown)}. "
f"Editable: {', '.join(_EDITABLE_SECTIONS)}",
)
# Danger-zone gate. The UI shows a confirmation dialog before posting
# with confirm_danger=true; an API caller (CLI/script) has to pass it
# explicitly so they can't fat-finger a hostname change.
danger_touched = sorted(set(request.sections.keys()) & set(_DANGER_SECTIONS))
if danger_touched and not request.confirm_danger:
raise HTTPException(
status_code=400,
detail=f"section(s) {', '.join(danger_touched)} require confirm_danger=true",
)
# SSRF protection — same gate the /configure wizard applies to
# keboola_url, but here it covers any URL-bearing field reachable via
# the per-section patch (e.g. data_source.keboola.stack_url).
_validate_urls_in_patch(request.sections)
# Defense-in-depth: scrub redaction sentinels (`***` / `<empty>`) out of
# secret-keyed leaves in the patch before they reach the deep-merge.
# The client form does the same scrub, but an API caller round-tripping
# the GET payload could otherwise overwrite real overlay secrets with
# the placeholder shown in the form.
scrubbed_sections: Dict[str, Dict[str, Any]] = {
section: _strip_redacted_sentinels(patch, section)
for section, patch in request.sections.items()
}
# Serialize read-modify-write across concurrent admin saves. Without the
# lock, two saves would each read the same overlay snapshot, merge their
# disjoint patches, and the second os.replace would silently drop the
# first patch. The lock spans the cache-invalidate → load → merge →
# atomic-write sequence; the audit log sits outside since it operates on
# local snapshots.
from app.instance_config import reset_cache
data_dir = Path(os.environ.get("DATA_DIR", "./data"))
config_path = data_dir / "state" / "instance.yaml"
config_path.parent.mkdir(parents=True, exist_ok=True)
with _overlay_write_lock:
# Drop the in-process cache so we read the latest on-disk state,
# including any update that landed from a concurrent caller before
# we acquired the lock.
reset_cache()
before = _load_current_instance_yaml()
# Deep merge — section-by-section so we never accidentally delete a
# sibling section the patch didn't touch. Use the redaction-scrubbed
# patch so a round-tripped GET payload can't overwrite real secrets
# with the `***` placeholder.
after = dict(before)
for section, patch in scrubbed_sections.items():
if not isinstance(patch, dict):
raise HTTPException(
status_code=422,
detail=f"section '{section}' must be an object, got {type(patch).__name__}",
)
if isinstance(after.get(section), dict):
after[section] = _deep_merge(after[section], patch)
else:
after[section] = patch
# Write only the sections the user actually patched in this request.
# Two reasons:
# 1. Persisting the full merged config (or every editable section)
# would snapshot non-editable static sections into the overlay,
# shadowing later operator updates to those sections in the
# static file (`_load_current_instance_yaml` merges static + overlay,
# overlay wins per leaf).
# 2. The merged config has `${ENV_VAR}` placeholders RESOLVED to the
# runtime values by config.loader. Writing every editable section
# back would persist real cleartext secrets where the static file
# had only env-var references — turning `smtp_password:
# ${SMTP_PASSWORD}` into `smtp_password: hunter2` in the overlay.
# By writing only the sections in `request.sections` we keep both the
# static-evolution and the env-var-placeholder properties intact.
overlay_payload: Dict[str, Any] = {}
if config_path.exists():
try:
overlay_payload = yaml.safe_load(config_path.read_text()) or {}
except Exception as e:
# A corrupt overlay used to be silently replaced — that masked
# disk corruption / partial writes / hand-edits and dropped
# every previously-saved section on the next save. Refuse and
# surface so the operator can investigate.
logger.exception("server-config: refusing to overwrite corrupt overlay at %s", config_path)
raise HTTPException(
status_code=500,
detail=f"refusing to overwrite corrupt overlay at {config_path} ({e}); "
"back up and remove the file, or fix it by hand",
) from e
for section, patch in scrubbed_sections.items():
if section not in _EDITABLE_SECTIONS:
continue
# Deep-merge the patch into the existing overlay slot (or static-
# backed `before` if overlay had nothing for this section). This
# preserves any unrelated keys the operator didn't touch in this
# request — e.g. patching `email.smtp_host` doesn't blow away the
# `email.smtp_password: ${SMTP_PASSWORD}` reference.
existing = overlay_payload.get(section)
if not isinstance(existing, dict):
existing = {}
overlay_payload[section] = _deep_merge(existing, patch)
# Atomic via tmp + os.replace so two concurrent admin saves can't
# interleave bytes and produce corrupt YAML (especially harmful since
# auth.* is editable here — half-written file → operator lockout).
tmp_path = config_path.with_suffix(config_path.suffix + ".tmp")
tmp_path.write_text(yaml.dump(overlay_payload, default_flow_style=False, sort_keys=False))
os.replace(tmp_path, config_path)
logger.info("server-config: wrote %d section(s) to %s",
len(request.sections), config_path)
# Invalidate cached instance config so subsequent reads pick up the
# change. Hot-reload of running modules (auth providers, SMTP client)
# is out of scope — the restart banner tells the operator to bounce.
reset_cache()
# Audit entry — diff is computed on RAW values then `_diff_dicts`
# redacts each row whose leaf key matches `_is_secret_key`. Pre-
# masking the inputs would collapse a secret rotation into "no
# diff" because both sides redact to ``***``, hiding the most
# security-relevant changes from the audit log. We log even if no
# fields changed so the operator's intent (touched the page, hit
# save) is auditable.
diff = _diff_dicts(before, after)
AuditRepository(conn).log(
user_id=user.get("id"),
action="instance_config.update",
resource="instance.yaml",
params={
"sections": sorted(request.sections.keys()),
"danger_sections": danger_touched,
"diff": diff,
"diff_count": len(diff),
},
)
return {
"status": "ok",
"restart_required": True,
"sections_updated": sorted(request.sections.keys()),
"diff_count": len(diff),
}
# --- End server-config editor -----------------------------------------------
# Source types accepted by /api/admin/register-table. Anything else is
# rejected with 422 — keeps a typo'd source_type from silently landing in
# table_registry (where it would later confuse the orchestrator scan).
_VALID_SOURCE_TYPES: tuple[str, ...] = ("keboola", "bigquery", "jira", "local")
# Explicit allowlist of audit-payload keys whose values are credentials and
# must be masked. Substring-scan + ad-hoc whitelist (the previous shape) is
# fragile in two ways:
# 1. False positive: legit fields like `primary_key` get masked because
# they contain "key" — we then need a whitelist exception, which has
# to be kept in sync as new fields are added.
# 2. False negative: a future field like `primary_key_hash` *would* be
# masked (defensible) but `not_actually_a_token` ALSO matches "token"
# and gets masked unnecessarily; conversely, a brand-new credential
# field that doesn't contain one of the patterns (`auth_material`,
# `bearer`) silently leaks.
# Allowlist puts the burden on the developer adding a new secret-bearing
# field: they must add the literal key name here, which forces a code-
# review touch on the audit path. Audit the current Pydantic models
# (RegisterTableRequest / UpdateTableRequest / ConfigureRequest /
# ServerConfigUpdateRequest) when extending — the registry payloads don't
# currently carry credentials, but ConfigureRequest does (`keboola_token`)
# and could be routed through this sanitizer in the future.
_SECRET_FIELDS: frozenset = frozenset({
# ConfigureRequest — POST /api/admin/configure carries Keboola creds.
"keboola_token",
# Generic names that have appeared in earlier iterations of admin
# request bodies and could resurface — keep them masked defensively.
"api_token",
"auth_token",
"bot_token",
"client_secret",
"google_client_secret",
"google_oauth_client_secret",
"password",
"smtp_password",
"webapp_secret_key",
"bot_secret",
# Marketplace PATs (private repos) — see src/marketplace.py.
"marketplace_token",
"marketplace_pat",
})
def _sanitize_for_audit(payload: Dict[str, Any]) -> Dict[str, Any]:
"""Mask credential-bearing fields in a request payload before audit_log.
Uses an explicit `_SECRET_FIELDS` allowlist (case-insensitive) instead
of substring matching. The trade-off is that adding a new secret field
requires updating the set — but that's the *point*: the test suite
asserts `not_actually_a_token` does NOT get masked, so a substring-
based regression would surface immediately, and a missing entry for a
real new credential gets caught at code review of the audit path.
"""
out: Dict[str, Any] = {}
for k, v in payload.items():
if k.lower() in _SECRET_FIELDS:
out[k] = "***" if v not in (None, "") else "<empty>"
else:
out[k] = v
return out
class RegisterTableRequest(BaseModel):
name: str
folder: Optional[str] = None
sync_strategy: str = Field(
default="full_refresh",
deprecated=True,
description=(
"DEPRECATED: catalog/profiler metadata only. No extractor reads "
"this field; every sync is a full overwrite regardless of value. "
"profiler.is_partitioned() consumes it for parquet-layout "
"detection. Field stays for back-compat; will be removed in a "
"future major release."
),
)
# Composite primary keys are real (session-grain MSA tables key on
# `(session_id, event_date)`, browse rows on more). The frontend sends +
# reads this as a list; backend stores it JSON-serialized in VARCHAR.
# A bare string is accepted for backward compat — see _normalize_primary_key.
primary_key: Optional[List[str]] = None
description: Optional[str] = None
source_type: Optional[str] = None
bucket: Optional[str] = None
source_table: Optional[str] = None
# Backs query_mode='materialized'. Stored verbatim in
# table_registry.source_query (schema v20); the trigger pass runs it
# through the DuckDB BQ extension via BqAccess and writes the result
# to /data/extracts/bigquery/data/<id>.parquet.
source_query: Optional[str] = None
query_mode: str = "local"
sync_schedule: Optional[str] = None
profile_after_sync: bool = Field(
default=True,
deprecated=True,
description=(
"DEPRECATED: not consumed by the runtime (Agent 1 finding "
"2026-05-01). Profiler runs unconditionally on every synced "
"table; this flag has no effect. Field stays for back-compat."
),
)
@model_validator(mode="after")
def _check_mode_query_coherence(self):
"""Enforce query_mode ↔ source_query invariants up front so an admin
can't persist a remote/local row carrying an orphan source_query, and
materialized rows can't be registered without a SQL body."""
sq = (self.source_query or "").strip() or None
if self.query_mode == "materialized" and not sq:
raise ValueError(
"query_mode='materialized' requires a non-empty source_query"
)
if self.query_mode != "materialized" and sq:
raise ValueError(
"source_query is only valid when query_mode='materialized'"
)
# Normalise: stash the trimmed-or-None form so the persisted column
# never carries surrounding whitespace or empty-string sentinels.
self.source_query = sq
return self
@field_validator("primary_key", mode="before")
@classmethod
def _coerce_primary_key(cls, v):
return _normalize_primary_key(v)
@field_validator("source_type", mode="before")
@classmethod
def _validate_source_type(cls, v):
# None is tolerated for backward compat with old CLI scripts that
# didn't set a source_type; the route resolves it later. Anything
# else must be in the canonical list.
if v in (None, ""):
return v
if v not in _VALID_SOURCE_TYPES:
raise ValueError(
f"source_type must be one of {sorted(_VALID_SOURCE_TYPES)}, got {v!r}"
)
return v
@field_validator("sync_schedule", mode="before")
@classmethod
def _validate_sync_schedule(cls, v):
# None / "" → no schedule, accepted.
# Any non-empty string (including pure whitespace) must parse as a
# valid schedule — otherwise it would be persisted and silently
# ignored by the runtime evaluator.
if v in (None, ""):
return v
if not is_valid_schedule(v):
raise ValueError(
f"sync_schedule must be 'every Nm' / 'every Nh' / "
f"'daily HH:MM[,HH:MM,...]', got {v!r}"
)
return v
def _validate_bigquery_register_payload(req: "RegisterTableRequest") -> None:
"""Enforce BQ-specific shape on a register/precheck request.
Two BQ paths:
- ``query_mode='materialized'`` — admin-registered SQL writes a parquet on
schedule. Requires ``source_query``; ``bucket`` / ``source_table`` are
not used (the SQL inlines the references). Doesn't force any field; the
Pydantic ``model_validator`` already gated the query/mode coherence.
- ``query_mode='remote'`` (or default) — remote view over a single BQ
table. Requires ``bucket`` (BQ dataset) + ``source_table``. Mutates
the model: forces ``query_mode='remote'`` and ``profile_after_sync=False``
(per Decision 7 in #108) so a caller can't accidentally enqueue a
parquet profiling pass for a remote view that has no local file.
Raises HTTPException(422) for missing required fields and
HTTPException(400) for unsafe identifiers / bogus project_id.
"""
if req.query_mode == "materialized":
# Materialized BQ rows: the SQL body replaces dataset+table refs.
# Pydantic model_validator already verified source_query is non-empty;
# all we still need is a valid project_id and a safe view name.
if not req.source_query or not req.source_query.strip():
raise HTTPException(
status_code=422,
detail="bigquery materialized: 'source_query' is required",
)
raw_name = req.name or ""
if raw_name.strip() != raw_name or not _is_safe_identifier(raw_name):
raise HTTPException(
status_code=400,
detail=(
f"bigquery: view name {raw_name!r} is unsafe — must match "
f"^[a-zA-Z_][a-zA-Z0-9_]{{0,63}}$ (DuckDB identifier rules) "
"with no leading/trailing whitespace"
),
)
from app.instance_config import get_value
project_id = get_value("data_source", "bigquery", "project", default="")
if not project_id:
raise HTTPException(
status_code=400,
detail=(
"bigquery: data_source.bigquery.project is not set in "
"instance.yaml; configure it via /admin/server-config or "
"/api/admin/configure first"
),
)
if not _is_safe_project_id(project_id):
raise HTTPException(
status_code=400,
detail=(
f"bigquery: data_source.bigquery.project {project_id!r} "
"is malformed — must match GCP project_id grammar "
"^[a-z][a-z0-9-]{4,28}[a-z0-9]$"
),
)
# Phase C: profile_after_sync is now inert (Pydantic field marked
# deprecated; not read by app/api/sync.py:410-438). The runtime
# profiles every synced table unconditionally, so we no longer
# force-set this here as a "signal."
return
if not req.bucket or not req.bucket.strip():
raise HTTPException(
status_code=422,
detail="bigquery: 'bucket' (BQ dataset) is required",
)
if not req.source_table or not req.source_table.strip():
raise HTTPException(
status_code=422,
detail="bigquery: 'source_table' is required",
)
# No wildcard / sharded BQ tables in M1 (Decision 8).
if "*" in (req.source_table or "") or "*" in (req.bucket or ""):
raise HTTPException(
status_code=400,
detail="bigquery: wildcard / sharded tables are not supported (see #108 M3+)",
)
# Strict identifier on the DuckDB view name. CRITICAL: validate the RAW
# name (the value that ``register_table`` actually persists to
# ``table_registry.name`` and which the BQ extractor reads back as the
# DuckDB view name at next rebuild). Earlier revisions normalized first
# (``strip().lower().replace(" ", "_")``) and then checked, which let
# names like ``"my table"`` pass here, get stored verbatim, and then
# blow up inside ``_init_extract`` at view-create time — defeating the
# whole point of fast-fail-at-register. We do NOT silently rewrite the
# operator's name; if they typed ``"my table"``, return 400 with a
# clear message and let them retype with a corrected name.
raw_name = req.name or ""
if raw_name.strip() != raw_name or not _is_safe_identifier(raw_name):
raise HTTPException(
status_code=400,
detail=(
f"bigquery: view name {raw_name!r} is unsafe — must match "
f"^[a-zA-Z_][a-zA-Z0-9_]{{0,63}}$ (DuckDB identifier rules) "
"with no leading/trailing whitespace"
),
)
# Same fast-fail rule as ``raw_name`` above: validate the RAW value the
# caller sent, not a stripped form. ``register_table`` persists ``bucket``
# / ``source_table`` verbatim, and the BQ extractor splices them straight
# into the ``ATTACH … AS bq_<bucket>`` and view DDL at next rebuild — so a
# value with leading/trailing whitespace passes validation here, gets
# stored as-is, and explodes inside DuckDB at view-create time. Surface
# the offending raw value in the 400 detail and let the operator retype.
raw_bucket = req.bucket
if raw_bucket.strip() != raw_bucket or not _is_safe_quoted_identifier(raw_bucket):
raise HTTPException(
status_code=400,
detail=(
f"bigquery: dataset {raw_bucket!r} is unsafe (only [A-Za-z0-9_.-] "
"allowed, no leading/trailing whitespace)"
),
)
raw_source_table = req.source_table
if raw_source_table.strip() != raw_source_table or not _is_safe_quoted_identifier(raw_source_table):
raise HTTPException(
status_code=400,
detail=(
f"bigquery: source_table {raw_source_table!r} is unsafe (only "
"[A-Za-z0-9_.-] allowed, no leading/trailing whitespace)"
),
)
# Pull project from instance.yaml — single-project model in M1
# (Decision: no per-table project field). Validate the format here so
# we surface a config issue at registration rather than at first
# rebuild, where the operator no longer has a request to look at.
from app.instance_config import get_value
project_id = get_value("data_source", "bigquery", "project", default="")
if not project_id:
raise HTTPException(
status_code=400,
detail=(
"bigquery: data_source.bigquery.project is not set in instance.yaml; "
"configure it via /admin/server-config or /api/admin/configure first"
),
)
if not _is_safe_project_id(project_id):
raise HTTPException(
status_code=400,
detail=(
f"bigquery: data_source.bigquery.project {project_id!r} is malformed — "
"must match GCP project_id grammar ^[a-z][a-z0-9-]{4,28}[a-z0-9]$"
),
)
# Force the BQ-required mode (Decision 7). The orchestrator and
# extractor both assume remote; persisting `local` here would later create
# a profiling job against a non-existent parquet file.
# Phase C: profile_after_sync is now inert (deprecated, not read by the
# runtime); no longer force-set here.
req.query_mode = "remote"
class UpdateTableRequest(BaseModel):
name: Optional[str] = None
sync_strategy: Optional[str] = Field(
default=None,
deprecated=True,
description=(
"DEPRECATED: catalog/profiler metadata only. See "
"RegisterTableRequest.sync_strategy."
),
)
primary_key: Optional[List[str]] = None
description: Optional[str] = None
source_type: Optional[str] = None
bucket: Optional[str] = None
source_table: Optional[str] = None
source_query: Optional[str] = None
query_mode: Optional[str] = None
sync_schedule: Optional[str] = None
profile_after_sync: Optional[bool] = Field(
default=None,
deprecated=True,
description=(
"DEPRECATED: not consumed by the runtime. See "
"RegisterTableRequest.profile_after_sync."
),
)
@model_validator(mode="after")
def _check_mode_query_coherence(self):
"""PUT semantics — only the fields explicitly in the body are
validated. The body is overlaid on the existing row at the handler
level (see ``update_table``), so omitted fields keep their stored
values and the synthetic ``RegisterTableRequest`` constructed against
the merged record runs the strict cross-field check before persist.
The only invariants enforceable from the PUT body alone:
- explicit ``source_query='SELECT ...'`` paired with ``query_mode``
that isn't materialized → coherent reject (the SQL would be dead);
- explicit ``source_query='SELECT ...'`` without any ``query_mode``
in the body → reject; the operator must commit to materialized;
- explicit empty/whitespace ``source_query=''`` paired with
``query_mode='materialized'`` → reject (operator clearly
mistyped — they sent the field).
Pre-fix this validator also rejected ``{"query_mode": "materialized",
"sync_schedule": "every 12h"}`` because ``source_query`` was None
— but that's the canonical "edit the schedule on a materialized
row" use-case from the Edit modal, which always sends
``query_mode`` to indicate intent. Devin BUG_0002 on PR #148
commit 2219255.
"""
if self.query_mode is None and self.source_query is None:
return self
sq_raw = self.source_query
sq = (sq_raw or "").strip() or None
# Operator explicitly sent source_query as empty/whitespace while
# claiming materialized — typo / bad form data, reject.
if (
self.query_mode == "materialized"
and sq_raw is not None
and not sq
):
raise ValueError(
"query_mode='materialized' requires a non-empty source_query"
)
# source_query only makes sense with materialized mode. Allow None
# (omitted) to flow through; only reject when explicitly set with
# the wrong mode.
if (
self.query_mode is not None
and self.query_mode != "materialized"
and sq
):
raise ValueError(
"source_query is only valid when query_mode='materialized'"
)
if self.query_mode is None and sq:
raise ValueError(
"source_query requires query_mode='materialized' to be set "
"in the same request"
)
# Normalise: drop whitespace-only strings to None so the persisted
# column is clean. Don't touch when source_query was None to begin
# with — that signals "PUT didn't touch this field, keep existing".
if sq_raw is not None:
self.source_query = sq
return self
@field_validator("primary_key", mode="before")
@classmethod
def _coerce_primary_key(cls, v):
return _normalize_primary_key(v)
# Duplicated from RegisterTableRequest — Pydantic v2 validators don't
# inherit cleanly across unrelated BaseModel classes; a shared mixin
# would be overkill for two fields.
@field_validator("sync_schedule", mode="before")
@classmethod
def _validate_sync_schedule(cls, v):
# None / "" → no schedule, accepted.
# Any non-empty string (including pure whitespace) must parse as a
# valid schedule — otherwise it would be persisted and silently
# ignored by the runtime evaluator.
if v in (None, ""):
return v
if not is_valid_schedule(v):
raise ValueError(
f"sync_schedule must be 'every Nm' / 'every Nh' / "
f"'daily HH:MM[,HH:MM,...]', got {v!r}"
)
return v
class ConfigureRequest(BaseModel):
data_source: str # "keboola" | "bigquery" | "local"
keboola_token: Optional[str] = None
keboola_url: Optional[str] = None
bigquery_project: Optional[str] = None
bigquery_location: Optional[str] = None
instance_name: Optional[str] = None
allowed_domain: Optional[str] = None
@router.get("/discover-tables")
async def discover_tables(
user: dict = Depends(require_admin),
dataset: Optional[str] = None,
):
"""Discover available tables from the configured data source.
For ``data_source.type='keboola'`` returns the full Storage API table
list (single round-trip). For ``data_source.type='bigquery'``:
- Without ``dataset``: list datasets in the configured project.
- With ``dataset=name``: list tables (BASE TABLE + VIEW) in that dataset.
Two-step shape avoids paying the per-dataset list_tables cost up-front
on projects with hundreds of datasets — the UI populates the dataset
dropdown first, then fetches tables only for the selected dataset.
"""
try:
from app.instance_config import get_data_source_type
source_type = get_data_source_type()
if source_type == "keboola":
from connectors.keboola.client import KeboolaClient
from app.instance_config import get_value
url = get_value("data_source", "keboola", "stack_url", default="")
token_env = get_value("data_source", "keboola", "token_env", default="KEBOOLA_STORAGE_TOKEN")
token = os.environ.get(token_env, "") if token_env else ""
if not token:
token = os.environ.get("KEBOOLA_STORAGE_TOKEN", "")
client = KeboolaClient(token=token, url=url)
tables = client.discover_all_tables()
return {"tables": tables, "count": len(tables), "source": "keboola"}
if source_type == "bigquery":
return _discover_bigquery(dataset=dataset)
return {
"tables": [],
"count": 0,
"source": source_type,
"error": f"Discovery not implemented for source_type={source_type!r}",
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Discovery failed: {e}")
def _discover_bigquery(dataset: Optional[str]) -> Dict[str, Any]:
"""List BQ datasets (when ``dataset`` is None) or tables-in-dataset.
Routes through ``BqAccess.client()`` so config / auth / error
translation matches the rest of the BQ surface (#138 facade). Returns
the same shape as the Keboola path so the UI doesn't have to branch.
"""
from connectors.bigquery.access import (
get_bq_access,
BqAccessError,
translate_bq_error,
)
try:
bq = get_bq_access()
client = bq.client()
except BqAccessError as e:
raise HTTPException(
status_code=BqAccessError.HTTP_STATUS.get(e.kind, 500),
detail={"error": e.message, "kind": e.kind, "details": e.details},
)
try:
if dataset is None:
datasets = []
for ds in client.list_datasets():
datasets.append({
"dataset_id": ds.dataset_id,
"full_id": f"{ds.project}.{ds.dataset_id}",
})
return {
"datasets": sorted(datasets, key=lambda d: d["dataset_id"]),
"count": len(datasets),
"source": "bigquery",
}
# List tables in the named dataset. `list_tables` returns
# `TableListItem` with `table_id` + `table_type` ('TABLE', 'VIEW',
# 'MATERIALIZED_VIEW', 'EXTERNAL', 'SNAPSHOT'). UI maps TABLE → Type
# selector "table" and VIEW/MATERIALIZED_VIEW → "view"; the rest are
# passed through with their raw type so the operator can decide.
tables = []
for t in client.list_tables(dataset):
tables.append({
"table_id": t.table_id,
"table_type": t.table_type,
"full_id": f"{t.project}.{t.dataset_id}.{t.table_id}",
})
return {
"tables": sorted(tables, key=lambda t: t["table_id"]),
"count": len(tables),
"source": "bigquery",
"dataset": dataset,
}
except Exception as e:
# `translate_bq_error` re-raises non-Google exceptions unchanged,
# so wrap in HTTPException to keep the JSON-shape contract.
try:
err = translate_bq_error(e, bq.projects, bad_request_status="upstream_error")
except Exception:
raise HTTPException(status_code=502, detail=f"BQ discovery failed: {e}")
raise HTTPException(
status_code=BqAccessError.HTTP_STATUS.get(err.kind, 502),
detail={"error": err.message, "kind": err.kind, "details": err.details},
)
@router.get("/registry")
async def list_registry(
user: dict = Depends(require_admin),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Get full table registry."""
repo = TableRegistryRepository(conn)
tables = repo.list_all()
return {"tables": tables, "count": len(tables)}
# Wall-clock budget for the synchronous BQ materialization that runs after
# a successful BQ register. If the rebuild + view creation exceeds this,
# we hand the rest off to BackgroundTasks and return 202. 5s matches the
# UX contract in #108 ("Queryable as <view> within seconds") — long enough
# to cover a healthy GCE round-trip, short enough that a hung GCE call
# doesn't park the request handler.
_BQ_SYNC_REGISTER_TIMEOUT_S: float = 5.0
def _materialize_bigquery_extract() -> Dict[str, Any]:
"""Re-build the BigQuery extract.duckdb + master views.
Wrapper used by both the synchronous (in-band) and async (BackgroundTask)
code paths after a BQ register/update/delete. Imports kept inside the
function so non-BQ instances don't pay the import cost on app start.
Opens a FRESH system DB connection rather than reusing the request-scoped
one. The request handler closes its connection in a `finally` after the
response, but BackgroundTask + the timeout-fallback daemon thread can
both outlive that close — they would then operate on a closed handle (or
one being torn down concurrently). A fresh handle is cheap (DuckDB is an
embedded engine) and isolates the worker's lifetime from the request's.
Returns the rebuild result dict (``{"errors": [...], "tables_registered":
N, ...}``) so the synchronous caller can propagate failures to the
operator. Background-task callers ignore the return value, but the loud
log inside ``_run_bigquery_materialize_with_timeout`` covers that path.
"""
from connectors.bigquery import extractor as _bq_extractor
from src.db import get_system_db
from src.orchestrator import SyncOrchestrator
fresh_conn = get_system_db()
try:
result = _bq_extractor.rebuild_from_registry(conn=fresh_conn)
SyncOrchestrator().rebuild()
return result or {}
finally:
try:
fresh_conn.close()
except Exception:
pass
def _materialize_bigquery_extract_bg() -> None:
"""BackgroundTask wrapper around `_materialize_bigquery_extract`.
BackgroundTasks discard return values, but `rebuild_from_registry` can
surface auth / config / identifier errors via the ``errors`` list. Log
those at ERROR level so the failure is loud in the operator's logs even
though the 202 response can't carry the detail (Decision 3 in #108: a
202 is documented as "accepted, may not be queryable yet" — we don't
block on it but we shouldn't swallow it either).
"""
try:
result = _materialize_bigquery_extract()
except Exception:
logger.exception("BQ post-register background materialize crashed")
return
errors = (result or {}).get("errors") or []
if errors:
logger.error(
"BQ post-register background materialize completed with %d error(s): %s",
len(errors), errors,
)
def _run_bigquery_materialize_with_timeout(
background: BackgroundTasks,
) -> Dict[str, Any]:
"""Try to materialize synchronously within the wall-clock budget.
Returns a dict with:
- ``status`` ∈ {"ok", "errors", "timeout"} — caller maps to HTTP code
- ``errors``: list of {table, error} surfaced by ``rebuild_from_registry``
(only present on ``status="errors"``)
Mapping by caller (`register_table`):
- "ok" → 200 (synchronous success)
- "errors" → 500 (rebuild ran but reported errors — propagate so
the operator knows the registry row exists but the
view wasn't created)
- "timeout" → 202 (rebuild still running on a BackgroundTask)
The synchronous worker runs on a daemon thread (so a hung GCE call
can't park the request) that opens its OWN system DB connection (see
`_materialize_bigquery_extract`). Even though FastAPI now invokes the
sync route in a threadpool — and `done.wait()` no longer blocks the
event loop — we still off-load to a daemon so the wait is bounded
even if `rebuild_from_registry` ignores its own timeouts.
"""
import threading
done = threading.Event()
err_holder: Dict[str, Any] = {}
result_holder: Dict[str, Any] = {}
def _worker():
try:
result_holder["result"] = _materialize_bigquery_extract()
except Exception as e: # pragma: no cover — logged below
err_holder["error"] = e
finally:
done.set()
t = threading.Thread(target=_worker, daemon=True, name="bq-register-rebuild")
t.start()
finished = done.wait(_BQ_SYNC_REGISTER_TIMEOUT_S)
if finished:
if "error" in err_holder:
# Worker finished within the wall-clock budget but raised. This
# is a HARD ERROR, not a timeout — surface it as such so the
# operator gets the actual exception in the 500 body instead
# of a misleading 202 + "still working in the background".
# Earlier revisions returned ``{"status": "timeout"}`` here,
# which the register handler then mapped to 202 + a retry
# BackgroundTask; that hid the real failure for `_BQ_SYNC_
# REGISTER_TIMEOUT_S` seconds before the BG retry surfaced
# the same exception in the logs.
exc = err_holder["error"]
logger.error(
"BQ post-register rebuild raised within budget: %r",
exc,
)
return {
"status": "errors",
"errors": [{"error": f"{type(exc).__name__}: {exc}"}],
}
# Synchronous worker finished cleanly — but check whether
# `rebuild_from_registry` itself surfaced any errors (auth fail,
# missing project from the overlay, unsafe identifier slipping the
# validator, etc.). Without this, those errors got silently logged
# and the API claimed success.
result = result_holder.get("result") or {}
errors = result.get("errors") or []
if errors:
logger.error(
"BQ post-register rebuild reported %d error(s): %s",
len(errors), errors,
)
return {"status": "errors", "errors": errors}
return {"status": "ok"}
# Timed out — let the worker keep running on its thread (already daemon)
# and also schedule a BackgroundTask so the orchestrator gets called via
# the supported FastAPI path. `_INIT_EXTRACT_LOCK` in the BQ extractor
# serializes the two file-swap calls so the slow daemon thread and the
# background task can't tear `extract.duckdb`; the orchestrator's own
# `_rebuild_lock` protects the master-view rebuild step downstream.
logger.info(
"BQ post-register rebuild exceeded %ss budget — handing off to BackgroundTask",
_BQ_SYNC_REGISTER_TIMEOUT_S,
)
background.add_task(_materialize_bigquery_extract_bg)
return {"status": "timeout"}
@router.post(
"/register-table",
responses={
200: {"description": "BigQuery row registered + materialized synchronously"},
201: {"description": "Non-BigQuery row registered (no post-insert materialize)"},
202: {"description": "BigQuery row registered; materialize continues in background"},
409: {"description": "Table id or view name already in use"},
500: {"description": "BigQuery row registered but post-insert rebuild failed"},
},
)
def register_table(
request: RegisterTableRequest,
background: BackgroundTasks,
user: dict = Depends(require_admin),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Register a new table in the system.
Behavior by source_type:
- **bigquery**: validates BQ-specific shape (dataset / source_table /
identifier safety / project_id format), forces query_mode='remote' and
profile_after_sync=False, then synchronously rebuilds extract.duckdb +
master views with a wall-clock budget. Returns 200 with the view name
on success, 202 on budget overrun (rebuild continues in a
BackgroundTask), or 500 if the synchronous rebuild ran but reported
an error (e.g. auth failure, missing project, unsafe identifier).
- other source types: insert-only, no post-register hook. Returns 201.
Defined as a plain ``def`` (not ``async def``) so FastAPI runs it in a
threadpool — the synchronous-materialize path waits on
``threading.Event.wait()``, which would otherwise block the asyncio
event loop and stall every other request for up to ``_BQ_SYNC_REGISTER_
TIMEOUT_S``. ``Depends(...)``, ``BackgroundTasks``, and
``JSONResponse`` all work the same in sync handlers; the rest of the
admin module mixes both styles already.
The route does NOT carry a default ``status_code`` — each branch returns
its own JSONResponse with the right code. A blanket ``status_code=201``
on the decorator would mislead OpenAPI consumers about the BQ branch.
Always: 409 on view-name collision against the existing registry, audit
log entry on success.
"""
from fastapi.responses import JSONResponse
if not request.name or not request.name.strip():
raise HTTPException(status_code=422, detail="Table name cannot be empty")
repo = TableRegistryRepository(conn)
table_id = request.name.strip().lower().replace(" ", "_")
if repo.get(table_id):
raise HTTPException(status_code=409, detail=f"Table '{table_id}' already registered")
# View-name collision pre-check — distinct from id collision above.
# `id` is derived from `name`, but two callers could legally pick
# different display names that lower-case + slugify to the same view
# (e.g. "Orders v2" + "orders_v2"); the strict view-name uniqueness
# check stops that here, before the orchestrator surfaces it as a
# silent overwrite at next rebuild.
existing_by_name = next(
(r for r in repo.list_all() if (r.get("name") or "") == request.name),
None,
)
if existing_by_name is not None:
raise HTTPException(
status_code=409,
detail=f"View name '{request.name}' is already in use by table id '{existing_by_name.get('id')}'",
)
# BQ rows go through the extra validation + post-insert materialization
# contract from issue #108. Other source types keep the legacy insert-only
# flow — Keboola materialization happens via the scheduled sync, Jira via
# webhook, local via a manual extractor run.
is_bigquery = request.source_type == "bigquery"
if is_bigquery:
_validate_bigquery_register_payload(request)
# Phase C: profile_after_sync is no longer passed — the field is
# deprecated and inert at the runtime layer. The DB column keeps its
# schema default; the registry response no longer reflects request
# values for this flag.
repo.register(
id=table_id,
name=request.name,
folder=request.folder,
sync_strategy=request.sync_strategy,
primary_key=request.primary_key,
description=request.description,
registered_by=user.get("email"),
source_type=request.source_type,
bucket=request.bucket,
source_table=request.source_table,
source_query=request.source_query,
query_mode=request.query_mode,
sync_schedule=request.sync_schedule,
)
# Audit entry — masked params; description kept raw (it's documentation).
AuditRepository(conn).log(
user_id=user.get("id"),
action="register_table",
resource=table_id,
params=_sanitize_for_audit(request.model_dump()),
)
if not is_bigquery:
# Keboola / Jira / local rows are insert-only here. 201 Created — the
# decorator no longer carries a default status, so each branch is
# explicit about its code (BQ branch overrides via JSONResponse).
return JSONResponse(
status_code=201,
content={"id": table_id, "name": request.name, "status": "registered"},
)
if request.query_mode == "materialized":
# Materialized BQ rows are picked up by the trigger pass on the next
# scheduled tick (or via POST /api/sync/trigger). No synchronous
# rebuild — the COPY can scan multi-GB and would block the request.
return JSONResponse(
status_code=201,
content={
"id": table_id,
"name": request.name,
"status": "registered",
"view_name": table_id,
"message": (
"Materialized — parquet will be written on the next sync "
"tick. Trigger now via POST /api/sync/trigger."
),
},
)
# BQ post-register: rebuild extract + master views, with timeout fallback.
# Decision 1: 200 on synchronous success, 202 on timeout, 500 if the
# synchronous rebuild surfaced errors. Distinct from the 201 Keboola
# path above, so the BQ branch builds its own response.
outcome = _run_bigquery_materialize_with_timeout(background)
status = outcome.get("status")
if status == "ok":
return JSONResponse(
status_code=200,
content={
"id": table_id,
"name": request.name,
"status": "ok",
"view_name": table_id,
},
)
if status == "errors":
# Registry insert succeeded but the post-insert rebuild reported
# errors — the row is in the registry but the master view was NOT
# created. Surface the failure verbatim so the operator can fix
# the underlying config (typically a missing
# `data_source.bigquery.project` in the overlay or auth that lacks
# bigquery.metadata.get on the dataset). The row stays in the
# registry; a re-run after fixing the config picks up the existing
# row and creates the view on the next register/update or
# scheduler tick.
return JSONResponse(
status_code=500,
content={
"id": table_id,
"name": request.name,
"status": "rebuild_failed",
"view_name": table_id,
"errors": outcome.get("errors") or [],
"message": (
"Registry row created but post-insert rebuild failed; "
"view is not queryable. See `errors` for details."
),
},
)
# Default: timeout — rebuild continues on a BackgroundTask.
return JSONResponse(
status_code=202,
content={
"id": table_id,
"name": request.name,
"status": "accepted",
"view_name": table_id,
"message": "Registration accepted; materializing in background",
},
)
class PrecheckResponse(BaseModel):
"""Response model for /api/admin/register-table/precheck.
Documented here so OpenAPI consumers know what to expect; the route
returns a plain dict for backwards compatibility with the rest of the
admin API which doesn't use response_model.
"""
ok: bool
table: Dict[str, Any]
@router.post("/register-table/precheck")
def register_table_precheck(
request: RegisterTableRequest,
user: dict = Depends(require_admin),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Validate a register-table payload + (BQ only) confirm the source table exists.
No DB write. Used by the UI to surface row count + size + column count
in the modal before the operator clicks Register, and by the CLI's
``--dry-run`` to print what *would* be registered without touching
state. Identical Pydantic validation to register-table; for BQ rows we
additionally make a ``bigquery.Client(project).get_table(...)`` call
and surface the GCP error verbatim.
Defined as a plain ``def`` (not ``async def``) so FastAPI runs it in a
threadpool — the BQ branch makes synchronous ``bigquery.Client(...)``
/``client.get_table(...)`` calls, which would otherwise block the
asyncio event loop and stall every other request for the duration of
the GCE round-trip. Mirrors the same conversion done for
``register_table`` (see comment on that route). ``Depends(...)`` works
identically in sync handlers.
"""
if not request.name or not request.name.strip():
raise HTTPException(status_code=422, detail="Table name cannot be empty")
if request.source_type != "bigquery":
# M1 only adds BQ-specific precheck. Other source types get a
# validation-only response so the CLI / UI can rely on the same
# endpoint shape across types.
return {
"ok": True,
"table": {
"name": request.name,
"source_type": request.source_type,
"bucket": request.bucket,
"source_table": request.source_table,
"rows": None,
"size_bytes": None,
"columns": [],
"note": "precheck for non-bigquery sources is validation-only in M1",
},
}
# BQ-specific shape validation (forces query_mode/profile_after_sync,
# checks identifier safety, validates project_id from instance.yaml).
_validate_bigquery_register_payload(request)
# Materialized BQ rows have no `dataset.source_table` to round-trip —
# the SQL body is the contract. Skip the BQ-jobs-API call and return a
# validation-only precheck so the CLI's `--dry-run --query-mode
# materialized` path doesn't crash on an empty fully-qualified name.
if request.query_mode == "materialized":
return {
"ok": True,
"table": {
"name": request.name,
"source_type": "bigquery",
"query_mode": "materialized",
"source_query": request.source_query,
"rows": None,
"size_bytes": None,
"columns": [],
"note": (
"materialized precheck is validation-only — the SQL is "
"evaluated for cost on each scheduled materialize tick"
),
},
}
# Round-trip the BQ jobs API to confirm the table exists and the SA can
# see it. Imports kept local to avoid pulling google-cloud-bigquery into
# the import chain on non-BQ instances.
try:
from google.cloud import bigquery # noqa: PLC0415
from google.api_core import exceptions as google_exc # noqa: PLC0415
except ImportError as e:
raise HTTPException(
status_code=500,
detail=(
"google-cloud-bigquery not installed; install the bigquery "
f"extras to use BQ precheck ({e})"
),
) from e
from app.instance_config import get_value
project_id = get_value("data_source", "bigquery", "project", default="")
dataset = (request.bucket or "").strip()
source_table = (request.source_table or "").strip()
fq = f"{project_id}.{dataset}.{source_table}"
try:
client = bigquery.Client(project=project_id)
bq_table = client.get_table(fq)
except google_exc.NotFound as e:
raise HTTPException(status_code=404, detail=f"BigQuery table not found: {fq} ({e})") from e
except google_exc.Forbidden as e:
raise HTTPException(
status_code=403,
detail=(
f"BigQuery access denied for {fq}: {e}. "
"Service account needs bigquery.metadata.get on the dataset."
),
) from e
except Exception as e:
# Auth errors, transient 5xx, malformed table refs — surface as 400
# so the operator gets the GCP error verbatim and can fix their
# config without us guessing the right HTTP code.
raise HTTPException(status_code=400, detail=f"BigQuery precheck failed for {fq}: {e}") from e
columns = [
{"name": f.name, "type": f.field_type}
for f in (bq_table.schema or [])
]
return {
"ok": True,
"table": {
"name": request.name,
"source_type": "bigquery",
"bucket": dataset,
"source_table": source_table,
"project_id": project_id,
"rows": int(bq_table.num_rows or 0),
"size_bytes": int(bq_table.num_bytes or 0),
"columns": columns,
"column_count": len(columns),
},
}
@router.put("/registry/{table_id}")
async def update_table(
table_id: str,
request: UpdateTableRequest,
background: BackgroundTasks,
user: dict = Depends(require_admin),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Update a registered table's configuration.
For BQ rows, schedules a background rebuild so the master view picks
up changes (e.g. a renamed dataset) without waiting for the next
scheduled sync.
"""
repo = TableRegistryRepository(conn)
existing = repo.get(table_id)
if not existing:
raise HTTPException(status_code=404, detail="Table not found")
updates = {k: v for k, v in request.model_dump().items() if v is not None}
# Run BQ-shape validation BEFORE persisting whenever the merged record
# would be a bigquery row (existing was BQ, or the patch flips it to BQ,
# or the patch touches BQ-relevant fields on an already-BQ row). Without
# this gate, an admin could PUT `bucket="evil\"; DROP --"` onto a BQ
# row and the next rebuild would silently fail at view-create time —
# surface the bad shape at PUT time instead.
if updates:
# Preserve the original `registered_at` across PUTs — `repo.register`
# now accepts it as an optional kwarg; without this the upsert would
# stamp a fresh `now()` on every edit (issue #130).
merged = dict(existing)
merged.update(updates)
merged.pop("id", None) # avoid duplicate id kwarg
# When switching the merged record away from materialized mode, drop
# the stale source_query — the request validator can't clear it via
# the `if v is not None` filter above. Without this, a remote/local
# row would carry an orphan source_query in the registry.
if merged.get("query_mode") != "materialized":
merged["source_query"] = None
if merged.get("source_type") == "bigquery":
# Reuse the register-time validator. It mutates the request to
# force query_mode='remote' / profile_after_sync=False (or to
# leave a materialized row alone) — apply the same coercion to
# `merged` so the persisted row matches.
synthetic = RegisterTableRequest(
name=merged.get("name") or table_id,
bucket=merged.get("bucket"),
source_table=merged.get("source_table"),
source_query=merged.get("source_query"),
source_type="bigquery",
query_mode=merged.get("query_mode") or "remote",
profile_after_sync=bool(merged.get("profile_after_sync") or False),
primary_key=merged.get("primary_key"),
description=merged.get("description"),
folder=merged.get("folder"),
sync_strategy=merged.get("sync_strategy") or "full_refresh",
sync_schedule=merged.get("sync_schedule"),
)
_validate_bigquery_register_payload(synthetic)
merged["query_mode"] = synthetic.query_mode
merged["profile_after_sync"] = synthetic.profile_after_sync
merged["source_query"] = synthetic.source_query
repo.register(id=table_id, **merged)
AuditRepository(conn).log(
user_id=user.get("id"),
action="update_table",
resource=table_id,
params=_sanitize_for_audit({"updated_fields": sorted(updates.keys()), **updates}),
)
# If we updated a BQ row (or one that's now BQ), refresh the extract in
# the background so the view picks up renames / column-list changes.
# Use the BG wrapper so any rebuild errors are logged at ERROR level
# instead of being silently dropped by BackgroundTasks (which discards
# return values).
after = repo.get(table_id) or {}
if after.get("source_type") == "bigquery":
background.add_task(_materialize_bigquery_extract_bg)
return {"id": table_id, "updated": list(updates.keys())}
@router.delete("/registry/{table_id}", status_code=204)
async def unregister_table(
table_id: str,
background: BackgroundTasks,
user: dict = Depends(require_admin),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Unregister a table from the system.
For BQ rows, schedules a background rebuild so the dropped row's
master view is removed from analytics.duckdb (rather than hanging
around until the next scheduled sync).
"""
repo = TableRegistryRepository(conn)
existing = repo.get(table_id)
if not existing:
raise HTTPException(status_code=404, detail="Table not found")
was_bigquery = existing.get("source_type") == "bigquery"
repo.unregister(table_id)
AuditRepository(conn).log(
user_id=user.get("id"),
action="unregister_table",
resource=table_id,
params=_sanitize_for_audit({
"name": existing.get("name"),
"source_type": existing.get("source_type"),
"bucket": existing.get("bucket"),
"source_table": existing.get("source_table"),
}),
)
if was_bigquery:
background.add_task(_materialize_bigquery_extract_bg)
@router.post("/configure")
async def configure_instance(
request: ConfigureRequest,
user: dict = Depends(require_admin),
):
"""Configure data source and instance settings via API.
Writes config to instance.yaml and persists secrets to .env_overlay.
AI agents and the /setup wizard use this instead of manual file editing.
"""
import yaml
if request.data_source not in ("keboola", "bigquery", "local"):
raise HTTPException(status_code=400, detail="data_source must be 'keboola', 'bigquery', or 'local'")
# Validate credentials if provided
if request.data_source == "keboola":
if not request.keboola_token or not request.keboola_url:
raise HTTPException(status_code=400, detail="keboola_token and keboola_url are required for Keboola data source")
_validate_url_not_private(request.keboola_url, field_name="keboola_url")
try:
from connectors.keboola.client import KeboolaClient
client = KeboolaClient(token=request.keboola_token, url=request.keboola_url)
client.test_connection()
except Exception as e:
logger.error("Keboola connection validation failed: %s", e)
raise HTTPException(status_code=400, detail="Keboola connection failed. Check your token and URL.")
elif request.data_source == "bigquery":
if not request.bigquery_project:
raise HTTPException(status_code=400, detail="bigquery_project is required for BigQuery data source")
# Write instance.yaml to DATA_DIR/state/ (writable Docker volume),
# NOT to CONFIG_DIR which is mounted read-only in Docker.
#
# Narrow-overlay write strategy — must match `/api/admin/server-config`:
# 1. Read overlay verbatim (do NOT fall back to static). Falling back
# would copy env-resolved cleartext secrets from the merged static
# file back into the overlay (e.g. `smtp_password: ${SMTP_PASSWORD}`
# → `smtp_password: hunter2`). The wizard only ever sets
# `instance`, `auth`, `data_source` here, so other sections must
# flow from the static file via `load_instance_config`'s deep-merge
# — they don't belong in the overlay at all.
# 2. Patch only the sections this endpoint touches.
# 3. Write the narrow overlay back atomically (tmp + os.replace).
data_dir = Path(os.environ.get("DATA_DIR", "./data"))
config_path = data_dir / "state" / "instance.yaml"
# Same serialization + corrupt-overlay handling as POST /server-config.
with _overlay_write_lock:
overlay: dict = {}
if config_path.exists():
try:
overlay = yaml.safe_load(config_path.read_text()) or {}
except Exception as e:
logger.exception("configure: refusing to overwrite corrupt overlay at %s", config_path)
raise HTTPException(
status_code=500,
detail=f"refusing to overwrite corrupt overlay at {config_path} ({e}); "
"back up and remove the file, or fix it by hand",
) from e
# Merge instance settings into the overlay only — never seed from the
# env-resolved merged config.
if request.instance_name:
overlay.setdefault("instance", {})["name"] = request.instance_name
if request.allowed_domain:
overlay.setdefault("auth", {})["allowed_domain"] = request.allowed_domain
# data_source is fully owned by this endpoint — replace wholesale.
overlay["data_source"] = {"type": request.data_source}
if request.data_source == "keboola":
overlay["data_source"]["keboola"] = {
"stack_url": request.keboola_url,
"token_env": "KEBOOLA_STORAGE_TOKEN",
}
elif request.data_source == "bigquery":
overlay["data_source"]["bigquery"] = {
"project": request.bigquery_project,
"location": request.bigquery_location or "us",
}
# Atomic write to writable data volume — same tmp + os.replace pattern
# as the server-config editor so a concurrent save can't tear the file.
config_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = config_path.with_suffix(config_path.suffix + ".tmp")
tmp_path.write_text(yaml.dump(overlay, default_flow_style=False, sort_keys=False))
os.replace(tmp_path, config_path)
logger.info("Wrote instance config to %s", config_path)
# Persist secrets to .env_overlay (in data volume, never in git)
secrets_to_persist = {}
if request.keboola_token:
secrets_to_persist["KEBOOLA_STORAGE_TOKEN"] = request.keboola_token
if request.keboola_url:
secrets_to_persist["KEBOOLA_STACK_URL"] = request.keboola_url
if secrets_to_persist:
data_dir = Path(os.environ.get("DATA_DIR", "./data"))
overlay_path = data_dir / "state" / ".env_overlay"
overlay_path.parent.mkdir(parents=True, exist_ok=True)
# Merge with existing overlay
existing_overlay = {}
if overlay_path.exists():
for line in overlay_path.read_text().splitlines():
if "=" in line and not line.startswith("#"):
k, v = line.split("=", 1)
existing_overlay[k.strip()] = v.strip()
existing_overlay.update(secrets_to_persist)
overlay_path.write_text(
"\n".join(f"{k}={v}" for k, v in existing_overlay.items()) + "\n"
)
try:
overlay_path.chmod(0o600)
except OSError:
pass
logger.info("Persisted %d secrets to .env_overlay", len(secrets_to_persist))
# Inject into current process environment
for k, v in secrets_to_persist.items():
os.environ[k] = v
# Invalidate cached instance config so next read picks up changes.
# Use the public helper (matches `/api/admin/server-config`); reaching
# into the private global silently breaks if the cache layout changes.
from app.instance_config import reset_cache
reset_cache()
return {
"status": "ok",
"data_source": request.data_source,
"connection": "verified" if request.data_source != "local" else "local",
}
def _discover_and_register_tables(conn: duckdb.DuckDBPyConnection, user_email: str) -> dict:
"""Discover tables from configured source and register them. Shared logic for API and sync."""
from app.instance_config import get_data_source_type, get_value
source_type = get_data_source_type()
if source_type != "keboola":
return {"registered": 0, "skipped": 0, "errors": 0, "tables": [], "source": source_type}
from connectors.keboola.client import KeboolaClient
# Read from data_source.keboola (matches what /api/admin/configure writes)
url = get_value("data_source", "keboola", "stack_url", default="")
token_env = get_value("data_source", "keboola", "token_env", default="KEBOOLA_STORAGE_TOKEN")
token = os.environ.get(token_env, "") if token_env else ""
if not token:
token = os.environ.get("KEBOOLA_STORAGE_TOKEN", "")
client = KeboolaClient(token=token, url=url)
discovered = client.discover_all_tables()
repo = TableRegistryRepository(conn)
registered = 0
skipped = 0
errors = 0
table_names = []
for table in discovered:
table_id = table.get("id", "").strip().lower().replace(".", "_").replace(" ", "_")
if not table_id:
errors += 1
continue
if repo.get(table_id):
skipped += 1
continue
try:
# Parse bucket from table ID (format: in.c-bucket.table_name)
parts = table.get("id", "").split(".")
bucket = parts[1] if len(parts) > 1 else ""
source_table = parts[2] if len(parts) > 2 else table.get("name", "")
repo.register(
id=table_id,
name=table.get("name", table_id),
source_type="keboola",
bucket=bucket,
source_table=source_table,
query_mode="local",
registered_by=user_email,
description=f"Auto-discovered from Keboola: {table.get('id', '')}",
)
registered += 1
table_names.append(table_id)
except Exception as e:
logger.warning("Failed to register %s: %s", table_id, e)
errors += 1
return {
"registered": registered,
"skipped": skipped,
"errors": errors,
"tables": table_names,
"source": "keboola",
}
@router.post("/discover-and-register")
async def discover_and_register(
user: dict = Depends(require_admin),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Discover tables from configured source and auto-register them.
Combines discover-tables + register-table into one call.
Skips already-registered tables. Used by /setup wizard and AI agents.
"""
try:
result = _discover_and_register_tables(conn, user.get("email", "admin"))
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Discovery and registration failed: {e}")