fix: Jira extract_init handles empty parquet dirs gracefully
DuckDB read_parquet glob fails when no files match. Skip view creation for tables without parquet files, create views only after first write.
This commit is contained in:
parent
8bc1fceb52
commit
e2a7ee21a2
1 changed files with 11 additions and 8 deletions
|
|
@ -46,18 +46,16 @@ def init_extract(output_dir: str | Path) -> None:
|
||||||
table_dir = data_dir / table_name
|
table_dir = data_dir / table_name
|
||||||
table_dir.mkdir(exist_ok=True)
|
table_dir.mkdir(exist_ok=True)
|
||||||
|
|
||||||
# Create view that reads all parquet files in the table directory
|
# Create view only if parquet files exist (DuckDB glob fails on empty dirs)
|
||||||
glob_path = str(table_dir / "*.parquet")
|
|
||||||
conn.execute(
|
|
||||||
f'CREATE OR REPLACE VIEW "{table_name}" AS '
|
|
||||||
f"SELECT * FROM read_parquet('{glob_path}', union_by_name=true, hive_partitioning=false)"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Count existing rows if any parquets exist
|
|
||||||
rows = 0
|
rows = 0
|
||||||
size_bytes = 0
|
size_bytes = 0
|
||||||
parquets = list(table_dir.glob("*.parquet"))
|
parquets = list(table_dir.glob("*.parquet"))
|
||||||
if parquets:
|
if parquets:
|
||||||
|
glob_path = str(table_dir / "*.parquet")
|
||||||
|
conn.execute(
|
||||||
|
f'CREATE OR REPLACE VIEW "{table_name}" AS '
|
||||||
|
f"SELECT * FROM read_parquet('{glob_path}', union_by_name=true, hive_partitioning=false)"
|
||||||
|
)
|
||||||
try:
|
try:
|
||||||
rows = conn.execute(f'SELECT count(*) FROM "{table_name}"').fetchone()[0]
|
rows = conn.execute(f'SELECT count(*) FROM "{table_name}"').fetchone()[0]
|
||||||
size_bytes = sum(f.stat().st_size for f in parquets)
|
size_bytes = sum(f.stat().st_size for f in parquets)
|
||||||
|
|
@ -96,6 +94,11 @@ def update_meta(output_dir: str | Path, table_name: str) -> None:
|
||||||
if parquets:
|
if parquets:
|
||||||
try:
|
try:
|
||||||
glob_path = str(table_dir / "*.parquet")
|
glob_path = str(table_dir / "*.parquet")
|
||||||
|
# Recreate view to pick up new/changed parquet files
|
||||||
|
conn.execute(
|
||||||
|
f'CREATE OR REPLACE VIEW "{table_name}" AS '
|
||||||
|
f"SELECT * FROM read_parquet('{glob_path}', union_by_name=true, hive_partitioning=false)"
|
||||||
|
)
|
||||||
rows = conn.execute(f"SELECT count(*) FROM read_parquet('{glob_path}', union_by_name=true)").fetchone()[0]
|
rows = conn.execute(f"SELECT count(*) FROM read_parquet('{glob_path}', union_by_name=true)").fetchone()[0]
|
||||||
size_bytes = sum(f.stat().st_size for f in parquets)
|
size_bytes = sum(f.stat().st_size for f in parquets)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue