agnes-the-ai-analyst/connectors/keboola/adapter.py
Petr b99ec576ca Add self-service data onboarding system
Table Registry as central source of truth (JSON) with atomic writes,
optimistic locking, audit logging, and data_description.md generation.
Existing readers (config.py, profiler.py) need zero changes.

Phase 1 - Discovery API:
  - discover_tables() on DataSource ABC + Keboola implementation
  - admin_required decorator with server-side recomputation
  - GET /api/admin/discover-tables endpoint

Phase 2 - Table Registry:
  - src/table_registry.py with CRUD, validation, migration from MD
  - Admin API: register/update/unregister with version locking
  - DELETE cascade cleans up per-user subscriptions

Phase 3 - Auto-Profiling:
  - profile_changed_tables() for incremental profiling
  - Non-fatal hook in sync_all() after successful sync

Phase 4 - Per-Table Subscriptions:
  - table_mode (all/explicit) with per-table toggles
  - GET/POST /api/table-subscriptions endpoints
  - Subscription status in catalog and dashboard views

Phase 5 - Smart Sync:
  - Python-generated rsync filter files (not shell YAML parsing)
  - sync_data.sh uses --filter="merge ..." for explicit mode

Phase 6 - Admin UI:
  - /admin/tables with discovery, registration modal, registry mgmt
  - Vanilla JS, matching existing design system
2026-03-09 14:25:37 +01:00

820 lines
32 KiB
Python

"""
Keboola data source adapter.
Implements the DataSource interface for Keboola Storage API.
Downloads tables via the Storage API, converts CSV exports to Parquet files
with full type metadata from Keboola column metadata.
"""
import logging
import tempfile
from pathlib import Path
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
import pyarrow as pa
from tqdm import tqdm
from src.config import get_config, TableConfig
from src.data_sync import DataSource, SyncState, _get_uncompressed_size
from src.parquet_manager import (
create_parquet_manager,
_convert_column,
convert_date_columns_to_date32,
apply_schema_to_table,
)
from .client import create_client as create_keboola_client
logger = logging.getLogger(__name__)
class KeboolaDataSource(DataSource):
    """
    Data source that downloads tables directly from the Keboola Storage API.

    Tables are exported from a Keboola project as CSV and converted to typed
    Parquet files, using Keboola column metadata for schema enforcement.
    ``sync_table`` dispatches to a full-refresh, partitioned, or incremental
    (single-file or partitioned) strategy based on the table configuration.
    """
def __init__(self):
    """Load config, verify Keboola credentials, and create the API/Parquet helpers.

    Raises:
        ValueError: if any of KEBOOLA_STORAGE_TOKEN, KEBOOLA_STACK_URL or
            KEBOOLA_PROJECT_ID is not configured. All missing names are
            reported together in one message.
    """
    self.config = get_config()

    # (config attribute, environment variable name) in reporting order.
    required_settings = (
        ("keboola_token", "KEBOOLA_STORAGE_TOKEN"),
        ("keboola_stack_url", "KEBOOLA_STACK_URL"),
        ("keboola_project_id", "KEBOOLA_PROJECT_ID"),
    )
    # Check everything before failing so the user sees every gap at once.
    missing = [
        env_name
        for attr_name, env_name in required_settings
        if not getattr(self.config, attr_name)
    ]
    if missing:
        raise ValueError(
            f"Missing required environment variables for Keboola connector: "
            f"{', '.join(missing)}. See config/.env.template"
        )

    self.keboola_client = create_keboola_client()
    self.parquet_manager = create_parquet_manager()
def get_column_metadata(self, table_id: str) -> Optional[Dict[str, Any]]:
    """Build per-column type and description info for a Keboola table.

    The column's source type is resolved by the Keboola client
    (``_resolve_keboola_type``). The description comes from the first provider
    in the priority order (``user``, then ``ai-metadata-enrichment``, then
    ``keboola.snowflake-transformation``) that supplied a ``KBC.description``
    entry. Descriptions from any other provider are ignored.

    Args:
        table_id: Keboola table ID.

    Returns:
        ``{"columns": {col_name: {"source_type": ..., "description": ...}}}``.
        ``description`` is present only when a non-empty one was chosen.
        Returns ``None`` when the table has no column metadata.
    """
    metadata = self.keboola_client.get_table_metadata(table_id)
    per_column = metadata.get("column_metadata", {})
    if not per_column:
        return None

    # Highest-priority provider first.
    provider_order = (
        "user",
        "ai-metadata-enrichment",
        "keboola.snowflake-transformation",
    )

    columns = {}
    for name, entries in per_column.items():
        info = {"source_type": self.keboola_client._resolve_keboola_type(entries)}

        if isinstance(entries, list):
            # Later entries from the same provider override earlier ones.
            descriptions = {
                entry.get("provider", ""): entry.get("value", "")
                for entry in entries
                if entry.get("key", "") == "KBC.description"
            }
            chosen = next(
                (descriptions[p] for p in provider_order if p in descriptions),
                None,
            )
            if chosen:
                info["description"] = chosen

        columns[name] = info
    return {"columns": columns}
