"""Admin endpoints — table discovery, registry management, instance configuration. Every gate on this router uses ``require_admin`` from ``app.auth.access``, which checks Admin user_group membership for both OAuth session and PAT callers via the same ``_user_group_ids`` lookup. """ import logging import os import threading import uuid from pathlib import Path from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException from pydantic import BaseModel, Field, field_validator, model_validator from typing import Optional, List, Dict, Any import duckdb from app.auth.access import require_admin from app.auth.dependencies import _get_db from src.repositories.table_registry import TableRegistryRepository from src.repositories.audit import AuditRepository from src.identifier_validation import ( is_safe_identifier as _is_safe_identifier, is_safe_quoted_identifier as _is_safe_quoted_identifier, ) from src.sql_safe import is_safe_project_id as _is_safe_project_id from src.scheduler import is_valid_schedule logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/admin", tags=["admin"]) # Serializes the read-modify-write of state/instance.yaml across the two # endpoints that mutate the overlay (POST /server-config and POST /configure). # Without it, two admins saving concurrently would each read the same overlay # snapshot, merge their disjoint patches, and the second os.replace would silently # drop the first patch. Single-process FastAPI workers; multi-worker deployments # would need an OS-level file lock — documented limitation. _overlay_write_lock = threading.Lock() # SSRF protection: reject private/internal URLs for keboola_url import ipaddress as _ipaddress import socket as _socket from urllib.parse import urlparse as _urlparse def _validate_url_not_private(url: str, field_name: str = "url") -> None: """Raise 400 if the URL host points to a private/reserved network. Uses DNS resolution + ipaddress checks instead of hostname regex, which correctly handles all IPv4/IPv6 addresses including abbreviated forms (fe80::1, ::1, etc.) and DNS rebinding (resolves at check time). """ try: parsed = _urlparse(url) except Exception: raise HTTPException(status_code=400, detail=f"Invalid {field_name}: not a valid URL") host = parsed.hostname or "" if not host: raise HTTPException(status_code=400, detail=f"Invalid {field_name}: missing hostname") # Reject well-known dangerous hostnames before DNS resolution if host.lower() in ("localhost", "localhost.localdomain"): raise HTTPException( status_code=400, detail=f"Invalid {field_name}: must not point to a private or reserved network", ) # Resolve hostname to IP addresses and check each one try: addrinfos = _socket.getaddrinfo(host, None, proto=_socket.IPPROTO_TCP) except Exception: raise HTTPException( status_code=400, detail=f"Invalid {field_name}: could not resolve hostname", ) for family, _type, _proto, _canonname, sockaddr in addrinfos: ip_str = sockaddr[0] try: ip = _ipaddress.ip_address(ip_str) except ValueError: continue if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved or ip.is_multicast: raise HTTPException( status_code=400, detail=f"Invalid {field_name}: must not point to a private or reserved network", ) def _normalize_primary_key(v): """Coerce a string primary_key to ``[v]`` for backward compatibility. The 0.14.0 contract is ``Optional[List[str]]`` so composite primary keys (e.g. session-grain tables keyed on ``(session_id, event_date)``) round- trip cleanly. Pre-0.14.0 callers sent a single string; Pydantic v2 refuses to coerce, so without this validator a CLI script posting ``"primary_key": "session_id"`` would now hit a 422. Wrap a bare string in a one-element list so old and new callers both work. """ if v is None: return v if isinstance(v, str): return [v] return v # Patches to these section paths must pass _validate_url_not_private. The # tuple is `(section, *intermediate_keys, leaf_key)` — same SSRF gate the # /configure wizard applies to keboola_url, so an admin can't sneak # http://169.254.169.254/ in via the server-config editor's data_source patch. # # Intentionally NOT included: ``("ai", "base_url")``. The openai_compat # provider legitimately points at internal services (LiteLLM proxy on a # private network, on-cluster vLLM endpoint, etc.) — see # config/instance.yaml.example "LiteLLM proxy" example. SSRF blocking # would break those valid setups. Operators with stricter posture should # enforce the constraint upstream (firewall / egress proxy allowlist). # Devin ANALYSIS_0001 on PR #141 5f649a4 review. _URL_BEARING_FIELDS: tuple[tuple[str, ...], ...] = ( ("data_source", "keboola", "stack_url"), ) def _validate_urls_in_patch(sections: Dict[str, Dict[str, Any]]) -> None: """Apply SSRF protection to every URL-bearing field present in the patch. Walks each registered ``(section, *path, leaf)`` against the incoming patch and runs ``_validate_url_not_private`` on any string value found. Missing intermediate keys / non-dict nodes are silently skipped — the patch hasn't touched that field, no validation needed. """ for path in _URL_BEARING_FIELDS: section = path[0] if section not in sections: continue node: Any = sections[section] for key in path[1:-1]: if not isinstance(node, dict) or key not in node: node = None break node = node[key] if isinstance(node, dict): value = node.get(path[-1]) if isinstance(value, str) and value: _validate_url_not_private(value, field_name=".".join(path)) # --- Server-config (instance.yaml) editor ----------------------------------- # # The /admin/server-config UI POSTs a partial dict here keyed by section # (instance, data_source, email, telegram, jira, theme, server, auth) with # the field values to merge into instance.yaml. Each save: # 1. Loads the current instance.yaml (writable overlay first, then static). # 2. Deep-merges the patch on top. # 3. Writes to DATA_DIR/state/instance.yaml (the writable overlay). # 4. Writes one audit_log entry tagged `instance_config.update` containing # a sanitized diff (secret-looking keys are masked). # Hot-reload is OUT OF SCOPE for #91 — the response carries # `restart_required=True` so the UI can show the banner. # Sections an admin can mutate. Keep the list explicit so a typo'd section # in the request body is rejected loudly instead of being silently merged # into the YAML root and confusing future loads. _EDITABLE_SECTIONS: tuple[str, ...] = ( "instance", "data_source", "email", "telegram", "jira", "theme", "server", "auth", "ai", "openmetadata", "desktop", "corporate_memory", ) # "Danger-zone" sections — flipping these can lock operators out (auth.*) or # break OAuth callbacks (server.hostname/host). The UI shows a confirmation # dialog before submitting them. The API accepts them; this list exists so # the audit entry can flag the change as high-risk and the UI can surface # the right warning copy. _DANGER_SECTIONS: tuple[str, ...] = ("auth", "server") # Known-but-optional config fields per section. The /admin/server-config UI # uses this registry alongside the YAML payload to render fields the operator # might want to set even though they're not currently in instance.yaml. # # Schema per field: # { # "kind": "string" | "secret" | "bool" | "int" | "select" | "object" | "array", # "default": (optional) # "hint": "" # "options": [...] (only for kind="select") # "fields": {: } (only for kind="object", nested fields) # "item_kind": "string" | ... (only for kind="array", element type) # "required": bool (defaults False; UI marks the label) # } # # Subagents 2-4 will populate the bodies. The registry enables the UI to # render missing-but-known fields with placeholders + hints rather than # forcing the operator to discover them via the JSON-patch textarea or # hitting a runtime error first. The smoke fixture below # (data_source.bigquery.billing_project) proves the renderer wiring works # end-to-end so subagents 2-4 only have to add registry entries — they # don't need to touch admin_server_config.html. _KNOWN_FIELDS: dict[str, dict[str, dict]] = { "instance": { # No commonly-missing instance-level fields. The example YAML's # `name`/`subtitle` are always populated by `da setup` so they # render via the populated path; nothing to surface here. }, "data_source": { "bigquery": { "kind": "object", "hint": "BigQuery connection knobs (read more in docs/DEPLOYMENT.md)", "fields": { "billing_project": { "kind": "string", "hint": ( "GCP project to bill BQ jobs against. Set when SA can read " "the data project but cannot bill there (e.g. shared read-only " "data project). Defaults to data_source.bigquery.project. " "Mismatch → 403 USER_PROJECT_DENIED on every BQ call." ), # Issue #160 §4.7.5: when this field is empty in the # admin form, the JS template shows "(defaults to )" # as placeholder text — surfacing the access.py:339-340 # fallback rule directly in the UI without the operator # having to read source. Path is walked against the # `original` config payload from GET /api/admin/server-config. "placeholder_from": ["data_source", "bigquery", "project"], }, "max_bytes_per_materialize": { "kind": "int", "default": 10737418240, "hint": ( "Cost guardrail for query_mode='materialized' BQ scans (dry-run " "check before running). Bytes processed; exceeds → registration " "or sync rejected. 0 disables the gate. Default 10737418240 = 10 GiB." ), }, "bq_max_scan_bytes": { "kind": "int", "default": 5368709120, "hint": ( "Cost guardrail for `da query --remote` against query_mode='remote' " "BQ rows (dry-run check on the underlying SELECT before execute). " "Bytes processed; exceeds → 400 remote_scan_too_large with a " "`da fetch` suggestion. 0 disables the gate. Default 5368709120 = 5 GiB." ), }, }, }, "keboola": { "kind": "object", "hint": "Keboola Storage API connection", "fields": { "stack_url": { "kind": "string", "hint": ( "e.g. https://connection.keboola.com (instance-specific stack URL). " "Validated against private-IP allowlist on save (SSRF guard)." ), }, "project_id": { "kind": "string", "hint": "Keboola project ID (numeric, but kept as string in YAML).", }, }, }, }, "email": { # SMTP fields render via the populated path (always set when email # is enabled); no commonly-missing optional knobs at this layer. }, "telegram": { # Rarely missing; leave empty. }, "jira": { # Webhook + REST credentials always present when Jira is configured. }, "theme": { # Cosmetic only; rarely missing. }, "server": { # TLS / hostname knobs are mostly env-side; nothing to surface here. }, "auth": { "allowed_domain": { "kind": "string", "hint": ( "Comma-separated list of allowed sign-in email domains (e.g. " "'acme.com,acme-internal.com'). Single domain works too. Empty → no " "domain restriction (any verified Google identity can sign in)." ), }, }, "ai": { "base_url": { "kind": "string", "hint": ( "Required for provider='openai_compat' (LiteLLM, OpenRouter, vLLM, etc.). " "Ignored when provider='anthropic'. Examples: https://litellm.example.com, " "https://openrouter.ai/api/v1." ), }, "structured_output": { "kind": "select", "options": ["strict", "json", "auto"], "default": "auto", "hint": ( "JSON-schema enforcement strategy. strict=Layer 1 only " "(Anthropic/OpenAI native, fail otherwise). json=Layer 1 + Layer 2 " "fallback. auto=all three layers including prompt-based JSON (most " "compatible, least strict)." ), }, }, "openmetadata": { "url": { "kind": "string", "hint": "Base URL of your OpenMetadata server (e.g. https://catalog.example.com).", }, "token": { "kind": "secret", "hint": ( "JWT bearer token. Use ${OPENMETADATA_TOKEN} env-var reference " "(don't paste secret directly)." ), }, "cache_ttl_seconds": { "kind": "int", "default": 3600, "hint": "How long to cache catalog responses in-process. Default 3600s (1h).", }, "verify_ssl": { "kind": "bool", "default": True, "hint": ( "TLS verification. Default true. Set false ONLY for internal CAs / " "self-signed certs — sends the JWT over an unverified channel." ), }, }, "desktop": { "jwt_issuer": { "kind": "string", "default": "data-analyst", "hint": "JWT iss claim. Match what the desktop app verifies.", }, "jwt_secret": { "kind": "secret", "hint": "JWT signing secret. Use ${DESKTOP_JWT_SECRET} env-var reference.", }, "url_scheme": { "kind": "string", "default": "data-analyst", "hint": "Custom URL scheme registered by the desktop app (data-analyst://...).", }, }, # corporate_memory governance — optional. When the section is missing # from instance.yaml the system runs in legacy democratic-wiki mode # (no admin review). Schema mirrors config/instance.yaml.example # lines 224-317; renderer handles arbitrary depth + arrays + maps. "corporate_memory": { "distribution_mode": { "kind": "select", "options": ["mandatory_only", "admin_curated", "hybrid"], "default": "hybrid", "hint": ( "How knowledge reaches users. mandatory_only = admin-only; " "admin_curated = admin + user voting as feedback; " "hybrid = default (mandatory from admin + optional from user voting)." ), }, "approval_mode": { "kind": "select", "options": ["review_queue", "auto_publish", "threshold"], "default": "review_queue", "hint": ( "How AI-extracted items enter the system. review_queue = admin " "approval required (default); auto_publish = live immediately; " "threshold = high-confidence auto, low-confidence to queue." ), }, "review_period_months": { "kind": "int", "default": 6, "hint": "How often approved/mandatory items are flagged for re-review (months).", }, "notify_on_new_items": { "kind": "bool", "default": True, "hint": "Notify km_admins when new pending items arrive.", }, "sources": { "kind": "object", "hint": ( "Knowledge-source ingestion. Each source has its own enabled " "flag + base confidence." ), "fields": { "claude_local_md": { "kind": "object", "fields": { "enabled": {"kind": "bool", "default": True}, "confidence_base": { "kind": "float", "default": 0.50, "hint": "Confidence assigned to extractions from CLAUDE.local.md (0-1).", }, }, }, "session_transcripts": { "kind": "object", "fields": { "enabled": {"kind": "bool", "default": True}, "confidence_base": {"kind": "float", "default": 0.60}, "max_turns_per_session": { "kind": "int", "default": 100, "hint": "Truncate transcripts longer than this many turns.", }, "detection_types": { "kind": "array", "item_kind": "string", "default": [ "correction", "confirmation", "unprompted_definition", ], "hint": ( "Which extraction patterns to detect. Each entry " "is a detection-type tag." ), }, }, }, }, }, "extraction": { "kind": "object", "fields": { "model": { "kind": "string", "default": "claude-haiku-4-5-20251001", "hint": "LLM used to extract knowledge. Override for cost or quality.", }, "sensitivity_check": {"kind": "bool", "default": True}, "contradiction_check": {"kind": "bool", "default": True}, }, }, "confidence": { "kind": "object", "hint": "Confidence scoring + decay rules.", "fields": { "base": { "kind": "map", "key_kind": "string", "value_kind": "float", "default": { "user_verification.correction": 0.90, "user_verification.unprompted_definition": 0.90, "user_verification.confirmation": 0.60, "admin_mandate": 1.00, "claude_local_md": 0.50, "session_transcript": 0.50, }, "hint": ( "Base score per source/detection. Keys are 'source_type' " "or 'source_type.detection_type' (the dot is data, not " "nesting)." ), }, "modifiers": { # map>. The renderer's structured # editor for "map of objects with declared subfields" is a # TODO (see admin_server_config.html); for now this falls # back to a JSON textarea — admins editing it see the # schema doc inline via the hint. "kind": "map", "key_kind": "string", "value_kind": "object", "value_fields": {}, # signals the JSON-textarea fallback "hint": ( "Per-key modifier step sizes applied to base when " "optional signals are present (3-level dotted paths). " "Edit as a JSON object — outer keys mirror confidence.base " "keys; inner objects map signal name to bonus float." ), }, "decay": { "kind": "object", "fields": { "mode": { "kind": "select", "options": ["linear", "exponential"], "default": "exponential", }, "half_life_months": { "kind": "int", "default": 12, "hint": "Used when mode=exponential.", }, "decay_rate_monthly": { "kind": "float", "default": 0.02, "hint": "Used when mode=linear.", }, "floor": { "kind": "map", "key_kind": "string", "value_kind": "float", "default": { "admin_mandate": 0.50, "user_verification": 0.40, "default": 0.0, }, "hint": ( "Per-source minimum confidence — items never decay " "below this floor." ), }, }, }, }, }, "contradiction_detection": { "kind": "object", "fields": { "enabled": {"kind": "bool", "default": True}, "max_candidates": { "kind": "int", "default": 10, "hint": "Max contradiction candidates to evaluate per new item.", }, }, }, "entity_resolution": { "kind": "object", "fields": { "enabled": {"kind": "bool", "default": True}, "entities": { "kind": "map", "key_kind": "string", "value_kind": "array", "value_item_kind": "string", "default": { "metrics": ["churn", "MRR", "ARR", "NPS", "CAC", "LTV"], "products": ["Platform", "API", "Dashboard"], }, "hint": ( "Domain-entity vocabulary. Key = domain category; value = " "canonical names list." ), }, }, }, "domain_owners": { "kind": "map", "key_kind": "string", "value_kind": "array", "value_item_kind": "string", "hint": ( "Per-domain admin emails. Key = domain name; value = email list." ), }, "domains": { "kind": "array", "item_kind": "string", "default": [ "finance", "engineering", "product", "data", "operations", "infrastructure", ], "hint": ( "Knowledge domains analysts can target. Each must match a key " "in domain_owners." ), }, }, } # Keys whose values must be redacted from the audit diff. We match # substring (case-insensitive) so `client_secret`, `api_token`, # `webapp_secret_key`, `bot_token`, `password`, `smtp_password`, etc. all # get masked even when nested. _SECRET_KEY_PATTERNS: tuple[str, ...] = ( "secret", "token", "password", "api_key", ) def _is_secret_key(key: str) -> bool: """True if a config key holds a credential and should be masked in audit logs.""" k = key.lower() return any(pat in k for pat in _SECRET_KEY_PATTERNS) def _mask(value: Any) -> str: """Replacement value used in the audit diff for secret fields. We deliberately do NOT preserve length or any hint about the secret — the diff is read by other admins, and there's no operator value to leaking "the new SMTP password is 16 chars". `***` is enough to show that the field changed without exposing it. """ if value in (None, ""): return "" return "***" # Sentinel values produced by `_mask`. Any patch leaf that arrives at a # secret-keyed slot still bearing one of these strings means the caller # round-tripped the GET payload (which redacts secret-keyed children inside # nested objects) without changing the value — `_strip_redacted_sentinels` # drops the leaf so deep-merge preserves whatever the overlay already had, # rather than persisting the placeholder on top of the real secret. _REDACTED_SENTINELS: frozenset = frozenset({"***", ""}) def _strip_redacted_sentinels(value: Any, key_hint: str = "") -> Any: """Recursively drop secret-keyed leaves whose value is a redaction sentinel. Symmetric with `_redact`: the GET handler masks secret-keyed children inside nested objects so the form never shows cleartext, and this function is the write-side counterpart that ensures the placeholder doesn't make a round-trip back into the overlay. Defense-in-depth alongside the client-side `scrubRedactedSecrets` in `admin_server_config.html` — an API caller (CLI / script) that forgets to scrub still can't corrupt secrets via this endpoint. """ if isinstance(value, dict): out: Dict[str, Any] = {} for k, v in value.items(): if _is_secret_key(k) and isinstance(v, str) and v in _REDACTED_SENTINELS: continue out[k] = _strip_redacted_sentinels(v, k) return out if isinstance(value, list): return [_strip_redacted_sentinels(item, key_hint) for item in value] return value def _redact(value: Any, key_hint: str = "") -> Any: """Recursively mask secret-looking fields in a config subtree. `key_hint` is the parent key — used so a string value like ``"${KEBOOLA_TOKEN}"`` under ``token_env`` is masked even though the value itself isn't a credential, because the key signals it points at one. """ if isinstance(value, dict): return {k: (_mask(v) if _is_secret_key(k) else _redact(v, k)) for k, v in value.items()} if isinstance(value, list): return [_redact(item, key_hint) for item in value] if key_hint and _is_secret_key(key_hint): return _mask(value) return value def _diff_dicts(before: dict, after: dict, path: str = "") -> List[Dict[str, Any]]: """Flat list of changed fields between two dicts. Output: [{"path": "email.smtp_host", "before": "...", "after": "..."}]. Diff is computed on RAW values, then each row's `before`/`after` is masked via `_mask` when the leaf key matches `_is_secret_key` — pre- masking the inputs would collapse a secret rotation (e.g. password A → password B) into "no diff" because both sides redact to ``"***"``, and the audit log would then silently fail to record one of the most security-relevant changes. Compare raw, redact when emitting. Recurses into a dict on either side (treating the missing side as `{}`) so adding a brand-new section reports per-field paths (`email.smtp_host`) rather than a single opaque `email` blob — that keeps the audit row useful when an admin populates a section for the first time. """ changes: List[Dict[str, Any]] = [] keys = set(before.keys()) | set(after.keys()) for key in sorted(keys): new_path = f"{path}.{key}" if path else key b_val = before.get(key) a_val = after.get(key) b_is_dict = isinstance(b_val, dict) a_is_dict = isinstance(a_val, dict) # Dict-vs-dict (or dict-vs-None) → recurse for per-field paths. if b_is_dict and a_is_dict: changes.extend(_diff_dicts(b_val, a_val, new_path)) elif b_is_dict and a_val is None: changes.extend(_diff_dicts(b_val, {}, new_path)) elif a_is_dict and b_val is None: changes.extend(_diff_dicts({}, a_val, new_path)) # Dict↔scalar shape change is recorded as a single replacement at # the parent path. Recursing with `{}` would lose the scalar side # entirely (admin sets `keboola: {…}` to `keboola: "disabled"` — # auditor would see members removed but never the new value). # The dict side may itself contain secret-keyed children (e.g. # `keboola: {token_env: ${KEBOOLA_TOKEN}}` resolved to cleartext); # `_redact` masks those children even when the parent key isn't # secret-named, so the audit log doesn't leak ${ENV_VAR}-resolved # values when a section is replaced wholesale. elif b_is_dict != a_is_dict: if _is_secret_key(key): changes.append({ "path": new_path, "before": _mask(b_val), "after": _mask(a_val), }) else: changes.append({ "path": new_path, "before": _redact(b_val, key) if b_is_dict else b_val, "after": _redact(a_val, key) if a_is_dict else a_val, }) elif b_val != a_val: if _is_secret_key(key): changes.append({ "path": new_path, "before": _mask(b_val), "after": _mask(a_val), }) else: changes.append({"path": new_path, "before": b_val, "after": a_val}) return changes def _deep_merge(base: dict, patch: dict) -> dict: """Merge `patch` into `base` recursively, returning a new dict. Patch values overwrite base values. Dict-into-dict recurses; everything else (lists, scalars, None) is replaced wholesale — admin sets ``email: {smtp_port: 465}`` and we don't try to re-merge nested ports. """ out = dict(base) for key, value in patch.items(): if isinstance(value, dict) and isinstance(out.get(key), dict): out[key] = _deep_merge(out[key], value) else: out[key] = value return out def _load_current_instance_yaml() -> dict: """Return the editor's view of instance.yaml — deep-merge of static + overlay via ``app.instance_config.load_instance_config``. Readers (GET /server-config) hit the cache and trust that writers invalidate. Writers must call ``reset_cache()`` explicitly *before* the read so they see the latest disk state in the read-modify-write sequence. The shared helper is the authoritative source so the editor never sees a different view than the rest of the running app. """ from app.instance_config import load_instance_config return load_instance_config() def _public_view(config: dict) -> dict: """Return a config dict safe to render in the admin UI form. Deep-copies and redacts secret-looking fields so an admin can see *which* fields are populated without the cleartext leaking into the rendered HTML / browser DevTools. """ import copy return _redact(copy.deepcopy(config)) class ServerConfigUpdateRequest(BaseModel): """Patch payload for POST /api/admin/server-config. Only the sections listed in `_EDITABLE_SECTIONS` are accepted; anything else is rejected with 400. `confirm_danger` must be true if the patch touches any danger-zone section (auth.*, server.*). """ sections: Dict[str, Dict[str, Any]] = Field( default_factory=dict, description="Per-section patch dict (e.g. {'instance': {'name': 'X'}})", ) confirm_danger: bool = Field( default=False, description="Must be true to apply changes touching auth.* or server.*", ) # Optional BQ fields whose runtime defaults are documented but which used to # be invisible in the editor when YAML omitted them. The data_source.bigquery # subtree renders as a JSON textarea; a key that's absent from the GET # payload literally cannot appear in the form for the operator to edit. We # surface them with their documented defaults so the UI always shows them as # editable knobs — see Phase J of the admin-tables-cleanup work. # # - billing_project: defaults to data project; explicit value needed when # the SA can read the data project but not bill against it. # - max_bytes_per_materialize: cost guardrail for `query_mode='materialized'` # (default 10 GiB; 0 disables; null falls through to the default). _BQ_OPTIONAL_FIELD_DEFAULTS: Dict[str, Any] = { # `billing_project` intentionally NOT seeded here. The empty-string # default would inject `billing_project: ""` into every GET payload, # which makes the JS `isUnset = (value === undefined)` check evaluate # False — and the `(defaults to )` placeholder feature # (#160 §4.7.5) would never render. Leaving it absent keeps the # field in the unset rendering path so placeholder_from fires. # Devin Review iter #3 on PR #168. "max_bytes_per_materialize": 10737418240, "bq_max_scan_bytes": 5368709120, } def _ensure_bq_optional_fields(sections: Dict[str, Any]) -> None: """In-place: add missing BQ optional fields to data_source.bigquery so the UI's JSON-textarea renders them as editable keys. Existing values are preserved — only absent keys are populated with their documented default. """ ds = sections.get("data_source") if not isinstance(ds, dict): return bq = ds.get("bigquery") if not isinstance(bq, dict): # No BQ subsection — leave alone. Non-BQ instances don't need these # knobs, and creating an empty bigquery dict would be misleading. return for key, default in _BQ_OPTIONAL_FIELD_DEFAULTS.items(): bq.setdefault(key, default) @router.get("/server-config") async def get_server_config( user: dict = Depends(require_admin), ): """Return the current instance.yaml with secrets redacted. Used by the /admin/server-config UI to prefill its form. The redacted payload mirrors the actual file shape, so the UI doesn't need to know the schema — it iterates over the editable sections and renders the fields it finds. Empty sections still show in the response so the form knows to render their headers. """ config = _load_current_instance_yaml() redacted = _public_view(config) # Surface every editable section so the UI renders them even when the # file omits them — operator can populate from scratch without manual # JSON edits. sections = {section: redacted.get(section, {}) for section in _EDITABLE_SECTIONS} # Always surface the optional BQ knobs so the operator sees them in the # UI's JSON editor instead of having to know they exist (Phase J). _ensure_bq_optional_fields(sections) return { "sections": sections, "editable_sections": list(_EDITABLE_SECTIONS), "danger_sections": list(_DANGER_SECTIONS), "secret_key_patterns": list(_SECRET_KEY_PATTERNS), # Known-but-optional fields per section so the UI can render # placeholders for fields the operator hasn't set yet (Phase J). # Subagents 2-4 populate the bodies; the renderer ships now so the # mechanism is wired end-to-end and adding entries is purely a # data-edit in `_KNOWN_FIELDS` above. "known_fields": _KNOWN_FIELDS, } @router.post("/server-config") async def update_server_config( request: ServerConfigUpdateRequest, user: dict = Depends(require_admin), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): """Patch instance.yaml from the /admin/server-config editor. Accepts a partial patch keyed by section. Validates sections, refuses danger-zone edits without explicit confirmation, deep-merges into the current overlay, writes the file, and emits one audit entry per save with a sanitized diff. Returns ``restart_required=true`` so the UI can show the restart banner — hot-reload is a separate issue (see #91 Out of scope). """ import yaml if not request.sections: raise HTTPException(status_code=422, detail="sections cannot be empty") # Reject unknown sections loudly. Without this, a typo like "thmee" # would silently land in the YAML root and the operator wouldn't see # their colour change apply. unknown = sorted(set(request.sections.keys()) - set(_EDITABLE_SECTIONS)) if unknown: raise HTTPException( status_code=400, detail=f"unknown section(s): {', '.join(unknown)}. " f"Editable: {', '.join(_EDITABLE_SECTIONS)}", ) # Danger-zone gate. The UI shows a confirmation dialog before posting # with confirm_danger=true; an API caller (CLI/script) has to pass it # explicitly so they can't fat-finger a hostname change. danger_touched = sorted(set(request.sections.keys()) & set(_DANGER_SECTIONS)) if danger_touched and not request.confirm_danger: raise HTTPException( status_code=400, detail=f"section(s) {', '.join(danger_touched)} require confirm_danger=true", ) # SSRF protection — same gate the /configure wizard applies to # keboola_url, but here it covers any URL-bearing field reachable via # the per-section patch (e.g. data_source.keboola.stack_url). _validate_urls_in_patch(request.sections) # Defense-in-depth: scrub redaction sentinels (`***` / ``) out of # secret-keyed leaves in the patch before they reach the deep-merge. # The client form does the same scrub, but an API caller round-tripping # the GET payload could otherwise overwrite real overlay secrets with # the placeholder shown in the form. scrubbed_sections: Dict[str, Dict[str, Any]] = { section: _strip_redacted_sentinels(patch, section) for section, patch in request.sections.items() } # Serialize read-modify-write across concurrent admin saves. Without the # lock, two saves would each read the same overlay snapshot, merge their # disjoint patches, and the second os.replace would silently drop the # first patch. The lock spans the cache-invalidate → load → merge → # atomic-write sequence; the audit log sits outside since it operates on # local snapshots. from app.instance_config import reset_cache data_dir = Path(os.environ.get("DATA_DIR", "./data")) config_path = data_dir / "state" / "instance.yaml" config_path.parent.mkdir(parents=True, exist_ok=True) with _overlay_write_lock: # Drop the in-process cache so we read the latest on-disk state, # including any update that landed from a concurrent caller before # we acquired the lock. reset_cache() before = _load_current_instance_yaml() # Deep merge — section-by-section so we never accidentally delete a # sibling section the patch didn't touch. Use the redaction-scrubbed # patch so a round-tripped GET payload can't overwrite real secrets # with the `***` placeholder. after = dict(before) for section, patch in scrubbed_sections.items(): if not isinstance(patch, dict): raise HTTPException( status_code=422, detail=f"section '{section}' must be an object, got {type(patch).__name__}", ) if isinstance(after.get(section), dict): after[section] = _deep_merge(after[section], patch) else: after[section] = patch # Write only the sections the user actually patched in this request. # Two reasons: # 1. Persisting the full merged config (or every editable section) # would snapshot non-editable static sections into the overlay, # shadowing later operator updates to those sections in the # static file (`_load_current_instance_yaml` merges static + overlay, # overlay wins per leaf). # 2. The merged config has `${ENV_VAR}` placeholders RESOLVED to the # runtime values by config.loader. Writing every editable section # back would persist real cleartext secrets where the static file # had only env-var references — turning `smtp_password: # ${SMTP_PASSWORD}` into `smtp_password: hunter2` in the overlay. # By writing only the sections in `request.sections` we keep both the # static-evolution and the env-var-placeholder properties intact. overlay_payload: Dict[str, Any] = {} if config_path.exists(): try: overlay_payload = yaml.safe_load(config_path.read_text()) or {} except Exception as e: # A corrupt overlay used to be silently replaced — that masked # disk corruption / partial writes / hand-edits and dropped # every previously-saved section on the next save. Refuse and # surface so the operator can investigate. logger.exception("server-config: refusing to overwrite corrupt overlay at %s", config_path) raise HTTPException( status_code=500, detail=f"refusing to overwrite corrupt overlay at {config_path} ({e}); " "back up and remove the file, or fix it by hand", ) from e for section, patch in scrubbed_sections.items(): if section not in _EDITABLE_SECTIONS: continue # Deep-merge the patch into the existing overlay slot (or static- # backed `before` if overlay had nothing for this section). This # preserves any unrelated keys the operator didn't touch in this # request — e.g. patching `email.smtp_host` doesn't blow away the # `email.smtp_password: ${SMTP_PASSWORD}` reference. existing = overlay_payload.get(section) if not isinstance(existing, dict): existing = {} overlay_payload[section] = _deep_merge(existing, patch) # Atomic via tmp + os.replace so two concurrent admin saves can't # interleave bytes and produce corrupt YAML (especially harmful since # auth.* is editable here — half-written file → operator lockout). tmp_path = config_path.with_suffix(config_path.suffix + ".tmp") tmp_path.write_text(yaml.dump(overlay_payload, default_flow_style=False, sort_keys=False)) os.replace(tmp_path, config_path) logger.info("server-config: wrote %d section(s) to %s", len(request.sections), config_path) # Invalidate cached instance config so subsequent reads pick up the # change. Hot-reload of running modules (auth providers, SMTP client) # is out of scope — the restart banner tells the operator to bounce. reset_cache() # Audit entry — diff is computed on RAW values then `_diff_dicts` # redacts each row whose leaf key matches `_is_secret_key`. Pre- # masking the inputs would collapse a secret rotation into "no # diff" because both sides redact to ``***``, hiding the most # security-relevant changes from the audit log. We log even if no # fields changed so the operator's intent (touched the page, hit # save) is auditable. diff = _diff_dicts(before, after) AuditRepository(conn).log( user_id=user.get("id"), action="instance_config.update", resource="instance.yaml", params={ "sections": sorted(request.sections.keys()), "danger_sections": danger_touched, "diff": diff, "diff_count": len(diff), }, ) return { "status": "ok", "restart_required": True, "sections_updated": sorted(request.sections.keys()), "diff_count": len(diff), } # --- End server-config editor ----------------------------------------------- # Source types accepted by /api/admin/register-table. Anything else is # rejected with 422 — keeps a typo'd source_type from silently landing in # table_registry (where it would later confuse the orchestrator scan). _VALID_SOURCE_TYPES: tuple[str, ...] = ("keboola", "bigquery", "jira", "local") # Explicit allowlist of audit-payload keys whose values are credentials and # must be masked. Substring-scan + ad-hoc whitelist (the previous shape) is # fragile in two ways: # 1. False positive: legit fields like `primary_key` get masked because # they contain "key" — we then need a whitelist exception, which has # to be kept in sync as new fields are added. # 2. False negative: a future field like `primary_key_hash` *would* be # masked (defensible) but `not_actually_a_token` ALSO matches "token" # and gets masked unnecessarily; conversely, a brand-new credential # field that doesn't contain one of the patterns (`auth_material`, # `bearer`) silently leaks. # Allowlist puts the burden on the developer adding a new secret-bearing # field: they must add the literal key name here, which forces a code- # review touch on the audit path. Audit the current Pydantic models # (RegisterTableRequest / UpdateTableRequest / ConfigureRequest / # ServerConfigUpdateRequest) when extending — the registry payloads don't # currently carry credentials, but ConfigureRequest does (`keboola_token`) # and could be routed through this sanitizer in the future. _SECRET_FIELDS: frozenset = frozenset({ # ConfigureRequest — POST /api/admin/configure carries Keboola creds. "keboola_token", # Generic names that have appeared in earlier iterations of admin # request bodies and could resurface — keep them masked defensively. "api_token", "auth_token", "bot_token", "client_secret", "google_client_secret", "google_oauth_client_secret", "password", "smtp_password", "webapp_secret_key", "bot_secret", # Marketplace PATs (private repos) — see src/marketplace.py. "marketplace_token", "marketplace_pat", }) def _sanitize_for_audit(payload: Dict[str, Any]) -> Dict[str, Any]: """Mask credential-bearing fields in a request payload before audit_log. Uses an explicit `_SECRET_FIELDS` allowlist (case-insensitive) instead of substring matching. The trade-off is that adding a new secret field requires updating the set — but that's the *point*: the test suite asserts `not_actually_a_token` does NOT get masked, so a substring- based regression would surface immediately, and a missing entry for a real new credential gets caught at code review of the audit path. """ out: Dict[str, Any] = {} for k, v in payload.items(): if k.lower() in _SECRET_FIELDS: out[k] = "***" if v not in (None, "") else "" else: out[k] = v return out # Both the BigQuery and Keboola materialize paths funnel `source_query` # through DuckDB (BQ via the bigquery extension's COPY translation, Keboola # via an ATTACH'd extension and a direct COPY). DuckDB uses double quotes # for quoted identifiers — backticks are a BigQuery-native syntactic form # DuckDB's parser does not honor, so a backtick-quoted source_query either # parse-errors at COPY time or silently scans nothing. Surfaced from the # field validator on RegisterTableRequest AND the merged-record path in # `update_table` so neither route can persist a backtick query. _BACKTICK_REJECTION_MESSAGE = ( "source_query uses BigQuery-native backtick identifiers (e.g. " "`project.dataset.table`), but the materialize path runs the SQL " "through DuckDB's BigQuery extension which uses DuckDB-flavor " "identifiers. Rewrite to DuckDB syntax: bq.\"dataset\".\"table\" " "(with the attached catalog alias `bq` plus double-quoted dataset/" "table). The instance is configured with the data project, so you " "don't need to repeat it in the FROM clause." ) class RegisterTableRequest(BaseModel): name: str folder: Optional[str] = None sync_strategy: str = Field( default="full_refresh", deprecated=True, description=( "DEPRECATED: catalog/profiler metadata only. No extractor reads " "this field; every sync is a full overwrite regardless of value. " "profiler.is_partitioned() consumes it for parquet-layout " "detection. Field stays for back-compat; will be removed in a " "future major release." ), ) # Composite primary keys are real (session-grain MSA tables key on # `(session_id, event_date)`, browse rows on more). The frontend sends + # reads this as a list; backend stores it JSON-serialized in VARCHAR. # A bare string is accepted for backward compat — see _normalize_primary_key. primary_key: Optional[List[str]] = None description: Optional[str] = None source_type: Optional[str] = None bucket: Optional[str] = None source_table: Optional[str] = None # Backs query_mode='materialized'. Stored verbatim in # table_registry.source_query (schema v20); the trigger pass runs it # through the DuckDB BQ extension via BqAccess and writes the result # to /data/extracts/bigquery/data/.parquet. source_query: Optional[str] = None query_mode: str = "local" sync_schedule: Optional[str] = None profile_after_sync: bool = Field( default=True, deprecated=True, description=( "DEPRECATED: not consumed by the runtime (Agent 1 finding " "2026-05-01). Profiler runs unconditionally on every synced " "table; this flag has no effect. Field stays for back-compat." ), ) @model_validator(mode="after") def _check_mode_query_coherence(self): """Enforce query_mode ↔ source_query invariants up front so an admin can't persist a remote/local row carrying an orphan source_query, and materialized rows can't be registered without a SQL body.""" sq = (self.source_query or "").strip() or None if self.query_mode == "materialized" and not sq: raise ValueError( "query_mode='materialized' requires a non-empty source_query" ) if self.query_mode != "materialized" and sq: raise ValueError( "source_query is only valid when query_mode='materialized'" ) # The materialize path runs the SQL through DuckDB's parser (BigQuery # extension's COPY pushes it through DuckDB first, and the Keboola # path COPYs the raw SQL through a DuckDB session too). DuckDB does # NOT understand BigQuery-native backtick identifiers — those parse- # error or silently match no rows, leaving no parquet at the # canonical path and no operator-visible failure. Reject at register # time with an actionable message so the bad SQL never lands in # `table_registry.source_query`. See `_run_materialized_pass` for # the runtime path that would otherwise eat the error. if sq and "`" in sq: raise ValueError(_BACKTICK_REJECTION_MESSAGE) # Normalise: stash the trimmed-or-None form so the persisted column # never carries surrounding whitespace or empty-string sentinels. self.source_query = sq return self @field_validator("primary_key", mode="before") @classmethod def _coerce_primary_key(cls, v): return _normalize_primary_key(v) @field_validator("source_type", mode="before") @classmethod def _validate_source_type(cls, v): # None is tolerated for backward compat with old CLI scripts that # didn't set a source_type; the route resolves it later. Anything # else must be in the canonical list. if v in (None, ""): return v if v not in _VALID_SOURCE_TYPES: raise ValueError( f"source_type must be one of {sorted(_VALID_SOURCE_TYPES)}, got {v!r}" ) return v @field_validator("sync_schedule", mode="before") @classmethod def _validate_sync_schedule(cls, v): # None / "" → no schedule, accepted. # Any non-empty string (including pure whitespace) must parse as a # valid schedule — otherwise it would be persisted and silently # ignored by the runtime evaluator. if v in (None, ""): return v if not is_valid_schedule(v): raise ValueError( f"sync_schedule must be 'every Nm' / 'every Nh' / " f"'daily HH:MM[,HH:MM,...]', got {v!r}" ) return v def _validate_bigquery_register_payload(req: "RegisterTableRequest") -> None: """Enforce BQ-specific shape on a register/precheck request. Two BQ paths: - ``query_mode='materialized'`` — admin-registered SQL writes a parquet on schedule. Requires ``source_query``; ``bucket`` / ``source_table`` are not used (the SQL inlines the references). Doesn't force any field; the Pydantic ``model_validator`` already gated the query/mode coherence. - ``query_mode='remote'`` (or default) — remote view over a single BQ table. Requires ``bucket`` (BQ dataset) + ``source_table``. Mutates the model: forces ``query_mode='remote'`` and ``profile_after_sync=False`` (per Decision 7 in #108) so a caller can't accidentally enqueue a parquet profiling pass for a remote view that has no local file. Raises HTTPException(422) for missing required fields and HTTPException(400) for unsafe identifiers / bogus project_id. """ if req.query_mode == "materialized": # Materialized BQ rows: the SQL body replaces dataset+table refs. # Pydantic model_validator already verified source_query is non-empty; # all we still need is a valid project_id and a safe view name. if not req.source_query or not req.source_query.strip(): raise HTTPException( status_code=422, detail="bigquery materialized: 'source_query' is required", ) raw_name = req.name or "" if raw_name.strip() != raw_name or not _is_safe_identifier(raw_name): raise HTTPException( status_code=400, detail=( f"bigquery: view name {raw_name!r} is unsafe — must match " f"^[a-zA-Z_][a-zA-Z0-9_]{{0,63}}$ (DuckDB identifier rules) " "with no leading/trailing whitespace" ), ) from app.instance_config import get_value project_id = get_value("data_source", "bigquery", "project", default="") if not project_id: raise HTTPException( status_code=400, detail=( "bigquery: data_source.bigquery.project is not set in " "instance.yaml; configure it via /admin/server-config or " "/api/admin/configure first" ), ) if not _is_safe_project_id(project_id): raise HTTPException( status_code=400, detail=( f"bigquery: data_source.bigquery.project {project_id!r} " "is malformed — must match GCP project_id grammar " "^[a-z][a-z0-9-]{4,28}[a-z0-9]$" ), ) # Phase C: profile_after_sync is now inert (Pydantic field marked # deprecated; not read by app/api/sync.py:410-438). The runtime # profiles every synced table unconditionally, so we no longer # force-set this here as a "signal." return if not req.bucket or not req.bucket.strip(): raise HTTPException( status_code=422, detail="bigquery: 'bucket' (BQ dataset) is required", ) if not req.source_table or not req.source_table.strip(): raise HTTPException( status_code=422, detail="bigquery: 'source_table' is required", ) # No wildcard / sharded BQ tables in M1 (Decision 8). if "*" in (req.source_table or "") or "*" in (req.bucket or ""): raise HTTPException( status_code=400, detail="bigquery: wildcard / sharded tables are not supported (see #108 M3+)", ) # Strict identifier on the DuckDB view name. CRITICAL: validate the RAW # name (the value that ``register_table`` actually persists to # ``table_registry.name`` and which the BQ extractor reads back as the # DuckDB view name at next rebuild). Earlier revisions normalized first # (``strip().lower().replace(" ", "_")``) and then checked, which let # names like ``"my table"`` pass here, get stored verbatim, and then # blow up inside ``_init_extract`` at view-create time — defeating the # whole point of fast-fail-at-register. We do NOT silently rewrite the # operator's name; if they typed ``"my table"``, return 400 with a # clear message and let them retype with a corrected name. raw_name = req.name or "" if raw_name.strip() != raw_name or not _is_safe_identifier(raw_name): raise HTTPException( status_code=400, detail=( f"bigquery: view name {raw_name!r} is unsafe — must match " f"^[a-zA-Z_][a-zA-Z0-9_]{{0,63}}$ (DuckDB identifier rules) " "with no leading/trailing whitespace" ), ) # Same fast-fail rule as ``raw_name`` above: validate the RAW value the # caller sent, not a stripped form. ``register_table`` persists ``bucket`` # / ``source_table`` verbatim, and the BQ extractor splices them straight # into the ``ATTACH … AS bq_`` and view DDL at next rebuild — so a # value with leading/trailing whitespace passes validation here, gets # stored as-is, and explodes inside DuckDB at view-create time. Surface # the offending raw value in the 400 detail and let the operator retype. raw_bucket = req.bucket if raw_bucket.strip() != raw_bucket or not _is_safe_quoted_identifier(raw_bucket): raise HTTPException( status_code=400, detail=( f"bigquery: dataset {raw_bucket!r} is unsafe (only [A-Za-z0-9_.-] " "allowed, no leading/trailing whitespace)" ), ) raw_source_table = req.source_table if raw_source_table.strip() != raw_source_table or not _is_safe_quoted_identifier(raw_source_table): raise HTTPException( status_code=400, detail=( f"bigquery: source_table {raw_source_table!r} is unsafe (only " "[A-Za-z0-9_.-] allowed, no leading/trailing whitespace)" ), ) # Pull project from instance.yaml — single-project model in M1 # (Decision: no per-table project field). Validate the format here so # we surface a config issue at registration rather than at first # rebuild, where the operator no longer has a request to look at. from app.instance_config import get_value project_id = get_value("data_source", "bigquery", "project", default="") if not project_id: raise HTTPException( status_code=400, detail=( "bigquery: data_source.bigquery.project is not set in instance.yaml; " "configure it via /admin/server-config or /api/admin/configure first" ), ) if not _is_safe_project_id(project_id): raise HTTPException( status_code=400, detail=( f"bigquery: data_source.bigquery.project {project_id!r} is malformed — " "must match GCP project_id grammar ^[a-z][a-z0-9-]{4,28}[a-z0-9]$" ), ) # Force the BQ-required mode (Decision 7). The orchestrator and # extractor both assume remote; persisting `local` here would later create # a profiling job against a non-existent parquet file. # Phase C: profile_after_sync is now inert (deprecated, not read by the # runtime); no longer force-set here. req.query_mode = "remote" # Source types that don't depend on a `data_source..*` block — they # get their data through a different ingestion path (e.g. Jira via # webhooks). Registrations against these types are allowed regardless of # the configured primary `data_source.type`. _SOURCE_TYPES_INDEPENDENT_OF_DATA_SOURCE: frozenset[str] = frozenset({ "jira", "local", }) def _validate_source_type_configured(source_type: Optional[str]) -> None: """Refuse register-table requests whose ``source_type`` isn't actually configured on this instance. Pre-fix the route happily persisted e.g. ``source_type='keboola'`` on a BQ-only instance — the row landed in the registry but the scheduler had no Keboola URL/token to ATTACH against, so it silently never synced. No upfront error, no operator-visible signal until they noticed the table was missing from `da catalog`. A source_type is considered configured when: - it matches the instance's primary ``data_source.type``, OR - a non-empty ``data_source.`` block exists in the effective `instance.yaml` (multi-source instances), OR - it's in the small allowlist of types that don't sit under `data_source.*` at all (Jira, local — see ``_SOURCE_TYPES_INDEPENDENT_OF_DATA_SOURCE``). Special case: when the configured primary is ``'local'`` (the default when an instance is freshly bootstrapped and no `data_source.type` has been set yet), the validator stays permissive — refusing registrations here would block the first-time-setup workflow where the operator registers a few tables against a not-yet-fully-configured instance. The misconfiguration that this validator targets is the *explicit mismatch*: `type=bigquery` instance + `source_type=keboola` payload with no `data_source.keboola.*` block. That case still 422s. A bare/None source_type is tolerated for backward compat with legacy CLI scripts; the route resolves it later against ``get_data_source_type()``. """ if not source_type: return if source_type in _SOURCE_TYPES_INDEPENDENT_OF_DATA_SOURCE: return from app.instance_config import get_data_source_type, get_value configured_primary = get_data_source_type() if source_type == configured_primary: return # Multi-source: accept if a non-empty `data_source.` block # exists. Empty dict / None / "" all count as "not configured". secondary_block = get_value("data_source", source_type, default=None) if secondary_block: # Truthy non-empty dict / mapping / scalar — treat as configured. return # Bootstrap-friendliness: a primary of 'local' means the instance hasn't # been pointed at a real source yet (or has been deliberately set to # local-only). Don't gate registrations in that state — the operator is # likely in the middle of first-time setup and will fill in the config # next. The check still fires when primary is an actual source type # (bigquery / keboola) and the requested source_type doesn't match # AND has no secondary block. if configured_primary == "local": return raise HTTPException( status_code=422, detail=( f"source_type={source_type!r} is not configured on this instance. " f"The configured data source is {configured_primary!r}. To enable " f"a secondary source, set data_source.{source_type}.* fields in " "instance.yaml or via /admin/server-config." ), ) class UpdateTableRequest(BaseModel): name: Optional[str] = None sync_strategy: Optional[str] = Field( default=None, deprecated=True, description=( "DEPRECATED: catalog/profiler metadata only. See " "RegisterTableRequest.sync_strategy." ), ) primary_key: Optional[List[str]] = None description: Optional[str] = None source_type: Optional[str] = None bucket: Optional[str] = None source_table: Optional[str] = None source_query: Optional[str] = None query_mode: Optional[str] = None sync_schedule: Optional[str] = None profile_after_sync: Optional[bool] = Field( default=None, deprecated=True, description=( "DEPRECATED: not consumed by the runtime. See " "RegisterTableRequest.profile_after_sync." ), ) @model_validator(mode="after") def _check_mode_query_coherence(self): """PUT semantics — only the fields explicitly in the body are validated. The body is overlaid on the existing row at the handler level (see ``update_table``), so omitted fields keep their stored values and the synthetic ``RegisterTableRequest`` constructed against the merged record runs the strict cross-field check before persist. The only invariants enforceable from the PUT body alone: - explicit ``source_query='SELECT ...'`` paired with ``query_mode`` that isn't materialized → coherent reject (the SQL would be dead); - explicit ``source_query='SELECT ...'`` without any ``query_mode`` in the body → reject; the operator must commit to materialized; - explicit empty/whitespace ``source_query=''`` paired with ``query_mode='materialized'`` → reject (operator clearly mistyped — they sent the field). Pre-fix this validator also rejected ``{"query_mode": "materialized", "sync_schedule": "every 12h"}`` because ``source_query`` was None — but that's the canonical "edit the schedule on a materialized row" use-case from the Edit modal, which always sends ``query_mode`` to indicate intent. Devin BUG_0002 on PR #148 commit 2219255. """ if self.query_mode is None and self.source_query is None: return self sq_raw = self.source_query sq = (sq_raw or "").strip() or None # Operator explicitly sent source_query as empty/whitespace while # claiming materialized — typo / bad form data, reject. if ( self.query_mode == "materialized" and sq_raw is not None and not sq ): raise ValueError( "query_mode='materialized' requires a non-empty source_query" ) # source_query only makes sense with materialized mode. Allow None # (omitted) to flow through; only reject when explicitly set with # the wrong mode. if ( self.query_mode is not None and self.query_mode != "materialized" and sq ): raise ValueError( "source_query is only valid when query_mode='materialized'" ) if self.query_mode is None and sq: raise ValueError( "source_query requires query_mode='materialized' to be set " "in the same request" ) # Normalise: drop whitespace-only strings to None so the persisted # column is clean. Don't touch when source_query was None to begin # with — that signals "PUT didn't touch this field, keep existing". if sq_raw is not None: self.source_query = sq return self @field_validator("primary_key", mode="before") @classmethod def _coerce_primary_key(cls, v): return _normalize_primary_key(v) # Duplicated from RegisterTableRequest — Pydantic v2 validators don't # inherit cleanly across unrelated BaseModel classes; a shared mixin # would be overkill for two fields. @field_validator("sync_schedule", mode="before") @classmethod def _validate_sync_schedule(cls, v): # None / "" → no schedule, accepted. # Any non-empty string (including pure whitespace) must parse as a # valid schedule — otherwise it would be persisted and silently # ignored by the runtime evaluator. if v in (None, ""): return v if not is_valid_schedule(v): raise ValueError( f"sync_schedule must be 'every Nm' / 'every Nh' / " f"'daily HH:MM[,HH:MM,...]', got {v!r}" ) return v class ConfigureRequest(BaseModel): data_source: str # "keboola" | "bigquery" | "local" keboola_token: Optional[str] = None keboola_url: Optional[str] = None bigquery_project: Optional[str] = None bigquery_location: Optional[str] = None instance_name: Optional[str] = None allowed_domain: Optional[str] = None @router.get("/discover-tables") async def discover_tables( user: dict = Depends(require_admin), dataset: Optional[str] = None, ): """Discover available tables from the configured data source. For ``data_source.type='keboola'`` returns the full Storage API table list (single round-trip). For ``data_source.type='bigquery'``: - Without ``dataset``: list datasets in the configured project. - With ``dataset=name``: list tables (BASE TABLE + VIEW) in that dataset. Two-step shape avoids paying the per-dataset list_tables cost up-front on projects with hundreds of datasets — the UI populates the dataset dropdown first, then fetches tables only for the selected dataset. """ try: from app.instance_config import get_data_source_type source_type = get_data_source_type() if source_type == "keboola": from connectors.keboola.client import KeboolaClient from app.instance_config import get_value url = get_value("data_source", "keboola", "stack_url", default="") token_env = get_value("data_source", "keboola", "token_env", default="KEBOOLA_STORAGE_TOKEN") token = os.environ.get(token_env, "") if token_env else "" if not token: token = os.environ.get("KEBOOLA_STORAGE_TOKEN", "") client = KeboolaClient(token=token, url=url) tables = client.discover_all_tables() return {"tables": tables, "count": len(tables), "source": "keboola"} if source_type == "bigquery": return _discover_bigquery(dataset=dataset) return { "tables": [], "count": 0, "source": source_type, "error": f"Discovery not implemented for source_type={source_type!r}", } except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"Discovery failed: {e}") def _discover_bigquery(dataset: Optional[str]) -> Dict[str, Any]: """List BQ datasets (when ``dataset`` is None) or tables-in-dataset. Routes through ``BqAccess.client()`` so config / auth / error translation matches the rest of the BQ surface (#138 facade). Returns the same shape as the Keboola path so the UI doesn't have to branch. """ from connectors.bigquery.access import ( get_bq_access, BqAccessError, translate_bq_error, ) try: bq = get_bq_access() client = bq.client() except BqAccessError as e: raise HTTPException( status_code=BqAccessError.HTTP_STATUS.get(e.kind, 500), detail={"error": e.message, "kind": e.kind, "details": e.details}, ) try: if dataset is None: datasets = [] for ds in client.list_datasets(): datasets.append({ "dataset_id": ds.dataset_id, "full_id": f"{ds.project}.{ds.dataset_id}", }) return { "datasets": sorted(datasets, key=lambda d: d["dataset_id"]), "count": len(datasets), "source": "bigquery", } # List tables in the named dataset. `list_tables` returns # `TableListItem` with `table_id` + `table_type` ('TABLE', 'VIEW', # 'MATERIALIZED_VIEW', 'EXTERNAL', 'SNAPSHOT'). UI maps TABLE → Type # selector "table" and VIEW/MATERIALIZED_VIEW → "view"; the rest are # passed through with their raw type so the operator can decide. tables = [] for t in client.list_tables(dataset): tables.append({ "table_id": t.table_id, "table_type": t.table_type, "full_id": f"{t.project}.{t.dataset_id}.{t.table_id}", }) return { "tables": sorted(tables, key=lambda t: t["table_id"]), "count": len(tables), "source": "bigquery", "dataset": dataset, } except Exception as e: # `translate_bq_error` re-raises non-Google exceptions unchanged, # so wrap in HTTPException to keep the JSON-shape contract. try: err = translate_bq_error(e, bq.projects, bad_request_status="upstream_error") except Exception: raise HTTPException(status_code=502, detail=f"BQ discovery failed: {e}") raise HTTPException( status_code=BqAccessError.HTTP_STATUS.get(err.kind, 502), detail={"error": err.message, "kind": err.kind, "details": err.details}, ) @router.get("/registry") async def list_registry( user: dict = Depends(require_admin), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): """Get full table registry. Each table row is enriched with `last_sync_error` from sync_state so operators can see WHY a row isn't materializing without trawling scheduler logs. None for rows that have never errored or have already recovered (status='ok'); the per-row error message string otherwise. """ repo = TableRegistryRepository(conn) tables = repo.list_all() # Single batched read of sync_state errors — avoid N+1 GETs against # `sync_state` for large registries. The sync_state row is keyed on # `table_id` which mirrors `table_registry.name` (see comment in # _run_materialized_pass / _build_manifest_for_user about name vs id). error_by_name: Dict[str, Optional[str]] = {} try: rows = conn.execute( "SELECT table_id, error FROM sync_state " "WHERE status = 'error' AND error IS NOT NULL AND error <> ''" ).fetchall() error_by_name = {r[0]: r[1] for r in rows} except Exception: # Defensive: if sync_state is unreadable for any reason, the # registry response still serializes — operators just lose the # last_sync_error column on this call. logger.exception("Failed to read sync_state errors for registry") for t in tables: # Sync_state.table_id == table_registry.name by convention. t["last_sync_error"] = error_by_name.get(t.get("name")) return {"tables": tables, "count": len(tables)} # Wall-clock budget for the synchronous BQ materialization that runs after # a successful BQ register. If the rebuild + view creation exceeds this, # we hand the rest off to BackgroundTasks and return 202. 5s matches the # UX contract in #108 ("Queryable as within seconds") — long enough # to cover a healthy GCE round-trip, short enough that a hung GCE call # doesn't park the request handler. _BQ_SYNC_REGISTER_TIMEOUT_S: float = 5.0 def _materialize_bigquery_extract() -> Dict[str, Any]: """Re-build the BigQuery extract.duckdb + master views. Wrapper used by both the synchronous (in-band) and async (BackgroundTask) code paths after a BQ register/update/delete. Imports kept inside the function so non-BQ instances don't pay the import cost on app start. Opens a FRESH system DB connection rather than reusing the request-scoped one. The request handler closes its connection in a `finally` after the response, but BackgroundTask + the timeout-fallback daemon thread can both outlive that close — they would then operate on a closed handle (or one being torn down concurrently). A fresh handle is cheap (DuckDB is an embedded engine) and isolates the worker's lifetime from the request's. Returns the rebuild result dict (``{"errors": [...], "tables_registered": N, ...}``) so the synchronous caller can propagate failures to the operator. Background-task callers ignore the return value, but the loud log inside ``_run_bigquery_materialize_with_timeout`` covers that path. """ from connectors.bigquery import extractor as _bq_extractor from src.db import get_system_db from src.orchestrator import SyncOrchestrator fresh_conn = get_system_db() try: result = _bq_extractor.rebuild_from_registry(conn=fresh_conn) SyncOrchestrator().rebuild() return result or {} finally: try: fresh_conn.close() except Exception: pass def _materialize_bigquery_extract_bg() -> None: """BackgroundTask wrapper around `_materialize_bigquery_extract`. BackgroundTasks discard return values, but `rebuild_from_registry` can surface auth / config / identifier errors via the ``errors`` list. Log those at ERROR level so the failure is loud in the operator's logs even though the 202 response can't carry the detail (Decision 3 in #108: a 202 is documented as "accepted, may not be queryable yet" — we don't block on it but we shouldn't swallow it either). """ try: result = _materialize_bigquery_extract() except Exception: logger.exception("BQ post-register background materialize crashed") return errors = (result or {}).get("errors") or [] if errors: logger.error( "BQ post-register background materialize completed with %d error(s): %s", len(errors), errors, ) def _run_bigquery_materialize_with_timeout( background: BackgroundTasks, ) -> Dict[str, Any]: """Try to materialize synchronously within the wall-clock budget. Returns a dict with: - ``status`` ∈ {"ok", "errors", "timeout"} — caller maps to HTTP code - ``errors``: list of {table, error} surfaced by ``rebuild_from_registry`` (only present on ``status="errors"``) Mapping by caller (`register_table`): - "ok" → 200 (synchronous success) - "errors" → 500 (rebuild ran but reported errors — propagate so the operator knows the registry row exists but the view wasn't created) - "timeout" → 202 (rebuild still running on a BackgroundTask) The synchronous worker runs on a daemon thread (so a hung GCE call can't park the request) that opens its OWN system DB connection (see `_materialize_bigquery_extract`). Even though FastAPI now invokes the sync route in a threadpool — and `done.wait()` no longer blocks the event loop — we still off-load to a daemon so the wait is bounded even if `rebuild_from_registry` ignores its own timeouts. """ import threading done = threading.Event() err_holder: Dict[str, Any] = {} result_holder: Dict[str, Any] = {} def _worker(): try: result_holder["result"] = _materialize_bigquery_extract() except Exception as e: # pragma: no cover — logged below err_holder["error"] = e finally: done.set() t = threading.Thread(target=_worker, daemon=True, name="bq-register-rebuild") t.start() finished = done.wait(_BQ_SYNC_REGISTER_TIMEOUT_S) if finished: if "error" in err_holder: # Worker finished within the wall-clock budget but raised. This # is a HARD ERROR, not a timeout — surface it as such so the # operator gets the actual exception in the 500 body instead # of a misleading 202 + "still working in the background". # Earlier revisions returned ``{"status": "timeout"}`` here, # which the register handler then mapped to 202 + a retry # BackgroundTask; that hid the real failure for `_BQ_SYNC_ # REGISTER_TIMEOUT_S` seconds before the BG retry surfaced # the same exception in the logs. exc = err_holder["error"] logger.error( "BQ post-register rebuild raised within budget: %r", exc, ) return { "status": "errors", "errors": [{"error": f"{type(exc).__name__}: {exc}"}], } # Synchronous worker finished cleanly — but check whether # `rebuild_from_registry` itself surfaced any errors (auth fail, # missing project from the overlay, unsafe identifier slipping the # validator, etc.). Without this, those errors got silently logged # and the API claimed success. result = result_holder.get("result") or {} errors = result.get("errors") or [] if errors: logger.error( "BQ post-register rebuild reported %d error(s): %s", len(errors), errors, ) return {"status": "errors", "errors": errors} return {"status": "ok"} # Timed out — let the worker keep running on its thread (already daemon) # and also schedule a BackgroundTask so the orchestrator gets called via # the supported FastAPI path. `_INIT_EXTRACT_LOCK` in the BQ extractor # serializes the two file-swap calls so the slow daemon thread and the # background task can't tear `extract.duckdb`; the orchestrator's own # `_rebuild_lock` protects the master-view rebuild step downstream. logger.info( "BQ post-register rebuild exceeded %ss budget — handing off to BackgroundTask", _BQ_SYNC_REGISTER_TIMEOUT_S, ) background.add_task(_materialize_bigquery_extract_bg) return {"status": "timeout"} @router.post( "/register-table", responses={ 200: {"description": "BigQuery row registered + materialized synchronously"}, 201: {"description": "Non-BigQuery row registered (no post-insert materialize)"}, 202: {"description": "BigQuery row registered; materialize continues in background"}, 409: {"description": "Table id or view name already in use"}, 500: {"description": "BigQuery row registered but post-insert rebuild failed"}, }, ) def register_table( request: RegisterTableRequest, background: BackgroundTasks, user: dict = Depends(require_admin), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): """Register a new table in the system. Behavior by source_type: - **bigquery**: validates BQ-specific shape (dataset / source_table / identifier safety / project_id format), forces query_mode='remote' and profile_after_sync=False, then synchronously rebuilds extract.duckdb + master views with a wall-clock budget. Returns 200 with the view name on success, 202 on budget overrun (rebuild continues in a BackgroundTask), or 500 if the synchronous rebuild ran but reported an error (e.g. auth failure, missing project, unsafe identifier). - other source types: insert-only, no post-register hook. Returns 201. Defined as a plain ``def`` (not ``async def``) so FastAPI runs it in a threadpool — the synchronous-materialize path waits on ``threading.Event.wait()``, which would otherwise block the asyncio event loop and stall every other request for up to ``_BQ_SYNC_REGISTER_ TIMEOUT_S``. ``Depends(...)``, ``BackgroundTasks``, and ``JSONResponse`` all work the same in sync handlers; the rest of the admin module mixes both styles already. The route does NOT carry a default ``status_code`` — each branch returns its own JSONResponse with the right code. A blanket ``status_code=201`` on the decorator would mislead OpenAPI consumers about the BQ branch. Always: 409 on view-name collision against the existing registry, audit log entry on success. """ from fastapi.responses import JSONResponse if not request.name or not request.name.strip(): raise HTTPException(status_code=422, detail="Table name cannot be empty") repo = TableRegistryRepository(conn) table_id = request.name.strip().lower().replace(" ", "_") if repo.get(table_id): raise HTTPException(status_code=409, detail=f"Table '{table_id}' already registered") # View-name collision pre-check — distinct from id collision above. # `id` is derived from `name`, but two callers could legally pick # different display names that lower-case + slugify to the same view # (e.g. "Orders v2" + "orders_v2"); the strict view-name uniqueness # check stops that here, before the orchestrator surfaces it as a # silent overwrite at next rebuild. existing_by_name = next( (r for r in repo.list_all() if (r.get("name") or "") == request.name), None, ) if existing_by_name is not None: raise HTTPException( status_code=409, detail=f"View name '{request.name}' is already in use by table id '{existing_by_name.get('id')}'", ) # Refuse rows whose source_type isn't actually configured — pre-fix the # row landed in the registry but never synced because there was no # Keboola URL/token (or BQ project) to ATTACH against. Surfaces the # misconfig at registration time so the operator sees the gap before # they wonder why `da catalog` is missing the table. _validate_source_type_configured(request.source_type) # BQ rows go through the extra validation + post-insert materialization # contract from issue #108. Other source types keep the legacy insert-only # flow — Keboola materialization happens via the scheduled sync, Jira via # webhook, local via a manual extractor run. is_bigquery = request.source_type == "bigquery" if is_bigquery: _validate_bigquery_register_payload(request) # Phase C: profile_after_sync is no longer passed — the field is # deprecated and inert at the runtime layer. The DB column keeps its # schema default; the registry response no longer reflects request # values for this flag. repo.register( id=table_id, name=request.name, folder=request.folder, sync_strategy=request.sync_strategy, primary_key=request.primary_key, description=request.description, registered_by=user.get("email"), source_type=request.source_type, bucket=request.bucket, source_table=request.source_table, source_query=request.source_query, query_mode=request.query_mode, sync_schedule=request.sync_schedule, ) # Audit entry — masked params; description kept raw (it's documentation). AuditRepository(conn).log( user_id=user.get("id"), action="register_table", resource=table_id, params=_sanitize_for_audit(request.model_dump()), ) if not is_bigquery: # Keboola / Jira / local rows are insert-only here. 201 Created — the # decorator no longer carries a default status, so each branch is # explicit about its code (BQ branch overrides via JSONResponse). return JSONResponse( status_code=201, content={"id": table_id, "name": request.name, "status": "registered"}, ) if request.query_mode == "materialized": # Materialized BQ rows are picked up by the trigger pass on the next # scheduled tick (or via POST /api/sync/trigger). No synchronous # rebuild — the COPY can scan multi-GB and would block the request. return JSONResponse( status_code=201, content={ "id": table_id, "name": request.name, "status": "registered", "view_name": table_id, "message": ( "Materialized — parquet will be written on the next sync " "tick. Trigger now via POST /api/sync/trigger." ), }, ) # BQ post-register: rebuild extract + master views, with timeout fallback. # Decision 1: 200 on synchronous success, 202 on timeout, 500 if the # synchronous rebuild surfaced errors. Distinct from the 201 Keboola # path above, so the BQ branch builds its own response. outcome = _run_bigquery_materialize_with_timeout(background) status = outcome.get("status") if status == "ok": return JSONResponse( status_code=200, content={ "id": table_id, "name": request.name, "status": "ok", "view_name": table_id, }, ) if status == "errors": # Registry insert succeeded but the post-insert rebuild reported # errors — the row is in the registry but the master view was NOT # created. Surface the failure verbatim so the operator can fix # the underlying config (typically a missing # `data_source.bigquery.project` in the overlay or auth that lacks # bigquery.metadata.get on the dataset). The row stays in the # registry; a re-run after fixing the config picks up the existing # row and creates the view on the next register/update or # scheduler tick. return JSONResponse( status_code=500, content={ "id": table_id, "name": request.name, "status": "rebuild_failed", "view_name": table_id, "errors": outcome.get("errors") or [], "message": ( "Registry row created but post-insert rebuild failed; " "view is not queryable. See `errors` for details." ), }, ) # Default: timeout — rebuild continues on a BackgroundTask. return JSONResponse( status_code=202, content={ "id": table_id, "name": request.name, "status": "accepted", "view_name": table_id, "message": "Registration accepted; materializing in background", }, ) class PrecheckResponse(BaseModel): """Response model for /api/admin/register-table/precheck. Documented here so OpenAPI consumers know what to expect; the route returns a plain dict for backwards compatibility with the rest of the admin API which doesn't use response_model. """ ok: bool table: Dict[str, Any] @router.post("/register-table/precheck") def register_table_precheck( request: RegisterTableRequest, user: dict = Depends(require_admin), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): """Validate a register-table payload + (BQ only) confirm the source table exists. No DB write. Used by the UI to surface row count + size + column count in the modal before the operator clicks Register, and by the CLI's ``--dry-run`` to print what *would* be registered without touching state. Identical Pydantic validation to register-table; for BQ rows we additionally make a ``bigquery.Client(project).get_table(...)`` call and surface the GCP error verbatim. Defined as a plain ``def`` (not ``async def``) so FastAPI runs it in a threadpool — the BQ branch makes synchronous ``bigquery.Client(...)`` /``client.get_table(...)`` calls, which would otherwise block the asyncio event loop and stall every other request for the duration of the GCE round-trip. Mirrors the same conversion done for ``register_table`` (see comment on that route). ``Depends(...)`` works identically in sync handlers. """ if not request.name or not request.name.strip(): raise HTTPException(status_code=422, detail="Table name cannot be empty") if request.source_type != "bigquery": # M1 only adds BQ-specific precheck. Other source types get a # validation-only response so the CLI / UI can rely on the same # endpoint shape across types. return { "ok": True, "table": { "name": request.name, "source_type": request.source_type, "bucket": request.bucket, "source_table": request.source_table, "rows": None, "size_bytes": None, "columns": [], "note": "precheck for non-bigquery sources is validation-only in M1", }, } # BQ-specific shape validation (forces query_mode/profile_after_sync, # checks identifier safety, validates project_id from instance.yaml). _validate_bigquery_register_payload(request) # Materialized BQ rows have no `dataset.source_table` to round-trip — # the SQL body is the contract. Skip the BQ-jobs-API call and return a # validation-only precheck so the CLI's `--dry-run --query-mode # materialized` path doesn't crash on an empty fully-qualified name. if request.query_mode == "materialized": return { "ok": True, "table": { "name": request.name, "source_type": "bigquery", "query_mode": "materialized", "source_query": request.source_query, "rows": None, "size_bytes": None, "columns": [], "note": ( "materialized precheck is validation-only — the SQL is " "evaluated for cost on each scheduled materialize tick" ), }, } # Round-trip the BQ jobs API to confirm the table exists and the SA can # see it. Imports kept local to avoid pulling google-cloud-bigquery into # the import chain on non-BQ instances. try: from google.cloud import bigquery # noqa: PLC0415 from google.api_core import exceptions as google_exc # noqa: PLC0415 except ImportError as e: raise HTTPException( status_code=500, detail=( "google-cloud-bigquery not installed; install the bigquery " f"extras to use BQ precheck ({e})" ), ) from e from app.instance_config import get_value project_id = get_value("data_source", "bigquery", "project", default="") dataset = (request.bucket or "").strip() source_table = (request.source_table or "").strip() fq = f"{project_id}.{dataset}.{source_table}" try: client = bigquery.Client(project=project_id) bq_table = client.get_table(fq) except google_exc.NotFound as e: raise HTTPException(status_code=404, detail=f"BigQuery table not found: {fq} ({e})") from e except google_exc.Forbidden as e: raise HTTPException( status_code=403, detail=( f"BigQuery access denied for {fq}: {e}. " "Service account needs bigquery.metadata.get on the dataset." ), ) from e except Exception as e: # Auth errors, transient 5xx, malformed table refs — surface as 400 # so the operator gets the GCP error verbatim and can fix their # config without us guessing the right HTTP code. raise HTTPException(status_code=400, detail=f"BigQuery precheck failed for {fq}: {e}") from e columns = [ {"name": f.name, "type": f.field_type} for f in (bq_table.schema or []) ] return { "ok": True, "table": { "name": request.name, "source_type": "bigquery", "bucket": dataset, "source_table": source_table, "project_id": project_id, "rows": int(bq_table.num_rows or 0), "size_bytes": int(bq_table.num_bytes or 0), "columns": columns, "column_count": len(columns), }, } @router.put("/registry/{table_id}") async def update_table( table_id: str, request: UpdateTableRequest, background: BackgroundTasks, user: dict = Depends(require_admin), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): """Update a registered table's configuration. For BQ rows, schedules a background rebuild so the master view picks up changes (e.g. a renamed dataset) without waiting for the next scheduled sync. """ repo = TableRegistryRepository(conn) existing = repo.get(table_id) if not existing: raise HTTPException(status_code=404, detail="Table not found") updates = {k: v for k, v in request.model_dump().items() if v is not None} # Run BQ-shape validation BEFORE persisting whenever the merged record # would be a bigquery row (existing was BQ, or the patch flips it to BQ, # or the patch touches BQ-relevant fields on an already-BQ row). Without # this gate, an admin could PUT `bucket="evil\"; DROP --"` onto a BQ # row and the next rebuild would silently fail at view-create time — # surface the bad shape at PUT time instead. if updates: # Preserve the original `registered_at` across PUTs — `repo.register` # now accepts it as an optional kwarg; without this the upsert would # stamp a fresh `now()` on every edit (issue #130). merged = dict(existing) merged.update(updates) merged.pop("id", None) # avoid duplicate id kwarg # When switching the merged record away from materialized mode, drop # the stale source_query — the request validator can't clear it via # the `if v is not None` filter above. Without this, a remote/local # row would carry an orphan source_query in the registry. if merged.get("query_mode") != "materialized": merged["source_query"] = None # Cross-source coherence: query_mode='materialized' requires a # non-empty source_query for ALL source types, not just BigQuery. # Pre-fix, only the BQ-specific synthetic-RegisterTableRequest below # caught this — Keboola materialized rows could be PUT without # source_query and persisted with source_query=None, then crash at # the next sync tick when kb_materialize_query received `sql=None` # and DuckDB rejected `COPY (None) TO ...`. Devin finding 2026-05-01: # BUG_pr-review-job-58ae3148_0001. if merged.get("query_mode") == "materialized": sq = merged.get("source_query") if not sq or not str(sq).strip(): raise HTTPException( status_code=422, detail=( "query_mode='materialized' requires a non-empty " "source_query. To revert to a non-materialized mode, " "PATCH query_mode='local' (Keboola) or 'remote' " "(BigQuery) and the stale source_query is cleared " "automatically." ), ) # Backtick rejection on the merged record — see # `_BACKTICK_REJECTION_MESSAGE` for the rationale. Catches PATCHes # that flip `source_query` to a backtick form on an already- # materialized row, which the synthetic-RegisterTableRequest below # only re-validates for BQ rows. Apply uniformly so Keboola # materialized rows can't carry one either. if "`" in str(sq): raise HTTPException( status_code=422, detail=_BACKTICK_REJECTION_MESSAGE, ) if merged.get("source_type") == "bigquery": # Reuse the register-time validator. It mutates the request to # force query_mode='remote' / profile_after_sync=False (or to # leave a materialized row alone) — apply the same coercion to # `merged` so the persisted row matches. synthetic = RegisterTableRequest( name=merged.get("name") or table_id, bucket=merged.get("bucket"), source_table=merged.get("source_table"), source_query=merged.get("source_query"), source_type="bigquery", query_mode=merged.get("query_mode") or "remote", profile_after_sync=bool(merged.get("profile_after_sync") or False), primary_key=merged.get("primary_key"), description=merged.get("description"), folder=merged.get("folder"), sync_strategy=merged.get("sync_strategy") or "full_refresh", sync_schedule=merged.get("sync_schedule"), ) _validate_bigquery_register_payload(synthetic) merged["query_mode"] = synthetic.query_mode merged["profile_after_sync"] = synthetic.profile_after_sync merged["source_query"] = synthetic.source_query repo.register(id=table_id, **merged) AuditRepository(conn).log( user_id=user.get("id"), action="update_table", resource=table_id, params=_sanitize_for_audit({"updated_fields": sorted(updates.keys()), **updates}), ) # If we updated a BQ row (or one that's now BQ), refresh the extract in # the background so the view picks up renames / column-list changes. # Use the BG wrapper so any rebuild errors are logged at ERROR level # instead of being silently dropped by BackgroundTasks (which discards # return values). after = repo.get(table_id) or {} if after.get("source_type") == "bigquery": background.add_task(_materialize_bigquery_extract_bg) return {"id": table_id, "updated": list(updates.keys())} @router.delete("/registry/{table_id}", status_code=204) async def unregister_table( table_id: str, background: BackgroundTasks, user: dict = Depends(require_admin), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): """Unregister a table from the system. For BQ rows, schedules a background rebuild so the dropped row's master view is removed from analytics.duckdb (rather than hanging around until the next scheduled sync). For materialized rows, also removes the canonical parquet at `${DATA_DIR}/extracts//data/.parquet` and clears the matching `sync_state` row. Without these two cleanups, the manifest endpoint kept advertising the dropped table to `da sync` (sync_state-driven) and the orchestrator's next rebuild could resurrect a master view from the leftover parquet (E2E sub-agent finding 2026-05-01). """ repo = TableRegistryRepository(conn) existing = repo.get(table_id) if not existing: raise HTTPException(status_code=404, detail="Table not found") was_bigquery = existing.get("source_type") == "bigquery" was_materialized = existing.get("query_mode") == "materialized" source_type = existing.get("source_type") or "" name = existing.get("name") or table_id repo.unregister(table_id) # Drop the canonical parquet for materialized rows. Path layout: # `${DATA_DIR}/extracts//data/.parquet` — the # filename is keyed by `table_registry.name` (matches sync_state # bookkeeping convention; see _run_materialized_pass + the manifest # builder for the same name-keyed lookup). Defensively remove the # `.parquet.tmp` sibling too in case a prior materialize crashed # mid-COPY. Failure to remove (file missing, permission error) is # logged but doesn't fail the DELETE — the registry row is already # gone, and the orphan parquet will not produce a master view at # next rebuild because the orchestrator's _meta-driven scan never # picks up bare parquet files. if was_materialized and source_type in ("bigquery", "keboola"): try: data_dir = Path(os.environ.get("DATA_DIR", "./data")) base = data_dir / "extracts" / source_type / "data" for candidate in ( base / f"{name}.parquet", base / f"{name}.parquet.tmp", ): if candidate.exists(): candidate.unlink() logger.info( "Removed materialized parquet for unregistered table %s: %s", table_id, candidate, ) except Exception as e: logger.warning( "Failed to remove materialized parquet for %s: %s — registry row is " "still dropped; clean up the file manually if it lingers", table_id, e, ) # Clear sync_state for any source/mode (a row that was synced at any # point — local/materialized — has a sync_state entry that the manifest # serves regardless of registry state). Pre-fix, the manifest still # advertised the dropped table to `da sync` because sync_state was # never cleaned up, and analysts kept getting it through the manifest. try: conn.execute("DELETE FROM sync_state WHERE table_id = ?", [name]) conn.execute("DELETE FROM sync_history WHERE table_id = ?", [name]) except Exception as e: logger.warning( "Failed to clear sync_state for unregistered table %s: %s — " "manifest may still advertise the dropped row to da sync", table_id, e, ) AuditRepository(conn).log( user_id=user.get("id"), action="unregister_table", resource=table_id, params=_sanitize_for_audit({ "name": existing.get("name"), "source_type": existing.get("source_type"), "bucket": existing.get("bucket"), "source_table": existing.get("source_table"), }), ) if was_bigquery: background.add_task(_materialize_bigquery_extract_bg) @router.post("/configure") async def configure_instance( request: ConfigureRequest, user: dict = Depends(require_admin), ): """Configure data source and instance settings via API. Writes config to instance.yaml and persists secrets to .env_overlay. AI agents and the /setup wizard use this instead of manual file editing. """ import yaml if request.data_source not in ("keboola", "bigquery", "local"): raise HTTPException(status_code=400, detail="data_source must be 'keboola', 'bigquery', or 'local'") # Validate credentials if provided if request.data_source == "keboola": if not request.keboola_token or not request.keboola_url: raise HTTPException(status_code=400, detail="keboola_token and keboola_url are required for Keboola data source") _validate_url_not_private(request.keboola_url, field_name="keboola_url") try: from connectors.keboola.client import KeboolaClient client = KeboolaClient(token=request.keboola_token, url=request.keboola_url) client.test_connection() except Exception as e: logger.error("Keboola connection validation failed: %s", e) raise HTTPException(status_code=400, detail="Keboola connection failed. Check your token and URL.") elif request.data_source == "bigquery": if not request.bigquery_project: raise HTTPException(status_code=400, detail="bigquery_project is required for BigQuery data source") # Write instance.yaml to DATA_DIR/state/ (writable Docker volume), # NOT to CONFIG_DIR which is mounted read-only in Docker. # # Narrow-overlay write strategy — must match `/api/admin/server-config`: # 1. Read overlay verbatim (do NOT fall back to static). Falling back # would copy env-resolved cleartext secrets from the merged static # file back into the overlay (e.g. `smtp_password: ${SMTP_PASSWORD}` # → `smtp_password: hunter2`). The wizard only ever sets # `instance`, `auth`, `data_source` here, so other sections must # flow from the static file via `load_instance_config`'s deep-merge # — they don't belong in the overlay at all. # 2. Patch only the sections this endpoint touches. # 3. Write the narrow overlay back atomically (tmp + os.replace). data_dir = Path(os.environ.get("DATA_DIR", "./data")) config_path = data_dir / "state" / "instance.yaml" # Same serialization + corrupt-overlay handling as POST /server-config. with _overlay_write_lock: overlay: dict = {} if config_path.exists(): try: overlay = yaml.safe_load(config_path.read_text()) or {} except Exception as e: logger.exception("configure: refusing to overwrite corrupt overlay at %s", config_path) raise HTTPException( status_code=500, detail=f"refusing to overwrite corrupt overlay at {config_path} ({e}); " "back up and remove the file, or fix it by hand", ) from e # Merge instance settings into the overlay only — never seed from the # env-resolved merged config. if request.instance_name: overlay.setdefault("instance", {})["name"] = request.instance_name if request.allowed_domain: overlay.setdefault("auth", {})["allowed_domain"] = request.allowed_domain # data_source is fully owned by this endpoint — replace wholesale. overlay["data_source"] = {"type": request.data_source} if request.data_source == "keboola": overlay["data_source"]["keboola"] = { "stack_url": request.keboola_url, "token_env": "KEBOOLA_STORAGE_TOKEN", } elif request.data_source == "bigquery": overlay["data_source"]["bigquery"] = { "project": request.bigquery_project, "location": request.bigquery_location or "us", } # Atomic write to writable data volume — same tmp + os.replace pattern # as the server-config editor so a concurrent save can't tear the file. config_path.parent.mkdir(parents=True, exist_ok=True) tmp_path = config_path.with_suffix(config_path.suffix + ".tmp") tmp_path.write_text(yaml.dump(overlay, default_flow_style=False, sort_keys=False)) os.replace(tmp_path, config_path) logger.info("Wrote instance config to %s", config_path) # Persist secrets to .env_overlay (in data volume, never in git) secrets_to_persist = {} if request.keboola_token: secrets_to_persist["KEBOOLA_STORAGE_TOKEN"] = request.keboola_token if request.keboola_url: secrets_to_persist["KEBOOLA_STACK_URL"] = request.keboola_url if secrets_to_persist: data_dir = Path(os.environ.get("DATA_DIR", "./data")) overlay_path = data_dir / "state" / ".env_overlay" overlay_path.parent.mkdir(parents=True, exist_ok=True) # Merge with existing overlay existing_overlay = {} if overlay_path.exists(): for line in overlay_path.read_text().splitlines(): if "=" in line and not line.startswith("#"): k, v = line.split("=", 1) existing_overlay[k.strip()] = v.strip() existing_overlay.update(secrets_to_persist) overlay_path.write_text( "\n".join(f"{k}={v}" for k, v in existing_overlay.items()) + "\n" ) try: overlay_path.chmod(0o600) except OSError: pass logger.info("Persisted %d secrets to .env_overlay", len(secrets_to_persist)) # Inject into current process environment for k, v in secrets_to_persist.items(): os.environ[k] = v # Invalidate cached instance config so next read picks up changes. # Use the public helper (matches `/api/admin/server-config`); reaching # into the private global silently breaks if the cache layout changes. from app.instance_config import reset_cache reset_cache() return { "status": "ok", "data_source": request.data_source, "connection": "verified" if request.data_source != "local" else "local", } def _discover_and_register_tables(conn: duckdb.DuckDBPyConnection, user_email: str) -> dict: """Discover tables from configured source and register them. Shared logic for API and sync.""" from app.instance_config import get_data_source_type, get_value source_type = get_data_source_type() if source_type != "keboola": return {"registered": 0, "skipped": 0, "errors": 0, "tables": [], "source": source_type} from connectors.keboola.client import KeboolaClient # Read from data_source.keboola (matches what /api/admin/configure writes) url = get_value("data_source", "keboola", "stack_url", default="") token_env = get_value("data_source", "keboola", "token_env", default="KEBOOLA_STORAGE_TOKEN") token = os.environ.get(token_env, "") if token_env else "" if not token: token = os.environ.get("KEBOOLA_STORAGE_TOKEN", "") client = KeboolaClient(token=token, url=url) discovered = client.discover_all_tables() repo = TableRegistryRepository(conn) registered = 0 skipped = 0 errors = 0 table_names = [] for table in discovered: table_id = table.get("id", "").strip().lower().replace(".", "_").replace(" ", "_") if not table_id: errors += 1 continue if repo.get(table_id): skipped += 1 continue try: # Parse bucket from table ID (format: in.c-bucket.table_name) parts = table.get("id", "").split(".") bucket = parts[1] if len(parts) > 1 else "" source_table = parts[2] if len(parts) > 2 else table.get("name", "") repo.register( id=table_id, name=table.get("name", table_id), source_type="keboola", bucket=bucket, source_table=source_table, query_mode="local", registered_by=user_email, description=f"Auto-discovered from Keboola: {table.get('id', '')}", ) registered += 1 table_names.append(table_id) except Exception as e: logger.warning("Failed to register %s: %s", table_id, e) errors += 1 return { "registered": registered, "skipped": skipped, "errors": errors, "tables": table_names, "source": "keboola", } @router.post("/discover-and-register") async def discover_and_register( user: dict = Depends(require_admin), conn: duckdb.DuckDBPyConnection = Depends(_get_db), ): """Discover tables from configured source and auto-register them. Combines discover-tables + register-table into one call. Skips already-registered tables. Used by /setup wizard and AI agents. """ try: result = _discover_and_register_tables(conn, user.get("email", "admin")) return result except Exception as e: raise HTTPException(status_code=500, detail=f"Discovery and registration failed: {e}")