def _atomic_swap_db(tmp_path: str, target_path: str) -> None:
    """Swap a freshly built DuckDB file into place and clean up WAL files.

    When the file at ``tmp_path`` exists, the write-ahead log sitting next to
    ``target_path`` (``<target_path>.wal``) is removed and the temp file then
    replaces the target. When ``tmp_path`` does not exist nothing is swapped
    and the target, together with its WAL, is left untouched. In both cases
    a leftover ``<tmp_path>.wal`` is removed.

    Args:
        tmp_path: Path of the newly built database file.
        target_path: Path of the database file to replace.

    Raises:
        OSError: If the temp file can be neither renamed nor moved onto the
            target, or a WAL file cannot be deleted for a reason other than
            it being absent.
    """
    import os
    import shutil
    from contextlib import suppress

    tmp = Path(tmp_path)
    target = Path(target_path)

    if tmp.exists():
        # The WAL next to the target belongs to the database that is about to
        # be replaced. Drop it *before* the swap so the new file is never
        # paired with a log written for the old one, and only when a swap is
        # really going to happen: without a replacement the existing database
        # must keep its WAL.
        with suppress(FileNotFoundError):
            Path(str(target) + ".wal").unlink()

        try:
            # os.replace overwrites the destination in a single rename when
            # both paths are on the same filesystem.
            os.replace(str(tmp), str(target))
        except OSError:
            # E.g. paths on different filesystems: fall back to the previous
            # copy-then-delete behaviour rather than failing the swap.
            shutil.move(str(tmp), str(target))

    # A WAL left by the temp database is useless once the swap is done (or
    # was never possible); suppress() avoids an exists()/unlink() race.
    with suppress(FileNotFoundError):
        Path(str(tmp) + ".wal").unlink()


def _get_extracts_dir() -> Path:
    """Return the ``extracts`` directory under ``DATA_DIR`` (default ``./data``)."""
    data_dir = Path(os.environ.get("DATA_DIR", "./data"))
    return data_dir / "extracts"
def test_rebuild_cleans_wal_files(self, setup_env):
    """A rebuild must leave no ``.wal`` file next to the analytics database.

    Seeds one mock extract, plants a fake WAL file beside the target
    database, runs ``rebuild()`` and checks that neither that WAL nor a
    ``.tmp.wal`` file exists afterwards.
    """
    from src.orchestrator import SyncOrchestrator

    orders_table = {"name": "orders", "data": [{"id": "1", "total": "100"}]}
    _create_mock_extract(setup_env["extracts_dir"], "keboola", [orders_table])

    db_file = setup_env["analytics_db"]
    orchestrator = SyncOrchestrator(analytics_db_path=db_file)

    # Plant a WAL file that looks like a leftover from an earlier run.
    stale_wal = Path(db_file + ".wal")
    stale_wal.write_text("stale wal")
    assert stale_wal.exists(), "Pre-condition: WAL file should exist before rebuild"

    orchestrator.rebuild()

    # The planted WAL must be gone once the new database is in place ...
    assert not stale_wal.exists(), "Old WAL file must be removed during atomic swap"
    # ... and no WAL may be left at the ".tmp.wal" path either.
    leftover_tmp_wal = Path(db_file + ".tmp.wal")
    assert not leftover_tmp_wal.exists(), "Temp WAL file must be cleaned up"