agnes-the-ai-analyst/tests/test_connector_kit_poc.py
ZdenekSrotyr d2c76cb221
User management + PAT + CLI distribution + HTML auth redirect (#9 #10 #11 #12) (#28)
* fix: redirect unauthenticated HTML routes to /login (#10)

* docs(plan): user mgmt + PAT + CLI distribution implementation plan (#9 #10 #11 #12)

* build(docker): produce wheel artifact for /cli/download (#9)

* feat(db): schema v5 — users.active + deactivated_at/by (#11)

* feat(api): /cli/download wheel + /cli/install.sh with baked server URL (#9)

* feat(users): repository supports active flag + count_admins (#11)

* feat(ui): /install page with per-deployment install instructions (#9)

* feat(api): user PATCH/reset-password/set-password/activate/deactivate (#11)

* fix(cli): da login prompts for password and sends it in body (#9)

* test(api): safeguard tests for self-deactivate and last admin (#11)

* feat(auth): reject requests from deactivated users (#11)

* fixup(#10): propagate next through /login buttons + lock down sanitizer tests

* feat(cli): da admin set-role/activate/deactivate/reset-password/set-password (#11)

* feat(ui): /admin/users management page (#11)

* feat(db): schema v6 — personal_access_tokens (#12)

* feat(users): access_tokens repository (#12)

* feat(auth): JWT carries typ (session|pat) and explicit jti (#12)

* feat(auth): reject revoked/expired PATs; update last_used_at (#12)

* feat(api): /auth/tokens CRUD + admin revoke; session-only guard (#12)

* feat(cli): da auth token create/list/revoke (#12)

* feat(ui): /profile page with PAT create/list/revoke (#12)

* docs: PAT usage and session/PAT TTL clarification (#12)

* feat(auth): PAT first-use-from-new-IP audit + last_used_ip (schema v7) (#12)

Closes remaining acceptance gap from issue #12: audit_log entry on first use
of a PAT from an IP that differs from the recorded last_used_ip.

- schema v7: personal_access_tokens.last_used_ip column
- AccessTokenRepository.mark_used now stores the client IP
- get_current_user extracts client IP (X-Forwarded-For first hop, fallback
  to request.client.host) and emits a token.first_use_new_ip audit when the
  IP changes on a subsequent use (not the very first use)
- tests: new-ip audit, same-ip no-op, first-ever-use no-op, schema v7 column
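
The IP handling described above can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: the function names `client_ip` and `is_new_ip_use` and their signatures are assumptions made for this example.

```python
from typing import Optional


def client_ip(forwarded_for: Optional[str], peer_host: Optional[str]) -> Optional[str]:
    """Return the first X-Forwarded-For hop, falling back to the direct peer address."""
    if forwarded_for:
        first_hop = forwarded_for.split(",")[0].strip()
        if first_hop:
            return first_hop
    return peer_host


def is_new_ip_use(last_used_ip: Optional[str], current_ip: Optional[str]) -> bool:
    """True only when an IP was recorded before and the current one differs."""
    if last_used_ip is None or current_ip is None:
        # The very first use (nothing recorded yet) is deliberately not audited.
        return False
    return last_used_ip != current_ip
```

A caller would emit the `token.first_use_new_ip` audit entry only when `is_new_ip_use(...)` returns `True`, then store the current IP.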

* fix: address Devin review findings on PR #28

- app/main.py: exclude /auth/* from HTML redirect handler so JSON
  endpoints under /auth/ (PAT CRUD used by `da auth token` CLI) keep
  their 401 JSON contract (Devin #1, bug)
- app/api/tokens.py: reject expires_in_days <= 0 explicitly; use
  `is not None` so 0 no longer silently creates a non-expiring token
  (Devin #2)
- app/api/users.py: validate role against Role enum in create_user
  to match update_user and prevent 500 on role-protected requests
  later (Devin #3)
- app/web/templates/admin_users.html: escape user-supplied strings
  before innerHTML; move onclick handlers to addEventListener via
  data attributes so emails with quotes / HTML no longer break the UI
  or enable stored XSS (Devin #4)
- app/auth/router.py, app/auth/providers/{password,google}.py:
  reject deactivated users at login instead of issuing a JWT that
  would then fail on the next request — removes the confusing
  redirect loop (Devin #5)
- CLAUDE.md: document schema v7 instead of stale v4 (Devin #6)
- tests/test_web_ui.py: regression test for the /auth/* JSON 401
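
The `expires_in_days` fix in this list comes down to a truthiness pitfall, which the sketch below reproduces. Both function names are hypothetical and exist only for this example; the real handler in app/api/tokens.py is not shown in the commit.

```python
from typing import Optional


def describe_expiry_buggy(expires_in_days: Optional[int]) -> str:
    # Truthiness check: 0 is falsy, so it falls through to "never expires".
    if expires_in_days:
        return f"expires in {expires_in_days} days"
    return "never expires"


def describe_expiry_fixed(expires_in_days: Optional[int]) -> str:
    # Only None means "no expiry"; zero and negatives are rejected explicitly.
    if expires_in_days is not None:
        if expires_in_days <= 0:
            raise ValueError("expires_in_days must be a positive integer")
        return f"expires in {expires_in_days} days"
    return "never expires"
```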

* feat(web): add /profile and /admin/users links to dashboard nav

* feat(web): point setup banner at /install page

* chore(web): drop unused setup_instructions context

* fix: address Devin review round 2 on PR #28

- app/api/tokens.py: when expires_in_days is None (the "never" option),
  use a ~100-year JWT expiry so the token doesn't silently die in 24h
  via the session-default fallback in create_access_token. The real
  expiry enforcement stays in verify_token's DB-level check (Devin 🔴)
- app/web/templates/profile.html: escape t.name and other user-supplied
  strings via esc() helper before innerHTML, same pattern as
  admin_users.html. Move revoke onclick to data-attribute +
  addEventListener (Devin 🟡)
- app/api/cli_artifacts.py: use `mktemp -d` with X's at end of template
  for GNU/BSD portability, place wheel inside the temp dir and
  clean up with rm -rf (Devin 🚩)

* feat(web): redesign /install page; make curl one-liner primary, collapse manual

Rebuild the public /install page using the dashboard visual language
(shared header, card layout, gradient hero, design tokens from
style-custom.css). The page is now anchored on the one-liner install
path: curl -fsSL <server>/cli/install.sh | bash is rendered as the
primary, prominent step 1, while the old manual wheel-download flow
is tucked behind a closed-by-default <details> block for users in
restricted/offline environments.

Information architecture:
  hero (server URL + version)
  -> step 1: quick install (one-liner, big Copy button)
  -> step 2: create PAT on /profile + export DA_TOKEN / da auth whoami
  -> step 3: Claude Code / MCP via ~/.config/da/token.json
  -> collapsed "Manual install" details for download-wheel flow
  -> footer link to docs/HEADLESS_USAGE.md

Every shell snippet has a vanilla-JS "Copy" button that confirms
visually ("Copied!" for 1.5s) and falls back to textarea+execCommand
on non-secure contexts. No new dependencies, no bundler.

The route now also pulls an optional user so the header shows the
same nav (Dashboard / Profile / Logout) as dashboard.html when a
session exists, while staying fully public when signed out.

* fix(cli): use real wheel filename in install.sh (broken pip/uv install)

The installer wrote the downloaded wheel as agnes_cli.whl, which lacks a
PEP-427 version component — both pip and uv tool install reject it and
abort the one-liner.

Use curl -OJ so Content-Disposition determines the on-disk filename, then
resolve it via glob. Install an EXIT trap to remove the tmpdir even when
install fails.
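
Why `agnes_cli.whl` is rejected can be illustrated with a rough structural check of the PEP 427 filename layout (`{distribution}-{version}(-{build})?-{python}-{abi}-{platform}.whl`). The helper name is hypothetical and the check only counts components; it is not the validation pip or uv actually perform.

```python
def looks_like_wheel_filename(filename: str) -> bool:
    """Rough PEP 427 shape check: five or six non-empty dash-separated parts."""
    suffix = ".whl"
    if not filename.endswith(suffix):
        return False
    parts = filename[: -len(suffix)].split("-")
    return len(parts) in (5, 6) and all(parts)
```

With this check, `agnes_cli.whl` fails because it has a single component, while a name such as `agnes_the_ai_analyst-0.1.0-py3-none-any.whl` (version chosen for illustration) passes.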

* fix(web): correct manual install wheel glob and add PEP 668 / PATH hints

- Wheel glob is agnes_the_ai_analyst-*.whl (not agnes-*.whl) — the old
  pattern never matched the real artefact name from the build.
- Add — or — separator between uv tool install and pip install.
- Warn that pip install --user is blocked on macOS Homebrew / modern
  Debian (PEP 668) and recommend uv tool install as the default path.
- Both flows now show the ~/.local/bin PATH hint so a fresh shell can
  find the da binary after install.

* fix(web): consistent session.user reference in install header

The avatar-letter fallback inside {% if session.user %} was reading
user.name / user.email directly, but the route dependency can pass
user=None — those references resolved to an empty FlexDict and produced
an empty avatar circle. Read everything through session.user to match
the guard and the dashboard pattern.

* fix(web): point headless usage link at GitHub source

/docs/HEADLESS_USAGE.md 404s — no static route serves repo docs. Point
the footer link at the rendered markdown on GitHub instead of adding a
dedicated docs serving route just for one file.

* feat(web): /install hero size, anon sign-in banner, step 2 copy polish

- Bump hero h1 from 26px to 30px to match dashboard primary scale.
- Anonymous visitors see a small sign-in banner above Step 2 (creating
  a token requires auth; without the banner the flow appears stuck).
- Add an 'After generating your token' section label inside Step 2 so
  the /profile CTA button no longer looks wedged mid-sentence between
  adjacent paragraphs.

* chore(web): /install a11y + version pill polish

- aria-live='polite' on copy buttons so screen readers announce the
  'Copied!' state change.
- Replace redundant INSTANCE_NAME eyebrow (already in the header logo)
  with 'Getting started'.
- Hide the version pill when AGNES_VERSION is unset/'dev' — avoids the
  misleading 'vdev' label in local/unbuilt runs.
- Manual summary focus-visible outline-offset +2px (was -2px which
  clipped inside the card), and mark the chevron as decorative.

* fix(web): use session.user in dashboard avatar fallback

Inside {% if session.user %} guard, the avatar fallback referenced
(user.name or user.email). If user is None the block crashes when
the profile picture is absent. Align with the guard variable.

* fix: address Devin review round 3 on PR #28

- app/api/users.py: stop auto-sending email from reset_password. The
  magic-link sender would deliver a "Login Link" that — when clicked —
  consumes the reset_token via verify_magic_link and logs the user in
  WITHOUT prompting for a new password. Admins now share the raw
  reset_token from the API response manually, or use set-password
  directly. email_sent is always False. Documented inline. (Devin 🟡)
- app/api/cli_artifacts.py: harden /cli/install.sh generation against
  shell injection via Host header or AGNES_VERSION. base_url is
  validated against a strict scheme+host+port regex; version against
  an alnum + dot/dash/underscore allowlist. Both values are also
  piped through shlex.quote() as defense in depth. (Devin 🟡)
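
As an illustration of the defense-in-depth step, the sketch below shows what `shlex.quote()` does to a value before it is written into a generated shell script. The `render_export` helper and the variable name `SERVER` are assumptions for this example, not names from app/api/cli_artifacts.py.

```python
import shlex


def render_export(name: str, value: str) -> str:
    """Render one shell assignment line with the value safely quoted."""
    return f"export {name}={shlex.quote(value)}"


# A plain URL contains only shell-safe characters and is left unquoted.
plain = render_export("SERVER", "https://agnes.example.com")

# A command substitution is wrapped in single quotes and stays inert text.
hostile = render_export("SERVER", "$(id)")
```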

The shared users.reset_token column between magic-link and password-
reset flows (Devin 🚩) remains an architectural gap; splitting into
separate columns needs schema v8 and is tracked for a follow-up PR.

* docs, chore(grpn): manual-deploy helpers + hackathon deploy learnings

Adds scripts/grpn/ — Makefile + agnes-auto-upgrade.sh + README for
operating Agnes on GRPN's existing foundryai-development VM when the
full Terraform flow is blocked by org policies:

- iam.disableServiceAccountKeyCreation (org constraint) forbids SA
  JSON keys, so GCP_SA_KEY-based CI is unavailable
- No projectIamAdmin delegation → bootstrap-gcp.sh can't grant roles
- Secret Manager IAM bindings require setIamPolicy which editor lacks

Helper targets: deploy, deploy-tag, recreate, restart, stop, start,
status, version, logs, ps, env, ssh, tunnel, open, bootstrap-admin,
set-data-source, install-cron, uninstall-cron.

docs/superpowers/plans/2026-04-22-grpn-deploy-learnings.md — running
log of all org-policy constraints hit during the hackathon deploy,
with workarounds and derived follow-ups (WIF support, external_ip
variable, customer onboarding IAM checklist).

Not a replacement for the TF flow — stopgap until WIF lands.

* fix(web): make header logos clickable links to home

* feat(web): one-click "Setup a new Claude Code" button

Adds a single-button flow on the dashboard and /install page that
generates a fresh personal access token via POST /auth/tokens and
copies a complete, paste-ready setup script (server URL, token,
install/verify commands) to the clipboard. Falls back to a modal
textarea when the clipboard is blocked; redirects to /login on 401;
surfaces backend errors inline.

- dashboard.html: replaces the top "Set up your local environment"
  anchor with a real button wired to setupNewClaude(). Removes the
  duplicate bottom setup banner to keep a single entry point.
- install.html: for signed-in users, Step 1 leads with the one-click
  button and demotes the curl one-liner into a collapsible "Or run
  manually" aside. Anonymous visitors still see the curl flow plus a
  sign-in hint.
- No new deps. Vanilla JS. Token lives in memory/clipboard only —
  never rendered into persistent DOM.

* feat(cli): add "da auth import-token" for non-interactive PAT login

Writes a provided JWT into ~/.config/da/token.json using the canonical
{access_token, email, role} shape expected by save_token(). Decodes the
token locally to pull email/role claims, verifies it against the server
via GET /api/catalog/tables, and refuses to overwrite an existing token
file if the server returns 401. --email / --role overrides exist for
tokens missing those claims; --skip-verify bypasses the server round-trip
for offline / CI scenarios.
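
Decoding a token locally to read its claims, as described above, could look like the following standard-library sketch. The helper name is hypothetical and the signature is deliberately not verified here, which is why the command still checks the token against the server.

```python
import base64
import json


def decode_claims_unverified(token: str) -> dict:
    """Return the JWT payload as a dict WITHOUT verifying the signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("not a three-part JWT")
    payload = parts[1]
    # JWTs strip base64 padding; restore it before decoding.
    padded = payload + "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))
```

The `email` and `role` values would then be read with `claims.get("email")` and `claims.get("role")`, falling back to the `--email` / `--role` overrides when a claim is missing.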

* test(cli): cover da auth import-token success + 401 + claim-fallback paths

Three new tests in TestAuthImportToken:
- valid JWT + 200 -> canonical token.json written
- 401 from /api/catalog/tables -> exit 1, existing token file untouched
- JWT without email/role claims -> refused without overrides, accepted
  with --email / --role flags

* feat(web): update one-click Claude setup instructions — explicit uv install, import-token, skills question

Replaces the fragile `cat > token.json <<EOF` clipboard payload with an
explicit, auditable sequence:

  1. `curl -fsSL /cli/download` + `uv tool install --force` (no opaque
     `curl | bash`).
  2. `da auth import-token --token ...` instead of hand-written JSON.
  3. Explicit PATH persistence for zsh/bash.
  4. A required question to the user about whether to copy the bundled
     skills into ~/.claude/skills/agnes/ or pull them on-demand via
     `da skills show`.
  5. A final confirmation step with whoami + version output.

Factored both pages to include a shared partial
(app/web/templates/_claude_setup_instructions.jinja) so dashboard.html
and install.html can never drift apart again. {server_url} and {token}
stay as runtime placeholders substituted by renderSetupInstructions().

* feat(ui): modernize /admin/users + unify header nav across pages

- New shared partial app/web/templates/_app_header.html — single source
  of truth for the top navigation. Used by base.html and dashboard.html
  (which doesn't extend base.html). Active page highlighted via
  request.url.path. Admin "Users" link gated by session.user.role.
- style-custom.css: add .app-header / .app-nav-link / .app-btn-logout /
  .app-avatar styles (mirrors dashboard's previous inline copy under
  app-* prefix). Mobile-friendly fallback at <720px.
- base.html: include the new partial so every page extending base
  (admin_users, profile, login_email, error, …) gets the same chrome
  the dashboard has.
- dashboard.html: replace its inline <header class="header"> markup
  with the shared partial. Inline .header CSS left in place as
  harmless dead code (separate cleanup PR).
- admin_users.html: rewritten with avatars, role pills (color-coded
  per role), toggle switch for active, search/filter input, toast
  notifications, modal dialogs replacing alert/confirm/prompt,
  one-click copy for the reset token, empty / loading states.
  All XSS-safe via the existing esc() helper + data-attribute
  event delegation.
- tests/test_web_ui.py: smoke test that /admin/users renders the new
  shared header chrome and the modernized markup.

* feat(api): serve CLI wheel at /cli/agnes.whl for direct uv install

uv tool install inspects the URL path suffix to recognise a wheel, so
/cli/download (which has no .whl suffix) cannot be installed directly.
Expose a stable /cli/agnes.whl alias over the same wheel lookup so users
can run: uv tool install --force https://<server>/cli/agnes.whl

* test(cli): cover da auth import-token --server persisting to config.yaml

The server persistence was already implemented in the import-token command
(save_config({server}) call) but not covered by tests. Add an explicit test
so the one-step setup contract — single import-token call writes both token
and server — cannot regress.

* feat(web): simpler Claude setup — single uv install URL, single import-token call

User feedback: the prior clipboard payload repeated the server URL and
token across multiple steps (curl + tmpfile + install + rm + separate
seed-config + import-token). Collapse to:

 1. uv tool install --force {server_url}/cli/agnes.whl  (single URL, direct)
 2. da auth import-token --token ... --server ...        (one call, persists both)
 3. da auth whoami
 4. skills (ask user first)
 5. confirm

uv accepts HTTPS URLs that end in .whl and installs them directly, so
the tmpfile dance is unnecessary. import-token --server already persists
the server to config.yaml, so no separate printf > config.yaml step.

* fix(tests): update admin users heading assertion after template rename

The admin_users.html template now uses <h2 class="users-title">Users</h2>
instead of <h2>User management</h2>. Update the assertion to match.

* feat(ui): unify header across remaining 7 standalone pages

These 7 pages render their own full <html> and don't extend base.html,
so the previous unification commit only covered base + dashboard. Each
had its own ad-hoc <header> markup with inconsistent classes
(.top-header / .header / .page-header), inconsistent nav-link sets,
and inconsistent avatar/email styling.

Replace each inline <header>...</header> block with the shared
{% include '_app_header.html' %} so /activity-center, /admin/permissions,
/admin/tables, /catalog, /corporate-memory, /corporate-memory/admin,
and /install all show the same chrome (Dashboard / Install CLI /
Profile / Users / email + avatar / Logout) with the active page
highlighted via request.url.path.

Old inline header CSS (.header, .top-header, .page-header, .nav-link,
etc.) is left in place as harmless dead code; it can be cleaned up in
a follow-up sweep.

* feat(web): add readable preview of Claude setup payload on dashboard + /install

Move the line-by-line setup instructions into app/web/setup_instructions.py
as the single source of truth, then render them in two modes from the
existing _claude_setup_instructions.jinja partial:

- preview_mode=True  → visible, read-only <pre><code> block with the real
  server URL and a clearly-styled placeholder token (never a real one).
- preview_mode=False → the JS SETUP_INSTRUCTIONS_TEMPLATE used by the
  one-click flow (unchanged behaviour).

Both /dashboard (env-setup-cta card) and /install (Step 1 card) now show
the preview directly under the 'Setup a new Claude Code' button so users
can see exactly what will land in their clipboard before they click.

* feat(web): update setup instructions — `da diagnose` step, explicit section titles

Rework the Claude Code setup payload to:

- Give every numbered step an unambiguous verb header ("1) Install the CLI",
  "2) Log in", "3) Verify the login", "4) Run diagnostics", "5) Skills (ask
  the user first)", "6) Confirm").
- Add step 4 `da diagnose` as the post-login health check. The CLI already
  ships this command (cli/commands/diagnose.py); it prints "Overall:
  healthy" and a list of green checks that map cleanly to next actions.
- Ask the skills copy-vs-on-demand question verbatim so Claude Code always
  prompts the user the same way.
- Replace the terse "Confirm" line with a 4-bullet summary (version,
  whoami, skills choice, diagnose status) so the return message is
  structured and comparable across setups.

* chore(web): remove stale MCP card from /install (no MCP server today)

The 'Use with Claude Code / MCP' card (Step 3 on /install) referenced an
MCP integration Agnes does not ship. Remove the whole card. The one-click
'Setup a new Claude Code' flow in Step 1 already covers the long-lived
client use case and is less confusing than dangling persistence tips for
a non-existent integration.

* feat(api): include user_email + last_used_ip + user_id in admin tokens list response

Adds AdminTokenItem response model (superset of TokenListItem) and
AccessTokenRepository.list_all_with_user() joining personal_access_tokens
with users to denormalize user_email. Needed for /admin/tokens UI where
admins triage tokens across all users.

* feat(web): /admin/tokens page — list, filter, search, revoke across all users

Adds a new admin-only page with client-side filtering (status, user email,
last-used window), column sorting, counts bar (active/revoked/expired),
and an inline revoke action. Mirrors the /admin/users visual language.

* feat(web): add Tokens nav link for admins + deep-link from admin/users row

Admin-only nav entry to /admin/tokens, and a per-row Tokens button on
/admin/users that prefills the token page's user filter via ?user=<email>.

* test(admin): cover /admin/tokens rendering, filter state, non-admin denial, revoke

Verifies admin can render the page (title + JS hooks present), a non-admin
is blocked, unauthenticated users are redirected, the admin list response
includes user_email / user_id / last_used_ip, and admin can revoke another
user's token.

* feat(web): modern redesign of /admin/tokens — hero, stat strip, refined table, responsive cards, a11y

* feat(web): ditch the table — /admin/tokens as a card stack, modern GitHub-style list

Replaces the table-based layout with a stack of self-contained token cards
inside a <ul role=list>. Each card is a flex row: avatar + name/meta on the
left, last-used block in the middle, status pill + outlined 'Revoke' button
on the right. Status and sort controls are pill-shaped toggle chips; user
email search has an inline search icon. No <table>/<tr>/<th>/<td> anywhere.
Responsive below 720px (card stacks vertically) and 480px (stat chips 2x2).
Preserves filter IDs (flt-status, flt-user, flt-last-used) and data-revoke
for existing tests.

* feat(web): add /tokens (role-aware) — single page for both user PAT CRUD and admin overview

- Rename admin_tokens.html -> tokens.html with a new is_admin context flag.
- New route GET /tokens: renders the same card-stack UI for everyone.
  * Admins: loads /auth/admin/tokens, shows owner column + stat strip, keeps
    the owner-email search box and sort-by-owner chip.
  * Non-admins: loads /auth/tokens (own tokens only), hides owner column +
    stat chips, adds a 'New token' CTA in the hero that opens a modal
    (name + expires_in_days) calling POST /auth/tokens. The raw token is
    revealed once in a dismissable banner and cleared from the DOM on Hide.
- GET /admin/tokens now 302-redirects to /tokens, preserving query string
  (so the /admin/users deep-link ?user=foo still works).

* feat(web): /tokens full-bleed layout to match dashboard width

The hero, toolbar, and card list used to sit inside base.html's .container
(max-width 800px). Break out with negative horizontal margins so the page
spans the viewport like /dashboard does, capped at 1440px for readability
on very wide screens with a 24px gutter on each side.

- No change to base.html itself. The override is scoped to .tokens-page.
- body { overflow-x: hidden; } guards against rare horizontal scrollbars.
- < 808px viewport: reset to natural flow (mobile already narrower).
- ≥ 1488px viewport: cap to 1440px and re-center.

* chore(web): remove /profile template + nav link (redirect /profile -> /tokens)

The old /profile PAT CRUD page is now redundant — the modern /tokens page
covers both user and admin flows. Delete the template; the router's
/profile handler already 302-redirects to /tokens.

Nav cleanup:
- Remove the 'Profile' link.
- Show a single 'Tokens' link to every signed-in user (previously only
  admins saw it).
- Active-state matches /tokens, /admin/tokens, and /profile so the
  highlight survives the redirect chain.

/install CTA now points at /tokens instead of /profile.

* test: cover /tokens for admin + non-admin flows, /profile redirect, nav update

tests/test_admin_tokens_ui.py
- Point admin rendering test at /tokens directly and tighten assertions
  (admin-only stat strip + owner search, non-admin CTA absent).
- Add test_non_admin_can_render_tokens_page: personal body, New-token CTA,
  create-modal, reveal banner; stat strip + owner search absent.
- Add test_admin_tokens_redirects_to_tokens: 302 to /tokens, query string
  (?user=...) preserved for the /admin/users deep-link.
- Add test_profile_redirects_to_tokens: 302 to /tokens.
- Add test_non_admin_can_create_pat_via_tokens_page_api: exercises the
  POST /auth/tokens call that the non-admin create-modal submits.

tests/test_pat.py
- test_profile_page_renders -> test_profile_page_redirects_to_tokens:
  assert the 302 + that /tokens lands on the unified non-admin body.

tests/test_web_ui.py
- admin_users nav assertion: 'Tokens' link present, 'Profile' link absent.
- Add test_nav_shows_tokens_link_for_non_admin: non-admins see the same
  'Tokens' link (previously only admins did).
- Add test_profile_redirects_to_tokens back-compat check.

* feat(web): collapse 'What Claude Code will receive' by default

The preview block on /dashboard and /install now uses <details>/<summary>
so it is hidden by default. Click the chevron/title to expand and review
the clipboard payload. Markup stays in the DOM so existing tests that
assert on content continue to pass.

* fix(web): /tokens width — override .container to 1280px like dashboard

The negative-margin full-bleed trick was fragile and pushed content past
the right edge on deployed viewports. Replace with a simple max-width
override of base.html's .container on this page only, matching
/dashboard's 1280px center-column layout.

* feat(web): split role-aware /tokens into my_tokens.html + admin_tokens.html

* feat(web): router — separate handlers for /tokens (own) and /admin/tokens (all)

* feat(web): nav — show Tokens for all, add All tokens for admins

* test: cover split token pages (own vs all) + admin access gating

* feat(web): move 'My tokens' into a user dropdown menu

Replaces the separate Tokens/email/Logout nav trio with a rounded
avatar trigger that opens a dropdown containing the user's email,
role, a 'My tokens' link, and Logout. Admin-only 'All tokens' stays
as a top-level nav item since it's an admin function, not a personal
one. Click-outside and Escape close the panel; chevron rotates on
open.

* fix(api): allow PATs to list/get/revoke their own tokens (CLI flow)

The documented 'da auth token list/revoke' CLI flow in
docs/HEADLESS_USAGE.md uses a PAT, but the previous dependency
(require_session_token) returned 403. Only create_token must be
session-only to prevent PAT-spawning-PAT chains; listing and
revoking your own tokens is safe with a PAT.

* fix(api): cap expires_in_days at 3650 to avoid datetime overflow (500 to 400)

Values above ~11 million days overflowed datetime.max in
datetime.now(utc) + timedelta(days=...) and surfaced as an
unhandled OverflowError → 500. Cap at 10 years with a clear
400 instead; the no-expiry code path is unaffected.
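
The overflow and the cap can be reproduced with a small sketch. The names `naive_expires_at`, `compute_expires_at`, and `MAX_EXPIRES_IN_DAYS` are assumptions for this example; in the API the `ValueError` would presumably be translated into the 400 response.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

MAX_EXPIRES_IN_DAYS = 3650  # the 10-year cap named in the commit


def naive_expires_at(days: int, now: datetime) -> datetime:
    # Raises OverflowError once the result would pass datetime.max (year 9999).
    return now + timedelta(days=days)


def compute_expires_at(days: Optional[int], now: Optional[datetime] = None) -> Optional[datetime]:
    """Return the expiry timestamp, or None for a token that never expires."""
    if days is None:
        return None  # the no-expiry path is unaffected by the cap
    if days <= 0 or days > MAX_EXPIRES_IN_DAYS:
        raise ValueError(f"expires_in_days must be between 1 and {MAX_EXPIRES_IN_DAYS}")
    now = now or datetime.now(timezone.utc)
    return now + timedelta(days=days)
```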

* fix(api): relax _SAFE_URL_RE to allow path prefixes, underscores, and IPv6

The previous regex rejected legitimate reverse-proxy base_url values
(https://host/agnes/), underscores in Docker Compose hostnames, and
IPv6 literals (http://[::1]:8000). Widen the charset and allow an
optional trailing path. shlex.quote continues to provide
defense-in-depth against any metacharacter that slips through.

* fix(web): /login/email and Google OAuth propagate next_path

Previously, /login/email silently dropped the ?next=<path> query
param so the hidden form field rendered empty and login always
landed on /dashboard. Google's button was hard-coded to
/auth/google/login, ignoring next entirely.

- /login page now appends ?next to the Google button URL
- /login/email reads + sanitizes next, passes as template context
- google_login stashes sanitized next_path in session['login_next']
- google_callback pops + re-sanitizes and redirects there

Sanitization factored into app/auth/_common.safe_next_path.
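
The body of `safe_next_path` is not shown in this commit, so the following is only a guess at the kind of checks such a sanitizer might perform: accept same-site absolute paths and fall back to a default for anything else. The `/dashboard` default mirrors the landing page mentioned above; everything else is an assumption.

```python
from typing import Optional
from urllib.parse import urlsplit

DEFAULT_NEXT = "/dashboard"


def safe_next_path(raw: Optional[str], default: str = DEFAULT_NEXT) -> str:
    """Return raw only if it is a local absolute path; otherwise the default."""
    if not raw:
        return default
    # Must start with exactly one slash; "//host" and "/\host" can leave the site.
    if not raw.startswith("/") or raw.startswith("//") or "\\" in raw:
        return default
    # Reject control characters that could split headers.
    if any(ch in raw for ch in "\r\n\t"):
        return default
    parts = urlsplit(raw)
    if parts.scheme or parts.netloc:
        return default
    return raw
```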

* fix(auth): differentiate argon2 VerifyMismatchError from internal errors in web login

The previous except (VerifyMismatchError, Exception) collapsed both
cases into the generic 'invalid credentials' redirect, silently
hiding corrupted-hash / library errors from ops. Split the two:
bad password still gets ?error=invalid; anything else logs via
logger.exception and redirects with ?err=auth_internal so ops have
a visible signal and users don't retry forever against a broken
password_hash column.

* docs: correct CLAUDE.md table name (personal_access_tokens)

v7 note referenced 'access_tokens.last_used_ip' but the real table
is personal_access_tokens (as mentioned two tokens earlier in the
same bullet). Same-file consistency fix.

* chore(web): clarify admin user-reset UI — encourage Set password over the unused reset_token

POST /api/users/{id}/reset-password stores and returns a token
but no endpoint consumes it — the magic-link sender would log the
user in without prompting for a new password, defeating the reset.
- Drop the 'Reset' row action from admin_users so admins aren't
  pointed at a dead end.
- Rewrite the reveal-modal copy to tell admins to use Set password
  and explicitly note that the magic-link flow isn't available
  for reset tokens in this build.
The API endpoint stays for API-level future use.

* test: cover PAT CLI flow, expires_in_days overflow, proxy base_url, next propagation

- tests/test_pat.py: PAT can list own tokens (200, was 403);
  PAT can revoke own tokens (204); create_token returns 400 for
  expires_in_days > 3650 (was 500 via datetime overflow).
- tests/test_cli_artifacts.py: _SAFE_URL_RE accepts reverse-proxy
  path prefixes, underscores, and IPv6 literals; end-to-end check
  of cli_install_script with a stubbed base_url that includes
  a path prefix (Agnes behind /agnes/).
- tests/test_web_ui.py: /login propagates ?next to the Google
  button URL; /login/email renders next in the hidden form field
  and strips hostile values; unit coverage of safe_next_path.

* fix(security): use \Z instead of $ in URL/version allowlists (trailing-\n bypass)

Python regex `$` also matches just before a trailing newline, so a Host
header or AGNES_VERSION value like "good.example.com\n" would slip past
the allowlist with the newline intact. `\Z` anchors to strict end-of-string.

shlex.quote downstream remains as defense-in-depth, but the allowlist
is now the tight gate it claims to be.
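
The difference between the two anchors is easy to reproduce. The patterns below are a hypothetical version allowlist written for this example, not the actual `_SAFE_VERSION_RE` from the codebase.

```python
import re

# Alphanumerics plus dot, dash, underscore; the two differ only in the end anchor.
LOOSE_VERSION_RE = re.compile(r"^[A-Za-z0-9._-]+$")
STRICT_VERSION_RE = re.compile(r"^[A-Za-z0-9._-]+\Z")


def is_safe_version(value: str, pattern: re.Pattern = STRICT_VERSION_RE) -> bool:
    """True when the whole string matches the allowlist pattern."""
    return pattern.match(value) is not None


# `$` tolerates one trailing newline; `\Z` does not.
loose_accepts_newline = is_safe_version("1.2.3\n", LOOSE_VERSION_RE)
strict_accepts_newline = is_safe_version("1.2.3\n")
```

Using `pattern.fullmatch(value)` would be another way to get strict end-of-string behaviour without relying on the anchor.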

* fix(auth): PAT with null expiry omits JWT exp claim (DB is the source of truth)

Previously a PAT created with `expires_in_days=null` (user-requested
"never expires") set the DB `expires_at` to NULL (correct) but still
baked a ~100y `exp` claim into the JWT. That is misleading: the PAT
silently did expire eventually, despite the UI and API promising
"no expiry".

`create_access_token` now accepts `omit_exp=True` to skip the `exp`
claim entirely. `app/api/tokens.py` passes that when `expires_in_days
is None`. The authoritative expiry check lives in
`app/auth/dependencies.py`, which reads `expires_at` from the DB row —
unchanged. PyJWT accepts a JWT without an `exp` claim and never treats it as expired.
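
The claim-building side of this change might look like the sketch below. It only assembles the claims dict and does not sign anything; the function name `build_claims`, its parameters, and the 24-hour default are assumptions (the default is borrowed from the session fallback mentioned earlier in this message).

```python
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional

DEFAULT_SESSION_TTL = timedelta(hours=24)  # assumed session default


def build_claims(
    subject: str,
    token_type: str,
    expires_in: Optional[timedelta] = None,
    omit_exp: bool = False,
    now: Optional[datetime] = None,
) -> dict:
    """Assemble JWT claims; skip `exp` entirely when omit_exp is set."""
    now = now or datetime.now(timezone.utc)
    claims = {
        "sub": subject,
        "typ": token_type,  # "session" or "pat"
        "jti": uuid.uuid4().hex,
        "iat": int(now.timestamp()),
    }
    if not omit_exp:
        claims["exp"] = int((now + (expires_in or DEFAULT_SESSION_TTL)).timestamp())
    return claims
```

For a never-expiring PAT the caller would pass `omit_exp=True`, leaving the database `expires_at` column as the only expiry authority.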

* test: cover trailing-newline regex bypass + no-exp JWT for unbounded PAT

- test_safe_url_re_rejects_trailing_newline_bypass: asserts both
  `_SAFE_URL_RE` and `_SAFE_VERSION_RE` reject values with a trailing
  `\n` (previously accepted because Python `$` matches before `\n`).
- test_pat_null_expiry_jwt_has_no_exp_claim: POST /auth/tokens with
  `expires_in_days=null`, decode the returned JWT, assert `exp` is
  absent while `typ=pat`, `sub`, and `jti` are still present.
- test_pat_with_null_expiry_is_accepted_by_verify_token: verify_token
  round-trips a JWT that has no `exp` claim without raising ExpiredSignatureError.
- test_pat_null_expiry_end_to_end_allows_authenticated_request: use
  the null-expiry PAT against /auth/tokens and confirm it authenticates.

* docs(auth): document X-Forwarded-For trust model in _client_ip

Deployment runs behind Caddy which strips incoming X-Forwarded-For
and sets its own, so the leftmost hop is trustworthy. Clarify that
the stored last_used_ip is audit-only and never used for access
control — if the app is ever exposed directly, this value becomes
client-settable.

* docs: /profile → /tokens in install.sh next-steps, CLI error, HEADLESS_USAGE, security skill

After splitting PAT management to /tokens (with /profile as a back-compat
302), stale references remained in user-facing text. Update them to the
canonical /tokens URL so shell scripts, CLI error hints, docs, and the
bundled security skill are all consistent.
2026-04-22 14:24:28 +02:00

"""
Proof-of-concept: Connector Kit architecture validation.

Tests that the proposed Connector Protocol + Runtime model holds up:
1. The protocol is implementable in Python (Protocol, Cap flags, partial implementation)
2. Arrow RecordBatch iteration works with DuckDB (zero-copy)
3. ConnectorRuntime can build extract.duckdb from any connector
4. Schema evolution detection works via Arrow schema diff
5. A real connector can be written in ~50 lines
6. Incremental state tracking works
7. Manifest validation works
8. Discovery → read pipeline is end-to-end functional
"""
import asyncio
import os
import shutil
import tempfile
from dataclasses import dataclass, field
from enum import Flag, auto
from pathlib import Path
from typing import AsyncIterator, Iterator, Protocol, runtime_checkable
import duckdb
import pyarrow as pa
import pyarrow.parquet as pq
import pytest
import yaml
# ============================================================================
# Layer 2: Connector Protocol (the contract)
# ============================================================================


class Cap(Flag):
    """Connector capabilities — declare what you support."""

    DISCOVER = auto()
    READ = auto()
    STREAM = auto()
    REMOTE = auto()
    WRITE = auto()


@dataclass
class TableInfo:
    name: str
    schema: pa.Schema
    capabilities: Cap
    primary_key: list[str] | None = None
    description: str = ""


@dataclass
class ReadOptions:
    columns: list[str] | None = None
    filter: dict | None = None
    incremental_key: str | None = None
    incremental_value: str | None = None
    batch_size: int = 10_000


@dataclass
class RemoteAttachInfo:
    extension: str
    url: str
    token_env: str


@runtime_checkable
class Connector(Protocol):
    @property
    def capabilities(self) -> Cap: ...

    def discover(self) -> list[TableInfo]: ...

    def read(self, table: str, options: ReadOptions) -> Iterator[pa.RecordBatch]: ...
# ============================================================================
# Layer 3: Connector Runtime (the SDK — replaces manual boilerplate)
# ============================================================================


@dataclass
class ExtractStats:
    tables_extracted: int = 0
    tables_failed: int = 0
    total_rows: int = 0
    schema_changes: list[str] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)


class ConnectorRuntime:
    """Handles extract.duckdb lifecycle — what every connector does manually today."""

    def __init__(self, output_dir: Path):
        self.output_dir = output_dir
        self.data_dir = output_dir / "data"
        self.db_path = output_dir / "extract.duckdb"
        self.state_path = output_dir / ".state.yaml"
        self.data_dir.mkdir(parents=True, exist_ok=True)

    def run(self, connector: Connector, tables: list[str] | None = None) -> ExtractStats:
        stats = ExtractStats()

        # 1. Discovery
        available: list[TableInfo] = []
        if Cap.DISCOVER in connector.capabilities:
            available = connector.discover()

        # If no tables specified, extract all discovered
        if tables is None:
            tables = [t.name for t in available if Cap.READ in t.capabilities]

        # 2. Schema evolution check
        for table_name in tables:
            table_info = self._find_table(available, table_name)
            if table_info:
                change = self._check_schema_evolution(table_name, table_info.schema)
                if change:
                    stats.schema_changes.append(change)

        # 3. Extract via read()
        if Cap.READ in connector.capabilities:
            for table_name in tables:
                try:
                    options = self._build_read_options(table_name, available)
                    rows = self._extract_table(connector, table_name, options)
                    stats.tables_extracted += 1
                    stats.total_rows += rows
                except Exception as e:
                    stats.tables_failed += 1
                    stats.errors.append(f"{table_name}: {e}")

        # 4. Remote attach (if supported)
        if Cap.REMOTE in connector.capabilities:
            try:
                info = connector.remote()  # type: ignore[attr-defined]
                self._write_remote_attach(info)
            except Exception as e:
                stats.errors.append(f"remote_attach: {e}")

        # 5. Build extract.duckdb (_meta + views)
        self._build_extract_db(available, tables)

        # 6. Save incremental state
        self._save_state(tables)
        return stats

    def _extract_table(self, connector: Connector, table: str, options: ReadOptions) -> int:
        """Extract a table via Arrow RecordBatch iterator → Parquet."""
        parquet_path = self.data_dir / f"{table}.parquet"
        writer = None
        total_rows = 0
        for batch in connector.read(table, options):
            if writer is None:
                writer = pq.ParquetWriter(str(parquet_path), batch.schema)
            writer.write_batch(batch)
            total_rows += batch.num_rows
        if writer:
            writer.close()
        return total_rows

    def _build_extract_db(self, available: list[TableInfo], tables: list[str]):
        """Build extract.duckdb with _meta and views — atomic swap."""
        tmp_db = self.output_dir / "extract.duckdb.tmp"
        if tmp_db.exists():
            tmp_db.unlink()
        con = duckdb.connect(str(tmp_db))
        try:
            # _meta table
            con.execute("""
                CREATE TABLE _meta (
                    table_name VARCHAR NOT NULL,
                    description VARCHAR,
                    rows BIGINT,
                    size_bytes BIGINT,
                    extracted_at TIMESTAMP DEFAULT current_timestamp,
                    query_mode VARCHAR DEFAULT 'local',
                    schema_json VARCHAR
                )
            """)
            for table_name in tables:
                parquet_path = self.data_dir / f"{table_name}.parquet"
                if parquet_path.exists():
                    # Create view pointing to parquet
                    con.execute(
                        f'CREATE VIEW "{table_name}" AS '
                        f"SELECT * FROM read_parquet('{parquet_path}')"
                    )
                    # Get row count + size
                    rows = con.execute(f'SELECT count(*) FROM "{table_name}"').fetchone()[0]
                    size = parquet_path.stat().st_size
                    # Find description and schema
                    info = self._find_table(available, table_name)
                    desc = info.description if info else ""
                    schema_json = info.schema.to_string() if info else ""
                    con.execute(
                        "INSERT INTO _meta (table_name, description, rows, size_bytes, schema_json) "
                        "VALUES (?, ?, ?, ?, ?)",
                        [table_name, desc, rows, size, schema_json],
                    )
        finally:
            con.close()
        # Clean WAL if exists
        wal = Path(str(tmp_db) + ".wal")
        if wal.exists():
            wal.unlink()
        # Atomic swap: Path.replace overwrites an existing target in one step,
        # so readers never observe a missing extract.duckdb
        tmp_db.replace(self.db_path)

    def _find_table(self, available: list[TableInfo], name: str) -> TableInfo | None:
        return next((t for t in available if t.name == name), None)

    def _check_schema_evolution(self, table: str, new_schema: pa.Schema) -> str | None:
        """Detect schema changes by comparing Arrow schemas."""
        schema_file = self.output_dir / f".schema_{table}.arrow"
        if schema_file.exists():
            reader = pa.ipc.open_stream(schema_file.read_bytes())
            old_schema = reader.schema
            if old_schema != new_schema:
                # Diff: added, removed, changed fields
                old_names = set(old_schema.names)
                new_names = set(new_schema.names)
                added = new_names - old_names
                removed = old_names - new_names
                msg = f"{table}: "
                if added:
                    msg += f"+{added} "
                if removed:
                    msg += f"-{removed} "
                # Check type changes for common fields
                for name in old_names & new_names:
                    old_type = old_schema.field(name).type
                    new_type = new_schema.field(name).type
                    if old_type != new_type:
                        msg += f"{name}:{old_type}→{new_type} "
                # Save new schema
                self._save_schema(table, new_schema)
                return msg.strip()
        else:
            # First run: record the schema so the next run has a baseline
            self._save_schema(table, new_schema)
        return None

    def _save_schema(self, table: str, schema: pa.Schema):
        """Serialize Arrow schema via IPC stream (compatible with all PyArrow versions)."""
        schema_file = self.output_dir / f".schema_{table}.arrow"
        sink = pa.BufferOutputStream()
        writer = pa.ipc.new_stream(sink, schema)
        writer.close()
        schema_file.write_bytes(sink.getvalue().to_pybytes())

    def _build_read_options(self, table: str, available: list[TableInfo]) -> ReadOptions:
        """Build ReadOptions with incremental state if available."""
        options = ReadOptions()
        state = self._load_state()
        if table in state:
            options.incremental_key = state[table].get("incremental_key")
            options.incremental_value = state[table].get("incremental_value")
        return options

    def _load_state(self) -> dict:
        if self.state_path.exists():
            return yaml.safe_load(self.state_path.read_text()) or {}
        return {}

    def _save_state(self, tables: list[str]):
        state = self._load_state()
        for table in tables:
            if table not in state:
                state[table] = {}
            state[table]["last_extracted"] = str(duckdb.query("SELECT current_timestamp").fetchone()[0])
        self.state_path.write_text(yaml.dump(state))

    def _write_remote_attach(self, info: RemoteAttachInfo):
        """Write _remote_attach info for orchestrator."""
        # This gets added to extract.duckdb in _build_extract_db
        # For POC, store as yaml; real impl writes to DuckDB
        ra_path = self.output_dir / ".remote_attach.yaml"
        ra_path.write_text(
            yaml.dump({"extension": info.extension, "url": info.url, "token_env": info.token_env})
        )
# ============================================================================
# Example connectors (proving the contract works)
# ============================================================================


class SampleAPIConnector:
    """
    A sample connector simulating an HTTP API source.
    Proves: ~50 lines for a complete connector implementation.
    """

    capabilities = Cap.DISCOVER | Cap.READ

    ORDERS_SCHEMA = pa.schema(
        [
            pa.field("id", pa.int64()),
            pa.field("customer", pa.string()),
            pa.field("amount", pa.float64()),
            pa.field("date", pa.string()),
        ]
    )
    USERS_SCHEMA = pa.schema(
        [
            pa.field("id", pa.int64()),
            pa.field("name", pa.string()),
            pa.field("email", pa.string()),
        ]
    )

    # Simulated API data
    _data = {
        "orders": [
            {"id": 1, "customer": "Alice", "amount": 100.0, "date": "2026-01-15"},
            {"id": 2, "customer": "Bob", "amount": 250.0, "date": "2026-02-01"},
            {"id": 3, "customer": "Carol", "amount": 75.5, "date": "2026-03-10"},
        ],
        "users": [
            {"id": 1, "name": "Alice", "email": "alice@example.com"},
            {"id": 2, "name": "Bob", "email": "bob@example.com"},
        ],
    }

    def discover(self) -> list[TableInfo]:
        return [
            TableInfo(
                name="orders",
                schema=self.ORDERS_SCHEMA,
                capabilities=Cap.READ,
                primary_key=["id"],
                description="Sales orders",
            ),
            TableInfo(
                name="users",
                schema=self.USERS_SCHEMA,
                capabilities=Cap.READ,
                primary_key=["id"],
                description="Registered users",
            ),
        ]

    def read(self, table: str, options: ReadOptions) -> Iterator[pa.RecordBatch]:
        data = self._data.get(table, [])
        schema = self.ORDERS_SCHEMA if table == "orders" else self.USERS_SCHEMA
        # Simulate batched reading (batch_size controls chunking)
        for i in range(0, len(data), options.batch_size):
            chunk = data[i : i + options.batch_size]
            arrays = [pa.array([row[col] for row in chunk], type=schema.field(col).type) for col in schema.names]
            yield pa.RecordBatch.from_arrays(arrays, schema=schema)


class StreamingConnector:
    """Proves: async stream capability works."""

    capabilities = Cap.DISCOVER | Cap.STREAM

    EVENTS_SCHEMA = pa.schema(
        [
            pa.field("event_id", pa.string()),
            pa.field("type", pa.string()),
            pa.field("payload", pa.string()),
        ]
    )

    def discover(self) -> list[TableInfo]:
        return [
            TableInfo(
                name="events",
                schema=self.EVENTS_SCHEMA,
                capabilities=Cap.STREAM,
                description="Real-time events",
            )
        ]

    async def stream(self, table: str) -> AsyncIterator[pa.RecordBatch]:
        """Simulate webhook events arriving."""
        events = [
            {"event_id": "e1", "type": "created", "payload": '{"issue": "PROJ-1"}'},
            {"event_id": "e2", "type": "updated", "payload": '{"issue": "PROJ-2"}'},
            {"event_id": "e3", "type": "deleted", "payload": '{"issue": "PROJ-3"}'},
        ]
        for event in events:
            arrays = [pa.array([event[col]], type=self.EVENTS_SCHEMA.field(col).type) for col in self.EVENTS_SCHEMA.names]
            yield pa.RecordBatch.from_arrays(arrays, schema=self.EVENTS_SCHEMA)


class RemoteOnlyConnector:
    """Proves: remote-only connector (like BigQuery) works."""

    capabilities = Cap.DISCOVER | Cap.REMOTE

    def discover(self) -> list[TableInfo]:
        return [
            TableInfo(
                name="big_table",
                schema=pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.string())]),
                capabilities=Cap.REMOTE,
                description="Remote-only table, queries go to source",
            )
        ]

    def remote(self) -> RemoteAttachInfo:
        return RemoteAttachInfo(
            extension="bigquery",
            url="project_id=my-project",
            token_env="GOOGLE_APPLICATION_CREDENTIALS",
        )
# ============================================================================
# Tests
# ============================================================================


class TestCapabilityFlags:
    """Test 1: Cap Flag enum works for declaration and checking."""

    def test_flag_composition(self):
        caps = Cap.DISCOVER | Cap.READ | Cap.REMOTE
        assert Cap.DISCOVER in caps
        assert Cap.READ in caps
        assert Cap.REMOTE in caps
        assert Cap.STREAM not in caps
        assert Cap.WRITE not in caps

    def test_per_table_capabilities(self):
        info = TableInfo(
            name="orders",
            schema=pa.schema([pa.field("id", pa.int64())]),
            capabilities=Cap.READ | Cap.STREAM,
        )
        assert Cap.READ in info.capabilities
        assert Cap.STREAM in info.capabilities
        assert Cap.WRITE not in info.capabilities

    def test_flag_iteration(self):
        """Can iterate individual flags from a composite."""
        caps = Cap.DISCOVER | Cap.READ | Cap.STREAM
        individual = list(caps)
        assert len(individual) == 3
        assert Cap.DISCOVER in individual


class TestProtocolCompliance:
    """Test 2: Protocol type checking works at runtime."""

    def test_sample_connector_is_connector(self):
        c = SampleAPIConnector()
        assert isinstance(c, Connector)

    def test_streaming_connector_partial_protocol(self):
        """StreamingConnector doesn't implement read() — that's OK.
        Protocol is structural, not enforced for methods you don't use."""
        c = StreamingConnector()
        assert hasattr(c, "capabilities")
        assert hasattr(c, "discover")
        assert Cap.STREAM in c.capabilities

    def test_remote_connector_is_valid(self):
        c = RemoteOnlyConnector()
        assert hasattr(c, "discover")
        assert hasattr(c, "remote")
        assert Cap.REMOTE in c.capabilities

class TestArrowIntegration:
    """Test 3: Arrow RecordBatch → DuckDB zero-copy works."""

    def test_record_batch_to_duckdb(self):
        """DuckDB can query Arrow RecordBatches directly."""
        schema = pa.schema([pa.field("id", pa.int64()), pa.field("name", pa.string())])
        batch = pa.RecordBatch.from_arrays(
            [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])],
            schema=schema,
        )
        con = duckdb.connect()
        result = con.execute("SELECT * FROM batch WHERE id > 1").fetchall()
        assert len(result) == 2
        assert result[0] == (2, "b")

    def test_record_batch_iterator_to_duckdb(self):
        """DuckDB can consume an iterator of RecordBatches."""
        schema = pa.schema([pa.field("value", pa.float64())])

        def generate_batches():
            for i in range(3):
                yield pa.RecordBatch.from_arrays(
                    [pa.array([float(i * 10 + j) for j in range(5)])],
                    schema=schema,
                )

        reader = pa.RecordBatchReader.from_batches(schema, generate_batches())
        con = duckdb.connect()
        result = con.execute("SELECT count(*), sum(value) FROM reader").fetchone()
        assert result[0] == 15  # 3 batches * 5 rows
        assert result[1] == sum(float(i * 10 + j) for i in range(3) for j in range(5))

    def test_arrow_to_parquet_roundtrip(self):
        """Arrow → Parquet → DuckDB roundtrip preserves data."""
        schema = pa.schema(
            [
                pa.field("id", pa.int64()),
                pa.field("amount", pa.float64()),
                pa.field("label", pa.string()),
            ]
        )
        batch = pa.RecordBatch.from_arrays(
            [pa.array([1, 2]), pa.array([99.9, 200.0]), pa.array(["x", "y"])],
            schema=schema,
        )
        with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as f:
            pq.write_table(pa.Table.from_batches([batch]), f.name)
        con = duckdb.connect()
        result = con.execute(f"SELECT * FROM read_parquet('{f.name}')").fetchall()
        assert result == [(1, 99.9, "x"), (2, 200.0, "y")]
        os.unlink(f.name)

class TestConnectorRuntime:
    """Test 4: Full runtime pipeline — connector → extract.duckdb."""

    @pytest.fixture
    def output_dir(self, tmp_path):
        return tmp_path / "extract_test"

    def test_full_extract_pipeline(self, output_dir):
        """End-to-end: connector → runtime → extract.duckdb with _meta + views."""
        connector = SampleAPIConnector()
        runtime = ConnectorRuntime(output_dir)
        stats = runtime.run(connector)
        # Stats are correct
        assert stats.tables_extracted == 2
        assert stats.tables_failed == 0
        assert stats.total_rows == 5  # 3 orders + 2 users
        assert stats.errors == []
        # extract.duckdb exists and is valid
        db_path = output_dir / "extract.duckdb"
        assert db_path.exists()
        con = duckdb.connect(str(db_path), read_only=True)
        # _meta table has both tables
        meta = con.execute("SELECT table_name, rows, description FROM _meta ORDER BY table_name").fetchall()
        assert len(meta) == 2
        assert meta[0] == ("orders", 3, "Sales orders")
        assert meta[1] == ("users", 2, "Registered users")
        # Views work — can query data through extract.duckdb
        orders = con.execute("SELECT * FROM orders ORDER BY id").fetchall()
        assert len(orders) == 3
        assert orders[0] == (1, "Alice", 100.0, "2026-01-15")
        users = con.execute("SELECT * FROM users ORDER BY id").fetchall()
        assert len(users) == 2
        assert users[0][1] == "Alice"
        # Cross-table query works
        result = con.execute("""
            SELECT u.name, SUM(o.amount) as total
            FROM orders o JOIN users u ON o.customer = u.name
            GROUP BY u.name ORDER BY total DESC
        """).fetchall()
        assert result[0] == ("Bob", 250.0)
        assert result[1] == ("Alice", 100.0)
        con.close()

    def test_selective_table_extract(self, output_dir):
        """Can extract specific tables only."""
        connector = SampleAPIConnector()
        runtime = ConnectorRuntime(output_dir)
        stats = runtime.run(connector, tables=["orders"])
        assert stats.tables_extracted == 1
        assert stats.total_rows == 3
        con = duckdb.connect(str(output_dir / "extract.duckdb"), read_only=True)
        tables = con.execute("SELECT table_name FROM _meta").fetchall()
        assert tables == [("orders",)]
        con.close()

    def test_incremental_state_tracking(self, output_dir):
        """Runtime saves and loads incremental state between runs."""
        connector = SampleAPIConnector()
        runtime = ConnectorRuntime(output_dir)
        # First run
        runtime.run(connector, tables=["orders"])
        # State file exists
        state_path = output_dir / ".state.yaml"
        assert state_path.exists()
        state = yaml.safe_load(state_path.read_text())
        assert "orders" in state
        assert "last_extracted" in state["orders"]
        # Second run — state persists
        runtime2 = ConnectorRuntime(output_dir)
        runtime2.run(connector, tables=["orders"])
        state2 = yaml.safe_load(state_path.read_text())
        assert "orders" in state2

    def test_empty_table_handling(self, output_dir):
        """Connector that yields nothing for a table doesn't crash."""

        class EmptyConnector:
            capabilities = Cap.DISCOVER | Cap.READ

            def discover(self) -> list[TableInfo]:
                return [
                    TableInfo(
                        name="empty",
                        schema=pa.schema([pa.field("id", pa.int64())]),
                        capabilities=Cap.READ,
                    )
                ]

            def read(self, table: str, options: ReadOptions) -> Iterator[pa.RecordBatch]:
                return iter([])  # No data

        runtime = ConnectorRuntime(output_dir)
        stats = runtime.run(EmptyConnector())
        # Extracted 0 rows, but no failure
        assert stats.tables_extracted == 1
        assert stats.total_rows == 0
        assert stats.errors == []

    def test_error_in_one_table_doesnt_stop_others(self, output_dir):
        """Partial failure: one table fails, others still extract."""

        class PartialFailConnector:
            capabilities = Cap.DISCOVER | Cap.READ

            def discover(self) -> list[TableInfo]:
                return [
                    TableInfo("good", pa.schema([pa.field("id", pa.int64())]), Cap.READ),
                    TableInfo("bad", pa.schema([pa.field("id", pa.int64())]), Cap.READ),
                ]

            def read(self, table: str, options: ReadOptions) -> Iterator[pa.RecordBatch]:
                if table == "bad":
                    raise ConnectionError("API timeout")
                yield pa.RecordBatch.from_arrays(
                    [pa.array([1, 2, 3])],
                    schema=pa.schema([pa.field("id", pa.int64())]),
                )

        runtime = ConnectorRuntime(output_dir)
        stats = runtime.run(PartialFailConnector())
        assert stats.tables_extracted == 1
        assert stats.tables_failed == 1
        assert "bad: API timeout" in stats.errors[0]

class TestSchemaEvolution:
    """Test 5: Schema change detection via Arrow schema diff."""

    def test_detect_added_column(self, tmp_path):
        output_dir = tmp_path / "schema_test"
        runtime = ConnectorRuntime(output_dir)
        # V1 schema
        schema_v1 = pa.schema([pa.field("id", pa.int64()), pa.field("name", pa.string())])
        runtime._save_schema("orders", schema_v1)
        # V2 schema — added column
        schema_v2 = pa.schema(
            [
                pa.field("id", pa.int64()),
                pa.field("name", pa.string()),
                pa.field("email", pa.string()),
            ]
        )
        change = runtime._check_schema_evolution("orders", schema_v2)
        assert change is not None
        assert "email" in change
        assert "+" in change

    def test_detect_removed_column(self, tmp_path):
        output_dir = tmp_path / "schema_test"
        runtime = ConnectorRuntime(output_dir)
        schema_v1 = pa.schema(
            [
                pa.field("id", pa.int64()),
                pa.field("name", pa.string()),
                pa.field("old_field", pa.string()),
            ]
        )
        runtime._save_schema("orders", schema_v1)
        schema_v2 = pa.schema([pa.field("id", pa.int64()), pa.field("name", pa.string())])
        change = runtime._check_schema_evolution("orders", schema_v2)
        assert change is not None
        assert "old_field" in change
        assert "-" in change

    def test_detect_type_change(self, tmp_path):
        output_dir = tmp_path / "schema_test"
        runtime = ConnectorRuntime(output_dir)
        schema_v1 = pa.schema([pa.field("id", pa.int32()), pa.field("value", pa.string())])
        runtime._save_schema("data", schema_v1)
        schema_v2 = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.string())])
        change = runtime._check_schema_evolution("data", schema_v2)
        assert change is not None
        assert "int32" in change
        assert "int64" in change

    def test_no_change_detected(self, tmp_path):
        output_dir = tmp_path / "schema_test"
        runtime = ConnectorRuntime(output_dir)
        schema = pa.schema([pa.field("id", pa.int64())])
        runtime._save_schema("stable", schema)
        change = runtime._check_schema_evolution("stable", schema)
        assert change is None

    def test_first_run_no_previous_schema(self, tmp_path):
        output_dir = tmp_path / "schema_test"
        runtime = ConnectorRuntime(output_dir)
        schema = pa.schema([pa.field("id", pa.int64())])
        change = runtime._check_schema_evolution("new_table", schema)
        assert change is None  # First run, no previous schema to compare

class TestStreamingCapability:
    """Test 6: Async streaming connector works."""

    def test_async_stream(self):
        async def _run():
            connector = StreamingConnector()
            batches = []
            async for batch in connector.stream("events"):
                batches.append(batch)
            return batches

        batches = asyncio.run(_run())
        assert len(batches) == 3
        assert batches[0].num_rows == 1
        assert batches[0].column("type")[0].as_py() == "created"

    def test_stream_to_duckdb(self):
        """Stream batches can be consumed by DuckDB."""

        async def _run():
            connector = StreamingConnector()
            all_batches = []
            async for batch in connector.stream("events"):
                all_batches.append(batch)
            return all_batches

        all_batches = asyncio.run(_run())
        arrow_table = pa.Table.from_batches(all_batches)
        con = duckdb.connect()
        result = con.execute("SELECT count(*) FROM arrow_table").fetchone()
        assert result[0] == 3


class TestRemoteOnlyConnector:
    """Test 7: Remote-only connector produces correct metadata."""

    def test_remote_attach_info(self, tmp_path):
        output_dir = tmp_path / "remote_test"
        connector = RemoteOnlyConnector()
        runtime = ConnectorRuntime(output_dir)
        stats = runtime.run(connector)
        # No tables extracted (remote only), but no errors
        assert stats.tables_extracted == 0
        assert stats.errors == []
        # Remote attach info saved
        ra_path = output_dir / ".remote_attach.yaml"
        assert ra_path.exists()
        ra = yaml.safe_load(ra_path.read_text())
        assert ra["extension"] == "bigquery"
        assert ra["token_env"] == "GOOGLE_APPLICATION_CREDENTIALS"

class TestManifestValidation:
    """Test 8: YAML manifest parsing and validation."""

    SAMPLE_MANIFEST = """
name: sample_api
version: "1.0.0"
description: "Sample API connector"
entrypoint: connectors.sample.SampleAPIConnector
capabilities: [discover, read]
auth:
  type: token
  env_vars:
    - name: SAMPLE_API_TOKEN
      required: true
      description: "API authentication token"
config:
  base_url:
    type: string
    required: true
  batch_size:
    type: integer
    required: false
    default: 1000
health_check:
  endpoint: "${base_url}/health"
  method: GET
  expect_status: 200
"""

    def test_manifest_parses(self):
        manifest = yaml.safe_load(self.SAMPLE_MANIFEST)
        assert manifest["name"] == "sample_api"
        assert manifest["version"] == "1.0.0"
        assert "discover" in manifest["capabilities"]
        assert "read" in manifest["capabilities"]

    def test_manifest_capabilities_to_flags(self):
        manifest = yaml.safe_load(self.SAMPLE_MANIFEST)
        cap_map = {c.name.lower(): c for c in Cap}
        flags = Cap(0)
        for c in manifest["capabilities"]:
            flags |= cap_map[c]
        assert Cap.DISCOVER in flags
        assert Cap.READ in flags
        assert Cap.STREAM not in flags

    def test_manifest_auth_config(self):
        manifest = yaml.safe_load(self.SAMPLE_MANIFEST)
        assert manifest["auth"]["type"] == "token"
        assert manifest["auth"]["env_vars"][0]["name"] == "SAMPLE_API_TOKEN"
        assert manifest["auth"]["env_vars"][0]["required"] is True

    def test_manifest_config_schema(self):
        manifest = yaml.safe_load(self.SAMPLE_MANIFEST)
        assert manifest["config"]["base_url"]["required"] is True
        assert manifest["config"]["batch_size"]["default"] == 1000

    def test_manifest_health_check(self):
        manifest = yaml.safe_load(self.SAMPLE_MANIFEST)
        hc = manifest["health_check"]
        assert "${base_url}" in hc["endpoint"]
        assert hc["expect_status"] == 200

class TestDiscoveryToReadPipeline:
    """Test 9: Full discovery → read → query pipeline."""

    def test_discover_then_read_all(self, tmp_path):
        """discover() → pick tables → read() → query in DuckDB."""
        connector = SampleAPIConnector()
        # Step 1: Discovery
        tables = connector.discover()
        assert len(tables) == 2
        assert all(isinstance(t, TableInfo) for t in tables)
        assert all(t.schema is not None for t in tables)
        # Step 2: Read via runtime (auto-discovers all tables)
        runtime = ConnectorRuntime(tmp_path / "full_pipeline")
        stats = runtime.run(connector)  # No tables= arg → discovers automatically
        assert stats.tables_extracted == 2
        # Step 3: Query
        con = duckdb.connect(str(tmp_path / "full_pipeline" / "extract.duckdb"), read_only=True)
        result = con.execute("""
            SELECT table_name, rows, description
            FROM _meta ORDER BY table_name
        """).fetchall()
        assert result[0][0] == "orders"
        assert result[0][1] == 3
        con.close()


class TestLargeDataBatching:
    """Test 10: Connector can handle large data via batched iteration."""

    def test_batched_read_memory_constant(self, tmp_path):
        """Large dataset extracted in batches — memory doesn't explode."""

        class LargeConnector:
            capabilities = Cap.DISCOVER | Cap.READ
            NUM_BATCHES = 100
            BATCH_SIZE = 1000

            def discover(self) -> list[TableInfo]:
                return [
                    TableInfo(
                        "big_table",
                        pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.float64())]),
                        Cap.READ,
                    )
                ]

            def read(self, table: str, options: ReadOptions) -> Iterator[pa.RecordBatch]:
                schema = pa.schema([pa.field("id", pa.int64()), pa.field("value", pa.float64())])
                for batch_num in range(self.NUM_BATCHES):
                    start = batch_num * self.BATCH_SIZE
                    yield pa.RecordBatch.from_arrays(
                        [
                            pa.array(range(start, start + self.BATCH_SIZE), type=pa.int64()),
                            pa.array(
                                [float(i) * 0.1 for i in range(start, start + self.BATCH_SIZE)],
                                type=pa.float64(),
                            ),
                        ],
                        schema=schema,
                    )

        runtime = ConnectorRuntime(tmp_path / "large_test")
        stats = runtime.run(LargeConnector())
        assert stats.total_rows == 100_000
        assert stats.tables_extracted == 1
        # Verify DuckDB can read it
        con = duckdb.connect(str(tmp_path / "large_test" / "extract.duckdb"), read_only=True)
        count = con.execute("SELECT count(*) FROM big_table").fetchone()[0]
        assert count == 100_000
        con.close()