fix: handle WAL files in atomic swap to prevent DB corruption
Add _atomic_swap_db helper that removes stale WAL files before and after moving the temp DuckDB into place. Apply CHECKPOINT before close in both orchestrator and Keboola extractor so DuckDB flushes WAL before the swap.
This commit is contained in:
parent
3321d2e266
commit
e425d4baa5
3 changed files with 59 additions and 4 deletions
|
|
@ -146,13 +146,22 @@ def run(output_dir: str, table_configs: List[Dict[str, Any]], keboola_url: str,
|
|||
pass
|
||||
|
||||
finally:
|
||||
conn.execute("CHECKPOINT")
|
||||
conn.close()
|
||||
|
||||
# Atomic replace: swap temp DB into place
|
||||
# Atomic replace: swap temp DB into place, cleaning up any WAL files
|
||||
import shutil
|
||||
old_wal = Path(str(db_path) + ".wal")
|
||||
if old_wal.exists():
|
||||
old_wal.unlink()
|
||||
|
||||
if tmp_db_path.exists():
|
||||
shutil.move(str(tmp_db_path), str(db_path))
|
||||
|
||||
tmp_wal = Path(str(tmp_db_path) + ".wal")
|
||||
if tmp_wal.exists():
|
||||
tmp_wal.unlink()
|
||||
|
||||
return stats
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -42,6 +42,27 @@ def _validate_identifier(name: str, context: str) -> bool:
|
|||
return True
|
||||
|
||||
|
||||
def _atomic_swap_db(tmp_path: str, target_path: str) -> None:
|
||||
"""Atomically replace target DuckDB file, cleaning up WAL files."""
|
||||
import shutil
|
||||
target = Path(target_path)
|
||||
tmp = Path(tmp_path)
|
||||
|
||||
# Remove old WAL file if it exists
|
||||
old_wal = Path(str(target) + ".wal")
|
||||
if old_wal.exists():
|
||||
old_wal.unlink()
|
||||
|
||||
# Move temp DB into place
|
||||
if tmp.exists():
|
||||
shutil.move(str(tmp), str(target))
|
||||
|
||||
# Clean up temp WAL
|
||||
tmp_wal = Path(str(tmp) + ".wal")
|
||||
if tmp_wal.exists():
|
||||
tmp_wal.unlink()
|
||||
|
||||
|
||||
def _get_extracts_dir() -> Path:
|
||||
data_dir = Path(os.environ.get("DATA_DIR", "./data"))
|
||||
return data_dir / "extracts"
|
||||
|
|
@ -118,12 +139,11 @@ class SyncOrchestrator:
|
|||
result[ext_dir.name] = tables
|
||||
logger.info("Attached %s: %d tables", ext_dir.name, len(tables))
|
||||
finally:
|
||||
conn.execute("CHECKPOINT")
|
||||
conn.close()
|
||||
|
||||
# Atomic swap: replace analytics.duckdb with new version
|
||||
import shutil
|
||||
if Path(tmp_path).exists():
|
||||
shutil.move(tmp_path, self._db_path)
|
||||
_atomic_swap_db(tmp_path, self._db_path)
|
||||
|
||||
return result
|
||||
|
||||
|
|
|
|||
|
|
@ -333,6 +333,32 @@ class TestSyncOrchestrator:
|
|||
# The safe source must still be processed
|
||||
assert "keboola" in result
|
||||
|
||||
def test_rebuild_cleans_wal_files(self, setup_env):
|
||||
"""No .wal files should remain after rebuild completes."""
|
||||
from src.orchestrator import SyncOrchestrator
|
||||
|
||||
_create_mock_extract(
|
||||
setup_env["extracts_dir"],
|
||||
"keboola",
|
||||
[{"name": "orders", "data": [{"id": "1", "total": "100"}]}],
|
||||
)
|
||||
|
||||
analytics_db = setup_env["analytics_db"]
|
||||
orch = SyncOrchestrator(analytics_db_path=analytics_db)
|
||||
|
||||
# Simulate a pre-existing WAL file for the target analytics DB
|
||||
wal_path = Path(analytics_db + ".wal")
|
||||
wal_path.write_text("stale wal")
|
||||
assert wal_path.exists(), "Pre-condition: WAL file should exist before rebuild"
|
||||
|
||||
orch.rebuild()
|
||||
|
||||
# After rebuild, no WAL files should remain alongside the analytics DB
|
||||
assert not wal_path.exists(), "Old WAL file must be removed during atomic swap"
|
||||
# Also verify no temp WAL was left behind
|
||||
tmp_wal = Path(analytics_db + ".tmp.wal")
|
||||
assert not tmp_wal.exists(), "Temp WAL file must be cleaned up"
|
||||
|
||||
def test_rejects_malicious_table_name(self, setup_env):
|
||||
"""Tables with SQL injection names in _meta must be skipped; safe tables still work."""
|
||||
from src.orchestrator import SyncOrchestrator
|
||||
|
|
|
|||
Loading…
Reference in a new issue