def discover_tables(self) -> List[Dict[str, Any]]:
    """List every table available in the Keboola project.

    Thin pass-through: returns ``discover_all_tables()`` from the Keboola
    client unchanged.
    """
    client = self.keboola_client
    return client.discover_all_tables()
def get_source_name(self) -> str:
    """Return the human-readable name shown for this data source."""
    display_name = "Keboola Storage API"
    return display_name
def _cleanup_staging(self):
    """
    Best-effort removal of every regular file in the staging directory.

    Used to free disk space, e.g. at the start of a chunked initial load.
    Subdirectories are left untouched. A file that cannot be deleted is
    logged as a warning and skipped; cleanup never raises for it.

    NOTE(review): this removes *every* file in the staging directory,
    including temp files of any other sync sharing it. Confirm syncs never
    run concurrently against the same staging path.
    """
    for path in self.config.get_staging_path().glob("*"):
        if not path.is_file():
            continue
        try:
            path.unlink()
            logger.debug(f"Cleaned up staging file: {path.name}")
        except Exception as e:
            logger.warning(f"Failed to clean up {path.name}: {e}")
def sync_table(
    self,
    table_config: TableConfig,
    sync_state: SyncState,
) -> Dict[str, Any]:
    """
    Synchronize one table from Keboola and record the result in sync state.

    Dispatches on ``table_config.sync_strategy``:

    * ``"full_refresh"`` -> ``_full_refresh``
    * ``"partitioned"`` -> ``_partitioned_sync``
    * anything else -> ``_incremental_sync``

    On success, ``sync_state.update_sync`` is called with the row count,
    file size, column count and uncompressed size.

    Args:
        table_config: Table configuration.
        sync_state: Sync state manager.

    Returns:
        On success: ``{"success": True, "rows": int, "strategy": str,
        "file_size_mb": float}``.
        On failure: ``{"success": False, "error": str, "strategy": str}``.
        Any ``Exception`` raised by the strategy or by the state update is
        logged (with traceback) and reported here instead of propagating.
    """
    logger.info(f"Syncing table: {table_config.name} ({table_config.sync_strategy})")

    # Drop cached metadata so this sync uses the latest column types from Keboola.
    metadata_cache = self.keboola_client.metadata_cache
    if table_config.id in metadata_cache:
        del metadata_cache[table_config.id]
        logger.debug(f"Cleared metadata cache for {table_config.id}")

    try:
        if table_config.sync_strategy == "full_refresh":
            result = self._full_refresh(table_config)
        elif table_config.sync_strategy == "partitioned":
            result = self._partitioned_sync(table_config)
        else:  # incremental (also the fallback for any unrecognised strategy value)
            result = self._incremental_sync(table_config, sync_state)

        sync_state.update_sync(
            table_id=table_config.id,
            table_name=table_config.name,
            strategy=table_config.sync_strategy,
            rows=result["rows"],
            file_size_bytes=result["file_size_bytes"],
            columns=result.get("columns", 0),
            uncompressed_bytes=result.get("uncompressed_bytes", 0),
        )
        return {
            "success": True,
            "rows": result["rows"],
            "strategy": table_config.sync_strategy,
            "file_size_mb": result["file_size_bytes"] / 1024 / 1024,
        }
    except Exception as e:
        # logger.exception logs at ERROR level *and* records the traceback,
        # which is essential for diagnosing failures deep in export/convert code.
        logger.exception(f"Error syncing table {table_config.name}: {e}")
        return {
            "success": False,
            "error": str(e),
            "strategy": table_config.sync_strategy,
        }
