Files
cura-te-ipsum/tests/test_fs.py
Maks Snegov fe48010fc9 Add comprehensive tests for rsync output parsing
Implements TestParseRsyncOutput class with 17 tests covering all code
paths in _parse_rsync_output() function. Tests cover deletions, file/
directory/symlink creation, rewrites, metadata updates (permissions,
ownership, timestamps), and error handling for invalid input.
2026-02-04 19:36:24 -08:00

862 lines
30 KiB
Python

import os
import os.path
import shutil
import socket
import string
import tempfile
import pytest
from curateipsum import fs
@pytest.fixture
def common_fs_dirs(tmp_path):
"""Create source and destination directories for tests."""
src_dir = tmp_path / "source"
dst_dir = tmp_path / "dest"
src_dir.mkdir()
dst_dir.mkdir()
return src_dir, dst_dir
def create_file(parent_dir: str, prefix: str = None) -> str:
"""
Create a file with random name in parent_dir.
Returns absolute path to the created file.
"""
fd, path = tempfile.mkstemp(prefix=prefix, dir=parent_dir)
with open(fd, "w") as f:
f.write(string.printable)
return path
def create_dir(parent_dir: str, prefix: str = None) -> str:
"""
Create a directory with random name in parent_dir.
Returns absolute path to created directory.
"""
return tempfile.mkdtemp(prefix=prefix, dir=parent_dir)
def relpath(full_path: str, src_dir: str, dst_dir: str) -> str:
"""Get a relative path for entity in src/dst dirs."""
if full_path.startswith(src_dir):
p_dir = src_dir
elif full_path.startswith(dst_dir):
p_dir = dst_dir
else:
raise RuntimeError(f"Path {full_path} is not src_dir nor dst_dir")
return full_path[len(p_dir) + 1 :]
@pytest.fixture
def hardlink_dirs(tmp_path):
"""Create source directory and destination path for hardlink tests."""
src_dir = str(tmp_path / "source")
os.mkdir(src_dir)
dst_dir = src_dir + ".copy"
yield src_dir, dst_dir
shutil.rmtree(dst_dir, ignore_errors=True)
def check_directory_stats(d1_path: str, d2_path: str):
"""Check that directory was copied. Fails test, if not."""
dir1_stat = os.lstat(d1_path)
dir2_stat = os.lstat(d2_path)
assert dir1_stat.st_uid == dir2_stat.st_uid
assert dir1_stat.st_gid == dir2_stat.st_gid
assert dir1_stat.st_mode == dir2_stat.st_mode
assert dir1_stat.st_nlink == dir2_stat.st_nlink
assert dir1_stat.st_size == dir2_stat.st_size
assert dir1_stat.st_mtime == dir2_stat.st_mtime
class TestHardlinkDir:
def test_regular_file(self, hardlink_dirs):
src_dir, dst_dir = hardlink_dirs
cf_path = create_file(src_dir)
cf_relpath = relpath(cf_path, src_dir, dst_dir)
fs.hardlink_dir(src_dir, dst_dir)
src_stat = os.lstat(cf_path)
dst_stat = os.lstat(os.path.join(src_dir, cf_relpath))
assert os.path.samestat(src_stat, dst_stat)
assert src_stat.st_nlink == 2
def test_relative_symlink_to_common_file(self, hardlink_dirs):
src_dir, dst_dir = hardlink_dirs
cf_relpath = relpath(create_file(src_dir), src_dir, dst_dir)
sl2cf_relpath = "symlink_to_common_file"
os.chdir(src_dir)
os.symlink(cf_relpath, sl2cf_relpath)
fs.hardlink_dir(src_dir, dst_dir)
# check the link
dst_sl2cf_path = os.path.join(dst_dir, sl2cf_relpath)
assert os.readlink(dst_sl2cf_path) == cf_relpath
# check stats
src_stat = os.lstat(os.path.join(dst_dir, sl2cf_relpath))
dst_stat = os.lstat(dst_sl2cf_path)
assert os.path.samestat(src_stat, dst_stat)
assert src_stat.st_nlink == 2
def test_absolute_symlink_to_common_file(self, hardlink_dirs):
src_dir, dst_dir = hardlink_dirs
cf_path = create_file(src_dir)
sl2cf_relpath = "symlink_to_common_file"
sl2cf_path = os.path.join(src_dir, sl2cf_relpath)
os.symlink(cf_path, sl2cf_path)
fs.hardlink_dir(src_dir, dst_dir)
# check the link
dst_sl2cf_path = os.path.join(dst_dir, sl2cf_relpath)
assert os.readlink(dst_sl2cf_path) == cf_path
# check stats
src_stat = os.lstat(os.path.join(dst_dir, sl2cf_relpath))
dst_stat = os.lstat(dst_sl2cf_path)
assert os.path.samestat(src_stat, dst_stat)
assert src_stat.st_nlink == 2
def test_hardlink_to_common_file(self, hardlink_dirs):
src_dir, dst_dir = hardlink_dirs
cf_path = create_file(src_dir)
hl2cf_relpath = "hardlink_to_common_file"
hl2cf_path = os.path.join(src_dir, hl2cf_relpath)
os.link(cf_path, hl2cf_path)
fs.hardlink_dir(src_dir, dst_dir)
src_cf_stat = os.lstat(cf_path)
src_hl_stat = os.lstat(hl2cf_path)
dst_hl_stat = os.lstat(os.path.join(dst_dir, hl2cf_relpath))
assert os.path.samestat(src_cf_stat, dst_hl_stat)
assert os.path.samestat(src_hl_stat, dst_hl_stat)
assert src_cf_stat.st_nlink == 4
def test_nested_dir(self, hardlink_dirs):
src_dir, dst_dir = hardlink_dirs
src_ndir_path = create_dir(src_dir)
src_nfile_path = create_file(src_ndir_path)
ndir_relpath = relpath(src_ndir_path, src_dir, dst_dir)
nfile_relpath = relpath(src_nfile_path, src_dir, dst_dir)
fs.hardlink_dir(src_dir, dst_dir)
check_directory_stats(src_ndir_path,
os.path.join(dst_dir, ndir_relpath))
# check the file in nested directory
src_fstat = os.lstat(src_nfile_path)
dst_fstat = os.lstat(os.path.join(dst_dir, nfile_relpath))
assert os.path.samestat(src_fstat, dst_fstat)
assert src_fstat.st_nlink == 2
def check_identical_file(f1_path: str, f2_path: str):
"""Check that files are identical. Fails test, if not."""
st1 = os.lstat(f1_path)
st2 = os.lstat(f2_path)
assert st1.st_uid == st2.st_uid
assert st1.st_gid == st2.st_gid
assert st1.st_mode == st2.st_mode
assert st1.st_mtime == st2.st_mtime
assert st1.st_size == st2.st_size
class TestRsync:
def test_dst_has_excess_file(self, common_fs_dirs):
src_dir, dst_dir = common_fs_dirs
dst_fpath = create_file(str(dst_dir))
all(fs.rsync(str(src_dir), str(dst_dir)))
assert not os.path.lexists(dst_fpath)
def test_dst_has_excess_symlink(self, common_fs_dirs):
src_dir, dst_dir = common_fs_dirs
dst_lpath = os.path.join(str(dst_dir), 'nonexisting_file')
os.symlink('broken_symlink', dst_lpath)
all(fs.rsync(str(src_dir), str(dst_dir)))
assert not os.path.lexists(dst_lpath)
def test_dst_has_excess_empty_dir(self, common_fs_dirs):
src_dir, dst_dir = common_fs_dirs
dst_dpath = create_dir(str(dst_dir))
all(fs.rsync(str(src_dir), str(dst_dir)))
assert not os.path.lexists(dst_dpath)
def test_dst_has_excess_nonempty_dir(self, common_fs_dirs):
src_dir, dst_dir = common_fs_dirs
dst_dpath = create_dir(str(dst_dir))
create_file(dst_dpath)
all(fs.rsync(str(src_dir), str(dst_dir)))
assert not os.path.lexists(dst_dpath)
def test_dst_has_excess_nonempty_recursive_dir(self, common_fs_dirs):
src_dir, dst_dir = common_fs_dirs
dst_dpath = create_dir(str(dst_dir))
nested_dpath = create_dir(dst_dpath)
create_file(nested_dpath)
all(fs.rsync(str(src_dir), str(dst_dir)))
assert not os.path.lexists(dst_dpath)
def test_different_types_src_file_dst_dir(self, common_fs_dirs):
src_dir, dst_dir = common_fs_dirs
src_fpath = create_file(str(src_dir))
dst_path = os.path.join(str(dst_dir),
relpath(src_fpath, str(src_dir), str(dst_dir)))
os.mkdir(dst_path)
all(fs.rsync(str(src_dir), str(dst_dir)))
assert os.path.lexists(dst_path)
assert os.path.isfile(dst_path)
def test_different_types_src_file_dst_symlink(self, common_fs_dirs):
src_dir, dst_dir = common_fs_dirs
src_fpath = create_file(str(src_dir))
dst_path = os.path.join(str(dst_dir),
relpath(src_fpath, str(src_dir), str(dst_dir)))
os.symlink('broken_link', dst_path)
all(fs.rsync(str(src_dir), str(dst_dir)))
assert os.path.lexists(dst_path)
assert os.path.isfile(dst_path)
def test_different_types_src_symlink_dst_file(self, common_fs_dirs):
src_dir, dst_dir = common_fs_dirs
dst_path = create_file(str(dst_dir))
src_lpath = os.path.join(str(src_dir),
relpath(dst_path, str(src_dir), str(dst_dir)))
os.symlink('broken_link', src_lpath)
all(fs.rsync(str(src_dir), str(dst_dir)))
assert os.path.lexists(dst_path)
assert os.path.islink(dst_path)
def test_different_types_src_symlink_dst_dir(self, common_fs_dirs):
src_dir, dst_dir = common_fs_dirs
dst_path = create_dir(str(dst_dir))
src_lpath = os.path.join(str(src_dir),
relpath(dst_path, str(src_dir), str(dst_dir)))
os.symlink('broken_link', src_lpath)
all(fs.rsync(str(src_dir), str(dst_dir)))
assert os.path.lexists(dst_path)
assert os.path.islink(dst_path)
def test_different_types_src_dir_dst_file(self, common_fs_dirs):
src_dir, dst_dir = common_fs_dirs
src_dpath = create_dir(str(src_dir))
dst_path = os.path.join(str(dst_dir),
relpath(src_dpath, str(src_dir), str(dst_dir)))
with open(dst_path, "w") as f:
f.write(string.printable)
all(fs.rsync(str(src_dir), str(dst_dir)))
assert os.path.lexists(dst_path)
assert os.path.isdir(dst_path)
def test_different_types_src_dir_dst_symlink(self, common_fs_dirs):
src_dir, dst_dir = common_fs_dirs
src_dpath = create_dir(str(src_dir))
dst_path = os.path.join(str(dst_dir),
relpath(src_dpath, str(src_dir), str(dst_dir)))
os.symlink('broken_link', dst_path)
all(fs.rsync(str(src_dir), str(dst_dir)))
assert os.path.lexists(dst_path)
assert os.path.isdir(dst_path)
def test_src_is_socket(self, common_fs_dirs):
src_dir, dst_dir = common_fs_dirs
src_spath = create_file(str(src_dir))
dst_spath = os.path.join(str(dst_dir),
relpath(src_spath, str(src_dir), str(dst_dir)))
os.unlink(src_spath)
sock = socket.socket(socket.AF_UNIX)
sock.bind(src_spath)
all(fs.rsync(str(src_dir), str(dst_dir)))
assert not os.path.lexists(dst_spath)
def test_src_dst_same_inode(self, common_fs_dirs):
src_dir, dst_dir = common_fs_dirs
src_fpath = create_file(str(src_dir))
dst_fpath = os.path.join(str(dst_dir),
relpath(src_fpath, str(src_dir), str(dst_dir)))
os.link(src_fpath, dst_fpath)
all(fs.rsync(str(src_dir), str(dst_dir)))
assert os.path.lexists(dst_fpath)
src_stat = os.lstat(src_fpath)
dst_stat = os.lstat(dst_fpath)
assert src_stat.st_nlink == 1
assert dst_stat.st_nlink == 1
assert src_stat.st_ino != dst_stat.st_ino
def test_src_dst_diff_size(self, common_fs_dirs):
src_dir, dst_dir = common_fs_dirs
src_fpath = create_file(str(src_dir))
dst_fpath = os.path.join(str(dst_dir),
relpath(src_fpath, str(src_dir), str(dst_dir)))
with open(dst_fpath, "w") as df:
df.write(string.printable * 2)
all(fs.rsync(str(src_dir), str(dst_dir)))
assert os.path.lexists(dst_fpath)
check_identical_file(src_fpath, dst_fpath)
# TODO add tests for changing ownership
# TODO add tests for changing times (?)
class TestCopyFile:
"""Test suite for copy_file function."""
def test_copy_simple_file(self, tmp_path):
"""Test copying a simple file"""
src_path = os.path.join(str(tmp_path), "source.txt")
dst_path = os.path.join(str(tmp_path), "dest.txt")
content = b"Hello, World!" * 1000
with open(src_path, "wb") as f:
f.write(content)
fs.copy_file(src_path, dst_path)
assert os.path.exists(dst_path)
with open(dst_path, "rb") as f:
assert f.read() == content
def test_copy_large_file(self, tmp_path):
"""Test copying a large file (> buffer size)"""
src_path = os.path.join(str(tmp_path), "large.bin")
dst_path = os.path.join(str(tmp_path), "large_copy.bin")
content = b"x" * (200 * 1024) # 200KB
with open(src_path, "wb") as f:
f.write(content)
fs.copy_file(src_path, dst_path)
assert os.path.exists(dst_path)
assert os.path.getsize(dst_path) == len(content)
def test_copy_preserves_permissions(self, tmp_path):
"""Test that copy_file preserves file permissions"""
src_path = os.path.join(str(tmp_path), "executable.sh")
dst_path = os.path.join(str(tmp_path), "executable_copy.sh")
with open(src_path, "w") as f:
f.write("#!/bin/bash\necho test")
os.chmod(src_path, 0o755)
fs.copy_file(src_path, dst_path)
src_stat = os.stat(src_path)
dst_stat = os.stat(dst_path)
assert src_stat.st_mode == dst_stat.st_mode
class TestCopyDirEntry:
"""Test suite for copy_direntry function."""
def test_copy_file_entry(self, tmp_path):
"""Test copying a file DirEntry"""
src_path = os.path.join(str(tmp_path), "source.txt")
dst_path = os.path.join(str(tmp_path), "dest.txt")
with open(src_path, "w") as f:
f.write("test content")
os.chmod(src_path, 0o644)
entry = fs.PseudoDirEntry(src_path)
fs.copy_direntry(entry, dst_path)
assert os.path.isfile(dst_path)
with open(dst_path, "r") as f:
assert f.read() == "test content"
def test_copy_directory_entry(self, tmp_path):
"""Test copying a directory DirEntry"""
src_path = os.path.join(str(tmp_path), "srcdir")
dst_path = os.path.join(str(tmp_path), "dstdir")
os.mkdir(src_path)
os.chmod(src_path, 0o755)
entry = fs.PseudoDirEntry(src_path)
fs.copy_direntry(entry, dst_path)
assert os.path.isdir(dst_path)
def test_copy_symlink_entry(self, tmp_path):
"""Test copying a symlink DirEntry using real os.DirEntry"""
target_path = os.path.join(str(tmp_path), "target.txt")
src_link = os.path.join(str(tmp_path), "source_link")
dst_link = os.path.join(str(tmp_path), "dest_link")
with open(target_path, "w") as f:
f.write("target")
os.symlink(target_path, src_link)
with os.scandir(str(tmp_path)) as it:
for entry in it:
if entry.name == "source_link":
fs.copy_direntry(entry, dst_link)
break
assert os.path.islink(dst_link)
assert os.readlink(dst_link) == target_path
class TestRsyncBasic:
"""Test suite for basic rsync functionality."""
@pytest.fixture
def rsync_dirs(self, tmp_path):
"""Create source and destination directories for rsync tests."""
src_dir = os.path.join(str(tmp_path), "source")
dst_dir = os.path.join(str(tmp_path), "dest")
os.mkdir(src_dir)
return src_dir, dst_dir
def test_rsync_creates_destination(self, rsync_dirs):
"""Test that rsync creates destination directory if missing"""
src_dir, dst_dir = rsync_dirs
assert not os.path.exists(dst_dir)
list(fs.rsync(src_dir, dst_dir))
assert os.path.isdir(dst_dir)
def test_rsync_copies_new_files(self, rsync_dirs):
"""Test that rsync copies new files"""
src_dir, dst_dir = rsync_dirs
os.mkdir(dst_dir)
with open(os.path.join(src_dir, "file1.txt"), "w") as f:
f.write("content1")
with open(os.path.join(src_dir, "file2.txt"), "w") as f:
f.write("content2")
actions = list(fs.rsync(src_dir, dst_dir))
assert os.path.exists(os.path.join(dst_dir, "file1.txt"))
assert os.path.exists(os.path.join(dst_dir, "file2.txt"))
create_actions = [a for a in actions if a[1] == fs.Actions.CREATE]
assert len(create_actions) == 2
def test_rsync_deletes_missing_files(self, rsync_dirs):
"""Test that rsync deletes files not in source"""
src_dir, dst_dir = rsync_dirs
os.mkdir(dst_dir)
dst_file = os.path.join(dst_dir, "old_file.txt")
with open(dst_file, "w") as f:
f.write("old content")
actions = list(fs.rsync(src_dir, dst_dir))
assert not os.path.exists(dst_file)
delete_actions = [a for a in actions if a[1] == fs.Actions.DELETE]
assert len(delete_actions) == 1
def test_rsync_updates_modified_files(self, rsync_dirs):
"""Test that rsync updates modified files"""
src_dir, dst_dir = rsync_dirs
os.mkdir(dst_dir)
src_file = os.path.join(src_dir, "file.txt")
dst_file = os.path.join(dst_dir, "file.txt")
with open(src_file, "w") as f:
f.write("original")
with open(dst_file, "w") as f:
f.write("modified")
import time
time.sleep(0.1)
with open(src_file, "w") as f:
f.write("updated content")
actions = list(fs.rsync(src_dir, dst_dir))
with open(dst_file, "r") as f:
assert f.read() == "updated content"
rewrite_actions = [a for a in actions if a[1] == fs.Actions.REWRITE]
assert len(rewrite_actions) > 0
def test_rsync_preserves_permissions(self, rsync_dirs):
"""Test that rsync preserves file permissions"""
src_dir, dst_dir = rsync_dirs
os.mkdir(dst_dir)
src_file = os.path.join(src_dir, "script.sh")
with open(src_file, "w") as f:
f.write("#!/bin/bash\n")
os.chmod(src_file, 0o755)
list(fs.rsync(src_dir, dst_dir))
dst_file = os.path.join(dst_dir, "script.sh")
dst_stat = os.stat(dst_file)
src_stat = os.stat(src_file)
assert dst_stat.st_mode == src_stat.st_mode
class TestHardlinkDirBasic:
"""Test suite for basic hardlink_dir functionality."""
@pytest.fixture
def hardlink_basic_dirs(self, tmp_path):
"""Create source and destination directories for basic hardlink tests."""
src_dir = os.path.join(str(tmp_path), "source")
dst_dir = os.path.join(str(tmp_path), "dest")
os.mkdir(src_dir)
return src_dir, dst_dir
def test_hardlink_creates_destination(self, hardlink_basic_dirs):
"""Test that hardlink_dir creates destination directory"""
src_dir, dst_dir = hardlink_basic_dirs
result = fs.hardlink_dir(src_dir, dst_dir)
assert result
assert os.path.isdir(dst_dir)
def test_hardlink_links_files(self, hardlink_basic_dirs):
"""Test that files are hardlinked, not copied"""
src_dir, dst_dir = hardlink_basic_dirs
src_file = os.path.join(src_dir, "file.txt")
with open(src_file, "w") as f:
f.write("test content")
fs.hardlink_dir(src_dir, dst_dir)
dst_file = os.path.join(dst_dir, "file.txt")
assert os.path.exists(dst_file)
src_stat = os.stat(src_file)
dst_stat = os.stat(dst_file)
assert src_stat.st_ino == dst_stat.st_ino
def test_hardlink_nested_directories(self, hardlink_basic_dirs):
"""Test hardlinking nested directory structure"""
src_dir, dst_dir = hardlink_basic_dirs
subdir = os.path.join(src_dir, "subdir")
os.mkdir(subdir)
with open(os.path.join(subdir, "nested.txt"), "w") as f:
f.write("nested content")
fs.hardlink_dir(src_dir, dst_dir)
dst_nested = os.path.join(dst_dir, "subdir", "nested.txt")
assert os.path.exists(dst_nested)
src_nested = os.path.join(subdir, "nested.txt")
src_stat = os.stat(src_nested)
dst_stat = os.stat(dst_nested)
assert src_stat.st_ino == dst_stat.st_ino
class TestScantree:
"""Test suite for scantree function."""
def test_scantree_empty_directory(self, tmp_path):
"""Test scanning empty directory"""
entries = list(fs.scantree(str(tmp_path)))
assert len(entries) == 0
def test_scantree_flat_directory(self, tmp_path):
"""Test scanning flat directory structure"""
for i in range(3):
with open(os.path.join(str(tmp_path), f"file{i}.txt"), "w") as f:
f.write(f"content {i}")
entries = list(fs.scantree(str(tmp_path)))
assert len(entries) == 3
def test_scantree_nested_directories(self, tmp_path):
"""Test scanning nested directory structure"""
subdir1 = os.path.join(str(tmp_path), "dir1")
subdir2 = os.path.join(subdir1, "dir2")
os.makedirs(subdir2)
with open(os.path.join(str(tmp_path), "root.txt"), "w") as f:
f.write("root")
with open(os.path.join(subdir1, "sub1.txt"), "w") as f:
f.write("sub1")
with open(os.path.join(subdir2, "sub2.txt"), "w") as f:
f.write("sub2")
entries = list(fs.scantree(str(tmp_path), dir_first=True))
assert len(entries) == 5
names = [os.path.basename(e.path) for e in entries]
dir1_idx = names.index("dir1")
sub1_idx = names.index("sub1.txt")
assert dir1_idx < sub1_idx
class TestRmDirentry:
"""Test suite for rm_direntry function."""
def test_remove_file(self, tmp_path):
"""Test removing a file"""
file_path = os.path.join(str(tmp_path), "test.txt")
with open(file_path, "w") as f:
f.write("test")
entry = fs.PseudoDirEntry(file_path)
fs.rm_direntry(entry)
assert not os.path.exists(file_path)
def test_remove_empty_directory(self, tmp_path):
"""Test removing an empty directory"""
dir_path = os.path.join(str(tmp_path), "testdir")
os.mkdir(dir_path)
entry = fs.PseudoDirEntry(dir_path)
fs.rm_direntry(entry)
assert not os.path.exists(dir_path)
def test_remove_directory_with_contents(self, tmp_path):
"""Test removing a directory with files"""
dir_path = os.path.join(str(tmp_path), "testdir")
os.mkdir(dir_path)
with open(os.path.join(dir_path, "file.txt"), "w") as f:
f.write("test")
entry = fs.PseudoDirEntry(dir_path)
fs.rm_direntry(entry)
assert not os.path.exists(dir_path)
def test_remove_symlink(self, tmp_path):
"""Test removing a symlink using real os.DirEntry"""
target = os.path.join(str(tmp_path), "target.txt")
link = os.path.join(str(tmp_path), "link")
with open(target, "w") as f:
f.write("target")
os.symlink(target, link)
with os.scandir(str(tmp_path)) as it:
for entry in it:
if entry.name == "link":
fs.rm_direntry(entry)
break
assert not os.path.exists(link)
assert not os.path.islink(link)
assert os.path.exists(target)
class TestPermissionErrors:
"""Test permission error handling during backup operations."""
@pytest.fixture
def perm_dirs(self, tmp_path):
"""Create source and destination directories for permission tests."""
src_dir = os.path.join(str(tmp_path), "source")
dst_dir = os.path.join(str(tmp_path), "dest")
os.mkdir(src_dir)
yield src_dir, dst_dir
# Restore permissions before cleanup
for root, dirs, files in os.walk(str(tmp_path)):
for d in dirs:
try:
os.chmod(os.path.join(root, d), 0o755)
except:
pass
for f in files:
try:
os.chmod(os.path.join(root, f), 0o644)
except:
pass
def test_rsync_handles_unreadable_file(self, perm_dirs):
"""Test that rsync handles files it cannot read gracefully."""
src_dir, dst_dir = perm_dirs
readable_file = os.path.join(src_dir, "readable.txt")
with open(readable_file, "w") as f:
f.write("can read this")
unreadable_file = os.path.join(src_dir, "unreadable.txt")
with open(unreadable_file, "w") as f:
f.write("cannot read this")
os.chmod(unreadable_file, 0o000)
try:
actions = list(fs.rsync(src_dir, dst_dir))
readable_dst = os.path.join(dst_dir, "readable.txt")
assert os.path.exists(readable_dst)
error_actions = [a for a in actions if a[1] == fs.Actions.ERROR]
assert len(error_actions) > 0, \
"Should have ERROR action for unreadable file"
except PermissionError as e:
pytest.fail(f"rsync crashed on permission error: {e}. "
"Should handle gracefully and continue.")
def test_copy_file_with_unreadable_source(self, perm_dirs):
"""Test copy_file with unreadable source file."""
src_dir, dst_dir = perm_dirs
src_file = os.path.join(src_dir, "unreadable.txt")
dst_file = os.path.join(dst_dir, "copy.txt")
os.mkdir(dst_dir)
with open(src_file, "w") as f:
f.write("test")
os.chmod(src_file, 0o000)
with pytest.raises(PermissionError):
fs.copy_file(src_file, dst_file)
@pytest.mark.skip(reason="Fails until issue #1 is fixed")
def test_update_direntry_handles_permission_error(self, perm_dirs):
"""Test that update_direntry handles permission errors gracefully."""
src_dir, dst_dir = perm_dirs
src_file = os.path.join(src_dir, "file.txt")
dst_file = os.path.join(dst_dir, "file.txt")
os.mkdir(dst_dir)
with open(src_file, "w") as f:
f.write("source content")
with open(dst_file, "w") as f:
f.write("dest content")
os.chmod(src_file, 0o000)
src_entry = fs.PseudoDirEntry(src_file)
dst_entry = fs.PseudoDirEntry(dst_file)
try:
fs.update_direntry(src_entry, dst_entry)
except PermissionError:
pytest.fail("update_direntry crashed with PermissionError. "
"Should handle gracefully and log error.")
@pytest.mark.skip(reason="Fails until issue #1 is fixed")
def test_nest_hardlink_handles_permission_error(self, perm_dirs):
"""Test that nest_hardlink handles permission errors gracefully."""
src_dir, dst_dir = perm_dirs
subdir = os.path.join(src_dir, "subdir")
os.mkdir(subdir)
src_file = os.path.join(subdir, "file.txt")
with open(src_file, "w") as f:
f.write("test")
delta_dir = os.path.join(dst_dir, ".backup_delta")
os.mkdir(dst_dir)
os.mkdir(delta_dir)
os.chmod(delta_dir, 0o555)
try:
fs.nest_hardlink(src_dir, "subdir/file.txt", delta_dir)
except PermissionError:
pytest.fail("nest_hardlink crashed with PermissionError. "
"Should handle gracefully and log error.")
class TestParseRsyncOutput:
"""Test _parse_rsync_output() parsing of rsync --itemize-changes."""
def test_delete_file(self):
"""Parse deletion output."""
result = fs._parse_rsync_output("*deleting path/to/file.txt")
assert result == ("path/to/file.txt", fs.Actions.DELETE, "")
def test_delete_nested_path(self):
"""Parse deletion with nested directory structure."""
result = fs._parse_rsync_output("*deleting deeply/nested/dir/file.md")
assert result == ("deeply/nested/dir/file.md", fs.Actions.DELETE, "")
def test_create_file(self):
"""Parse file creation."""
result = fs._parse_rsync_output(">f+++++++++ new_file.txt")
assert result == ("new_file.txt", fs.Actions.CREATE, "")
def test_create_file_nested(self):
"""Parse file creation in subdirectory."""
result = fs._parse_rsync_output(">f+++++++++ subdir/another.log")
assert result == ("subdir/another.log", fs.Actions.CREATE, "")
def test_create_directory(self):
"""Parse directory creation."""
result = fs._parse_rsync_output("cd+++++++++ new_directory/")
assert result == ("new_directory/", fs.Actions.CREATE, "")
def test_create_symlink(self):
"""Parse symlink creation."""
result = fs._parse_rsync_output("cL+++++++++ link_to_file")
assert result == ("link_to_file", fs.Actions.CREATE, "")
def test_rewrite_file_size_change(self):
"""Parse file rewrite due to size change."""
result = fs._parse_rsync_output(">f.s...... modified_file.txt")
assert result == ("modified_file.txt", fs.Actions.REWRITE, "")
def test_rewrite_file_time_change(self):
"""Parse file rewrite due to time change."""
result = fs._parse_rsync_output(">f..t...... touched_file.dat")
assert result == ("touched_file.dat", fs.Actions.REWRITE, "")
def test_rewrite_file_size_and_time(self):
"""Parse file rewrite due to both size and time change."""
result = fs._parse_rsync_output(">f.st...... changed.bin")
assert result == ("changed.bin", fs.Actions.REWRITE, "")
def test_update_directory_time(self):
"""Parse directory time update."""
result = fs._parse_rsync_output(">d..t...... some_dir/")
assert result == ("some_dir/", fs.Actions.UPDATE_TIME, "")
def test_update_permissions(self):
"""Parse permission change."""
result = fs._parse_rsync_output(">f....p.... executable.sh")
assert result == ("executable.sh", fs.Actions.UPDATE_PERM, "")
def test_update_permissions_with_time_change(self):
"""Time change takes precedence over permission change."""
result = fs._parse_rsync_output(">f...tp.... script.py")
assert result == ("script.py", fs.Actions.REWRITE, "")
def test_update_owner(self):
"""Parse owner change."""
result = fs._parse_rsync_output(">f.....o... owned_file.txt")
assert result == ("owned_file.txt", fs.Actions.UPDATE_OWNER, "")
def test_update_group(self):
"""Parse group change."""
result = fs._parse_rsync_output(">f......g.. grouped_file.txt")
assert result == ("grouped_file.txt", fs.Actions.UPDATE_OWNER, "")
def test_update_owner_and_group(self):
"""Parse both owner and group change."""
result = fs._parse_rsync_output(">f.....og.. shared_file.txt")
assert result == ("shared_file.txt", fs.Actions.UPDATE_OWNER, "")
def test_invalid_format_raises_error(self):
"""Unparseable line should raise RuntimeError."""
with pytest.raises(RuntimeError, match="Not parsed string"):
fs._parse_rsync_output(">f......... unknown.txt")
def test_empty_change_string_raises_error(self):
"""Empty change indicators should raise RuntimeError."""
with pytest.raises(RuntimeError, match="Not parsed string"):
fs._parse_rsync_output(">f......... no_action.txt")