From 302494b63274a6162da3f4a9afb511fadad90486 Mon Sep 17 00:00:00 2001 From: Petr Date: Tue, 10 Mar 2026 21:46:20 +0100 Subject: [PATCH] Add --format parquet using project's ParquetManager Generator now supports --format {csv,parquet,both}. Parquet mode uses src.parquet_manager.ParquetManager for snappy compression, proper column types (DATE, TIMESTAMP, DOUBLE), and metadata. No more ad-hoc pandas conversion needed on the server. --- docs/auto-install.md | 24 +----- docs/sample-data.md | 27 ++---- scripts/generate_sample_data.py | 132 +++++++++++++++++++++++++++-- tests/test_generate_sample_data.py | 61 +++++++++++++ 4 files changed, 196 insertions(+), 48 deletions(-) diff --git a/docs/auto-install.md b/docs/auto-install.md index 7201d7e..f9e820b 100644 --- a/docs/auto-install.md +++ b/docs/auto-install.md @@ -352,32 +352,14 @@ cd /opt/data-analyst/repo # Install generator dependency /opt/data-analyst/.venv/bin/pip install faker -# Generate synthetic e-commerce data (size m: ~20K orders, 100K sessions) +# Generate Parquet files directly (uses project's ParquetManager +# for snappy compression, proper types, and metadata embedding) /opt/data-analyst/.venv/bin/python scripts/generate_sample_data.py \ - --size m --output /tmp/sample_csv --seed 42 - -# Convert CSVs to Parquet and deploy to data directory -/opt/data-analyst/.venv/bin/python -c " -import pandas as pd -from pathlib import Path - -csv_dir = Path('/tmp/sample_csv') -parquet_dir = Path('/data/src_data/parquet') -parquet_dir.mkdir(parents=True, exist_ok=True) - -for f in sorted(csv_dir.glob('*.csv')): - df = pd.read_csv(f) - out = parquet_dir / f'{f.stem}.parquet' - df.to_parquet(out, index=False) - print(f' {f.stem}: {len(df):,} rows -> {out}') -" + --size m --format parquet --output /data/src_data/parquet --seed 42 # Set correct permissions chown -R root:data-ops /data/src_data/parquet chmod -R 2775 /data/src_data/parquet - -# Clean up temporary CSVs -rm -rf /tmp/sample_csv ``` Available sizes: `xs` (50 customers, ~1 MB), `s` (500, ~15 MB), `m` (5K, ~150 MB), `l` (50K, ~1.5 GB). diff --git a/docs/sample-data.md b/docs/sample-data.md index 77dcd34..1670649 100644 --- a/docs/sample-data.md +++ b/docs/sample-data.md @@ -172,25 +172,12 @@ To use sample data on a deployed server (instead of connecting a data adapter): # On the server cd /opt/data-analyst/repo -# Generate CSVs -.venv/bin/python scripts/generate_sample_data.py --size m --output /tmp/sample_csv +# Generate Parquet files directly using project's ParquetManager +# (snappy compression, proper column types, metadata embedding) +/opt/data-analyst/.venv/bin/python scripts/generate_sample_data.py \ + --size m --format parquet --output /data/src_data/parquet --seed 42 -# Convert to Parquet and deploy -.venv/bin/python -c " -import pandas as pd -from pathlib import Path - -csv_dir = Path('/tmp/sample_csv') -parquet_dir = Path('/data/src_data/parquet') -parquet_dir.mkdir(parents=True, exist_ok=True) - -for f in sorted(csv_dir.glob('*.csv')): - df = pd.read_csv(f) - out = parquet_dir / f'{f.stem}.parquet' - df.to_parquet(out, index=False) - print(f' {f.stem}: {len(df):,} rows -> {out}') -" - -# Clean up CSVs -rm -rf /tmp/sample_csv +# Set correct permissions +chown -R root:data-ops /data/src_data/parquet +chmod -R 2775 /data/src_data/parquet ``` diff --git a/scripts/generate_sample_data.py b/scripts/generate_sample_data.py index 8f5294b..572079e 100644 --- a/scripts/generate_sample_data.py +++ b/scripts/generate_sample_data.py @@ -2,13 +2,13 @@ """ Sample data generator for AI Data Analyst demo and testing. -Generates realistic synthetic e-commerce + marketing data as CSV files. +Generates realistic synthetic e-commerce + marketing data as CSV or Parquet. Tables: customers, products, campaigns, web_sessions, web_leads, orders, order_items, payments, support_tickets Usage: - python scripts/generate_sample_data.py --size xs --output data/sample - python scripts/generate_sample_data.py --size m --seed 42 + python scripts/generate_sample_data.py --size s --output data/sample + python scripts/generate_sample_data.py --size m --format parquet --output /data/src_data/parquet python scripts/generate_sample_data.py --list-sizes """ @@ -278,19 +278,75 @@ TICKET_CHANNELS = [ ("email", 0.40), ("chat", 0.30), ("phone", 0.15), ("web_form", 0.15), ] +# ── Parquet schema definitions (used by ParquetManager) ──────────────── + +TABLE_SCHEMAS = { + "customers": { + "dtypes": {"is_active": "Int64"}, + "date_columns": ["registration_date"], + }, + "products": { + "dtypes": { + "price": "float64", "cost": "float64", + "weight_kg": "float64", "is_active": "Int64", + }, + "date_columns": ["created_at"], + }, + "campaigns": { + "dtypes": { + "budget": "float64", "spend": "float64", + "impressions": "Int64", "clicks": "Int64", + }, + "date_columns": ["start_date", "end_date"], + }, + "web_sessions": { + "dtypes": { + "duration_seconds": "Int64", "pages_viewed": "Int64", + "is_bounce": "Int64", + }, + "parse_dates": ["started_at"], + }, + "web_leads": { + "parse_dates": ["created_at", "converted_at"], + }, + "orders": { + "dtypes": { + "items_total": "float64", "discount_amount": "float64", + "shipping_amount": "float64", "total_amount": "float64", + }, + "parse_dates": ["created_at"], + }, + "order_items": { + "dtypes": { + "quantity": "Int64", "unit_price": "float64", + "discount_percent": "Int64", "line_total": "float64", + }, + }, + "payments": { + "dtypes": {"amount": "float64"}, + "parse_dates": ["created_at", "completed_at"], + }, + "support_tickets": { + "dtypes": {"satisfaction_score": "Int64"}, + "parse_dates": ["created_at", "first_response_at", "resolved_at"], + }, +} + # ── Generator ────────────────────────────────────────────────────────── class SampleDataGenerator: - """Generates realistic synthetic e-commerce data as CSV files.""" + """Generates realistic synthetic e-commerce data as CSV or Parquet.""" - def __init__(self, size: str, seed: int, output_dir: Path): + def __init__(self, size: str, seed: int, output_dir: Path, + output_format: str = "csv"): self.cfg = SIZE_CONFIGS[size] self.size_name = size self.rng = random.Random(seed) self.fake = Faker(["en_US", "de_DE", "cs_CZ", "fr_FR"]) Faker.seed(seed) self.output_dir = output_dir + self.output_format = output_format # "csv", "parquet", or "both" self.row_counts: dict[str, int] = {} # Time range @@ -846,6 +902,39 @@ class SampleDataGenerator: self._write_table("support_tickets", fields, rows) + # ── Parquet conversion ───────────────────────────────────── + + def _convert_to_parquet(self, parquet_dir: Path) -> None: + """Convert generated CSVs to Parquet using project's ParquetManager.""" + # Ensure project root is importable (script may run from any cwd) + project_root = Path(__file__).resolve().parent.parent + if str(project_root) not in sys.path: + sys.path.insert(0, str(project_root)) + from src.parquet_manager import create_parquet_manager + + manager = create_parquet_manager() + parquet_dir.mkdir(parents=True, exist_ok=True) + logger.info(f" Converting to Parquet -> {parquet_dir}/") + + for csv_path in sorted(self.output_dir.glob("*.csv")): + table_name = csv_path.stem + schema = TABLE_SCHEMAS.get(table_name, {}) + parquet_path = parquet_dir / f"{table_name}.parquet" + + result = manager.csv_to_parquet( + csv_path=csv_path, + parquet_path=parquet_path, + dtypes=schema.get("dtypes"), + parse_dates=schema.get("parse_dates"), + date_columns=schema.get("date_columns"), + table_id=f"sample.{table_name}", + ) + logger.info( + f" {table_name}: {result['rows']:,} rows, " + f"{result['parquet_size_bytes'] / 1024:.0f} KB " + f"({result['compression_ratio']:.1f}x compression)" + ) + # ── Orchestration ────────────────────────────────────────── def run(self) -> dict[str, Any]: @@ -853,11 +942,20 @@ class SampleDataGenerator: self.output_dir.mkdir(parents=True, exist_ok=True) t0 = time.time() - logger.info(f"Generating sample data (size: {self.size_name})") + fmt_label = self.output_format.upper() + logger.info(f"Generating sample data (size: {self.size_name}, format: {fmt_label})") logger.info(f" Period: {self.start_date} to {self.end_date} " f"({self.cfg['months']} months)") logger.info(f" Output: {self.output_dir}/") + # Phase 1: Generate CSVs (always needed as intermediate) + csv_dir = self.output_dir + if self.output_format == "parquet": + # CSVs go to a temp subdir, only Parquet files in output + csv_dir = self.output_dir / "_csv_tmp" + csv_dir.mkdir(parents=True, exist_ok=True) + self.output_dir = csv_dir # temporarily redirect CSV writes + self._generate_customers() self._generate_products() self._generate_campaigns() @@ -867,12 +965,25 @@ class SampleDataGenerator: self._generate_payments() self._generate_support_tickets() + # Phase 2: Convert to Parquet if requested + if self.output_format == "parquet": + parquet_dir = csv_dir.parent # the original output_dir + self._convert_to_parquet(parquet_dir) + # Clean up temp CSVs + import shutil + shutil.rmtree(csv_dir) + self.output_dir = parquet_dir # restore for manifest + elif self.output_format == "both": + parquet_dir = self.output_dir / "parquet" + self._convert_to_parquet(parquet_dir) + elapsed = time.time() - t0 total_rows = sum(self.row_counts.values()) manifest = { "generator": "generate_sample_data.py", "size": self.size_name, + "format": self.output_format, "seed": self.rng.getstate()[1][0], "date_range": { "start": str(self.start_date), @@ -911,6 +1022,10 @@ def main() -> None: "--seed", type=int, default=42, help="Random seed for reproducibility (default: 42)", ) + parser.add_argument( + "--format", choices=["csv", "parquet", "both"], default="csv", + help="Output format: csv, parquet (via ParquetManager), or both (default: csv)", + ) parser.add_argument( "--list-sizes", action="store_true", help="Show available size presets and exit", @@ -932,7 +1047,10 @@ def main() -> None: print() return - gen = SampleDataGenerator(size=args.size, seed=args.seed, output_dir=args.output) + gen = SampleDataGenerator( + size=args.size, seed=args.seed, + output_dir=args.output, output_format=args.format, + ) gen.run() diff --git a/tests/test_generate_sample_data.py b/tests/test_generate_sample_data.py index 45597de..e55fbcf 100644 --- a/tests/test_generate_sample_data.py +++ b/tests/test_generate_sample_data.py @@ -171,3 +171,64 @@ class TestDeterminism: content1 = (dir1 / "customers.csv").read_text() content2 = (dir2 / "customers.csv").read_text() assert content1 != content2 + + +class TestParquetFormat: + """Test Parquet output format using project's ParquetManager.""" + + def test_parquet_format_creates_parquet_files(self, tmp_path: Path): + """--format parquet should produce .parquet files, no CSVs.""" + out = tmp_path / "parquet_out" + gen = SampleDataGenerator( + size="xs", seed=42, output_dir=out, output_format="parquet", + ) + gen.run() + + parquet_files = {p.stem for p in out.glob("*.parquet")} + csv_files = list(out.glob("*.csv")) + expected = { + "customers", "products", "campaigns", "web_sessions", + "web_leads", "orders", "order_items", "payments", + "support_tickets", + } + assert expected == parquet_files + assert csv_files == [], "CSV files should be cleaned up in parquet mode" + + def test_parquet_has_correct_types(self, tmp_path: Path): + """Parquet files should have proper column types from ParquetManager.""" + import duckdb + + out = tmp_path / "typed" + gen = SampleDataGenerator( + size="xs", seed=42, output_dir=out, output_format="parquet", + ) + gen.run() + + con = duckdb.connect() + # orders.created_at should be TIMESTAMP, not VARCHAR + schema = con.execute( + f"DESCRIBE SELECT * FROM read_parquet('{out}/orders.parquet')" + ).fetchall() + col_types = {row[0]: row[1] for row in schema} + assert col_types["created_at"] == "TIMESTAMP" + assert col_types["total_amount"] == "DOUBLE" + + # customers.registration_date should be DATE + schema = con.execute( + f"DESCRIBE SELECT * FROM read_parquet('{out}/customers.parquet')" + ).fetchall() + col_types = {row[0]: row[1] for row in schema} + assert col_types["registration_date"] == "DATE" + + def test_both_format_creates_csv_and_parquet(self, tmp_path: Path): + """--format both should produce CSVs + parquet/ subdirectory.""" + out = tmp_path / "both_out" + gen = SampleDataGenerator( + size="xs", seed=42, output_dir=out, output_format="both", + ) + gen.run() + + csv_files = list(out.glob("*.csv")) + parquet_files = list((out / "parquet").glob("*.parquet")) + assert len(csv_files) == 9 + assert len(parquet_files) == 9