From d180b2014e700e37abda5d8a1a57f6081de05e6a Mon Sep 17 00:00:00 2001 From: Petr Date: Sat, 21 Mar 2026 11:39:15 +0100 Subject: [PATCH] Step 28: Remote query architecture for local+remote table JOINs Add src/remote_query.py CLI module enabling the AI agent to run SQL queries spanning local Parquet tables and remote BigQuery tables in a single DuckDB session on the server. Two-phase protocol: BQ sub-queries (--register-bq) fetch filtered/aggregated data, then DuckDB SQL (--sql) joins everything. Safety: COUNT(*) pre-check, memory estimation (2GB cap), row limits (500K per BQ sub-query, 100K final result). Changes: - New src/remote_query.py with CLI, BQ registration, output formatting - Add bq_entity_type field to TableConfig (view vs table routing) - Extract create_local_views() from duckdb_manager.py for reuse - Update claude_md_template.txt with remote query agent instructions - Update example configs with remote_query section and docs - 52 new tests (42 remote_query + 10 bq_entity_type), all passing --- config/data_description.md.example | 27 + config/instance.yaml.example | 10 + docs/setup/claude_md_template.txt | 127 +++++ scripts/duckdb_manager.py | 166 +++--- src/config.py | 10 + src/remote_query.py | 564 ++++++++++++++++++++ tests/test_config_bq_entity_type.py | 69 +++ tests/test_remote_query.py | 778 ++++++++++++++++++++++++++++ 8 files changed, 1678 insertions(+), 73 deletions(-) create mode 100644 src/remote_query.py create mode 100644 tests/test_config_bq_entity_type.py create mode 100644 tests/test_remote_query.py diff --git a/config/data_description.md.example b/config/data_description.md.example index a246e33..033c25c 100644 --- a/config/data_description.md.example +++ b/config/data_description.md.example @@ -46,6 +46,18 @@ tables: query_mode: "local" sync_schedule: "every 1h" profile_after_sync: false + + # Remote table - too large for local sync, queried via BigQuery + - id: "project.dataset.page_views" + name: "page_views" + description: 
"Page-level traffic metrics (~5M rows/day)" + primary_key: "page_view_id" + sync_strategy: "partitioned" + partition_by: "event_date" + partition_column_type: "DATE" + partition_granularity: "day" + query_mode: "remote" + bq_entity_type: "view" ``` ## Table Configuration Reference @@ -84,6 +96,7 @@ tables: | Field | Default | Description | |-------|---------|-------------| | `query_mode` | `"local"` | How the AI agent queries this table | +| `bq_entity_type` | `"view"` | BigQuery entity type: `"view"` (Python BQ client) or `"table"` (DuckDB BQ extension) | | Mode | Description | Best for | |------|-------------|----------| @@ -91,6 +104,20 @@ tables: | `remote` | Not synced, queried via BigQuery | Huge tables (100+ GB), live data | | `hybrid` | Subset synced for profiling, queries go to BigQuery | Medium tables needing live data | +### Remote Table Metadata + +Remote tables (`query_mode: "remote"`) should include metadata to help the AI agent +write safe, efficient queries: + +| Field | Description | +|-------|-------------| +| `grain` | Row granularity description | +| `volume` | Size estimates (rows_per_day, unique entities) | +| `columns` | Column descriptions with value distributions | +| `dimension_profile` | Cardinality per dimension | +| `query_result_estimates` | Expected rows after GROUP BY combinations | +| `join_keys` | How to join with other tables | + ### Automatic Sync Schedule | Field | Default | Description | diff --git a/config/instance.yaml.example b/config/instance.yaml.example index 78ac461..17d28a1 100644 --- a/config/instance.yaml.example +++ b/config/instance.yaml.example @@ -181,3 +181,13 @@ catalog: # alert_missing_pct: 5.0 # Alert threshold for missing % # alert_imbalance_pct: 60.0 # Alert threshold for imbalance % # alert_high_cardinality: 50 # Alert threshold for high cardinality columns + +# --- Remote query (optional) --- +# Settings for remote BigQuery queries via `python -m src.remote_query`. 
+# Used when tables have query_mode: "remote" in data_description.md. +# remote_query: +# timeout_seconds: 300 # BQ + DuckDB query timeout +# max_result_rows: 100000 # Max rows in final output +# max_bq_registration_rows: 500000 # Max rows per --register-bq sub-query +# default_format: "table" # Default output format +# output_dir: "/tmp/remote_query" # Directory for Parquet/CSV exports diff --git a/docs/setup/claude_md_template.txt b/docs/setup/claude_md_template.txt index fcc5a3c..80c1acb 100644 --- a/docs/setup/claude_md_template.txt +++ b/docs/setup/claude_md_template.txt @@ -183,6 +183,133 @@ You're ready to analyze! --- +## Remote Queries (BigQuery) + +Some tables are too large for local Parquet sync and are queried remotely via BigQuery. +These tables have `query_mode: "remote"` in `server/docs/data_description.md`. + +### How to recognize remote tables + +Before writing any query, read `server/docs/data_description.md`. Each table has: +- `query_mode: "local"` -- available as a local DuckDB view (query normally) +- `query_mode: "remote"` -- NOT in local DuckDB, must use remote query protocol below +- `query_mode: "hybrid"` -- local view exists AND can query BQ for live data + +### Remote table metadata in data_description.md + +Remote tables include metadata to help you write safe queries: + +- **`volume`** -- rows_per_day, unique entities per day (tells you table size) +- **`columns`** -- column names, types, value distributions +- **`dimension_profile`** -- cardinality per dimension with value distributions +- **`query_result_estimates`** -- expected row counts after GROUP BY combinations +- **`join_keys`** -- how to join with other tables + +**ALWAYS read these sections before writing a remote query.** Use `query_result_estimates` +to predict how many rows your query will return. The server has limited RAM -- keep BQ +sub-query results under 500K rows. 
+ +### Two-phase query protocol + +Remote queries run **on the server** via SSH (server has DuckDB + Parquet + BigQuery access). +You write two SQL statements: + +1. **BQ sub-query** (`--register-bq "alias=SQL"`) -- runs on BigQuery, result registered in DuckDB as a view. + This MUST contain WHERE and/or GROUP BY to reduce the result set. Never SELECT * from a remote table. +2. **DuckDB SQL** (`--sql "SQL"`) -- runs in DuckDB after all views (local + BQ) are ready. + Can JOIN local tables with registered BQ results. + +### Command format + +```bash +ssh {ssh_alias} 'cd /opt/data-analyst && \ + set -a && source .env && set +a && \ + PYTHONPATH=repo CONFIG_DIR=instance/config \ + .venv/bin/python3 -m src.remote_query \ + --register-bq "ALIAS=BQ_SQL_QUERY" \ + --sql "DUCKDB_SQL_QUERY" \ + --format table' +``` + +Arguments: +- `--register-bq "alias=SQL"` -- Register a BQ query result as DuckDB view (repeatable for multiple remote tables) +- `--sql "SQL"` -- The final DuckDB query (can reference local views + registered BQ aliases) +- `--format table|csv|json|parquet` -- Output format (default: table) +- `--output /path/file` -- Output file for parquet/csv/json +- `--max-rows N` -- Override max result rows + +### Example 1: Remote-only query (aggregated data) + +```bash +ssh {ssh_alias} 'cd /opt/data-analyst && \ + set -a && source .env && set +a && \ + PYTHONPATH=repo CONFIG_DIR=instance/config \ + .venv/bin/python3 -m src.remote_query \ + --register-bq "agg_data=SELECT date_col, dim_col, SUM(metric) as total FROM \`project.dataset.table\` WHERE date_col >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) GROUP BY 1,2" \ + --sql "SELECT * FROM agg_data ORDER BY date_col, dim_col" \ + --format table' +``` + +### Example 2: JOIN local + remote + +```bash +ssh {ssh_alias} 'cd /opt/data-analyst && \ + set -a && source .env && set +a && \ + PYTHONPATH=repo CONFIG_DIR=instance/config \ + .venv/bin/python3 -m src.remote_query \ + --register-bq "remote_data=SELECT date_col, dim_col, 
SUM(metric) as total FROM \`project.dataset.table\` WHERE date_col >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY) GROUP BY 1,2" \ + --sql "SELECT l.*, r.total FROM local_table l JOIN remote_data r ON l.date_col = r.date_col AND l.dim_col = r.dim_col ORDER BY 1,2" \ + --format table' +``` + +### Example 3: Download result as Parquet for local analysis + +```bash +# 1. Run query, save as Parquet on server +ssh {ssh_alias} 'cd /opt/data-analyst && \ + set -a && source .env && set +a && \ + PYTHONPATH=repo CONFIG_DIR=instance/config \ + .venv/bin/python3 -m src.remote_query \ + --register-bq "remote_data=SELECT ... GROUP BY ..." \ + --sql "SELECT ... JOIN ..." \ + --format parquet --output /tmp/remote_query/analysis.parquet' + +# 2. Download to local machine +scp {ssh_alias}:/tmp/remote_query/analysis.parquet ./user/parquet/ + +# 3. Register in local DuckDB for further analysis +python3 -c " +import duckdb +conn = duckdb.connect('user/duckdb/analytics.duckdb') +conn.execute(\"CREATE OR REPLACE VIEW analysis AS SELECT * FROM read_parquet('user/parquet/analysis.parquet')\") +print('View created:', conn.execute('SELECT COUNT(*) FROM analysis').fetchone()[0], 'rows') +conn.close() +" +``` + +### How to estimate result sizes + +Before writing a BQ sub-query, check `dimension_profile` and `query_result_estimates` +in `server/docs/data_description.md`. + +**Rule of thumb:** rows = (estimate per day from query_result_estimates) * (number of days in WHERE clause). +If that exceeds 100K rows, add more aggregation or tighter date filters. + +### Safety rules + +1. **NEVER** run `SELECT * FROM remote_table` without WHERE + GROUP BY +2. **ALWAYS** check `dimension_profile` before writing BQ sub-queries +3. **ALWAYS** include date range in WHERE clause +4. **Limits**: 500K rows max per BQ sub-query, 100K rows max in final result +5. If the query might take > 60 seconds, use nohup pattern: + ```bash + ssh {ssh_alias} 'nohup ... 
--format parquet --output /tmp/result.parquet > /tmp/rq.log 2>&1 &' + ssh {ssh_alias} 'tail -5 /tmp/rq.log' # check progress + scp {ssh_alias}:/tmp/result.parquet ./user/parquet/ + ``` + +--- + ## Reporting Issues Report issues to your platform team or the project's issue tracker. diff --git a/scripts/duckdb_manager.py b/scripts/duckdb_manager.py index ecfd025..a73eb89 100644 --- a/scripts/duckdb_manager.py +++ b/scripts/duckdb_manager.py @@ -270,6 +270,89 @@ def get_remote_tables(table_configs: List[Dict]) -> List[Dict]: ] +def create_local_views( + conn: duckdb.DuckDBPyConnection, + data_dir: str, + project_root: Optional[Path] = None, + verbose: bool = True, +) -> Tuple[List[str], List[str]]: + """Create DuckDB views for local/hybrid tables from Parquet files. + + Extracts the shared logic from init_duckdb() so it can be reused + by remote_query.py without creating a persistent DB file. + + Args: + conn: Open DuckDB connection (in-memory or file-backed) + data_dir: Base data directory (e.g., "server", "/data/src_data") + project_root: Project root path. If None, auto-detected. 
+ verbose: Print progress messages + + Returns: + Tuple of (created_view_names, skipped_table_names) + """ + if project_root is None: + project_root = find_project_root() + + table_configs, folder_mapping = parse_data_description(project_root) + + # Filter to local and hybrid tables (both have local Parquet) + eligible_tables = [ + tc for tc in table_configs + if tc.get("query_mode", "local") in ("local", "hybrid") + ] + + created_views = [] + skipped_views = [] + + for table_config in eligible_tables: + table_name = table_config['name'] + + try: + parquet_path = get_parquet_path(table_config, folder_mapping, data_dir) + + if not parquet_path.exists(): + skipped_views.append(table_name) + if verbose: + print(f" [SKIP] {table_name} - parquet not found: {parquet_path}") + continue + + # Determine if partitioned + sync_strategy = table_config.get('sync_strategy', 'full_refresh') + is_partitioned = ( + sync_strategy == "partitioned" or + (sync_strategy == "incremental" and table_config.get('partition_by')) + ) + + if is_partitioned: + glob_pattern = parquet_path / "*.parquet" + partition_files = list(parquet_path.glob("*.parquet")) + if not partition_files: + skipped_views.append(table_name) + if verbose: + print(f" [SKIP] {table_name} - no partition files") + continue + + sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{glob_pattern}', union_by_name=true)" + if verbose: + mode_label = "hybrid" if table_config.get("query_mode") == "hybrid" else "local" + print(f" [OK] {table_name} ({len(partition_files)} partitions, {mode_label})") + else: + sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{parquet_path}')" + if verbose: + mode_label = "hybrid" if table_config.get("query_mode") == "hybrid" else "local" + print(f" [OK] {table_name} ({mode_label})") + + conn.execute(sql) + created_views.append(table_name) + + except Exception as e: + if verbose: + print(f" [ERR] Error creating {table_name}: {e}") + raise + + return 
created_views, skipped_views + + def init_duckdb( db_path="user/duckdb/analytics.duckdb", data_dir="server", @@ -299,7 +382,6 @@ def init_duckdb( print("Initializing DuckDB database...") try: - # Find project root and parse data_description.md project_root = find_project_root() if verbose: print(f" Project root: {project_root}") @@ -308,87 +390,25 @@ def init_duckdb( if verbose: print(f" Loaded {len(table_configs)} tables from data_description.md") - # Separate tables by query_mode - local_tables = [] - remote_tables = [] - hybrid_tables = [] - - for tc in table_configs: - mode = tc.get("query_mode", "local") - if mode == "remote": - remote_tables.append(tc) - elif mode == "hybrid": - hybrid_tables.append(tc) - else: - local_tables.append(tc) + # Classify tables by query_mode (for display only) + remote_tables = [tc for tc in table_configs if tc.get("query_mode") == "remote"] + local_count = len([tc for tc in table_configs if tc.get("query_mode", "local") == "local"]) + hybrid_count = len([tc for tc in table_configs if tc.get("query_mode") == "hybrid"]) if verbose: - print(f" Query modes: {len(local_tables)} local, " - f"{len(remote_tables)} remote, {len(hybrid_tables)} hybrid") + print(f" Query modes: {local_count} local, " + f"{len(remote_tables)} remote, {hybrid_count} hybrid") # Connect to database (creates if doesn't exist) conn = duckdb.connect(db_path) - # Create local views from parquet files if verbose: print("\n Creating views from parquet files...") - created_views = [] - skipped_views = [] - - # Process local and hybrid tables (both have local parquet) - for table_config in local_tables + hybrid_tables: - table_name = table_config['name'] - - try: - # Get parquet path - parquet_path = get_parquet_path(table_config, folder_mapping, data_dir) - - # Check if file/directory exists - if not parquet_path.exists(): - skipped_views.append(table_name) - if verbose: - print(f" [SKIP] {table_name} - parquet not found: {parquet_path}") - continue - - # Determine if 
partitioned - sync_strategy = table_config.get('sync_strategy', 'full_refresh') - is_partitioned = ( - sync_strategy == "partitioned" or - (sync_strategy == "incremental" and table_config.get('partition_by')) - ) - - # Create view - if is_partitioned: - # For partitioned tables, use glob pattern - glob_pattern = parquet_path / "*.parquet" - - # Check if there are any partition files - partition_files = list(parquet_path.glob("*.parquet")) - if not partition_files: - skipped_views.append(table_name) - if verbose: - print(f" [SKIP] {table_name} - no partition files") - continue - - sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{glob_pattern}', union_by_name=true)" - if verbose: - mode_label = "hybrid" if table_config.get("query_mode") == "hybrid" else "local" - print(f" [OK] {table_name} ({len(partition_files)} partitions, {mode_label})") - else: - # Single parquet file - sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{parquet_path}')" - if verbose: - mode_label = "hybrid" if table_config.get("query_mode") == "hybrid" else "local" - print(f" [OK] {table_name} ({mode_label})") - - conn.execute(sql) - created_views.append(table_name) - - except Exception as e: - if verbose: - print(f" [ERR] Error creating {table_name}: {e}") - return False + # Delegate to create_local_views + created_views, skipped_views = create_local_views( + conn, data_dir, project_root=project_root, verbose=verbose, + ) # Log remote tables (queried at runtime via register_bq_table) if remote_tables: diff --git a/src/config.py b/src/config.py index cb0c627..14dccbd 100644 --- a/src/config.py +++ b/src/config.py @@ -107,6 +107,7 @@ class TableConfig: columns: Optional[List[str]] = None # Subset of columns to sync (None = all) row_filter: Optional[str] = None # SQL WHERE clause for filtering (e.g., "event_date >= '2024-01-01'") query_mode: str = "local" # "local" (Parquet) | "remote" (BQ direct) | "hybrid" (sync subset, query BQ) + 
bq_entity_type: str = "view" # "view" (Python BQ client) | "table" (DuckDB BQ extension) partition_column_type: str = "TIMESTAMP" # BQ SQL type for partition column: "DATE", "TIMESTAMP", "DATETIME" catalog_fqn: Optional[str] = None # Explicit OpenMetadata FQN override (auto-derived if not set) sync_schedule: Optional[str] = None # Schedule: "every 15m", "every 1h", "daily 05:00" (UTC) @@ -122,6 +123,14 @@ class TableConfig: f"Allowed values: {', '.join(valid_query_modes)}" ) + # Validate bq_entity_type + valid_entity_types = ("view", "table") + if self.bq_entity_type not in valid_entity_types: + raise ValueError( + f"Invalid bq_entity_type '{self.bq_entity_type}' for table {self.id}. " + f"Allowed values: {', '.join(valid_entity_types)}" + ) + # Validate sync_strategy if self.sync_strategy not in ["full_refresh", "incremental", "partitioned"]: raise ValueError( @@ -473,6 +482,7 @@ class Config: columns=table_data.get("columns"), row_filter=table_data.get("row_filter"), query_mode=table_data.get("query_mode", "local"), + bq_entity_type=table_data.get("bq_entity_type", "view"), partition_column_type=table_data.get("partition_column_type", "TIMESTAMP"), catalog_fqn=table_data.get("catalog_fqn"), sync_schedule=table_data.get("sync_schedule"), diff --git a/src/remote_query.py b/src/remote_query.py new file mode 100644 index 0000000..6d5cf1a --- /dev/null +++ b/src/remote_query.py @@ -0,0 +1,564 @@ +""" +Remote Query - Execute DuckDB queries spanning local Parquet + remote BigQuery tables. + +Provides a server-side CLI for the AI agent to run SQL queries that JOIN local +(Parquet-backed) tables with on-demand BigQuery results. Designed for tables too +large to sync locally (e.g., daily_deal_traffic: ~3M rows/day). + +Two-phase query protocol: +1. BQ sub-queries (--register-bq "alias=SQL") run on BigQuery, results registered + as DuckDB views via PyArrow (reuses register_bq_table from duckdb_manager). +2. 
DuckDB SQL (--sql) runs against local Parquet views + registered BQ results. + +Usage: + python -m src.remote_query \\ + --sql "SELECT ... FROM order_economics o JOIN traffic t ON ..." \\ + --register-bq "traffic=SELECT ... FROM \\`project.dataset.table\\` WHERE ..." \\ + --format table + +Safety features: +- COUNT(*) pre-check before fetching BQ data +- Memory estimation (refuses queries > 2 GB estimated) +- Configurable row limits (per BQ sub-query and final result) +- Query timeout support +""" + +import argparse +import csv +import io +import json +import logging +import os +import sys +import time +from pathlib import Path +from typing import Optional + +import duckdb + +from config.loader import load_instance_config, get_instance_value +from scripts.duckdb_manager import ( + create_local_views, + register_bq_table, + _create_bq_client, +) + +logger = logging.getLogger(__name__) + + +class RemoteQueryError(Exception): + """Error during remote query execution.""" + + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +def _load_remote_query_config() -> dict: + """Load remote_query settings from instance.yaml with defaults.""" + try: + instance_config = load_instance_config() + except (FileNotFoundError, ValueError) as e: + logger.warning("Could not load instance config: %s. 
Using defaults.", e) + instance_config = {} + + return { + "timeout_seconds": get_instance_value( + instance_config, "remote_query", "timeout_seconds", default=300, + ), + "max_result_rows": get_instance_value( + instance_config, "remote_query", "max_result_rows", default=100_000, + ), + "max_bq_registration_rows": get_instance_value( + instance_config, "remote_query", "max_bq_registration_rows", default=500_000, + ), + "default_format": get_instance_value( + instance_config, "remote_query", "default_format", default="table", + ), + "output_dir": get_instance_value( + instance_config, "remote_query", "output_dir", default="/tmp/remote_query", + ), + } + + +# --------------------------------------------------------------------------- +# BQ registration with safety checks +# --------------------------------------------------------------------------- + +def _validate_bq_result_size( + bq_client, sql: str, alias: str, max_rows: int, +) -> int: + """Execute COUNT(*) on the BQ sub-query before fetching all rows. + + Args: + bq_client: BigQuery client instance + sql: The BQ SQL query to count + alias: Alias name (for error messages) + max_rows: Maximum allowed rows + + Returns: + Row count + + Raises: + RemoteQueryError: If count exceeds max_rows + """ + count_sql = f"SELECT COUNT(*) AS cnt FROM ({sql})" + _log_progress(f" Counting rows for '{alias}'...") + + job = bq_client.query(count_sql) + result = job.result() + row_count = next(iter(result))[0] + + if row_count > max_rows: + raise RemoteQueryError( + f"BQ sub-query '{alias}' would return {row_count:,} rows " + f"(limit: {max_rows:,}). Add more WHERE filters or GROUP BY " + f"to reduce the result set." + ) + + return row_count + + +def _estimate_memory_mb(row_count: int, column_count: int) -> float: + """Estimate memory usage in MB for a PyArrow table. + + Uses ~50 bytes per cell as a rough average across data types. 
+ """ + return (row_count * column_count * 50) / (1024 * 1024) + + +def _register_bq_views( + conn: duckdb.DuckDBPyConnection, + registrations: list[tuple[str, str]], + max_bq_rows: int, + timeout_seconds: int, + quiet: bool = False, +) -> dict[str, int]: + """Register BQ query results as DuckDB views with safety checks. + + Args: + conn: DuckDB connection + registrations: List of (alias, bq_sql) tuples + max_bq_rows: Max rows per sub-query + timeout_seconds: BQ job timeout + quiet: Suppress progress messages + + Returns: + Dict of {alias: row_count} + """ + if not registrations: + return {} + + bq_project = os.environ.get("BIGQUERY_PROJECT") + if not bq_project: + raise RemoteQueryError( + "BIGQUERY_PROJECT environment variable not set. " + "Required for BigQuery sub-queries." + ) + + bq_client = _create_bq_client(bq_project) + results = {} + + for alias, bq_sql in registrations: + start_time = time.time() + + # Phase 1: COUNT(*) safety check + row_count = _validate_bq_result_size(bq_client, bq_sql, alias, max_bq_rows) + _log_progress(f" '{alias}': {row_count:,} rows (within limit)") + + # Phase 2: Memory estimation + # Estimate column count from a LIMIT 0 query (cheap) + sample_job = bq_client.query(f"SELECT * FROM ({bq_sql}) LIMIT 0") + schema = sample_job.result().schema + col_count = len(schema) + estimated_mb = _estimate_memory_mb(row_count, col_count) + + if estimated_mb > 2048: # 2 GB = 25% of 8 GB server RAM + raise RemoteQueryError( + f"BQ sub-query '{alias}' estimated memory: {estimated_mb:.0f} MB " + f"({row_count:,} rows x {col_count} cols). " + f"Limit is 2048 MB. Add more aggregation or filters." 
+ ) + + # Phase 3: Execute and register + _log_progress(f" Fetching '{alias}' ({row_count:,} rows, ~{estimated_mb:.0f} MB)...") + actual_rows = register_bq_table( + conn=conn, + table_id=f"bq_registration.{alias}", + view_name=alias, + sql=bq_sql, + bq_project=bq_project, + ) + + elapsed = time.time() - start_time + _log_progress(f" '{alias}' registered: {actual_rows:,} rows in {elapsed:.1f}s") + results[alias] = actual_rows + + return results + + +# --------------------------------------------------------------------------- +# Local view setup +# --------------------------------------------------------------------------- + +def _setup_local_views( + conn: duckdb.DuckDBPyConnection, + data_dir: str, + quiet: bool = False, +) -> list[str]: + """Create DuckDB views for all local/hybrid tables from Parquet. + + Args: + conn: DuckDB connection + data_dir: Path to data directory (e.g., "/data/src_data") + quiet: Suppress progress messages + + Returns: + List of created view names + """ + created, skipped = create_local_views( + conn=conn, + data_dir=data_dir, + verbose=not quiet, + ) + return created + + +# --------------------------------------------------------------------------- +# Output formatting +# --------------------------------------------------------------------------- + +def _print_table(columns: list[str], rows: list[tuple]) -> None: + """Print an aligned ASCII table to stdout.""" + if not rows: + print("(empty result)") + return + + # Calculate column widths + str_rows = [[str(v) if v is not None else "NULL" for v in row] for row in rows] + widths = [len(col) for col in columns] + for row in str_rows: + for i, val in enumerate(row): + widths[i] = max(widths[i], len(val)) + + # Header + header = " | ".join(col.ljust(widths[i]) for i, col in enumerate(columns)) + separator = "-+-".join("-" * widths[i] for i in range(len(columns))) + print(header) + print(separator) + + # Rows + for row in str_rows: + line = " | ".join(val.ljust(widths[i]) for i, val in 
enumerate(row)) + print(line) + + print(f"\n({len(rows)} rows)") + + +def _format_output( + conn: duckdb.DuckDBPyConnection, + sql: str, + fmt: str, + output_path: Optional[str], + max_rows: int, +) -> None: + """Execute final SQL and output results in the requested format. + + Args: + conn: DuckDB connection with all views registered + sql: The final DuckDB SQL query + fmt: Output format (table, csv, json, parquet) + output_path: File path for file-based outputs + max_rows: Maximum rows to return + """ + # Add LIMIT to prevent runaway results + limited_sql = f"SELECT * FROM ({sql}) AS _rq LIMIT {max_rows + 1}" + result = conn.execute(limited_sql) + columns = [desc[0] for desc in result.description] + rows = result.fetchall() + + # Check if result exceeded limit + if len(rows) > max_rows: + rows = rows[:max_rows] + _log_progress( + f" WARNING: Result truncated to {max_rows:,} rows. " + f"Add more filters or increase --max-rows." + ) + + if fmt == "table": + _print_table(columns, rows) + + elif fmt == "csv": + if output_path: + with open(output_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(columns) + writer.writerows(rows) + _log_progress(f" CSV written: {output_path} ({len(rows)} rows)") + else: + writer = csv.writer(sys.stdout) + writer.writerow(columns) + writer.writerows(rows) + + elif fmt == "json": + data = [dict(zip(columns, row)) for row in rows] + json_str = json.dumps(data, default=str, indent=2) + if output_path: + with open(output_path, "w") as f: + f.write(json_str) + _log_progress(f" JSON written: {output_path} ({len(rows)} rows)") + else: + print(json_str) + + elif fmt == "parquet": + import pyarrow as pa + import pyarrow.parquet as pq + + # Re-execute without limit wrapper for clean Arrow export + arrow_result = conn.execute( + f"SELECT * FROM ({sql}) AS _rq LIMIT {max_rows}" + ).fetch_arrow_table() + + if not output_path: + output_path = str(Path(_load_remote_query_config()["output_dir"]) / "result.parquet") + + 
Path(output_path).parent.mkdir(parents=True, exist_ok=True) + pq.write_table(arrow_result, output_path) + _log_progress( + f" Parquet written: {output_path} " + f"({arrow_result.num_rows} rows, {arrow_result.num_columns} cols)" + ) + + else: + raise RemoteQueryError(f"Unknown format: {fmt}") + + +# --------------------------------------------------------------------------- +# Progress logging (stderr so stdout stays clean for data) +# --------------------------------------------------------------------------- + +_quiet_mode = False + + +def _log_progress(msg: str) -> None: + """Print progress message to stderr (keeps stdout clean for data output).""" + if not _quiet_mode: + print(msg, file=sys.stderr) + + +# --------------------------------------------------------------------------- +# Main execution +# --------------------------------------------------------------------------- + +def execute_remote_query( + sql: str, + bq_registrations: list[tuple[str, str]], + fmt: str = "table", + output: Optional[str] = None, + max_rows: Optional[int] = None, + max_bq_rows: Optional[int] = None, + timeout: Optional[int] = None, + data_dir: str = "/data/src_data", + quiet: bool = False, +) -> None: + """Main execution function for remote queries. 
+ + Args: + sql: DuckDB SQL query to execute + bq_registrations: List of (alias, bq_sql) tuples + fmt: Output format (table, csv, json, parquet) + output: Output file path (for parquet/csv/json) + max_rows: Max rows in final result + max_bq_rows: Max rows per BQ sub-query + timeout: Query timeout in seconds + data_dir: Path to data directory + quiet: Suppress progress messages + """ + global _quiet_mode + _quiet_mode = quiet + + config = _load_remote_query_config() + max_rows = max_rows or config["max_result_rows"] + max_bq_rows = max_bq_rows or config["max_bq_registration_rows"] + timeout = timeout or config["timeout_seconds"] + fmt = fmt or config["default_format"] + + start_time = time.time() + + # Create in-memory DuckDB connection + conn = duckdb.connect(":memory:") + + try: + # Step 1: Register local Parquet views + _log_progress("Setting up local views...") + local_views = _setup_local_views(conn, data_dir, quiet=quiet) + _log_progress(f" {len(local_views)} local views ready") + + # Step 2: Register BQ sub-query results + if bq_registrations: + _log_progress(f"Registering {len(bq_registrations)} BQ sub-queries...") + bq_results = _register_bq_views( + conn, bq_registrations, max_bq_rows, timeout, quiet=quiet, + ) + for alias, count in bq_results.items(): + _log_progress(f" {alias}: {count:,} rows") + + # Step 3: Execute the final DuckDB query + _log_progress("Executing query...") + _format_output(conn, sql, fmt, output, max_rows) + + elapsed = time.time() - start_time + _log_progress(f"Done in {elapsed:.1f}s") + + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# CLI argument parsing +# --------------------------------------------------------------------------- + +def _parse_register_bq(value: str) -> tuple[str, str]: + """Parse --register-bq argument in 'alias=SQL' format. + + Args: + value: String in format "alias=SELECT ..." 
+ + Returns: + Tuple of (alias, sql) + + Raises: + argparse.ArgumentTypeError: If format is invalid + """ + eq_pos = value.find("=") + if eq_pos <= 0: + raise argparse.ArgumentTypeError( + f"Invalid --register-bq format: '{value}'. " + f"Expected: 'alias=SELECT ...' (e.g., 'traffic=SELECT report_date, ...')" + ) + alias = value[:eq_pos].strip() + sql = value[eq_pos + 1:].strip() + if not sql: + raise argparse.ArgumentTypeError( + f"Empty SQL in --register-bq for alias '{alias}'" + ) + return alias, sql + + +def build_parser() -> argparse.ArgumentParser: + """Build the argument parser for remote_query CLI.""" + parser = argparse.ArgumentParser( + description="Execute DuckDB queries spanning local Parquet + remote BigQuery tables", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Local-only query (no BigQuery): + python -m src.remote_query --sql "SELECT COUNT(*) FROM order_economics" + + # Register BQ result and query it: + python -m src.remote_query \\ + --register-bq "traffic=SELECT report_date, SUM(visitors) FROM \\`proj.ds.table\\` GROUP BY 1" \\ + --sql "SELECT * FROM traffic ORDER BY report_date" + + # JOIN local + remote: + python -m src.remote_query \\ + --register-bq "traffic=SELECT ... GROUP BY ..." \\ + --sql "SELECT o.*, t.visitors FROM order_economics o JOIN traffic t ON ..." \\ + --format parquet --output /tmp/result.parquet + """, + ) + parser.add_argument( + "--sql", + required=True, + help="DuckDB SQL query (executed after all views are registered)", + ) + parser.add_argument( + "--register-bq", + action="append", + type=_parse_register_bq, + default=[], + metavar="ALIAS=SQL", + dest="bq_registrations", + help='Register BQ query result as DuckDB view. Format: "alias=BQ_SQL". 
def main(argv: Optional[list[str]] = None) -> None:
    """Run the remote_query command-line interface.

    Parses ``argv`` with the parser from ``build_parser()``, configures
    logging on stderr (WARNING when ``--quiet`` is given, INFO otherwise) and
    forwards the parsed options to ``execute_remote_query``.

    Args:
        argv: Argument list to parse; ``None`` is passed through to
            ``ArgumentParser.parse_args``.

    Exits the process with status 1 on ``RemoteQueryError``, 130 on
    ``KeyboardInterrupt`` and 2 on any other exception.
    """
    args = build_parser().parse_args(argv)

    if args.quiet:
        log_level = logging.WARNING
    else:
        log_level = logging.INFO
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        stream=sys.stderr,
    )

    try:
        query_options = {
            "sql": args.sql,
            "bq_registrations": args.bq_registrations,
            "fmt": args.fmt,
            "output": args.output,
            "max_rows": args.max_rows,
            "max_bq_rows": args.max_bq_rows,
            "timeout": args.timeout,
            "data_dir": args.data_dir,
            "quiet": args.quiet,
        }
        execute_remote_query(**query_options)
    except RemoteQueryError as known_error:
        # Expected, user-facing failure: message only, no traceback.
        print(f"ERROR: {known_error}", file=sys.stderr)
        sys.exit(1)
    except KeyboardInterrupt:
        print("\nInterrupted.", file=sys.stderr)
        sys.exit(130)
    except Exception as unexpected:
        # Top-level boundary: report, log the traceback, then exit non-zero.
        print(f"UNEXPECTED ERROR: {unexpected}", file=sys.stderr)
        logger.exception("Unexpected error in remote_query")
        sys.exit(2)
def _make_table(**overrides) -> TableConfig:
    """Build a TableConfig from baseline field values, letting overrides win.

    Any keyword passed in ``overrides`` replaces the baseline value of the
    same name or is added as an extra constructor argument.
    """
    baseline = {
        "id": "test.dataset.table",
        "name": "test_table",
        "description": "Test",
        "primary_key": "id",
        "sync_strategy": "full_refresh",
    }
    # Later mapping wins, so caller-supplied overrides take precedence.
    return TableConfig(**{**baseline, **overrides})
+ + def test_kwarg_default_when_omitted(self): + """When YAML has no bq_entity_type, _parse_data_description passes 'view'.""" + table = TableConfig( + id="proj.dataset.orders", + name="orders", + description="Order data", + primary_key="order_id", + sync_strategy="full_refresh", + ) + assert table.bq_entity_type == "view" diff --git a/tests/test_remote_query.py b/tests/test_remote_query.py new file mode 100644 index 0000000..f719fef --- /dev/null +++ b/tests/test_remote_query.py @@ -0,0 +1,778 @@ +"""Tests for remote_query module - hybrid local Parquet + remote BigQuery queries. + +Tests cover: +- CLI argument parsing (_parse_register_bq, build_parser) +- Local view setup (_setup_local_views via create_local_views) +- BQ registration with safety checks (_validate_bq_result_size, _estimate_memory_mb, _register_bq_views) +- Output formatting (_print_table, _format_output) +- End-to-end local-only queries (no BQ mocking needed) +""" + +import argparse +import csv +import json +import os +from io import StringIO +from pathlib import Path +from unittest.mock import MagicMock, patch + +import duckdb +import pyarrow as pa +import pyarrow.parquet as pq +import pytest + +from src.remote_query import ( + RemoteQueryError, + _estimate_memory_mb, + _format_output, + _parse_register_bq, + _print_table, + _register_bq_views, + _validate_bq_result_size, + build_parser, + execute_remote_query, +) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def tmp_local_project(tmp_path): + """Create a minimal project with docs/data_description.md and parquet files. + + Layout: + tmp_path/ + docs/data_description.md (YAML with local + remote + hybrid tables) + server/parquet/crm_data/orders.parquet + server/parquet/crm_data/products.parquet + + Returns (project_root, data_dir) where data_dir = tmp_path / "server". 
class _DuckDBConnectionProxy:
    """Wrap a DuckDB connection and swallow CatalogException on SET commands.

    Some DuckDB versions may not support settings such as
    'statement_timeout'. For statements that start with ``SET `` (ignoring
    case and leading whitespace) a ``duckdb.CatalogException`` is turned into
    a ``None`` result so end-to-end tests work across versions. Every other
    statement, and every other attribute, is delegated to the wrapped
    connection unchanged.
    """

    def __init__(self, conn):
        # Store the wrapped connection directly in the instance dict.
        vars(self)["_conn"] = conn

    def execute(self, sql, *args, **kwargs):
        """Run ``sql`` on the wrapped connection, tolerating unsupported SETs."""
        wrapped = self._conn
        is_set_command = isinstance(sql, str) and sql.strip().upper().startswith("SET ")
        if not is_set_command:
            return wrapped.execute(sql, *args, **kwargs)
        try:
            return wrapped.execute(sql, *args, **kwargs)
        except duckdb.CatalogException:
            # Unknown setting on this DuckDB version: ignore it.
            return None

    def __getattr__(self, name):
        # Only reached for attributes not found on the proxy itself.
        return getattr(self._conn, name)
(empty alias) should raise.""" + with pytest.raises(argparse.ArgumentTypeError, match="Invalid --register-bq format"): + _parse_register_bq("=SELECT 1") + + def test_multiple_register_bq(self): + """Multiple --register-bq args should be collected into a list.""" + parser = build_parser() + args = parser.parse_args([ + "--sql", "SELECT 1", + "--register-bq", "t1=SELECT a FROM x", + "--register-bq", "t2=SELECT b FROM y", + ]) + assert len(args.bq_registrations) == 2 + assert args.bq_registrations[0] == ("t1", "SELECT a FROM x") + assert args.bq_registrations[1] == ("t2", "SELECT b FROM y") + + def test_default_format_is_none(self): + """Default --format should be None (uses config default at runtime).""" + parser = build_parser() + args = parser.parse_args(["--sql", "SELECT 1"]) + assert args.fmt is None + + def test_explicit_format(self): + """Explicit --format should be respected.""" + parser = build_parser() + args = parser.parse_args(["--sql", "SELECT 1", "--format", "csv"]) + assert args.fmt == "csv" + + def test_invalid_format_rejected(self): + """Invalid --format value should cause parser error.""" + parser = build_parser() + with pytest.raises(SystemExit): + parser.parse_args(["--sql", "SELECT 1", "--format", "xml"]) + + def test_no_register_bq_yields_empty_list(self): + """When no --register-bq is provided, bq_registrations defaults to [].""" + parser = build_parser() + args = parser.parse_args(["--sql", "SELECT 1"]) + assert args.bq_registrations == [] + + def test_register_bq_sql_with_equals(self): + """SQL containing '=' should be parsed correctly (split only on first '=').""" + result = _parse_register_bq("view=SELECT * FROM t WHERE col = 5") + assert result[0] == "view" + assert result[1] == "SELECT * FROM t WHERE col = 5" + + def test_quiet_flag(self): + """--quiet should set quiet=True.""" + parser = build_parser() + args = parser.parse_args(["--sql", "SELECT 1", "--quiet"]) + assert args.quiet is True + + def test_max_rows_parsing(self): + 
"""--max-rows should be parsed as integer.""" + parser = build_parser() + args = parser.parse_args(["--sql", "SELECT 1", "--max-rows", "500"]) + assert args.max_rows == 500 + + +# --------------------------------------------------------------------------- +# Tests: Local view setup +# --------------------------------------------------------------------------- + +class TestLocalViewSetup: + """Test _setup_local_views via create_local_views with tmp_path fixture.""" + + def test_creates_views_from_parquet(self, tmp_local_project, duckdb_conn): + """Local tables should be available as DuckDB views after setup.""" + project_root, data_dir = tmp_local_project + + with patch("scripts.duckdb_manager.find_project_root", return_value=project_root): + from src.remote_query import _setup_local_views + created = _setup_local_views(duckdb_conn, data_dir, quiet=True) + + assert "orders" in created + assert "products" in created + + # Verify data is queryable + count = duckdb_conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0] + assert count == 5 + + def test_skips_remote_tables(self, tmp_local_project, duckdb_conn): + """Remote tables (query_mode='remote') should NOT create local views.""" + project_root, data_dir = tmp_local_project + + with patch("scripts.duckdb_manager.find_project_root", return_value=project_root): + from src.remote_query import _setup_local_views + created = _setup_local_views(duckdb_conn, data_dir, quiet=True) + + assert "traffic" not in created + + # Verify the remote table is not queryable + tables = [row[0] for row in duckdb_conn.execute("SHOW TABLES").fetchall()] + assert "traffic" not in tables + + def test_includes_hybrid_tables(self, tmp_local_project, duckdb_conn): + """Hybrid tables (query_mode='hybrid') should create local views.""" + project_root, data_dir = tmp_local_project + + with patch("scripts.duckdb_manager.find_project_root", return_value=project_root): + from src.remote_query import _setup_local_views + created = 
class TestBQRegistration:
    """Test BQ result validation and registration (mocked BigQuery)."""

    @staticmethod
    def _make_mock_bq_client(count_result: int = 100, schema_fields: int = 5):
        """Create a mock BQ client that returns controlled count and schema.

        The mock's ``query(sql)`` returns a job whose ``result()`` depends on
        the SQL text: a statement starting with ``SELECT COUNT(*)`` yields a
        single row whose item access returns ``count_result``; a statement
        containing ``LIMIT 0`` yields a result whose ``schema`` is a list of
        ``schema_fields`` mock fields.

        Args:
            count_result: Row count returned by COUNT(*) query
            schema_fields: Number of fields in the schema
        """
        mock_client = MagicMock()

        # COUNT(*) query result: one row, any index returns the count.
        count_row = MagicMock()
        count_row.__getitem__ = MagicMock(return_value=count_result)

        # Schema query result (LIMIT 0): only the number of fields matters.
        mock_schema_fields = [MagicMock() for _ in range(schema_fields)]

        # Use side_effect to return different results for different queries
        def query_side_effect(sql):
            job = MagicMock()
            if sql.startswith("SELECT COUNT(*)"):
                result = MagicMock()
                # Fresh iterator per query so each result can be consumed once.
                result.__iter__ = MagicMock(return_value=iter([count_row]))
                job.result.return_value = result
            elif "LIMIT 0" in sql:
                result = MagicMock()
                result.schema = mock_schema_fields
                job.result.return_value = result
            return job

        mock_client.query.side_effect = query_side_effect
        return mock_client

    def test_count_check_blocks_large_result(self):
        """BQ sub-query exceeding max_rows should raise RemoteQueryError."""
        mock_client = self._make_mock_bq_client(count_result=1_000_000)

        with pytest.raises(RemoteQueryError, match="would return 1,000,000 rows"):
            _validate_bq_result_size(
                bq_client=mock_client,
                sql="SELECT * FROM big_table",
                alias="big_table",
                max_rows=500_000,
            )

    def test_validates_small_result_passes(self):
        """BQ sub-query within limits should return the row count."""
        mock_client = self._make_mock_bq_client(count_result=1000)

        row_count = _validate_bq_result_size(
            bq_client=mock_client,
            sql="SELECT * FROM small_table",
            alias="small_table",
            max_rows=500_000,
        )

        assert row_count == 1000

    def test_memory_estimate_blocks_huge_result(self):
        """_register_bq_views should refuse when estimated memory exceeds 2 GB."""
        # Create a mock that passes count check but fails memory check
        # 500K rows x 100 cols x 50 bytes/cell = ~2384 MB > 2048 MB limit
        mock_client = self._make_mock_bq_client(count_result=500_000, schema_fields=100)

        conn = duckdb.connect(":memory:")
        try:
            with patch("src.remote_query._create_bq_client", return_value=mock_client), \
                 patch.dict(os.environ, {"BIGQUERY_PROJECT": "test-proj"}):
                with pytest.raises(RemoteQueryError, match="estimated memory"):
                    _register_bq_views(
                        conn=conn,
                        registrations=[("huge", "SELECT * FROM huge_table")],
                        max_bq_rows=1_000_000,
                        timeout_seconds=60,
                        quiet=True,
                    )
        finally:
            conn.close()

    def test_registers_small_result(self):
        """BQ sub-query within all limits should register successfully."""
        # Small result: 100 rows x 5 cols = ~0.02 MB
        mock_client = self._make_mock_bq_client(count_result=100, schema_fields=5)

        # Mock register_bq_table to return the row count
        conn = duckdb.connect(":memory:")
        try:
            with patch("src.remote_query._create_bq_client", return_value=mock_client), \
                 patch("src.remote_query.register_bq_table", return_value=100) as mock_reg, \
                 patch.dict(os.environ, {"BIGQUERY_PROJECT": "test-proj"}):
                results = _register_bq_views(
                    conn=conn,
                    registrations=[("small_view", "SELECT * FROM small_table")],
                    max_bq_rows=500_000,
                    timeout_seconds=60,
                    quiet=True,
                )

            assert results == {"small_view": 100}
            mock_reg.assert_called_once()
        finally:
            conn.close()

    def test_missing_bigquery_project_raises(self):
        """Missing BIGQUERY_PROJECT env var should raise RemoteQueryError."""
        conn = duckdb.connect(":memory:")
        try:
            # clear=True removes every env var, including BIGQUERY_PROJECT.
            with patch.dict(os.environ, {}, clear=True):
                with pytest.raises(RemoteQueryError, match="BIGQUERY_PROJECT"):
                    _register_bq_views(
                        conn=conn,
                        registrations=[("v", "SELECT 1")],
                        max_bq_rows=100,
                        timeout_seconds=60,
                    )
        finally:
            conn.close()

    def test_empty_registrations_returns_empty(self):
        """Empty registration list should return empty dict without BQ calls."""
        conn = duckdb.connect(":memory:")
        try:
            result = _register_bq_views(
                conn=conn,
                registrations=[],
                max_bq_rows=100,
                timeout_seconds=60,
            )
            assert result == {}
        finally:
            conn.close()
"""Table format should produce aligned columns with header separator.""" + columns = ["id", "name", "value"] + rows = [(1, "alice", 100), (2, "bob", 200)] + + _print_table(columns, rows) + + output = capsys.readouterr().out + lines = output.strip().split("\n") + + # Header line + assert "id" in lines[0] + assert "name" in lines[0] + assert "value" in lines[0] + + # Separator line + assert "-+-" in lines[1] + + # Data rows + assert "alice" in lines[2] + assert "bob" in lines[3] + + # Row count footer + assert "(2 rows)" in output + + def test_table_format_empty_result(self, capsys): + """Empty result should print '(empty result)'.""" + _print_table(["col1"], []) + + output = capsys.readouterr().out + assert "(empty result)" in output + + def test_table_format_null_values(self, capsys): + """None values should be rendered as 'NULL'.""" + _print_table(["col"], [(None,)]) + + output = capsys.readouterr().out + assert "NULL" in output + + def test_csv_format(self, tmp_path, duckdb_conn): + """CSV output should contain header + data rows.""" + duckdb_conn.execute("CREATE TABLE test AS SELECT 1 AS id, 'hello' AS msg") + + output_path = str(tmp_path / "result.csv") + _format_output( + conn=duckdb_conn, + sql="SELECT * FROM test", + fmt="csv", + output_path=output_path, + max_rows=1000, + ) + + with open(output_path) as f: + reader = csv.reader(f) + header = next(reader) + rows = list(reader) + + assert header == ["id", "msg"] + assert len(rows) == 1 + assert rows[0][1] == "hello" + + def test_csv_format_to_stdout(self, capsys, duckdb_conn): + """CSV with no output path should write to stdout.""" + duckdb_conn.execute("CREATE TABLE test AS SELECT 42 AS val") + + _format_output( + conn=duckdb_conn, + sql="SELECT * FROM test", + fmt="csv", + output_path=None, + max_rows=1000, + ) + + output = capsys.readouterr().out + assert "val" in output + assert "42" in output + + def test_json_format(self, tmp_path, duckdb_conn): + """JSON output should contain a list of dicts.""" + 
duckdb_conn.execute( + "CREATE TABLE test AS SELECT 1 AS id, 'world' AS msg" + ) + + output_path = str(tmp_path / "result.json") + _format_output( + conn=duckdb_conn, + sql="SELECT * FROM test", + fmt="json", + output_path=output_path, + max_rows=1000, + ) + + with open(output_path) as f: + data = json.load(f) + + assert len(data) == 1 + assert data[0]["id"] == 1 + assert data[0]["msg"] == "world" + + def test_json_format_to_stdout(self, capsys, duckdb_conn): + """JSON with no output path should print to stdout.""" + duckdb_conn.execute("CREATE TABLE test AS SELECT 99 AS num") + + _format_output( + conn=duckdb_conn, + sql="SELECT * FROM test", + fmt="json", + output_path=None, + max_rows=1000, + ) + + output = capsys.readouterr().out + data = json.loads(output) + assert data[0]["num"] == 99 + + def test_parquet_write(self, tmp_path, duckdb_conn): + """Parquet output should create a readable parquet file.""" + duckdb_conn.execute( + "CREATE TABLE test AS SELECT 1 AS id, 2.5 AS val" + ) + + output_path = str(tmp_path / "output" / "result.parquet") + + with patch("src.remote_query._load_remote_query_config", return_value={ + "output_dir": str(tmp_path / "default_output"), + "timeout_seconds": 300, + "max_result_rows": 100_000, + "max_bq_registration_rows": 500_000, + "default_format": "table", + }): + _format_output( + conn=duckdb_conn, + sql="SELECT * FROM test", + fmt="parquet", + output_path=output_path, + max_rows=1000, + ) + + assert Path(output_path).exists() + + # Read it back and verify + result = pq.read_table(output_path) + assert result.num_rows == 1 + assert result.num_columns == 2 + assert result.column_names == ["id", "val"] + + def test_parquet_default_path(self, tmp_path, duckdb_conn): + """Parquet with no output path should use config default dir.""" + duckdb_conn.execute("CREATE TABLE test AS SELECT 1 AS x") + + default_dir = str(tmp_path / "default_output") + with patch("src.remote_query._load_remote_query_config", return_value={ + "output_dir": 
default_dir, + "timeout_seconds": 300, + "max_result_rows": 100_000, + "max_bq_registration_rows": 500_000, + "default_format": "table", + }): + _format_output( + conn=duckdb_conn, + sql="SELECT * FROM test", + fmt="parquet", + output_path=None, + max_rows=1000, + ) + + expected_path = Path(default_dir) / "result.parquet" + assert expected_path.exists() + + def test_unknown_format_raises(self, duckdb_conn): + """Unknown format should raise RemoteQueryError.""" + duckdb_conn.execute("CREATE TABLE test AS SELECT 1 AS id") + + with pytest.raises(RemoteQueryError, match="Unknown format"): + _format_output( + conn=duckdb_conn, + sql="SELECT * FROM test", + fmt="xml", + output_path=None, + max_rows=1000, + ) + + +# --------------------------------------------------------------------------- +# Tests: End-to-end (local-only, no BQ mocking needed) +# --------------------------------------------------------------------------- + +class TestEndToEnd: + """End-to-end tests with local Parquet data only (no BigQuery dependency). + + Uses _patched_duckdb_connect to handle DuckDB version differences + (statement_timeout may not be supported in all versions). 
+ """ + + _CONFIG = { + "timeout_seconds": 300, + "max_result_rows": 100_000, + "max_bq_registration_rows": 500_000, + "default_format": "table", + "output_dir": "/tmp/remote_query_test", + } + + def _run(self, tmp_local_project, **kwargs): + """Helper to run execute_remote_query with standard patches.""" + project_root, data_dir = tmp_local_project + config = dict(self._CONFIG) + config.update(kwargs.pop("config_overrides", {})) + + with patch("scripts.duckdb_manager.find_project_root", return_value=project_root), \ + patch("src.remote_query._load_remote_query_config", return_value=config), \ + patch("src.remote_query.duckdb") as mock_duckdb_mod: + mock_duckdb_mod.connect = _patched_duckdb_connect + kwargs.setdefault("data_dir", data_dir) + kwargs.setdefault("bq_registrations", []) + kwargs.setdefault("quiet", True) + execute_remote_query(**kwargs) + + def test_local_only_query(self, tmp_local_project, capsys): + """Execute a query against local Parquet views and verify table output.""" + self._run( + tmp_local_project, + sql="SELECT COUNT(*) AS cnt FROM orders", + fmt="table", + ) + + output = capsys.readouterr().out + assert "cnt" in output + assert "5" in output + + def test_local_join_query(self, tmp_local_project, capsys): + """JOIN between two local tables should work.""" + self._run( + tmp_local_project, + sql=( + "SELECT p.name, SUM(o.amount) AS total " + "FROM orders o JOIN products p ON o.product_id = p.product_id " + "GROUP BY p.name ORDER BY total DESC" + ), + fmt="json", + ) + + output = capsys.readouterr().out + data = json.loads(output) + assert len(data) == 3 + # Widget: orders 1,3 -> 10+30=40 + widget = next(r for r in data if r["name"] == "Widget") + assert widget["total"] == 40.0 + + def test_result_row_limit(self, tmp_local_project, capsys): + """Result exceeding max_rows should be truncated.""" + self._run( + tmp_local_project, + sql="SELECT * FROM orders ORDER BY order_id", + fmt="table", + max_rows=2, + quiet=False, + 
config_overrides={"max_result_rows": 2}, + ) + + out = capsys.readouterr().out + # Table output should show exactly 2 data rows + assert "(2 rows)" in out + + def test_csv_output_to_file(self, tmp_local_project, tmp_path): + """End-to-end CSV output written to a file.""" + output_path = str(tmp_path / "result.csv") + + self._run( + tmp_local_project, + sql="SELECT order_id, amount FROM orders ORDER BY order_id", + fmt="csv", + output=output_path, + ) + + with open(output_path) as f: + reader = csv.DictReader(f) + rows = list(reader) + + assert len(rows) == 5 + assert rows[0]["order_id"] == "1" + assert rows[0]["amount"] == "10.0" + + def test_hybrid_table_queryable(self, tmp_local_project, capsys): + """Hybrid table should be accessible in local queries.""" + self._run( + tmp_local_project, + sql="SELECT SUM(stock) AS total_stock FROM inventory", + fmt="json", + ) + + output = capsys.readouterr().out + data = json.loads(output) + assert data[0]["total_stock"] == 300 + + def test_quiet_mode_suppresses_stderr(self, tmp_local_project, capsys): + """With quiet=True, no progress messages should appear on stderr.""" + self._run( + tmp_local_project, + sql="SELECT COUNT(*) AS cnt FROM orders", + fmt="table", + quiet=True, + ) + + err = capsys.readouterr().err + # In quiet mode, _log_progress should not emit anything + assert "Setting up" not in err + assert "local views" not in err