agnes-the-ai-analyst/docs/superpowers/plans/2026-04-21-user-mgmt-pat-cli.md
ZdenekSrotyr d2c76cb221
User management + PAT + CLI distribution + HTML auth redirect (#9 #10 #11 #12) (#28)
* fix: redirect unauthenticated HTML routes to /login (#10)

* docs(plan): user mgmt + PAT + CLI distribution implementation plan (#9 #10 #11 #12)

* build(docker): produce wheel artifact for /cli/download (#9)

* feat(db): schema v5 — users.active + deactivated_at/by (#11)

* feat(api): /cli/download wheel + /cli/install.sh with baked server URL (#9)

* feat(users): repository supports active flag + count_admins (#11)

* feat(ui): /install page with per-deployment install instructions (#9)

* feat(api): user PATCH/reset-password/set-password/activate/deactivate (#11)

* fix(cli): da login prompts for password and sends it in body (#9)

* test(api): safeguard tests for self-deactivate and last admin (#11)

* feat(auth): reject requests from deactivated users (#11)

* fixup(#10): propagate next through /login buttons + lock down sanitizer tests

* feat(cli): da admin set-role/activate/deactivate/reset-password/set-password (#11)

* feat(ui): /admin/users management page (#11)

* feat(db): schema v6 — personal_access_tokens (#12)

* feat(users): access_tokens repository (#12)

* feat(auth): JWT carries typ (session|pat) and explicit jti (#12)

* feat(auth): reject revoked/expired PATs; update last_used_at (#12)

* feat(api): /auth/tokens CRUD + admin revoke; session-only guard (#12)

* feat(cli): da auth token create/list/revoke (#12)

* feat(ui): /profile page with PAT create/list/revoke (#12)

* docs: PAT usage and session/PAT TTL clarification (#12)

* feat(auth): PAT first-use-from-new-IP audit + last_used_ip (schema v7) (#12)

Closes remaining acceptance gap from issue #12: audit_log entry on first use
of a PAT from an IP that differs from the recorded last_used_ip.

- schema v7: personal_access_tokens.last_used_ip column
- AccessTokenRepository.mark_used now stores the client IP
- get_current_user extracts client IP (X-Forwarded-For first hop, fallback
  to request.client.host) and emits a token.first_use_new_ip audit when the
  IP changes on a subsequent use (not the very first use)
- tests: new-ip audit, same-ip no-op, first-ever-use no-op, schema v7 column

* fix: address Devin review findings on PR #28

- app/main.py: exclude /auth/* from HTML redirect handler so JSON
  endpoints under /auth/ (PAT CRUD used by `da auth token` CLI) keep
  their 401 JSON contract (Devin #1, bug)
- app/api/tokens.py: reject expires_in_days <= 0 explicitly; use
  `is not None` so 0 no longer silently creates a non-expiring token
  (Devin #2)
- app/api/users.py: validate role against Role enum in create_user
  to match update_user and prevent 500 on role-protected requests
  later (Devin #3)
- app/web/templates/admin_users.html: escape user-supplied strings
  before innerHTML; move onclick handlers to addEventListener via
  data attributes so emails with quotes / HTML no longer break the UI
  or enable stored XSS (Devin #4)
- app/auth/router.py, app/auth/providers/{password,google}.py:
  reject deactivated users at login instead of issuing a JWT that
  would then fail on the next request — removes the confusing
  redirect loop (Devin #5)
- CLAUDE.md: document schema v7 instead of stale v4 (Devin #6)
- tests/test_web_ui.py: regression test for the /auth/* JSON 401

* feat(web): add /profile and /admin/users links to dashboard nav

* feat(web): point setup banner at /install page

* chore(web): drop unused setup_instructions context

* fix: address Devin review round 2 on PR #28

- app/api/tokens.py: when expires_in_days is None (the "never" option),
  use a ~100-year JWT expiry so the token doesn't silently die in 24h
  via the session-default fallback in create_access_token. The real
  expiry enforcement stays in verify_token's DB-level check (Devin 🔴)
- app/web/templates/profile.html: escape t.name and other user-supplied
  strings via esc() helper before innerHTML, same pattern as
  admin_users.html. Move revoke onclick to data-attribute +
  addEventListener (Devin 🟡)
- app/api/cli_artifacts.py: use `mktemp -d` with X's at end of template
  for GNU/BSD portability, place wheel inside the temp dir and
  clean up with rm -rf (Devin 🚩)

* feat(web): redesign /install page; make curl one-liner primary, collapse manual

Rebuild the public /install page using the dashboard visual language
(shared header, card layout, gradient hero, design tokens from
style-custom.css). The page is now anchored on the one-liner install
path: curl -fsSL <server>/cli/install.sh | bash is rendered as the
primary, prominent step 1, while the old manual wheel-download flow
is tucked behind a closed-by-default <details> block for users in
restricted/offline environments.

Information architecture:
  hero (server URL + version)
  -> step 1: quick install (one-liner, big Copy button)
  -> step 2: create PAT on /profile + export DA_TOKEN / da auth whoami
  -> step 3: Claude Code / MCP via ~/.config/da/token.json
  -> collapsed "Manual install" details for download-wheel flow
  -> footer link to docs/HEADLESS_USAGE.md

Every shell snippet has a vanilla-JS "Copy" button that confirms
visually ("Copied!" for 1.5s) and falls back to textarea+execCommand
on non-secure contexts. No new dependencies, no bundler.

The route now also pulls an optional user so the header shows the
same nav (Dashboard / Profile / Logout) as dashboard.html when a
session exists, while staying fully public when signed out.

* fix(cli): use real wheel filename in install.sh (broken pip/uv install)

The installer wrote the downloaded wheel as agnes_cli.whl, which lacks a
PEP-427 version component — both pip and uv tool install reject it and
abort the one-liner.

Use curl -OJ so Content-Disposition determines the on-disk filename, then
resolve it via glob. Install an EXIT trap to remove the tmpdir even when
install fails.

* fix(web): correct manual install wheel glob and add PEP 668 / PATH hints

- Wheel glob is agnes_the_ai_analyst-*.whl (not agnes-*.whl) — the old
  pattern never matched the real artefact name from the build.
- Add — or — separator between uv tool install and pip install.
- Warn that pip install --user is blocked on macOS Homebrew / modern
  Debian (PEP 668) and recommend uv tool install as the default path.
- Both flows now show the ~/.local/bin PATH hint so a fresh shell can
  find the da binary after install.

* fix(web): consistent session.user reference in install header

The avatar-letter fallback inside {% if session.user %} was reading
user.name / user.email directly, but the route dependency can pass
user=None — those references resolved to an empty FlexDict and produced
an empty avatar circle. Read everything through session.user to match
the guard and the dashboard pattern.

* fix(web): point headless usage link at GitHub source

/docs/HEADLESS_USAGE.md 404s — no static route serves repo docs. Point
the footer link at the rendered markdown on GitHub instead of adding a
dedicated docs serving route just for one file.

* feat(web): /install hero size, anon sign-in banner, step 2 copy polish

- Bump hero h1 from 26px to 30px to match dashboard primary scale.
- Anonymous visitors see a small sign-in banner above Step 2 (creating
  a token requires auth; without the banner the flow appears stuck).
- Add an 'After generating your token' section label inside Step 2 so
  the /profile CTA button no longer looks wedged mid-sentence between
  adjacent paragraphs.

* chore(web): /install a11y + version pill polish

- aria-live='polite' on copy buttons so screen readers announce the
  'Copied!' state change.
- Replace redundant INSTANCE_NAME eyebrow (already in the header logo)
  with 'Getting started'.
- Hide the version pill when AGNES_VERSION is unset/'dev' — avoids the
  misleading 'vdev' label in local/unbuilt runs.
- Manual summary focus-visible outline-offset +2px (was -2px which
  clipped inside the card), and mark the chevron as decorative.

* fix(web): use session.user in dashboard avatar fallback

Inside {% if session.user %} guard, the avatar fallback referenced
(user.name or user.email). If user is None the block crashes when
the profile picture is absent. Align with the guard variable.

* fix: address Devin review round 3 on PR #28

- app/api/users.py: stop auto-sending email from reset_password. The
  magic-link sender would deliver a "Login Link" that — when clicked —
  consumes the reset_token via verify_magic_link and logs the user in
  WITHOUT prompting for a new password. Admins now share the raw
  reset_token from the API response manually, or use set-password
  directly. email_sent is always False. Documented inline. (Devin 🟡)
- app/api/cli_artifacts.py: harden /cli/install.sh generation against
  shell injection via Host header or AGNES_VERSION. base_url is
  validated against a strict scheme+host+port regex; version against
  an alnum + dot/dash/underscore allowlist. Both values are also
  piped through shlex.quote() as defense in depth. (Devin 🟡)

The shared users.reset_token column between magic-link and password-
reset flows (Devin 🚩) remains an architectural gap; splitting into
separate columns needs schema v8 and is tracked for a follow-up PR.

* docs, chore(grpn): manual-deploy helpers + hackathon deploy learnings

Adds scripts/grpn/ — Makefile + agnes-auto-upgrade.sh + README for
operating Agnes on GRPN's existing foundryai-development VM when the
full Terraform flow is blocked by org policies:

- iam.disableServiceAccountKeyCreation (org constraint) forbids SA
  JSON keys, so GCP_SA_KEY-based CI is unavailable
- No projectIamAdmin delegation → bootstrap-gcp.sh can't grant roles
- Secret Manager IAM bindings require setIamPolicy which editor lacks

Helper targets: deploy, deploy-tag, recreate, restart, stop, start,
status, version, logs, ps, env, ssh, tunnel, open, bootstrap-admin,
set-data-source, install-cron, uninstall-cron.

docs/superpowers/plans/2026-04-22-grpn-deploy-learnings.md — running
log of all org-policy constraints hit during the hackathon deploy,
with workarounds and derived follow-ups (WIF support, external_ip
variable, customer onboarding IAM checklist).

Not a replacement for the TF flow — stopgap until WIF lands.

* fix(web): make header logos clickable links to home

* feat(web): one-click "Setup a new Claude Code" button

Adds a single-button flow on the dashboard and /install page that
generates a fresh personal access token via POST /auth/tokens and
copies a complete, paste-ready setup script (server URL, token,
install/verify commands) to the clipboard. Falls back to a modal
textarea when the clipboard is blocked; redirects to /login on 401;
surfaces backend errors inline.

- dashboard.html: replaces the top "Set up your local environment"
  anchor with a real button wired to setupNewClaude(). Removes the
  duplicate bottom setup banner to keep a single entry point.
- install.html: for signed-in users, Step 1 leads with the one-click
  button and demotes the curl one-liner into a collapsible "Or run
  manually" aside. Anonymous visitors still see the curl flow plus a
  sign-in hint.
- No new deps. Vanilla JS. Token lives in memory/clipboard only —
  never rendered into persistent DOM.

* feat(cli): add "da auth import-token" for non-interactive PAT login

Writes a provided JWT into ~/.config/da/token.json using the canonical
{access_token, email, role} shape expected by save_token(). Decodes the
token locally to pull email/role claims, verifies it against the server
via GET /api/catalog/tables, and refuses to overwrite an existing token
file if the server returns 401. --email / --role overrides exist for
tokens missing those claims; --skip-verify bypasses the server round-trip
for offline / CI scenarios.

* test(cli): cover da auth import-token success + 401 + claim-fallback paths

Three new tests in TestAuthImportToken:
- valid JWT + 200 -> canonical token.json written
- 401 from /api/catalog/tables -> exit 1, existing token file untouched
- JWT without email/role claims -> refused without overrides, accepted
  with --email / --role flags

* feat(web): update one-click Claude setup instructions — explicit uv install, import-token, skills question

Replaces the fragile `cat > token.json <<EOF` clipboard payload with an
explicit, auditable sequence:

  1. `curl -fsSL /cli/download` + `uv tool install --force` (no opaque
     `curl | bash`).
  2. `da auth import-token --token ...` instead of hand-written JSON.
  3. Explicit PATH persistence for zsh/bash.
  4. A required question to the user about whether to copy the bundled
     skills into ~/.claude/skills/agnes/ or pull them on-demand via
     `da skills show`.
  5. A final confirmation step with whoami + version output.

Factored both pages to include a shared partial
(app/web/templates/_claude_setup_instructions.jinja) so dashboard.html
and install.html can never drift apart again. {server_url} and {token}
stay as runtime placeholders substituted by renderSetupInstructions().

* feat(ui): modernize /admin/users + unify header nav across pages

- New shared partial app/web/templates/_app_header.html — single source
  of truth for the top navigation. Used by base.html and dashboard.html
  (which doesn't extend base.html). Active page highlighted via
  request.url.path. Admin "Users" link gated by session.user.role.
- style-custom.css: add .app-header / .app-nav-link / .app-btn-logout /
  .app-avatar styles (mirrors dashboard's previous inline copy under
  app-* prefix). Mobile-friendly fallback at <720px.
- base.html: include the new partial so every page extending base
  (admin_users, profile, login_email, error, …) gets the same chrome
  the dashboard has.
- dashboard.html: replace its inline <header class="header"> markup
  with the shared partial. Inline .header CSS left in place as
  harmless dead code (separate cleanup PR).
- admin_users.html: rewritten with avatars, role pills (color-coded
  per role), toggle switch for active, search/filter input, toast
  notifications, modal dialogs replacing alert/confirm/prompt,
  one-click copy for the reset token, empty / loading states.
  All XSS-safe via the existing esc() helper + data-attribute
  event delegation.
- tests/test_web_ui.py: smoke test that /admin/users renders the new
  shared header chrome and the modernized markup.

* feat(api): serve CLI wheel at /cli/agnes.whl for direct uv install

uv tool install inspects the URL path suffix to recognise a wheel, so
/cli/download (which has no .whl suffix) cannot be installed directly.
Expose a stable /cli/agnes.whl alias over the same wheel lookup so users
can run: uv tool install --force https://<server>/cli/agnes.whl

* test(cli): cover da auth import-token --server persisting to config.yaml

The server persistence was already implemented in the import-token command
(save_config({server}) call) but not covered by tests. Add an explicit test
so the one-step setup contract — single import-token call writes both token
and server — cannot regress.

* feat(web): simpler Claude setup — single uv install URL, single import-token call

User feedback: the prior clipboard payload repeated the server URL and
token across multiple steps (curl + tmpfile + install + rm + separate
seed-config + import-token). Collapse to:

 1. uv tool install --force {server_url}/cli/agnes.whl  (single URL, direct)
 2. da auth import-token --token ... --server ...        (one call, persists both)
 3. da auth whoami
 4. skills (ask user first)
 5. confirm

uv accepts HTTPS URLs that end in .whl and installs them directly, so
the tmpfile dance is unnecessary. import-token --server already persists
the server to config.yaml, so no separate printf > config.yaml step.

* fix(tests): update admin users heading assertion after template rename

The admin_users.html template now uses <h2 class="users-title">Users</h2>
instead of <h2>User management</h2>. Update the assertion to match.

* feat(ui): unify header across remaining 7 standalone pages

These 7 pages render their own full <html> and don't extend base.html,
so the previous unification commit only covered base + dashboard. Each
had its own ad-hoc <header> markup with inconsistent classes
(.top-header / .header / .page-header), inconsistent nav-link sets,
and inconsistent avatar/email styling.

Replace each inline <header>...</header> block with the shared
{% include '_app_header.html' %} so /activity-center, /admin/permissions,
/admin/tables, /catalog, /corporate-memory, /corporate-memory/admin,
and /install all show the same chrome (Dashboard / Install CLI /
Profile / Users / email + avatar / Logout) with the active page
highlighted via request.url.path.

Old inline header CSS (.header, .top-header, .page-header, .nav-link,
etc.) is left in place as harmless dead code; it can be cleaned up in
a follow-up sweep.

* feat(web): add readable preview of Claude setup payload on dashboard + /install

Move the line-by-line setup instructions into app/web/setup_instructions.py
as the single source of truth, then render them in two modes from the
existing _claude_setup_instructions.jinja partial:

- preview_mode=True  → visible, read-only <pre><code> block with the real
  server URL and a clearly-styled placeholder token (never a real one).
- preview_mode=False → the JS SETUP_INSTRUCTIONS_TEMPLATE used by the
  one-click flow (unchanged behaviour).

Both /dashboard (env-setup-cta card) and /install (Step 1 card) now show
the preview directly under the 'Setup a new Claude Code' button so users
can see exactly what will land in their clipboard before they click.

* feat(web): update setup instructions — `da diagnose` step, explicit section titles

Rework the Claude Code setup payload to:

- Give every numbered step an unambiguous verb header ("1) Install the CLI",
  "2) Log in", "3) Verify the login", "4) Run diagnostics", "5) Skills (ask
  the user first)", "6) Confirm").
- Add step 4 `da diagnose` as the post-login health check. The CLI already
  ships this command (cli/commands/diagnose.py); it prints "Overall:
  healthy" and a list of green checks that map cleanly to next actions.
- Ask the skills copy-vs-on-demand question verbatim so Claude Code always
  prompts the user the same way.
- Replace the terse "Confirm" line with a 4-bullet summary (version,
  whoami, skills choice, diagnose status) so the return message is
  structured and comparable across setups.

* chore(web): remove stale MCP card from /install (no MCP server today)

The 'Use with Claude Code / MCP' card (Step 3 on /install) referenced an
MCP integration Agnes does not ship. Remove the whole card. The one-click
'Setup a new Claude Code' flow in Step 1 already covers the long-lived
client use case and is less confusing than dangling persistence tips for
a non-existent integration.

* feat(api): include user_email + last_used_ip + user_id in admin tokens list response

Adds AdminTokenItem response model (superset of TokenListItem) and
AccessTokenRepository.list_all_with_user() joining personal_access_tokens
with users to denormalize user_email. Needed for /admin/tokens UI where
admins triage tokens across all users.

* feat(web): /admin/tokens page — list, filter, search, revoke across all users

Adds a new admin-only page with client-side filtering (status, user email,
last-used window), column sorting, counts bar (active/revoked/expired),
and an inline revoke action. Mirrors the /admin/users visual language.

* feat(web): add Tokens nav link for admins + deep-link from admin/users row

Admin-only nav entry to /admin/tokens, and a per-row Tokens button on
/admin/users that prefills the token page's user filter via ?user=<email>.

* test(admin): cover /admin/tokens rendering, filter state, non-admin denial, revoke

Verifies admin can render the page (title + JS hooks present), a non-admin
is blocked, unauthenticated users are redirected, the admin list response
includes user_email / user_id / last_used_ip, and admin can revoke another
user's token.

* feat(web): modern redesign of /admin/tokens — hero, stat strip, refined table, responsive cards, a11y

* feat(web): ditch the table — /admin/tokens as a card stack, modern GitHub-style list

Replaces the table-based layout with a stack of self-contained token cards
inside a <ul role=list>. Each card is a flex row: avatar + name/meta on the
left, last-used block in the middle, status pill + outlined 'Revoke' button
on the right. Status and sort controls are pill-shaped toggle chips; user
email search has an inline search icon. No <table>/<tr>/<th>/<td> anywhere.
Responsive below 720px (card stacks vertically) and 480px (stat chips 2x2).
Preserves filter IDs (flt-status, flt-user, flt-last-used) and data-revoke
for existing tests.

* feat(web): add /tokens (role-aware) — single page for both user PAT CRUD and admin overview

- Rename admin_tokens.html -> tokens.html with a new is_admin context flag.
- New route GET /tokens: renders the same card-stack UI for everyone.
  * Admins: loads /auth/admin/tokens, shows owner column + stat strip, keeps
    the owner-email search box and sort-by-owner chip.
  * Non-admins: loads /auth/tokens (own tokens only), hides owner column +
    stat chips, adds a 'New token' CTA in the hero that opens a modal
    (name + expires_in_days) calling POST /auth/tokens. The raw token is
    revealed once in a dismissable banner and cleared from the DOM on Hide.
- GET /admin/tokens now 302-redirects to /tokens, preserving query string
  (so the /admin/users deep-link ?user=foo still works).

* feat(web): /tokens full-bleed layout to match dashboard width

The hero, toolbar, and card list used to sit inside base.html's .container
(max-width 800px). Break out with negative horizontal margins so the page
spans the viewport like /dashboard does, capped at 1440px for readability
on very wide screens with a 24px gutter on each side.

- No change to base.html itself. The override is scoped to .tokens-page.
- body { overflow-x: hidden; } guards against rare horizontal scrollbars.
- < 808px viewport: reset to natural flow (mobile already narrower).
- ≥ 1488px viewport: cap to 1440px and re-center.

* chore(web): remove /profile template + nav link (redirect /profile -> /tokens)

The old /profile PAT CRUD page is now redundant — the modern /tokens page
covers both user and admin flows. Delete the template; the router's
/profile handler already 302-redirects to /tokens.

Nav cleanup:
- Remove the 'Profile' link.
- Show a single 'Tokens' link to every signed-in user (previously only
  admins saw it).
- Active-state matches /tokens, /admin/tokens, and /profile so the
  highlight survives the redirect chain.

/install CTA now points at /tokens instead of /profile.

* test: cover /tokens for admin + non-admin flows, /profile redirect, nav update

tests/test_admin_tokens_ui.py
- Point admin rendering test at /tokens directly and tighten assertions
  (admin-only stat strip + owner search, non-admin CTA absent).
- Add test_non_admin_can_render_tokens_page: personal body, New-token CTA,
  create-modal, reveal banner; stat strip + owner search absent.
- Add test_admin_tokens_redirects_to_tokens: 302 to /tokens, query string
  (?user=...) preserved for the /admin/users deep-link.
- Add test_profile_redirects_to_tokens: 302 to /tokens.
- Add test_non_admin_can_create_pat_via_tokens_page_api: exercises the
  POST /auth/tokens call that the non-admin create-modal submits.

tests/test_pat.py
- test_profile_page_renders -> test_profile_page_redirects_to_tokens:
  assert the 302 + that /tokens lands on the unified non-admin body.

tests/test_web_ui.py
- admin_users nav assertion: 'Tokens' link present, 'Profile' link absent.
- Add test_nav_shows_tokens_link_for_non_admin: non-admins see the same
  'Tokens' link (previously only admins did).
- Add test_profile_redirects_to_tokens back-compat check.

* feat(web): collapse 'What Claude Code will receive' by default

The preview block on /dashboard and /install now uses <details>/<summary>
so it is hidden by default. Click the chevron/title to expand and review
the clipboard payload. Markup stays in the DOM so existing tests that
assert on content continue to pass.

* fix(web): /tokens width — override .container to 1280px like dashboard

The negative-margin full-bleed trick was fragile and pushed content past
the right edge on deployed viewports. Replace with a simple max-width
override of base.html's .container on this page only, matching
/dashboard's 1280px center-column layout.

* feat(web): split role-aware /tokens into my_tokens.html + admin_tokens.html

* feat(web): router — separate handlers for /tokens (own) and /admin/tokens (all)

* feat(web): nav — show Tokens for all, add All tokens for admins

* test: cover split token pages (own vs all) + admin access gating

* feat(web): move 'My tokens' into a user dropdown menu

Replaces the separate Tokens/email/Logout nav trio with a rounded
avatar trigger that opens a dropdown containing the user's email,
role, a 'My tokens' link, and Logout. Admin-only 'All tokens' stays
as a top-level nav item since it's an admin function, not a personal
one. Click-outside and Escape close the panel; chevron rotates on
open.

* fix(api): allow PATs to list/get/revoke their own tokens (CLI flow)

The documented 'da auth token list/revoke' CLI flow in
docs/HEADLESS_USAGE.md uses a PAT, but the previous dependency
(require_session_token) returned 403. Only create_token must be
session-only to prevent PAT-spawning-PAT chains; listing and
revoking your own tokens is safe with a PAT.

* fix(api): cap expires_in_days at 3650 to avoid datetime overflow (500 to 400)

Values above ~11 million days overflowed datetime.max in
datetime.now(utc) + timedelta(days=...) and surfaced as an
unhandled OverflowError → 500. Cap at 10 years with a clear
400 instead; the no-expiry code path is unaffected.
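
A minimal sketch of the cap described above, using only the standard library. The helper name `compute_expires_at` and the use of `ValueError` (which an API layer would map to a 400) are illustrative assumptions, not the code in app/api/tokens.py:

```python
from datetime import datetime, timedelta, timezone

MAX_EXPIRES_IN_DAYS = 3650  # 10-year cap, as stated in the commit message


def compute_expires_at(expires_in_days):
    """Hypothetical helper: return an expiry timestamp, or None for 'never'."""
    if expires_in_days is None:
        # The no-expiry path is untouched by the cap.
        return None
    if expires_in_days <= 0 or expires_in_days > MAX_EXPIRES_IN_DAYS:
        # Reject up front instead of letting datetime arithmetic overflow.
        raise ValueError(f"expires_in_days must be between 1 and {MAX_EXPIRES_IN_DAYS}")
    return datetime.now(timezone.utc) + timedelta(days=expires_in_days)


def overflows(days):
    """Show the failure mode the cap prevents: date arithmetic past datetime.max."""
    try:
        datetime.now(timezone.utc) + timedelta(days=days)
    except OverflowError:
        return True
    return False
```

Validating before the addition keeps the error a client-side 400 rather than an unhandled exception.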

* fix(api): relax _SAFE_URL_RE to allow path prefixes, underscores, and IPv6

The previous regex rejected legitimate reverse-proxy base_url values
(https://host/agnes/), underscores in Docker Compose hostnames, and
IPv6 literals (http://[::1]:8000). Widen the charset and allow an
optional trailing path. shlex.quote continues to provide
defense-in-depth against any metacharacter that slips through.

* fix(web): /login/email and Google OAuth propagate next_path

Previously, /login/email silently dropped the ?next=<path> query
param so the hidden form field rendered empty and login always
landed on /dashboard. Google's button was hard-coded to
/auth/google/login, ignoring next entirely.

- /login page now appends ?next to the Google button URL
- /login/email reads + sanitizes next, passes as template context
- google_login stashes sanitized next_path in session['login_next']
- google_callback pops + re-sanitizes and redirects there

Sanitization factored into app/auth/_common.safe_next_path.
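
The kind of check such a sanitizer performs can be sketched as follows. This is a hypothetical stand-in, not the body of `safe_next_path`; the function name `sanitize_next`, the exact rules, and the `/dashboard` default are assumptions:

```python
def sanitize_next(raw, default="/dashboard"):
    """Hypothetical sanitizer: accept only same-site absolute paths."""
    if not raw or not raw.startswith("/"):
        # Rejects empty values and absolute URLs such as https://evil.example
        return default
    if raw.startswith("//") or "\\" in raw:
        # Protocol-relative URLs and backslash tricks can leave the site.
        return default
    if "\r" in raw or "\n" in raw:
        # Control characters have no place in a redirect target.
        return default
    return raw
```

Sanitizing both when the value is stored and when it is read back, as the bullets above describe for the Google flow, means a tampered session value still cannot cause an open redirect.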

* fix(auth): differentiate argon2 VerifyMismatchError from internal errors in web login

The previous except (VerifyMismatchError, Exception) collapsed both
cases into the generic 'invalid credentials' redirect, silently
hiding corrupted-hash / library errors from ops. Split the two:
bad password still gets ?error=invalid; anything else logs via
logger.exception and redirects with ?err=auth_internal so ops have
a visible signal and users don't retry forever against a broken
password_hash column.

* docs: correct CLAUDE.md table name (personal_access_tokens)

v7 note referenced 'access_tokens.last_used_ip' but the real table
is personal_access_tokens (as mentioned two tokens earlier in the
same bullet). Same-file consistency fix.

* chore(web): clarify admin user-reset UI — encourage Set password over the unused reset_token

POST /api/users/{id}/reset-password stores and returns a token
but no endpoint consumes it — the magic-link sender would log the
user in without prompting for a new password, defeating the reset.
- Drop the 'Reset' row action from admin_users so admins aren't
  pointed at a dead end.
- Rewrite the reveal-modal copy to tell admins to use Set password
  and explicitly note that the magic-link flow isn't available
  for reset tokens in this build.
The API endpoint stays for API-level future use.

* test: cover PAT CLI flow, expires_in_days overflow, proxy base_url, next propagation

- tests/test_pat.py: PAT can list own tokens (200, was 403);
  PAT can revoke own tokens (204); create_token returns 400 for
  expires_in_days > 3650 (was 500 via datetime overflow).
- tests/test_cli_artifacts.py: _SAFE_URL_RE accepts reverse-proxy
  path prefixes, underscores, and IPv6 literals; end-to-end check
  of cli_install_script with a stubbed base_url that includes
  a path prefix (Agnes behind /agnes/).
- tests/test_web_ui.py: /login propagates ?next to the Google
  button URL; /login/email renders next in the hidden form field
  and strips hostile values; unit coverage of safe_next_path.

* fix(security): use \Z instead of $ in URL/version allowlists (trailing-\n bypass)

Python regex `$` also matches just before a trailing newline, so a Host
header or AGNES_VERSION value like "good.example.com\n$(rm -rf /)"
would slip past the allowlist. `\Z` anchors to strict end-of-string.

shlex.quote downstream remains as defense-in-depth, but the allowlist
is now the tight gate it claims to be.
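
The anchor difference can be reproduced with the standard `re` module. The character class below is a simplified, hypothetical allowlist, not the project's `_SAFE_URL_RE` or `_SAFE_VERSION_RE`:

```python
import re

# Hypothetical allowlists that differ only in the end anchor.
LOOSE = re.compile(r"^[A-Za-z0-9._-]+$")    # `$` also matches just before a trailing "\n"
STRICT = re.compile(r"^[A-Za-z0-9._-]+\Z")  # `\Z` matches only at the very end of the string

value = "good.example.com\n"  # clean-looking value with one trailing newline

loose_accepts = LOOSE.match(value) is not None
strict_accepts = STRICT.match(value) is not None
```

Here `loose_accepts` is `True` and `strict_accepts` is `False`. `re.fullmatch` would be another way to get the strict behaviour.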

* fix(auth): PAT with null expiry omits JWT exp claim (DB is the source of truth)

Previously a PAT created with `expires_in_days=null` (user-requested
"never expires") set the DB `expires_at` to NULL (correct) but still
baked a ~100y `exp` claim into the JWT. That is misleading: the PAT
silently did expire eventually, despite the UI and API promising
"no expiry".

`create_access_token` now accepts `omit_exp=True` to skip the `exp`
claim entirely. `app/api/tokens.py` passes that when `expires_in_days
is None`. The authoritative expiry check lives in
`app/auth/dependencies.py`, which reads `expires_at` from the DB row —
unchanged. PyJWT accepts claim-less JWTs indefinitely.

* test: cover trailing-newline regex bypass + no-exp JWT for unbounded PAT

- test_safe_url_re_rejects_trailing_newline_bypass: asserts both
  `_SAFE_URL_RE` and `_SAFE_VERSION_RE` reject values with a trailing
  `\n` (previously accepted because Python `$` matches before `\n`).
- test_pat_null_expiry_jwt_has_no_exp_claim: POST /auth/tokens with
  `expires_in_days=null`, decode the returned JWT, assert `exp` is
  absent while `typ=pat`, `sub`, and `jti` are still present.
- test_pat_with_null_expiry_is_accepted_by_verify_token: verify_token
  round-trips a claim-less JWT without ExpiredSignatureError.
- test_pat_null_expiry_end_to_end_allows_authenticated_request: use
  the null-expiry PAT against /auth/tokens and confirm it authenticates.

* docs(auth): document X-Forwarded-For trust model in _client_ip

Deployment runs behind Caddy which strips incoming X-Forwarded-For
and sets its own, so the leftmost hop is trustworthy. Clarify that
the stored last_used_ip is audit-only and never used for access
control — if the app is ever exposed directly, this value becomes
client-settable.
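
A rough sketch of the first-hop extraction and the new-IP audit condition described in this commit log. The function names and the plain-dict header representation are assumptions for illustration; the real `_client_ip` works on the framework's request object:

```python
def client_ip(headers, peer_host):
    """Hypothetical helper: leftmost X-Forwarded-For hop, else the socket peer.

    Only trustworthy when a proxy in front of the app overwrites the header.
    """
    forwarded = headers.get("x-forwarded-for", "")
    first_hop = forwarded.split(",")[0].strip()
    return first_hop or peer_host


def should_audit_new_ip(last_used_ip, current_ip):
    """Audit only when a previously seen IP changes, not on the very first use."""
    return last_used_ip is not None and last_used_ip != current_ip
```

Because the value can be client-supplied when the app is exposed directly, it suits audit logging but not access decisions, as the note above says.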

* docs: /profile → /tokens in install.sh next-steps, CLI error, HEADLESS_USAGE, security skill

After splitting PAT management to /tokens (with /profile as a back-compat
302), stale references remained in user-facing text. Update them to the
canonical /tokens URL so shell scripts, CLI error hints, docs, and the
bundled security skill are all consistent.
2026-04-22 14:24:28 +02:00


User Management + PAT + CLI Distribution Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Deliver the full scope of issues #10, #11, #12, and #9 (HTML auth redirect, user management UI/API/CLI, Personal Access Tokens, and CLI distribution from the Docker image with an install page) for production multi-customer deployments of Agnes.

Architecture: Four phases, run serially wherever they share schema migrations or the same file. Where their scope does not collide, they run in parallel in separate worktrees. Each phase is self-contained, has its own TDD tasks, and is committed incrementally.

Tech Stack: Python 3.13, FastAPI, DuckDB, Jinja2 templates (Bootstrap-like CSS), Typer CLI, PyJWT, Argon2, pytest, Docker (uv build), httpx.

Sequence and parallelism:

Phase 0 (#10 HTML redirect)          ── 12 h, standalone
   │
   ▼
Phase 1 (#11 User management)        ── schema v5, API+CLI+UI, 1 day
   │
   ▼
Phase 2 (#12 PAT)  ──────────┐       ── schema v6, JWT+API+CLI+UI
                             ├── in parallel in 2 worktrees
Phase 3 (#9 CLI dist)  ──────┘       ── Dockerfile+/cli/*+/install+login fix

Conflict map (review-audited): Phase 2 and Phase 3 share 4 files, not just one. All of the changes are small, local edits, however, and the merge is trivial:

| File | Phase 1 | Phase 2 | Phase 3 | Resolution |
|---|---|---|---|---|
| app/main.py | - | register tokens_router | register cli_artifacts_router | both add include_router; the merge is conflict-free |
| app/web/router.py | /admin/users route | /profile route | /install route | appended at the end; conflict is at most 1 line |
| app/web/templates/dashboard.html | admin nav link | profile nav link | install nav link | all in the same <nav> block: designate one owner (Phase 3) who adds all three links according to the existing roles |
| cli/commands/auth.py | - | register token_app | fix login password | serial: Phase 3 first, Phase 2 rebases |

Rule: Phase 3 is merged first (it contains the da login fix and the dashboard nav). Phase 2 then rebases onto main. Phase 1 does not conflict with Phase 2/3 in the nav (admin link), and the remaining files are disjoint.

Testing: Each phase ends with a green pytest tests/ run over the full suite. Every task follows a TDD cycle: failing test → minimal impl → passing test → commit.


Phase 0 — HTML Auth Redirect (#10)

Today the backend's get_current_user raises HTTPException(401) for every request, so a browser receives JSON instead of a redirect to /login. Goal: distinguish API requests from HTML requests and, for HTML, return RedirectResponse("/login").

File Structure

  • Modify: app/auth/dependencies.py (distinguish the request type via the Accept header or the path)
  • Test: tests/test_auth_html_redirect.py (new)

Task 0.1: Test — HTML request bez tokenu dostane 302 na /login

Files:

  • Create: tests/test_auth_html_redirect.py

  • Step 1: Write the failing test

"""Tests for #10 — unauthenticated HTML routes must redirect, not return JSON 401."""

from fastapi.testclient import TestClient

from app.main import app


def test_html_route_without_token_redirects_to_login():
    """GET /dashboard without token must return 302 to /login (not 401 JSON)."""
    client = TestClient(app, follow_redirects=False)
    response = client.get("/dashboard", headers={"Accept": "text/html"})
    assert response.status_code == 302
    assert response.headers["location"] == "/login"


def test_api_route_without_token_returns_401_json():
    """GET /api/users without token must return 401 JSON (not redirect)."""
    client = TestClient(app, follow_redirects=False)
    response = client.get("/api/users", headers={"Accept": "application/json"})
    assert response.status_code == 401
    assert response.headers["content-type"].startswith("application/json")


def test_html_route_with_bad_token_redirects_to_login():
    """Expired/invalid cookie on HTML route → redirect to /login, not JSON 401."""
    client = TestClient(app, follow_redirects=False)
    response = client.get(
        "/dashboard",
        headers={"Accept": "text/html"},
        cookies={"access_token": "bogus.token.here"},
    )
    assert response.status_code == 302
    assert response.headers["location"] == "/login"


def test_root_without_token_still_redirects_to_login():
    """Regression: `/` uses get_optional_user and must still render/redirect, not crash."""
    client = TestClient(app, follow_redirects=False)
    response = client.get("/", headers={"Accept": "text/html"})
    # `/` redirects based on auth state; without token we go to /login
    assert response.status_code == 302
    assert response.headers["location"] == "/login"
  • Step 2: Run test to verify it fails

Run: pytest tests/test_auth_html_redirect.py -v Expected: first and third tests FAIL (HTTP 401 returned instead of 302); second test likely passes.

  • Step 3: Implement HTML-aware auth dependency in app/auth/dependencies.py

Replace the body of get_current_user so HTML requests get a redirect instead of a 401. Keep get_optional_user intact (it already returns None on failure).

"""FastAPI auth dependencies — current user, role checking."""

from typing import Optional

import duckdb
from fastapi import Depends, HTTPException, Header, Request, status
from fastapi.responses import RedirectResponse

from app.auth.jwt import verify_token
from src.db import get_system_db
from src.rbac import Role, ROLE_HIERARCHY
from src.repositories.users import UserRepository


def _get_db():
    conn = get_system_db()
    try:
        yield conn
    finally:
        conn.close()


def _wants_html(request: Optional[Request]) -> bool:
    """True if client is a browser expecting HTML (not an API client wanting JSON)."""
    if request is None:
        return False
    # Explicit JSON request from an API client — never redirect.
    accept = request.headers.get("accept", "")
    if "application/json" in accept and "text/html" not in accept:
        return False
    # Path heuristic — /api/* and /auth/* are never HTML surfaces.
    path = request.url.path
    if path.startswith("/api/") or path.startswith("/auth/"):
        return False
    # Everything else (browser navigations to /dashboard, /admin/*, /profile/*, etc.)
    # treats text/html or */* as HTML.
    return "text/html" in accept or "*/*" in accept or accept == ""


class _HTMLAuthRedirect(Exception):
    """Sentinel raised by auth dependencies to trigger redirect instead of 401."""


async def get_current_user(
    request: Request = None,
    authorization: Optional[str] = Header(None),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
) -> dict:
    """Extract and validate JWT from Authorization header or cookie. Returns user dict.

    HTML browser requests without a valid token get redirected to /login via an
    exception handler in app/main.py (#10). API requests keep getting JSON 401.
    """
    token = None
    if authorization and authorization.startswith("Bearer "):
        token = authorization.removeprefix("Bearer ")
    if not token and request:
        token = request.cookies.get("access_token")

    def _fail(detail: str) -> None:
        if _wants_html(request):
            raise _HTMLAuthRedirect()
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail=detail
        )

    if not token:
        _fail("Missing or invalid Authorization header")
    payload = verify_token(token)
    if not payload:
        _fail("Invalid or expired token")

    repo = UserRepository(conn)
    user = repo.get_by_id(payload.get("sub", ""))
    if not user:
        _fail("User not found")
    return user


async def get_optional_user(
    request: Request = None,
    authorization: Optional[str] = Header(None),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
) -> Optional[dict]:
    """Like get_current_user but returns None instead of 401 if no token."""
    try:
        return await get_current_user(request=request, authorization=authorization, conn=conn)
    except (HTTPException, _HTMLAuthRedirect):
        return None


def require_role(minimum_role: Role):
    """Dependency factory: require user has at least the given role."""
    async def _check(request: Request, user: dict = Depends(get_current_user)):
        user_role = Role(user.get("role", "viewer"))
        if ROLE_HIERARCHY.get(user_role, 0) < ROLE_HIERARCHY.get(minimum_role, 0):
            if _wants_html(request):
                raise _HTMLAuthRedirect()
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Requires role {minimum_role.value} or higher",
            )
        return user
    return _check


async def require_admin(request: Request, user: dict = Depends(get_current_user)) -> dict:
    """Dependency: require user is an admin. Raises 403 or redirects on HTML requests."""
    if user.get("role") != "admin":
        if _wants_html(request):
            raise _HTMLAuthRedirect()
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required",
        )
    return user
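
The decision that _wants_html makes can also be checked without FastAPI. The following is a minimal sketch, assuming the same rules as the function above; wants_html is a hypothetical stand-in that takes the Accept header and the path as plain strings instead of a Request object.

```python
def wants_html(accept: str, path: str) -> bool:
    """Hypothetical stand-in for _wants_html that operates on plain strings."""
    # An explicit JSON-only client never gets a redirect.
    if "application/json" in accept and "text/html" not in accept:
        return False
    # /api/* and /auth/* are never HTML surfaces.
    if path.startswith(("/api/", "/auth/")):
        return False
    # Browsers send text/html; other clients often send */* or no header.
    return "text/html" in accept or "*/*" in accept or accept == ""


cases = [
    ("text/html", "/dashboard"),
    ("application/json", "/dashboard"),
    ("text/html", "/api/users"),
    ("*/*", "/admin/users"),
    ("", "/install"),
]
for accept, path in cases:
    print(f"{accept!r:20} {path:15} -> {wants_html(accept, path)}")
```

One consequence of the heuristic worth noticing: a client that sends `*/*` (or no Accept header) to a non-API path is treated like a browser and gets the redirect rather than a JSON 401.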
  • Step 4: Register redirect exception handler in app/main.py

Add inside create_app() after middleware registration, before router includes:

from fastapi import Request
from fastapi.responses import RedirectResponse
from app.auth.dependencies import _HTMLAuthRedirect

@app.exception_handler(_HTMLAuthRedirect)
async def _html_auth_redirect_handler(request: Request, exc: _HTMLAuthRedirect):
    return RedirectResponse(url="/login", status_code=302)
  • Step 5: Run tests — verify pass

Run: pytest tests/test_auth_html_redirect.py -v Expected: all 4 tests PASS.

  • Step 6: Run full test suite — verify no regressions

Run: pytest tests/ -x --timeout=30 Expected: no new failures.

  • Step 7: Commit
git add app/auth/dependencies.py app/main.py tests/test_auth_html_redirect.py
git commit -m "fix(auth): redirect HTML requests to /login instead of JSON 401 (#10)"

Phase 1 — User Management (#11)

Scope: schema v5 (active column + audit fields), API (PATCH, reset-password, set-password, deactivate, activate) with self-lockout safeguards, CLI (5 new da admin commands), UI (the /admin/users page), and a runtime check of active in get_current_user.

File Structure

  • Modify: src/db.py (schema v5 + migration)
  • Modify: src/repositories/users.py (extended update, count_admins, list_all_with_active)
  • Modify: app/api/users.py (5 new endpoints + safeguards + audit)
  • Modify: app/auth/dependencies.py (check for active=false in get_current_user)
  • Modify: cli/commands/admin.py (5 new commands, extended list-users output)
  • Create: app/web/templates/admin_users.html
  • Modify: app/web/router.py (add the /admin/users route + nav link)
  • Modify: app/web/templates/dashboard.html (admin nav link to user management)
  • Test: tests/test_user_management.py (new)

Task 1.1: Schema v5 — users.active + deactivated_at/by

Files:

  • Modify: src/db.py:19 (bump SCHEMA_VERSION)

  • Modify: src/db.py:27-39 (users table definition)

  • Modify: src/db.py:387-426 (add _V4_TO_V5_MIGRATIONS)

  • Modify: src/db.py:460-471 (migration dispatch)

  • Step 1: Write the failing test

# tests/test_user_management.py
"""Tests for #11 — user management (active flag, safeguards, endpoints)."""

import os
import tempfile
import pytest

import duckdb

from src.db import _ensure_schema, get_schema_version


@pytest.fixture
def fresh_db(monkeypatch):
    with tempfile.TemporaryDirectory() as tmp:
        monkeypatch.setenv("DATA_DIR", tmp)
        yield tmp


def test_schema_v5_adds_active_column(fresh_db):
    from src.db import get_system_db, close_system_db
    conn = get_system_db()
    try:
        cols = conn.execute("PRAGMA table_info(users)").fetchall()
        col_names = [c[1] for c in cols]
        assert "active" in col_names
        assert "deactivated_at" in col_names
        assert "deactivated_by" in col_names
        assert get_schema_version(conn) >= 5
    finally:
        conn.close()
        close_system_db()
  • Step 2: Run test to verify it fails

Run: pytest tests/test_user_management.py::test_schema_v5_adds_active_column -v Expected: FAIL with "active column not found" or schema version < 5.

  • Step 3: Bump SCHEMA_VERSION and update users DDL

In src/db.py:

SCHEMA_VERSION = 5

Update the users table in _SYSTEM_SCHEMA:

CREATE TABLE IF NOT EXISTS users (
    id VARCHAR PRIMARY KEY,
    email VARCHAR UNIQUE NOT NULL,
    name VARCHAR,
    role VARCHAR DEFAULT 'analyst',
    password_hash VARCHAR,
    setup_token VARCHAR,
    setup_token_created TIMESTAMP,
    reset_token VARCHAR,
    reset_token_created TIMESTAMP,
    active BOOLEAN NOT NULL DEFAULT TRUE,
    deactivated_at TIMESTAMP,
    deactivated_by VARCHAR,
    created_at TIMESTAMP DEFAULT current_timestamp,
    updated_at TIMESTAMP
);

Add migration list below _V3_TO_V4_MIGRATIONS:

_V4_TO_V5_MIGRATIONS = [
    "ALTER TABLE users ADD COLUMN IF NOT EXISTS active BOOLEAN NOT NULL DEFAULT TRUE",
    "ALTER TABLE users ADD COLUMN IF NOT EXISTS deactivated_at TIMESTAMP",
    "ALTER TABLE users ADD COLUMN IF NOT EXISTS deactivated_by VARCHAR",
]

Extend the migration dispatch in _ensure_schema (inside the else branch, alongside the existing if current < 4: block):

            if current < 5:
                for sql in _V4_TO_V5_MIGRATIONS:
                    conn.execute(sql)
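
The upgrade path above can be tried end to end with the standard library. This is a minimal sketch under loud assumptions: sqlite3 stands in for DuckDB, the ALTER statements are simplified (SQLite has no ADD COLUMN IF NOT EXISTS), and recording the new version as an extra schema_version row is a guess about how _ensure_schema tracks it, not something the plan states.

```python
import sqlite3

SCHEMA_VERSION = 5

# Assumption: simplified SQLite versions of _V4_TO_V5_MIGRATIONS.
V4_TO_V5 = [
    "ALTER TABLE users ADD COLUMN active BOOLEAN NOT NULL DEFAULT 1",
    "ALTER TABLE users ADD COLUMN deactivated_at TIMESTAMP",
    "ALTER TABLE users ADD COLUMN deactivated_by VARCHAR",
]


def migrate(conn: sqlite3.Connection) -> int:
    """Apply pending migrations and return the resulting schema version."""
    current = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0]
    if current < 5:
        for sql in V4_TO_V5:
            conn.execute(sql)
    if current < SCHEMA_VERSION:
        # Assumption: the bump is stored as a new row in schema_version.
        conn.execute(
            "INSERT INTO schema_version (version) VALUES (?)", (SCHEMA_VERSION,)
        )
    conn.commit()
    return conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0]


# Build a v4-era database that already contains one user.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE schema_version (version INTEGER NOT NULL)")
conn.execute("INSERT INTO schema_version (version) VALUES (4)")
conn.execute("CREATE TABLE users (id VARCHAR PRIMARY KEY, email VARCHAR, role VARCHAR)")
conn.execute("INSERT INTO users VALUES ('u1', 'pre@v4', 'admin')")

new_version = migrate(conn)
row = conn.execute("SELECT email, active FROM users").fetchone()
print(new_version, row)
```

The point of the NOT NULL DEFAULT on the new column is that rows created before the migration read back as active without a separate backfill statement; calling migrate a second time is a no-op because the version guard skips the ALTER statements.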
  • Step 4: Run test to verify it passes

Run: pytest tests/test_user_management.py::test_schema_v5_adds_active_column -v Expected: PASS.

  • Step 4b: Backfill test — upgrade from v4 with existing rows

Append to tests/test_user_management.py:

def test_schema_v5_backfill_keeps_existing_users_active(fresh_db):
    """Simulate upgrading from v4: insert a user pre-migration, verify active=TRUE afterwards."""
    import uuid
    import duckdb as _duckdb
    from pathlib import Path

    # 1. Create a v4-era DB by hand.
    db_dir = Path(fresh_db) / "state"
    db_dir.mkdir(parents=True, exist_ok=True)
    db_path = db_dir / "system.duckdb"
    conn = _duckdb.connect(str(db_path))
    try:
        conn.execute("CREATE TABLE schema_version (version INTEGER NOT NULL, applied_at TIMESTAMP DEFAULT current_timestamp)")
        conn.execute("INSERT INTO schema_version (version) VALUES (4)")
        conn.execute("""CREATE TABLE users (
            id VARCHAR PRIMARY KEY, email VARCHAR UNIQUE NOT NULL,
            name VARCHAR, role VARCHAR DEFAULT 'analyst',
            password_hash VARCHAR, setup_token VARCHAR,
            setup_token_created TIMESTAMP, reset_token VARCHAR,
            reset_token_created TIMESTAMP,
            created_at TIMESTAMP DEFAULT current_timestamp, updated_at TIMESTAMP)""")
        uid = str(uuid.uuid4())
        conn.execute("INSERT INTO users (id, email, name, role) VALUES (?, 'pre@v4', 'Pre', 'admin')", [uid])
    finally:
        conn.close()

    # 2. Now let the app open it — schema should migrate to v5 and backfill active=TRUE.
    from src.db import get_system_db, close_system_db, get_schema_version
    close_system_db()
    conn = get_system_db()
    try:
        assert get_schema_version(conn) >= 5
        row = conn.execute("SELECT email, active FROM users WHERE email = 'pre@v4'").fetchone()
        assert row is not None
        assert row[1] is True
    finally:
        conn.close()
        close_system_db()

Run: pytest tests/test_user_management.py::test_schema_v5_backfill_keeps_existing_users_active -v Expected: PASS.

  • Step 5: Commit
git add src/db.py tests/test_user_management.py
git commit -m "feat(db): schema v5 — users.active + deactivated_at/by (#11)"

Task 1.2: UserRepository — update supports active/deactivated_*, add count_admins & list_all

Files:

  • Modify: src/repositories/users.py:49-58 (extend update allowed fields)

  • Modify: src/repositories/users.py:27-32 (list_all already exists; keep)

  • Add: count_admins() method

  • Step 1: Write the failing test

# tests/test_user_management.py — append

def test_repository_update_accepts_active(fresh_db):
    import uuid
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    conn = get_system_db()
    try:
        repo = UserRepository(conn)
        uid = str(uuid.uuid4())
        repo.create(id=uid, email="a@b.c", name="A", role="analyst")
        repo.update(id=uid, active=False, deactivated_by="admin-uuid")
        row = repo.get_by_id(uid)
        assert row["active"] is False
        assert row["deactivated_by"] == "admin-uuid"
    finally:
        conn.close()
        close_system_db()


def test_repository_count_admins(fresh_db):
    import uuid
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    conn = get_system_db()
    try:
        repo = UserRepository(conn)
        assert repo.count_admins() == 0
        repo.create(id=str(uuid.uuid4()), email="a@b.c", name="A", role="admin")
        repo.create(id=str(uuid.uuid4()), email="b@b.c", name="B", role="analyst")
        assert repo.count_admins() == 1
    finally:
        conn.close()
        close_system_db()
  • Step 2: Run — verify fail

Run: pytest tests/test_user_management.py -v -k "repository" Expected: FAIL.

  • Step 3: Extend repository

In src/repositories/users.py, extend the allowed set in update with active, deactivated_at, and deactivated_by, and add count_admins:

    def update(self, id: str, **kwargs) -> None:
        allowed = {
            "email", "name", "role", "password_hash", "setup_token",
            "setup_token_created", "reset_token", "reset_token_created",
            "active", "deactivated_at", "deactivated_by",
        }
        updates = {k: v for k, v in kwargs.items() if k in allowed}
        if not updates:
            return
        updates["updated_at"] = datetime.now(timezone.utc)
        set_clause = ", ".join(f"{k} = ?" for k in updates)
        values = list(updates.values()) + [id]
        self.conn.execute(f"UPDATE users SET {set_clause} WHERE id = ?", values)

    def count_admins(self, active_only: bool = True) -> int:
        sql = "SELECT COUNT(*) FROM users WHERE role = 'admin'"
        if active_only:
            sql += " AND COALESCE(active, TRUE) = TRUE"
        result = self.conn.execute(sql).fetchone()
        return int(result[0]) if result else 0
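
To see what the allow-list in update buys, the SQL construction can be isolated from the database. The sketch below is illustrative only: build_update and the shortened ALLOWED set are hypothetical names, not part of the repository.

```python
from datetime import datetime, timezone

# Hypothetical, shortened version of the allow-list used by update().
ALLOWED = {"email", "name", "role", "active", "deactivated_at", "deactivated_by"}


def build_update(user_id: str, **kwargs):
    """Return (sql, values) for an allow-listed UPDATE, or None if nothing to do."""
    updates = {k: v for k, v in kwargs.items() if k in ALLOWED}
    if not updates:
        return None
    updates["updated_at"] = datetime.now(timezone.utc)
    # Column names come only from ALLOWED; values stay parameterized.
    set_clause = ", ".join(f"{k} = ?" for k in updates)
    return f"UPDATE users SET {set_clause} WHERE id = ?", [*updates.values(), user_id]


# The unknown key is dropped; only the allow-listed column reaches the SQL.
sql, values = build_update("u1", active=False, is_superuser=True)
print(sql)

# None is a legitimate value: it clears a column (as reactivation does).
sql_clear, values_clear = build_update("u1", deactivated_at=None)
print(sql_clear, values_clear[0])
```

Because only keys from the allow-list are interpolated into the statement, the f-string is not an injection vector, while every value still travels as a bound parameter.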
  • Step 4: Run — verify pass

Run: pytest tests/test_user_management.py -v -k "repository" Expected: PASS.

  • Step 5: Commit
git add src/repositories/users.py tests/test_user_management.py
git commit -m "feat(users): repository supports active flag + count_admins (#11)"

Task 1.3: API — PATCH /api/users/{id} + audit

Files:

  • Modify: app/api/users.py

  • Step 1: Failing test

# tests/test_user_management.py — append (use existing API test fixtures pattern)

from fastapi.testclient import TestClient


@pytest.fixture
def app_client(fresh_db, monkeypatch):
    monkeypatch.setenv("TESTING", "1")
    monkeypatch.setenv("JWT_SECRET_KEY", "test-jwt-secret-key-minimum-32-chars!!")
    from app.main import app
    return TestClient(app)


def _seed_admin(fresh_db):
    """Create an admin user and return (id, bearer_token)."""
    import uuid
    from src.db import get_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token
    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="admin@test", name="Admin", role="admin")
        token = create_access_token(user_id=uid, email="admin@test", role="admin")
        return uid, token
    finally:
        conn.close()


def test_patch_user_updates_role(app_client, fresh_db):
    import uuid
    from src.db import get_system_db
    from src.repositories.users import UserRepository
    admin_id, token = _seed_admin(fresh_db)
    target_id = str(uuid.uuid4())
    conn = get_system_db()
    try:
        UserRepository(conn).create(id=target_id, email="x@test", name="X", role="viewer")
    finally:
        conn.close()

    resp = app_client.patch(
        f"/api/users/{target_id}",
        headers={"Authorization": f"Bearer {token}"},
        json={"role": "analyst", "name": "X2"},
    )
    assert resp.status_code == 200
    data = resp.json()
    assert data["role"] == "analyst"
    assert data["name"] == "X2"
  • Step 2: Run — verify fail

Run: pytest tests/test_user_management.py::test_patch_user_updates_role -v Expected: FAIL (405 Method Not Allowed or 404).

  • Step 3: Implement PATCH in app/api/users.py

Replace the entire file with the extended version below:

"""User management endpoints (#11)."""

import uuid
from datetime import datetime, timezone
from typing import Optional, List

import duckdb
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel
from argon2 import PasswordHasher

from app.auth.dependencies import require_role, Role, _get_db
from src.repositories.users import UserRepository
from src.repositories.audit import AuditRepository

router = APIRouter(prefix="/api/users", tags=["users"])


def _audit(conn: duckdb.DuckDBPyConnection, actor_id: str, action: str, target_id: str, params: Optional[dict] = None) -> None:
    try:
        AuditRepository(conn).log(
            user_id=actor_id,
            action=action,
            resource=f"user:{target_id}",
            params=params,
        )
    except Exception:
        pass  # never block the endpoint on audit failure


class CreateUserRequest(BaseModel):
    email: str
    name: str
    role: str = "analyst"


class UpdateUserRequest(BaseModel):
    name: Optional[str] = None
    role: Optional[str] = None
    active: Optional[bool] = None


class SetPasswordRequest(BaseModel):
    password: str


class UserResponse(BaseModel):
    id: str
    email: str
    name: Optional[str]
    role: str
    active: bool = True
    created_at: Optional[str]
    deactivated_at: Optional[str] = None


def _to_response(u: dict) -> UserResponse:
    return UserResponse(
        id=u["id"],
        email=u["email"],
        name=u.get("name"),
        role=u["role"],
        active=bool(u.get("active", True)),
        created_at=str(u.get("created_at", "")),
        deactivated_at=str(u["deactivated_at"]) if u.get("deactivated_at") else None,
    )


@router.get("", response_model=List[UserResponse])
async def list_users(
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    return [_to_response(u) for u in UserRepository(conn).list_all()]


@router.post("", response_model=UserResponse, status_code=201)
async def create_user(
    payload: CreateUserRequest,
    request: Request,
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    repo = UserRepository(conn)
    if repo.get_by_email(payload.email):
        raise HTTPException(status_code=409, detail="User with this email already exists")
    user_id = str(uuid.uuid4())
    repo.create(id=user_id, email=payload.email, name=payload.name, role=payload.role)
    _audit(conn, user["id"], "user.create", user_id, {"email": payload.email, "role": payload.role})
    created = repo.get_by_id(user_id)
    return _to_response(created)


@router.patch("/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: str,
    payload: UpdateUserRequest,
    request: Request,
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    repo = UserRepository(conn)
    target = repo.get_by_id(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    updates: dict = {}
    if payload.name is not None:
        updates["name"] = payload.name
    if payload.role is not None:
        # Validate role is a known value
        try:
            Role(payload.role)
        except ValueError:
            raise HTTPException(status_code=400, detail=f"Unknown role: {payload.role}")
        # Protect: don't let admin demote themselves if they are the last admin
        if (
            target["id"] == user["id"]
            and target["role"] == "admin"
            and payload.role != "admin"
            and repo.count_admins(active_only=True) <= 1
        ):
            raise HTTPException(status_code=409, detail="Cannot demote the last active admin")
        updates["role"] = payload.role
    if payload.active is not None:
        # Protect: cannot self-deactivate
        if target["id"] == user["id"] and payload.active is False:
            raise HTTPException(status_code=409, detail="Cannot deactivate yourself")
        # Protect: cannot deactivate the last active admin
        if (
            target.get("role") == "admin"
            and payload.active is False
            and repo.count_admins(active_only=True) <= 1
        ):
            raise HTTPException(status_code=409, detail="Cannot deactivate the last active admin")
        updates["active"] = payload.active
        if payload.active is False:
            updates["deactivated_at"] = datetime.now(timezone.utc)
            updates["deactivated_by"] = user["id"]
        else:
            updates["deactivated_at"] = None
            updates["deactivated_by"] = None

    if updates:
        repo.update(id=user_id, **updates)
        _audit(conn, user["id"], "user.update", user_id, {k: v for k, v in updates.items() if k != "deactivated_at"})
    return _to_response(repo.get_by_id(user_id))


@router.delete("/{user_id}", status_code=204)
async def delete_user(
    user_id: str,
    request: Request,
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    repo = UserRepository(conn)
    target = repo.get_by_id(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")
    if target["id"] == user["id"]:
        raise HTTPException(status_code=409, detail="Cannot delete yourself")
    if target.get("role") == "admin" and repo.count_admins(active_only=True) <= 1:
        raise HTTPException(status_code=409, detail="Cannot delete the last active admin")
    repo.delete(user_id)
    _audit(conn, user["id"], "user.delete", user_id, {"email": target["email"]})


@router.post("/{user_id}/reset-password")
async def reset_password(
    user_id: str,
    request: Request,
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Generate a reset token and (best-effort) email it to the user."""
    import secrets
    repo = UserRepository(conn)
    target = repo.get_by_id(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")
    token = secrets.token_urlsafe(32)
    repo.update(
        id=user_id,
        reset_token=token,
        reset_token_created=datetime.now(timezone.utc),
    )
    _audit(conn, user["id"], "user.reset_password", user_id, {"email": target["email"]})
    # Best-effort email
    email_sent = False
    try:
        from app.auth.providers.email import _send_email, is_available
        if is_available():
            _send_email(target["email"], token)
            email_sent = True
    except Exception:
        pass
    return {"reset_token": token, "email_sent": email_sent}


@router.post("/{user_id}/set-password", status_code=204)
async def set_password(
    user_id: str,
    payload: SetPasswordRequest,
    request: Request,
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    if not payload.password or len(payload.password) < 8:
        raise HTTPException(status_code=400, detail="Password must be at least 8 characters")
    repo = UserRepository(conn)
    target = repo.get_by_id(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")
    ph = PasswordHasher()
    repo.update(id=user_id, password_hash=ph.hash(payload.password))
    _audit(conn, user["id"], "user.set_password", user_id, {"email": target["email"]})


@router.post("/{user_id}/deactivate", response_model=UserResponse)
async def deactivate_user(
    user_id: str,
    request: Request,
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    return await update_user(
        user_id=user_id,
        payload=UpdateUserRequest(active=False),
        request=request, user=user, conn=conn,
    )


@router.post("/{user_id}/activate", response_model=UserResponse)
async def activate_user(
    user_id: str,
    request: Request,
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    return await update_user(
        user_id=user_id,
        payload=UpdateUserRequest(active=True),
        request=request, user=user, conn=conn,
    )
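
The self-lockout rules buried in update_user are easier to review as a pure function. This is a sketch that mirrors the checks above in the same order; safeguard_violation is a hypothetical helper, and the role-validation step (the 400 for an unknown role) is deliberately left out.

```python
from typing import Optional


def safeguard_violation(
    actor_id: str,
    target: dict,
    new_role: Optional[str],
    new_active: Optional[bool],
    active_admins: int,
) -> Optional[str]:
    """Return the 409 detail that update_user would raise, or None if allowed."""
    is_self = target["id"] == actor_id
    if new_role is not None:
        # An admin may not demote themselves while being the last active admin.
        if (
            is_self
            and target["role"] == "admin"
            and new_role != "admin"
            and active_admins <= 1
        ):
            return "Cannot demote the last active admin"
    if new_active is False:
        if is_self:
            return "Cannot deactivate yourself"
        if target.get("role") == "admin" and active_admins <= 1:
            return "Cannot deactivate the last active admin"
    return None


admin = {"id": "a1", "role": "admin"}
viewer = {"id": "v1", "role": "viewer"}

print(safeguard_violation("a1", admin, "viewer", None, active_admins=1))
print(safeguard_violation("a1", admin, "viewer", None, active_admins=2))
print(safeguard_violation("a1", admin, None, False, active_admins=2))
print(safeguard_violation("a1", viewer, None, False, active_admins=1))
```

Note that the self-deactivation rule fires regardless of how many admins exist, whereas the demotion rule only applies when the caller is the last active admin.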
  • Step 4: Run — verify pass

Run: pytest tests/test_user_management.py::test_patch_user_updates_role -v Expected: PASS.

  • Step 5: Commit
git add app/api/users.py tests/test_user_management.py
git commit -m "feat(api): user PATCH/reset-password/set-password/activate/deactivate (#11)"

Task 1.4: Safeguards — self-deactivate / last-admin protection

Files:

  • Test: tests/test_user_management.py — append

  • Step 1: Failing tests

# tests/test_user_management.py — append

def test_cannot_self_deactivate(app_client, fresh_db):
    admin_id, token = _seed_admin(fresh_db)
    resp = app_client.patch(
        f"/api/users/{admin_id}",
        headers={"Authorization": f"Bearer {token}"},
        json={"active": False},
    )
    assert resp.status_code == 409
    assert "yourself" in resp.json()["detail"].lower()


def test_cannot_delete_last_admin(app_client, fresh_db):
    admin_id, token = _seed_admin(fresh_db)
    # Create a non-admin so we have ≥2 users, but admin is still the only admin.
    resp = app_client.post(
        "/api/users",
        headers={"Authorization": f"Bearer {token}"},
        json={"email": "x@test", "name": "X", "role": "viewer"},
    )
    x_id = resp.json()["id"]
    # Try deleting the admin.
    resp = app_client.delete(
        f"/api/users/{admin_id}",
        headers={"Authorization": f"Bearer {token}"},
    )
    assert resp.status_code == 409
    assert "last" in resp.json()["detail"].lower()


def test_cannot_demote_last_admin(app_client, fresh_db):
    admin_id, token = _seed_admin(fresh_db)
    # Deactivating the last admin needs a second admin as the actor, so this
    # test exercises the self-demotion safeguard instead.
    # Create a second, non-admin user so the admin is not the only account.
    resp = app_client.post(
        "/api/users",
        headers={"Authorization": f"Bearer {token}"},
        json={"email": "y@test", "name": "Y", "role": "viewer"},
    )
    y_id = resp.json()["id"]
    # Try to demote self (admin → viewer) while only admin — should fail.
    resp = app_client.patch(
        f"/api/users/{admin_id}",
        headers={"Authorization": f"Bearer {token}"},
        json={"role": "viewer"},
    )
    assert resp.status_code == 409
    assert "admin" in resp.json()["detail"].lower()
  • Step 2: Run

Run: pytest tests/test_user_management.py -v -k "safeguard or cannot" Expected: PASS (implementation already includes safeguards in Task 1.3).

  • Step 3: Commit tests
git add tests/test_user_management.py
git commit -m "test(api): safeguard tests for self-deactivate and last admin (#11)"

Task 1.5: Active-flag enforcement in get_current_user

Files:

  • Modify: app/auth/dependencies.py

  • Step 1: Failing test

# tests/test_user_management.py — append

def test_deactivated_user_cannot_authenticate(app_client, fresh_db):
    """A deactivated user's old JWT must be rejected."""
    import uuid
    from src.db import get_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token

    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@test", name="U", role="analyst")
        token = create_access_token(user_id=uid, email="u@test", role="analyst")
        UserRepository(conn).update(id=uid, active=False)
    finally:
        conn.close()

    resp = app_client.get(
        "/api/users",  # any authenticated endpoint
        headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
    )
    # Deactivated — must not succeed.
    assert resp.status_code in (401, 403)
  • Step 2: Run — verify fail

Run: pytest tests/test_user_management.py::test_deactivated_user_cannot_authenticate -v Expected: FAIL, but the test may PASS spuriously if the endpoint already denies the seeded user on role (403) before the active flag matters. If it passes here, point the test at an endpoint that any authenticated user can reach, so that only the inactive check can produce the rejection. A 401 from the inactive check is the intended outcome.

  • Step 3: Add active check in get_current_user

In app/auth/dependencies.py, after repo lookup:

    repo = UserRepository(conn)
    user = repo.get_by_id(payload.get("sub", ""))
    if not user:
        _fail("User not found")
    if not bool(user.get("active", True)):
        _fail("Account deactivated")
    return user
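
One detail of the check above is worth spelling out: `user.get("active", True)` falls back to `True` only when the key is missing, not when the column holds NULL. A minimal sketch of the same logic as a standalone function, with `AuthError` and `ensure_active` as hypothetical stand-ins for `_fail` and the inline code:

```python
from typing import Any, Dict, Optional


class AuthError(Exception):
    """Stand-in for whatever _fail raises in app/auth/dependencies.py (assumption)."""


def ensure_active(user: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    if not user:
        raise AuthError("User not found")
    # Missing key -> treated as active (rows read through an older schema).
    # Explicit False or None -> treated as deactivated, because bool(None) is False.
    if not bool(user.get("active", True)):
        raise AuthError("Account deactivated")
    return user
```

If NULL should count as active, the migration needs a non-null default on the column; the Python-side default does not cover that case.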
  • Step 4: Run full suite

Run: pytest tests/ -x --timeout=30 Expected: no regressions.

  • Step 5: Commit
git add app/auth/dependencies.py tests/test_user_management.py
git commit -m "feat(auth): reject requests from deactivated users (#11)"

Task 1.6: CLI commands — 5 new admin subcommands

Files:

  • Modify: cli/commands/admin.py

  • Test: tests/test_cli_admin.py — append

  • Step 1: Failing test

# tests/test_cli_admin.py — append

def test_admin_set_role_invokes_patch(monkeypatch):
    """`da admin set-role` sends PATCH to /api/users/{id} with role."""
    import httpx
    from cli.commands.admin import admin_app
    from typer.testing import CliRunner

    captured = {}

    def fake_patch(path, json=None, **kwargs):
        captured["path"] = path
        captured["json"] = json
        return httpx.Response(200, json={
            "id": "abc", "email": "x@y.z", "name": "X",
            "role": json.get("role") if json else "viewer",
            "active": True, "created_at": "", "deactivated_at": None,
        })

    from cli import client as cli_client
    monkeypatch.setattr(cli_client, "api_patch", fake_patch, raising=False)
    # patch admin.api_patch too since admin.py imports names
    from cli.commands import admin as admin_mod
    monkeypatch.setattr(admin_mod, "api_patch", fake_patch, raising=False)

    runner = CliRunner()
    result = runner.invoke(admin_app, ["set-role", "abc", "analyst"])
    assert result.exit_code == 0
    assert captured["path"] == "/api/users/abc"
    assert captured["json"] == {"role": "analyst"}
  • Step 2: Run — verify fail

Run: pytest tests/test_cli_admin.py::test_admin_set_role_invokes_patch -v Expected: FAIL (no set-role command).

  • Step 3: Add api_patch to cli/client.py
def api_patch(path: str, **kwargs) -> httpx.Response:
    with get_client() as client:
        return client.patch(path, **kwargs)
  • Step 4: Add CLI commands to cli/commands/admin.py

Append at the end of file:

from cli.client import api_patch


@admin_app.command("set-role")
def set_role(
    user_ref: str = typer.Argument(..., help="User id or email"),
    role: str = typer.Argument(..., help="viewer | analyst | km_admin | admin"),
):
    """Set a user's role."""
    uid = _resolve_user_id(user_ref)
    resp = api_patch(f"/api/users/{uid}", json={"role": role})
    _print_user_result(resp, f"Updated role for {user_ref} → {role}")


@admin_app.command("deactivate")
def deactivate(user_ref: str = typer.Argument(..., help="User id or email")):
    """Deactivate a user (blocks login, existing tokens also rejected)."""
    uid = _resolve_user_id(user_ref)
    resp = api_post(f"/api/users/{uid}/deactivate")
    _print_user_result(resp, f"Deactivated {user_ref}")


@admin_app.command("activate")
def activate(user_ref: str = typer.Argument(..., help="User id or email")):
    """Re-activate a deactivated user."""
    uid = _resolve_user_id(user_ref)
    resp = api_post(f"/api/users/{uid}/activate")
    _print_user_result(resp, f"Activated {user_ref}")


@admin_app.command("reset-password")
def reset_password(user_ref: str = typer.Argument(..., help="User id or email")):
    """Generate a reset token (emailed if SMTP/SendGrid configured)."""
    uid = _resolve_user_id(user_ref)
    resp = api_post(f"/api/users/{uid}/reset-password")
    if resp.status_code == 200:
        data = resp.json()
        typer.echo(f"Reset token: {data['reset_token']}")
        typer.echo(f"Email sent: {data['email_sent']}")
    else:
        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
        raise typer.Exit(1)


@admin_app.command("set-password")
def set_password(
    user_ref: str = typer.Argument(..., help="User id or email"),
    password: str = typer.Option(
        ..., prompt=True, hide_input=True, confirmation_prompt=True,
        help="New password (hidden input)",
    ),
):
    """Set a user's password directly (force-reset flow)."""
    uid = _resolve_user_id(user_ref)
    resp = api_post(f"/api/users/{uid}/set-password", json={"password": password})
    if resp.status_code == 204:
        typer.echo(f"Password set for {user_ref}")
    else:
        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
        raise typer.Exit(1)


def _resolve_user_id(ref: str) -> str:
    """Accept either a UUID or an email; look up email → id via list."""
    if "@" not in ref:
        return ref
    resp = api_get("/api/users")
    if resp.status_code != 200:
        typer.echo(f"Could not list users: {resp.text}", err=True)
        raise typer.Exit(1)
    for u in resp.json():
        if u.get("email") == ref:
            return u["id"]
    typer.echo(f"User not found: {ref}", err=True)
    raise typer.Exit(1)


def _print_user_result(resp, ok_msg: str) -> None:
    if resp.status_code in (200, 204):
        typer.echo(ok_msg)
    else:
        try:
            detail = resp.json().get("detail", resp.text)
        except Exception:
            detail = resp.text
        typer.echo(f"Failed: {detail}", err=True)
        raise typer.Exit(1)
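
The lookup in `_resolve_user_id` can be tested without any HTTP mocking if the matching step is separated from the API call. A minimal sketch, assuming a hypothetical helper `resolve_user_id` that receives the already-fetched user list; note that it compares emails case-insensitively, which is a variation and not what the plan's version does:

```python
from typing import Dict, List, Optional


def resolve_user_id(ref: str, users: List[Dict[str, str]]) -> Optional[str]:
    """Return ref unchanged if it looks like an id, else look it up by email."""
    if "@" not in ref:
        return ref
    wanted = ref.strip().lower()
    for u in users:
        # Variation from the plan: case-insensitive email match.
        if (u.get("email") or "").lower() == wanted:
            return u["id"]
    return None


users = [{"id": "abc", "email": "X@Test"}]
print(resolve_user_id("x@test", users))       # id found via email
print(resolve_user_id("some-uuid", users))    # passed through as-is
print(resolve_user_id("nobody@test", users))  # None: caller prints "User not found"
```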

Also extend list-users output (replace body):

@admin_app.command("list-users")
def list_users(as_json: bool = typer.Option(False, "--json")):
    """List all users."""
    resp = api_get("/api/users")
    if resp.status_code != 200:
        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
        raise typer.Exit(1)
    users = resp.json()
    if as_json:
        typer.echo(json.dumps(users, indent=2))
    else:
        for u in users:
            status_str = "active" if u.get("active", True) else "DEACTIVATED"
            typer.echo(
                f"  {u['email']:30s} role={u['role']:10s} {status_str:12s} id={u['id'][:8]}"
            )
  • Step 5: Run tests

Run: pytest tests/test_cli_admin.py -v Expected: all green.

  • Step 6: Commit
git add cli/client.py cli/commands/admin.py tests/test_cli_admin.py
git commit -m "feat(cli): da admin set-role/activate/deactivate/reset-password/set-password (#11)"

Task 1.7: UI — /admin/users page

Files:

  • Create: app/web/templates/admin_users.html

  • Modify: app/web/router.py — add route

  • Modify: app/web/templates/dashboard.html — nav link (admins only)

  • Step 1: Create template

Create app/web/templates/admin_users.html:

{% extends "base.html" %}
{% block title %}User management — {{ config.INSTANCE_NAME }}{% endblock %}

{% block content %}
<div class="container">
  <div class="d-flex justify-content-between align-items-center mb-3">
    <h2>User management</h2>
    <button class="btn btn-primary" onclick="openCreateModal()">+ Add user</button>
  </div>

  <table class="table table-striped" id="users-table">
    <thead>
      <tr>
        <th>Email</th><th>Name</th><th>Role</th><th>Active</th>
        <th>Created</th><th>Deactivated</th><th>Actions</th>
      </tr>
    </thead>
    <tbody>
      <!-- rows rendered by JS -->
    </tbody>
  </table>
</div>

<div class="modal" id="create-modal" style="display:none;">
  <div class="modal-content">
    <h3>Add user</h3>
    <label>Email <input id="new-email" type="email" required></label>
    <label>Name <input id="new-name" type="text"></label>
    <label>Role
      <select id="new-role">
        <option value="viewer">viewer</option>
        <option value="analyst" selected>analyst</option>
        <option value="km_admin">km_admin</option>
        <option value="admin">admin</option>
      </select>
    </label>
    <button class="btn btn-primary" onclick="createUser()">Create</button>
    <button class="btn btn-secondary" onclick="closeCreateModal()">Cancel</button>
  </div>
</div>

<script>
const API = "/api/users";

function fmtDate(s) { return s ? s.slice(0, 19).replace("T", " ") : ""; }

// Escape user-controlled strings before interpolating them into innerHTML.
function esc(s) {
  const map = {"&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;", "'": "&#39;"};
  return String(s ?? "").replace(/[&<>"']/g, c => map[c]);
}

async function loadUsers() {
  const r = await fetch(API, {credentials: "include"});
  if (!r.ok) { alert("Failed to load users: " + r.status); return; }
  const users = await r.json();
  const tbody = document.querySelector("#users-table tbody");
  tbody.innerHTML = "";
  for (const u of users) {
    const tr = document.createElement("tr");
    tr.innerHTML = `
      <td>${esc(u.email)}</td>
      <td>${esc(u.name)}</td>
      <td>
        <select onchange="setRole('${u.id}', this.value)">
          ${["viewer","analyst","km_admin","admin"].map(r =>
            `<option value="${r}" ${r===u.role?"selected":""}>${r}</option>`).join("")}
        </select>
      </td>
      <td>
        <input type="checkbox" ${u.active?"checked":""}
          onchange="toggleActive('${u.id}', this.checked)">
      </td>
      <td>${fmtDate(u.created_at)}</td>
      <td>${fmtDate(u.deactivated_at)}</td>
      <td>
        <button class="btn btn-sm" onclick="resetPassword('${u.id}')">Reset</button>
        <button class="btn btn-sm" onclick="setPassword('${u.id}')">Set pwd</button>
        <button class="btn btn-sm btn-danger" data-email="${esc(u.email)}"
          onclick="delUser('${u.id}', this.dataset.email)">Delete</button>
      </td>`;
    tbody.appendChild(tr);
  }
}

async function setRole(id, role) {
  const r = await fetch(`${API}/${id}`, {
    method: "PATCH", credentials: "include",
    headers: {"Content-Type":"application/json"},
    body: JSON.stringify({role}),
  });
  if (!r.ok) { alert("Failed: " + (await r.text())); }
  loadUsers();
}

async function toggleActive(id, active) {
  const path = active ? "activate" : "deactivate";
  const r = await fetch(`${API}/${id}/${path}`, {method: "POST", credentials: "include"});
  if (!r.ok) { alert("Failed: " + (await r.text())); }
  loadUsers();
}

async function resetPassword(id) {
  if (!confirm("Generate a password reset token?")) return;
  const r = await fetch(`${API}/${id}/reset-password`, {method: "POST", credentials: "include"});
  const data = await r.json();
  if (!r.ok) { alert("Failed: " + data.detail); return; }
  alert(`Reset token: ${data.reset_token}\nEmail sent: ${data.email_sent}`);
}

async function setPassword(id) {
  const pwd = prompt("New password (min 8 chars):");
  if (!pwd) return;
  const r = await fetch(`${API}/${id}/set-password`, {
    method: "POST", credentials: "include",
    headers: {"Content-Type":"application/json"},
    body: JSON.stringify({password: pwd}),
  });
  if (!r.ok) { alert("Failed: " + (await r.text())); return; }
  alert("Password set.");
}

async function delUser(id, email) {
  if (!confirm(`Delete ${email}? This cannot be undone.`)) return;
  const r = await fetch(`${API}/${id}`, {method: "DELETE", credentials: "include"});
  if (!r.ok) { alert("Failed: " + (await r.text())); return; }
  loadUsers();
}

function openCreateModal() { document.getElementById("create-modal").style.display = "block"; }
function closeCreateModal() { document.getElementById("create-modal").style.display = "none"; }

async function createUser() {
  const email = document.getElementById("new-email").value;
  const name = document.getElementById("new-name").value;
  const role = document.getElementById("new-role").value;
  const r = await fetch(API, {
    method: "POST", credentials: "include",
    headers: {"Content-Type":"application/json"},
    body: JSON.stringify({email, name: name || email.split("@")[0], role}),
  });
  if (!r.ok) { alert("Failed: " + (await r.text())); return; }
  closeCreateModal();
  loadUsers();
}

loadUsers();
</script>
{% endblock %}
  • Step 2: Add route in app/web/router.py

After the existing admin_permissions_page route:

@router.get("/admin/users", response_class=HTMLResponse)
async def admin_users_page(
    request: Request,
    user: dict = Depends(require_role(Role.ADMIN)),
):
    """Admin page for user management."""
    ctx = _build_context(request, user=user)
    return templates.TemplateResponse(request, "admin_users.html", ctx)
  • Step 3: Add nav link in app/web/templates/dashboard.html

Find the existing admin nav block (search for "admin/tables" or "admin/permissions") and add /admin/users as a sibling:

{% if user.role == 'admin' %}
  <a class="nav-link" href="/admin/users">Users</a>
{% endif %}

(If no dashboard admin nav block exists, skip — the page is still reachable by URL and tests will cover it.)

  • Step 4: Test the route renders for admin

Add to tests/test_user_management.py:

def test_admin_users_page_renders_for_admin(app_client, fresh_db):
    admin_id, token = _seed_admin(fresh_db)
    resp = app_client.get(
        "/admin/users",
        headers={"Accept": "text/html"},
        cookies={"access_token": token},
    )
    assert resp.status_code == 200
    assert "User management" in resp.text


def test_admin_users_page_denies_non_admin(app_client, fresh_db):
    import uuid
    from src.db import get_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token
    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="a@test", name="A", role="analyst")
        token = create_access_token(user_id=uid, email="a@test", role="analyst")
    finally:
        conn.close()
    resp = app_client.get(
        "/admin/users",
        headers={"Accept": "text/html"},
        cookies={"access_token": token},
        follow_redirects=False,
    )
    # HTML request to admin-only page → 302 (to /login) for non-admin per Phase 0
    assert resp.status_code in (302, 403)
  • Step 5: Run

Run: pytest tests/test_user_management.py -v -k "admin_users_page" Expected: PASS.

  • Step 6: Commit
git add app/web/router.py app/web/templates/admin_users.html app/web/templates/dashboard.html tests/test_user_management.py
git commit -m "feat(ui): /admin/users management page (#11)"

Task 1.8: Phase 1 integration — full test suite + review

  • Step 1: Run full suite

Run: pytest tests/ --timeout=30 Expected: all green.

  • Step 2: Manual smoke

Run locally: uvicorn app.main:app --reload, visit /admin/users as admin, create+edit+delete a test user, verify deactivated user's token is rejected on an API call.

  • Step 3: Request code review

Dispatch the superpowers:code-reviewer agent on the diff for the main..HEAD range (git log --oneline main..HEAD lists the commits in scope); fix any blockers.

  • Step 4: Optional squash/rebase

If commits are noisy, rebase-interactive to a clean sequence. Otherwise leave as-is.


Phase 2 — Personal Access Tokens (#12)

Scope: schema v6 with the personal_access_tokens table, JWT extensions (typ, jti), /auth/tokens endpoints (session-only) plus an admin variant, CLI (da auth token create|list|revoke), profile UI with a one-time token reveal, and an update to cli/skills/security.md.

File Structure

  • Modify: src/db.py — schema v6 + personal_access_tokens
  • Create: src/repositories/access_tokens.py
  • Modify: app/auth/jwt.py — typ, jti verification for PATs
  • Modify: app/auth/dependencies.py — reject revoked/expired PATs, update last_used_at
  • Create: app/api/tokens.py — /auth/tokens CRUD (session-only)
  • Modify: app/main.py — register tokens router
  • Create: cli/commands/tokens.py
  • Modify: cli/main.py — add token sub-typer under auth
  • Modify: cli/commands/auth.py — add token subcommand group hook
  • Create: app/web/templates/profile.html (or profile_tokens.html)
  • Modify: app/web/router.py — /profile route
  • Modify: cli/skills/security.md — fix 24h vs 30d mismatch + add PAT section
  • Test: tests/test_pat.py (new)

Task 2.1: Schema v6 — personal_access_tokens table

Files:

  • Modify: src/db.py

  • Step 1: Failing test

# tests/test_pat.py
import os
import tempfile
import pytest


@pytest.fixture
def fresh_db(monkeypatch):
    with tempfile.TemporaryDirectory() as tmp:
        monkeypatch.setenv("DATA_DIR", tmp)
        monkeypatch.setenv("TESTING", "1")
        monkeypatch.setenv("JWT_SECRET_KEY", "test-jwt-secret-key-minimum-32-chars!!")
        yield tmp


def test_schema_v6_creates_pat_table(fresh_db):
    from src.db import get_system_db, get_schema_version, close_system_db
    conn = get_system_db()
    try:
        cols = conn.execute("PRAGMA table_info(personal_access_tokens)").fetchall()
        col_names = [c[1] for c in cols]
        for expected in ("id", "user_id", "name", "token_hash", "prefix",
                         "scopes", "created_at", "expires_at", "last_used_at", "revoked_at"):
            assert expected in col_names
        assert get_schema_version(conn) >= 6
    finally:
        conn.close()
        close_system_db()
  • Step 2: Run — fail

Run: pytest tests/test_pat.py::test_schema_v6_creates_pat_table -v

  • Step 3: Bump schema and add DDL

src/db.py:

SCHEMA_VERSION = 6

Append to _SYSTEM_SCHEMA just before the closing """:

CREATE TABLE IF NOT EXISTS personal_access_tokens (
    id           VARCHAR PRIMARY KEY,
    user_id      VARCHAR NOT NULL,
    name         VARCHAR NOT NULL,
    token_hash   VARCHAR NOT NULL,
    prefix       VARCHAR NOT NULL,
    scopes       VARCHAR,
    created_at   TIMESTAMP NOT NULL DEFAULT current_timestamp,
    expires_at   TIMESTAMP,
    last_used_at TIMESTAMP,
    revoked_at   TIMESTAMP
);

Add migration list:

_V5_TO_V6_MIGRATIONS = [
    """
    CREATE TABLE IF NOT EXISTS personal_access_tokens (
        id           VARCHAR PRIMARY KEY,
        user_id      VARCHAR NOT NULL,
        name         VARCHAR NOT NULL,
        token_hash   VARCHAR NOT NULL,
        prefix       VARCHAR NOT NULL,
        scopes       VARCHAR,
        created_at   TIMESTAMP NOT NULL DEFAULT current_timestamp,
        expires_at   TIMESTAMP,
        last_used_at TIMESTAMP,
        revoked_at   TIMESTAMP
    )
    """,
]

Extend dispatch:

            if current < 6:
                for sql in _V5_TO_V6_MIGRATIONS:
                    conn.execute(sql)
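
The `if current < N` dispatch is cumulative: a database at v4 runs the v5 and v6 steps in order, and a database already at v6 runs nothing. A runnable sketch of that pattern follows. It uses sqlite3 from the standard library as a stand-in for DuckDB, and the `schema_version` table layout, the shortened DDL and the `migrate`/`fresh_v4_db` helpers are all assumptions made for illustration, not the contents of src/db.py.

```python
import sqlite3

SCHEMA_VERSION = 6

# Hypothetical, shortened migration lists keyed by the version they produce.
MIGRATIONS = {
    5: ["ALTER TABLE users ADD COLUMN active BOOLEAN DEFAULT 1"],
    6: [
        "CREATE TABLE IF NOT EXISTS personal_access_tokens ("
        "id TEXT PRIMARY KEY, user_id TEXT NOT NULL, revoked_at TIMESTAMP)"
    ],
}


def fresh_v4_db() -> sqlite3.Connection:
    """In-memory database that looks like a pre-v5 install."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, email TEXT)")
    conn.execute("CREATE TABLE schema_version (version INTEGER)")
    conn.execute("INSERT INTO schema_version VALUES (4)")
    return conn


def migrate(conn: sqlite3.Connection, current: int) -> int:
    # Apply every step above the current version, oldest first.
    for target in range(current + 1, SCHEMA_VERSION + 1):
        for sql in MIGRATIONS.get(target, []):
            conn.execute(sql)
        conn.execute("UPDATE schema_version SET version = ?", (target,))
    conn.commit()
    return SCHEMA_VERSION
```

Because the v6 step uses CREATE TABLE IF NOT EXISTS, it is also safe on a fresh install where the base schema has already created the table.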
  • Step 4: Run — pass

Run: pytest tests/test_pat.py::test_schema_v6_creates_pat_table -v

  • Step 5: Commit
git add src/db.py tests/test_pat.py
git commit -m "feat(db): schema v6 — personal_access_tokens (#12)"

Task 2.2: AccessTokenRepository

Files:

  • Create: src/repositories/access_tokens.py

  • Step 1: Failing test

# tests/test_pat.py — append

def test_access_token_repo_create_and_lookup(fresh_db):
    import hashlib, uuid
    from datetime import datetime, timezone, timedelta
    from src.db import get_system_db, close_system_db
    from src.repositories.access_tokens import AccessTokenRepository

    conn = get_system_db()
    try:
        repo = AccessTokenRepository(conn)
        token_id = str(uuid.uuid4())
        raw = "abcdefgh" + "x" * 32
        repo.create(
            id=token_id,
            user_id="u1",
            name="laptop",
            token_hash=hashlib.sha256(raw.encode()).hexdigest(),
            prefix=raw[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )
        row = repo.get_by_id(token_id)
        assert row is not None
        assert row["name"] == "laptop"
        assert row["prefix"] == "abcdefgh"
        assert row["revoked_at"] is None

        rows = repo.list_for_user("u1")
        assert len(rows) == 1

        repo.revoke(token_id)
        assert repo.get_by_id(token_id)["revoked_at"] is not None
    finally:
        conn.close()
        close_system_db()


def test_access_token_repo_mark_used(fresh_db):
    import hashlib, uuid
    from datetime import datetime, timezone
    from src.db import get_system_db, close_system_db
    from src.repositories.access_tokens import AccessTokenRepository

    conn = get_system_db()
    try:
        repo = AccessTokenRepository(conn)
        tid = str(uuid.uuid4())
        repo.create(id=tid, user_id="u1", name="x",
                    token_hash=hashlib.sha256(b"r").hexdigest(), prefix="rrrrrrrr")
        assert repo.get_by_id(tid)["last_used_at"] is None
        repo.mark_used(tid)
        assert repo.get_by_id(tid)["last_used_at"] is not None
    finally:
        conn.close()
        close_system_db()
  • Step 2: Run — fail

  • Step 3: Implement repository

Create src/repositories/access_tokens.py:

"""Repository for personal access tokens (#12)."""

from datetime import datetime, timezone
from typing import Any, Optional, List, Dict

import duckdb


class AccessTokenRepository:
    def __init__(self, conn: duckdb.DuckDBPyConnection):
        self.conn = conn

    def _row_to_dict(self, row) -> Optional[Dict[str, Any]]:
        if not row:
            return None
        columns = [desc[0] for desc in self.conn.description]
        return dict(zip(columns, row))

    def create(
        self,
        id: str,
        user_id: str,
        name: str,
        token_hash: str,
        prefix: str,
        expires_at: Optional[datetime] = None,
        scopes: Optional[str] = None,
    ) -> None:
        self.conn.execute(
            """INSERT INTO personal_access_tokens
            (id, user_id, name, token_hash, prefix, scopes, created_at, expires_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
            [id, user_id, name, token_hash, prefix, scopes,
             datetime.now(timezone.utc), expires_at],
        )

    def get_by_id(self, token_id: str) -> Optional[Dict[str, Any]]:
        result = self.conn.execute(
            "SELECT * FROM personal_access_tokens WHERE id = ?", [token_id]
        ).fetchone()
        return self._row_to_dict(result)

    def list_for_user(self, user_id: str, include_revoked: bool = True) -> List[Dict[str, Any]]:
        sql = "SELECT * FROM personal_access_tokens WHERE user_id = ?"
        if not include_revoked:
            sql += " AND revoked_at IS NULL"
        sql += " ORDER BY created_at DESC"
        rows = self.conn.execute(sql, [user_id]).fetchall()
        if not rows:
            return []
        columns = [desc[0] for desc in self.conn.description]
        return [dict(zip(columns, r)) for r in rows]

    def list_all(self) -> List[Dict[str, Any]]:
        rows = self.conn.execute(
            "SELECT * FROM personal_access_tokens ORDER BY created_at DESC"
        ).fetchall()
        if not rows:
            return []
        columns = [desc[0] for desc in self.conn.description]
        return [dict(zip(columns, r)) for r in rows]

    def revoke(self, token_id: str) -> None:
        self.conn.execute(
            "UPDATE personal_access_tokens SET revoked_at = ? WHERE id = ?",
            [datetime.now(timezone.utc), token_id],
        )

    def delete(self, token_id: str) -> None:
        self.conn.execute("DELETE FROM personal_access_tokens WHERE id = ?", [token_id])

    def mark_used(self, token_id: str) -> None:
        self.conn.execute(
            "UPDATE personal_access_tokens SET last_used_at = ? WHERE id = ?",
            [datetime.now(timezone.utc), token_id],
        )
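
The repository stores only `token_hash` and `prefix`, never the raw bearer string. A minimal sketch of how a caller might derive both before calling `create`, using only the standard library. The helper names are hypothetical; the prefix rule (first 8 characters of the token id without dashes) mirrors what the later tests in this plan use.

```python
import hashlib
import hmac
import uuid


def hash_token(bearer: str) -> str:
    """SHA-256 hex digest of the bearer string, as stored in token_hash."""
    return hashlib.sha256(bearer.encode()).hexdigest()


def prefix_from_token_id(token_id: str, length: int = 8) -> str:
    """Short display prefix derived from the token id, not from the JWT text."""
    return token_id.replace("-", "")[:length]


def hash_matches(bearer: str, stored_hash: str) -> bool:
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(hash_token(bearer), stored_hash)


token_id = str(uuid.uuid4())
bearer = "header.payload.signature"  # placeholder for the real JWT
row = {
    "id": token_id,
    "token_hash": hash_token(bearer),
    "prefix": prefix_from_token_id(token_id),
}
```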
  • Step 4: Run — pass

  • Step 5: Commit

git add src/repositories/access_tokens.py tests/test_pat.py
git commit -m "feat(users): access_tokens repository (#12)"

Task 2.3: JWT — typ field, helper for PAT vs session

Files:

  • Modify: app/auth/jwt.py

  • Step 1: Failing test

# tests/test_pat.py — append

def test_pat_token_carries_typ_claim(fresh_db):
    from app.auth.jwt import create_access_token, verify_token
    token = create_access_token(
        user_id="u1", email="u@test", role="analyst",
        token_id="deadbeef-1234", typ="pat",
    )
    payload = verify_token(token)
    assert payload["typ"] == "pat"
    assert payload["jti"] == "deadbeef-1234"


def test_session_token_defaults_typ(fresh_db):
    from app.auth.jwt import create_access_token, verify_token
    token = create_access_token(user_id="u1", email="u@test", role="analyst")
    payload = verify_token(token)
    # Default typ is "session".
    assert payload.get("typ") == "session"
  • Step 2: Run — fail

  • Step 3: Extend create_access_token

Replace the function body in app/auth/jwt.py:

def create_access_token(
    user_id: str,
    email: str,
    role: str = "analyst",
    expires_delta: Optional[timedelta] = None,
    token_id: Optional[str] = None,
    typ: str = "session",
) -> str:
    """Create a JWT. `typ` is "session" (interactive login) or "pat" (long-lived)."""
    expire = datetime.now(timezone.utc) + (
        expires_delta or timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
    )
    payload = {
        "sub": user_id,
        "email": email,
        "role": role,
        "typ": typ,
        "exp": expire,
        "iat": datetime.now(timezone.utc),
        "jti": token_id or uuid.uuid4().hex,
    }
    return jwt.encode(payload, _get_cached_secret_key(), algorithm=ALGORITHM)
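
Tokens issued before this change carry no `typ` claim, so consumers should treat a missing `typ` as a session token. The sketch below builds the same claim set without signing anything and shows that fallback. `build_claims`, `token_kind` and `DEFAULT_TTL` are hypothetical; the 24-hour default is an assumption standing in for ACCESS_TOKEN_EXPIRE_HOURS.

```python
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

# Assumed default; the real value comes from ACCESS_TOKEN_EXPIRE_HOURS.
DEFAULT_TTL = timedelta(hours=24)


def build_claims(
    user_id: str,
    email: str,
    role: str = "analyst",
    typ: str = "session",
    token_id: Optional[str] = None,
    expires_delta: Optional[timedelta] = None,
    now: Optional[datetime] = None,
) -> Dict[str, Any]:
    now = now or datetime.now(timezone.utc)
    return {
        "sub": user_id,
        "email": email,
        "role": role,
        "typ": typ,
        "iat": now,
        "exp": now + (expires_delta or DEFAULT_TTL),
        # For a PAT the jti is the database row id; sessions get a random one.
        "jti": token_id or uuid.uuid4().hex,
    }


def token_kind(payload: Dict[str, Any]) -> str:
    # Legacy tokens without "typ" fall through to "session".
    return "pat" if payload.get("typ") == "pat" else "session"
```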
  • Step 4: Run — pass

  • Step 5: Commit

git add app/auth/jwt.py tests/test_pat.py
git commit -m "feat(auth): JWT carries typ (session|pat) and explicit jti (#12)"

Task 2.4: Auth dependency — reject revoked/expired PATs, update last_used_at

Files:

  • Modify: app/auth/dependencies.py

  • Step 1: Failing test

# tests/test_pat.py — append

def test_revoked_pat_is_rejected(fresh_db, monkeypatch):
    from fastapi.testclient import TestClient
    import hashlib, uuid
    from datetime import datetime, timezone, timedelta
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app

    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
        token_id = str(uuid.uuid4())
        raw = "secretXX" + "a" * 32
        AccessTokenRepository(conn).create(
            id=token_id, user_id=uid, name="ci",
            token_hash=hashlib.sha256(raw.encode()).hexdigest(),
            prefix=raw[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=30),
        )
        jwt_token = create_access_token(
            user_id=uid, email="u@t", role="admin", token_id=token_id, typ="pat",
        )
        # Revoke
        AccessTokenRepository(conn).revoke(token_id)
    finally:
        conn.close()
        close_system_db()

    client = TestClient(app)
    resp = client.get(
        "/api/users",
        headers={"Authorization": f"Bearer {jwt_token}", "Accept": "application/json"},
    )
    assert resp.status_code == 401


def test_expired_pat_is_rejected_from_db(fresh_db):
    """A PAT with a past expires_at in DB is rejected even if JWT exp is in future."""
    from fastapi.testclient import TestClient
    import hashlib, uuid
    from datetime import datetime, timezone, timedelta
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app

    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
        tid = str(uuid.uuid4())
        # Past-dated expiry in DB
        AccessTokenRepository(conn).create(
            id=tid, user_id=uid, name="stale",
            token_hash=hashlib.sha256(b"whatever").hexdigest(), prefix=tid.replace("-","")[:8],
            expires_at=datetime.now(timezone.utc) - timedelta(days=1),
        )
        # JWT with much longer TTL so signature-level `exp` would pass
        pat = create_access_token(
            user_id=uid, email="u@t", role="admin",
            token_id=tid, typ="pat",
            expires_delta=timedelta(days=365),
        )
    finally:
        conn.close()
        close_system_db()

    client = TestClient(app)
    resp = client.get(
        "/api/users",
        headers={"Authorization": f"Bearer {pat}", "Accept": "application/json"},
    )
    assert resp.status_code == 401
  • Step 2: Run — fail

  • Step 3: Add PAT validation in app/auth/dependencies.py

Inside get_current_user, after payload = verify_token(token):

    # PAT validation: check it's not revoked / expired / unknown in DB.
    if payload.get("typ") == "pat":
        from datetime import datetime, timezone
        import hashlib
        from src.repositories.access_tokens import AccessTokenRepository
        tokens_repo = AccessTokenRepository(conn)
        record = tokens_repo.get_by_id(payload.get("jti", ""))
        if not record:
            _fail("Token unknown")
        if record.get("revoked_at") is not None:
            _fail("Token revoked")
        exp_at = record.get("expires_at")
        if exp_at is not None:
            if isinstance(exp_at, str):
                exp_at = datetime.fromisoformat(exp_at)
            if exp_at.tzinfo is None:
                exp_at = exp_at.replace(tzinfo=timezone.utc)
            if datetime.now(timezone.utc) > exp_at:
                _fail("Token expired")
        # Defense-in-depth: stored token_hash must match sha256(bearer JWT).
        # Protects against a forged-but-unrevoked JWT using a stolen key.
        stored_hash = record.get("token_hash")
        if stored_hash:
            actual = hashlib.sha256(token.encode()).hexdigest()
            if actual != stored_hash:
                _fail("Token mismatch")
        # Record last_used_at synchronously — acceptable cost; can batch later.
        try:
            tokens_repo.mark_used(payload["jti"])
        except Exception:
            pass
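
The order of the checks above (unknown, revoked, expired, hash mismatch) determines which failure a caller sees first. The same sequence can be expressed as a pure function, which makes each branch unit-testable without a database or HTTP client. `pat_rejection_reason` is a hypothetical name; the returned strings mirror the `_fail` messages.

```python
import hashlib
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional


def pat_rejection_reason(
    record: Optional[Dict[str, Any]],
    bearer: str,
    now: Optional[datetime] = None,
) -> Optional[str]:
    """Return the reason a PAT must be rejected, or None if it is acceptable."""
    now = now or datetime.now(timezone.utc)
    if not record:
        return "Token unknown"
    if record.get("revoked_at") is not None:
        return "Token revoked"
    exp_at = record.get("expires_at")
    if exp_at is not None:
        if isinstance(exp_at, str):
            exp_at = datetime.fromisoformat(exp_at)
        if exp_at.tzinfo is None:
            # Naive timestamps from the database are interpreted as UTC.
            exp_at = exp_at.replace(tzinfo=timezone.utc)
        if now > exp_at:
            return "Token expired"
    stored = record.get("token_hash")
    if stored and hashlib.sha256(bearer.encode()).hexdigest() != stored:
        return "Token mismatch"
    return None
```

Because the hash check runs last, a test that only exercises revocation or expiry can store an arbitrary hash, while any test that expects the PAT to authenticate must store the SHA-256 of the actual bearer JWT.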
  • Step 4: Run — pass

  • Step 5: Commit

git add app/auth/dependencies.py tests/test_pat.py
git commit -m "feat(auth): reject revoked/expired PATs; update last_used_at (#12)"

Task 2.5: API — /auth/tokens CRUD (session-only)

Files:

  • Create: app/api/tokens.py

  • Modify: app/main.py — include router

  • Modify: app/auth/dependencies.py — add require_session_token dep

  • Step 1: Failing test

# tests/test_pat.py — append

def test_create_pat_returns_raw_once(fresh_db):
    from fastapi.testclient import TestClient
    import uuid
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token
    from app.main import app

    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
        sess_token = create_access_token(user_id=uid, email="u@t", role="admin")  # typ=session
    finally:
        conn.close()
        close_system_db()

    client = TestClient(app)
    resp = client.post(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {sess_token}"},
        json={"name": "laptop", "expires_in_days": 30},
    )
    assert resp.status_code == 201
    data = resp.json()
    assert data["name"] == "laptop"
    assert "token" in data and data["token"]  # raw token returned exactly once

    # Listing returns prefix, never raw.
    # Prefix is derived from the token id (jti), not the JWT string, to avoid
    # all tokens having the useless "eyJhbGci" JWT-header prefix.
    list_resp = client.get(
        "/auth/tokens", headers={"Authorization": f"Bearer {sess_token}"},
    )
    assert list_resp.status_code == 200
    rows = list_resp.json()
    assert len(rows) == 1
    assert "token" not in rows[0]
    assert rows[0]["prefix"] == data["prefix"]
    assert len(rows[0]["prefix"]) == 8
    assert not data["prefix"].startswith("eyJ")  # regression: not the JWT header


def test_pat_cannot_create_pat(fresh_db):
    from fastapi.testclient import TestClient
    import hashlib, uuid
    from datetime import datetime, timezone, timedelta
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from src.repositories.access_tokens import AccessTokenRepository
    from app.auth.jwt import create_access_token
    from app.main import app

    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="admin")
        tid = str(uuid.uuid4())
        raw = "abcdefgh" + "x" * 32
        AccessTokenRepository(conn).create(
            id=tid, user_id=uid, name="x",
            token_hash=hashlib.sha256(raw.encode()).hexdigest(), prefix=raw[:8],
            expires_at=datetime.now(timezone.utc) + timedelta(days=90),
        )
        pat = create_access_token(user_id=uid, email="u@t", role="admin", token_id=tid, typ="pat")
    finally:
        conn.close()
        close_system_db()

    client = TestClient(app)
    resp = client.post(
        "/auth/tokens",
        headers={"Authorization": f"Bearer {pat}"},
        json={"name": "bad", "expires_in_days": 30},
    )
    assert resp.status_code == 403
  • Step 2: Run — fail

  • Step 3: Add require_session_token dependency

In app/auth/dependencies.py, append:

async def require_session_token(request: Request, user: dict = Depends(get_current_user)) -> dict:
    """Like get_current_user but rejects PAT — for endpoints that must not
    be callable via a long-lived CI token (e.g. creating new tokens, changing password)."""
    auth = request.headers.get("authorization", "")
    token = None
    if auth.startswith("Bearer "):
        token = auth.removeprefix("Bearer ")
    if not token:
        token = request.cookies.get("access_token")
    if token:
        from app.auth.jwt import verify_token
        payload = verify_token(token) or {}
        if payload.get("typ") == "pat":
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="This endpoint requires an interactive session, not a PAT",
            )
    return user
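
The guard's decision can be exercised without FastAPI. The following is a minimal sketch, not the project's code: `extract_token` and `is_session_payload` are hypothetical helpers that mirror the header-then-cookie lookup and the `typ` check above, operating on plain mappings instead of a `Request`.

```python
from typing import Mapping, Optional


def extract_token(headers: Mapping[str, str], cookies: Mapping[str, str]) -> Optional[str]:
    """Return the bearer token from the Authorization header, else the access_token cookie."""
    auth = headers.get("authorization", "")
    if auth.startswith("Bearer "):
        token = auth.removeprefix("Bearer ").strip()
        if token:
            return token
    return cookies.get("access_token")


def is_session_payload(payload: Optional[Mapping[str, object]]) -> bool:
    """True unless the decoded claims mark the token as a PAT (typ == "pat")."""
    return (payload or {}).get("typ") != "pat"
```

As in the dependency above, an undecodable payload is treated as "not a PAT" because `get_current_user` has already authenticated the request by the time this check runs.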
  • Step 4: Create router app/api/tokens.py
"""Personal access token endpoints (#12)."""

import hashlib
import secrets
import uuid
from datetime import datetime, timezone, timedelta
from typing import Optional, List

import duckdb
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel

from app.auth.dependencies import require_session_token, require_role, Role, _get_db
from src.repositories.access_tokens import AccessTokenRepository
from src.repositories.audit import AuditRepository
from app.auth.jwt import create_access_token

router = APIRouter(prefix="/auth/tokens", tags=["tokens"])
admin_router = APIRouter(prefix="/auth/admin/tokens", tags=["tokens-admin"])


class CreateTokenRequest(BaseModel):
    name: str
    expires_in_days: Optional[int] = 90  # null = no expiry


class CreateTokenResponse(BaseModel):
    id: str
    name: str
    prefix: str
    token: str  # raw token — returned exactly once
    expires_at: Optional[str]
    created_at: str


class TokenListItem(BaseModel):
    id: str
    name: str
    prefix: str
    created_at: str
    expires_at: Optional[str]
    last_used_at: Optional[str]
    revoked_at: Optional[str]


def _audit(conn, actor: str, action: str, target: str, params=None):
    try:
        AuditRepository(conn).log(user_id=actor, action=action,
                                  resource=f"token:{target}", params=params)
    except Exception:
        pass


def _row_to_item(row: dict) -> TokenListItem:
    return TokenListItem(
        id=row["id"], name=row["name"], prefix=row["prefix"],
        created_at=str(row.get("created_at") or ""),
        expires_at=str(row["expires_at"]) if row.get("expires_at") else None,
        last_used_at=str(row["last_used_at"]) if row.get("last_used_at") else None,
        revoked_at=str(row["revoked_at"]) if row.get("revoked_at") else None,
    )


@router.post("", response_model=CreateTokenResponse, status_code=201)
async def create_token(
    payload: CreateTokenRequest,
    user: dict = Depends(require_session_token),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    if not payload.name.strip():
        raise HTTPException(status_code=400, detail="name is required")
    repo = AccessTokenRepository(conn)
    token_id = str(uuid.uuid4())
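    expires_at = None
    expires_delta = None
    if payload.expires_in_days is not None:
        # Reject 0 and negatives explicitly; a plain truthiness check would
        # silently turn expires_in_days=0 into a never-expiring token.
        if payload.expires_in_days < 1:
            raise HTTPException(status_code=400, detail="expires_in_days must be >= 1 or null")
        expires_delta = timedelta(days=payload.expires_in_days)
        expires_at = datetime.now(timezone.utc) + expires_delta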
    # Build the JWT that embeds jti=token_id and typ=pat
    jwt_token = create_access_token(
        user_id=user["id"], email=user["email"], role=user["role"],
        token_id=token_id, typ="pat", expires_delta=expires_delta,
    )
    # Prefix: first 8 chars of the jti (UUID) — uniquely identifies the token in UI
    # without exposing JWT headers (which all start with "eyJhbGci…" and are useless
    # for identification). The JWT itself is returned ONCE in the response body.
    prefix = token_id.replace("-", "")[:8]
    # token_hash = sha256(raw JWT). Used in verify_token as defense-in-depth.
    token_hash = hashlib.sha256(jwt_token.encode()).hexdigest()
    repo.create(
        id=token_id, user_id=user["id"], name=payload.name.strip(),
        token_hash=token_hash, prefix=prefix, expires_at=expires_at,
    )
    _audit(conn, user["id"], "token.create", token_id, {"name": payload.name})
    return CreateTokenResponse(
        id=token_id, name=payload.name.strip(), prefix=prefix,
        token=jwt_token,  # returned EXACTLY ONCE; never retrievable again
        expires_at=str(expires_at) if expires_at else None,
        created_at=str(datetime.now(timezone.utc)),
    )


@router.get("", response_model=List[TokenListItem])
async def list_tokens(
    user: dict = Depends(require_session_token),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    rows = AccessTokenRepository(conn).list_for_user(user["id"])
    return [_row_to_item(r) for r in rows]


@router.get("/{token_id}", response_model=TokenListItem)
async def get_token(
    token_id: str,
    user: dict = Depends(require_session_token),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    row = AccessTokenRepository(conn).get_by_id(token_id)
    if not row or row["user_id"] != user["id"]:
        raise HTTPException(status_code=404, detail="Token not found")
    return _row_to_item(row)


@router.delete("/{token_id}", status_code=204)
async def revoke_token(
    token_id: str,
    user: dict = Depends(require_session_token),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    repo = AccessTokenRepository(conn)
    row = repo.get_by_id(token_id)
    if not row or row["user_id"] != user["id"]:
        raise HTTPException(status_code=404, detail="Token not found")
    repo.revoke(token_id)
    _audit(conn, user["id"], "token.revoke", token_id)


# Admin — list & revoke tokens across users (for incident response)

@admin_router.get("", response_model=List[TokenListItem])
async def admin_list_tokens(
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    return [_row_to_item(r) for r in AccessTokenRepository(conn).list_all()]


@admin_router.delete("/{token_id}", status_code=204)
async def admin_revoke_token(
    token_id: str,
    user: dict = Depends(require_role(Role.ADMIN)),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    repo = AccessTokenRepository(conn)
    row = repo.get_by_id(token_id)
    if not row:
        raise HTTPException(status_code=404, detail="Token not found")
    repo.revoke(token_id)
    _audit(conn, user["id"], "token.admin_revoke", token_id, {"owner_id": row["user_id"]})
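
The prefix and hash derivation used by `create_token` can be looked at in isolation. This is a small sketch under stated assumptions: `derive_prefix`, `hash_token`, and `matches_stored_hash` are illustrative names, and the constant-time comparison is an assumption about how a stored hash might be checked, since the plan does not show that code.

```python
import hashlib
import hmac
import uuid


def derive_prefix(token_id: str) -> str:
    """First 8 hex characters of the token id (UUID with dashes removed)."""
    return token_id.replace("-", "")[:8]


def hash_token(raw_token: str) -> str:
    """SHA-256 hex digest of the raw token string; only this digest is stored."""
    return hashlib.sha256(raw_token.encode()).hexdigest()


def matches_stored_hash(raw_token: str, stored_hash: str) -> bool:
    """Compare digests in constant time (assumed check, not taken from the plan)."""
    return hmac.compare_digest(hash_token(raw_token), stored_hash)


token_id = str(uuid.UUID("12345678-1234-5678-1234-567812345678"))
prefix = derive_prefix(token_id)
stored = hash_token("example.jwt.value")
```

Because the prefix comes from the UUID rather than the JWT string, two tokens created by the same user do not share the common JWT header prefix.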
  • Step 5: Register routers in app/main.py

Add imports:

from app.api.tokens import router as tokens_router, admin_router as tokens_admin_router

Add in create_app near other include_router calls:

    app.include_router(tokens_router)
    app.include_router(tokens_admin_router)
  • Step 6: Run — pass

Run: pytest tests/test_pat.py -v

  • Step 7: Commit
git add app/api/tokens.py app/main.py app/auth/dependencies.py tests/test_pat.py
git commit -m "feat(api): /auth/tokens CRUD + admin revoke; session-only guard (#12)"

Task 2.6: CLI — da auth token create|list|revoke

Files:

  • Create: cli/commands/tokens.py

  • Modify: cli/commands/auth.py — register token sub-typer

  • Test: tests/test_cli_auth.py — append

  • Step 1: Failing test

# tests/test_cli_auth.py — append

def test_da_auth_token_create_calls_api(monkeypatch):
    import httpx
    from typer.testing import CliRunner
    from cli.commands.auth import auth_app
    from cli.commands import tokens as tok_mod

    captured = {}

    def fake_post(path, json=None, **kwargs):
        captured["path"] = path
        captured["json"] = json
        return httpx.Response(201, json={
            "id": "abc", "name": json["name"], "prefix": "XXXXXXXX",
            "token": "raw-token-once",
            "expires_at": None, "created_at": "2026-04-21T00:00:00+00:00",
        })

    monkeypatch.setattr(tok_mod, "api_post", fake_post, raising=False)

    runner = CliRunner()
    result = runner.invoke(auth_app, ["token", "create", "--name", "laptop", "--ttl", "30d"])
    assert result.exit_code == 0, result.output
    assert captured["path"] == "/auth/tokens"
    assert captured["json"] == {"name": "laptop", "expires_in_days": 30}
    assert "raw-token-once" in result.output
  • Step 2: Run — fail

  • Step 3: Create cli/commands/tokens.py

"""`da auth token` — manage personal access tokens (#12)."""

import json as _json
import re
from typing import Optional

import typer

from cli.client import api_post, api_get, api_delete

token_app = typer.Typer(help="Personal access tokens (long-lived CLI/CI auth)")


def _parse_ttl(ttl: Optional[str]) -> Optional[int]:
    """Parse "30d", "90d", "365d", "never" → days (int) or None."""
    # Normalize once so that inputs like " Never " or "30D" are handled consistently.
    value = (ttl or "").strip().lower()
    if not value or value in ("never", "none", "no-expiry"):
        return None
    m = re.fullmatch(r"(\d+)d", value)
    if not m:
        raise typer.BadParameter(f"Invalid TTL: {ttl}. Use e.g. 30d, 90d, 365d, or 'never'.")
    return int(m.group(1))


@token_app.command("create")
def create(
    name: str = typer.Option(..., "--name", help="Human label for the token"),
    ttl: str = typer.Option("90d", "--ttl", help="Lifetime (e.g. 30d, 90d, 365d, never)"),
    raw: bool = typer.Option(False, "--raw", help="Print only the raw token (for CI)"),
):
    """Create a new personal access token."""
    body = {"name": name, "expires_in_days": _parse_ttl(ttl)}
    resp = api_post("/auth/tokens", json=body)
    if resp.status_code != 201:
        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
        raise typer.Exit(1)
    data = resp.json()
    if raw:
        typer.echo(data["token"])
        return
    typer.echo("Personal access token created — this is shown ONCE:")
    typer.echo("")
    typer.echo(f"    {data['token']}")
    typer.echo("")
    typer.echo(f"id:      {data['id']}")
    typer.echo(f"name:    {data['name']}")
    typer.echo(f"expires: {data.get('expires_at') or 'never'}")
    typer.echo("")
    typer.echo("Export it so `da` can use it:")
    typer.echo(f"    export DA_TOKEN={data['token']}")


@token_app.command("list")
def list_tokens(as_json: bool = typer.Option(False, "--json")):
    """List your personal access tokens."""
    resp = api_get("/auth/tokens")
    if resp.status_code != 200:
        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
        raise typer.Exit(1)
    rows = resp.json()
    if as_json:
        typer.echo(_json.dumps(rows, indent=2))
        return
    if not rows:
        typer.echo("No tokens yet. Create one with: da auth token create --name <label>")
        return
    typer.echo(f"{'ID':36s} {'NAME':20s} {'PREFIX':10s} {'EXPIRES':20s} {'LAST USED':20s} STATUS")
    for r in rows:
        status = "revoked" if r.get("revoked_at") else "active"
        typer.echo(
            f"{r['id']:36s} {r['name']:20s} {r['prefix']:10s} "
            f"{(r.get('expires_at') or 'never'):20s} "
            f"{(r.get('last_used_at') or '-'):20s} {status}"
        )


@token_app.command("revoke")
def revoke(
    ident: str = typer.Argument(..., help="Token id, prefix, or name"),
):
    """Revoke a token."""
    resp = api_get("/auth/tokens")
    if resp.status_code != 200:
        typer.echo(f"Failed to list tokens: {resp.text}", err=True)
        raise typer.Exit(1)
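    rows = resp.json()
    # Only active tokens are candidates; refuse to guess when the identifier is ambiguous
    # (names are not unique, and a revoked token may share a name with an active one).
    matches = [
        r for r in rows
        if not r.get("revoked_at") and ident in (r["id"], r["prefix"], r["name"])
    ]
    if not matches:
        typer.echo(f"No active token matches {ident}", err=True)
        raise typer.Exit(1)
    if len(matches) > 1:
        typer.echo(f"{len(matches)} tokens match {ident}; pass the token id instead", err=True)
        raise typer.Exit(1)
    match = matches[0]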
    del_resp = api_delete(f"/auth/tokens/{match['id']}")
    if del_resp.status_code != 204:
        typer.echo(f"Failed: {del_resp.text}", err=True)
        raise typer.Exit(1)
    typer.echo(f"Revoked token {match['id']} ({match['name']})")
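
The `--ttl` parsing is easy to check on its own. Below is a dependency-free sketch of the same mapping, assuming a slightly stricter rule than the command above: it also rejects `0d`, on the assumption that a zero-day lifetime is never what the caller means. `parse_ttl_days` is a hypothetical name, and it raises `ValueError` instead of `typer.BadParameter` so it runs without Typer installed.

```python
import re
from typing import Optional

_NEVER = {"never", "none", "no-expiry"}


def parse_ttl_days(ttl: Optional[str]) -> Optional[int]:
    """Map "30d" to 30 and "never" to None; reject anything else, including "0d"."""
    if ttl is None:
        return None
    value = ttl.strip().lower()
    if not value or value in _NEVER:
        return None
    m = re.fullmatch(r"(\d+)d", value)
    if not m or int(m.group(1)) < 1:
        raise ValueError(f"Invalid TTL: {ttl!r}. Use e.g. 30d, 90d, 365d, or 'never'.")
    return int(m.group(1))
```

The returned value is what goes into `expires_in_days` in the request body, with `None` serialized as JSON `null`.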
  • Step 4: Wire into cli/commands/auth.py

At the end of cli/commands/auth.py add:

from cli.commands.tokens import token_app
auth_app.add_typer(token_app, name="token")
  • Step 5: Run — pass

Run: pytest tests/test_cli_auth.py -v

  • Step 6: Commit
git add cli/commands/tokens.py cli/commands/auth.py tests/test_cli_auth.py
git commit -m "feat(cli): da auth token create/list/revoke (#12)"

Task 2.7: UI — /profile page with PAT section

Files:

  • Create: app/web/templates/profile.html

  • Modify: app/web/router.py — add /profile route

  • Modify: app/web/templates/dashboard.html — add "Profile" link in nav (any authenticated user)

  • Step 1: Create template

Create app/web/templates/profile.html:

{% extends "base.html" %}
{% block title %}Profile — {{ config.INSTANCE_NAME }}{% endblock %}

{% block content %}
<div class="container">
  <h2>Profile</h2>
  <p>Signed in as <strong>{{ user.email }}</strong> ({{ user.role }}).</p>

  <h3>Personal access tokens</h3>
  <p>Long-lived tokens for CLI, CI, and headless clients.
     <a href="/install">How to use a token with <code>da</code> CLI →</a>
  </p>

  <form id="create-form" onsubmit="createToken(event)">
    <input id="new-name" type="text" placeholder="Token name (e.g. laptop, github-ci)" required>
    <select id="new-ttl">
      <option value="30">30 days</option>
      <option value="90" selected>90 days</option>
      <option value="365">1 year</option>
      <option value="">never</option>
    </select>
    <button class="btn btn-primary" type="submit">Create token</button>
  </form>

  <div id="new-token-reveal" style="display:none; margin: 1em 0; padding: 1em; background: #fee3; border: 1px solid #ca0;">
    <strong>Copy your token now — it will not be shown again:</strong>
    <pre><code id="new-token-raw"></code></pre>
    <button class="btn btn-sm" onclick="copyNewToken()">Copy</button>
    <button class="btn btn-sm" onclick="dismissReveal()">Dismiss</button>
  </div>

  <table class="table" id="tokens-table" style="margin-top: 1em;">
    <thead>
      <tr><th>Name</th><th>Prefix</th><th>Created</th><th>Expires</th>
          <th>Last used</th><th>Status</th><th>Actions</th></tr>
    </thead>
    <tbody></tbody>
  </table>
</div>

<script>
// Escape values before interpolating them into innerHTML; the token name is user input.
function esc(s) {
  const d = document.createElement("div");
  d.textContent = String(s);
  return d.innerHTML;
}

async function loadTokens() {
  const r = await fetch("/auth/tokens", {credentials: "include"});
  if (!r.ok) { alert("Failed: " + r.status); return; }
  const rows = await r.json();
  const tbody = document.querySelector("#tokens-table tbody");
  tbody.innerHTML = "";
  for (const t of rows) {
    const status = t.revoked_at ? "revoked" : "active";
    const tr = document.createElement("tr");
    tr.innerHTML = `
      <td>${esc(t.name)}</td>
      <td><code>${esc(t.prefix)}…</code></td>
      <td>${t.created_at.slice(0,19).replace("T"," ")}</td>
      <td>${t.expires_at ? t.expires_at.slice(0,19).replace("T"," ") : "never"}</td>
      <td>${t.last_used_at ? t.last_used_at.slice(0,19).replace("T"," ") : "-"}</td>
      <td>${status}</td>
      <td>${t.revoked_at ? "" :
        `<button class="btn btn-sm btn-danger" onclick="revokeToken('${t.id}')">Revoke</button>`}</td>`;
    tbody.appendChild(tr);
  }
}

async function createToken(ev) {
  ev.preventDefault();
  const name = document.getElementById("new-name").value;
  const ttl = document.getElementById("new-ttl").value;
  const body = {name, expires_in_days: ttl ? Number(ttl) : null};
  const r = await fetch("/auth/tokens", {
    method: "POST", credentials: "include",
    headers: {"Content-Type":"application/json"},
    body: JSON.stringify(body),
  });
  if (!r.ok) { alert("Failed: " + (await r.text())); return; }
  const data = await r.json();
  document.getElementById("new-token-raw").textContent = data.token;
  document.getElementById("new-token-reveal").style.display = "block";
  document.getElementById("new-name").value = "";
  loadTokens();
}

async function revokeToken(id) {
  if (!confirm("Revoke this token?")) return;
  const r = await fetch(`/auth/tokens/${id}`, {method: "DELETE", credentials: "include"});
  if (!r.ok) { alert("Failed: " + (await r.text())); return; }
  loadTokens();
}

function copyNewToken() {
  const txt = document.getElementById("new-token-raw").textContent;
  navigator.clipboard.writeText(txt);
}
function dismissReveal() {
  document.getElementById("new-token-reveal").style.display = "none";
  document.getElementById("new-token-raw").textContent = "";
}

loadTokens();
</script>
{% endblock %}
  • Step 2: Add route in app/web/router.py
@router.get("/profile", response_class=HTMLResponse)
async def profile_page(
    request: Request,
    user: dict = Depends(get_current_user),
):
    ctx = _build_context(request, user=user)
    return templates.TemplateResponse(request, "profile.html", ctx)
  • Step 3: Test
# tests/test_pat.py — append

def test_profile_page_renders(fresh_db):
    from fastapi.testclient import TestClient
    import uuid
    from src.db import get_system_db, close_system_db
    from src.repositories.users import UserRepository
    from app.auth.jwt import create_access_token
    from app.main import app

    conn = get_system_db()
    try:
        uid = str(uuid.uuid4())
        UserRepository(conn).create(id=uid, email="u@t", name="U", role="analyst")
        token = create_access_token(user_id=uid, email="u@t", role="analyst")
    finally:
        conn.close()
        close_system_db()

    client = TestClient(app)
    resp = client.get(
        "/profile",
        headers={"Accept": "text/html"},
        cookies={"access_token": token},
    )
    assert resp.status_code == 200
    assert "Personal access tokens" in resp.text
  • Step 4: Run — pass

  • Step 5: Commit

git add app/web/templates/profile.html app/web/router.py tests/test_pat.py
git commit -m "feat(ui): /profile page with PAT create/list/revoke (#12)"

Task 2.8: Docs — fix cli/skills/security.md 24h/30d mismatch + PAT section

Files:

  • Modify: cli/skills/security.md

  • Create: docs/HEADLESS_USAGE.md

  • Step 1: Update cli/skills/security.md

Find the line claiming "Issued on login, valid 30 days" and replace it with:

Session tokens: issued on interactive login (`da login`), valid 24 hours.
For long-lived CLI / CI use, create a Personal Access Token via the UI
(`/profile` → Personal access tokens) or CLI (`da auth token create`).
PATs are revocable and auditable; session tokens are not.
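
When debugging which kind of token a client holds, the claims can be read straight from the JWT payload. The sketch below is for inspection only and makes several assumptions: it does not verify the signature, `unverified_claims` and `describe` are hypothetical helpers, `exp` is the standard JWT expiry claim, and a missing `typ` is assumed to mean a session token.

```python
import base64
import json
from datetime import datetime, timezone


def unverified_claims(token: str) -> dict:
    """Decode the JWT payload segment WITHOUT verifying the signature (inspection only)."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("not a three-part JWT")
    payload = parts[1]
    payload += "=" * (-len(payload) % 4)  # restore the base64 padding that JWTs strip
    return json.loads(base64.urlsafe_b64decode(payload))


def describe(token: str) -> str:
    """Summarize the token kind and expiry from its claims."""
    claims = unverified_claims(token)
    kind = claims.get("typ", "session")  # assumption: a missing typ means a session token
    exp = claims.get("exp")
    until = datetime.fromtimestamp(exp, tz=timezone.utc).isoformat() if exp else "never"
    return f"{kind} token, expires {until}"
```

Never use a decoder like this for authorization decisions; the server-side `verify_token` remains the only trustworthy check.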
  • Step 2: Create docs/HEADLESS_USAGE.md
# Headless / CI usage

For unattended clients (CI, cron, Claude Code), authenticate with a Personal Access Token (PAT) rather than an interactive session.

## Create a PAT

**Via UI:** sign in, open `/profile`, create a token. Copy the raw value — it is shown exactly once.

**Via CLI (requires an interactive session):**

```bash
da auth token create --name "github-actions" --ttl 365d --raw
```

The `--raw` flag prints only the token, suitable for piping into a secret store.

## Use the PAT

Set the `DA_TOKEN` env var:

```bash
export DA_TOKEN=<your-token>
da query "SELECT 1"
```

## GitHub Actions example

```yaml
- name: Sync data
  env:
    DA_TOKEN: ${{ secrets.AGNES_TOKEN }}
    DA_SERVER: https://agnes.example.com
  run: |
    pip install data-analyst
    da sync --all
```

## Revoke

```bash
da auth token list
da auth token revoke <id|prefix|name>
```

Or from `/profile` → Revoke.

  • Step 3: Commit
git add cli/skills/security.md docs/HEADLESS_USAGE.md
git commit -m "docs: PAT usage and session/PAT TTL clarification (#12)"
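
For headless clients that call the server directly rather than through `da`, the PAT is sent as a bearer token. The following is a minimal sketch using only the standard library; `authed_request` is a hypothetical helper, the `http://localhost:8000` fallback for `DA_SERVER` is an assumption, and the path passed in is a placeholder for whichever endpoint the script needs.

```python
import os
import urllib.request


def authed_request(path: str, method: str = "GET") -> urllib.request.Request:
    """Build a request that carries the PAT from DA_TOKEN as a bearer token."""
    server = os.environ.get("DA_SERVER", "http://localhost:8000").rstrip("/")
    token = os.environ.get("DA_TOKEN")
    if not token:
        raise RuntimeError("DA_TOKEN is not set")
    return urllib.request.Request(
        f"{server}{path}",
        method=method,
        headers={"Authorization": f"Bearer {token}"},
    )


# Usage (commented out so the sketch has no network side effects):
# with urllib.request.urlopen(authed_request("/some/endpoint")) as resp:
#     print(resp.status)
```

Note that the `/auth/tokens` endpoints are session-only, so a request like this against them is expected to return 403 when the token is a PAT.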

Task 2.9: Phase 2 integration

  • Step 1: Run full suite

Run: pytest tests/ --timeout=30

  • Step 2: Smoke

Start server, sign in, create a PAT from /profile, use it via DA_TOKEN to run da query. Revoke it. Verify a revoked PAT fails on next call.
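
The last check in the smoke test depends on the server refusing revoked and expired PATs. As a rough sketch of that decision, assuming a token row exposes `revoked_at` and `expires_at` as timezone-aware datetimes (the field names come from `TokenListItem`; the helper `pat_rejection_reason` is hypothetical and not part of the plan):

```python
from datetime import datetime, timedelta, timezone
from typing import Mapping, Optional


def pat_rejection_reason(row: Optional[Mapping], now: Optional[datetime] = None) -> Optional[str]:
    """Return why a PAT must be rejected, or None if it is still usable."""
    now = now or datetime.now(timezone.utc)
    if row is None:
        return "unknown token"
    if row.get("revoked_at"):
        return "revoked"
    expires_at = row.get("expires_at")
    if expires_at is not None and expires_at <= now:
        return "expired"
    return None


now = datetime(2026, 4, 21, tzinfo=timezone.utc)
revoked = {"revoked_at": now - timedelta(hours=1), "expires_at": None}
print(pat_rejection_reason(revoked, now))  # prints "revoked"
```

Checking revocation before expiry keeps the reported reason stable for a token that is both revoked and past its expiry date.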

  • Step 3: Request code review

Dispatch superpowers:code-reviewer on Phase 2 diff. Fix blockers.


Phase 3 — CLI Distribution (#9)

Scope: the Dockerfile builds the wheel and the install script; FastAPI exposes /cli/download and /cli/install.sh, with the base URL baked into the script; an /install HTML page with instructions; a link in the dashboard; and a fix for the bug where da login does not prompt for a password. Static docs ship in the image.

File Structure

  • Modify: Dockerfile (uv build + stash wheel at /app/dist)
  • Create: app/api/cli_artifacts.py (/cli/download + /cli/install.sh)
  • Modify: app/main.py — register router
  • Create: app/web/templates/install.html
  • Modify: app/web/router.py — add /install route
  • Modify: app/web/templates/dashboard.html — link to /install
  • Modify: cli/commands/auth.py — prompt for password, send in body
  • Test: tests/test_cli_artifacts.py (new)
  • Test: tests/test_cli_auth.py — extend

Task 3.1: Dockerfile — build wheel + bake CLI version

Files:

  • Modify: Dockerfile

  • Step 1: Replace Dockerfile

FROM python:3.13-slim

RUN apt-get update && apt-get install -y --no-install-recommends curl && rm -rf /var/lib/apt/lists/*

COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv

ARG AGNES_VERSION=dev
ARG RELEASE_CHANNEL=dev
ARG AGNES_COMMIT_SHA=unknown
ENV AGNES_VERSION=${AGNES_VERSION}
ENV RELEASE_CHANNEL=${RELEASE_CHANNEL}
ENV AGNES_COMMIT_SHA=${AGNES_COMMIT_SHA}

WORKDIR /app

COPY . .

# Build wheel artifact (served at /cli/download)
RUN uv build --wheel --out-dir /app/dist

# Install production dependencies from pyproject.toml
RUN uv pip install --system --no-cache .

EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
  • Step 2: Local build verification

Run: docker build -t agnes:test .

Then: docker run --rm agnes:test ls -la /app/dist/

Expected: an agnes_the_ai_analyst-*.whl file exists.

  • Step 3: Commit
git add Dockerfile
git commit -m "build(docker): produce wheel artifact for /cli/download (#9)"

Task 3.2: FastAPI — /cli/download + /cli/install.sh

Files:

  • Create: app/api/cli_artifacts.py

  • Modify: app/main.py

  • Step 1: Failing test

# tests/test_cli_artifacts.py
"""Tests for #9 — CLI artifact + install script endpoints."""

import os
from pathlib import Path
import tempfile


def test_cli_install_script_bakes_server_url(monkeypatch):
    from fastapi.testclient import TestClient
    from app.main import app

    client = TestClient(app, base_url="https://agnes.example.com")
    resp = client.get("/cli/install.sh", headers={"host": "agnes.example.com"})
    assert resp.status_code == 200
    assert resp.headers["content-type"].startswith("text/")
    body = resp.text
    assert "https://agnes.example.com" in body or "agnes.example.com" in body
    assert "pip install" in body or "uv tool install" in body


def test_cli_download_returns_wheel_or_404(monkeypatch):
    from fastapi.testclient import TestClient
    from app.main import app

    client = TestClient(app)
    resp = client.get("/cli/download")
    # Either serve the wheel or return a clear 404 telling where to find it.
    assert resp.status_code in (200, 404)
    if resp.status_code == 200:
        assert resp.headers["content-disposition"].startswith("attachment")


def test_cli_download_serves_wheel_when_present(monkeypatch, tmp_path):
    """Put a fake wheel and confirm the endpoint serves it."""
    wheel = tmp_path / "agnes_fake-1.0-py3-none-any.whl"
    wheel.write_bytes(b"PK\x03\x04fake-wheel-bytes")
    monkeypatch.setenv("AGNES_CLI_DIST_DIR", str(tmp_path))
    from fastapi.testclient import TestClient
    from app.main import app
    client = TestClient(app)
    resp = client.get("/cli/download")
    assert resp.status_code == 200
    assert resp.content.startswith(b"PK")
  • Step 2: Run — fail

  • Step 3: Implement app/api/cli_artifacts.py

"""CLI artifact download + install script endpoints (#9)."""

import os
from pathlib import Path

from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import FileResponse, PlainTextResponse

router = APIRouter(tags=["cli"])


def _dist_dir() -> Path:
    return Path(os.environ.get("AGNES_CLI_DIST_DIR", "/app/dist"))


def _find_wheel() -> Path | None:
    d = _dist_dir()
    if not d.exists():
        return None
    wheels = sorted(d.glob("*.whl"))
    return wheels[-1] if wheels else None


@router.get("/cli/download")
async def cli_download():
    wheel = _find_wheel()
    if not wheel:
        raise HTTPException(
            status_code=404,
            detail=(
                "CLI wheel not found in dist dir. Build it with `uv build --wheel` "
                "or run the official docker image (which builds on image-build)."
            ),
        )
    return FileResponse(
        path=str(wheel),
        filename=wheel.name,
        media_type="application/octet-stream",
        headers={"Content-Disposition": f'attachment; filename="{wheel.name}"'},
    )


@router.get("/cli/install.sh", response_class=PlainTextResponse)
async def cli_install_script(request: Request):
    """Shell installer — bakes this server's URL into the generated config."""
    base_url = str(request.base_url).rstrip("/")
    version = os.environ.get("AGNES_VERSION", "dev")
    script = f"""#!/usr/bin/env bash
# Agnes CLI installer — server: {base_url}
set -euo pipefail

SERVER="{base_url}"
echo "Installing Agnes CLI from $SERVER (version: {version})"

# 1. Download the wheel
WHEEL=$(mktemp -t agnes_cli.XXXXXX.whl)
curl -fsSL "$SERVER/cli/download" -o "$WHEEL"

# 2. Install via pip (prefer uv tool install if available)
if command -v uv >/dev/null 2>&1; then
    uv tool install --force "$WHEEL"
else
    python3 -m pip install --user --force-reinstall "$WHEEL"
fi

rm -f "$WHEEL"

# 3. Seed the server URL in CLI config
CFG_DIR="${{DA_CONFIG_DIR:-$HOME/.config/da}}"
mkdir -p "$CFG_DIR"
cat > "$CFG_DIR/config.yaml" <<EOF
server: $SERVER
EOF

echo "Installed."
echo "Next steps:"
echo "  1. Sign in to $SERVER and create a personal access token at $SERVER/profile"
echo "  2. Export it:   export DA_TOKEN=<your-token>"
echo "  3. Verify:      da auth whoami"
"""
    return script
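
`request.base_url` is derived from the client-supplied Host header, and the script interpolates it into a shell assignment. One possible precaution, sketched here as an assumption rather than as part of the plan, is to shell-quote the value with the standard library's `shlex.quote` before it reaches the template. `render_server_assignment` is a hypothetical helper.

```python
import shlex


def render_server_assignment(base_url: str) -> str:
    """Return a shell line that assigns base_url to SERVER as a single quoted word."""
    return f"SERVER={shlex.quote(base_url.rstrip('/'))}"


print(render_server_assignment("https://agnes.example.com/"))
```

For an ordinary URL the output is unchanged apart from the trailing slash; a value containing quotes or semicolons is wrapped in single quotes so the shell treats it as data. Whether this is needed depends on the deployment, since a reverse proxy may already pin the Host header.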
  • Step 4: Register in app/main.py
from app.api.cli_artifacts import router as cli_artifacts_router
# ...
    app.include_router(cli_artifacts_router)
  • Step 5: Run — pass

Run: pytest tests/test_cli_artifacts.py -v

  • Step 6: Commit
git add app/api/cli_artifacts.py app/main.py tests/test_cli_artifacts.py
git commit -m "feat(api): /cli/download wheel + /cli/install.sh with baked server URL (#9)"

Task 3.3: /install HTML page

Files:

  • Create: app/web/templates/install.html

  • Modify: app/web/router.py

  • Step 1: Create template

app/web/templates/install.html:

{% extends "base.html" %}
{% block title %}Install CLI — {{ config.INSTANCE_NAME }}{% endblock %}

{% block content %}
<div class="container">
  <h2>Install the Agnes CLI</h2>
  <p>This server: <code>{{ server_url }}</code> (version <code>{{ agnes_version }}</code>)</p>

  <h3>One-liner (Linux / macOS)</h3>
  <pre><code>curl -fsSL {{ server_url }}/cli/install.sh | bash</code></pre>

  <h3>Manual</h3>
  <ol>
    <li>Download: <a href="/cli/download">{{ server_url }}/cli/download</a></li>
    <li>Install:
      <pre><code>uv tool install ./agnes_the_ai_analyst-*.whl
# or
python3 -m pip install --user ./agnes_the_ai_analyst-*.whl</code></pre>
    </li>
    <li>Seed server URL:
      <pre><code>mkdir -p ~/.config/da
echo "server: {{ server_url }}" > ~/.config/da/config.yaml</code></pre>
    </li>
  </ol>

  <h3>Connect</h3>
  <p>Create a personal access token (see <a href="/profile">/profile</a>) and export it:</p>
  <pre><code>export DA_TOKEN=&lt;your-token&gt;
da auth whoami</code></pre>

  <h3>Claude Code / MCP</h3>
  <p>
    Store your token in <code>~/.config/da/token.json</code> (Claude Code
    reads this automatically via the <code>da</code> entrypoint) or export
    <code>DA_TOKEN</code> in your shell.
  </p>

  <h3>CI / headless</h3>
  <p>See <a href="/docs/HEADLESS_USAGE.md" target="_blank">Headless usage guide</a>.</p>
</div>
{% endblock %}
  • Step 2: Add route

In app/web/router.py:

@router.get("/install", response_class=HTMLResponse)
async def install_page(request: Request):
    """Public install instructions for the CLI."""
    base_url = str(request.base_url).rstrip("/")
    ctx = _build_context(
        request,
        server_url=base_url,
        agnes_version=os.environ.get("AGNES_VERSION", "dev"),
    )
    return templates.TemplateResponse(request, "install.html", ctx)
  • Step 3: Dashboard link

In app/web/templates/dashboard.html, find the primary nav and add:

<a class="nav-link" href="/install">Install CLI</a>
  • Step 4: Test
# tests/test_cli_artifacts.py — append

def test_install_page_renders_with_server_url():
    from fastapi.testclient import TestClient
    from app.main import app
    client = TestClient(app)
    resp = client.get("/install", headers={"host": "agnes.test", "Accept": "text/html"})
    assert resp.status_code == 200
    assert "agnes.test" in resp.text
    assert "da auth whoami" in resp.text
  • Step 5: Run — pass

  • Step 6: Commit

git add app/web/router.py app/web/templates/install.html app/web/templates/dashboard.html tests/test_cli_artifacts.py
git commit -m "feat(ui): /install page with per-deployment install instructions (#9)"

Task 3.4: Fix da login password prompt bug

Files:

  • Modify: cli/commands/auth.py

  • Test: tests/test_cli_auth.py — append

  • Step 1: Failing test

# tests/test_cli_auth.py — append

def test_da_login_sends_password(monkeypatch):
    import httpx
    from typer.testing import CliRunner
    from cli.commands import auth as auth_mod

    captured = {}

    def fake_post(path, json=None, **kwargs):
        captured["path"] = path
        captured["json"] = json
        return httpx.Response(200, json={
            "access_token": "tok", "email": "u@t", "role": "analyst",
            "user_id": "u1", "token_type": "bearer",
        })

    monkeypatch.setattr(auth_mod, "api_post", fake_post, raising=False)

    runner = CliRunner()
    # Provide email and password via stdin (typer prompts)
    result = runner.invoke(auth_mod.auth_app, ["login"], input="u@t\nhunter2\n")
    assert result.exit_code == 0, result.output
    assert captured["path"] == "/auth/token"
    assert captured["json"] == {"email": "u@t", "password": "hunter2"}
  • Step 2: Run — fail

  • Step 3: Fix login command

Replace in cli/commands/auth.py:

@auth_app.command()
def login(
    email: str = typer.Option(..., prompt=True, help="Your email address"),
    password: str = typer.Option(
        "", prompt="Password (leave empty for magic-link / OAuth accounts)",
        hide_input=True, help="Your password (if the account has one)",
    ),
    server: str = typer.Option(None, help="Server URL override"),
):
    """Login and obtain a JWT token.

    Password-enabled accounts: enter the password when prompted.
    Magic-link / OAuth accounts: leave the password empty — the server will
    respond with guidance pointing you to the correct auth provider.
    """
    if server:
        import os
        os.environ["DA_SERVER"] = server

    body = {"email": email}
    if password:
        body["password"] = password

    try:
        resp = api_post("/auth/token", json=body)
        if resp.status_code == 200:
            data = resp.json()
            save_token(data["access_token"], data["email"], data["role"])
            typer.echo(f"Logged in as {data['email']} (role: {data['role']})")
            return
        # Helpful error for accounts that cannot login via password.
        try:
            detail = resp.json().get("detail", resp.text)
        except Exception:
            detail = resp.text
        if resp.status_code == 401 and "external authentication" in str(detail).lower():
            typer.echo(
                "This account uses a magic link / OAuth provider. "
                "Sign in via the web UI, open /profile, and create a personal "
                "access token — then export it as DA_TOKEN.",
                err=True,
            )
        else:
            typer.echo(f"Login failed: {detail}", err=True)
        raise typer.Exit(1)
    except typer.Exit:
        raise
    except Exception as e:
        typer.echo(f"Connection error: {e}", err=True)
        raise typer.Exit(1)
  • Step 4: Run — pass

Run: pytest tests/test_cli_auth.py::test_da_login_sends_password -v

  • Step 5: Commit
git add cli/commands/auth.py tests/test_cli_auth.py
git commit -m "fix(cli): da login prompts for password and sends it in body (#9)"
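
The login command in Step 3 makes two decisions: what goes into the request body, and which message to show when the server does not return 200. As a rough illustration, those decisions can be pulled out into pure functions that are testable without a CLI runner. This is only a sketch: `build_login_body` and `login_failure_message` are hypothetical helper names that do not appear in the plan, and the match on "external authentication" simply mirrors the substring check used in Step 3.

```python
from typing import Any, Dict


def build_login_body(email: str, password: str = "") -> Dict[str, str]:
    """Return the JSON body for POST /auth/token (hypothetical helper).

    The password key is omitted entirely when the prompt was left empty,
    matching the `if password:` branch in the login command.
    """
    body = {"email": email}
    if password:
        body["password"] = password
    return body


def login_failure_message(status_code: int, detail: Any) -> str:
    """Pick the message shown for a non-200 response (hypothetical helper)."""
    if status_code == 401 and "external authentication" in str(detail).lower():
        return (
            "This account uses a magic link / OAuth provider. "
            "Sign in via the web UI, open /profile, and create a personal "
            "access token, then export it as DA_TOKEN."
        )
    return f"Login failed: {detail}"
```

With this split, `build_login_body("u@t", "hunter2")` yields the same dictionary the Step 1 test asserts on, and `build_login_body("u@t")` yields an email-only body for magic-link / OAuth accounts.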

Task 3.5: Phase 3 integration

  • Step 1: Run full suite

Run: pytest tests/ --timeout=30

  • Step 2: Smoke

  • docker build produces the CLI wheel in /app/dist.

  • GET /cli/install.sh returns a shell script with this deployment's server URL baked in.

  • GET /install renders install instructions that use the same base URL.

  • da login prompts for a password and succeeds against a password-enabled account.

  • Step 3: Request code review

Dispatch superpowers:code-reviewer on Phase 3 diff.
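
The two HTTP checks in the Step 2 smoke list could be scripted instead of done by hand. The sketch below uses only the standard library and rests on assumptions that the plan does not state: that both routes are reachable without a session, that the install script starts with a shebang line, and that the deployment's base URL appears verbatim in both responses. The function names are hypothetical.

```python
import urllib.request
from typing import Dict


def fetch_text(url: str, timeout: float = 10.0) -> str:
    """GET a URL and return the decoded body; urlopen raises on HTTP errors."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.read().decode("utf-8", errors="replace")


def looks_like_install_script(body: str, base_url: str) -> bool:
    """Assumed check: a shebang line plus the base URL somewhere in the script."""
    return body.startswith("#!") and base_url.rstrip("/") in body


def smoke(base_url: str) -> Dict[str, bool]:
    """Run both HTTP smoke checks against a running deployment."""
    base = base_url.rstrip("/")
    return {
        "/cli/install.sh": looks_like_install_script(
            fetch_text(base + "/cli/install.sh"), base
        ),
        "/install": base in fetch_text(base + "/install"),
    }
```

Calling `smoke(...)` with the deployment's base URL returns one boolean per route. If /install turns out to require a session, the request would be redirected to /login and the check would report False, so that route may need an authenticated request instead.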


Final Integration

Task F.1: Full suite + merge

  • All three phases green: pytest tests/ --timeout=30
  • Docker build succeeds and serves the wheel + install.sh
  • Manual walkthrough: new user flow (create via UI → reset password → log in → create PAT → use PAT via CLI → revoke → verify PAT rejected).
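
The last steps of the walkthrough (use PAT, revoke, verify rejection) depend on the server checking each PAT against stored state. A toy in-memory model of that check can help when reasoning about the expected outcome of each step. Everything here is an assumption made for illustration: `TokenRecord`, `InMemoryTokenStore`, and their fields are hypothetical and are not the project's repository API or schema.

```python
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


@dataclass
class TokenRecord:
    id: str
    user_id: str
    expires_at: Optional[datetime] = None
    revoked_at: Optional[datetime] = None
    last_used_at: Optional[datetime] = None


class InMemoryTokenStore:
    """Hypothetical stand-in for the personal_access_tokens table."""

    def __init__(self) -> None:
        self._rows: Dict[str, TokenRecord] = {}

    def create(self, user_id: str, ttl: Optional[timedelta] = None) -> TokenRecord:
        now = datetime.now(timezone.utc)
        rec = TokenRecord(
            id=uuid.uuid4().hex,
            user_id=user_id,
            expires_at=(now + ttl) if ttl is not None else None,
        )
        self._rows[rec.id] = rec
        return rec

    def revoke(self, token_id: str) -> None:
        self._rows[token_id].revoked_at = datetime.now(timezone.utc)

    def is_usable(self, token_id: str) -> bool:
        """Reject unknown, revoked, or expired tokens; record the use otherwise."""
        rec = self._rows.get(token_id)
        now = datetime.now(timezone.utc)
        if rec is None or rec.revoked_at is not None:
            return False
        if rec.expires_at is not None and rec.expires_at <= now:
            return False
        rec.last_used_at = now
        return True
```

In this model a freshly created token is usable, and the same token is rejected immediately after `revoke`, which is the behaviour the final walkthrough step is meant to confirm against the real server.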

Task F.2: Coverage check against issues

Run a final verification agent that reads issues #9, #10, #11, and #12 alongside the resulting diff and reports any unmet acceptance criterion. Iterate until every acceptance bullet is green.


Self-Review

  • Spec coverage for #10: unauthenticated HTML routes redirect to /login (Phase 0); API clients still get 401.

  • Spec coverage for #11:

    • Schema v5 (active + deactivated_at/by)
    • PATCH, POST /reset-password, POST /set-password, POST /activate, POST /deactivate, audit log on every mutation
    • Self-deactivate + last-admin safeguards
    • get_current_user checks active
    • CLI commands (set-role, activate, deactivate, reset-password, set-password) + extended list-users
    • /admin/users UI
    • Tests for every bullet
  • Spec coverage for #12:

    • personal_access_tokens DuckDB table (schema v6)
    • JWT carries typ + jti; verify_token checks the DB when typ == "pat"
    • /auth/tokens CRUD + admin variant
    • CLI da auth token create|list|revoke
    • UI profile page with one-time reveal
    • Audit entries on create/revoke; last_used_at updated synchronously on each use
    • cli/skills/security.md correction + docs/HEADLESS_USAGE.md
    • PAT cannot create new PATs (session-only guard)
  • Spec coverage for #9:

    • Wheel built in Dockerfile, stored at /app/dist
    • /cli/download + /cli/install.sh with base URL baked-in
    • /install page
    • da login password bug fix
    • Dashboard link
  • Placeholder scan: every code step has a full code block or exact command. No "TBD" or "implement later".

  • Type consistency: typ is "session" | "pat"; token_id is stored as the id and carried as the JWT jti; repository field names are consistent across Phase 2 tasks.
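
The type-consistency bullet and the session-only guard listed under #12 can be made concrete with a small typed model of the token claims. This is a minimal sketch under stated assumptions, not the project's code: `TokenClaims` and `require_session` are hypothetical names, the `sub` claim is assumed to carry the user id, and a payload without `typ` is treated as invalid rather than defaulted.

```python
from dataclasses import dataclass
from typing import Any, Literal, Mapping

TokenType = Literal["session", "pat"]


@dataclass(frozen=True)
class TokenClaims:
    sub: str        # user id (assumed claim name)
    typ: TokenType  # "session" or "pat"
    jti: str        # token identifier; for a PAT, assumed to match the stored id

    @classmethod
    def from_payload(cls, payload: Mapping[str, Any]) -> "TokenClaims":
        """Validate a decoded JWT payload and return typed claims."""
        typ = payload.get("typ")
        if typ not in ("session", "pat"):
            raise ValueError(f"unknown token type: {typ!r}")
        jti = payload.get("jti")
        if not jti:
            raise ValueError("missing jti")
        return cls(sub=str(payload["sub"]), typ=typ, jti=str(jti))


def require_session(claims: TokenClaims) -> None:
    """Session-only guard: a PAT must not be used to create further PATs."""
    if claims.typ != "session":
        raise PermissionError("this endpoint requires an interactive session token")
```

Keeping `typ` as a `Literal` lets a type checker flag any call site that passes a value other than "session" or "pat", which is one way to verify the consistency claim mechanically.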

All spec bullets covered. Ready for execution handoff.