From ee1df4cb21bf91c2b7d9f641c763e759fad62577 Mon Sep 17 00:00:00 2001 From: Maks Snegov Date: Wed, 4 Feb 2026 19:31:29 -0800 Subject: [PATCH] Update tests --- tests/conftest.py | 77 ++++ tests/test_backups.py | 531 +++++++++++++++---------- tests/test_fs.py | 807 ++++++++++++++++++++++++++++++-------- tests/test_fs_extended.py | 429 -------------------- tests/test_integration.py | 668 ++++++++++++++++--------------- 5 files changed, 1386 insertions(+), 1126 deletions(-) create mode 100644 tests/conftest.py delete mode 100644 tests/test_fs_extended.py diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..e7ecd2a --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,77 @@ +"""Shared pytest fixtures for cura-te-ipsum tests.""" +import os +import random +import string +import tempfile +import pytest + +from curateipsum import backup as bk, fs + + +@pytest.fixture +def backup_dir(tmp_path): + """Provide a temporary backup directory.""" + return tmp_path / "backups" + + +@pytest.fixture +def source_dir(tmp_path): + """Provide a temporary source directory.""" + src = tmp_path / "source" + src.mkdir() + return src + + +@pytest.fixture +def add_backup(backup_dir): + """Factory fixture for creating test backups.""" + backup_dir.mkdir(parents=True, exist_ok=True) + + def _add_backup(backup_name: str) -> fs.PseudoDirEntry: + backup = fs.PseudoDirEntry(os.path.join(str(backup_dir), backup_name)) + os.mkdir(backup.path) + bk.set_backup_marker(backup) + + fd, path = tempfile.mkstemp(prefix="backup_file_", dir=backup.path) + with open(fd, "w") as f: + f.write(''.join(random.choices(string.printable, k=128))) + return backup + + return _add_backup + + +@pytest.fixture +def check_backup_not_empty(): + """Helper to verify backup is not empty.""" + def _check(backup: fs.PseudoDirEntry) -> bool: + return bool(os.listdir(backup.path)) + return _check + + +@pytest.fixture +def check_backups(backup_dir, check_backup_not_empty): + """Helper to verify expected 
backups exist.""" + def _check(expected_backups): + backups_list = os.listdir(str(backup_dir)) + assert sorted(b.name for b in expected_backups) == sorted(backups_list) + for b in expected_backups: + assert check_backup_not_empty(b) + return _check + + +@pytest.fixture +def run_cleanup(backup_dir): + """Helper to run cleanup_old_backups with default parameters.""" + def _run(**kwargs): + cleanup_kwargs = { + "backups_dir": str(backup_dir), + "dry_run": False, + "keep_all": None, + "keep_daily": None, + "keep_weekly": None, + "keep_monthly": None, + "keep_yearly": None, + } + cleanup_kwargs.update(**kwargs) + bk.cleanup_old_backups(**cleanup_kwargs) + return _run diff --git a/tests/test_backups.py b/tests/test_backups.py index c4f0b02..e4a8425 100644 --- a/tests/test_backups.py +++ b/tests/test_backups.py @@ -1,332 +1,449 @@ import os -import random -import string -import tempfile -from unittest import TestCase, mock +from unittest import mock from datetime import datetime -from curateipsum import backup as bk, fs +from curateipsum import backup as bk -class TestBackupCleanup(TestCase): - def setUp(self) -> None: - self.backup_dir = tempfile.TemporaryDirectory(prefix="backup_") +class TestBackupCleanup: + """Tests for backup cleanup and retention policies.""" - def tearDown(self) -> None: - self.backup_dir.cleanup() - - def _add_backup(self, backup_name: str) -> fs.PseudoDirEntry: - backup = fs.PseudoDirEntry(os.path.join(self.backup_dir.name, backup_name)) - os.mkdir(backup.path) - bk.set_backup_marker(backup) - - fd, path = tempfile.mkstemp(prefix="backup_file_", dir=backup.path) - with open(fd, "w") as f: - f.write(''.join(random.choices(string.printable, k=128))) - return backup - - @staticmethod - def _check_backup_not_empty(backup: fs.PseudoDirEntry) -> bool: - return bool(os.listdir(backup.path)) - - def _check_backups(self, expected_backups): - backups_list = os.listdir(self.backup_dir.name) - self.assertEqual(sorted(b.name for b in expected_backups), - 
sorted(backups_list)) - for b in expected_backups: - self.assertTrue(self._check_backup_not_empty(b)) - - def _run_cleanup(self, **kwargs): - """ Run cleanup_old_backups with null parameters. """ - cleanup_kwargs = { - "backups_dir": self.backup_dir.name, - "dry_run": False, - "keep_all": None, - "keep_daily": None, - "keep_weekly": None, - "keep_monthly": None, - "keep_yearly": None, - } - cleanup_kwargs.update(**kwargs) - bk.cleanup_old_backups(**cleanup_kwargs) - - def test_no_backups(self): - """ Test behaviour with no available backups """ - bk.cleanup_old_backups(self.backup_dir.name) - self.assertFalse(os.listdir(self.backup_dir.name)) + def test_no_backups(self, backup_dir, run_cleanup): + """Test behaviour with no available backups""" + backup_dir.mkdir() + bk.cleanup_old_backups(str(backup_dir)) + assert not os.listdir(str(backup_dir)) @mock.patch(f"{bk.__name__}.datetime", wraps=datetime) - def test_only_one_backup(self, mock_datetime): - """ Test the only backup will not be removed in any case """ + def test_only_one_backup(self, mock_datetime, add_backup, run_cleanup, + check_backups): + """Test the only backup will not be removed in any case""" mock_datetime.now.return_value = datetime(2021, 10, 20) # very old backup - only_backup = self._add_backup("20010101_0000") - self._run_cleanup(keep_all=1) - self._check_backups([only_backup]) + only_backup = add_backup("20010101_0000") + run_cleanup(keep_all=1) + check_backups([only_backup]) @mock.patch(f"{bk.__name__}.datetime", wraps=datetime) - def test_at_least_one_should_be_left(self, mock_datetime): - """ Test at least one backup should be left """ + def test_at_least_one_should_be_left(self, mock_datetime, add_backup, + run_cleanup, check_backups): + """Test at least one backup should be left""" mock_datetime.now.return_value = datetime(2021, 10, 20) backups = [ - self._add_backup("20211103_0300"), # this one is the latest and should be kept - self._add_backup("20201216_0100"), # the rest should be 
removed - self._add_backup("20200716_0100"), - self._add_backup("20181116_0100"), + add_backup("20211103_0300"), # latest, should be kept + add_backup("20201216_0100"), # rest should be removed + add_backup("20200716_0100"), + add_backup("20181116_0100"), ] expected_backups = [backups[0]] - self._run_cleanup() - self._check_backups(expected_backups) + run_cleanup() + check_backups(expected_backups) @mock.patch(f"{bk.__name__}.datetime", wraps=datetime) - def test_keep_all_threshold_only(self, mock_datetime): - """ Test threshold for keeping all backups """ + def test_keep_all_threshold_only(self, mock_datetime, add_backup, + run_cleanup, check_backups): + """Test threshold for keeping all backups""" mock_datetime.now.return_value = datetime(2021, 10, 20) backups = [ - self._add_backup("20211019_0300"), # keep - self._add_backup("20211017_0100"), # keep - self._add_backup("20211016_2300"), # remove, older than 3 days + add_backup("20211019_0300"), # keep + add_backup("20211017_0100"), # keep + add_backup("20211016_2300"), # remove, older than 3 days ] expected_backups = backups[:2] - self._run_cleanup(keep_all=3) - self._check_backups(expected_backups) + run_cleanup(keep_all=3) + check_backups(expected_backups) @mock.patch(f"{bk.__name__}.datetime", wraps=datetime) - def test_keep_daily_threshold_only(self, mock_datetime): - """ Test threshold for keeping daily backups """ + def test_keep_daily_threshold_only(self, mock_datetime, add_backup, + run_cleanup, check_backups): + """Test threshold for keeping daily backups""" mock_datetime.now.return_value = datetime(2021, 10, 20) backups = [ - self._add_backup("20211019_0300"), # keep, first daily backup at 2021-10-19 - self._add_backup("20211017_2100"), # remove, not the first daily backup - self._add_backup("20211017_0100"), # remove, not the first daily backup - self._add_backup("20211017_0030"), # keep, first daily backup at 2021-10-17 - self._add_backup("20211016_2300"), # remove, older than 3 days - 
self._add_backup("20211016_0100"), # remove, older than 3 days + add_backup("20211019_0300"), # keep, first daily at 2021-10-19 + add_backup("20211017_2100"), # remove, not first daily + add_backup("20211017_0100"), # remove, not first daily + add_backup("20211017_0030"), # keep, first daily at 2021-10-17 + add_backup("20211016_2300"), # remove, older than 3 days + add_backup("20211016_0100"), # remove, older than 3 days ] expected_backups = [backups[0], backups[3]] - self._run_cleanup(keep_daily=3) - self._check_backups(expected_backups) + run_cleanup(keep_daily=3) + check_backups(expected_backups) @mock.patch(f"{bk.__name__}.datetime", wraps=datetime) - def test_keep_all_and_daily_thresholds(self, mock_datetime): - """ Test threshold for keeping all and daily backups """ + def test_keep_all_and_daily_thresholds(self, mock_datetime, add_backup, + run_cleanup, check_backups): + """Test threshold for keeping all and daily backups""" mock_datetime.now.return_value = datetime(2021, 10, 20) backups = [ - self._add_backup("20211019_0300"), # keep, newer than 3 days - self._add_backup("20211017_0200"), # keep, newer than 3 days - self._add_backup("20211017_0100"), # keep, newer than 3 days - self._add_backup("20211016_2300"), # remove, not the first daily backup - self._add_backup("20211016_2200"), # keep, the first daily backup at 2021-10-16 - self._add_backup("20211015_2200"), # remove, not the first daily backup - self._add_backup("20211015_1500"), # remove, not the first daily backup - self._add_backup("20211015_0200"), # keep, the first daily backup at 2021-10-15 - self._add_backup("20211014_2200"), # remove, older than 5 days - self._add_backup("20211014_2000"), # remove, older than 5 days - self._add_backup("20211014_1232"), # remove, older than 5 days + add_backup("20211019_0300"), # keep, newer than 3 days + add_backup("20211017_0200"), # keep, newer than 3 days + add_backup("20211017_0100"), # keep, newer than 3 days + add_backup("20211016_2300"), # remove, not 
first daily + add_backup("20211016_2200"), # keep, first daily at 2021-10-16 + add_backup("20211015_2200"), # remove, not first daily + add_backup("20211015_1500"), # remove, not first daily + add_backup("20211015_0200"), # keep, first daily at 2021-10-15 + add_backup("20211014_2200"), # remove, older than 5 days + add_backup("20211014_2000"), # remove, older than 5 days + add_backup("20211014_1232"), # remove, older than 5 days ] expected_backups = backups[0:3] + [backups[4]] + [backups[7]] - self._run_cleanup(keep_all=3, keep_daily=5) - self._check_backups(expected_backups) + run_cleanup(keep_all=3, keep_daily=5) + check_backups(expected_backups) @mock.patch(f"{bk.__name__}.datetime", wraps=datetime) - def test_keep_weekly_threshold_only(self, mock_datetime): - """ Test threshold for keeping weekly backups """ + def test_keep_weekly_threshold_only(self, mock_datetime, add_backup, + run_cleanup, check_backups): + """Test threshold for keeping weekly backups""" mock_datetime.now.return_value = datetime(2021, 11, 11) backups = [ - self._add_backup("20211111_0300"), # remove, not the first weekly backup (Thursday) - self._add_backup("20211110_0300"), # remove, not the first weekly backup (Wednesday) - self._add_backup("20211108_0100"), # keep, first weekly backup at 2021-11-08 (Monday) - self._add_backup("20211107_2300"), # remove, not the first weekly backup (Sunday) - self._add_backup("20211107_0100"), # keep, first weekly backup at 2021-11-07 (Sunday) - self._add_backup("20211031_0100"), # remove, not the first weekly backup (Sunday) - self._add_backup("20211025_0100"), # keep, first weekly backup at 2021-10-25 (Monday) - self._add_backup("20211024_0100"), # remove, not the first weekly backup (Sunday) - self._add_backup("20211023_0100"), # remove, not the first weekly backup (Saturday) - self._add_backup("20211022_0100"), # keep, first weekly backup at 2021-10-22 (Friday) - self._add_backup("20211008_0100"), # remove, not the first weekly backup (Friday) - 
self._add_backup("20211007_0100"), # remove, not the first weekly backup (Thursday) - self._add_backup("20211004_0100"), # keep, first weekly backup at 2021-10-04 (Monday) - self._add_backup("20211003_0100"), # remove, older than 5 weeks - self._add_backup("20211002_0100"), # remove, older than 5 weeks + add_backup("20211111_0300"), # remove, not first weekly (Thu) + add_backup("20211110_0300"), # remove, not first weekly (Wed) + add_backup("20211108_0100"), # keep, first weekly 2021-11-08 (Mon) + add_backup("20211107_2300"), # remove, not first weekly (Sun) + add_backup("20211107_0100"), # keep, first weekly 2021-11-07 (Sun) + add_backup("20211031_0100"), # remove, not first weekly (Sun) + add_backup("20211025_0100"), # keep, first weekly 2021-10-25 (Mon) + add_backup("20211024_0100"), # remove, not first weekly (Sun) + add_backup("20211023_0100"), # remove, not first weekly (Sat) + add_backup("20211022_0100"), # keep, first weekly 2021-10-22 (Fri) + add_backup("20211008_0100"), # remove, not first weekly (Fri) + add_backup("20211007_0100"), # remove, not first weekly (Thu) + add_backup("20211004_0100"), # keep, first weekly 2021-10-04 (Mon) + add_backup("20211003_0100"), # remove, older than 5 weeks + add_backup("20211002_0100"), # remove, older than 5 weeks ] expected_backups = [backups[2], backups[4], backups[6], backups[9], backups[12]] - self._run_cleanup(keep_weekly=5) - self._check_backups(expected_backups) + run_cleanup(keep_weekly=5) + check_backups(expected_backups) @mock.patch(f"{bk.__name__}.datetime", wraps=datetime) - def test_keep_weekly_threshold_inclusive(self, mock_datetime): - """ Test threshold for keeping weekly backups """ + def test_keep_weekly_threshold_inclusive(self, mock_datetime, add_backup, + run_cleanup, check_backups): + """Test threshold for keeping weekly backups""" mock_datetime.now.return_value = datetime(2021, 11, 11) backups = [ - self._add_backup("20211111_0300"), # remove, not the first weekly backup (Thursday) - 
self._add_backup("20211110_0300"), # keep, first weekly backup (Wednesday) - self._add_backup("20211107_0100"), # remove, not the first weekly backup (Sunday) - self._add_backup("20211102_0100"), # keep, first weekly backup (Tuesday) + add_backup("20211111_0300"), # remove, not first weekly (Thu) + add_backup("20211110_0300"), # keep, first weekly (Wed) + add_backup("20211107_0100"), # remove, not first weekly (Sun) + add_backup("20211102_0100"), # keep, first weekly (Tue) ] expected_backups = [backups[1], backups[3]] - self._run_cleanup(keep_weekly=5) - self._check_backups(expected_backups) + run_cleanup(keep_weekly=5) + check_backups(expected_backups) @mock.patch(f"{bk.__name__}.datetime", wraps=datetime) - def test_keep_monthly_threshold_only(self, mock_datetime): - """ Test threshold for keeping monthly backups """ + def test_keep_monthly_threshold_only(self, mock_datetime, add_backup, + run_cleanup, check_backups): + """Test threshold for keeping monthly backups""" mock_datetime.now.return_value = datetime(2021, 11, 11) backups = [ - self._add_backup("20211103_0300"), # keep, first monthly backup at 2021-11 - self._add_backup("20211019_0300"), # remove, not the first monthly backup - self._add_backup("20211017_2100"), # remove, not the first monthly backup - self._add_backup("20211017_0100"), # keep, first monthly backup at 2021-10 - self._add_backup("20210916_2300"), # remove, not the first monthly backup - self._add_backup("20210916_0100"), # keep, first monthly backup at 2021-09 - self._add_backup("20210816_0100"), # remove, not the first monthly backup - self._add_backup("20210810_0000"), # keep, first monthly backup at 2021-08 - self._add_backup("20210716_0100"), # remove, older than 3 months - self._add_backup("20210715_0100"), # remove, older than 3 months + add_backup("20211103_0300"), # keep, first monthly at 2021-11 + add_backup("20211019_0300"), # remove, not first monthly + add_backup("20211017_2100"), # remove, not first monthly + 
add_backup("20211017_0100"), # keep, first monthly at 2021-10 + add_backup("20210916_2300"), # remove, not first monthly + add_backup("20210916_0100"), # keep, first monthly at 2021-09 + add_backup("20210816_0100"), # remove, not first monthly + add_backup("20210810_0000"), # keep, first monthly at 2021-08 + add_backup("20210716_0100"), # remove, older than 3 months + add_backup("20210715_0100"), # remove, older than 3 months ] expected_backups = [backups[0], backups[3], backups[5], backups[7]] - self._run_cleanup(keep_monthly=3) - self._check_backups(expected_backups) + run_cleanup(keep_monthly=3) + check_backups(expected_backups) @mock.patch(f"{bk.__name__}.datetime", wraps=datetime) - def test_keep_yearly_threshold_only(self, mock_datetime): - """ Test threshold for keeping yearly backups """ + def test_keep_yearly_threshold_only(self, mock_datetime, add_backup, + run_cleanup, check_backups): + """Test threshold for keeping yearly backups""" mock_datetime.now.return_value = datetime(2021, 11, 11) backups = [ - self._add_backup("20211103_0300"), # remove, not the first yearly backup in 2021 - self._add_backup("20210810_0000"), # remove, not the first yearly backup in 2021 - self._add_backup("20210716_0100"), # keep, first yearly backup in 2021 - self._add_backup("20201216_0100"), # remove, not the first yearly backup in 2020 - self._add_backup("20200716_0100"), # keep, first yearly backup in 2020 - self._add_backup("20191216_0100"), # remove, not the first yearly backup in 2019 - self._add_backup("20190316_0100"), # keep, first yearly backup in 2019 - self._add_backup("20181216_0100"), # remove, not the first yearly backup in 2018 - self._add_backup("20181116_0100"), # keep, first yearly backup in 2018 - self._add_backup("20171116_0100"), # remove, older than 3 years - self._add_backup("20171115_0100"), # remove, older than 3 years + add_backup("20211103_0300"), # remove, not first yearly in 2021 + add_backup("20210810_0000"), # remove, not first yearly in 2021 + 
add_backup("20210716_0100"), # keep, first yearly in 2021 + add_backup("20201216_0100"), # remove, not first yearly in 2020 + add_backup("20200716_0100"), # keep, first yearly in 2020 + add_backup("20191216_0100"), # remove, not first yearly in 2019 + add_backup("20190316_0100"), # keep, first yearly in 2019 + add_backup("20181216_0100"), # remove, not first yearly in 2018 + add_backup("20181116_0100"), # keep, first yearly in 2018 + add_backup("20171116_0100"), # remove, older than 3 years + add_backup("20171115_0100"), # remove, older than 3 years ] expected_backups = [backups[2], backups[4], backups[6], backups[8]] - self._run_cleanup(keep_yearly=3) - self._check_backups(expected_backups) + run_cleanup(keep_yearly=3) + check_backups(expected_backups) @mock.patch(f"{bk.__name__}.datetime", wraps=datetime) - def test_dry_run(self, mock_datetime): - """ Test dry run does not remove anything """ + def test_dry_run(self, mock_datetime, add_backup, run_cleanup, + check_backups): + """Test dry run does not remove anything""" mock_datetime.now.return_value = datetime(2021, 11, 11) backups = [ - self._add_backup("20211103_0300"), - self._add_backup("20210810_0000"), - self._add_backup("20210716_0100"), - self._add_backup("20200716_0100"), - self._add_backup("20181116_0100"), + add_backup("20211103_0300"), + add_backup("20210810_0000"), + add_backup("20210716_0100"), + add_backup("20200716_0100"), + add_backup("20181116_0100"), ] - self._run_cleanup(keep_all=2, dry_run=True) - self._check_backups(backups) + run_cleanup(keep_all=2, dry_run=True) + check_backups(backups) -class TestBackupLock(TestCase): +class TestBackupLock: """Test suite for backup lock file functionality.""" - def setUp(self) -> None: - self.backup_dir = tempfile.TemporaryDirectory(prefix="backup_lock_") - - def tearDown(self) -> None: - self.backup_dir.cleanup() - - def test_lock_creation(self): + def test_lock_creation(self, backup_dir): """Test that lock file is created with current PID""" - result = 
bk.set_backups_lock(self.backup_dir.name) - self.assertTrue(result) + backup_dir.mkdir() + result = bk.set_backups_lock(str(backup_dir)) + assert result - lock_path = os.path.join(self.backup_dir.name, bk.LOCK_FILE) - self.assertTrue(os.path.exists(lock_path)) + lock_path = os.path.join(str(backup_dir), bk.LOCK_FILE) + assert os.path.exists(lock_path) with open(lock_path, "r") as f: pid = int(f.read().strip()) - self.assertEqual(pid, os.getpid()) + assert pid == os.getpid() - def test_lock_prevents_concurrent_backup(self): + def test_lock_prevents_concurrent_backup(self, backup_dir): """Test that second lock acquisition is blocked""" + backup_dir.mkdir() # First lock should succeed - result1 = bk.set_backups_lock(self.backup_dir.name) - self.assertTrue(result1) + result1 = bk.set_backups_lock(str(backup_dir)) + assert result1 - # Second lock should fail (same process trying to lock again) + # The second lock should fail (same process trying to lock again) # Write a different PID to simulate another process - lock_path = os.path.join(self.backup_dir.name, bk.LOCK_FILE) + lock_path = os.path.join(str(backup_dir), bk.LOCK_FILE) with open(lock_path, "w") as f: f.write(str(os.getpid())) - result2 = bk.set_backups_lock(self.backup_dir.name, force=False) - self.assertFalse(result2) + result2 = bk.set_backups_lock(str(backup_dir), force=False) + assert not result2 - def test_stale_lock_is_removed(self): + def test_stale_lock_is_removed(self, backup_dir): """Test that lock from non-existent process is cleaned up""" - lock_path = os.path.join(self.backup_dir.name, bk.LOCK_FILE) + backup_dir.mkdir() + lock_path = os.path.join(str(backup_dir), bk.LOCK_FILE) # Create lock with non-existent PID with open(lock_path, "w") as f: f.write("999999") # Lock should succeed by removing stale lock - result = bk.set_backups_lock(self.backup_dir.name) - self.assertTrue(result) + result = bk.set_backups_lock(str(backup_dir)) + assert result # Verify new lock has current PID with 
open(lock_path, "r") as f: pid = int(f.read().strip()) - self.assertEqual(pid, os.getpid()) + assert pid == os.getpid() - def test_corrupted_lock_is_handled(self): + def test_corrupted_lock_is_handled(self, backup_dir): """Test that corrupted lock file is handled gracefully""" - lock_path = os.path.join(self.backup_dir.name, bk.LOCK_FILE) + backup_dir.mkdir() + lock_path = os.path.join(str(backup_dir), bk.LOCK_FILE) # Create corrupted lock file (non-numeric content) with open(lock_path, "w") as f: f.write("not a number") # Lock should succeed by removing corrupted lock - result = bk.set_backups_lock(self.backup_dir.name) - self.assertTrue(result) + result = bk.set_backups_lock(str(backup_dir)) + assert result # Verify new lock has current PID with open(lock_path, "r") as f: pid = int(f.read().strip()) - self.assertEqual(pid, os.getpid()) + assert pid == os.getpid() - def test_empty_lock_is_handled(self): + def test_empty_lock_is_handled(self, backup_dir): """Test that empty lock file is handled gracefully""" - lock_path = os.path.join(self.backup_dir.name, bk.LOCK_FILE) + backup_dir.mkdir() + lock_path = os.path.join(str(backup_dir), bk.LOCK_FILE) - # Create empty lock file + # Create the empty lock file open(lock_path, "w").close() # Lock should succeed by removing empty lock - result = bk.set_backups_lock(self.backup_dir.name) - self.assertTrue(result) + result = bk.set_backups_lock(str(backup_dir)) + assert result # Verify new lock has current PID with open(lock_path, "r") as f: pid = int(f.read().strip()) - self.assertEqual(pid, os.getpid()) + assert pid == os.getpid() - def test_lock_release(self): + def test_lock_release(self, backup_dir): """Test that lock file is properly released""" - bk.set_backups_lock(self.backup_dir.name) - lock_path = os.path.join(self.backup_dir.name, bk.LOCK_FILE) - self.assertTrue(os.path.exists(lock_path)) + backup_dir.mkdir() + bk.set_backups_lock(str(backup_dir)) + lock_path = os.path.join(str(backup_dir), bk.LOCK_FILE) + assert 
os.path.exists(lock_path) - bk.release_backups_lock(self.backup_dir.name) - self.assertFalse(os.path.exists(lock_path)) + bk.release_backups_lock(str(backup_dir)) + assert not os.path.exists(lock_path) - def test_release_nonexistent_lock(self): + def test_release_nonexistent_lock(self, backup_dir): """Test that releasing non-existent lock doesn't raise error""" + backup_dir.mkdir() # Should not raise any exception - bk.release_backups_lock(self.backup_dir.name) + bk.release_backups_lock(str(backup_dir)) - lock_path = os.path.join(self.backup_dir.name, bk.LOCK_FILE) - self.assertFalse(os.path.exists(lock_path)) + lock_path = os.path.join(str(backup_dir), bk.LOCK_FILE) + assert not os.path.exists(lock_path) -# TODO add tests for iterating over backups (marker, dirname) +class TestBackupIteration: + """Tests for internal backup iteration and validation functions.""" + + def test_is_backup_valid_backup(self, add_backup): + """Test _is_backup recognizes valid backup directory""" + backup = add_backup("20210101_120000") + assert bk._is_backup(backup) + + def test_is_backup_missing_marker(self, backup_dir, tmp_path): + """Test _is_backup rejects directory without marker""" + backup_dir.mkdir() + backup_path = backup_dir / "20210101_120000" + backup_path.mkdir() + # Create content but no marker + (backup_path / "file.txt").write_text("content") + + entry = os.scandir(str(backup_dir)) + backup = next(entry) + entry.close() + assert not bk._is_backup(backup) + + def test_is_backup_only_marker_no_content(self, backup_dir): + """Test _is_backup rejects directory with only marker file""" + backup_dir.mkdir() + backup_path = backup_dir / "20210101_120000" + backup_path.mkdir() + # Create only marker, no content + marker_name = f"{bk.BACKUP_MARKER}_20210101_120000" + (backup_path / marker_name).touch() + + entry = os.scandir(str(backup_dir)) + backup = next(entry) + entry.close() + assert not bk._is_backup(backup) + + def test_is_backup_invalid_name_format(self, backup_dir): + 
"""Test _is_backup rejects invalid directory name""" + backup_dir.mkdir() + backup_path = backup_dir / "not-a-backup" + backup_path.mkdir() + # Create marker and content + marker_name = f"{bk.BACKUP_MARKER}_not-a-backup" + (backup_path / marker_name).touch() + (backup_path / "file.txt").write_text("content") + + entry = os.scandir(str(backup_dir)) + backup = next(entry) + entry.close() + assert not bk._is_backup(backup) + + def test_iterate_backups_empty_directory(self, backup_dir): + """Test _iterate_backups on empty directory""" + backup_dir.mkdir() + backups = list(bk._iterate_backups(str(backup_dir))) + assert backups == [] + + def test_iterate_backups_mixed_contents(self, backup_dir, add_backup): + """Test _iterate_backups filters non-backup entries""" + # Create valid backups + backup1 = add_backup("20210101_120000") + backup2 = add_backup("20210102_120000") + + # Create invalid entries + (backup_dir / "random_file.txt").write_text("not a backup") + (backup_dir / "invalid_dir").mkdir() + (backup_dir / bk.LOCK_FILE).touch() + + backups = sorted(bk._iterate_backups(str(backup_dir)), + key=lambda e: e.name) + assert len(backups) == 2 + assert backups[0].name == backup1.name + assert backups[1].name == backup2.name + + def test_iterate_backups_incomplete_backup(self, backup_dir): + """Test _iterate_backups skips backup without marker""" + backup_dir.mkdir() + # Create complete backup + complete = backup_dir / "20210101_120000" + complete.mkdir() + (complete / "file.txt").write_text("content") + marker_name = f"{bk.BACKUP_MARKER}_20210101_120000" + (complete / marker_name).touch() + + # Create incomplete backup (no marker) + incomplete = backup_dir / "20210102_120000" + incomplete.mkdir() + (incomplete / "file.txt").write_text("content") + + backups = list(bk._iterate_backups(str(backup_dir))) + assert len(backups) == 1 + assert backups[0].name == "20210101_120000" + + def test_get_latest_backup_returns_most_recent(self, backup_dir, + add_backup): + """Test 
_get_latest_backup returns most recent backup""" + add_backup("20210101_120000") + add_backup("20210102_120000") + latest = add_backup("20210103_120000") + + result = bk._get_latest_backup(str(backup_dir)) + assert result is not None + assert result.name == latest.name + + def test_get_latest_backup_empty_directory(self, backup_dir): + """Test _get_latest_backup returns None for empty directory""" + backup_dir.mkdir() + result = bk._get_latest_backup(str(backup_dir)) + assert result is None + + def test_get_latest_backup_no_valid_backups(self, backup_dir): + """Test _get_latest_backup returns None with no valid backups""" + backup_dir.mkdir() + # Create incomplete backup + incomplete = backup_dir / "20210101_120000" + incomplete.mkdir() + (incomplete / "file.txt").write_text("content") + # no marker + + result = bk._get_latest_backup(str(backup_dir)) + assert result is None + + def test_set_backup_marker_creates_marker(self, backup_dir): + """Test set_backup_marker creates marker file""" + backup_dir.mkdir() + backup_path = backup_dir / "20210101_120000" + backup_path.mkdir() + + backup_entry = bk.fs.PseudoDirEntry(str(backup_path)) + bk.set_backup_marker(backup_entry) + + marker_name = f"{bk.BACKUP_MARKER}_20210101_120000" + marker_path = backup_path / marker_name + assert marker_path.exists() + + def test_set_backup_marker_idempotent(self, backup_dir): + """Test set_backup_marker is idempotent""" + backup_dir.mkdir() + backup_path = backup_dir / "20210101_120000" + backup_path.mkdir() + + backup_entry = bk.fs.PseudoDirEntry(str(backup_path)) + bk.set_backup_marker(backup_entry) + # Call again - should not fail + bk.set_backup_marker(backup_entry) + + marker_name = f"{bk.BACKUP_MARKER}_20210101_120000" + marker_path = backup_path / marker_name + assert marker_path.exists() diff --git a/tests/test_fs.py b/tests/test_fs.py index 9417cf9..cfdbde7 100644 --- a/tests/test_fs.py +++ b/tests/test_fs.py @@ -4,276 +4,301 @@ import shutil import socket import string import 
tempfile -import unittest + +import pytest from curateipsum import fs -class CommonFSTestCase(unittest.TestCase): - def setUp(self): - self.tmp_dir_src = tempfile.TemporaryDirectory(prefix="source_") - self.tmp_dir_dst = tempfile.TemporaryDirectory(prefix="dest_") - self.src_dir = self.tmp_dir_src.name - self.dst_dir = self.tmp_dir_dst.name - - def tearDown(self): - self.tmp_dir_src.cleanup() - self.tmp_dir_dst.cleanup() - - @staticmethod - def create_file(parent_dir: str, prefix: str = None) -> str: - """ - Create file with random name in parent_dir. - Returns absolute path to created file. - """ - fd, path = tempfile.mkstemp(prefix=prefix, dir=parent_dir) - with open(fd, "w") as f: - f.write(string.printable) - return path - - @staticmethod - def create_dir(parent_dir: str, prefix: str = None) -> str: - """ - Create directory with random name in parent_dir. - Returns absolute path to created directory. - """ - return tempfile.mkdtemp(prefix=prefix, dir=parent_dir) - - def relpath(self, full_path: str) -> str: - """ Get relative path for entity in src/dst dirs. """ - if full_path.startswith(self.src_dir): - p_dir = self.src_dir - elif full_path.startswith(self.dst_dir): - p_dir = self.dst_dir - else: - raise RuntimeError(f"Path {full_path} is not src_dir nor dst_dir") - - return full_path[len(p_dir) + 1 :] +@pytest.fixture +def common_fs_dirs(tmp_path): + """Create source and destination directories for tests.""" + src_dir = tmp_path / "source" + dst_dir = tmp_path / "dest" + src_dir.mkdir() + dst_dir.mkdir() + return src_dir, dst_dir -class TestHardlinkDir(CommonFSTestCase): - def setUp(self): - self.tmp_dir = tempfile.TemporaryDirectory(prefix="source_") - self.src_dir = self.tmp_dir.name - self.dst_dir = self.src_dir + ".copy" +def create_file(parent_dir: str, prefix: str = None) -> str: + """ + Create a file with random name in parent_dir. + Returns absolute path to the created file. 
+ """ + fd, path = tempfile.mkstemp(prefix=prefix, dir=parent_dir) + with open(fd, "w") as f: + f.write(string.printable) + return path - @staticmethod - def check_directory_stats(d1_path: str, d2_path: str): - """ Check that directory was copied. Fails test, if not. """ - dir1_stat = os.lstat(d1_path) - dir2_stat = os.lstat(d2_path) - assert dir1_stat.st_uid == dir2_stat.st_uid - assert dir1_stat.st_gid == dir2_stat.st_gid - assert dir1_stat.st_mode == dir2_stat.st_mode - assert dir1_stat.st_nlink == dir2_stat.st_nlink - assert dir1_stat.st_size == dir2_stat.st_size - assert dir1_stat.st_size == dir2_stat.st_size - # only mtime is checked - assert dir1_stat.st_mtime == dir2_stat.st_mtime +def create_dir(parent_dir: str, prefix: str = None) -> str: + """ + Create a directory with random name in parent_dir. + Returns absolute path to created directory. + """ + return tempfile.mkdtemp(prefix=prefix, dir=parent_dir) - def test_common_file(self): - cf_path = self.create_file(self.src_dir) - cf_relpath = self.relpath(cf_path) - fs.hardlink_dir(self.src_dir, self.dst_dir) +def relpath(full_path: str, src_dir: str, dst_dir: str) -> str: + """Get a relative path for entity in src/dst dirs.""" + if full_path.startswith(src_dir): + p_dir = src_dir + elif full_path.startswith(dst_dir): + p_dir = dst_dir + else: + raise RuntimeError(f"Path {full_path} is not src_dir nor dst_dir") + + return full_path[len(p_dir) + 1 :] + + +@pytest.fixture +def hardlink_dirs(tmp_path): + """Create source directory and destination path for hardlink tests.""" + src_dir = str(tmp_path / "source") + os.mkdir(src_dir) + dst_dir = src_dir + ".copy" + yield src_dir, dst_dir + shutil.rmtree(dst_dir, ignore_errors=True) + + +def check_directory_stats(d1_path: str, d2_path: str): + """Check that directory was copied. 
class TestHardlinkDir:
    """Tests for fs.hardlink_dir on files, symlinks and nested dirs."""

    def test_regular_file(self, hardlink_dirs):
        src_dir, dst_dir = hardlink_dirs
        cf_path = create_file(src_dir)
        cf_relpath = relpath(cf_path, src_dir, dst_dir)

        fs.hardlink_dir(src_dir, dst_dir)

        src_stat = os.lstat(cf_path)
        # BUGFIX: stat the replica in dst_dir (the old code stat'ed the
        # file in src_dir, making samestat a trivial self-comparison).
        dst_stat = os.lstat(os.path.join(dst_dir, cf_relpath))
        assert os.path.samestat(src_stat, dst_stat)
        assert src_stat.st_nlink == 2

    def test_relative_symlink_to_common_file(self, hardlink_dirs):
        src_dir, dst_dir = hardlink_dirs
        cf_relpath = relpath(create_file(src_dir), src_dir, dst_dir)
        sl2cf_relpath = "symlink_to_common_file"
        # Create the relative symlink via an absolute link path: no
        # os.chdir needed, so the test no longer leaks a changed CWD
        # into later tests.
        os.symlink(cf_relpath, os.path.join(src_dir, sl2cf_relpath))

        fs.hardlink_dir(src_dir, dst_dir)

        # the link target must survive verbatim
        dst_sl2cf_path = os.path.join(dst_dir, sl2cf_relpath)
        assert os.readlink(dst_sl2cf_path) == cf_relpath

        # BUGFIX: compare the SOURCE link against the destination link
        # (the old code took both stats from dst_dir).
        src_stat = os.lstat(os.path.join(src_dir, sl2cf_relpath))
        dst_stat = os.lstat(dst_sl2cf_path)
        assert os.path.samestat(src_stat, dst_stat)
        assert src_stat.st_nlink == 2

    def test_absolute_symlink_to_common_file(self, hardlink_dirs):
        src_dir, dst_dir = hardlink_dirs
        cf_path = create_file(src_dir)
        sl2cf_relpath = "symlink_to_common_file"
        sl2cf_path = os.path.join(src_dir, sl2cf_relpath)
        os.symlink(cf_path, sl2cf_path)

        fs.hardlink_dir(src_dir, dst_dir)

        # the absolute target must survive verbatim
        dst_sl2cf_path = os.path.join(dst_dir, sl2cf_relpath)
        assert os.readlink(dst_sl2cf_path) == cf_path

        # BUGFIX: source link vs destination link (not dst vs itself)
        src_stat = os.lstat(sl2cf_path)
        dst_stat = os.lstat(dst_sl2cf_path)
        assert os.path.samestat(src_stat, dst_stat)
        assert src_stat.st_nlink == 2

    def test_hardlink_to_common_file(self, hardlink_dirs):
        src_dir, dst_dir = hardlink_dirs
        cf_path = create_file(src_dir)
        hl2cf_relpath = "hardlink_to_common_file"
        hl2cf_path = os.path.join(src_dir, hl2cf_relpath)
        os.link(cf_path, hl2cf_path)

        fs.hardlink_dir(src_dir, dst_dir)

        src_cf_stat = os.lstat(cf_path)
        src_hl_stat = os.lstat(hl2cf_path)
        dst_hl_stat = os.lstat(os.path.join(dst_dir, hl2cf_relpath))
        assert os.path.samestat(src_cf_stat, dst_hl_stat)
        assert os.path.samestat(src_hl_stat, dst_hl_stat)
        # 2 links in src + 2 in dst, all sharing one inode
        assert src_cf_stat.st_nlink == 4

    def test_nested_dir(self, hardlink_dirs):
        src_dir, dst_dir = hardlink_dirs
        src_ndir_path = create_dir(src_dir)
        src_nfile_path = create_file(src_ndir_path)
        ndir_relpath = relpath(src_ndir_path, src_dir, dst_dir)
        nfile_relpath = relpath(src_nfile_path, src_dir, dst_dir)

        fs.hardlink_dir(src_dir, dst_dir)
        check_directory_stats(src_ndir_path,
                              os.path.join(dst_dir, ndir_relpath))

        # the file inside the nested directory must be hardlinked too
        src_fstat = os.lstat(src_nfile_path)
        dst_fstat = os.lstat(os.path.join(dst_dir, nfile_relpath))
        assert os.path.samestat(src_fstat, dst_fstat)
        assert src_fstat.st_nlink == 2


