Step 28: Remote query architecture for local+remote table JOINs
Add src/remote_query.py CLI module enabling the AI agent to run SQL queries spanning local Parquet tables and remote BigQuery tables in a single DuckDB session on the server. Two-phase protocol: BQ sub-queries (--register-bq) fetch filtered/aggregated data, then DuckDB SQL (--sql) joins everything. Safety: COUNT(*) pre-check, memory estimation (2GB cap), row limits (500K per BQ sub-query, 100K final result). Changes: - New src/remote_query.py with CLI, BQ registration, output formatting - Add bq_entity_type field to TableConfig (view vs table routing) - Extract create_local_views() from duckdb_manager.py for reuse - Update claude_md_template.txt with remote query agent instructions - Update example configs with remote_query section and docs - 52 new tests (42 remote_query + 10 bq_entity_type), all passing
This commit is contained in:
parent
ed16122994
commit
d180b2014e
8 changed files with 1678 additions and 73 deletions
|
|
@ -46,6 +46,18 @@ tables:
|
|||
query_mode: "local"
|
||||
sync_schedule: "every 1h"
|
||||
profile_after_sync: false
|
||||
|
||||
# Remote table - too large for local sync, queried via BigQuery
|
||||
- id: "project.dataset.page_views"
|
||||
name: "page_views"
|
||||
description: "Page-level traffic metrics (~5M rows/day)"
|
||||
primary_key: "page_view_id"
|
||||
sync_strategy: "partitioned"
|
||||
partition_by: "event_date"
|
||||
partition_column_type: "DATE"
|
||||
partition_granularity: "day"
|
||||
query_mode: "remote"
|
||||
bq_entity_type: "view"
|
||||
```
|
||||
|
||||
## Table Configuration Reference
|
||||
|
|
@ -84,6 +96,7 @@ tables:
|
|||
| Field | Default | Description |
|
||||
|-------|---------|-------------|
|
||||
| `query_mode` | `"local"` | How the AI agent queries this table |
|
||||
| `bq_entity_type` | `"view"` | BigQuery entity type: `"view"` (Python BQ client) or `"table"` (DuckDB BQ extension) |
|
||||
|
||||
| Mode | Description | Best for |
|
||||
|------|-------------|----------|
|
||||
|
|
@ -91,6 +104,20 @@ tables:
|
|||
| `remote` | Not synced, queried via BigQuery | Huge tables (100+ GB), live data |
|
||||
| `hybrid` | Subset synced for profiling, queries go to BigQuery | Medium tables needing live data |
|
||||
|
||||
### Remote Table Metadata
|
||||
|
||||
Remote tables (`query_mode: "remote"`) should include metadata to help the AI agent
|
||||
write safe, efficient queries:
|
||||
|
||||
| Field | Description |
|
||||
|-------|-------------|
|
||||
| `grain` | Row granularity description |
|
||||
| `volume` | Size estimates (rows_per_day, unique entities) |
|
||||
| `columns` | Column descriptions with value distributions |
|
||||
| `dimension_profile` | Cardinality per dimension |
|
||||
| `query_result_estimates` | Expected rows after GROUP BY combinations |
|
||||
| `join_keys` | How to join with other tables |
|
||||
|
||||
### Automatic Sync Schedule
|
||||
|
||||
| Field | Default | Description |
|
||||
|
|
|
|||
|
|
@ -181,3 +181,13 @@ catalog:
|
|||
# alert_missing_pct: 5.0 # Alert threshold for missing %
|
||||
# alert_imbalance_pct: 60.0 # Alert threshold for imbalance %
|
||||
# alert_high_cardinality: 50 # Alert threshold for high cardinality columns
|
||||
|
||||
# --- Remote query (optional) ---
|
||||
# Settings for remote BigQuery queries via `python -m src.remote_query`.
|
||||
# Used when tables have query_mode: "remote" in data_description.md.
|
||||
# remote_query:
|
||||
# timeout_seconds: 300 # BQ + DuckDB query timeout
|
||||
# max_result_rows: 100000 # Max rows in final output
|
||||
# max_bq_registration_rows: 500000 # Max rows per --register-bq sub-query
|
||||
# default_format: "table" # Default output format
|
||||
# output_dir: "/tmp/remote_query" # Directory for Parquet/CSV exports
|
||||
|
|
|
|||
|
|
@ -183,6 +183,133 @@ You're ready to analyze!
|
|||
|
||||
---
|
||||
|
||||
## Remote Queries (BigQuery)
|
||||
|
||||
Some tables are too large for local Parquet sync and are queried remotely via BigQuery.
|
||||
These tables have `query_mode: "remote"` in `server/docs/data_description.md`.
|
||||
|
||||
### How to recognize remote tables
|
||||
|
||||
Before writing any query, read `server/docs/data_description.md`. Each table has:
|
||||
- `query_mode: "local"` -- available as a local DuckDB view (query normally)
|
||||
- `query_mode: "remote"` -- NOT in local DuckDB, must use remote query protocol below
|
||||
- `query_mode: "hybrid"` -- local view exists AND can query BQ for live data
|
||||
|
||||
### Remote table metadata in data_description.md
|
||||
|
||||
Remote tables include metadata to help you write safe queries:
|
||||
|
||||
- **`volume`** -- rows_per_day, unique entities per day (tells you table size)
|
||||
- **`columns`** -- column names, types, value distributions
|
||||
- **`dimension_profile`** -- cardinality per dimension with value distributions
|
||||
- **`query_result_estimates`** -- expected row counts after GROUP BY combinations
|
||||
- **`join_keys`** -- how to join with other tables
|
||||
|
||||
**ALWAYS read these sections before writing a remote query.** Use `query_result_estimates`
|
||||
to predict how many rows your query will return. The server has limited RAM -- keep BQ
|
||||
sub-query results under 500K rows.
|
||||
|
||||
### Two-phase query protocol
|
||||
|
||||
Remote queries run **on the server** via SSH (server has DuckDB + Parquet + BigQuery access).
|
||||
You write two SQL statements:
|
||||
|
||||
1. **BQ sub-query** (`--register-bq "alias=SQL"`) -- runs on BigQuery, result registered in DuckDB as a view.
|
||||
This MUST contain WHERE and/or GROUP BY to reduce the result set. Never SELECT * from a remote table.
|
||||
2. **DuckDB SQL** (`--sql "SQL"`) -- runs in DuckDB after all views (local + BQ) are ready.
|
||||
Can JOIN local tables with registered BQ results.
|
||||
|
||||
### Command format
|
||||
|
||||
```bash
|
||||
ssh {ssh_alias} 'cd /opt/data-analyst && \
|
||||
set -a && source .env && set +a && \
|
||||
PYTHONPATH=repo CONFIG_DIR=instance/config \
|
||||
.venv/bin/python3 -m src.remote_query \
|
||||
--register-bq "ALIAS=BQ_SQL_QUERY" \
|
||||
--sql "DUCKDB_SQL_QUERY" \
|
||||
--format table'
|
||||
```
|
||||
|
||||
Arguments:
|
||||
- `--register-bq "alias=SQL"` -- Register a BQ query result as DuckDB view (repeatable for multiple remote tables)
|
||||
- `--sql "SQL"` -- The final DuckDB query (can reference local views + registered BQ aliases)
|
||||
- `--format table|csv|json|parquet` -- Output format (default: table)
|
||||
- `--output /path/file` -- Output file for parquet/csv/json
|
||||
- `--max-rows N` -- Override max result rows
|
||||
|
||||
### Example 1: Remote-only query (aggregated data)
|
||||
|
||||
```bash
|
||||
ssh {ssh_alias} 'cd /opt/data-analyst && \
|
||||
set -a && source .env && set +a && \
|
||||
PYTHONPATH=repo CONFIG_DIR=instance/config \
|
||||
.venv/bin/python3 -m src.remote_query \
|
||||
--register-bq "agg_data=SELECT date_col, dim_col, SUM(metric) as total FROM \`project.dataset.table\` WHERE date_col >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) GROUP BY 1,2" \
|
||||
--sql "SELECT * FROM agg_data ORDER BY date_col, dim_col" \
|
||||
--format table'
|
||||
```
|
||||
|
||||
### Example 2: JOIN local + remote
|
||||
|
||||
```bash
|
||||
ssh {ssh_alias} 'cd /opt/data-analyst && \
|
||||
set -a && source .env && set +a && \
|
||||
PYTHONPATH=repo CONFIG_DIR=instance/config \
|
||||
.venv/bin/python3 -m src.remote_query \
|
||||
--register-bq "remote_data=SELECT date_col, dim_col, SUM(metric) as total FROM \`project.dataset.table\` WHERE date_col >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY) GROUP BY 1,2" \
|
||||
--sql "SELECT l.*, r.total FROM local_table l JOIN remote_data r ON l.date_col = r.date_col AND l.dim_col = r.dim_col ORDER BY 1,2" \
|
||||
--format table'
|
||||
```
|
||||
|
||||
### Example 3: Download result as Parquet for local analysis
|
||||
|
||||
```bash
|
||||
# 1. Run query, save as Parquet on server
|
||||
ssh {ssh_alias} 'cd /opt/data-analyst && \
|
||||
set -a && source .env && set +a && \
|
||||
PYTHONPATH=repo CONFIG_DIR=instance/config \
|
||||
.venv/bin/python3 -m src.remote_query \
|
||||
--register-bq "remote_data=SELECT ... GROUP BY ..." \
|
||||
--sql "SELECT ... JOIN ..." \
|
||||
--format parquet --output /tmp/remote_query/analysis.parquet'
|
||||
|
||||
# 2. Download to local machine
|
||||
scp {ssh_alias}:/tmp/remote_query/analysis.parquet ./user/parquet/
|
||||
|
||||
# 3. Register in local DuckDB for further analysis
|
||||
python3 -c "
|
||||
import duckdb
|
||||
conn = duckdb.connect('user/duckdb/analytics.duckdb')
|
||||
conn.execute(\"CREATE OR REPLACE VIEW analysis AS SELECT * FROM read_parquet('user/parquet/analysis.parquet')\")
|
||||
print('View created:', conn.execute('SELECT COUNT(*) FROM analysis').fetchone()[0], 'rows')
|
||||
conn.close()
|
||||
"
|
||||
```
|
||||
|
||||
### How to estimate result sizes
|
||||
|
||||
Before writing a BQ sub-query, check `dimension_profile` and `query_result_estimates`
|
||||
in `server/docs/data_description.md`.
|
||||
|
||||
**Rule of thumb:** rows = (estimate per day from query_result_estimates) * (number of days in WHERE clause).
|
||||
If that exceeds 100K rows, add more aggregation or tighter date filters.
|
||||
|
||||
### Safety rules
|
||||
|
||||
1. **NEVER** run `SELECT * FROM remote_table` without WHERE + GROUP BY
|
||||
2. **ALWAYS** check `dimension_profile` before writing BQ sub-queries
|
||||
3. **ALWAYS** include date range in WHERE clause
|
||||
4. **Limits**: 500K rows max per BQ sub-query, 100K rows max in final result
|
||||
5. If the query might take > 60 seconds, use nohup pattern:
|
||||
```bash
|
||||
ssh {ssh_alias} 'nohup ... --format parquet --output /tmp/result.parquet > /tmp/rq.log 2>&1 &'
|
||||
ssh {ssh_alias} 'tail -5 /tmp/rq.log' # check progress
|
||||
scp {ssh_alias}:/tmp/result.parquet ./user/parquet/
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Reporting Issues
|
||||
|
||||
Report issues to your platform team or the project's issue tracker.
|
||||
|
|
|
|||
|
|
@ -270,6 +270,89 @@ def get_remote_tables(table_configs: List[Dict]) -> List[Dict]:
|
|||
]
|
||||
|
||||
|
||||
def create_local_views(
|
||||
conn: duckdb.DuckDBPyConnection,
|
||||
data_dir: str,
|
||||
project_root: Optional[Path] = None,
|
||||
verbose: bool = True,
|
||||
) -> Tuple[List[str], List[str]]:
|
||||
"""Create DuckDB views for local/hybrid tables from Parquet files.
|
||||
|
||||
Extracts the shared logic from init_duckdb() so it can be reused
|
||||
by remote_query.py without creating a persistent DB file.
|
||||
|
||||
Args:
|
||||
conn: Open DuckDB connection (in-memory or file-backed)
|
||||
data_dir: Base data directory (e.g., "server", "/data/src_data")
|
||||
project_root: Project root path. If None, auto-detected.
|
||||
verbose: Print progress messages
|
||||
|
||||
Returns:
|
||||
Tuple of (created_view_names, skipped_table_names)
|
||||
"""
|
||||
if project_root is None:
|
||||
project_root = find_project_root()
|
||||
|
||||
table_configs, folder_mapping = parse_data_description(project_root)
|
||||
|
||||
# Filter to local and hybrid tables (both have local Parquet)
|
||||
eligible_tables = [
|
||||
tc for tc in table_configs
|
||||
if tc.get("query_mode", "local") in ("local", "hybrid")
|
||||
]
|
||||
|
||||
created_views = []
|
||||
skipped_views = []
|
||||
|
||||
for table_config in eligible_tables:
|
||||
table_name = table_config['name']
|
||||
|
||||
try:
|
||||
parquet_path = get_parquet_path(table_config, folder_mapping, data_dir)
|
||||
|
||||
if not parquet_path.exists():
|
||||
skipped_views.append(table_name)
|
||||
if verbose:
|
||||
print(f" [SKIP] {table_name} - parquet not found: {parquet_path}")
|
||||
continue
|
||||
|
||||
# Determine if partitioned
|
||||
sync_strategy = table_config.get('sync_strategy', 'full_refresh')
|
||||
is_partitioned = (
|
||||
sync_strategy == "partitioned" or
|
||||
(sync_strategy == "incremental" and table_config.get('partition_by'))
|
||||
)
|
||||
|
||||
if is_partitioned:
|
||||
glob_pattern = parquet_path / "*.parquet"
|
||||
partition_files = list(parquet_path.glob("*.parquet"))
|
||||
if not partition_files:
|
||||
skipped_views.append(table_name)
|
||||
if verbose:
|
||||
print(f" [SKIP] {table_name} - no partition files")
|
||||
continue
|
||||
|
||||
sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{glob_pattern}', union_by_name=true)"
|
||||
if verbose:
|
||||
mode_label = "hybrid" if table_config.get("query_mode") == "hybrid" else "local"
|
||||
print(f" [OK] {table_name} ({len(partition_files)} partitions, {mode_label})")
|
||||
else:
|
||||
sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{parquet_path}')"
|
||||
if verbose:
|
||||
mode_label = "hybrid" if table_config.get("query_mode") == "hybrid" else "local"
|
||||
print(f" [OK] {table_name} ({mode_label})")
|
||||
|
||||
conn.execute(sql)
|
||||
created_views.append(table_name)
|
||||
|
||||
except Exception as e:
|
||||
if verbose:
|
||||
print(f" [ERR] Error creating {table_name}: {e}")
|
||||
raise
|
||||
|
||||
return created_views, skipped_views
|
||||
|
||||
|
||||
def init_duckdb(
|
||||
db_path="user/duckdb/analytics.duckdb",
|
||||
data_dir="server",
|
||||
|
|
@ -299,7 +382,6 @@ def init_duckdb(
|
|||
print("Initializing DuckDB database...")
|
||||
|
||||
try:
|
||||
# Find project root and parse data_description.md
|
||||
project_root = find_project_root()
|
||||
if verbose:
|
||||
print(f" Project root: {project_root}")
|
||||
|
|
@ -308,87 +390,25 @@ def init_duckdb(
|
|||
if verbose:
|
||||
print(f" Loaded {len(table_configs)} tables from data_description.md")
|
||||
|
||||
# Separate tables by query_mode
|
||||
local_tables = []
|
||||
remote_tables = []
|
||||
hybrid_tables = []
|
||||
|
||||
for tc in table_configs:
|
||||
mode = tc.get("query_mode", "local")
|
||||
if mode == "remote":
|
||||
remote_tables.append(tc)
|
||||
elif mode == "hybrid":
|
||||
hybrid_tables.append(tc)
|
||||
else:
|
||||
local_tables.append(tc)
|
||||
# Classify tables by query_mode (for display only)
|
||||
remote_tables = [tc for tc in table_configs if tc.get("query_mode") == "remote"]
|
||||
local_count = len([tc for tc in table_configs if tc.get("query_mode", "local") == "local"])
|
||||
hybrid_count = len([tc for tc in table_configs if tc.get("query_mode") == "hybrid"])
|
||||
|
||||
if verbose:
|
||||
print(f" Query modes: {len(local_tables)} local, "
|
||||
f"{len(remote_tables)} remote, {len(hybrid_tables)} hybrid")
|
||||
print(f" Query modes: {local_count} local, "
|
||||
f"{len(remote_tables)} remote, {hybrid_count} hybrid")
|
||||
|
||||
# Connect to database (creates if doesn't exist)
|
||||
conn = duckdb.connect(db_path)
|
||||
|
||||
# Create local views from parquet files
|
||||
if verbose:
|
||||
print("\n Creating views from parquet files...")
|
||||
|
||||
created_views = []
|
||||
skipped_views = []
|
||||
|
||||
# Process local and hybrid tables (both have local parquet)
|
||||
for table_config in local_tables + hybrid_tables:
|
||||
table_name = table_config['name']
|
||||
|
||||
try:
|
||||
# Get parquet path
|
||||
parquet_path = get_parquet_path(table_config, folder_mapping, data_dir)
|
||||
|
||||
# Check if file/directory exists
|
||||
if not parquet_path.exists():
|
||||
skipped_views.append(table_name)
|
||||
if verbose:
|
||||
print(f" [SKIP] {table_name} - parquet not found: {parquet_path}")
|
||||
continue
|
||||
|
||||
# Determine if partitioned
|
||||
sync_strategy = table_config.get('sync_strategy', 'full_refresh')
|
||||
is_partitioned = (
|
||||
sync_strategy == "partitioned" or
|
||||
(sync_strategy == "incremental" and table_config.get('partition_by'))
|
||||
)
|
||||
|
||||
# Create view
|
||||
if is_partitioned:
|
||||
# For partitioned tables, use glob pattern
|
||||
glob_pattern = parquet_path / "*.parquet"
|
||||
|
||||
# Check if there are any partition files
|
||||
partition_files = list(parquet_path.glob("*.parquet"))
|
||||
if not partition_files:
|
||||
skipped_views.append(table_name)
|
||||
if verbose:
|
||||
print(f" [SKIP] {table_name} - no partition files")
|
||||
continue
|
||||
|
||||
sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{glob_pattern}', union_by_name=true)"
|
||||
if verbose:
|
||||
mode_label = "hybrid" if table_config.get("query_mode") == "hybrid" else "local"
|
||||
print(f" [OK] {table_name} ({len(partition_files)} partitions, {mode_label})")
|
||||
else:
|
||||
# Single parquet file
|
||||
sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{parquet_path}')"
|
||||
if verbose:
|
||||
mode_label = "hybrid" if table_config.get("query_mode") == "hybrid" else "local"
|
||||
print(f" [OK] {table_name} ({mode_label})")
|
||||
|
||||
conn.execute(sql)
|
||||
created_views.append(table_name)
|
||||
|
||||
except Exception as e:
|
||||
if verbose:
|
||||
print(f" [ERR] Error creating {table_name}: {e}")
|
||||
return False
|
||||
# Delegate to create_local_views
|
||||
created_views, skipped_views = create_local_views(
|
||||
conn, data_dir, project_root=project_root, verbose=verbose,
|
||||
)
|
||||
|
||||
# Log remote tables (queried at runtime via register_bq_table)
|
||||
if remote_tables:
|
||||
|
|
|
|||
|
|
@ -107,6 +107,7 @@ class TableConfig:
|
|||
columns: Optional[List[str]] = None # Subset of columns to sync (None = all)
|
||||
row_filter: Optional[str] = None # SQL WHERE clause for filtering (e.g., "event_date >= '2024-01-01'")
|
||||
query_mode: str = "local" # "local" (Parquet) | "remote" (BQ direct) | "hybrid" (sync subset, query BQ)
|
||||
bq_entity_type: str = "view" # "view" (Python BQ client) | "table" (DuckDB BQ extension)
|
||||
partition_column_type: str = "TIMESTAMP" # BQ SQL type for partition column: "DATE", "TIMESTAMP", "DATETIME"
|
||||
catalog_fqn: Optional[str] = None # Explicit OpenMetadata FQN override (auto-derived if not set)
|
||||
sync_schedule: Optional[str] = None # Schedule: "every 15m", "every 1h", "daily 05:00" (UTC)
|
||||
|
|
@ -122,6 +123,14 @@ class TableConfig:
|
|||
f"Allowed values: {', '.join(valid_query_modes)}"
|
||||
)
|
||||
|
||||
# Validate bq_entity_type
|
||||
valid_entity_types = ("view", "table")
|
||||
if self.bq_entity_type not in valid_entity_types:
|
||||
raise ValueError(
|
||||
f"Invalid bq_entity_type '{self.bq_entity_type}' for table {self.id}. "
|
||||
f"Allowed values: {', '.join(valid_entity_types)}"
|
||||
)
|
||||
|
||||
# Validate sync_strategy
|
||||
if self.sync_strategy not in ["full_refresh", "incremental", "partitioned"]:
|
||||
raise ValueError(
|
||||
|
|
@ -473,6 +482,7 @@ class Config:
|
|||
columns=table_data.get("columns"),
|
||||
row_filter=table_data.get("row_filter"),
|
||||
query_mode=table_data.get("query_mode", "local"),
|
||||
bq_entity_type=table_data.get("bq_entity_type", "view"),
|
||||
partition_column_type=table_data.get("partition_column_type", "TIMESTAMP"),
|
||||
catalog_fqn=table_data.get("catalog_fqn"),
|
||||
sync_schedule=table_data.get("sync_schedule"),
|
||||
|
|
|
|||
564
src/remote_query.py
Normal file
564
src/remote_query.py
Normal file
|
|
@ -0,0 +1,564 @@
|
|||
"""
|
||||
Remote Query - Execute DuckDB queries spanning local Parquet + remote BigQuery tables.
|
||||
|
||||
Provides a server-side CLI for the AI agent to run SQL queries that JOIN local
|
||||
(Parquet-backed) tables with on-demand BigQuery results. Designed for tables too
|
||||
large to sync locally (e.g., daily_deal_traffic: ~3M rows/day).
|
||||
|
||||
Two-phase query protocol:
|
||||
1. BQ sub-queries (--register-bq "alias=SQL") run on BigQuery, results registered
|
||||
as DuckDB views via PyArrow (reuses register_bq_table from duckdb_manager).
|
||||
2. DuckDB SQL (--sql) runs against local Parquet views + registered BQ results.
|
||||
|
||||
Usage:
|
||||
python -m src.remote_query \\
|
||||
--sql "SELECT ... FROM order_economics o JOIN traffic t ON ..." \\
|
||||
--register-bq "traffic=SELECT ... FROM \\`project.dataset.table\\` WHERE ..." \\
|
||||
--format table
|
||||
|
||||
Safety features:
|
||||
- COUNT(*) pre-check before fetching BQ data
|
||||
- Memory estimation (refuses queries > 2 GB estimated)
|
||||
- Configurable row limits (per BQ sub-query and final result)
|
||||
- Query timeout support
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import duckdb
|
||||
|
||||
from config.loader import load_instance_config, get_instance_value
|
||||
from scripts.duckdb_manager import (
|
||||
create_local_views,
|
||||
register_bq_table,
|
||||
_create_bq_client,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RemoteQueryError(Exception):
|
||||
"""Error during remote query execution."""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Configuration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _load_remote_query_config() -> dict:
|
||||
"""Load remote_query settings from instance.yaml with defaults."""
|
||||
try:
|
||||
instance_config = load_instance_config()
|
||||
except (FileNotFoundError, ValueError) as e:
|
||||
logger.warning("Could not load instance config: %s. Using defaults.", e)
|
||||
instance_config = {}
|
||||
|
||||
return {
|
||||
"timeout_seconds": get_instance_value(
|
||||
instance_config, "remote_query", "timeout_seconds", default=300,
|
||||
),
|
||||
"max_result_rows": get_instance_value(
|
||||
instance_config, "remote_query", "max_result_rows", default=100_000,
|
||||
),
|
||||
"max_bq_registration_rows": get_instance_value(
|
||||
instance_config, "remote_query", "max_bq_registration_rows", default=500_000,
|
||||
),
|
||||
"default_format": get_instance_value(
|
||||
instance_config, "remote_query", "default_format", default="table",
|
||||
),
|
||||
"output_dir": get_instance_value(
|
||||
instance_config, "remote_query", "output_dir", default="/tmp/remote_query",
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# BQ registration with safety checks
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _validate_bq_result_size(
|
||||
bq_client, sql: str, alias: str, max_rows: int,
|
||||
) -> int:
|
||||
"""Execute COUNT(*) on the BQ sub-query before fetching all rows.
|
||||
|
||||
Args:
|
||||
bq_client: BigQuery client instance
|
||||
sql: The BQ SQL query to count
|
||||
alias: Alias name (for error messages)
|
||||
max_rows: Maximum allowed rows
|
||||
|
||||
Returns:
|
||||
Row count
|
||||
|
||||
Raises:
|
||||
RemoteQueryError: If count exceeds max_rows
|
||||
"""
|
||||
count_sql = f"SELECT COUNT(*) AS cnt FROM ({sql})"
|
||||
_log_progress(f" Counting rows for '{alias}'...")
|
||||
|
||||
job = bq_client.query(count_sql)
|
||||
result = job.result()
|
||||
row_count = next(iter(result))[0]
|
||||
|
||||
if row_count > max_rows:
|
||||
raise RemoteQueryError(
|
||||
f"BQ sub-query '{alias}' would return {row_count:,} rows "
|
||||
f"(limit: {max_rows:,}). Add more WHERE filters or GROUP BY "
|
||||
f"to reduce the result set."
|
||||
)
|
||||
|
||||
return row_count
|
||||
|
||||
|
||||
def _estimate_memory_mb(row_count: int, column_count: int) -> float:
|
||||
"""Estimate memory usage in MB for a PyArrow table.
|
||||
|
||||
Uses ~50 bytes per cell as a rough average across data types.
|
||||
"""
|
||||
return (row_count * column_count * 50) / (1024 * 1024)
|
||||
|
||||
|
||||
def _register_bq_views(
|
||||
conn: duckdb.DuckDBPyConnection,
|
||||
registrations: list[tuple[str, str]],
|
||||
max_bq_rows: int,
|
||||
timeout_seconds: int,
|
||||
quiet: bool = False,
|
||||
) -> dict[str, int]:
|
||||
"""Register BQ query results as DuckDB views with safety checks.
|
||||
|
||||
Args:
|
||||
conn: DuckDB connection
|
||||
registrations: List of (alias, bq_sql) tuples
|
||||
max_bq_rows: Max rows per sub-query
|
||||
timeout_seconds: BQ job timeout
|
||||
quiet: Suppress progress messages
|
||||
|
||||
Returns:
|
||||
Dict of {alias: row_count}
|
||||
"""
|
||||
if not registrations:
|
||||
return {}
|
||||
|
||||
bq_project = os.environ.get("BIGQUERY_PROJECT")
|
||||
if not bq_project:
|
||||
raise RemoteQueryError(
|
||||
"BIGQUERY_PROJECT environment variable not set. "
|
||||
"Required for BigQuery sub-queries."
|
||||
)
|
||||
|
||||
bq_client = _create_bq_client(bq_project)
|
||||
results = {}
|
||||
|
||||
for alias, bq_sql in registrations:
|
||||
start_time = time.time()
|
||||
|
||||
# Phase 1: COUNT(*) safety check
|
||||
row_count = _validate_bq_result_size(bq_client, bq_sql, alias, max_bq_rows)
|
||||
_log_progress(f" '{alias}': {row_count:,} rows (within limit)")
|
||||
|
||||
# Phase 2: Memory estimation
|
||||
# Estimate column count from a LIMIT 0 query (cheap)
|
||||
sample_job = bq_client.query(f"SELECT * FROM ({bq_sql}) LIMIT 0")
|
||||
schema = sample_job.result().schema
|
||||
col_count = len(schema)
|
||||
estimated_mb = _estimate_memory_mb(row_count, col_count)
|
||||
|
||||
if estimated_mb > 2048: # 2 GB = 25% of 8 GB server RAM
|
||||
raise RemoteQueryError(
|
||||
f"BQ sub-query '{alias}' estimated memory: {estimated_mb:.0f} MB "
|
||||
f"({row_count:,} rows x {col_count} cols). "
|
||||
f"Limit is 2048 MB. Add more aggregation or filters."
|
||||
)
|
||||
|
||||
# Phase 3: Execute and register
|
||||
_log_progress(f" Fetching '{alias}' ({row_count:,} rows, ~{estimated_mb:.0f} MB)...")
|
||||
actual_rows = register_bq_table(
|
||||
conn=conn,
|
||||
table_id=f"bq_registration.{alias}",
|
||||
view_name=alias,
|
||||
sql=bq_sql,
|
||||
bq_project=bq_project,
|
||||
)
|
||||
|
||||
elapsed = time.time() - start_time
|
||||
_log_progress(f" '{alias}' registered: {actual_rows:,} rows in {elapsed:.1f}s")
|
||||
results[alias] = actual_rows
|
||||
|
||||
return results
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Local view setup
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _setup_local_views(
|
||||
conn: duckdb.DuckDBPyConnection,
|
||||
data_dir: str,
|
||||
quiet: bool = False,
|
||||
) -> list[str]:
|
||||
"""Create DuckDB views for all local/hybrid tables from Parquet.
|
||||
|
||||
Args:
|
||||
conn: DuckDB connection
|
||||
data_dir: Path to data directory (e.g., "/data/src_data")
|
||||
quiet: Suppress progress messages
|
||||
|
||||
Returns:
|
||||
List of created view names
|
||||
"""
|
||||
created, skipped = create_local_views(
|
||||
conn=conn,
|
||||
data_dir=data_dir,
|
||||
verbose=not quiet,
|
||||
)
|
||||
return created
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Output formatting
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _print_table(columns: list[str], rows: list[tuple]) -> None:
|
||||
"""Print an aligned ASCII table to stdout."""
|
||||
if not rows:
|
||||
print("(empty result)")
|
||||
return
|
||||
|
||||
# Calculate column widths
|
||||
str_rows = [[str(v) if v is not None else "NULL" for v in row] for row in rows]
|
||||
widths = [len(col) for col in columns]
|
||||
for row in str_rows:
|
||||
for i, val in enumerate(row):
|
||||
widths[i] = max(widths[i], len(val))
|
||||
|
||||
# Header
|
||||
header = " | ".join(col.ljust(widths[i]) for i, col in enumerate(columns))
|
||||
separator = "-+-".join("-" * widths[i] for i in range(len(columns)))
|
||||
print(header)
|
||||
print(separator)
|
||||
|
||||
# Rows
|
||||
for row in str_rows:
|
||||
line = " | ".join(val.ljust(widths[i]) for i, val in enumerate(row))
|
||||
print(line)
|
||||
|
||||
print(f"\n({len(rows)} rows)")
|
||||
|
||||
|
||||
def _format_output(
|
||||
conn: duckdb.DuckDBPyConnection,
|
||||
sql: str,
|
||||
fmt: str,
|
||||
output_path: Optional[str],
|
||||
max_rows: int,
|
||||
) -> None:
|
||||
"""Execute final SQL and output results in the requested format.
|
||||
|
||||
Args:
|
||||
conn: DuckDB connection with all views registered
|
||||
sql: The final DuckDB SQL query
|
||||
fmt: Output format (table, csv, json, parquet)
|
||||
output_path: File path for file-based outputs
|
||||
max_rows: Maximum rows to return
|
||||
"""
|
||||
# Add LIMIT to prevent runaway results
|
||||
limited_sql = f"SELECT * FROM ({sql}) AS _rq LIMIT {max_rows + 1}"
|
||||
result = conn.execute(limited_sql)
|
||||
columns = [desc[0] for desc in result.description]
|
||||
rows = result.fetchall()
|
||||
|
||||
# Check if result exceeded limit
|
||||
if len(rows) > max_rows:
|
||||
rows = rows[:max_rows]
|
||||
_log_progress(
|
||||
f" WARNING: Result truncated to {max_rows:,} rows. "
|
||||
f"Add more filters or increase --max-rows."
|
||||
)
|
||||
|
||||
if fmt == "table":
|
||||
_print_table(columns, rows)
|
||||
|
||||
elif fmt == "csv":
|
||||
if output_path:
|
||||
with open(output_path, "w", newline="") as f:
|
||||
writer = csv.writer(f)
|
||||
writer.writerow(columns)
|
||||
writer.writerows(rows)
|
||||
_log_progress(f" CSV written: {output_path} ({len(rows)} rows)")
|
||||
else:
|
||||
writer = csv.writer(sys.stdout)
|
||||
writer.writerow(columns)
|
||||
writer.writerows(rows)
|
||||
|
||||
elif fmt == "json":
|
||||
data = [dict(zip(columns, row)) for row in rows]
|
||||
json_str = json.dumps(data, default=str, indent=2)
|
||||
if output_path:
|
||||
with open(output_path, "w") as f:
|
||||
f.write(json_str)
|
||||
_log_progress(f" JSON written: {output_path} ({len(rows)} rows)")
|
||||
else:
|
||||
print(json_str)
|
||||
|
||||
elif fmt == "parquet":
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
|
||||
# Re-execute without limit wrapper for clean Arrow export
|
||||
arrow_result = conn.execute(
|
||||
f"SELECT * FROM ({sql}) AS _rq LIMIT {max_rows}"
|
||||
).fetch_arrow_table()
|
||||
|
||||
if not output_path:
|
||||
output_path = str(Path(_load_remote_query_config()["output_dir"]) / "result.parquet")
|
||||
|
||||
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
|
||||
pq.write_table(arrow_result, output_path)
|
||||
_log_progress(
|
||||
f" Parquet written: {output_path} "
|
||||
f"({arrow_result.num_rows} rows, {arrow_result.num_columns} cols)"
|
||||
)
|
||||
|
||||
else:
|
||||
raise RemoteQueryError(f"Unknown format: {fmt}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Progress logging (stderr so stdout stays clean for data)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_quiet_mode = False
|
||||
|
||||
|
||||
def _log_progress(msg: str) -> None:
|
||||
"""Print progress message to stderr (keeps stdout clean for data output)."""
|
||||
if not _quiet_mode:
|
||||
print(msg, file=sys.stderr)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main execution
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def execute_remote_query(
|
||||
sql: str,
|
||||
bq_registrations: list[tuple[str, str]],
|
||||
fmt: str = "table",
|
||||
output: Optional[str] = None,
|
||||
max_rows: Optional[int] = None,
|
||||
max_bq_rows: Optional[int] = None,
|
||||
timeout: Optional[int] = None,
|
||||
data_dir: str = "/data/src_data",
|
||||
quiet: bool = False,
|
||||
) -> None:
|
||||
"""Main execution function for remote queries.
|
||||
|
||||
Args:
|
||||
sql: DuckDB SQL query to execute
|
||||
bq_registrations: List of (alias, bq_sql) tuples
|
||||
fmt: Output format (table, csv, json, parquet)
|
||||
output: Output file path (for parquet/csv/json)
|
||||
max_rows: Max rows in final result
|
||||
max_bq_rows: Max rows per BQ sub-query
|
||||
timeout: Query timeout in seconds
|
||||
data_dir: Path to data directory
|
||||
quiet: Suppress progress messages
|
||||
"""
|
||||
global _quiet_mode
|
||||
_quiet_mode = quiet
|
||||
|
||||
config = _load_remote_query_config()
|
||||
max_rows = max_rows or config["max_result_rows"]
|
||||
max_bq_rows = max_bq_rows or config["max_bq_registration_rows"]
|
||||
timeout = timeout or config["timeout_seconds"]
|
||||
fmt = fmt or config["default_format"]
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
# Create in-memory DuckDB connection
|
||||
conn = duckdb.connect(":memory:")
|
||||
|
||||
try:
|
||||
# Step 1: Register local Parquet views
|
||||
_log_progress("Setting up local views...")
|
||||
local_views = _setup_local_views(conn, data_dir, quiet=quiet)
|
||||
_log_progress(f" {len(local_views)} local views ready")
|
||||
|
||||
# Step 2: Register BQ sub-query results
|
||||
if bq_registrations:
|
||||
_log_progress(f"Registering {len(bq_registrations)} BQ sub-queries...")
|
||||
bq_results = _register_bq_views(
|
||||
conn, bq_registrations, max_bq_rows, timeout, quiet=quiet,
|
||||
)
|
||||
for alias, count in bq_results.items():
|
||||
_log_progress(f" {alias}: {count:,} rows")
|
||||
|
||||
# Step 3: Execute the final DuckDB query
|
||||
_log_progress("Executing query...")
|
||||
_format_output(conn, sql, fmt, output, max_rows)
|
||||
|
||||
elapsed = time.time() - start_time
|
||||
_log_progress(f"Done in {elapsed:.1f}s")
|
||||
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI argument parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _parse_register_bq(value: str) -> tuple[str, str]:
|
||||
"""Parse --register-bq argument in 'alias=SQL' format.
|
||||
|
||||
Args:
|
||||
value: String in format "alias=SELECT ..."
|
||||
|
||||
Returns:
|
||||
Tuple of (alias, sql)
|
||||
|
||||
Raises:
|
||||
argparse.ArgumentTypeError: If format is invalid
|
||||
"""
|
||||
eq_pos = value.find("=")
|
||||
if eq_pos <= 0:
|
||||
raise argparse.ArgumentTypeError(
|
||||
f"Invalid --register-bq format: '{value}'. "
|
||||
f"Expected: 'alias=SELECT ...' (e.g., 'traffic=SELECT report_date, ...')"
|
||||
)
|
||||
alias = value[:eq_pos].strip()
|
||||
sql = value[eq_pos + 1:].strip()
|
||||
if not sql:
|
||||
raise argparse.ArgumentTypeError(
|
||||
f"Empty SQL in --register-bq for alias '{alias}'"
|
||||
)
|
||||
return alias, sql
|
||||
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
|
||||
"""Build the argument parser for remote_query CLI."""
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Execute DuckDB queries spanning local Parquet + remote BigQuery tables",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog="""
|
||||
Examples:
|
||||
# Local-only query (no BigQuery):
|
||||
python -m src.remote_query --sql "SELECT COUNT(*) FROM order_economics"
|
||||
|
||||
# Register BQ result and query it:
|
||||
python -m src.remote_query \\
|
||||
--register-bq "traffic=SELECT report_date, SUM(visitors) FROM \\`proj.ds.table\\` GROUP BY 1" \\
|
||||
--sql "SELECT * FROM traffic ORDER BY report_date"
|
||||
|
||||
# JOIN local + remote:
|
||||
python -m src.remote_query \\
|
||||
--register-bq "traffic=SELECT ... GROUP BY ..." \\
|
||||
--sql "SELECT o.*, t.visitors FROM order_economics o JOIN traffic t ON ..." \\
|
||||
--format parquet --output /tmp/result.parquet
|
||||
""",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--sql",
|
||||
required=True,
|
||||
help="DuckDB SQL query (executed after all views are registered)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--register-bq",
|
||||
action="append",
|
||||
type=_parse_register_bq,
|
||||
default=[],
|
||||
metavar="ALIAS=SQL",
|
||||
dest="bq_registrations",
|
||||
help='Register BQ query result as DuckDB view. Format: "alias=BQ_SQL". Repeatable.',
|
||||
)
|
||||
parser.add_argument(
|
||||
"--format",
|
||||
choices=["table", "csv", "json", "parquet"],
|
||||
default=None,
|
||||
dest="fmt",
|
||||
help="Output format (default: from config or 'table')",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output",
|
||||
default=None,
|
||||
help="Output file path for parquet/csv/json (default: auto for parquet)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--max-rows",
|
||||
type=int,
|
||||
default=None,
|
||||
help="Max rows in final result (default: from config)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--max-bq-rows",
|
||||
type=int,
|
||||
default=None,
|
||||
help="Max rows per BQ sub-query (default: from config)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--timeout",
|
||||
type=int,
|
||||
default=None,
|
||||
help="Query timeout in seconds (default: from config)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--data-dir",
|
||||
default="/data/src_data",
|
||||
help="Parquet data directory (default: /data/src_data)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--quiet",
|
||||
action="store_true",
|
||||
help="Suppress progress messages (stderr)",
|
||||
)
|
||||
return parser
|
||||
|
||||
|
||||
def main(argv: Optional[list[str]] = None) -> None:
|
||||
"""CLI entry point."""
|
||||
parser = build_parser()
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
# Setup logging
|
||||
logging.basicConfig(
|
||||
level=logging.WARNING if args.quiet else logging.INFO,
|
||||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||
stream=sys.stderr,
|
||||
)
|
||||
|
||||
try:
|
||||
execute_remote_query(
|
||||
sql=args.sql,
|
||||
bq_registrations=args.bq_registrations,
|
||||
fmt=args.fmt,
|
||||
output=args.output,
|
||||
max_rows=args.max_rows,
|
||||
max_bq_rows=args.max_bq_rows,
|
||||
timeout=args.timeout,
|
||||
data_dir=args.data_dir,
|
||||
quiet=args.quiet,
|
||||
)
|
||||
except RemoteQueryError as e:
|
||||
print(f"ERROR: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except KeyboardInterrupt:
|
||||
print("\nInterrupted.", file=sys.stderr)
|
||||
sys.exit(130)
|
||||
except Exception as e:
|
||||
print(f"UNEXPECTED ERROR: {e}", file=sys.stderr)
|
||||
logger.exception("Unexpected error in remote_query")
|
||||
sys.exit(2)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
69
tests/test_config_bq_entity_type.py
Normal file
69
tests/test_config_bq_entity_type.py
Normal file
|
|
@ -0,0 +1,69 @@
|
|||
"""Tests for TableConfig.bq_entity_type field validation."""
|
||||
|
||||
import pytest
|
||||
|
||||
from src.config import TableConfig
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
def _make_table(**overrides) -> TableConfig:
|
||||
"""Create a TableConfig with sensible defaults, applying overrides."""
|
||||
defaults = dict(
|
||||
id="test.dataset.table",
|
||||
name="test_table",
|
||||
description="Test",
|
||||
primary_key="id",
|
||||
sync_strategy="full_refresh",
|
||||
)
|
||||
defaults.update(overrides)
|
||||
return TableConfig(**defaults)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestBqEntityTypeDefault:
|
||||
def test_default_is_view(self):
|
||||
table = _make_table()
|
||||
assert table.bq_entity_type == "view"
|
||||
|
||||
|
||||
class TestBqEntityTypeValidValues:
|
||||
@pytest.mark.parametrize("entity_type", ["view", "table"])
|
||||
def test_valid_bq_entity_type(self, entity_type):
|
||||
table = _make_table(bq_entity_type=entity_type)
|
||||
assert table.bq_entity_type == entity_type
|
||||
|
||||
|
||||
class TestBqEntityTypeInvalid:
|
||||
@pytest.mark.parametrize("bad_type", ["VIEW", "physical", "", "tables", "materialized"])
|
||||
def test_invalid_bq_entity_type_raises(self, bad_type):
|
||||
with pytest.raises(ValueError, match="Invalid bq_entity_type"):
|
||||
_make_table(bq_entity_type=bad_type)
|
||||
|
||||
|
||||
class TestBqEntityTypeFromKwarg:
|
||||
def test_kwarg_sets_bq_entity_type(self):
|
||||
"""Simulate what _parse_data_description does: pass bq_entity_type as kwarg."""
|
||||
table = TableConfig(
|
||||
id="proj.dataset.orders",
|
||||
name="orders",
|
||||
description="Order data",
|
||||
primary_key="order_id",
|
||||
sync_strategy="full_refresh",
|
||||
bq_entity_type="table",
|
||||
)
|
||||
assert table.bq_entity_type == "table"
|
||||
|
||||
def test_kwarg_default_when_omitted(self):
|
||||
"""When YAML has no bq_entity_type, _parse_data_description passes 'view'."""
|
||||
table = TableConfig(
|
||||
id="proj.dataset.orders",
|
||||
name="orders",
|
||||
description="Order data",
|
||||
primary_key="order_id",
|
||||
sync_strategy="full_refresh",
|
||||
)
|
||||
assert table.bq_entity_type == "view"
|
||||
778
tests/test_remote_query.py
Normal file
778
tests/test_remote_query.py
Normal file
|
|
@ -0,0 +1,778 @@
|
|||
"""Tests for remote_query module - hybrid local Parquet + remote BigQuery queries.
|
||||
|
||||
Tests cover:
|
||||
- CLI argument parsing (_parse_register_bq, build_parser)
|
||||
- Local view setup (_setup_local_views via create_local_views)
|
||||
- BQ registration with safety checks (_validate_bq_result_size, _estimate_memory_mb, _register_bq_views)
|
||||
- Output formatting (_print_table, _format_output)
|
||||
- End-to-end local-only queries (no BQ mocking needed)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import json
|
||||
import os
|
||||
from io import StringIO
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import duckdb
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
import pytest
|
||||
|
||||
from src.remote_query import (
|
||||
RemoteQueryError,
|
||||
_estimate_memory_mb,
|
||||
_format_output,
|
||||
_parse_register_bq,
|
||||
_print_table,
|
||||
_register_bq_views,
|
||||
_validate_bq_result_size,
|
||||
build_parser,
|
||||
execute_remote_query,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_local_project(tmp_path):
|
||||
"""Create a minimal project with docs/data_description.md and parquet files.
|
||||
|
||||
Layout:
|
||||
tmp_path/
|
||||
docs/data_description.md (YAML with local + remote + hybrid tables)
|
||||
server/parquet/crm_data/orders.parquet
|
||||
server/parquet/crm_data/products.parquet
|
||||
|
||||
Returns (project_root, data_dir) where data_dir = tmp_path / "server".
|
||||
"""
|
||||
docs_dir = tmp_path / "docs"
|
||||
docs_dir.mkdir()
|
||||
|
||||
data_description = """\
|
||||
# Data Description
|
||||
|
||||
```yaml
|
||||
folder_mapping:
|
||||
in.c-crm: crm_data
|
||||
|
||||
tables:
|
||||
- id: "in.c-crm.orders"
|
||||
name: "orders"
|
||||
description: "Order data"
|
||||
primary_key: "order_id"
|
||||
sync_strategy: "full_refresh"
|
||||
|
||||
- id: "in.c-crm.products"
|
||||
name: "products"
|
||||
description: "Product catalog"
|
||||
primary_key: "product_id"
|
||||
sync_strategy: "full_refresh"
|
||||
|
||||
- id: "prj-grp-dataview-prod-1ff9.supply.traffic"
|
||||
name: "traffic"
|
||||
description: "Remote BQ traffic table"
|
||||
primary_key: "id"
|
||||
query_mode: "remote"
|
||||
|
||||
- id: "in.c-crm.inventory"
|
||||
name: "inventory"
|
||||
description: "Hybrid inventory"
|
||||
primary_key: "id"
|
||||
sync_strategy: "full_refresh"
|
||||
query_mode: "hybrid"
|
||||
```
|
||||
"""
|
||||
(docs_dir / "data_description.md").write_text(data_description)
|
||||
|
||||
# Create parquet files for local tables
|
||||
crm_dir = tmp_path / "server" / "parquet" / "crm_data"
|
||||
crm_dir.mkdir(parents=True)
|
||||
|
||||
orders_table = pa.table({
|
||||
"order_id": [1, 2, 3, 4, 5],
|
||||
"amount": [10.0, 20.0, 30.0, 40.0, 50.0],
|
||||
"product_id": [101, 102, 101, 103, 102],
|
||||
})
|
||||
pq.write_table(orders_table, crm_dir / "orders.parquet")
|
||||
|
||||
products_table = pa.table({
|
||||
"product_id": [101, 102, 103],
|
||||
"name": ["Widget", "Gadget", "Doohickey"],
|
||||
})
|
||||
pq.write_table(products_table, crm_dir / "products.parquet")
|
||||
|
||||
# Create parquet for hybrid table
|
||||
inventory_table = pa.table({
|
||||
"id": [1, 2],
|
||||
"stock": [100, 200],
|
||||
})
|
||||
pq.write_table(inventory_table, crm_dir / "inventory.parquet")
|
||||
|
||||
data_dir = str(tmp_path / "server")
|
||||
return tmp_path, data_dir
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def duckdb_conn():
|
||||
"""Create an in-memory DuckDB connection, closed after test."""
|
||||
conn = duckdb.connect(":memory:")
|
||||
yield conn
|
||||
conn.close()
|
||||
|
||||
|
||||
class _DuckDBConnectionProxy:
|
||||
"""Proxy around DuckDBPyConnection that silently ignores unsupported SET commands.
|
||||
|
||||
DuckDB versions may not support 'statement_timeout'. This proxy catches
|
||||
CatalogException for SET commands so end-to-end tests work across versions.
|
||||
The real connection's execute method is read-only, so we wrap it.
|
||||
"""
|
||||
|
||||
def __init__(self, conn):
|
||||
object.__setattr__(self, "_conn", conn)
|
||||
|
||||
def execute(self, sql, *args, **kwargs):
|
||||
if isinstance(sql, str) and sql.strip().upper().startswith("SET "):
|
||||
try:
|
||||
return self._conn.execute(sql, *args, **kwargs)
|
||||
except duckdb.CatalogException:
|
||||
return None
|
||||
return self._conn.execute(sql, *args, **kwargs)
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self._conn, name)
|
||||
|
||||
|
||||
def _patched_duckdb_connect(*args, **kwargs):
|
||||
"""Create a DuckDB connection wrapped in the proxy."""
|
||||
conn = duckdb.connect(*args, **kwargs)
|
||||
return _DuckDBConnectionProxy(conn)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: CLI argument parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCLIArgParsing:
|
||||
"""Test _parse_register_bq() and build_parser()."""
|
||||
|
||||
def test_requires_sql(self):
|
||||
"""Parser should fail when --sql is missing."""
|
||||
parser = build_parser()
|
||||
with pytest.raises(SystemExit):
|
||||
parser.parse_args([])
|
||||
|
||||
def test_register_bq_parsing(self):
|
||||
"""'alias=SELECT ...' parses into (alias, sql) tuple."""
|
||||
result = _parse_register_bq("traffic=SELECT report_date FROM `proj.ds.table`")
|
||||
assert result == ("traffic", "SELECT report_date FROM `proj.ds.table`")
|
||||
|
||||
def test_register_bq_invalid_format(self):
|
||||
"""Missing '=' should raise ArgumentTypeError."""
|
||||
with pytest.raises(argparse.ArgumentTypeError, match="Invalid --register-bq format"):
|
||||
_parse_register_bq("no_equals_sign_here")
|
||||
|
||||
def test_register_bq_empty_sql(self):
|
||||
"""Alias with empty SQL after '=' should raise."""
|
||||
with pytest.raises(argparse.ArgumentTypeError, match="Empty SQL"):
|
||||
_parse_register_bq("alias=")
|
||||
|
||||
def test_register_bq_empty_alias(self):
|
||||
"""'=SELECT ...' (empty alias) should raise."""
|
||||
with pytest.raises(argparse.ArgumentTypeError, match="Invalid --register-bq format"):
|
||||
_parse_register_bq("=SELECT 1")
|
||||
|
||||
def test_multiple_register_bq(self):
|
||||
"""Multiple --register-bq args should be collected into a list."""
|
||||
parser = build_parser()
|
||||
args = parser.parse_args([
|
||||
"--sql", "SELECT 1",
|
||||
"--register-bq", "t1=SELECT a FROM x",
|
||||
"--register-bq", "t2=SELECT b FROM y",
|
||||
])
|
||||
assert len(args.bq_registrations) == 2
|
||||
assert args.bq_registrations[0] == ("t1", "SELECT a FROM x")
|
||||
assert args.bq_registrations[1] == ("t2", "SELECT b FROM y")
|
||||
|
||||
def test_default_format_is_none(self):
|
||||
"""Default --format should be None (uses config default at runtime)."""
|
||||
parser = build_parser()
|
||||
args = parser.parse_args(["--sql", "SELECT 1"])
|
||||
assert args.fmt is None
|
||||
|
||||
def test_explicit_format(self):
|
||||
"""Explicit --format should be respected."""
|
||||
parser = build_parser()
|
||||
args = parser.parse_args(["--sql", "SELECT 1", "--format", "csv"])
|
||||
assert args.fmt == "csv"
|
||||
|
||||
def test_invalid_format_rejected(self):
|
||||
"""Invalid --format value should cause parser error."""
|
||||
parser = build_parser()
|
||||
with pytest.raises(SystemExit):
|
||||
parser.parse_args(["--sql", "SELECT 1", "--format", "xml"])
|
||||
|
||||
def test_no_register_bq_yields_empty_list(self):
|
||||
"""When no --register-bq is provided, bq_registrations defaults to []."""
|
||||
parser = build_parser()
|
||||
args = parser.parse_args(["--sql", "SELECT 1"])
|
||||
assert args.bq_registrations == []
|
||||
|
||||
def test_register_bq_sql_with_equals(self):
|
||||
"""SQL containing '=' should be parsed correctly (split only on first '=')."""
|
||||
result = _parse_register_bq("view=SELECT * FROM t WHERE col = 5")
|
||||
assert result[0] == "view"
|
||||
assert result[1] == "SELECT * FROM t WHERE col = 5"
|
||||
|
||||
def test_quiet_flag(self):
|
||||
"""--quiet should set quiet=True."""
|
||||
parser = build_parser()
|
||||
args = parser.parse_args(["--sql", "SELECT 1", "--quiet"])
|
||||
assert args.quiet is True
|
||||
|
||||
def test_max_rows_parsing(self):
|
||||
"""--max-rows should be parsed as integer."""
|
||||
parser = build_parser()
|
||||
args = parser.parse_args(["--sql", "SELECT 1", "--max-rows", "500"])
|
||||
assert args.max_rows == 500
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: Local view setup
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestLocalViewSetup:
|
||||
"""Test _setup_local_views via create_local_views with tmp_path fixture."""
|
||||
|
||||
def test_creates_views_from_parquet(self, tmp_local_project, duckdb_conn):
|
||||
"""Local tables should be available as DuckDB views after setup."""
|
||||
project_root, data_dir = tmp_local_project
|
||||
|
||||
with patch("scripts.duckdb_manager.find_project_root", return_value=project_root):
|
||||
from src.remote_query import _setup_local_views
|
||||
created = _setup_local_views(duckdb_conn, data_dir, quiet=True)
|
||||
|
||||
assert "orders" in created
|
||||
assert "products" in created
|
||||
|
||||
# Verify data is queryable
|
||||
count = duckdb_conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
|
||||
assert count == 5
|
||||
|
||||
def test_skips_remote_tables(self, tmp_local_project, duckdb_conn):
|
||||
"""Remote tables (query_mode='remote') should NOT create local views."""
|
||||
project_root, data_dir = tmp_local_project
|
||||
|
||||
with patch("scripts.duckdb_manager.find_project_root", return_value=project_root):
|
||||
from src.remote_query import _setup_local_views
|
||||
created = _setup_local_views(duckdb_conn, data_dir, quiet=True)
|
||||
|
||||
assert "traffic" not in created
|
||||
|
||||
# Verify the remote table is not queryable
|
||||
tables = [row[0] for row in duckdb_conn.execute("SHOW TABLES").fetchall()]
|
||||
assert "traffic" not in tables
|
||||
|
||||
def test_includes_hybrid_tables(self, tmp_local_project, duckdb_conn):
|
||||
"""Hybrid tables (query_mode='hybrid') should create local views."""
|
||||
project_root, data_dir = tmp_local_project
|
||||
|
||||
with patch("scripts.duckdb_manager.find_project_root", return_value=project_root):
|
||||
from src.remote_query import _setup_local_views
|
||||
created = _setup_local_views(duckdb_conn, data_dir, quiet=True)
|
||||
|
||||
assert "inventory" in created
|
||||
|
||||
count = duckdb_conn.execute("SELECT COUNT(*) FROM inventory").fetchone()[0]
|
||||
assert count == 2
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: BQ registration with safety checks
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestBQRegistration:
|
||||
"""Test BQ result validation and registration (mocked BigQuery)."""
|
||||
|
||||
@staticmethod
|
||||
def _make_mock_bq_client(count_result: int = 100, schema_fields: int = 5):
|
||||
"""Create a mock BQ client that returns controlled count and schema.
|
||||
|
||||
Args:
|
||||
count_result: Row count returned by COUNT(*) query
|
||||
schema_fields: Number of fields in the schema
|
||||
"""
|
||||
mock_client = MagicMock()
|
||||
|
||||
# COUNT(*) query result
|
||||
count_row = MagicMock()
|
||||
count_row.__getitem__ = MagicMock(return_value=count_result)
|
||||
count_iter = iter([count_row])
|
||||
|
||||
# Schema query result (LIMIT 0)
|
||||
mock_schema_fields = [MagicMock() for _ in range(schema_fields)]
|
||||
mock_schema = MagicMock()
|
||||
mock_schema.__len__ = MagicMock(return_value=schema_fields)
|
||||
|
||||
# Use side_effect to return different results for different queries
|
||||
def query_side_effect(sql):
|
||||
job = MagicMock()
|
||||
if sql.startswith("SELECT COUNT(*)"):
|
||||
result = MagicMock()
|
||||
result.__iter__ = MagicMock(return_value=iter([count_row]))
|
||||
job.result.return_value = result
|
||||
elif "LIMIT 0" in sql:
|
||||
result = MagicMock()
|
||||
result.schema = mock_schema_fields
|
||||
job.result.return_value = result
|
||||
return job
|
||||
|
||||
mock_client.query.side_effect = query_side_effect
|
||||
return mock_client
|
||||
|
||||
def test_count_check_blocks_large_result(self):
|
||||
"""BQ sub-query exceeding max_rows should raise RemoteQueryError."""
|
||||
mock_client = self._make_mock_bq_client(count_result=1_000_000)
|
||||
|
||||
with pytest.raises(RemoteQueryError, match="would return 1,000,000 rows"):
|
||||
_validate_bq_result_size(
|
||||
bq_client=mock_client,
|
||||
sql="SELECT * FROM big_table",
|
||||
alias="big_table",
|
||||
max_rows=500_000,
|
||||
)
|
||||
|
||||
def test_validates_small_result_passes(self):
|
||||
"""BQ sub-query within limits should return the row count."""
|
||||
mock_client = self._make_mock_bq_client(count_result=1000)
|
||||
|
||||
row_count = _validate_bq_result_size(
|
||||
bq_client=mock_client,
|
||||
sql="SELECT * FROM small_table",
|
||||
alias="small_table",
|
||||
max_rows=500_000,
|
||||
)
|
||||
|
||||
assert row_count == 1000
|
||||
|
||||
def test_memory_estimate_blocks_huge_result(self):
|
||||
"""_register_bq_views should refuse when estimated memory exceeds 2 GB."""
|
||||
# Create a mock that passes count check but fails memory check
|
||||
# 500K rows x 100 cols x 50 bytes/cell = ~2384 MB > 2048 MB limit
|
||||
mock_client = self._make_mock_bq_client(count_result=500_000, schema_fields=100)
|
||||
|
||||
conn = duckdb.connect(":memory:")
|
||||
try:
|
||||
with patch("src.remote_query._create_bq_client", return_value=mock_client), \
|
||||
patch.dict(os.environ, {"BIGQUERY_PROJECT": "test-proj"}):
|
||||
with pytest.raises(RemoteQueryError, match="estimated memory"):
|
||||
_register_bq_views(
|
||||
conn=conn,
|
||||
registrations=[("huge", "SELECT * FROM huge_table")],
|
||||
max_bq_rows=1_000_000,
|
||||
timeout_seconds=60,
|
||||
quiet=True,
|
||||
)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def test_registers_small_result(self):
|
||||
"""BQ sub-query within all limits should register successfully."""
|
||||
# Small result: 100 rows x 5 cols = ~0.02 MB
|
||||
mock_client = self._make_mock_bq_client(count_result=100, schema_fields=5)
|
||||
|
||||
# Mock register_bq_table to return the row count
|
||||
conn = duckdb.connect(":memory:")
|
||||
try:
|
||||
with patch("src.remote_query._create_bq_client", return_value=mock_client), \
|
||||
patch("src.remote_query.register_bq_table", return_value=100) as mock_reg, \
|
||||
patch.dict(os.environ, {"BIGQUERY_PROJECT": "test-proj"}):
|
||||
results = _register_bq_views(
|
||||
conn=conn,
|
||||
registrations=[("small_view", "SELECT * FROM small_table")],
|
||||
max_bq_rows=500_000,
|
||||
timeout_seconds=60,
|
||||
quiet=True,
|
||||
)
|
||||
|
||||
assert results == {"small_view": 100}
|
||||
mock_reg.assert_called_once()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def test_missing_bigquery_project_raises(self):
|
||||
"""Missing BIGQUERY_PROJECT env var should raise RemoteQueryError."""
|
||||
conn = duckdb.connect(":memory:")
|
||||
try:
|
||||
with patch.dict(os.environ, {}, clear=True):
|
||||
with pytest.raises(RemoteQueryError, match="BIGQUERY_PROJECT"):
|
||||
_register_bq_views(
|
||||
conn=conn,
|
||||
registrations=[("v", "SELECT 1")],
|
||||
max_bq_rows=100,
|
||||
timeout_seconds=60,
|
||||
)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def test_empty_registrations_returns_empty(self):
|
||||
"""Empty registration list should return empty dict without BQ calls."""
|
||||
conn = duckdb.connect(":memory:")
|
||||
try:
|
||||
result = _register_bq_views(
|
||||
conn=conn,
|
||||
registrations=[],
|
||||
max_bq_rows=100,
|
||||
timeout_seconds=60,
|
||||
)
|
||||
assert result == {}
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: Memory estimation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMemoryEstimation:
|
||||
"""Test _estimate_memory_mb calculation."""
|
||||
|
||||
def test_small_table(self):
|
||||
"""100 rows x 10 cols = 50_000 bytes ~ 0.048 MB."""
|
||||
result = _estimate_memory_mb(100, 10)
|
||||
assert abs(result - 50_000 / (1024 * 1024)) < 0.001
|
||||
|
||||
def test_large_table(self):
|
||||
"""1M rows x 50 cols x 50 bytes = ~2384 MB."""
|
||||
result = _estimate_memory_mb(1_000_000, 50)
|
||||
expected = (1_000_000 * 50 * 50) / (1024 * 1024)
|
||||
assert abs(result - expected) < 0.01
|
||||
|
||||
def test_zero_rows(self):
|
||||
"""Zero rows should return 0 MB."""
|
||||
assert _estimate_memory_mb(0, 50) == 0.0
|
||||
|
||||
def test_zero_columns(self):
|
||||
"""Zero columns should return 0 MB."""
|
||||
assert _estimate_memory_mb(1000, 0) == 0.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: Output formatting
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestOutputFormatting:
|
||||
"""Test _print_table and _format_output for various formats."""
|
||||
|
||||
def test_table_format_aligned(self, capsys):
|
||||
"""Table format should produce aligned columns with header separator."""
|
||||
columns = ["id", "name", "value"]
|
||||
rows = [(1, "alice", 100), (2, "bob", 200)]
|
||||
|
||||
_print_table(columns, rows)
|
||||
|
||||
output = capsys.readouterr().out
|
||||
lines = output.strip().split("\n")
|
||||
|
||||
# Header line
|
||||
assert "id" in lines[0]
|
||||
assert "name" in lines[0]
|
||||
assert "value" in lines[0]
|
||||
|
||||
# Separator line
|
||||
assert "-+-" in lines[1]
|
||||
|
||||
# Data rows
|
||||
assert "alice" in lines[2]
|
||||
assert "bob" in lines[3]
|
||||
|
||||
# Row count footer
|
||||
assert "(2 rows)" in output
|
||||
|
||||
def test_table_format_empty_result(self, capsys):
|
||||
"""Empty result should print '(empty result)'."""
|
||||
_print_table(["col1"], [])
|
||||
|
||||
output = capsys.readouterr().out
|
||||
assert "(empty result)" in output
|
||||
|
||||
def test_table_format_null_values(self, capsys):
|
||||
"""None values should be rendered as 'NULL'."""
|
||||
_print_table(["col"], [(None,)])
|
||||
|
||||
output = capsys.readouterr().out
|
||||
assert "NULL" in output
|
||||
|
||||
def test_csv_format(self, tmp_path, duckdb_conn):
|
||||
"""CSV output should contain header + data rows."""
|
||||
duckdb_conn.execute("CREATE TABLE test AS SELECT 1 AS id, 'hello' AS msg")
|
||||
|
||||
output_path = str(tmp_path / "result.csv")
|
||||
_format_output(
|
||||
conn=duckdb_conn,
|
||||
sql="SELECT * FROM test",
|
||||
fmt="csv",
|
||||
output_path=output_path,
|
||||
max_rows=1000,
|
||||
)
|
||||
|
||||
with open(output_path) as f:
|
||||
reader = csv.reader(f)
|
||||
header = next(reader)
|
||||
rows = list(reader)
|
||||
|
||||
assert header == ["id", "msg"]
|
||||
assert len(rows) == 1
|
||||
assert rows[0][1] == "hello"
|
||||
|
||||
def test_csv_format_to_stdout(self, capsys, duckdb_conn):
|
||||
"""CSV with no output path should write to stdout."""
|
||||
duckdb_conn.execute("CREATE TABLE test AS SELECT 42 AS val")
|
||||
|
||||
_format_output(
|
||||
conn=duckdb_conn,
|
||||
sql="SELECT * FROM test",
|
||||
fmt="csv",
|
||||
output_path=None,
|
||||
max_rows=1000,
|
||||
)
|
||||
|
||||
output = capsys.readouterr().out
|
||||
assert "val" in output
|
||||
assert "42" in output
|
||||
|
||||
def test_json_format(self, tmp_path, duckdb_conn):
|
||||
"""JSON output should contain a list of dicts."""
|
||||
duckdb_conn.execute(
|
||||
"CREATE TABLE test AS SELECT 1 AS id, 'world' AS msg"
|
||||
)
|
||||
|
||||
output_path = str(tmp_path / "result.json")
|
||||
_format_output(
|
||||
conn=duckdb_conn,
|
||||
sql="SELECT * FROM test",
|
||||
fmt="json",
|
||||
output_path=output_path,
|
||||
max_rows=1000,
|
||||
)
|
||||
|
||||
with open(output_path) as f:
|
||||
data = json.load(f)
|
||||
|
||||
assert len(data) == 1
|
||||
assert data[0]["id"] == 1
|
||||
assert data[0]["msg"] == "world"
|
||||
|
||||
def test_json_format_to_stdout(self, capsys, duckdb_conn):
|
||||
"""JSON with no output path should print to stdout."""
|
||||
duckdb_conn.execute("CREATE TABLE test AS SELECT 99 AS num")
|
||||
|
||||
_format_output(
|
||||
conn=duckdb_conn,
|
||||
sql="SELECT * FROM test",
|
||||
fmt="json",
|
||||
output_path=None,
|
||||
max_rows=1000,
|
||||
)
|
||||
|
||||
output = capsys.readouterr().out
|
||||
data = json.loads(output)
|
||||
assert data[0]["num"] == 99
|
||||
|
||||
def test_parquet_write(self, tmp_path, duckdb_conn):
|
||||
"""Parquet output should create a readable parquet file."""
|
||||
duckdb_conn.execute(
|
||||
"CREATE TABLE test AS SELECT 1 AS id, 2.5 AS val"
|
||||
)
|
||||
|
||||
output_path = str(tmp_path / "output" / "result.parquet")
|
||||
|
||||
with patch("src.remote_query._load_remote_query_config", return_value={
|
||||
"output_dir": str(tmp_path / "default_output"),
|
||||
"timeout_seconds": 300,
|
||||
"max_result_rows": 100_000,
|
||||
"max_bq_registration_rows": 500_000,
|
||||
"default_format": "table",
|
||||
}):
|
||||
_format_output(
|
||||
conn=duckdb_conn,
|
||||
sql="SELECT * FROM test",
|
||||
fmt="parquet",
|
||||
output_path=output_path,
|
||||
max_rows=1000,
|
||||
)
|
||||
|
||||
assert Path(output_path).exists()
|
||||
|
||||
# Read it back and verify
|
||||
result = pq.read_table(output_path)
|
||||
assert result.num_rows == 1
|
||||
assert result.num_columns == 2
|
||||
assert result.column_names == ["id", "val"]
|
||||
|
||||
def test_parquet_default_path(self, tmp_path, duckdb_conn):
|
||||
"""Parquet with no output path should use config default dir."""
|
||||
duckdb_conn.execute("CREATE TABLE test AS SELECT 1 AS x")
|
||||
|
||||
default_dir = str(tmp_path / "default_output")
|
||||
with patch("src.remote_query._load_remote_query_config", return_value={
|
||||
"output_dir": default_dir,
|
||||
"timeout_seconds": 300,
|
||||
"max_result_rows": 100_000,
|
||||
"max_bq_registration_rows": 500_000,
|
||||
"default_format": "table",
|
||||
}):
|
||||
_format_output(
|
||||
conn=duckdb_conn,
|
||||
sql="SELECT * FROM test",
|
||||
fmt="parquet",
|
||||
output_path=None,
|
||||
max_rows=1000,
|
||||
)
|
||||
|
||||
expected_path = Path(default_dir) / "result.parquet"
|
||||
assert expected_path.exists()
|
||||
|
||||
def test_unknown_format_raises(self, duckdb_conn):
|
||||
"""Unknown format should raise RemoteQueryError."""
|
||||
duckdb_conn.execute("CREATE TABLE test AS SELECT 1 AS id")
|
||||
|
||||
with pytest.raises(RemoteQueryError, match="Unknown format"):
|
||||
_format_output(
|
||||
conn=duckdb_conn,
|
||||
sql="SELECT * FROM test",
|
||||
fmt="xml",
|
||||
output_path=None,
|
||||
max_rows=1000,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests: End-to-end (local-only, no BQ mocking needed)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestEndToEnd:
|
||||
"""End-to-end tests with local Parquet data only (no BigQuery dependency).
|
||||
|
||||
Uses _patched_duckdb_connect to handle DuckDB version differences
|
||||
(statement_timeout may not be supported in all versions).
|
||||
"""
|
||||
|
||||
_CONFIG = {
|
||||
"timeout_seconds": 300,
|
||||
"max_result_rows": 100_000,
|
||||
"max_bq_registration_rows": 500_000,
|
||||
"default_format": "table",
|
||||
"output_dir": "/tmp/remote_query_test",
|
||||
}
|
||||
|
||||
def _run(self, tmp_local_project, **kwargs):
|
||||
"""Helper to run execute_remote_query with standard patches."""
|
||||
project_root, data_dir = tmp_local_project
|
||||
config = dict(self._CONFIG)
|
||||
config.update(kwargs.pop("config_overrides", {}))
|
||||
|
||||
with patch("scripts.duckdb_manager.find_project_root", return_value=project_root), \
|
||||
patch("src.remote_query._load_remote_query_config", return_value=config), \
|
||||
patch("src.remote_query.duckdb") as mock_duckdb_mod:
|
||||
mock_duckdb_mod.connect = _patched_duckdb_connect
|
||||
kwargs.setdefault("data_dir", data_dir)
|
||||
kwargs.setdefault("bq_registrations", [])
|
||||
kwargs.setdefault("quiet", True)
|
||||
execute_remote_query(**kwargs)
|
||||
|
||||
def test_local_only_query(self, tmp_local_project, capsys):
|
||||
"""Execute a query against local Parquet views and verify table output."""
|
||||
self._run(
|
||||
tmp_local_project,
|
||||
sql="SELECT COUNT(*) AS cnt FROM orders",
|
||||
fmt="table",
|
||||
)
|
||||
|
||||
output = capsys.readouterr().out
|
||||
assert "cnt" in output
|
||||
assert "5" in output
|
||||
|
||||
def test_local_join_query(self, tmp_local_project, capsys):
|
||||
"""JOIN between two local tables should work."""
|
||||
self._run(
|
||||
tmp_local_project,
|
||||
sql=(
|
||||
"SELECT p.name, SUM(o.amount) AS total "
|
||||
"FROM orders o JOIN products p ON o.product_id = p.product_id "
|
||||
"GROUP BY p.name ORDER BY total DESC"
|
||||
),
|
||||
fmt="json",
|
||||
)
|
||||
|
||||
output = capsys.readouterr().out
|
||||
data = json.loads(output)
|
||||
assert len(data) == 3
|
||||
# Widget: orders 1,3 -> 10+30=40
|
||||
widget = next(r for r in data if r["name"] == "Widget")
|
||||
assert widget["total"] == 40.0
|
||||
|
||||
def test_result_row_limit(self, tmp_local_project, capsys):
|
||||
"""Result exceeding max_rows should be truncated."""
|
||||
self._run(
|
||||
tmp_local_project,
|
||||
sql="SELECT * FROM orders ORDER BY order_id",
|
||||
fmt="table",
|
||||
max_rows=2,
|
||||
quiet=False,
|
||||
config_overrides={"max_result_rows": 2},
|
||||
)
|
||||
|
||||
out = capsys.readouterr().out
|
||||
# Table output should show exactly 2 data rows
|
||||
assert "(2 rows)" in out
|
||||
|
||||
def test_csv_output_to_file(self, tmp_local_project, tmp_path):
|
||||
"""End-to-end CSV output written to a file."""
|
||||
output_path = str(tmp_path / "result.csv")
|
||||
|
||||
self._run(
|
||||
tmp_local_project,
|
||||
sql="SELECT order_id, amount FROM orders ORDER BY order_id",
|
||||
fmt="csv",
|
||||
output=output_path,
|
||||
)
|
||||
|
||||
with open(output_path) as f:
|
||||
reader = csv.DictReader(f)
|
||||
rows = list(reader)
|
||||
|
||||
assert len(rows) == 5
|
||||
assert rows[0]["order_id"] == "1"
|
||||
assert rows[0]["amount"] == "10.0"
|
||||
|
||||
def test_hybrid_table_queryable(self, tmp_local_project, capsys):
|
||||
"""Hybrid table should be accessible in local queries."""
|
||||
self._run(
|
||||
tmp_local_project,
|
||||
sql="SELECT SUM(stock) AS total_stock FROM inventory",
|
||||
fmt="json",
|
||||
)
|
||||
|
||||
output = capsys.readouterr().out
|
||||
data = json.loads(output)
|
||||
assert data[0]["total_stock"] == 300
|
||||
|
||||
def test_quiet_mode_suppresses_stderr(self, tmp_local_project, capsys):
|
||||
"""With quiet=True, no progress messages should appear on stderr."""
|
||||
self._run(
|
||||
tmp_local_project,
|
||||
sql="SELECT COUNT(*) AS cnt FROM orders",
|
||||
fmt="table",
|
||||
quiet=True,
|
||||
)
|
||||
|
||||
err = capsys.readouterr().err
|
||||
# In quiet mode, _log_progress should not emit anything
|
||||
assert "Setting up" not in err
|
||||
assert "local views" not in err
|
||||
Loading…
Reference in a new issue