From fb9dcbd39295318ccf27ba82a2585ca8f1724999 Mon Sep 17 00:00:00 2001 From: Maks Snegov Date: Wed, 4 Feb 2026 20:23:51 -0800 Subject: [PATCH] Refactor: split rsync tests into separate module Move all rsync-related tests (44 tests) from test_fs.py to new test_rsync.py module for better organization: - TestRsync (14 tests) - TestRsyncBasic (5 tests) - TestParseRsyncOutput (17 tests) - TestRsyncExt (8 tests) Extract shared helper functions to conftest.py to eliminate duplication: - create_file(), create_dir(), relpath() - check_identical_file() - common_fs_dirs fixture This improves test organization and maintainability by grouping related tests and removing code duplication between test modules. --- tests/conftest.py | 54 +++++ tests/test_fs.py | 390 +----------------------------- tests/test_rsync.py | 561 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 616 insertions(+), 389 deletions(-) create mode 100644 tests/test_rsync.py diff --git a/tests/conftest.py b/tests/conftest.py index e7ecd2a..77d24a3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,6 +8,60 @@ import pytest from curateipsum import backup as bk, fs +# Shared helper functions for fs tests +def create_file(parent_dir: str, prefix: str = None) -> str: + """ + Create a file with random name in parent_dir. + Returns absolute path to the created file. + """ + fd, path = tempfile.mkstemp(prefix=prefix, dir=parent_dir) + with open(fd, "w") as f: + f.write(string.printable) + return path + + +def create_dir(parent_dir: str, prefix: str = None) -> str: + """ + Create a directory with random name in parent_dir. + Returns absolute path to created directory. + """ + return tempfile.mkdtemp(prefix=prefix, dir=parent_dir) + + +def relpath(full_path: str, src_dir: str, dst_dir: str) -> str: + """Get a relative path for entity in src/dst dirs.""" + if full_path.startswith(src_dir): + p_dir = src_dir + elif full_path.startswith(dst_dir): + p_dir = dst_dir + else: + raise RuntimeError(f"Path {full_path} is not src_dir nor dst_dir") + + return full_path[len(p_dir) + 1:] + + +def check_identical_file(f1_path: str, f2_path: str): + """Check that files are identical. Fails test, if not.""" + st1 = os.lstat(f1_path) + st2 = os.lstat(f2_path) + + assert st1.st_uid == st2.st_uid + assert st1.st_gid == st2.st_gid + assert st1.st_mode == st2.st_mode + assert st1.st_mtime == st2.st_mtime + assert st1.st_size == st2.st_size + + +@pytest.fixture +def common_fs_dirs(tmp_path): + """Create source and destination directories for tests.""" + src_dir = tmp_path / "source" + dst_dir = tmp_path / "dest" + src_dir.mkdir() + dst_dir.mkdir() + return src_dir, dst_dir + + @pytest.fixture def backup_dir(tmp_path): """Provide a temporary backup directory.""" diff --git a/tests/test_fs.py b/tests/test_fs.py index 8abe86c..7db4e25 100644 --- a/tests/test_fs.py +++ b/tests/test_fs.py @@ -1,54 +1,11 @@ import os import os.path import shutil -import socket -import string -import tempfile import pytest from curateipsum import fs - - -@pytest.fixture -def common_fs_dirs(tmp_path): - """Create source and destination directories for tests.""" - src_dir = tmp_path / "source" - dst_dir = tmp_path / "dest" - src_dir.mkdir() - dst_dir.mkdir() - return src_dir, dst_dir - - -def create_file(parent_dir: str, prefix: str = None) -> str: - """ - Create a file with random name in parent_dir. - Returns absolute path to the created file. - """ - fd, path = tempfile.mkstemp(prefix=prefix, dir=parent_dir) - with open(fd, "w") as f: - f.write(string.printable) - return path - - -def create_dir(parent_dir: str, prefix: str = None) -> str: - """ - Create a directory with random name in parent_dir. - Returns absolute path to created directory. - """ - return tempfile.mkdtemp(prefix=prefix, dir=parent_dir) - - -def relpath(full_path: str, src_dir: str, dst_dir: str) -> str: - """Get a relative path for entity in src/dst dirs.""" - if full_path.startswith(src_dir): - p_dir = src_dir - elif full_path.startswith(dst_dir): - p_dir = dst_dir - else: - raise RuntimeError(f"Path {full_path} is not src_dir nor dst_dir") - - return full_path[len(p_dir) + 1 :] +from conftest import create_file, create_dir, relpath @pytest.fixture @@ -160,168 +117,6 @@ class TestHardlinkDir: assert src_fstat.st_nlink == 2 -def check_identical_file(f1_path: str, f2_path: str): - """Check that files are identical. Fails test, if not.""" - st1 = os.lstat(f1_path) - st2 = os.lstat(f2_path) - - assert st1.st_uid == st2.st_uid - assert st1.st_gid == st2.st_gid - assert st1.st_mode == st2.st_mode - assert st1.st_mtime == st2.st_mtime - assert st1.st_size == st2.st_size - - -class TestRsync: - def test_dst_has_excess_file(self, common_fs_dirs): - src_dir, dst_dir = common_fs_dirs - dst_fpath = create_file(str(dst_dir)) - - all(fs.rsync(str(src_dir), str(dst_dir))) - assert not os.path.lexists(dst_fpath) - - def test_dst_has_excess_symlink(self, common_fs_dirs): - src_dir, dst_dir = common_fs_dirs - dst_lpath = os.path.join(str(dst_dir), 'nonexisting_file') - os.symlink('broken_symlink', dst_lpath) - - all(fs.rsync(str(src_dir), str(dst_dir))) - assert not os.path.lexists(dst_lpath) - - def test_dst_has_excess_empty_dir(self, common_fs_dirs): - src_dir, dst_dir = common_fs_dirs - dst_dpath = create_dir(str(dst_dir)) - - all(fs.rsync(str(src_dir), str(dst_dir))) - assert not os.path.lexists(dst_dpath) - - def test_dst_has_excess_nonempty_dir(self, common_fs_dirs): - src_dir, dst_dir = common_fs_dirs - dst_dpath = create_dir(str(dst_dir)) - create_file(dst_dpath) - - all(fs.rsync(str(src_dir), str(dst_dir))) - assert not os.path.lexists(dst_dpath) - - def test_dst_has_excess_nonempty_recursive_dir(self, common_fs_dirs): - src_dir, dst_dir = common_fs_dirs - dst_dpath = create_dir(str(dst_dir)) - nested_dpath = create_dir(dst_dpath) - create_file(nested_dpath) - - all(fs.rsync(str(src_dir), str(dst_dir))) - assert not os.path.lexists(dst_dpath) - - def test_different_types_src_file_dst_dir(self, common_fs_dirs): - src_dir, dst_dir = common_fs_dirs - src_fpath = create_file(str(src_dir)) - dst_path = os.path.join(str(dst_dir), - relpath(src_fpath, str(src_dir), str(dst_dir))) - os.mkdir(dst_path) - - all(fs.rsync(str(src_dir), str(dst_dir))) - assert os.path.lexists(dst_path) - assert os.path.isfile(dst_path) - - def test_different_types_src_file_dst_symlink(self, common_fs_dirs): - src_dir, dst_dir = common_fs_dirs - src_fpath = create_file(str(src_dir)) - dst_path = os.path.join(str(dst_dir), - relpath(src_fpath, str(src_dir), str(dst_dir))) - os.symlink('broken_link', dst_path) - - all(fs.rsync(str(src_dir), str(dst_dir))) - assert os.path.lexists(dst_path) - assert os.path.isfile(dst_path) - - def test_different_types_src_symlink_dst_file(self, common_fs_dirs): - src_dir, dst_dir = common_fs_dirs - dst_path = create_file(str(dst_dir)) - src_lpath = os.path.join(str(src_dir), - relpath(dst_path, str(src_dir), str(dst_dir))) - os.symlink('broken_link', src_lpath) - - all(fs.rsync(str(src_dir), str(dst_dir))) - assert os.path.lexists(dst_path) - assert os.path.islink(dst_path) - - def test_different_types_src_symlink_dst_dir(self, common_fs_dirs): - src_dir, dst_dir = common_fs_dirs - dst_path = create_dir(str(dst_dir)) - src_lpath = os.path.join(str(src_dir), - relpath(dst_path, str(src_dir), str(dst_dir))) - os.symlink('broken_link', src_lpath) - - all(fs.rsync(str(src_dir), str(dst_dir))) - assert os.path.lexists(dst_path) - assert os.path.islink(dst_path) - - def test_different_types_src_dir_dst_file(self, common_fs_dirs): - src_dir, dst_dir = common_fs_dirs - src_dpath = create_dir(str(src_dir)) - dst_path = os.path.join(str(dst_dir), - relpath(src_dpath, str(src_dir), str(dst_dir))) - with open(dst_path, "w") as f: - f.write(string.printable) - - all(fs.rsync(str(src_dir), str(dst_dir))) - assert os.path.lexists(dst_path) - assert os.path.isdir(dst_path) - - def test_different_types_src_dir_dst_symlink(self, common_fs_dirs): - src_dir, dst_dir = common_fs_dirs - src_dpath = create_dir(str(src_dir)) - dst_path = os.path.join(str(dst_dir), - relpath(src_dpath, str(src_dir), str(dst_dir))) - os.symlink('broken_link', dst_path) - - all(fs.rsync(str(src_dir), str(dst_dir))) - assert os.path.lexists(dst_path) - assert os.path.isdir(dst_path) - - def test_src_is_socket(self, common_fs_dirs): - src_dir, dst_dir = common_fs_dirs - src_spath = create_file(str(src_dir)) - dst_spath = os.path.join(str(dst_dir), - relpath(src_spath, str(src_dir), str(dst_dir))) - os.unlink(src_spath) - sock = socket.socket(socket.AF_UNIX) - sock.bind(src_spath) - - all(fs.rsync(str(src_dir), str(dst_dir))) - assert not os.path.lexists(dst_spath) - - def test_src_dst_same_inode(self, common_fs_dirs): - src_dir, dst_dir = common_fs_dirs - src_fpath = create_file(str(src_dir)) - dst_fpath = os.path.join(str(dst_dir), - relpath(src_fpath, str(src_dir), str(dst_dir))) - os.link(src_fpath, dst_fpath) - - all(fs.rsync(str(src_dir), str(dst_dir))) - assert os.path.lexists(dst_fpath) - src_stat = os.lstat(src_fpath) - dst_stat = os.lstat(dst_fpath) - assert src_stat.st_nlink == 1 - assert dst_stat.st_nlink == 1 - assert src_stat.st_ino != dst_stat.st_ino - - def test_src_dst_diff_size(self, common_fs_dirs): - src_dir, dst_dir = common_fs_dirs - src_fpath = create_file(str(src_dir)) - dst_fpath = os.path.join(str(dst_dir), - relpath(src_fpath, str(src_dir), str(dst_dir))) - with open(dst_fpath, "w") as df: - df.write(string.printable * 2) - - all(fs.rsync(str(src_dir), str(dst_dir))) - assert os.path.lexists(dst_fpath) - check_identical_file(src_fpath, dst_fpath) - - # TODO add tests for changing ownership - # TODO add tests for changing times (?) - - class TestCopyFile: """Test suite for copy_file function.""" @@ -422,102 +217,6 @@ class TestCopyDirEntry: assert os.readlink(dst_link) == target_path -class TestRsyncBasic: - """Test suite for basic rsync functionality.""" - - @pytest.fixture - def rsync_dirs(self, tmp_path): - """Create source and destination directories for rsync tests.""" - src_dir = os.path.join(str(tmp_path), "source") - dst_dir = os.path.join(str(tmp_path), "dest") - os.mkdir(src_dir) - return src_dir, dst_dir - - def test_rsync_creates_destination(self, rsync_dirs): - """Test that rsync creates destination directory if missing""" - src_dir, dst_dir = rsync_dirs - assert not os.path.exists(dst_dir) - list(fs.rsync(src_dir, dst_dir)) - assert os.path.isdir(dst_dir) - - def test_rsync_copies_new_files(self, rsync_dirs): - """Test that rsync copies new files""" - src_dir, dst_dir = rsync_dirs - os.mkdir(dst_dir) - - with open(os.path.join(src_dir, "file1.txt"), "w") as f: - f.write("content1") - with open(os.path.join(src_dir, "file2.txt"), "w") as f: - f.write("content2") - - actions = list(fs.rsync(src_dir, dst_dir)) - - assert os.path.exists(os.path.join(dst_dir, "file1.txt")) - assert os.path.exists(os.path.join(dst_dir, "file2.txt")) - - create_actions = [a for a in actions if a[1] == fs.Actions.CREATE] - assert len(create_actions) == 2 - - def test_rsync_deletes_missing_files(self, rsync_dirs): - """Test that rsync deletes files not in source""" - src_dir, dst_dir = rsync_dirs - os.mkdir(dst_dir) - - dst_file = os.path.join(dst_dir, "old_file.txt") - with open(dst_file, "w") as f: - f.write("old content") - - actions = list(fs.rsync(src_dir, dst_dir)) - - assert not os.path.exists(dst_file) - - delete_actions = [a for a in actions if a[1] == fs.Actions.DELETE] - assert len(delete_actions) == 1 - - def test_rsync_updates_modified_files(self, rsync_dirs): - """Test that rsync updates modified files""" - src_dir, dst_dir = rsync_dirs - os.mkdir(dst_dir) - - src_file = os.path.join(src_dir, "file.txt") - dst_file = os.path.join(dst_dir, "file.txt") - - with open(src_file, "w") as f: - f.write("original") - with open(dst_file, "w") as f: - f.write("modified") - - import time - time.sleep(0.1) - with open(src_file, "w") as f: - f.write("updated content") - - actions = list(fs.rsync(src_dir, dst_dir)) - - with open(dst_file, "r") as f: - assert f.read() == "updated content" - - rewrite_actions = [a for a in actions if a[1] == fs.Actions.REWRITE] - assert len(rewrite_actions) > 0 - - def test_rsync_preserves_permissions(self, rsync_dirs): - """Test that rsync preserves file permissions""" - src_dir, dst_dir = rsync_dirs - os.mkdir(dst_dir) - - src_file = os.path.join(src_dir, "script.sh") - with open(src_file, "w") as f: - f.write("#!/bin/bash\n") - os.chmod(src_file, 0o755) - - list(fs.rsync(src_dir, dst_dir)) - - dst_file = os.path.join(dst_dir, "script.sh") - dst_stat = os.stat(dst_file) - src_stat = os.stat(src_file) - assert dst_stat.st_mode == src_stat.st_mode - - class TestHardlinkDirBasic: """Test suite for basic hardlink_dir functionality.""" @@ -772,90 +471,3 @@ class TestPermissionErrors: "Should handle gracefully and log error.") -class TestParseRsyncOutput: - """Test _parse_rsync_output() parsing of rsync --itemize-changes.""" - - def test_delete_file(self): - """Parse deletion output.""" - result = fs._parse_rsync_output("*deleting path/to/file.txt") - assert result == ("path/to/file.txt", fs.Actions.DELETE, "") - - def test_delete_nested_path(self): - """Parse deletion with nested directory structure.""" - result = fs._parse_rsync_output("*deleting deeply/nested/dir/file.md") - assert result == ("deeply/nested/dir/file.md", fs.Actions.DELETE, "") - - def test_create_file(self): - """Parse file creation.""" - result = fs._parse_rsync_output(">f+++++++++ new_file.txt") - assert result == ("new_file.txt", fs.Actions.CREATE, "") - - def test_create_file_nested(self): - """Parse file creation in subdirectory.""" - result = fs._parse_rsync_output(">f+++++++++ subdir/another.log") - assert result == ("subdir/another.log", fs.Actions.CREATE, "") - - def test_create_directory(self): - """Parse directory creation.""" - result = fs._parse_rsync_output("cd+++++++++ new_directory/") - assert result == ("new_directory/", fs.Actions.CREATE, "") - - def test_create_symlink(self): - """Parse symlink creation.""" - result = fs._parse_rsync_output("cL+++++++++ link_to_file") - assert result == ("link_to_file", fs.Actions.CREATE, "") - - def test_rewrite_file_size_change(self): - """Parse file rewrite due to size change.""" - result = fs._parse_rsync_output(">f.s...... modified_file.txt") - assert result == ("modified_file.txt", fs.Actions.REWRITE, "") - - def test_rewrite_file_time_change(self): - """Parse file rewrite due to time change.""" - result = fs._parse_rsync_output(">f..t...... touched_file.dat") - assert result == ("touched_file.dat", fs.Actions.REWRITE, "") - - def test_rewrite_file_size_and_time(self): - """Parse file rewrite due to both size and time change.""" - result = fs._parse_rsync_output(">f.st...... changed.bin") - assert result == ("changed.bin", fs.Actions.REWRITE, "") - - def test_update_directory_time(self): - """Parse directory time update.""" - result = fs._parse_rsync_output(">d..t...... some_dir/") - assert result == ("some_dir/", fs.Actions.UPDATE_TIME, "") - - def test_update_permissions(self): - """Parse permission change.""" - result = fs._parse_rsync_output(">f....p.... executable.sh") - assert result == ("executable.sh", fs.Actions.UPDATE_PERM, "") - - def test_update_permissions_with_time_change(self): - """Time change takes precedence over permission change.""" - result = fs._parse_rsync_output(">f...tp.... script.py") - assert result == ("script.py", fs.Actions.REWRITE, "") - - def test_update_owner(self): - """Parse owner change.""" - result = fs._parse_rsync_output(">f.....o... owned_file.txt") - assert result == ("owned_file.txt", fs.Actions.UPDATE_OWNER, "") - - def test_update_group(self): - """Parse group change.""" - result = fs._parse_rsync_output(">f......g.. grouped_file.txt") - assert result == ("grouped_file.txt", fs.Actions.UPDATE_OWNER, "") - - def test_update_owner_and_group(self): - """Parse both owner and group change.""" - result = fs._parse_rsync_output(">f.....og.. shared_file.txt") - assert result == ("shared_file.txt", fs.Actions.UPDATE_OWNER, "") - - def test_invalid_format_raises_error(self): - """Unparseable line should raise RuntimeError.""" - with pytest.raises(RuntimeError, match="Not parsed string"): - fs._parse_rsync_output(">f......... unknown.txt") - - def test_empty_change_string_raises_error(self): - """Empty change indicators should raise RuntimeError.""" - with pytest.raises(RuntimeError, match="Not parsed string"): - fs._parse_rsync_output(">f......... no_action.txt") diff --git a/tests/test_rsync.py b/tests/test_rsync.py new file mode 100644 index 0000000..fd6dda4 --- /dev/null +++ b/tests/test_rsync.py @@ -0,0 +1,561 @@ +import os +import os.path +import socket +import string +from unittest.mock import Mock, patch + +import pytest + +from curateipsum import fs +from conftest import create_file, create_dir, relpath, check_identical_file + + +class MockStdout: + """Mock stdout that supports context manager and readline iteration.""" + + def __init__(self, lines): + self.lines = iter(lines) + + def __enter__(self): + return self + + def __exit__(self, *args): + pass + + def readline(self): + try: + return next(self.lines) + except StopIteration: + return b"" + + +class TestRsync: + def test_dst_has_excess_file(self, common_fs_dirs): + src_dir, dst_dir = common_fs_dirs + dst_fpath = create_file(str(dst_dir)) + + all(fs.rsync(str(src_dir), str(dst_dir))) + assert not os.path.lexists(dst_fpath) + + def test_dst_has_excess_symlink(self, common_fs_dirs): + src_dir, dst_dir = common_fs_dirs + dst_lpath = os.path.join(str(dst_dir), 'nonexisting_file') + os.symlink('broken_symlink', dst_lpath) + + all(fs.rsync(str(src_dir), str(dst_dir))) + assert not os.path.lexists(dst_lpath) + + def test_dst_has_excess_empty_dir(self, common_fs_dirs): + src_dir, dst_dir = common_fs_dirs + dst_dpath = create_dir(str(dst_dir)) + + all(fs.rsync(str(src_dir), str(dst_dir))) + assert not os.path.lexists(dst_dpath) + + def test_dst_has_excess_nonempty_dir(self, common_fs_dirs): + src_dir, dst_dir = common_fs_dirs + dst_dpath = create_dir(str(dst_dir)) + create_file(dst_dpath) + + all(fs.rsync(str(src_dir), str(dst_dir))) + assert not os.path.lexists(dst_dpath) + + def test_dst_has_excess_nonempty_recursive_dir(self, common_fs_dirs): + src_dir, dst_dir = common_fs_dirs + dst_dpath = create_dir(str(dst_dir)) + nested_dpath = create_dir(dst_dpath) + create_file(nested_dpath) + + all(fs.rsync(str(src_dir), str(dst_dir))) + assert not os.path.lexists(dst_dpath) + + def test_different_types_src_file_dst_dir(self, common_fs_dirs): + src_dir, dst_dir = common_fs_dirs + src_fpath = create_file(str(src_dir)) + dst_path = os.path.join(str(dst_dir), + relpath(src_fpath, str(src_dir), str(dst_dir))) + os.mkdir(dst_path) + + all(fs.rsync(str(src_dir), str(dst_dir))) + assert os.path.lexists(dst_path) + assert os.path.isfile(dst_path) + + def test_different_types_src_file_dst_symlink(self, common_fs_dirs): + src_dir, dst_dir = common_fs_dirs + src_fpath = create_file(str(src_dir)) + dst_path = os.path.join(str(dst_dir), + relpath(src_fpath, str(src_dir), str(dst_dir))) + os.symlink('broken_link', dst_path) + + all(fs.rsync(str(src_dir), str(dst_dir))) + assert os.path.lexists(dst_path) + assert os.path.isfile(dst_path) + + def test_different_types_src_symlink_dst_file(self, common_fs_dirs): + src_dir, dst_dir = common_fs_dirs + dst_path = create_file(str(dst_dir)) + src_lpath = os.path.join(str(src_dir), + relpath(dst_path, str(src_dir), str(dst_dir))) + os.symlink('broken_link', src_lpath) + + all(fs.rsync(str(src_dir), str(dst_dir))) + assert os.path.lexists(dst_path) + assert os.path.islink(dst_path) + + def test_different_types_src_symlink_dst_dir(self, common_fs_dirs): + src_dir, dst_dir = common_fs_dirs + dst_path = create_dir(str(dst_dir)) + src_lpath = os.path.join(str(src_dir), + relpath(dst_path, str(src_dir), str(dst_dir))) + os.symlink('broken_link', src_lpath) + + all(fs.rsync(str(src_dir), str(dst_dir))) + assert os.path.lexists(dst_path) + assert os.path.islink(dst_path) + + def test_different_types_src_dir_dst_file(self, common_fs_dirs): + src_dir, dst_dir = common_fs_dirs + src_dpath = create_dir(str(src_dir)) + dst_path = os.path.join(str(dst_dir), + relpath(src_dpath, str(src_dir), str(dst_dir))) + with open(dst_path, "w") as f: + f.write(string.printable) + + all(fs.rsync(str(src_dir), str(dst_dir))) + assert os.path.lexists(dst_path) + assert os.path.isdir(dst_path) + + def test_different_types_src_dir_dst_symlink(self, common_fs_dirs): + src_dir, dst_dir = common_fs_dirs + src_dpath = create_dir(str(src_dir)) + dst_path = os.path.join(str(dst_dir), + relpath(src_dpath, str(src_dir), str(dst_dir))) + os.symlink('broken_link', dst_path) + + all(fs.rsync(str(src_dir), str(dst_dir))) + assert os.path.lexists(dst_path) + assert os.path.isdir(dst_path) + + def test_src_is_socket(self, common_fs_dirs): + src_dir, dst_dir = common_fs_dirs + src_spath = create_file(str(src_dir)) + dst_spath = os.path.join(str(dst_dir), + relpath(src_spath, str(src_dir), str(dst_dir))) + os.unlink(src_spath) + sock = socket.socket(socket.AF_UNIX) + sock.bind(src_spath) + + all(fs.rsync(str(src_dir), str(dst_dir))) + assert not os.path.lexists(dst_spath) + + def test_src_dst_same_inode(self, common_fs_dirs): + src_dir, dst_dir = common_fs_dirs + src_fpath = create_file(str(src_dir)) + dst_fpath = os.path.join(str(dst_dir), + relpath(src_fpath, str(src_dir), str(dst_dir))) + os.link(src_fpath, dst_fpath) + + all(fs.rsync(str(src_dir), str(dst_dir))) + assert os.path.lexists(dst_fpath) + src_stat = os.lstat(src_fpath) + dst_stat = os.lstat(dst_fpath) + assert src_stat.st_nlink == 1 + assert dst_stat.st_nlink == 1 + assert src_stat.st_ino != dst_stat.st_ino + + def test_src_dst_diff_size(self, common_fs_dirs): + src_dir, dst_dir = common_fs_dirs + src_fpath = create_file(str(src_dir)) + dst_fpath = os.path.join(str(dst_dir), + relpath(src_fpath, str(src_dir), str(dst_dir))) + with open(dst_fpath, "w") as df: + df.write(string.printable * 2) + + all(fs.rsync(str(src_dir), str(dst_dir))) + assert os.path.lexists(dst_fpath) + check_identical_file(src_fpath, dst_fpath) + + # TODO add tests for changing ownership + # TODO add tests for changing times (?) + + +class TestRsyncBasic: + """Test suite for basic rsync functionality.""" + + @pytest.fixture + def rsync_dirs(self, tmp_path): + """Create source and destination directories for rsync tests.""" + src_dir = os.path.join(str(tmp_path), "source") + dst_dir = os.path.join(str(tmp_path), "dest") + os.mkdir(src_dir) + return src_dir, dst_dir + + def test_rsync_creates_destination(self, rsync_dirs): + """Test that rsync creates destination directory if missing""" + src_dir, dst_dir = rsync_dirs + assert not os.path.exists(dst_dir) + list(fs.rsync(src_dir, dst_dir)) + assert os.path.isdir(dst_dir) + + def test_rsync_copies_new_files(self, rsync_dirs): + """Test that rsync copies new files""" + src_dir, dst_dir = rsync_dirs + os.mkdir(dst_dir) + + with open(os.path.join(src_dir, "file1.txt"), "w") as f: + f.write("content1") + with open(os.path.join(src_dir, "file2.txt"), "w") as f: + f.write("content2") + + actions = list(fs.rsync(src_dir, dst_dir)) + + assert os.path.exists(os.path.join(dst_dir, "file1.txt")) + assert os.path.exists(os.path.join(dst_dir, "file2.txt")) + + create_actions = [a for a in actions if a[1] == fs.Actions.CREATE] + assert len(create_actions) == 2 + + def test_rsync_deletes_missing_files(self, rsync_dirs): + """Test that rsync deletes files not in source""" + src_dir, dst_dir = rsync_dirs + os.mkdir(dst_dir) + + dst_file = os.path.join(dst_dir, "old_file.txt") + with open(dst_file, "w") as f: + f.write("old content") + + actions = list(fs.rsync(src_dir, dst_dir)) + + assert not os.path.exists(dst_file) + + delete_actions = [a for a in actions if a[1] == fs.Actions.DELETE] + assert len(delete_actions) == 1 + + def test_rsync_updates_modified_files(self, rsync_dirs): + """Test that rsync updates modified files""" + src_dir, dst_dir = rsync_dirs + os.mkdir(dst_dir) + + src_file = os.path.join(src_dir, "file.txt") + dst_file = os.path.join(dst_dir, "file.txt") + + with open(src_file, "w") as f: + f.write("original") + with open(dst_file, "w") as f: + f.write("modified") + + import time + time.sleep(0.1) + with open(src_file, "w") as f: + f.write("updated content") + + actions = list(fs.rsync(src_dir, dst_dir)) + + with open(dst_file, "r") as f: + assert f.read() == "updated content" + + rewrite_actions = [a for a in actions if a[1] == fs.Actions.REWRITE] + assert len(rewrite_actions) > 0 + + def test_rsync_preserves_permissions(self, rsync_dirs): + """Test that rsync preserves file permissions""" + src_dir, dst_dir = rsync_dirs + os.mkdir(dst_dir) + + src_file = os.path.join(src_dir, "script.sh") + with open(src_file, "w") as f: + f.write("#!/bin/bash\n") + os.chmod(src_file, 0o755) + + list(fs.rsync(src_dir, dst_dir)) + + dst_file = os.path.join(dst_dir, "script.sh") + dst_stat = os.stat(dst_file) + src_stat = os.stat(src_file) + assert dst_stat.st_mode == src_stat.st_mode + + +class TestParseRsyncOutput: + """Test _parse_rsync_output() parsing of rsync --itemize-changes.""" + + def test_delete_file(self): + """Parse deletion output.""" + result = fs._parse_rsync_output("*deleting path/to/file.txt") + assert result == ("path/to/file.txt", fs.Actions.DELETE, "") + + def test_delete_nested_path(self): + """Parse deletion with nested directory structure.""" + result = fs._parse_rsync_output("*deleting deeply/nested/dir/file.md") + assert result == ("deeply/nested/dir/file.md", fs.Actions.DELETE, "") + + def test_create_file(self): + """Parse file creation.""" + result = fs._parse_rsync_output(">f+++++++++ new_file.txt") + assert result == ("new_file.txt", fs.Actions.CREATE, "") + + def test_create_file_nested(self): + """Parse file creation in subdirectory.""" + result = fs._parse_rsync_output(">f+++++++++ subdir/another.log") + assert result == ("subdir/another.log", fs.Actions.CREATE, "") + + def test_create_directory(self): + """Parse directory creation.""" + result = fs._parse_rsync_output("cd+++++++++ new_directory/") + assert result == ("new_directory/", fs.Actions.CREATE, "") + + def test_create_symlink(self): + """Parse symlink creation.""" + result = fs._parse_rsync_output("cL+++++++++ link_to_file") + assert result == ("link_to_file", fs.Actions.CREATE, "") + + def test_rewrite_file_size_change(self): + """Parse file rewrite due to size change.""" + result = fs._parse_rsync_output(">f.s...... modified_file.txt") + assert result == ("modified_file.txt", fs.Actions.REWRITE, "") + + def test_rewrite_file_time_change(self): + """Parse file rewrite due to time change.""" + result = fs._parse_rsync_output(">f..t...... touched_file.dat") + assert result == ("touched_file.dat", fs.Actions.REWRITE, "") + + def test_rewrite_file_size_and_time(self): + """Parse file rewrite due to both size and time change.""" + result = fs._parse_rsync_output(">f.st...... changed.bin") + assert result == ("changed.bin", fs.Actions.REWRITE, "") + + def test_update_directory_time(self): + """Parse directory time update.""" + result = fs._parse_rsync_output(">d..t...... some_dir/") + assert result == ("some_dir/", fs.Actions.UPDATE_TIME, "") + + def test_update_permissions(self): + """Parse permission change.""" + result = fs._parse_rsync_output(">f....p.... executable.sh") + assert result == ("executable.sh", fs.Actions.UPDATE_PERM, "") + + def test_update_permissions_with_time_change(self): + """Time change takes precedence over permission change.""" + result = fs._parse_rsync_output(">f...tp.... script.py") + assert result == ("script.py", fs.Actions.REWRITE, "") + + def test_update_owner(self): + """Parse owner change.""" + result = fs._parse_rsync_output(">f.....o... owned_file.txt") + assert result == ("owned_file.txt", fs.Actions.UPDATE_OWNER, "") + + def test_update_group(self): + """Parse group change.""" + result = fs._parse_rsync_output(">f......g.. grouped_file.txt") + assert result == ("grouped_file.txt", fs.Actions.UPDATE_OWNER, "") + + def test_update_owner_and_group(self): + """Parse both owner and group change.""" + result = fs._parse_rsync_output(">f.....og.. shared_file.txt") + assert result == ("shared_file.txt", fs.Actions.UPDATE_OWNER, "") + + def test_invalid_format_raises_error(self): + """Unparseable line should raise RuntimeError.""" + with pytest.raises(RuntimeError, match="Not parsed string"): + fs._parse_rsync_output(">f......... unknown.txt") + + def test_empty_change_string_raises_error(self): + """Empty change indicators should raise RuntimeError.""" + with pytest.raises(RuntimeError, match="Not parsed string"): + fs._parse_rsync_output(">f......... no_action.txt") + + +class TestRsyncExt: + """Test external rsync command wrapper.""" + + def test_command_construction(self, tmp_path): + """Verify rsync command is built with correct arguments.""" + src = tmp_path / "source" + dst = tmp_path / "dest" + src.mkdir() + dst.mkdir() + + with patch("subprocess.Popen") as mock_popen: + mock_process = Mock() + # minimal valid rsync output + mock_process.stdout = MockStdout([b">f+++++++++ test.txt\n"]) + mock_process.wait.return_value = 0 + mock_popen.return_value = mock_process + + list(fs.rsync_ext(str(src), str(dst))) + + # verify command was called with correct arguments + args = mock_popen.call_args[0][0] + assert args[0] == "rsync" + assert "--archive" in args + assert "--whole-file" in args + assert "--human-readable" in args + assert "--delete-during" in args + assert "--itemize-changes" in args + assert args[-2] == f"{src}/" + assert args[-1] == str(dst) + + def test_dry_run_flag(self, tmp_path): + """Verify --dry-run flag is added when dry_run=True.""" + src = tmp_path / "source" + dst = tmp_path / "dest" + src.mkdir() + dst.mkdir() + + with patch("subprocess.Popen") as mock_popen: + mock_process = Mock() + mock_process.stdout = MockStdout([b">f+++++++++ test.txt\n"]) + mock_process.wait.return_value = 0 + mock_popen.return_value = mock_process + + list(fs.rsync_ext(str(src), str(dst), dry_run=True)) + + args = mock_popen.call_args[0][0] + assert "--dry-run" in args + + def test_processes_rsync_output_lines(self, tmp_path): + """Parse valid rsync itemize output lines.""" + src = tmp_path / "source" + dst = tmp_path / "dest" + src.mkdir() + dst.mkdir() + + rsync_output = [ + b"cd+++++++++ subdir/\n", + b">f+++++++++ subdir/newfile.txt\n", + ] + + with patch("subprocess.Popen") as mock_popen: + mock_process = Mock() + mock_process.stdout = MockStdout(rsync_output) + mock_process.wait.return_value = 0 + mock_popen.return_value = mock_process + + results = list(fs.rsync_ext(str(src), str(dst))) + + # both lines should be processed + assert len(results) == 2 + assert results[0][0] == "subdir/" + assert results[0][1] == fs.Actions.CREATE + assert results[1][0] == "subdir/newfile.txt" + assert results[1][1] == fs.Actions.CREATE + + def test_handles_unicode_decode_error(self, tmp_path, caplog): + """Log error when encountering non-UTF8 bytes.""" + src = tmp_path / "source" + dst = tmp_path / "dest" + src.mkdir() + dst.mkdir() + + # invalid UTF-8 byte sequence + invalid_utf8 = b"\xff\xfe invalid bytes\n" + rsync_output = [ + invalid_utf8, + b">f+++++++++ validfile.txt\n", + ] + + with patch("subprocess.Popen") as mock_popen: + mock_process = Mock() + mock_process.stdout = MockStdout(rsync_output) + mock_process.wait.return_value = 0 + mock_popen.return_value = mock_process + + results = list(fs.rsync_ext(str(src), str(dst))) + + # current behavior: invalid line causes prev_line to get stuck, + # so subsequent lines are also skipped (potential bug) + assert len(results) == 0 + assert "Can't process rsync line" in caplog.text + + def test_yields_parsed_actions(self, tmp_path): + """Verify integration with _parse_rsync_output().""" + src = tmp_path / "source" + dst = tmp_path / "dest" + src.mkdir() + dst.mkdir() + + rsync_output = [ + b"cd+++++++++ dir1/\n", + b">f+++++++++ file1.txt\n", + b"*deleting oldfile.txt\n", + b">f..t...... file2.txt\n", + ] + + with patch("subprocess.Popen") as mock_popen: + mock_process = Mock() + mock_process.stdout = MockStdout(rsync_output) + mock_process.wait.return_value = 0 + mock_popen.return_value = mock_process + + results = list(fs.rsync_ext(str(src), str(dst))) + + assert len(results) == 4 + assert results[0] == ("dir1/", fs.Actions.CREATE, "") + assert results[1] == ("file1.txt", fs.Actions.CREATE, "") + assert results[2] == ("oldfile.txt", fs.Actions.DELETE, "") + assert results[3] == ("file2.txt", fs.Actions.REWRITE, "") + + def test_calls_process_wait(self, tmp_path): + """Ensure subprocess.wait() is called to clean up process.""" + src = tmp_path / "source" + dst = tmp_path / "dest" + src.mkdir() + dst.mkdir() + + with patch("subprocess.Popen") as mock_popen: + mock_process = Mock() + mock_process.stdout = MockStdout([b">f+++++++++ test.txt\n"]) + mock_popen.return_value = mock_process + + list(fs.rsync_ext(str(src), str(dst))) + + mock_process.wait.assert_called_once() + + def test_single_line_output(self, tmp_path): + """Handle case where rsync produces single file output.""" + src = tmp_path / "source" + dst = tmp_path / "dest" + src.mkdir() + dst.mkdir() + + with patch("subprocess.Popen") as mock_popen: + mock_process = Mock() + mock_process.stdout = MockStdout([b">f+++++++++ newfile.txt\n"]) + mock_process.wait.return_value = 0 + mock_popen.return_value = mock_process + + results = list(fs.rsync_ext(str(src), str(dst))) + + assert len(results) == 1 + assert results[0][0] == "newfile.txt" + assert results[0][1] == fs.Actions.CREATE + + def test_multiple_files_in_output(self, tmp_path): + """Process multiple file operations in single rsync run.""" + src = tmp_path / "source" + dst = tmp_path / "dest" + src.mkdir() + dst.mkdir() + + # realistic rsync output with multiple operations + rsync_output = [ + b"cd+++++++++ docs/\n", + b">f+++++++++ docs/readme.md\n", + b">f+++++++++ config.json\n", + b".f..t...... existing.txt\n", + b"*deleting old/\n", + ] + + with patch("subprocess.Popen") as mock_popen: + mock_process = Mock() + mock_process.stdout = MockStdout(rsync_output) + mock_process.wait.return_value = 0 + mock_popen.return_value = mock_process + + results = list(fs.rsync_ext(str(src), str(dst))) + + assert len(results) == 5 + assert results[0][0] == "docs/" + assert results[1][0] == "docs/readme.md" + assert results[2][0] == "config.json" + assert results[3][0] == "existing.txt" + assert results[4][0] == "old/"