def _full_refresh(self, table_config: TableConfig) -> Dict[str, Any]:
    """
    Full-refresh strategy: export the whole table and rebuild its Parquet file.

    The table is exported (honouring ``where_filters`` when configured) to a
    temporary CSV in the staging directory. That CSV is then converted to
    Parquet at the table's Parquet path, using the dtypes, date columns and
    Arrow schema obtained from the Keboola client. The temporary CSV is
    always deleted.

    Returns:
        Dict with ``rows``, ``file_size_bytes``, ``columns`` and
        ``uncompressed_bytes`` for the written Parquet file.
    """
    logger.info(f"Full refresh: {table_config.name}")
    target = self.config.get_parquet_path(table_config)
    staging = self.config.get_staging_path()
    client = self.keboola_client
    table_id = table_config.id

    # Only reserve a unique file name here; the exporter writes to the path itself.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".csv", delete=False, dir=staging
    ) as handle:
        csv_path = Path(handle.name)

    try:
        filters = table_config.where_filters
        suffix = f" (filters: {len(filters)})" if filters else ""
        logger.info(f" -> Exporting from Keboola...{suffix}")
        client.export_table(
            table_id=table_id,
            output_path=csv_path,
            where_filters=filters if filters else None,
        )

        # Type information for building a properly typed Parquet file.
        dtypes = client.get_pandas_dtypes(table_id)
        date_columns = client.get_date_columns(table_id)
        schema = client.get_pyarrow_schema(table_id)

        logger.info(" -> Converting to Parquet...")
        info = self.parquet_manager.csv_to_parquet(
            csv_path=csv_path,
            parquet_path=target,
            dtypes=dtypes,
            table_id=table_id,
            date_columns=date_columns,
            pyarrow_schema=schema,
        )
        return {
            "rows": info["rows"],
            "file_size_bytes": info["parquet_size_bytes"],
            "columns": info.get("columns", 0),
            "uncompressed_bytes": _get_uncompressed_size(target),
        }
    finally:
        if csv_path.exists():
            csv_path.unlink()
def _incremental_sync(
    self,
    table_config: TableConfig,
    sync_state: SyncState,
) -> Dict[str, Any]:
    """
    Incremental strategy: fetch only rows changed since the last sync.

    Routes to ``_incremental_partitioned_sync`` when ``partition_by`` is set
    (truthy), and otherwise to ``_incremental_single_file_sync``, which
    merges the changes into one Parquet file.
    """
    handler = (
        self._incremental_partitioned_sync
        if table_config.partition_by
        else self._incremental_single_file_sync
    )
    return handler(table_config, sync_state)
def _incremental_single_file_sync(
    self,
    table_config: TableConfig,
    sync_state: SyncState,
) -> Dict[str, Any]:
    """
    Incremental sync into a single (non-partitioned) Parquet file.

    The lower export bound (``changed_since``) is chosen as follows:

    * previous sync recorded: last sync time minus ``incremental_window_days``
      (7 when unset or 0), so a trailing window is re-fetched;
    * first sync with ``max_history_days``: now minus that many days;
    * first sync otherwise: no bound (whole table).

    If a Parquet file already exists, the export is merged into it via
    ``parquet_manager.merge_parquet`` (keyed on the table's primary key
    columns). The merge writes a temporary file that then replaces the
    original. If no file exists, a new one is created. Temporary files are
    always removed.

    Returns:
        Dict with ``rows``, ``file_size_bytes``, ``columns`` and
        ``uncompressed_bytes`` describing the Parquet file after the sync
        (all zero when nothing was exported and no file exists yet).
    """
    logger.info(f"Incremental sync (single file): {table_config.name}")
    target = self.config.get_parquet_path(table_config)
    staging = self.config.get_staging_path()
    client = self.keboola_client
    table_id = table_config.id

    previous_sync = sync_state.get_last_sync(table_id)
    if previous_sync:
        window = table_config.incremental_window_days or 7
        since_dt = datetime.fromisoformat(previous_sync) - timedelta(days=window)
        changed_since = since_dt.isoformat()
        logger.info(
            f" -> ChangedSince: {changed_since} (window: {window} days)"
        )
    elif table_config.max_history_days:
        since_dt = datetime.now() - timedelta(days=table_config.max_history_days)
        changed_since = since_dt.isoformat()
        logger.info(
            f" -> First sync, limited to last {table_config.max_history_days} days "
            f"(changedSince: {changed_since})"
        )
    else:
        logger.info(" -> First sync, downloading all data...")
        changed_since = None

    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".csv", delete=False, dir=staging
    ) as csv_handle:
        csv_path = Path(csv_handle.name)

    try:
        logger.info(" -> Exporting changes from Keboola...")
        exported_rows = client.export_table(
            table_id=table_id,
            output_path=csv_path,
            changed_since=changed_since,
        )["exported_rows"]

        if exported_rows == 0:
            logger.info(" -> No changes since last synchronization")
            if not target.exists():
                return {"rows": 0, "file_size_bytes": 0, "columns": 0, "uncompressed_bytes": 0}
            current = self.parquet_manager.get_parquet_info(target)
            return {
                "rows": current["rows"],
                "file_size_bytes": current["file_size_bytes"],
                "columns": current.get("columns", 0),
                "uncompressed_bytes": _get_uncompressed_size(target),
            }

        dtypes = client.get_pandas_dtypes(table_id)
        date_columns = client.get_date_columns(table_id)
        schema = client.get_pyarrow_schema(table_id)

        if not target.exists():
            logger.info(" -> Creating new Parquet...")
            created = self.parquet_manager.csv_to_parquet(
                csv_path=csv_path,
                parquet_path=target,
                dtypes=dtypes,
                table_id=table_id,
                date_columns=date_columns,
                pyarrow_schema=schema,
            )
            return {
                "rows": created["rows"],
                "file_size_bytes": created["parquet_size_bytes"],
                "columns": created.get("columns", 0),
                "uncompressed_bytes": _get_uncompressed_size(target),
            }

        logger.info(
            f" -> Merging {exported_rows} changes into Parquet..."
        )
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".parquet", delete=False, dir=staging
        ) as merged_handle:
            merged_path = Path(merged_handle.name)
        try:
            merged = self.parquet_manager.merge_parquet(
                existing_parquet=target,
                new_csv=csv_path,
                output_parquet=merged_path,
                primary_key=table_config.get_primary_key_columns(),
                dtypes=dtypes,
                date_columns=date_columns,
                pyarrow_schema=schema,
            )
            # Swap the merged result into place in a single rename.
            merged_path.replace(target)
            return {
                "rows": merged["total_rows"],
                "file_size_bytes": target.stat().st_size,
                "columns": merged.get("total_columns", 0),
                "uncompressed_bytes": _get_uncompressed_size(target),
            }
        finally:
            if merged_path.exists():
                merged_path.unlink()
    finally:
        if csv_path.exists():
            csv_path.unlink()