def check_identical_file(f1_path: str, f2_path: str):
    """Assert that two files carry identical metadata.

    Compares ownership, mode, mtime and size (content equality is implied
    by size+mtime for these tests, which fully rewrite files).
    """
    st1 = os.lstat(f1_path)
    st2 = os.lstat(f2_path)

    assert st1.st_uid == st2.st_uid
    assert st1.st_gid == st2.st_gid
    assert st1.st_mode == st2.st_mode
    assert st1.st_mtime == st2.st_mtime
    assert st1.st_size == st2.st_size


class TestRsync:
    """Tests for fs.rsync: deletions, type conflicts and special files."""

    def test_dst_has_excess_file(self, common_fs_dirs):
        src_dir, dst_dir = common_fs_dirs
        dst_fpath = create_file(str(dst_dir))

        all(fs.rsync(str(src_dir), str(dst_dir)))
        assert not os.path.lexists(dst_fpath)

    def test_dst_has_excess_symlink(self, common_fs_dirs):
        src_dir, dst_dir = common_fs_dirs
        dst_lpath = os.path.join(str(dst_dir), 'nonexisting_file')
        os.symlink('broken_symlink', dst_lpath)

        all(fs.rsync(str(src_dir), str(dst_dir)))
        assert not os.path.lexists(dst_lpath)

    def test_dst_has_excess_empty_dir(self, common_fs_dirs):
        src_dir, dst_dir = common_fs_dirs
        dst_dpath = create_dir(str(dst_dir))

        all(fs.rsync(str(src_dir), str(dst_dir)))
        assert not os.path.lexists(dst_dpath)

    def test_dst_has_excess_nonempty_dir(self, common_fs_dirs):
        src_dir, dst_dir = common_fs_dirs
        dst_dpath = create_dir(str(dst_dir))
        create_file(dst_dpath)

        all(fs.rsync(str(src_dir), str(dst_dir)))
        assert not os.path.lexists(dst_dpath)

    def test_dst_has_excess_nonempty_recursive_dir(self, common_fs_dirs):
        src_dir, dst_dir = common_fs_dirs
        dst_dpath = create_dir(str(dst_dir))
        nested_dpath = create_dir(dst_dpath)
        create_file(nested_dpath)

        all(fs.rsync(str(src_dir), str(dst_dir)))
        assert not os.path.lexists(dst_dpath)

    def test_different_types_src_file_dst_dir(self, common_fs_dirs):
        src_dir, dst_dir = common_fs_dirs
        src_fpath = create_file(str(src_dir))
        dst_path = os.path.join(str(dst_dir),
                                relpath(src_fpath, str(src_dir), str(dst_dir)))
        os.mkdir(dst_path)

        all(fs.rsync(str(src_dir), str(dst_dir)))
        assert os.path.lexists(dst_path)
        assert os.path.isfile(dst_path)

    def test_different_types_src_file_dst_symlink(self, common_fs_dirs):
        src_dir, dst_dir = common_fs_dirs
        src_fpath = create_file(str(src_dir))
        dst_path = os.path.join(str(dst_dir),
                                relpath(src_fpath, str(src_dir), str(dst_dir)))
        os.symlink('broken_link', dst_path)

        all(fs.rsync(str(src_dir), str(dst_dir)))
        assert os.path.lexists(dst_path)
        assert os.path.isfile(dst_path)

    def test_different_types_src_symlink_dst_file(self, common_fs_dirs):
        src_dir, dst_dir = common_fs_dirs
        dst_path = create_file(str(dst_dir))
        src_lpath = os.path.join(str(src_dir),
                                 relpath(dst_path, str(src_dir), str(dst_dir)))
        os.symlink('broken_link', src_lpath)

        all(fs.rsync(str(src_dir), str(dst_dir)))
        assert os.path.lexists(dst_path)
        assert os.path.islink(dst_path)

    def test_different_types_src_symlink_dst_dir(self, common_fs_dirs):
        src_dir, dst_dir = common_fs_dirs
        dst_path = create_dir(str(dst_dir))
        src_lpath = os.path.join(str(src_dir),
                                 relpath(dst_path, str(src_dir), str(dst_dir)))
        os.symlink('broken_link', src_lpath)

        all(fs.rsync(str(src_dir), str(dst_dir)))
        assert os.path.lexists(dst_path)
        assert os.path.islink(dst_path)

    def test_different_types_src_dir_dst_file(self, common_fs_dirs):
        src_dir, dst_dir = common_fs_dirs
        src_dpath = create_dir(str(src_dir))
        dst_path = os.path.join(str(dst_dir),
                                relpath(src_dpath, str(src_dir), str(dst_dir)))
        with open(dst_path, "w") as f:
            f.write(string.printable)

        all(fs.rsync(str(src_dir), str(dst_dir)))
        assert os.path.lexists(dst_path)
        assert os.path.isdir(dst_path)

    def test_different_types_src_dir_dst_symlink(self, common_fs_dirs):
        src_dir, dst_dir = common_fs_dirs
        src_dpath = create_dir(str(src_dir))
        dst_path = os.path.join(str(dst_dir),
                                relpath(src_dpath, str(src_dir), str(dst_dir)))
        os.symlink('broken_link', dst_path)

        all(fs.rsync(str(src_dir), str(dst_dir)))
        assert os.path.lexists(dst_path)
        assert os.path.isdir(dst_path)

    def test_src_is_socket(self, common_fs_dirs):
        src_dir, dst_dir = common_fs_dirs
        src_spath = create_file(str(src_dir))
        dst_spath = os.path.join(str(dst_dir),
                                 relpath(src_spath, str(src_dir), str(dst_dir)))
        os.unlink(src_spath)
        sock = socket.socket(socket.AF_UNIX)
        try:
            sock.bind(src_spath)

            all(fs.rsync(str(src_dir), str(dst_dir)))
            # sockets are not regular files and must not be replicated
            assert not os.path.lexists(dst_spath)
        finally:
            sock.close()  # don't leak the fd into later tests

    def test_src_dst_same_inode(self, common_fs_dirs):
        src_dir, dst_dir = common_fs_dirs
        src_fpath = create_file(str(src_dir))
        dst_fpath = os.path.join(str(dst_dir),
                                 relpath(src_fpath, str(src_dir), str(dst_dir)))
        os.link(src_fpath, dst_fpath)

        all(fs.rsync(str(src_dir), str(dst_dir)))
        assert os.path.lexists(dst_fpath)
        src_stat = os.lstat(src_fpath)
        dst_stat = os.lstat(dst_fpath)
        # rsync must break the hardlink: dst becomes an independent copy
        assert dst_stat.st_nlink == 1
        assert src_stat.st_ino != dst_stat.st_ino

    def test_src_dst_diff_size(self, common_fs_dirs):
        src_dir, dst_dir = common_fs_dirs
        src_fpath = create_file(str(src_dir))
        dst_fpath = os.path.join(str(dst_dir),
                                 relpath(src_fpath, str(src_dir), str(dst_dir)))
        with open(dst_fpath, "w") as df:
            df.write(string.printable * 2)

        all(fs.rsync(str(src_dir), str(dst_dir)))
        assert os.path.lexists(dst_fpath)
        check_identical_file(src_fpath, dst_fpath)

    # TODO add tests for changing ownership
    # TODO add tests for changing times (?)
