agnes-the-ai-analyst/config/data_description.md.example
Petr d180b2014e Step 28: Remote query architecture for local+remote table JOINs
Add src/remote_query.py CLI module enabling the AI agent to run SQL
queries spanning local Parquet tables and remote BigQuery tables in a
single DuckDB session on the server. Two-phase protocol: BQ sub-queries
(--register-bq) fetch filtered/aggregated data, then DuckDB SQL (--sql)
joins everything.

Safety: COUNT(*) pre-check, memory estimation (2GB cap), row limits
(500K per BQ sub-query, 100K final result).

Changes:
- New src/remote_query.py with CLI, BQ registration, output formatting
- Add bq_entity_type field to TableConfig (view vs table routing)
- Extract create_local_views() from duckdb_manager.py for reuse
- Update claude_md_template.txt with remote query agent instructions
- Update example configs with remote_query section and docs
- 52 new tests (42 remote_query + 10 bq_entity_type), all passing
2026-03-21 11:39:15 +01:00


# Data Description
This file defines the tables available for synchronization and analysis.
Copy this file to `data_description.md` and customize for your data sources.
## Tables
```yaml
# Folder mapping: data source bucket -> local folder name
folder_mapping:
  "in.c-example": "example"

tables:
  # Small reference table - full refresh, no automatic sync
  - id: "in.c-example.customers"
    name: "customers"
    description: "Customer master data"
    primary_key: "id"
    sync_strategy: "full_refresh"

  # Large transactional table - daily automatic sync with profiling
  - id: "in.c-example.orders"
    name: "orders"
    description: "Order transactions with line items"
    primary_key: "id"
    sync_strategy: "partitioned"
    partition_by: "created_at"
    partition_column_type: "DATE"
    partition_granularity: "day"
    incremental_window_days: 3
    max_history_days: 450
    query_mode: "local"
    sync_schedule: "daily 05:00"
    profile_after_sync: true

  # Frequently updated table - sync every hour, skip profiling
  - id: "in.c-example.events"
    name: "events"
    description: "Real-time event stream"
    primary_key: "event_id"
    sync_strategy: "partitioned"
    partition_by: "event_date"
    partition_column_type: "DATE"
    partition_granularity: "day"
    incremental_window_days: 1
    query_mode: "local"
    sync_schedule: "every 1h"
    profile_after_sync: false

  # Remote table - too large for local sync, queried via BigQuery
  - id: "project.dataset.page_views"
    name: "page_views"
    description: "Page-level traffic metrics (~5M rows/day)"
    primary_key: "page_view_id"
    sync_strategy: "partitioned"
    partition_by: "event_date"
    partition_column_type: "DATE"
    partition_granularity: "day"
    query_mode: "remote"
    bq_entity_type: "view"
```
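
The `folder_mapping` block maps a source bucket to a local folder, and each table's `name` becomes its Parquet filename. The sketch below shows one way the two could combine. It assumes the bucket is the table `id` minus its last dot-separated segment and that the optional `folder` field overrides the mapping. The `<root>/<folder>/<name>.parquet` layout, the `data` root, and the function name `local_parquet_path` are hypothetical, not taken from the sync code.

```python
from pathlib import PurePosixPath


def local_parquet_path(table: dict, folder_mapping: dict, root: str = "data") -> PurePosixPath:
    """Resolve where a locally synced table's Parquet file might live.

    Hypothetical layout (not taken from the sync code): <root>/<folder>/<name>.parquet
    """
    # Assumption: the bucket is the table ID minus its last dot-separated
    # segment, e.g. "in.c-example.orders" -> "in.c-example".
    bucket = table["id"].rsplit(".", 1)[0]
    # The optional per-table `folder` field takes precedence over folder_mapping.
    folder = table.get("folder") or folder_mapping.get(bucket)
    if folder is None:
        raise KeyError(f"no folder_mapping entry for bucket {bucket!r}")
    # `name` becomes the Parquet filename.
    return PurePosixPath(root, folder, f"{table['name']}.parquet")


folder_mapping = {"in.c-example": "example"}
orders = {"id": "in.c-example.orders", "name": "orders"}
print(local_parquet_path(orders, folder_mapping))  # data/example/orders.parquet
```
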
## Table Configuration Reference
### Required Fields
| Field | Description | Example |
|-------|-------------|---------|
| `id` | Full table identifier in data source | `"in.c-crm.company"` |
| `name` | Short name (used for Parquet filenames) | `"company"` |
| `description` | Human-readable description | `"Company master data"` |
| `primary_key` | Primary key column(s), comma-separated | `"id"` or `"order_id, line_id"` |
| `sync_strategy` | How data is downloaded (see below) | `"full_refresh"` |
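
Before syncing, a loader might check an entry against the required fields above and split a composite `primary_key` into its columns. The sketch below does that. The function names and the `line_items` entry are hypothetical, and the project's actual config loader (the commit refers to a `TableConfig`) may validate differently.

```python
REQUIRED_FIELDS = ("id", "name", "description", "primary_key", "sync_strategy")
SYNC_STRATEGIES = {"full_refresh", "incremental", "partitioned"}


def validate_table(table: dict) -> list[str]:
    """Return a list of problems with one table entry (empty if none found)."""
    problems = [f"missing required field: {field}" for field in REQUIRED_FIELDS if not table.get(field)]
    strategy = table.get("sync_strategy")
    if strategy and strategy not in SYNC_STRATEGIES:
        problems.append(f"unknown sync_strategy: {strategy}")
    return problems


def primary_key_columns(table: dict) -> list[str]:
    """Split a comma-separated primary_key such as "order_id, line_id"."""
    return [col.strip() for col in table["primary_key"].split(",") if col.strip()]


# Hypothetical table entry with a composite key.
line_items = {
    "id": "in.c-example.line_items",
    "name": "line_items",
    "description": "Order line items",
    "primary_key": "order_id, line_id",
    "sync_strategy": "full_refresh",
}
print(validate_table(line_items))       # []
print(primary_key_columns(line_items))  # ['order_id', 'line_id']
```
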
### Sync Strategy
| Strategy | Description | Use for |
|----------|-------------|---------|
| `full_refresh` | Downloads entire table each sync | Small reference tables (< 100K rows) |
| `incremental` | Downloads changed rows via `changedSince` | Medium tables with update tracking |
| `partitioned` | Downloads by time partitions, overwrites only recent ones | Large tables with date column |
### Partitioning
| Field | Default | Description |
|-------|---------|-------------|
| `partition_by` | *(none)* | Column to partition by (e.g., `"created_at"`, `"event_date"`) |
| `partition_granularity` | `"month"` | `"day"`, `"month"`, or `"year"` |
| `partition_column_type` | `"TIMESTAMP"` | SQL type: `"DATE"`, `"TIMESTAMP"`, or `"DATETIME"` |
| `incremental_window_days` | `7` | How many recent days to re-download on each sync |
| `max_history_days` | *(all)* | Maximum history to keep (e.g., `450` for ~15 months) |
| `initial_load_chunk_days` | `30` | Chunk size for first-time download |
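
The partitioning fields describe two calculations: which partitions a routine sync overwrites, and how a first-time load is split into chunks. The sketch below makes both concrete under stated assumptions. The re-download window counts today as one of its `incremental_window_days`. Partition keys are formatted as `YYYY-MM-DD`, `YYYY-MM`, or `YYYY`, which is a hypothetical format. A first load is assumed to start `max_history_days` back. The function names are illustrative only.

```python
from datetime import date, timedelta


def partition_key(day: date, granularity: str) -> str:
    """Format a date as a partition key (hypothetical key format)."""
    if granularity == "day":
        return day.isoformat()                    # e.g. "2026-03-21"
    if granularity == "month":
        return f"{day.year:04d}-{day.month:02d}"  # e.g. "2026-03"
    if granularity == "year":
        return f"{day.year:04d}"                  # e.g. "2026"
    raise ValueError(f"unknown partition_granularity: {granularity!r}")


def partitions_to_refresh(today: date, window_days: int = 7, granularity: str = "month") -> list[str]:
    """Partitions touched by the last `window_days` days, counting today (assumption)."""
    days = (today - timedelta(days=offset) for offset in range(window_days))
    return sorted({partition_key(d, granularity) for d in days})


def initial_load_chunks(start: date, end: date, chunk_days: int = 30) -> list[tuple[date, date]]:
    """Split the half-open range [start, end) into chunks of at most chunk_days days."""
    chunks = []
    cursor = start
    while cursor < end:
        chunk_end = min(cursor + timedelta(days=chunk_days), end)
        chunks.append((cursor, chunk_end))
        cursor = chunk_end
    return chunks


today = date(2026, 3, 21)
# orders: day partitions, incremental_window_days: 3
print(partitions_to_refresh(today, window_days=3, granularity="day"))
# ['2026-03-19', '2026-03-20', '2026-03-21']
# First-time load of max_history_days: 450 in 30-day chunks
print(len(initial_load_chunks(today - timedelta(days=450), today)))  # 15
```

With monthly partitions, a 7-day window that crosses a month boundary overwrites two partitions. This is why coarse granularity combined with a small window still rewrites a whole month.
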
### Query Mode
| Field | Default | Description |
|-------|---------|-------------|
| `query_mode` | `"local"` | How the AI agent queries this table: `"local"`, `"remote"`, or `"hybrid"` |
| `bq_entity_type` | `"view"` | BigQuery entity type, which selects the access path: `"view"` (Python BigQuery client) or `"table"` (DuckDB BigQuery extension) |

| Mode | Description | Best for |
|------|-------------|----------|
| `local` | Synced to Parquet, queried via DuckDB | Tables < 2 GB, fast queries |
| `remote` | Not synced, queried via BigQuery | Huge tables (100+ GB), live data |
| `hybrid` | Subset synced for profiling, queries go to BigQuery | Medium tables needing live data |
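
The two query-mode fields together decide how a table is reached. The sketch below expresses that routing as a function. The returned labels are hypothetical names for the three paths the tables describe. It also assumes that `bq_entity_type` applies to `hybrid` tables as well as `remote` ones, which the tables above do not state explicitly.

```python
def query_route(table: dict) -> str:
    """Map a table entry to an execution path (labels are hypothetical)."""
    mode = table.get("query_mode", "local")
    if mode == "local":
        return "duckdb-parquet"  # synced Parquet files, queried with DuckDB
    if mode in ("remote", "hybrid"):
        # Assumption: bq_entity_type routes both remote and hybrid queries.
        entity = table.get("bq_entity_type", "view")
        if entity == "view":
            return "bigquery-python-client"
        if entity == "table":
            return "duckdb-bigquery-extension"
        raise ValueError(f"unknown bq_entity_type: {entity!r}")
    raise ValueError(f"unknown query_mode: {mode!r}")


print(query_route({"name": "orders", "query_mode": "local"}))   # duckdb-parquet
print(query_route({"name": "page_views", "query_mode": "remote",
                   "bq_entity_type": "view"}))                  # bigquery-python-client
```
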
### Remote Table Metadata
Remote tables (`query_mode: "remote"`) should include metadata to help the AI agent
write safe, efficient queries:

| Field | Description |
|-------|-------------|
| `grain` | Row granularity description |
| `volume` | Size estimates (rows_per_day, unique entities) |
| `columns` | Column descriptions with value distributions |
| `dimension_profile` | Cardinality per dimension |
| `query_result_estimates` | Expected rows after GROUP BY combinations |
| `join_keys` | How to join with other tables |
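
This metadata matters because remote queries run under hard limits. The commit that introduced `src/remote_query.py` lists them: a `COUNT(*)` pre-check, a 2 GB memory estimate cap, at most 500K rows per BigQuery sub-query, and at most 100K rows in the final result. The sketch below shows how such gates might be applied. It is not the module's actual logic. The function and class names and the `est_bytes_per_row` estimate are assumptions. It also reads "2GB" as 2 GiB, which the source does not specify.

```python
from dataclasses import dataclass

# Limits as listed in the remote-query commit message. Whether "2GB" means
# 2 * 10**9 or 2 * 1024**3 bytes is not stated; GiB is assumed here.
MAX_BQ_SUBQUERY_ROWS = 500_000
MAX_FINAL_ROWS = 100_000
MAX_MEMORY_BYTES = 2 * 1024**3


@dataclass
class CheckResult:
    ok: bool
    reason: str = ""


def check_bq_subquery(row_count: int, est_bytes_per_row: int) -> CheckResult:
    """Gate a BigQuery sub-query before its result is loaded into DuckDB.

    row_count: result of a COUNT(*) over the same filtered/aggregated query.
    est_bytes_per_row: hypothetical size estimate (e.g. from column types).
    """
    if row_count > MAX_BQ_SUBQUERY_ROWS:
        return CheckResult(False, f"{row_count:,} rows > {MAX_BQ_SUBQUERY_ROWS:,}: filter or aggregate further")
    est_bytes = row_count * est_bytes_per_row
    if est_bytes > MAX_MEMORY_BYTES:
        return CheckResult(False, f"~{est_bytes / 1024**3:.1f} GiB estimated > 2 GiB cap")
    return CheckResult(True)


def check_final_result(row_count: int) -> CheckResult:
    """Gate the final DuckDB result size."""
    if row_count > MAX_FINAL_ROWS:
        return CheckResult(False, f"{row_count:,} rows > {MAX_FINAL_ROWS:,}: add a GROUP BY or LIMIT")
    return CheckResult(True)


# One day of raw page_views (~5M rows) is rejected; a per-page daily
# aggregate of, say, 20K rows passes.
print(check_bq_subquery(5_000_000, est_bytes_per_row=200).ok)  # False
print(check_bq_subquery(20_000, est_bytes_per_row=200).ok)     # True
```

The `volume` and `query_result_estimates` metadata give the agent these row counts before it spends a `COUNT(*)` round trip.
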
### Automatic Sync Schedule
| Field | Default | Description |
|-------|---------|-------------|
| `sync_schedule` | *(none)* | When to automatically sync this table |
| `profile_after_sync` | `true` | Run data profiler after sync completes |

The `sync_schedule` field controls automatic synchronization via the `data-refresh`
systemd timer (runs every 15 minutes). If omitted, the table is only synced manually.

**Schedule formats:**

| Format | Example | Description |
|--------|---------|-------------|
| `every {N}m` | `"every 15m"`, `"every 30m"` | Sync every N minutes |
| `every {N}h` | `"every 1h"`, `"every 6h"` | Sync every N hours |
| `daily HH:MM` | `"daily 05:00"`, `"daily 17:30"` | Sync once per day at HH:MM UTC |
| *(omitted)* | - | Manual sync only (`python -m src.data_sync`) |

**How scheduling works:**
- A systemd timer runs `python -m src.data_sync --scheduled` every 15 minutes
- For each table with `sync_schedule`, it checks the last sync time from `sync_state.json`
- `every` schedules: syncs if enough time has elapsed since last sync
- `daily` schedules: syncs once after the target time passes (skips if already synced today)
- Tables without `sync_schedule` are never synced automatically
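
The schedule formats and the rules above can be combined into a single "is this table due?" check that a 15-minute timer tick could run per table. The sketch below assumes that all datetimes are timezone-aware UTC. It also assumes a table that was never synced is due immediately. `is_due` is a hypothetical name. The last-sync time is passed in as an argument because the layout of `sync_state.json` is not documented here.

```python
import re
from datetime import datetime, time, timedelta, timezone
from typing import Optional

EVERY = re.compile(r"every (\d+)([mh])")
DAILY = re.compile(r"daily (\d{2}):(\d{2})")


def is_due(schedule: str, last_sync: Optional[datetime], now: datetime) -> bool:
    """Should a table with this sync_schedule sync at `now`?

    All datetimes are assumed to be timezone-aware UTC. `last_sync` would be
    read from sync_state.json; None means the table has never been synced.
    """
    spec = schedule.strip()
    if m := EVERY.fullmatch(spec):
        amount = int(m.group(1))
        interval = timedelta(minutes=amount) if m.group(2) == "m" else timedelta(hours=amount)
        # "every" schedules: enough time has elapsed since the last sync.
        return last_sync is None or now - last_sync >= interval
    if m := DAILY.fullmatch(spec):
        target = datetime.combine(now.date(), time(int(m.group(1)), int(m.group(2))), tzinfo=timezone.utc)
        if now < target:
            return False  # today's target time has not passed yet
        # "daily" schedules: sync once after the target, skip if already synced today.
        return last_sync is None or last_sync.date() < now.date()
    raise ValueError(f"unrecognized sync_schedule: {schedule!r}")


utc = timezone.utc
now = datetime(2026, 3, 21, 5, 10, tzinfo=utc)
print(is_due("daily 05:00", datetime(2026, 3, 20, 5, 3, tzinfo=utc), now))  # True
print(is_due("daily 05:00", datetime(2026, 3, 21, 5, 2, tzinfo=utc), now))  # False
print(is_due("every 1h", datetime(2026, 3, 21, 4, 0, tzinfo=utc), now))     # True
```

A strict `>=` check can interact badly with a timer that also fires every 15 minutes. If the previous `every 15m` sync finished a few seconds after its tick, the next tick sees slightly less than 15 minutes elapsed and skips. A real implementation might allow a small tolerance.
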

**Profiling control:**
- `profile_after_sync: true` (default) - runs profiler after sync to update column statistics
- `profile_after_sync: false` - skips profiler (use for frequently synced tables where
profiling overhead is not worth it; the AI agent uses slightly older statistics)
- When profiling runs, the webapp is automatically restarted to load new statistics
### Optional Fields
| Field | Default | Description |
|-------|---------|-------------|
| `folder` | *(from folder_mapping)* | Override output folder name |
| `row_filter` | *(none)* | SQL WHERE clause (e.g., `"date >= DATE_SUB(CURRENT_DATE(), INTERVAL 15 MONTH)"`) |
| `columns` | *(all)* | List of columns to sync (subset) |
| `incremental_column` | *(none)* | Column for timestamp-based incremental sync (BigQuery) |
| `dataset` | *(none)* | Dataset group name for on-demand tables |
| `catalog_fqn` | *(auto)* | OpenMetadata FQN override (auto-derived from table ID if not set) |
| `foreign_keys` | `[]` | List of foreign key relationships |
| `where_filters` | `[]` | List of filters for Keboola Storage API |
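
For a BigQuery-backed table, `columns` and `row_filter` amount to the select list and WHERE clause of the export query. The sketch below composes such a query. `build_source_query` and the `project.dataset.daily_stats` table are hypothetical, and the sync code may build its queries differently. Keboola sources use `where_filters` through the Storage API instead, which this sketch does not cover.

```python
from typing import Optional, Sequence


def build_source_query(table_id: str,
                       columns: Optional[Sequence[str]] = None,
                       row_filter: Optional[str] = None) -> str:
    """Compose a BigQuery SELECT from a table's optional `columns` and `row_filter`.

    Hypothetical helper; row_filter is inserted verbatim as SQL, so it must
    come from trusted configuration, never from user input.
    """
    # Backtick-quote identifiers, as BigQuery standard SQL allows.
    select_list = ", ".join(f"`{col}`" for col in columns) if columns else "*"
    sql = f"SELECT {select_list} FROM `{table_id}`"
    if row_filter:
        # Parenthesize so a filter containing OR keeps its meaning
        # if more conditions are appended later.
        sql += f" WHERE ({row_filter})"
    return sql


print(build_source_query(
    "project.dataset.daily_stats",  # hypothetical table
    columns=["date", "country", "sessions"],
    row_filter="date >= DATE_SUB(CURRENT_DATE(), INTERVAL 15 MONTH)",
))
```
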