Add --format parquet using project's ParquetManager

Generator now supports --format {csv,parquet,both}. Parquet mode
uses src.parquet_manager.ParquetManager for snappy compression,
proper column types (DATE, TIMESTAMP, DOUBLE), and metadata.
No more ad-hoc pandas conversion needed on the server.
This commit is contained in:
Petr 2026-03-10 21:46:20 +01:00
parent 44bf43535b
commit 302494b632
4 changed files with 196 additions and 48 deletions

View file

@ -352,32 +352,14 @@ cd /opt/data-analyst/repo
# Install generator dependency # Install generator dependency
/opt/data-analyst/.venv/bin/pip install faker /opt/data-analyst/.venv/bin/pip install faker
# Generate synthetic e-commerce data (size m: ~20K orders, 100K sessions) # Generate Parquet files directly (uses project's ParquetManager
# for snappy compression, proper types, and metadata embedding)
/opt/data-analyst/.venv/bin/python scripts/generate_sample_data.py \ /opt/data-analyst/.venv/bin/python scripts/generate_sample_data.py \
--size m --output /tmp/sample_csv --seed 42 --size m --format parquet --output /data/src_data/parquet --seed 42
# Convert CSVs to Parquet and deploy to data directory
/opt/data-analyst/.venv/bin/python -c "
import pandas as pd
from pathlib import Path
csv_dir = Path('/tmp/sample_csv')
parquet_dir = Path('/data/src_data/parquet')
parquet_dir.mkdir(parents=True, exist_ok=True)
for f in sorted(csv_dir.glob('*.csv')):
df = pd.read_csv(f)
out = parquet_dir / f'{f.stem}.parquet'
df.to_parquet(out, index=False)
print(f' {f.stem}: {len(df):,} rows -> {out}')
"
# Set correct permissions # Set correct permissions
chown -R root:data-ops /data/src_data/parquet chown -R root:data-ops /data/src_data/parquet
chmod -R 2775 /data/src_data/parquet chmod -R 2775 /data/src_data/parquet
# Clean up temporary CSVs
rm -rf /tmp/sample_csv
``` ```
Available sizes: `xs` (50 customers, ~1 MB), `s` (500, ~15 MB), `m` (5K, ~150 MB), `l` (50K, ~1.5 GB). Available sizes: `xs` (50 customers, ~1 MB), `s` (500, ~15 MB), `m` (5K, ~150 MB), `l` (50K, ~1.5 GB).

View file

@ -172,25 +172,12 @@ To use sample data on a deployed server (instead of connecting a data adapter):
# On the server # On the server
cd /opt/data-analyst/repo cd /opt/data-analyst/repo
# Generate CSVs # Generate Parquet files directly using project's ParquetManager
.venv/bin/python scripts/generate_sample_data.py --size m --output /tmp/sample_csv # (snappy compression, proper column types, metadata embedding)
/opt/data-analyst/.venv/bin/python scripts/generate_sample_data.py \
--size m --format parquet --output /data/src_data/parquet --seed 42
# Convert to Parquet and deploy # Set correct permissions
.venv/bin/python -c " chown -R root:data-ops /data/src_data/parquet
import pandas as pd chmod -R 2775 /data/src_data/parquet
from pathlib import Path
csv_dir = Path('/tmp/sample_csv')
parquet_dir = Path('/data/src_data/parquet')
parquet_dir.mkdir(parents=True, exist_ok=True)
for f in sorted(csv_dir.glob('*.csv')):
df = pd.read_csv(f)
out = parquet_dir / f'{f.stem}.parquet'
df.to_parquet(out, index=False)
print(f' {f.stem}: {len(df):,} rows -> {out}')
"
# Clean up CSVs
rm -rf /tmp/sample_csv
``` ```

View file