def _incremental_partitioned_sync(
    self,
    table_config: TableConfig,
    sync_state: SyncState,
) -> Dict[str, Any]:
    """
    Incremental sync with partitioned output.

    On the first sync (no recorded last sync), this always delegates to
    ``_chunked_initial_load``, regardless of ``max_history_days``.

    On later syncs, it exports rows changed since the last sync minus
    ``incremental_window_days`` (7 when unset or 0). The rows are appended to
    the partition files keyed by ``partition_by`` / ``partition_granularity``,
    and the touched partitions are then deduplicated by primary key. This
    mirrors ``_partitioned_sync`` but filters with ``changed_since`` instead
    of ``where_filters``.

    Returns:
        Totals over all partition files (see ``_get_partition_totals``).
    """
    logger.info(
        f"Incremental sync (partitioned): {table_config.name} "
        f"(by {table_config.partition_by}, {table_config.partition_granularity})"
    )
    partition_dir = self.config.get_parquet_path(table_config)
    staging_dir = self.config.get_staging_path()
    last_sync = sync_state.get_last_sync(table_config.id)

    # First sync: always go through the chunked loader, which downloads in
    # time windows to avoid filling the disk.
    if not last_sync:
        return self._chunked_initial_load(table_config, partition_dir, staging_dir)

    # Regular incremental sync: re-fetch a trailing window before the last sync.
    window_days = table_config.incremental_window_days or 7
    changed_since_dt = datetime.fromisoformat(last_sync) - timedelta(days=window_days)
    changed_since = changed_since_dt.isoformat()
    logger.info(
        f" -> ChangedSince: {changed_since} (window: {window_days} days)"
    )

    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".csv", delete=False, dir=staging_dir
    ) as tmp_file:
        tmp_csv_path = Path(tmp_file.name)
    try:
        logger.info(" -> Exporting changes from Keboola...")
        export_info = self.keboola_client.export_table(
            table_id=table_config.id,
            output_path=tmp_csv_path,
            changed_since=changed_since,
        )
        if export_info["exported_rows"] == 0:
            logger.info(" -> No changes since last synchronization")
            return self._get_partition_totals(partition_dir)

        dtypes = self.keboola_client.get_pandas_dtypes(table_config.id)
        date_columns = self.keboola_client.get_date_columns(table_config.id)
        pyarrow_schema = self.keboola_client.get_pyarrow_schema(table_config.id)

        logger.info(f" -> Processing {export_info['exported_rows']} changed rows...")
        partitions_updated = self._process_csv_to_partitions(
            tmp_csv_path, table_config, partition_dir,
            dtypes=dtypes, date_columns=date_columns, pyarrow_schema=pyarrow_schema,
        )
        self._deduplicate_partitions(
            table_config, partitions_updated,
            date_columns=date_columns, pyarrow_schema=pyarrow_schema,
        )
        logger.info(f" -> Incremental sync complete, {len(partitions_updated)} partitions updated")
        return self._get_partition_totals(partition_dir)
    finally:
        if tmp_csv_path.exists():
            tmp_csv_path.unlink()