class TestCopyFile:
    """Test suite for copy_file function."""

    def test_copy_simple_file(self, tmp_path):
        """Copying a small file preserves its content."""
        src_path = os.path.join(str(tmp_path), "source.txt")
        dst_path = os.path.join(str(tmp_path), "dest.txt")

        content = b"Hello, World!" * 1000
        with open(src_path, "wb") as f:
            f.write(content)

        fs.copy_file(src_path, dst_path)

        assert os.path.exists(dst_path)
        with open(dst_path, "rb") as f:
            assert f.read() == content

    def test_copy_large_file(self, tmp_path):
        """Copying a file larger than the copy buffer works."""
        src_path = os.path.join(str(tmp_path), "large.bin")
        dst_path = os.path.join(str(tmp_path), "large_copy.bin")

        content = b"x" * (200 * 1024)  # 200KB, larger than a typical buffer
        with open(src_path, "wb") as f:
            f.write(content)

        fs.copy_file(src_path, dst_path)

        assert os.path.exists(dst_path)
        assert os.path.getsize(dst_path) == len(content)

    def test_copy_preserves_permissions(self, tmp_path):
        """copy_file preserves the source file's mode bits."""
        src_path = os.path.join(str(tmp_path), "executable.sh")
        dst_path = os.path.join(str(tmp_path), "executable_copy.sh")

        with open(src_path, "w") as f:
            f.write("#!/bin/bash\necho test")
        os.chmod(src_path, 0o755)

        fs.copy_file(src_path, dst_path)

        src_stat = os.stat(src_path)
        dst_stat = os.stat(dst_path)
        assert src_stat.st_mode == dst_stat.st_mode