@ -2,13 +2,13 @@
""" """
Sample data generator for AI Data Analyst demo and testing. Sample data generator for AI Data Analyst demo and testing.
Generates realistic synthetic e-commerce + marketing data as CSV files. Generates realistic synthetic e-commerce + marketing data as CSV or Parquet.
Tables: customers, products, campaigns, web_sessions, web_leads, Tables: customers, products, campaigns, web_sessions, web_leads,
orders, order_items, payments, support_tickets orders, order_items, payments, support_tickets
Usage: Usage:
python scripts/generate_sample_data.py --size xs --output data/sample python scripts/generate_sample_data.py --size s --output data/sample
python scripts/generate_sample_data.py --size m --seed 42 python scripts/generate_sample_data.py --size m --format parquet --output /data/src_data/parquet
python scripts/generate_sample_data.py --list-sizes python scripts/generate_sample_data.py --list-sizes
""" """
@ -278,19 +278,75 @@ TICKET_CHANNELS = [
("email", 0.40), ("chat", 0.30), ("phone", 0.15), ("web_form", 0.15), ("email", 0.40), ("chat", 0.30), ("phone", 0.15), ("web_form", 0.15),
] ]
# ── Parquet schema definitions (used by ParquetManager) ────────────────
TABLE_SCHEMAS = {
"customers": {
"dtypes": {"is_active": "Int64"},
"date_columns": ["registration_date"],
},
"products": {
"dtypes": {
"price": "float64", "cost": "float64",
"weight_kg": "float64", "is_active": "Int64",
},
"date_columns": ["created_at"],
},
"campaigns": {
"dtypes": {
"budget": "float64", "spend": "float64",
"impressions": "Int64", "clicks": "Int64",
},
"date_columns": ["start_date", "end_date"],
},
"web_sessions": {
"dtypes": {
"duration_seconds": "Int64", "pages_viewed": "Int64",
"is_bounce": "Int64",
},
"parse_dates": ["started_at"],
},
"web_leads": {
"parse_dates": ["created_at", "converted_at"],
},
"orders": {
"dtypes": {
"items_total": "float64", "discount_amount": "float64",
"shipping_amount": "float64", "total_amount": "float64",
},
"parse_dates": ["created_at"],
},
"order_items": {
"dtypes": {
"quantity": "Int64", "unit_price": "float64",
"discount_percent": "Int64", "line_total": "float64",
},
},
"payments": {
"dtypes": {"amount": "float64"},
"parse_dates": ["created_at", "completed_at"],
},
"support_tickets": {
"dtypes": {"satisfaction_score": "Int64"},
"parse_dates": ["created_at", "first_response_at", "resolved_at"],
},
}
# ── Generator ────────────────────────────────────────────────────────── # ── Generator ──────────────────────────────────────────────────────────
class SampleDataGenerator: class SampleDataGenerator:
"""Generates realistic synthetic e-commerce data as CSV files.""" """Generates realistic synthetic e-commerce data as CSV or Parquet."""
def __init__(self, size: str, seed: int, output_dir: Path): def __init__(self, size: str, seed: int, output_dir: Path,
output_format: str = "csv"):
self.cfg = SIZE_CONFIGS[size] self.cfg = SIZE_CONFIGS[size]
self.size_name = size self.size_name = size
self.rng = random.Random(seed) self.rng = random.Random(seed)
self.fake = Faker(["en_US", "de_DE", "cs_CZ", "fr_FR"]) self.fake = Faker(["en_US", "de_DE", "cs_CZ", "fr_FR"])
Faker.seed(seed) Faker.seed(seed)
self.output_dir = output_dir self.output_dir = output_dir
self.output_format = output_format # "csv", "parquet", or "both"
self.row_counts: dict[str, int] = {} self.row_counts: dict[str, int] = {}
# Time range # Time range
@ -846,6 +902,39 @@ class SampleDataGenerator:
self._write_table("support_tickets", fields, rows) self._write_table("support_tickets", fields, rows)
# ── Parquet conversion ─────────────────────────────────────
def _convert_to_parquet(self, parquet_dir: Path) -> None:
"""Convert generated CSVs to Parquet using project's ParquetManager."""
# Ensure project root is importable (script may run from any cwd)
project_root = Path(__file__).resolve().parent.parent
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
from src.parquet_manager import create_parquet_manager
manager = create_parquet_manager()
parquet_dir.mkdir(parents=True, exist_ok=True)
logger.info(f" Converting to Parquet -> {parquet_dir}/")
for csv_path in sorted(self.output_dir.glob("*.csv")):
table_name = csv_path.stem
schema = TABLE_SCHEMAS.get(table_name, {})
parquet_path = parquet_dir / f"{table_name}.parquet"
result = manager.csv_to_parquet(
csv_path=csv_path,
parquet_path=parquet_path,
dtypes=schema.get("dtypes"),
parse_dates=schema.get("parse_dates"),
date_columns=schema.get("date_columns"),
table_id=f"sample.{table_name}",
)
logger.info(
f" {table_name}: {result['rows']:,} rows, "
f"{result['parquet_size_bytes'] / 1024:.0f} KB "
f"({result['compression_ratio']:.1f}x compression)"
)
# ── Orchestration ────────────────────────────────────────── # ── Orchestration ──────────────────────────────────────────
def run(self) -> dict[str, Any]: def run(self) -> dict[str, Any]:
@ -853,11 +942,20 @@ class SampleDataGenerator:
self.output_dir.mkdir(parents=True, exist_ok=True) self.output_dir.mkdir(parents=True, exist_ok=True)
t0 = time.time() t0 = time.time()
logger.info(f"Generating sample data (size: {self.size_name})") fmt_label = self.output_format.upper()
logger.info(f"Generating sample data (size: {self.size_name}, format: {fmt_label})")
logger.info(f" Period: {self.start_date} to {self.end_date} " logger.info(f" Period: {self.start_date} to {self.end_date} "
f"({self.cfg['months']} months)") f"({self.cfg['months']} months)")
logger.info(f" Output: {self.output_dir}/") logger.info(f" Output: {self.output_dir}/")
# Phase 1: Generate CSVs (always needed as intermediate)
csv_dir = self.output_dir
if self.output_format == "parquet":
# CSVs go to a temp subdir, only Parquet files in output
csv_dir = self.output_dir / "_csv_tmp"
csv_dir.mkdir(parents=True, exist_ok=True)
self.output_dir = csv_dir # temporarily redirect CSV writes
self._generate_customers() self._generate_customers()
self._generate_products() self._generate_products()
self._generate_campaigns() self._generate_campaigns()
@ -867,12 +965,25 @@ class SampleDataGenerator:
self._generate_payments() self._generate_payments()
self._generate_support_tickets() self._generate_support_tickets()
# Phase 2: Convert to Parquet if requested
if self.output_format == "parquet":
parquet_dir = csv_dir.parent # the original output_dir
self._convert_to_parquet(parquet_dir)
# Clean up temp CSVs
import shutil
shutil.rmtree(csv_dir)
self.output_dir = parquet_dir # restore for manifest
elif self.output_format == "both":
parquet_dir = self.output_dir / "parquet"
self._convert_to_parquet(parquet_dir)
elapsed = time.time() - t0 elapsed = time.time() - t0
total_rows = sum(self.row_counts.values()) total_rows = sum(self.row_counts.values())
manifest = { manifest = {
"generator": "generate_sample_data.py", "generator": "generate_sample_data.py",
"size": self.size_name, "size": self.size_name,
"format": self.output_format,
"seed": self.rng.getstate()[1][0], "seed": self.rng.getstate()[1][0],
"date_range": { "date_range": {
"start": str(self.start_date), "start": str(self.start_date),
@ -911,6 +1022,10 @@ def main() -> None:
"--seed", type=int, default=42, "--seed", type=int, default=42,
help="Random seed for reproducibility (default: 42)", help="Random seed for reproducibility (default: 42)",
) )
parser.add_argument(
"--format", choices=["csv", "parquet", "both"], default="csv",
help="Output format: csv, parquet (via ParquetManager), or both (default: csv)",
)
parser.add_argument( parser.add_argument(
"--list-sizes", action="store_true", "--list-sizes", action="store_true",
help="Show available size presets and exit", help="Show available size presets and exit",
@ -932,7 +1047,10 @@ def main() -> None:
print() print()
return return
gen = SampleDataGenerator(size=args.size, seed=args.seed, output_dir=args.output) gen = SampleDataGenerator(
size=args.size, seed=args.seed,
output_dir=args.output, output_format=args.format,
)
gen.run() gen.run()

View file

@ -171,3 +171,64 @@ class TestDeterminism:
content1 = (dir1 / "customers.csv").read_text() content1 = (dir1 / "customers.csv").read_text()
content2 = (dir2 / "customers.csv").read_text() content2 = (dir2 / "customers.csv").read_text()
assert content1 != content2 assert content1 != content2
class TestParquetFormat:
"""Test Parquet output format using project's ParquetManager."""
def test_parquet_format_creates_parquet_files(self, tmp_path: Path):
"""--format parquet should produce .parquet files, no CSVs."""
out = tmp_path / "parquet_out"
gen = SampleDataGenerator(
size="xs", seed=42, output_dir=out, output_format="parquet",
)
gen.run()
parquet_files = {p.stem for p in out.glob("*.parquet")}
csv_files = list(out.glob("*.csv"))
expected = {
"customers", "products", "campaigns", "web_sessions",
"web_leads", "orders", "order_items", "payments",
"support_tickets",
}
assert expected == parquet_files
assert csv_files == [], "CSV files should be cleaned up in parquet mode"
def test_parquet_has_correct_types(self, tmp_path: Path):
"""Parquet files should have proper column types from ParquetManager."""
import duckdb
out = tmp_path / "typed"
gen = SampleDataGenerator(
size="xs", seed=42, output_dir=out, output_format="parquet",
)
gen.run()
con = duckdb.connect()
# orders.created_at should be TIMESTAMP, not VARCHAR
schema = con.execute(
f"DESCRIBE SELECT * FROM read_parquet('{out}/orders.parquet')"
).fetchall()
col_types = {row[0]: row[1] for row in schema}
assert col_types["created_at"] == "TIMESTAMP"
assert col_types["total_amount"] == "DOUBLE"
# customers.registration_date should be DATE
schema = con.execute(
f"DESCRIBE SELECT * FROM read_parquet('{out}/customers.parquet')"
).fetchall()
col_types = {row[0]: row[1] for row in schema}
assert col_types["registration_date"] == "DATE"
def test_both_format_creates_csv_and_parquet(self, tmp_path: Path):
"""--format both should produce CSVs + parquet/ subdirectory."""
out = tmp_path / "both_out"
gen = SampleDataGenerator(
size="xs", seed=42, output_dir=out, output_format="both",
)
gen.run()
csv_files = list(out.glob("*.csv"))
parquet_files = list((out / "parquet").glob("*.parquet"))
assert len(csv_files) == 9
assert len(parquet_files) == 9