From 61f6b8d2d5418caffcac7f8fd208a533fd705fae Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr <139972147+ZdenekSrotyr@users.noreply.github.com> Date: Wed, 29 Apr 2026 09:18:55 +0200 Subject: [PATCH] =?UTF-8?q?feat(ci+tests):=20deploy=20safety=20audit=20?= =?UTF-8?q?=E2=80=94=20linting,=20rollback,=20smoke=20tests,=2050+=20new?= =?UTF-8?q?=20tests=20(#120)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Comprehensive deploy safety audit implementing 19 improvements across CI/CD pipeline, test coverage, and source code. ### CI/CD Pipeline - ruff + mypy added to both release.yml and keboola-deploy.yml (continue-on-error) - Smoke test added to keboola-deploy.yml (was missing) - Automatic rollback on smoke test failure in release.yml - Expanded smoke-test.sh with catalog, admin/tables, marketplace.zip, metrics - Required status checks via .github/settings.yml - Dependabot + CODEOWNERS + pre-commit hooks + ruff config ### Source Code - DB schema version check in /api/health (db_schema: ok/mismatch/unhealthy) - Config versioning (config_version: 1 in instance.yaml, non-blocking validation) - BigQuery extractor ATTACH error handling (try/except around INSTALL+ATTACH) - Post-deploy smoke test script for prod VM validation ### Test Coverage (~50 new tests) - v13->v14 migration, Email magic link TTL, PAT, Marketplace ZIP/Git, Jira webhooks, Hybrid Query BQ, Keboola/BQ extractor failure modes, Orchestrator failure modes Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- .github/CODEOWNERS | 9 + .github/dependabot.yml | 18 ++ .github/settings.yml | 9 + .github/workflows/keboola-deploy.yml | 43 +++ .github/workflows/release.yml | 134 +++++++++ .pre-commit-config.yaml | 26 ++ ARCHITECTURE.md | 267 +++++++++++------- CHANGELOG.md | 71 +++++ Dockerfile | 7 +- README.md | 2 +- app/api/health.py | 35 ++- config/instance.yaml.example | 5 + config/loader.py | 28 ++ connectors/bigquery/extractor.py | 33 ++- docs/DATA_SOURCES.md | 155 +++++++--- docs/QUICKSTART.md | 48 +++- docs/RBAC.md | 19 +- docs/auth-groups.md | 26 +- .../customer-instance/startup-script.sh.tpl | 6 + pyproject.toml | 4 + scripts/ops/post-deploy-smoke-test.sh | 99 +++++++ scripts/smoke-test.sh | 77 ++++- tests/test_auth_providers.py | 91 ++++++ tests/test_bigquery_extractor.py | 206 ++++++++++++++ tests/test_db.py | 195 +++++++++++++ tests/test_jira_webhooks.py | 247 ++++++++++++++++ tests/test_keboola_extractor.py | 183 ++++++++++++ tests/test_marketplace_server_git.py | 160 +++++------ tests/test_marketplace_server_zip.py | 132 ++++++--- tests/test_orchestrator.py | 127 +++++++++ tests/test_pat.py | 91 ++++++ tests/test_remote_query.py | 253 +++++++++++++++++ 32 files changed, 2504 insertions(+), 302 deletions(-) create mode 100644 .github/CODEOWNERS create mode 100644 .github/dependabot.yml create mode 100644 .github/settings.yml create mode 100644 .pre-commit-config.yaml create mode 100755 scripts/ops/post-deploy-smoke-test.sh diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..b538fba --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,9 @@ +# Default reviewers for everything +* @keboola/agnes-team + +# Infrastructure changes require infra team +/infra/ @keboola/agnes-team + +# Security-sensitive areas +/app/auth/ @keboola/agnes-team +/src/db.py @keboola/agnes-team diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..1a13cbb --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,18 @@ +version: 2 +updates: + - package-ecosystem: "pip" + directory: "/" + schedule: + interval: "weekly" + day: "monday" + open-pull-requests-limit: 5 + reviewers: + - "keboola/agnes-team" + labels: + - "dependencies" + + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "weekly" + day: "monday" diff --git a/.github/settings.yml b/.github/settings.yml new file mode 100644 index 0000000..a215759 --- /dev/null +++ b/.github/settings.yml @@ -0,0 +1,9 @@ +repository: + branch_protection_rules: + main: + required_status_checks: + strict: true + contexts: + - test + enforce_admins: false + required_pull_request_reviews: null diff --git a/.github/workflows/keboola-deploy.yml b/.github/workflows/keboola-deploy.yml index 57b09d8..3c73186 100644 --- a/.github/workflows/keboola-deploy.yml +++ b/.github/workflows/keboola-deploy.yml @@ -42,6 +42,18 @@ jobs: - name: Install dependencies run: uv pip install --system ".[dev]" + - name: Lint with ruff + run: | + pip install ruff + ruff check . || true + continue-on-error: true # Don't block on pre-existing lint issues; can tighten later + + - name: Type check with mypy + run: | + pip install mypy + mypy src/ app/ cli/ connectors/ --ignore-missing-imports --no-error-summary || true + continue-on-error: true # Don't block on mypy initially, can tighten later + - name: Run tests run: pytest tests/ -v --tb=short env: @@ -95,3 +107,34 @@ jobs: tags: | ghcr.io/${{ github.repository }}:${{ steps.meta.outputs.tag }} ghcr.io/${{ github.repository }}:keboola-deploy-latest + + smoke-test: + needs: build-and-push + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v5 + + - name: Start Agnes from built image + run: | + touch .env + export AGNES_TAG="${{ needs.build-and-push.outputs.image_tag }}" + docker compose -f docker-compose.yml -f docker-compose.prod.yml -f docker-compose.ci.yml up -d app + timeout 60 bash -c 'until curl -sf http://localhost:8000/api/health | python3 -c "import sys,json; d=json.load(sys.stdin); sys.exit(0 if d[\"status\"]!=\"unhealthy\" else 1)"; do sleep 3; done' + + - name: Run smoke tests + run: bash scripts/smoke-test.sh http://localhost:8000 + + - name: Collect logs on failure + if: failure() + run: docker compose -f docker-compose.yml -f docker-compose.prod.yml -f docker-compose.ci.yml logs > smoke-test-logs.txt + + - name: Upload logs + if: failure() + uses: actions/upload-artifact@v4 + with: + name: smoke-test-logs + path: smoke-test-logs.txt + + - name: Teardown + if: always() + run: docker compose -f docker-compose.yml -f docker-compose.prod.yml -f docker-compose.ci.yml down -v diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index e0125e8..1686ba1 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -56,6 +56,18 @@ jobs: - name: Install dependencies run: uv pip install --system ".[dev]" + - name: Lint with ruff + run: | + pip install ruff + ruff check . || true + continue-on-error: true # Don't block on pre-existing lint issues; can tighten later + + - name: Type check with mypy + run: | + pip install mypy + mypy src/ app/ cli/ connectors/ --ignore-missing-imports --no-error-summary || true + continue-on-error: true # Don't block on mypy initially, can tighten later + - name: Run tests run: pytest tests/ -v --tb=short env: @@ -208,6 +220,9 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v5 + with: + fetch-depth: 0 + fetch-tags: true - name: Start Agnes from built image run: | @@ -222,6 +237,38 @@ jobs: - name: Run smoke tests run: bash scripts/smoke-test.sh http://localhost:8000 + - name: Automatic rollback on failure + if: failure() + run: | + IMAGE_TAG="${{ needs.build-and-push.outputs.image_tag }}" + VERSION="${{ needs.build-and-push.outputs.version }}" + DEPRECATED_TAG="deprecated-${VERSION}" + REPO="ghcr.io/${{ github.repository }}" + + echo "Smoke test failed — initiating rollback" + + # Tag the current (failed) image as :deprecated-YYYY.MM.N + docker pull "${REPO}:${IMAGE_TAG}" + docker tag "${REPO}:${IMAGE_TAG}" "${REPO}:${DEPRECATED_TAG}" + docker push "${REPO}:${DEPRECATED_TAG}" + echo "Tagged failed image as ${REPO}:${DEPRECATED_TAG}" + + # Revert :stable to the previous known-good image + PREV_TAG=$(git tag -l "stable-*" --sort=-version:refname | head -2 | tail -1) + if [ -n "$PREV_TAG" ]; then + docker pull "${REPO}:${PREV_TAG}" + docker tag "${REPO}:${PREV_TAG}" "${REPO}:stable" + docker push "${REPO}:stable" + echo "Reverted :stable to ${PREV_TAG}" + else + echo "WARNING: No previous stable tag found — cannot revert :stable automatically" + fi + + # Create a GitHub issue alerting about the failure + ISSUE_TITLE="Smoke test failure — rollback to ${PREV_TAG:-unknown}" + ISSUE_BODY="## Automatic Rollback Report\n\nThe smoke test for image \`${IMAGE_TAG}\` failed.\n\n- **Failed image**: \`${REPO}:${IMAGE_TAG}\`\n- **Deprecated tag**: \`${REPO}:${DEPRECATED_TAG}\`\n- **Rolled back to**: \`${PREV_TAG:-N/A}\`\n- **Commit**: \`${{ github.sha }}\`\n- **Run**: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}\n\nPlease investigate and fix before re-deploying." + gh issue create --title "$ISSUE_TITLE" --body "$(echo -e "$ISSUE_BODY")" --label "bug" || echo "Failed to create GitHub issue (gh CLI may not be available)" + - name: Collect logs on failure if: failure() run: docker compose -f docker-compose.yml -f docker-compose.prod.yml -f docker-compose.ci.yml logs > smoke-test-logs.txt @@ -236,3 +283,90 @@ jobs: - name: Teardown if: always() run: docker compose -f docker-compose.yml -f docker-compose.prod.yml -f docker-compose.ci.yml down -v + + # Reproduces the deploy shape that broke agnes-development on 2026-04-29: + # the production stack uses docker-compose.host-mount.yml to bind-mount /data + # from the host PD instead of using a Docker named volume. Docker initializes + # a fresh named volume from the image's /data dir (which the Dockerfile + # chowns to agnes:agnes BEFORE switching USER), so the existing smoke-test + # job above never reproduces the "host /data is root-owned, container is + # USER agnes" scenario. This job pre-creates a host dir, applies the same + # chown the startup-script does on the GCE VM, and asserts the smoke + # passes — locking in the chown contract so removing it from + # startup-script.sh.tpl or flipping the Dockerfile uid breaks CI. + e2e-bind-mount: + needs: build-and-push + if: github.ref == 'refs/heads/main' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v5 + + - name: Pre-create /data with root-owned subdirs (mimics fresh GCE PD) + run: | + sudo mkdir -p /tmp/agnes-data/{state,analytics,extracts} + sudo chown -R 0:0 /tmp/agnes-data + ls -la /tmp/agnes-data + + - name: Negative test — image must fail to write before chown + run: | + IMAGE="ghcr.io/${{ github.repository }}:${{ needs.build-and-push.outputs.image_tag }}" + # USER agnes (uid 999) writing to root-owned dir must fail. + if docker run --rm -v /tmp/agnes-data:/data "$IMAGE" \ + sh -c "touch /data/state/.probe" 2>/dev/null; then + echo "REGRESSION: write to root-owned /data unexpectedly succeeded" + echo " Either USER agnes is no longer enforced, or uid pin changed." + exit 1 + fi + echo "OK: write correctly fails — operator chown is required" + + - name: Apply startup-script chown (uid:gid 999:999) + run: sudo chown -R 999:999 /tmp/agnes-data + + - name: Boot stack with bind-mounted /data + run smoke + run: | + touch .env + export AGNES_TAG="${{ needs.build-and-push.outputs.image_tag }}" + # Override the `data` volume to bind-mount /tmp/agnes-data, mirroring + # the production host-mount.yml overlay shape. + cat > docker-compose.bind-test.yml <<'EOF' + volumes: + data: + driver: local + driver_opts: + type: none + o: bind,rbind + device: /tmp/agnes-data + EOF + docker compose \ + -f docker-compose.yml \ + -f docker-compose.prod.yml \ + -f docker-compose.ci.yml \ + -f docker-compose.bind-test.yml \ + up -d app + timeout 60 bash -c 'until curl -sf http://localhost:8000/api/health | python3 -c "import sys,json; d=json.load(sys.stdin); sys.exit(0 if d[\"status\"]!=\"unhealthy\" else 1)"; do sleep 3; done' + bash scripts/smoke-test.sh http://localhost:8000 + + - name: Collect logs on failure + if: failure() + run: | + docker compose \ + -f docker-compose.yml -f docker-compose.prod.yml \ + -f docker-compose.ci.yml -f docker-compose.bind-test.yml \ + logs > bind-mount-logs.txt 2>&1 || true + ls -la /tmp/agnes-data /tmp/agnes-data/state 2>&1 | tee -a bind-mount-logs.txt + + - name: Upload logs + if: failure() + uses: actions/upload-artifact@v4 + with: + name: e2e-bind-mount-logs + path: bind-mount-logs.txt + + - name: Teardown + if: always() + run: | + docker compose \ + -f docker-compose.yml -f docker-compose.prod.yml \ + -f docker-compose.ci.yml -f docker-compose.bind-test.yml \ + down -v || true + sudo rm -rf /tmp/agnes-data || true diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..b7138c8 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,26 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v5.0.0 + hooks: + - id: detect-private-key + - id: check-yaml + - id: check-json + - id: check-merge-conflict + - id: end-of-file-fixer + - id: trailing-whitespace + - id: check-added-large-files + args: ['--maxkb=500'] + + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.11.7 + hooks: + - id: ruff + args: ['--fix'] + - id: ruff-format + + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.15.0 + hooks: + - id: mypy + args: ['--ignore-missing-imports'] + additional_dependencies: [] diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 0a163d9..cb04568 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -3,112 +3,165 @@ ## System Overview ``` -Data Source (Keboola / CSV / BigQuery) - | - v -+------------------------------------------+ -| Data Broker Server | -| | -| src/data_sync.py | -| -> connectors/*.py (fetch data) | -| -> src/parquet_manager.py (convert) | -| | -| /data/src_data/parquet/ (output) | -| /data/docs/ (synced docs) | -| /data/scripts/ (helpers) | -+------------------------------------------+ - | rsync over SSH - v -+------------------------------------------+ -| Analyst Machine | -| | -| server/parquet/ -> DuckDB views | -| user/duckdb/analytics.duckdb | -| Claude Code queries DuckDB via SQL | -+------------------------------------------+ +┌──────────────┐ ┌──────────────┐ ┌──────────────┐ +│ Keboola │ │ BigQuery │ │ Jira │ +│ extractor │ │ extractor │ │ webhooks │ +│ (DuckDB ext) │ │ (remote BQ) │ │ (incremental)│ +└──────┬───────┘ └──────┬───────┘ └──────┬───────┘ + │ │ │ + ▼ ▼ ▼ + extract.duckdb extract.duckdb extract.duckdb + + data/*.parquet (views → BQ) + data/*.parquet + │ │ │ + └─────────────────┼─────────────────┘ + ▼ + SyncOrchestrator.rebuild() + ATTACH → master views in analytics.duckdb + │ + ┌──────────┼──────────┐ + ▼ ▼ ▼ + FastAPI CLI + (serve) (da sync) ``` +Three source types: +- **Batch pull** (Keboola): DuckDB extension downloads to parquet, scheduled +- **Remote attach** (BigQuery): DuckDB BQ extension, no download, queries go to BQ +- **Real-time push** (Jira): Webhooks update parquets incrementally + ## Components -### 1. Data Sync Engine (`src/`) +### 1. Core Engine (`src/`) -Pulls data from configured source, converts to Parquet. +DuckDB-backed data orchestration and state management. | File | Role | |------|------| -| `src/data_sync.py` | Orchestration + `DataSource` ABC (line 149) | -| `connectors/keboola/adapter.py` | Keboola data source | -| `connectors/keboola/client.py` | Low-level Keboola API client | -| `src/parquet_manager.py` | CSV -> typed Parquet conversion | -| `src/config.py` | Reads `data_description.md` for table definitions | +| `src/db.py` | DuckDB schema (system.duckdb v14, analytics.duckdb), auto-migration v1→…→v14 | +| `src/orchestrator.py` | SyncOrchestrator — ATTACHes extract.duckdb files, rebuilds master views | +| `src/orchestrator_security.py` | Extension allowlist, token-env validation, SQL string escaping | +| `src/identifier_validation.py` | Shared regex validators for SQL identifiers (used by orchestrator + extractors) | +| `src/remote_query.py` | RemoteQueryEngine — hybrid queries joining local + BigQuery data | +| `src/repositories/` | DuckDB-backed CRUD (sync_state, table_registry, users, knowledge, etc.) | | `src/profiler.py` | Data profiling for catalog UI | +| `src/catalog_export.py` | OpenMetadata catalog export | +| `src/scheduler.py` | Schedule parsing (`every 15m`, `daily 03:00`) and `is_table_due()` | +| `src/rbac.py` | Dataset-access helpers (`can_access_table`, `get_accessible_tables`) | +| `src/marketplace.py` | Marketplace git-clone/sync + plugin manifest parsing | +| `src/marketplace_filter.py` | RBAC-filtered plugin resolution for ZIP/git channels | -### 2. Web Application (`webapp/`) +### 2. FastAPI Application (`app/`) -Flask app for user onboarding, settings, and data catalog. +Unified web server for UI + REST API. + +| File/Dir | Role | +|----------|------| +| `app/main.py` | FastAPI app setup, router registration, startup hooks | +| `app/api/` | REST API endpoints (sync, data, catalog, admin, auth, query, memory, etc.) | +| `app/auth/` | Authentication — router, dependencies, PAT resolver, group sync | +| `app/auth/providers/` | Auth providers: Google OAuth, email magic link, password | +| `app/web/` | HTML dashboard routes + Jinja2 templates | +| `app/resource_types.py` | `ResourceType` StrEnum + `RESOURCE_TYPES` registry for RBAC | + +### 3. Connectors (`connectors/`) + +Each connector produces an `extract.duckdb` following a standard contract. + +| Directory | Source Type | Mechanism | +|-----------|-------------|-----------| +| `connectors/keboola/` | Batch pull | DuckDB Keboola extension → parquet files | +| `connectors/bigquery/` | Remote attach | DuckDB BQ extension → views to BigQuery | +| `connectors/jira/` | Real-time push | Webhooks → incremental parquet updates | +| `connectors/openmetadata/` | Catalog | httpx client to OpenMetadata API | +| `connectors/llm/` | LLM routing | OpenAI-compatible API client | + +#### extract.duckdb Contract + +Every connector outputs to `/data/extracts/{source_name}/`: + +``` +/data/extracts/{source_name}/ +├── extract.duckdb ← _meta table + views +└── data/ ← parquet files (local sources only) +``` + +The `_meta` table (required): +```sql +CREATE TABLE _meta ( + table_name VARCHAR, + description VARCHAR, + rows INTEGER, + size_bytes INTEGER, + extracted_at TIMESTAMP, + query_mode VARCHAR -- 'local' or 'remote' +); +``` + +Remote tables (`query_mode='remote'`) must also include `_remote_attach`: +```sql +CREATE TABLE _remote_attach ( + alias VARCHAR, -- DuckDB alias used in views, e.g. 'kbc' + extension VARCHAR, -- Extension name, e.g. 'keboola' + url VARCHAR, -- Connection URL + token_env VARCHAR -- Env-var name holding the auth token (NOT the token itself) +); +``` + +The SyncOrchestrator scans `/data/extracts/*/extract.duckdb`, ATTACHes each into the master `analytics.duckdb`, and creates views. For remote tables, it reads `_remote_attach`, installs/loads the extension, reads the token from the environment, and ATTACHes the external source. + +### 4. CLI (`cli/`) + +Command-line tool `da` for sync, query, and admin operations. + +| Command | Role | +|---------|------| +| `da sync` | Trigger data sync | +| `da query` | Run SQL against analytics.duckdb | +| `da admin group *` | Manage user groups | +| `da admin grant *` | Manage resource grants | +| `da admin register-table` | Register tables in table_registry | +| `da admin break-glass ` | Emergency admin access recovery | +| `da tokens *` | Manage personal access tokens | +| `da metrics *` | Business metric definitions | +| `da skills *` | List/show bundled skills | + +### 5. Authentication (`app/auth/`) + +FastAPI-based auth with pluggable providers. | File | Role | |------|------| -| `webapp/app.py` | Flask entry point, routes | -| `webapp/config.py` | Loads `instance.yaml`, exposes `Config` to templates | -| `webapp/account_service.py` | User account details, sync status | -| `webapp/templates/` | Jinja2 templates (dashboard, setup, catalog) | +| `app/auth/router.py` | Auth routes (login, callback, bootstrap, token) | +| `app/auth/providers/google.py` | Google OAuth + Workspace group sync | +| `app/auth/providers/email.py` | Email magic link (atomic compare-and-swap consumption) | +| `app/auth/providers/password.py` | Password login + reset (with audit logging) | +| `app/auth/pat_resolver.py` | Personal Access Token validation (hash, expiry, revocation, IP audit) | +| `app/auth/access.py` | Authorization: `require_admin`, `require_resource_access` | +| `app/auth/group_sync.py` | `fetch_user_groups()` — Cloud Identity API client | +| `app/auth/dependencies.py` | `get_current_user` FastAPI dependency | +| `app/auth/jwt.py` | Desktop JWT auth (API-only) | -### 3. Configuration (`config/`) +### 6. Standalone Services (`services/`) -| File | Role | -|------|------| -| `config/instance.yaml` | Main instance config (not committed) | -| `config/instance.yaml.example` | Template with all options | -| `config/loader.py` | YAML loader with `${ENV_VAR}` interpolation | -| `config/.env.template` | Secret variable placeholders | -| `docs/data_description.md` | Table schemas + sync strategies (not committed) | - -### 4. Auth Providers (`auth/`) - -Pluggable authentication via auto-discovered providers. - -| File | Role | -|------|------| -| `auth/__init__.py` | `AuthProvider` ABC + `discover_providers()` scanner | -| `auth/google/provider.py` | Google OAuth (extracted from webapp/auth.py) | -| `auth/password/provider.py` | Email/password (delegates to webapp/password_auth) | -| `auth/desktop/provider.py` | Desktop JWT auth (API-only, hidden from login page) | - -To add a new provider: create `auth//provider.py` implementing `AuthProvider`, export a `provider` instance. No core changes needed. - -### 5. Standalone Services (`services/`) - -Self-contained services with own systemd units, auto-discovered by `deploy.sh`. +Self-contained services with own `__main__.py`, run via Docker Compose profiles. | Directory | Role | |-----------|------| -| `services/telegram_bot/` | Telegram notification bot + dispatch | +| `services/scheduler/` | Cron-like job runner (data-refresh, health-check, marketplaces) | +| `services/telegram_bot/` | Telegram notification bot + dispatch (opt-in, `--profile full`) | | `services/ws_gateway/` | WebSocket gateway for desktop app | | `services/corporate_memory/` | AI knowledge aggregation from analyst sessions | | `services/session_collector/` | Claude Code session metadata collector | -### 6. Server Infrastructure (`server/`) - -Deployment only -- no application code. +### 7. Configuration (`config/`) | File | Role | |------|------| -| `server/setup.sh` | Initial server provisioning (groups, users, dirs) | -| `server/webapp-setup.sh` | Nginx, SSL, systemd for webapp | -| `server/deploy.sh` | CI/CD deployment (auto-discovers `services/*/systemd/*`) | -| `server/sudoers-deploy` | Least-privilege sudo rules for deploy user | -| `server/sudoers-webapp` | Sudo rules for www-data (webapp) | -| `server/bin/` | Management scripts (add-analyst, list-analysts, etc.) | +| `config/instance.yaml.example` | Template with all options | +| `config/loader.py` | YAML loader with `${ENV_VAR}` interpolation + required-field validation | +| `config/.env.template` | Secret variable placeholders | -### 7. Analyst Scripts (`scripts/`) - -Helper scripts synced to analyst machines. - -| File | Role | -|------|------| -| `scripts/sync_data.sh` | Sync data from server via rsync | -| `scripts/setup_views.sh` | Create DuckDB views over Parquet files | +Table definitions are stored in DuckDB `table_registry` table (not in config files). ## Config Loading Chain @@ -116,45 +169,45 @@ Helper scripts synced to analyst machines. config/instance.yaml | (loaded by config/loader.py) | (${ENV_VAR} references resolved from .env / environment) + | (required fields validated: instance.name, auth.allowed_domain, server.host, server.hostname) v -webapp/config.py - | (_load_instance_config at module level) - | (_get(config, *keys) for safe nested access) +app/instance_config.py + | (get_value() for safe nested access) v -inject_config() context processor - | (exposes Config object to all Jinja templates) - v -{{ config.INSTANCE_NAME }} in templates +FastAPI app + templates ``` ## Data Flow ``` -1. Admin defines tables in docs/data_description.md -2. src/config.py parses YAML blocks from markdown -3. src/data_sync.py iterates tables, calls adapter -4. Adapter fetches CSV/JSON from source API -5. src/parquet_manager.py converts to typed Parquet -6. Parquet files stored in /data/src_data/parquet/ -7. Analyst runs scripts/sync_data.sh (rsync over SSH) -8. scripts/setup_views.sh creates DuckDB views -9. Claude Code queries DuckDB, returns insights +1. Admin registers tables via /api/admin/register-table or web UI +2. Table metadata stored in DuckDB table_registry (system.duckdb) +3. Scheduler triggers data-refresh (default every 15m) +4. POST /api/sync/trigger invokes each connector's extractor +5. Extractor produces extract.duckdb + parquet files (local) or remote views +6. SyncOrchestrator.rebuild() ATTACHes extract.duckdb files into analytics.duckdb +7. FastAPI serves data via /api/data/{table_id}/download and /api/query +8. Claude Code queries analytics.duckdb via SQL for analysis ``` ## Security Model -- **Groups**: `data-ops` (admins), `dataread` (analysts), `data-private` (privileged) -- **Sudoers**: Explicit command whitelisting (no wildcards) -- **SSH**: Key-based auth only, keys registered via webapp -- **OAuth**: Google domain restriction via `auth.allowed_domain` -- **Secrets**: `${ENV_VAR}` in YAML, actual values in `.env` (gitignored) -- **Staging**: `/tmp/data_analyst_staging` with setgid for group ownership +- **Authentication**: Google OAuth, email magic link, password, PAT, desktop JWT +- **Authorization**: Two-layer RBAC — Admin user-group (god mode) + resource-level grants +- **Session cookies**: Signed via Starlette SessionMiddleware (secret from `SESSION_SECRET`) +- **Bootstrap**: `SEED_ADMIN_EMAIL` env var seeds first admin at deploy time +- **Identifier validation**: Shared regex validators prevent SQL injection in table/connector names +- **Orchestrator hardening**: Extension allowlist, token-env validation, SQL string escaping +- **SSRF protection**: `_validate_url_not_private()` on admin configure endpoint +- **Container**: Runs as non-root user `agnes`; Docker resource limits enforced +- **TLS**: Caddy reverse proxy with security headers (X-Frame-Options, X-Content-Type-Options, Referrer-Policy) +- **Secrets**: `${ENV_VAR}` in YAML, actual values in `.env` (gitignored); PATs stored as hashes ## Key Patterns -- **Connector pattern**: Dynamic connector registry in `src/data_sync.py`, `connectors/keboola/` for reference -- **Auth provider pattern**: Auto-discovered from `auth/*/provider.py`, each implements `AuthProvider` ABC -- **Service pattern**: Self-contained modules in `services/` with own `__main__.py` and `systemd/` directory -- **Atomic writes**: `tempfile.mkstemp()` + `os.fchmod()` + `os.replace()` for JSON state files -- **User home writes**: `sudo install -o {user} -g {user}` for writing to analyst home dirs -- **Config interpolation**: `${ENV_VAR}` in YAML resolved at load time, missing vars logged as warnings +- **Connector pattern**: `connectors/{name}/extractor.py` produces `extract.duckdb` following the `_meta` + `_remote_attach` contract. Orchestrator auto-discovers and ATTACHes. +- **Auth provider pattern**: `app/auth/providers/{name}.py` — Google, email, password. Router dispatches based on instance config. +- **Repository pattern**: `src/repositories/{domain}.py` — DuckDB-backed CRUD with parameterized queries and `ALLOWED_FIELDS` allowlists. +- **Resource type pattern**: `app/resource_types.py` — `ResourceType` StrEnum + `ResourceTypeSpec` registry. Adding a new type = one enum member + one `list_blocks` delegate + one spec entry. No DB migration. +- **Atomic token consumption**: Compare-and-swap with `CONSUMED:` marker prevents race conditions on one-shot tokens (magic links, password resets). +- **Config interpolation**: `${ENV_VAR}` in YAML resolved at load time, missing vars logged as warnings. diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b51809..7546a50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,10 +12,17 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ### Fixed +- **Non-root container couldn't write to host-bind-mounted `/data` after the v0.12.1 USER-agnes flip.** `infra/modules/customer-instance/startup-script.sh.tpl` now `chown -R 999:999 /data` after creating the persistent-disk subdirs (`state`, `analytics`, `extracts`). Without this, a freshly-attached PD is root-owned by default and `USER agnes` (uid 999) cannot open `/data/state/system.duckdb` for write — every authed request 500s with `IOException: Cannot open file ... Permission denied` while `/api/health` (which doesn't open the system DB) keeps returning 200, masking the failure from health-only monitoring. Regression first observed on `agnes-development` on 2026-04-29 after the auto-upgrade picked up `:stable` from the 0.12.1 release. **Existing VMs with PD-backed `/data` need a one-time host-side `sudo chown -R 999:999 /var/lib/docker/volumes/agnes_data/_data && sudo docker restart agnes-app-1 agnes-scheduler-1` to recover** — Terraform `metadata_startup_script` only runs on boot, so an apply alone does not retro-fix running VMs. +- `Dockerfile` pins the `agnes` user to `uid:gid 999:999` explicitly (`useradd --uid 999`). Previously the uid was whatever Debian's `useradd --system` assigned next — happened to be 999 today, but a future base-image change picking 998 or 1000 would silently desync from the startup-script's `chown 999:999`, reintroducing the same incident. Pinning makes the contract grep-able from both sides. +- `scripts/smoke-test.sh` no longer silently SKIPs every authed check when `bootstrap` returns 403 (users exist) and `SMOKE_TOKEN` is not set — it now FAILs loudly. Also adds an unauthenticated DB-touching probe (`POST /auth/email/request`) before bootstrap, since `/api/health` deliberately doesn't open `system.duckdb` (kept cheap for LB probes) and so cannot detect filesystem/permission issues. The new probe catches the foundryai-development class of regression even on instances where bootstrap is closed. - Corporate memory pages (`/corporate-memory`, `/corporate-memory/admin`) now render the shared app header at full viewport width, matching the dashboard. Previously the `_app_header.html` include sat inside `.container-memory` (max-width: 1000px) and was cropped on wide viewports. - `release.yml` now publishes a `:dev-` + `:dev--latest` image when a fresh branch is pushed off `main` with no extra commits. Pre-fix, `paths-ignore` on the `push` event diffed the new ref against the default branch — a same-SHA branch had zero diff, every file matched paths-ignore, and the workflow was skipped, so a developer creating a personal branch off main to deploy main's exact state to their dev VM (which pins to `:dev--latest`) had to either commit something or trigger the workflow manually. The `build-and-push` job's `if` was also tightened to `main || workflow_dispatch` only, which prevented branch-push images regardless. Both fixed: added `create:` trigger (filtered to branch refs at the job level so tag creates don't double-build with `keboola-deploy.yml`), and broadened `build-and-push.if` to also publish on non-main branch pushes / branch creates. - Web header admin nav (All tokens, Marketplaces, Admin → Users / Groups / Resource access / Server config) is now visible to admin users again. Pre-fix, `_app_header.html` gated the admin block on `session.user.role == 'admin'`, but the v13 RBAC migration nulled `users.role` and moved admin authority onto `user_group_members` (Admin system group) — so the gate evaluated to false for everyone, including actual admins. `get_current_user` now injects `user["is_admin"]` (computed via `app.auth.access.is_user_admin`, the same call all server-side admin gates use), and the header reads `session.user.is_admin`. The role badge in the user-menu dropdown now reads "Admin" or hides — `users.role` is no longer surfaced in the UI. +### Internal + +- `release.yml` adds an `e2e-bind-mount` job that boots the freshly built image against a host-bind-mounted `/data` directory (instead of the named volume the existing `smoke-test` job uses). Docker initializes a fresh named volume by copying from the image's `/data` — which the Dockerfile chowns to `agnes:agnes` before flipping USER — so the named-volume path always works. The bind-mount path mirrors what GCE VMs run via `docker-compose.host-mount.yml`, and includes a negative assertion (write must fail on root-owned `/data` before the operator chown) plus a positive assertion (smoke passes after the chown). Locks in the contract that broke `agnes-development`: removing `chown 999:999` from `startup-script.sh.tpl` or changing the Dockerfile uid pin breaks CI. + ## [0.15.0] — 2026-04-29 ### Added @@ -66,6 +73,62 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C - `POST /api/admin/configure` now uses the same narrow-overlay write strategy as the new server-config editor: it reads the overlay verbatim (no static fallback), patches only `instance` / `auth` / `data_source`, and writes atomically via tmp + `os.replace`. Pre-fix it seeded `existing` from the env-resolved merged config when no overlay file was present and dumped the whole thing back, persisting cleartext `${ENV_VAR}` values (e.g. `smtp_password`) into the writable overlay even though the wizard never touched those sections. Issue #91. - `POST /api/admin/server-config` now strips redaction sentinels (`***` / ``) out of every secret-keyed leaf in the incoming patch before the deep-merge. The companion GET endpoint masks secret-keyed children inside nested objects (e.g. `data_source.keboola.token_env`), and the form renders those nested objects as JSON textareas — without the scrub, a no-op save would round-trip the masked JSON back and overwrite the real overlay value (`token_env: "KEBOOLA_STORAGE_TOKEN"` → `"***"`), silently breaking the next sync. Defense-in-depth on both sides: the client form scrubs before posting, and the server scrubs before merge so an API caller (CLI / script) can't corrupt secrets either. Issue #91. +## [0.16.0] — 2026-04-29 + +Minor release. Comprehensive deploy safety audit — CI/CD pipeline hardening, 50+ new tests covering previously untested failure modes, DB schema health check, config versioning, and BigQuery ATTACH error resilience. Built on top of v0.15.0 / `2e1dfb7`. + +PR: [#120](https://github.com/keboola/agnes-the-ai-analyst/pull/120) (ci/deploy-safety-audit). + +### Added + +- **ruff lint + mypy type check** in `release.yml` and `keboola-deploy.yml` CI workflows (both `continue-on-error: true` initially — 257 pre-existing ruff errors, mypy has pre-existing issues; neither blocks CI yet). +- **Automatic rollback** on smoke test failure in `release.yml` — tags the broken image as `:deprecated-`, reverts `:stable` to the previous good tag, opens a GitHub issue for investigation. +- **Smoke test in `keboola-deploy.yml`** — was completely missing; now runs the same `smoke-test.sh` as `release.yml`. +- **Expanded smoke-test.sh** — added `/api/catalog`, `/api/admin/tables`, `/marketplace.zip`, `/api/metrics` endpoint checks beyond the original `/api/health`. +- **Post-deploy smoke test** (`scripts/ops/post-deploy-smoke-test.sh`) — validates health, DB schema version, query endpoint, catalog, and marketplace on a prod VM after deploy. +- **DB schema version check** in `/api/health` — returns `db_schema: "ok" | "mismatch" | "unreachable"`; overall status becomes `"unhealthy"` on schema mismatch. Lets load balancers and monitoring detect half-migrated instances. +- **Config versioning** — `config_version: 1` in `instance.yaml`, validated at startup by `_validate_config_version()` in `config/loader.py`. Prevents silent misconfiguration when the config schema evolves. +- **`.github/settings.yml`** — required status checks on `main` branch. +- **`.github/dependabot.yml`** — weekly pip + github-actions dependency updates. +- **`.github/CODEOWNERS`** — default `@keboola/agnes-team`, special owners for `/infra/`, `/app/auth/`, `src/db.py`. +- **`.pre-commit-config.yaml`** — detect-private-key, check-yaml/json/merge-conflict, ruff, mypy. +- **`[tool.ruff]`** config in `pyproject.toml` — `line-length = 120`, `target-version = "py313"`. + +### Test Coverage (~50 new tests) + +- **v13→v14 migration** (`test_db.py`): orphan cleanup, FK constraints, rollback on failure. +- **Email magic link TTL** (`test_auth_providers.py`): expired token, token reuse, wrong token. +- **PAT** (`test_pat.py`): malformed JWT, empty bearer, `last_used_ip` tracking. +- **Marketplace ZIP** (`test_marketplace_server_zip.py`): ETag/304, PAT auth, content-addressed caching, `invalidate_etag_cache()` on mutation. +- **Marketplace Git** (`test_marketplace_server_git.py`): smart HTTP, Basic auth with PAT, RBAC filtering. +- **Jira webhooks** (`test_jira_webhooks.py`): HMAC validation, missing signature, malformed JSON (10 tests). +- **Hybrid Query BQ** (`test_remote_query.py`): `register_bq`, JOIN local+BQ, error handling (12 tests). +- **Keboola extractor** (`test_keboola_extractor.py`): crash, partial write, timeout, extension fallback (9 tests). +- **BigQuery extractor** (`test_bigquery_extractor.py`): corrupted DB, partial write, atomic swap, ATTACH timeout (6 tests). +- **Orchestrator** (`test_orchestrator.py`): corrupted extract.duckdb, empty `_meta`, mid-write, unsafe identifiers (5 tests). + +### Fixed + +- **BigQuery extractor ATTACH error handling** — `init_extract()` now catches exceptions on `INSTALL`/`ATTACH` and records them in `stats["errors"]` instead of propagating up. A network timeout or auth failure no longer crashes the extractor; all configured tables are marked as skipped. +- **ETag cache invalidation on disk mutation** — `invalidate_etag_cache()` is the documented way to force re-hash after marketplace sync. Tests now call it after mutating on-disk content. + +### Internal + +- `fetch-depth: 0` + `fetch-tags: true` in `release.yml` for rollback tag resolution. +- Docs updated: `ARCHITECTURE.md`, `docs/DATA_SOURCES.md`, `docs/QUICKSTART.md`, `docs/RBAC.md`, `docs/auth-groups.md`. + +## [0.15.0] — 2026-04-29 + +Minor release. Adds corporate memory v1+v1.5 and /me/debug self-only auth diagnostic. See [GitHub release](https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.15.0) for full notes. + +## [0.14.0] — 2026-04-28 + +Minor release. Replaces BigQuery wrap-view pattern with Claude-driven fetch primitives. See [GitHub release](https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.14.0) for full notes. + +## [0.13.0] — 2026-04-28 + +Minor release. Admin server-config editor + Windows PowerShell wrapper. See [GitHub release](https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.13.0) for full notes. + ## [0.12.1] — 2026-04-28 Patch release. Hotfixes the pre-migration snapshot-integrity bug shipped in [v0.12.0](https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.12.0) and bundles the security/ops hardening from issue groups #82 (auth hardening), #85 (API validation), #87 (deploy posture), plus #46 (SSRF) and #90 (memory stats blocking). @@ -660,6 +723,14 @@ First tagged semver release. The `version = "2.x"` strings that appeared in earl - Test suite expanded to 1357+ tests (4 layers — unit, integration, web smoke, journey). +[0.16.0]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.16.0 +[0.15.0]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.15.0 +[0.14.0]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.14.0 +[0.13.0]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.13.0 +[0.12.1]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.12.1 +[0.12.0]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.12.0 +[0.11.5]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.11.5 +[0.11.4]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.11.4 [0.11.3]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.11.3 [0.11.2]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.11.2 [0.11.1]: https://github.com/keboola/agnes-the-ai-analyst/releases/tag/v0.11.1 diff --git a/Dockerfile b/Dockerfile index 2093242..249a7d7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,8 +23,11 @@ RUN uv build --wheel --out-dir /app/dist # Install production dependencies from pyproject.toml RUN uv pip install --system --no-cache . -# Run as non-root user for container hardening (C13) -RUN useradd --system --create-home --shell /usr/sbin/nologin agnes && \ +# Run as non-root user for container hardening (C13). +# uid/gid pinned to 999 so host-side chown in startup-script.sh.tpl can match +# without parsing /etc/passwd inside the image. Changing this number breaks +# every existing PD-backed deploy until the operator re-chowns /data. +RUN useradd --system --uid 999 --create-home --shell /usr/sbin/nologin agnes && \ mkdir -p /data && chown -R agnes:agnes /data && \ chown -R agnes:agnes /app USER agnes diff --git a/README.md b/README.md index b5d98e5..d1be2f4 100644 --- a/README.md +++ b/README.md @@ -141,7 +141,7 @@ See `config/instance.yaml.example` for all available options. - [Onboarding Guide](docs/ONBOARDING.md) — end-to-end Terraform deployment into a GCP project (recommended for production) - [Deployment Guide](docs/DEPLOYMENT.md) — chooses between Terraform and Docker Compose; covers OSS self-host - [Configuration Reference](docs/CONFIGURATION.md) — `instance.yaml`, env vars, per-instance options -- [Architecture](docs/architecture.md) — orchestrator, extractors, DB layout +- [Architecture](ARCHITECTURE.md) — orchestrator, extractors, DB layout - [Quickstart](docs/QUICKSTART.md) — local development ## Contributing diff --git a/app/api/health.py b/app/api/health.py index 529ac44..4500979 100644 --- a/app/api/health.py +++ b/app/api/health.py @@ -7,7 +7,7 @@ from fastapi import APIRouter, Depends import duckdb from app.auth.dependencies import _get_db, get_current_user -from src.db import SCHEMA_VERSION +from src.db import SCHEMA_VERSION, get_system_db from src.repositories.sync_state import SyncStateRepository router = APIRouter(tags=["health"]) @@ -18,10 +18,35 @@ router = APIRouter(tags=["health"]) _DEPLOYED_AT = datetime.now(timezone.utc).isoformat() +def _check_db_schema() -> dict: + """Check DB schema version against expected SCHEMA_VERSION. + + Returns a dict with 'db_schema' key and optional 'detail' key. + """ + try: + conn = get_system_db() + row = conn.execute( + "SELECT version FROM schema_version ORDER BY applied_at DESC LIMIT 1" + ).fetchone() + if row is None: + return {"db_schema": "mismatch", "detail": "no schema_version row found"} + current_version = row[0] + if current_version == SCHEMA_VERSION: + return {"db_schema": "ok", "current": current_version, "expected": SCHEMA_VERSION} + else: + return {"db_schema": "mismatch", "current": current_version, "expected": SCHEMA_VERSION} + except Exception as e: + return {"db_schema": "unreachable", "detail": str(e)} + + @router.get("/api/health") async def health_check(): """Minimal health check for load balancers / compose healthcheck. No auth required.""" - return {"status": "ok"} + schema_check = _check_db_schema() + status = "ok" + if schema_check["db_schema"] != "ok": + status = "unhealthy" + return {"status": status, **schema_check} @router.get("/api/health/detailed") @@ -39,6 +64,9 @@ async def health_check_detailed( except Exception as e: checks["duckdb_state"] = {"status": "error", "detail": str(e)} + # DB schema version check + checks["db_schema"] = _check_db_schema() + # Sync state summary try: repo = SyncStateRepository(conn) @@ -82,6 +110,9 @@ async def health_check_detailed( break if check.get("status") == "warning": overall = "degraded" + # DB schema mismatch or unreachable also makes the overall status unhealthy + if checks.get("db_schema", {}).get("db_schema") != "ok": + overall = "unhealthy" return { "status": overall, diff --git a/config/instance.yaml.example b/config/instance.yaml.example index 6948322..d0623a3 100644 --- a/config/instance.yaml.example +++ b/config/instance.yaml.example @@ -6,6 +6,11 @@ # SECRET VALUES use ${ENV_VAR} syntax - actual values go in .env file. # Non-secret values are set directly here. +# --- Config version --- +# Incremented when the config schema changes. Must match SUPPORTED_CONFIG_VERSIONS +# in config/loader.py. Currently only version 1 is supported. +config_version: 1 + # --- Instance branding --- instance: name: "AI Data Analyst" diff --git a/config/loader.py b/config/loader.py index f542a52..9aae815 100644 --- a/config/loader.py +++ b/config/loader.py @@ -23,6 +23,8 @@ CONFIG_DIR = Path(os.environ.get("CONFIG_DIR", "./config")) _ENV_PATTERN = re.compile(r"\$\{([^}]+)\}") +SUPPORTED_CONFIG_VERSIONS = {1} + def _resolve_env_refs(value: Any, _path: str = "") -> Any: """Resolve ${ENV_VAR} references in config values. @@ -64,6 +66,31 @@ def _resolve_env_refs(value: Any, _path: str = "") -> Any: return value +def _validate_config_version(config: dict) -> None: + """Validate config_version field in the loaded config. + + Reads config_version from the config dict. If missing, logs a warning + and defaults to 0. If the version is not in SUPPORTED_CONFIG_VERSIONS, + logs a warning but does NOT raise — existing deployments without the + field must not crash on upgrade. The operator is nudged to add the + field via the warning message. + """ + version = config.get("config_version") + if version is None: + logger.warning( + "config_version not set in instance.yaml; defaulting to 0. " + "Add config_version: 1 to your config for forward compatibility." + ) + version = 0 + if version not in SUPPORTED_CONFIG_VERSIONS: + logger.warning( + "Unsupported config_version: %s. Supported versions: %s. " + "Update your instance.yaml config_version field.", + version, + sorted(SUPPORTED_CONFIG_VERSIONS), + ) + + def load_instance_config() -> dict[str, Any]: """Load instance configuration from instance.yaml. @@ -95,6 +122,7 @@ def load_instance_config() -> dict[str, Any]: raise ValueError("instance.yaml is empty") config = _resolve_env_refs(config) + _validate_config_version(config) _validate_config(config) logger.info("Instance config loaded from %s", path) return config diff --git a/connectors/bigquery/extractor.py b/connectors/bigquery/extractor.py index e8132fe..a286fae 100644 --- a/connectors/bigquery/extractor.py +++ b/connectors/bigquery/extractor.py @@ -132,16 +132,29 @@ def init_extract( conn = duckdb.connect(str(tmp_db_path)) try: - conn.execute("INSTALL bigquery FROM community; LOAD bigquery;") - # session-scoped DuckDB secret with the metadata token - escaped_token = token.replace("'", "''") - conn.execute( - f"CREATE SECRET bq_session (TYPE bigquery, ACCESS_TOKEN '{escaped_token}')" - ) - conn.execute( - f"ATTACH 'project={project_id}' AS bq (TYPE bigquery, READ_ONLY)" - ) - logger.info("Attached BigQuery project: %s", project_id) + # Install and load BigQuery extension + try: + conn.execute("INSTALL bigquery FROM community; LOAD bigquery;") + # session-scoped DuckDB secret with the metadata token + escaped_token = token.replace("'", "''") + conn.execute( + f"CREATE SECRET bq_session (TYPE bigquery, ACCESS_TOKEN '{escaped_token}')" + ) + conn.execute( + f"ATTACH 'project={project_id}' AS bq (TYPE bigquery, READ_ONLY)" + ) + logger.info("Attached BigQuery project: %s", project_id) + except Exception as attach_err: + logger.error("Failed to attach BigQuery project %s: %s", project_id, attach_err) + stats["errors"].append( + {"table": "*", "error": f"BigQuery ATTACH failed: {attach_err}"} + ) + # No tables can be registered without a working connection + for tc in table_configs: + stats["errors"].append( + {"table": tc["name"], "error": "skipped: BigQuery ATTACH failed"} + ) + return stats _create_meta_table(conn) _create_remote_attach_table(conn, project_id) diff --git a/docs/DATA_SOURCES.md b/docs/DATA_SOURCES.md index 0e663c6..00850ad 100644 --- a/docs/DATA_SOURCES.md +++ b/docs/DATA_SOURCES.md @@ -2,21 +2,32 @@ ## Overview -AI Data Analyst uses a pluggable adapter system for data sources. Configure the adapter type in `config/instance.yaml`: +AI Data Analyst uses a connector system where each connector produces an `extract.duckdb` following a standard contract. The SyncOrchestrator auto-discovers and ATTACHes these into the master `analytics.duckdb`. + +Configure the data source type in `config/instance.yaml`: ```yaml data_source: - type: "keboola" # Options: keboola, csv, bigquery (future) + type: "keboola" # Options: keboola, bigquery ``` -## Keboola Adapter +Table definitions are stored in the DuckDB `table_registry` table (not in config files). Register tables via the admin API, CLI, or web UI. -Syncs tables from Keboola Storage API. +## Query Modes + +Each table has a `query_mode` that determines how data is accessed: + +- **`local`**: Data is downloaded to parquet files on the Agnes server. Suitable for tables that fit in local storage. +- **`remote`**: Data stays in the external source; DuckDB extension ATTACHes at query time. Suitable for large tables where only query results are transferred. + +## Keboola Connector + +Syncs tables from Keboola Storage API using the DuckDB Keboola extension. ### Requirements -- `kbcstorage` Python package (included in requirements.txt) - Keboola Storage API token with read access +- DuckDB Keboola extension (auto-installed) ### Configuration @@ -25,46 +36,124 @@ In `.env`: KEBOOLA_STORAGE_TOKEN=your-token-here KEBOOLA_STACK_URL=https://connection.your-region.keboola.com KEBOOLA_PROJECT_ID=12345 -DATA_SOURCE=keboola ``` -### Sync Strategies +Or configure via the admin UI (`/admin/tables`) or CLI: +```bash +da admin register-table --source-type keboola --bucket "in.c-crm" --table "company" --query-mode local +``` -Define in `docs/data_description.md`: +### How it works -- **full_refresh**: Downloads entire table each sync -- **incremental**: Downloads only changed rows (using changedSince) -- **partitioned**: Splits data into time-based partitions (month/day/year) +1. The extractor (`connectors/keboola/extractor.py`) uses the DuckDB Keboola extension to download data +2. Produces `extract.duckdb` with `_meta` table + parquet files in `/data/extracts/keboola/data/` +3. The SyncOrchestrator ATTACHes `extract.duckdb` into `analytics.duckdb` and creates views -### Data Description Format +### Identifier validation +All Keboola table names, bucket names, and source table identifiers are validated against `_SAFE_QUOTED_IDENTIFIER` regex before use. Invalid identifiers are skipped with error logging. + +## BigQuery Connector + +Queries BigQuery tables on-demand using the DuckDB BigQuery extension (remote attach). + +### Requirements + +- Google Cloud project with BigQuery access +- Application Default Credentials (ADC) configured + +### Configuration + +In `config/instance.yaml`: ```yaml -folder_mapping: - "in.c-crm": "sales" - "in.c-hr": "hr" - -tables: - - id: "in.c-crm.company" - name: "company" - description: "Company master data from CRM" - primary_key: "id" - sync_strategy: "full_refresh" +bigquery: + project_id: "your-gcp-project" ``` +Or via the admin UI or CLI: +```bash +da admin register-table --source-type bigquery --bucket "dataset" --table "table" --query-mode remote +``` + +### Authentication + +Uses Application Default Credentials (ADC) — the standard Google auth fallback chain: +1. `GOOGLE_APPLICATION_CREDENTIALS` env var (service account key JSON) +2. gcloud user credentials (`gcloud auth application-default login`) +3. GCE metadata server (automatic on Compute Engine) + +No explicit key file configuration needed — ADC handles it. + +### How it works + +1. The extractor (`connectors/bigquery/extractor.py`) creates `extract.duckdb` with remote views +2. `_remote_attach` table tells the orchestrator how to ATTACH the BigQuery extension at query time +3. Queries go directly to BigQuery — no data is downloaded to local storage +4. Identifier validation (`validate_identifier`, `validate_quoted_identifier`) protects against injection + +### Hybrid Queries + +For queries that JOIN local data with BigQuery results: + +```bash +da query --sql "SELECT o.*, t.views FROM orders o JOIN traffic t ON o.date = t.date" \ + --register-bq "traffic=SELECT date, SUM(views) as views FROM dataset.web GROUP BY 1" +``` + +## Jira Connector + +Real-time webhook-based connector that updates parquet files incrementally. + +### How it works + +1. Jira webhooks hit `/api/jira/webhook` endpoint +2. The connector (`connectors/jira/`) processes webhook events and updates parquet files +3. Produces `extract.duckdb` with `_meta` table + incremental parquet data + ## Writing a Custom Connector -Create a new connector module in `connectors//adapter.py`: +Create a new connector in `connectors//extractor.py` that produces the `extract.duckdb` contract: -```python -from src.data_sync import DataSource - -class MyDataSource(DataSource): - def sync_table(self, table_config, sync_state): - # Download data, convert to Parquet - # Return {"success": True, "rows": N, "strategy": "..."} - pass +``` +/data/extracts/{source_name}/ +├── extract.duckdb ← _meta table + views +└── data/ ← parquet files (local sources only) ``` -The `create_data_source()` function in `src/data_sync.py` auto-discovers connectors from the `connectors/` directory. Set `data_source.type` in `config/instance.yaml` to match the connector directory name (e.g., `keboola` for `connectors/keboola/`). +### Required: `_meta` table -See `connectors/keboola/` for a complete reference implementation. +```sql +CREATE TABLE _meta ( + table_name VARCHAR, + description VARCHAR, + rows INTEGER, + size_bytes INTEGER, + extracted_at TIMESTAMP, + query_mode VARCHAR -- 'local' or 'remote' +); +``` + +### Optional: `_remote_attach` table (for remote sources) + +```sql +CREATE TABLE _remote_attach ( + alias VARCHAR, -- DuckDB alias used in views + extension VARCHAR, -- Extension name + url VARCHAR, -- Connection URL + token_env VARCHAR -- Env-var name holding the auth token (NOT the token itself) +); +``` + +### Identifier validation + +Import shared validators from `src/identifier_validation.py`: + +```python +from src.identifier_validation import validate_identifier, validate_quoted_identifier +``` + +Use `validate_identifier()` for strict names (alphanumeric + underscore) and `validate_quoted_identifier()` for names that may contain dots/hyphens (e.g., Keboola-style `in.c-crm.orders`). + +The SyncOrchestrator auto-discovers connectors by scanning `/data/extracts/*/extract.duckdb` — no registration step needed beyond producing the correct output format. + +See `connectors/keboola/` for a complete batch-pull reference implementation, or `connectors/bigquery/` for a remote-attach example. diff --git a/docs/QUICKSTART.md b/docs/QUICKSTART.md index 2ba16c5..ff558aa 100644 --- a/docs/QUICKSTART.md +++ b/docs/QUICKSTART.md @@ -3,8 +3,8 @@ ## Prerequisites - Python 3.10+ -- SSH access to a Linux server (for production deployment) -- Data source credentials (Keboola token, BigQuery service account, etc.) +- Docker + Docker Compose (for production deployment) +- Data source credentials (Keboola token, BigQuery project, etc.) ## Local Development Setup @@ -14,9 +14,10 @@ cd ai-data-analyst ``` -2. Run the initialization script: +2. Create virtual environment and install dependencies: ```bash - bash scripts/init.sh + python3 -m venv .venv && source .venv/bin/activate + uv pip install ".[dev]" ``` 3. Configure your instance: @@ -27,21 +28,42 @@ 4. Set up environment variables: ```bash + cp config/.env.template .env # Edit .env with your data source credentials ``` -5. Register your tables: +5. Register your tables via the admin API or CLI: ```bash - # Tables are registered via the admin API or web UI — no config file needed + # Via CLI + da admin register-table --source-type keboola --bucket "in.c-crm" --table "company" --query-mode local + + # Or start the server and use the web UI at /admin/tables ``` -6. Sync data: +6. Start the FastAPI server: ```bash - source .venv/bin/activate - python -m src.data_sync + uvicorn app.main:app --reload ``` -## Server Deployment +7. Trigger a data sync: + ```bash + curl -X POST http://localhost:8000/api/sync/trigger + # Or: da sync + ``` + +## Docker Deployment + +```bash +# Start app + scheduler +docker compose up + +# Include telegram bot +docker compose --profile full up + +# HTTPS mode — Caddy + corporate-CA certs +docker compose -f docker-compose.yml -f docker-compose.prod.yml -f docker-compose.tls.yml \ + --profile tls up -d +``` See [DEPLOYMENT.md](DEPLOYMENT.md) for full server setup instructions. @@ -53,16 +75,14 @@ Open the project in Claude Code. The CLAUDE.md file will guide the AI assistant 1. Visit your instance URL (e.g., https://data.yourcompany.com) 2. Sign in with your company email -3. Register your SSH key -4. Follow the setup instructions to sync data locally +3. Access data through the API or download parquets for local analysis ### Analysis Workflow -1. Sync latest data: `bash server/scripts/sync_data.sh` +1. Sync latest data: `curl -X POST https://data.yourcompany.com/api/sync/trigger` 2. Open Claude Code in your project directory 3. Ask Claude to analyze your data using DuckDB ## Hackathon See [`HACKATHON.md`](HACKATHON.md) for the deploy-and-develop playbook. Per-developer dev VMs are the supported pattern — point your VM at your branch image with `gcloud compute ssh --command "sudo sed -i 's/^AGNES_TAG=.*/AGNES_TAG=dev-/' /opt/agnes/.env && sudo /usr/local/bin/agnes-auto-upgrade.sh"`. - diff --git a/docs/RBAC.md b/docs/RBAC.md index 6e038ab..f99c670 100644 --- a/docs/RBAC.md +++ b/docs/RBAC.md @@ -1,4 +1,4 @@ -# Access control (v13) +# Access control (v14) Two-layer authorization model: @@ -14,8 +14,8 @@ There is no role hierarchy, no session cache, no implies expansion, no module-au | Table | Purpose | |---|---| | `user_groups` | Named groups. Two rows seeded as `is_system=TRUE`: **Admin** (god mode) and **Everyone** (auto-membership for all users). | -| `user_group_members` | `(user_id, group_id, source)`. `source ∈ {admin, google_sync, system_seed}` so each writer only manipulates its own rows — Google sync's nightly DELETE+INSERT does not clobber admin-added members. | -| `resource_grants` | `(group_id, resource_type, resource_id)`. The grant table the resolver hits when Admin short-circuit doesn't apply. | +| `user_group_members` | `(user_id, group_id, source)`. `source ∈ {admin, google_sync, system_seed}` so each writer only manipulates its own rows — Google sync's nightly DELETE+INSERT does not clobber admin-added members. **v14**: FK constraint on `group_id` referencing `user_groups.id` (cascade delete). | +| `resource_grants` | `(group_id, resource_type, resource_id)`. The grant table the resolver hits when Admin short-circuit doesn't apply. **v14**: FK constraint on `group_id` referencing `user_groups.id` (cascade delete). | `resource_type` is a string from the `app.resource_types.ResourceType` `StrEnum`. `resource_id` is a path string whose format is owned by the registering module — for `marketplace_plugin` it's `/`. @@ -177,3 +177,16 @@ The v12→v13 migration is a single-step hard cutover. The Python helper `_v12_t 7. Drops the `users.groups` JSON column. The legacy `users.role` column is kept NULL'd as an artifact (DuckDB historical FK constraints sometimes block DROP COLUMN; the field carries no semantic meaning post-v13). No dual-write window. Either the schema is on v12 (old code) or v13 (new code). + +--- + +## Schema v14 — FK constraints + +The v13→v14 migration adds DuckDB foreign-key constraints to `user_group_members` and `resource_grants`: + +- `user_group_members.group_id` → `user_groups.id` (ON DELETE CASCADE) +- `resource_grants.group_id` → `user_groups.id` (ON DELETE CASCADE) + +This prevents orphaned member/grant rows pointing at a deleted group. The migration uses RENAME → CREATE-with-FK → INSERT → DROP, wrapped in `BEGIN TRANSACTION` so a partial failure rolls back without leaving the DB at a half-applied schema. + +No semantic changes — v14 is backward compatible with v13 application code. diff --git a/docs/auth-groups.md b/docs/auth-groups.md index ce24711..7a92c41 100644 --- a/docs/auth-groups.md +++ b/docs/auth-groups.md @@ -1,4 +1,4 @@ -# Google Workspace Groups in /profile +# Google Workspace Groups in Agnes How Agnes pulls a user's group memberships at Google sign-in and where they end up. @@ -35,27 +35,29 @@ Switching to `discussion_forum` will silently break for everyone but Workspace a `app/auth/providers/google.py:google_callback` runs on every Google sign-in: -1. Fetch via `_fetch_google_groups(access_token, email)` → list of `{"id": "", "name": ""}`. -2. Write to `request.session["google_groups"]` (Starlette signed-cookie session — per-user, not in DB). -3. Failures (403, 401, network, 4xx) are swallowed and become `[]` so login never breaks. +1. Fetch via `fetch_user_groups(access_token, email)` (in `app/auth/group_sync.py`) → list of `{"id": "", "name": ""}`. +2. Write to `user_group_members` table with `source='google_sync'` (DuckDB-backed, persistent across sessions). +3. The previous Google-sync set is wholesale replaced (DELETE + INSERT for `source='google_sync'` rows) so a removed Workspace membership disappears immediately. +4. Admin-added memberships (`source='admin'`) are preserved — Google sync only touches its own rows. +5. **Fail-soft**: If the Cloud Identity API returns an error (403, 401, network), the callback preserves existing memberships instead of wiping them. This prevents a transient API outage from silently dropping all Workspace-synced group memberships. -Display: `app/web/templates/profile.html` reads `session.google_groups` and renders the list. Empty state explains "Groups are populated when you sign in with Google on a Workspace-enabled tenant." +The `user_group_members` table is the single source of truth for group memberships, used by: +- RBAC authorization (`app/auth/access.py`) — `require_resource_access` checks group grants +- Admin UI (`/admin/access`) — member lists, grant counts +- CLI (`da admin group members`) — group membership queries +- Marketplace filtering (`src/marketplace_filter.py`) — plugin access based on group grants -**Not in DB.** Admin views (e.g. `/admin/users`) can't see other users' groups today — adding a `users.groups` column + persisting on callback is the path forward when that's needed. - -**Refresh.** A user's stale session keeps stale groups. `Logout → sign in again` is the only refresh. +**Refresh.** Memberships are refreshed on every Google sign-in. A user's stale memberships persist until their next login. ## Local-dev mock (no Google round-trip) -When developing on `localhost` with `LOCAL_DEV_MODE=1`, Google OAuth never runs, so `session.google_groups` would normally stay empty and group-aware UI/code paths can't be exercised. Set `LOCAL_DEV_GROUPS` to inject a mocked membership list: +When developing on `localhost` with `LOCAL_DEV_MODE=1`, Google OAuth never runs, so group memberships would normally stay empty. Set `LOCAL_DEV_GROUPS` to inject a mocked membership list: ```bash export LOCAL_DEV_GROUPS='[{"id":"engineers@example.com","name":"Engineering"},{"id":"admins@example.com","name":"Admins"}]' ``` -The value is a JSON array of objects matching the production shape (`{"id", "name"}`) so the mock and the real callback write the *same* structure into `session.google_groups`. Extra fields are preserved verbatim — handy for forward-compat testing of group attributes Google may return later. - -`get_current_user` in `app/auth/dependencies.py` writes the parsed list into the session on every dev-bypass request (compare-then-write — no spurious `Set-Cookie` when the value is unchanged). Malformed input (invalid JSON, non-list, items missing `id`) is logged at WARNING and falls back to `[]` — the dev mock must never break the dev flow. +The value is a JSON array of objects matching the production shape (`{"id", "name"}`). `get_current_user` in `app/auth/dependencies.py` writes the parsed list into `user_group_members` on every dev-bypass request. `docker-compose.local-dev.yml` carries a commented example at the right escape level for Compose YAML. **Never set this in production** — the variable is only honored when `LOCAL_DEV_MODE=1`. diff --git a/infra/modules/customer-instance/startup-script.sh.tpl b/infra/modules/customer-instance/startup-script.sh.tpl index 71b61fe..568ba0e 100644 --- a/infra/modules/customer-instance/startup-script.sh.tpl +++ b/infra/modules/customer-instance/startup-script.sh.tpl @@ -40,6 +40,12 @@ if [ -b "$DATA_DEV" ]; then mountpoint -q "$DATA_MNT" || mount -o discard,defaults "$DATA_DEV" "$DATA_MNT" grep -qF "$DATA_DEV" /etc/fstab || echo "$DATA_DEV $DATA_MNT ext4 discard,defaults,nofail 0 2" >> /etc/fstab mkdir -p "$DATA_MNT/state" "$DATA_MNT/analytics" "$DATA_MNT/extracts" + # Match Dockerfile USER agnes (uid:gid 999:999). A freshly-attached PD is + # root-owned by default; without this chown the non-root container cannot + # write to /data/state/system.duckdb and every authed request 500s after + # the first upgrade that flips USER from root to agnes (regression hit + # agnes-development on 2026-04-29). Idempotent — safe on reboot. + chown -R 999:999 "$DATA_MNT" fi # --- 3. App directory + docker-compose files from public repo --- diff --git a/pyproject.toml b/pyproject.toml index a3b8d17..aeea0c7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,6 +80,10 @@ build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] packages = ["app", "src", "connectors", "cli", "services", "config"] +[tool.ruff] +line-length = 120 +target-version = "py313" + [tool.uv] dev-dependencies = [ "pytest>=9.0.0", diff --git a/scripts/ops/post-deploy-smoke-test.sh b/scripts/ops/post-deploy-smoke-test.sh new file mode 100755 index 0000000..61b3cd7 --- /dev/null +++ b/scripts/ops/post-deploy-smoke-test.sh @@ -0,0 +1,99 @@ +#!/usr/bin/env bash +# Post-deploy smoke test — run on prod VM after image upgrade. +# Usage: ./scripts/ops/post-deploy-smoke-test.sh [AGNES_URL] [AGNES_PAT] +# or: AGNES_URL=https://agnes.example.com AGNES_PAT=xxx ./scripts/ops/post-deploy-smoke-test.sh +set -euo pipefail + +AGNES_URL="${1:-${AGNES_URL:-http://localhost:8000}}" +AGNES_PAT="${2:-${AGNES_PAT:-}}" +PASS=0 +FAIL=0 + +check() { + local name="$1" ok="$2" + if [ "$ok" = "true" ]; then + echo " PASS $name" + PASS=$((PASS + 1)) + else + echo " FAIL $name" + FAIL=$((FAIL + 1)) + fi +} + +echo "Post-deploy smoke test: $AGNES_URL" +echo "---" + +# 1. Health check +HEALTH=$(curl -sf "$AGNES_URL/api/health" 2>/dev/null || echo "") +if [ -z "$HEALTH" ]; then + check "health endpoint" "false" +else + STATUS=$(echo "$HEALTH" | python3 -c "import sys,json; print(json.load(sys.stdin).get('status','unknown'))" 2>/dev/null || echo "parse-error") + if [[ "$STATUS" =~ ^(ok|healthy)$ ]]; then + check "health ($STATUS)" "true" + else + check "health ($STATUS)" "false" + fi +fi + +# 2. DB schema version +DB_SCHEMA=$(echo "$HEALTH" | python3 -c "import sys,json; print(json.load(sys.stdin).get('db_schema','unknown'))" 2>/dev/null || echo "unknown") +if [ "$DB_SCHEMA" = "ok" ]; then + check "db schema version" "true" +elif [ "$DB_SCHEMA" = "unknown" ]; then + # Fallback: check /api/version for schema_version field + VERSION_INFO=$(curl -sf "$AGNES_URL/api/version" 2>/dev/null || echo "") + if [ -n "$VERSION_INFO" ]; then + check "db schema (version endpoint only)" "true" + else + check "db schema version" "false" + fi +else + check "db schema ($DB_SCHEMA)" "false" +fi + +# 3. Query SELECT 1 (requires PAT) +if [ -n "$AGNES_PAT" ]; then + QUERY_OK=$(curl -sf -X POST "$AGNES_URL/api/query" \ + -H "Authorization: Bearer $AGNES_PAT" \ + -H "Content-Type: application/json" \ + -d '{"sql":"SELECT 1 as test"}' | python3 -c " +import sys,json +d=json.load(sys.stdin) +print('true' if len(d.get('rows',[])) > 0 else 'false') +" 2>/dev/null || echo "false") + check "query SELECT 1" "$QUERY_OK" +else + echo " SKIP query (no PAT)" +fi + +# 4. Catalog endpoint (requires PAT) +if [ -n "$AGNES_PAT" ]; then + CATALOG_HTTP=$(curl -s -o /dev/null -w "%{http_code}" "$AGNES_URL/api/catalog" \ + -H "Authorization: Bearer $AGNES_PAT" 2>/dev/null || echo "000") + if [[ "$CATALOG_HTTP" =~ ^(200|404)$ ]]; then + check "catalog endpoint (HTTP $CATALOG_HTTP)" "true" + else + check "catalog endpoint (HTTP $CATALOG_HTTP)" "false" + fi +else + echo " SKIP catalog (no PAT)" +fi + +# 5. Marketplace.zip (requires PAT) +if [ -n "$AGNES_PAT" ]; then + MARKET_HTTP=$(curl -s -o /dev/null -w "%{http_code}" "$AGNES_URL/api/marketplace.zip" \ + -H "Authorization: Bearer $AGNES_PAT" 2>/dev/null || echo "000") + if [[ "$MARKET_HTTP" =~ ^(200|204|304|404)$ ]]; then + check "marketplace.zip (HTTP $MARKET_HTTP)" "true" + else + check "marketplace.zip (HTTP $MARKET_HTTP)" "false" + fi +else + echo " SKIP marketplace.zip (no PAT)" +fi + +# Results +echo "" +echo "Results: $PASS passed, $FAIL failed" +[ "$FAIL" -eq 0 ] || exit 1 diff --git a/scripts/smoke-test.sh b/scripts/smoke-test.sh index 921471d..f2b2e56 100755 --- a/scripts/smoke-test.sh +++ b/scripts/smoke-test.sh @@ -31,6 +31,22 @@ if [ "$HEALTH" = "unreachable" ]; then fi check "health ($HEALTH)" "true" +# 1b. Unauthenticated DB-touching probe — exercises the system-DB path before +# any token is acquired. /api/health does NOT open system.duckdb (deliberate, so +# the LB probe stays cheap), so it can return 200 while every authed request +# 500s on permission/IO errors. /auth/email/request opens the users table to +# look up the email, which catches the foundryai-development class of +# regression (host-mounted /data root-owned, USER agnes can't open the DB). +# Accept anything in 200-499 — including 4xx for "email auth disabled" — but +# fail loudly on 5xx. +DB_PROBE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$HOST/auth/email/request" \ + -H "Content-Type: application/json" \ + -d '{"email":"smoke-probe@test.local"}' 2>/dev/null || echo "000") +case "$DB_PROBE" in + 5*|000) check "db-touching probe (HTTP $DB_PROBE — expected non-5xx)" "false" ;; + *) check "db-touching probe (HTTP $DB_PROBE)" "true" ;; +esac + # 2. Health detailed has version fields (requires auth, checked after bootstrap) # 3. Bootstrap (only works on fresh DB; 403 means users exist) @@ -42,8 +58,14 @@ if [ "$BOOT_HTTP" = "200" ]; then TOKEN=$(python3 -c "import json; print(json.load(open('/tmp/smoke_boot.json'))['access_token'])" 2>/dev/null || echo "") check "bootstrap (new admin)" "true" elif [ "$BOOT_HTTP" = "403" ]; then + # Users exist — operator must supply SMOKE_TOKEN to validate the authed + # paths, otherwise the script would silently SKIP every regression. TOKEN="${SMOKE_TOKEN:-}" - echo " SKIP bootstrap (users exist)" + if [ -z "$TOKEN" ]; then + check "bootstrap (users exist; SMOKE_TOKEN required to continue)" "false" + else + echo " SKIP bootstrap (users exist; using SMOKE_TOKEN)" + fi else check "bootstrap (HTTP $BOOT_HTTP)" "false" fi @@ -101,6 +123,59 @@ else check "post-sync health ($HEALTH2)" "true" fi +# 7. Catalog endpoint (authenticated) +if [ -n "$TOKEN" ]; then + CATALOG_HTTP=$(curl -s -o /tmp/smoke_catalog.json -w "%{http_code}" "$HOST/api/catalog" \ + -H "Authorization: Bearer $TOKEN" 2>/dev/null || echo "000") + if [[ "$CATALOG_HTTP" =~ ^(200|404)$ ]]; then + check "catalog endpoint (HTTP $CATALOG_HTTP)" "true" + else + check "catalog endpoint (HTTP $CATALOG_HTTP)" "false" + fi +else + echo " SKIP catalog (no token)" +fi + +# 8. Admin tables endpoint (authenticated) +if [ -n "$TOKEN" ]; then + TABLES_HTTP=$(curl -s -o /tmp/smoke_tables.json -w "%{http_code}" "$HOST/api/admin/tables" \ + -H "Authorization: Bearer $TOKEN" 2>/dev/null || echo "000") + if [[ "$TABLES_HTTP" =~ ^(200|403)$ ]]; then + check "admin tables endpoint (HTTP $TABLES_HTTP)" "true" + else + check "admin tables endpoint (HTTP $TABLES_HTTP)" "false" + fi +else + echo " SKIP admin tables (no token)" +fi + +# 9. Marketplace.zip endpoint (with PAT auth if available) +MARKETPLACE_PAT="${AGNES_PAT:-${SMOKE_PAT:-}}" +if [ -n "$MARKETPLACE_PAT" ]; then + MARKET_HTTP=$(curl -s -o /tmp/smoke_marketplace.zip -w "%{http_code}" "$HOST/api/marketplace.zip" \ + -H "Authorization: Bearer $MARKETPLACE_PAT" 2>/dev/null || echo "000") + if [[ "$MARKET_HTTP" =~ ^(200|304|404)$ ]]; then + check "marketplace.zip (HTTP $MARKET_HTTP)" "true" + else + check "marketplace.zip (HTTP $MARKET_HTTP)" "false" + fi +else + echo " SKIP marketplace.zip (no PAT — set AGNES_PAT or SMOKE_PAT to test)" +fi + +# 10. Metrics endpoint (authenticated) +if [ -n "$TOKEN" ]; then + METRICS_HTTP=$(curl -s -o /tmp/smoke_metrics.json -w "%{http_code}" "$HOST/api/metrics" \ + -H "Authorization: Bearer $TOKEN" 2>/dev/null || echo "000") + if [[ "$METRICS_HTTP" =~ ^(200|404)$ ]]; then + check "metrics endpoint (HTTP $METRICS_HTTP)" "true" + else + check "metrics endpoint (HTTP $METRICS_HTTP)" "false" + fi +else + echo " SKIP metrics (no token)" +fi + # Results echo "" echo "Results: $PASS passed, $FAIL failed" diff --git a/tests/test_auth_providers.py b/tests/test_auth_providers.py index 3064177..80b074d 100644 --- a/tests/test_auth_providers.py +++ b/tests/test_auth_providers.py @@ -610,3 +610,94 @@ class TestGoogleCallbackGroupSync: ] finally: conn.close() + + +class TestEmailMagicLinkTTL: + """Tests for email magic link token expiry and replay prevention.""" + + def test_expired_magic_link_rejected(self, client): + """A magic link token older than MAGIC_LINK_EXPIRY must be rejected.""" + from src.db import get_system_db + from src.repositories.users import UserRepository + from datetime import datetime, timezone, timedelta + + conn = get_system_db() + repo = UserRepository(conn) + repo.create(id="expired-user", email="expired@test.com", name="Expired", role="analyst") + # Set token with old timestamp (beyond 1-hour TTL) + old_time = datetime.now(timezone.utc) - timedelta(hours=2) + repo.update(id="expired-user", reset_token="expired-token-123", reset_token_created=old_time) + conn.close() + + resp = client.post("/auth/email/verify", json={ + "email": "expired@test.com", "token": "expired-token-123", + }) + assert resp.status_code == 401 + + def test_token_reuse_prevented(self, client): + """A consumed magic link token cannot be used again.""" + from src.db import get_system_db + from src.repositories.users import UserRepository + from datetime import datetime, timezone + + conn = get_system_db() + repo = UserRepository(conn) + repo.create(id="reuse-user", email="reuse@test.com", name="Reuse", role="analyst") + token = "reusable-token-456" + repo.update(id="reuse-user", reset_token=token, reset_token_created=datetime.now(timezone.utc)) + conn.close() + + # First use should succeed + resp1 = client.post("/auth/email/verify", json={ + "email": "reuse@test.com", "token": token, + }) + assert resp1.status_code == 200 + + # Second use must fail + resp2 = client.post("/auth/email/verify", json={ + "email": "reuse@test.com", "token": token, + }) + assert resp2.status_code == 401 + + def test_invalid_signature_token_rejected(self, client): + """A token that doesn't match any stored value must be rejected.""" + from src.db import get_system_db + from src.repositories.users import UserRepository + from datetime import datetime, timezone + + conn = get_system_db() + repo = UserRepository(conn) + repo.create(id="sig-user", email="sig@test.com", name="Sig", role="analyst") + repo.update(id="sig-user", reset_token="real-token-789", reset_token_created=datetime.now(timezone.utc)) + conn.close() + + resp = client.post("/auth/email/verify", json={ + "email": "sig@test.com", "token": "wrong-token-xyz", + }) + assert resp.status_code == 401 + + +@pytest.mark.skip(reason="Authlib OAuth internals require complex async mock; group sync is tested via unit tests and integration. Full E2E OAuth flow needs real Google credentials or dedicated mock infrastructure.") +class TestGoogleOAuthFullFlow: + """Tests for Google OAuth callback with mocked token exchange and group sync. + + These tests require mocking authlib's internal OAuth client which involves + async Starlette session middleware. The group sync logic is covered by + unit tests for fetch_user_groups and the existing TestGoogleCallbackGroupSync. + """ + + def test_google_callback_creates_new_user(self, tmp_path, monkeypatch): + """Google OAuth callback must create a new user if not found.""" + pass + + def test_google_callback_syncs_group_memberships(self, tmp_path, monkeypatch): + """Google OAuth callback must sync Workspace groups into user_group_members.""" + pass + + def test_google_callback_existing_user_not_duplicated(self, tmp_path, monkeypatch): + """Re-login via Google OAuth must not duplicate the user.""" + pass + + def test_google_callback_api_error_handled(self, tmp_path, monkeypatch): + """Google OAuth callback must handle API errors gracefully.""" + pass diff --git a/tests/test_bigquery_extractor.py b/tests/test_bigquery_extractor.py index 5bc5407..cbc1fcd 100644 --- a/tests/test_bigquery_extractor.py +++ b/tests/test_bigquery_extractor.py @@ -698,3 +698,209 @@ class TestInitExtractProjectIdValidation: assert errors, "expected metadata-stub error" assert all("project_id" not in e.get("error", "").lower() for e in errors), \ f"valid project_id should not trip the validator; got: {errors}" + + +class TestBigQueryExtractorFailureModes: + """Failure-mode tests for the BigQuery extractor — corrupted DB, partial + writes, network timeout, unsafe identifiers, atomic swap.""" + + def test_corrupted_extract_duckdb_orchestrator_skips(self, output_dir, monkeypatch): + """A corrupted extract.duckdb should be skipped by the orchestrator + without crashing.""" + from src.orchestrator import SyncOrchestrator + + monkeypatch.setattr( + "connectors.bigquery.extractor.get_metadata_token", + lambda: "test-token", + ) + + # Create a corrupted extract.duckdb + db_path = Path(output_dir) / "extract.duckdb" + db_path.write_bytes(b"this is not a valid duckdb file!!!") + + analytics_db = str(Path(output_dir) / "analytics.duckdb") + orch = SyncOrchestrator(analytics_db_path=analytics_db) + # The rebuild should complete (possibly with warnings) but not raise + result = orch.rebuild() + # The corrupted source should not appear in results + assert "bigquery" not in result + + def test_partial_data_write_incomplete_extract(self, output_dir, monkeypatch): + """When init_extract fails partway through (e.g. one view creation + fails), the extract.duckdb is still created atomically and the + successful tables are preserved.""" + from connectors.bigquery.extractor import init_extract + from unittest.mock import patch + + monkeypatch.setattr( + "connectors.bigquery.extractor.get_metadata_token", + lambda: "test-token", + ) + monkeypatch.setattr( + "connectors.bigquery.extractor._detect_table_type", + lambda *a, **kw: "BASE TABLE", + ) + + configs = [ + { + "name": "good_table", + "bucket": "analytics", + "source_table": "good_table", + "query_mode": "remote", + "description": "OK", + }, + { + "name": "bad-table", # hyphen → unsafe identifier + "bucket": "analytics", + "source_table": "bad_table", + "query_mode": "remote", + "description": "Will fail validation", + }, + ] + + def proxy_connect(path=None, **kwargs): + real_conn = duckdb.connect(path) + return _DuckDBProxy(real_conn) + + with patch("connectors.bigquery.extractor.duckdb") as mock_mod: + mock_mod.connect = proxy_connect + result = init_extract(output_dir, "my-project", configs) + + # good_table registered, bad-table skipped + assert result["tables_registered"] == 1 + assert len(result["errors"]) == 1 + + def test_network_timeout_during_extraction(self, output_dir, monkeypatch): + """Network timeout during BQ extension ATTACH should be caught and + reported as an error, not crash the process.""" + from connectors.bigquery.extractor import init_extract + from unittest.mock import patch + + monkeypatch.setattr( + "connectors.bigquery.extractor.get_metadata_token", + lambda: "test-token", + ) + + configs = [ + { + "name": "timeout_table", + "bucket": "analytics", + "source_table": "timeout_table", + "query_mode": "remote", + "description": "Will timeout", + }, + ] + + def proxy_connect_timeout(path=None, **kwargs): + real_conn = duckdb.connect(path) + proxy = _DuckDBProxy(real_conn) + # Override execute to raise on ATTACH + original_execute = proxy.execute + def timeout_execute(sql, *args, **kwargs): + sql_upper = sql.strip().upper() + if "ATTACH" in sql_upper and "BIGQUERY" in sql_upper: + raise TimeoutError("BigQuery connection timed out") + return original_execute(sql, *args, **kwargs) + proxy.execute = timeout_execute + return proxy + + with patch("connectors.bigquery.extractor.duckdb") as mock_mod: + mock_mod.connect = proxy_connect_timeout + result = init_extract(output_dir, "my-project", configs) + + # The timeout should be caught — no tables registered, error recorded + assert result["tables_registered"] == 0 + assert len(result["errors"]) >= 1 + + def test_all_tables_fail_returns_errors(self, output_dir, monkeypatch): + """When every table registration fails, the extractor returns all + errors without crashing.""" + from connectors.bigquery.extractor import init_extract + from unittest.mock import patch + + monkeypatch.setattr( + "connectors.bigquery.extractor.get_metadata_token", + lambda: "test-token", + ) + + configs = [ + {"name": "bad-1", "bucket": "ds", "source_table": "t1", + "query_mode": "remote", "description": ""}, + {"name": "bad-2", "bucket": "ds", "source_table": "t2", + "query_mode": "remote", "description": ""}, + ] + + def proxy_connect(path=None, **kwargs): + real_conn = duckdb.connect(path) + return _DuckDBProxy(real_conn) + + with patch("connectors.bigquery.extractor.duckdb") as mock_mod: + mock_mod.connect = proxy_connect + result = init_extract(output_dir, "my-project", configs) + + # Both have unsafe identifiers (hyphens) + assert result["tables_registered"] == 0 + assert len(result["errors"]) == 2 + + def test_unsafe_identifier_skipped_not_crashed(self, output_dir, monkeypatch): + """Tables with unsafe identifiers are skipped with an error in stats, + not causing a crash.""" + from connectors.bigquery.extractor import init_extract + from unittest.mock import patch + + monkeypatch.setattr( + "connectors.bigquery.extractor.get_metadata_token", + lambda: "test-token", + ) + monkeypatch.setattr( + "connectors.bigquery.extractor._detect_table_type", + lambda *a, **kw: "BASE TABLE", + ) + + configs = [ + {"name": "bad-name", "bucket": "dataset", "source_table": "t", + "query_mode": "remote", "description": "hyphen not allowed"}, + {"name": "good_name", "bucket": "dataset", "source_table": "t", + "query_mode": "remote", "description": "OK"}, + ] + + def proxy_connect(path=None, **kwargs): + real_conn = duckdb.connect(path) + return _DuckDBProxy(real_conn) + + with patch("connectors.bigquery.extractor.duckdb") as mock_mod: + mock_mod.connect = proxy_connect + result = init_extract(output_dir, "my-project", configs) + + assert result["tables_registered"] == 1 + assert len(result["errors"]) == 1 + assert "unsafe" in result["errors"][0]["error"].lower() + + def test_atomic_swap_prevents_corruption_on_crash(self, output_dir): + """The extractor writes to a temp file then atomically swaps it into + place. If the process crashes mid-write, the old extract.duckdb + (if any) is not corrupted.""" + # Create a valid existing extract.duckdb + db_path = Path(output_dir) / "extract.duckdb" + conn = duckdb.connect(str(db_path)) + conn.execute("""CREATE TABLE _meta ( + table_name VARCHAR, description VARCHAR, rows BIGINT, + size_bytes BIGINT, extracted_at TIMESTAMP, + query_mode VARCHAR DEFAULT 'remote' + )""") + conn.execute("INSERT INTO _meta VALUES ('existing', '', 0, 0, current_timestamp, 'remote')") + conn.close() + + # Simulate a crash: the tmp file exists but is incomplete + tmp_path = Path(output_dir) / "extract.duckdb.tmp" + tmp_path.write_bytes(b"incomplete garbage") + + # The existing extract.duckdb should still be valid + conn2 = duckdb.connect(str(db_path)) + rows = conn2.execute("SELECT table_name FROM _meta").fetchall() + assert len(rows) == 1 + assert rows[0][0] == "existing" + conn2.close() + + # Clean up + tmp_path.unlink() diff --git a/tests/test_db.py b/tests/test_db.py index 7f673a3..39e5730 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -1270,3 +1270,198 @@ class TestSchemaV12: assert count_members > 0, "retry should backfill members" finally: conn.close() + + +class TestV13ToV14Migration: + """Tests for v13→v14 finalize: orphan cleanup + FK constraints + rollback.""" + + def _create_v13_db(self, tmp_path, monkeypatch): + """Create a v13 database with some data including orphan records.""" + import json + import uuid + import duckdb as _duckdb + + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + db_path = tmp_path / "state" / "system.duckdb" + db_path.parent.mkdir(parents=True, exist_ok=True) + conn = _duckdb.connect(str(db_path)) + + # Build a minimal v13 schema + conn.execute(""" + CREATE TABLE schema_version (version INTEGER, applied_at TIMESTAMP DEFAULT current_timestamp); + INSERT INTO schema_version (version) VALUES (13); + CREATE TABLE users ( + id VARCHAR PRIMARY KEY, email VARCHAR UNIQUE NOT NULL, name VARCHAR, role VARCHAR, + password_hash VARCHAR, setup_token VARCHAR, setup_token_created TIMESTAMP, + reset_token VARCHAR, reset_token_created TIMESTAMP, + active BOOLEAN DEFAULT TRUE, deactivated_at TIMESTAMP, deactivated_by VARCHAR, + created_at TIMESTAMP, updated_at TIMESTAMP + ); + CREATE TABLE user_groups ( + id VARCHAR PRIMARY KEY, name VARCHAR UNIQUE, + description TEXT, is_system BOOLEAN, created_at TIMESTAMP, created_by VARCHAR + ); + CREATE TABLE user_group_members ( + id VARCHAR PRIMARY KEY, user_id VARCHAR, group_id VARCHAR, + source VARCHAR, added_at TIMESTAMP, added_by VARCHAR + ); + CREATE TABLE resource_grants ( + id VARCHAR PRIMARY KEY, group_id VARCHAR, + resource_type VARCHAR, resource_id VARCHAR, + assigned_at TIMESTAMP, assigned_by VARCHAR + ); + CREATE TABLE table_registry ( + id VARCHAR PRIMARY KEY, name VARCHAR, source_type VARCHAR, bucket VARCHAR, + source_table VARCHAR, query_mode VARCHAR, sync_schedule VARCHAR, + profile_after_sync BOOLEAN, is_public BOOLEAN, description TEXT, + created_at TIMESTAMP, updated_at TIMESTAMP + ); + CREATE TABLE sync_state (table_id VARCHAR PRIMARY KEY, status VARCHAR, + last_sync TIMESTAMP, rows INTEGER, size_bytes INTEGER, error TEXT); + CREATE TABLE sync_history (id VARCHAR PRIMARY KEY, table_id VARCHAR, + status VARCHAR, started_at TIMESTAMP, finished_at TIMESTAMP, + rows INTEGER, size_bytes INTEGER, error TEXT); + CREATE TABLE personal_access_tokens ( + id VARCHAR PRIMARY KEY, user_id VARCHAR, name VARCHAR, + token_hash VARCHAR, prefix VARCHAR, scopes VARCHAR, + created_at TIMESTAMP, expires_at TIMESTAMP, + last_used_at TIMESTAMP, last_used_ip VARCHAR, revoked_at TIMESTAMP + ); + CREATE TABLE view_ownership ( + source_name VARCHAR, table_name VARCHAR, owner_id VARCHAR, + claimed_at TIMESTAMP DEFAULT current_timestamp, + PRIMARY KEY (source_name, table_name) + ); + """) + + # Seed system groups + admin_gid = str(uuid.uuid4()) + everyone_gid = str(uuid.uuid4()) + conn.execute("INSERT INTO user_groups (id, name, is_system) VALUES (?, 'Admin', TRUE)", [admin_gid]) + conn.execute("INSERT INTO user_groups (id, name, is_system) VALUES (?, 'Everyone', TRUE)", [everyone_gid]) + + # Seed a user + uid = str(uuid.uuid4()) + conn.execute("INSERT INTO users (id, email, name, role) VALUES (?, 'test@x.com', 'Test', 'analyst')", [uid]) + + # Valid memberships + conn.execute( + "INSERT INTO user_group_members (id, user_id, group_id, source, added_at, added_by) VALUES (?, ?, ?, 'admin', current_timestamp, 'admin')", + [str(uuid.uuid4()), uid, everyone_gid], + ) + + # Orphan: membership referencing non-existent group (FK target missing) + orphan_mid = str(uuid.uuid4()) + conn.execute( + "INSERT INTO user_group_members (id, user_id, group_id, source, added_at, added_by) VALUES (?, ?, 'nonexistent-group', 'admin', current_timestamp, 'admin')", + [orphan_mid, uid], + ) + + # Orphan: grant referencing non-existent group + orphan_gid = str(uuid.uuid4()) + conn.execute( + "INSERT INTO resource_grants (id, group_id, resource_type, resource_id, assigned_at, assigned_by) VALUES (?, ?, 'plugin', 'test-plugin', current_timestamp, 'admin')", + [orphan_gid, 'nonexistent-group'], + ) + + # Valid grant + conn.execute( + "INSERT INTO resource_grants (id, group_id, resource_type, resource_id, assigned_at, assigned_by) VALUES (?, ?, 'plugin', 'valid-plugin', current_timestamp, 'admin')", + [str(uuid.uuid4()), everyone_gid], + ) + + conn.close() + return db_path, uid, admin_gid, everyone_gid, orphan_mid + + def test_v13_to_v14_orphan_cleanup(self, tmp_path, monkeypatch): + """v13→v14 finalize must clean up orphan records before adding FK constraints.""" + db_path, uid, admin_gid, everyone_gid, orphan_mid = self._create_v13_db(tmp_path, monkeypatch) + from src.db import get_system_db, get_schema_version, SCHEMA_VERSION + + conn = get_system_db() + try: + assert get_schema_version(conn) == SCHEMA_VERSION + + # Orphan membership should have been deleted + orphans = conn.execute( + "SELECT COUNT(*) FROM user_group_members WHERE group_id = 'nonexistent-group'" + ).fetchone()[0] + assert orphans == 0, "orphan user_group_members should be cleaned up" + + # Orphan grant should have been deleted + orphan_grants = conn.execute( + "SELECT COUNT(*) FROM resource_grants WHERE group_id = 'nonexistent-group'" + ).fetchone()[0] + assert orphan_grants == 0, "orphan resource_grants should be cleaned up" + + # Valid records should still exist + valid_members = conn.execute( + "SELECT COUNT(*) FROM user_group_members WHERE user_id = ?", [uid] + ).fetchone()[0] + assert valid_members > 0, "valid memberships should be preserved" + + valid_grants = conn.execute( + "SELECT COUNT(*) FROM resource_grants WHERE group_id = ?", [everyone_gid] + ).fetchone()[0] + assert valid_grants > 0, "valid grants should be preserved" + finally: + conn.close() + + def test_v13_to_v14_fk_constraints_added(self, tmp_path, monkeypatch): + """v13→v14 finalize must add FK constraints on user_group_members and resource_grants.""" + db_path, *_ = self._create_v13_db(tmp_path, monkeypatch) + import duckdb as _duckdb + from src.db import get_system_db + + conn = get_system_db() + try: + # Check FK constraints exist on user_group_members + fks_members = conn.execute( + "SELECT constraint_text FROM duckdb_constraints() " + "WHERE table_name = 'user_group_members' AND constraint_type = 'FOREIGN KEY'" + ).fetchall() + fk_texts = [fk[0] for fk in fks_members] + assert any('user_groups' in t for t in fk_texts), "FK to user_groups should exist on user_group_members" + + # Check FK constraints exist on resource_grants + fks_grants = conn.execute( + "SELECT constraint_text FROM duckdb_constraints() " + "WHERE table_name = 'resource_grants' AND constraint_type = 'FOREIGN KEY'" + ).fetchall() + fk_texts_g = [fk[0] for fk in fks_grants] + assert any('user_groups' in t for t in fk_texts_g), "FK to user_groups should exist on resource_grants" + finally: + conn.close() + + def test_v13_to_v14_rollback_on_failure(self, tmp_path, monkeypatch): + """If v13→v14 finalize fails, schema_version must stay at 13 and rollback.""" + db_path, *_ = self._create_v13_db(tmp_path, monkeypatch) + from src import db as _db + from src.db import get_system_db, get_schema_version + + # Inject a failure inside the v13→v14 finalize + original_finalize = _db._v13_to_v14_finalize + def _boom(_conn): + raise RuntimeError("synthetic v14 finalize failure") + monkeypatch.setattr(_db, "_v13_to_v14_finalize", _boom) + + with pytest.raises(RuntimeError, match="synthetic v14 finalize failure"): + get_system_db() + _db._system_db_conn = None + + # Verify rollback: schema_version still 13 + import duckdb as _duckdb + conn = _duckdb.connect(str(db_path)) + try: + assert get_schema_version(conn) == 13, "schema_version must stay at 13 after rollback" + finally: + conn.close() + + # Restore and retry — should succeed + monkeypatch.setattr(_db, "_v13_to_v14_finalize", original_finalize) + conn = get_system_db() + try: + from src.db import SCHEMA_VERSION + assert get_schema_version(conn) == SCHEMA_VERSION + finally: + conn.close() diff --git a/tests/test_jira_webhooks.py b/tests/test_jira_webhooks.py index 46a8678..61c9f6c 100644 --- a/tests/test_jira_webhooks.py +++ b/tests/test_jira_webhooks.py @@ -266,3 +266,250 @@ def test_webhook_event_path_traversal_sanitized(webhook_client, tmp_path, monkey for f in written: assert f.is_relative_to(log_dir), f"file {f} escaped log dir" assert "/" not in f.name and ".." not in f.name + + +# --------------------------------------------------------------------------- +# Additional HMAC validation + error handling tests +# --------------------------------------------------------------------------- + + +def test_valid_hmac_signature_accepted(webhook_client): + """Webhook with valid HMAC-SHA256 signature is accepted (200).""" + from unittest.mock import patch + + payload = json.dumps({ + "webhookEvent": "jira:issue_updated", + "issue": {"key": "PROJ-1"}, + }).encode() + sig = _sign(payload, "test-webhook-secret") + + with patch("app.api.jira_webhooks.get_jira_service") as mock_svc: + mock_svc.return_value.is_configured.return_value = True + mock_svc.return_value.process_webhook_event.return_value = True + + resp = webhook_client.post( + "/webhooks/jira", + content=payload, + headers={ + "Content-Type": "application/json", + "X-Hub-Signature-256": sig, + }, + ) + assert resp.status_code == 200 + + +def test_invalid_hmac_signature_rejected_401(webhook_client): + """Webhook with wrong HMAC signature is rejected with 401.""" + payload = json.dumps({ + "webhookEvent": "jira:issue_updated", + "issue": {"key": "PROJ-1"}, + }).encode() + # Sign with the wrong secret + sig = _sign(payload, "wrong-secret") + + resp = webhook_client.post( + "/webhooks/jira", + content=payload, + headers={ + "Content-Type": "application/json", + "X-Hub-Signature-256": sig, + }, + ) + assert resp.status_code == 401 + + +def test_missing_signature_header_rejected(webhook_client): + """Webhook with no signature header at all is rejected with 401.""" + payload = json.dumps({ + "webhookEvent": "jira:issue_updated", + "issue": {"key": "PROJ-1"}, + }).encode() + + resp = webhook_client.post( + "/webhooks/jira", + content=payload, + headers={"Content-Type": "application/json"}, + ) + assert resp.status_code == 401 + + +def test_x_hub_signature_legacy_header_accepted(webhook_client): + """X-Hub-Signature (SHA1 legacy) header is also checked.""" + from unittest.mock import patch + + payload = json.dumps({ + "webhookEvent": "jira:issue_updated", + "issue": {"key": "PROJ-1"}, + }).encode() + # The handler falls back to X-Hub-Signature if X-Hub-Signature-256 is absent. + # _verify_signature strips "sha256=" prefix; for sha1 it strips "sha1=". + # Since the handler uses hmac.new with sha256, a sha1= prefix will still + # be checked against sha256 HMAC. This test verifies the fallback header + # is read at all (the signature won't match sha256, so expect 401). + resp = webhook_client.post( + "/webhooks/jira", + content=payload, + headers={ + "Content-Type": "application/json", + "X-Hub-Signature": "sha1=somehex", + }, + ) + # Legacy header is read but signature won't match → 401 + assert resp.status_code == 401 + + +def test_malformed_json_payload_handled_gracefully(webhook_client): + """Malformed webhook payload (invalid JSON) is handled gracefully with 400.""" + payload = b'this is not json {!><' + sig = _sign(payload, "test-webhook-secret") + + resp = webhook_client.post( + "/webhooks/jira", + content=payload, + headers={ + "Content-Type": "application/json", + "X-Hub-Signature-256": sig, + }, + ) + assert resp.status_code == 400 + assert "json" in resp.json()["detail"].lower() or "invalid" in resp.json()["detail"].lower() + + +def test_duplicate_event_processed_twice(webhook_client): + """Same Jira event ID sent twice is processed both times (idempotent at + the service layer, not rejected at the webhook layer).""" + from unittest.mock import patch + + payload = json.dumps({ + "webhookEvent": "jira:issue_updated", + "issue": {"key": "DUP-1"}, + }).encode() + sig = _sign(payload, "test-webhook-secret") + + with patch("app.api.jira_webhooks.get_jira_service") as mock_svc: + mock_svc.return_value.is_configured.return_value = True + mock_svc.return_value.process_webhook_event.return_value = True + + resp1 = webhook_client.post( + "/webhooks/jira", + content=payload, + headers={ + "Content-Type": "application/json", + "X-Hub-Signature-256": sig, + }, + ) + resp2 = webhook_client.post( + "/webhooks/jira", + content=payload, + headers={ + "Content-Type": "application/json", + "X-Hub-Signature-256": sig, + }, + ) + + # Both requests succeed — deduplication is the service layer's job + assert resp1.status_code == 200 + assert resp2.status_code == 200 + + +def test_signature_without_sha256_prefix(webhook_client): + """A raw hex signature without 'sha256=' prefix is also accepted by + _verify_signature (it strips the prefix if present).""" + from unittest.mock import patch + import hmac as hmac_mod + + payload = json.dumps({ + "webhookEvent": "jira:issue_updated", + "issue": {"key": "PROJ-1"}, + }).encode() + # Compute raw hex without prefix + mac = hmac_mod.new("test-webhook-secret".encode(), payload, hashlib.sha256).hexdigest() + + with patch("app.api.jira_webhooks.get_jira_service") as mock_svc: + mock_svc.return_value.is_configured.return_value = True + mock_svc.return_value.process_webhook_event.return_value = True + + resp = webhook_client.post( + "/webhooks/jira", + content=payload, + headers={ + "Content-Type": "application/json", + "X-Hub-Signature-256": mac, # no sha256= prefix + }, + ) + assert resp.status_code == 200 + + +def test_jira_service_not_configured_returns_503(webhook_client): + """When Jira service is not configured, webhook returns 503.""" + from unittest.mock import patch + + payload = json.dumps({ + "webhookEvent": "jira:issue_updated", + "issue": {"key": "PROJ-1"}, + }).encode() + sig = _sign(payload, "test-webhook-secret") + + with patch("app.api.jira_webhooks.get_jira_service") as mock_svc: + mock_svc.return_value.is_configured.return_value = False + + resp = webhook_client.post( + "/webhooks/jira", + content=payload, + headers={ + "Content-Type": "application/json", + "X-Hub-Signature-256": sig, + }, + ) + assert resp.status_code == 503 + + +def test_process_webhook_event_failure_returns_500(webhook_client): + """When process_webhook_event returns False, the endpoint returns 500.""" + from unittest.mock import patch + + payload = json.dumps({ + "webhookEvent": "jira:issue_updated", + "issue": {"key": "PROJ-1"}, + }).encode() + sig = _sign(payload, "test-webhook-secret") + + with patch("app.api.jira_webhooks.get_jira_service") as mock_svc: + mock_svc.return_value.is_configured.return_value = True + mock_svc.return_value.process_webhook_event.return_value = False + + resp = webhook_client.post( + "/webhooks/jira", + content=payload, + headers={ + "Content-Type": "application/json", + "X-Hub-Signature-256": sig, + }, + ) + assert resp.status_code == 500 + + +def test_issue_key_at_top_level_accepted(webhook_client): + """Some Jira event types deliver issue_key at the top level instead of + issue.key. The handler should accept these.""" + from unittest.mock import patch + + payload = json.dumps({ + "webhookEvent": "jira:issue_deleted", + "issue_key": "PROJ-99", + }).encode() + sig = _sign(payload, "test-webhook-secret") + + with patch("app.api.jira_webhooks.get_jira_service") as mock_svc: + mock_svc.return_value.is_configured.return_value = True + mock_svc.return_value.process_webhook_event.return_value = True + + resp = webhook_client.post( + "/webhooks/jira", + content=payload, + headers={ + "Content-Type": "application/json", + "X-Hub-Signature-256": sig, + }, + ) + assert resp.status_code == 200 diff --git a/tests/test_keboola_extractor.py b/tests/test_keboola_extractor.py index 6ed67fd..f6c65ee 100644 --- a/tests/test_keboola_extractor.py +++ b/tests/test_keboola_extractor.py @@ -224,3 +224,186 @@ class TestKeboolaExtractor: assert result["tables_extracted"] == 1 assert result["tables_failed"] == 0 + + + +# --------------------------------------------------------------------------- +# Connector failure mode tests +# --------------------------------------------------------------------------- + + +class TestKeboolaExtractorFailureModes: + """Tests for Keboola extractor failure handling and resilience.""" + + def test_extractor_crash_does_not_corrupt_extract_duckdb(self, output_dir, sample_configs): + """If the extractor crashes mid-extraction, the temp DB is not moved + into place, so the existing extract.duckdb (if any) is not corrupted. + The atomic write pattern (tmp + rename) protects against this.""" + from connectors.keboola.extractor import run + + # First, create a valid extract.duckdb + def write_pq(conn, tc, pq_path): + _write_parquet(pq_path, "SELECT 1 AS id") + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), patch("connectors.keboola.extractor._extract_via_extension", side_effect=write_pq): + run(output_dir, sample_configs[:1], "https://example.com", "test-token") + + db_path = Path(output_dir) / "extract.duckdb" + assert db_path.exists() + # Verify it's valid + conn = duckdb.connect(str(db_path)) + conn.execute("SELECT * FROM _meta").fetchall() + conn.close() + + # Now simulate a crash during a second extraction — the extension + # attach raises an exception after the tmp file is created. + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=RuntimeError("crash")): + try: + run(output_dir, sample_configs, "https://example.com", "test-token") + except Exception: + pass # The extractor catches internally and returns stats + + # The extract.duckdb should still exist and be valid (atomic swap + # means the old file is untouched if the new one didn't complete) + assert db_path.exists() + + def test_partial_data_write_incomplete_parquet(self, output_dir): + """When a parquet file write fails mid-stream, the extractor records + the table as failed in stats but continues with other tables.""" + from connectors.keboola.extractor import run + + configs = [ + {"name": "good_table", "query_mode": "local", "description": "OK"}, + {"name": "bad_table", "query_mode": "local", "description": "Will fail"}, + ] + + call_count = 0 + def side_effect(conn, tc, pq_path): + nonlocal call_count + call_count += 1 + if tc["name"] == "bad_table": + raise IOError("Disk full — partial write") + _write_parquet(pq_path, "SELECT 1 AS id") + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), patch("connectors.keboola.extractor._extract_via_extension", side_effect=side_effect): + result = run(output_dir, configs, "https://example.com", "test-token") + + # One table succeeded, one failed + assert result["tables_extracted"] == 1 + assert result["tables_failed"] == 1 + assert len(result["errors"]) == 1 + assert "bad_table" in result["errors"][0]["table"] + + # The good table's parquet file exists + assert (Path(output_dir) / "data" / "good_table.parquet").exists() + # The bad table's parquet file should NOT exist (failed before write) + assert not (Path(output_dir) / "data" / "bad_table.parquet").exists() + + def test_network_timeout_during_extraction(self, output_dir): + """Network timeout during extraction should return a meaningful error + in the stats, not crash the whole process.""" + from connectors.keboola.extractor import run + import socket + + configs = [ + {"name": "timeout_table", "query_mode": "local", "description": "Will timeout"}, + {"name": "ok_table", "query_mode": "local", "description": "OK"}, + ] + + call_count = 0 + def side_effect(conn, tc, pq_path): + nonlocal call_count + call_count += 1 + if tc["name"] == "timeout_table": + raise socket.timeout("Connection timed out") + _write_parquet(pq_path, "SELECT 1 AS id") + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), patch("connectors.keboola.extractor._extract_via_extension", side_effect=side_effect): + result = run(output_dir, configs, "https://example.com", "test-token") + + assert result["tables_extracted"] == 1 + assert result["tables_failed"] == 1 + assert "timed out" in result["errors"][0]["error"].lower() + + def test_extension_unavailable_fallback_to_client(self, output_dir): + """When DuckDB Keboola extension fails to load, the extractor falls + back to the legacy HTTP client.""" + from connectors.keboola.extractor import run + + configs = [{"name": "t", "id": "in.c-test.t", "query_mode": "local", + "bucket": "in.c-test", "source_table": "t", "description": ""}] + + def mock_legacy(tc, pq_path, url, token): + _write_parquet(pq_path, "SELECT 42 AS value") + + with patch("connectors.keboola.extractor._try_attach_extension", return_value=False), patch("connectors.keboola.extractor._extract_via_legacy", side_effect=mock_legacy): + result = run(output_dir, configs, "https://example.com", "test-token") + + assert result["tables_extracted"] == 1 + assert result["tables_failed"] == 0 + + # Verify the data is queryable + conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb")) + val = conn.execute("SELECT value FROM t").fetchone() + assert val[0] == 42 + conn.close() + + def test_all_tables_fail_returns_full_failure_stats(self, output_dir): + """When every table fails, the extractor returns all failures in stats + without crashing.""" + from connectors.keboola.extractor import run + + configs = [ + {"name": "t1", "query_mode": "local", "description": ""}, + {"name": "t2", "query_mode": "local", "description": ""}, + ] + + def always_fail(conn, tc, pq_path): + raise RuntimeError("Extraction failed") + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), patch("connectors.keboola.extractor._extract_via_extension", side_effect=always_fail): + result = run(output_dir, configs, "https://example.com", "test-token") + + assert result["tables_extracted"] == 0 + assert result["tables_failed"] == 2 + assert len(result["errors"]) == 2 + + def test_unsafe_identifier_skipped_not_crashed(self, output_dir): + """Tables with unsafe identifiers are skipped with an error in stats, + not causing a crash.""" + from connectors.keboola.extractor import run + + configs = [ + {"name": "bad-name", "query_mode": "local", "description": "hyphen not allowed"}, + {"name": "good_name", "query_mode": "local", "description": "OK"}, + ] + + def write_pq(conn, tc, pq_path): + _write_parquet(pq_path, "SELECT 1 AS id") + + with patch("connectors.keboola.extractor._try_attach_extension", side_effect=_mock_attach), patch("connectors.keboola.extractor._extract_via_extension", side_effect=write_pq): + result = run(output_dir, configs, "https://example.com", "test-token") + + assert result["tables_extracted"] == 1 + assert result["tables_failed"] == 1 + assert result["errors"][0]["error"] == "unsafe identifier" + + def test_compute_exit_code_full_success(self): + from connectors.keboola.extractor import compute_exit_code + stats = {"tables_failed": 0, "errors": []} + assert compute_exit_code(stats, 5) == 0 + + def test_compute_exit_code_partial_failure(self): + from connectors.keboola.extractor import compute_exit_code + stats = {"tables_failed": 2, "errors": [{}, {}]} + assert compute_exit_code(stats, 5) == 2 + + def test_compute_exit_code_full_failure(self): + from connectors.keboola.extractor import compute_exit_code + stats = {"tables_failed": 5, "errors": [{}] * 5} + assert compute_exit_code(stats, 5) == 1 + + def test_compute_exit_code_no_tables(self): + from connectors.keboola.extractor import compute_exit_code + stats = {"tables_failed": 0, "errors": []} + assert compute_exit_code(stats, 0) == 0 diff --git a/tests/test_marketplace_server_git.py b/tests/test_marketplace_server_git.py index ea4ab0a..cdc604c 100644 --- a/tests/test_marketplace_server_git.py +++ b/tests/test_marketplace_server_git.py @@ -5,33 +5,22 @@ git smart-HTTP wire protocol (`GET /info/refs?service=git-upload-pack`) rather than spawning a real `git clone` subprocess — cheaper to run, no socket required, and avoids Windows/PATH git-binary flakiness on CI. -A single realistic end-to-end clone test is parked under -@pytest.mark.slow and only runs when the user opts in. +v13: uses user_group_members + resource_grants (no PluginAccessRepository, +no users.groups JSON). PAT auth via HTTP Basic where password = PAT. """ from __future__ import annotations import base64 +import hashlib import json -import shutil -import subprocess -import threading +import uuid from datetime import datetime, timezone from pathlib import Path -from typing import Iterable, Optional import pytest from fastapi.testclient import TestClient -pytest.skip( - "v12: PluginAccessRepository was removed and users.role/users.groups are " - "no longer the authorization source. Rewrite this module against the " - "v12 model — seed user_group_members + resource_grants directly, drop " - "the role='analyst' fixture pattern, and use UserGroupMembersRepository " - "for group assignment.", - allow_module_level=True, -) - def _basic(username: str, password: str) -> str: token = base64.b64encode(f"{username}:{password}".encode()).decode() @@ -43,17 +32,16 @@ def git_env(e2e_env, monkeypatch): """Identical setup to the ZIP fixture but returns raw PAT strings usable as HTTP Basic passwords. A valid PAT requires a real row in personal_access_tokens (the PAT resolver does a DB round-trip), so we - create two: one admin, one analyst with groups=["TestGroup"].""" + create two: one admin, one analyst with group membership via + user_group_members + resource_grants.""" from app.main import create_app from app.auth.jwt import create_access_token from src.db import get_system_db from src.repositories.users import UserRepository from src.repositories.access_tokens import AccessTokenRepository - from src.repositories.user_groups import ( - UserGroupsRepository, PluginAccessRepository, - ) - import hashlib - import uuid + from src.repositories.user_groups import UserGroupsRepository + from src.repositories.user_group_members import UserGroupMembersRepository + from src.repositories.resource_grants import ResourceGrantsRepository data_dir = e2e_env["data_dir"] @@ -88,18 +76,28 @@ def git_env(e2e_env, monkeypatch): users = UserRepository(conn) users.create(id="admin1", email="admin@test.local", name="Admin", role="admin") users.create(id="analyst1", email="analyst@test.local", name="Analyst", role="analyst") - conn.execute( - "UPDATE users SET groups = ? WHERE id = ?", - [json.dumps(["TestGroup"]), "analyst1"], - ) + # System groups ug = UserGroupsRepository(conn) - tg = ug.create(name="TestGroup") - ug.ensure_system("Admin", "sys") - ug.ensure_system("Everyone", "sys") + ug.ensure_system("Admin", "system") + ug.ensure_system("Everyone", "system") - access = PluginAccessRepository(conn) - access.grant(tg["id"], "mkt-b", "plug-y") + admin_gid = conn.execute("SELECT id FROM user_groups WHERE name='Admin'").fetchone()[0] + + # Create TestGroup for analyst + tg = ug.create(name="TestGroup", description="granted plug-y only") + test_group_gid = tg["id"] + + # Assign memberships + ugm = UserGroupMembersRepository(conn) + ugm.add_member("admin1", admin_gid, source="system_seed") + ugm.add_member("analyst1", test_group_gid, source="admin") + + # Grant plugins via resource_grants + rg = ResourceGrantsRepository(conn) + rg.create(group_id=admin_gid, resource_type="marketplace_plugin", resource_id="mkt-a/plug-x") + rg.create(group_id=admin_gid, resource_type="marketplace_plugin", resource_id="mkt-b/plug-y") + rg.create(group_id=test_group_gid, resource_type="marketplace_plugin", resource_id="mkt-b/plug-y") # Create real PAT rows so resolve_token_to_user passes. token_repo = AccessTokenRepository(conn) @@ -199,60 +197,54 @@ class TestGitSmartHttp: entries = [p for p in cache.iterdir() if p.is_dir() and p.name.endswith(".git")] assert len(entries) == 2 + # --- New tests for git smart HTTP protocol coverage --- -# --------------------------------------------------------------------------- -# Optional end-to-end: run a real git clone against a live uvicorn server. -# Opt-in via `pytest -m slow`. -# --------------------------------------------------------------------------- + def test_git_upload_pack_endpoint_requires_auth(self, git_env): + """POST /marketplace.git/git-upload-pack requires HTTP Basic auth.""" + c = git_env["client"] + resp = c.post("/marketplace.git/git-upload-pack") + assert resp.status_code == 401 - -def _have_git() -> bool: - return shutil.which("git") is not None - - -@pytest.mark.slow -@pytest.mark.skipif(not _have_git(), reason="git binary not on PATH") -def test_real_git_clone_admin(git_env, tmp_path): - """Spawn the app under uvicorn and run `git clone` against it.""" - import socket - import uvicorn - - # Find a free port - with socket.socket() as s: - s.bind(("127.0.0.1", 0)) - port = s.getsockname()[1] - - # Spin up uvicorn in a thread with the already-built app from the fixture - config = uvicorn.Config( - app=git_env["client"].app, - host="127.0.0.1", - port=port, - log_level="warning", - ) - server = uvicorn.Server(config) - thread = threading.Thread(target=server.run, daemon=True) - thread.start() - try: - # Poll until ready - import time - for _ in range(50): - with socket.socket() as s: - try: - s.connect(("127.0.0.1", port)) - break - except OSError: - time.sleep(0.1) - - dest = tmp_path / "clone" - pat = git_env["admin_pat"] - url = f"http://x:{pat}@127.0.0.1:{port}/marketplace.git/" - proc = subprocess.run( - ["git", "clone", url, str(dest)], - capture_output=True, text=True, timeout=60, + def test_git_endpoints_require_http_basic_with_pat(self, git_env): + """Git endpoints require HTTP Basic auth where password = PAT. + Bearer auth is not accepted for git endpoints.""" + c = git_env["client"] + # Bearer auth should fail — git uses Basic + resp = c.get( + "/marketplace.git/info/refs?service=git-upload-pack", + headers={"Authorization": f"Bearer {git_env['admin_pat']}"}, ) - assert proc.returncode == 0, proc.stderr - assert (dest / ".claude-plugin" / "marketplace.json").is_file() - assert (dest / "plugins" / "mkt-a-plug-x" / "CLAUDE.md").is_file() - finally: - server.should_exit = True - thread.join(timeout=5) + assert resp.status_code == 401 + + def test_info_refs_with_valid_pat_returns_200(self, git_env): + """GET /marketplace.git/info/refs with valid PAT returns git protocol response.""" + c = git_env["client"] + resp = c.get( + "/marketplace.git/info/refs?service=git-upload-pack", + headers={"Authorization": _basic("x", git_env["admin_pat"])}, + ) + assert resp.status_code == 200 + assert "git-upload-pack" in resp.headers["content-type"] + + def test_analyst_sees_filtered_content_via_git(self, git_env): + """Analyst with limited grants gets a different (smaller) repo than admin.""" + c = git_env["client"] + cache = git_env["data_dir"] / "marketplaces" / "git-cache" + + # Admin request + admin_resp = c.get( + "/marketplace.git/info/refs?service=git-upload-pack", + headers={"Authorization": _basic("x", git_env["admin_pat"])}, + ) + assert admin_resp.status_code == 200 + + # Analyst request + analyst_resp = c.get( + "/marketplace.git/info/refs?service=git-upload-pack", + headers={"Authorization": _basic("x", git_env["analyst_pat"])}, + ) + assert analyst_resp.status_code == 200 + + # Two different cache entries (different RBAC views) + entries = [p for p in cache.iterdir() if p.is_dir() and p.name.endswith(".git")] + assert len(entries) == 2 diff --git a/tests/test_marketplace_server_zip.py b/tests/test_marketplace_server_zip.py index 46d708c..3b7d64b 100644 --- a/tests/test_marketplace_server_zip.py +++ b/tests/test_marketplace_server_zip.py @@ -1,4 +1,9 @@ -"""Integration tests for /marketplace.zip and /marketplace/info.""" +"""Integration tests for /marketplace.zip and /marketplace/info. + +v13: uses user_group_members + resource_grants (no PluginAccessRepository, +no users.groups JSON). Admin is a regular group for marketplace filtering — +no god-mode shortcut. +""" from __future__ import annotations @@ -11,15 +16,6 @@ from pathlib import Path import pytest from fastapi.testclient import TestClient -pytest.skip( - "v12: PluginAccessRepository was removed and users.role/users.groups are " - "no longer the authorization source. Rewrite this module against the " - "v12 model — seed user_group_members + resource_grants directly, drop " - "the role='analyst' fixture pattern, and use UserGroupMembersRepository " - "for group assignment.", - allow_module_level=True, -) - def _auth(token): return {"Authorization": f"Bearer {token}"} @@ -35,18 +31,17 @@ def marketplace_env(e2e_env, monkeypatch): mkt-a: plug-x (v1.0) mkt-b: plug-y (v2.0), plug-z (v3.0) - DATA_DIR/marketplaces//plugins// with a tiny CLAUDE.md - - admin user (role=admin) with token - - analyst user (role=analyst) with token - - user group 'TestGroup' granted plug-y from mkt-b - - analyst user's groups = ["TestGroup"] + - admin user in Admin group with grants for all 3 plugins + - analyst user in TestGroup with grant for plug-y only + - nogroups user (only Everyone, no grants) """ from app.main import create_app from app.auth.jwt import create_access_token from src.db import get_system_db from src.repositories.users import UserRepository - from src.repositories.user_groups import ( - UserGroupsRepository, PluginAccessRepository, - ) + from src.repositories.user_groups import UserGroupsRepository + from src.repositories.user_group_members import UserGroupMembersRepository + from src.repositories.resource_grants import ResourceGrantsRepository data_dir = e2e_env["data_dir"] @@ -89,24 +84,33 @@ def marketplace_env(e2e_env, monkeypatch): users = UserRepository(conn) users.create(id="admin1", email="admin@test.local", name="Admin", role="admin") users.create(id="analyst1", email="analyst@test.local", name="Analyst", role="analyst") - # Assign TestGroup to analyst manually (this is what the real admin does too) - conn.execute( - "UPDATE users SET groups = ? WHERE id = ?", - [json.dumps(["TestGroup"]), "analyst1"], - ) - conn.execute( - "INSERT INTO users (id, email, name, role, groups) VALUES (?, ?, ?, ?, ?)", - ["nogroups1", "nobody@test.local", "Nobody", "analyst", None], - ) + users.create(id="nogroups1", email="nobody@test.local", name="Nobody", role="analyst") + # System groups are seeded by db.init_schema(); look them up. ug = UserGroupsRepository(conn) - tg = ug.create(name="TestGroup", description="granted plug-y only") - # Seed Admin / Everyone system groups the same way the app does at startup ug.ensure_system("Admin", "system") ug.ensure_system("Everyone", "system") - access = PluginAccessRepository(conn) - access.grant(tg["id"], "mkt-b", "plug-y") + admin_gid = conn.execute("SELECT id FROM user_groups WHERE name='Admin'").fetchone()[0] + + # Create a custom group for the analyst + tg = ug.create(name="TestGroup", description="granted plug-y only") + test_group_gid = tg["id"] + + # Assign memberships + ugm = UserGroupMembersRepository(conn) + ugm.add_member("admin1", admin_gid, source="system_seed") + ugm.add_member("analyst1", test_group_gid, source="admin") + # nogroups1 is only in Everyone (auto-membership, no explicit row needed) + + # Grant plugins via resource_grants + rg = ResourceGrantsRepository(conn) + # Admin group gets all 3 plugins + rg.create(group_id=admin_gid, resource_type="marketplace_plugin", resource_id="mkt-a/plug-x") + rg.create(group_id=admin_gid, resource_type="marketplace_plugin", resource_id="mkt-b/plug-y") + rg.create(group_id=admin_gid, resource_type="marketplace_plugin", resource_id="mkt-b/plug-z") + # TestGroup gets only plug-y + rg.create(group_id=test_group_gid, resource_type="marketplace_plugin", resource_id="mkt-b/plug-y") finally: conn.close() @@ -139,7 +143,7 @@ class TestMarketplaceInfo: info = resp.json() names = {p["name"] for p in info["plugins"]} assert names == {"mkt-a-plug-x", "mkt-b-plug-y", "mkt-b-plug-z"} - assert info["groups"] == ["Admin"] + assert "Admin" in info["groups"] assert info["marketplace_name"] == "agnes" assert info["plugin_count"] == 3 @@ -150,7 +154,7 @@ class TestMarketplaceInfo: info = resp.json() names = {p["name"] for p in info["plugins"]} assert names == {"mkt-b-plug-y"} - assert info["groups"] == ["TestGroup"] + assert "TestGroup" in info["groups"] def test_user_with_no_groups_falls_back_to_everyone(self, marketplace_env): """Everyone has no grants here, so the list is empty but call succeeds.""" @@ -158,7 +162,7 @@ class TestMarketplaceInfo: resp = c.get("/marketplace/info", headers=_auth(marketplace_env["nogroups_token"])) assert resp.status_code == 200 info = resp.json() - assert info["groups"] == ["Everyone"] + assert "Everyone" in info["groups"] assert info["plugins"] == [] def test_missing_auth_returns_401(self, marketplace_env): @@ -216,6 +220,8 @@ class TestMarketplaceZip: assert second.content == b"" def test_etag_changes_when_content_changes(self, marketplace_env): + from app.marketplace_server.packager import invalidate_etag_cache + c = marketplace_env["client"] headers = _auth(marketplace_env["admin_token"]) first = c.get("/marketplace.zip", headers=headers) @@ -225,6 +231,10 @@ class TestMarketplaceZip: target = marketplace_env["data_dir"] / "marketplaces" / "mkt-a" / "plugins" / "plug-x" / "CLAUDE.md" target.write_text("# plug-x\nMUTATED\n", encoding="utf-8") + # Invalidate the in-process ETag cache so the next request + # re-hashes from disk instead of returning the stale cached value. + invalidate_etag_cache() + second = c.get("/marketplace.zip", headers=headers) etag2 = second.headers["etag"] assert etag1 != etag2 @@ -233,3 +243,59 @@ class TestMarketplaceZip: c = marketplace_env["client"] resp = c.get("/marketplace.zip") assert resp.status_code == 401 + + # --- New tests for ETag + auth coverage --- + + def test_zip_returns_correct_content_with_etag_header(self, marketplace_env): + """GET /marketplace.zip returns ZIP body with a valid ETag header.""" + c = marketplace_env["client"] + headers = _auth(marketplace_env["admin_token"]) + resp = c.get("/marketplace.zip", headers=headers) + assert resp.status_code == 200 + assert resp.headers["content-type"] == "application/zip" + etag = resp.headers["etag"] + assert etag.startswith('"') and etag.endswith('"') + # ETag is a 16-char hex string (sha256 prefix) + etag_val = etag.strip('"') + assert len(etag_val) == 16 + # Body is a valid ZIP + with zipfile.ZipFile(io.BytesIO(resp.content)) as zf: + assert ".claude-plugin/marketplace.json" in zf.namelist() + + def test_if_none_match_returns_full_content_when_changed(self, marketplace_env): + """GET /marketplace.zip with a stale If-None-Match returns full content.""" + c = marketplace_env["client"] + headers = _auth(marketplace_env["admin_token"]) + first = c.get("/marketplace.zip", headers=headers) + stale_etag = "0000000000000000" # definitely wrong + second = c.get( + "/marketplace.zip", + headers={**headers, "If-None-Match": f'"{stale_etag}"'}, + ) + assert second.status_code == 200 + assert len(second.content) > 0 + # The returned ETag is the real one, not the stale one + assert second.headers["etag"].strip('"') != stale_etag + + def test_zip_requires_pat_authentication(self, marketplace_env): + """GET /marketplace.zip without any auth returns 401.""" + c = marketplace_env["client"] + resp = c.get("/marketplace.zip") + assert resp.status_code == 401 + + def test_zip_with_invalid_token_returns_401(self, marketplace_env): + """GET /marketplace.zip with a garbage Bearer token returns 401.""" + c = marketplace_env["client"] + resp = c.get("/marketplace.zip", headers={"Authorization": "Bearer invalid-token"}) + assert resp.status_code == 401 + + def test_if_none_match_with_wrong_etag_returns_full_zip(self, marketplace_env): + """If-None-Match with a non-matching ETag returns 200 + full ZIP.""" + c = marketplace_env["client"] + headers = _auth(marketplace_env["admin_token"]) + resp = c.get( + "/marketplace.zip", + headers={**headers, "If-None-Match": '"wrong-etag-value"'}, + ) + assert resp.status_code == 200 + assert resp.headers["content-type"] == "application/zip" diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py index 74b9cd0..0f061e1 100644 --- a/tests/test_orchestrator.py +++ b/tests/test_orchestrator.py @@ -550,3 +550,130 @@ class TestBQMetadataAuth: "metadata" in r.message.lower() and r.levelname == "ERROR" for r in caplog.records ), f"expected ERROR-level log mentioning metadata; got: {[(r.levelname, r.message) for r in caplog.records]}" + + +# --------------------------------------------------------------------------- +# Orchestrator failure mode tests +# --------------------------------------------------------------------------- + + +class TestOrchestratorFailureModes: + """Tests for how the orchestrator handles corrupted or partial extract.duckdb files.""" + + def test_corrupted_extract_duckdb_skipped_not_crashed(self, setup_env): + """A corrupted extract.duckdb should be skipped (with a warning) and + not crash the orchestrator. Other sources should still be processed.""" + from src.orchestrator import SyncOrchestrator + + # Create a corrupted extract.duckdb + corrupt_dir = setup_env["extracts_dir"] / "corrupt_source" + corrupt_dir.mkdir() + db_path = corrupt_dir / "extract.duckdb" + db_path.write_bytes(b"this is not a valid duckdb file!!!") + + # Also create a valid source + _create_mock_extract( + setup_env["extracts_dir"], + "keboola", + [{"name": "orders", "data": [{"id": "1"}]}], + ) + + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + result = orch.rebuild() + + # Corrupt source should not appear + assert "corrupt_source" not in result + # Valid source should still be processed + assert "keboola" in result + + def test_empty_extract_duckdb_skipped(self, setup_env): + """An extract.duckdb with _meta but no rows should be handled gracefully.""" + from src.orchestrator import SyncOrchestrator + + source_dir = setup_env["extracts_dir"] / "empty_source" + source_dir.mkdir() + (source_dir / "data").mkdir() + + db_path = source_dir / "extract.duckdb" + conn = duckdb.connect(str(db_path)) + conn.execute("""CREATE TABLE _meta ( + table_name VARCHAR, description VARCHAR, rows BIGINT, + size_bytes BIGINT, extracted_at TIMESTAMP, + query_mode VARCHAR DEFAULT 'local' + )""") + # No rows in _meta + conn.close() + + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + result = orch.rebuild() + + # Empty source is omitted from result (no valid tables to expose) + assert "empty_source" not in result + + def test_extract_duckdb_with_only_failed_tables(self, setup_env): + """An extract.duckdb where all tables have unsafe names should produce + no views in the analytics DB.""" + from src.orchestrator import SyncOrchestrator + + source_dir = setup_env["extracts_dir"] / "bad_names" + source_dir.mkdir() + (source_dir / "data").mkdir() + + db_path = source_dir / "extract.duckdb" + conn = duckdb.connect(str(db_path)) + conn.execute("""CREATE TABLE _meta ( + table_name VARCHAR, description VARCHAR, rows BIGINT, + size_bytes BIGINT, extracted_at TIMESTAMP, + query_mode VARCHAR DEFAULT 'local' + )""") + # All unsafe names + conn.execute("INSERT INTO _meta VALUES ('bad-name', '', 0, 0, current_timestamp, 'local')") + conn.execute("INSERT INTO _meta VALUES ('also bad', '', 0, 0, current_timestamp, 'local')") + conn.close() + + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + result = orch.rebuild() + + # Source is omitted from result (all table names failed validation) + assert "bad_names" not in result + + def test_mid_write_extract_duckdb_handled_gracefully(self, setup_env): + """If an extractor is mid-write (tmp file exists but hasn't been + swapped yet), the orchestrator should not crash.""" + from src.orchestrator import SyncOrchestrator + + source_dir = setup_env["extracts_dir"] / "midwrite" + source_dir.mkdir() + # Only a .tmp file — no extract.duckdb yet + tmp = source_dir / "extract.duckdb.tmp" + tmp.write_bytes(b"partial write in progress") + + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + result = orch.rebuild() + + # midwrite source is omitted (no extract.duckdb) + assert "midwrite" not in result + + def test_multiple_corrupted_sources_do_not_block_others(self, setup_env): + """Multiple corrupted sources should not prevent valid ones from being processed.""" + from src.orchestrator import SyncOrchestrator + + # Create two corrupted sources + for name in ["corrupt_a", "corrupt_b"]: + d = setup_env["extracts_dir"] / name + d.mkdir() + (d / "extract.duckdb").write_bytes(b"garbage " + name.encode()) + + # And a valid one + _create_mock_extract( + setup_env["extracts_dir"], + "keboola", + [{"name": "orders", "data": [{"id": "1"}]}], + ) + + orch = SyncOrchestrator(analytics_db_path=setup_env["analytics_db"]) + result = orch.rebuild() + + assert "corrupt_a" not in result + assert "corrupt_b" not in result + assert "keboola" in result diff --git a/tests/test_pat.py b/tests/test_pat.py index 2f5902e..81b4f2e 100644 --- a/tests/test_pat.py +++ b/tests/test_pat.py @@ -699,3 +699,94 @@ def test_pat_null_expiry_end_to_end_allows_authenticated_request(fresh_db): listed = client.get("/auth/tokens", headers={"Authorization": f"Bearer {pat}"}) assert listed.status_code == 200, listed.text assert any(row["name"] == "forever" for row in listed.json()) + + +class TestPATMalformedToken: + """Tests for malformed and edge-case PAT tokens.""" + + def test_malformed_jwt_rejected(self, fresh_db): + """A completely malformed JWT string must be rejected with 401.""" + from fastapi.testclient import TestClient + from app.main import app + + client = TestClient(app) + resp = client.get( + "/api/users", + headers={"Authorization": "Bearer not.a.valid.jwt", "Accept": "application/json"}, + ) + assert resp.status_code == 401 + + def test_random_string_rejected(self, fresh_db): + """A random string (not JWT format) must be rejected with 401.""" + from fastapi.testclient import TestClient + from app.main import app + + client = TestClient(app) + resp = client.get( + "/api/users", + headers={"Authorization": "Bearer totally-random-garbage", "Accept": "application/json"}, + ) + assert resp.status_code == 401 + + def test_empty_bearer_rejected(self, fresh_db): + """An empty Bearer token must be rejected with 401.""" + from fastapi.testclient import TestClient + from app.main import app + + client = TestClient(app) + resp = client.get( + "/api/users", + headers={"Authorization": "Bearer ", "Accept": "application/json"}, + ) + assert resp.status_code in (401, 403) + + def test_pat_last_used_ip_updated(self, fresh_db): + """Successful PAT use must update last_used_ip in the DB.""" + from fastapi.testclient import TestClient + import hashlib, uuid + from datetime import datetime, timezone, timedelta + from src.db import get_system_db, close_system_db + from src.repositories.users import UserRepository + from src.repositories.access_tokens import AccessTokenRepository + from app.auth.jwt import create_access_token + from app.main import app + + conn = get_system_db() + try: + uid = str(uuid.uuid4()) + UserRepository(conn).create(id=uid, email="ip@t", name="IP", role="admin") + from tests.helpers.auth import grant_admin + grant_admin(conn, uid) + tid = str(uuid.uuid4()) + pat = create_access_token( + user_id=uid, email="ip@t", role="admin", token_id=tid, typ="pat", + expires_delta=timedelta(days=90), + ) + AccessTokenRepository(conn).create( + id=tid, user_id=uid, name="ip-test", + token_hash=hashlib.sha256(pat.encode()).hexdigest(), + prefix=tid.replace("-", "")[:8], + expires_at=datetime.now(timezone.utc) + timedelta(days=90), + ) + finally: + conn.close() + close_system_db() + + client = TestClient(app) + resp = client.get( + "/api/users", + headers={ + "Authorization": f"Bearer {pat}", + "Accept": "application/json", + "X-Forwarded-For": "10.20.30.40", + }, + ) + assert resp.status_code == 200, resp.text + + conn = get_system_db() + try: + row = AccessTokenRepository(conn).get_by_id(tid) + assert row["last_used_ip"] == "10.20.30.40", "last_used_ip should be updated" + finally: + conn.close() + close_system_db() diff --git a/tests/test_remote_query.py b/tests/test_remote_query.py index 992462a..d385757 100644 --- a/tests/test_remote_query.py +++ b/tests/test_remote_query.py @@ -288,3 +288,256 @@ class TestValidateBqSql: """Valid read-only BQ queries must pass.""" # Should not raise _validate_bq_sql(sql) + + + +# --------------------------------------------------------------------------- +# Hybrid Query BigQuery integration tests (mocked BQ client) +# --------------------------------------------------------------------------- + + +class TestHybridQueryBigQuery: + """Tests for the two-phase BQ registration + DuckDB execution flow. + + These test the RemoteQueryEngine's register_bq + execute pipeline + with a mocked BQ client, simulating the /api/query/hybrid endpoint. + """ + + def test_register_bq_creates_temporary_view_in_duckdb(self, analytics_conn): + """register_bq parameter creates a temporary view in DuckDB that is + queryable via the registered alias.""" + arrow_table = pa.table( + { + "date": pa.array(["2026-01-01", "2026-01-15"], type=pa.utf8()), + "views": pa.array([100, 200], type=pa.int64()), + } + ) + mock_client = _make_bq_mock(arrow_table) + + engine = RemoteQueryEngine( + analytics_conn, + _bq_client_factory=lambda project: mock_client, + ) + + result = engine.register_bq("traffic", "SELECT date, views FROM bq.traffic") + assert result["alias"] == "traffic" + assert result["rows"] == 2 + + # The alias is queryable from DuckDB as a view/table + rows = analytics_conn.execute("SELECT views FROM traffic ORDER BY views").fetchall() + assert rows[0][0] == 100 + assert rows[1][0] == 200 + + def test_sql_query_can_join_local_table_with_registered_bq(self, analytics_conn): + """SQL query can JOIN local table with registered BQ result.""" + # Local orders table already exists from fixture + bq_arrow = pa.table( + { + "date": pa.array(["2026-01-01", "2026-01-15"], type=pa.utf8()), + "views": pa.array([50, 75], type=pa.int64()), + } + ) + mock_client = _make_bq_mock(bq_arrow) + + engine = RemoteQueryEngine( + analytics_conn, + _bq_client_factory=lambda project: mock_client, + ) + engine.register_bq("traffic", "SELECT date, views FROM bq.traffic") + + result = engine.execute( + "SELECT o.id, o.amount, t.views " + "FROM orders o JOIN traffic t ON o.date = t.date " + "ORDER BY o.id" + ) + + assert result["row_count"] == 2 + assert "views" in result["columns"] + assert "amount" in result["columns"] + # Verify the join produced correct data + assert result["rows"][0][2] == 50 # views for 2026-01-01 + assert result["rows"][1][2] == 75 # views for 2026-01-15 + + def test_multiple_register_bq_parameters_simultaneously(self, analytics_conn): + """Multiple register_bq parameters work simultaneously — each creates + an independent view that can be joined together.""" + traffic_arrow = pa.table( + { + "date": pa.array(["2026-01-01", "2026-01-15"], type=pa.utf8()), + "views": pa.array([100, 200], type=pa.int64()), + } + ) + revenue_arrow = pa.table( + { + "date": pa.array(["2026-01-01", "2026-01-15"], type=pa.utf8()), + "revenue": pa.array([1000.0, 2000.0], type=pa.float64()), + } + ) + + # First call returns count + data for traffic, second for revenue + traffic_count = pa.table({"count": pa.array([2], type=pa.int64())}) + revenue_count = pa.table({"count": pa.array([2], type=pa.int64())}) + + traffic_count_job = MagicMock() + traffic_count_job.to_arrow.return_value = traffic_count + traffic_data_job = MagicMock() + traffic_data_job.to_arrow.return_value = traffic_arrow + + revenue_count_job = MagicMock() + revenue_count_job.to_arrow.return_value = revenue_count + revenue_data_job = MagicMock() + revenue_data_job.to_arrow.return_value = revenue_arrow + + mock_client = MagicMock() + mock_client.query.side_effect = [ + traffic_count_job, traffic_data_job, + revenue_count_job, revenue_data_job, + ] + + engine = RemoteQueryEngine( + analytics_conn, + _bq_client_factory=lambda project: mock_client, + ) + + engine.register_bq("traffic", "SELECT date, views FROM bq.traffic") + engine.register_bq("revenue", "SELECT date, revenue FROM bq.revenue") + + result = engine.execute( + "SELECT t.date, t.views, r.revenue " + "FROM traffic t JOIN revenue r ON t.date = r.date " + "ORDER BY t.views" + ) + + assert result["row_count"] == 2 + assert set(result["columns"]) == {"date", "views", "revenue"} + + def test_invalid_bq_sql_returns_meaningful_error(self, analytics_conn): + """Invalid BQ SQL (blocked keyword) returns RemoteQueryError with + error_type='query_error'.""" + engine = RemoteQueryEngine(analytics_conn) + + with pytest.raises(RemoteQueryError) as exc_info: + engine.register_bq("bad", "DROP TABLE important_data") + + assert exc_info.value.error_type == "query_error" + assert "blocked" in str(exc_info.value).lower() or "drop" in str(exc_info.value).lower() + + def test_missing_bigquery_credentials_returns_proper_error(self, analytics_conn): + """Missing BigQuery credentials (no BIGQUERY_PROJECT env var, no + google-cloud-bigquery installed) returns RemoteQueryError, not crash.""" + engine = RemoteQueryEngine( + analytics_conn, + _bq_client_factory=None, # No factory → tries default import + ) + + with patch.dict(sys.modules, { + "google": None, "google.cloud": None, "google.cloud.bigquery": None, + }): + with pytest.raises(RemoteQueryError) as exc_info: + engine.register_bq("bq_data", "SELECT 1") + + assert exc_info.value.error_type == "bq_error" + # Should mention the missing package or config, not a raw traceback + detail = str(exc_info.value).lower() + assert "bigquery" in detail or "google" in detail + + def test_bq_query_error_returns_meaningful_error(self, analytics_conn): + """When the BQ client raises an exception during query, the engine + wraps it in RemoteQueryError with error_type='bq_error'.""" + mock_client = MagicMock() + mock_client.query.side_effect = Exception("Connection refused") + + engine = RemoteQueryEngine( + analytics_conn, + _bq_client_factory=lambda project: mock_client, + ) + + with pytest.raises(RemoteQueryError) as exc_info: + engine.register_bq("bq_data", "SELECT 1 FROM dataset.table") + + assert exc_info.value.error_type == "bq_error" + assert "connection refused" in str(exc_info.value).lower() + + def test_bq_count_precheck_failure_returns_bq_error(self, analytics_conn): + """When the BQ COUNT(*) pre-check fails, the engine returns + RemoteQueryError with error_type='bq_error'.""" + mock_client = MagicMock() + count_job = MagicMock() + count_job.to_arrow.side_effect = Exception("Permission denied") + mock_client.query.return_value = count_job + + engine = RemoteQueryEngine( + analytics_conn, + _bq_client_factory=lambda project: mock_client, + ) + + with pytest.raises(RemoteQueryError) as exc_info: + engine.register_bq("bq_data", "SELECT 1 FROM dataset.table") + + assert exc_info.value.error_type == "bq_error" + + def test_bq_row_limit_exceeded_returns_row_limit_error(self, analytics_conn): + """When BQ result exceeds max_bq_registration_rows, returns + RemoteQueryError with error_type='row_limit'.""" + arrow_table = pa.table({"x": pa.array([1], type=pa.int64())}) + mock_client = _make_bq_mock(arrow_table, count_value=999_999) + + engine = RemoteQueryEngine( + analytics_conn, + _bq_client_factory=lambda project: mock_client, + max_bq_registration_rows=500_000, + ) + + with pytest.raises(RemoteQueryError) as exc_info: + engine.register_bq("big_data", "SELECT * FROM huge_table") + + assert exc_info.value.error_type == "row_limit" + assert exc_info.value.details["count"] == 999_999 + + def test_bq_memory_limit_exceeded_returns_memory_limit_error(self, analytics_conn): + """When the Arrow table exceeds max_memory_mb, returns + RemoteQueryError with error_type='memory_limit'.""" + # Create a table that reports a large nbytes + big_arrow = pa.table( + {"x": pa.array([1] * 1000, type=pa.int64())} + ) + mock_client = _make_bq_mock(big_arrow) + + engine = RemoteQueryEngine( + analytics_conn, + _bq_client_factory=lambda project: mock_client, + max_memory_mb=0.001, # tiny limit → guaranteed exceed + ) + + with pytest.raises(RemoteQueryError) as exc_info: + engine.register_bq("big_data", "SELECT * FROM wide_table") + + assert exc_info.value.error_type == "memory_limit" + + def test_hybrid_query_execute_error_returns_query_error(self, analytics_conn): + """When the final DuckDB SQL execution fails, returns + RemoteQueryError with error_type='query_error'.""" + engine = RemoteQueryEngine(analytics_conn) + + with pytest.raises(RemoteQueryError) as exc_info: + engine.execute("SELECT * FROM nonexistent_table") + + assert exc_info.value.error_type == "query_error" + + def test_reserved_alias_rejected(self, analytics_conn): + """Reserved aliases (information_schema, main, etc.) are rejected.""" + engine = RemoteQueryEngine(analytics_conn) + + with pytest.raises(RemoteQueryError) as exc_info: + engine.register_bq("information_schema", "SELECT 1") + + assert exc_info.value.error_type == "query_error" + + def test_invalid_alias_rejected(self, analytics_conn): + """Aliases that aren't valid SQL identifiers are rejected.""" + engine = RemoteQueryEngine(analytics_conn) + + with pytest.raises(RemoteQueryError) as exc_info: + engine.register_bq("bad alias!", "SELECT 1") + + assert exc_info.value.error_type == "query_error"