class TestCopyDirEntry:
    """Test suite for copy_direntry function."""

    def test_copy_file_entry(self, tmp_path):
        """Copying a file DirEntry reproduces the file."""
        src_path = os.path.join(str(tmp_path), "source.txt")
        dst_path = os.path.join(str(tmp_path), "dest.txt")

        with open(src_path, "w") as f:
            f.write("test content")
        os.chmod(src_path, 0o644)

        entry = fs.PseudoDirEntry(src_path)
        fs.copy_direntry(entry, dst_path)

        assert os.path.isfile(dst_path)
        with open(dst_path, "r") as f:
            assert f.read() == "test content"

    def test_copy_directory_entry(self, tmp_path):
        """Copying a directory DirEntry reproduces the directory."""
        src_path = os.path.join(str(tmp_path), "srcdir")
        dst_path = os.path.join(str(tmp_path), "dstdir")

        os.mkdir(src_path)
        os.chmod(src_path, 0o755)

        entry = fs.PseudoDirEntry(src_path)
        fs.copy_direntry(entry, dst_path)

        assert os.path.isdir(dst_path)

    def test_copy_symlink_entry(self, tmp_path):
        """Copying a symlink DirEntry reproduces the link (not the target).

        Uses a real os.DirEntry from scandir, since PseudoDirEntry does
        not detect symlinks — TODO confirm against fs.PseudoDirEntry.
        """
        target_path = os.path.join(str(tmp_path), "target.txt")
        src_link = os.path.join(str(tmp_path), "source_link")
        dst_link = os.path.join(str(tmp_path), "dest_link")

        with open(target_path, "w") as f:
            f.write("target")
        os.symlink(target_path, src_link)

        with os.scandir(str(tmp_path)) as it:
            for entry in it:
                if entry.name == "source_link":
                    fs.copy_direntry(entry, dst_link)
                    break

        assert os.path.islink(dst_link)
        assert os.readlink(dst_link) == target_path


