* fix: redirect unauthenticated HTML routes to /login (#10) * docs(plan): user mgmt + PAT + CLI distribution implementation plan (#9 #10 #11 #12) * build(docker): produce wheel artifact for /cli/download (#9) * feat(db): schema v5 — users.active + deactivated_at/by (#11) * feat(api): /cli/download wheel + /cli/install.sh with baked server URL (#9) * feat(users): repository supports active flag + count_admins (#11) * feat(ui): /install page with per-deployment install instructions (#9) * feat(api): user PATCH/reset-password/set-password/activate/deactivate (#11) * fix(cli): da login prompts for password and sends it in body (#9) * test(api): safeguard tests for self-deactivate and last admin (#11) * feat(auth): reject requests from deactivated users (#11) * fixup(#10): propagate next through /login buttons + lock down sanitizer tests * feat(cli): da admin set-role/activate/deactivate/reset-password/set-password (#11) * feat(ui): /admin/users management page (#11) * feat(db): schema v6 — personal_access_tokens (#12) * feat(users): access_tokens repository (#12) * feat(auth): JWT carries typ (session|pat) and explicit jti (#12) * feat(auth): reject revoked/expired PATs; update last_used_at (#12) * feat(api): /auth/tokens CRUD + admin revoke; session-only guard (#12) * feat(cli): da auth token create/list/revoke (#12) * feat(ui): /profile page with PAT create/list/revoke (#12) * docs: PAT usage and session/PAT TTL clarification (#12) * feat(auth): PAT first-use-from-new-IP audit + last_used_ip (schema v7) (#12) Closes remaining acceptance gap from issue #12: audit_log entry on first use of a PAT from an IP that differs from the recorded last_used_ip. 
- schema v7: personal_access_tokens.last_used_ip column - AccessTokenRepository.mark_used now stores the client IP - get_current_user extracts client IP (X-Forwarded-For first hop, fallback to request.client.host) and emits a token.first_use_new_ip audit when the IP changes on a subsequent use (not the very first use) - tests: new-ip audit, same-ip no-op, first-ever-use no-op, schema v7 column * fix: address Devin review findings on PR #28 - app/main.py: exclude /auth/* from HTML redirect handler so JSON endpoints under /auth/ (PAT CRUD used by `da auth token` CLI) keep their 401 JSON contract (Devin #1, bug) - app/api/tokens.py: reject expires_in_days <= 0 explicitly; use `is not None` so 0 no longer silently creates a non-expiring token (Devin #2) - app/api/users.py: validate role against Role enum in create_user to match update_user and prevent 500 on role-protected requests later (Devin #3) - app/web/templates/admin_users.html: escape user-supplied strings before innerHTML; move onclick handlers to addEventListener via data attributes so emails with quotes / HTML no longer break the UI or enable stored XSS (Devin #4) - app/auth/router.py, app/auth/providers/{password,google}.py: reject deactivated users at login instead of issuing a JWT that would then fail on the next request — removes the confusing redirect loop (Devin #5) - CLAUDE.md: document schema v7 instead of stale v4 (Devin #6) - tests/test_web_ui.py: regression test for the /auth/* JSON 401 * feat(web): add /profile and /admin/users links to dashboard nav * feat(web): point setup banner at /install page * chore(web): drop unused setup_instructions context * fix: address Devin review round 2 on PR #28 - app/api/tokens.py: when expires_in_days is None (the "never" option), use a ~100-year JWT expiry so the token doesn't silently die in 24h via the session-default fallback in create_access_token. 
The real expiry enforcement stays in verify_token's DB-level check (Devin 🔴) - app/web/templates/profile.html: escape t.name and other user-supplied strings via esc() helper before innerHTML, same pattern as admin_users.html. Move revoke onclick to data-attribute + addEventListener (Devin 🟡) - app/api/cli_artifacts.py: use `mktemp -d` with X's at end of template for GNU/BSD portability, place wheel inside the temp dir and clean up with rm -rf (Devin 🚩) * feat(web): redesign /install page; make curl one-liner primary, collapse manual Rebuild the public /install page using the dashboard visual language (shared header, card layout, gradient hero, design tokens from style-custom.css). The page is now anchored on the one-liner install path: curl -fsSL <server>/cli/install.sh | bash is rendered as the primary, prominent step 1, while the old manual wheel-download flow is tucked behind a closed-by-default <details> block for users in restricted/offline environments. Information architecture: hero (server URL + version) -> step 1: quick install (one-liner, big Copy button) -> step 2: create PAT on /profile + export DA_TOKEN / da auth whoami -> step 3: Claude Code / MCP via ~/.config/da/token.json -> collapsed "Manual install" details for download-wheel flow -> footer link to docs/HEADLESS_USAGE.md Every shell snippet has a vanilla-JS "Copy" button that confirms visually ("Copied!" for 1.5s) and falls back to textarea+execCommand on non-secure contexts. No new dependencies, no bundler. The route now also pulls an optional user so the header shows the same nav (Dashboard / Profile / Logout) as dashboard.html when a session exists, while staying fully public when signed out. * fix(cli): use real wheel filename in install.sh (broken pip/uv install) The installer wrote the downloaded wheel as agnes_cli.whl, which lacks a PEP-427 version component — both pip and uv tool install reject it and abort the one-liner. 
Use curl -OJ so Content-Disposition determines the on-disk filename, then resolve it via glob. Install an EXIT trap to remove the tmpdir even when install fails. * fix(web): correct manual install wheel glob and add PEP 668 / PATH hints - Wheel glob is agnes_the_ai_analyst-*.whl (not agnes-*.whl) — the old pattern never matched the real artefact name from the build. - Add — or — separator between uv tool install and pip install. - Warn that pip install --user is blocked on macOS Homebrew / modern Debian (PEP 668) and recommend uv tool install as the default path. - Both flows now show the ~/.local/bin PATH hint so a fresh shell can find the da binary after install. * fix(web): consistent session.user reference in install header The avatar-letter fallback inside {% if session.user %} was reading user.name / user.email directly, but the route dependency can pass user=None — those references resolved to an empty FlexDict and produced an empty avatar circle. Read everything through session.user to match the guard and the dashboard pattern. * fix(web): point headless usage link at GitHub source /docs/HEADLESS_USAGE.md 404s — no static route serves repo docs. Point the footer link at the rendered markdown on GitHub instead of adding a dedicated docs serving route just for one file. * feat(web): /install hero size, anon sign-in banner, step 2 copy polish - Bump hero h1 from 26px to 30px to match dashboard primary scale. - Anonymous visitors see a small sign-in banner above Step 2 (creating a token requires auth; without the banner the flow appears stuck). - Add an 'After generating your token' section label inside Step 2 so the /profile CTA button no longer looks wedged mid-sentence between adjacent paragraphs. * chore(web): /install a11y + version pill polish - aria-live='polite' on copy buttons so screen readers announce the 'Copied!' state change. - Replace redundant INSTANCE_NAME eyebrow (already in the header logo) with 'Getting started'. 
- Hide the version pill when AGNES_VERSION is unset/'dev' — avoids the misleading 'vdev' label in local/unbuilt runs. - Manual summary focus-visible outline-offset +2px (was -2px which clipped inside the card), and mark the chevron as decorative. * fix(web): use session.user in dashboard avatar fallback Inside {% if session.user %} guard, the avatar fallback referenced (user.name or user.email). If user is None the block crashes when the profile picture is absent. Align with the guard variable. * fix: address Devin review round 3 on PR #28 - app/api/users.py: stop auto-sending email from reset_password. The magic-link sender would deliver a "Login Link" that — when clicked — consumes the reset_token via verify_magic_link and logs the user in WITHOUT prompting for a new password. Admins now share the raw reset_token from the API response manually, or use set-password directly. email_sent is always False. Documented inline. (Devin 🟡) - app/api/cli_artifacts.py: harden /cli/install.sh generation against shell injection via Host header or AGNES_VERSION. base_url is validated against a strict scheme+host+port regex; version against an alnum + dot/dash/underscore allowlist. Both values are also piped through shlex.quote() as defense in depth. (Devin 🟡) The shared users.reset_token column between magic-link and password- reset flows (Devin 🚩) remains an architectural gap; splitting into separate columns needs schema v8 and is tracked for a follow-up PR. 
* docs, chore(grpn): manual-deploy helpers + hackathon deploy learnings Adds scripts/grpn/ — Makefile + agnes-auto-upgrade.sh + README for operating Agnes on GRPN's existing foundryai-development VM when the full Terraform flow is blocked by org policies: - iam.disableServiceAccountKeyCreation (org constraint) forbids SA JSON keys, so GCP_SA_KEY-based CI is unavailable - No projectIamAdmin delegation → bootstrap-gcp.sh can't grant roles - Secret Manager IAM bindings require setIamPolicy which editor lacks Helper targets: deploy, deploy-tag, recreate, restart, stop, start, status, version, logs, ps, env, ssh, tunnel, open, bootstrap-admin, set-data-source, install-cron, uninstall-cron. docs/superpowers/plans/2026-04-22-grpn-deploy-learnings.md — running log of all org-policy constraints hit during the hackathon deploy, with workarounds and derived follow-ups (WIF support, external_ip variable, customer onboarding IAM checklist). Not a replacement for the TF flow — stopgap until WIF lands. * fix(web): make header logos clickable links to home * feat(web): one-click "Setup a new Claude Code" button Adds a single-button flow on the dashboard and /install page that generates a fresh personal access token via POST /auth/tokens and copies a complete, paste-ready setup script (server URL, token, install/verify commands) to the clipboard. Falls back to a modal textarea when the clipboard is blocked; redirects to /login on 401; surfaces backend errors inline. - dashboard.html: replaces the top "Set up your local environment" anchor with a real button wired to setupNewClaude(). Removes the duplicate bottom setup banner to keep a single entry point. - install.html: for signed-in users, Step 1 leads with the one-click button and demotes the curl one-liner into a collapsible "Or run manually" aside. Anonymous visitors still see the curl flow plus a sign-in hint. - No new deps. Vanilla JS. Token lives in memory/clipboard only — never rendered into persistent DOM. 
* feat(cli): add "da auth import-token" for non-interactive PAT login Writes a provided JWT into ~/.config/da/token.json using the canonical {access_token, email, role} shape expected by save_token(). Decodes the token locally to pull email/role claims, verifies it against the server via GET /api/catalog/tables, and refuses to overwrite an existing token file if the server returns 401. --email / --role overrides exist for tokens missing those claims; --skip-verify bypasses the server round-trip for offline / CI scenarios. * test(cli): cover da auth import-token success + 401 + claim-fallback paths Three new tests in TestAuthImportToken: - valid JWT + 200 -> canonical token.json written - 401 from /api/catalog/tables -> exit 1, existing token file untouched - JWT without email/role claims -> refused without overrides, accepted with --email / --role flags * feat(web): update one-click Claude setup instructions — explicit uv install, import-token, skills question Replaces the fragile `cat > token.json <<EOF` clipboard payload with an explicit, auditable sequence: 1. `curl -fsSL /cli/download` + `uv tool install --force` (no opaque `curl | bash`). 2. `da auth import-token --token ...` instead of hand-written JSON. 3. Explicit PATH persistence for zsh/bash. 4. A required question to the user about whether to copy the bundled skills into ~/.claude/skills/agnes/ or pull them on-demand via `da skills show`. 5. A final confirmation step with whoami + version output. Factored both pages to include a shared partial (app/web/templates/_claude_setup_instructions.jinja) so dashboard.html and install.html can never drift apart again. {server_url} and {token} stay as runtime placeholders substituted by renderSetupInstructions(). * feat(ui): modernize /admin/users + unify header nav across pages - New shared partial app/web/templates/_app_header.html — single source of truth for the top navigation. Used by base.html and dashboard.html (which doesn't extend base.html). 
Active page highlighted via request.url.path. Admin "Users" link gated by session.user.role. - style-custom.css: add .app-header / .app-nav-link / .app-btn-logout / .app-avatar styles (mirrors dashboard's previous inline copy under app-* prefix). Mobile-friendly fallback at <720px. - base.html: include the new partial so every page extending base (admin_users, profile, login_email, error, …) gets the same chrome the dashboard has. - dashboard.html: replace its inline <header class="header"> markup with the shared partial. Inline .header CSS left in place as harmless dead code (separate cleanup PR). - admin_users.html: rewritten with avatars, role pills (color-coded per role), toggle switch for active, search/filter input, toast notifications, modal dialogs replacing alert/confirm/prompt, one-click copy for the reset token, empty / loading states. All XSS-safe via the existing esc() helper + data-attribute event delegation. - tests/test_web_ui.py: smoke test that /admin/users renders the new shared header chrome and the modernized markup. * feat(api): serve CLI wheel at /cli/agnes.whl for direct uv install uv tool install inspects the URL path suffix to recognise a wheel, so /cli/download (which has no .whl suffix) cannot be installed directly. Expose a stable /cli/agnes.whl alias over the same wheel lookup so users can run: uv tool install --force https://<server>/cli/agnes.whl * test(cli): cover da auth import-token --server persisting to config.yaml The server persistence was already implemented in the import-token command (save_config({server}) call) but not covered by tests. Add an explicit test so the one-step setup contract — single import-token call writes both token and server — cannot regress. * feat(web): simpler Claude setup — single uv install URL, single import-token call User feedback: the prior clipboard payload repeated the server URL and token across multiple steps (curl + tmpfile + install + rm + separate seed-config + import-token). 
Collapse to: 1. uv tool install --force {server_url}/cli/agnes.whl (single URL, direct) 2. da auth import-token --token ... --server ... (one call, persists both) 3. da auth whoami 4. skills (ask user first) 5. confirm uv accepts HTTPS URLs that end in .whl and installs them directly, so the tmpfile dance is unnecessary. import-token --server already persists the server to config.yaml, so no separate printf > config.yaml step. * fix(tests): update admin users heading assertion after template rename The admin_users.html template now uses <h2 class="users-title">Users</h2> instead of <h2>User management</h2>. Update the assertion to match. * feat(ui): unify header across remaining 7 standalone pages These 7 pages render their own full <html> and don't extend base.html, so the previous unification commit only covered base + dashboard. Each had its own ad-hoc <header> markup with inconsistent classes (.top-header / .header / .page-header), inconsistent nav-link sets, and inconsistent avatar/email styling. Replace each inline <header>...</header> block with the shared {% include '_app_header.html' %} so /activity-center, /admin/permissions, /admin/tables, /catalog, /corporate-memory, /corporate-memory/admin, and /install all show the same chrome (Dashboard / Install CLI / Profile / Users / email + avatar / Logout) with the active page highlighted via request.url.path. Old inline header CSS (.header, .top-header, .page-header, .nav-link, etc.) is left in place as harmless dead code; it can be cleaned up in a follow-up sweep. * feat(web): add readable preview of Claude setup payload on dashboard + /install Move the line-by-line setup instructions into app/web/setup_instructions.py as the single source of truth, then render them in two modes from the existing _claude_setup_instructions.jinja partial: - preview_mode=True → visible, read-only <pre><code> block with the real server URL and a clearly-styled placeholder token (never a real one). 
- preview_mode=False → the JS SETUP_INSTRUCTIONS_TEMPLATE used by the one-click flow (unchanged behaviour). Both /dashboard (env-setup-cta card) and /install (Step 1 card) now show the preview directly under the 'Setup a new Claude Code' button so users can see exactly what will land in their clipboard before they click. * feat(web): update setup instructions — `da diagnose` step, explicit section titles Rework the Claude Code setup payload to: - Give every numbered step an unambiguous verb header ("1) Install the CLI", "2) Log in", "3) Verify the login", "4) Run diagnostics", "5) Skills (ask the user first)", "6) Confirm"). - Add step 4 `da diagnose` as the post-login health check. The CLI already ships this command (cli/commands/diagnose.py); it prints "Overall: healthy" and a list of green checks that map cleanly to next actions. - Ask the skills copy-vs-on-demand question verbatim so Claude Code always prompts the user the same way. - Replace the terse "Confirm" line with a 4-bullet summary (version, whoami, skills choice, diagnose status) so the return message is structured and comparable across setups. * chore(web): remove stale MCP card from /install (no MCP server today) The 'Use with Claude Code / MCP' card (Step 3 on /install) referenced an MCP integration Agnes does not ship. Remove the whole card. The one-click 'Setup a new Claude Code' flow in Step 1 already covers the long-lived client use case and is less confusing than dangling persistence tips for a non-existent integration. * feat(api): include user_email + last_used_ip + user_id in admin tokens list response Adds AdminTokenItem response model (superset of TokenListItem) and AccessTokenRepository.list_all_with_user() joining personal_access_tokens with users to denormalize user_email. Needed for /admin/tokens UI where admins triage tokens across all users. 
* feat(web): /admin/tokens page — list, filter, search, revoke across all users Adds a new admin-only page with client-side filtering (status, user email, last-used window), column sorting, counts bar (active/revoked/expired), and an inline revoke action. Mirrors the /admin/users visual language. * feat(web): add Tokens nav link for admins + deep-link from admin/users row Admin-only nav entry to /admin/tokens, and a per-row Tokens button on /admin/users that prefills the token page's user filter via ?user=<email>. * test(admin): cover /admin/tokens rendering, filter state, non-admin denial, revoke Verifies admin can render the page (title + JS hooks present), a non-admin is blocked, unauthenticated users are redirected, the admin list response includes user_email / user_id / last_used_ip, and admin can revoke another user's token. * feat(web): modern redesign of /admin/tokens — hero, stat strip, refined table, responsive cards, a11y * feat(web): ditch the table — /admin/tokens as a card stack, modern GitHub-style list Replaces the table-based layout with a stack of self-contained token cards inside a <ul role=list>. Each card is a flex row: avatar + name/meta on the left, last-used block in the middle, status pill + outlined 'Revoke' button on the right. Status and sort controls are pill-shaped toggle chips; user email search has an inline search icon. No <table>/<tr>/<th>/<td> anywhere. Responsive below 720px (card stacks vertically) and 480px (stat chips 2x2). Preserves filter IDs (flt-status, flt-user, flt-last-used) and data-revoke for existing tests. * feat(web): add /tokens (role-aware) — single page for both user PAT CRUD and admin overview - Rename admin_tokens.html -> tokens.html with a new is_admin context flag. - New route GET /tokens: renders the same card-stack UI for everyone. * Admins: loads /auth/admin/tokens, shows owner column + stat strip, keeps the owner-email search box and sort-by-owner chip. 
* Non-admins: loads /auth/tokens (own tokens only), hides owner column + stat chips, adds a 'New token' CTA in the hero that opens a modal (name + expires_in_days) calling POST /auth/tokens. The raw token is revealed once in a dismissable banner and cleared from the DOM on Hide. - GET /admin/tokens now 302-redirects to /tokens, preserving query string (so the /admin/users deep-link ?user=foo still works). * feat(web): /tokens full-bleed layout to match dashboard width The hero, toolbar, and card list used to sit inside base.html's .container (max-width 800px). Break out with negative horizontal margins so the page spans the viewport like /dashboard does, capped at 1440px for readability on very wide screens with a 24px gutter on each side. - No change to base.html itself. The override is scoped to .tokens-page. - body { overflow-x: hidden; } guards against rare horizontal scrollbars. - < 808px viewport: reset to natural flow (mobile already narrower). - ≥ 1488px viewport: cap to 1440px and re-center. * chore(web): remove /profile template + nav link (redirect /profile -> /tokens) The old /profile PAT CRUD page is now redundant — the modern /tokens page covers both user and admin flows. Delete the template; the router's /profile handler already 302-redirects to /tokens. Nav cleanup: - Remove the 'Profile' link. - Show a single 'Tokens' link to every signed-in user (previously only admins saw it). - Active-state matches /tokens, /admin/tokens, and /profile so the highlight survives the redirect chain. /install CTA now points at /tokens instead of /profile. * test: cover /tokens for admin + non-admin flows, /profile redirect, nav update tests/test_admin_tokens_ui.py - Point admin rendering test at /tokens directly and tighten assertions (admin-only stat strip + owner search, non-admin CTA absent). - Add test_non_admin_can_render_tokens_page: personal body, New-token CTA, create-modal, reveal banner; stat strip + owner search absent. 
- Add test_admin_tokens_redirects_to_tokens: 302 to /tokens, query string (?user=...) preserved for the /admin/users deep-link. - Add test_profile_redirects_to_tokens: 302 to /tokens. - Add test_non_admin_can_create_pat_via_tokens_page_api: exercises the POST /auth/tokens call that the non-admin create-modal submits. tests/test_pat.py - test_profile_page_renders -> test_profile_page_redirects_to_tokens: assert the 302 + that /tokens lands on the unified non-admin body. tests/test_web_ui.py - admin_users nav assertion: 'Tokens' link present, 'Profile' link absent. - Add test_nav_shows_tokens_link_for_non_admin: non-admins see the same 'Tokens' link (previously only admins did). - Add test_profile_redirects_to_tokens back-compat check. * feat(web): collapse 'What Claude Code will receive' by default The preview block on /dashboard and /install now uses <details>/<summary> so it is hidden by default. Click the chevron/title to expand and review the clipboard payload. Markup stays in the DOM so existing tests that assert on content continue to pass. * fix(web): /tokens width — override .container to 1280px like dashboard The negative-margin full-bleed trick was fragile and pushed content past the right edge on deployed viewports. Replace with a simple max-width override of base.html's .container on this page only, matching /dashboard's 1280px center-column layout. * feat(web): split role-aware /tokens into my_tokens.html + admin_tokens.html * feat(web): router — separate handlers for /tokens (own) and /admin/tokens (all) * feat(web): nav — show Tokens for all, add All tokens for admins * test: cover split token pages (own vs all) + admin access gating * feat(web): move 'My tokens' into a user dropdown menu Replaces the separate Tokens/email/Logout nav trio with a rounded avatar trigger that opens a dropdown containing the user's email, role, a 'My tokens' link, and Logout. 
Admin-only 'All tokens' stays as a top-level nav item since it's an admin function, not a personal one. Click-outside and Escape close the panel; chevron rotates on open. * fix(api): allow PATs to list/get/revoke their own tokens (CLI flow) The documented 'da auth token list/revoke' CLI flow in docs/HEADLESS_USAGE.md uses a PAT, but the previous dependency (require_session_token) returned 403. Only create_token must be session-only to prevent PAT-spawning-PAT chains; listing and revoking your own tokens is safe with a PAT. * fix(api): cap expires_in_days at 3650 to avoid datetime overflow (500 to 400) Values above ~11 million days overflowed datetime.max in datetime.now(utc) + timedelta(days=...) and surfaced as an unhandled OverflowError → 500. Cap at 10 years with a clear 400 instead; the no-expiry code path is unaffected. * fix(api): relax _SAFE_URL_RE to allow path prefixes, underscores, and IPv6 The previous regex rejected legitimate reverse-proxy base_url values (https://host/agnes/), underscores in Docker Compose hostnames, and IPv6 literals (http://[::1]:8000). Widen the charset and allow an optional trailing path. shlex.quote continues to provide defense-in-depth against any metacharacter that slips through. * fix(web): /login/email and Google OAuth propagate next_path Previously, /login/email silently dropped the ?next=<path> query param so the hidden form field rendered empty and login always landed on /dashboard. Google's button was hard-coded to /auth/google/login, ignoring next entirely. - /login page now appends ?next to the Google button URL - /login/email reads + sanitizes next, passes as template context - google_login stashes sanitized next_path in session['login_next'] - google_callback pops + re-sanitizes and redirects there Sanitization factored into app/auth/_common.safe_next_path. 
* fix(auth): differentiate argon2 VerifyMismatchError from internal errors in web login The previous except (VerifyMismatchError, Exception) collapsed both cases into the generic 'invalid credentials' redirect, silently hiding corrupted-hash / library errors from ops. Split the two: bad password still gets ?error=invalid; anything else logs via logger.exception and redirects with ?err=auth_internal so ops have a visible signal and users don't retry forever against a broken password_hash column. * docs: correct CLAUDE.md table name (personal_access_tokens) v7 note referenced 'access_tokens.last_used_ip' but the real table is personal_access_tokens (as mentioned two tokens earlier in the same bullet). Same-file consistency fix. * chore(web): clarify admin user-reset UI — encourage Set password over the unused reset_token POST /api/users/{id}/reset-password stores and returns a token but no endpoint consumes it — the magic-link sender would log the user in without prompting for a new password, defeating the reset. - Drop the 'Reset' row action from admin_users so admins aren't pointed at a dead end. - Rewrite the reveal-modal copy to tell admins to use Set password and explicitly note that the magic-link flow isn't available for reset tokens in this build. The API endpoint stays for API-level future use. * test: cover PAT CLI flow, expires_in_days overflow, proxy base_url, next propagation - tests/test_pat.py: PAT can list own tokens (200, was 403); PAT can revoke own tokens (204); create_token returns 400 for expires_in_days > 3650 (was 500 via datetime overflow). - tests/test_cli_artifacts.py: _SAFE_URL_RE accepts reverse-proxy path prefixes, underscores, and IPv6 literals; end-to-end check of cli_install_script with a stubbed base_url that includes a path prefix (Agnes behind /agnes/). 
- tests/test_web_ui.py: /login propagates ?next to the Google button URL; /login/email renders next in the hidden form field and strips hostile values; unit coverage of safe_next_path. * fix(security): use \Z instead of $ in URL/version allowlists (trailing-\n bypass) Python regex `$` also matches just before a trailing newline, so a Host header or AGNES_VERSION value like "good.example.com\n$(rm -rf /)" would slip past the allowlist. `\Z` anchors to strict end-of-string. shlex.quote downstream remains as defense-in-depth, but the allowlist is now the tight gate it claims to be. * fix(auth): PAT with null expiry omits JWT exp claim (DB is the source of truth) Previously a PAT created with `expires_in_days=null` (user-requested "never expires") set the DB `expires_at` to NULL (correct) but still baked a ~100y `exp` claim into the JWT. That is misleading: the PAT silently did expire eventually, despite the UI and API promising "no expiry". `create_access_token` now accepts `omit_exp=True` to skip the `exp` claim entirely. `app/api/tokens.py` passes that when `expires_in_days is None`. The authoritative expiry check lives in `app/auth/dependencies.py`, which reads `expires_at` from the DB row — unchanged. PyJWT accepts claim-less JWTs indefinitely. * test: cover trailing-newline regex bypass + no-exp JWT for unbounded PAT - test_safe_url_re_rejects_trailing_newline_bypass: asserts both `_SAFE_URL_RE` and `_SAFE_VERSION_RE` reject values with a trailing `\n` (previously accepted because Python `$` matches before `\n`). - test_pat_null_expiry_jwt_has_no_exp_claim: POST /auth/tokens with `expires_in_days=null`, decode the returned JWT, assert `exp` is absent while `typ=pat`, `sub`, and `jti` are still present. - test_pat_with_null_expiry_is_accepted_by_verify_token: verify_token round-trips a claim-less JWT without ExpiredSignatureError. 
- test_pat_null_expiry_end_to_end_allows_authenticated_request: use the null-expiry PAT against /auth/tokens and confirm it authenticates. * docs(auth): document X-Forwarded-For trust model in _client_ip Deployment runs behind Caddy which strips incoming X-Forwarded-For and sets its own, so the leftmost hop is trustworthy. Clarify that the stored last_used_ip is audit-only and never used for access control — if the app is ever exposed directly, this value becomes client-settable. * docs: /profile → /tokens in install.sh next-steps, CLI error, HEADLESS_USAGE, security skill After splitting PAT management to /tokens (with /profile as a back-compat 302), stale references remained in user-facing text. Update them to the canonical /tokens URL so shell scripts, CLI error hints, docs, and the bundled security skill are all consistent.
User Management + PAT + CLI Distribution Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
Goal: Deliver the full scope of issues #10, #11, #12 and #9 (HTML auth redirect, user management UI/API/CLI, Personal Access Tokens, and CLI distribution from the Docker image with an install page) for a production multi-customer deployment of Agnes.
Architecture: 4 phases, run serially wherever they share schema migrations or the same file. Where the scope does not collide, they run in parallel in separate worktrees. Each phase is self-contained, has its own TDD tasks, and is committed incrementally.
Tech Stack: Python 3.13, FastAPI, DuckDB, Jinja2 templates (Bootstrap-like CSS), Typer CLI, PyJWT, Argon2, pytest, Docker (uv build), httpx.
Sequence and parallelism:
Phase 0 (#10 HTML redirect)    ── 1–2 h, standalone
        │
        ▼
Phase 1 (#11 User management)  ── schema v5, API+CLI+UI, 1 day
        │
        ▼
Phase 2 (#12 PAT) ──────────┐  ── schema v6, JWT+API+CLI+UI
                            ├── in parallel in 2 worktrees
Phase 3 (#9 CLI dist) ──────┘  ── Dockerfile+/cli/*+/install+login fix
Conflict map (review-audited): Phase 2 and Phase 3 share four files, not just one. All of them are small, local edits, so the merge is trivial:
| File | Phase 1 | Phase 2 | Phase 3 | Resolution |
|---|---|---|---|---|
| app/main.py | - | register tokens_router | register cli_artifacts_router | both add include_router; merges without conflict |
| app/web/router.py | /admin/users route | /profile route | /install route | appended at the end of the file; conflict of at most one line |
| app/web/templates/dashboard.html | admin nav link | profile nav link | install nav link | all in the same <nav> block; designate a single owner (Phase 3) who adds all three links according to the existing roles |
| cli/commands/auth.py | - | register token_app | fix login password | serial: Phase 3 first, then Phase 2 rebases |
Rule: Phase 3 merges first (it contains the da login fix and the dashboard nav). Phase 2 then rebases onto main. Phase 1 has no conflict with Phase 2/3 in the nav (the admin link), and its other files are disjoint.
Testing: Every phase ends with a green pytest tests/ run over the whole suite. Every task follows a TDD cycle: failing test → minimal impl → passing test → commit.
Phase 0 — HTML Auth Redirect (#10)
The backend get_current_user currently raises HTTPException(401) for all requests, so a browser receives JSON instead of a redirect to /login. Goal: distinguish API requests from HTML requests and return RedirectResponse("/login") for HTML requests.
File Structure
- Modify: app/auth/dependencies.py (distinguish the request type via the Accept header / path)
- Modify: app/main.py (register the redirect exception handler)
- Test: tests/test_auth_html_redirect.py (new)
Task 0.1: Test — HTML request bez tokenu dostane 302 na /login
Files:
- Create: tests/test_auth_html_redirect.py

- Step 1: Write the failing test
"""Tests for #10: unauthenticated HTML routes must redirect, not return JSON 401."""

from fastapi.testclient import TestClient

from app.main import app


def test_html_route_without_token_redirects_to_login():
    """GET /dashboard without token must return 302 to /login (not 401 JSON)."""
    client = TestClient(app, follow_redirects=False)
    response = client.get("/dashboard", headers={"Accept": "text/html"})
    assert response.status_code == 302
    assert response.headers["location"] == "/login"


def test_api_route_without_token_returns_401_json():
    """GET /api/users without token must return 401 JSON (not redirect)."""
    client = TestClient(app, follow_redirects=False)
    response = client.get("/api/users", headers={"Accept": "application/json"})
    assert response.status_code == 401
    assert response.headers["content-type"].startswith("application/json")


def test_html_route_with_bad_token_redirects_to_login():
    """Expired/invalid cookie on HTML route → redirect to /login, not JSON 401."""
    client = TestClient(app, follow_redirects=False)
    response = client.get(
        "/dashboard",
        headers={"Accept": "text/html"},
        cookies={"access_token": "bogus.token.here"},
    )
    assert response.status_code == 302
    assert response.headers["location"] == "/login"


def test_root_without_token_still_redirects_to_login():
    """Regression: `/` uses get_optional_user and must still render/redirect, not crash."""
    client = TestClient(app, follow_redirects=False)
    response = client.get("/", headers={"Accept": "text/html"})
    # `/` redirects based on auth state; without token we go to /login
    assert response.status_code == 302
    assert response.headers["location"] == "/login"
- Step 2: Run test to verify it fails
Run: pytest tests/test_auth_html_redirect.py -v
Expected: the first and third tests FAIL (HTTP 401 returned instead of 302); the second test and the fourth (a regression check on `/`) likely pass.
- Step 3: Implement HTML-aware auth dependency in app/auth/dependencies.py
Replace the body of get_current_user so HTML requests get a redirect instead of a 401. Keep the contract of get_optional_user intact: it still returns None on failure, and now also swallows the redirect sentinel.
"""FastAPI auth dependencies: current user, role checking."""

from typing import NoReturn, Optional

import duckdb
from fastapi import Depends, HTTPException, Header, Request, status
from fastapi.responses import RedirectResponse

from app.auth.jwt import verify_token
from src.db import get_system_db
from src.rbac import Role, ROLE_HIERARCHY
from src.repositories.users import UserRepository


def _get_db():
    conn = get_system_db()
    try:
        yield conn
    finally:
        conn.close()


def _wants_html(request: Optional[Request]) -> bool:
    """True if client is a browser expecting HTML (not an API client wanting JSON)."""
    if request is None:
        return False
    # Explicit JSON request from an API client: never redirect.
    accept = request.headers.get("accept", "")
    if "application/json" in accept and "text/html" not in accept:
        return False
    # Path heuristic: /api/* and /auth/* are never HTML surfaces.
    path = request.url.path
    if path.startswith("/api/") or path.startswith("/auth/"):
        return False
    # Everything else (browser navigations to /dashboard, /admin/*, /profile/*, etc.)
    # treats text/html or */* as HTML.
    return "text/html" in accept or "*/*" in accept or accept == ""


class _HTMLAuthRedirect(Exception):
    """Sentinel raised by auth dependencies to trigger redirect instead of 401."""


async def get_current_user(
    request: Request = None,
    authorization: Optional[str] = Header(None),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
) -> dict:
    """Extract and validate JWT from Authorization header or cookie. Returns user dict.

    HTML browser requests without a valid token get redirected to /login via an
    exception handler in app/main.py (#10). API requests keep getting JSON 401.
    """
    token = None
    if authorization and authorization.startswith("Bearer "):
        token = authorization.removeprefix("Bearer ")
    if not token and request:
        token = request.cookies.get("access_token")

    def _fail(detail: str) -> NoReturn:
        # Always raises: a redirect sentinel for browsers, a JSON 401 otherwise.
        if _wants_html(request):
            raise _HTMLAuthRedirect()
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail=detail
        )

    if not token:
        _fail("Missing or invalid Authorization header")
    payload = verify_token(token)
    if not payload:
        _fail("Invalid or expired token")
    repo = UserRepository(conn)
    user = repo.get_by_id(payload.get("sub", ""))
    if not user:
        _fail("User not found")
    return user


async def get_optional_user(
    request: Request = None,
    authorization: Optional[str] = Header(None),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
) -> Optional[dict]:
    """Like get_current_user but returns None instead of 401 if no token."""
    try:
        return await get_current_user(request=request, authorization=authorization, conn=conn)
    except (HTTPException, _HTMLAuthRedirect):
        return None


def require_role(minimum_role: Role):
    """Dependency factory: require user has at least the given role."""

    async def _check(request: Request, user: dict = Depends(get_current_user)):
        user_role = Role(user.get("role", "viewer"))
        if ROLE_HIERARCHY.get(user_role, 0) < ROLE_HIERARCHY.get(minimum_role, 0):
            if _wants_html(request):
                raise _HTMLAuthRedirect()
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Requires role {minimum_role.value} or higher",
            )
        return user

    return _check


async def require_admin(request: Request, user: dict = Depends(get_current_user)) -> dict:
    """Dependency: require user is an admin. Raises 403 or redirects on HTML requests."""
    if user.get("role") != "admin":
        if _wants_html(request):
            raise _HTMLAuthRedirect()
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required",
        )
    return user
- Step 4: Register redirect exception handler in app/main.py
Add inside create_app() after middleware registration, before router includes:
from fastapi import Request
from fastapi.responses import RedirectResponse

from app.auth.dependencies import _HTMLAuthRedirect


@app.exception_handler(_HTMLAuthRedirect)
async def _html_auth_redirect_handler(request: Request, exc: _HTMLAuthRedirect):
    return RedirectResponse(url="/login", status_code=302)
- Step 5: Run tests — verify pass
Run: pytest tests/test_auth_html_redirect.py -v
Expected: all 4 tests PASS.
- Step 6: Run full test suite — verify no regressions
Run: pytest tests/ -x --timeout=30
Expected: no new failures.
- Step 7: Commit
git add app/auth/dependencies.py app/main.py tests/test_auth_html_redirect.py
git commit -m "fix(auth): redirect HTML requests to /login instead of JSON 401 (#10)"
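The Accept/path heuristic from Phase 0 can be checked in isolation as a small decision table. This is a sketch only: `wants_html` below is a hypothetical pure-function variant that takes the header and path as plain strings, so it runs without FastAPI; the logic mirrors `_wants_html` from the plan.

```python
def wants_html(accept: str, path: str) -> bool:
    """Return True when the caller looks like a browser expecting an HTML page."""
    # A client that asks only for JSON is never redirected.
    if "application/json" in accept and "text/html" not in accept:
        return False
    # API and auth endpoints always keep their JSON contract.
    if path.startswith("/api/") or path.startswith("/auth/"):
        return False
    # Browsers send text/html; a missing header or */* is treated as HTML too.
    return "text/html" in accept or "*/*" in accept or accept == ""


cases = [
    ("text/html", "/dashboard"),         # browser navigation
    ("application/json", "/dashboard"),  # API client hitting an HTML path
    ("*/*", "/api/users"),               # generic client against the API
    ("", "/admin/users"),                # no Accept header at all
]
results = [wants_html(accept, path) for accept, path in cases]
```

Only the first and last cases would be redirected to /login; the other two keep receiving a JSON 401.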
Phase 1 — User Management (#11)
Scope: schema v5 (active column + audit fields), API (PATCH, reset-password, set-password, deactivate, activate) with self-lockout safeguards, CLI (5 new da admin commands), UI (the /admin/users page), and a runtime check of active in get_current_user.
File Structure
- Modify: src/db.py (schema v5 + migration)
- Modify: src/repositories/users.py (extended update, count_admins, list_all_with_active)
- Modify: app/api/users.py (5 new endpoints + safeguards + audit)
- Modify: app/auth/dependencies.py (check for active=false in get_current_user)
- Modify: cli/commands/admin.py (5 new commands, extended list-users output)
- Create: app/web/templates/admin_users.html
- Modify: app/web/router.py (add the /admin/users route + nav link)
- Modify: app/web/templates/dashboard.html (admin nav link to user management)
- Test: tests/test_user_management.py (new)
Task 1.1: Schema v5 — users.active + deactivated_at/by
Files:
- Modify: src/db.py:19 (bump SCHEMA_VERSION)
- Modify: src/db.py:27-39 (users table definition)
- Modify: src/db.py:387-426 (add _V4_TO_V5_MIGRATIONS)
- Modify: src/db.py:460-471 (migration dispatch)

- Step 1: Write the failing test
# tests/test_user_management.py
"""Tests for #11: user management (active flag, safeguards, endpoints)."""

import os
import tempfile

import pytest
import duckdb

from src.db import _ensure_schema, get_schema_version


@pytest.fixture
def fresh_db(monkeypatch):
    with tempfile.TemporaryDirectory() as tmp:
        monkeypatch.setenv("DATA_DIR", tmp)
        yield tmp


def test_schema_v5_adds_active_column(fresh_db):
    from src.db import get_system_db, close_system_db

    conn = get_system_db()
    try:
        cols = conn.execute("PRAGMA table_info(users)").fetchall()
        col_names = [c[1] for c in cols]
        assert "active" in col_names
        assert "deactivated_at" in col_names
        assert "deactivated_by" in col_names
        assert get_schema_version(conn) >= 5
    finally:
        conn.close()
        close_system_db()
- Step 2: Run test to verify it fails
Run: pytest tests/test_user_management.py::test_schema_v5_adds_active_column -v
Expected: FAIL with "active column not found" or schema version < 5.
- Step 3: Bump SCHEMA_VERSION and update users DDL
In src/db.py:
SCHEMA_VERSION = 5
Update the users table in _SYSTEM_SCHEMA:
CREATE TABLE IF NOT EXISTS users (
    id VARCHAR PRIMARY KEY,
    email VARCHAR UNIQUE NOT NULL,
    name VARCHAR,
    role VARCHAR DEFAULT 'analyst',
    password_hash VARCHAR,
    setup_token VARCHAR,
    setup_token_created TIMESTAMP,
    reset_token VARCHAR,
    reset_token_created TIMESTAMP,
    active BOOLEAN NOT NULL DEFAULT TRUE,
    deactivated_at TIMESTAMP,
    deactivated_by VARCHAR,
    created_at TIMESTAMP DEFAULT current_timestamp,
    updated_at TIMESTAMP
);
Add migration list below _V3_TO_V4_MIGRATIONS:
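# DuckDB's ALTER TABLE ... ADD COLUMN rejects column constraints such as
# NOT NULL, so the migration adds `active` with a default only. Existing rows
# are backfilled with TRUE; count_admins guards against NULL via COALESCE.
_V4_TO_V5_MIGRATIONS = [
    "ALTER TABLE users ADD COLUMN IF NOT EXISTS active BOOLEAN DEFAULT TRUE",
    "ALTER TABLE users ADD COLUMN IF NOT EXISTS deactivated_at TIMESTAMP",
    "ALTER TABLE users ADD COLUMN IF NOT EXISTS deactivated_by VARCHAR",
]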
Extend the migration dispatch in _ensure_schema (inside the else branch, directly after the existing if current < 4: block):
if current < 5:
    for sql in _V4_TO_V5_MIGRATIONS:
        conn.execute(sql)
- Step 4: Run test to verify it passes
Run: pytest tests/test_user_management.py::test_schema_v5_adds_active_column -v
Expected: PASS.
- Step 4b: Backfill test — upgrade from v4 with existing rows
Append to tests/test_user_management.py:
def test_schema_v5_backfill_keeps_existing_users_active(fresh_db):
    """Simulate upgrading from v4: insert a user pre-migration, verify active=TRUE afterwards."""
    import uuid
    import duckdb as _duckdb
    from pathlib import Path

    # 1. Create a v4-era DB by hand.
    db_dir = Path(fresh_db) / "state"
    db_dir.mkdir(parents=True, exist_ok=True)
    db_path = db_dir / "system.duckdb"
    conn = _duckdb.connect(str(db_path))
    try:
        conn.execute("CREATE TABLE schema_version (version INTEGER NOT NULL, applied_at TIMESTAMP DEFAULT current_timestamp)")
        conn.execute("INSERT INTO schema_version (version) VALUES (4)")
        conn.execute("""CREATE TABLE users (
            id VARCHAR PRIMARY KEY, email VARCHAR UNIQUE NOT NULL,
            name VARCHAR, role VARCHAR DEFAULT 'analyst',
            password_hash VARCHAR, setup_token VARCHAR,
            setup_token_created TIMESTAMP, reset_token VARCHAR,
            reset_token_created TIMESTAMP,
            created_at TIMESTAMP DEFAULT current_timestamp, updated_at TIMESTAMP)""")
        uid = str(uuid.uuid4())
        conn.execute("INSERT INTO users (id, email, name, role) VALUES (?, 'pre@v4', 'Pre', 'admin')", [uid])
    finally:
        conn.close()

    # 2. Now let the app open it: the schema should migrate to v5 and backfill active=TRUE.
    from src.db import get_system_db, close_system_db, get_schema_version

    close_system_db()
    conn = get_system_db()
    try:
        assert get_schema_version(conn) >= 5
        row = conn.execute("SELECT email, active FROM users WHERE email = 'pre@v4'").fetchone()
        assert row is not None
        assert row[1] is True
    finally:
        conn.close()
        close_system_db()
Run: pytest tests/test_user_management.py::test_schema_v5_backfill_keeps_existing_users_active -v
Expected: PASS.
- Step 5: Commit
git add src/db.py tests/test_user_management.py
git commit -m "feat(db): schema v5 — users.active + deactivated_at/by (#11)"
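The version-gated migration and the backfill behaviour tested in Task 1.1 can be sketched without DuckDB. The example below is an illustration under stated assumptions: it uses the standard-library `sqlite3` module as a stand-in engine, and `MIGRATIONS` and `migrate` are hypothetical names, not the project's `_ensure_schema`.

```python
import sqlite3

# Migration steps keyed by the schema version they produce (hypothetical layout).
MIGRATIONS = {
    5: [
        # SQLite stand-in: a constant default backfills rows that already exist.
        "ALTER TABLE users ADD COLUMN active BOOLEAN NOT NULL DEFAULT 1",
        "ALTER TABLE users ADD COLUMN deactivated_at TIMESTAMP",
        "ALTER TABLE users ADD COLUMN deactivated_by VARCHAR",
    ],
}


def migrate(conn: sqlite3.Connection, target: int) -> int:
    """Apply every pending migration up to `target` and record the new version."""
    current = conn.execute("SELECT version FROM schema_version").fetchone()[0]
    if current >= target:
        return current
    for version in range(current + 1, target + 1):
        for sql in MIGRATIONS.get(version, []):
            conn.execute(sql)
    conn.execute("UPDATE schema_version SET version = ?", (target,))
    return target


# Build a v4-era database with one pre-existing user, then upgrade it.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE schema_version (version INTEGER NOT NULL)")
conn.execute("INSERT INTO schema_version VALUES (4)")
conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, email TEXT)")
conn.execute("INSERT INTO users VALUES ('u1', 'pre@v4')")

new_version = migrate(conn, 5)
columns = [row[1] for row in conn.execute("PRAGMA table_info(users)")]
active = conn.execute("SELECT active FROM users WHERE id = 'u1'").fetchone()[0]
```

Running `migrate` a second time is a no-op because the stored version already equals the target, which is the same property the `if current < 5:` gate gives the real dispatch.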
Task 1.2: UserRepository — update supports active/deactivated_*, add count_admins & list_all
Files:
- Modify: src/repositories/users.py:49-58 (extend update allowed fields)
- Modify: src/repositories/users.py:27-32 (list_all already exists; keep)
- Add: count_admins() method

- Step 1: Write the failing test
# tests/test_user_management.py (append)
def test_repository_update_accepts_active(fresh_db):
    import uuid
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository

    conn = get_system_db()
    try:
        repo = UserRepository(conn)
        uid = str(uuid.uuid4())
        repo.create(id=uid, email="a@b.c", name="A", role="analyst")
        repo.update(id=uid, active=False, deactivated_by="admin-uuid")
        row = repo.get_by_id(uid)
        assert row["active"] is False
        assert row["deactivated_by"] == "admin-uuid"
    finally:
        conn.close()
        close_system_db()


def test_repository_count_admins(fresh_db):
    import uuid
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository

    conn = get_system_db()
    try:
        repo = UserRepository(conn)
        assert repo.count_admins() == 0
        repo.create(id=str(uuid.uuid4()), email="a@b.c", name="A", role="admin")
        repo.create(id=str(uuid.uuid4()), email="b@b.c", name="B", role="analyst")
        assert repo.count_admins() == 1
    finally:
        conn.close()
        close_system_db()
- Step 2: Run — verify fail
Run: pytest tests/test_user_management.py -v -k "repository"
Expected: FAIL.
- Step 3: Extend repository
In src/repositories/users.py, extend the allowed set in update with active, deactivated_at, and deactivated_by, and add count_admins:
def update(self, id: str, **kwargs) -> None:
    # Column names are interpolated into the SQL, so only allowlisted keys pass.
    allowed = {
        "email", "name", "role", "password_hash", "setup_token",
        "setup_token_created", "reset_token", "reset_token_created",
        "active", "deactivated_at", "deactivated_by",
    }
    updates = {k: v for k, v in kwargs.items() if k in allowed}
    if not updates:
        return
    updates["updated_at"] = datetime.now(timezone.utc)
    set_clause = ", ".join(f"{k} = ?" for k in updates)
    values = list(updates.values()) + [id]
    self.conn.execute(f"UPDATE users SET {set_clause} WHERE id = ?", values)

def count_admins(self, active_only: bool = True) -> int:
    sql = "SELECT COUNT(*) FROM users WHERE role = 'admin'"
    if active_only:
        # COALESCE treats a NULL active flag as active.
        sql += " AND COALESCE(active, TRUE) = TRUE"
    result = self.conn.execute(sql).fetchone()
    return int(result[0]) if result else 0
- Step 4: Run — verify pass
Run: pytest tests/test_user_management.py -v -k "repository"
Expected: PASS.
- Step 5: Commit
git add src/repositories/users.py tests/test_user_management.py
git commit -m "feat(users): repository supports active flag + count_admins (#11)"
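Because `update` interpolates column names into the SQL string, the allowlist is what keeps the statement safe. The sketch below illustrates that idea with the standard-library `sqlite3` module as a stand-in for DuckDB; `ALLOWED` and `build_update` are hypothetical names and the column set is shortened for the example.

```python
import sqlite3

# Shortened, hypothetical allowlist; the repository's real set is longer.
ALLOWED = {"name", "role", "active"}


def build_update(user_id: str, **fields):
    """Return (sql, params) for the allowlisted fields, or None if nothing is left."""
    updates = {k: v for k, v in fields.items() if k in ALLOWED}
    if not updates:
        return None
    # Only vetted column names reach the SQL text; values stay parameterised.
    set_clause = ", ".join(f"{k} = ?" for k in updates)
    return f"UPDATE users SET {set_clause} WHERE id = ?", [*updates.values(), user_id]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id TEXT PRIMARY KEY, name TEXT, role TEXT, active INTEGER DEFAULT 1)"
)
conn.execute("INSERT INTO users (id, name, role) VALUES ('u1', 'A', 'analyst')")

# The second keyword is a hostile column name; the allowlist silently drops it.
statement = build_update("u1", role="admin", **{"role = 'x' --": "ignored"})
sql, params = statement
conn.execute(sql, params)
row = conn.execute("SELECT role, active FROM users WHERE id = 'u1'").fetchone()
nothing_to_do = build_update("u1", bogus=1)
```

Unknown keys never appear in the generated SQL, and a call with no recognised keys produces no statement at all, matching the early return in the repository method.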
Task 1.3: API — PATCH /api/users/{id} + audit
Files:
- Modify: app/api/users.py

- Step 1: Failing test
# tests/test_user_management.py (append; use the existing API test fixtures pattern)
from fastapi.testclient import TestClient


@pytest.fixture
def app_client(fresh_db, monkeypatch):
    monkeypatch.setenv("TESTING", "1")
    monkeypatch.setenv("JWT_SECRET_KEY", "test-jwt-secret-key-minimum-32-chars!!")
    from app.main import app
    return TestClient(app)


def _seed_admin(fresh_db):
    """Create an admin user and return (id, bearer_token)."""
    import uuid
    from src.db import get_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token

    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="admin@test", name="Admin", role="admin")
        token = create_access_token(user_id=uid, email="admin@test", role="admin")
        return uid, token
    finally:
        conn.close()


def test_patch_user_updates_role(app_client, fresh_db):
    import uuid
    from src.db import get_system_db
    from src.repositories.users import UserRepository

    admin_id, token = _seed_admin(fresh_db)
    target_id = str(uuid.uuid4())
    conn = get_system_db()
    try:
        UserRepository(conn).create(id=target_id, email="x@test", name="X", role="viewer")
    finally:
        conn.close()
    resp = app_client.patch(
        f"/api/users/{target_id}",
        headers={"Authorization": f"Bearer {token}"},
        json={"role": "analyst", "name": "X2"},
    )
    assert resp.status_code == 200
    data = resp.json()
    assert data["role"] == "analyst"
    assert data["name"] == "X2"
- Step 2: Run — verify fail
Run: pytest tests/test_user_management.py::test_patch_user_updates_role -v
Expected: FAIL (405 Method Not Allowed or 404).
- Step 3: Implement PATCH in app/api/users.py
Replace the entire file with the extended version below:
"""User management endpoints (#11)."""

import uuid
from datetime import datetime, timezone
from typing import Optional, List

import duckdb
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel
from argon2 import PasswordHasher

from app.auth.dependencies import require_role, Role, _get_db
from src.repositories.users import UserRepository
from src.repositories.audit import AuditRepository

router = APIRouter(prefix="/api/users", tags=["users"])


def _audit(conn: duckdb.DuckDBPyConnection, actor_id: str, action: str, target_id: str, params: Optional[dict] = None) -> None:
    try:
        AuditRepository(conn).log(
            user_id=actor_id,
            action=action,
            resource=f"user:{target_id}",
            params=params,
        )
    except Exception:
        pass  # never block the endpoint on audit failure


class CreateUserRequest(BaseModel):
    email: str
    name: str
    role: str = "analyst"


class UpdateUserRequest(BaseModel):
    name: Optional[str] = None
    role: Optional[str] = None
    active: Optional[bool] = None


class SetPasswordRequest(BaseModel):
    password: str


class UserResponse(BaseModel):
    id: str
    email: str
    name: Optional[str]
    role: str
    active: bool = True
    created_at: Optional[str]
    deactivated_at: Optional[str] = None


def _to_response(u: dict) -> UserResponse:
    return UserResponse(
        id=u["id"],
        email=u["email"],
        name=u.get("name"),
        role=u["role"],
        active=bool(u.get("active", True)),
        created_at=str(u.get("created_at", "")),
        deactivated_at=str(u["deactivated_at"]) if u.get("deactivated_at") else None,
    )


@router.get("", response_model=List[UserResponse])
async def list_users(
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    return [_to_response(u) for u in UserRepository(conn).list_all()]


@router.post("", response_model=UserResponse, status_code=201)
async def create_user(
    payload: CreateUserRequest,
    request: Request,
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    repo = UserRepository(conn)
    if repo.get_by_email(payload.email):
        raise HTTPException(status_code=409, detail="User with this email already exists")
    user_id = str(uuid.uuid4())
    repo.create(id=user_id, email=payload.email, name=payload.name, role=payload.role)
    _audit(conn, user["id"], "user.create", user_id, {"email": payload.email, "role": payload.role})
    created = repo.get_by_id(user_id)
    return _to_response(created)


@router.patch("/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: str,
    payload: UpdateUserRequest,
    request: Request,
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    repo = UserRepository(conn)
    target = repo.get_by_id(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")
    updates: dict = {}
    if payload.name is not None:
        updates["name"] = payload.name
    if payload.role is not None:
        # Validate role is a known value
        try:
            Role(payload.role)
        except ValueError:
            raise HTTPException(status_code=400, detail=f"Unknown role: {payload.role}")
        # Protect: don't let admin demote themselves if they are the last admin
        if (
            target["id"] == user["id"]
            and target["role"] == "admin"
            and payload.role != "admin"
            and repo.count_admins(active_only=True) <= 1
        ):
            raise HTTPException(status_code=409, detail="Cannot demote the last active admin")
        updates["role"] = payload.role
    if payload.active is not None:
        # Protect: cannot self-deactivate
        if target["id"] == user["id"] and payload.active is False:
            raise HTTPException(status_code=409, detail="Cannot deactivate yourself")
        # Protect: cannot deactivate the last active admin
        if (
            target.get("role") == "admin"
            and payload.active is False
            and repo.count_admins(active_only=True) <= 1
        ):
            raise HTTPException(status_code=409, detail="Cannot deactivate the last active admin")
        updates["active"] = payload.active
        if payload.active is False:
            updates["deactivated_at"] = datetime.now(timezone.utc)
            updates["deactivated_by"] = user["id"]
        else:
            updates["deactivated_at"] = None
            updates["deactivated_by"] = None
    if updates:
        repo.update(id=user_id, **updates)
        _audit(conn, user["id"], "user.update", user_id, {k: v for k, v in updates.items() if k != "deactivated_at"})
    return _to_response(repo.get_by_id(user_id))


@router.delete("/{user_id}", status_code=204)
async def delete_user(
    user_id: str,
    request: Request,
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    repo = UserRepository(conn)
    target = repo.get_by_id(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")
    if target["id"] == user["id"]:
        raise HTTPException(status_code=409, detail="Cannot delete yourself")
    if target.get("role") == "admin" and repo.count_admins(active_only=True) <= 1:
        raise HTTPException(status_code=409, detail="Cannot delete the last active admin")
    repo.delete(user_id)
    _audit(conn, user["id"], "user.delete", user_id, {"email": target["email"]})


@router.post("/{user_id}/reset-password")
async def reset_password(
    user_id: str,
    request: Request,
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Generate a reset token and (best-effort) email it to the user."""
    import secrets

    repo = UserRepository(conn)
    target = repo.get_by_id(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")
    token = secrets.token_urlsafe(32)
    repo.update(
        id=user_id,
        reset_token=token,
        reset_token_created=datetime.now(timezone.utc),
    )
    _audit(conn, user["id"], "user.reset_password", user_id, {"email": target["email"]})
    # Best-effort email
    email_sent = False
    try:
        from app.auth.providers.email import _send_email, is_available
        if is_available():
            _send_email(target["email"], token)
            email_sent = True
    except Exception:
        pass
    return {"reset_token": token, "email_sent": email_sent}


@router.post("/{user_id}/set-password", status_code=204)
async def set_password(
    user_id: str,
    payload: SetPasswordRequest,
    request: Request,
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    if not payload.password or len(payload.password) < 8:
        raise HTTPException(status_code=400, detail="Password must be at least 8 characters")
    repo = UserRepository(conn)
    target = repo.get_by_id(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")
    ph = PasswordHasher()
    repo.update(id=user_id, password_hash=ph.hash(payload.password))
    _audit(conn, user["id"], "user.set_password", user_id, {"email": target["email"]})


@router.post("/{user_id}/deactivate", response_model=UserResponse)
async def deactivate_user(
    user_id: str,
    request: Request,
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    # Thin wrapper: reuse the PATCH handler so the safeguards live in one place.
    return await update_user(
        user_id=user_id,
        payload=UpdateUserRequest(active=False),
        request=request, user=user, conn=conn,
    )


@router.post("/{user_id}/activate", response_model=UserResponse)
async def activate_user(
    user_id: str,
    request: Request,
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    return await update_user(
        user_id=user_id,
        payload=UpdateUserRequest(active=True),
        request=request, user=user, conn=conn,
    )
- Step 4: Run — verify pass
Run: pytest tests/test_user_management.py::test_patch_user_updates_role -v
Expected: PASS.
- Step 5: Commit
git add app/api/users.py tests/test_user_management.py
git commit -m "feat(api): user PATCH/reset-password/set-password/activate/deactivate (#11)"
Task 1.4: Safeguards — self-deactivate / last-admin protection
Files:
- Test: tests/test_user_management.py (append)

- Step 1: Failing tests
# tests/test_user_management.py (append)
def test_cannot_self_deactivate(app_client, fresh_db):
    admin_id, token = _seed_admin(fresh_db)
    resp = app_client.patch(
        f"/api/users/{admin_id}",
        headers={"Authorization": f"Bearer {token}"},
        json={"active": False},
    )
    assert resp.status_code == 409
    assert "yourself" in resp.json()["detail"].lower()


def test_cannot_delete_last_admin(app_client, fresh_db):
    admin_id, token = _seed_admin(fresh_db)
    # Create a non-admin so we have ≥2 users, but admin is still the only admin.
    resp = app_client.post(
        "/api/users",
        headers={"Authorization": f"Bearer {token}"},
        json={"email": "x@test", "name": "X", "role": "viewer"},
    )
    x_id = resp.json()["id"]
    # Try deleting the admin.
    resp = app_client.delete(
        f"/api/users/{admin_id}",
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 409
    assert "last" in resp.json()["detail"].lower()


def test_cannot_deactivate_last_admin(app_client, fresh_db):
    admin_id, token = _seed_admin(fresh_db)
    # Deactivating the sole admin is only reachable as a self-deactivation, which
    # the test above already covers. This test exercises the sibling rule: the
    # last active admin cannot demote themselves.
    resp = app_client.post(
        "/api/users",
        headers={"Authorization": f"Bearer {token}"},
        json={"email": "y@test", "name": "Y", "role": "viewer"},
    )
    y_id = resp.json()["id"]
    # Try to demote self (admin → viewer) while being the only admin: should fail.
    resp = app_client.patch(
        f"/api/users/{admin_id}",
        headers={"Authorization": f"Bearer {token}"},
        json={"role": "viewer"},
    )
    assert resp.status_code == 409
    assert "admin" in resp.json()["detail"].lower()
- Step 2: Run
Run: pytest tests/test_user_management.py -v -k "safeguard or cannot"
Expected: PASS (implementation already includes safeguards in Task 1.3).
- Step 3: Commit tests
git add tests/test_user_management.py
git commit -m "test(api): safeguard tests for self-deactivate and last admin (#11)"
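The safeguard ordering exercised by these tests can be read more easily as a pure function. This is a sketch under stated assumptions: `guard_violation` is a hypothetical helper, not part of the plan's code, and the count of active admins is passed in instead of being queried from the repository.

```python
from typing import Optional


def guard_violation(
    actor_id: str,
    target: dict,
    new_role: Optional[str],
    new_active: Optional[bool],
    active_admins: int,
) -> Optional[str]:
    """Return the 409 detail message for a blocked change, or None if it is allowed."""
    is_self = target["id"] == actor_id
    is_admin = target["role"] == "admin"
    # Role change: the last active admin may not demote themselves.
    if new_role is not None and is_self and is_admin and new_role != "admin" and active_admins <= 1:
        return "Cannot demote the last active admin"
    if new_active is False:
        # Self-deactivation is checked before the last-admin rule.
        if is_self:
            return "Cannot deactivate yourself"
        if is_admin and active_admins <= 1:
            return "Cannot deactivate the last active admin"
    return None


admin = {"id": "a1", "role": "admin"}
viewer = {"id": "u2", "role": "viewer"}

demote_self = guard_violation("a1", admin, "viewer", None, active_admins=1)
deactivate_self = guard_violation("a1", admin, None, False, active_admins=2)
deactivate_viewer = guard_violation("a1", viewer, None, False, active_admins=1)
```

Keeping the rules in one place mirrors the plan's choice to route the /deactivate and /activate endpoints through the PATCH handler.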
Task 1.5: Active-flag enforcement in get_current_user
Files:
- Modify: app/auth/dependencies.py

- Step 1: Failing test
# tests/test_user_management.py (append)
def test_deactivated_user_cannot_authenticate(app_client, fresh_db):
    """A deactivated user's old JWT must be rejected."""
    import uuid
    from src.db import get_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token

    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        # Seed an admin so the role check on /api/users would otherwise pass;
        # the only possible reason for rejection is then the active flag.
        UserRepository(conn).create(id=uid, email="u@test", name="U", role="admin")
        token = create_access_token(user_id=uid, email="u@test", role="admin")
        UserRepository(conn).update(id=uid, active=False)
    finally:
        conn.close()
    resp = app_client.get(
        "/api/users",  # admin-only endpoint the seeded user could reach while active
        headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
    )
    # Deactivated: the inactive check answers API requests with 401.
    assert resp.status_code == 401
- Step 2: Run — verify fail
Run: pytest tests/test_user_management.py::test_deactivated_user_cannot_authenticate -v
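Expected: FAIL when the seeded user's role can reach the endpoint (the token is still accepted, so 200 is returned). If the request is instead rejected by the role check (403), an assertion that also accepts 403 passes spuriously; in that case seed a user whose role can reach the endpoint and assert 401, which is what the inactive check returns for API requests.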
- Step 3: Add active check in get_current_user
In app/auth/dependencies.py, after repo lookup:
    repo = UserRepository(conn)
    user = repo.get_by_id(payload.get("sub", ""))
    if not user:
        _fail("User not found")
    if not bool(user.get("active", True)):
        _fail("Account deactivated")
    return user
- Step 4: Run full suite
Run: pytest tests/ -x --timeout=30
Expected: no regressions.
- Step 5: Commit
git add app/auth/dependencies.py tests/test_user_management.py
git commit -m "feat(auth): reject requests from deactivated users (#11)"
Task 1.6: CLI commands — 5 new admin subcommands
Files:
- Modify: cli/commands/admin.py
- Test: tests/test_cli_admin.py (append)

- Step 1: Failing test
# tests/test_cli_admin.py (append)
def test_admin_set_role_invokes_patch(monkeypatch):
    """`da admin set-role` sends PATCH to /api/users/{id} with role."""
    import httpx
    from cli.commands.admin import admin_app
    from typer.testing import CliRunner

    captured = {}

    def fake_patch(path, json=None, **kwargs):
        captured["path"] = path
        captured["json"] = json
        return httpx.Response(200, json={
            "id": "abc", "email": "x@y.z", "name": "X",
            "role": json.get("role") if json else "viewer",
            "active": True, "created_at": "", "deactivated_at": None,
        })

    from cli import client as cli_client
    monkeypatch.setattr(cli_client, "api_patch", fake_patch, raising=False)
    # patch admin.api_patch too since admin.py imports names
    from cli.commands import admin as admin_mod
    monkeypatch.setattr(admin_mod, "api_patch", fake_patch, raising=False)
    runner = CliRunner()
    result = runner.invoke(admin_app, ["set-role", "abc", "analyst"])
    assert result.exit_code == 0
    assert captured["path"] == "/api/users/abc"
    assert captured["json"] == {"role": "analyst"}
- Step 2: Run — verify fail
Run: pytest tests/test_cli_admin.py::test_admin_set_role_invokes_patch -v
Expected: FAIL (no set-role command).
- Step 3: Add api_patch to cli/client.py
def api_patch(path: str, **kwargs) -> httpx.Response:
    with get_client() as client:
        return client.patch(path, **kwargs)
- Step 4: Add CLI commands to cli/commands/admin.py
Append at the end of file:
from cli.client import api_patch


@admin_app.command("set-role")
def set_role(
    user_ref: str = typer.Argument(..., help="User id or email"),
    role: str = typer.Argument(..., help="viewer | analyst | km_admin | admin"),
):
    """Set a user's role."""
    uid = _resolve_user_id(user_ref)
    resp = api_patch(f"/api/users/{uid}", json={"role": role})
    _print_user_result(resp, f"Updated role for {user_ref} → {role}")


@admin_app.command("deactivate")
def deactivate(user_ref: str = typer.Argument(..., help="User id or email")):
    """Deactivate a user (blocks login, existing tokens also rejected)."""
    uid = _resolve_user_id(user_ref)
    resp = api_post(f"/api/users/{uid}/deactivate")
    _print_user_result(resp, f"Deactivated {user_ref}")


@admin_app.command("activate")
def activate(user_ref: str = typer.Argument(..., help="User id or email")):
    """Re-activate a deactivated user."""
    uid = _resolve_user_id(user_ref)
    resp = api_post(f"/api/users/{uid}/activate")
    _print_user_result(resp, f"Activated {user_ref}")


@admin_app.command("reset-password")
def reset_password(user_ref: str = typer.Argument(..., help="User id or email")):
    """Generate a reset token (emailed if SMTP/SendGrid configured)."""
    uid = _resolve_user_id(user_ref)
    resp = api_post(f"/api/users/{uid}/reset-password")
    if resp.status_code == 200:
        data = resp.json()
        typer.echo(f"Reset token: {data['reset_token']}")
        typer.echo(f"Email sent: {data['email_sent']}")
    else:
        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
        raise typer.Exit(1)


@admin_app.command("set-password")
def set_password(
    user_ref: str = typer.Argument(..., help="User id or email"),
    password: str = typer.Option(
        ..., prompt=True, hide_input=True, confirmation_prompt=True,
        help="New password (hidden input)",
    ),
):
    """Set a user's password directly (force-reset flow)."""
    uid = _resolve_user_id(user_ref)
    resp = api_post(f"/api/users/{uid}/set-password", json={"password": password})
    if resp.status_code == 204:
        typer.echo(f"Password set for {user_ref}")
    else:
        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
        raise typer.Exit(1)


def _resolve_user_id(ref: str) -> str:
    """Accept either a UUID or an email; look up email → id via list."""
    if "@" not in ref:
        return ref
    resp = api_get("/api/users")
    if resp.status_code != 200:
        typer.echo(f"Could not list users: {resp.text}", err=True)
        raise typer.Exit(1)
    for u in resp.json():
        if u.get("email") == ref:
            return u["id"]
    typer.echo(f"User not found: {ref}", err=True)
    raise typer.Exit(1)


def _print_user_result(resp, ok_msg: str) -> None:
    if resp.status_code in (200, 204):
        typer.echo(ok_msg)
    else:
        try:
            detail = resp.json().get("detail", resp.text)
        except Exception:
            detail = resp.text
        typer.echo(f"Failed: {detail}", err=True)
        raise typer.Exit(1)
Also extend list-users output (replace body):
@admin_app.command("list-users")
def list_users(as_json: bool = typer.Option(False, "--json")):
"""List all users."""
resp = api_get("/api/users")
if resp.status_code != 200:
typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
raise typer.Exit(1)
users = resp.json()
if as_json:
typer.echo(json.dumps(users, indent=2))
else:
for u in users:
status_str = "active" if u.get("active", True) else "DEACTIVATED"
typer.echo(
f" {u['email']:30s} role={u['role']:10s} {status_str:12s} id={u['id'][:8]}"
)
- Step 5: Run tests
Run: pytest tests/test_cli_admin.py -v
Expected: all green.
- Step 6: Commit
git add cli/client.py cli/commands/admin.py tests/test_cli_admin.py
git commit -m "feat(cli): da admin set-role/activate/deactivate/reset-password/set-password (#11)"
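The id-or-email lookup used by the admin commands can be exercised without a server. The following is a minimal sketch of that rule only; `resolve_user_id` and the in-memory `users` list are illustrative stand-ins (assumptions) for `_resolve_user_id` and the `/api/users` response, and a `LookupError` replaces the CLI's `typer.Exit`.

```python
from typing import Dict, List


def resolve_user_id(ref: str, users: List[Dict[str, str]]) -> str:
    """Return ref unchanged unless it looks like an email, then map it to an id."""
    if "@" not in ref:
        # Anything without "@" is treated as an id and passed through unvalidated.
        return ref
    for user in users:
        if user.get("email") == ref:
            return user["id"]
    raise LookupError(f"User not found: {ref}")


# Hypothetical payload shaped like the list-users response.
users = [{"id": "abc", "email": "a@example.com"}]
print(resolve_user_id("abc", users))
print(resolve_user_id("a@example.com", users))
```

Note that an id is never checked against the list, so a mistyped id only fails once the API call itself is made.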
Task 1.7: UI — /admin/users page
Files:
- Create: app/web/templates/admin_users.html
- Modify: app/web/router.py (add route)
- Modify: app/web/templates/dashboard.html (nav link, admins only)
- Step 1: Create template
Create app/web/templates/admin_users.html:
{% extends "base.html" %}
{% block title %}User management — {{ config.INSTANCE_NAME }}{% endblock %}
{% block content %}
<div class="container">
<div class="d-flex justify-content-between align-items-center mb-3">
<h2>User management</h2>
<button class="btn btn-primary" onclick="openCreateModal()">+ Add user</button>
</div>
<table class="table table-striped" id="users-table">
<thead>
<tr>
<th>Email</th><th>Name</th><th>Role</th><th>Active</th>
<th>Created</th><th>Deactivated</th><th>Actions</th>
</tr>
</thead>
<tbody>
<!-- rows rendered by JS -->
</tbody>
</table>
</div>
<div class="modal" id="create-modal" style="display:none;">
<div class="modal-content">
<h3>Add user</h3>
<label>Email <input id="new-email" type="email" required></label>
<label>Name <input id="new-name" type="text"></label>
<label>Role
<select id="new-role">
<option value="viewer">viewer</option>
<option value="analyst" selected>analyst</option>
<option value="km_admin">km_admin</option>
<option value="admin">admin</option>
</select>
</label>
<button class="btn btn-primary" onclick="createUser()">Create</button>
<button class="btn btn-secondary" onclick="closeCreateModal()">Cancel</button>
</div>
</div>
<script>
const API = "/api/users";
function fmtDate(s) { return s ? s.slice(0, 19).replace("T", " ") : ""; }
// Escape user-supplied strings before interpolating them into innerHTML.
function esc(s) { return String(s ?? "").replace(/[&<>"']/g, c => "&#" + c.charCodeAt(0) + ";"); }
async function loadUsers() {
const r = await fetch(API, {credentials: "include"});
if (!r.ok) { alert("Failed to load users: " + r.status); return; }
const users = await r.json();
const tbody = document.querySelector("#users-table tbody");
tbody.innerHTML = "";
for (const u of users) {
const tr = document.createElement("tr");
tr.innerHTML = `
<td>${esc(u.email)}</td>
<td>${esc(u.name)}</td>
<td>
<select onchange="setRole('${u.id}', this.value)">
${["viewer","analyst","km_admin","admin"].map(r =>
`<option value="${r}" ${r===u.role?"selected":""}>${r}</option>`).join("")}
</select>
</td>
<td>
<input type="checkbox" ${u.active?"checked":""}
onchange="toggleActive('${u.id}', this.checked)">
</td>
<td>${fmtDate(u.created_at)}</td>
<td>${fmtDate(u.deactivated_at)}</td>
<td>
<button class="btn btn-sm" onclick="resetPassword('${u.id}')">Reset</button>
<button class="btn btn-sm" onclick="setPassword('${u.id}')">Set pwd</button>
<button class="btn btn-sm btn-danger" data-id="${esc(u.id)}" data-email="${esc(u.email)}" onclick="delUser(this.dataset.id, this.dataset.email)">Delete</button>
</td>`;
tbody.appendChild(tr);
}
}
async function setRole(id, role) {
const r = await fetch(`${API}/${id}`, {
method: "PATCH", credentials: "include",
headers: {"Content-Type":"application/json"},
body: JSON.stringify({role}),
});
if (!r.ok) { alert("Failed: " + (await r.text())); }
loadUsers();
}
async function toggleActive(id, active) {
const path = active ? "activate" : "deactivate";
const r = await fetch(`${API}/${id}/${path}`, {method: "POST", credentials: "include"});
if (!r.ok) { alert("Failed: " + (await r.text())); }
loadUsers();
}
async function resetPassword(id) {
if (!confirm("Generate a password reset token?")) return;
const r = await fetch(`${API}/${id}/reset-password`, {method: "POST", credentials: "include"});
const data = await r.json();
if (!r.ok) { alert("Failed: " + data.detail); return; }
alert(`Reset token: ${data.reset_token}\nEmail sent: ${data.email_sent}`);
}
async function setPassword(id) {
const pwd = prompt("New password (min 8 chars):");
if (!pwd) return;
const r = await fetch(`${API}/${id}/set-password`, {
method: "POST", credentials: "include",
headers: {"Content-Type":"application/json"},
body: JSON.stringify({password: pwd}),
});
if (!r.ok) { alert("Failed: " + (await r.text())); return; }
alert("Password set.");
}
async function delUser(id, email) {
if (!confirm(`Delete ${email}? This cannot be undone.`)) return;
const r = await fetch(`${API}/${id}`, {method: "DELETE", credentials: "include"});
if (!r.ok) { alert("Failed: " + (await r.text())); return; }
loadUsers();
}
function openCreateModal() { document.getElementById("create-modal").style.display = "block"; }
function closeCreateModal() { document.getElementById("create-modal").style.display = "none"; }
async function createUser() {
const email = document.getElementById("new-email").value;
const name = document.getElementById("new-name").value;
const role = document.getElementById("new-role").value;
const r = await fetch(API, {
method: "POST", credentials: "include",
headers: {"Content-Type":"application/json"},
body: JSON.stringify({email, name: name || email.split("@")[0], role}),
});
if (!r.ok) { alert("Failed: " + (await r.text())); return; }
closeCreateModal();
loadUsers();
}
loadUsers();
</script>
{% endblock %}
- Step 2: Add route in app/web/router.py
After the existing admin_permissions_page route:
@router.get("/admin/users", response_class=HTMLResponse)
async def admin_users_page(
request: Request,
user: dict = Depends(require_role(Role.ADMIN)),
):
"""Admin page for user management."""
ctx = _build_context(request, user=user)
return templates.TemplateResponse(request, "admin_users.html", ctx)
- Step 3: Add nav link in app/web/templates/dashboard.html
Find the existing admin nav block (search for "admin/tables" or "admin/permissions") and add /admin/users as a sibling:
{% if user.role == 'admin' %}
<a class="nav-link" href="/admin/users">Users</a>
{% endif %}
(If no dashboard admin nav block exists, skip — the page is still reachable by URL and tests will cover it.)
- Step 4: Test the route renders for admin
Add to tests/test_user_management.py:
def test_admin_users_page_renders_for_admin(app_client, fresh_db):
admin_id, token = _seed_admin(fresh_db)
resp = app_client.get(
"/admin/users",
headers={"Accept": "text/html"},
cookies={"access_token": token},
)
assert resp.status_code == 200
assert "User management" in resp.text
def test_admin_users_page_denies_non_admin(app_client, fresh_db):
import uuid
from src.db import get_system_db
from src.repositories.users import UserRepository
from app.auth.jwt import create_access_token
conn = get_system_db()
try:
uid = str(uuid.uuid4())
UserRepository(conn).create(id=uid, email="a@test", name="A", role="analyst")
token = create_access_token(user_id=uid, email="a@test", role="analyst")
finally:
conn.close()
resp = app_client.get(
"/admin/users",
headers={"Accept": "text/html"},
cookies={"access_token": token},
follow_redirects=False,
)
# HTML request to admin-only page → 302 (to /login) for non-admin per Phase 0
assert resp.status_code in (302, 403)
- Step 5: Run
Run: pytest tests/test_user_management.py -v -k "admin_users_page"
Expected: PASS.
- Step 6: Commit
git add app/web/router.py app/web/templates/admin_users.html app/web/templates/dashboard.html tests/test_user_management.py
git commit -m "feat(ui): /admin/users management page (#11)"
Task 1.8: Phase 1 integration — full test suite + review
- Step 1: Run full suite
Run: pytest tests/ --timeout=30
Expected: all green.
- Step 2: Manual smoke
Run locally with uvicorn app.main:app --reload. Visit /admin/users as an admin; create, edit, and delete a test user; then verify that a deactivated user's token is rejected on an API call.
- Step 3: Request code review
Dispatch the superpowers:code-reviewer agent with the diff of the commits listed by git log --oneline main..HEAD; fix any blockers.
- Step 4: Optional squash/rebase
If commits are noisy, rebase-interactive to a clean sequence. Otherwise leave as-is.
Phase 2 — Personal Access Tokens (#12)
Scope: schema v6 with the personal_access_tokens table, JWT extension (typ, jti), /auth/tokens endpoints (session-only) plus an admin variant, CLI (da auth token create|list|revoke), profile UI with a one-time token reveal, and an update to cli/skills/security.md.
File Structure
- Modify: src/db.py (schema v6 + personal_access_tokens)
- Create: src/repositories/access_tokens.py
- Modify: app/auth/jwt.py (typ, jti verification for PATs)
- Modify: app/auth/dependencies.py (reject revoked/expired PATs, update last_used_at)
- Create: app/api/tokens.py (/auth/tokens CRUD, session-only)
- Modify: app/main.py (register tokens router)
- Create: cli/commands/tokens.py
- Modify: cli/main.py (add token sub-typer under auth)
- Modify: cli/commands/auth.py (add token subcommand group hook)
- Create: app/web/templates/profile.html (or profile_tokens.html)
- Modify: app/web/router.py (/profile route)
- Modify: cli/skills/security.md (fix 24h vs 30d mismatch + add PAT section)
- Test: tests/test_pat.py (new)
Task 2.1: Schema v6 — personal_access_tokens table
Files:
- Modify: src/db.py
- Step 1: Failing test
# tests/test_pat.py
import os
import tempfile
import pytest
@pytest.fixture
def fresh_db(monkeypatch):
with tempfile.TemporaryDirectory() as tmp:
monkeypatch.setenv("DATA_DIR", tmp)
monkeypatch.setenv("TESTING", "1")
monkeypatch.setenv("JWT_SECRET_KEY", "test-jwt-secret-key-minimum-32-chars!!")
yield tmp
def test_schema_v6_creates_pat_table(fresh_db):
from src.db import get_system_db, get_schema_version, close_system_db
conn = get_system_db()
try:
cols = conn.execute("PRAGMA table_info(personal_access_tokens)").fetchall()
col_names = [c[1] for c in cols]
for expected in ("id", "user_id", "name", "token_hash", "prefix",
"scopes", "created_at", "expires_at", "last_used_at", "revoked_at"):
assert expected in col_names
assert get_schema_version(conn) >= 6
finally:
conn.close()
close_system_db()
- Step 2: Run — fail
Run: pytest tests/test_pat.py::test_schema_v6_creates_pat_table -v
- Step 3: Bump schema and add DDL
src/db.py:
SCHEMA_VERSION = 6
Append to _SYSTEM_SCHEMA just before the closing """:
CREATE TABLE IF NOT EXISTS personal_access_tokens (
id VARCHAR PRIMARY KEY,
user_id VARCHAR NOT NULL,
name VARCHAR NOT NULL,
token_hash VARCHAR NOT NULL,
prefix VARCHAR NOT NULL,
scopes VARCHAR,
created_at TIMESTAMP NOT NULL DEFAULT current_timestamp,
expires_at TIMESTAMP,
last_used_at TIMESTAMP,
revoked_at TIMESTAMP
);
Add migration list:
_V5_TO_V6_MIGRATIONS = [
"""
CREATE TABLE IF NOT EXISTS personal_access_tokens (
id VARCHAR PRIMARY KEY,
user_id VARCHAR NOT NULL,
name VARCHAR NOT NULL,
token_hash VARCHAR NOT NULL,
prefix VARCHAR NOT NULL,
scopes VARCHAR,
created_at TIMESTAMP NOT NULL DEFAULT current_timestamp,
expires_at TIMESTAMP,
last_used_at TIMESTAMP,
revoked_at TIMESTAMP
)
""",
]
Extend dispatch:
if current < 6:
    for sql in _V5_TO_V6_MIGRATIONS:
        conn.execute(sql)
- Step 4: Run — pass
Run: pytest tests/test_pat.py::test_schema_v6_creates_pat_table -v
- Step 5: Commit
git add src/db.py tests/test_pat.py
git commit -m "feat(db): schema v6 — personal_access_tokens (#12)"
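The version-gated dispatch in Step 3 follows a common pattern: each migration list runs only when the stored version is below its target, and the DDL is idempotent. Below is a minimal sketch of that pattern, assuming a hypothetical `MIGRATIONS` registry and using the standard-library `sqlite3` as a stand-in for the project's DuckDB connection; the table is abbreviated to three columns.

```python
import sqlite3

SCHEMA_VERSION = 6

# Hypothetical registry: target version -> SQL statements to reach it.
MIGRATIONS = {
    6: [
        """CREATE TABLE IF NOT EXISTS personal_access_tokens (
               id VARCHAR PRIMARY KEY,
               user_id VARCHAR NOT NULL,
               name VARCHAR NOT NULL
           )""",
    ],
}


def migrate(conn: sqlite3.Connection, current: int) -> int:
    """Apply every migration newer than `current`, in order; return the new version."""
    for version in sorted(MIGRATIONS):
        if current < version:
            for sql in MIGRATIONS[version]:
                conn.execute(sql)
            current = version
    return current


conn = sqlite3.connect(":memory:")
new_version = migrate(conn, current=5)
print(new_version)
# A second run skips the v6 step because the version check no longer matches.
print(migrate(conn, current=new_version))
```

Keeping `IF NOT EXISTS` in the migration DDL means a database that was created fresh from `_SYSTEM_SCHEMA` can still pass through the migration path safely.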
Task 2.2: AccessTokenRepository
Files:
- Create: src/repositories/access_tokens.py
- Step 1: Failing test
# tests/test_pat.py — append
def test_access_token_repo_create_and_lookup(fresh_db):
import hashlib, uuid
from datetime import datetime, timezone, timedelta
from src.db import get_system_db, close_system_db
from src.repositories.access_tokens import AccessTokenRepository
conn = get_system_db()
try:
repo = AccessTokenRepository(conn)
token_id = str(uuid.uuid4())
raw = "abcdefgh" + "x" * 32
repo.create(
id=token_id,
user_id="u1",
name="laptop",
token_hash=hashlib.sha256(raw.encode()).hexdigest(),
prefix=raw[:8],
expires_at=datetime.now(timezone.utc) + timedelta(days=90),
)
row = repo.get_by_id(token_id)
assert row is not None
assert row["name"] == "laptop"
assert row["prefix"] == "abcdefgh"
assert row["revoked_at"] is None
rows = repo.list_for_user("u1")
assert len(rows) == 1
repo.revoke(token_id)
assert repo.get_by_id(token_id)["revoked_at"] is not None
finally:
conn.close()
close_system_db()
def test_access_token_repo_mark_used(fresh_db):
import hashlib, uuid
from datetime import datetime, timezone
from src.db import get_system_db, close_system_db
from src.repositories.access_tokens import AccessTokenRepository
conn = get_system_db()
try:
repo = AccessTokenRepository(conn)
tid = str(uuid.uuid4())
repo.create(id=tid, user_id="u1", name="x",
token_hash=hashlib.sha256(b"r").hexdigest(), prefix="rrrrrrrr")
assert repo.get_by_id(tid)["last_used_at"] is None
repo.mark_used(tid)
assert repo.get_by_id(tid)["last_used_at"] is not None
finally:
conn.close()
close_system_db()
- Step 2: Run (expect fail)
- Step 3: Implement repository
Create src/repositories/access_tokens.py:
"""Repository for personal access tokens (#12)."""
from datetime import datetime, timezone
from typing import Any, Optional, List, Dict
import duckdb
class AccessTokenRepository:
def __init__(self, conn: duckdb.DuckDBPyConnection):
self.conn = conn
def _row_to_dict(self, row) -> Optional[Dict[str, Any]]:
if not row:
return None
columns = [desc[0] for desc in self.conn.description]
return dict(zip(columns, row))
def create(
self,
id: str,
user_id: str,
name: str,
token_hash: str,
prefix: str,
expires_at: Optional[datetime] = None,
scopes: Optional[str] = None,
) -> None:
self.conn.execute(
"""INSERT INTO personal_access_tokens
(id, user_id, name, token_hash, prefix, scopes, created_at, expires_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
[id, user_id, name, token_hash, prefix, scopes,
datetime.now(timezone.utc), expires_at],
)
def get_by_id(self, token_id: str) -> Optional[Dict[str, Any]]:
result = self.conn.execute(
"SELECT * FROM personal_access_tokens WHERE id = ?", [token_id]
).fetchone()
return self._row_to_dict(result)
def list_for_user(self, user_id: str, include_revoked: bool = True) -> List[Dict[str, Any]]:
sql = "SELECT * FROM personal_access_tokens WHERE user_id = ?"
if not include_revoked:
sql += " AND revoked_at IS NULL"
sql += " ORDER BY created_at DESC"
rows = self.conn.execute(sql, [user_id]).fetchall()
if not rows:
return []
columns = [desc[0] for desc in self.conn.description]
return [dict(zip(columns, r)) for r in rows]
def list_all(self) -> List[Dict[str, Any]]:
rows = self.conn.execute(
"SELECT * FROM personal_access_tokens ORDER BY created_at DESC"
).fetchall()
if not rows:
return []
columns = [desc[0] for desc in self.conn.description]
return [dict(zip(columns, r)) for r in rows]
def revoke(self, token_id: str) -> None:
self.conn.execute(
"UPDATE personal_access_tokens SET revoked_at = ? WHERE id = ?",
[datetime.now(timezone.utc), token_id],
)
def delete(self, token_id: str) -> None:
self.conn.execute("DELETE FROM personal_access_tokens WHERE id = ?", [token_id])
def mark_used(self, token_id: str) -> None:
self.conn.execute(
"UPDATE personal_access_tokens SET last_used_at = ? WHERE id = ?",
[datetime.now(timezone.utc), token_id],
)
- Step 4: Run (expect pass)
- Step 5: Commit
git add src/repositories/access_tokens.py tests/test_pat.py
git commit -m "feat(users): access_tokens repository (#12)"
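The repository stores only `token_hash` and `prefix`, never the raw token. The sketch below illustrates why that is sufficient for later verification. The helper names `hash_token` and `matches` are hypothetical, and the random `raw` value stands in for whatever secret the API ends up issuing.

```python
import hashlib
import hmac
import secrets


def hash_token(raw: str) -> str:
    """SHA-256 hex digest of the raw token, as stored in token_hash."""
    return hashlib.sha256(raw.encode()).hexdigest()


def matches(raw: str, stored_hash: str) -> bool:
    """Constant-time comparison of a presented token against the stored hash."""
    return hmac.compare_digest(hash_token(raw), stored_hash)


raw = secrets.token_urlsafe(32)  # illustrative secret, shown to the user once
record = {"token_hash": hash_token(raw), "prefix": raw[:8]}

print(matches(raw, record["token_hash"]))
print(matches(raw + "x", record["token_hash"]))
```

Because the digest cannot be reversed, a leaked database row does not reveal a usable token; the prefix exists only so a user can tell tokens apart in a listing.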
Task 2.3: JWT — typ field, helper for PAT vs session
Files:
- Modify: app/auth/jwt.py
- Step 1: Failing test
# tests/test_pat.py — append
def test_pat_token_carries_typ_claim(fresh_db):
from app.auth.jwt import create_access_token, verify_token
token = create_access_token(
user_id="u1", email="u@test", role="analyst",
token_id="deadbeef-1234", typ="pat",
)
payload = verify_token(token)
assert payload["typ"] == "pat"
assert payload["jti"] == "deadbeef-1234"
def test_session_token_defaults_typ(fresh_db):
from app.auth.jwt import create_access_token, verify_token
token = create_access_token(user_id="u1", email="u@test", role="analyst")
payload = verify_token(token)
# Default typ is "session".
assert payload.get("typ") == "session"
- Step 2: Run (expect fail)
- Step 3: Extend create_access_token
Replace the function body in app/auth/jwt.py:
# Requires `import uuid` at the top of app/auth/jwt.py if it is not already imported.
def create_access_token(
    user_id: str,
    email: str,
    role: str = "analyst",
    expires_delta: Optional[timedelta] = None,
    token_id: Optional[str] = None,
    typ: str = "session",
) -> str:
    """Create a JWT. `typ` is "session" (interactive login) or "pat" (long-lived)."""
    expire = datetime.now(timezone.utc) + (
        expires_delta or timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
    )
    payload = {
        "sub": user_id,
        "email": email,
        "role": role,
        "typ": typ,
        "exp": expire,
        "iat": datetime.now(timezone.utc),
        "jti": token_id or uuid.uuid4().hex,
    }
    return jwt.encode(payload, _get_cached_secret_key(), algorithm=ALGORITHM)
- Step 4: Run (expect pass)
- Step 5: Commit
git add app/auth/jwt.py tests/test_pat.py
git commit -m "feat(auth): JWT carries typ (session|pat) and explicit jti (#12)"
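To see how the `typ` and `jti` claims travel inside a signed token, here is a standard-library-only HS256 sketch. It is a stand-in for the JWT library the project uses, not a replacement for it; `SECRET`, `encode`, and `decode` are hypothetical names, and only the claims relevant to this task are included.

```python
import base64
import hashlib
import hmac
import json
import time
import uuid
from typing import Optional

SECRET = b"example-secret-for-illustration-only"  # assumption, not the project's key


def _b64(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _unb64(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def encode(user_id: str, typ: str = "session", token_id: Optional[str] = None,
           ttl_seconds: int = 3600) -> str:
    """Build a compact HS256 token carrying sub, typ, iat, exp and jti."""
    header = {"alg": "HS256", "typ": "JWT"}
    now = int(time.time())
    payload = {"sub": user_id, "typ": typ, "iat": now, "exp": now + ttl_seconds,
               "jti": token_id or uuid.uuid4().hex}
    signing_input = (
        f"{_b64(json.dumps(header).encode())}.{_b64(json.dumps(payload).encode())}"
    )
    sig = hmac.new(SECRET, signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64(sig)}"


def decode(token: str) -> Optional[dict]:
    """Return the payload if the signature and exp are valid, else None."""
    try:
        head, body, sig = token.split(".")
    except ValueError:
        return None
    expected = hmac.new(SECRET, f"{head}.{body}".encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _unb64(sig)):
        return None
    payload = json.loads(_unb64(body))
    if payload["exp"] < time.time():
        return None
    return payload


pat = encode("u1", typ="pat", token_id="deadbeef-1234")
print(decode(pat)["typ"], decode(pat)["jti"])
print(decode(encode("u1"))["typ"])
```

The payload-level `typ` claim is a custom claim and is separate from the header's `typ` field, which stays `"JWT"`.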
Task 2.4: Auth dependency — reject revoked/expired PATs, update last_used_at
Files:
- Modify: app/auth/dependencies.py
- Step 1: Failing test
# tests/test_pat.py — append
def test_revoked_pat_is_rejected(fresh_db, monkeypatch):
from fastapi.testclient import TestClient
import hashlib, uuid
from datetime import datetime, timezone, timedelta
from src.db import get_system_db, close_system_db
from src.repositories.users import UserRepository
from src.repositories.access_tokens import AccessTokenRepository
from app.auth.jwt import create_access_token
from app.main import app
conn = get_system_db()
try:
uid = str(uuid.uuid4())
UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
token_id = str(uuid.uuid4())
raw = "secretXX" + "a" * 32
AccessTokenRepository(conn).create(
id=token_id, user_id=uid, name="ci",
token_hash=hashlib.sha256(raw.encode()).hexdigest(),
prefix=raw[:8],
expires_at=datetime.now(timezone.utc) + timedelta(days=30),
)
jwt_token = create_access_token(
user_id=uid, email="u@t", role="admin", token_id=token_id, typ="pat",
)
# Revoke
AccessTokenRepository(conn).revoke(token_id)
finally:
conn.close()
close_system_db()
client = TestClient(app)
resp = client.get(
"/api/users",
headers={"Authorization": f"Bearer {jwt_token}", "Accept": "application/json"},
)
assert resp.status_code == 401
def test_expired_pat_is_rejected_from_db(fresh_db):
"""A PAT with a past expires_at in DB is rejected even if JWT exp is in future."""
from fastapi.testclient import TestClient
import hashlib, uuid
from datetime import datetime, timezone, timedelta
from src.db import get_system_db, close_system_db
from src.repositories.users import UserRepository
from src.repositories.access_tokens import AccessTokenRepository
from app.auth.jwt import create_access_token
from app.main import app
conn = get_system_db()
try:
uid = str(uuid.uuid4())
UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
tid = str(uuid.uuid4())
# Past-dated expiry in DB
AccessTokenRepository(conn).create(
id=tid, user_id=uid, name="stale",
token_hash=hashlib.sha256(b"whatever").hexdigest(), prefix=tid.replace("-","")[:8],
expires_at=datetime.now(timezone.utc) - timedelta(days=1),
)
# JWT with much longer TTL so signature-level `exp` would pass
pat = create_access_token(
user_id=uid, email="u@t", role="admin",
token_id=tid, typ="pat",
expires_delta=timedelta(days=365),
)
finally:
conn.close()
close_system_db()
client = TestClient(app)
resp = client.get(
"/api/users",
headers={"Authorization": f"Bearer {pat}", "Accept": "application/json"},
)
assert resp.status_code == 401
- Step 2: Run (expect fail)
- Step 3: Add PAT validation in app/auth/dependencies.py
Inside get_current_user, after payload = verify_token(token):
    # PAT validation: check it's not revoked / expired / unknown in DB.
    if payload.get("typ") == "pat":
        from datetime import datetime, timezone
        import hashlib
        from src.repositories.access_tokens import AccessTokenRepository
        tokens_repo = AccessTokenRepository(conn)
        record = tokens_repo.get_by_id(payload.get("jti", ""))
        if not record:
            _fail("Token unknown")
        if record.get("revoked_at") is not None:
            _fail("Token revoked")
        exp_at = record.get("expires_at")
        if exp_at is not None:
            if isinstance(exp_at, str):
                exp_at = datetime.fromisoformat(exp_at)
            if exp_at.tzinfo is None:
                exp_at = exp_at.replace(tzinfo=timezone.utc)
            if datetime.now(timezone.utc) > exp_at:
                _fail("Token expired")
        # Defense-in-depth: stored token_hash must match sha256(bearer JWT).
        # Protects against a forged-but-unrevoked JWT using a stolen key.
        stored_hash = record.get("token_hash")
        if stored_hash:
            actual = hashlib.sha256(token.encode()).hexdigest()
            if actual != stored_hash:
                _fail("Token mismatch")
        # Record last_used_at synchronously: acceptable cost; can batch later.
        try:
            tokens_repo.mark_used(payload["jti"])
        except Exception:
            pass
- Step 4: Run (expect pass)
- Step 5: Commit
git add app/auth/dependencies.py tests/test_pat.py
git commit -m "feat(auth): reject revoked/expired PATs; update last_used_at (#12)"
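The order of the PAT checks matters for which error a caller sees, so it can help to test that order in isolation. The sketch below restates the checks as a pure function under simplifying assumptions: `pat_rejection_reason` is a hypothetical name, the record is a plain dict, and `expires_at` is already a `datetime` rather than a string.

```python
import hashlib
from datetime import datetime, timedelta, timezone
from typing import Optional


def pat_rejection_reason(record: Optional[dict], bearer: str,
                         now: datetime) -> Optional[str]:
    """Return why a PAT should be rejected, or None if it passes every check."""
    if not record:
        return "Token unknown"
    if record.get("revoked_at") is not None:
        return "Token revoked"
    exp_at = record.get("expires_at")
    if exp_at is not None:
        if exp_at.tzinfo is None:
            # Naive timestamps from the database are treated as UTC.
            exp_at = exp_at.replace(tzinfo=timezone.utc)
        if now > exp_at:
            return "Token expired"
    stored = record.get("token_hash")
    if stored and hashlib.sha256(bearer.encode()).hexdigest() != stored:
        return "Token mismatch"
    return None


now = datetime(2026, 1, 1, tzinfo=timezone.utc)
bearer = "header.payload.signature"  # placeholder for the bearer JWT string
record = {
    "revoked_at": None,
    "expires_at": now + timedelta(days=30),
    "token_hash": hashlib.sha256(bearer.encode()).hexdigest(),
}
print(pat_rejection_reason(record, bearer, now))
print(pat_rejection_reason({**record, "revoked_at": now}, bearer, now))
```

Revocation and expiry are evaluated before the hash comparison, which is why a test record with a placeholder hash still yields the expected rejection when it is revoked or expired.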
Task 2.5: API — /auth/tokens CRUD (session-only)
Files:
- Create: app/api/tokens.py
- Modify: app/main.py (include router)
- Modify: app/auth/dependencies.py (add require_session_token dep)
- Step 1: Failing test
# tests/test_pat.py — append
def test_create_pat_returns_raw_once(fresh_db):
from fastapi.testclient import TestClient
import uuid
from src.db import get_system_db, close_system_db
from src.repositories.users import UserRepository
from app.auth.jwt import create_access_token
from app.main import app
conn = get_system_db()
try:
uid = str(uuid.uuid4())
UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
sess_token = create_access_token(user_id=uid, email="u@t", role="admin") # typ=session
finally:
conn.close()
close_system_db()
client = TestClient(app)
resp = client.post(
"/auth/tokens",
headers={"Authorization": f"Bearer {sess_token}"},
json={"name": "laptop", "expires_in_days": 30},
)
assert resp.status_code == 201
data = resp.json()
assert data["name"] == "laptop"
assert "token" in data and data["token"] # raw token returned exactly once
# Listing returns prefix, never raw.
# Prefix is derived from the token id (jti), not the JWT string, to avoid
# all tokens having the useless "eyJhbGci" JWT-header prefix.
list_resp = client.get(
"/auth/tokens", headers={"Authorization": f"Bearer {sess_token}"},
)
assert list_resp.status_code == 200
rows = list_resp.json()
assert len(rows) == 1
assert "token" not in rows[0]
assert rows[0]["prefix"] == data["prefix"]
assert len(rows[0]["prefix"]) == 8
assert not data["prefix"].startswith("eyJ") # regression: not the JWT header
def test_pat_cannot_create_pat(fresh_db):
    from fastapi.testclient import TestClient
    import hashlib, uuid
    from datetime import datetime, timezone, timedelta
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app
    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
        tid = str(uuid.uuid4())
        # Issue the JWT first so the stored hash matches the bearer token.
        # Otherwise the hash check in get_current_user rejects the request
        # before the session-only guard can answer with 403.
        pat = create_access_token(user_id=uid, email="u@t", role="admin", token_id=tid, typ="pat")
        AccessTokenRepository(conn).create(
            id=tid, user_id=uid, name="x",
            token_hash=hashlib.sha256(pat.encode()).hexdigest(),
            prefix=tid.replace("-", "")[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )
    finally:
        conn.close()
        close_system_db()
    client = TestClient(app)
    resp = client.post(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {pat}"},
        json={"name": "bad", "expires_in_days": 30},
    )
    assert resp.status_code == 403
- Step 2: Run (expect fail)
- Step 3: Add require_session_token dependency
In app/auth/dependencies.py, append:
async def require_session_token(request: Request, user: dict = Depends(get_current_user)) -> dict:
    """Like get_current_user, but rejects PATs. For endpoints that must not
    be callable via a long-lived CI token (e.g. creating new tokens, changing password)."""
    auth = request.headers.get("authorization", "")
    token = None
    if auth.startswith("Bearer "):
        token = auth.removeprefix("Bearer ")
    if not token:
        token = request.cookies.get("access_token")
    if token:
        from app.auth.jwt import verify_token
        payload = verify_token(token) or {}
        if payload.get("typ") == "pat":
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="This endpoint requires an interactive session, not a PAT",
            )
    return user
- Step 4: Create router app/api/tokens.py
"""Personal access token endpoints (#12)."""
import hashlib
import secrets
import uuid
from datetime import datetime, timezone, timedelta
from typing import Optional, List
import duckdb
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from app.auth.dependencies import require_session_token, require_role, Role, _get_db
from src.repositories.access_tokens import AccessTokenRepository
from src.repositories.audit import AuditRepository
from app.auth.jwt import create_access_token
router = APIRouter(prefix="/auth/tokens", tags=["tokens"])
admin_router = APIRouter(prefix="/auth/admin/tokens", tags=["tokens-admin"])
class CreateTokenRequest(BaseModel):
name: str
expires_in_days: Optional[int] = 90 # null = no expiry
class CreateTokenResponse(BaseModel):
id: str
name: str
prefix: str
token: str # raw token — returned exactly once
expires_at: Optional[str]
created_at: str
class TokenListItem(BaseModel):
id: str
name: str
prefix: str
created_at: str
expires_at: Optional[str]
last_used_at: Optional[str]
revoked_at: Optional[str]
def _audit(conn, actor: str, action: str, target: str, params=None):
try:
AuditRepository(conn).log(user_id=actor, action=action,
resource=f"token:{target}", params=params)
except Exception:
pass
def _row_to_item(row: dict) -> TokenListItem:
return TokenListItem(
id=row["id"], name=row["name"], prefix=row["prefix"],
created_at=str(row.get("created_at") or ""),
expires_at=str(row["expires_at"]) if row.get("expires_at") else None,
last_used_at=str(row["last_used_at"]) if row.get("last_used_at") else None,
revoked_at=str(row["revoked_at"]) if row.get("revoked_at") else None,
)
@router.post("", response_model=CreateTokenResponse, status_code=201)
async def create_token(
    payload: CreateTokenRequest,
    user: dict = Depends(require_session_token),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    if not payload.name.strip():
        raise HTTPException(status_code=400, detail="name is required")
    # Reject 0 and negatives explicitly so they cannot silently mean "no expiry".
    if payload.expires_in_days is not None and payload.expires_in_days <= 0:
        raise HTTPException(status_code=400, detail="expires_in_days must be positive")
    repo = AccessTokenRepository(conn)
    token_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc)
    if payload.expires_in_days is not None:
        expires_delta = timedelta(days=payload.expires_in_days)
        expires_at = now + expires_delta
    else:
        # "Never": no DB expiry, but pass an explicit ~100-year JWT lifetime so
        # create_access_token does not fall back to the session default TTL.
        expires_delta = timedelta(days=365 * 100)
        expires_at = None
    # Build the JWT that embeds jti=token_id and typ=pat
    jwt_token = create_access_token(
        user_id=user["id"], email=user["email"], role=user["role"],
        token_id=token_id, typ="pat", expires_delta=expires_delta,
    )
    # Prefix: first 8 chars of the jti (UUID). It identifies the token in the UI
    # without exposing JWT headers (which all start with "eyJhbGci…" and are useless
    # for identification). The JWT itself is returned ONCE in the response body.
    prefix = token_id.replace("-", "")[:8]
    # token_hash = sha256(raw JWT). Checked in get_current_user as defense-in-depth.
    token_hash = hashlib.sha256(jwt_token.encode()).hexdigest()
    repo.create(
        id=token_id, user_id=user["id"], name=payload.name.strip(),
        token_hash=token_hash, prefix=prefix, expires_at=expires_at,
    )
    _audit(conn, user["id"], "token.create", token_id, {"name": payload.name})
    return CreateTokenResponse(
        id=token_id, name=payload.name.strip(), prefix=prefix,
        token=jwt_token,  # returned EXACTLY ONCE; never retrievable again
        expires_at=str(expires_at) if expires_at else None,
        created_at=str(now),
    )
@router.get("", response_model=List[TokenListItem])
async def list_tokens(
user: dict = Depends(require_session_token),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
rows = AccessTokenRepository(conn).list_for_user(user["id"])
return [_row_to_item(r) for r in rows]
@router.get("/{token_id}", response_model=TokenListItem)
async def get_token(
token_id: str,
user: dict = Depends(require_session_token),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
row = AccessTokenRepository(conn).get_by_id(token_id)
if not row or row["user_id"] != user["id"]:
raise HTTPException(status_code=404, detail="Token not found")
return _row_to_item(row)
@router.delete("/{token_id}", status_code=204)
async def revoke_token(
token_id: str,
user: dict = Depends(require_session_token),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
repo = AccessTokenRepository(conn)
row = repo.get_by_id(token_id)
if not row or row["user_id"] != user["id"]:
raise HTTPException(status_code=404, detail="Token not found")
repo.revoke(token_id)
_audit(conn, user["id"], "token.revoke", token_id)
# Admin — list & revoke tokens across users (for incident response)
@admin_router.get("", response_model=List[TokenListItem])
async def admin_list_tokens(
user: dict = Depends(require_role(Role.ADMIN)),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
return [_row_to_item(r) for r in AccessTokenRepository(conn).list_all()]
@admin_router.delete("/{token_id}", status_code=204)
async def admin_revoke_token(
token_id: str,
user: dict = Depends(require_role(Role.ADMIN)),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
repo = AccessTokenRepository(conn)
row = repo.get_by_id(token_id)
if not row:
raise HTTPException(status_code=404, detail="Token not found")
repo.revoke(token_id)
_audit(conn, user["id"], "token.admin_revoke", token_id, {"owner_id": row["user_id"]})
- Step 5: Register routers in app/main.py
Add imports:
from app.api.tokens import router as tokens_router, admin_router as tokens_admin_router
Add in create_app near other include_router calls:
app.include_router(tokens_router)
app.include_router(tokens_admin_router)
- Step 6: Run — pass
Run: pytest tests/test_pat.py -v
- Step 7: Commit
git add app/api/tokens.py app/main.py app/auth/dependencies.py tests/test_pat.py
git commit -m "feat(api): /auth/tokens CRUD + admin revoke; session-only guard (#12)"
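The `expires_in_days` field has three distinct cases worth pinning down: a positive number, `null` for "never", and invalid values such as 0. The sketch below shows one way to resolve them into a database expiry and a JWT lifetime. The function name `resolve_expiry` and the `NEVER` constant are hypothetical, and the roughly 100-year lifetime for non-expiring tokens is an assumption made here so that the JWT does not inherit the short session default.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

# Assumed stand-in lifetime for tokens that never expire in the database.
NEVER = timedelta(days=365 * 100)


def resolve_expiry(expires_in_days: Optional[int],
                   now: datetime) -> Tuple[Optional[datetime], timedelta]:
    """Return (expires_at for the DB row, expires_delta for the JWT)."""
    if expires_in_days is None:
        return None, NEVER
    if expires_in_days <= 0:
        raise ValueError("expires_in_days must be a positive integer")
    delta = timedelta(days=expires_in_days)
    return now + delta, delta


now = datetime(2026, 4, 21, tzinfo=timezone.utc)
print(resolve_expiry(30, now))
print(resolve_expiry(None, now))
```

Testing with `is None` rather than truthiness keeps 0 from being mistaken for "never".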
Task 2.6: CLI — da auth token create|list|revoke
Files:
- Create: cli/commands/tokens.py
- Modify: cli/commands/auth.py (register token sub-typer)
- Test: tests/test_cli_auth.py (append)
- Step 1: Failing test
# tests/test_cli_auth.py — append
def test_da_auth_token_create_calls_api(monkeypatch):
import httpx
from typer.testing import CliRunner
from cli.commands.auth import auth_app
from cli.commands import tokens as tok_mod
captured = {}
def fake_post(path, json=None, **kwargs):
captured["path"] = path
captured["json"] = json
return httpx.Response(201, json={
"id": "abc", "name": json["name"], "prefix": "XXXXXXXX",
"token": "raw-token-once",
"expires_at": None, "created_at": "2026-04-21T00:00:00+00:00",
})
monkeypatch.setattr(tok_mod, "api_post", fake_post, raising=False)
runner = CliRunner()
result = runner.invoke(auth_app, ["token", "create", "--name", "laptop", "--ttl", "30d"])
assert result.exit_code == 0, result.output
assert captured["path"] == "/auth/tokens"
assert captured["json"] == {"name": "laptop", "expires_in_days": 30}
assert "raw-token-once" in result.output
- Step 2: Run — fail
- Step 3: Create cli/commands/tokens.py
"""`da auth token` — manage personal access tokens (#12)."""
import json as _json
import re
from typing import Optional
import typer
from cli.client import api_post, api_get, api_delete
token_app = typer.Typer(help="Personal access tokens (long-lived CLI/CI auth)")
def _parse_ttl(ttl: Optional[str]) -> Optional[int]:
    """Parse "30d", "90d", "365d", "never" → days (int) or None."""
    # Normalize once so " 30D " and "Never" are handled consistently.
    value = (ttl or "").strip().lower()
    if value in ("", "never", "none", "no-expiry"):
        return None
    m = re.fullmatch(r"(\d+)d", value)
    # Reject "0d" up front: the API refuses expires_in_days <= 0.
    if not m or int(m.group(1)) == 0:
        raise typer.BadParameter(f"Invalid TTL: {ttl}. Use e.g. 30d, 90d, 365d, or 'never'.")
    return int(m.group(1))
@token_app.command("create")
def create(
name: str = typer.Option(..., "--name", help="Human label for the token"),
ttl: str = typer.Option("90d", "--ttl", help="Lifetime (e.g. 30d, 90d, 365d, never)"),
raw: bool = typer.Option(False, "--raw", help="Print only the raw token (for CI)"),
):
"""Create a new personal access token."""
body = {"name": name, "expires_in_days": _parse_ttl(ttl)}
resp = api_post("/auth/tokens", json=body)
if resp.status_code != 201:
typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
raise typer.Exit(1)
data = resp.json()
if raw:
typer.echo(data["token"])
return
typer.echo("Personal access token created — this is shown ONCE:")
typer.echo("")
typer.echo(f" {data['token']}")
typer.echo("")
typer.echo(f"id: {data['id']}")
typer.echo(f"name: {data['name']}")
typer.echo(f"expires: {data.get('expires_at') or 'never'}")
typer.echo("")
typer.echo("Export it so `da` can use it:")
typer.echo(f" export DA_TOKEN={data['token']}")
@token_app.command("list")
def list_tokens(as_json: bool = typer.Option(False, "--json")):
"""List your personal access tokens."""
resp = api_get("/auth/tokens")
if resp.status_code != 200:
typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
raise typer.Exit(1)
rows = resp.json()
if as_json:
typer.echo(_json.dumps(rows, indent=2))
return
if not rows:
typer.echo("No tokens yet. Create one with: da auth token create --name <label>")
return
typer.echo(f"{'ID':36s} {'NAME':20s} {'PREFIX':10s} {'EXPIRES':20s} {'LAST USED':20s} STATUS")
for r in rows:
status = "revoked" if r.get("revoked_at") else "active"
typer.echo(
f"{r['id']:36s} {r['name']:20s} {r['prefix']:10s} "
f"{(r.get('expires_at') or 'never'):20s} "
f"{(r.get('last_used_at') or '-'):20s} {status}"
)
@token_app.command("revoke")
def revoke(
ident: str = typer.Argument(..., help="Token id, prefix, or name"),
):
"""Revoke a token."""
resp = api_get("/auth/tokens")
if resp.status_code != 200:
typer.echo(f"Failed to list tokens: {resp.text}", err=True)
raise typer.Exit(1)
rows = resp.json()
match = next(
(r for r in rows if r["id"] == ident or r["prefix"] == ident or r["name"] == ident),
None,
)
if not match:
typer.echo(f"No token matches {ident}", err=True)
raise typer.Exit(1)
del_resp = api_delete(f"/auth/tokens/{match['id']}")
if del_resp.status_code != 204:
typer.echo(f"Failed: {del_resp.text}", err=True)
raise typer.Exit(1)
typer.echo(f"Revoked token {match['id']} ({match['name']})")
- Step 4: Wire into cli/commands/auth.py
At the end of cli/commands/auth.py add:
from cli.commands.tokens import token_app
auth_app.add_typer(token_app, name="token")
- Step 5: Run — pass
Run: pytest tests/test_cli_auth.py -v
- Step 6: Commit
git add cli/commands/tokens.py cli/commands/auth.py tests/test_cli_auth.py
git commit -m "feat(cli): da auth token create/list/revoke (#12)"
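The `--ttl` grammar used by `da auth token create` can be exercised without Typer. A minimal sketch, assuming a standalone helper named `parse_ttl_days` (hypothetical; it raises `ValueError` where the CLI raises `typer.BadParameter`, and it rejects zero-day lifetimes on the assumption that the server refuses `expires_in_days <= 0`):

```python
import re
from typing import Optional


def parse_ttl_days(ttl: Optional[str]) -> Optional[int]:
    """Map '30d' -> 30 and 'never' -> None; hypothetical stand-in for _parse_ttl."""
    value = (ttl or "").strip().lower()
    if value in ("", "never", "none", "no-expiry"):
        return None  # sent to the API as expires_in_days: null
    match = re.fullmatch(r"(\d+)d", value)
    if match is None or int(match.group(1)) == 0:
        raise ValueError(f"Invalid TTL: {ttl!r}")
    return int(match.group(1))


for raw in ("30d", "365D", "never", None):
    print(repr(raw), "->", parse_ttl_days(raw))

# The request body the CLI would post for `--name laptop --ttl 30d`.
print({"name": "laptop", "expires_in_days": parse_ttl_days("30d")})
```

The last line matches the body asserted in test_da_auth_token_create_calls_api.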
Task 2.7: UI — /profile page with PAT section
Files:
- Create: app/web/templates/profile.html
- Modify: app/web/router.py (add the /profile route)
- Modify: app/web/templates/dashboard.html (add a "Profile" link in the nav, visible to any authenticated user)
- Step 1: Create template
Create app/web/templates/profile.html:
{% extends "base.html" %}
{% block title %}Profile — {{ config.INSTANCE_NAME }}{% endblock %}
{% block content %}
<div class="container">
<h2>Profile</h2>
<p>Signed in as <strong>{{ user.email }}</strong> ({{ user.role }}).</p>
<h3>Personal access tokens</h3>
<p>Long-lived tokens for CLI, CI, and headless clients.
<a href="/install">How to use a token with <code>da</code> CLI →</a>
</p>
<form id="create-form" onsubmit="createToken(event)">
<input id="new-name" type="text" placeholder="Token name (e.g. laptop, github-ci)" required>
<select id="new-ttl">
<option value="30">30 days</option>
<option value="90" selected>90 days</option>
<option value="365">1 year</option>
<option value="">never</option>
</select>
<button class="btn btn-primary" type="submit">Create token</button>
</form>
<div id="new-token-reveal" style="display:none; margin: 1em 0; padding: 1em; background: #fee3; border: 1px solid #ca0;">
<strong>Copy your token now — it will not be shown again:</strong>
<pre><code id="new-token-raw"></code></pre>
<button class="btn btn-sm" onclick="copyNewToken()">Copy</button>
<button class="btn btn-sm" onclick="dismissReveal()">Dismiss</button>
</div>
<table class="table" id="tokens-table" style="margin-top: 1em;">
<thead>
<tr><th>Name</th><th>Prefix</th><th>Created</th><th>Expires</th>
<th>Last used</th><th>Status</th><th>Actions</th></tr>
</thead>
<tbody></tbody>
</table>
</div>
<script>
async function loadTokens() {
const r = await fetch("/auth/tokens", {credentials: "include"});
if (!r.ok) { alert("Failed: " + r.status); return; }
const rows = await r.json();
const tbody = document.querySelector("#tokens-table tbody");
tbody.innerHTML = "";
for (const t of rows) {
const status = t.revoked_at ? "revoked" : "active";
const tr = document.createElement("tr");
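// Escape user-supplied text (token names) before interpolating into innerHTML.
const esc = (s) => String(s).replace(/[&<>"']/g, (c) => `&#${c.charCodeAt(0)};`);
tr.innerHTML = `
<td>${esc(t.name)}</td>
<td><code>${esc(t.prefix)}…</code></td>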
<td>${t.created_at.slice(0,19).replace("T"," ")}</td>
<td>${t.expires_at ? t.expires_at.slice(0,19).replace("T"," ") : "never"}</td>
<td>${t.last_used_at ? t.last_used_at.slice(0,19).replace("T"," ") : "-"}</td>
<td>${status}</td>
<td>${t.revoked_at ? "" :
`<button class="btn btn-sm btn-danger" onclick="revokeToken('${t.id}')">Revoke</button>`}</td>`;
tbody.appendChild(tr);
}
}
async function createToken(ev) {
ev.preventDefault();
const name = document.getElementById("new-name").value;
const ttl = document.getElementById("new-ttl").value;
const body = {name, expires_in_days: ttl ? Number(ttl) : null};
const r = await fetch("/auth/tokens", {
method: "POST", credentials: "include",
headers: {"Content-Type":"application/json"},
body: JSON.stringify(body),
});
if (!r.ok) { alert("Failed: " + (await r.text())); return; }
const data = await r.json();
document.getElementById("new-token-raw").textContent = data.token;
document.getElementById("new-token-reveal").style.display = "block";
document.getElementById("new-name").value = "";
loadTokens();
}
async function revokeToken(id) {
if (!confirm("Revoke this token?")) return;
const r = await fetch(`/auth/tokens/${id}`, {method: "DELETE", credentials: "include"});
if (!r.ok) { alert("Failed: " + (await r.text())); return; }
loadTokens();
}
function copyNewToken() {
const txt = document.getElementById("new-token-raw").textContent;
navigator.clipboard.writeText(txt);
}
function dismissReveal() {
document.getElementById("new-token-reveal").style.display = "none";
document.getElementById("new-token-raw").textContent = "";
}
loadTokens();
</script>
{% endblock %}
- Step 2: Add route in app/web/router.py
@router.get("/profile", response_class=HTMLResponse)
async def profile_page(
request: Request,
user: dict = Depends(get_current_user),
):
ctx = _build_context(request, user=user)
return templates.TemplateResponse(request, "profile.html", ctx)
- Step 3: Test
# tests/test_pat.py — append
def test_profile_page_renders(fresh_db):
from fastapi.testclient import TestClient
import uuid
from src.db import get_system_db, close_system_db
from src.repositories.users import UserRepository
from app.auth.jwt import create_access_token
from app.main import app
conn = get_system_db()
try:
uid = str(uuid.uuid4())
UserRepository(conn).create(id=uid, email="u@t", name="U", role="analyst")
token = create_access_token(user_id=uid, email="u@t", role="analyst")
finally:
conn.close()
close_system_db()
client = TestClient(app)
resp = client.get(
"/profile",
headers={"Accept": "text/html"},
cookies={"access_token": token},
)
assert resp.status_code == 200
assert "Personal access tokens" in resp.text
- Step 4: Run — pass
- Step 5: Commit
git add app/web/templates/profile.html app/web/router.py tests/test_pat.py
git commit -m "feat(ui): /profile page with PAT create/list/revoke (#12)"
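The profile form sends `expires_in_days: null` for the "never" option and a positive integer otherwise. A minimal sketch of how a server-side handler might interpret that field, following the rule noted in review that `0` must not silently become a non-expiring token; `resolve_expiry` is a hypothetical helper name, not a function defined in the plan:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def resolve_expiry(expires_in_days: Optional[int], now: datetime) -> Optional[datetime]:
    """Return the expiry timestamp, or None for a non-expiring token."""
    if expires_in_days is None:  # "never" in the UI, null in the JSON body
        return None
    if expires_in_days <= 0:     # 0 must not be treated like "never"
        raise ValueError("expires_in_days must be a positive integer")
    return now + timedelta(days=expires_in_days)


now = datetime(2026, 4, 21, tzinfo=timezone.utc)
print(resolve_expiry(90, now))
print(resolve_expiry(None, now))
```

Testing against `None` explicitly, rather than relying on truthiness, is what keeps `0` and `null` distinct.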
Task 2.8: Docs — fix cli/skills/security.md 24h/30d mismatch + PAT section
Files:
- Modify: cli/skills/security.md
- Create: docs/HEADLESS_USAGE.md
- Step 1: Update cli/skills/security.md
Find the line claiming "Issued on login, valid 30 days" and replace it with:
Session tokens: issued on interactive login (`da login`), valid 24 hours.
For long-lived CLI / CI use, create a Personal Access Token via the UI
(`/profile` → Personal access tokens) or CLI (`da auth token create`).
PATs are revocable and auditable; session tokens are not.
- Step 2: Create docs/HEADLESS_USAGE.md
# Headless / CI usage
For unattended clients (CI, cron, Claude Code), authenticate with a Personal Access Token (PAT) rather than an interactive session.
## Create a PAT
**Via UI:** sign in, open `/profile`, and create a token. Copy the raw value; it is shown exactly once.
**Via CLI (requires an interactive session):**
```bash
da auth token create --name "github-actions" --ttl 365d --raw
```
The `--raw` flag prints only the token, suitable for piping into a secret store.
## Use the PAT
Set the `DA_TOKEN` env var:
```bash
export DA_TOKEN=<your-token>
da query "SELECT 1"
```
## GitHub Actions example
```yaml
- name: Sync data
  env:
    DA_TOKEN: ${{ secrets.AGNES_TOKEN }}
    DA_SERVER: https://agnes.example.com
  run: |
    pip install data-analyst
    da sync --all
```
## Revoke
```bash
da auth token list
da auth token revoke <id|prefix|name>
```
Or from `/profile` → Revoke.
- Step 3: Commit
git add cli/skills/security.md docs/HEADLESS_USAGE.md
git commit -m "docs: PAT usage and session/PAT TTL clarification (#12)"
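For headless clients that talk to the API directly instead of going through `da`, the same two environment variables can drive a plain HTTP request. A minimal sketch using only the standard library; it assumes the server accepts the PAT as an `Authorization: Bearer` header, and `build_request` is a hypothetical helper:

```python
import os
import urllib.request


def build_request(path: str, server: str, token: str) -> urllib.request.Request:
    """Build (but do not send) an authenticated request.

    Assumption: the PAT is presented as an `Authorization: Bearer` header.
    """
    url = server.rstrip("/") + path
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})


# Placeholder defaults; in CI both values would come from the job's secrets.
server = os.environ.get("DA_SERVER", "https://agnes.example.com")
token = os.environ.get("DA_TOKEN", "example-token")

req = build_request("/auth/tokens", server, token)
print(req.get_method(), req.full_url)
```

Passing `req` to `urllib.request.urlopen` would send it; the sketch stops short of that so it runs without a server.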
Task 2.9: Phase 2 integration
- Step 1: Run full suite
Run: pytest tests/ --timeout=30
- Step 2: Smoke
Start server, sign in, create a PAT from /profile, use it via DA_TOKEN to run da query. Revoke it. Verify a revoked PAT fails on next call.
- Step 3: Request code review
Dispatch superpowers:code-reviewer on Phase 2 diff. Fix blockers.
Phase 3 — CLI Distribution (#9)
Scope: The Dockerfile builds the wheel and the install script; FastAPI exposes /cli/download and /cli/install.sh with the base URL baked into the script; an /install HTML page provides instructions; the dashboard links to it; and the bug where da login does not send a password is fixed. Static docs ship in the image.
File Structure
- Modify: Dockerfile (uv build + stash wheel at /app/dist)
- Create: app/api/cli_artifacts.py (/cli/download + /cli/install.sh)
- Modify: app/main.py (register router)
- Create: app/web/templates/install.html
- Modify: app/web/router.py (add /install route)
- Modify: app/web/templates/dashboard.html (link to /install)
- Modify: cli/commands/auth.py (prompt for password, send in body)
- Test: tests/test_cli_artifacts.py (new)
- Test: tests/test_cli_auth.py (extend)
Task 3.1: Dockerfile — build wheel + bake CLI version
Files:
- Modify: Dockerfile
- Step 1: Replace Dockerfile
FROM python:3.13-slim
RUN apt-get update && apt-get install -y --no-install-recommends curl && rm -rf /var/lib/apt/lists/*
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv
ARG AGNES_VERSION=dev
ARG RELEASE_CHANNEL=dev
ARG AGNES_COMMIT_SHA=unknown
ENV AGNES_VERSION=${AGNES_VERSION}
ENV RELEASE_CHANNEL=${RELEASE_CHANNEL}
ENV AGNES_COMMIT_SHA=${AGNES_COMMIT_SHA}
WORKDIR /app
COPY . .
# Build wheel artifact (served at /cli/download)
RUN uv build --wheel --out-dir /app/dist
# Install production dependencies from pyproject.toml
RUN uv pip install --system --no-cache .
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
- Step 2: Local build verification
Run: docker build -t agnes:test .
Then: docker run --rm agnes:test ls -la /app/dist/
Expected: agnes_the_ai_analyst-*.whl file exists.
- Step 3: Commit
git add Dockerfile
git commit -m "build(docker): produce wheel artifact for /cli/download (#9)"
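Step 2 expects a file matching agnes_the_ai_analyst-*.whl. Wheel filenames follow the layout `{distribution}-{version}(-{build tag})?-{python tag}-{abi tag}-{platform tag}.whl`, and installers read those fields from the name, so a structural check can complement the `ls`. A minimal sketch; `parse_wheel_name` and the version `0.1.0` are illustrative assumptions:

```python
from pathlib import Path
from typing import NamedTuple, Optional


class WheelName(NamedTuple):
    distribution: str
    version: str
    build: Optional[str]
    python_tag: str
    abi_tag: str
    platform_tag: str


def parse_wheel_name(filename: str) -> WheelName:
    """Split a wheel filename into its dash-separated components."""
    path = Path(filename)
    if path.suffix != ".whl":
        raise ValueError(f"not a wheel: {filename}")
    parts = path.stem.split("-")
    if len(parts) == 5:  # no optional build tag
        dist, version, py, abi, plat = parts
        return WheelName(dist, version, None, py, abi, plat)
    if len(parts) == 6:  # build tag present
        return WheelName(*parts)
    raise ValueError(f"unexpected wheel filename layout: {filename}")


# Hypothetical artifact name; the real version comes from pyproject.toml.
print(parse_wheel_name("agnes_the_ai_analyst-0.1.0-py3-none-any.whl"))
```

A name that does not fit this layout is a sign the artifact was renamed somewhere between the build and the download.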
Task 3.2: FastAPI — /cli/download + /cli/install.sh
Files:
- Create: app/api/cli_artifacts.py
- Modify: app/main.py
- Step 1: Failing test
# tests/test_cli_artifacts.py
"""Tests for #9 — CLI artifact + install script endpoints."""
import os
from pathlib import Path
import tempfile
def test_cli_install_script_bakes_server_url(monkeypatch):
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app, base_url="https://agnes.example.com")
resp = client.get("/cli/install.sh", headers={"host": "agnes.example.com"})
assert resp.status_code == 200
assert resp.headers["content-type"].startswith("text/")
body = resp.text
assert "https://agnes.example.com" in body or "agnes.example.com" in body
assert "pip install" in body or "uv tool install" in body
def test_cli_download_returns_wheel_or_404(monkeypatch):
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
resp = client.get("/cli/download")
# Either serve the wheel or return a clear 404 telling where to find it.
assert resp.status_code in (200, 404)
if resp.status_code == 200:
assert resp.headers["content-disposition"].startswith("attachment")
def test_cli_download_serves_wheel_when_present(monkeypatch, tmp_path):
"""Put a fake wheel and confirm the endpoint serves it."""
wheel = tmp_path / "agnes_fake-1.0-py3-none-any.whl"
wheel.write_bytes(b"PK\x03\x04fake-wheel-bytes")
monkeypatch.setenv("AGNES_CLI_DIST_DIR", str(tmp_path))
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
resp = client.get("/cli/download")
assert resp.status_code == 200
assert resp.content.startswith(b"PK")
- Step 2: Run — fail
- Step 3: Implement app/api/cli_artifacts.py
"""CLI artifact download + install script endpoints (#9)."""
import os
from pathlib import Path
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import FileResponse, PlainTextResponse
router = APIRouter(tags=["cli"])
def _dist_dir() -> Path:
return Path(os.environ.get("AGNES_CLI_DIST_DIR", "/app/dist"))
def _find_wheel() -> Path | None:
d = _dist_dir()
if not d.exists():
return None
wheels = sorted(d.glob("*.whl"))
return wheels[-1] if wheels else None
@router.get("/cli/download")
async def cli_download():
wheel = _find_wheel()
if not wheel:
raise HTTPException(
status_code=404,
detail=(
"CLI wheel not found in dist dir. Build it with `uv build --wheel` "
"or run the official docker image (which builds on image-build)."
),
)
return FileResponse(
path=str(wheel),
filename=wheel.name,
media_type="application/octet-stream",
headers={"Content-Disposition": f'attachment; filename="{wheel.name}"'},
)
@router.get("/cli/install.sh", response_class=PlainTextResponse)
async def cli_install_script(request: Request):
"""Shell installer — bakes this server's URL into the generated config."""
base_url = str(request.base_url).rstrip("/")
version = os.environ.get("AGNES_VERSION", "dev")
script = f"""#!/usr/bin/env bash
# Agnes CLI installer — server: {base_url}
set -euo pipefail
SERVER="{base_url}"
echo "Installing Agnes CLI from $SERVER (version: {version})"
# 1. Download the wheel, keeping its server-provided filename: pip and uv
#    read name and version from it and reject names like tmp.XXXXXX.whl.
TMP_DIR=$(mktemp -d)
(cd "$TMP_DIR" && curl -fsSL -OJ "$SERVER/cli/download")
WHEEL=$(ls "$TMP_DIR"/*.whl)
# 2. Install via pip (prefer uv tool install if available)
if command -v uv >/dev/null 2>&1; then
uv tool install --force "$WHEEL"
else
python3 -m pip install --user --force-reinstall "$WHEEL"
fi
rm -rf "$TMP_DIR"
# 3. Seed the server URL in CLI config
CFG_DIR="${{DA_CONFIG_DIR:-$HOME/.config/da}}"
mkdir -p "$CFG_DIR"
cat > "$CFG_DIR/config.yaml" <<EOF
server: $SERVER
EOF
echo "Installed."
echo "Next steps:"
echo " 1. Sign in to $SERVER and create a personal access token at $SERVER/profile"
echo " 2. Export it: export DA_TOKEN=<your-token>"
echo " 3. Verify: da auth whoami"
"""
return script
- Step 4: Register in app/main.py
from app.api.cli_artifacts import router as cli_artifacts_router
# ...
app.include_router(cli_artifacts_router)
- Step 5: Run — pass
Run: pytest tests/test_cli_artifacts.py -v
- Step 6: Commit
git add app/api/cli_artifacts.py app/main.py tests/test_cli_artifacts.py
git commit -m "feat(api): /cli/download wheel + /cli/install.sh with baked server URL (#9)"
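`_find_wheel` returns the lexicographically last `*.whl`, which is fine for an image that contains exactly one wheel. If a dist dir ever holds several, string ordering places 1.10.0 before 1.9.0. One possible alternative, sketched under the assumption that versions are plain dotted integers (`newest_wheel` is a hypothetical helper; versions such as `dev` would need a real version parser):

```python
import tempfile
from pathlib import Path
from typing import Optional


def _version_key(wheel: Path) -> tuple:
    """Sort key from the version field; assumes dotted integers like 1.10.0."""
    version = wheel.name.split("-")[1]
    return tuple(int(part) for part in version.split("."))


def newest_wheel(dist_dir: Path) -> Optional[Path]:
    """Return the wheel with the highest numeric version, or None if empty."""
    wheels = list(dist_dir.glob("*.whl"))
    return max(wheels, key=_version_key) if wheels else None


with tempfile.TemporaryDirectory() as tmp:
    d = Path(tmp)
    for v in ("1.9.0", "1.10.0"):
        (d / f"agnes_fake-{v}-py3-none-any.whl").write_bytes(b"PK")
    print(sorted(d.glob("*.whl"))[-1].name)  # lexicographic choice
    print(newest_wheel(d).name)              # numeric choice
```

Cleaning the dist dir before each build avoids the question entirely and keeps the simpler lookup correct.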
Task 3.3: /install HTML page
Files:
- Create: app/web/templates/install.html
- Modify: app/web/router.py
- Step 1: Create template
app/web/templates/install.html:
{% extends "base.html" %}
{% block title %}Install CLI — {{ config.INSTANCE_NAME }}{% endblock %}
{% block content %}
<div class="container">
<h2>Install the Agnes CLI</h2>
<p>This server: <code>{{ server_url }}</code> (version <code>{{ agnes_version }}</code>)</p>
<h3>One-liner (Linux / macOS)</h3>
<pre><code>curl -fsSL {{ server_url }}/cli/install.sh | bash</code></pre>
<h3>Manual</h3>
<ol>
<li>Download: <a href="/cli/download">{{ server_url }}/cli/download</a></li>
<li>Install:
<pre><code>uv tool install ./agnes_*.whl
# or
python3 -m pip install --user ./agnes_*.whl</code></pre>
</li>
<li>Seed server URL:
<pre><code>mkdir -p ~/.config/da
echo "server: {{ server_url }}" > ~/.config/da/config.yaml</code></pre>
</li>
</ol>
<h3>Connect</h3>
<p>Create a personal access token (see <a href="/profile">/profile</a>) and export it:</p>
<pre><code>export DA_TOKEN=&lt;your-token&gt;
da auth whoami</code></pre>
<h3>Claude Code / MCP</h3>
<p>
Store your token in <code>~/.config/da/token.json</code> (Claude Code
reads this automatically via the <code>da</code> entrypoint) or export
<code>DA_TOKEN</code> in your shell.
</p>
<h3>CI / headless</h3>
<p>See <a href="/docs/HEADLESS_USAGE.md" target="_blank">Headless usage guide</a>.</p>
</div>
{% endblock %}
- Step 2: Add route
In app/web/router.py:
# Requires `import os` at the top of app/web/router.py if not already present.
@router.get("/install", response_class=HTMLResponse)
async def install_page(request: Request):
    """Public install instructions for the CLI."""
    base_url = str(request.base_url).rstrip("/")
    ctx = _build_context(
        request,
        server_url=base_url,
        agnes_version=os.environ.get("AGNES_VERSION", "dev"),
    )
    return templates.TemplateResponse(request, "install.html", ctx)
- Step 3: Dashboard link
In app/web/templates/dashboard.html, find the primary nav and add:
<a class="nav-link" href="/install">Install CLI</a>
- Step 4: Test
# tests/test_cli_artifacts.py — append
def test_install_page_renders_with_server_url():
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
resp = client.get("/install", headers={"host": "agnes.test", "Accept": "text/html"})
assert resp.status_code == 200
assert "agnes.test" in resp.text
assert "da auth whoami" in resp.text
- Step 5: Run — pass
- Step 6: Commit
git add app/web/router.py app/web/templates/install.html app/web/templates/dashboard.html tests/test_cli_artifacts.py
git commit -m "feat(ui): /install page with per-deployment install instructions (#9)"
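Both /cli/install.sh and /install derive the server URL from `request.base_url`. Behind a TLS-terminating reverse proxy that value can come out as an internal `http://` address unless the ASGI server is configured to trust forwarded headers. A minimal sketch of a resolution order one might want in that case; `public_base_url` and the idea of an explicit override setting are hypothetical and not part of the plan:

```python
from typing import Mapping, Optional


def public_base_url(
    headers: Mapping[str, str],
    fallback: str,
    override: Optional[str] = None,
) -> str:
    """Pick the URL clients should use to reach this server.

    Order: explicit override, then X-Forwarded-Proto/X-Forwarded-Host from a
    trusted proxy, then the URL the framework computed (request.base_url).
    Header keys are assumed to be lowercase in this plain-mapping sketch.
    """
    if override:
        return override.rstrip("/")
    proto = headers.get("x-forwarded-proto", "").split(",")[0].strip()
    host = headers.get("x-forwarded-host", "").split(",")[0].strip()
    if proto and host:
        return f"{proto}://{host}"
    return fallback.rstrip("/")


forwarded = {"x-forwarded-proto": "https", "x-forwarded-host": "agnes.example.com"}
print(public_base_url(forwarded, "http://10.0.0.5:8000/"))
print(public_base_url({}, "http://localhost:8000/"))
```

Forwarded headers should only be honoured when the request actually came through a proxy the deployment controls, since clients can set them freely.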
Task 3.4: Fix da login password prompt bug
Files:
- Modify: cli/commands/auth.py
- Test: tests/test_cli_auth.py (append)
- Step 1: Failing test
# tests/test_cli_auth.py — append
def test_da_login_sends_password(monkeypatch):
import httpx
from typer.testing import CliRunner
from cli.commands import auth as auth_mod
captured = {}
def fake_post(path, json=None, **kwargs):
captured["path"] = path
captured["json"] = json
return httpx.Response(200, json={
"access_token": "tok", "email": "u@t", "role": "analyst",
"user_id": "u1", "token_type": "bearer",
})
monkeypatch.setattr(auth_mod, "api_post", fake_post, raising=False)
runner = CliRunner()
# Provide email and password via stdin (typer prompts)
result = runner.invoke(auth_mod.auth_app, ["login"], input="u@t\nhunter2\n")
assert result.exit_code == 0, result.output
assert captured["path"] == "/auth/token"
assert captured["json"] == {"email": "u@t", "password": "hunter2"}
- Step 2: Run — fail
- Step 3: Fix login command
Replace in cli/commands/auth.py:
@auth_app.command()
def login(
email: str = typer.Option(..., prompt=True, help="Your email address"),
password: str = typer.Option(
"", prompt="Password (leave empty for magic-link / OAuth accounts)",
hide_input=True, help="Your password (if the account has one)",
),
server: str = typer.Option(None, help="Server URL override"),
):
"""Login and obtain a JWT token.
Password-enabled accounts: enter the password when prompted.
Magic-link / OAuth accounts: leave the password empty — the server will
respond with guidance pointing you to the correct auth provider.
"""
if server:
import os
os.environ["DA_SERVER"] = server
body = {"email": email}
if password:
body["password"] = password
try:
resp = api_post("/auth/token", json=body)
if resp.status_code == 200:
data = resp.json()
save_token(data["access_token"], data["email"], data["role"])
typer.echo(f"Logged in as {data['email']} (role: {data['role']})")
return
# Helpful error for accounts that cannot login via password.
try:
detail = resp.json().get("detail", resp.text)
except Exception:
detail = resp.text
if resp.status_code == 401 and "external authentication" in str(detail).lower():
typer.echo(
"This account uses a magic link / OAuth provider. "
"Sign in via the web UI, open /profile, and create a personal "
"access token — then export it as DA_TOKEN.",
err=True,
)
else:
typer.echo(f"Login failed: {detail}", err=True)
raise typer.Exit(1)
except typer.Exit:
raise
except Exception as e:
typer.echo(f"Connection error: {e}", err=True)
raise typer.Exit(1)
- Step 4: Run — pass
Run: pytest tests/test_cli_auth.py::test_da_login_sends_password -v
- Step 5: Commit
git add cli/commands/auth.py tests/test_cli_auth.py
git commit -m "fix(cli): da login prompts for password and sends it in body (#9)"
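The two decisions inside the fixed `login` command, what to put in the request body and which message to show on failure, can be checked without Typer or a server. A minimal sketch with hypothetical helper names (`build_login_body`, `login_failure_message`) and a shortened guidance message; the matching rule on "external authentication" is taken from the command above:

```python
from typing import Optional


def build_login_body(email: str, password: Optional[str]) -> dict:
    """Include the password key only when one was actually entered."""
    body = {"email": email}
    if password:
        body["password"] = password
    return body


def login_failure_message(status_code: int, detail: str) -> str:
    """Choose the CLI message for a non-200 response from /auth/token."""
    if status_code == 401 and "external authentication" in detail.lower():
        return (
            "This account uses a magic link / OAuth provider. "
            "Create a personal access token at /profile and export it as DA_TOKEN."
        )
    return f"Login failed: {detail}"


print(build_login_body("u@t", "hunter2"))
print(build_login_body("u@t", ""))
print(login_failure_message(401, "Invalid credentials"))
```

The first printed body is the one test_da_login_sends_password asserts on.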
Task 3.5: Phase 3 integration
- Step 1: Run full suite
Run: pytest tests/ --timeout=30
- Step 2: Smoke
  - docker build produces wheel in /app/dist.
  - Hitting /cli/install.sh returns a shell script with the correct URL.
  - Hitting /install renders install instructions with the correct base URL.
  - da login now prompts for a password and succeeds against a password-enabled account.
- Step 3: Request code review
Dispatch superpowers:code-reviewer on Phase 3 diff.
Final Integration
Task F.1: Full suite + merge
- All three phases green: pytest tests/ --timeout=30
- Docker build succeeds and serves the wheel + install.sh
- Manual walkthrough: new user flow (create via UI → reset password → log in → create PAT → use PAT via CLI → revoke → verify PAT rejected).
Task F.2: Coverage check against issues
Run a final verification agent that reads each of #9, #10, #11, #12 and the resulting diff, and reports any unmet acceptance criterion. Iterate until every bullet is green.
Self-Review
- Spec coverage for #10: HTML redirect handled in Phase 0. API clients still get 401. ✅
- Spec coverage for #11:
  - Schema v5 (active + deactivated_at/by) ✅
  - PATCH, POST /reset-password, POST /set-password, POST /activate, POST /deactivate, audit log on every mutation ✅
  - Self-deactivate + last-admin safeguards ✅
  - get_current_user checks active ✅
  - CLI commands (set-role, activate, deactivate, reset-password, set-password) + extended list-users ✅
  - /admin/users UI ✅
  - Tests for every bullet ✅
- Spec coverage for #12:
  - personal_access_tokens DuckDB table (schema v6) ✅
  - JWT typ + jti; verify_token via DB for typ==pat ✅
  - /auth/tokens CRUD + admin variant ✅
  - CLI da auth token create|list|revoke ✅
  - UI profile page with one-time reveal ✅
  - Audit entries on create/revoke; last_used_at updated (sync) ✅
  - cli/skills/security.md correction + docs/HEADLESS_USAGE.md ✅
  - PAT cannot create new PATs (session-only guard) ✅
- Spec coverage for #9:
  - Wheel built in Dockerfile, stored at /app/dist ✅
  - /cli/download + /cli/install.sh with base URL baked-in ✅
  - /install page ✅
  - da login password bug fix ✅
  - Dashboard link ✅
- Placeholder scan: every code step has a full code block or exact command. No "TBD" or "implement later".
- Type consistency: typ ("session"|"pat"), token_id → stored as id/jti, repository field names consistent across Phase 2 tasks.
All spec bullets covered. Ready for execution handoff.