def _chunked_initial_load(
    self,
    table_config: TableConfig,
    partition_dir: Path,
    staging_dir: Path,
) -> Dict[str, Any]:
    """
    Initial load of a partitioned table, downloaded in time-window chunks.

    Each chunk is exported with ``changed_since``/``changed_until`` bounds and
    written straight into the partition files. Its temporary CSV is deleted
    before the next chunk starts, so only one chunk's CSV is on disk at a time.

    Chunks walk backwards from now (newest first). Each chunk overlaps the
    next by ``overlap_days`` so no rows are lost at the boundaries, and a
    single deduplication pass at the end removes the resulting duplicates.

    The loop stops:

    * when ``max_history_days`` is set, after
      ``ceil(max_history_days / chunk_days)`` chunks, with the oldest chunk
      clamped to ``now - max_history_days``;
    * when ``max_history_days`` is unset, after two consecutive empty chunks;
    * in either mode, after ``max_chunks_safety`` (120) chunks.

    Args:
        table_config: Table configuration (chunk size, history limit, keys).
        partition_dir: Directory holding the partition files.
        staging_dir: Directory for the temporary CSV exports.

    Returns:
        Totals over all partition files (see ``_get_partition_totals``).
    """
    # NOTE(review): assumes initial_load_chunk_days is a positive int (TODO
    # confirm TableConfig guarantees it); 0/None breaks the chunk arithmetic below.
    chunk_days = table_config.initial_load_chunk_days
    max_history_days = table_config.max_history_days
    overlap_days = 1
    max_chunks_safety = 120
    # NOTE(review): naive local time; the exported bounds carry no timezone.
    # Confirm the Keboola API interprets them as intended.
    now = datetime.now()
    if max_history_days:
        # Ceiling division: a partial final chunk still counts as a chunk.
        num_chunks = (max_history_days + chunk_days - 1) // chunk_days
        logger.info(
            f" -> CHUNKED INITIAL LOAD: {max_history_days} days in {num_chunks} chunks "
            f"of {chunk_days} days each (with {overlap_days}-day overlap)"
        )
    else:
        # Open-ended: keep going back until the data runs out (see below).
        num_chunks = None
        logger.info(
            f" -> CHUNKED INITIAL LOAD: iterating backwards in {chunk_days}-day chunks "
            f"until no more data (with {overlap_days}-day overlap)"
        )
    # Remove leftover staging files to free disk space before the bulk download.
    self._cleanup_staging()
    # Type information is fetched once and reused for every chunk.
    dtypes = self.keboola_client.get_pandas_dtypes(table_config.id)
    date_columns = self.keboola_client.get_date_columns(table_config.id)
    pyarrow_schema = self.keboola_client.get_pyarrow_schema(table_config.id)
    all_partitions_updated = set()
    chunk_idx = 0
    consecutive_empty_chunks = 0
    while True:
        # The safety cap also applies when num_chunks is known, so a very long
        # max_history_days / small chunk_days combination is cut off here.
        if chunk_idx >= max_chunks_safety:
            logger.warning(f" -> Reached safety limit of {max_chunks_safety} chunks, stopping")
            break
        if num_chunks is not None and chunk_idx >= num_chunks:
            break
        # Chunk i spans [now - ((i+1)*chunk_days + overlap_days), now - i*chunk_days];
        # the newest chunk (i == 0) has no upper bound ("now").
        chunk_end_offset = chunk_idx * chunk_days
        chunk_start_offset = chunk_end_offset + chunk_days + overlap_days
        chunk_end = now - timedelta(days=chunk_end_offset) if chunk_idx > 0 else None
        chunk_start = now - timedelta(days=chunk_start_offset)
        # Never reach further back than max_history_days.
        if max_history_days and chunk_start_offset > max_history_days:
            chunk_start = now - timedelta(days=max_history_days)
        chunk_label = f"{chunk_idx + 1}" if num_chunks is None else f"{chunk_idx + 1}/{num_chunks}"
        logger.info(
            f" -> Chunk {chunk_label}: "
            f"{chunk_start.strftime('%Y-%m-%d')} to "
            f"{chunk_end.strftime('%Y-%m-%d') if chunk_end else 'now'}"
        )
        # Reserve a unique temp path; the exporter writes the file itself.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".csv", delete=False, dir=staging_dir
        ) as tmp_file:
            tmp_csv_path = Path(tmp_file.name)
        try:
            export_info = self.keboola_client.export_table(
                table_id=table_config.id,
                output_path=tmp_csv_path,
                changed_since=chunk_start.isoformat(),
                changed_until=chunk_end.isoformat() if chunk_end else None,
            )
            if export_info["exported_rows"] == 0:
                logger.info(" -> No data in this chunk")
                consecutive_empty_chunks += 1
                # Open-ended mode treats two empty chunks in a row as the end of history.
                # NOTE(review): a quiet period spanning two chunks also ends the load early.
                if num_chunks is None and consecutive_empty_chunks >= 2:
                    logger.info(
                        f" -> Found {consecutive_empty_chunks} consecutive empty chunks, "
                        f"assuming end of history"
                    )
                    break
                # `continue` skips the increment at the bottom of the loop, so advance
                # here; the finally block below still deletes the temp CSV.
                chunk_idx += 1
                continue
            consecutive_empty_chunks = 0
            logger.info(f" -> Exported {export_info['exported_rows']} rows")
            partitions_updated = self._process_csv_to_partitions(
                tmp_csv_path, table_config, partition_dir,
                dtypes=dtypes, date_columns=date_columns, pyarrow_schema=pyarrow_schema,
            )
            all_partitions_updated.update(partitions_updated)
            logger.info(f" -> Processed into {len(partitions_updated)} partitions")
        finally:
            if tmp_csv_path.exists():
                tmp_csv_path.unlink()
        chunk_idx += 1
    # One dedup pass over every touched partition removes the overlap duplicates.
    if all_partitions_updated:
        logger.info(
            f" -> Final deduplication of {len(all_partitions_updated)} partitions "
            f"(removing duplicates from {overlap_days}-day overlaps)..."
        )
        self._deduplicate_partitions(
            table_config, all_partitions_updated,
            date_columns=date_columns, pyarrow_schema=pyarrow_schema,
        )
    logger.info(
        f" -> Chunked initial load complete: {len(all_partitions_updated)} partitions"
    )
    return self._get_partition_totals(partition_dir)