class TestRsyncBasic:
    """Test suite for basic rsync functionality."""

    @pytest.fixture
    def rsync_dirs(self, tmp_path):
        """Create a source dir and a (not yet created) destination path."""
        src_dir = os.path.join(str(tmp_path), "source")
        dst_dir = os.path.join(str(tmp_path), "dest")
        os.mkdir(src_dir)
        return src_dir, dst_dir

    def test_rsync_creates_destination(self, rsync_dirs):
        """rsync creates the destination directory if it is missing."""
        src_dir, dst_dir = rsync_dirs
        assert not os.path.exists(dst_dir)
        list(fs.rsync(src_dir, dst_dir))
        assert os.path.isdir(dst_dir)

    def test_rsync_copies_new_files(self, rsync_dirs):
        """rsync copies new files and reports CREATE actions."""
        src_dir, dst_dir = rsync_dirs
        os.mkdir(dst_dir)

        with open(os.path.join(src_dir, "file1.txt"), "w") as f:
            f.write("content1")
        with open(os.path.join(src_dir, "file2.txt"), "w") as f:
            f.write("content2")

        actions = list(fs.rsync(src_dir, dst_dir))

        assert os.path.exists(os.path.join(dst_dir, "file1.txt"))
        assert os.path.exists(os.path.join(dst_dir, "file2.txt"))

        create_actions = [a for a in actions if a[1] == fs.Actions.CREATE]
        assert len(create_actions) == 2

    def test_rsync_deletes_missing_files(self, rsync_dirs):
        """rsync deletes files absent from source and reports DELETE."""
        src_dir, dst_dir = rsync_dirs
        os.mkdir(dst_dir)

        dst_file = os.path.join(dst_dir, "old_file.txt")
        with open(dst_file, "w") as f:
            f.write("old content")

        actions = list(fs.rsync(src_dir, dst_dir))

        assert not os.path.exists(dst_file)

        delete_actions = [a for a in actions if a[1] == fs.Actions.DELETE]
        assert len(delete_actions) == 1

    def test_rsync_updates_modified_files(self, rsync_dirs):
        """rsync rewrites files whose source copy is newer."""
        src_dir, dst_dir = rsync_dirs
        os.mkdir(dst_dir)

        src_file = os.path.join(src_dir, "file.txt")
        dst_file = os.path.join(dst_dir, "file.txt")

        with open(src_file, "w") as f:
            f.write("original")
        with open(dst_file, "w") as f:
            f.write("modified")

        # make the source strictly newer than the destination copy
        import time
        time.sleep(0.1)
        with open(src_file, "w") as f:
            f.write("updated content")

        actions = list(fs.rsync(src_dir, dst_dir))

        with open(dst_file, "r") as f:
            assert f.read() == "updated content"

        rewrite_actions = [a for a in actions if a[1] == fs.Actions.REWRITE]
        assert len(rewrite_actions) > 0

    def test_rsync_preserves_permissions(self, rsync_dirs):
        """rsync preserves the mode bits of copied files."""
        src_dir, dst_dir = rsync_dirs
        os.mkdir(dst_dir)

        src_file = os.path.join(src_dir, "script.sh")
        with open(src_file, "w") as f:
            f.write("#!/bin/bash\n")
        os.chmod(src_file, 0o755)

        list(fs.rsync(src_dir, dst_dir))

        dst_file = os.path.join(dst_dir, "script.sh")
        dst_stat = os.stat(dst_file)
        src_stat = os.stat(src_file)
        assert dst_stat.st_mode == src_stat.st_mode


class TestHardlinkDirBasic:
    """Test suite for basic hardlink_dir functionality."""

    @pytest.fixture
    def hardlink_basic_dirs(self, tmp_path):
        """Create a source dir and a (not yet created) destination path."""
        src_dir = os.path.join(str(tmp_path), "source")
        dst_dir = os.path.join(str(tmp_path), "dest")
        os.mkdir(src_dir)
        return src_dir, dst_dir

    def test_hardlink_creates_destination(self, hardlink_basic_dirs):
        """hardlink_dir creates the destination directory."""
        src_dir, dst_dir = hardlink_basic_dirs
        result = fs.hardlink_dir(src_dir, dst_dir)
        assert result
        assert os.path.isdir(dst_dir)

    def test_hardlink_links_files(self, hardlink_basic_dirs):
        """Files are hardlinked (same inode), not copied."""
        src_dir, dst_dir = hardlink_basic_dirs
        src_file = os.path.join(src_dir, "file.txt")
        with open(src_file, "w") as f:
            f.write("test content")

        fs.hardlink_dir(src_dir, dst_dir)

        dst_file = os.path.join(dst_dir, "file.txt")
        assert os.path.exists(dst_file)

        src_stat = os.stat(src_file)
        dst_stat = os.stat(dst_file)
        assert src_stat.st_ino == dst_stat.st_ino

    def test_hardlink_nested_directories(self, hardlink_basic_dirs):
        """Nested directory structure is replicated with hardlinked files."""
        src_dir, dst_dir = hardlink_basic_dirs
        subdir = os.path.join(src_dir, "subdir")
        os.mkdir(subdir)
        with open(os.path.join(subdir, "nested.txt"), "w") as f:
            f.write("nested content")

        fs.hardlink_dir(src_dir, dst_dir)

        dst_nested = os.path.join(dst_dir, "subdir", "nested.txt")
        assert os.path.exists(dst_nested)

        src_nested = os.path.join(subdir, "nested.txt")
        src_stat = os.stat(src_nested)
        dst_stat = os.stat(dst_nested)
        assert src_stat.st_ino == dst_stat.st_ino


class TestScantree:
    """Test suite for scantree function."""

    def test_scantree_empty_directory(self, tmp_path):
        """Scanning an empty directory yields nothing."""
        entries = list(fs.scantree(str(tmp_path)))
        assert len(entries) == 0

    def test_scantree_flat_directory(self, tmp_path):
        """Scanning a flat directory yields one entry per file."""
        for i in range(3):
            with open(os.path.join(str(tmp_path), f"file{i}.txt"), "w") as f:
                f.write(f"content {i}")

        entries = list(fs.scantree(str(tmp_path)))
        assert len(entries) == 3

    def test_scantree_nested_directories(self, tmp_path):
        """With dir_first=True a directory precedes its contents."""
        subdir1 = os.path.join(str(tmp_path), "dir1")
        subdir2 = os.path.join(subdir1, "dir2")
        os.makedirs(subdir2)

        with open(os.path.join(str(tmp_path), "root.txt"), "w") as f:
            f.write("root")
        with open(os.path.join(subdir1, "sub1.txt"), "w") as f:
            f.write("sub1")
        with open(os.path.join(subdir2, "sub2.txt"), "w") as f:
            f.write("sub2")

        entries = list(fs.scantree(str(tmp_path), dir_first=True))
        assert len(entries) == 5

        names = [os.path.basename(e.path) for e in entries]
        dir1_idx = names.index("dir1")
        sub1_idx = names.index("sub1.txt")
        assert dir1_idx < sub1_idx


class TestRmDirentry:
    """Test suite for rm_direntry function."""

    def test_remove_file(self, tmp_path):
        """rm_direntry removes a regular file."""
        file_path = os.path.join(str(tmp_path), "test.txt")
        with open(file_path, "w") as f:
            f.write("test")

        entry = fs.PseudoDirEntry(file_path)
        fs.rm_direntry(entry)
        assert not os.path.exists(file_path)

    def test_remove_empty_directory(self, tmp_path):
        """rm_direntry removes an empty directory."""
        dir_path = os.path.join(str(tmp_path), "testdir")
        os.mkdir(dir_path)

        entry = fs.PseudoDirEntry(dir_path)
        fs.rm_direntry(entry)
        assert not os.path.exists(dir_path)

    def test_remove_directory_with_contents(self, tmp_path):
        """rm_direntry removes a directory recursively."""
        dir_path = os.path.join(str(tmp_path), "testdir")
        os.mkdir(dir_path)
        with open(os.path.join(dir_path, "file.txt"), "w") as f:
            f.write("test")

        entry = fs.PseudoDirEntry(dir_path)
        fs.rm_direntry(entry)
        assert not os.path.exists(dir_path)

    def test_remove_symlink(self, tmp_path):
        """rm_direntry removes a symlink but leaves its target intact."""
        target = os.path.join(str(tmp_path), "target.txt")
        link = os.path.join(str(tmp_path), "link")

        with open(target, "w") as f:
            f.write("target")
        os.symlink(target, link)

        # use a real os.DirEntry for reliable symlink detection
        with os.scandir(str(tmp_path)) as it:
            for entry in it:
                if entry.name == "link":
                    fs.rm_direntry(entry)
                    break

        assert not os.path.exists(link)
        assert not os.path.islink(link)
        assert os.path.exists(target)


class TestPermissionErrors:
    """Test permission error handling during backup operations.

    NOTE(review): the chmod-0o000 tests are ineffective when run as root
    (root ignores mode bits) — consider a skipif on euid 0.
    """

    @pytest.fixture
    def perm_dirs(self, tmp_path):
        """Create source/destination dirs; restore modes on teardown."""
        src_dir = os.path.join(str(tmp_path), "source")
        dst_dir = os.path.join(str(tmp_path), "dest")
        os.mkdir(src_dir)
        yield src_dir, dst_dir
        # Restore permissions before cleanup so tmp_path can be removed.
        for root, dirs, files in os.walk(str(tmp_path)):
            for d in dirs:
                try:
                    os.chmod(os.path.join(root, d), 0o755)
                except OSError:  # narrowed from bare except: only chmod
                    pass         # failures are expected/ignorable here
            for f in files:
                try:
                    os.chmod(os.path.join(root, f), 0o644)
                except OSError:
                    pass

    def test_rsync_handles_unreadable_file(self, perm_dirs):
        """rsync must not crash on files it cannot read."""
        src_dir, dst_dir = perm_dirs
        readable_file = os.path.join(src_dir, "readable.txt")
        with open(readable_file, "w") as f:
            f.write("can read this")

        unreadable_file = os.path.join(src_dir, "unreadable.txt")
        with open(unreadable_file, "w") as f:
            f.write("cannot read this")
        os.chmod(unreadable_file, 0o000)

        try:
            actions = list(fs.rsync(src_dir, dst_dir))

            readable_dst = os.path.join(dst_dir, "readable.txt")
            assert os.path.exists(readable_dst)

            error_actions = [a for a in actions if a[1] == fs.Actions.ERROR]
            assert len(error_actions) > 0, \
                "Should have ERROR action for unreadable file"

        except PermissionError as e:
            pytest.fail(f"rsync crashed on permission error: {e}. "
                        "Should handle gracefully and continue.")

    def test_copy_file_with_unreadable_source(self, perm_dirs):
        """copy_file raises PermissionError on an unreadable source."""
        src_dir, dst_dir = perm_dirs
        src_file = os.path.join(src_dir, "unreadable.txt")
        dst_file = os.path.join(dst_dir, "copy.txt")

        os.mkdir(dst_dir)
        with open(src_file, "w") as f:
            f.write("test")
        os.chmod(src_file, 0o000)

        with pytest.raises(PermissionError):
            fs.copy_file(src_file, dst_file)

    @pytest.mark.skip(reason="Fails until issue #1 is fixed")
    def test_update_direntry_handles_permission_error(self, perm_dirs):
        """update_direntry should handle permission errors gracefully."""
        src_dir, dst_dir = perm_dirs
        src_file = os.path.join(src_dir, "file.txt")
        dst_file = os.path.join(dst_dir, "file.txt")

        os.mkdir(dst_dir)
        with open(src_file, "w") as f:
            f.write("source content")
        with open(dst_file, "w") as f:
            f.write("dest content")

        os.chmod(src_file, 0o000)

        src_entry = fs.PseudoDirEntry(src_file)
        dst_entry = fs.PseudoDirEntry(dst_file)

        try:
            fs.update_direntry(src_entry, dst_entry)
        except PermissionError:
            pytest.fail("update_direntry crashed with PermissionError. "
                        "Should handle gracefully and log error.")

    @pytest.mark.skip(reason="Fails until issue #1 is fixed")
    def test_nest_hardlink_handles_permission_error(self, perm_dirs):
        """nest_hardlink should handle permission errors gracefully."""
        src_dir, dst_dir = perm_dirs
        subdir = os.path.join(src_dir, "subdir")
        os.mkdir(subdir)
        src_file = os.path.join(subdir, "file.txt")
        with open(src_file, "w") as f:
            f.write("test")

        delta_dir = os.path.join(dst_dir, ".backup_delta")
        os.mkdir(dst_dir)
        os.mkdir(delta_dir)
        os.chmod(delta_dir, 0o555)

        try:
            fs.nest_hardlink(src_dir, "subdir/file.txt", delta_dir)
        except PermissionError:
            pytest.fail("nest_hardlink crashed with PermissionError. "
                        "Should handle gracefully and log error.")
