* fix: cutover regressions + parallel Keboola legacy fallback
Bundled fixes from a fresh-deploy run on a Keboola Storage backend with
the block-shared-snowflake-access feature flag — DuckDB Keboola
extension's per-table scan can't access bucket schemas, so the legacy
kbcstorage Storage-API client is the only working path.
CUTOVER REGRESSIONS
- agnes pull hash mismatch on every Keboola local-mode table —
src/orchestrator.py:_update_sync_state stored md5(mtime+size)[:12]
while the CLI compares against full 32-char content MD5. Now stores
the same content MD5 the materialized SQL path already used.
- Trailing-slash sanitization in connectors/keboola/access.py and
extractor.py — DuckDB Keboola extension's ATTACH fails when the URL
ends in / (canonical form).
- src/profiler.py:TableInfo.description becomes optional — two call
sites instantiated without it, crashing the profiler pass.
- scripts/ops/agnes-auto-upgrade.sh: chown on UID change — older images
ran as root, current runs as agnes (uid 999). Reads target uid:gid
from /etc/passwd inside the new image and chowns ${STATE_DIR},
/data/extracts, /data/analytics when the digest moves.
- POST /api/sync/trigger is now singleton per process — two
near-simultaneous trigger calls each forked an extractor subprocess,
fought for extract.duckdb's file lock, starved uvicorn, flipped the
container to unhealthy. Trigger now returns 409
(sync_already_in_progress) when held; _run_sync acquires non-blocking.
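A minimal stdlib-only sketch of the singleton pattern described above. The handler fast-fails while the lock is held, and the sync body acquires non-blocking as defense in depth. The `check_trigger` and `run_sync` names and the dict returned for the 409 case are illustrative. This is not the actual handler in app/api/sync.py.

```python
import threading
from typing import Callable, Optional

# Illustrative stand-in for the module-level `_sync_lock`.
_sync_lock = threading.Lock()


def check_trigger() -> Optional[dict]:
    """Handler-side fast path: report 409 while a sync holds the lock.

    Returns None when the trigger may proceed. The check is advisory: two
    callers can both see the lock free, which is why run_sync re-checks.
    """
    if _sync_lock.locked():
        return {"status": 409, "detail": "sync_already_in_progress"}
    return None


def run_sync(work: Callable[[], None]) -> bool:
    """Defense in depth: acquire non-blocking, skip if another sync holds it."""
    if not _sync_lock.acquire(blocking=False):
        return False  # another sync is already in flight
    try:
        work()
        return True
    finally:
        _sync_lock.release()
```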
PARALLEL LEGACY FALLBACK
- Process pool fan-out for the _extract_via_legacy queue (default 8
workers, override via AGNES_KEBOOLA_PARALLELISM). Process pool, not
thread pool, because connectors/keboola/client.py:export_table does
os.chdir(temp_dir) — process-global, so threads raced and slice files
landed in the wrong directory ("[Errno 2] No such file or directory:
'<job_id>.csv_X_Y_Z.csv'").
- Extractor subprocess timeout 1800s -> 3600s (configurable via
AGNES_EXTRACTOR_TIMEOUT_SEC). 28+ tables × multi-minute Keboola export
jobs need the headroom on telemetry-class projects.
- Process group cleanup on timeout — Popen(start_new_session=True) puts
the extractor in its own group. On timeout the parent SIGTERMs the
group (10s grace) then SIGKILLs stragglers. Without this, the pool
workers were reparented to PID 1 and continued holding open Keboola
Storage export jobs. The inline extractor script also installs a SIGTERM
-> sys.exit(143) handler so the __exit__ of the with
ProcessPoolExecutor(...) block runs cleanly.
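The timeout handling above can be sketched with the standard library alone. This is a POSIX-only illustration under assumed names (`run_with_group_timeout`, `grace_sec`). It shows the start_new_session + killpg sequence, not the exact code in app/api/sync.py.

```python
import os
import signal
import subprocess
from typing import List


def run_with_group_timeout(cmd: List[str], timeout_sec: float, grace_sec: float = 10.0) -> int:
    """Run `cmd` in its own process group and return its exit status.

    On timeout: SIGTERM the whole group, give it `grace_sec` to exit, then
    SIGKILL anything still in the group (e.g. pool workers that would
    otherwise be reparented to PID 1). POSIX only.
    """
    # start_new_session=True runs setsid() in the child, so the child's PID
    # is also its process-group ID.
    proc = subprocess.Popen(cmd, start_new_session=True)
    try:
        return proc.wait(timeout=timeout_sec)
    except subprocess.TimeoutExpired:
        pgid = proc.pid
        try:
            os.killpg(pgid, signal.SIGTERM)
        except ProcessLookupError:
            pass  # group already gone
        try:
            proc.wait(timeout=grace_sec)
        except subprocess.TimeoutExpired:
            pass  # leader ignored SIGTERM; fall through to SIGKILL
        try:
            os.killpg(pgid, signal.SIGKILL)  # stragglers left in the group
        except ProcessLookupError:
            pass
        return proc.wait()
```

A child that installs the SIGTERM -> sys.exit(143) handler would return 143 here instead of a negative signal code. That exit path is what lets its ProcessPoolExecutor shut down before the SIGKILL sweep.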
Tests: existing tests that patched subprocess.run updated to patch
subprocess.Popen with a _FakePopen stand-in (same exit-code-injection
contract). Two tests that exercised the parallel path forced
AGNES_KEBOOLA_PARALLELISM=1 to keep mocks alive (mocks don't ride into
ProcessPoolExecutor subprocesses).
Squashed onto current main (was 7 commits + multi-commit CHANGELOG +
agnes-auto-upgrade.sh conflicts; squash avoids per-commit conflict
resolution against main's flat-mount STATE_DIR refactor and 0.38.0
release cut).
* feat(keboola): Storage API direct extract path; drop extension data path
The DuckDB Keboola extension's COPY routes through Keboola QueryService,
which is unreliable on linked-bucket projects (extension v0.1.6 fixes
that case but isn't yet in the community CDN, and pre-fix any project
with the block-shared-snowflake-access feature flag couldn't see bucket
schemas at all). Move the extract path off the extension entirely and
talk to the Storage API directly via signed-URL download — works on any
project, regardless of extension state.
connectors/keboola/storage_api.py (NEW)
Lightweight client built on requests.Session. Three Storage API endpoints plus the signed-URL download:
- POST /v2/storage/tables/{id}/export-async (kicks off job)
- GET /v2/storage/jobs/{id} (poll until done)
- GET /v2/storage/files/{id}?federationToken=1 (signed URL detail)
- GET <signed_url> (download bytes)
Supports sliced exports (manifest + per-slice signed URLs) and gzipped
payloads. ExportFilter dataclass mirrors the Keboola filter spec
(whereFilters / columns / changedSince / limit) and handles JSON
round-trip with the registry's source_query column. Token redaction
in error messages. Bounded exponential backoff on job polling.
No cloud-SDK dependency on the data path; thread-safe.
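The polling and redaction behaviour can be illustrated without any HTTP. In this sketch, `fetch_job` stands in for the GET /v2/storage/jobs/{id} call. The status strings ("success", "error", anything else meaning still running) and the delay constants are assumptions, not values taken from storage_api.py.

```python
import time
from typing import Callable


class ExportJobError(RuntimeError):
    """Raised when the export job ends in a failed state."""


def redact(text: str, token: str) -> str:
    """Replace every occurrence of the Storage API token before surfacing text."""
    return text.replace(token, "***") if token else text


def poll_job(
    fetch_job: Callable[[], dict],
    *,
    token: str,
    initial_delay: float = 1.0,
    max_delay: float = 30.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until the job finishes, doubling the delay up to `max_delay`."""
    deadline = time.monotonic() + timeout
    delay = initial_delay
    while True:
        job = fetch_job()
        status = job.get("status")
        if status == "success":
            return job
        if status == "error":
            # The job payload may echo URLs or headers; never leak the token.
            raise ExportJobError(redact(f"export job failed: {job}", token))
        if time.monotonic() >= deadline:
            raise TimeoutError("export job did not finish before the deadline")
        sleep(delay)
        delay = min(delay * 2, max_delay)  # bounded exponential backoff
```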
connectors/keboola/extractor.py
- materialize_query() rewritten: takes bucket/source_table/source_query
(JSON filter spec), exports via KeboolaStorageClient, converts CSV
to parquet via DuckDB, atomic os.replace. Same return shape so
sync.py downstream code stays uniform with the BQ branch.
- _extract_via_legacy() also moved to Storage API direct (kept the
name for caller compatibility with _legacy_worker / the parallel
batch extractor). Per-call temp directories — no os.chdir, threads
don't race.
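The "atomic os.replace" step is the standard write-to-temp-then-rename pattern. A stdlib-only sketch follows. `atomic_write` is a hypothetical helper, and it assumes the temp file lives in the same directory (same filesystem) as the target, which os.replace needs in order to be atomic.

```python
import os
import tempfile
from pathlib import Path
from typing import Iterable


def atomic_write(target: Path, chunks: Iterable[bytes]) -> int:
    """Stream chunks into a temp file next to `target`, then os.replace it
    into place so readers see either the old file or the complete new one.
    Returns the number of bytes written."""
    target.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    written = 0
    try:
        with os.fdopen(fd, "wb") as f:
            for chunk in chunks:
                f.write(chunk)
                written += len(chunk)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_name, target)  # atomic rename on the same filesystem
    except BaseException:
        # Never leave a half-written temp file behind on failure.
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            pass
        raise
    return written
```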
app/api/sync.py
_run_materialized_pass for source_type='keboola' rows now constructs a
KeboolaStorageClient (replaces KeboolaAccess) and passes
bucket/source_table/source_query to materialize_query. Reuses one
client across rows for HTTP keep-alive. Falls back to the
KEBOOLA_STACK_URL env var for the Keboola URL when instance.yaml has
no stack_url configured.
cli/commands/admin.py
discover-and-register defaults Keboola rows to query_mode='materialized'
(NULL source_query = full table), matching the v26 migration's
unification of the local/materialized split for Keboola. BigQuery and
Jira keep their per-source defaults.
src/db.py
Schema bump 25 → 26. Migration: UPDATE table_registry SET
query_mode='materialized' WHERE source_type='keboola' AND
query_mode='local'. NULL source_query on those rows means "full table
export" — same effective behavior the local mode provided, but now
via Storage API instead of the extension.
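The v26 migration is a single idempotent UPDATE. The sketch below runs that statement against an in-memory sqlite3 database purely as a stand-in for DuckDB. The real code targets system.duckdb via src/db.py, and the cut-down table_registry schema here is assumed for illustration.

```python
import sqlite3

# The UPDATE from the v26 migration described above.
MIGRATE_V26 = (
    "UPDATE table_registry SET query_mode = 'materialized' "
    "WHERE source_type = 'keboola' AND query_mode = 'local'"
)


def migrate_to_v26(conn: sqlite3.Connection) -> int:
    """Apply the keboola local -> materialized flip and return rows changed.

    source_query is left untouched: NULL on these rows means "full table
    export". Re-running is a no-op because no 'local' keboola rows remain.
    """
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(MIGRATE_V26)
    return cur.rowcount
```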
pyproject.toml
kbcstorage dep stays (admin-side bucket/table list still uses the
SDK in app/api/admin.py / connectors/keboola/client.py); only the
data path is migrated off the SDK. Comment updated to reflect the
new boundary.
tests
- test_keboola_storage_api.py (NEW, 19 tests): ExportFilter parsing,
HTTP client (token redaction, retry logic, polling), download_file
(single, gzipped, sliced), end-to-end export_table_to_csv.
- test_keboola_materialize.py rewritten: mocks KeboolaStorageClient
instead of FakeAccess; same atomic-write + zero-rows + unsafe-id
contracts.
- test_sync_trigger_keboola_materialized.py: registry rows now carry
bucket+source_table+JSON-shape source_query.
114+ Keboola-impacted tests green locally.
* test: schema version assertion bumped to 26 alongside the keboola query_mode migration
* fix(keboola): cutover hot-patches surfaced on agnes-dev
Five small fixes that were applied as in-container hot-patches during
agnes-dev cutover and need to be on the source-of-truth image so a fresh
upgrade does not undo them.
- app/api/sync.py: auto-discover gate considers the WHOLE registry (any
source, any mode), not just rows where source matches and query_mode
is local. After the v25→v26 keboola materialized migration an
instance can have 30 materialized rows and zero local rows; the
previous gate kept re-firing _discover_and_register_tables every
scheduler tick, creating duplicate auto-discovered rows with the
wrong bucket prefix every time.
- app/api/admin.py: _discover_and_register_tables reassembles the
bucket as <stage>.<bucket-id> (e.g. in.c-finance) instead of
dropping the stage prefix; default query_mode for keboola is now
materialized (the v26 contract); validator allows NULL source_query
for keboola materialized rows (full-table export via Storage API
export-async, no SQL needed).
- cli/commands/admin.py: register-table mirrors the server validator
(NULL source_query allowed for source_type=keboola); --bucket help
text generalized to cover both BQ dataset and Keboola bucket id.
- connectors/keboola/extractor.py: max_line_size=64 MiB on
read_csv_auto so embedded JSON / SQL cells (kbc_component_configuration
in particular) do not trip the default 2 MiB ceiling.
- connectors/keboola/storage_api.py: GCP backend support — when the
Storage API returns a manifest whose slice URLs are gs://
references with a gcsCredentials block, rewrite to the JSON REST
download endpoint and authenticate with the issued OAuth bearer
token; redact tokens in any surfaced error string.
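The gs:// rewrite maps onto the Cloud Storage JSON API's object media download (objects.get with alt=media). This is a minimal sketch. It assumes the manifest carries plain gs://bucket/object references and that the bearer token has already been extracted from the gcsCredentials block. That block's exact layout is not shown here.

```python
from urllib.parse import quote, urlparse


def gcs_media_url(gs_url: str) -> str:
    """Rewrite gs://bucket/path/to/object into a JSON API media download URL."""
    parsed = urlparse(gs_url)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"not a gs:// URL: {gs_url!r}")
    obj = parsed.path.lstrip("/")
    # The object name is a single path segment in the JSON API, so any '/'
    # inside it must be percent-encoded as %2F.
    return (
        "https://storage.googleapis.com/storage/v1/b/"
        f"{parsed.netloc}/o/{quote(obj, safe='')}?alt=media"
    )


def gcs_auth_headers(access_token: str) -> dict:
    """Authorization header for the OAuth bearer token issued with the export."""
    return {"Authorization": f"Bearer {access_token}"}
```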
* test: align with new keboola materialized + auto-discover-gate contracts
- test_admin_keboola_materialized: rename
test_register_keboola_materialized_rejects_missing_source_query →
test_register_keboola_materialized_accepts_missing_source_query.
v25→v26 introduced 'keboola materialized with NULL source_query
means full-table export via Storage API export-async' as the
default registration shape; the rejection case is no longer the
contract.
- test_sync_filter: add list_all() to _StubRegistry. The auto-discover
gate in _run_sync now keys off the WHOLE registry (not just local
rows) so materialized-only Keboola instances do not re-trigger
discovery on every tick.
* feat(keboola): native parquet export — skip CSV roundtrip
Storage API export-async accepts fileType={csv,parquet}. Switching the
materialized sync to parquet eliminates the CSV → DuckDB COPY → parquet
roundtrip that pinned a single uvicorn worker over 4 GiB on multi-GB
tables (read_csv with all_varchar + max_line_size=64 MiB has to
materialize the whole CSV in memory before COPY can stream out a
parquet). Snowflake UNLOAD on Keboola's side already produces typed,
self-contained parquet files; the extractor downloads them and renames
into place.
Two cases:
- **Single-file** export (small table): file_info.url points at one
signed URL; download_file streams chunks straight to .parquet.tmp
and we're done. No DuckDB.
- **Sliced** export (Snowflake UNLOAD respects MAX_FILE_SIZE — 16 MiB
default — so anything larger arrives as N parquet slices): each
slice is a complete parquet file with its own footer; naive concat
would corrupt them. download_file_slices keeps the slices as
separate files in a tempdir, then DuckDB COPY (SELECT * FROM
read_parquet([slice0, slice1, ...])) merges them into one
consolidated parquet. DuckDB streams row groups during this, so peak
memory is bounded by the row groups in flight rather than by source size.
The legacy CSV path stays as the explicit opt-in via source_query=
'{"file_type":"csv"}' for projects whose backend can't UNLOAD
parquet (none known today; cheap escape hatch). Backward-compat alias
KeboolaStorageClient.export_table_to_csv kept.
Also fixes a latent bug in download_file's gzip detection: previous
heuristic flagged any unencrypted file as gzipped, which would have
corrupted parquet downloads at gunzip time. Name-suffix-only now.
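The following is a minimal sketch of suffix-only gzip detection. It assumes the decision is made from the file name the Storage API reports. `is_gzipped` and `save_download` are hypothetical helpers. Parquet carries its own internal compression, so a .parquet payload must be written through byte-for-byte.

```python
import gzip
import shutil
from pathlib import Path


def is_gzipped(name: str) -> bool:
    """Decide from the file name alone, per the fix above."""
    return name.lower().endswith(".gz")


def save_download(src: Path, dest: Path) -> None:
    """Copy a downloaded payload into place, gunzipping only *.gz files."""
    opener = gzip.open if is_gzipped(src.name) else open
    with opener(src, "rb") as fin, open(dest, "wb") as fout:
        shutil.copyfileobj(fin, fout)  # streams in chunks, no full read
```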
* fix: tempdir leak cleanup, every 0m schedule, /sync/trigger body shapes
Three small self-contained fixes uncovered during agnes-dev cutover.
- connectors/keboola/extractor.py: tempfile.TemporaryDirectory now uses
ignore_cleanup_errors=True so a worker death mid-write doesn't leave
multi-GiB stale slice trees on the boot disk. (12 GiB seen after a
disk-full crash where TemporaryDirectory's own cleanup also raised
and got swallowed.)
- src/scheduler.py: is_valid_schedule accepts 'every 0m' (interval=0
= always due). Force-resync of an errored row no longer requires
waiting out the default 'every 1h' interval — admin can flip the
schedule, trigger, then flip back.
- app/api/sync.py: POST /api/sync/trigger accepts both ['table_id']
(legacy bare-array body) and {'tables': ['table_id']} (matches the
response payload shape, more discoverable for clients building
requests by hand). Malformed bodies return 422 with a structured
detail; null/missing means 'sync everything' as before.
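The following sketch shows the accepted body shapes, assuming the handler receives the already-parsed JSON value. `normalize_trigger_body` and `BodyError` are hypothetical. The exact rejection rules (unknown keys, empty strings) are assumptions rather than the endpoint's documented behaviour.

```python
from typing import Any, List, Optional


class BodyError(ValueError):
    """Malformed trigger body; a handler would map this to HTTP 422."""


def normalize_trigger_body(body: Any) -> Optional[List[str]]:
    """Return the table list to sync, or None for 'sync everything'.

    Accepts a bare list ['t1', ...] or an object {'tables': ['t1', ...]}.
    """
    if body is None:
        return None
    if isinstance(body, dict):
        extra = sorted(set(body) - {"tables"})
        if extra:
            raise BodyError(f"unexpected keys: {extra}")
        body = body.get("tables")
        if body is None:
            return None  # {} or {"tables": null}: same as no body
    if not isinstance(body, list) or not all(isinstance(t, str) and t for t in body):
        raise BodyError("expected a list of non-empty table ids/names")
    return body
```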
Tests cover: tempdir cleanup on raise (sliced parquet path),
is_valid_schedule + is_table_due 'every 0m' acceptance, and trigger
body parametrized matrix (8 valid shapes + 6 rejection cases).
* fix: targeted-trigger filter in materialized pass + auto-upgrade defer
Two operational gaps observed during agnes-dev cutover, in the same
sync-routing area.
- _run_materialized_pass now takes a 'tables' arg and skips rows not in
the target set with reason='not_in_target'. POST /api/sync/trigger
with a body of tables previously only scoped the legacy extractor
subprocess — the materialized pass kept iterating every due
materialized row, so an admin asking to re-sync kbc_job re-ran
every other due materialized row alongside it. Match on registry id
OR name (admins commonly pass either form). tables=None preserves
the no-filter behavior.
- New GET /api/sync/status (public, no auth) returns {locked: bool}
off _sync_lock.locked(). agnes-auto-upgrade.sh probes this before
docker compose up -d and exits 0 with a 'deferred recreate' log
line if a sync is in flight — the next 5-min cron tick retries.
Pre-fix, an auto-upgrade triggered mid-sync would recreate the
uvicorn worker and kill the in-flight extractor / Snowflake-UNLOAD
download (observed when kbc_job's first 7-day retry got SIGKILLed).
Connection failures in the probe fall through to the upgrade —
being stuck on a wedged image is worse than interrupting a
hypothetical sync.
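The probe's decision rule is to defer only on a positive locked=true and to fall through on any failure. It is small enough to sketch. agnes-auto-upgrade.sh is a shell script, so this Python version only illustrates the logic. The `fetch` callable and the localhost URL/port in `http_fetch` are assumptions.

```python
import json
import urllib.request
from typing import Callable


def should_defer_upgrade(fetch: Callable[[], bytes]) -> bool:
    """Return True only if GET /api/sync/status positively reports a held lock.

    Connection errors, timeouts, and unparseable bodies all return False, so
    a wedged or unreachable app never blocks the upgrade.
    """
    try:
        payload = json.loads(fetch())
    except (OSError, ValueError):  # URLError is an OSError; JSONDecodeError a ValueError
        return False
    return isinstance(payload, dict) and payload.get("locked") is True


def http_fetch(url: str = "http://127.0.0.1:8000/api/sync/status") -> bytes:
    # Hypothetical URL; the real script probes whatever the container exposes.
    with urllib.request.urlopen(url, timeout=5) as resp:
        return resp.read()
```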
* fix: auto-discover protects admin overrides + surfaces drift
Two real-world incidents on agnes-dev drove this:
1. kbc_job was registered manually with the correct
(in.c-kbc_telemetry, kbc_job) coordinates. A naive auto-discover
re-run would have inserted a SECOND kbc_job row at the slugified
id 'in_c-keboola-storage_kbc_job' (where Keboola's discovery
places it) — and that row's Storage API export-async 404s.
2. An earlier auto-discover bug stripped the stage prefix from
bucket ids ('c-finance' instead of 'in.c-finance'), inserting
137 rows whose syncs all failed.
Fix:
- _discover_and_register_tables now builds a plan first
(_build_keboola_discovery_plan) classifying each discovered table
into one of new / existing_match / existing_drift / invalid, then
executes only the 'new' bucket. Drift rows are reported with both
sides of the disagreement plus drift_kind:
- same_id_diff_coords: registry has the same id but different
bucket / source_table (admin migrated coords inline).
- name_collision: discovery's slugified id differs from any
registry id, but the discovered .name matches an existing row's
.name (case-insensitive). Catches the kbc_job case.
- Bucket detection now prefers the API's authoritative bucket_id
field (separate field on the Keboola tables.list response,
normalised by KeboolaClient.discover_all_tables). Falls back to
id-string parsing only when bucket_id is missing (older fallback
path inside discover_all_tables).
- Endpoint POST /api/admin/discover-and-register?dry_run=true
returns the plan without writing — would_register, drift,
invalid lists. Lets an operator audit before merging discovery
with a registry that has admin overrides.
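The plan-first classification can be sketched as a pure function over the discovered tables and the current registry rows. The field names used here are assumptions for illustration: `slug` for the slugified registry id, `bucket_id`, `name`, and registry `bucket` / `source_table`. The real inputs and outputs of _build_keboola_discovery_plan may differ.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class DiscoveryPlan:
    new: List[dict] = field(default_factory=list)
    existing_match: List[dict] = field(default_factory=list)
    existing_drift: List[dict] = field(default_factory=list)
    invalid: List[dict] = field(default_factory=list)


def bucket_of(table: dict) -> Optional[str]:
    """Prefer the API's bucket_id; else parse '<stage>.<bucket>' from the id."""
    if table.get("bucket_id"):
        return table["bucket_id"]
    parts = (table.get("id") or "").split(".")
    # 'in.c-finance.orders' -> 'in.c-finance' (keep the stage prefix)
    return ".".join(parts[:2]) if len(parts) >= 3 else None


def build_plan(discovered: List[dict], registry: List[dict]) -> DiscoveryPlan:
    plan = DiscoveryPlan()
    by_id: Dict[str, dict] = {r["id"]: r for r in registry}
    by_name: Dict[str, dict] = {r["name"].lower(): r for r in registry}
    for t in discovered:
        bucket = bucket_of(t)
        if not bucket or not t.get("name"):
            plan.invalid.append(t)
            continue
        existing = by_id.get(t["slug"])
        if existing is not None:
            if (existing["bucket"], existing["source_table"]) == (bucket, t["name"]):
                plan.existing_match.append(t)
            else:
                plan.existing_drift.append({"discovered": t, "registry": existing,
                                            "drift_kind": "same_id_diff_coords"})
            continue
        clash = by_name.get(t["name"].lower())
        if clash is not None:
            plan.existing_drift.append({"discovered": t, "registry": clash,
                                        "drift_kind": "name_collision"})
            continue
        plan.new.append(t)  # only this list is written to the registry
    return plan
```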
Removed 'every 0m' from test_register_request_rejects_malformed_sync_schedule
— the runtime started accepting it in the previous commit (force-resync
override) and the validator follows suit.
* feat(keboola): AGNES_TEMP_DIR routes tempfiles off overlayfs /tmp
The container's /tmp lives on the boot disk's overlayfs (29 GiB on
agnes-dev, shared with /var). The slices from a wide table's Snowflake
UNLOAD are downloaded into per-call /tmp tempdirs, so a multi-GiB /
many-slice export fills the boot disk long before the dedicated data
disk does. agnes-dev hit 100% boot-disk usage while the 20 GiB data
disk still had 15 GiB free.
connectors.keboola.storage_api.get_temp_root() reads AGNES_TEMP_DIR;
mkdirs the target on first use; an unset, empty, or unwritable value
falls back to None (system tempdir, i.e. the OSS pre-fix behaviour).
materialize_query (parquet path), _extract_via_legacy (CSV fallback),
and the sliced-CSV concat path in storage_api all use the helper now.
docker-compose.yml defaults AGNES_TEMP_DIR=/data/tmp on app, scheduler,
and extract services. The data volume is the dedicated disk in
production layouts and a plain docker volume in single-disk
dev/laptop setups — same blast radius as the previous /tmp default
on the latter, no regression.
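The following is a minimal sketch of the AGNES_TEMP_DIR fallback rules described above. This `get_temp_root` is a hypothetical re-implementation, not the helper in connectors.keboola.storage_api. Returning None lets tempfile fall back to the system tempdir.

```python
import os
from pathlib import Path
from typing import Optional


def get_temp_root(env_var: str = "AGNES_TEMP_DIR") -> Optional[str]:
    """Return a usable temp root from the environment, or None.

    None (unset, empty, or unusable) means "let tempfile use the system
    default", i.e. the pre-fix /tmp behaviour.
    """
    raw = os.environ.get(env_var, "").strip()
    if not raw:
        return None
    try:
        Path(raw).mkdir(parents=True, exist_ok=True)  # create on first use
    except OSError:
        return None
    if not os.access(raw, os.W_OK | os.X_OK):
        return None
    return raw
```

Callers would then pass the result straight through, e.g. `tempfile.TemporaryDirectory(dir=get_temp_root(), ignore_cleanup_errors=True)`. Note that ignore_cleanup_errors requires Python 3.10 or newer.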
917 lines
41 KiB
Python
"""Sync endpoints: manifest, trigger, sync-settings, table-subscriptions."""

import hashlib
import logging
import os
import subprocess
import threading
import traceback
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional, List

from fastapi import APIRouter, Body, Depends, HTTPException, BackgroundTasks
from pydantic import BaseModel
import duckdb

from app.auth.access import require_admin
from app.auth.dependencies import get_current_user, _get_db
from app.utils import get_data_dir as _get_data_dir
from src.repositories.sync_state import SyncStateRepository
from src.repositories.sync_settings import SyncSettingsRepository
from src.repositories.table_registry import TableRegistryRepository
from src.rbac import can_access_table
from src.scheduler import filter_due_tables, is_table_due

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/sync", tags=["sync"])

# Process-wide guard against overlapping `_run_sync` invocations. Two
# concurrent extractor subprocesses both write `extract.duckdb` and fight
# for its file lock: the first sync stalls, the second crashes, and the
# `/api/health` check times out long enough that Docker flips the
# container to `unhealthy`, which (behind a `reverse_proxy` upstream)
# bricks external traffic until contention drains. The singleton-ness is
# enforced both in the trigger handler (return 409 fast, before the work
# is scheduled) and in `_run_sync` itself (defense in depth, in case
# something bypasses the handler).
_sync_lock = threading.Lock()


def _file_hash(path: Path) -> str:
    """Full content MD5 of `path` as a 32-char hex digest ("" if missing)."""
    if not path.exists():
        return ""
    h = hashlib.md5()
    with open(path, "rb") as f:
        # Hash in 8 KiB chunks so multi-GB parquet files never load whole.
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()


def _materialize_table(
    *,
    table_id: str,
    sql: str,
    bq,
    output_dir: str,
    max_bytes: Optional[int],
) -> dict:
    """Thin wrapper around `connectors.bigquery.extractor.materialize_query`
    so the trigger pass can be unit-tested by patching this seam without
    touching the real BqAccess factory or the duckdb import."""
    from connectors.bigquery.extractor import materialize_query
    return materialize_query(
        table_id=table_id, sql=sql, bq=bq,
        output_dir=output_dir, max_bytes=max_bytes,
    )


def _run_materialized_pass(
    conn: duckdb.DuckDBPyConnection,
    bq,
    tables: Optional[List[str]] = None,
) -> dict:
    """Walk `table_registry` for `query_mode='materialized'` rows and run any
    that are due, dispatching by ``source_type`` to the correct connector's
    materialize_query. Honors per-table `sync_schedule` via `is_table_due()`,
    computes the file hash inline, and updates `sync_state` so the manifest
    can serve the row to `agnes pull` without re-hashing on every request.

    ``tables`` (when not None) restricts the pass to a specific subset.
    Targeted re-syncs from the operator (POST /api/sync/trigger with a
    body) need this; otherwise an admin asking to re-sync `kbc_job` would
    re-process every other materialized row that's also due. Matched
    against both the registry id and name (admins often pass either).

    BigQuery rows go through BqAccess + bigquery_query() (jobs API),
    optionally cost-guarded by ``max_bytes_per_materialize``.
    Keboola rows go through KeboolaStorageClient (Storage API
    export-async + signed-URL download), with no cost guardrail.

    Returns:
        ``{"materialized": [names], "skipped": [{table, reason}],
        "errors": [{table, error, ...}]}``

    Errors are aggregated per row, so one budget-blown table doesn't stop a
    healthy sibling. ``MaterializeBudgetError`` is caught and rendered with
    its structured fields so operator alerting can pick out the cap-vs-actual
    bytes from the log line.
    """
    from app.instance_config import get_value
    from connectors.bigquery.extractor import MaterializeBudgetError, MaterializeInFlightError

    bq_output_dir = str(Path(_get_data_dir()) / "extracts" / "bigquery")
    kb_output_dir = Path(_get_data_dir()) / "extracts" / "keboola" / "data"

    # Sentinel: max_bytes <= 0 (or None) disables the guardrail. `get_value()`
    # treats YAML `null` as "missing" and returns the default; operators must
    # use the explicit `0` sentinel to disable. See config/instance.yaml.example.
    # YAML accepts floats too (e.g. `10737418240.0`), and operators may
    # write `1e10` for readability. Some YAML loaders (PyYAML among them)
    # return `1e10` as a string, so parse through float() before int();
    # genuinely non-numeric entries fall through to the disable path with
    # a warning.
    raw_max = get_value(
        "data_source", "bigquery", "max_bytes_per_materialize",
        default=10 * 2**30,
    )
    try:
        n = int(float(raw_max)) if raw_max is not None else 0
    except (TypeError, ValueError, OverflowError):
        logger.warning(
            "data_source.bigquery.max_bytes_per_materialize is not numeric "
            "(%r); cost guardrail disabled. Set an integer or 0 to disable.",
            raw_max,
        )
        n = 0
    bq_max_bytes = n if n > 0 else None

    registry = TableRegistryRepository(conn)
    state = SyncStateRepository(conn)

    summary = {"materialized": [], "skipped": [], "errors": []}
    keboola_access = None  # lazy-init on first Keboola row

    # Targeted-trigger filter. Compare against both id and name so an admin
    # who passes either form (the registry id slug, or the human-friendly
    # name) gets the same result. `None` means "no filter: process all
    # due materialized rows".
    target_set: Optional[set] = (
        set(tables) if tables is not None else None
    )

    for row in registry.list_all():
        if row.get("query_mode") != "materialized":
            continue

        # Convention across connectors: sync_state.table_id and the parquet
        # filename are keyed by `table_registry.name` (matches Keboola's
        # `_meta.table_name`) so the manifest's `registry_by_name` lookup
        # at `_build_manifest_for_user` resolves cleanly. Without this,
        # admins who register `name="Orders_90d"` (id slugified to
        # `orders_90d`) would see `query_mode` default to `"local"` in the
        # manifest because the lookup misses on `id`.
        ref_name = row["name"]

        if target_set is not None and not (
            ref_name in target_set or row.get("id") in target_set
        ):
            summary["skipped"].append(
                {"table": ref_name, "reason": "not_in_target"}
            )
            continue

        last = state.get_last_sync(ref_name)
        last_iso = last.isoformat() if last else None
        schedule = row.get("sync_schedule") or "every 1h"
        if not is_table_due(schedule, last_iso):
            summary["skipped"].append({"table": ref_name, "reason": "due_check"})
            continue

        source_type = row.get("source_type") or "bigquery"  # legacy default

        # Dispatch by source_type. BQ rows keep using `_materialize_table`
        # (the existing test seam); Keboola rows use the Keboola
        # materialize_query via a lazily-initialized KeboolaStorageClient.
        try:
            if source_type == "bigquery":
                stats = _materialize_table(
                    table_id=ref_name,
                    sql=row["source_query"],
                    bq=bq,
                    output_dir=bq_output_dir,
                    max_bytes=bq_max_bytes,
                )
            elif source_type == "keboola":
                if keboola_access is None:
                    # Lazy-init the Storage API client (replaces the old
                    # DuckDB extension `KeboolaAccess`). One client is shared
                    # across all keboola materialized rows in this pass, and
                    # its `requests.Session` reuses the connection pool for
                    # HTTP keep-alive across rows.
                    # Variable name kept as `keboola_access` to minimise
                    # diff churn against the surrounding error-handling
                    # block; the type is now `KeboolaStorageClient`.
                    from connectors.keboola.storage_api import KeboolaStorageClient
                    keboola_url = get_value(
                        "data_source", "keboola", "stack_url", default=""
                    ) or os.environ.get("KEBOOLA_STACK_URL", "")
                    token_env = get_value(
                        "data_source", "keboola", "token_env",
                        default="KEBOOLA_STORAGE_TOKEN",
                    ) or "KEBOOLA_STORAGE_TOKEN"
                    keboola_token = os.environ.get(token_env, "")
                    if not (keboola_url and keboola_token):
                        summary["errors"].append({
                            "table": ref_name,
                            "error": (
                                "Keboola URL/token not configured for "
                                "materialized path (data_source.keboola.stack_url "
                                f"+ env {token_env})"
                            ),
                        })
                        continue
                    keboola_access = KeboolaStorageClient(
                        url=keboola_url, token=keboola_token,
                    )
                    kb_output_dir.mkdir(parents=True, exist_ok=True)
                from connectors.keboola.extractor import (
                    materialize_query as kb_materialize_query,
                )
                # Storage API needs the bucket+table split; registry rows
                # carry both fields per the standard register-table schema.
                bucket = row.get("bucket", "")
                source_table = row.get("source_table") or ref_name
                if not bucket:
                    summary["errors"].append({
                        "table": ref_name,
                        "error": (
                            "materialized keboola row is missing 'bucket'; "
                            "re-register with --bucket <in.c-...>"
                        ),
                    })
                    continue
                kb_stats = kb_materialize_query(
                    table_id=ref_name,
                    bucket=bucket,
                    source_table=source_table,
                    source_query=row.get("source_query"),
                    storage_client=keboola_access,
                    output_dir=kb_output_dir,
                )
                # Normalize Keboola materialize_query output to the shape the
                # BQ branch uses for downstream sync_state updates. KB returns
                # {table_id, path, rows, bytes, md5}; map to
                # {rows, size_bytes, hash}.
                stats = {
                    "rows": kb_stats["rows"],
                    "size_bytes": kb_stats["bytes"],
                    "hash": kb_stats["md5"],
                    "query_mode": "materialized",
                }
            else:
                summary["errors"].append({
                    "table": ref_name,
                    "error": (
                        "materialized path not supported for "
                        f"source_type={source_type!r}"
                    ),
                })
                continue
        except MaterializeInFlightError:
            # In-flight on a sibling worker / scheduler tick: treat as
            # 'skipped, in-flight'. Do NOT call state.set_error: that
            # would flip status='error' on a healthy concurrent run and
            # the registry UI would surface a false-positive failure.
            summary["skipped"].append({"table": ref_name, "reason": "in_flight"})
            continue
        except MaterializeBudgetError as e:
            logger.warning(
                "Materialize cap exceeded for %s: %s bytes > %s bytes",
                e.table_id, f"{e.current:,}", f"{e.limit:,}",
            )
            summary["errors"].append({
                "table": ref_name,
                "error": str(e),
                "current": e.current,
                "limit": e.limit,
            })
            # Persist the failure so `GET /api/admin/registry` can surface
            # `last_sync_error` to the admin UI / `agnes admin status`.
            # Without this, scheduler stderr was the only place the cap
            # failure showed up and operators had no API path to it.
            state.set_error(ref_name, str(e))
            continue
        except Exception as e:
            logger.exception("Materialize failed for %s", ref_name)
            summary["errors"].append({"table": ref_name, "error": str(e)})
            state.set_error(ref_name, str(e))
            continue

        # `materialize_query` returns the parquet's MD5 inline; hashing
        # there means we don't re-read a multi-GB file on the request
        # thread. Fall back to `_file_hash(parquet_path)` if for some
        # reason the stats dict didn't carry it (defensive).
        parquet_hash = stats.get("hash")
        if not parquet_hash:
            output_dir_for_hash = (
                bq_output_dir if source_type == "bigquery" else str(kb_output_dir.parent)
            )
            parquet_path = Path(output_dir_for_hash) / "data" / f"{ref_name}.parquet"
            parquet_hash = _file_hash(parquet_path)
        # `update_sync` resets `status='ok'` / `error=NULL` on the upsert
        # path (its argument defaults), so a row that previously errored
        # has the failure cleared by this call. No separate clear_error
        # needed here: the test invariant is that a successful materialize
        # leaves status='ok' and error='', which `update_sync` already
        # establishes.
        state.update_sync(
            table_id=ref_name,
            rows=stats["rows"],
            file_size_bytes=stats["size_bytes"],
            hash=parquet_hash,
        )
        summary["materialized"].append(ref_name)

    return summary


def _run_sync(tables: Optional[List[str]] = None):
    """Run extractor as subprocess + orchestrator rebuild.

    Reads table configs from DuckDB (in the main process, which holds the
    shared connection) and passes them as JSON via stdin to the extractor
    subprocess. This avoids DuckDB lock conflicts: the subprocess never
    opens system.duckdb.

    Singleton: only one invocation runs at a time per process (see the
    module-level `_sync_lock`). The trigger handler also fast-fails with
    409 when the lock is held, so this branch is defense in depth.
    """
    import json as _json
    import sys as _sys

    if not _sync_lock.acquire(blocking=False):
        print(
            "[SYNC] another sync is already in flight — skipping",
            file=_sys.stderr, flush=True,
        )
        return

    try:
        from app.instance_config import get_data_source_type, get_value
        from src.db import get_system_db

        source_type = get_data_source_type()
        data_dir = _get_data_dir()

        # Read table configs in main process (has shared DuckDB connection)
        sys_conn = get_system_db()
        # Track whether the REGISTRY (not the post-filter list) was empty.
        # Auto-discovery must only fire on a truly empty registry; if the
        # filter returned [] because nothing was due, re-discovering would
        # bypass the schedule entirely on Keboola instances. (Devin BUG_0001
        # on ebb8cc9.)
        registry_has_tables = False
        try:
            repo = TableRegistryRepository(sys_conn)
            if tables:
                # Manual operator override: bypass schedule filter entirely
                # so an admin saying "sync these specific tables now" wins.
                all_configs = [repo.get(t) for t in tables]
                table_configs = [c for c in all_configs if c is not None]
                # Requested ids that don't resolve must not make a populated
                # registry look empty, or auto-discovery would fire on it.
                registry_has_tables = bool(table_configs) or bool(repo.list_all())
            else:
                table_configs = repo.list_local(source_type) if source_type else repo.list_local()
                # Auto-discover gate must consider the WHOLE registry, not
                # just `local` rows. After the Keboola migration to
                # materialized (v25→v26), an instance can have 30
                # materialized Keboola rows and zero local rows, but
                # `bool(table_configs)` here would be False, and
                # `not registry_has_tables` would re-trigger
                # `_discover_and_register_tables` on every scheduler tick,
                # creating duplicate "auto-discovered" rows with the wrong
                # bucket prefix every time.
                # Use list_all (any source, any mode) for the gate.
                registry_has_tables = bool(repo.list_all())
                # Without this filter, every scheduler tick would re-sync
                # every table regardless of its sync_schedule cadence,
                # making the field a no-op at trigger time. Tables with
                # no schedule pass through unchanged (opt-in feature).
                state_repo = SyncStateRepository(sys_conn)
                table_configs = filter_due_tables(table_configs, state_repo)
        finally:
            sys_conn.close()

        if not table_configs:
            # Auto-discover tables on first sync when registry is empty.
            # `not registry_has_tables` is the load-bearing guard: without
            # it, "filter excluded everything" looks identical to "registry
            # empty" and we'd re-discover + re-sync every tick regardless of
            # sync_schedule.
            if not registry_has_tables and source_type == "keboola" and os.environ.get("KEBOOLA_STORAGE_TOKEN"):
                logger.info("No tables registered — running auto-discovery from Keboola")
                try:
                    from app.api.admin import _discover_and_register_tables
                    auto_conn = get_system_db()
                    try:
                        result = _discover_and_register_tables(auto_conn, "auto-discovery")
                        logger.info("Auto-discovered %d tables, skipped %d", result["registered"], result["skipped"])
                    finally:
                        auto_conn.close()
                    # Re-read table configs after auto-registration
                    sys_conn2 = get_system_db()
                    try:
                        table_configs = TableRegistryRepository(sys_conn2).list_local(source_type)
                    finally:
                        sys_conn2.close()
                except Exception as e:
                    logger.warning("Auto-discovery failed: %s", e)

# CRITICAL: don't early-return when local-mode tables are empty.
|
|
# `list_local("bigquery")` is always empty on BQ-only deployments
|
|
# (BQ rows are always remote or materialized, never local), so an
|
|
# early return would prevent the materialized pass AND the
|
|
# orchestrator rebuild from ever firing on a BQ-only instance.
|
|
# Devin BUG_0002 on PR #148 commit 2fa44f2. Just flag whether the
|
|
# Keboola subprocess + custom-connectors should run; everything
|
|
# below (materialized pass, orchestrator rebuild, profiler) runs
|
|
# unconditionally so a registry with materialized rows but no
|
|
# local rows still publishes them.
|
|
run_extractor_subprocess = bool(table_configs)
|
|
if not run_extractor_subprocess:
|
|
logger.info(
|
|
"No local-mode tables to sync for source_type=%s — "
|
|
"skipping extractor subprocess; materialized pass + "
|
|
"orchestrator rebuild still run.",
|
|
source_type,
|
|
)

        env = {**os.environ}

        if run_extractor_subprocess:
            # Serialize configs for the JSON-over-stdin hand-off: render
            # date/datetime values (anything with .isoformat()) as ISO
            # strings and drop None-valued fields.
            serializable = [
                {k: (v.isoformat() if hasattr(v, "isoformat") else v)
                 for k, v in tc.items() if v is not None}
                for tc in table_configs
            ]

            # Run extractor subprocess with table configs via stdin
            # Subprocess does NOT open system.duckdb — no lock conflict
            cmd = [_sys.executable, "-c", """
import json, sys, os, logging, signal
from pathlib import Path

# Subprocess inherits no logging config — without basicConfig, Python's
# lastResort handler only surfaces WARNING+ to stderr and INFO-level
# extraction progress from connectors.keboola.extractor.run() is silently
# dropped. capture_output=True in the parent then swallows the rest.
# Devin BUG_0002 on PR #136 review.
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

# Convert SIGTERM into a controlled SystemExit so the ProcessPoolExecutor
# `with` block in connectors.keboola.extractor.run() runs its __exit__
# (shutdown/wait_for_workers) before this process dies. Without this,
# SIGTERM kills the parent abruptly, leaving the OS to clean up the pool
# children — but each worker holds an open Keboola Storage export job
# whose lifetime is tied to the HTTP poll loop, and those leak until the
# Keboola side TTLs them out. The SIGTERM comes from app.api.sync._run_sync,
# which starts this script with `subprocess.Popen(start_new_session=True)`
# and calls `os.killpg(SIGTERM)` on the whole group when it times out.
def _exit_on_sigterm(signum, frame):
    sys.exit(143)
signal.signal(signal.SIGTERM, _exit_on_sigterm)

configs = json.load(sys.stdin)
url = os.environ.get("KEBOOLA_STACK_URL", "")
token = os.environ.get("KEBOOLA_STORAGE_TOKEN", "")

if not url or not token:
    print("ERROR: Missing KEBOOLA_STACK_URL or KEBOOLA_STORAGE_TOKEN", file=sys.stderr)
    sys.exit(1)

from connectors.keboola.extractor import run, compute_exit_code
data_dir = Path(os.environ.get("DATA_DIR", "./data"))
result = run(str(data_dir / "extracts" / "keboola"), configs, url, token)
print(json.dumps(result))
# Issue #81 Group B: surface partial-failure as exit 2 so the API
# caller can distinguish "every table failed" from "9/10 succeeded".
sys.exit(compute_exit_code(result, len(configs)))
"""]

            print(f"[SYNC] Starting extractor subprocess for {len(table_configs)} tables", file=_sys.stderr, flush=True)

            # Run in a new process group (start_new_session=True) so a
            # timeout can take down the whole tree — the extractor itself
            # plus any ProcessPoolExecutor workers it spawned for parallel
            # legacy-fallback. Without this, plain `subprocess.run` on
            # timeout SIGKILLs only the immediate child; the pool workers
            # are reparented to PID 1 and continue holding open Keboola
            # Storage export jobs, blocking the next sync cycle's
            # connectivity to those same job IDs.
            extractor_timeout = int(os.environ.get("AGNES_EXTRACTOR_TIMEOUT_SEC", "3600"))
            proc = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                text=True, env=env,
                cwd=str(Path(__file__).parent.parent.parent),
                start_new_session=True,
            )
            try:
                stdout, stderr = proc.communicate(input=_json.dumps(serializable), timeout=extractor_timeout)
                result = subprocess.CompletedProcess(cmd, proc.returncode, stdout, stderr)
            except subprocess.TimeoutExpired:
                # SIGTERM the whole process group first to give workers a
                # chance to shut down cleanly (release Keboola export jobs,
                # close DuckDB conns), then SIGKILL the stragglers after a
                # short grace window.
                import signal
                try:
                    os.killpg(proc.pid, signal.SIGTERM)
                except ProcessLookupError:
                    pass
                try:
                    proc.communicate(timeout=10)
                except subprocess.TimeoutExpired:
                    try:
                        os.killpg(proc.pid, signal.SIGKILL)
                    except ProcessLookupError:
                        pass
                    try:
                        proc.communicate(timeout=5)
                    except subprocess.TimeoutExpired:
                        pass
                # Catch the timeout LOCALLY so the materialized BQ pass and
                # orchestrator rebuild below still fire — pre-fix the timeout
                # propagated to the outer except handler and skipped the rest
                # of `_run_sync` (Devin BUG_0001 on PR #148 commit 2219255).
                print(
                    f"[SYNC] Extractor timed out after {extractor_timeout}s — process "
                    "group killed; continuing to materialized pass + orchestrator rebuild",
                    file=_sys.stderr, flush=True,
                )
                result = None

            if result is not None:
                if result.stdout:
                    print(f"[SYNC] Extractor stdout: {result.stdout.strip()[-500:]}", file=_sys.stderr, flush=True)
                if result.stderr:
                    print(f"[SYNC] Extractor stderr: {result.stderr[-500:]}", file=_sys.stderr, flush=True)
                # Issue #81 Group B: three exit codes. 0 = full success,
                # 1 = full failure, 2 = partial. Partial is a data-quality
                # alert, not a crash — the orchestrator's per-table _meta
                # machinery already captured which tables succeeded; we just
                # need to log loudly so operator alerting can pick it up.
                if result.returncode == 0:
                    print("[SYNC] Extractor OK", file=_sys.stderr, flush=True)
                elif result.returncode == 2:
                    print(
                        "[SYNC] Extractor PARTIAL FAILURE (exit 2) — some tables "
                        "succeeded, some failed; see stderr for per-table errors. "
                        "Successful tables will still be published by the orchestrator.",
                        file=_sys.stderr, flush=True,
                    )
                else:
                    print(f"[SYNC] Extractor FAILED (exit {result.returncode})", file=_sys.stderr, flush=True)

            # Run custom connectors (Tier A: local mount) — only when there
            # were local-mode tables to drive the extractor. Custom connectors
            # currently piggyback on the same env as the Keboola extractor.
            connectors_dir = Path(os.environ.get("CONNECTORS_DIR", str(Path(__file__).parent.parent.parent / "connectors" / "custom")))
            if connectors_dir.exists():
                for connector_dir in sorted(connectors_dir.iterdir()):
                    if not connector_dir.is_dir():
                        continue
                    extractor = connector_dir / "extractor.py"
                    if not extractor.exists():
                        continue
                    logger.info("Running custom connector: %s", connector_dir.name)
                    try:
                        custom_result = subprocess.run(
                            [_sys.executable, str(extractor)],
                            env=env, capture_output=True, text=True, timeout=600,
                            cwd=str(Path(__file__).parent.parent.parent),
                        )
                        if custom_result.returncode != 0:
                            logger.error("Custom connector %s failed: %s", connector_dir.name, custom_result.stderr[-500:])
                        else:
                            logger.info("Custom connector %s completed", connector_dir.name)
                    except subprocess.TimeoutExpired:
                        logger.error("Custom connector %s timed out", connector_dir.name)

        # Materialized SQL pass — runs admin-registered SQL through the
        # source's DuckDB extension (BQ via BqAccess, Keboola via
        # KeboolaAccess) and writes parquet for due rows. _run_materialized_pass
        # itself dispatches by source_type, so we always run it regardless of
        # which (or both) source types have a `project` / `stack_url` set —
        # Keboola-only instances would otherwise silently skip Keboola
        # materialized rows just because no BQ project is configured (Devin
        # finding 2026-05-01: BUG_pr-review-job-3fbd31c9_0001). The BQ
        # branch inside _run_materialized_pass uses a per-row try/except so
        # the sentinel BqAccess (not_configured) raises a typed error that
        # gets recorded against that row only — no cascade.
        try:
            from connectors.bigquery.access import get_bq_access
            from src.db import get_system_db as _get_system_db
            bq_access = get_bq_access()  # sentinel if no BQ project; OK
            mat_conn = _get_system_db()
            try:
                mat_summary = _run_materialized_pass(
                    mat_conn, bq_access, tables=tables,
                )
            finally:
                mat_conn.close()
            skipped_count = len(mat_summary["skipped"])
            in_flight_count = sum(
                1 for s in mat_summary["skipped"] if s.get("reason") == "in_flight"
            )
            print(
                f"[SYNC] Materialized SQL: {len(mat_summary['materialized'])} ok, "
                f"{skipped_count} skipped (in_flight={in_flight_count}), "
                f"{len(mat_summary['errors'])} errors",
                file=_sys.stderr, flush=True,
            )
            for err in mat_summary["errors"]:
                print(
                    f"[SYNC] {err['table']}: {err['error']}",
                    file=_sys.stderr, flush=True,
                )
        except Exception as e:
            print(
                f"[SYNC] Materialized SQL pass FAILED: {e}",
                file=_sys.stderr, flush=True,
            )
            traceback.print_exc()

        # Rebuild master views (reads extract.duckdb files, no write conflict)
        from src.orchestrator import SyncOrchestrator
        orch = SyncOrchestrator()
        views = orch.rebuild()
        view_counts = ", ".join(f"{k}: {len(v)}" for k, v in views.items())
        print(f"[SYNC] Orchestrator rebuild: {{{view_counts}}}", file=_sys.stderr, flush=True)

        # Auto-profile synced tables (best-effort, don't fail sync on profile error)
        try:
            from src.profiler import profile_table, TableInfo
            from src.repositories.profiles import ProfileRepository

            data_dir = Path(os.environ.get("DATA_DIR", "./data"))
            extracts_dir = data_dir / "extracts"

            sys_conn = get_system_db()
            try:
                profile_repo = ProfileRepository(sys_conn)
                profiled = 0
                for source_name, table_names in views.items():
                    for table_name in table_names[:10]:  # Cap at 10 tables per source per sync
                        pq_path = extracts_dir / source_name / "data" / f"{table_name}.parquet"
                        if not pq_path.exists():
                            continue
                        try:
                            table_info = TableInfo(name=table_name, table_id=table_name)
                            profile = profile_table(table_info, pq_path, [], {}, {})
                            profile_repo.save(table_name, profile)
                            profiled += 1
                        except Exception as pe:
                            print(f"[SYNC] Profile {table_name}: {pe}", file=_sys.stderr, flush=True)
                print(f"[SYNC] Profiled {profiled} tables", file=_sys.stderr, flush=True)
            finally:
                sys_conn.close()
        except Exception as e:
            print(f"[SYNC] Profiler skipped: {e}", file=_sys.stderr, flush=True)

    except subprocess.TimeoutExpired:
        # Defensive fallback only. The extractor Popen and the
        # custom-connector subprocess.run calls above each catch their own
        # TimeoutExpired, so this fires only for a subprocess call that has
        # no local handler. The concrete timeout value isn't available
        # here, so log generically.
        print("[SYNC] Extractor subprocess timed out", file=_sys.stderr, flush=True)
    except Exception as e:
        print(f"[SYNC] FAILED: {e}", file=_sys.stderr, flush=True)
        traceback.print_exc()
    finally:
        _sync_lock.release()
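

# Illustrative sketch only (hypothetical helper, not called anywhere in this
# module): a minimal, self-contained version of the timeout pattern that
# _run_sync uses for the extractor. The child starts in its own session, so
# its pid is also its process-group id; on timeout the whole group gets
# SIGTERM, a short grace period, then SIGKILL. POSIX-only (os.killpg and
# start_new_session). The helper name, the ``grace`` parameter and the
# (returncode, stdout) / None return shape are assumptions for this sketch.
def _example_run_with_group_timeout(cmd, timeout, grace=10.0):
    """Run ``cmd``; return ``(returncode, stdout)``, or ``None`` on timeout."""
    import os
    import signal
    import subprocess

    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        start_new_session=True,  # child leads a new process group
    )
    try:
        stdout, _stderr = proc.communicate(timeout=timeout)
        return proc.returncode, stdout
    except subprocess.TimeoutExpired:
        # Escalate: SIGTERM the group, wait ``grace`` seconds, then SIGKILL.
        for sig, wait in ((signal.SIGTERM, grace), (signal.SIGKILL, 5.0)):
            try:
                os.killpg(proc.pid, sig)
            except ProcessLookupError:
                break  # the group has already exited
            try:
                proc.communicate(timeout=wait)  # reap the child, drain pipes
                break
            except subprocess.TimeoutExpired:
                continue  # still alive: escalate to the next signal
        return None

# Usage (sketch): _example_run_with_group_timeout([sys.executable, "-c", "print(1)"], timeout=30)
# Retrying communicate() after TimeoutExpired is safe: the subprocess docs
# state that no output is lost when communication is retried.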


# ---- Manifest ----

def _build_manifest_for_user(conn, user: dict) -> dict:
    """Build manifest dict filtered by user's accessible tables.

    Joins ``sync_state`` with ``table_registry`` so each table entry exposes
    ``query_mode`` and ``source_type``. The CLI uses these to decide whether
    to download a parquet (local) or skip it (remote, e.g. BigQuery views).

    Defensive defaults: if a sync_state row has no matching registry entry
    (race / manual deletion), fall back to ``query_mode='local'`` and
    ``source_type=''`` so the manifest still serializes cleanly.
    """
    sync_repo = SyncStateRepository(conn)
    table_repo = TableRegistryRepository(conn)
    all_states = sync_repo.get_all_states()
    # `sync_state.table_id` is sourced from `_meta.table_name` which equals
    # `table_registry.name`, NOT `table_registry.id`. Auto-discovered Keboola
    # tables and manually-registered ones with mixed-case/spaced names produce
    # id != name; an id-keyed lookup would miss them and silently default to
    # `query_mode=local`, causing the CLI to try downloading remote tables.
    registry_by_name = {t["name"]: t for t in table_repo.list_all()}

    # Filter by user's accessible tables. `can_access_table` has its own
    # admin shortcut (Admin group → True). Lookup translates name→id first
    # because `s["table_id"]` is sourced from `_meta.table_name` = registry
    # `name` while `can_access_table` keys on registry `id`; when id != name
    # an id-keyed call would miss.
    def _id_for(state):
        reg = registry_by_name.get(state["table_id"])
        return reg["id"] if reg else state["table_id"]

    all_states = [s for s in all_states if can_access_table(user, _id_for(s), conn)]

    data_dir = _get_data_dir()
    tables = {}
    for state in all_states:
        table_id = state["table_id"]
        reg = registry_by_name.get(table_id, {})
        tables[table_id] = {
            "hash": state.get("hash", ""),
            "updated": state.get("last_sync").isoformat() if state.get("last_sync") else None,
            "size_bytes": state.get("file_size_bytes", 0),
            "rows": state.get("rows", 0),
            "query_mode": reg.get("query_mode") or "local",
            "source_type": reg.get("source_type") or "",
        }

    # Asset hashes: a content hash for single files; for directories, the
    # newest file mtime (integer epoch seconds) serves as the change marker.
    docs_dir = data_dir / "docs"
    assets = {}
    for asset_name, asset_path in [
        ("docs", docs_dir),
        ("profiles", data_dir / "src_data" / "metadata" / "profiles.json"),
    ]:
        if asset_path.exists():
            if asset_path.is_file():
                assets[asset_name] = {"hash": _file_hash(asset_path)}
            else:
                newest = max(
                    (f.stat().st_mtime for f in asset_path.rglob("*") if f.is_file()),
                    default=0,
                )
                assets[asset_name] = {"hash": str(int(newest))}

    return {
        "tables": tables,
        "assets": assets,
        "server_time": datetime.now(timezone.utc).isoformat(),
    }
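

# Illustrative sketch only (hypothetical helper, not used by the endpoints):
# the name-keyed join that _build_manifest_for_user performs, reduced to plain
# dicts. sync_state rows carry the registry *name* in "table_id", so the
# registry is indexed by name; the access check uses the registry *id*
# (falling back to the name when no registry row exists); missing registry
# rows default to query_mode="local" and source_type="". The function name
# and the single-argument ``can_access`` callable are assumptions standing in
# for can_access_table(user, table_id, conn).
def _example_manifest_tables(states, registry_rows, can_access):
    by_name = {row["name"]: row for row in registry_rows}
    tables = {}
    for state in states:
        name = state["table_id"]
        reg = by_name.get(name, {})
        # Translate name -> id for the access check; fall back to the name.
        if not can_access(reg.get("id", name)):
            continue
        tables[name] = {
            "hash": state.get("hash", ""),
            "query_mode": reg.get("query_mode") or "local",
            "source_type": reg.get("source_type") or "",
        }
    return tables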


@router.get("/manifest")
async def sync_manifest(
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Return hash-based manifest of all synced data, filtered per user."""
    return _build_manifest_for_user(conn, user)


# ---- Status ----

@router.get("/status")
async def sync_status():
    """Whether a sync is currently in flight on this app process.

    Public (no auth) — used by the host-side ``agnes-auto-upgrade.sh``
    cron to decide whether to skip a `docker compose up -d` that would
    kill a running extractor / materialized pass mid-flight. Cheap to
    serve (single Lock.locked() check) and contains no sensitive data.

    Returns:
        ``{"locked": bool}`` — True if `_sync_lock` is currently held by
        a `_run_sync` invocation. The host script defers the upgrade
        when this is True and retries on the next 5-min cron tick.
    """
    return {"locked": _sync_lock.locked()}


# ---- Trigger ----

@router.post("/trigger")
async def trigger_sync(
    background_tasks: BackgroundTasks,
    body: Optional[Any] = Body(None),
    user: dict = Depends(require_admin),
):
    """Trigger data sync from configured source. Admin only. Runs in background.

    Body accepts three shapes (all optional — empty body / `null` syncs
    every registered table):

    - ``["kbc_job", "orders"]`` — bare JSON array of table ids
    - ``{"tables": ["kbc_job", "orders"]}`` — object with a ``tables``
      key (matches the wire shape of the response, more discoverable
      for clients building requests by hand)
    - ``null`` / no body — sync everything

    Both non-null forms (bare array and ``tables`` object) have shipped at
    different times; accepting both keeps older clients (PR-build CLIs,
    helper scripts) working while surfacing the shape that mirrors the
    response payload. Anything else returns HTTP 422 with a structured
    detail.

    Returns 409 if a previously-triggered sync is still running. Two
    concurrent extractor subprocesses fight for the same `extract.duckdb`
    file lock — that contention starves uvicorn, makes `/api/health` time
    out, flips the container to `unhealthy`, and (behind a `reverse_proxy`
    upstream like the bundled Caddy overlay) bricks external traffic
    until contention drains. Fast-fail here keeps that from happening.
    """
    if body is None:
        tables: Optional[List[str]] = None
    elif isinstance(body, list):
        tables = list(body)
    elif isinstance(body, dict):
        tables = body.get("tables")
        if tables is not None and not isinstance(tables, list):
            raise HTTPException(
                status_code=422,
                detail="`tables` must be a list of strings",
            )
    else:
        raise HTTPException(
            status_code=422,
            detail=(
                "body must be a list of table ids, an object with a "
                "`tables` list, or null"
            ),
        )
    if tables is not None and not all(isinstance(t, str) for t in tables):
        raise HTTPException(
            status_code=422,
            detail="all entries in `tables` must be strings",
        )

    # Advisory fast-fail: two near-simultaneous requests can both pass this
    # check, so _run_sync also acquires _sync_lock non-blocking and only one
    # of them actually runs the sync.
    if _sync_lock.locked():
        raise HTTPException(
            status_code=409,
            detail="sync_already_in_progress",
        )
    background_tasks.add_task(_run_sync, tables)
    return {
        "status": "triggered",
        "tables": tables or "all",
        "message": "Data sync started in background. Check /api/health for progress.",
    }
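

# Illustrative sketch only (hypothetical helpers, not wired into the router):
# the body normalization trigger_sync performs, plus the singleton guard that
# backs its 409. The endpoint's locked() check is a fast-fail; the actual
# exclusion comes from acquiring the lock non-blocking, so a second caller
# returns immediately instead of starting a second extractor. Both names and
# the ValueError-based error signalling (instead of HTTPException) are
# assumptions for this sketch.
def _example_normalize_trigger_body(body):
    if body is None:
        return None  # sync everything
    if isinstance(body, list):
        tables = list(body)
    elif isinstance(body, dict):
        tables = body.get("tables")
        if tables is not None and not isinstance(tables, list):
            raise ValueError("`tables` must be a list of strings")
    else:
        raise ValueError("body must be a list, an object with `tables`, or null")
    if tables is not None and not all(isinstance(t, str) for t in tables):
        raise ValueError("all entries in `tables` must be strings")
    return tables


def _example_run_singleton(lock, work):
    # Non-blocking acquire: a concurrent caller gets False instead of waiting.
    if not lock.acquire(blocking=False):
        return False
    try:
        work()
        return True
    finally:
        lock.release()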


# ---- Sync Settings (dataset subscriptions) ----

class SyncSettingsUpdate(BaseModel):
    datasets: dict  # {dataset_name: bool}


@router.get("/settings")
async def get_sync_settings(
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Get user's dataset sync settings."""
    repo = SyncSettingsRepository(conn)
    settings = repo.get_user_settings(user["id"])
    enabled = repo.get_enabled_datasets(user["id"])
    return {
        "user_id": user["id"],
        "settings": settings,
        "enabled_datasets": enabled,
    }


@router.post("/settings")
async def update_sync_settings(
    request: SyncSettingsUpdate,
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Update user's dataset sync settings.

    A dataset can only be enabled when the user has access (via
    ``resource_grants(group, "table", dataset)`` or Admin membership). The
    user_sync_settings layer is per-user preference, not authorization —
    the gate stops users from enabling sync on tables they cannot read.
    """
    from app.auth.access import can_access
    from app.resource_types import ResourceType

    settings_repo = SyncSettingsRepository(conn)
    results = {}
    for dataset, enabled in request.datasets.items():
        if not can_access(user["id"], ResourceType.TABLE.value, dataset, conn):
            results[dataset] = {"error": "no permission"}
            continue
        settings_repo.set_dataset_enabled(user["id"], dataset, enabled)
        results[dataset] = {"enabled": enabled}

    return {"updated": results}


# ---- Table Subscriptions ----

class TableSubscriptionUpdate(BaseModel):
    table_mode: str = "all"  # "all" or "explicit"
    tables: dict = {}  # {table_name: bool}


@router.get("/table-subscriptions")
async def get_table_subscriptions(
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Get user's per-table subscription settings."""
    repo = SyncSettingsRepository(conn)
    settings = repo.get_user_settings(user["id"])
    return {"user_id": user["id"], "subscriptions": settings}


@router.post("/table-subscriptions")
async def update_table_subscriptions(
    request: TableSubscriptionUpdate,
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Update per-table subscription preferences."""
    repo = SyncSettingsRepository(conn)
    for table_name, enabled in request.tables.items():
        repo.set_dataset_enabled(user["id"], table_name, enabled)
    return {"table_mode": request.table_mode, "updated": len(request.tables)}