def _process_csv_to_partitions(
    self,
    csv_path: Path,
    table_config: TableConfig,
    partition_dir: Path,
    dtypes: Optional[Dict[str, str]] = None,
    date_columns: Optional[List[str]] = None,
    pyarrow_schema: Optional[pa.Schema] = None,
) -> set:
    """
    Split a CSV export into partition Parquet files, appending to existing ones.

    The CSV is read as strings in 500k-row pandas chunks. For each chunk:

    * Non-datetime ``dtypes`` are applied with ``_convert_column``; a failed
      conversion is logged and the column is left as strings.
    * The ``partition_by`` column is parsed as UTC datetimes.
    * Rows are grouped by a key derived from ``partition_granularity``:
      ``"year"`` -> ``YYYY``, ``"month"`` (default) -> ``YYYY_MM``,
      ``"day"`` -> ``YYYY_MM_DD``.

    Each group is concatenated with the existing partition file (if any) and
    written back with snappy compression. No deduplication happens here.

    Rows whose partition value is empty or null get no key. ``groupby`` drops
    them, so they are not written anywhere; a warning with the count is
    logged.

    Args:
        csv_path: Exported CSV to read.
        table_config: Table configuration (partition column, granularity).
        partition_dir: Partition directory. Not used directly: paths come
            from ``config.get_partition_path``.
        dtypes: Optional column -> pandas dtype mapping.
        date_columns: Optional columns to store as ``date32``.
        pyarrow_schema: Optional Arrow schema applied before writing.

    Returns:
        Set of partition keys that were written.

    Raises:
        ValueError: if the granularity is not ``year``/``month``/``day``, or
            the partition column is missing from the data.
    """
    import pandas as pd
    import pyarrow.parquet as pq

    partition_col = table_config.partition_by
    granularity = table_config.partition_granularity or "month"
    key_formats = {"year": "%Y", "month": "%Y_%m", "day": "%Y_%m_%d"}
    if granularity not in key_formats:
        # Fail with a clear message instead of a KeyError on a missing key column.
        raise ValueError(
            f"Unsupported partition granularity '{granularity}' "
            f"(expected one of: {', '.join(key_formats)})"
        )
    key_format = key_formats[granularity]

    def _write_partition(df, path):
        """Convert *df* to Arrow, apply date/schema fixes, and write it to *path*."""
        table = pa.Table.from_pandas(df, preserve_index=False)
        if date_columns:
            table = convert_date_columns_to_date32(table, date_columns)
        if pyarrow_schema:
            table = apply_schema_to_table(table, pyarrow_schema)
        pq.write_table(table, path, compression="snappy")

    partitions_updated = set()
    chunk_size = 500000  # 500k rows per pandas chunk
    reader = pd.read_csv(csv_path, chunksize=chunk_size, dtype=str)
    for chunk_num, chunk_df in enumerate(reader, start=1):
        logger.debug(f" -> Processing pandas chunk {chunk_num} ({len(chunk_df)} rows)...")
        if partition_col not in chunk_df.columns:
            raise ValueError(f"Partition column '{partition_col}' not found in data")

        # Apply dtypes using _convert_column (datetime columns are handled separately).
        if dtypes:
            for col, dtype in dtypes.items():
                if col in chunk_df.columns and "datetime" not in dtype:
                    try:
                        chunk_df[col] = _convert_column(chunk_df[col], dtype, col_name=col)
                    except Exception as e:
                        logger.warning(f"Failed to apply dtype {dtype} to column {col}: {e}")

        if not pd.api.types.is_datetime64_any_dtype(chunk_df[partition_col]):
            chunk_df[partition_col] = pd.to_datetime(
                chunk_df[partition_col], format="ISO8601", utc=True
            )

        # NaT partition values format to NaN keys, which groupby drops; report them.
        partition_keys = chunk_df[partition_col].dt.strftime(key_format)
        unkeyed_rows = int(partition_keys.isna().sum())
        if unkeyed_rows:
            logger.warning(
                f" -> {unkeyed_rows} rows in chunk {chunk_num} have an empty or null "
                f"'{partition_col}' value and are not written to any partition"
            )
        chunk_df["_partition_key"] = partition_keys

        for partition_key, partition_df in chunk_df.groupby("_partition_key"):
            partition_df = partition_df.drop(columns=["_partition_key"])
            partition_path = self.config.get_partition_path(table_config, partition_key)
            partitions_updated.add(partition_key)
            if partition_path.exists():
                # Append: duplicates are removed later by _deduplicate_partitions.
                existing_df = pd.read_parquet(partition_path)
                partition_df = pd.concat([existing_df, partition_df], ignore_index=True)
            _write_partition(partition_df, partition_path)

    return partitions_updated
