Update tests

This commit is contained in:
2026-02-04 19:31:29 -08:00
parent 5dc9235992
commit ee1df4cb21
5 changed files with 1386 additions and 1126 deletions

View File

@@ -3,354 +3,374 @@ Integration tests for the full backup workflow.
Tests the complete backup process from start to finish.
"""
import os
import tempfile
import time
from unittest import TestCase
import pytest
from curateipsum import backup as bk
class TestFullBackupWorkflow(TestCase):
"""Integration tests for complete backup workflow."""
@pytest.fixture
def integration_dirs(tmp_path):
"""Setup integration test directories."""
backups_dir = tmp_path / "backups"
source_dir = tmp_path / "source"
backups_dir.mkdir()
source_dir.mkdir()
return backups_dir, source_dir
def setUp(self) -> None:
self.temp_dir = tempfile.TemporaryDirectory(prefix="test_integration_")
self.backups_dir = os.path.join(self.temp_dir.name, "backups")
self.source_dir = os.path.join(self.temp_dir.name, "source")
os.makedirs(self.backups_dir)
os.makedirs(self.source_dir)
def tearDown(self) -> None:
self.temp_dir.cleanup()
def test_initial_backup_creation(integration_dirs):
"""Test creating the first backup"""
backups_dir, source_dir = integration_dirs
def test_initial_backup_creation(self):
"""Test creating the first backup"""
# Create some files in source
with open(os.path.join(self.source_dir, "file1.txt"), "w") as f:
f.write("content1")
with open(os.path.join(self.source_dir, "file2.txt"), "w") as f:
f.write("content2")
# Create some files in source
(source_dir / "file1.txt").write_text("content1")
(source_dir / "file2.txt").write_text("content2")
# Run backup
bk.initiate_backup(
sources=[self.source_dir],
backups_dir=self.backups_dir,
dry_run=False
)
# Run backup
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
# Verify backup was created
backups = os.listdir(self.backups_dir)
# Filter out lock files
backups = [b for b in backups if not b.startswith(".")]
self.assertEqual(len(backups), 1)
# Verify backup was created
backups = os.listdir(str(backups_dir))
# Filter out lock files
backups = [b for b in backups if not b.startswith(".")]
assert len(backups) == 1
# Verify files exist in backup
backup_path = os.path.join(self.backups_dir, backups[0])
source_name = os.path.basename(self.source_dir)
backup_file1 = os.path.join(backup_path, source_name, "file1.txt")
backup_file2 = os.path.join(backup_path, source_name, "file2.txt")
# Verify files exist in backup
backup_path = os.path.join(str(backups_dir), backups[0])
source_name = os.path.basename(str(source_dir))
backup_file1 = os.path.join(backup_path, source_name, "file1.txt")
backup_file2 = os.path.join(backup_path, source_name, "file2.txt")
self.assertTrue(os.path.exists(backup_file1))
self.assertTrue(os.path.exists(backup_file2))
assert os.path.exists(backup_file1)
assert os.path.exists(backup_file2)
# Verify backup marker exists
marker_files = [f for f in os.listdir(backup_path) if f.startswith(".backup_finished")]
self.assertEqual(len(marker_files), 1)
# Verify backup marker exists
marker_files = [f for f in os.listdir(backup_path)
if f.startswith(".backup_finished")]
assert len(marker_files) == 1
def test_incremental_backup_with_hardlinks(self):
"""Test that second backup uses hardlinks for unchanged files"""
# Create initial file
src_file = os.path.join(self.source_dir, "unchanged.txt")
with open(src_file, "w") as f:
f.write("unchanged content")
# First backup
bk.initiate_backup(
sources=[self.source_dir],
backups_dir=self.backups_dir,
dry_run=False
)
def test_incremental_backup_with_hardlinks(integration_dirs):
"""Test that second backup uses hardlinks for unchanged files"""
backups_dir, source_dir = integration_dirs
# Wait a bit to ensure different timestamp
time.sleep(1.1)
# Create initial file
src_file = source_dir / "unchanged.txt"
src_file.write_text("unchanged content")
# Second backup (no changes)
# Add a new file to trigger a new backup
with open(os.path.join(self.source_dir, "new.txt"), "w") as f:
f.write("new content")
# First backup
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
# Wait a bit to ensure different timestamp
time.sleep(1.1)
# Second backup (no changes)
# Add a new file to trigger a new backup
(source_dir / "new.txt").write_text("new content")
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
# Verify two backups exist
backups = sorted([b for b in os.listdir(str(backups_dir))
if not b.startswith(".")])
assert len(backups) == 2
# Verify unchanged file is hardlinked
source_name = os.path.basename(str(source_dir))
file1_path = os.path.join(str(backups_dir), backups[0],
source_name, "unchanged.txt")
file2_path = os.path.join(str(backups_dir), backups[1],
source_name, "unchanged.txt")
stat1 = os.stat(file1_path)
stat2 = os.stat(file2_path)
# Same inode means hardlinked
assert stat1.st_ino == stat2.st_ino
# Link count should be 2
assert stat1.st_nlink == 2
def test_backup_delta_directory(integration_dirs):
"""Test that delta directory contains changed files"""
backups_dir, source_dir = integration_dirs
# Create initial file
(source_dir / "file.txt").write_text("original")
# First backup
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
time.sleep(1.1)
# Modify file
(source_dir / "file.txt").write_text("modified")
# Second backup
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
# Check delta directory in second backup
backups = sorted([b for b in os.listdir(str(backups_dir))
if not b.startswith(".")])
second_backup = backups[1]
delta_dir = os.path.join(str(backups_dir), second_backup, bk.DELTA_DIR)
# Delta directory should exist and contain the modified file
assert os.path.isdir(delta_dir)
source_name = os.path.basename(str(source_dir))
delta_file = os.path.join(delta_dir, source_name, "file.txt")
assert os.path.exists(delta_file)
def test_cleanup_retains_recent_backups(integration_dirs):
"""Test that cleanup doesn't remove recent backups"""
backups_dir, source_dir = integration_dirs
# Create multiple backups
for i in range(3):
(source_dir / f"file{i}.txt").write_text(f"content {i}")
bk.initiate_backup(
sources=[self.source_dir],
backups_dir=self.backups_dir,
dry_run=False
)
# Verify two backups exist
backups = sorted([b for b in os.listdir(self.backups_dir) if not b.startswith(".")])
self.assertEqual(len(backups), 2)
# Verify unchanged file is hardlinked
source_name = os.path.basename(self.source_dir)
file1_path = os.path.join(self.backups_dir, backups[0], source_name, "unchanged.txt")
file2_path = os.path.join(self.backups_dir, backups[1], source_name, "unchanged.txt")
stat1 = os.stat(file1_path)
stat2 = os.stat(file2_path)
# Same inode means hardlinked
self.assertEqual(stat1.st_ino, stat2.st_ino)
# Link count should be 2
self.assertEqual(stat1.st_nlink, 2)
def test_backup_delta_directory(self):
"""Test that delta directory contains changed files"""
# Create initial file
with open(os.path.join(self.source_dir, "file.txt"), "w") as f:
f.write("original")
# First backup
bk.initiate_backup(
sources=[self.source_dir],
backups_dir=self.backups_dir,
dry_run=False
)
time.sleep(1.1)
# Modify file
with open(os.path.join(self.source_dir, "file.txt"), "w") as f:
f.write("modified")
# Second backup
bk.initiate_backup(
sources=[self.source_dir],
backups_dir=self.backups_dir,
dry_run=False
)
# Check delta directory in second backup
backups = sorted([b for b in os.listdir(self.backups_dir) if not b.startswith(".")])
second_backup = backups[1]
delta_dir = os.path.join(self.backups_dir, second_backup, bk.DELTA_DIR)
# Delta directory should exist and contain the modified file
self.assertTrue(os.path.isdir(delta_dir))
source_name = os.path.basename(self.source_dir)
delta_file = os.path.join(delta_dir, source_name, "file.txt")
self.assertTrue(os.path.exists(delta_file))
def test_cleanup_retains_recent_backups(self):
"""Test that cleanup doesn't remove recent backups"""
# Create multiple backups
for i in range(3):
with open(os.path.join(self.source_dir, f"file{i}.txt"), "w") as f:
f.write(f"content {i}")
bk.initiate_backup(
sources=[self.source_dir],
backups_dir=self.backups_dir,
dry_run=False
)
time.sleep(1.1)
# Run cleanup with keep_all=10 (all should be kept)
bk.cleanup_old_backups(
backups_dir=self.backups_dir,
dry_run=False,
keep_all=10
)
# All backups should still exist
backups = [b for b in os.listdir(self.backups_dir) if not b.startswith(".")]
self.assertEqual(len(backups), 3)
def test_dry_run_creates_no_backup(self):
"""Test that dry run doesn't create actual backup"""
with open(os.path.join(self.source_dir, "file.txt"), "w") as f:
f.write("content")
# Dry run backup
bk.initiate_backup(
sources=[self.source_dir],
backups_dir=self.backups_dir,
dry_run=True
)
# No backup should be created
backups = [b for b in os.listdir(self.backups_dir) if not b.startswith(".")]
self.assertEqual(len(backups), 0)
def test_no_backup_if_no_changes(self):
"""Test that no backup is created if nothing changed"""
# Create initial file
with open(os.path.join(self.source_dir, "file.txt"), "w") as f:
f.write("content")
# First backup
bk.initiate_backup(
sources=[self.source_dir],
backups_dir=self.backups_dir,
dry_run=False
)
time.sleep(1.1)
# Second backup with no changes
bk.initiate_backup(
sources=[self.source_dir],
backups_dir=self.backups_dir,
dry_run=False
)
# Only one backup should exist
backups = [b for b in os.listdir(self.backups_dir) if not b.startswith(".")]
self.assertEqual(len(backups), 1)
def test_lock_prevents_concurrent_backups(self):
"""Test that lock file prevents concurrent backup runs"""
with open(os.path.join(self.source_dir, "file.txt"), "w") as f:
f.write("content")
# Manually create lock file
lock_acquired = bk.set_backups_lock(self.backups_dir)
self.assertTrue(lock_acquired)
try:
# Try to run backup (should be blocked by lock)
# We can't actually test this without spawning a subprocess,
# but we can verify the lock exists
lock_path = os.path.join(self.backups_dir, bk.LOCK_FILE)
self.assertTrue(os.path.exists(lock_path))
finally:
bk.release_backups_lock(self.backups_dir)
# After releasing lock, backup should work
bk.initiate_backup(
sources=[self.source_dir],
backups_dir=self.backups_dir,
dry_run=False
)
backups = [b for b in os.listdir(self.backups_dir) if not b.startswith(".")]
self.assertEqual(len(backups), 1)
def test_multiple_source_directories(self):
"""Test backing up multiple source directories"""
# Create second source
source2_dir = os.path.join(self.temp_dir.name, "source2")
os.makedirs(source2_dir)
# Create files in both sources
with open(os.path.join(self.source_dir, "file1.txt"), "w") as f:
f.write("source1")
with open(os.path.join(source2_dir, "file2.txt"), "w") as f:
f.write("source2")
# Backup both sources
bk.initiate_backup(
sources=[self.source_dir, source2_dir],
backups_dir=self.backups_dir,
dry_run=False
)
# Verify both sources are in backup
backups = [b for b in os.listdir(self.backups_dir) if not b.startswith(".")]
self.assertEqual(len(backups), 1)
backup_path = os.path.join(self.backups_dir, backups[0])
source1_name = os.path.basename(self.source_dir)
source2_name = os.path.basename(source2_dir)
self.assertTrue(os.path.exists(os.path.join(backup_path, source1_name, "file1.txt")))
self.assertTrue(os.path.exists(os.path.join(backup_path, source2_name, "file2.txt")))
class TestBackupRecovery(TestCase):
"""Integration tests for backup recovery scenarios."""
def setUp(self) -> None:
self.temp_dir = tempfile.TemporaryDirectory(prefix="test_recovery_")
self.backups_dir = os.path.join(self.temp_dir.name, "backups")
self.source_dir = os.path.join(self.temp_dir.name, "source")
self.restore_dir = os.path.join(self.temp_dir.name, "restore")
os.makedirs(self.backups_dir)
os.makedirs(self.source_dir)
def tearDown(self) -> None:
self.temp_dir.cleanup()
def test_restore_from_backup(self):
"""Test restoring files from a backup"""
# Create source files
src_file = os.path.join(self.source_dir, "important.txt")
with open(src_file, "w") as f:
f.write("important data")
# Create backup
bk.initiate_backup(
sources=[self.source_dir],
backups_dir=self.backups_dir,
dry_run=False
)
# Simulate data loss - delete source
os.unlink(src_file)
self.assertFalse(os.path.exists(src_file))
# Restore from backup
backups = [b for b in os.listdir(self.backups_dir) if not b.startswith(".")]
backup_path = os.path.join(self.backups_dir, backups[0])
source_name = os.path.basename(self.source_dir)
backed_up_file = os.path.join(backup_path, source_name, "important.txt")
# Verify file exists in backup
self.assertTrue(os.path.exists(backed_up_file))
# Restore file
os.makedirs(self.restore_dir)
import shutil
shutil.copy2(backed_up_file, os.path.join(self.restore_dir, "important.txt"))
# Verify restored content
with open(os.path.join(self.restore_dir, "important.txt"), "r") as f:
self.assertEqual(f.read(), "important data")
def test_find_file_version_in_old_backup(self):
"""Test finding an old version of a file"""
src_file = os.path.join(self.source_dir, "document.txt")
# Create version 1
with open(src_file, "w") as f:
f.write("version 1")
bk.initiate_backup(
sources=[self.source_dir],
backups_dir=self.backups_dir,
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
time.sleep(1.1)
# Create version 2
with open(src_file, "w") as f:
f.write("version 2")
# Run cleanup with keep_all=10 (all should be kept)
bk.cleanup_old_backups(
backups_dir=str(backups_dir),
dry_run=False,
keep_all=10
)
bk.initiate_backup(
sources=[self.source_dir],
backups_dir=self.backups_dir,
dry_run=False
)
# All backups should still exist
backups = [b for b in os.listdir(str(backups_dir))
if not b.startswith(".")]
assert len(backups) == 3
# Verify we can access both versions
backups = sorted([b for b in os.listdir(self.backups_dir) if not b.startswith(".")])
source_name = os.path.basename(self.source_dir)
# First backup has version 1
backup1_file = os.path.join(self.backups_dir, backups[0], source_name, "document.txt")
with open(backup1_file, "r") as f:
self.assertEqual(f.read(), "version 1")
def test_dry_run_creates_no_backup(integration_dirs):
"""Test that dry run doesn't create actual backup"""
backups_dir, source_dir = integration_dirs
# Second backup has version 2
backup2_file = os.path.join(self.backups_dir, backups[1], source_name, "document.txt")
with open(backup2_file, "r") as f:
self.assertEqual(f.read(), "version 2")
(source_dir / "file.txt").write_text("content")
# Dry run backup
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=True
)
# No backup should be created
backups = [b for b in os.listdir(str(backups_dir))
if not b.startswith(".")]
assert len(backups) == 0
def test_no_backup_if_no_changes(integration_dirs):
"""Test that no backup is created if nothing changed"""
backups_dir, source_dir = integration_dirs
# Create initial file
(source_dir / "file.txt").write_text("content")
# First backup
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
time.sleep(1.1)
# Second backup with no changes
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
# Only one backup should exist
backups = [b for b in os.listdir(str(backups_dir))
if not b.startswith(".")]
assert len(backups) == 1
def test_lock_prevents_concurrent_backups(integration_dirs):
"""Test that lock file prevents concurrent backup runs"""
backups_dir, source_dir = integration_dirs
(source_dir / "file.txt").write_text("content")
# Manually create lock file
lock_acquired = bk.set_backups_lock(str(backups_dir))
assert lock_acquired
try:
# Try to run backup (should be blocked by lock)
# We can't actually test this without spawning a subprocess,
# but we can verify the lock exists
lock_path = os.path.join(str(backups_dir), bk.LOCK_FILE)
assert os.path.exists(lock_path)
finally:
bk.release_backups_lock(str(backups_dir))
# After releasing lock, backup should work
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
backups = [b for b in os.listdir(str(backups_dir))
if not b.startswith(".")]
assert len(backups) == 1
def test_multiple_source_directories(integration_dirs, tmp_path):
"""Test backing up multiple source directories"""
backups_dir, source_dir = integration_dirs
# Create second source
source2_dir = tmp_path / "source2"
source2_dir.mkdir()
# Create files in both sources
(source_dir / "file1.txt").write_text("source1")
(source2_dir / "file2.txt").write_text("source2")
# Backup both sources
bk.initiate_backup(
sources=[str(source_dir), str(source2_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
# Verify both sources are in backup
backups = [b for b in os.listdir(str(backups_dir))
if not b.startswith(".")]
assert len(backups) == 1
backup_path = os.path.join(str(backups_dir), backups[0])
source1_name = os.path.basename(str(source_dir))
source2_name = os.path.basename(str(source2_dir))
assert os.path.exists(os.path.join(backup_path, source1_name,
"file1.txt"))
assert os.path.exists(os.path.join(backup_path, source2_name,
"file2.txt"))
@pytest.fixture
def recovery_dirs(tmp_path):
"""Setup recovery test directories."""
backups_dir = tmp_path / "backups"
source_dir = tmp_path / "source"
restore_dir = tmp_path / "restore"
backups_dir.mkdir()
source_dir.mkdir()
return backups_dir, source_dir, restore_dir
def test_restore_from_backup(recovery_dirs):
"""Test restoring files from a backup"""
backups_dir, source_dir, restore_dir = recovery_dirs
# Create source files
src_file = source_dir / "important.txt"
src_file.write_text("important data")
# Create backup
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
# Simulate data loss - delete source
os.unlink(str(src_file))
assert not os.path.exists(str(src_file))
# Restore from backup
backups = [b for b in os.listdir(str(backups_dir))
if not b.startswith(".")]
backup_path = os.path.join(str(backups_dir), backups[0])
source_name = os.path.basename(str(source_dir))
backed_up_file = os.path.join(backup_path, source_name, "important.txt")
# Verify file exists in backup
assert os.path.exists(backed_up_file)
# Restore file
restore_dir.mkdir()
import shutil
shutil.copy2(backed_up_file, str(restore_dir / "important.txt"))
# Verify restored content
assert (restore_dir / "important.txt").read_text() == "important data"
def test_find_file_version_in_old_backup(recovery_dirs):
"""Test finding an old version of a file"""
backups_dir, source_dir, _ = recovery_dirs
src_file = source_dir / "document.txt"
# Create version 1
src_file.write_text("version 1")
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
time.sleep(1.1)
# Create version 2
src_file.write_text("version 2")
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
# Verify we can access both versions
backups = sorted([b for b in os.listdir(str(backups_dir))
if not b.startswith(".")])
source_name = os.path.basename(str(source_dir))
# First backup has version 1
backup1_file = os.path.join(str(backups_dir), backups[0],
source_name, "document.txt")
with open(backup1_file, "r") as f:
assert f.read() == "version 1"
# Second backup has version 2
backup2_file = os.path.join(str(backups_dir), backups[1],
source_name, "document.txt")
with open(backup2_file, "r") as f:
assert f.read() == "version 2"