agnes-the-ai-analyst/scripts/duckdb_manager.py
Petr d180b2014e Step 28: Remote query architecture for local+remote table JOINs
Add src/remote_query.py CLI module enabling the AI agent to run SQL
queries spanning local Parquet tables and remote BigQuery tables in a
single DuckDB session on the server. Two-phase protocol: BQ sub-queries
(--register-bq) fetch filtered/aggregated data, then DuckDB SQL (--sql)
joins everything.

Safety: COUNT(*) pre-check, memory estimation (2GB cap), row limits
(500K per BQ sub-query, 100K final result).

Changes:
- New src/remote_query.py with CLI, BQ registration, output formatting
- Add bq_entity_type field to TableConfig (view vs table routing)
- Extract create_local_views() from duckdb_manager.py for reuse
- Update claude_md_template.txt with remote query agent instructions
- Update example configs with remote_query section and docs
- 52 new tests (42 remote_query + 10 bq_entity_type), all passing
2026-03-21 11:39:15 +01:00

506 lines
17 KiB
Python

#!/usr/bin/env python3
"""
DuckDB Manager - Initialize and manage DuckDB database with views from parquet files
and runtime BigQuery query registration for remote/hybrid tables.
For BigQuery data sources, tables with query_mode="remote" or "hybrid" are queried
at runtime via the Python BQ client and registered as in-memory Arrow tables in DuckDB.
This avoids the DuckDB BigQuery extension limitation (cannot read BQ views).
Usage:
python3 scripts/duckdb_manager.py --reinit # Initialize/reinitialize all views
python3 -m scripts.duckdb_manager --reinit # Alternative module import
"""
import duckdb
import logging
import os
import sys
import argparse
import re
import yaml
from pathlib import Path
from typing import Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
def find_project_root() -> Path:
"""
Find project root (folder containing docs/data_description.md).
Searches from current folder upwards.
Returns:
Path to project root
Raises:
FileNotFoundError: If docs/data_description.md is not found
"""
current = Path.cwd()
# Try current folder first
if (current / "docs" / "data_description.md").exists():
return current
# Also check for server/ subdirectory layout (analyst setup)
if (current / "server" / "docs" / "data_description.md").exists():
return current
# Try parent folders (up to 5 levels)
for _ in range(5):
current = current.parent
if (current / "docs" / "data_description.md").exists():
return current
if (current / "server" / "docs" / "data_description.md").exists():
return current
raise FileNotFoundError(
"docs/data_description.md not found. "
"Make sure you're running from project root."
)
def parse_data_description(project_root: Path) -> Tuple[List[Dict], Dict[str, str]]:
"""
Parse data_description.md and extract table configurations.
Args:
project_root: Path to project root
Returns:
Tuple of (table_configs, folder_mapping)
- table_configs: List of table configuration dicts
- folder_mapping: Dict mapping bucket names to folder names
"""
# Try both possible locations
data_desc_path = project_root / "docs" / "data_description.md"
if not data_desc_path.exists():
data_desc_path = project_root / "server" / "docs" / "data_description.md"
if not data_desc_path.exists():
raise FileNotFoundError(f"data_description.md not found in {project_root}")
with open(data_desc_path, 'r', encoding='utf-8') as f:
content = f.read()
# Collect all markdown files: main data_description + dataset files
md_files = [data_desc_path]
for datasets_dir_name in ["docs/datasets", "server/docs/datasets"]:
datasets_dir = project_root / datasets_dir_name
if datasets_dir.exists():
for md_file in sorted(datasets_dir.glob("*.md")):
md_files.append(md_file)
# Parse YAML blocks from all files
yaml_pattern = r'```yaml\s*\n(.*?)\n```'
folder_mapping = {}
table_configs = []
for md_file in md_files:
file_content = md_file.read_text() if md_file != data_desc_path else content
for yaml_match in re.finditer(yaml_pattern, file_content, re.DOTALL):
try:
config_data = yaml.safe_load(yaml_match.group(1))
if not config_data:
continue
if 'folder_mapping' in config_data:
folder_mapping.update(config_data['folder_mapping'])
if 'tables' in config_data:
table_configs.extend(config_data['tables'])
except yaml.YAMLError as e:
raise ValueError(f"Failed to parse YAML in {md_file.name}: {e}")
if not table_configs:
raise ValueError("No tables found in YAML configuration")
return table_configs, folder_mapping
def get_parquet_path(table_config: Dict, folder_mapping: Dict[str, str], data_dir: str) -> Path:
"""
Get path to Parquet file for given table.
Format: {data_dir}/parquet/{folder_name}/{table_name}.parquet
For partitioned tables: {data_dir}/parquet/{folder_name}/{table_name}/ (directory)
Args:
table_config: Table configuration dict
folder_mapping: Mapping of bucket names to folder names
data_dir: Base data directory (e.g., "server" for analysts, "data" for server)
Returns:
Path to Parquet file (or directory for partitioned tables)
"""
# Extract bucket name from table ID (e.g., "in.c-crm" from "in.c-crm.company")
table_id = table_config['id']
bucket_name = ".".join(table_id.split(".")[:-1])
# Use folder mapping if available, otherwise fall back to bucket name
folder_name = folder_mapping.get(bucket_name, bucket_name)
# Table-level folder override (e.g., folder: kbc_telemetry_expert)
if table_config.get('folder'):
folder_name = table_config['folder']
table_name = table_config['name']
sync_strategy = table_config.get('sync_strategy', 'full_refresh')
parquet_dir = Path(data_dir) / "parquet" / folder_name
# Determine if partitioned
is_partitioned = (
sync_strategy == "partitioned" or
(sync_strategy == "incremental" and table_config.get('partition_by'))
)
if is_partitioned:
# For partitioned tables, return directory path
return parquet_dir / table_name
else:
# Single parquet file
return parquet_dir / f"{table_name}.parquet"
def _get_bq_project_from_table_id(table_id: str) -> Optional[str]:
"""Extract BQ project ID from a fully-qualified table ID.
Args:
table_id: e.g. "prj-grp-dataview-prod-1ff9.finance_unit_economics.unit_economics"
Returns:
Project ID or None if format doesn't match BQ convention
"""
parts = table_id.split(".")
if len(parts) == 3 and "-" in parts[0]:
return parts[0]
return None
def _create_bq_client(project: str):
"""Create a BigQuery client. Separated for testability.
Args:
project: GCP project ID for billing
Returns:
google.cloud.bigquery.Client instance
"""
from google.cloud import bigquery as bq_module
return bq_module.Client(project=project)
def register_bq_table(
conn: duckdb.DuckDBPyConnection,
table_id: str,
view_name: str,
sql: str,
bq_project: Optional[str] = None,
_bq_client_factory=None,
) -> int:
"""
Execute a BigQuery SQL query and register the result as a DuckDB view.
Uses the Python BigQuery client (Query API) which supports BQ views,
unlike the DuckDB BigQuery extension (Storage Read API).
The result is held in memory as a PyArrow table -- no disk I/O.
Args:
conn: Open DuckDB connection
table_id: BQ table ID for logging (e.g., "project.dataset.table")
view_name: Name to register in DuckDB (e.g., "unit_economics_live")
sql: Full BigQuery SQL query to execute
bq_project: GCP project for billing. If None, uses BIGQUERY_PROJECT env var.
_bq_client_factory: Override BQ client creation (for testing)
Returns:
Number of rows in the result
Raises:
ImportError: If google-cloud-bigquery is not installed
ValueError: If bq_project is not set
"""
project = bq_project or os.environ.get("BIGQUERY_PROJECT")
if not project:
raise ValueError(
"BigQuery project not set. "
"Pass bq_project or set BIGQUERY_PROJECT env var."
)
logger.info(f"Querying BQ: {table_id} -> {view_name}")
logger.debug(f"SQL: {sql[:200]}...")
factory = _bq_client_factory or _create_bq_client
client = factory(project)
job = client.query(sql)
# Use Query API (not Storage Read API) to support BQ views
try:
arrow_table = job.to_arrow()
except Exception as e:
if "readsessions" in str(e) or "PERMISSION_DENIED" in str(e):
logger.warning("BQ Storage API unavailable, falling back to REST")
arrow_table = job.to_arrow(create_bqstorage_client=False)
else:
raise
conn.register(view_name, arrow_table)
logger.info(
f"Registered {view_name}: {arrow_table.num_rows} rows, "
f"{arrow_table.num_columns} cols (in-memory)"
)
return arrow_table.num_rows
def get_remote_tables(table_configs: List[Dict]) -> List[Dict]:
"""Return table configs with query_mode 'remote' or 'hybrid'.
Args:
table_configs: List of table configuration dicts
Returns:
List of remote/hybrid table configs
"""
return [
tc for tc in table_configs
if tc.get("query_mode") in ("remote", "hybrid")
]
def create_local_views(
conn: duckdb.DuckDBPyConnection,
data_dir: str,
project_root: Optional[Path] = None,
verbose: bool = True,
) -> Tuple[List[str], List[str]]:
"""Create DuckDB views for local/hybrid tables from Parquet files.
Extracts the shared logic from init_duckdb() so it can be reused
by remote_query.py without creating a persistent DB file.
Args:
conn: Open DuckDB connection (in-memory or file-backed)
data_dir: Base data directory (e.g., "server", "/data/src_data")
project_root: Project root path. If None, auto-detected.
verbose: Print progress messages
Returns:
Tuple of (created_view_names, skipped_table_names)
"""
if project_root is None:
project_root = find_project_root()
table_configs, folder_mapping = parse_data_description(project_root)
# Filter to local and hybrid tables (both have local Parquet)
eligible_tables = [
tc for tc in table_configs
if tc.get("query_mode", "local") in ("local", "hybrid")
]
created_views = []
skipped_views = []
for table_config in eligible_tables:
table_name = table_config['name']
try:
parquet_path = get_parquet_path(table_config, folder_mapping, data_dir)
if not parquet_path.exists():
skipped_views.append(table_name)
if verbose:
print(f" [SKIP] {table_name} - parquet not found: {parquet_path}")
continue
# Determine if partitioned
sync_strategy = table_config.get('sync_strategy', 'full_refresh')
is_partitioned = (
sync_strategy == "partitioned" or
(sync_strategy == "incremental" and table_config.get('partition_by'))
)
if is_partitioned:
glob_pattern = parquet_path / "*.parquet"
partition_files = list(parquet_path.glob("*.parquet"))
if not partition_files:
skipped_views.append(table_name)
if verbose:
print(f" [SKIP] {table_name} - no partition files")
continue
sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{glob_pattern}', union_by_name=true)"
if verbose:
mode_label = "hybrid" if table_config.get("query_mode") == "hybrid" else "local"
print(f" [OK] {table_name} ({len(partition_files)} partitions, {mode_label})")
else:
sql = f"CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM read_parquet('{parquet_path}')"
if verbose:
mode_label = "hybrid" if table_config.get("query_mode") == "hybrid" else "local"
print(f" [OK] {table_name} ({mode_label})")
conn.execute(sql)
created_views.append(table_name)
except Exception as e:
if verbose:
print(f" [ERR] Error creating {table_name}: {e}")
raise
return created_views, skipped_views
def init_duckdb(
db_path="user/duckdb/analytics.duckdb",
data_dir="server",
verbose=True,
bq_project: Optional[str] = None,
):
"""
Initialize DuckDB database with views from parquet files.
Creates DuckDB views for local/hybrid tables (from Parquet).
Remote tables are NOT pre-loaded -- they are registered at query time
via register_bq_table().
Args:
db_path: Path to DuckDB database file
data_dir: Base data directory (e.g., "server" for analysts, "data" for server)
verbose: Print progress messages
bq_project: BigQuery execution project (for informational purposes only)
Returns:
True if successful, False otherwise
"""
# Ensure database directory exists
os.makedirs(os.path.dirname(db_path), exist_ok=True)
if verbose:
print("Initializing DuckDB database...")
try:
project_root = find_project_root()
if verbose:
print(f" Project root: {project_root}")
table_configs, folder_mapping = parse_data_description(project_root)
if verbose:
print(f" Loaded {len(table_configs)} tables from data_description.md")
# Classify tables by query_mode (for display only)
remote_tables = [tc for tc in table_configs if tc.get("query_mode") == "remote"]
local_count = len([tc for tc in table_configs if tc.get("query_mode", "local") == "local"])
hybrid_count = len([tc for tc in table_configs if tc.get("query_mode") == "hybrid"])
if verbose:
print(f" Query modes: {local_count} local, "
f"{len(remote_tables)} remote, {hybrid_count} hybrid")
# Connect to database (creates if doesn't exist)
conn = duckdb.connect(db_path)
if verbose:
print("\n Creating views from parquet files...")
# Delegate to create_local_views
created_views, skipped_views = create_local_views(
conn, data_dir, project_root=project_root, verbose=verbose,
)
# Log remote tables (queried at runtime via register_bq_table)
if remote_tables:
if verbose:
print("\n Remote tables (queried at runtime via BigQuery):")
for table_config in remote_tables:
table_name = table_config['name']
table_id = table_config['id']
if verbose:
print(f" [BQ] {table_name} -> {table_id}")
# Display table list with row counts
if verbose:
print(f"\n Available tables ({len(created_views)} local views):")
tables = conn.execute("SHOW TABLES").fetchall()
for table in tables:
try:
row_count = conn.execute(f"SELECT COUNT(*) FROM {table[0]}").fetchone()[0]
print(f" - {table[0]}: {row_count:,} rows (local)")
except Exception:
print(f" - {table[0]}: (error counting rows)")
if remote_tables:
print(f"\n Remote tables ({len(remote_tables)}, loaded on demand):")
for tc in remote_tables:
print(f" - {tc['name']}: via BQ Query API (use date filters!)")
# Close connection
conn.close()
if verbose:
print(f"\n DuckDB database created: {db_path}")
return True
except Exception as e:
if verbose:
print(f"\n Error initializing DuckDB: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""Main entry point for command-line usage."""
parser = argparse.ArgumentParser(
description="DuckDB Manager - Initialize and manage DuckDB database"
)
parser.add_argument(
'--reinit',
action='store_true',
help='Reinitialize all DuckDB views from parquet files'
)
parser.add_argument(
'--db-path',
default='user/duckdb/analytics.duckdb',
help='Path to DuckDB database file (default: user/duckdb/analytics.duckdb)'
)
parser.add_argument(
'--data-dir',
default='server',
help='Base data directory (default: server, use "data" for server deployment)'
)
parser.add_argument(
'--bq-project',
default=None,
help='BigQuery execution project (informational only)'
)
parser.add_argument(
'--quiet',
action='store_true',
help='Suppress progress messages'
)
args = parser.parse_args()
# If no command provided, show help
if not args.reinit:
parser.print_help()
sys.exit(1)
# Run initialization
success = init_duckdb(
db_path=args.db_path,
data_dir=args.data_dir,
verbose=not args.quiet,
bq_project=args.bq_project,
)
# Exit with appropriate code
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()