From 7c54bf382337d4ef017428d3986ed93839dba86f Mon Sep 17 00:00:00 2001 From: Maks Snegov Date: Wed, 4 Feb 2026 22:48:29 -0800 Subject: [PATCH] Fix PseudoDirEntry follow_symlinks handling and add documentation - Fix follow_symlinks parameter being ignored in is_dir(), is_file() - Change from realpath() to abspath() to preserve symlinks - Add separate caches for stat() and lstat() results - Remove incorrect follow_symlinks param from is_symlink() - Add comprehensive docstring explaining purpose and design When follow_symlinks=False, methods now correctly return False for symlinks instead of following them. Previously all symlinks were resolved, breaking symlink-aware backup operations. Fixes #8 --- curateipsum/fs.py | 67 ++++++-- tests/test_pseudo_direntry.py | 277 ++++++++++++++++++++++++++++++++++ 2 files changed, 331 insertions(+), 13 deletions(-) create mode 100644 tests/test_pseudo_direntry.py diff --git a/curateipsum/fs.py b/curateipsum/fs.py index 0f84c3c..db69493 100644 --- a/curateipsum/fs.py +++ b/curateipsum/fs.py @@ -29,37 +29,78 @@ class Actions(enum.Enum): class PseudoDirEntry: + """ + Duck-typed os.DirEntry for paths that don't exist yet or when you need + DirEntry-like interface for arbitrary paths. + + Problem: os.DirEntry is created by os.scandir() and cannot be manually + constructed. But we need DirEntry-compatible objects for: + - Paths that will exist soon (new backup directories) + - Constructed paths (marker files) + - Uniform interface in functions accepting both real and future entries + + Why not just use strings? Functions like rm_direntry(), copy_direntry() + accept Union[os.DirEntry, PseudoDirEntry] and call .is_dir(), .stat() + methods. Using this class avoids branching on type throughout the codebase. + + Why not pathlib.Path? We heavily use os.scandir() which returns DirEntry + objects with cached stat info. PseudoDirEntry maintains API consistency + with minimal overhead. + + Example usage: + # Create entry for future backup directory + cur_backup = PseudoDirEntry("/backups/20260204_120000") + os.mkdir(cur_backup.path) + set_backup_marker(cur_backup) # accepts DirEntry-like object + + Caches stat results like real DirEntry to avoid repeated syscalls. + """ def __init__(self, path): - self.path = os.path.realpath(path) + # Use abspath, not realpath - realpath resolves symlinks + self.path = os.path.abspath(path) self.name = os.path.basename(self.path) self._is_dir = None self._is_file = None self._is_symlink = None - self._stat = None + # Cache both stat and lstat separately + self._stat_follow = None + self._stat_nofollow = None def __str__(self): return self.name def is_dir(self, follow_symlinks: bool = True) -> bool: - if self._is_dir is None: - self._is_dir = os.path.isdir(self.path) - return self._is_dir + if follow_symlinks: + if self._is_dir is None: + self._is_dir = os.path.isdir(self.path) + return self._is_dir + else: + # When not following symlinks, must return False if path is symlink + return os.path.isdir(self.path) and not os.path.islink(self.path) def is_file(self, follow_symlinks: bool = True) -> bool: - if self._is_file is None: - self._is_file = os.path.isfile(self.path) - return self._is_file + if follow_symlinks: + if self._is_file is None: + self._is_file = os.path.isfile(self.path) + return self._is_file + else: + # When not following symlinks, must return False if path is symlink + return os.path.isfile(self.path) and not os.path.islink(self.path) - def is_symlink(self, follow_symlinks: bool = True) -> bool: + def is_symlink(self) -> bool: if self._is_symlink is None: self._is_symlink = os.path.islink(self.path) return self._is_symlink def stat(self, follow_symlinks: bool = True): - if self._stat is None: - func = os.stat if follow_symlinks else os.lstat - self._stat = func(self.path) - return self._stat + if follow_symlinks: + if self._stat_follow is None: + self._stat_follow = os.stat(self.path) + return self._stat_follow + else: + if self._stat_nofollow is None: + self._stat_nofollow = os.lstat(self.path) + return self._stat_nofollow def _parse_rsync_output(line: str) -> Tuple[str, Actions, str]: diff --git a/tests/test_pseudo_direntry.py b/tests/test_pseudo_direntry.py new file mode 100644 index 0000000..04012a3 --- /dev/null +++ b/tests/test_pseudo_direntry.py @@ -0,0 +1,277 @@ +"""Tests for PseudoDirEntry class.""" +import os + +from curateipsum import fs + + +class TestPseudoDirEntry: + """Test PseudoDirEntry class behavior, especially follow_symlinks handling.""" + + def test_regular_file_is_file(self, tmp_path): + """Regular file should report as file, not dir or symlink.""" + filepath = tmp_path / "file.txt" + filepath.write_text("test") + + entry = fs.PseudoDirEntry(str(filepath)) + + assert entry.is_file(follow_symlinks=True) + assert entry.is_file(follow_symlinks=False) + assert not entry.is_dir(follow_symlinks=True) + assert not entry.is_dir(follow_symlinks=False) + assert not entry.is_symlink() + + def test_regular_dir_is_dir(self, tmp_path): + """Regular directory should report as dir, not file or symlink.""" + dirpath = tmp_path / "testdir" + dirpath.mkdir() + + entry = fs.PseudoDirEntry(str(dirpath)) + + assert entry.is_dir(follow_symlinks=True) + assert entry.is_dir(follow_symlinks=False) + assert not entry.is_file(follow_symlinks=True) + assert not entry.is_file(follow_symlinks=False) + assert not entry.is_symlink() + + def test_symlink_to_file_follow_true(self, tmp_path): + """Symlink to file with follow_symlinks=True should report as file.""" + filepath = tmp_path / "file.txt" + linkpath = tmp_path / "link.txt" + filepath.write_text("test") + linkpath.symlink_to(filepath) + + entry = fs.PseudoDirEntry(str(linkpath)) + + assert entry.is_file(follow_symlinks=True) + assert entry.is_symlink() + + def test_symlink_to_file_follow_false(self, tmp_path): + """Symlink to file with follow_symlinks=False should not report as file.""" + filepath = tmp_path / "file.txt" + linkpath = tmp_path / "link.txt" + filepath.write_text("test") + linkpath.symlink_to(filepath) + + entry = fs.PseudoDirEntry(str(linkpath)) + + assert not entry.is_file(follow_symlinks=False) + assert not entry.is_dir(follow_symlinks=False) + assert entry.is_symlink() + + def test_symlink_to_dir_follow_true(self, tmp_path): + """Symlink to dir with follow_symlinks=True should report as dir.""" + dirpath = tmp_path / "testdir" + linkpath = tmp_path / "linkdir" + dirpath.mkdir() + linkpath.symlink_to(dirpath) + + entry = fs.PseudoDirEntry(str(linkpath)) + + assert entry.is_dir(follow_symlinks=True) + assert entry.is_symlink() + + def test_symlink_to_dir_follow_false(self, tmp_path): + """Symlink to dir with follow_symlinks=False should not report as dir.""" + dirpath = tmp_path / "testdir" + linkpath = tmp_path / "linkdir" + dirpath.mkdir() + linkpath.symlink_to(dirpath) + + entry = fs.PseudoDirEntry(str(linkpath)) + + assert not entry.is_dir(follow_symlinks=False) + assert not entry.is_file(follow_symlinks=False) + assert entry.is_symlink() + + def test_matches_real_direntry_file(self, tmp_path): + """PseudoDirEntry should match real DirEntry for regular file.""" + filepath = tmp_path / "file.txt" + filepath.write_text("test") + + pseudo = fs.PseudoDirEntry(str(filepath)) + real = next(os.scandir(tmp_path)) + + assert pseudo.is_file(follow_symlinks=True) == real.is_file(follow_symlinks=True) + assert pseudo.is_file(follow_symlinks=False) == real.is_file(follow_symlinks=False) + assert pseudo.is_dir(follow_symlinks=True) == real.is_dir(follow_symlinks=True) + assert pseudo.is_dir(follow_symlinks=False) == real.is_dir(follow_symlinks=False) + assert pseudo.is_symlink() == real.is_symlink() + + def test_matches_real_direntry_symlink(self, tmp_path): + """PseudoDirEntry should match real DirEntry for symlink.""" + filepath = tmp_path / "file.txt" + linkpath = tmp_path / "link.txt" + filepath.write_text("test") + linkpath.symlink_to(filepath) + + pseudo = fs.PseudoDirEntry(str(linkpath)) + real = [e for e in os.scandir(tmp_path) if e.name == "link.txt"][0] + + assert pseudo.is_file(follow_symlinks=True) == real.is_file(follow_symlinks=True) + assert pseudo.is_file(follow_symlinks=False) == real.is_file(follow_symlinks=False) + assert pseudo.is_dir(follow_symlinks=True) == real.is_dir(follow_symlinks=True) + assert pseudo.is_dir(follow_symlinks=False) == real.is_dir(follow_symlinks=False) + assert pseudo.is_symlink() == real.is_symlink() + + def test_stat_follow_symlinks_true(self, tmp_path): + """stat(follow_symlinks=True) should return target's stat.""" + filepath = tmp_path / "file.txt" + linkpath = tmp_path / "link.txt" + filepath.write_text("test content") + linkpath.symlink_to(filepath) + + entry = fs.PseudoDirEntry(str(linkpath)) + stat_result = entry.stat(follow_symlinks=True) + + # should return file's size, not symlink size + assert stat_result.st_size == 12 + + def test_stat_follow_symlinks_false(self, tmp_path): + """stat(follow_symlinks=False) should return symlink's stat.""" + filepath = tmp_path / "file.txt" + linkpath = tmp_path / "link.txt" + filepath.write_text("test content") + linkpath.symlink_to(filepath) + + entry = fs.PseudoDirEntry(str(linkpath)) + stat_result = entry.stat(follow_symlinks=False) + + # should return symlink size, not file size + assert stat_result.st_size != 12 + + def test_stat_caching_separate(self, tmp_path): + """stat() should cache follow_symlinks=True/False separately.""" + filepath = tmp_path / "file.txt" + linkpath = tmp_path / "link.txt" + filepath.write_text("test content") + linkpath.symlink_to(filepath) + + entry = fs.PseudoDirEntry(str(linkpath)) + + stat_follow = entry.stat(follow_symlinks=True) + stat_nofollow = entry.stat(follow_symlinks=False) + + # should return different results + assert stat_follow.st_size == 12 + assert stat_nofollow.st_size != 12 + + # call again to verify caching works + assert entry.stat(follow_symlinks=True).st_size == 12 + assert entry.stat(follow_symlinks=False).st_size != 12 + + def test_name_and_path_attributes(self, tmp_path): + """Test that name and path attributes are set correctly.""" + filepath = tmp_path / "file.txt" + filepath.write_text("test") + + entry = fs.PseudoDirEntry(str(filepath)) + + assert entry.name == "file.txt" + assert entry.path == str(filepath) + + def test_str_representation(self, tmp_path): + """__str__ should return name.""" + filepath = tmp_path / "file.txt" + filepath.write_text("test") + + entry = fs.PseudoDirEntry(str(filepath)) + + assert str(entry) == "file.txt" + + def test_abspath_not_realpath(self, tmp_path): + """Constructor should use abspath, not realpath (preserves symlinks).""" + filepath = tmp_path / "file.txt" + linkpath = tmp_path / "link.txt" + filepath.write_text("test") + linkpath.symlink_to(filepath) + + entry = fs.PseudoDirEntry(str(linkpath)) + + # path should point to symlink, not resolved target + assert entry.path == str(linkpath) + assert entry.name == "link.txt" + assert entry.is_symlink() + + def test_caching_is_dir(self, tmp_path): + """is_dir() should cache result for follow_symlinks=True.""" + dirpath = tmp_path / "testdir" + dirpath.mkdir() + + entry = fs.PseudoDirEntry(str(dirpath)) + + # first call caches + assert entry.is_dir(follow_symlinks=True) + assert entry._is_dir is True + + # second call uses cache + assert entry.is_dir(follow_symlinks=True) + + def test_caching_is_file(self, tmp_path): + """is_file() should cache result for follow_symlinks=True.""" + filepath = tmp_path / "file.txt" + filepath.write_text("test") + + entry = fs.PseudoDirEntry(str(filepath)) + + # first call caches + assert entry.is_file(follow_symlinks=True) + assert entry._is_file is True + + # second call uses cache + assert entry.is_file(follow_symlinks=True) + + def test_caching_is_symlink(self, tmp_path): + """is_symlink() should cache result.""" + filepath = tmp_path / "file.txt" + linkpath = tmp_path / "link.txt" + filepath.write_text("test") + linkpath.symlink_to(filepath) + + entry = fs.PseudoDirEntry(str(linkpath)) + + # first call caches + assert entry.is_symlink() + assert entry._is_symlink is True + + # second call uses cache + assert entry.is_symlink() + + def test_broken_symlink_is_symlink(self, tmp_path): + """Broken symlink should still be detected as symlink.""" + linkpath = tmp_path / "broken_link" + linkpath.symlink_to("/nonexistent/path") + + entry = fs.PseudoDirEntry(str(linkpath)) + + assert entry.is_symlink() + assert not entry.is_file(follow_symlinks=False) + assert not entry.is_dir(follow_symlinks=False) + + def test_relative_path_converted_to_absolute(self, tmp_path): + """Relative paths should be converted to absolute.""" + filepath = tmp_path / "file.txt" + filepath.write_text("test") + + # change to tmp_path and use relative path + original_cwd = os.getcwd() + try: + os.chdir(tmp_path) + entry = fs.PseudoDirEntry("file.txt") + + assert os.path.isabs(entry.path) + assert entry.name == "file.txt" + finally: + os.chdir(original_cwd) + + def test_nonexistent_path_attributes(self, tmp_path): + """PseudoDirEntry should work for paths that don't exist yet.""" + filepath = tmp_path / "future_file.txt" + + # path doesn't exist yet + assert not filepath.exists() + + entry = fs.PseudoDirEntry(str(filepath)) + + # should still have name and path attributes + assert entry.name == "future_file.txt" + assert entry.path == str(filepath)