Files
cura-te-ipsum/tests/test_integration.py
2026-02-04 19:31:29 -08:00

377 lines
11 KiB
Python

"""
Integration tests for the full backup workflow.
Tests the complete backup process from start to finish.
"""
import os
import time
import pytest
from curateipsum import backup as bk
@pytest.fixture
def integration_dirs(tmp_path):
"""Setup integration test directories."""
backups_dir = tmp_path / "backups"
source_dir = tmp_path / "source"
backups_dir.mkdir()
source_dir.mkdir()
return backups_dir, source_dir
def test_initial_backup_creation(integration_dirs):
"""Test creating the first backup"""
backups_dir, source_dir = integration_dirs
# Create some files in source
(source_dir / "file1.txt").write_text("content1")
(source_dir / "file2.txt").write_text("content2")
# Run backup
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
# Verify backup was created
backups = os.listdir(str(backups_dir))
# Filter out lock files
backups = [b for b in backups if not b.startswith(".")]
assert len(backups) == 1
# Verify files exist in backup
backup_path = os.path.join(str(backups_dir), backups[0])
source_name = os.path.basename(str(source_dir))
backup_file1 = os.path.join(backup_path, source_name, "file1.txt")
backup_file2 = os.path.join(backup_path, source_name, "file2.txt")
assert os.path.exists(backup_file1)
assert os.path.exists(backup_file2)
# Verify backup marker exists
marker_files = [f for f in os.listdir(backup_path)
if f.startswith(".backup_finished")]
assert len(marker_files) == 1
def test_incremental_backup_with_hardlinks(integration_dirs):
"""Test that second backup uses hardlinks for unchanged files"""
backups_dir, source_dir = integration_dirs
# Create initial file
src_file = source_dir / "unchanged.txt"
src_file.write_text("unchanged content")
# First backup
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
# Wait a bit to ensure different timestamp
time.sleep(1.1)
# Second backup (no changes)
# Add a new file to trigger a new backup
(source_dir / "new.txt").write_text("new content")
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
# Verify two backups exist
backups = sorted([b for b in os.listdir(str(backups_dir))
if not b.startswith(".")])
assert len(backups) == 2
# Verify unchanged file is hardlinked
source_name = os.path.basename(str(source_dir))
file1_path = os.path.join(str(backups_dir), backups[0],
source_name, "unchanged.txt")
file2_path = os.path.join(str(backups_dir), backups[1],
source_name, "unchanged.txt")
stat1 = os.stat(file1_path)
stat2 = os.stat(file2_path)
# Same inode means hardlinked
assert stat1.st_ino == stat2.st_ino
# Link count should be 2
assert stat1.st_nlink == 2
def test_backup_delta_directory(integration_dirs):
"""Test that delta directory contains changed files"""
backups_dir, source_dir = integration_dirs
# Create initial file
(source_dir / "file.txt").write_text("original")
# First backup
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
time.sleep(1.1)
# Modify file
(source_dir / "file.txt").write_text("modified")
# Second backup
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
# Check delta directory in second backup
backups = sorted([b for b in os.listdir(str(backups_dir))
if not b.startswith(".")])
second_backup = backups[1]
delta_dir = os.path.join(str(backups_dir), second_backup, bk.DELTA_DIR)
# Delta directory should exist and contain the modified file
assert os.path.isdir(delta_dir)
source_name = os.path.basename(str(source_dir))
delta_file = os.path.join(delta_dir, source_name, "file.txt")
assert os.path.exists(delta_file)
def test_cleanup_retains_recent_backups(integration_dirs):
"""Test that cleanup doesn't remove recent backups"""
backups_dir, source_dir = integration_dirs
# Create multiple backups
for i in range(3):
(source_dir / f"file{i}.txt").write_text(f"content {i}")
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
time.sleep(1.1)
# Run cleanup with keep_all=10 (all should be kept)
bk.cleanup_old_backups(
backups_dir=str(backups_dir),
dry_run=False,
keep_all=10
)
# All backups should still exist
backups = [b for b in os.listdir(str(backups_dir))
if not b.startswith(".")]
assert len(backups) == 3
def test_dry_run_creates_no_backup(integration_dirs):
"""Test that dry run doesn't create actual backup"""
backups_dir, source_dir = integration_dirs
(source_dir / "file.txt").write_text("content")
# Dry run backup
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=True
)
# No backup should be created
backups = [b for b in os.listdir(str(backups_dir))
if not b.startswith(".")]
assert len(backups) == 0
def test_no_backup_if_no_changes(integration_dirs):
"""Test that no backup is created if nothing changed"""
backups_dir, source_dir = integration_dirs
# Create initial file
(source_dir / "file.txt").write_text("content")
# First backup
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
time.sleep(1.1)
# Second backup with no changes
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
# Only one backup should exist
backups = [b for b in os.listdir(str(backups_dir))
if not b.startswith(".")]
assert len(backups) == 1
def test_lock_prevents_concurrent_backups(integration_dirs):
"""Test that lock file prevents concurrent backup runs"""
backups_dir, source_dir = integration_dirs
(source_dir / "file.txt").write_text("content")
# Manually create lock file
lock_acquired = bk.set_backups_lock(str(backups_dir))
assert lock_acquired
try:
# Try to run backup (should be blocked by lock)
# We can't actually test this without spawning a subprocess,
# but we can verify the lock exists
lock_path = os.path.join(str(backups_dir), bk.LOCK_FILE)
assert os.path.exists(lock_path)
finally:
bk.release_backups_lock(str(backups_dir))
# After releasing lock, backup should work
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
backups = [b for b in os.listdir(str(backups_dir))
if not b.startswith(".")]
assert len(backups) == 1
def test_multiple_source_directories(integration_dirs, tmp_path):
"""Test backing up multiple source directories"""
backups_dir, source_dir = integration_dirs
# Create second source
source2_dir = tmp_path / "source2"
source2_dir.mkdir()
# Create files in both sources
(source_dir / "file1.txt").write_text("source1")
(source2_dir / "file2.txt").write_text("source2")
# Backup both sources
bk.initiate_backup(
sources=[str(source_dir), str(source2_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
# Verify both sources are in backup
backups = [b for b in os.listdir(str(backups_dir))
if not b.startswith(".")]
assert len(backups) == 1
backup_path = os.path.join(str(backups_dir), backups[0])
source1_name = os.path.basename(str(source_dir))
source2_name = os.path.basename(str(source2_dir))
assert os.path.exists(os.path.join(backup_path, source1_name,
"file1.txt"))
assert os.path.exists(os.path.join(backup_path, source2_name,
"file2.txt"))
@pytest.fixture
def recovery_dirs(tmp_path):
"""Setup recovery test directories."""
backups_dir = tmp_path / "backups"
source_dir = tmp_path / "source"
restore_dir = tmp_path / "restore"
backups_dir.mkdir()
source_dir.mkdir()
return backups_dir, source_dir, restore_dir
def test_restore_from_backup(recovery_dirs):
"""Test restoring files from a backup"""
backups_dir, source_dir, restore_dir = recovery_dirs
# Create source files
src_file = source_dir / "important.txt"
src_file.write_text("important data")
# Create backup
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
# Simulate data loss - delete source
os.unlink(str(src_file))
assert not os.path.exists(str(src_file))
# Restore from backup
backups = [b for b in os.listdir(str(backups_dir))
if not b.startswith(".")]
backup_path = os.path.join(str(backups_dir), backups[0])
source_name = os.path.basename(str(source_dir))
backed_up_file = os.path.join(backup_path, source_name, "important.txt")
# Verify file exists in backup
assert os.path.exists(backed_up_file)
# Restore file
restore_dir.mkdir()
import shutil
shutil.copy2(backed_up_file, str(restore_dir / "important.txt"))
# Verify restored content
assert (restore_dir / "important.txt").read_text() == "important data"
def test_find_file_version_in_old_backup(recovery_dirs):
"""Test finding an old version of a file"""
backups_dir, source_dir, _ = recovery_dirs
src_file = source_dir / "document.txt"
# Create version 1
src_file.write_text("version 1")
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
time.sleep(1.1)
# Create version 2
src_file.write_text("version 2")
bk.initiate_backup(
sources=[str(source_dir)],
backups_dir=str(backups_dir),
dry_run=False
)
# Verify we can access both versions
backups = sorted([b for b in os.listdir(str(backups_dir))
if not b.startswith(".")])
source_name = os.path.basename(str(source_dir))
# First backup has version 1
backup1_file = os.path.join(str(backups_dir), backups[0],
source_name, "document.txt")
with open(backup1_file, "r") as f:
assert f.read() == "version 1"
# Second backup has version 2
backup2_file = os.path.join(str(backups_dir), backups[1],
source_name, "document.txt")
with open(backup2_file, "r") as f:
assert f.read() == "version 2"