# Data Description

This file defines the tables available for synchronization and analysis.
Copy this file to `data_description.md` and customize for your data sources.

## Tables

```yaml
# Folder mapping: data source bucket -> local folder name
folder_mapping:
  "in.c-example": "example"

tables:
  # Small reference table - full refresh, no automatic sync
  - id: "in.c-example.customers"
    name: "customers"
    description: "Customer master data"
    primary_key: "id"
    sync_strategy: "full_refresh"

  # Large transactional table - daily automatic sync with profiling
  - id: "in.c-example.orders"
    name: "orders"
    description: "Order transactions with line items"
    primary_key: "id"
    sync_strategy: "partitioned"
    partition_by: "created_at"
    partition_column_type: "DATE"
    partition_granularity: "day"
    incremental_window_days: 3
    max_history_days: 450
    query_mode: "local"
    sync_schedule: "daily 05:00"
    profile_after_sync: true

  # Frequently updated table - sync every hour, skip profiling
  - id: "in.c-example.events"
    name: "events"
    description: "Real-time event stream"
    primary_key: "event_id"
    sync_strategy: "partitioned"
    partition_by: "event_date"
    partition_column_type: "DATE"
    partition_granularity: "day"
    incremental_window_days: 1
    query_mode: "local"
    sync_schedule: "every 1h"
    profile_after_sync: false

  # Remote table - too large for local sync, queried via BigQuery
  - id: "project.dataset.page_views"
    name: "page_views"
    description: "Page-level traffic metrics (~5M rows/day)"
    primary_key: "page_view_id"
    sync_strategy: "partitioned"
    partition_by: "event_date"
    partition_column_type: "DATE"
    partition_granularity: "day"
    query_mode: "remote"
    bq_entity_type: "view"
```

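To make the relationship between `folder_mapping`, `folder`, and `name` concrete, here is a minimal sketch that derives a table's local Parquet path from one config entry. It is not the project's sync code. The helper `local_parquet_path`, the `data_root` default, and the `<data_root>/<folder>/<name>.parquet` layout are all assumptions. Treating everything before the last dot of `id` as the bucket is also an assumption.

```python
from pathlib import PurePosixPath
from typing import Dict


def local_parquet_path(table: Dict[str, str], folder_mapping: Dict[str, str],
                       data_root: str = "data") -> PurePosixPath:
    """Hypothetical helper: resolve where a table's Parquet file would live.

    Assumes the bucket is the part of `id` before the last dot
    (e.g. "in.c-example.customers" -> "in.c-example") and a
    <data_root>/<folder>/<name>.parquet layout.
    """
    bucket, _, _ = table["id"].rpartition(".")
    # An explicit `folder` overrides the bucket-based mapping.
    folder = table.get("folder") or folder_mapping.get(bucket)
    if folder is None:
        raise KeyError(f"no folder mapping for bucket {bucket!r}")
    return PurePosixPath(data_root) / folder / f"{table['name']}.parquet"


mapping = {"in.c-example": "example"}
customers = {"id": "in.c-example.customers", "name": "customers"}
print(local_parquet_path(customers, mapping))  # data/example/customers.parquet
```
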
## Table Configuration Reference

### Required Fields

| Field | Description | Example |
|-------|-------------|---------|
| `id` | Full table identifier in data source | `"in.c-crm.company"` |
| `name` | Short name (used for Parquet filenames) | `"company"` |
| `description` | Human-readable description | `"Company master data"` |
| `primary_key` | Primary key column(s), comma-separated | `"id"` or `"order_id, line_id"` |
| `sync_strategy` | How data is downloaded (see below) | `"full_refresh"` |

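Here is a minimal validation sketch for the required fields above. It assumes table entries have already been loaded from YAML into plain dicts. `validate_table` and `primary_key_columns` are hypothetical helpers, and the real config loader may check more (or less).

```python
from typing import Dict, List

REQUIRED_FIELDS = ("id", "name", "description", "primary_key", "sync_strategy")
SYNC_STRATEGIES = {"full_refresh", "incremental", "partitioned"}


def validate_table(table: Dict[str, object]) -> List[str]:
    """Return the problems found in one table entry (empty list if valid)."""
    # A field that is missing or empty counts as missing.
    problems = [f"missing required field: {field}"
                for field in REQUIRED_FIELDS if not table.get(field)]
    strategy = table.get("sync_strategy")
    if strategy and strategy not in SYNC_STRATEGIES:
        problems.append(f"unknown sync_strategy: {strategy!r}")
    return problems


def primary_key_columns(table: Dict[str, str]) -> List[str]:
    """Split a comma-separated `primary_key` into individual column names."""
    return [col.strip() for col in table["primary_key"].split(",") if col.strip()]
```

With a composite key such as `"order_id, line_id"`, `primary_key_columns` returns `["order_id", "line_id"]`.
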
### Sync Strategy

| Strategy | Description | Use for |
|----------|-------------|---------|
| `full_refresh` | Downloads entire table each sync | Small reference tables (< 100K rows) |
| `incremental` | Downloads only rows changed since the last sync (via `changedSince`) | Medium tables with update tracking |
| `partitioned` | Downloads by time partitions, overwrites only recent ones | Large tables with a date column |

### Partitioning

| Field | Default | Description |
|-------|---------|-------------|
| `partition_by` | *(none)* | Column to partition by (e.g., `"created_at"`, `"event_date"`) |
| `partition_granularity` | `"month"` | `"day"`, `"month"`, or `"year"` |
| `partition_column_type` | `"TIMESTAMP"` | SQL type: `"DATE"`, `"TIMESTAMP"`, or `"DATETIME"` |
| `incremental_window_days` | `7` | How many recent days to re-download on each sync |
| `max_history_days` | *(all)* | Maximum history to keep (e.g., `450` for ~15 months) |
| `initial_load_chunk_days` | `30` | Chunk size for first-time download |

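The following sketch shows one possible reading of the partitioning fields. It computes which partitions a sync would re-download given `incremental_window_days` and `partition_granularity`, and the cutoff date implied by `max_history_days`. Several details are assumptions: that the window ends today (inclusive), and that partition keys use the formats shown. Both function names are hypothetical.

```python
from datetime import date, timedelta
from typing import List, Optional

# Assumed partition key formats; the real sync code may name partitions differently.
KEY_FORMATS = {"day": "%Y-%m-%d", "month": "%Y-%m", "year": "%Y"}


def partitions_to_refresh(today: date, incremental_window_days: int = 7,
                          granularity: str = "month") -> List[str]:
    """Partition keys touched by the last `incremental_window_days` days.

    Assumes the window ends on `today` (inclusive), so a 3-day window
    covers today and the two days before it.
    """
    fmt = KEY_FORMATS[granularity]
    keys: List[str] = []
    for offset in range(incremental_window_days - 1, -1, -1):
        key = (today - timedelta(days=offset)).strftime(fmt)
        if key not in keys:  # month/year partitions collapse many days into one key
            keys.append(key)
    return keys


def history_cutoff(today: date, max_history_days: Optional[int]) -> Optional[date]:
    """Oldest date to keep; None means all history is retained."""
    if max_history_days is None:
        return None
    return today - timedelta(days=max_history_days)


print(partitions_to_refresh(date(2024, 3, 2), 3, "day"))    # ['2024-02-29', '2024-03-01', '2024-03-02']
print(partitions_to_refresh(date(2024, 3, 2), 3, "month"))  # ['2024-02', '2024-03']
```

Month and year keys are deduplicated. As a result, a short window that crosses a month boundary re-downloads two monthly partitions rather than one.
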
### Query Mode

| Field | Default | Description |
|-------|---------|-------------|
| `query_mode` | `"local"` | How the AI agent queries this table |
| `bq_entity_type` | `"view"` | BigQuery entity type: `"view"` (Python BQ client) or `"table"` (DuckDB BQ extension) |

| Mode | Description | Best for |
|------|-------------|----------|
| `local` | Synced to Parquet, queried via DuckDB | Tables < 2 GB, fast queries |
| `remote` | Not synced, queried via BigQuery | Huge tables (100+ GB), live data |
| `hybrid` | Subset synced for profiling, queries go to BigQuery | Medium tables needing live data |

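The two tables above imply a routing decision for each table. The sketch below encodes that decision using made-up backend labels. It also assumes `bq_entity_type` applies to `hybrid` tables as well as `remote` ones, which the tables do not state explicitly. `query_backend` is hypothetical.

```python
from typing import Dict


def query_backend(table: Dict[str, str]) -> str:
    """Hypothetical routing: which engine the agent would use for a table."""
    mode = table.get("query_mode", "local")
    if mode == "local":
        return "duckdb-parquet"  # synced Parquet, queried via DuckDB
    if mode in ("remote", "hybrid"):
        entity = table.get("bq_entity_type", "view")
        if entity == "view":
            return "bigquery-python-client"
        if entity == "table":
            return "duckdb-bigquery-extension"
        raise ValueError(f"unknown bq_entity_type: {entity!r}")
    raise ValueError(f"unknown query_mode: {mode!r}")


print(query_backend({"name": "page_views", "query_mode": "remote"}))  # bigquery-python-client
```
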
### Remote Table Metadata

Remote tables (`query_mode: "remote"`) should include metadata to help the AI agent
write safe, efficient queries:

| Field | Description |
|-------|-------------|
| `grain` | Row granularity description |
| `volume` | Size estimates (rows_per_day, unique entities) |
| `columns` | Column descriptions with value distributions |
| `dimension_profile` | Cardinality per dimension |
| `query_result_estimates` | Expected rows after GROUP BY combinations |
| `join_keys` | How to join with other tables |

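`dimension_profile` is useful because it bounds result size before a query runs: a GROUP BY returns at most one row per combination of grouped values. Here is a minimal sketch, assuming `dimension_profile` maps each dimension to its distinct-value count. The helper name and the example profile numbers are hypothetical.

```python
from math import prod
from typing import Dict, Sequence


def max_group_by_rows(dimension_profile: Dict[str, int], group_by: Sequence[str],
                      rows_scanned: int) -> int:
    """Upper bound on GROUP BY output size.

    The result has at most one row per combination of grouped values
    (the product of their cardinalities) and never more rows than were scanned.
    Grouping by nothing yields a single aggregate row (prod of nothing is 1).
    """
    combos = prod(dimension_profile[dim] for dim in group_by)
    return min(combos, rows_scanned)


profile = {"country": 50, "device_type": 3, "page_id": 200_000}  # illustrative numbers
print(max_group_by_rows(profile, ["country", "device_type"], 5_000_000))  # 150
print(max_group_by_rows(profile, ["page_id", "country"], 5_000_000))      # 5000000
```

If the query also groups by date, include the number of days in range as that dimension's cardinality. The bound is conservative, so comparing it against a row limit errs toward rejecting or narrowing a query.
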
### Automatic Sync Schedule

| Field | Default | Description |
|-------|---------|-------------|
| `sync_schedule` | *(none)* | When to automatically sync this table |
| `profile_after_sync` | `true` | Run data profiler after sync completes |

The `sync_schedule` field controls automatic synchronization via the `data-refresh`
systemd timer (runs every 15 minutes). If omitted, the table is only synced manually.

**Schedule formats:**

| Format | Example | Description |
|--------|---------|-------------|
| `every {N}m` | `"every 15m"`, `"every 30m"` | Sync every N minutes |
| `every {N}h` | `"every 1h"`, `"every 6h"` | Sync every N hours |
| `daily HH:MM` | `"daily 05:00"`, `"daily 17:30"` | Sync once per day at HH:MM UTC |
| *(omitted)* | - | Manual sync only (`python -m src.data_sync`) |

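Here is a sketch of parsing the schedule formats listed above. `parse_schedule` and its return shape are assumptions, not the project's parser. It accepts only the exact forms shown in the table.

```python
import re
from datetime import time, timedelta
from typing import Optional, Tuple, Union

_EVERY = re.compile(r"^every (\d+)([mh])$")
_DAILY = re.compile(r"^daily (\d{2}):(\d{2})$")


def parse_schedule(spec: Optional[str]) -> Optional[Tuple[str, Union[timedelta, time]]]:
    """Parse a sync_schedule string.

    Returns ("every", timedelta), ("daily", time), or None for manual-only tables.
    """
    if spec is None:
        return None  # omitted: manual sync only
    spec = spec.strip()
    if m := _EVERY.match(spec):
        n, unit = int(m.group(1)), m.group(2)
        return ("every", timedelta(minutes=n) if unit == "m" else timedelta(hours=n))
    if m := _DAILY.match(spec):
        # time() raises ValueError for out-of-range values such as "daily 25:00".
        return ("daily", time(int(m.group(1)), int(m.group(2))))
    raise ValueError(f"unrecognised sync_schedule: {spec!r}")


print(parse_schedule("every 15m"))    # ('every', datetime.timedelta(seconds=900))
print(parse_schedule("daily 05:00"))  # ('daily', datetime.time(5, 0))
```
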
**How scheduling works:**
- A systemd timer runs `python -m src.data_sync --scheduled` every 15 minutes
- For each table with `sync_schedule`, it checks the last sync time from `sync_state.json`
- `every` schedules: syncs if enough time has elapsed since last sync
- `daily` schedules: syncs once after the target time passes (skips if already synced today)
- Tables without `sync_schedule` are never synced automatically

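The scheduling rules above can be expressed as a small predicate evaluated on each timer tick. This sketch takes an already-parsed schedule: `"every"` with a `timedelta`, or `"daily"` with a `time`. It follows the list literally, so any sync earlier on the same UTC day suppresses the daily run. `is_due` is hypothetical, and the real scheduler may compare against the target time instead.

```python
from datetime import datetime, time, timedelta, timezone
from typing import Optional, Union


def is_due(kind: str, value: Union[timedelta, time],
           last_sync: Optional[datetime], now: datetime) -> bool:
    """Return True if a table with this parsed schedule should sync at `now`.

    Assumes `last_sync` and `now` are both timezone-aware UTC datetimes.
    """
    if kind == "every":
        # Due once a full interval has elapsed, or if the table was never synced.
        return last_sync is None or now - last_sync >= value
    if kind == "daily":
        if now.time() < value:
            return False  # today's target time has not passed yet
        # Run once per UTC day: skip if a sync already happened today.
        return last_sync is None or last_sync.date() < now.date()
    raise ValueError(f"unknown schedule kind: {kind!r}")


now = datetime(2024, 5, 1, 5, 10, tzinfo=timezone.utc)
print(is_due("daily", time(5, 0), datetime(2024, 4, 30, 5, 5, tzinfo=timezone.utc), now))  # True
print(is_due("every", timedelta(hours=1), now - timedelta(minutes=45), now))  # False
```

Because the timer fires every 15 minutes, an `every 1h` table actually syncs on the first tick after the hour has elapsed. As a result, intervals can stretch by up to one timer period.
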
**Profiling control:**
- `profile_after_sync: true` (default) - runs profiler after sync to update column statistics
- `profile_after_sync: false` - skips profiler (use for frequently synced tables where
  profiling overhead is not worth it; the AI agent uses slightly older statistics)
- When profiling runs, the webapp is automatically restarted to load new statistics

### Optional Fields

| Field | Default | Description |
|-------|---------|-------------|
| `folder` | *(from folder_mapping)* | Override output folder name |
| `row_filter` | *(none)* | SQL WHERE clause (e.g., `"date >= DATE_SUB(CURRENT_DATE(), INTERVAL 15 MONTH)"`) |
| `columns` | *(all)* | List of columns to sync (subset) |
| `incremental_column` | *(none)* | Column for timestamp-based incremental sync (BigQuery) |
| `dataset` | *(none)* | Dataset group name for on-demand tables |
| `catalog_fqn` | *(auto)* | OpenMetadata FQN override (auto-derived from table ID if not set) |
| `foreign_keys` | `[]` | List of foreign key relationships |
| `where_filters` | `[]` | List of filters for Keboola Storage API |

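For BigQuery-sourced tables, `columns` and `row_filter` map naturally onto a SELECT list and a WHERE clause. The sketch assumes that mapping. `build_sync_query` is hypothetical. Because `row_filter` is pasted in verbatim, it must come from trusted configuration.

```python
from typing import Optional, Sequence


def build_sync_query(table_id: str, columns: Optional[Sequence[str]] = None,
                     row_filter: Optional[str] = None) -> str:
    """Hypothetical: assemble a BigQuery SELECT honoring `columns` and `row_filter`."""
    # Backtick-quote identifiers; BigQuery accepts a whole project.dataset.table path in one pair.
    select_list = ", ".join(f"`{col}`" for col in columns) if columns else "*"
    sql = f"SELECT {select_list} FROM `{table_id}`"
    if row_filter:
        sql += f" WHERE {row_filter}"  # inserted as-is, not parameterized
    return sql


print(build_sync_query("project.dataset.page_views", ["event_date", "page_view_id"],
                       "event_date >= '2024-01-01'"))
```
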