Step 28: Remote query architecture for local+remote table JOINs

Add src/remote_query.py CLI module enabling the AI agent to run SQL
queries spanning local Parquet tables and remote BigQuery tables in a
single DuckDB session on the server. Two-phase protocol: BQ sub-queries
(--register-bq) fetch filtered/aggregated data, then DuckDB SQL (--sql)
joins everything.

Safety: COUNT(*) pre-check, memory estimation (2GB cap), row limits
(500K per BQ sub-query, 100K final result).

Changes:
- New src/remote_query.py with CLI, BQ registration, output formatting
- Add bq_entity_type field to TableConfig (view vs table routing)
- Extract create_local_views() from duckdb_manager.py for reuse
- Update claude_md_template.txt with remote query agent instructions
- Update example configs with remote_query section and docs
- 52 new tests (42 remote_query + 10 bq_entity_type), all passing
Petr 2026-03-21 11:39:15 +01:00
parent ed16122994
commit d180b2014e
8 changed files with 1678 additions and 73 deletions


@ -46,6 +46,18 @@ tables:
query_mode: "local"
sync_schedule: "every 1h"
profile_after_sync: false
# Remote table - too large for local sync, queried via BigQuery
- id: "project.dataset.page_views"
name: "page_views"
description: "Page-level traffic metrics (~5M rows/day)"
primary_key: "page_view_id"
sync_strategy: "partitioned"
partition_by: "event_date"
partition_column_type: "DATE"
partition_granularity: "day"
query_mode: "remote"
bq_entity_type: "view"
```
## Table Configuration Reference
@ -84,6 +96,7 @@ tables:
| Field | Default | Description |
|-------|---------|-------------|
| `query_mode` | `"local"` | How the AI agent queries this table |
| `bq_entity_type` | `"view"` | BigQuery entity type: `"view"` (Python BQ client) or `"table"` (DuckDB BQ extension) |
| Mode | Description | Best for |
|------|-------------|----------|
@ -91,6 +104,20 @@ tables:
| `remote` | Not synced, queried via BigQuery | Huge tables (100+ GB), live data |
| `hybrid` | Subset synced for profiling, queries go to BigQuery | Medium tables needing live data |
### Remote Table Metadata
Remote tables (`query_mode: "remote"`) should include metadata to help the AI agent
write safe, efficient queries:
| Field | Description |
|-------|-------------|
| `grain` | Row granularity description |
| `volume` | Size estimates (rows_per_day, unique entities) |
| `columns` | Column descriptions with value distributions |
| `dimension_profile` | Cardinality per dimension |
| `query_result_estimates` | Expected rows after GROUP BY combinations |
| `join_keys` | How to join with other tables |
### Automatic Sync Schedule
| Field | Default | Description |


@ -181,3 +181,13 @@ catalog:
# alert_missing_pct: 5.0 # Alert threshold for missing %
# alert_imbalance_pct: 60.0 # Alert threshold for imbalance %
# alert_high_cardinality: 50 # Alert threshold for high cardinality columns
# --- Remote query (optional) ---
# Settings for remote BigQuery queries via `python -m src.remote_query`.
# Used when tables have query_mode: "remote" in data_description.md.
# remote_query:
# timeout_seconds: 300 # BQ + DuckDB query timeout
# max_result_rows: 100000 # Max rows in final output
# max_bq_registration_rows: 500000 # Max rows per --register-bq sub-query
# default_format: "table" # Default output format
# output_dir: "/tmp/remote_query" # Directory for Parquet/CSV exports


@ -183,6 +183,133 @@ You're ready to analyze!
---
## Remote Queries (BigQuery)
Some tables are too large for local Parquet sync and are queried remotely via BigQuery.
These tables have `query_mode: "remote"` in `server/docs/data_description.md`.
### How to recognize remote tables
Before writing any query, read `server/docs/data_description.md`. Each table has:
- `query_mode: "local"` -- available as a local DuckDB view (query normally)
- `query_mode: "remote"` -- NOT in local DuckDB, must use remote query protocol below
- `query_mode: "hybrid"` -- local view exists AND can query BQ for live data
### Remote table metadata in data_description.md
Remote tables include metadata to help you write safe queries:
- **`volume`** -- rows_per_day, unique entities per day (tells you table size)
- **`columns`** -- column names, types, value distributions
- **`dimension_profile`** -- cardinality per dimension with value distributions
- **`query_result_estimates`** -- expected row counts after GROUP BY combinations
- **`join_keys`** -- how to join with other tables
**ALWAYS read these sections before writing a remote query.** Use `query_result_estimates`
to predict how many rows your query will return. The server has limited RAM -- keep BQ
sub-query results under 500K rows.
### Two-phase query protocol
Remote queries run **on the server** via SSH (server has DuckDB + Parquet + BigQuery access).
You write two SQL statements:
1. **BQ sub-query** (`--register-bq "alias=SQL"`) -- runs on BigQuery, result registered in DuckDB as a view.
This MUST contain WHERE and/or GROUP BY to reduce the result set. Never SELECT * from a remote table.
2. **DuckDB SQL** (`--sql "SQL"`) -- runs in DuckDB after all views (local + BQ) are ready.
Can JOIN local tables with registered BQ results.
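The two phases can be illustrated with a small, self-contained sketch. It is not the real pipeline: `sqlite3` stands in for DuckDB, a hard-coded list stands in for the rows a BigQuery sub-query would return, and the table and column names (`orders`, `traffic`, `visitors`) are hypothetical.

```python
import sqlite3

# Stand-ins: sqlite3 replaces DuckDB, and remote_rows replaces the result of
# a BigQuery sub-query. All table and column names here are hypothetical.
conn = sqlite3.connect(":memory:")

# A "local" table (in the real setup: a DuckDB view over Parquet files).
conn.execute("CREATE TABLE orders (order_date TEXT, orders INTEGER)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [("2026-03-01", 12), ("2026-03-02", 9)],
)

# Phase 1: the already filtered and aggregated remote result is registered
# under an alias, mirroring --register-bq "traffic=SELECT ... GROUP BY ...".
remote_rows = [("2026-03-01", 340), ("2026-03-02", 295)]
conn.execute("CREATE TEMP TABLE traffic (report_date TEXT, visitors INTEGER)")
conn.executemany("INSERT INTO traffic VALUES (?, ?)", remote_rows)

# Phase 2: the final SQL joins local data with the registered alias,
# mirroring --sql "SELECT ... JOIN ...".
joined = conn.execute(
    "SELECT o.order_date, o.orders, t.visitors "
    "FROM orders o JOIN traffic t ON o.order_date = t.report_date "
    "ORDER BY o.order_date"
).fetchall()
conn.close()
```

The point of the split is that only the small, pre-aggregated result ever leaves BigQuery; the join itself runs on the server against data that is already in memory.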
### Command format
```bash
ssh {ssh_alias} 'cd /opt/data-analyst && \
set -a && source .env && set +a && \
PYTHONPATH=repo CONFIG_DIR=instance/config \
.venv/bin/python3 -m src.remote_query \
--register-bq "ALIAS=BQ_SQL_QUERY" \
--sql "DUCKDB_SQL_QUERY" \
--format table'
```
Arguments:
- `--register-bq "alias=SQL"` -- Register a BQ query result as DuckDB view (repeatable for multiple remote tables)
- `--sql "SQL"` -- The final DuckDB query (can reference local views + registered BQ aliases)
- `--format table|csv|json|parquet` -- Output format (default: table)
- `--output /path/file` -- Output file for parquet/csv/json
- `--max-rows N` -- Override max result rows
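Nested quoting (SSH, then the remote shell, then SQL with backticks) is easy to get wrong by hand. The following is a minimal sketch of a helper that assembles the same command with `shlex.quote`; the function name `build_remote_query_command` and the alias `analyst-server` are hypothetical, and only the flags listed above are used.

```python
import shlex


def build_remote_query_command(ssh_alias, registrations, sql, fmt="table", output=None):
    """Return an argv list (for subprocess.run) that runs src.remote_query over SSH.

    registrations maps alias -> BigQuery SQL. Hypothetical helper, not part
    of the repository.
    """
    remote_argv = [".venv/bin/python3", "-m", "src.remote_query"]
    for alias, bq_sql in registrations.items():
        remote_argv += ["--register-bq", f"{alias}={bq_sql}"]
    remote_argv += ["--sql", sql, "--format", fmt]
    if output:
        remote_argv += ["--output", output]

    # Each argument is quoted for the remote shell, so backticks and single
    # quotes inside the SQL reach the CLI unchanged.
    remote_cmd = (
        "cd /opt/data-analyst && set -a && source .env && set +a && "
        "PYTHONPATH=repo CONFIG_DIR=instance/config "
        + " ".join(shlex.quote(arg) for arg in remote_argv)
    )
    # Passed as a list, so no local shell re-interprets the command string.
    return ["ssh", ssh_alias, remote_cmd]


argv = build_remote_query_command(
    "analyst-server",
    {"agg_data": "SELECT date_col, SUM(metric) AS total FROM `project.dataset.table` "
                 "WHERE date_col >= '2026-03-01' GROUP BY 1"},
    "SELECT * FROM agg_data ORDER BY date_col",
)
```

Running `subprocess.run(argv, check=True)` would then execute the query; with this approach the manual `\`` escapes shown in the examples are not needed, because the remote shell sees each SQL string inside single quotes.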
### Example 1: Remote-only query (aggregated data)
```bash
ssh {ssh_alias} 'cd /opt/data-analyst && \
set -a && source .env && set +a && \
PYTHONPATH=repo CONFIG_DIR=instance/config \
.venv/bin/python3 -m src.remote_query \
--register-bq "agg_data=SELECT date_col, dim_col, SUM(metric) as total FROM \`project.dataset.table\` WHERE date_col >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) GROUP BY 1,2" \
--sql "SELECT * FROM agg_data ORDER BY date_col, dim_col" \
--format table'
```
### Example 2: JOIN local + remote
```bash
ssh {ssh_alias} 'cd /opt/data-analyst && \
set -a && source .env && set +a && \
PYTHONPATH=repo CONFIG_DIR=instance/config \
.venv/bin/python3 -m src.remote_query \
--register-bq "remote_data=SELECT date_col, dim_col, SUM(metric) as total FROM \`project.dataset.table\` WHERE date_col >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY) GROUP BY 1,2" \
--sql "SELECT l.*, r.total FROM local_table l JOIN remote_data r ON l.date_col = r.date_col AND l.dim_col = r.dim_col ORDER BY 1,2" \
--format table'
```
### Example 3: Download result as Parquet for local analysis
```bash
# 1. Run query, save as Parquet on server
ssh {ssh_alias} 'cd /opt/data-analyst && \
set -a && source .env && set +a && \
PYTHONPATH=repo CONFIG_DIR=instance/config \
.venv/bin/python3 -m src.remote_query \
--register-bq "remote_data=SELECT ... GROUP BY ..." \
--sql "SELECT ... JOIN ..." \
--format parquet --output /tmp/remote_query/analysis.parquet'
# 2. Download to local machine
scp {ssh_alias}:/tmp/remote_query/analysis.parquet ./user/parquet/
# 3. Register in local DuckDB for further analysis
python3 -c "
import duckdb
conn = duckdb.connect('user/duckdb/analytics.duckdb')
conn.execute(\"CREATE OR REPLACE VIEW analysis AS SELECT * FROM read_parquet('user/parquet/analysis.parquet')\")
print('View created:', conn.execute('SELECT COUNT(*) FROM analysis').fetchone()[0], 'rows')
conn.close()
"
```
### How to estimate result sizes
Before writing a BQ sub-query, check `dimension_profile` and `query_result_estimates`
in `server/docs/data_description.md`.
**Rule of thumb:** rows = (estimate per day from query_result_estimates) * (number of days in WHERE clause).
If that exceeds 100K rows, add more aggregation or tighter date filters.
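As a worked example of the rule of thumb, the sketch below multiplies a per-day estimate by the number of days and compares it with the limits quoted in this document. The input figures (4,000 rows/day, 30 days, 3 columns) are hypothetical; the 50 bytes per cell is the rough average used by the server-side memory check.

```python
# Limits quoted in this document.
MAX_BQ_ROWS = 500_000        # per BQ sub-query
RESULT_ROW_TARGET = 100_000  # threshold from the rule of thumb
BYTES_PER_CELL = 50          # rough average used by the memory estimate


def estimate_rows(rows_per_day: int, days: int) -> int:
    """rows = (estimate per day) * (number of days in the WHERE clause)."""
    return rows_per_day * days


def estimate_memory_mb(rows: int, columns: int) -> float:
    """Rough in-memory size of the fetched result, in MB."""
    return rows * columns * BYTES_PER_CELL / (1024 * 1024)


# Hypothetical input: a GROUP BY combination estimated at 4,000 rows/day,
# queried over a 30-day window, returning 3 columns.
rows = estimate_rows(4_000, 30)             # 120,000 rows
needs_tightening = rows > RESULT_ROW_TARGET  # over 100K: aggregate or filter more
within_bq_limit = rows <= MAX_BQ_ROWS        # still under the hard sub-query limit
memory_mb = estimate_memory_mb(rows, columns=3)
```

In this case the query would pass the 500K sub-query limit but exceed the 100K threshold, so shortening the window to 25 days or fewer, or dropping a dimension from the GROUP BY, would be the next step.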
### Safety rules
1. **NEVER** run `SELECT * FROM remote_table`; every BQ sub-query must filter (WHERE) and/or aggregate (GROUP BY)
2. **ALWAYS** check `dimension_profile` before writing BQ sub-queries
3. **ALWAYS** include date range in WHERE clause
4. **Limits**: 500K rows max per BQ sub-query, 100K rows max in final result
5. If the query might take more than 60 seconds, use the nohup pattern:
```bash
ssh {ssh_alias} 'nohup ... --format parquet --output /tmp/result.parquet > /tmp/rq.log 2>&1 &'
ssh {ssh_alias} 'tail -5 /tmp/rq.log' # check progress
scp {ssh_alias}:/tmp/result.parquet ./user/parquet/
```
---
## Reporting Issues
Report issues to your platform team or the project's issue tracker.


@ -270,6 +270,89 @@ def get_remote_tables(table_configs: List[Dict]) -> List[Dict]:
]
def create_local_views(
conn: duckdb.DuckDBPyConnection,
data_dir: str,
project_root: Optional[Path] = None,
verbose: bool = True,
) -> Tuple[List[str], List[str]]:
"""Create DuckDB views for local/hybrid tables from Parquet files.
Extracts the shared logic from init_duckdb() so it can be reused
by remote_query.py without creating a persistent DB file.
Args:
conn: Open DuckDB connection (in-memory or file-backed)
data_dir: Base data directory (e.g., "server", "/data/src_data")
project_root: Project root path. If None, auto-detected.
verbose: Print progress messages
Returns:
Tuple of (created_view_names, skipped_table_names)
"""
if project_root is None:
project_root = find_project_root()
table_configs, folder_mapping = parse_data_description(project_root)
# Filter to local and hybrid tables (both have local Parquet)
eligible_tables = [
tc for tc in table_configs
if tc.get("query_mode", "local") in ("local", "hybrid")
]
created_views = []
skipped_views = []
for table_config in eligible_tables:
table_name = table_config['name']
try:
parquet_path = get_parquet_path(table_config, folder_mapping, data_dir)
if not parquet_path.exists():
skipped_views.append(table_name)
if verbose:
print(f" [SKIP] {table_name} - parquet not found: {parquet_path}")
continue
# Determine if partitioned
sync_strategy = table_config.get('sync_strategy', 'full_refresh')
is_partitioned = (
sync_strategy == "partitioned" or
(sync_strategy == "incremental" and table_config.get('partition_by'))
)
if is_partitioned:
glob_pattern = parquet_path / "*.parquet"
partition_files = list(parquet_path.glob("*.parquet"))
if not partition_files:
skipped_views.append(table_name)
if verbose:
print(f" [SKIP] {table_name} - no partition files")
continue
sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{glob_pattern}', union_by_name=true)"
if verbose:
mode_label = "hybrid" if table_config.get("query_mode") == "hybrid" else "local"
print(f" [OK] {table_name} ({len(partition_files)} partitions, {mode_label})")
else:
sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{parquet_path}')"
if verbose:
mode_label = "hybrid" if table_config.get("query_mode") == "hybrid" else "local"
print(f" [OK] {table_name} ({mode_label})")
conn.execute(sql)
created_views.append(table_name)
except Exception as e:
if verbose:
print(f" [ERR] Error creating {table_name}: {e}")
raise
return created_views, skipped_views
def init_duckdb(
db_path="user/duckdb/analytics.duckdb",
data_dir="server",
@ -299,7 +382,6 @@ def init_duckdb(
print("Initializing DuckDB database...")
try:
# Find project root and parse data_description.md
project_root = find_project_root()
if verbose:
print(f" Project root: {project_root}")
@ -308,87 +390,25 @@ def init_duckdb(
if verbose:
print(f" Loaded {len(table_configs)} tables from data_description.md")
-# Separate tables by query_mode
-local_tables = []
-remote_tables = []
-hybrid_tables = []
-for tc in table_configs:
-mode = tc.get("query_mode", "local")
-if mode == "remote":
-remote_tables.append(tc)
-elif mode == "hybrid":
-hybrid_tables.append(tc)
-else:
-local_tables.append(tc)
+# Classify tables by query_mode (for display only)
+remote_tables = [tc for tc in table_configs if tc.get("query_mode") == "remote"]
+local_count = len([tc for tc in table_configs if tc.get("query_mode", "local") == "local"])
+hybrid_count = len([tc for tc in table_configs if tc.get("query_mode") == "hybrid"])
if verbose:
-print(f" Query modes: {len(local_tables)} local, "
-f"{len(remote_tables)} remote, {len(hybrid_tables)} hybrid")
+print(f" Query modes: {local_count} local, "
+f"{len(remote_tables)} remote, {hybrid_count} hybrid")
# Connect to database (creates if doesn't exist)
conn = duckdb.connect(db_path)
# Create local views from parquet files
if verbose:
print("\n Creating views from parquet files...")
-created_views = []
-skipped_views = []
-# Process local and hybrid tables (both have local parquet)
-for table_config in local_tables + hybrid_tables:
-table_name = table_config['name']
-try:
-# Get parquet path
-parquet_path = get_parquet_path(table_config, folder_mapping, data_dir)
-# Check if file/directory exists
-if not parquet_path.exists():
-skipped_views.append(table_name)
-if verbose:
-print(f" [SKIP] {table_name} - parquet not found: {parquet_path}")
-continue
-# Determine if partitioned
-sync_strategy = table_config.get('sync_strategy', 'full_refresh')
-is_partitioned = (
-sync_strategy == "partitioned" or
-(sync_strategy == "incremental" and table_config.get('partition_by'))
-)
-# Create view
-if is_partitioned:
-# For partitioned tables, use glob pattern
-glob_pattern = parquet_path / "*.parquet"
-# Check if there are any partition files
-partition_files = list(parquet_path.glob("*.parquet"))
-if not partition_files:
-skipped_views.append(table_name)
-if verbose:
-print(f" [SKIP] {table_name} - no partition files")
-continue
-sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{glob_pattern}', union_by_name=true)"
-if verbose:
-mode_label = "hybrid" if table_config.get("query_mode") == "hybrid" else "local"
-print(f" [OK] {table_name} ({len(partition_files)} partitions, {mode_label})")
-else:
-# Single parquet file
-sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{parquet_path}')"
-if verbose:
-mode_label = "hybrid" if table_config.get("query_mode") == "hybrid" else "local"
-print(f" [OK] {table_name} ({mode_label})")
-conn.execute(sql)
-created_views.append(table_name)
-except Exception as e:
-if verbose:
-print(f" [ERR] Error creating {table_name}: {e}")
-return False
+# Delegate to create_local_views
+created_views, skipped_views = create_local_views(
+conn, data_dir, project_root=project_root, verbose=verbose,
+)
# Log remote tables (queried at runtime via register_bq_table)
if remote_tables:


@ -107,6 +107,7 @@ class TableConfig:
columns: Optional[List[str]] = None # Subset of columns to sync (None = all)
row_filter: Optional[str] = None # SQL WHERE clause for filtering (e.g., "event_date >= '2024-01-01'")
query_mode: str = "local" # "local" (Parquet) | "remote" (BQ direct) | "hybrid" (sync subset, query BQ)
bq_entity_type: str = "view" # "view" (Python BQ client) | "table" (DuckDB BQ extension)
partition_column_type: str = "TIMESTAMP" # BQ SQL type for partition column: "DATE", "TIMESTAMP", "DATETIME"
catalog_fqn: Optional[str] = None # Explicit OpenMetadata FQN override (auto-derived if not set)
sync_schedule: Optional[str] = None # Schedule: "every 15m", "every 1h", "daily 05:00" (UTC)
@ -122,6 +123,14 @@ class TableConfig:
f"Allowed values: {', '.join(valid_query_modes)}"
)
# Validate bq_entity_type
valid_entity_types = ("view", "table")
if self.bq_entity_type not in valid_entity_types:
raise ValueError(
f"Invalid bq_entity_type '{self.bq_entity_type}' for table {self.id}. "
f"Allowed values: {', '.join(valid_entity_types)}"
)
# Validate sync_strategy
if self.sync_strategy not in ["full_refresh", "incremental", "partitioned"]:
raise ValueError(
@ -473,6 +482,7 @@ class Config:
columns=table_data.get("columns"),
row_filter=table_data.get("row_filter"),
query_mode=table_data.get("query_mode", "local"),
bq_entity_type=table_data.get("bq_entity_type", "view"),
partition_column_type=table_data.get("partition_column_type", "TIMESTAMP"),
catalog_fqn=table_data.get("catalog_fqn"),
sync_schedule=table_data.get("sync_schedule"),

src/remote_query.py (new file, 564 lines)

@ -0,0 +1,564 @@
"""
Remote Query - Execute DuckDB queries spanning local Parquet + remote BigQuery tables.
Provides a server-side CLI for the AI agent to run SQL queries that JOIN local
(Parquet-backed) tables with on-demand BigQuery results. Designed for tables too
large to sync locally (e.g., daily_deal_traffic: ~3M rows/day).
Two-phase query protocol:
1. BQ sub-queries (--register-bq "alias=SQL") run on BigQuery, results registered
as DuckDB views via PyArrow (reuses register_bq_table from duckdb_manager).
2. DuckDB SQL (--sql) runs against local Parquet views + registered BQ results.
Usage:
python -m src.remote_query \\
--sql "SELECT ... FROM order_economics o JOIN traffic t ON ..." \\
--register-bq "traffic=SELECT ... FROM \\`project.dataset.table\\` WHERE ..." \\
--format table
Safety features:
- COUNT(*) pre-check before fetching BQ data
- Memory estimation (refuses queries > 2 GB estimated)
- Configurable row limits (per BQ sub-query and final result)
- Query timeout support
"""
import argparse
import csv
import io
import json
import logging
import os
import sys
import time
from pathlib import Path
from typing import Optional
import duckdb
from config.loader import load_instance_config, get_instance_value
from scripts.duckdb_manager import (
create_local_views,
register_bq_table,
_create_bq_client,
)
logger = logging.getLogger(__name__)
class RemoteQueryError(Exception):
"""Error during remote query execution."""
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def _load_remote_query_config() -> dict:
"""Load remote_query settings from instance.yaml with defaults."""
try:
instance_config = load_instance_config()
except (FileNotFoundError, ValueError) as e:
logger.warning("Could not load instance config: %s. Using defaults.", e)
instance_config = {}
return {
"timeout_seconds": get_instance_value(
instance_config, "remote_query", "timeout_seconds", default=300,
),
"max_result_rows": get_instance_value(
instance_config, "remote_query", "max_result_rows", default=100_000,
),
"max_bq_registration_rows": get_instance_value(
instance_config, "remote_query", "max_bq_registration_rows", default=500_000,
),
"default_format": get_instance_value(
instance_config, "remote_query", "default_format", default="table",
),
"output_dir": get_instance_value(
instance_config, "remote_query", "output_dir", default="/tmp/remote_query",
),
}
# ---------------------------------------------------------------------------
# BQ registration with safety checks
# ---------------------------------------------------------------------------
def _validate_bq_result_size(
bq_client, sql: str, alias: str, max_rows: int,
) -> int:
"""Execute COUNT(*) on the BQ sub-query before fetching all rows.
Args:
bq_client: BigQuery client instance
sql: The BQ SQL query to count
alias: Alias name (for error messages)
max_rows: Maximum allowed rows
Returns:
Row count
Raises:
RemoteQueryError: If count exceeds max_rows
"""
count_sql = f"SELECT COUNT(*) AS cnt FROM ({sql})"
_log_progress(f" Counting rows for '{alias}'...")
job = bq_client.query(count_sql)
result = job.result()
row_count = next(iter(result))[0]
if row_count > max_rows:
raise RemoteQueryError(
f"BQ sub-query '{alias}' would return {row_count:,} rows "
f"(limit: {max_rows:,}). Add more WHERE filters or GROUP BY "
f"to reduce the result set."
)
return row_count
def _estimate_memory_mb(row_count: int, column_count: int) -> float:
"""Estimate memory usage in MB for a PyArrow table.
Uses ~50 bytes per cell as a rough average across data types.
"""
return (row_count * column_count * 50) / (1024 * 1024)
def _register_bq_views(
conn: duckdb.DuckDBPyConnection,
registrations: list[tuple[str, str]],
max_bq_rows: int,
timeout_seconds: int,
quiet: bool = False,
) -> dict[str, int]:
"""Register BQ query results as DuckDB views with safety checks.
Args:
conn: DuckDB connection
registrations: List of (alias, bq_sql) tuples
max_bq_rows: Max rows per sub-query
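timeout_seconds: BQ job timeout (accepted, but currently not forwarded to the BigQuery calls below)
quiet: Unused in this function; progress output is controlled by the module-level _quiet_mode flag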
Returns:
Dict of {alias: row_count}
"""
if not registrations:
return {}
bq_project = os.environ.get("BIGQUERY_PROJECT")
if not bq_project:
raise RemoteQueryError(
"BIGQUERY_PROJECT environment variable not set. "
"Required for BigQuery sub-queries."
)
bq_client = _create_bq_client(bq_project)
results = {}
for alias, bq_sql in registrations:
start_time = time.time()
# Phase 1: COUNT(*) safety check
row_count = _validate_bq_result_size(bq_client, bq_sql, alias, max_bq_rows)
_log_progress(f" '{alias}': {row_count:,} rows (within limit)")
# Phase 2: Memory estimation
# Read the column count from the schema of a LIMIT 0 query (cheap)
sample_job = bq_client.query(f"SELECT * FROM ({bq_sql}) LIMIT 0")
schema = sample_job.result().schema
col_count = len(schema)
estimated_mb = _estimate_memory_mb(row_count, col_count)
if estimated_mb > 2048: # 2 GB = 25% of 8 GB server RAM
raise RemoteQueryError(
f"BQ sub-query '{alias}' estimated memory: {estimated_mb:.0f} MB "
f"({row_count:,} rows x {col_count} cols). "
f"Limit is 2048 MB. Add more aggregation or filters."
)
# Phase 3: Execute and register
_log_progress(f" Fetching '{alias}' ({row_count:,} rows, ~{estimated_mb:.0f} MB)...")
actual_rows = register_bq_table(
conn=conn,
table_id=f"bq_registration.{alias}",
view_name=alias,
sql=bq_sql,
bq_project=bq_project,
)
elapsed = time.time() - start_time
_log_progress(f" '{alias}' registered: {actual_rows:,} rows in {elapsed:.1f}s")
results[alias] = actual_rows
return results
# ---------------------------------------------------------------------------
# Local view setup
# ---------------------------------------------------------------------------
def _setup_local_views(
conn: duckdb.DuckDBPyConnection,
data_dir: str,
quiet: bool = False,
) -> list[str]:
"""Create DuckDB views for all local/hybrid tables from Parquet.
Args:
conn: DuckDB connection
data_dir: Path to data directory (e.g., "/data/src_data")
quiet: Suppress progress messages
Returns:
List of created view names
"""
created, skipped = create_local_views(
conn=conn,
data_dir=data_dir,
verbose=not quiet,
)
return created
# ---------------------------------------------------------------------------
# Output formatting
# ---------------------------------------------------------------------------
def _print_table(columns: list[str], rows: list[tuple]) -> None:
"""Print an aligned ASCII table to stdout."""
if not rows:
print("(empty result)")
return
# Calculate column widths
str_rows = [[str(v) if v is not None else "NULL" for v in row] for row in rows]
widths = [len(col) for col in columns]
for row in str_rows:
for i, val in enumerate(row):
widths[i] = max(widths[i], len(val))
# Header
header = " | ".join(col.ljust(widths[i]) for i, col in enumerate(columns))
separator = "-+-".join("-" * widths[i] for i in range(len(columns)))
print(header)
print(separator)
# Rows
for row in str_rows:
line = " | ".join(val.ljust(widths[i]) for i, val in enumerate(row))
print(line)
print(f"\n({len(rows)} rows)")
def _format_output(
conn: duckdb.DuckDBPyConnection,
sql: str,
fmt: str,
output_path: Optional[str],
max_rows: int,
) -> None:
"""Execute final SQL and output results in the requested format.
Args:
conn: DuckDB connection with all views registered
sql: The final DuckDB SQL query
fmt: Output format (table, csv, json, parquet)
output_path: File path for file-based outputs
max_rows: Maximum rows to return
"""
# Add LIMIT to prevent runaway results
limited_sql = f"SELECT * FROM ({sql}) AS _rq LIMIT {max_rows + 1}"
result = conn.execute(limited_sql)
columns = [desc[0] for desc in result.description]
rows = result.fetchall()
# Check if result exceeded limit
if len(rows) > max_rows:
rows = rows[:max_rows]
_log_progress(
f" WARNING: Result truncated to {max_rows:,} rows. "
f"Add more filters or increase --max-rows."
)
if fmt == "table":
_print_table(columns, rows)
elif fmt == "csv":
if output_path:
with open(output_path, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(columns)
writer.writerows(rows)
_log_progress(f" CSV written: {output_path} ({len(rows)} rows)")
else:
writer = csv.writer(sys.stdout)
writer.writerow(columns)
writer.writerows(rows)
elif fmt == "json":
data = [dict(zip(columns, row)) for row in rows]
json_str = json.dumps(data, default=str, indent=2)
if output_path:
with open(output_path, "w") as f:
f.write(json_str)
_log_progress(f" JSON written: {output_path} ({len(rows)} rows)")
else:
print(json_str)
elif fmt == "parquet":
import pyarrow as pa
import pyarrow.parquet as pq
# Re-execute with LIMIT max_rows (no +1 probe row) and fetch as Arrow for the Parquet export
arrow_result = conn.execute(
f"SELECT * FROM ({sql}) AS _rq LIMIT {max_rows}"
).fetch_arrow_table()
if not output_path:
output_path = str(Path(_load_remote_query_config()["output_dir"]) / "result.parquet")
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
pq.write_table(arrow_result, output_path)
_log_progress(
f" Parquet written: {output_path} "
f"({arrow_result.num_rows} rows, {arrow_result.num_columns} cols)"
)
else:
raise RemoteQueryError(f"Unknown format: {fmt}")
# ---------------------------------------------------------------------------
# Progress logging (stderr so stdout stays clean for data)
# ---------------------------------------------------------------------------
_quiet_mode = False
def _log_progress(msg: str) -> None:
"""Print progress message to stderr (keeps stdout clean for data output)."""
if not _quiet_mode:
print(msg, file=sys.stderr)
# ---------------------------------------------------------------------------
# Main execution
# ---------------------------------------------------------------------------
def execute_remote_query(
sql: str,
bq_registrations: list[tuple[str, str]],
fmt: str = "table",
output: Optional[str] = None,
max_rows: Optional[int] = None,
max_bq_rows: Optional[int] = None,
timeout: Optional[int] = None,
data_dir: str = "/data/src_data",
quiet: bool = False,
) -> None:
"""Main execution function for remote queries.
Args:
sql: DuckDB SQL query to execute
bq_registrations: List of (alias, bq_sql) tuples
fmt: Output format (table, csv, json, parquet)
output: Output file path (for parquet/csv/json)
max_rows: Max rows in final result
max_bq_rows: Max rows per BQ sub-query
timeout: Query timeout in seconds
data_dir: Path to data directory
quiet: Suppress progress messages
"""
global _quiet_mode
_quiet_mode = quiet
config = _load_remote_query_config()
max_rows = max_rows or config["max_result_rows"]
max_bq_rows = max_bq_rows or config["max_bq_registration_rows"]
timeout = timeout or config["timeout_seconds"]
fmt = fmt or config["default_format"]
start_time = time.time()
# Create in-memory DuckDB connection
conn = duckdb.connect(":memory:")
try:
# Step 1: Register local Parquet views
_log_progress("Setting up local views...")
local_views = _setup_local_views(conn, data_dir, quiet=quiet)
_log_progress(f" {len(local_views)} local views ready")
# Step 2: Register BQ sub-query results
if bq_registrations:
_log_progress(f"Registering {len(bq_registrations)} BQ sub-queries...")
bq_results = _register_bq_views(
conn, bq_registrations, max_bq_rows, timeout, quiet=quiet,
)
for alias, count in bq_results.items():
_log_progress(f" {alias}: {count:,} rows")
# Step 3: Execute the final DuckDB query
_log_progress("Executing query...")
_format_output(conn, sql, fmt, output, max_rows)
elapsed = time.time() - start_time
_log_progress(f"Done in {elapsed:.1f}s")
finally:
conn.close()
# ---------------------------------------------------------------------------
# CLI argument parsing
# ---------------------------------------------------------------------------
def _parse_register_bq(value: str) -> tuple[str, str]:
"""Parse --register-bq argument in 'alias=SQL' format.
Args:
value: String in format "alias=SELECT ..."
Returns:
Tuple of (alias, sql)
Raises:
argparse.ArgumentTypeError: If format is invalid
"""
eq_pos = value.find("=")
if eq_pos <= 0:
raise argparse.ArgumentTypeError(
f"Invalid --register-bq format: '{value}'. "
f"Expected: 'alias=SELECT ...' (e.g., 'traffic=SELECT report_date, ...')"
)
alias = value[:eq_pos].strip()
sql = value[eq_pos + 1:].strip()
if not sql:
raise argparse.ArgumentTypeError(
f"Empty SQL in --register-bq for alias '{alias}'"
)
return alias, sql
def build_parser() -> argparse.ArgumentParser:
"""Build the argument parser for remote_query CLI."""
parser = argparse.ArgumentParser(
description="Execute DuckDB queries spanning local Parquet + remote BigQuery tables",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Local-only query (no BigQuery):
python -m src.remote_query --sql "SELECT COUNT(*) FROM order_economics"
# Register BQ result and query it:
python -m src.remote_query \\
--register-bq "traffic=SELECT report_date, SUM(visitors) FROM \\`proj.ds.table\\` GROUP BY 1" \\
--sql "SELECT * FROM traffic ORDER BY report_date"
# JOIN local + remote:
python -m src.remote_query \\
--register-bq "traffic=SELECT ... GROUP BY ..." \\
--sql "SELECT o.*, t.visitors FROM order_economics o JOIN traffic t ON ..." \\
--format parquet --output /tmp/result.parquet
""",
)
parser.add_argument(
"--sql",
required=True,
help="DuckDB SQL query (executed after all views are registered)",
)
parser.add_argument(
"--register-bq",
action="append",
type=_parse_register_bq,
default=[],
metavar="ALIAS=SQL",
dest="bq_registrations",
help='Register BQ query result as DuckDB view. Format: "alias=BQ_SQL". Repeatable.',
)
parser.add_argument(
"--format",
choices=["table", "csv", "json", "parquet"],
default=None,
dest="fmt",
help="Output format (default: from config or 'table')",
)
parser.add_argument(
"--output",
default=None,
help="Output file path for parquet/csv/json (default: auto for parquet)",
)
parser.add_argument(
"--max-rows",
type=int,
default=None,
help="Max rows in final result (default: from config)",
)
parser.add_argument(
"--max-bq-rows",
type=int,
default=None,
help="Max rows per BQ sub-query (default: from config)",
)
parser.add_argument(
"--timeout",
type=int,
default=None,
help="Query timeout in seconds (default: from config)",
)
parser.add_argument(
"--data-dir",
default="/data/src_data",
help="Parquet data directory (default: /data/src_data)",
)
parser.add_argument(
"--quiet",
action="store_true",
help="Suppress progress messages (stderr)",
)
return parser
def main(argv: Optional[list[str]] = None) -> None:
"""CLI entry point."""
parser = build_parser()
args = parser.parse_args(argv)
# Setup logging
logging.basicConfig(
level=logging.WARNING if args.quiet else logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
stream=sys.stderr,
)
try:
execute_remote_query(
sql=args.sql,
bq_registrations=args.bq_registrations,
fmt=args.fmt,
output=args.output,
max_rows=args.max_rows,
max_bq_rows=args.max_bq_rows,
timeout=args.timeout,
data_dir=args.data_dir,
quiet=args.quiet,
)
except RemoteQueryError as e:
print(f"ERROR: {e}", file=sys.stderr)
sys.exit(1)
except KeyboardInterrupt:
print("\nInterrupted.", file=sys.stderr)
sys.exit(130)
except Exception as e:
print(f"UNEXPECTED ERROR: {e}", file=sys.stderr)
logger.exception("Unexpected error in remote_query")
sys.exit(2)
if __name__ == "__main__":
main()


@ -0,0 +1,69 @@
"""Tests for TableConfig.bq_entity_type field validation."""

import pytest

from src.config import TableConfig


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _make_table(**overrides) -> TableConfig:
    """Create a TableConfig with sensible defaults, applying overrides."""
    defaults = dict(
        id="test.dataset.table",
        name="test_table",
        description="Test",
        primary_key="id",
        sync_strategy="full_refresh",
    )
    defaults.update(overrides)
    return TableConfig(**defaults)


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------

class TestBqEntityTypeDefault:
    def test_default_is_view(self):
        table = _make_table()
        assert table.bq_entity_type == "view"


class TestBqEntityTypeValidValues:
    @pytest.mark.parametrize("entity_type", ["view", "table"])
    def test_valid_bq_entity_type(self, entity_type):
        table = _make_table(bq_entity_type=entity_type)
        assert table.bq_entity_type == entity_type


class TestBqEntityTypeInvalid:
    @pytest.mark.parametrize("bad_type", ["VIEW", "physical", "", "tables", "materialized"])
    def test_invalid_bq_entity_type_raises(self, bad_type):
        with pytest.raises(ValueError, match="Invalid bq_entity_type"):
            _make_table(bq_entity_type=bad_type)


class TestBqEntityTypeFromKwarg:
    def test_kwarg_sets_bq_entity_type(self):
        """Simulate what _parse_data_description does: pass bq_entity_type as kwarg."""
        table = TableConfig(
            id="proj.dataset.orders",
            name="orders",
            description="Order data",
            primary_key="order_id",
            sync_strategy="full_refresh",
            bq_entity_type="table",
        )
        assert table.bq_entity_type == "table"

    def test_kwarg_default_when_omitted(self):
        """When bq_entity_type is omitted, TableConfig falls back to the 'view' default."""
        table = TableConfig(
            id="proj.dataset.orders",
            name="orders",
            description="Order data",
            primary_key="order_id",
            sync_strategy="full_refresh",
        )
        assert table.bq_entity_type == "view"
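
The behaviour these tests pin down (default `"view"`, only the exact lowercase values `"view"` and `"table"` accepted, a `ValueError` whose message contains "Invalid bq_entity_type") could be met by a dataclass along the following lines. This is a sketch under assumptions, not the `TableConfig` from `src/config.py`: the class name `TableConfigSketch`, the constant `VALID_BQ_ENTITY_TYPES`, and the use of `__post_init__` are all hypothetical.

```python
from dataclasses import dataclass

# Hypothetical constant: the two values the tests accept.
VALID_BQ_ENTITY_TYPES = ("view", "table")


@dataclass
class TableConfigSketch:
    """Illustrative stand-in for TableConfig with only the fields the tests use."""

    id: str
    name: str
    description: str
    primary_key: str
    sync_strategy: str = "full_refresh"
    bq_entity_type: str = "view"  # default asserted by test_default_is_view

    def __post_init__(self) -> None:
        # Exact, case-sensitive match: "VIEW" and "" are rejected.
        if self.bq_entity_type not in VALID_BQ_ENTITY_TYPES:
            raise ValueError(
                f"Invalid bq_entity_type {self.bq_entity_type!r}; "
                f"expected one of {VALID_BQ_ENTITY_TYPES}"
            )


if __name__ == "__main__":
    cfg = TableConfigSketch(
        id="proj.dataset.orders",
        name="orders",
        description="Order data",
        primary_key="order_id",
    )
    print(cfg.bq_entity_type)
```

Validating at construction time means a typo in the YAML surfaces when the config is loaded, before any query is routed.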

tests/test_remote_query.py Normal file

@ -0,0 +1,778 @@
"""Tests for remote_query module - hybrid local Parquet + remote BigQuery queries.

Tests cover:
- CLI argument parsing (_parse_register_bq, build_parser)
- Local view setup (_setup_local_views via create_local_views)
- BQ registration with safety checks (_validate_bq_result_size, _estimate_memory_mb, _register_bq_views)
- Output formatting (_print_table, _format_output)
- End-to-end local-only queries (no BQ mocking needed)
"""

import argparse
import csv
import json
import os
from io import StringIO
from pathlib import Path
from unittest.mock import MagicMock, patch

import duckdb
import pyarrow as pa
import pyarrow.parquet as pq
import pytest

from src.remote_query import (
    RemoteQueryError,
    _estimate_memory_mb,
    _format_output,
    _parse_register_bq,
    _print_table,
    _register_bq_views,
    _validate_bq_result_size,
    build_parser,
    execute_remote_query,
)


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------

@pytest.fixture
def tmp_local_project(tmp_path):
    """Create a minimal project with docs/data_description.md and parquet files.

    Layout:
        tmp_path/
            docs/data_description.md  (YAML with local + remote + hybrid tables)
            server/parquet/crm_data/orders.parquet
            server/parquet/crm_data/products.parquet
            server/parquet/crm_data/inventory.parquet

    Returns (project_root, data_dir) where data_dir = str(tmp_path / "server").
    """
    docs_dir = tmp_path / "docs"
    docs_dir.mkdir()

    data_description = """\
# Data Description

```yaml
folder_mapping:
  in.c-crm: crm_data

tables:
  - id: "in.c-crm.orders"
    name: "orders"
    description: "Order data"
    primary_key: "order_id"
    sync_strategy: "full_refresh"

  - id: "in.c-crm.products"
    name: "products"
    description: "Product catalog"
    primary_key: "product_id"
    sync_strategy: "full_refresh"

  - id: "prj-grp-dataview-prod-1ff9.supply.traffic"
    name: "traffic"
    description: "Remote BQ traffic table"
    primary_key: "id"
    query_mode: "remote"

  - id: "in.c-crm.inventory"
    name: "inventory"
    description: "Hybrid inventory"
    primary_key: "id"
    sync_strategy: "full_refresh"
    query_mode: "hybrid"
```
"""
    (docs_dir / "data_description.md").write_text(data_description)

    # Create parquet files for local tables
    crm_dir = tmp_path / "server" / "parquet" / "crm_data"
    crm_dir.mkdir(parents=True)

    orders_table = pa.table({
        "order_id": [1, 2, 3, 4, 5],
        "amount": [10.0, 20.0, 30.0, 40.0, 50.0],
        "product_id": [101, 102, 101, 103, 102],
    })
    pq.write_table(orders_table, crm_dir / "orders.parquet")

    products_table = pa.table({
        "product_id": [101, 102, 103],
        "name": ["Widget", "Gadget", "Doohickey"],
    })
    pq.write_table(products_table, crm_dir / "products.parquet")

    # Create parquet for hybrid table
    inventory_table = pa.table({
        "id": [1, 2],
        "stock": [100, 200],
    })
    pq.write_table(inventory_table, crm_dir / "inventory.parquet")

    data_dir = str(tmp_path / "server")
    return tmp_path, data_dir


@pytest.fixture
def duckdb_conn():
    """Create an in-memory DuckDB connection, closed after test."""
    conn = duckdb.connect(":memory:")
    yield conn
    conn.close()


class _DuckDBConnectionProxy:
    """Proxy around DuckDBPyConnection that silently ignores unsupported SET commands.

    Some DuckDB versions do not support the 'statement_timeout' setting. This
    proxy catches CatalogException for SET commands so end-to-end tests work
    across versions. The real connection's execute method is read-only and
    cannot be patched directly, so we wrap the connection instead.
    """

    def __init__(self, conn):
        object.__setattr__(self, "_conn", conn)

    def execute(self, sql, *args, **kwargs):
        if isinstance(sql, str) and sql.strip().upper().startswith("SET "):
            try:
                return self._conn.execute(sql, *args, **kwargs)
            except duckdb.CatalogException:
                return None
        return self._conn.execute(sql, *args, **kwargs)

    def __getattr__(self, name):
        # Delegate everything else to the wrapped connection.
        return getattr(self._conn, name)


def _patched_duckdb_connect(*args, **kwargs):
    """Create a DuckDB connection wrapped in the proxy."""
    conn = duckdb.connect(*args, **kwargs)
    return _DuckDBConnectionProxy(conn)


# ---------------------------------------------------------------------------
# Tests: CLI argument parsing
# ---------------------------------------------------------------------------

class TestCLIArgParsing:
    """Test _parse_register_bq() and build_parser()."""

    def test_requires_sql(self):
        """Parser should fail when --sql is missing."""
        parser = build_parser()
        with pytest.raises(SystemExit):
            parser.parse_args([])

    def test_register_bq_parsing(self):
        """'alias=SELECT ...' parses into (alias, sql) tuple."""
        result = _parse_register_bq("traffic=SELECT report_date FROM `proj.ds.table`")
        assert result == ("traffic", "SELECT report_date FROM `proj.ds.table`")

    def test_register_bq_invalid_format(self):
        """Missing '=' should raise ArgumentTypeError."""
        with pytest.raises(argparse.ArgumentTypeError, match="Invalid --register-bq format"):
            _parse_register_bq("no_equals_sign_here")

    def test_register_bq_empty_sql(self):
        """Alias with empty SQL after '=' should raise."""
        with pytest.raises(argparse.ArgumentTypeError, match="Empty SQL"):
            _parse_register_bq("alias=")

    def test_register_bq_empty_alias(self):
        """'=SELECT ...' (empty alias) should raise."""
        with pytest.raises(argparse.ArgumentTypeError, match="Invalid --register-bq format"):
            _parse_register_bq("=SELECT 1")

    def test_multiple_register_bq(self):
        """Multiple --register-bq args should be collected into a list."""
        parser = build_parser()
        args = parser.parse_args([
            "--sql", "SELECT 1",
            "--register-bq", "t1=SELECT a FROM x",
            "--register-bq", "t2=SELECT b FROM y",
        ])
        assert len(args.bq_registrations) == 2
        assert args.bq_registrations[0] == ("t1", "SELECT a FROM x")
        assert args.bq_registrations[1] == ("t2", "SELECT b FROM y")

    def test_default_format_is_none(self):
        """Default --format should be None (uses config default at runtime)."""
        parser = build_parser()
        args = parser.parse_args(["--sql", "SELECT 1"])
        assert args.fmt is None

    def test_explicit_format(self):
        """Explicit --format should be respected."""
        parser = build_parser()
        args = parser.parse_args(["--sql", "SELECT 1", "--format", "csv"])
        assert args.fmt == "csv"

    def test_invalid_format_rejected(self):
        """Invalid --format value should cause parser error."""
        parser = build_parser()
        with pytest.raises(SystemExit):
            parser.parse_args(["--sql", "SELECT 1", "--format", "xml"])

    def test_no_register_bq_yields_empty_list(self):
        """When no --register-bq is provided, bq_registrations defaults to []."""
        parser = build_parser()
        args = parser.parse_args(["--sql", "SELECT 1"])
        assert args.bq_registrations == []

    def test_register_bq_sql_with_equals(self):
        """SQL containing '=' should be parsed correctly (split only on first '=')."""
        result = _parse_register_bq("view=SELECT * FROM t WHERE col = 5")
        assert result[0] == "view"
        assert result[1] == "SELECT * FROM t WHERE col = 5"

    def test_quiet_flag(self):
        """--quiet should set quiet=True."""
        parser = build_parser()
        args = parser.parse_args(["--sql", "SELECT 1", "--quiet"])
        assert args.quiet is True

    def test_max_rows_parsing(self):
        """--max-rows should be parsed as integer."""
        parser = build_parser()
        args = parser.parse_args(["--sql", "SELECT 1", "--max-rows", "500"])
        assert args.max_rows == 500


# ---------------------------------------------------------------------------
# Tests: Local view setup
# ---------------------------------------------------------------------------

class TestLocalViewSetup:
    """Test _setup_local_views via create_local_views with tmp_path fixture."""

    def test_creates_views_from_parquet(self, tmp_local_project, duckdb_conn):
        """Local tables should be available as DuckDB views after setup."""
        project_root, data_dir = tmp_local_project
        with patch("scripts.duckdb_manager.find_project_root", return_value=project_root):
            from src.remote_query import _setup_local_views
            created = _setup_local_views(duckdb_conn, data_dir, quiet=True)

        assert "orders" in created
        assert "products" in created

        # Verify data is queryable
        count = duckdb_conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
        assert count == 5

    def test_skips_remote_tables(self, tmp_local_project, duckdb_conn):
        """Remote tables (query_mode='remote') should NOT create local views."""
        project_root, data_dir = tmp_local_project
        with patch("scripts.duckdb_manager.find_project_root", return_value=project_root):
            from src.remote_query import _setup_local_views
            created = _setup_local_views(duckdb_conn, data_dir, quiet=True)

        assert "traffic" not in created

        # Verify the remote table is not queryable
        tables = [row[0] for row in duckdb_conn.execute("SHOW TABLES").fetchall()]
        assert "traffic" not in tables

    def test_includes_hybrid_tables(self, tmp_local_project, duckdb_conn):
        """Hybrid tables (query_mode='hybrid') should create local views."""
        project_root, data_dir = tmp_local_project
        with patch("scripts.duckdb_manager.find_project_root", return_value=project_root):
            from src.remote_query import _setup_local_views
            created = _setup_local_views(duckdb_conn, data_dir, quiet=True)

        assert "inventory" in created
        count = duckdb_conn.execute("SELECT COUNT(*) FROM inventory").fetchone()[0]
        assert count == 2


# ---------------------------------------------------------------------------
# Tests: BQ registration with safety checks
# ---------------------------------------------------------------------------

class TestBQRegistration:
    """Test BQ result validation and registration (mocked BigQuery)."""

    @staticmethod
    def _make_mock_bq_client(count_result: int = 100, schema_fields: int = 5):
        """Create a mock BQ client that returns controlled count and schema.

        Args:
            count_result: Row count returned by COUNT(*) query
            schema_fields: Number of fields in the schema
        """
        mock_client = MagicMock()

        # COUNT(*) query result: a single row whose first item is the count
        count_row = MagicMock()
        count_row.__getitem__ = MagicMock(return_value=count_result)

        # Schema query result (LIMIT 0): a list with one mock per field
        mock_schema_fields = [MagicMock() for _ in range(schema_fields)]

        # Use side_effect to return different results for different queries
        def query_side_effect(sql):
            job = MagicMock()
            if sql.startswith("SELECT COUNT(*)"):
                result = MagicMock()
                result.__iter__ = MagicMock(return_value=iter([count_row]))
                job.result.return_value = result
            elif "LIMIT 0" in sql:
                result = MagicMock()
                result.schema = mock_schema_fields
                job.result.return_value = result
            return job

        mock_client.query.side_effect = query_side_effect
        return mock_client

    def test_count_check_blocks_large_result(self):
        """BQ sub-query exceeding max_rows should raise RemoteQueryError."""
        mock_client = self._make_mock_bq_client(count_result=1_000_000)

        with pytest.raises(RemoteQueryError, match="would return 1,000,000 rows"):
            _validate_bq_result_size(
                bq_client=mock_client,
                sql="SELECT * FROM big_table",
                alias="big_table",
                max_rows=500_000,
            )

    def test_validates_small_result_passes(self):
        """BQ sub-query within limits should return the row count."""
        mock_client = self._make_mock_bq_client(count_result=1000)

        row_count = _validate_bq_result_size(
            bq_client=mock_client,
            sql="SELECT * FROM small_table",
            alias="small_table",
            max_rows=500_000,
        )
        assert row_count == 1000

    def test_memory_estimate_blocks_huge_result(self):
        """_register_bq_views should refuse when estimated memory exceeds 2 GB."""
        # Create a mock that passes count check but fails memory check
        # 500K rows x 100 cols x 50 bytes/cell = ~2384 MB > 2048 MB limit
        mock_client = self._make_mock_bq_client(count_result=500_000, schema_fields=100)

        conn = duckdb.connect(":memory:")
        try:
            with patch("src.remote_query._create_bq_client", return_value=mock_client), \
                 patch.dict(os.environ, {"BIGQUERY_PROJECT": "test-proj"}):
                with pytest.raises(RemoteQueryError, match="estimated memory"):
                    _register_bq_views(
                        conn=conn,
                        registrations=[("huge", "SELECT * FROM huge_table")],
                        max_bq_rows=1_000_000,
                        timeout_seconds=60,
                        quiet=True,
                    )
        finally:
            conn.close()

    def test_registers_small_result(self):
        """BQ sub-query within all limits should register successfully."""
        # Small result: 100 rows x 5 cols = ~0.02 MB
        mock_client = self._make_mock_bq_client(count_result=100, schema_fields=5)

        conn = duckdb.connect(":memory:")
        try:
            # Mock register_bq_table to return the row count
            with patch("src.remote_query._create_bq_client", return_value=mock_client), \
                 patch("src.remote_query.register_bq_table", return_value=100) as mock_reg, \
                 patch.dict(os.environ, {"BIGQUERY_PROJECT": "test-proj"}):
                results = _register_bq_views(
                    conn=conn,
                    registrations=[("small_view", "SELECT * FROM small_table")],
                    max_bq_rows=500_000,
                    timeout_seconds=60,
                    quiet=True,
                )

            assert results == {"small_view": 100}
            mock_reg.assert_called_once()
        finally:
            conn.close()

    def test_missing_bigquery_project_raises(self):
        """Missing BIGQUERY_PROJECT env var should raise RemoteQueryError."""
        conn = duckdb.connect(":memory:")
        try:
            with patch.dict(os.environ, {}, clear=True):
                with pytest.raises(RemoteQueryError, match="BIGQUERY_PROJECT"):
                    _register_bq_views(
                        conn=conn,
                        registrations=[("v", "SELECT 1")],
                        max_bq_rows=100,
                        timeout_seconds=60,
                    )
        finally:
            conn.close()

    def test_empty_registrations_returns_empty(self):
        """Empty registration list should return empty dict without BQ calls."""
        conn = duckdb.connect(":memory:")
        try:
            result = _register_bq_views(
                conn=conn,
                registrations=[],
                max_bq_rows=100,
                timeout_seconds=60,
            )
            assert result == {}
        finally:
            conn.close()


# ---------------------------------------------------------------------------
# Tests: Memory estimation
# ---------------------------------------------------------------------------

class TestMemoryEstimation:
    """Test _estimate_memory_mb calculation."""

    def test_small_table(self):
        """100 rows x 10 cols = 50_000 bytes ~ 0.048 MB."""
        result = _estimate_memory_mb(100, 10)
        assert abs(result - 50_000 / (1024 * 1024)) < 0.001

    def test_large_table(self):
        """1M rows x 50 cols x 50 bytes = ~2384 MB."""
        result = _estimate_memory_mb(1_000_000, 50)
        expected = (1_000_000 * 50 * 50) / (1024 * 1024)
        assert abs(result - expected) < 0.01

    def test_zero_rows(self):
        """Zero rows should return 0 MB."""
        assert _estimate_memory_mb(0, 50) == 0.0

    def test_zero_columns(self):
        """Zero columns should return 0 MB."""
        assert _estimate_memory_mb(1000, 0) == 0.0


# ---------------------------------------------------------------------------
# Tests: Output formatting
# ---------------------------------------------------------------------------

class TestOutputFormatting:
    """Test _print_table and _format_output for various formats."""

    def test_table_format_aligned(self, capsys):
        """Table format should produce aligned columns with header separator."""
        columns = ["id", "name", "value"]
        rows = [(1, "alice", 100), (2, "bob", 200)]

        _print_table(columns, rows)

        output = capsys.readouterr().out
        lines = output.strip().split("\n")

        # Header line
        assert "id" in lines[0]
        assert "name" in lines[0]
        assert "value" in lines[0]

        # Separator line
        assert "-+-" in lines[1]

        # Data rows
        assert "alice" in lines[2]
        assert "bob" in lines[3]

        # Row count footer
        assert "(2 rows)" in output

    def test_table_format_empty_result(self, capsys):
        """Empty result should print '(empty result)'."""
        _print_table(["col1"], [])
        output = capsys.readouterr().out
        assert "(empty result)" in output

    def test_table_format_null_values(self, capsys):
        """None values should be rendered as 'NULL'."""
        _print_table(["col"], [(None,)])
        output = capsys.readouterr().out
        assert "NULL" in output

    def test_csv_format(self, tmp_path, duckdb_conn):
        """CSV output should contain header + data rows."""
        duckdb_conn.execute("CREATE TABLE test AS SELECT 1 AS id, 'hello' AS msg")
        output_path = str(tmp_path / "result.csv")

        _format_output(
            conn=duckdb_conn,
            sql="SELECT * FROM test",
            fmt="csv",
            output_path=output_path,
            max_rows=1000,
        )

        with open(output_path) as f:
            reader = csv.reader(f)
            header = next(reader)
            rows = list(reader)

        assert header == ["id", "msg"]
        assert len(rows) == 1
        assert rows[0][1] == "hello"

    def test_csv_format_to_stdout(self, capsys, duckdb_conn):
        """CSV with no output path should write to stdout."""
        duckdb_conn.execute("CREATE TABLE test AS SELECT 42 AS val")

        _format_output(
            conn=duckdb_conn,
            sql="SELECT * FROM test",
            fmt="csv",
            output_path=None,
            max_rows=1000,
        )

        output = capsys.readouterr().out
        assert "val" in output
        assert "42" in output

    def test_json_format(self, tmp_path, duckdb_conn):
        """JSON output should contain a list of dicts."""
        duckdb_conn.execute(
            "CREATE TABLE test AS SELECT 1 AS id, 'world' AS msg"
        )
        output_path = str(tmp_path / "result.json")

        _format_output(
            conn=duckdb_conn,
            sql="SELECT * FROM test",
            fmt="json",
            output_path=output_path,
            max_rows=1000,
        )

        with open(output_path) as f:
            data = json.load(f)

        assert len(data) == 1
        assert data[0]["id"] == 1
        assert data[0]["msg"] == "world"

    def test_json_format_to_stdout(self, capsys, duckdb_conn):
        """JSON with no output path should print to stdout."""
        duckdb_conn.execute("CREATE TABLE test AS SELECT 99 AS num")

        _format_output(
            conn=duckdb_conn,
            sql="SELECT * FROM test",
            fmt="json",
            output_path=None,
            max_rows=1000,
        )

        output = capsys.readouterr().out
        data = json.loads(output)
        assert data[0]["num"] == 99

    def test_parquet_write(self, tmp_path, duckdb_conn):
        """Parquet output should create a readable parquet file."""
        duckdb_conn.execute(
            "CREATE TABLE test AS SELECT 1 AS id, 2.5 AS val"
        )
        output_path = str(tmp_path / "output" / "result.parquet")

        with patch("src.remote_query._load_remote_query_config", return_value={
            "output_dir": str(tmp_path / "default_output"),
            "timeout_seconds": 300,
            "max_result_rows": 100_000,
            "max_bq_registration_rows": 500_000,
            "default_format": "table",
        }):
            _format_output(
                conn=duckdb_conn,
                sql="SELECT * FROM test",
                fmt="parquet",
                output_path=output_path,
                max_rows=1000,
            )

        assert Path(output_path).exists()

        # Read it back and verify
        result = pq.read_table(output_path)
        assert result.num_rows == 1
        assert result.num_columns == 2
        assert result.column_names == ["id", "val"]

    def test_parquet_default_path(self, tmp_path, duckdb_conn):
        """Parquet with no output path should use config default dir."""
        duckdb_conn.execute("CREATE TABLE test AS SELECT 1 AS x")
        default_dir = str(tmp_path / "default_output")

        with patch("src.remote_query._load_remote_query_config", return_value={
            "output_dir": default_dir,
            "timeout_seconds": 300,
            "max_result_rows": 100_000,
            "max_bq_registration_rows": 500_000,
            "default_format": "table",
        }):
            _format_output(
                conn=duckdb_conn,
                sql="SELECT * FROM test",
                fmt="parquet",
                output_path=None,
                max_rows=1000,
            )

        expected_path = Path(default_dir) / "result.parquet"
        assert expected_path.exists()

    def test_unknown_format_raises(self, duckdb_conn):
        """Unknown format should raise RemoteQueryError."""
        duckdb_conn.execute("CREATE TABLE test AS SELECT 1 AS id")

        with pytest.raises(RemoteQueryError, match="Unknown format"):
            _format_output(
                conn=duckdb_conn,
                sql="SELECT * FROM test",
                fmt="xml",
                output_path=None,
                max_rows=1000,
            )


# ---------------------------------------------------------------------------
# Tests: End-to-end (local-only, no BQ mocking needed)
# ---------------------------------------------------------------------------

class TestEndToEnd:
    """End-to-end tests with local Parquet data only (no BigQuery dependency).

    Uses _patched_duckdb_connect to handle DuckDB version differences
    (statement_timeout may not be supported in all versions).
    """

    _CONFIG = {
        "timeout_seconds": 300,
        "max_result_rows": 100_000,
        "max_bq_registration_rows": 500_000,
        "default_format": "table",
        "output_dir": "/tmp/remote_query_test",
    }

    def _run(self, tmp_local_project, **kwargs):
        """Helper to run execute_remote_query with standard patches."""
        project_root, data_dir = tmp_local_project
        config = dict(self._CONFIG)
        config.update(kwargs.pop("config_overrides", {}))

        with patch("scripts.duckdb_manager.find_project_root", return_value=project_root), \
             patch("src.remote_query._load_remote_query_config", return_value=config), \
             patch("src.remote_query.duckdb") as mock_duckdb_mod:
            mock_duckdb_mod.connect = _patched_duckdb_connect
            kwargs.setdefault("data_dir", data_dir)
            kwargs.setdefault("bq_registrations", [])
            kwargs.setdefault("quiet", True)
            execute_remote_query(**kwargs)

    def test_local_only_query(self, tmp_local_project, capsys):
        """Execute a query against local Parquet views and verify table output."""
        self._run(
            tmp_local_project,
            sql="SELECT COUNT(*) AS cnt FROM orders",
            fmt="table",
        )

        output = capsys.readouterr().out
        assert "cnt" in output
        assert "5" in output

    def test_local_join_query(self, tmp_local_project, capsys):
        """JOIN between two local tables should work."""
        self._run(
            tmp_local_project,
            sql=(
                "SELECT p.name, SUM(o.amount) AS total "
                "FROM orders o JOIN products p ON o.product_id = p.product_id "
                "GROUP BY p.name ORDER BY total DESC"
            ),
            fmt="json",
        )

        output = capsys.readouterr().out
        data = json.loads(output)
        assert len(data) == 3

        # Widget: orders 1,3 -> 10+30=40
        widget = next(r for r in data if r["name"] == "Widget")
        assert widget["total"] == 40.0

    def test_result_row_limit(self, tmp_local_project, capsys):
        """Result exceeding max_rows should be truncated."""
        self._run(
            tmp_local_project,
            sql="SELECT * FROM orders ORDER BY order_id",
            fmt="table",
            max_rows=2,
            quiet=False,
            config_overrides={"max_result_rows": 2},
        )

        out = capsys.readouterr().out
        # Table output should show exactly 2 data rows
        assert "(2 rows)" in out

    def test_csv_output_to_file(self, tmp_local_project, tmp_path):
        """End-to-end CSV output written to a file."""
        output_path = str(tmp_path / "result.csv")

        self._run(
            tmp_local_project,
            sql="SELECT order_id, amount FROM orders ORDER BY order_id",
            fmt="csv",
            output=output_path,
        )

        with open(output_path) as f:
            reader = csv.DictReader(f)
            rows = list(reader)

        assert len(rows) == 5
        assert rows[0]["order_id"] == "1"
        assert rows[0]["amount"] == "10.0"

    def test_hybrid_table_queryable(self, tmp_local_project, capsys):
        """Hybrid table should be accessible in local queries."""
        self._run(
            tmp_local_project,
            sql="SELECT SUM(stock) AS total_stock FROM inventory",
            fmt="json",
        )

        output = capsys.readouterr().out
        data = json.loads(output)
        assert data[0]["total_stock"] == 300

    def test_quiet_mode_suppresses_stderr(self, tmp_local_project, capsys):
        """With quiet=True, no progress messages should appear on stderr."""
        self._run(
            tmp_local_project,
            sql="SELECT COUNT(*) AS cnt FROM orders",
            fmt="table",
            quiet=True,
        )

        err = capsys.readouterr().err
        # In quiet mode, _log_progress should not emit anything
        assert "Setting up" not in err
        assert "local views" not in err
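
The arithmetic behind the memory tests above (rows x columns x 50 bytes per cell, compared against a 2 GB cap) can be sketched as follows. This is an illustration consistent with the test comments and the commit message, not the body of `_estimate_memory_mb` itself: the names `BYTES_PER_CELL`, `MAX_MEMORY_MB`, `estimate_memory_mb`, and `exceeds_memory_cap` are hypothetical.

```python
# Assumption: 50 bytes per cell, as implied by the test comments.
BYTES_PER_CELL = 50
# Assumption: the 2 GB cap from the commit message, expressed in MB.
MAX_MEMORY_MB = 2048


def estimate_memory_mb(row_count: int, column_count: int) -> float:
    """Rough in-memory size of a result set, in megabytes."""
    return (row_count * column_count * BYTES_PER_CELL) / (1024 * 1024)


def exceeds_memory_cap(
    row_count: int, column_count: int, cap_mb: float = MAX_MEMORY_MB
) -> bool:
    """True when the estimate is above the cap and the fetch should be refused."""
    return estimate_memory_mb(row_count, column_count) > cap_mb


if __name__ == "__main__":
    # 500K rows x 100 columns stays within the row limit but not the memory cap.
    print(round(estimate_memory_mb(500_000, 100)), exceeds_memory_cap(500_000, 100))
    # 100 rows x 5 columns is far below the cap.
    print(exceeds_memory_cap(100, 5))
```

A per-cell constant is a coarse estimate (wide string columns can be much larger), which is presumably why the row-count limit is checked as well.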