def _deduplicate_partitions(
    self,
    table_config: TableConfig,
    partitions_to_dedup: set,
    date_columns: Optional[List[str]] = None,
    pyarrow_schema: Optional[pa.Schema] = None,
):
    """
    Remove duplicate rows by primary key from the given partition files.

    Partitions are processed in sorted key order, and missing files are
    skipped. Within a partition, the last occurrence of each primary key
    (the most recently appended row) is kept. Every existing partition is
    rewritten with the same date32/schema handling used when partitions are
    written, even when no duplicates were found.
    """
    import pandas as pd
    import pyarrow.parquet as pq

    key_columns = table_config.get_primary_key_columns()
    logger.info(f" -> Deduplicating {len(partitions_to_dedup)} partitions...")

    for key in sorted(partitions_to_dedup):
        path = self.config.get_partition_path(table_config, key)
        if not path.exists():
            continue

        frame = pd.read_parquet(path)
        before = len(frame)
        frame = frame.drop_duplicates(subset=key_columns, keep="last")
        after = len(frame)

        arrow_table = pa.Table.from_pandas(frame, preserve_index=False)
        if date_columns:
            arrow_table = convert_date_columns_to_date32(arrow_table, date_columns)
        if pyarrow_schema:
            arrow_table = apply_schema_to_table(arrow_table, pyarrow_schema)
        pq.write_table(arrow_table, path, compression="snappy")

        removed = before - after
        if removed:
            logger.debug(
                f" -> Partition {key}: {before} -> {after} rows "
                f"(removed {removed} duplicates)"
            )