" + "Should handle gracefully and log error.") diff --git a/tests/test_fs_extended.py b/tests/test_fs_extended.py deleted file mode 100644 index 7a81e3c..0000000 --- a/tests/test_fs_extended.py +++ /dev/null @@ -1,429 +0,0 @@ -""" -Extended tests for filesystem operations in fs.py module. -Tests critical functionality like copy_file, rsync, and hardlink operations. -""" -import os -import tempfile -from unittest import TestCase - -from curateipsum import fs - - -class TestCopyFile(TestCase): - """Test suite for copy_file function.""" - - def setUp(self) -> None: - self.temp_dir = tempfile.TemporaryDirectory(prefix="test_copy_") - - def tearDown(self) -> None: - self.temp_dir.cleanup() - - def test_copy_simple_file(self): - """Test copying a simple file""" - src_path = os.path.join(self.temp_dir.name, "source.txt") - dst_path = os.path.join(self.temp_dir.name, "dest.txt") - - # Create source file - content = b"Hello, World!" * 1000 - with open(src_path, "wb") as f: - f.write(content) - - # Copy file - fs.copy_file(src_path, dst_path) - - # Verify destination exists and has same content - self.assertTrue(os.path.exists(dst_path)) - with open(dst_path, "rb") as f: - self.assertEqual(f.read(), content) - - def test_copy_large_file(self): - """Test copying a large file (> buffer size)""" - src_path = os.path.join(self.temp_dir.name, "large.bin") - dst_path = os.path.join(self.temp_dir.name, "large_copy.bin") - - # Create file larger than buffer (128KB) - content = b"x" * (200 * 1024) # 200KB - with open(src_path, "wb") as f: - f.write(content) - - # Copy file - fs.copy_file(src_path, dst_path) - - # Verify - self.assertTrue(os.path.exists(dst_path)) - self.assertEqual(os.path.getsize(dst_path), len(content)) - - def test_copy_preserves_permissions(self): - """Test that copy_file preserves file permissions""" - src_path = os.path.join(self.temp_dir.name, "executable.sh") - dst_path = os.path.join(self.temp_dir.name, "executable_copy.sh") - - # Create source file with 
specific permissions - with open(src_path, "w") as f: - f.write("#!/bin/bash\necho test") - os.chmod(src_path, 0o755) - - # Copy file - fs.copy_file(src_path, dst_path) - - # Verify permissions - src_stat = os.stat(src_path) - dst_stat = os.stat(dst_path) - self.assertEqual(src_stat.st_mode, dst_stat.st_mode) - - -class TestCopyDirEntry(TestCase): - """Test suite for copy_direntry function.""" - - def setUp(self) -> None: - self.temp_dir = tempfile.TemporaryDirectory(prefix="test_copydir_") - - def tearDown(self) -> None: - self.temp_dir.cleanup() - - def test_copy_file_entry(self): - """Test copying a file DirEntry""" - src_path = os.path.join(self.temp_dir.name, "source.txt") - dst_path = os.path.join(self.temp_dir.name, "dest.txt") - - # Create source file - with open(src_path, "w") as f: - f.write("test content") - os.chmod(src_path, 0o644) - - # Create DirEntry and copy - entry = fs.PseudoDirEntry(src_path) - fs.copy_direntry(entry, dst_path) - - # Verify - self.assertTrue(os.path.isfile(dst_path)) - with open(dst_path, "r") as f: - self.assertEqual(f.read(), "test content") - - def test_copy_directory_entry(self): - """Test copying a directory DirEntry""" - src_path = os.path.join(self.temp_dir.name, "srcdir") - dst_path = os.path.join(self.temp_dir.name, "dstdir") - - # Create source directory - os.mkdir(src_path) - os.chmod(src_path, 0o755) - - # Create DirEntry and copy - entry = fs.PseudoDirEntry(src_path) - fs.copy_direntry(entry, dst_path) - - # Verify - self.assertTrue(os.path.isdir(dst_path)) - - def test_copy_symlink_entry(self): - """Test copying a symlink DirEntry using real os.DirEntry""" - target_path = os.path.join(self.temp_dir.name, "target.txt") - src_link = os.path.join(self.temp_dir.name, "source_link") - dst_link = os.path.join(self.temp_dir.name, "dest_link") - - # Create target and symlink - with open(target_path, "w") as f: - f.write("target") - os.symlink(target_path, src_link) - - # Use os.DirEntry from scandir for proper symlink 
detection - # PseudoDirEntry doesn't properly detect symlinks - with os.scandir(self.temp_dir.name) as it: - for entry in it: - if entry.name == "source_link": - fs.copy_direntry(entry, dst_link) - break - - # Verify symlink was copied - self.assertTrue(os.path.islink(dst_link)) - self.assertEqual(os.readlink(dst_link), target_path) - - -class TestRsync(TestCase): - """Test suite for rsync function.""" - - def setUp(self) -> None: - self.temp_dir = tempfile.TemporaryDirectory(prefix="test_rsync_") - self.src_dir = os.path.join(self.temp_dir.name, "source") - self.dst_dir = os.path.join(self.temp_dir.name, "dest") - os.mkdir(self.src_dir) - - def tearDown(self) -> None: - self.temp_dir.cleanup() - - def test_rsync_creates_destination(self): - """Test that rsync creates destination directory if missing""" - # Destination doesn't exist yet - self.assertFalse(os.path.exists(self.dst_dir)) - - # Run rsync - list(fs.rsync(self.src_dir, self.dst_dir)) - - # Destination should now exist - self.assertTrue(os.path.isdir(self.dst_dir)) - - def test_rsync_copies_new_files(self): - """Test that rsync copies new files""" - os.mkdir(self.dst_dir) - - # Create files in source - with open(os.path.join(self.src_dir, "file1.txt"), "w") as f: - f.write("content1") - with open(os.path.join(self.src_dir, "file2.txt"), "w") as f: - f.write("content2") - - # Run rsync - actions = list(fs.rsync(self.src_dir, self.dst_dir)) - - # Verify files were created - self.assertTrue(os.path.exists(os.path.join(self.dst_dir, "file1.txt"))) - self.assertTrue(os.path.exists(os.path.join(self.dst_dir, "file2.txt"))) - - # Check that CREATE actions were reported - create_actions = [a for a in actions if a[1] == fs.Actions.CREATE] - self.assertEqual(len(create_actions), 2) - - def test_rsync_deletes_missing_files(self): - """Test that rsync deletes files not in source""" - os.mkdir(self.dst_dir) - - # Create file only in destination - dst_file = os.path.join(self.dst_dir, "old_file.txt") - with 
open(dst_file, "w") as f: - f.write("old content") - - # Run rsync - actions = list(fs.rsync(self.src_dir, self.dst_dir)) - - # Verify file was deleted - self.assertFalse(os.path.exists(dst_file)) - - # Check that DELETE action was reported - delete_actions = [a for a in actions if a[1] == fs.Actions.DELETE] - self.assertEqual(len(delete_actions), 1) - - def test_rsync_updates_modified_files(self): - """Test that rsync updates modified files""" - os.mkdir(self.dst_dir) - - # Create file in both locations - src_file = os.path.join(self.src_dir, "file.txt") - dst_file = os.path.join(self.dst_dir, "file.txt") - - with open(src_file, "w") as f: - f.write("original") - with open(dst_file, "w") as f: - f.write("modified") - - # Make source newer - import time - time.sleep(0.1) - with open(src_file, "w") as f: - f.write("updated content") - - # Run rsync - actions = list(fs.rsync(self.src_dir, self.dst_dir)) - - # Verify file was updated - with open(dst_file, "r") as f: - self.assertEqual(f.read(), "updated content") - - # Check that REWRITE action was reported - rewrite_actions = [a for a in actions if a[1] == fs.Actions.REWRITE] - self.assertGreater(len(rewrite_actions), 0) - - def test_rsync_preserves_permissions(self): - """Test that rsync preserves file permissions""" - os.mkdir(self.dst_dir) - - # Create executable file in source - src_file = os.path.join(self.src_dir, "script.sh") - with open(src_file, "w") as f: - f.write("#!/bin/bash\n") - os.chmod(src_file, 0o755) - - # Run rsync - list(fs.rsync(self.src_dir, self.dst_dir)) - - # Verify permissions were preserved - dst_file = os.path.join(self.dst_dir, "script.sh") - dst_stat = os.stat(dst_file) - src_stat = os.stat(src_file) - self.assertEqual(dst_stat.st_mode, src_stat.st_mode) - - -class TestHardlinkDir(TestCase): - """Test suite for hardlink_dir function.""" - - def setUp(self) -> None: - self.temp_dir = tempfile.TemporaryDirectory(prefix="test_hardlink_") - self.src_dir = os.path.join(self.temp_dir.name, 
"source") - self.dst_dir = os.path.join(self.temp_dir.name, "dest") - os.mkdir(self.src_dir) - - def tearDown(self) -> None: - self.temp_dir.cleanup() - - def test_hardlink_creates_destination(self): - """Test that hardlink_dir creates destination directory""" - result = fs.hardlink_dir(self.src_dir, self.dst_dir) - - self.assertTrue(result) - self.assertTrue(os.path.isdir(self.dst_dir)) - - def test_hardlink_links_files(self): - """Test that files are hardlinked, not copied""" - # Create file in source - src_file = os.path.join(self.src_dir, "file.txt") - with open(src_file, "w") as f: - f.write("test content") - - # Hardlink directory - fs.hardlink_dir(self.src_dir, self.dst_dir) - - # Verify file exists in destination - dst_file = os.path.join(self.dst_dir, "file.txt") - self.assertTrue(os.path.exists(dst_file)) - - # Verify they're the same inode (hardlinked) - src_stat = os.stat(src_file) - dst_stat = os.stat(dst_file) - self.assertEqual(src_stat.st_ino, dst_stat.st_ino) - - def test_hardlink_nested_directories(self): - """Test hardlinking nested directory structure""" - # Create nested structure - subdir = os.path.join(self.src_dir, "subdir") - os.mkdir(subdir) - with open(os.path.join(subdir, "nested.txt"), "w") as f: - f.write("nested content") - - # Hardlink directory - fs.hardlink_dir(self.src_dir, self.dst_dir) - - # Verify nested structure exists - dst_nested = os.path.join(self.dst_dir, "subdir", "nested.txt") - self.assertTrue(os.path.exists(dst_nested)) - - # Verify hardlink - src_nested = os.path.join(subdir, "nested.txt") - src_stat = os.stat(src_nested) - dst_stat = os.stat(dst_nested) - self.assertEqual(src_stat.st_ino, dst_stat.st_ino) - - -class TestScantree(TestCase): - """Test suite for scantree function.""" - - def setUp(self) -> None: - self.temp_dir = tempfile.TemporaryDirectory(prefix="test_scantree_") - - def tearDown(self) -> None: - self.temp_dir.cleanup() - - def test_scantree_empty_directory(self): - """Test scanning empty 
directory""" - entries = list(fs.scantree(self.temp_dir.name)) - self.assertEqual(len(entries), 0) - - def test_scantree_flat_directory(self): - """Test scanning flat directory structure""" - # Create some files - for i in range(3): - with open(os.path.join(self.temp_dir.name, f"file{i}.txt"), "w") as f: - f.write(f"content {i}") - - entries = list(fs.scantree(self.temp_dir.name)) - self.assertEqual(len(entries), 3) - - def test_scantree_nested_directories(self): - """Test scanning nested directory structure""" - # Create nested structure - subdir1 = os.path.join(self.temp_dir.name, "dir1") - subdir2 = os.path.join(subdir1, "dir2") - os.makedirs(subdir2) - - with open(os.path.join(self.temp_dir.name, "root.txt"), "w") as f: - f.write("root") - with open(os.path.join(subdir1, "sub1.txt"), "w") as f: - f.write("sub1") - with open(os.path.join(subdir2, "sub2.txt"), "w") as f: - f.write("sub2") - - # Scan with dir_first=True - entries = list(fs.scantree(self.temp_dir.name, dir_first=True)) - - # Should find: root.txt, dir1, sub1.txt, dir2, sub2.txt - self.assertEqual(len(entries), 5) - - # Verify directories come before their contents - names = [os.path.basename(e.path) for e in entries] - dir1_idx = names.index("dir1") - sub1_idx = names.index("sub1.txt") - self.assertLess(dir1_idx, sub1_idx) - - -class TestRmDirentry(TestCase): - """Test suite for rm_direntry function.""" - - def setUp(self) -> None: - self.temp_dir = tempfile.TemporaryDirectory(prefix="test_rm_") - - def tearDown(self) -> None: - self.temp_dir.cleanup() - - def test_remove_file(self): - """Test removing a file""" - file_path = os.path.join(self.temp_dir.name, "test.txt") - with open(file_path, "w") as f: - f.write("test") - - entry = fs.PseudoDirEntry(file_path) - fs.rm_direntry(entry) - - self.assertFalse(os.path.exists(file_path)) - - def test_remove_empty_directory(self): - """Test removing an empty directory""" - dir_path = os.path.join(self.temp_dir.name, "testdir") - os.mkdir(dir_path) - - 
entry = fs.PseudoDirEntry(dir_path) - fs.rm_direntry(entry) - - self.assertFalse(os.path.exists(dir_path)) - - def test_remove_directory_with_contents(self): - """Test removing a directory with files""" - dir_path = os.path.join(self.temp_dir.name, "testdir") - os.mkdir(dir_path) - with open(os.path.join(dir_path, "file.txt"), "w") as f: - f.write("test") - - entry = fs.PseudoDirEntry(dir_path) - fs.rm_direntry(entry) - - self.assertFalse(os.path.exists(dir_path)) - - def test_remove_symlink(self): - """Test removing a symlink using real os.DirEntry""" - target = os.path.join(self.temp_dir.name, "target.txt") - link = os.path.join(self.temp_dir.name, "link") - - with open(target, "w") as f: - f.write("target") - os.symlink(target, link) - - # Use os.DirEntry from scandir for proper symlink detection - # PseudoDirEntry doesn't properly detect symlinks - with os.scandir(self.temp_dir.name) as it: - for entry in it: - if entry.name == "link": - fs.rm_direntry(entry) - break - - # Link should be removed, target should remain - self.assertFalse(os.path.exists(link)) - self.assertFalse(os.path.islink(link)) - self.assertTrue(os.path.exists(target)) diff --git a/tests/test_integration.py b/tests/test_integration.py index 5961b01..ffec5c1 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -3,354 +3,374 @@ Integration tests for the full backup workflow. Tests the complete backup process from start to finish. 
""" import os -import tempfile import time -from unittest import TestCase +import pytest from curateipsum import backup as bk -class TestFullBackupWorkflow(TestCase): - """Integration tests for complete backup workflow.""" +@pytest.fixture +def integration_dirs(tmp_path): + """Setup integration test directories.""" + backups_dir = tmp_path / "backups" + source_dir = tmp_path / "source" + backups_dir.mkdir() + source_dir.mkdir() + return backups_dir, source_dir - def setUp(self) -> None: - self.temp_dir = tempfile.TemporaryDirectory(prefix="test_integration_") - self.backups_dir = os.path.join(self.temp_dir.name, "backups") - self.source_dir = os.path.join(self.temp_dir.name, "source") - os.makedirs(self.backups_dir) - os.makedirs(self.source_dir) - def tearDown(self) -> None: - self.temp_dir.cleanup() +def test_initial_backup_creation(integration_dirs): + """Test creating the first backup""" + backups_dir, source_dir = integration_dirs - def test_initial_backup_creation(self): - """Test creating the first backup""" - # Create some files in source - with open(os.path.join(self.source_dir, "file1.txt"), "w") as f: - f.write("content1") - with open(os.path.join(self.source_dir, "file2.txt"), "w") as f: - f.write("content2") + # Create some files in source + (source_dir / "file1.txt").write_text("content1") + (source_dir / "file2.txt").write_text("content2") - # Run backup - bk.initiate_backup( - sources=[self.source_dir], - backups_dir=self.backups_dir, - dry_run=False - ) + # Run backup + bk.initiate_backup( + sources=[str(source_dir)], + backups_dir=str(backups_dir), + dry_run=False + ) - # Verify backup was created - backups = os.listdir(self.backups_dir) - # Filter out lock files - backups = [b for b in backups if not b.startswith(".")] - self.assertEqual(len(backups), 1) + # Verify backup was created + backups = os.listdir(str(backups_dir)) + # Filter out lock files + backups = [b for b in backups if not b.startswith(".")] + assert len(backups) == 1 - # Verify 
files exist in backup - backup_path = os.path.join(self.backups_dir, backups[0]) - source_name = os.path.basename(self.source_dir) - backup_file1 = os.path.join(backup_path, source_name, "file1.txt") - backup_file2 = os.path.join(backup_path, source_name, "file2.txt") + # Verify files exist in backup + backup_path = os.path.join(str(backups_dir), backups[0]) + source_name = os.path.basename(str(source_dir)) + backup_file1 = os.path.join(backup_path, source_name, "file1.txt") + backup_file2 = os.path.join(backup_path, source_name, "file2.txt") - self.assertTrue(os.path.exists(backup_file1)) - self.assertTrue(os.path.exists(backup_file2)) + assert os.path.exists(backup_file1) + assert os.path.exists(backup_file2) - # Verify backup marker exists - marker_files = [f for f in os.listdir(backup_path) if f.startswith(".backup_finished")] - self.assertEqual(len(marker_files), 1) + # Verify backup marker exists + marker_files = [f for f in os.listdir(backup_path) + if f.startswith(".backup_finished")] + assert len(marker_files) == 1 - def test_incremental_backup_with_hardlinks(self): - """Test that second backup uses hardlinks for unchanged files""" - # Create initial file - src_file = os.path.join(self.source_dir, "unchanged.txt") - with open(src_file, "w") as f: - f.write("unchanged content") - # First backup - bk.initiate_backup( - sources=[self.source_dir], - backups_dir=self.backups_dir, - dry_run=False - ) +def test_incremental_backup_with_hardlinks(integration_dirs): + """Test that second backup uses hardlinks for unchanged files""" + backups_dir, source_dir = integration_dirs - # Wait a bit to ensure different timestamp - time.sleep(1.1) + # Create initial file + src_file = source_dir / "unchanged.txt" + src_file.write_text("unchanged content") - # Second backup (no changes) - # Add a new file to trigger a new backup - with open(os.path.join(self.source_dir, "new.txt"), "w") as f: - f.write("new content") + # First backup + bk.initiate_backup( + 
sources=[str(source_dir)], + backups_dir=str(backups_dir), + dry_run=False + ) + + # Wait a bit to ensure different timestamp + time.sleep(1.1) + + # Second backup (no changes) + # Add a new file to trigger a new backup + (source_dir / "new.txt").write_text("new content") + + bk.initiate_backup( + sources=[str(source_dir)], + backups_dir=str(backups_dir), + dry_run=False + ) + + # Verify two backups exist + backups = sorted([b for b in os.listdir(str(backups_dir)) + if not b.startswith(".")]) + assert len(backups) == 2 + + # Verify unchanged file is hardlinked + source_name = os.path.basename(str(source_dir)) + file1_path = os.path.join(str(backups_dir), backups[0], + source_name, "unchanged.txt") + file2_path = os.path.join(str(backups_dir), backups[1], + source_name, "unchanged.txt") + + stat1 = os.stat(file1_path) + stat2 = os.stat(file2_path) + + # Same inode means hardlinked + assert stat1.st_ino == stat2.st_ino + # Link count should be 2 + assert stat1.st_nlink == 2 + + +def test_backup_delta_directory(integration_dirs): + """Test that delta directory contains changed files""" + backups_dir, source_dir = integration_dirs + + # Create initial file + (source_dir / "file.txt").write_text("original") + + # First backup + bk.initiate_backup( + sources=[str(source_dir)], + backups_dir=str(backups_dir), + dry_run=False + ) + + time.sleep(1.1) + + # Modify file + (source_dir / "file.txt").write_text("modified") + + # Second backup + bk.initiate_backup( + sources=[str(source_dir)], + backups_dir=str(backups_dir), + dry_run=False + ) + + # Check delta directory in second backup + backups = sorted([b for b in os.listdir(str(backups_dir)) + if not b.startswith(".")]) + second_backup = backups[1] + delta_dir = os.path.join(str(backups_dir), second_backup, bk.DELTA_DIR) + + # Delta directory should exist and contain the modified file + assert os.path.isdir(delta_dir) + + source_name = os.path.basename(str(source_dir)) + delta_file = os.path.join(delta_dir, source_name, 
"file.txt") + assert os.path.exists(delta_file) + + +def test_cleanup_retains_recent_backups(integration_dirs): + """Test that cleanup doesn't remove recent backups""" + backups_dir, source_dir = integration_dirs + + # Create multiple backups + for i in range(3): + (source_dir / f"file{i}.txt").write_text(f"content {i}") bk.initiate_backup( - sources=[self.source_dir], - backups_dir=self.backups_dir, - dry_run=False - ) - - # Verify two backups exist - backups = sorted([b for b in os.listdir(self.backups_dir) if not b.startswith(".")]) - self.assertEqual(len(backups), 2) - - # Verify unchanged file is hardlinked - source_name = os.path.basename(self.source_dir) - file1_path = os.path.join(self.backups_dir, backups[0], source_name, "unchanged.txt") - file2_path = os.path.join(self.backups_dir, backups[1], source_name, "unchanged.txt") - - stat1 = os.stat(file1_path) - stat2 = os.stat(file2_path) - - # Same inode means hardlinked - self.assertEqual(stat1.st_ino, stat2.st_ino) - # Link count should be 2 - self.assertEqual(stat1.st_nlink, 2) - - def test_backup_delta_directory(self): - """Test that delta directory contains changed files""" - # Create initial file - with open(os.path.join(self.source_dir, "file.txt"), "w") as f: - f.write("original") - - # First backup - bk.initiate_backup( - sources=[self.source_dir], - backups_dir=self.backups_dir, - dry_run=False - ) - - time.sleep(1.1) - - # Modify file - with open(os.path.join(self.source_dir, "file.txt"), "w") as f: - f.write("modified") - - # Second backup - bk.initiate_backup( - sources=[self.source_dir], - backups_dir=self.backups_dir, - dry_run=False - ) - - # Check delta directory in second backup - backups = sorted([b for b in os.listdir(self.backups_dir) if not b.startswith(".")]) - second_backup = backups[1] - delta_dir = os.path.join(self.backups_dir, second_backup, bk.DELTA_DIR) - - # Delta directory should exist and contain the modified file - self.assertTrue(os.path.isdir(delta_dir)) - - source_name = 
os.path.basename(self.source_dir) - delta_file = os.path.join(delta_dir, source_name, "file.txt") - self.assertTrue(os.path.exists(delta_file)) - - def test_cleanup_retains_recent_backups(self): - """Test that cleanup doesn't remove recent backups""" - # Create multiple backups - for i in range(3): - with open(os.path.join(self.source_dir, f"file{i}.txt"), "w") as f: - f.write(f"content {i}") - - bk.initiate_backup( - sources=[self.source_dir], - backups_dir=self.backups_dir, - dry_run=False - ) - time.sleep(1.1) - - # Run cleanup with keep_all=10 (all should be kept) - bk.cleanup_old_backups( - backups_dir=self.backups_dir, - dry_run=False, - keep_all=10 - ) - - # All backups should still exist - backups = [b for b in os.listdir(self.backups_dir) if not b.startswith(".")] - self.assertEqual(len(backups), 3) - - def test_dry_run_creates_no_backup(self): - """Test that dry run doesn't create actual backup""" - with open(os.path.join(self.source_dir, "file.txt"), "w") as f: - f.write("content") - - # Dry run backup - bk.initiate_backup( - sources=[self.source_dir], - backups_dir=self.backups_dir, - dry_run=True - ) - - # No backup should be created - backups = [b for b in os.listdir(self.backups_dir) if not b.startswith(".")] - self.assertEqual(len(backups), 0) - - def test_no_backup_if_no_changes(self): - """Test that no backup is created if nothing changed""" - # Create initial file - with open(os.path.join(self.source_dir, "file.txt"), "w") as f: - f.write("content") - - # First backup - bk.initiate_backup( - sources=[self.source_dir], - backups_dir=self.backups_dir, - dry_run=False - ) - - time.sleep(1.1) - - # Second backup with no changes - bk.initiate_backup( - sources=[self.source_dir], - backups_dir=self.backups_dir, - dry_run=False - ) - - # Only one backup should exist - backups = [b for b in os.listdir(self.backups_dir) if not b.startswith(".")] - self.assertEqual(len(backups), 1) - - def test_lock_prevents_concurrent_backups(self): - """Test that lock 
file prevents concurrent backup runs""" - with open(os.path.join(self.source_dir, "file.txt"), "w") as f: - f.write("content") - - # Manually create lock file - lock_acquired = bk.set_backups_lock(self.backups_dir) - self.assertTrue(lock_acquired) - - try: - # Try to run backup (should be blocked by lock) - # We can't actually test this without spawning a subprocess, - # but we can verify the lock exists - lock_path = os.path.join(self.backups_dir, bk.LOCK_FILE) - self.assertTrue(os.path.exists(lock_path)) - finally: - bk.release_backups_lock(self.backups_dir) - - # After releasing lock, backup should work - bk.initiate_backup( - sources=[self.source_dir], - backups_dir=self.backups_dir, - dry_run=False - ) - - backups = [b for b in os.listdir(self.backups_dir) if not b.startswith(".")] - self.assertEqual(len(backups), 1) - - def test_multiple_source_directories(self): - """Test backing up multiple source directories""" - # Create second source - source2_dir = os.path.join(self.temp_dir.name, "source2") - os.makedirs(source2_dir) - - # Create files in both sources - with open(os.path.join(self.source_dir, "file1.txt"), "w") as f: - f.write("source1") - with open(os.path.join(source2_dir, "file2.txt"), "w") as f: - f.write("source2") - - # Backup both sources - bk.initiate_backup( - sources=[self.source_dir, source2_dir], - backups_dir=self.backups_dir, - dry_run=False - ) - - # Verify both sources are in backup - backups = [b for b in os.listdir(self.backups_dir) if not b.startswith(".")] - self.assertEqual(len(backups), 1) - - backup_path = os.path.join(self.backups_dir, backups[0]) - source1_name = os.path.basename(self.source_dir) - source2_name = os.path.basename(source2_dir) - - self.assertTrue(os.path.exists(os.path.join(backup_path, source1_name, "file1.txt"))) - self.assertTrue(os.path.exists(os.path.join(backup_path, source2_name, "file2.txt"))) - - -class TestBackupRecovery(TestCase): - """Integration tests for backup recovery scenarios.""" - - def 
setUp(self) -> None: - self.temp_dir = tempfile.TemporaryDirectory(prefix="test_recovery_") - self.backups_dir = os.path.join(self.temp_dir.name, "backups") - self.source_dir = os.path.join(self.temp_dir.name, "source") - self.restore_dir = os.path.join(self.temp_dir.name, "restore") - os.makedirs(self.backups_dir) - os.makedirs(self.source_dir) - - def tearDown(self) -> None: - self.temp_dir.cleanup() - - def test_restore_from_backup(self): - """Test restoring files from a backup""" - # Create source files - src_file = os.path.join(self.source_dir, "important.txt") - with open(src_file, "w") as f: - f.write("important data") - - # Create backup - bk.initiate_backup( - sources=[self.source_dir], - backups_dir=self.backups_dir, - dry_run=False - ) - - # Simulate data loss - delete source - os.unlink(src_file) - self.assertFalse(os.path.exists(src_file)) - - # Restore from backup - backups = [b for b in os.listdir(self.backups_dir) if not b.startswith(".")] - backup_path = os.path.join(self.backups_dir, backups[0]) - source_name = os.path.basename(self.source_dir) - backed_up_file = os.path.join(backup_path, source_name, "important.txt") - - # Verify file exists in backup - self.assertTrue(os.path.exists(backed_up_file)) - - # Restore file - os.makedirs(self.restore_dir) - import shutil - shutil.copy2(backed_up_file, os.path.join(self.restore_dir, "important.txt")) - - # Verify restored content - with open(os.path.join(self.restore_dir, "important.txt"), "r") as f: - self.assertEqual(f.read(), "important data") - - def test_find_file_version_in_old_backup(self): - """Test finding an old version of a file""" - src_file = os.path.join(self.source_dir, "document.txt") - - # Create version 1 - with open(src_file, "w") as f: - f.write("version 1") - - bk.initiate_backup( - sources=[self.source_dir], - backups_dir=self.backups_dir, + sources=[str(source_dir)], + backups_dir=str(backups_dir), dry_run=False ) time.sleep(1.1) - # Create version 2 - with open(src_file, "w") as 
f: - f.write("version 2") + # Run cleanup with keep_all=10 (all should be kept) + bk.cleanup_old_backups( + backups_dir=str(backups_dir), + dry_run=False, + keep_all=10 + ) - bk.initiate_backup( - sources=[self.source_dir], - backups_dir=self.backups_dir, - dry_run=False - ) + # All backups should still exist + backups = [b for b in os.listdir(str(backups_dir)) + if not b.startswith(".")] + assert len(backups) == 3 - # Verify we can access both versions - backups = sorted([b for b in os.listdir(self.backups_dir) if not b.startswith(".")]) - source_name = os.path.basename(self.source_dir) - # First backup has version 1 - backup1_file = os.path.join(self.backups_dir, backups[0], source_name, "document.txt") - with open(backup1_file, "r") as f: - self.assertEqual(f.read(), "version 1") +def test_dry_run_creates_no_backup(integration_dirs): + """Test that dry run doesn't create actual backup""" + backups_dir, source_dir = integration_dirs - # Second backup has version 2 - backup2_file = os.path.join(self.backups_dir, backups[1], source_name, "document.txt") - with open(backup2_file, "r") as f: - self.assertEqual(f.read(), "version 2") + (source_dir / "file.txt").write_text("content") + + # Dry run backup + bk.initiate_backup( + sources=[str(source_dir)], + backups_dir=str(backups_dir), + dry_run=True + ) + + # No backup should be created + backups = [b for b in os.listdir(str(backups_dir)) + if not b.startswith(".")] + assert len(backups) == 0 + + +def test_no_backup_if_no_changes(integration_dirs): + """Test that no backup is created if nothing changed""" + backups_dir, source_dir = integration_dirs + + # Create initial file + (source_dir / "file.txt").write_text("content") + + # First backup + bk.initiate_backup( + sources=[str(source_dir)], + backups_dir=str(backups_dir), + dry_run=False + ) + + time.sleep(1.1) + + # Second backup with no changes + bk.initiate_backup( + sources=[str(source_dir)], + backups_dir=str(backups_dir), + dry_run=False + ) + + # Only one 
backup should exist + backups = [b for b in os.listdir(str(backups_dir)) + if not b.startswith(".")] + assert len(backups) == 1 + + +def test_lock_prevents_concurrent_backups(integration_dirs): + """Test that lock file prevents concurrent backup runs""" + backups_dir, source_dir = integration_dirs + + (source_dir / "file.txt").write_text("content") + + # Manually create lock file + lock_acquired = bk.set_backups_lock(str(backups_dir)) + assert lock_acquired + + try: + # Try to run backup (should be blocked by lock) + # We can't actually test this without spawning a subprocess, + # but we can verify the lock exists + lock_path = os.path.join(str(backups_dir), bk.LOCK_FILE) + assert os.path.exists(lock_path) + finally: + bk.release_backups_lock(str(backups_dir)) + + # After releasing lock, backup should work + bk.initiate_backup( + sources=[str(source_dir)], + backups_dir=str(backups_dir), + dry_run=False + ) + + backups = [b for b in os.listdir(str(backups_dir)) + if not b.startswith(".")] + assert len(backups) == 1 + + +def test_multiple_source_directories(integration_dirs, tmp_path): + """Test backing up multiple source directories""" + backups_dir, source_dir = integration_dirs + + # Create second source + source2_dir = tmp_path / "source2" + source2_dir.mkdir() + + # Create files in both sources + (source_dir / "file1.txt").write_text("source1") + (source2_dir / "file2.txt").write_text("source2") + + # Backup both sources + bk.initiate_backup( + sources=[str(source_dir), str(source2_dir)], + backups_dir=str(backups_dir), + dry_run=False + ) + + # Verify both sources are in backup + backups = [b for b in os.listdir(str(backups_dir)) + if not b.startswith(".")] + assert len(backups) == 1 + + backup_path = os.path.join(str(backups_dir), backups[0]) + source1_name = os.path.basename(str(source_dir)) + source2_name = os.path.basename(str(source2_dir)) + + assert os.path.exists(os.path.join(backup_path, source1_name, + "file1.txt")) + assert 
os.path.exists(os.path.join(backup_path, source2_name, + "file2.txt")) + + +@pytest.fixture +def recovery_dirs(tmp_path): + """Setup recovery test directories.""" + backups_dir = tmp_path / "backups" + source_dir = tmp_path / "source" + restore_dir = tmp_path / "restore" + backups_dir.mkdir() + source_dir.mkdir() + return backups_dir, source_dir, restore_dir + + +def test_restore_from_backup(recovery_dirs): + """Test restoring files from a backup""" + backups_dir, source_dir, restore_dir = recovery_dirs + + # Create source files + src_file = source_dir / "important.txt" + src_file.write_text("important data") + + # Create backup + bk.initiate_backup( + sources=[str(source_dir)], + backups_dir=str(backups_dir), + dry_run=False + ) + + # Simulate data loss - delete source + os.unlink(str(src_file)) + assert not os.path.exists(str(src_file)) + + # Restore from backup + backups = [b for b in os.listdir(str(backups_dir)) + if not b.startswith(".")] + backup_path = os.path.join(str(backups_dir), backups[0]) + source_name = os.path.basename(str(source_dir)) + backed_up_file = os.path.join(backup_path, source_name, "important.txt") + + # Verify file exists in backup + assert os.path.exists(backed_up_file) + + # Restore file + restore_dir.mkdir() + import shutil + shutil.copy2(backed_up_file, str(restore_dir / "important.txt")) + + # Verify restored content + assert (restore_dir / "important.txt").read_text() == "important data" + + +def test_find_file_version_in_old_backup(recovery_dirs): + """Test finding an old version of a file""" + backups_dir, source_dir, _ = recovery_dirs + src_file = source_dir / "document.txt" + + # Create version 1 + src_file.write_text("version 1") + + bk.initiate_backup( + sources=[str(source_dir)], + backups_dir=str(backups_dir), + dry_run=False + ) + time.sleep(1.1) + + # Create version 2 + src_file.write_text("version 2") + + bk.initiate_backup( + sources=[str(source_dir)], + backups_dir=str(backups_dir), + dry_run=False + ) + + # Verify we 
can access both versions + backups = sorted([b for b in os.listdir(str(backups_dir)) + if not b.startswith(".")]) + source_name = os.path.basename(str(source_dir)) + + # First backup has version 1 + backup1_file = os.path.join(str(backups_dir), backups[0], + source_name, "document.txt") + with open(backup1_file, "r") as f: + assert f.read() == "version 1" + + # Second backup has version 2 + backup2_file = os.path.join(str(backups_dir), backups[1], + source_name, "document.txt") + with open(backup2_file, "r") as f: + assert f.read() == "version 2"