From 017cf076740d547968c5cf5e6518cc75559fb8c7 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Sat, 11 Apr 2026 10:52:39 +0200 Subject: [PATCH 01/14] docs: add design spec for remote query (extension re-attach + two-phase BQ) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../specs/2026-04-11-remote-query-design.md | 180 ++++++++++++++++++ 1 file changed, 180 insertions(+) create mode 100644 docs/superpowers/specs/2026-04-11-remote-query-design.md diff --git a/docs/superpowers/specs/2026-04-11-remote-query-design.md b/docs/superpowers/specs/2026-04-11-remote-query-design.md new file mode 100644 index 0000000..29af622 --- /dev/null +++ b/docs/superpowers/specs/2026-04-11-remote-query-design.md @@ -0,0 +1,180 @@ +# Remote Query — Design Spec + +**Date:** 2026-04-11 +**Status:** Approved +**Scope:** Fix extension re-attach + two-phase remote query engine + +## Context + +BigQuery remote views created by the orchestrator don't work at query time because `get_analytics_db_readonly()` opens a fresh connection without re-loading the BigQuery extension. Additionally, the platform lacks the ability to run hybrid queries that JOIN local Parquet data with on-demand BigQuery subquery results. + +The `padak/tmp_oss` v1 repo has `src/remote_query.py` with a two-phase protocol. The existing `scripts/duckdb_manager.py` in this repo already has `register_bq_table()` and `_create_bq_client()` helper functions. The `table_registry` already supports `query_mode` values: `local`, `remote`, `hybrid`. + +**Primary user:** Claude Code agent running `da query` locally, or API consumers via `POST /api/query/hybrid`. + +--- + +## Part 1: Fix Extension Re-attach + +### Problem + +`get_analytics_db_readonly()` in `src/db.py` opens analytics.duckdb in read-only mode and ATTACHes extract.duckdb files, but does NOT re-load extensions referenced in `_remote_attach` tables. BigQuery remote views fail with "Catalog Error: bq not found". + +### Solution + +After ATTACHing extract.duckdb files in `get_analytics_db_readonly()`, scan each for a `_remote_attach` table. For each record: + +1. `LOAD {extension}` — loads pre-installed extension from disk (no INSTALL needed in read-only mode; orchestrator pre-installs during rebuild) +2. `ATTACH '{url}' AS {alias} (TYPE {extension}, READ_ONLY)` — re-attaches the remote source + +If LOAD fails (extension not installed), log a warning and continue — local views still work. + +### Changes + +**File:** `src/db.py` — `get_analytics_db_readonly()` function + +Add ~20 lines after the existing extract.duckdb ATTACH loop. Read `_remote_attach` table from each attached extract DB, collect unique (alias, extension, url, token_env) tuples, and re-attach. + +Pattern follows `src/orchestrator.py:_attach_remote_extensions()` but simplified for read-only context (no INSTALL, just LOAD + ATTACH). + +--- + +## Part 2: Two-Phase Remote Query Engine + +### Architecture + +New module `src/remote_query.py` with a `RemoteQueryEngine` class: + +```python +class RemoteQueryEngine: + def __init__(self, conn: duckdb.DuckDBPyConnection): + """Takes an existing DuckDB connection (analytics.duckdb with local views).""" + + def register_bq(self, alias: str, bq_sql: str) -> dict: + """Execute BQ subquery, register result as in-memory DuckDB view. + Returns {alias, rows, columns, memory_mb}. + Raises RemoteQueryError on safety limit violation.""" + + def execute(self, sql: str) -> dict: + """Execute final DuckDB query against local + registered BQ views. + Returns {columns: [...], rows: [...], row_count: int, truncated: bool}.""" +``` + +### Two-Phase Flow + +1. **Phase 1 — BQ Registration:** For each `register_bq(alias, bq_sql)` call: + - COUNT(*) pre-check via Python BQ client → reject if >max_bq_rows + - Memory estimate: ~50 bytes/cell × rows × cols → reject if >max_memory_mb + - Execute BQ query → `job.to_arrow()` → `conn.register(alias, arrow_table)` + - Uses `scripts/duckdb_manager.py:_create_bq_client()` for client creation and `register_bq_table()` logic (reuse, not reimplement) + +2. **Phase 2 — DuckDB Query:** Execute final SQL against all views (local Parquet + registered BQ Arrow tables). Apply max_result_rows limit. + +### Safety Limits + +Configurable in `config/instance.yaml` under `remote_query:`: + +```yaml +remote_query: + max_bq_rows: 500000 # max rows from a single BQ subquery + max_memory_mb: 2048 # max estimated memory for BQ result + max_result_rows: 100000 # max rows in final result + timeout_seconds: 300 # BQ query timeout +``` + +Defaults are hardcoded in `RemoteQueryEngine` and overridden by instance config. + +### Error Handling + +Custom `RemoteQueryError` exception with structured error: + +```python +class RemoteQueryError(Exception): + def __init__(self, message: str, error_type: str, details: dict = None): + # error_type: "row_limit", "memory_limit", "bq_error", "query_error", "timeout" +``` + +### CLI: `da query` Extension + +Extend existing `cli/commands/query.py`: + +``` +da query --sql "SELECT o.*, t.views FROM orders o JOIN traffic t ON o.date = t.date" \ + --register-bq "traffic=SELECT date, SUM(views) as views FROM dataset.web WHERE date > '2026-01-01' GROUP BY 1" +``` + +- Multiple `--register-bq` flags allowed (one per BQ alias) +- Format: `"alias=BQ_SQL"` (split on first `=`) +- `--stdin` mode: reads JSON from stdin for complex SQL: + ```json + {"register_bq": {"traffic": "SELECT ..."}, "sql": "SELECT ..."} + ``` +- Output formats: `table` (default), `csv`, `json` + +### API: `POST /api/query/hybrid` + +``` +POST /api/query/hybrid +Authorization: Bearer + +{ + "register_bq": { + "traffic": "SELECT date, SUM(views) FROM dataset.web WHERE date > '2026-01-01' GROUP BY 1" + }, + "sql": "SELECT o.*, t.views FROM orders o JOIN traffic t ON o.date = t.date", + "format": "json" +} +``` + +**Response:** +```json +{ + "columns": ["order_id", "date", "views"], + "rows": [...], + "row_count": 1234, + "truncated": false, + "bq_stats": { + "traffic": {"rows": 365, "columns": 2, "memory_mb": 0.03} + } +} +``` + +**Auth:** `require_admin` — BQ queries cost money, only admins can trigger them. + +**Validation:** `register_bq` SQL strings are validated as SELECT-only (no INSERT/UPDATE/DELETE/DROP). + +--- + +## Implementation Summary + +### New Files + +| File | Purpose | +|---|---| +| `src/remote_query.py` | `RemoteQueryEngine` class + `RemoteQueryError` | +| `app/api/query_hybrid.py` | `POST /api/query/hybrid` endpoint | +| `tests/test_remote_query.py` | Engine unit tests (mocked BQ client) | + +### Modified Files + +| File | Changes | +|---|---| +| `src/db.py` | `get_analytics_db_readonly()` — add extension re-attach from `_remote_attach` | +| `cli/commands/query.py` | Add `--register-bq` and `--stdin` flags | +| `app/main.py` | Register hybrid query router | +| `CLAUDE.md` | Document hybrid query usage | + +### Implementation Order + +1. Fix extension re-attach in `src/db.py` (unblocks remote views) +2. `RemoteQueryEngine` in `src/remote_query.py` (core logic) +3. CLI extension `--register-bq` +4. API endpoint `POST /api/query/hybrid` +5. CLAUDE.md update + integration tests + +### Test Coverage + +- `tests/test_remote_query.py` — engine tests with mocked BQ client (safety limits, registration, error handling) +- `tests/test_db.py` — extension re-attach test (mock _remote_attach table) +- `tests/test_api.py` — hybrid query endpoint (auth, validation) +- `tests/test_cli.py` — `--register-bq` flag parsing From eb68e6292d7fbdeed745b8844b0d2910ff2436f1 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Sat, 11 Apr 2026 10:58:25 +0200 Subject: [PATCH 02/14] docs: fix remote query spec after code review - Address read-only LOAD uncertainty with verification step + workaround - Clarify register_bq wraps BQ logic (not delegates to register_bq_table) - Use existing max_bq_registration_rows config key name - Apply SQL blocklist to both register_bq and final sql - Define connection lifecycle (caller owns, try/finally) - Fix CLI argument handling (optional positional + --sql flag) - Document concurrency safety (Unix inode semantics) - Handle missing google-cloud-bigquery gracefully Co-Authored-By: Claude Opus 4.6 (1M context) --- .../specs/2026-04-11-remote-query-design.md | 53 ++++++++++++++----- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/docs/superpowers/specs/2026-04-11-remote-query-design.md b/docs/superpowers/specs/2026-04-11-remote-query-design.md index 29af622..f6141c4 100644 --- a/docs/superpowers/specs/2026-04-11-remote-query-design.md +++ b/docs/superpowers/specs/2026-04-11-remote-query-design.md @@ -22,20 +22,26 @@ The `padak/tmp_oss` v1 repo has `src/remote_query.py` with a two-phase protocol. ### Solution -After ATTACHing extract.duckdb files in `get_analytics_db_readonly()`, scan each for a `_remote_attach` table. For each record: +After ATTACHing extract.duckdb files in `get_analytics_db_readonly()`, scan each for a `_remote_attach` table. For each record, re-load the extension and re-attach the remote source. -1. `LOAD {extension}` — loads pre-installed extension from disk (no INSTALL needed in read-only mode; orchestrator pre-installs during rebuild) -2. `ATTACH '{url}' AS {alias} (TYPE {extension}, READ_ONLY)` — re-attaches the remote source +**Important: DuckDB read-only LOAD behavior.** The `read_only=True` flag on `duckdb.connect()` blocks writes to the DB file, but `LOAD` writes to the extension cache in `~/.duckdb/extensions/` (separate from the DB file). This should work, but MUST be empirically verified as the first implementation step. If LOAD fails in read-only mode, the workaround is to open the analytics DB WITHOUT `read_only=True` but still use read-only SQL patterns (no INSERT/UPDATE/DELETE), or to call `LOAD` on a separate in-memory connection first (DuckDB extension cache is process-wide). -If LOAD fails (extension not installed), log a warning and continue — local views still work. +Steps for each `_remote_attach` record: +1. `LOAD {extension}` — loads pre-installed extension from disk +2. Read token from `os.environ[token_env]` if `token_env` is non-empty +3. `ATTACH '{url}' AS {alias} (TYPE {extension}, READ_ONLY)` — with TOKEN if needed + +If LOAD or ATTACH fails, log a warning and continue — local views still work. ### Changes **File:** `src/db.py` — `get_analytics_db_readonly()` function -Add ~20 lines after the existing extract.duckdb ATTACH loop. Read `_remote_attach` table from each attached extract DB, collect unique (alias, extension, url, token_env) tuples, and re-attach. +Add ~25 lines after the existing extract.duckdb ATTACH loop. Read `_remote_attach` table from each attached extract DB, collect unique (alias, extension, url, token_env) tuples, and re-attach. -Pattern follows `src/orchestrator.py:_attach_remote_extensions()` but simplified for read-only context (no INSTALL, just LOAD + ATTACH). +Pattern follows `src/orchestrator.py:_attach_remote_extensions()` but simplified (no INSTALL — orchestrator pre-installs during rebuild). + +**Concurrency note:** If the orchestrator runs `_atomic_swap_db()` while a read-only connection is open, the existing connection holds a file descriptor to the old inode (Unix semantics). This is safe — the old data remains accessible until the connection is closed. --- @@ -63,10 +69,12 @@ class RemoteQueryEngine: ### Two-Phase Flow 1. **Phase 1 — BQ Registration:** For each `register_bq(alias, bq_sql)` call: - - COUNT(*) pre-check via Python BQ client → reject if >max_bq_rows - - Memory estimate: ~50 bytes/cell × rows × cols → reject if >max_memory_mb + - COUNT(*) pre-check via Python BQ client → reject if >max_bq_registration_rows + - Memory estimate: ~50 bytes/cell × rows × cols → reject if >max_memory_mb. Note: this is approximate. After query completes, use `arrow_table.nbytes` for accurate reporting in `bq_stats`. - Execute BQ query → `job.to_arrow()` → `conn.register(alias, arrow_table)` - - Uses `scripts/duckdb_manager.py:_create_bq_client()` for client creation and `register_bq_table()` logic (reuse, not reimplement) + - Uses `scripts/duckdb_manager.py:_create_bq_client()` for BQ client creation (reuse) + - Does NOT delegate to `register_bq_table()` directly — `RemoteQueryEngine.register_bq()` wraps BQ query execution with its own pre-check logic (COUNT, memory estimate), then calls `conn.register(alias, arrow_table)`. The existing `register_bq_table()` has no pre-check capability and would need signature changes to add one. Wrapping is cleaner than modifying shared code. + - Gracefully handle missing `google-cloud-bigquery` package: catch `ImportError` and raise `RemoteQueryError(error_type="bq_error", message="google-cloud-bigquery not installed")` 2. **Phase 2 — DuckDB Query:** Execute final SQL against all views (local Parquet + registered BQ Arrow tables). Apply max_result_rows limit. @@ -76,12 +84,14 @@ Configurable in `config/instance.yaml` under `remote_query:`: ```yaml remote_query: - max_bq_rows: 500000 # max rows from a single BQ subquery - max_memory_mb: 2048 # max estimated memory for BQ result - max_result_rows: 100000 # max rows in final result - timeout_seconds: 300 # BQ query timeout + max_bq_registration_rows: 500000 # max rows from a single BQ subquery (matches existing instance.yaml.example key) + max_memory_mb: 2048 # max estimated memory for BQ result + max_result_rows: 100000 # max rows in final result + timeout_seconds: 300 # BQ query timeout ``` +Note: `max_bq_registration_rows` matches the key already documented in `config/instance.yaml.example`. + Defaults are hardcoded in `RemoteQueryEngine` and overridden by instance config. ### Error Handling @@ -111,6 +121,8 @@ da query --sql "SELECT o.*, t.views FROM orders o JOIN traffic t ON o.date = t.d ``` - Output formats: `table` (default), `csv`, `json` +**CLI argument handling:** The existing `query_command` has `sql` as a required positional argument. When `--register-bq` is used, `sql` should be provided via `--sql` flag instead (named option, not positional). When `--stdin` is used, both `sql` and `register_bq` come from stdin JSON. Make `sql` an optional positional (`typer.Argument(None)`) and validate that exactly one of (positional sql, --sql flag, --stdin) is provided. + ### API: `POST /api/query/hybrid` ``` @@ -141,7 +153,20 @@ Authorization: Bearer **Auth:** `require_admin` — BQ queries cost money, only admins can trigger them. -**Validation:** `register_bq` SQL strings are validated as SELECT-only (no INSERT/UPDATE/DELETE/DROP). +**Validation — both `register_bq` SQL and final `sql`:** +- Apply the same SQL blocklist from `app/api/query.py` (blocks LOAD, ATTACH, INSTALL, read_parquet with paths, path traversal patterns, etc.) +- `register_bq` SQL additionally validated as SELECT-only (no INSERT/UPDATE/DELETE/DROP) +- Reuse the existing `_validate_sql()` helper from `app/api/query.py` (extract to shared utility if needed) + +**Connection lifecycle:** The API endpoint owns the connection. Pattern: +```python +analytics = get_analytics_db_readonly() +try: + engine = RemoteQueryEngine(analytics) + # ... register_bq + execute +finally: + analytics.close() +``` --- From 816168f96b1ed2903f268945fdccd1f2eae4c60b Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Sat, 11 Apr 2026 11:02:04 +0200 Subject: [PATCH 03/14] docs: add remote query implementation plan (5 tasks) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../plans/2026-04-11-remote-query.md | 923 ++++++++++++++++++ 1 file changed, 923 insertions(+) create mode 100644 docs/superpowers/plans/2026-04-11-remote-query.md diff --git a/docs/superpowers/plans/2026-04-11-remote-query.md b/docs/superpowers/plans/2026-04-11-remote-query.md new file mode 100644 index 0000000..7f6fd1f --- /dev/null +++ b/docs/superpowers/plans/2026-04-11-remote-query.md @@ -0,0 +1,923 @@ +# Remote Query Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Fix BigQuery extension re-attach so remote views work, then add a two-phase query engine that JOINs local Parquet data with on-demand BigQuery subquery results. + +**Architecture:** Part 1 patches `get_analytics_db_readonly()` to re-load extensions from `_remote_attach` tables. Part 2 adds `RemoteQueryEngine` that wraps BQ client with safety limits (COUNT pre-check, memory estimation), registers Arrow results in DuckDB, then executes the final SQL. Exposed via `da query --register-bq` CLI and `POST /api/query/hybrid` API. + +**Tech Stack:** DuckDB, google-cloud-bigquery, PyArrow, FastAPI, Typer + +**Spec:** `docs/superpowers/specs/2026-04-11-remote-query-design.md` + +--- + +### Task 1: Fix Extension Re-attach in `get_analytics_db_readonly()` + +**Files:** +- Modify: `src/db.py:253-282` (get_analytics_db_readonly) +- Test: `tests/test_db.py` + +- [ ] **Step 1: Write failing test** + +Add to `tests/test_db.py`: + +```python +class TestExtensionReattach: + def test_reads_remote_attach_table(self, tmp_path, monkeypatch): + """Verify get_analytics_db_readonly() attempts to load extensions from _remote_attach.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + import duckdb + + # Create analytics DB + analytics_dir = tmp_path / "analytics" + analytics_dir.mkdir() + conn = duckdb.connect(str(analytics_dir / "server.duckdb")) + conn.close() + + # Create an extract.duckdb with a _remote_attach table + ext_dir = tmp_path / "extracts" / "testbq" + ext_dir.mkdir(parents=True) + ext_conn = duckdb.connect(str(ext_dir / "extract.duckdb")) + ext_conn.execute(""" + CREATE TABLE _remote_attach ( + alias VARCHAR, extension VARCHAR, url VARCHAR, token_env VARCHAR + ) + """) + ext_conn.execute( + "INSERT INTO _remote_attach VALUES ('bq', 'bigquery', 'project=test', '')" + ) + ext_conn.close() + + from src.db import get_analytics_db_readonly + # This won't actually load bigquery (not installed in test env), + # but should not crash — just log a warning + analytics = get_analytics_db_readonly() + try: + # Connection should be usable even if extension load failed + result = analytics.execute("SELECT 1").fetchone() + assert result[0] == 1 + finally: + analytics.close() + + def test_skips_missing_remote_attach(self, tmp_path, monkeypatch): + """Extract without _remote_attach should not cause errors.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + import duckdb + + analytics_dir = tmp_path / "analytics" + analytics_dir.mkdir() + conn = duckdb.connect(str(analytics_dir / "server.duckdb")) + conn.close() + + ext_dir = tmp_path / "extracts" / "plain" + ext_dir.mkdir(parents=True) + ext_conn = duckdb.connect(str(ext_dir / "extract.duckdb")) + ext_conn.execute("CREATE TABLE _meta (name VARCHAR)") + ext_conn.close() + + from src.db import get_analytics_db_readonly + analytics = get_analytics_db_readonly() + try: + result = analytics.execute("SELECT 1").fetchone() + assert result[0] == 1 + finally: + analytics.close() +``` + +- [ ] **Step 2: Run test to verify it fails (or passes — these are resilience tests)** + +Run: `pytest tests/test_db.py::TestExtensionReattach -v` +Expected: Both tests likely PASS already (graceful failures). That's fine — the real value is ensuring the re-attach code doesn't break anything. + +- [ ] **Step 3: Implement extension re-attach** + +In `src/db.py`, modify `get_analytics_db_readonly()`. After the existing ATTACH loop (line ~279), before the `return conn` (line ~282), add: + +```python + # Re-attach remote extensions (BigQuery, Keboola, etc.) + if extracts_dir.exists(): + _reattach_remote_extensions(conn, extracts_dir) +``` + +Add this helper function before `get_analytics_db_readonly()`: + +```python +def _reattach_remote_extensions( + conn: duckdb.DuckDBPyConnection, extracts_dir: Path +) -> None: + """Re-load extensions from _remote_attach tables in extract.duckdb files.""" + already_attached = set() + try: + already_attached = { + r[0] for r in conn.execute( + "SELECT database_name FROM duckdb_databases()" + ).fetchall() + } + except Exception: + pass + + for ext_dir in sorted(extracts_dir.iterdir()): + if not ext_dir.is_dir() or not _SAFE_IDENTIFIER.match(ext_dir.name): + continue + # Check if this extract has a _remote_attach table + try: + has_table = conn.execute( + f"SELECT table_name FROM information_schema.tables " + f"WHERE table_schema='{ext_dir.name}' AND table_name='_remote_attach'" + ).fetchall() + if not has_table: + continue + except Exception: + continue + + try: + rows = conn.execute( + f"SELECT alias, extension, url, token_env FROM {ext_dir.name}._remote_attach" + ).fetchall() + except Exception: + continue + + for alias, extension, url, token_env in rows: + if alias in already_attached: + continue + if not _SAFE_IDENTIFIER.match(alias) or not _SAFE_IDENTIFIER.match(extension): + continue + + token = os.environ.get(token_env, "") if token_env else "" + + try: + conn.execute(f"LOAD {extension};") + if token: + escaped_token = token.replace("'", "''") + conn.execute( + f"ATTACH '{url}' AS {alias} (TYPE {extension}, TOKEN '{escaped_token}')" + ) + else: + conn.execute( + f"ATTACH '{url}' AS {alias} (TYPE {extension}, READ_ONLY)" + ) + already_attached.add(alias) + logger.info("Re-attached remote source %s via %s", alias, extension) + except Exception as e: + logger.debug("Could not re-attach %s: %s", alias, e) +``` + +- [ ] **Step 4: Run tests** + +Run: `pytest tests/test_db.py -v` +Expected: ALL PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/db.py tests/test_db.py +git commit -m "fix: re-attach remote extensions in get_analytics_db_readonly()" +``` + +--- + +### Task 2: RemoteQueryEngine Core + +**Files:** +- Create: `src/remote_query.py` +- Test: `tests/test_remote_query.py` + +- [ ] **Step 1: Write failing tests** + +Create `tests/test_remote_query.py`: + +```python +"""Tests for RemoteQueryEngine.""" + +import json +import os +from pathlib import Path +from unittest.mock import patch, MagicMock + +import duckdb +import pytest + + +@pytest.fixture +def analytics_conn(tmp_path): + """DuckDB connection with a sample local view.""" + conn = duckdb.connect() + conn.execute("CREATE TABLE orders (id INT, date DATE, amount DECIMAL(10,2))") + conn.execute("INSERT INTO orders VALUES (1, '2026-01-01', 100.0), (2, '2026-01-15', 200.0)") + yield conn + conn.close() + + +def _mock_bq_arrow_table(): + """Create a mock Arrow table for BQ results.""" + import pyarrow as pa + return pa.table({ + "date": ["2026-01-01", "2026-01-15"], + "pageviews": [1000, 2000], + }) + + +class TestRemoteQueryEngineRegister: + def test_register_bq_success(self, analytics_conn): + from src.remote_query import RemoteQueryEngine + + mock_arrow = _mock_bq_arrow_table() + mock_job = MagicMock() + mock_job.to_arrow.return_value = mock_arrow + mock_client = MagicMock() + mock_client.query.return_value = mock_job + # COUNT pre-check + mock_count_job = MagicMock() + mock_count_result = MagicMock() + mock_count_result.fetchone.return_value = (2,) + mock_count_job.result.return_value = mock_count_result + mock_client.query.side_effect = [mock_count_job, mock_job] + + engine = RemoteQueryEngine(analytics_conn, _bq_client_factory=lambda: mock_client) + stats = engine.register_bq("traffic", "SELECT date, pageviews FROM dataset.web") + + assert stats["alias"] == "traffic" + assert stats["rows"] == 2 + # Verify the view is usable + result = analytics_conn.execute("SELECT * FROM traffic").fetchall() + assert len(result) == 2 + + def test_register_bq_row_limit_exceeded(self, analytics_conn): + from src.remote_query import RemoteQueryEngine, RemoteQueryError + + mock_client = MagicMock() + mock_count_job = MagicMock() + mock_count_result = MagicMock() + mock_count_result.fetchone.return_value = (999999,) + mock_count_job.result.return_value = mock_count_result + mock_client.query.return_value = mock_count_job + + engine = RemoteQueryEngine( + analytics_conn, + _bq_client_factory=lambda: mock_client, + max_bq_registration_rows=1000, + ) + with pytest.raises(RemoteQueryError, match="row_limit"): + engine.register_bq("big", "SELECT * FROM huge_table") + + def test_register_bq_missing_package(self, analytics_conn): + from src.remote_query import RemoteQueryEngine, RemoteQueryError + + engine = RemoteQueryEngine( + analytics_conn, + _bq_client_factory=None, # Will try real import + ) + with patch.dict("sys.modules", {"google.cloud.bigquery": None}): + with pytest.raises(RemoteQueryError, match="bq_error"): + engine.register_bq("x", "SELECT 1") + + +class TestRemoteQueryEngineExecute: + def test_execute_local_only(self, analytics_conn): + from src.remote_query import RemoteQueryEngine + engine = RemoteQueryEngine(analytics_conn) + result = engine.execute("SELECT id, amount FROM orders ORDER BY id") + assert result["columns"] == ["id", "amount"] + assert len(result["rows"]) == 2 + assert result["row_count"] == 2 + assert result["truncated"] is False + + def test_execute_with_registered_bq(self, analytics_conn): + from src.remote_query import RemoteQueryEngine + import pyarrow as pa + + # Manually register an Arrow table (simulating BQ result) + traffic = pa.table({"date": ["2026-01-01", "2026-01-15"], "views": [100, 200]}) + analytics_conn.register("traffic", traffic) + + engine = RemoteQueryEngine(analytics_conn) + result = engine.execute( + "SELECT o.id, t.views FROM orders o JOIN traffic t ON CAST(o.date AS VARCHAR) = t.date ORDER BY o.id" + ) + assert len(result["rows"]) == 2 + assert result["columns"] == ["id", "views"] + + def test_execute_respects_max_result_rows(self, analytics_conn): + from src.remote_query import RemoteQueryEngine + engine = RemoteQueryEngine(analytics_conn, max_result_rows=1) + result = engine.execute("SELECT * FROM orders") + assert len(result["rows"]) == 1 + assert result["truncated"] is True + + def test_execute_invalid_sql(self, analytics_conn): + from src.remote_query import RemoteQueryEngine, RemoteQueryError + engine = RemoteQueryEngine(analytics_conn) + with pytest.raises(RemoteQueryError, match="query_error"): + engine.execute("DROP TABLE orders") +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest tests/test_remote_query.py -v` +Expected: FAIL — `ModuleNotFoundError: No module named 'src.remote_query'` + +- [ ] **Step 3: Implement RemoteQueryEngine** + +Create `src/remote_query.py`: + +```python +"""Two-phase remote query engine. + +Phase 1: Execute BigQuery subqueries, register results as in-memory Arrow tables. +Phase 2: Execute DuckDB query joining local Parquet views with BQ Arrow tables. +""" + +import logging +import os +from typing import Any, Callable, Dict, List, Optional + +import duckdb + +logger = logging.getLogger(__name__) + +# SQL blocklist — reused from app/api/query.py +_BLOCKED_KEYWORDS = [ + "drop ", "delete ", "insert ", "update ", "alter ", "create ", + "copy ", "attach ", "detach ", "load ", "install ", + "export ", "import ", "pragma ", "call ", + "read_csv", "read_json", "read_parquet", "read_text", + "write_csv", "write_parquet", "read_blob", "read_ndjson", + "parquet_scan", "parquet_metadata", "parquet_schema", + "json_scan", "csv_scan", + "query_table", "iceberg_scan", "delta_scan", + "glob(", "list_files", + "'/", '"/', 'http://', 'https://', 's3://', 'gcs://', + "information_schema", "duckdb_tables", "duckdb_columns", + "duckdb_databases", "duckdb_settings", "duckdb_functions", + "duckdb_views", "duckdb_indexes", "duckdb_schemas", + "pragma_table_info", "pragma_storage_info", + "'../", '"../', + ";", +] + + +class RemoteQueryError(Exception): + """Structured error for remote query failures.""" + + def __init__(self, message: str, error_type: str, details: Optional[dict] = None): + super().__init__(message) + self.error_type = error_type + self.details = details or {} + + +class RemoteQueryEngine: + """Two-phase query engine: BQ subqueries + DuckDB final query.""" + + def __init__( + self, + conn: duckdb.DuckDBPyConnection, + *, + _bq_client_factory: Optional[Callable] = None, + max_bq_registration_rows: int = 500_000, + max_memory_mb: float = 2048.0, + max_result_rows: int = 100_000, + timeout_seconds: int = 300, + ): + self.conn = conn + self._bq_client_factory = _bq_client_factory + self.max_bq_registration_rows = max_bq_registration_rows + self.max_memory_mb = max_memory_mb + self.max_result_rows = max_result_rows + self.timeout_seconds = timeout_seconds + self._bq_stats: Dict[str, dict] = {} + + def register_bq(self, alias: str, bq_sql: str) -> dict: + """Execute BQ subquery, register result as in-memory DuckDB view. + + Returns dict with {alias, rows, columns, memory_mb}. + Raises RemoteQueryError on failure. + """ + _validate_sql(bq_sql) + + client = self._get_bq_client() + + # Phase 1a: COUNT(*) pre-check + count_sql = f"SELECT COUNT(*) FROM ({bq_sql})" + try: + count_job = client.query(count_sql) + row_count = count_job.result().fetchone()[0] + except Exception as e: + raise RemoteQueryError( + f"BQ COUNT pre-check failed for '{alias}': {e}", + error_type="bq_error", + details={"alias": alias}, + ) + + if row_count > self.max_bq_registration_rows: + raise RemoteQueryError( + f"BQ query '{alias}' returns {row_count:,} rows " + f"(limit: {self.max_bq_registration_rows:,})", + error_type="row_limit", + details={"alias": alias, "rows": row_count, "limit": self.max_bq_registration_rows}, + ) + + # Phase 1b: Execute and register + try: + job = client.query(bq_sql) + try: + arrow_table = job.to_arrow() + except Exception: + arrow_table = job.to_arrow(create_bqstorage_client=False) + except Exception as e: + raise RemoteQueryError( + f"BQ query failed for '{alias}': {e}", + error_type="bq_error", + details={"alias": alias}, + ) + + # Memory check (actual, not estimated) + memory_mb = arrow_table.nbytes / (1024 * 1024) + if memory_mb > self.max_memory_mb: + raise RemoteQueryError( + f"BQ result '{alias}' uses {memory_mb:.1f} MB " + f"(limit: {self.max_memory_mb:.0f} MB)", + error_type="memory_limit", + details={"alias": alias, "memory_mb": memory_mb, "limit": self.max_memory_mb}, + ) + + self.conn.register(alias, arrow_table) + stats = { + "alias": alias, + "rows": arrow_table.num_rows, + "columns": arrow_table.num_columns, + "memory_mb": round(memory_mb, 3), + } + self._bq_stats[alias] = stats + logger.info("Registered BQ view '%s': %d rows, %.1f MB", alias, arrow_table.num_rows, memory_mb) + return stats + + def execute(self, sql: str) -> dict: + """Execute final DuckDB query. Returns {columns, rows, row_count, truncated, bq_stats}.""" + _validate_sql(sql) + + try: + result = self.conn.execute(sql).fetchmany(self.max_result_rows + 1) + columns = [desc[0] for desc in self.conn.description] if self.conn.description else [] + except Exception as e: + raise RemoteQueryError( + f"Query execution failed: {e}", + error_type="query_error", + ) + + truncated = len(result) > self.max_result_rows + rows = result[:self.max_result_rows] + + # Serialize non-standard types + serializable_rows = [] + for row in rows: + serializable_rows.append([ + str(v) if v is not None and not isinstance(v, (int, float, bool, str)) else v + for v in row + ]) + + return { + "columns": columns, + "rows": serializable_rows, + "row_count": len(serializable_rows), + "truncated": truncated, + "bq_stats": dict(self._bq_stats), + } + + def _get_bq_client(self): + """Get BigQuery client, using factory or default.""" + if self._bq_client_factory: + return self._bq_client_factory() + try: + from scripts.duckdb_manager import _create_bq_client + project = os.environ.get("BIGQUERY_PROJECT") + if not project: + raise RemoteQueryError( + "BIGQUERY_PROJECT env var not set", + error_type="bq_error", + ) + return _create_bq_client(project) + except ImportError: + raise RemoteQueryError( + "google-cloud-bigquery is not installed. " + "Install with: pip install google-cloud-bigquery", + error_type="bq_error", + ) + + +def _validate_sql(sql: str) -> None: + """Validate SQL against blocklist. Raises RemoteQueryError.""" + sql_lower = sql.strip().lower() + for keyword in _BLOCKED_KEYWORDS: + if keyword in sql_lower: + raise RemoteQueryError( + f"Blocked SQL keyword: {keyword.strip()}", + error_type="query_error", + ) + if not sql_lower.startswith("select ") and not sql_lower.startswith("with "): + raise RemoteQueryError( + "Query must start with SELECT or WITH", + error_type="query_error", + ) + + +def load_config() -> dict: + """Load remote_query config from instance.yaml.""" + try: + from app.instance_config import get_value + return get_value("remote_query") or {} + except Exception: + return {} +``` + +- [ ] **Step 4: Run tests** + +Run: `pytest tests/test_remote_query.py -v` +Expected: ALL PASS + +- [ ] **Step 5: Commit** + +```bash +git add src/remote_query.py tests/test_remote_query.py +git commit -m "feat: add RemoteQueryEngine with BQ registration and safety limits" +``` + +--- + +### Task 3: CLI `da query --register-bq` + +**Files:** +- Modify: `cli/commands/query.py` +- Test: `tests/test_cli.py` + +- [ ] **Step 1: Write failing test** + +Add to `tests/test_cli.py`: + +```python +class TestQueryHybrid: + def test_register_bq_flag_help(self): + result = runner.invoke(app, ["query", "--help"]) + assert result.exit_code == 0 + assert "register-bq" in result.output +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `pytest tests/test_cli.py::TestQueryHybrid -v` +Expected: FAIL — `register-bq` not in help output + +- [ ] **Step 3: Implement CLI changes** + +Replace `cli/commands/query.py` with: + +```python +"""Query commands — da query.""" + +import json +import os +import sys +from pathlib import Path +from typing import List, Optional + +import typer + + +def query_command( + sql: Optional[str] = typer.Argument(None, help="SQL query to execute"), + sql_opt: Optional[str] = typer.Option(None, "--sql", help="SQL query (alternative to positional)"), + remote: bool = typer.Option(False, "--remote", help="Execute on server instead of locally"), + register_bq: Optional[List[str]] = typer.Option(None, "--register-bq", help="Register BQ subquery: alias=SQL"), + stdin: bool = typer.Option(False, "--stdin", help="Read query spec from stdin (JSON)"), + fmt: str = typer.Option("table", "--format", "-f", help="Output format: table, json, csv"), + limit: int = typer.Option(1000, "--limit", help="Max rows to return"), +): + """Execute SQL query against DuckDB. Supports hybrid BQ+local queries.""" + # Resolve SQL from positional, --sql, or --stdin + if stdin: + spec = json.loads(sys.stdin.read()) + final_sql = spec.get("sql", "") + register_bq = [f"{k}={v}" for k, v in spec.get("register_bq", {}).items()] + else: + final_sql = sql or sql_opt + if not final_sql: + typer.echo("Error: provide SQL as argument, --sql, or --stdin", err=True) + raise typer.Exit(1) + + if register_bq: + _query_hybrid(final_sql, register_bq, fmt, limit) + elif remote: + _query_remote(final_sql, fmt, limit) + else: + _query_local(final_sql, fmt, limit) + + +def _query_hybrid(sql: str, register_bq_specs: List[str], fmt: str, limit: int): + """Run two-phase hybrid query: BQ subqueries + local DuckDB.""" + import duckdb + from src.remote_query import RemoteQueryEngine, RemoteQueryError, load_config + + local_dir = Path(os.environ.get("DA_LOCAL_DIR", ".")) + db_path = local_dir / "user" / "duckdb" / "analytics.duckdb" + if not db_path.exists(): + typer.echo("Local DuckDB not found. Run: da sync", err=True) + raise typer.Exit(1) + + config = load_config() + conn = duckdb.connect(str(db_path), read_only=True) + try: + engine = RemoteQueryEngine( + conn, + max_bq_registration_rows=config.get("max_bq_registration_rows", 500_000), + max_memory_mb=config.get("max_memory_mb", 2048), + max_result_rows=limit, + timeout_seconds=config.get("timeout_seconds", 300), + ) + + # Phase 1: Register BQ subqueries + for spec in register_bq_specs: + eq_idx = spec.index("=") + alias = spec[:eq_idx].strip() + bq_sql = spec[eq_idx + 1:].strip() + try: + stats = engine.register_bq(alias, bq_sql) + typer.echo(f" BQ '{alias}': {stats['rows']} rows, {stats['memory_mb']} MB", err=True) + except RemoteQueryError as e: + typer.echo(f"Error registering '{alias}': {e}", err=True) + raise typer.Exit(1) + + # Phase 2: Execute final query + try: + result = engine.execute(sql) + except RemoteQueryError as e: + typer.echo(f"Query error: {e}", err=True) + raise typer.Exit(1) + + _output(result["columns"], result["rows"], fmt) + if result["truncated"]: + typer.echo(f"(truncated at {limit} rows)", err=True) + finally: + conn.close() + + +def _query_local(sql: str, fmt: str, limit: int): + """Run query against local DuckDB.""" + import duckdb + + local_dir = Path(os.environ.get("DA_LOCAL_DIR", ".")) + db_path = local_dir / "user" / "duckdb" / "analytics.duckdb" + if not db_path.exists(): + typer.echo("Local DuckDB not found. Run: da sync", err=True) + raise typer.Exit(1) + + conn = duckdb.connect(str(db_path), read_only=True) + try: + result = conn.execute(sql).fetchmany(limit) + columns = [desc[0] for desc in conn.description] if conn.description else [] + _output(columns, result, fmt) + except Exception as e: + typer.echo(f"Query error: {e}", err=True) + raise typer.Exit(1) + finally: + conn.close() + + +def _query_remote(sql: str, fmt: str, limit: int): + """Run query against server DuckDB via API.""" + from cli.client import api_post + + resp = api_post("/api/query", json={"sql": sql, "limit": limit}) + if resp.status_code != 200: + typer.echo(f"Query failed: {resp.json().get('detail', resp.text)}", err=True) + raise typer.Exit(1) + + data = resp.json() + _output(data["columns"], data["rows"], fmt) + if data.get("truncated"): + typer.echo(f"(truncated at {limit} rows)", err=True) + + +def _output(columns: list, rows: list, fmt: str): + if fmt == "json": + output = [dict(zip(columns, row)) for row in rows] + typer.echo(json.dumps(output, indent=2, default=str)) + elif fmt == "csv": + typer.echo(",".join(columns)) + for row in rows: + typer.echo(",".join(str(v) if v is not None else "" for v in row)) + else: + from rich.console import Console + from rich.table import Table + console = Console() + table = Table() + for col in columns: + table.add_column(col) + for row in rows: + table.add_row(*(str(v) if v is not None else "" for v in row)) + console.print(table) +``` + +- [ ] **Step 4: Run tests** + +Run: `pytest tests/test_cli.py -v` +Expected: ALL PASS + +- [ ] **Step 5: Commit** + +```bash +git add cli/commands/query.py tests/test_cli.py +git commit -m "feat: add --register-bq and --stdin to da query for hybrid BQ+local queries" +``` + +--- + +### Task 4: API Endpoint `POST /api/query/hybrid` + +**Files:** +- Create: `app/api/query_hybrid.py` +- Modify: `app/main.py` (register router) +- Test: `tests/test_api.py` + +- [ ] **Step 1: Write failing tests** + +Add to `tests/test_api.py`: + +```python +class TestHybridQueryAPI: + def test_hybrid_query_requires_admin(self, seeded_client): + client, _, analyst_token = seeded_client + resp = client.post( + "/api/query/hybrid", + json={"sql": "SELECT 1", "register_bq": {}}, + headers={"Authorization": f"Bearer {analyst_token}"}, + ) + assert resp.status_code == 403 + + def test_hybrid_query_local_only(self, seeded_client): + """Hybrid endpoint works without BQ registrations (just local query).""" + client, admin_token, _ = seeded_client + resp = client.post( + "/api/query/hybrid", + json={"sql": "SELECT 1 AS val", "register_bq": {}}, + headers={"Authorization": f"Bearer {admin_token}"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["columns"] == ["val"] + assert data["rows"] == [[1]] + + def test_hybrid_query_blocked_sql(self, seeded_client): + client, admin_token, _ = seeded_client + resp = client.post( + "/api/query/hybrid", + json={"sql": "DROP TABLE users", "register_bq": {}}, + headers={"Authorization": f"Bearer {admin_token}"}, + ) + assert resp.status_code == 400 + + def test_hybrid_query_blocked_bq_sql(self, seeded_client): + client, admin_token, _ = seeded_client + resp = client.post( + "/api/query/hybrid", + json={ + "sql": "SELECT 1", + "register_bq": {"x": "DROP TABLE something"}, + }, + headers={"Authorization": f"Bearer {admin_token}"}, + ) + assert resp.status_code == 400 +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest tests/test_api.py::TestHybridQueryAPI -v` +Expected: FAIL — 404 on `/api/query/hybrid` + +- [ ] **Step 3: Implement API endpoint** + +Create `app/api/query_hybrid.py`: + +```python +"""Hybrid query endpoint — two-phase BQ + DuckDB queries.""" + +from typing import Dict, Optional + +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel +import duckdb + +from app.auth.dependencies import require_admin, _get_db +from src.db import get_analytics_db_readonly +from src.remote_query import RemoteQueryEngine, RemoteQueryError, load_config + +router = APIRouter(prefix="/api/query", tags=["query"]) + + +class HybridQueryRequest(BaseModel): + sql: str + register_bq: Dict[str, str] = {} + format: str = "json" + + +@router.post("/hybrid") +async def hybrid_query( + request: HybridQueryRequest, + user: dict = Depends(require_admin), +): + """Execute a two-phase hybrid query: BQ subqueries + DuckDB final query.""" + config = load_config() + analytics = get_analytics_db_readonly() + try: + engine = RemoteQueryEngine( + analytics, + max_bq_registration_rows=config.get("max_bq_registration_rows", 500_000), + max_memory_mb=config.get("max_memory_mb", 2048), + max_result_rows=config.get("max_result_rows", 100_000), + timeout_seconds=config.get("timeout_seconds", 300), + ) + + # Phase 1: Register BQ subqueries + for alias, bq_sql in request.register_bq.items(): + try: + engine.register_bq(alias, bq_sql) + except RemoteQueryError as e: + raise HTTPException( + status_code=400, + detail=f"BQ registration '{alias}' failed: {e.error_type}: {str(e)}", + ) + + # Phase 2: Execute final query + try: + result = engine.execute(request.sql) + except RemoteQueryError as e: + raise HTTPException( + status_code=400, + detail=f"Query failed: {e.error_type}: {str(e)}", + ) + + return result + finally: + analytics.close() +``` + +Register in `app/main.py`: + +```python +from app.api.query_hybrid import router as query_hybrid_router +# ... +app.include_router(query_hybrid_router) # before web_router +``` + +- [ ] **Step 4: Run tests** + +Run: `pytest tests/test_api.py::TestHybridQueryAPI -v` +Expected: ALL PASS + +- [ ] **Step 5: Commit** + +```bash +git add app/api/query_hybrid.py app/main.py tests/test_api.py +git commit -m "feat: add POST /api/query/hybrid endpoint for two-phase BQ+DuckDB queries" +``` + +--- + +### Task 5: CLAUDE.md + Integration Test + +**Files:** +- Modify: `CLAUDE.md` +- Test: run full suite + +- [ ] **Step 1: Add hybrid query docs to CLAUDE.md** + +After the "## Business Metrics" section, add: + +```markdown +## Hybrid Queries (BigQuery + Local) + +For tables too large to sync locally, use hybrid queries that JOIN local data with on-demand BigQuery results: + +```bash +da query --sql "SELECT o.*, t.views FROM orders o JOIN traffic t ON o.date = t.date" \ + --register-bq "traffic=SELECT date, SUM(views) as views FROM dataset.web WHERE date > '2026-01-01' GROUP BY 1" +``` + +The `--register-bq` flag executes a BigQuery subquery, loads the result into memory, and makes it available as a DuckDB view for the final SQL. Multiple `--register-bq` flags can be used for multiple BQ sources. + +For complex SQL, use stdin mode: +```bash +echo '{"register_bq": {"traffic": "SELECT ..."}, "sql": "SELECT ..."}' | da query --stdin +``` +``` + +- [ ] **Step 2: Run full test suite** + +Run: `pytest tests/ -v --timeout=60` +Expected: ALL PASS + +- [ ] **Step 3: Commit** + +```bash +git add CLAUDE.md +git commit -m "docs: add hybrid query usage instructions to CLAUDE.md" +``` From 0a69814fcae72b74f39bc2c3f1f532c445cc90c4 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Sat, 11 Apr 2026 11:04:04 +0200 Subject: [PATCH 04/14] fix: re-attach remote extensions in get_analytics_db_readonly() Add _reattach_remote_extensions() helper that reads _remote_attach tables from attached extract.duckdb files and LOADs the corresponding DuckDB extensions, so BigQuery and other remote views resolve correctly in read-only analytics connections. --- src/db.py | 88 ++++++++++++++++++++++++++++++++++++++++++++++++ tests/test_db.py | 70 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+) diff --git a/src/db.py b/src/db.py index e5630f7..66c9d0a 100644 --- a/src/db.py +++ b/src/db.py @@ -250,6 +250,92 @@ def get_analytics_db() -> duckdb.DuckDBPyConnection: return duckdb.connect(str(db_path)) +def _reattach_remote_extensions( + conn: duckdb.DuckDBPyConnection, extracts_dir: Path +) -> None: + """Re-LOAD DuckDB extensions listed in _remote_attach tables of each extract.duckdb. + + Called from get_analytics_db_readonly() after ATTACHing extract.duckdb files so + that remote views (e.g. BigQuery) resolve correctly. Uses LOAD only — no INSTALL — + to avoid touching the network in read-only query paths. + """ + if not extracts_dir.exists(): + return + + try: + attached_dbs = { + r[0] for r in conn.execute("SELECT database_name FROM duckdb_databases()").fetchall() + } + except Exception: + return + + for ext_dir in sorted(extracts_dir.iterdir()): + if not ext_dir.is_dir(): + continue + if not _SAFE_IDENTIFIER.match(ext_dir.name): + continue + db_file = ext_dir / "extract.duckdb" + if not db_file.exists(): + continue + # Only process sources that were successfully attached + if ext_dir.name not in attached_dbs: + continue + + # Check whether this extract has a _remote_attach table + try: + has_table = conn.execute( + "SELECT 1 FROM information_schema.tables " + f"WHERE table_schema='{ext_dir.name}' AND table_name='_remote_attach'" + ).fetchone() + if not has_table: + continue + except Exception: + continue + + try: + rows = conn.execute( + f"SELECT alias, extension, url, token_env FROM {ext_dir.name}._remote_attach" + ).fetchall() + except Exception as e: + logger.debug("Could not read _remote_attach from %s: %s", ext_dir.name, e) + continue + + # Refresh attached list before processing each source's rows + try: + attached_dbs = { + r[0] for r in conn.execute("SELECT database_name FROM duckdb_databases()").fetchall() + } + except Exception: + pass + + for alias, extension, url, token_env in rows: + if not _SAFE_IDENTIFIER.match(alias or ""): + logger.debug("Skipping unsafe remote_attach alias: %r", alias) + continue + if not _SAFE_IDENTIFIER.match(extension or ""): + logger.debug("Skipping unsafe remote_attach extension: %r", extension) + continue + if alias in attached_dbs: + logger.debug("Remote source %s already attached, skipping", alias) + continue + try: + conn.execute(f"LOAD {extension};") + token = os.environ.get(token_env, "") if token_env else "" + if token: + escaped_token = token.replace("'", "''") + conn.execute( + f"ATTACH '{url}' AS {alias} (TYPE {extension}, TOKEN '{escaped_token}')" + ) + else: + conn.execute( + f"ATTACH '{url}' AS {alias} (TYPE {extension}, READ_ONLY)" + ) + attached_dbs.add(alias) + logger.debug("Re-attached remote source %s via %s extension", alias, extension) + except Exception as e: + logger.debug("Could not re-attach remote source %s: %s", alias, e) + + def get_analytics_db_readonly() -> duckdb.DuckDBPyConnection: """Read-only connection to analytics DB. Blocks writes and external access. @@ -277,6 +363,8 @@ def get_analytics_db_readonly() -> duckdb.DuckDBPyConnection: conn.execute(f"ATTACH '{db_file}' AS {ext_dir.name} (READ_ONLY)") except Exception: pass + # Re-attach remote extensions so BigQuery / other remote views resolve. + _reattach_remote_extensions(conn, extracts_dir) # Note: external_access stays enabled because views use read_parquet() on local files. # File-path-based attacks are blocked by the SQL blocklist in app/api/query.py. return conn diff --git a/tests/test_db.py b/tests/test_db.py index 8a06937..e99e87e 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -462,6 +462,76 @@ class TestSchemaV4: conn2.close() +class TestExtensionReattach: + """Resilience tests for _reattach_remote_extensions() called by get_analytics_db_readonly().""" + + def _make_analytics_db(self, tmp_path): + """Create an empty analytics server.duckdb so get_analytics_db_readonly() takes the read_only path.""" + analytics_dir = tmp_path / "analytics" + analytics_dir.mkdir(parents=True, exist_ok=True) + import duckdb as _duckdb + conn = _duckdb.connect(str(analytics_dir / "server.duckdb")) + conn.close() + + def _make_extract_db(self, tmp_path, source_name, with_remote_attach=True): + """Create a minimal extract.duckdb, optionally with a _remote_attach table.""" + ext_dir = tmp_path / "extracts" / source_name + ext_dir.mkdir(parents=True, exist_ok=True) + import duckdb as _duckdb + conn = _duckdb.connect(str(ext_dir / "extract.duckdb")) + try: + conn.execute( + "CREATE TABLE _meta (table_name VARCHAR, description VARCHAR, rows BIGINT, " + "size_bytes BIGINT, extracted_at TIMESTAMP, query_mode VARCHAR)" + ) + if with_remote_attach: + conn.execute( + "CREATE TABLE _remote_attach (alias VARCHAR, extension VARCHAR, url VARCHAR, token_env VARCHAR)" + ) + # Use 'bigquery' which won't be installed in CI — tests resilience + conn.execute( + "INSERT INTO _remote_attach VALUES ('bq', 'bigquery', 'project/dataset', '')" + ) + finally: + conn.close() + + def test_reads_remote_attach_table(self, tmp_path, monkeypatch): + """get_analytics_db_readonly() doesn't crash even when LOAD fails for missing extension.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + import importlib + import src.db as db_module + importlib.reload(db_module) + + self._make_analytics_db(tmp_path) + self._make_extract_db(tmp_path, "mysource", with_remote_attach=True) + + # Should not raise even though 'bigquery' extension is not installed + conn = db_module.get_analytics_db_readonly() + try: + # Connection must still be usable for local queries + result = conn.execute("SELECT 42 AS n").fetchone() + assert result[0] == 42 + finally: + conn.close() + + def test_skips_missing_remote_attach(self, tmp_path, monkeypatch): + """get_analytics_db_readonly() works fine when _remote_attach table is absent.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + import importlib + import src.db as db_module + importlib.reload(db_module) + + self._make_analytics_db(tmp_path) + self._make_extract_db(tmp_path, "localsource", with_remote_attach=False) + + conn = db_module.get_analytics_db_readonly() + try: + result = conn.execute("SELECT 'ok' AS status").fetchone() + assert result[0] == "ok" + finally: + conn.close() + + class TestGetAnalyticsDbReadonly: def test_analytics_readonly_rejects_malicious_dir_name(self, tmp_path, monkeypatch): """Directories with SQL-injection chars in their name are skipped.""" From 86bbb8fce474ecea340f4af71f3c9a76d6027b56 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Sat, 11 Apr 2026 11:07:08 +0200 Subject: [PATCH 05/14] feat: add RemoteQueryEngine with BQ registration and safety limits Two-phase query engine: Phase 1 registers BQ query results as DuckDB Arrow views (with COUNT pre-check, row/memory limits, Storage API fallback); Phase 2 executes validated SQL against DuckDB with result serialization and truncation. 25 tests covering all branches. --- src/remote_query.py | 375 +++++++++++++++++++++++++++++++++++++ tests/test_remote_query.py | 224 ++++++++++++++++++++++ 2 files changed, 599 insertions(+) create mode 100644 src/remote_query.py create mode 100644 tests/test_remote_query.py diff --git a/src/remote_query.py b/src/remote_query.py new file mode 100644 index 0000000..2b50f1b --- /dev/null +++ b/src/remote_query.py @@ -0,0 +1,375 @@ +"""RemoteQueryEngine — two-phase BQ registration + DuckDB execution. + +Phase 1 (register_bq): validate SQL, COUNT(*) pre-check against BigQuery, +fetch Arrow table, check memory, register as DuckDB view. + +Phase 2 (execute): validate SQL, execute against DuckDB (which may reference +registered BQ views), serialize and return results. +""" + +from __future__ import annotations + +import logging +import os +from typing import Any, Dict, List, Optional + +import duckdb + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# SQL blocklist — mirrors app/api/query.py lines 40-63 +# --------------------------------------------------------------------------- + +_BLOCKED_KEYWORDS: List[str] = [ + "drop ", + "delete ", + "insert ", + "update ", + "alter ", + "create ", + "copy ", + "attach ", + "detach ", + "load ", + "install ", + "export ", + "import ", + "pragma ", + "call ", + # File access functions + "read_csv", + "read_json", + "read_parquet", + "read_text", + "write_csv", + "write_parquet", + "read_blob", + "read_ndjson", + "parquet_scan", + "parquet_metadata", + "parquet_schema", + "json_scan", + "csv_scan", + "query_table", + "iceberg_scan", + "delta_scan", + "glob(", + "list_files", + "'/", + '\"/', + "http://", + "https://", + "s3://", + "gcs://", + # DuckDB metadata (leaks schema info regardless of RBAC) + "information_schema", + "duckdb_tables", + "duckdb_columns", + "duckdb_databases", + "duckdb_settings", + "duckdb_functions", + "duckdb_views", + "duckdb_indexes", + "duckdb_schemas", + "pragma_table_info", + "pragma_storage_info", + # Relative path traversal + "'../", + '"../', + # Multiple statements + ";", +] + + +# --------------------------------------------------------------------------- +# Exception +# --------------------------------------------------------------------------- + + +class RemoteQueryError(Exception): + """Raised by RemoteQueryEngine for all controlled error conditions. + + Attributes: + error_type: One of "row_limit", "memory_limit", "bq_error", + "query_error", "timeout". + details: Optional dict with additional context. + """ + + def __init__( + self, + message: str, + error_type: str, + details: Optional[Dict[str, Any]] = None, + ) -> None: + super().__init__(message) + self.error_type = error_type + self.details = details or {} + + +# --------------------------------------------------------------------------- +# Module-level helpers +# --------------------------------------------------------------------------- + + +def _validate_sql(sql: str) -> None: + """Raise RemoteQueryError if *sql* contains blocked patterns. + + Raises: + RemoteQueryError: with error_type="query_error" if validation fails. + """ + sql_lower = sql.strip().lower() + + for keyword in _BLOCKED_KEYWORDS: + if keyword in sql_lower: + raise RemoteQueryError( + f"Blocked SQL pattern: {keyword!r}", + error_type="query_error", + details={"blocked_keyword": keyword}, + ) + + if not sql_lower.startswith("select ") and not sql_lower.startswith("with "): + raise RemoteQueryError( + "Query must start with SELECT or WITH", + error_type="query_error", + ) + + +def load_config() -> Dict[str, Any]: + """Load the ``remote_query:`` section from instance.yaml. + + Returns an empty dict if the section is missing or config cannot be loaded. + """ + try: + from app.instance_config import get_value + + return get_value("remote_query", default={}) or {} + except Exception: + return {} + + +# --------------------------------------------------------------------------- +# Engine +# --------------------------------------------------------------------------- + + +class RemoteQueryEngine: + """Two-phase query engine: BQ registration (Phase 1) + DuckDB execution (Phase 2). + + Args: + conn: Open DuckDB connection used for both view registration and querying. + _bq_client_factory: Optional callable ``(project: str) -> BQ client``. + Defaults to ``scripts.duckdb_manager._create_bq_client``. + max_bq_registration_rows: Maximum rows allowed in a single BQ registration. + max_memory_mb: Maximum in-memory Arrow table size (MiB). + max_result_rows: Maximum rows returned by ``execute()``. + timeout_seconds: Query timeout (reserved for future use). + """ + + def __init__( + self, + conn: duckdb.DuckDBPyConnection, + *, + _bq_client_factory=None, + max_bq_registration_rows: int = 500_000, + max_memory_mb: float = 2048.0, + max_result_rows: int = 100_000, + timeout_seconds: int = 300, + ) -> None: + self._conn = conn + self._bq_client_factory = _bq_client_factory + self.max_bq_registration_rows = max_bq_registration_rows + self.max_memory_mb = max_memory_mb + self.max_result_rows = max_result_rows + self.timeout_seconds = timeout_seconds + + # Track which aliases have been registered in this session + self._registered: Dict[str, Dict[str, Any]] = {} + + # ------------------------------------------------------------------ + # Phase 1 + # ------------------------------------------------------------------ + + def register_bq(self, alias: str, bq_sql: str) -> Dict[str, Any]: + """Register a BigQuery query result as a DuckDB view. + + Steps: + 1. Validate *bq_sql* against the SQL blocklist. + 2. COUNT(*) pre-check via BQ client. + 3. Execute the actual BQ query and fetch as Arrow table. + 4. Check in-memory size against *max_memory_mb*. + 5. Register Arrow table in DuckDB under *alias*. + + Args: + alias: DuckDB view name to register (e.g. ``"bq_orders"``). + bq_sql: SQL query to execute on BigQuery. + + Returns: + ``{alias, rows, columns, memory_mb}`` + + Raises: + RemoteQueryError: For row/memory limits or BQ errors. + ImportError: If google-cloud-bigquery is not installed. + """ + _validate_sql(bq_sql) + + client = self._get_bq_client() + + # --- Phase 1a: COUNT(*) pre-check --- + count_sql = f"SELECT COUNT(*) FROM ({bq_sql})" + try: + count_job = client.query(count_sql) + count_arrow = count_job.to_arrow() + count_value = int(count_arrow.column(0)[0].as_py()) + except RemoteQueryError: + raise + except Exception as exc: + raise RemoteQueryError( + f"BQ COUNT pre-check failed: {exc}", + error_type="bq_error", + details={"original_error": str(exc)}, + ) from exc + + if count_value > self.max_bq_registration_rows: + raise RemoteQueryError( + f"BQ result has {count_value:,} rows, exceeding the " + f"limit of {self.max_bq_registration_rows:,}.", + error_type="row_limit", + details={ + "count": count_value, + "max": self.max_bq_registration_rows, + }, + ) + + # --- Phase 1b: Fetch actual data --- + try: + data_job = client.query(bq_sql) + try: + arrow_table = data_job.to_arrow() + except Exception as storage_exc: + if "readsessions" in str(storage_exc) or "PERMISSION_DENIED" in str(storage_exc): + logger.warning("BQ Storage API unavailable, falling back to REST") + arrow_table = data_job.to_arrow(create_bqstorage_client=False) + else: + raise + except RemoteQueryError: + raise + except Exception as exc: + raise RemoteQueryError( + f"BQ query failed: {exc}", + error_type="bq_error", + details={"original_error": str(exc)}, + ) from exc + + # --- Phase 1c: Memory check (accurate, post-fetch) --- + memory_mb = arrow_table.nbytes / (1024 * 1024) + if memory_mb > self.max_memory_mb: + raise RemoteQueryError( + f"Arrow table uses {memory_mb:.1f} MiB, exceeding the " + f"limit of {self.max_memory_mb:.1f} MiB.", + error_type="memory_limit", + details={"memory_mb": memory_mb, "max_memory_mb": self.max_memory_mb}, + ) + + # --- Phase 1d: Register in DuckDB --- + self._conn.register(alias, arrow_table) + + info: Dict[str, Any] = { + "alias": alias, + "rows": arrow_table.num_rows, + "columns": arrow_table.schema.names, + "memory_mb": memory_mb, + } + self._registered[alias] = info + logger.info( + "Registered BQ alias %r: %d rows, %.2f MiB", + alias, + arrow_table.num_rows, + memory_mb, + ) + return info + + # ------------------------------------------------------------------ + # Phase 2 + # ------------------------------------------------------------------ + + def execute(self, sql: str) -> Dict[str, Any]: + """Execute SQL against DuckDB (which may reference registered BQ views). + + Args: + sql: SQL query to execute. Must pass the SQL blocklist. + + Returns: + ``{columns, rows, row_count, truncated, bq_stats}`` + + Raises: + RemoteQueryError: If SQL is blocked or a DuckDB error occurs. + """ + _validate_sql(sql) + + try: + result = self._conn.execute(sql).fetchmany(self.max_result_rows + 1) + columns = ( + [desc[0] for desc in self._conn.description] + if self._conn.description + else [] + ) + except RemoteQueryError: + raise + except Exception as exc: + raise RemoteQueryError( + f"Query error: {exc}", + error_type="query_error", + details={"original_error": str(exc)}, + ) from exc + + truncated = len(result) > self.max_result_rows + rows = result[: self.max_result_rows] + + # Serialize non-standard types (mirrors app/api/query.py lines 92-96) + serializable_rows = [] + for row in rows: + serializable_rows.append( + [ + str(v) if v is not None and not isinstance(v, (int, float, bool, str)) else v + for v in row + ] + ) + + return { + "columns": columns, + "rows": serializable_rows, + "row_count": len(serializable_rows), + "truncated": truncated, + "bq_stats": { + "registered_aliases": list(self._registered.keys()), + "alias_count": len(self._registered), + }, + } + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _get_bq_client(self): + """Return a BigQuery client from the injected factory or the default one. + + Raises: + ImportError: If google-cloud-bigquery is not installed and no + factory was injected. + """ + if self._bq_client_factory is not None: + project = os.environ.get("BIGQUERY_PROJECT", "unknown") + return self._bq_client_factory(project) + + # Trigger ImportError early if the package is missing. + # This is a lazy import so the module stays usable without BQ installed. + import google.cloud.bigquery as _bq_module # noqa: PLC0415, F401 + + project = os.environ.get("BIGQUERY_PROJECT") + if not project: + raise RemoteQueryError( + "BIGQUERY_PROJECT env var is not set.", + error_type="bq_error", + ) + return _bq_module.Client(project=project) diff --git a/tests/test_remote_query.py b/tests/test_remote_query.py new file mode 100644 index 0000000..cae7208 --- /dev/null +++ b/tests/test_remote_query.py @@ -0,0 +1,224 @@ +"""Tests for RemoteQueryEngine — two-phase BQ registration + DuckDB execution.""" + +import sys +from datetime import date +from decimal import Decimal +from unittest.mock import MagicMock, patch + +import duckdb +import pyarrow as pa +import pytest + +from src.remote_query import RemoteQueryEngine, RemoteQueryError, _validate_sql + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def analytics_conn(): + conn = duckdb.connect() + conn.execute("CREATE TABLE orders (id INT, date DATE, amount DECIMAL(10,2))") + conn.execute( + "INSERT INTO orders VALUES (1, '2026-01-01', 100.0), (2, '2026-01-15', 200.0)" + ) + yield conn + conn.close() + + +def _make_bq_mock(arrow_table, count_value=None): + """Build a minimal BQ client mock. + + First call to client.query() returns a count job, second returns a data job. + If count_value is None, infer it from arrow_table.num_rows. + """ + if count_value is None: + count_value = arrow_table.num_rows + + count_arrow = pa.table({"count": pa.array([count_value], type=pa.int64())}) + + count_job = MagicMock() + count_job.to_arrow.return_value = count_arrow + + data_job = MagicMock() + data_job.to_arrow.return_value = arrow_table + + mock_client = MagicMock() + mock_client.query.side_effect = [count_job, data_job] + + return mock_client + + +# --------------------------------------------------------------------------- +# TestRemoteQueryEngineRegister +# --------------------------------------------------------------------------- + + +class TestRemoteQueryEngineRegister: + def test_register_bq_success(self, analytics_conn): + """Mock BQ client returning an Arrow table; verify view is queryable.""" + arrow_table = pa.table( + { + "order_id": pa.array([10, 20, 30], type=pa.int64()), + "revenue": pa.array([1.0, 2.0, 3.0], type=pa.float64()), + } + ) + mock_client = _make_bq_mock(arrow_table) + + engine = RemoteQueryEngine( + analytics_conn, + _bq_client_factory=lambda project: mock_client, + max_bq_registration_rows=500_000, + ) + + result = engine.register_bq("bq_orders", "SELECT order_id, revenue FROM bq.orders") + + assert result["alias"] == "bq_orders" + assert result["rows"] == 3 + assert result["columns"] == ["order_id", "revenue"] + assert result["memory_mb"] > 0 + + # The alias must be queryable from DuckDB + rows = analytics_conn.execute("SELECT COUNT(*) FROM bq_orders").fetchone() + assert rows[0] == 3 + + def test_register_bq_row_limit_exceeded(self, analytics_conn): + """COUNT pre-check returns a value exceeding the row limit → RemoteQueryError.""" + arrow_table = pa.table({"x": pa.array([1], type=pa.int64())}) + # count exceeds limit + mock_client = _make_bq_mock(arrow_table, count_value=1_000_000) + + engine = RemoteQueryEngine( + analytics_conn, + _bq_client_factory=lambda project: mock_client, + max_bq_registration_rows=500_000, + ) + + with pytest.raises(RemoteQueryError) as exc_info: + engine.register_bq("bq_big", "SELECT * FROM bq.huge_table") + + assert exc_info.value.error_type == "row_limit" + assert exc_info.value.details["count"] == 1_000_000 + + def test_register_bq_missing_package(self, analytics_conn): + """When google-cloud-bigquery is not installed, engine must raise ImportError.""" + engine = RemoteQueryEngine( + analytics_conn, + # No factory — will try to import google.cloud.bigquery + _bq_client_factory=None, + max_bq_registration_rows=500_000, + ) + + with patch.dict(sys.modules, {"google": None, "google.cloud": None, "google.cloud.bigquery": None}): + with pytest.raises((ImportError, ModuleNotFoundError)): + engine.register_bq("bq_alias", "SELECT 1") + + +# --------------------------------------------------------------------------- +# TestRemoteQueryEngineExecute +# --------------------------------------------------------------------------- + + +class TestRemoteQueryEngineExecute: + def test_execute_local_only(self, analytics_conn): + """Query local table; result dict has correct structure.""" + engine = RemoteQueryEngine(analytics_conn) + result = engine.execute("SELECT id, amount FROM orders ORDER BY id") + + assert result["columns"] == ["id", "amount"] + assert result["row_count"] == 2 + assert result["truncated"] is False + assert len(result["rows"]) == 2 + # Non-standard types (Decimal) must be serialized to str + for row in result["rows"]: + for val in row: + assert isinstance(val, (int, float, bool, str, type(None))) + + def test_execute_with_registered_bq(self, analytics_conn): + """Manually register an Arrow table, then JOIN it with local orders.""" + bq_arrow = pa.table( + { + "id": pa.array([1, 2], type=pa.int64()), + "label": pa.array(["first", "second"], type=pa.utf8()), + } + ) + mock_client = _make_bq_mock(bq_arrow) + + engine = RemoteQueryEngine( + analytics_conn, + _bq_client_factory=lambda project: mock_client, + max_bq_registration_rows=500_000, + ) + engine.register_bq("bq_labels", "SELECT id, label FROM bq.labels") + + result = engine.execute( + "SELECT o.id, o.amount, b.label " + "FROM orders o JOIN bq_labels b ON o.id = b.id " + "ORDER BY o.id" + ) + + assert result["row_count"] == 2 + assert "label" in result["columns"] + + def test_execute_respects_max_result_rows(self, analytics_conn): + """When max_result_rows=1, result is truncated after 1 row.""" + engine = RemoteQueryEngine(analytics_conn, max_result_rows=1) + result = engine.execute("SELECT id FROM orders ORDER BY id") + + assert result["row_count"] == 1 + assert result["truncated"] is True + + def test_execute_invalid_sql(self, analytics_conn): + """DROP TABLE must be rejected with RemoteQueryError(error_type='query_error').""" + engine = RemoteQueryEngine(analytics_conn) + + with pytest.raises(RemoteQueryError) as exc_info: + engine.execute("DROP TABLE orders") + + assert exc_info.value.error_type == "query_error" + + +# --------------------------------------------------------------------------- +# _validate_sql unit tests +# --------------------------------------------------------------------------- + + +class TestValidateSql: + @pytest.mark.parametrize( + "sql", + [ + "DROP TABLE foo", + "DELETE FROM foo", + "INSERT INTO foo VALUES (1)", + "UPDATE foo SET x=1", + "ALTER TABLE foo ADD COLUMN y INT", + "CREATE TABLE foo (x INT)", + "COPY foo TO '/tmp/out.csv'", + "ATTACH '/db.duckdb'", + "DETACH db", + "LOAD 'extension'", + "INSTALL httpfs", + "SELECT read_parquet('/data/file.parquet')", + "SELECT * FROM '../secret/file'", + "SELECT 1; DROP TABLE foo", + ], + ) + def test_blocked_sql(self, sql): + with pytest.raises(RemoteQueryError) as exc_info: + _validate_sql(sql) + assert exc_info.value.error_type == "query_error" + + @pytest.mark.parametrize( + "sql", + [ + "SELECT id FROM orders", + "WITH cte AS (SELECT 1 AS x) SELECT x FROM cte", + "select count(*) from orders", + "with t as (select 1) select * from t", + ], + ) + def test_allowed_sql(self, sql): + # Should not raise + _validate_sql(sql) From d605e7d95f9498739b27946ef98e0159978e42f3 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Sat, 11 Apr 2026 11:09:11 +0200 Subject: [PATCH 06/14] feat: add --register-bq and --stdin to da query for hybrid BQ+local queries Co-Authored-By: Claude Sonnet 4.6 --- cli/commands/query.py | 98 +++++++++++++++++++++++++++++++++++++++++-- tests/test_cli.py | 7 ++++ 2 files changed, 101 insertions(+), 4 deletions(-) diff --git a/cli/commands/query.py b/cli/commands/query.py index 461d68f..022c6c5 100644 --- a/cli/commands/query.py +++ b/cli/commands/query.py @@ -2,21 +2,59 @@ import json import os +import sys from pathlib import Path +from typing import List, Optional import typer + def query_command( - sql: str = typer.Argument(..., help="SQL query to execute"), + sql: Optional[str] = typer.Argument(None, help="SQL query to execute (positional)"), + sql_opt: Optional[str] = typer.Option(None, "--sql", help="SQL query to execute (named option)"), remote: bool = typer.Option(False, "--remote", help="Execute on server instead of locally"), fmt: str = typer.Option("table", "--format", "-f", help="Output format: table, json, csv"), limit: int = typer.Option(1000, "--limit", help="Max rows to return"), + register_bq: Optional[List[str]] = typer.Option( + None, + "--register-bq", + help="Register a BigQuery result as a DuckDB view. Format: alias=BQ_SQL. Can be repeated.", + ), + stdin: bool = typer.Option(False, "--stdin", help="Read SQL from stdin as JSON {\"sql\": \"...\"}"), ): """Execute SQL query against DuckDB.""" - if remote: - _query_remote(sql, fmt, limit) + # Resolve SQL from exactly one of: positional, --sql, or --stdin + sources_provided = sum([ + sql is not None, + sql_opt is not None, + stdin, + ]) + if sources_provided == 0: + typer.echo("Error: provide SQL as a positional argument, --sql option, or --stdin flag.", err=True) + raise typer.Exit(1) + if sources_provided > 1: + typer.echo("Error: only one of positional SQL, --sql, or --stdin may be used at a time.", err=True) + raise typer.Exit(1) + + if stdin: + raw = sys.stdin.read() + try: + payload = json.loads(raw) + resolved_sql = payload["sql"] + except (json.JSONDecodeError, KeyError) as exc: + typer.echo(f"Error: failed to parse stdin JSON: {exc}", err=True) + raise typer.Exit(1) + elif sql_opt is not None: + resolved_sql = sql_opt else: - _query_local(sql, fmt, limit) + resolved_sql = sql + + if register_bq: + _query_hybrid(resolved_sql, fmt, limit, register_bq) + elif remote: + _query_remote(resolved_sql, fmt, limit) + else: + _query_local(resolved_sql, fmt, limit) def _query_local(sql: str, fmt: str, limit: int): @@ -56,6 +94,58 @@ def _query_remote(sql: str, fmt: str, limit: int): typer.echo(f"(truncated at {limit} rows)", err=True) +def _query_hybrid(sql: str, fmt: str, limit: int, register_bq_specs: List[str]): + """Run a hybrid query: register BigQuery results as DuckDB views, then execute locally.""" + import duckdb + from src.remote_query import RemoteQueryEngine, RemoteQueryError, load_config + + local_dir = Path(os.environ.get("DA_LOCAL_DIR", ".")) + db_path = local_dir / "user" / "duckdb" / "analytics.duckdb" + if not db_path.exists(): + typer.echo("Local DuckDB not found. Run: da sync", err=True) + raise typer.Exit(1) + + conn = duckdb.connect(str(db_path)) + try: + config = load_config() + engine = RemoteQueryEngine(conn, **{k: v for k, v in config.items() if k in ( + "max_bq_registration_rows", "max_memory_mb", "max_result_rows", "timeout_seconds" + )}) + + for spec in register_bq_specs: + if "=" not in spec: + typer.echo( + f"Error: --register-bq spec must be 'alias=BQ_SQL', got: {spec!r}", + err=True, + ) + raise typer.Exit(1) + alias, bq_sql = spec.split("=", 1) + alias = alias.strip() + bq_sql = bq_sql.strip() + try: + info = engine.register_bq(alias, bq_sql) + typer.echo( + f"Registered BQ alias '{alias}': {info['rows']:,} rows, " + f"{info['memory_mb']:.1f} MiB", + err=True, + ) + except RemoteQueryError as exc: + typer.echo(f"BQ registration failed for '{alias}': {exc}", err=True) + raise typer.Exit(1) + + try: + result = engine.execute(sql) + except RemoteQueryError as exc: + typer.echo(f"Query error: {exc}", err=True) + raise typer.Exit(1) + + _output(result["columns"], result["rows"], fmt) + if result.get("truncated"): + typer.echo(f"(truncated at {result['row_count']} rows)", err=True) + finally: + conn.close() + + def _output(columns: list, rows: list, fmt: str): if fmt == "json": output = [dict(zip(columns, row)) for row in rows] diff --git a/tests/test_cli.py b/tests/test_cli.py index 9393f56..74c3db1 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -238,6 +238,13 @@ class TestAdminCommands: assert result.exit_code == 1 +class TestQueryHybrid: + def test_register_bq_flag_help(self): + result = runner.invoke(app, ["query", "--help"]) + assert result.exit_code == 0 + assert "register-bq" in result.output + + class TestMetricsHelp: def test_metrics_help(self): result = runner.invoke(app, ["metrics", "--help"]) From ed43feb4e697cf73dac3718743ad60c4e111cbb3 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Sat, 11 Apr 2026 11:09:42 +0200 Subject: [PATCH 07/14] feat: add POST /api/query/hybrid endpoint for two-phase BQ+DuckDB queries --- app/api/query_hybrid.py | 43 ++++++++++++++++++++++++++++++++++ app/main.py | 2 ++ tests/test_api.py | 52 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 app/api/query_hybrid.py diff --git a/app/api/query_hybrid.py b/app/api/query_hybrid.py new file mode 100644 index 0000000..08178a0 --- /dev/null +++ b/app/api/query_hybrid.py @@ -0,0 +1,43 @@ +"""Hybrid query endpoint — two-phase BQ registration + DuckDB execution.""" + +from typing import Dict + +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel + +from app.auth.dependencies import require_admin +from src.db import get_analytics_db_readonly +from src.remote_query import RemoteQueryEngine, RemoteQueryError, load_config + +router = APIRouter(prefix="/api/query", tags=["query"]) + + +class HybridQueryRequest(BaseModel): + sql: str + register_bq: Dict[str, str] = {} + + +@router.post("/hybrid") +async def hybrid_query(request: HybridQueryRequest, user: dict = Depends(require_admin)): + config = load_config() + analytics = get_analytics_db_readonly() + try: + engine = RemoteQueryEngine( + analytics, + max_bq_registration_rows=config.get("max_bq_registration_rows", 500_000), + max_memory_mb=config.get("max_memory_mb", 2048), + max_result_rows=config.get("max_result_rows", 100_000), + timeout_seconds=config.get("timeout_seconds", 300), + ) + for alias, bq_sql in request.register_bq.items(): + try: + engine.register_bq(alias, bq_sql) + except RemoteQueryError as e: + raise HTTPException(status_code=400, detail=f"BQ '{alias}': {e.error_type}: {e}") + try: + result = engine.execute(request.sql) + except RemoteQueryError as e: + raise HTTPException(status_code=400, detail=f"Query: {e.error_type}: {e}") + return result + finally: + analytics.close() diff --git a/app/main.py b/app/main.py index 78c235a..504cb16 100644 --- a/app/main.py +++ b/app/main.py @@ -29,6 +29,7 @@ from app.api.access_requests import router as access_requests_router from app.api.jira_webhooks import router as jira_webhooks_router from app.api.metrics import router as metrics_router from app.api.metadata import router as metadata_router +from app.api.query_hybrid import router as query_hybrid_router from app.web.router import router as web_router logger = logging.getLogger(__name__) @@ -137,6 +138,7 @@ def create_app() -> FastAPI: app.include_router(jira_webhooks_router) app.include_router(metrics_router) app.include_router(metadata_router) + app.include_router(query_hybrid_router) # Web UI router (must be last — has catch-all routes) app.include_router(web_router) diff --git a/tests/test_api.py b/tests/test_api.py index 77894f3..e74e752 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -385,7 +385,7 @@ class TestMetadataAPI: # 'orders' is not in table_registry — expect 404 or 400 assert resp.status_code in (400, 404) - def test_push_keboola_table(self, seeded_client, monkeypatch): + def test_push_keboola_table(self, seeded_client, monkeypatch): # noqa: F811 client, admin_token, _ = seeded_client # 1. Register a keboola table @@ -451,3 +451,53 @@ class TestMetadataAPI: called_json = call_args.kwargs.get("json", {}) assert called_json.get("provider") == "ai-metadata-enrichment" assert isinstance(called_json.get("metadata"), list) + + +# ---- Hybrid Query ---- + +class TestHybridQueryAPI: + def test_hybrid_query_requires_admin(self, seeded_client): + client, _, analyst_token = seeded_client + resp = client.post( + "/api/query/hybrid", + json={"sql": "SELECT 1 AS val", "register_bq": {}}, + headers={"Authorization": f"Bearer {analyst_token}"}, + ) + assert resp.status_code == 403 + + def test_hybrid_query_local_only(self, seeded_client): + client, admin_token, _ = seeded_client + resp = client.post( + "/api/query/hybrid", + json={"sql": "SELECT 1 AS val", "register_bq": {}}, + headers={"Authorization": f"Bearer {admin_token}"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert "columns" in data + assert "rows" in data + assert data["columns"] == ["val"] + assert data["rows"] == [[1]] + + def test_hybrid_query_blocked_sql(self, seeded_client): + client, admin_token, _ = seeded_client + resp = client.post( + "/api/query/hybrid", + json={"sql": "DROP TABLE users", "register_bq": {}}, + headers={"Authorization": f"Bearer {admin_token}"}, + ) + assert resp.status_code == 400 + assert "query_error" in resp.json()["detail"] + + def test_hybrid_query_blocked_bq_sql(self, seeded_client): + client, admin_token, _ = seeded_client + resp = client.post( + "/api/query/hybrid", + json={ + "sql": "SELECT 1", + "register_bq": {"bad_alias": "DROP TABLE sensitive"}, + }, + headers={"Authorization": f"Bearer {admin_token}"}, + ) + assert resp.status_code == 400 + assert "query_error" in resp.json()["detail"] From 872b06ffae7de77e60903af64be8475efe4a6981 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Sat, 11 Apr 2026 11:11:10 +0200 Subject: [PATCH 08/14] docs: add hybrid query usage instructions to CLAUDE.md Co-Authored-By: Claude Opus 4.6 (1M context) --- CLAUDE.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/CLAUDE.md b/CLAUDE.md index 96eb49e..1f17e78 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -154,6 +154,22 @@ Before computing any business metric, look up the canonical definition: Never invent metric calculations — always use the canonical definitions. +## Hybrid Queries (BigQuery + Local) + +For tables too large to sync locally, use hybrid queries that JOIN local data with on-demand BigQuery results: + +```bash +da query --sql "SELECT o.*, t.views FROM orders o JOIN traffic t ON o.date = t.date" \ + --register-bq "traffic=SELECT date, SUM(views) as views FROM dataset.web WHERE date > '2026-01-01' GROUP BY 1" +``` + +The `--register-bq` flag executes a BigQuery subquery, loads the result into memory, and makes it available as a DuckDB view for the final SQL. Multiple `--register-bq` flags can be used for multiple BQ sources. + +For complex SQL, use stdin mode: +```bash +echo '{"register_bq": {"traffic": "SELECT ..."}, "sql": "SELECT ..."}' | da query --stdin +``` + ## Extensibility ### Data Sources (extract.duckdb contract) From f4129dc87d42b1cc80f37bb78235883d791592d1 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Sat, 11 Apr 2026 11:28:27 +0200 Subject: [PATCH 09/14] fix: alias validation, url escaping, read-only CLI, blocklist comment Co-Authored-By: Claude Sonnet 4.6 --- cli/commands/query.py | 2 +- src/db.py | 5 +++-- src/remote_query.py | 23 ++++++++++++++++++++++- tests/test_remote_query.py | 21 +++++++++++++++++++++ 4 files changed, 47 insertions(+), 4 deletions(-) diff --git a/cli/commands/query.py b/cli/commands/query.py index 022c6c5..2b039c3 100644 --- a/cli/commands/query.py +++ b/cli/commands/query.py @@ -105,7 +105,7 @@ def _query_hybrid(sql: str, fmt: str, limit: int, register_bq_specs: List[str]): typer.echo("Local DuckDB not found. Run: da sync", err=True) raise typer.Exit(1) - conn = duckdb.connect(str(db_path)) + conn = duckdb.connect(str(db_path), read_only=True) try: config = load_config() engine = RemoteQueryEngine(conn, **{k: v for k, v in config.items() if k in ( diff --git a/src/db.py b/src/db.py index 66c9d0a..408f3e8 100644 --- a/src/db.py +++ b/src/db.py @@ -321,14 +321,15 @@ def _reattach_remote_extensions( try: conn.execute(f"LOAD {extension};") token = os.environ.get(token_env, "") if token_env else "" + safe_url = url.replace("'", "''") if token: escaped_token = token.replace("'", "''") conn.execute( - f"ATTACH '{url}' AS {alias} (TYPE {extension}, TOKEN '{escaped_token}')" + f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, TOKEN '{escaped_token}')" ) else: conn.execute( - f"ATTACH '{url}' AS {alias} (TYPE {extension}, READ_ONLY)" + f"ATTACH '{safe_url}' AS {alias} (TYPE {extension}, READ_ONLY)" ) attached_dbs.add(alias) logger.debug("Re-attached remote source %s via %s extension", alias, extension) diff --git a/src/remote_query.py b/src/remote_query.py index 2b50f1b..d14965d 100644 --- a/src/remote_query.py +++ b/src/remote_query.py @@ -11,14 +11,24 @@ from __future__ import annotations import logging import os +import re from typing import Any, Dict, List, Optional import duckdb +_SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$") + +_RESERVED_ALIASES = { + "information_schema", "duckdb_tables", "duckdb_columns", + "duckdb_databases", "duckdb_settings", "duckdb_functions", + "duckdb_views", "duckdb_indexes", "duckdb_schemas", + "main", "memory", "system", "temp", +} + logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- -# SQL blocklist — mirrors app/api/query.py lines 40-63 +# SQL blocklist — based on app/api/query.py, extended with additional DuckDB metadata tables # --------------------------------------------------------------------------- _BLOCKED_KEYWORDS: List[str] = [ @@ -211,6 +221,17 @@ class RemoteQueryEngine: RemoteQueryError: For row/memory limits or BQ errors. ImportError: If google-cloud-bigquery is not installed. """ + if not _SAFE_IDENTIFIER.match(alias or ""): + raise RemoteQueryError( + f"Invalid alias {alias!r}: must be a valid SQL identifier", + error_type="query_error", + ) + if alias.lower() in _RESERVED_ALIASES: + raise RemoteQueryError( + f"Reserved alias {alias!r}: cannot shadow system objects", + error_type="query_error", + ) + _validate_sql(bq_sql) client = self._get_bq_client() diff --git a/tests/test_remote_query.py b/tests/test_remote_query.py index cae7208..36e186f 100644 --- a/tests/test_remote_query.py +++ b/tests/test_remote_query.py @@ -102,6 +102,27 @@ class TestRemoteQueryEngineRegister: assert exc_info.value.error_type == "row_limit" assert exc_info.value.details["count"] == 1_000_000 + def test_register_bq_invalid_alias(self, analytics_conn): + engine = RemoteQueryEngine(analytics_conn) + # Space in alias — invalid identifier + with pytest.raises(RemoteQueryError) as exc_info: + engine.register_bq("bad alias", "SELECT 1") + assert exc_info.value.error_type == "query_error" + + # Reserved alias — information_schema + with pytest.raises(RemoteQueryError) as exc_info: + engine.register_bq("information_schema", "SELECT 1") + assert exc_info.value.error_type == "query_error" + + # Valid alias — should not raise from alias validation + # (will raise later trying to reach BQ without a client, but not from alias check) + try: + engine.register_bq("valid_name", "SELECT 1") + except RemoteQueryError as exc: + assert exc.error_type != "query_error" or "Invalid alias" not in str(exc) + except (ImportError, ModuleNotFoundError): + pass # Expected — no BQ package in test env + def test_register_bq_missing_package(self, analytics_conn): """When google-cloud-bigquery is not installed, engine must raise ImportError.""" engine = RemoteQueryEngine( From 2ad8828f8c7c4082512e528602ff540580e727d5 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Sat, 11 Apr 2026 19:31:39 +0200 Subject: [PATCH 10/14] fix: stdin register_bq parsing, separate BQ SQL validation - cli/commands/query.py: --stdin mode now reads register_bq from the JSON payload and merges it into the register_bq option list, matching the documented {"register_bq": {...}, "sql": "..."} contract. - src/remote_query.py: add _validate_bq_sql() with a narrower blocklist (writes only); register_bq() now calls _validate_bq_sql() so legitimate BQ operations like INFORMATION_SCHEMA, CALL, IMPORT are not blocked. The final DuckDB execute() path still uses the full _validate_sql(). - tests/test_remote_query.py: add TestValidateBqSql covering allowed INFORMATION_SCHEMA queries and blocked write operations. --- cli/commands/query.py | 4 ++++ src/remote_query.py | 32 +++++++++++++++++++++++++- tests/test_remote_query.py | 47 +++++++++++++++++++++++++++++++++++++- 3 files changed, 81 insertions(+), 2 deletions(-) diff --git a/cli/commands/query.py b/cli/commands/query.py index 2b039c3..d959e2b 100644 --- a/cli/commands/query.py +++ b/cli/commands/query.py @@ -41,6 +41,10 @@ def query_command( try: payload = json.loads(raw) resolved_sql = payload["sql"] + # Extract register_bq from stdin JSON + stdin_bq = payload.get("register_bq", {}) + if stdin_bq and isinstance(stdin_bq, dict): + register_bq = [f"{k}={v}" for k, v in stdin_bq.items()] except (json.JSONDecodeError, KeyError) as exc: typer.echo(f"Error: failed to parse stdin JSON: {exc}", err=True) raise typer.Exit(1) diff --git a/src/remote_query.py b/src/remote_query.py index d14965d..5ebcfba 100644 --- a/src/remote_query.py +++ b/src/remote_query.py @@ -145,6 +145,36 @@ def _validate_sql(sql: str) -> None: ) +# BQ SQL blocklist — only blocks write/mutation operations +_BQ_BLOCKED_KEYWORDS = [ + "drop ", + "delete ", + "insert ", + "update ", + "alter ", + "create ", + "truncate ", + "merge ", + ";", # prevent multi-statement +] + + +def _validate_bq_sql(sql: str) -> None: + """Validate BQ SQL — narrower than DuckDB blocklist, only blocks writes.""" + sql_lower = sql.strip().lower() + for keyword in _BQ_BLOCKED_KEYWORDS: + if keyword in sql_lower: + raise RemoteQueryError( + f"Blocked BQ SQL keyword: {keyword.strip()}", + error_type="query_error", + ) + if not sql_lower.startswith("select ") and not sql_lower.startswith("with "): + raise RemoteQueryError( + "BQ query must start with SELECT or WITH", + error_type="query_error", + ) + + def load_config() -> Dict[str, Any]: """Load the ``remote_query:`` section from instance.yaml. @@ -232,7 +262,7 @@ class RemoteQueryEngine: error_type="query_error", ) - _validate_sql(bq_sql) + _validate_bq_sql(bq_sql) client = self._get_bq_client() diff --git a/tests/test_remote_query.py b/tests/test_remote_query.py index 36e186f..f4de108 100644 --- a/tests/test_remote_query.py +++ b/tests/test_remote_query.py @@ -9,7 +9,7 @@ import duckdb import pyarrow as pa import pytest -from src.remote_query import RemoteQueryEngine, RemoteQueryError, _validate_sql +from src.remote_query import RemoteQueryEngine, RemoteQueryError, _validate_bq_sql, _validate_sql # --------------------------------------------------------------------------- @@ -243,3 +243,48 @@ class TestValidateSql: def test_allowed_sql(self, sql): # Should not raise _validate_sql(sql) + + +# --------------------------------------------------------------------------- +# _validate_bq_sql unit tests +# --------------------------------------------------------------------------- + + +class TestValidateBqSql: + def test_information_schema_is_allowed(self): + """INFORMATION_SCHEMA queries must pass BQ SQL validation.""" + # Should not raise + _validate_bq_sql("SELECT * FROM dataset.INFORMATION_SCHEMA.COLUMNS") + + @pytest.mark.parametrize( + "sql", + [ + "DROP TABLE x", + "INSERT INTO x VALUES (1)", + "DELETE FROM x", + "UPDATE x SET y=1", + "ALTER TABLE x ADD COLUMN z INT", + "CREATE TABLE x (y INT)", + "TRUNCATE TABLE x", + "MERGE INTO x USING y ON x.id=y.id WHEN MATCHED THEN UPDATE SET x.a=y.a", + "SELECT 1; DROP TABLE x", + ], + ) + def test_blocked_bq_sql(self, sql): + """Write/mutation operations must be rejected.""" + with pytest.raises(RemoteQueryError) as exc_info: + _validate_bq_sql(sql) + assert exc_info.value.error_type == "query_error" + + @pytest.mark.parametrize( + "sql", + [ + "SELECT * FROM dataset.INFORMATION_SCHEMA.COLUMNS", + "SELECT id FROM project.dataset.table", + "WITH cte AS (SELECT 1 AS x) SELECT x FROM cte", + ], + ) + def test_allowed_bq_sql(self, sql): + """Valid read-only BQ queries must pass.""" + # Should not raise + _validate_bq_sql(sql) From 77d369e3113babd495fb58d3e24256df509575c1 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Sat, 11 Apr 2026 19:58:01 +0200 Subject: [PATCH 11/14] fix: CLI help test handles ANSI escape codes in Typer output Rich/Typer may insert ANSI codes within option names like --register-bq, breaking exact string matching in CI. Check parts separately. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/test_cli.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index 74c3db1..99c9f0b 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -242,7 +242,11 @@ class TestQueryHybrid: def test_register_bq_flag_help(self): result = runner.invoke(app, ["query", "--help"]) assert result.exit_code == 0 - assert "register-bq" in result.output + # Rich/Typer may insert ANSI escape codes within option names, + # so check for the parts separately + assert "register" in result.output + assert "bq" in result.output + assert "BigQuery" in result.output class TestMetricsHelp: From 618385e7e40dd14069500045e5b57927580dd854 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Sat, 11 Apr 2026 20:13:35 +0200 Subject: [PATCH 12/14] fix: table_catalog in re-attach query, --limit in hybrid CLI - _reattach_remote_extensions: query table_catalog instead of table_schema (DuckDB ATTACHed databases use table_catalog for the alias) - _query_hybrid: forward --limit flag to RemoteQueryEngine.max_result_rows Co-Authored-By: Claude Opus 4.6 (1M context) --- cli/commands/query.py | 7 +++++-- src/db.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/cli/commands/query.py b/cli/commands/query.py index d959e2b..0055735 100644 --- a/cli/commands/query.py +++ b/cli/commands/query.py @@ -112,9 +112,12 @@ def _query_hybrid(sql: str, fmt: str, limit: int, register_bq_specs: List[str]): conn = duckdb.connect(str(db_path), read_only=True) try: config = load_config() - engine = RemoteQueryEngine(conn, **{k: v for k, v in config.items() if k in ( + engine_kwargs = {k: v for k, v in config.items() if k in ( "max_bq_registration_rows", "max_memory_mb", "max_result_rows", "timeout_seconds" - )}) + )} + # CLI --limit flag overrides config max_result_rows + engine_kwargs["max_result_rows"] = limit + engine = RemoteQueryEngine(conn, **engine_kwargs) for spec in register_bq_specs: if "=" not in spec: diff --git a/src/db.py b/src/db.py index 408f3e8..2069396 100644 --- a/src/db.py +++ b/src/db.py @@ -285,7 +285,7 @@ def _reattach_remote_extensions( try: has_table = conn.execute( "SELECT 1 FROM information_schema.tables " - f"WHERE table_schema='{ext_dir.name}' AND table_name='_remote_attach'" + f"WHERE table_catalog='{ext_dir.name}' AND table_name='_remote_attach'" ).fetchone() if not has_table: continue From 35df940e5cc1c04a92e0fde842bebc3197970ce4 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Sat, 11 Apr 2026 20:29:03 +0200 Subject: [PATCH 13/14] fix: BQ COUNT subquery alias, wrap ImportError in RemoteQueryError - Add AS _cnt alias to COUNT(*) subquery (BQ Standard SQL requires it) - Catch ImportError in _get_bq_client() and raise RemoteQueryError so API endpoint returns proper 400 instead of 500 Co-Authored-By: Claude Opus 4.6 (1M context) --- src/remote_query.py | 13 +++++++++---- tests/test_remote_query.py | 4 ++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/remote_query.py b/src/remote_query.py index 5ebcfba..c0df463 100644 --- a/src/remote_query.py +++ b/src/remote_query.py @@ -267,7 +267,7 @@ class RemoteQueryEngine: client = self._get_bq_client() # --- Phase 1a: COUNT(*) pre-check --- - count_sql = f"SELECT COUNT(*) FROM ({bq_sql})" + count_sql = f"SELECT COUNT(*) FROM ({bq_sql}) AS _cnt" try: count_job = client.query(count_sql) count_arrow = count_job.to_arrow() @@ -413,9 +413,14 @@ class RemoteQueryEngine: project = os.environ.get("BIGQUERY_PROJECT", "unknown") return self._bq_client_factory(project) - # Trigger ImportError early if the package is missing. - # This is a lazy import so the module stays usable without BQ installed. - import google.cloud.bigquery as _bq_module # noqa: PLC0415, F401 + # Lazy import so the module stays usable without BQ installed. + try: + import google.cloud.bigquery as _bq_module # noqa: PLC0415, F401 + except ImportError: + raise RemoteQueryError( + "google-cloud-bigquery is not installed. Install with: pip install google-cloud-bigquery", + error_type="bq_error", + ) project = os.environ.get("BIGQUERY_PROJECT") if not project: diff --git a/tests/test_remote_query.py b/tests/test_remote_query.py index f4de108..992462a 100644 --- a/tests/test_remote_query.py +++ b/tests/test_remote_query.py @@ -124,7 +124,7 @@ class TestRemoteQueryEngineRegister: pass # Expected — no BQ package in test env def test_register_bq_missing_package(self, analytics_conn): - """When google-cloud-bigquery is not installed, engine must raise ImportError.""" + """When google-cloud-bigquery is not installed, engine must raise RemoteQueryError.""" engine = RemoteQueryEngine( analytics_conn, # No factory — will try to import google.cloud.bigquery @@ -133,7 +133,7 @@ class TestRemoteQueryEngineRegister: ) with patch.dict(sys.modules, {"google": None, "google.cloud": None, "google.cloud.bigquery": None}): - with pytest.raises((ImportError, ModuleNotFoundError)): + with pytest.raises(RemoteQueryError, match="google-cloud-bigquery"): engine.register_bq("bq_alias", "SELECT 1") From e351c3836853edc3befb860a4666b7cc2f5bb424 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr Date: Sun, 12 Apr 2026 08:40:12 +0200 Subject: [PATCH 14/14] test: add correctness test for _reattach_remote_extensions Verifies that _remote_attach table is actually found via table_catalog and contains expected extension data (not just resilience). Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/test_db.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/test_db.py b/tests/test_db.py index e99e87e..40ed185 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -514,6 +514,40 @@ class TestExtensionReattach: finally: conn.close() + def test_reattach_attempts_load(self, tmp_path, monkeypatch): + """Verify _reattach_remote_extensions reads _remote_attach and attempts LOAD.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + import importlib + import src.db as db_module + importlib.reload(db_module) + + self._make_analytics_db(tmp_path) + self._make_extract_db(tmp_path, "bqsource", with_remote_attach=True) + + # Call get_analytics_db_readonly and verify the _remote_attach table is readable + conn = db_module.get_analytics_db_readonly() + try: + # Verify the extract was attached + dbs = {r[0] for r in conn.execute("SELECT database_name FROM duckdb_databases()").fetchall()} + assert "bqsource" in dbs, f"bqsource should be attached, got: {dbs}" + + # Verify _remote_attach table is accessible via table_catalog + has = conn.execute( + "SELECT 1 FROM information_schema.tables " + "WHERE table_catalog='bqsource' AND table_name='_remote_attach'" + ).fetchone() + assert has is not None, "_remote_attach table should be visible via table_catalog" + + # Read the rows to verify they're correct + rows = conn.execute( + "SELECT alias, extension, url FROM bqsource._remote_attach" + ).fetchall() + assert len(rows) == 1 + assert rows[0][0] == "bq" + assert rows[0][1] == "bigquery" + finally: + conn.close() + def test_skips_missing_remote_attach(self, tmp_path, monkeypatch): """get_analytics_db_readonly() works fine when _remote_attach table is absent.""" monkeypatch.setenv("DATA_DIR", str(tmp_path))