def _get_partition_totals(self, partition_dir: Path) -> Dict[str, Any]:
    """
    Aggregate statistics over every ``*.parquet`` file directly in a directory.

    Only Parquet footers (metadata) are read, not row data. A file that cannot
    be read is logged and skipped entirely, so it contributes nothing to the
    row, size, column or uncompressed totals.

    Args:
        partition_dir: Directory containing the partition files.

    Returns:
        Dict with:

        * ``rows``, ``file_size_bytes``: summed over readable files;
        * ``partitions``: number of ``*.parquet`` files found, including
          unreadable ones;
        * ``columns``: column count of the first readable file;
        * ``uncompressed_bytes``: sum of column-chunk uncompressed sizes.

        The same keys are always present; all are 0 when the directory does
        not exist.
    """
    if not partition_dir.exists():
        return {
            "rows": 0,
            "file_size_bytes": 0,
            "partitions": 0,
            "columns": 0,
            "uncompressed_bytes": 0,
        }

    # Imported only when there is something to read.
    import pyarrow.parquet as pq

    total_rows = 0
    total_size = 0
    total_uncompressed = 0
    total_columns = 0

    all_partitions = list(partition_dir.glob("*.parquet"))
    for part_path in all_partitions:
        try:
            pf = pq.ParquetFile(part_path)
            meta = pf.metadata
            file_rows = meta.num_rows
            file_size = part_path.stat().st_size
            file_uncompressed = 0
            for rg_idx in range(meta.num_row_groups):
                rg = meta.row_group(rg_idx)
                file_uncompressed += sum(
                    rg.column(col_idx).total_uncompressed_size
                    for col_idx in range(rg.num_columns)
                )
            file_columns = len(pf.schema_arrow) if total_columns == 0 else total_columns
        except Exception as e:
            logger.warning(f" -> Skipping corrupt partition {part_path.name}: {e}")
            continue

        # Commit this file's numbers only after it was read completely, so a file
        # failing halfway through does not leave partial counts in the totals.
        total_rows += file_rows
        total_size += file_size
        total_uncompressed += file_uncompressed
        total_columns = file_columns

    return {
        "rows": total_rows,
        "file_size_bytes": total_size,
        "partitions": len(all_partitions),
        "columns": total_columns,
        "uncompressed_bytes": total_uncompressed,
    }
def _partitioned_sync(self, table_config: TableConfig) -> Dict[str, Any]:
    """
    Partitioned strategy: export the table and fold it into partition files.

    The table is exported (applying ``where_filters`` when configured) and
    the rows are appended to per-partition Parquet files keyed by
    ``partition_by`` at ``partition_granularity``. Every touched partition is
    then deduplicated by primary key. The temporary CSV is always removed.

    Returns:
        Totals over all partition files (see ``_get_partition_totals``).
    """
    logger.info(
        f"Partitioned sync: {table_config.name} "
        f"(by {table_config.partition_by}, {table_config.partition_granularity})"
    )
    out_dir = self.config.get_parquet_path(table_config)
    staging = self.config.get_staging_path()
    client = self.keboola_client
    table_id = table_config.id

    # Only reserve a unique file name here; the exporter writes to the path itself.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".csv", delete=False, dir=staging
    ) as handle:
        csv_path = Path(handle.name)

    try:
        filters = table_config.where_filters
        suffix = f" (filters: {len(filters)})" if filters else ""
        logger.info(f" -> Exporting from Keboola...{suffix}")
        row_count = client.export_table(
            table_id=table_id,
            output_path=csv_path,
            where_filters=filters if filters else None,
        )["exported_rows"]

        if row_count == 0:
            logger.info(" -> No data exported")
            return self._get_partition_totals(out_dir)

        dtypes = client.get_pandas_dtypes(table_id)
        date_columns = client.get_date_columns(table_id)
        schema = client.get_pyarrow_schema(table_id)

        logger.info(f" -> Processing CSV in chunks ({row_count} rows)...")
        touched = self._process_csv_to_partitions(
            csv_path, table_config, out_dir,
            dtypes=dtypes, date_columns=date_columns, pyarrow_schema=schema,
        )
        self._deduplicate_partitions(
            table_config, touched,
            date_columns=date_columns, pyarrow_schema=schema,
        )

        totals = self._get_partition_totals(out_dir)
        logger.info(
            f" -> Partitioned sync complete: {totals.get('partitions', 0)} partitions on disk, "
            f"{totals['rows']} total rows"
        )
        return totals
    finally:
        if csv_path.exists():
            csv_path.unlink()