From 995e4cd36680c56544385f814fe395418bd40159 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr <139972147+ZdenekSrotyr@users.noreply.github.com> Date: Wed, 29 Apr 2026 11:44:00 +0200 Subject: [PATCH] fix(scheduler): HTTP marketplaces job + SCHEDULER_API_TOKEN shared secret (#127) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(scheduler): HTTP marketplaces job + SCHEDULER_API_TOKEN shared secret Two scheduler-reliability bugs surfaced after the v0.12.1 USER-agnes flip: 1. The marketplaces job called src.marketplace.sync_marketplaces() in-process from the scheduler container, racing the app's long-lived system.duckdb handle. DuckDB rejects cross-process writers — every cron tick 500-ed on "Could not set lock on file ... PID 0". 2. The data-refresh + new marketplaces jobs both 401-ed on the API because SCHEDULER_API_TOKEN was never propagated by the Terraform startup script. The scheduler had no credential to authenticate with. Fix: - New POST /api/marketplaces/sync-all (admin-only) drives the nightly refresh through the app process so it inherits the existing DB connection. - Scheduler swaps fn->http for marketplaces; all jobs are now plain HTTP and the scheduler is reduced to a cron clock. - New app/auth/scheduler_token.py adds a shared-secret auth path. The startup script generates a 256-bit secret on first boot, persists it across reboots, and writes it to /opt/agnes/.env. Both containers source the same .env. The app validates incoming Bearer tokens against the env var (constant-time, length-floored) and resolves matches to a synthetic scheduler@system.local user that's a member of the Admin system group. Audit-log entries from the scheduler are attributed to this user. - app/main.py seeds the synthetic user at startup so the first cron tick has a valid actor; lazy seed in get_scheduler_user covers token rotation before the next app restart. Tests: 5 new in tests/test_auth_scheduler_token.py covering empty/short secret rejection, exact-match comparison, idempotent user seeding, and lazy provisioning. 142 marketplace + scheduler tests + 96 auth tests remain green. Existing VMs with .env from before this change need a one-time re-provisioning (re-run startup-script or rotate via openssl rand); documented in CHANGELOG. * fix(audit): use '_all' sentinel for bulk marketplace sync — Devin review #127 Avoids the literal string 'marketplace:None' in the audit_log resource column when the bulk sync endpoint writes its summary row. * fix(scheduler): unblock event loop + per-job timeouts — Devin review #127 Two findings from Devin re-review on commit 5fbad15: 1. BUG: trigger_sync_all was async def, so FastAPI ran it on the asyncio event loop. sync_marketplaces() does blocking I/O (subprocess git clones up to GIT_TIMEOUT_SEC=300 each, threading.Lock, DuckDB writes) and would freeze every concurrent request for the duration of a bulk sync. Switched to plain def so FastAPI auto-routes to the thread pool. 2. ANALYSIS: scheduler used a fixed 120s httpx timeout for every POST. Bulk marketplace sync iterates the registry under a single lock with up to 300s per repo — easily exceeds 120s on 2-3 slow repos. The scheduler then sees a timeout, doesn't update last_run, and re-fires on the next 30s tick, queueing redundant work. Per-job timeout override added to the JOBS tuple; marketplaces gets 900s (15 min), data-refresh keeps 120s, health-check 30s. * fix(auth): require_session_token rejects scheduler shared secret — Devin review #127 require_session_token gates /auth/tokens (PAT minting). Pre-fix it only rejected JWTs with typ=pat — but the scheduler shared secret is an opaque string, so verify_token() returns None, payload becomes {}, and the PAT-claim check silently passed. A caller bearing SCHEDULER_API_TOKEN could mint persistent PATs that survive a secret rotation. Added explicit is_scheduler_token() check before the PAT-claim check; new regression test in tests/test_auth_scheduler_token.py. Devin's other note (pre-existing async def trigger_sync at marketplaces.py:392 also calls blocking sync_one) — Devin flagged it as out-of-scope for this PR and I agree; tracking separately. * release(0.17.0): cut + clean up CHANGELOG duplicates Cuts 0.17.0 (minor: scheduler shared-secret auth + sync-all endpoint plus the deploy-shape fixes that landed since the last release tag). Bumps pyproject from 0.15.0 — also corrects the missed bump from PR #120 (v0.16.0 was tagged on GitHub and shipped as :stable, but pyproject stayed at 0.15.0, so /api/version, /cli/latest, and `da --version` had been under-reporting the running release). Removes the long-form duplicate entries for 0.13.0 / 0.14.0 / 0.15.0 above [0.16.0] — the canonical short summaries (with GitHub-release links) already exist below 0.16.0, the long forms were leftover state from before those versions were cut and have been silently shadowed ever since. --- CHANGELOG.md | 59 ++----- CLAUDE.md | 2 +- app/api/marketplaces.py | 44 +++++ app/auth/dependencies.py | 38 ++++- app/auth/scheduler_token.py | 136 ++++++++++++++++ app/main.py | 29 ++++ .../customer-instance/startup-script.sh.tpl | 20 +++ pyproject.toml | 2 +- services/scheduler/__main__.py | 95 +++++------ tests/test_auth_scheduler_token.py | 153 ++++++++++++++++++ 10 files changed, 469 insertions(+), 109 deletions(-) create mode 100644 app/auth/scheduler_token.py create mode 100644 tests/test_auth_scheduler_token.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 7546a50..5027142 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,8 +10,17 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +## [0.17.0] — 2026-04-29 + +### Added + +- **Shared-secret auth path for the in-cluster scheduler service** (`SCHEDULER_API_TOKEN`). Both the `app` and `scheduler` containers source the same `/opt/agnes/.env` via Docker Compose `env_file:`, so a 256-bit secret generated once at VM provisioning serves both sides symmetrically. The app validates incoming `Authorization: Bearer ` against the env var (constant-time compare; minimum length 32 chars; rejected when env is empty) and resolves matches to a synthetic `scheduler@system.local` user that is a member of the `Admin` system group — every existing RBAC gate (`require_admin`, `require_resource_access`) works unchanged. Audit-log entries from the scheduler are attributed to this user. Rotation: edit `.env`, `docker compose restart app scheduler`. See `app/auth/scheduler_token.py` for the threat model. +- **`POST /api/marketplaces/sync-all`** — admin-only endpoint that runs `src.marketplace.sync_marketplaces()` inside the app process. Wired up so the scheduler container can drive the nightly refresh over HTTP without opening `system.duckdb` directly. + ### Fixed +- **Scheduler `marketplaces` job 500-ed every cron tick with `IO Error: Could not set lock on file system.duckdb` after v0.12.1.** The previous implementation called `src.marketplace.sync_marketplaces()` in-process from the scheduler container, but DuckDB permits only one writer per file across processes — the scheduler raced the app's long-lived handle. Switched the job to `POST /api/marketplaces/sync-all`, making the app the sole writer; the scheduler is now a pure cron clock. +- **Scheduler `data-refresh` job 401-ed every 15 minutes** with `Missing or invalid Authorization header` because `SCHEDULER_API_TOKEN` was never propagated by `infra/modules/customer-instance/startup-script.sh.tpl`. The startup script now generates a 64-hex-char secret on first boot via `openssl rand -hex 32`, persists it across reboots by reading back from an existing `.env` (rotation requires explicit operator action — both containers must restart together), and writes it into `/opt/agnes/.env` alongside the other secrets. `app/main.py` seeds the matching synthetic user at startup so the very first cron tick has a valid actor to attribute audit-log entries to. Existing VMs need a one-time `sudo /opt/agnes/agnes-rotate-scheduler-token.sh` (or simply re-run the startup script via `terraform apply -replace='module.agnes.google_compute_instance.vm[""]'`); see migration note in this changelog or rerun the startup script manually. - **Non-root container couldn't write to host-bind-mounted `/data` after the v0.12.1 USER-agnes flip.** `infra/modules/customer-instance/startup-script.sh.tpl` now `chown -R 999:999 /data` after creating the persistent-disk subdirs (`state`, `analytics`, `extracts`). Without this, a freshly-attached PD is root-owned by default and `USER agnes` (uid 999) cannot open `/data/state/system.duckdb` for write — every authed request 500s with `IOException: Cannot open file ... Permission denied` while `/api/health` (which doesn't open the system DB) keeps returning 200, masking the failure from health-only monitoring. Regression first observed on `agnes-development` on 2026-04-29 after the auto-upgrade picked up `:stable` from the 0.12.1 release. **Existing VMs with PD-backed `/data` need a one-time host-side `sudo chown -R 999:999 /var/lib/docker/volumes/agnes_data/_data && sudo docker restart agnes-app-1 agnes-scheduler-1` to recover** — Terraform `metadata_startup_script` only runs on boot, so an apply alone does not retro-fix running VMs. - `Dockerfile` pins the `agnes` user to `uid:gid 999:999` explicitly (`useradd --uid 999`). Previously the uid was whatever Debian's `useradd --system` assigned next — happened to be 999 today, but a future base-image change picking 998 or 1000 would silently desync from the startup-script's `chown 999:999`, reintroducing the same incident. Pinning makes the contract grep-able from both sides. - `scripts/smoke-test.sh` no longer silently SKIPs every authed check when `bootstrap` returns 403 (users exist) and `SMOKE_TOKEN` is not set — it now FAILs loudly. Also adds an unauthenticated DB-touching probe (`POST /auth/email/request`) before bootstrap, since `/api/health` deliberately doesn't open `system.duckdb` (kept cheap for LB probes) and so cannot detect filesystem/permission issues. The new probe catches the foundryai-development class of regression even on instances where bootstrap is closed. @@ -23,56 +32,6 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C - `release.yml` adds an `e2e-bind-mount` job that boots the freshly built image against a host-bind-mounted `/data` directory (instead of the named volume the existing `smoke-test` job uses). Docker initializes a fresh named volume by copying from the image's `/data` — which the Dockerfile chowns to `agnes:agnes` before flipping USER — so the named-volume path always works. The bind-mount path mirrors what GCE VMs run via `docker-compose.host-mount.yml`, and includes a negative assertion (write must fail on root-owned `/data` before the operator chown) plus a positive assertion (smoke passes after the chown). Locks in the contract that broke `agnes-development`: removing `chown 999:999` from `startup-script.sh.tpl` or changing the Dockerfile uid pin breaks CI. -## [0.15.0] — 2026-04-29 - -### Added - -- **Corporate-memory v1 + v1.5 — confidence, contradictions, audience distribution, and rule sync.** Issue #72. - - Schema v15: `knowledge_items` gains context-engineering columns (`confidence`, `domain`, `entities`, `source_type`, `source_ref`, `valid_from`, `valid_until`, `supersedes`, `sensitivity`, `is_personal`); new `knowledge_contradictions` table for surfacing conflicting facts and `session_extraction_state` for the verification detector's idempotent resume. - - Schema v16: `verification_evidence` table — one row per analyst confirmation, indexed on `item_id`, drives the `confidence` calculation in `services/corporate_memory/confidence.py` (linear-decay with floor + additional-verifier boost; configurable via `instance.yaml`). - - **Server**: `GET /api/memory/bundle` returns mandatory + ranked-approved items within a token budget (default 6000 ≈ 24KB) — drives `da sync`'s rule write. `GET /api/memory/stats` now uses SQL aggregation (no full-list materialization, doesn't block the event loop). `POST /api/memory/admin/mandate` and `POST /api/memory/admin/batch` accept an `audience` field; the audience is matched against the caller's `user_group_members` JOIN `user_groups` on read so a user in group `finance` sees `audience='group:finance'` items, and admins see all. Verification-detector endpoint extracts knowledge candidates from session JSONL files and merges evidence into the existing item when a fact is re-asserted. - - **CLI**: `da sync` step 7 (`_fetch_and_write_rules`) calls `/api/memory/bundle`, writes `mandatory` items to `.claude/rules/km_.md` (one file per item) and concatenates `approved` into `.claude/rules/km_approved.md`. Stale `km_*.md` files from a previous run are pruned. Best-effort: any HTTP/JSON failure is logged and sync continues. - - **Auto-tagging**: `services/corporate_memory/tagger.py` runs an LLM extraction pass on knowledge create/update when `ai:` is configured in `instance.yaml`. Wrapped in `asyncio.to_thread` so it doesn't block the event loop. Best-effort: missing config or LLM error → item created with no auto-tags. - - **Per-item privacy**: `is_personal` items are visible only to the contributor and platform admins (members of the `Admin` system group). The `_can_view_item` helper takes a pre-computed `is_priv` flag so list endpoints don't re-query `user_group_members` per item. - - **Audit log**: every admin action (mandate / approve / reject / revoke / mandate-batch / contradiction resolve) writes a row tagged `corporate_memory.` with the affected item ids and reason field. -- `/me/debug` — self-only auth diagnostic page. Shows the logged-in user their own decoded JWT claims (no raw token), group memberships with sources and bound `external_id` when present, resource grants effective via those memberships, and a "Refetch from Google (dry-run)" button that issues a fresh `fetch_user_groups` call and reports the diff against the cached `user_group_members` snapshot without writing anything. Gated by `AGNES_DEBUG_AUTH=true` env var (default off → route returns 404 and the navbar item is not rendered). Intended for dev / staging VMs; do not enable on customer-facing instances. Issue #116. - -### Changed - -- **`POST /api/memory/{id}/vote` accepts `vote=0`** to retract a previous vote (toggle un-vote from the UI). Pre-fix the API rejected vote=0 with 400 and the UI's toggle logic silently no-op'd on un-toggle. -- **`/admin/corporate-memory` and `/corporate-memory/admin` are gated by `require_admin`** (admin-group membership) instead of the v9-era `require_role(Role.KM_ADMIN)`. The km_admin role was collapsed into admin in main's RBAC v13; module authors needing finer-grained corporate-memory curation should use a `resource_grants` row of type `corporate_memory_admin`. - -### Internal - -- `_effective_groups` in `app/api/memory.py` queries `user_group_members JOIN user_groups` instead of reading the `users.groups` JSON column (dropped in v13). The audience-distribution tests in `tests/test_memory_api.py` use a `_add_user_to_group` helper that inserts into `user_group_members` + `user_groups` directly. -- `jsonschema` added to dev dependencies for the corporate-memory schema-validation fixtures (`tests/test_corporate_memory_v1.py::TestSchemaValidation`). Production code does not import it. - -## [0.14.0] — 2026-04-29 - -### Added - -- **v2 fetch primitives — discovery + scoped fetch** for analytical workflows. Replaces the BigQuery wrap-view pattern (which caused "Response too large" on multi-hundred-million-row source views) with a Claude-session-driven toolkit. Server exposes `GET /api/v2/catalog` (RBAC-filtered table list with flavor + fetch-via hints), `GET /api/v2/schema/{table_id}` (column metadata + BQ flavor hints), `GET /api/v2/sample/{table_id}` (N sample rows), `POST /api/v2/scan` (validator + RBAC + quota + max_result_bytes guard, Arrow IPC stream), `POST /api/v2/scan/estimate` (BigQuery dryRun, no execution). CLI gains `da catalog`, `da schema`, `da describe`, `da fetch --select … --where … --limit N [--estimate] [--as ]`, `da snapshot list/refresh/drop/prune`, `da disk-info`. WHERE-clause validator at `app/api/where_validator.py` is sqlglot-backed with structural rejects + function allow-list + column-existence enforcement. Process-local quota tracker (concurrent + daily bytes per user). New `cli/skills/agnes-data-querying.md` standalone skill + CLAUDE.md addendum tells Claude to discover → estimate → fetch a filtered subset locally → analyze. 11 new `instance.yaml` knobs (`api.scan_max_concurrent_per_user`, `api.scan_daily_bytes_per_user`, `api.scan_max_result_bytes`, `api.where_clause_max_length`, `api.catalog_cache_ttl_seconds`, `api.schema_cache_ttl_seconds`, `api.sample_cache_ttl_seconds`, `data_source.bigquery.billing_project`, `data_source.bigquery.legacy_wrap_views`, `snapshots.dir`, `snapshots.cache_size_limit_gb`). Issue #101. -- `data_source.bigquery.billing_project` config knob — explicit billing project for BQ scan + estimate. Defaults to `data_source.bigquery.project`. Matters for cross-project read patterns where the VM service account has `bigquery.data.*` on the data project but lacks `serviceusage.services.use` there; setting this to a project where the SA holds `serviceusage.services.use` fixes the dry-run 403. - -### Changed - -- **BREAKING**: `connectors/bigquery/extractor.py` no longer creates a wrap view (`SELECT * FROM bigquery_query(...)`) for VIEW / MATERIALIZED_VIEW entities by default. Operators relying on the old behavior must set `data_source.bigquery.legacy_wrap_views: true` in `instance.yaml` for one release cycle. BASE TABLE entities are unchanged. The new `da fetch` workflow replaces wrap views for analytical queries — see CLAUDE.md "Querying Agnes data — agent rails". -- `RegisterTableRequest.primary_key` and `UpdateTableRequest.primary_key` accept `Optional[List[str]]` for composite keys (session-grain MSA tables key on `(session_id, event_date)`, browse rows on more); a bare string remains accepted for backward compat via a `field_validator(mode="before")` that wraps it in a one-element list. Old CLI scripts posting `"primary_key": "session_id"` continue to work. -- `GET /api/v2/catalog` now caches the underlying `repo.list_all()` rows globally with the documented `api.catalog_cache_ttl_seconds` default (300s) and runs RBAC fresh per request. The previous per-user cache served stale RBAC-filtered results for up to TTL after a permission flip — `v2_schema.py` and `v2_sample.py` already had this pattern; `v2_catalog.py` now matches. - -## [0.13.0] — 2026-04-29 - -### Added - -- **Windows/PowerShell wrapper for local dev.** New `scripts/run-local-dev.ps1` mirrors `scripts/run-local-dev.sh` for operators on Windows where GNU Make / bash aren't available — same compose stack (`docker-compose.yml` + `docker-compose.dev.yml` + `docker-compose.local-dev.yml`), same `LOCAL_DEV_GROUPS` default seeding, same `up` / `down` / `logs` actions. Run `.\scripts\run-local-dev.ps1` for the fast path (reuses existing image) or `.\scripts\run-local-dev.ps1 -Build` to force `--build` after `pyproject.toml` / `Dockerfile` changes. Verified on Docker Desktop for Windows. See `docs/local-development.md`. -- **Admin server configuration editor** at `/admin/server-config` — admins can now view and edit `instance.yaml` from the web UI without SSHing into the host. Two new endpoints (`GET /api/admin/server-config` returns the current config with secret-looking values masked; `POST /api/admin/server-config` deep-merges a section patch into `DATA_DIR/state/instance.yaml`). The page lists the editable sections (`instance`, `data_source`, `email`, `telegram`, `jira`, `theme`, `server`, `auth`) and renders a per-field form. Saves touching `auth.*` or `server.*` ("danger zone" — can lock operators out) require an explicit confirmation step. Every save writes an `instance_config.update` row to `audit_log` with a per-field diff (secret values masked as `***`, field paths preserved so a rotation is recorded as `email.smtp_password: *** → ***`). Issue #91. - -### Fixed - -- `app/instance_config.py:load_instance_config` now deep-merges the static `CONFIG_DIR/instance.yaml` with the writable overlay at `DATA_DIR/state/instance.yaml` instead of returning the overlay verbatim when present. Pre-fix, the first save through the new server-config editor (which writes only the section the operator actually touched) caused every consumer of static-only sections (corporate memory, dataset list, OpenMetadata client) to fall through to empty defaults until the overlay was deleted. Issue #91. -- `POST /api/admin/configure` now uses the same narrow-overlay write strategy as the new server-config editor: it reads the overlay verbatim (no static fallback), patches only `instance` / `auth` / `data_source`, and writes atomically via tmp + `os.replace`. Pre-fix it seeded `existing` from the env-resolved merged config when no overlay file was present and dumped the whole thing back, persisting cleartext `${ENV_VAR}` values (e.g. `smtp_password`) into the writable overlay even though the wizard never touched those sections. Issue #91. -- `POST /api/admin/server-config` now strips redaction sentinels (`***` / ``) out of every secret-keyed leaf in the incoming patch before the deep-merge. The companion GET endpoint masks secret-keyed children inside nested objects (e.g. `data_source.keboola.token_env`), and the form renders those nested objects as JSON textareas — without the scrub, a no-op save would round-trip the masked JSON back and overwrite the real overlay value (`token_env: "KEBOOLA_STORAGE_TOKEN"` → `"***"`), silently breaking the next sync. Defense-in-depth on both sides: the client form scrubs before posting, and the server scrubs before merge so an API caller (CLI / script) can't corrupt secrets either. Issue #91. - ## [0.16.0] — 2026-04-29 Minor release. Comprehensive deploy safety audit — CI/CD pipeline hardening, 50+ new tests covering previously untested failure modes, DB schema health check, config versioning, and BigQuery ATTACH error resilience. Built on top of v0.15.0 / `2e1dfb7`. diff --git a/CLAUDE.md b/CLAUDE.md index 1f7ff80..9be3f18 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -257,7 +257,7 @@ Admin-managed git repos cloned nightly to `${DATA_DIR}/marketplaces//` so FastAPI can read their contents from disk. - Register via `/admin/marketplaces` (admin UI) or `POST /api/marketplaces`. -- Scheduler calls `src.marketplace.sync_marketplaces()` in-process at `daily 03:00` UTC — no HTTP round-trip to the main app. +- Scheduler calls `POST /api/marketplaces/sync-all` (admin-only, authed via `SCHEDULER_API_TOKEN`) at `daily 03:00` UTC. Routing through HTTP keeps the app the sole writer to `system.duckdb` — the previous in-process call from the scheduler container raced the app's long-lived DB handle and 500-ed on `Could not set lock on file`. - Manual re-sync from the UI ("Sync now") hits `POST /api/marketplaces/{id}/sync`. - PATs for private repos persist to `${DATA_DIR}/state/.env_overlay` (chmod 600) as `AGNES_MARKETPLACE__TOKEN`. DuckDB stores only the env-var name (`token_env`), never the secret. - Registry lives in DuckDB table `marketplace_registry` (schema v9). diff --git a/app/api/marketplaces.py b/app/api/marketplaces.py index 952b126..b802726 100644 --- a/app/api/marketplaces.py +++ b/app/api/marketplaces.py @@ -22,6 +22,7 @@ from src.marketplace import ( MarketplaceNotFound, delete_marketplace_dir, is_valid_slug, + sync_marketplaces, sync_one, ) from src.repositories.audit import AuditRepository @@ -409,3 +410,46 @@ async def trigger_sync( {"commit": result["commit"], "action": result["action"]}, ) return result + + +@router.post("/sync-all") +def trigger_sync_all( + user: dict = Depends(require_admin), + conn: duckdb.DuckDBPyConnection = Depends(_get_db), +): + """Sync every registered marketplace. + + Wired up so the scheduler service can drive the nightly refresh over + HTTP. The previous implementation called ``src.marketplace.sync_marketplaces`` + in-process from the scheduler container, which conflicted with the app's + long-lived ``system.duckdb`` handle (DuckDB allows only one writer per + file across processes). Routing through the app inherits the existing + connection without contention. + + Declared ``def`` (not ``async def``) so FastAPI runs it in a thread + pool — :func:`sync_marketplaces` does blocking I/O (subprocess git + clones with ``GIT_TIMEOUT_SEC=300`` per repo, DuckDB writes, a + process-wide threading.Lock) and would freeze the event loop for the + duration of a bulk sync if it ran on the asyncio thread. Health + checks, login redirects, and every other concurrent request keep + serving while the bulk sync churns through the registry. + + One audit row per call summarises the outcome — per-marketplace details + live in ``marketplace_registry`` and the per-call result payload below. + """ + result = sync_marketplaces() + # _audit appends "marketplace:" to the target id when writing the + # resource column. "_all" produces "marketplace:_all" — a stable, + # greppable sentinel for bulk-sync rows; the real per-marketplace + # commit/error breakdown is in the params payload. + _audit( + conn, + user["id"], + "marketplace.sync_all", + "_all", + { + "synced": [r.get("id") for r in result.get("synced", [])], + "errors": [{"id": e.get("id"), "error": e.get("error")} for e in result.get("errors", [])], + }, + ) + return result diff --git a/app/auth/dependencies.py b/app/auth/dependencies.py index 84f6086..4229c5e 100644 --- a/app/auth/dependencies.py +++ b/app/auth/dependencies.py @@ -179,6 +179,21 @@ async def get_current_user( detail="Missing or invalid Authorization header", ) + # Shared-secret path for the in-cluster scheduler. Checked before + # pat_resolver because the scheduler token is not a JWT — feeding it to + # verify_token() would log a spurious decode warning every cron tick. + # See app/auth/scheduler_token.py for the threat model. + from app.auth.scheduler_token import get_scheduler_user, is_scheduler_token + if is_scheduler_token(token): + scheduler_user = get_scheduler_user(conn) + if scheduler_user: + _attach_admin_flag(scheduler_user, conn) + return scheduler_user + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Scheduler user not provisioned", + ) + from app.auth.pat_resolver import resolve_token_to_user user, reason = resolve_token_to_user(conn, token, request) if user: @@ -226,8 +241,21 @@ async def get_optional_user( async def require_session_token(request: Request, user: dict = Depends(get_current_user)) -> dict: - """Like get_current_user but rejects PAT — for endpoints that must not - be callable via a long-lived CI token (e.g. creating new tokens, changing password).""" + """Like get_current_user but rejects every non-interactive token kind — + for endpoints that must not be callable via a long-lived service or CI + credential (e.g. creating new tokens, changing password). + + Two non-interactive paths exist today: + + 1. **PAT** — JWT with ``typ="pat"``. Detected by decoding the JWT and + inspecting the claim. + 2. **Scheduler shared secret** — opaque string equal to + ``SCHEDULER_API_TOKEN``. Not a JWT, so ``verify_token`` returns None + and the PAT-claim check would silently pass — meaning a caller + holding the scheduler secret could mint persistent PATs through + ``POST /auth/tokens`` that survive a secret rotation. Explicit + check here closes that bypass. + """ auth = request.headers.get("authorization", "") token = None if auth.startswith("Bearer "): @@ -235,6 +263,12 @@ async def require_session_token(request: Request, user: dict = Depends(get_curre if not token and request: token = request.cookies.get("access_token") if token: + from app.auth.scheduler_token import is_scheduler_token + if is_scheduler_token(token): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="This endpoint requires an interactive session, not a service token", + ) from app.auth.jwt import verify_token payload = verify_token(token) or {} if payload.get("typ") == "pat": diff --git a/app/auth/scheduler_token.py b/app/auth/scheduler_token.py new file mode 100644 index 0000000..b7c9517 --- /dev/null +++ b/app/auth/scheduler_token.py @@ -0,0 +1,136 @@ +"""Shared-secret auth path for the in-cluster scheduler service. + +The scheduler container ships every cron tick to the FastAPI app over HTTP +(see ``services.scheduler.__main__``). It needs a long-lived credential to +authenticate itself, but minting a real PAT for it requires a logged-in +session — chicken-and-egg at first boot. + +The pragmatic solution: both the ``app`` and ``scheduler`` containers source +the same ``.env`` (via Docker Compose ``env_file: .env``). The +``infra/modules/customer-instance/startup-script.sh.tpl`` generates a random +``SCHEDULER_API_TOKEN`` once at VM provisioning and writes it there. When a +caller presents that exact secret as ``Authorization: Bearer ``, the +app loads (or seeds on demand) a synthetic ``scheduler@system.local`` user +that is a member of the ``Admin`` system group — so existing RBAC paths +continue to work without special-casing. + +Constraints on the secret (enforced here, not parsed): + +- Empty / unset → this auth path is **disabled**. Production deploys should + set it; dev / LOCAL_DEV_MODE typically doesn't, since the scheduler + rides the dev-bypass instead. +- Length < 32 → treated as misconfiguration and disabled. Prevents an + operator typo that sets ``SCHEDULER_API_TOKEN=todo`` from accidentally + granting admin to a 4-character bearer. +- Comparison uses :func:`hmac.compare_digest` — constant-time so a remote + caller cannot mount a length-discrimination timing attack. + +Audit: every action by this user is attributed to ``scheduler@system.local``, +visible in ``audit_log`` as a normal admin actor. Rotating the secret is +``edit .env → docker compose restart app scheduler``; no DB write needed. +""" + +from __future__ import annotations + +import hmac +import logging +import os +import uuid +from typing import Optional + +import duckdb + +logger = logging.getLogger(__name__) + +# Identity of the synthetic user that backs the shared-secret auth path. +# Kept stable so audit-log entries from the scheduler are easy to filter. +SCHEDULER_USER_EMAIL = "scheduler@system.local" +SCHEDULER_USER_NAME = "Scheduler" + +# Floor on the secret length. 32 bytes ≈ 256 bits of entropy if generated +# from /dev/urandom; well above the brute-force frontier and well above any +# typo a human is plausibly going to make. +SCHEDULER_TOKEN_MIN_LENGTH = 32 + + +def get_scheduler_secret() -> str: + """Return the configured shared secret, stripped. Empty when disabled.""" + return os.environ.get("SCHEDULER_API_TOKEN", "").strip() + + +def is_scheduler_token(token: str) -> bool: + """True iff ``token`` exactly matches the configured shared secret. + + Returns False when the env var is empty or shorter than the minimum + length (auth path disabled). Uses constant-time comparison. + """ + if not token: + return False + secret = get_scheduler_secret() + if not secret or len(secret) < SCHEDULER_TOKEN_MIN_LENGTH: + return False + return hmac.compare_digest(token, secret) + + +def ensure_scheduler_user(conn: duckdb.DuckDBPyConnection) -> dict: + """Idempotently provision the scheduler user + Admin group membership. + + Called both from the app's startup hook (so the user exists from the + very first boot) and lazily from :func:`get_scheduler_user` so a token + presented before the next restart of the app still resolves. + + Returns the user dict in the same shape ``UserRepository.get_by_email`` + yields elsewhere — the caller treats it as any other authenticated user. + """ + from src.db import SYSTEM_ADMIN_GROUP + from src.repositories.user_group_members import UserGroupMembersRepository + from src.repositories.users import UserRepository + + repo = UserRepository(conn) + user = repo.get_by_email(SCHEDULER_USER_EMAIL) + if not user: + user_id = str(uuid.uuid4()) + repo.create( + id=user_id, + email=SCHEDULER_USER_EMAIL, + name=SCHEDULER_USER_NAME, + role="admin", + # No password_hash — this user authenticates via the shared + # secret only, never via /auth/login. Keeps the bootstrap + # check ("any user has a password?") accurate. + password_hash=None, + ) + user = repo.get_by_email(SCHEDULER_USER_EMAIL) + logger.info("Seeded scheduler service user: %s", SCHEDULER_USER_EMAIL) + + admin_group = conn.execute( + "SELECT id FROM user_groups WHERE name = ?", [SYSTEM_ADMIN_GROUP], + ).fetchone() + if admin_group: + UserGroupMembersRepository(conn).add_member( + user_id=user["id"], + group_id=admin_group[0], + source="system_seed", + added_by="app.auth.scheduler_token:ensure_scheduler_user", + ) + + return user + + +def get_scheduler_user(conn: duckdb.DuckDBPyConnection) -> Optional[dict]: + """Look up the scheduler user, seeding it on demand if absent. + + Returns None only when seeding fails — typically a malformed schema or + an out-of-band DB error. The caller (``get_current_user``) maps None + to a normal 401 so the failure is observable but does not crash. + """ + from src.repositories.users import UserRepository + + user = UserRepository(conn).get_by_email(SCHEDULER_USER_EMAIL) + if user: + return user + try: + return ensure_scheduler_user(conn) + except Exception as e: # noqa: BLE001 + logger.error("Failed to provision scheduler user on demand: %s", e) + return None diff --git a/app/main.py b/app/main.py index de91f01..3bba0a6 100644 --- a/app/main.py +++ b/app/main.py @@ -292,6 +292,35 @@ def create_app() -> FastAPI: except Exception as e: logger.warning(f"Could not seed admin: {e}") + # Seed the synthetic scheduler user when SCHEDULER_API_TOKEN is configured, + # so the very first cron tick after a fresh deploy already has a valid + # actor to attribute audit-log entries to. The lazy seed in + # `app.auth.scheduler_token.get_scheduler_user` covers the case where the + # secret is rotated mid-life, but doing it here keeps startup observable. + from app.auth.scheduler_token import get_scheduler_secret + if get_scheduler_secret(): + try: + from app.auth.scheduler_token import ( + SCHEDULER_TOKEN_MIN_LENGTH, + ensure_scheduler_user, + ) + from src.db import get_system_db + secret = get_scheduler_secret() + if len(secret) < SCHEDULER_TOKEN_MIN_LENGTH: + logger.warning( + "SCHEDULER_API_TOKEN is set but only %d chars — auth path" + " disabled (minimum %d). Generate a longer secret in .env.", + len(secret), SCHEDULER_TOKEN_MIN_LENGTH, + ) + else: + conn = get_system_db() + try: + ensure_scheduler_user(conn) + finally: + conn.close() + except Exception as e: + logger.warning(f"Could not seed scheduler user: {e}") + # C8: Warn when no user has a password_hash — bootstrap endpoint is open. # This is intentional UX (operator can claim seed admin), but the open # window should be visible in startup logs so it's not forgotten. diff --git a/infra/modules/customer-instance/startup-script.sh.tpl b/infra/modules/customer-instance/startup-script.sh.tpl index 568ba0e..1986a68 100644 --- a/infra/modules/customer-instance/startup-script.sh.tpl +++ b/infra/modules/customer-instance/startup-script.sh.tpl @@ -75,6 +75,25 @@ if [ "$DATA_SOURCE" = "keboola" ]; then fi JWT_KEY=$(gcloud secrets versions access latest --secret=agnes-$${CUSTOMER_NAME}-jwt-secret) +# SCHEDULER_API_TOKEN — shared secret between the app and scheduler containers. +# Both source the same /opt/agnes/.env via Docker Compose env_file:, so the +# scheduler's outbound bearer token always matches the app's expected value. +# See app/auth/scheduler_token.py for the auth path it unlocks. +# +# Preserve across reboots: the token is plumbed into a long-lived synthetic +# user, and rotating it forces a restart of both containers. Read back from +# an existing .env when present; mint fresh only on the first boot. +SCHEDULER_API_TOKEN="" +if [ -f "$APP_DIR/.env" ]; then + SCHEDULER_API_TOKEN=$(grep -E '^SCHEDULER_API_TOKEN=' "$APP_DIR/.env" | head -1 | cut -d= -f2- | tr -d '"' || true) +fi +if [ -z "$SCHEDULER_API_TOKEN" ]; then + # 64 hex chars = 256 bits of /dev/urandom entropy. Floor enforced in + # app/auth/scheduler_token.SCHEDULER_TOKEN_MIN_LENGTH is 32; 64 leaves + # headroom for a future tightening without re-provisioning every VM. + SCHEDULER_API_TOKEN=$(openssl rand -hex 32) +fi + # Optional Google OAuth credentials. If the operator has created # google-oauth-client-{id,secret} secrets in the project's Secret Manager # AND wired them via runtime_secrets in the calling Terraform, the VM SA can @@ -118,6 +137,7 @@ KEBOOLA_STORAGE_TOKEN=$KEBOOLA_TOKEN KEBOOLA_STACK_URL=$KEBOOLA_STACK_URL SEED_ADMIN_EMAIL=$SEED_ADMIN_EMAIL SEED_ADMIN_PASSWORD=$SEED_ADMIN_PASSWORD +SCHEDULER_API_TOKEN=$SCHEDULER_API_TOKEN LOG_LEVEL=info DOMAIN=$DOMAIN AGNES_TAG=$IMAGE_TAG diff --git a/pyproject.toml b/pyproject.toml index aeea0c7..5146d2c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agnes-the-ai-analyst" -version = "0.15.0" +version = "0.17.0" description = "Agnes — AI Data Analyst platform for AI analytical systems" requires-python = ">=3.11,<3.14" license = "MIT" diff --git a/services/scheduler/__main__.py b/services/scheduler/__main__.py index 7f128ec..5822855 100644 --- a/services/scheduler/__main__.py +++ b/services/scheduler/__main__.py @@ -1,12 +1,20 @@ """Scheduler service — replaces systemd timers. -Lightweight sidecar that fires scheduled jobs. Two job kinds: - - "http": POST/GET an endpoint on the main app (e.g. data-refresh). - - "fn": call a Python function in-process (e.g. marketplaces sync). +Lightweight sidecar that fires scheduled jobs over HTTP against the main +app. Authenticates with ``SCHEDULER_API_TOKEN`` (shared-secret synthetic +admin — see ``app.auth.scheduler_token``); falls back to no-auth in +LOCAL_DEV_MODE. -Schedules are strings parsed by src.scheduler.is_table_due — accepts +Schedules are strings parsed by ``src.scheduler.is_table_due`` — accepts "every 15m", "every 1h", "daily 03:00", "daily 07:00,13:00". +Why every job is HTTP and nothing runs in-process: the scheduler container +shares ``/data/state/system.duckdb`` with the app container, but DuckDB +permits only one writer per file across processes. An in-process call +from the scheduler raced the app's long-lived handle and 500-ed on +``Could not set lock on file``. Going through HTTP makes the app the sole +writer; the scheduler is reduced to a pure cron clock. + Usage: python -m services.scheduler """ @@ -36,21 +44,17 @@ _token_warning_emitted = False def _get_auth_token() -> str: """Return the bearer token for API calls. - Production: ``SCHEDULER_API_TOKEN`` env var carries a long-lived PAT - minted via ``/tokens`` for a service-account user with the roles the - jobs need (typically ``core.admin`` for sync triggers). Set it. + Production: ``SCHEDULER_API_TOKEN`` is a shared secret generated by the + Terraform startup script and written to ``/opt/agnes/.env``. Both the + ``app`` and ``scheduler`` containers source the same .env via Docker + Compose ``env_file:``, so the secret is symmetric. The app validates + incoming Bearer tokens against this env var (constant-time compare in + ``app.auth.scheduler_token``) and resolves matches to a synthetic + ``scheduler@system.local`` user that is a member of the Admin group. Dev / LOCAL_DEV_MODE: leave it unset. The scheduler returns the empty string and calls the API without an ``Authorization`` header — the API's dev-bypass auto-authenticates the request as the dev user. - - The previous implementation tried to auto-fetch a token by POSTing to - ``/auth/token`` with just the seed admin's email. That endpoint - requires email + password (or rejects external-auth accounts that - have no local password), so the call always 401-ed and the scheduler - log was noisy with one access-log line per cron tick. Removed in - favor of explicit configuration: either set the PAT or rely on - LOCAL_DEV_MODE. """ global _token_warning_emitted if SCHEDULER_API_TOKEN: @@ -65,24 +69,23 @@ def _get_auth_token() -> str: return "" -def _marketplaces_job(): - """Entry point for the nightly marketplaces sync. - - Imported lazily so the scheduler container still starts even if the - module has an import-time issue in development — a failure here only - kills one job, not the whole loop. - """ - from src.marketplace import sync_marketplaces - return sync_marketplaces() - - -# Schedule definitions: (name, schedule_string, kind, target) -# kind = "http" -> target = (endpoint, method) -# kind = "fn" -> target = callable_returning_any +# Schedule definitions: (name, schedule_string, endpoint, method, timeout_sec). +# All jobs are HTTP — see the module docstring for why nothing runs +# in-process anymore. ``daily 03:00`` for marketplaces matches the cadence +# the previous in-process job used; the endpoint is admin-only and +# idempotent (it iterates the registry and per-marketplace errors do not +# abort the run). +# +# timeout_sec: per-job override for the httpx call. Marketplaces gets a +# generous 15 min because the app handler iterates every registered +# marketplace under a single lock with up to 300s of git timeout per +# entry — at 120s (the default that data-refresh uses) a real-world +# registry of more than 2-3 slow repos times out the scheduler call, +# which then re-fires on the next 30s tick and queues a redundant sync. JOBS = [ - ("data-refresh", "every 15m", "http", ("/api/sync/trigger", "POST")), - ("health-check", "every 5m", "http", ("/api/health", "GET")), - ("marketplaces", "daily 03:00", "fn", _marketplaces_job), + ("data-refresh", "every 15m", "/api/sync/trigger", "POST", 120), + ("health-check", "every 5m", "/api/health", "GET", 30), + ("marketplaces", "daily 03:00", "/api/marketplaces/sync-all", "POST", 900), ] _running = True @@ -94,7 +97,7 @@ def _signal_handler(sig, frame): _running = False -def _call_api(endpoint: str, method: str = "POST") -> bool: +def _call_api(endpoint: str, method: str, timeout_sec: int) -> bool: """Call the main app API. Returns True on success.""" url = f"{API_URL}{endpoint}" headers = {} @@ -103,9 +106,9 @@ def _call_api(endpoint: str, method: str = "POST") -> bool: headers["Authorization"] = f"Bearer {token}" try: if method == "POST": - resp = httpx.post(url, headers=headers, timeout=120) + resp = httpx.post(url, headers=headers, timeout=timeout_sec) else: - resp = httpx.get(url, headers=headers, timeout=30) + resp = httpx.get(url, headers=headers, timeout=timeout_sec) if resp.status_code < 400: logger.info(f"Job {endpoint}: {resp.status_code}") return True @@ -117,17 +120,6 @@ def _call_api(endpoint: str, method: str = "POST") -> bool: return False -def _call_fn(label: str, fn) -> bool: - """Run an in-process callable. Returns True on success.""" - try: - result = fn() - logger.info("Job %s OK: %s", label, result) - return True - except Exception as e: - logger.error("Job %s failed: %s", label, e) - return False - - def run(): signal.signal(signal.SIGTERM, _signal_handler) signal.signal(signal.SIGINT, _signal_handler) @@ -140,18 +132,11 @@ def run(): while _running: now_iso = datetime.now(timezone.utc).isoformat() - for name, schedule, kind, target in JOBS: + for name, schedule, endpoint, method, timeout_sec in JOBS: if not is_table_due(schedule, last_run[name]): continue logger.info("Running job: %s (%s)", name, schedule) - if kind == "http": - endpoint, method = target - ok = _call_api(endpoint, method) - elif kind == "fn": - ok = _call_fn(name, target) - else: - logger.error("Unknown job kind %r for %s", kind, name) - ok = False + ok = _call_api(endpoint, method, timeout_sec) if ok: last_run[name] = now_iso # 30s tick is plenty: interval jobs have minute-level resolution, diff --git a/tests/test_auth_scheduler_token.py b/tests/test_auth_scheduler_token.py new file mode 100644 index 0000000..4938a63 --- /dev/null +++ b/tests/test_auth_scheduler_token.py @@ -0,0 +1,153 @@ +"""Tests for the SCHEDULER_API_TOKEN shared-secret auth path.""" + +import tempfile + +import pytest + + +@pytest.fixture +def fresh_db(monkeypatch): + """Isolated DuckDB + JWT secret per test, mirroring tests/test_pat.py.""" + with tempfile.TemporaryDirectory() as tmp: + monkeypatch.setenv("DATA_DIR", tmp) + monkeypatch.setenv("TESTING", "1") + monkeypatch.setenv("JWT_SECRET_KEY", "test-jwt-secret-key-minimum-32-chars!!") + # Clean slate — clear any inherited token from the host shell. + monkeypatch.delenv("SCHEDULER_API_TOKEN", raising=False) + # Force pristine state — earlier tests in the same session may have + # opened the singleton; drop it so the new DATA_DIR takes effect. + from src.db import close_system_db + close_system_db() + yield tmp + close_system_db() + + +def test_is_scheduler_token_disabled_when_env_unset(fresh_db, monkeypatch): + """Empty SCHEDULER_API_TOKEN must disable the auth path entirely. + + A bug here would let any caller authenticate with empty Bearer "" — the + constant-time compare would also be empty — granting admin to anyone. + """ + from app.auth.scheduler_token import is_scheduler_token + + monkeypatch.delenv("SCHEDULER_API_TOKEN", raising=False) + assert is_scheduler_token("") is False + assert is_scheduler_token("anything") is False + + +def test_is_scheduler_token_disabled_when_env_too_short(fresh_db, monkeypatch): + """Operator typo (SCHEDULER_API_TOKEN=todo) must NOT grant admin. + + The minimum length floor exists specifically to prevent a 4-char bearer + from accidentally matching a 4-char misconfigured secret. + """ + from app.auth.scheduler_token import is_scheduler_token + + monkeypatch.setenv("SCHEDULER_API_TOKEN", "too-short") + assert is_scheduler_token("too-short") is False + + +def test_is_scheduler_token_matches_only_exact_value(fresh_db, monkeypatch): + from app.auth.scheduler_token import is_scheduler_token + + secret = "x" * 64 # > min length + monkeypatch.setenv("SCHEDULER_API_TOKEN", secret) + assert is_scheduler_token(secret) is True + assert is_scheduler_token(secret + "trailing") is False + assert is_scheduler_token(secret[:-1]) is False + assert is_scheduler_token("y" * 64) is False + + +def test_ensure_scheduler_user_seeds_user_and_admin_membership(fresh_db, monkeypatch): + """First call seeds; second call is a no-op idempotent re-add.""" + from app.auth.scheduler_token import ( + SCHEDULER_USER_EMAIL, + ensure_scheduler_user, + ) + from src.db import SYSTEM_ADMIN_GROUP, get_system_db + + conn = get_system_db() + try: + user1 = ensure_scheduler_user(conn) + assert user1["email"] == SCHEDULER_USER_EMAIL + # Admin group membership exists. + admin_group = conn.execute( + "SELECT id FROM user_groups WHERE name = ?", [SYSTEM_ADMIN_GROUP], + ).fetchone() + assert admin_group is not None + membership = conn.execute( + "SELECT 1 FROM user_group_members WHERE user_id = ? AND group_id = ?", + [user1["id"], admin_group[0]], + ).fetchone() + assert membership is not None + + # Second call — same id, no duplicate membership row. + user2 = ensure_scheduler_user(conn) + assert user2["id"] == user1["id"] + rows = conn.execute( + "SELECT COUNT(*) FROM user_group_members WHERE user_id = ? AND group_id = ?", + [user1["id"], admin_group[0]], + ).fetchone() + assert rows[0] == 1 + finally: + conn.close() + + +def test_get_scheduler_user_lazy_seeds_when_absent(fresh_db, monkeypatch): + """First lookup with no prior seed should provision on demand. + + The startup hook in app.main also seeds eagerly, but the scheduler may + present the token before main.py has finished its lifespan setup on a + cold boot — get_scheduler_user must close that gap. + """ + from app.auth.scheduler_token import ( + SCHEDULER_USER_EMAIL, + get_scheduler_user, + ) + from src.db import get_system_db + from src.repositories.users import UserRepository + + conn = get_system_db() + try: + # Confirm user does not exist before the call. + assert UserRepository(conn).get_by_email(SCHEDULER_USER_EMAIL) is None + user = get_scheduler_user(conn) + assert user is not None + assert user["email"] == SCHEDULER_USER_EMAIL + finally: + conn.close() + + +def test_require_session_token_rejects_scheduler_secret(fresh_db, monkeypatch): + """The shared scheduler secret must NOT pass `require_session_token`. + + /auth/tokens (PAT minting) is gated by `require_session_token`, which + historically rejected only PATs (JWTs with typ=pat). The scheduler + secret is opaque so verify_token() returns None and the PAT-claim + check would silently pass — letting a compromised secret forge + persistent PATs that survive a rotation. Regression guard for the + Devin review on PR #127. + """ + import asyncio + from unittest.mock import MagicMock + + from fastapi import HTTPException + + from app.auth.dependencies import require_session_token + + secret = "x" * 64 + monkeypatch.setenv("SCHEDULER_API_TOKEN", secret) + + request = MagicMock() + request.headers = {"authorization": f"Bearer {secret}"} + request.cookies = {} + + user = {"id": "scheduler-id", "email": "scheduler@system.local"} + try: + asyncio.run(require_session_token(request=request, user=user)) + except HTTPException as exc: + assert exc.status_code == 403 + # Detail should signal "interactive only", flavor doesn't matter. + assert "interactive" in exc.detail.lower() + else: + raise AssertionError("require_session_token must reject scheduler secret")