Fix PseudoDirEntry follow_symlinks handling and add documentation

- Fix follow_symlinks parameter being ignored in is_dir(), is_file()
- Change from realpath() to abspath() to preserve symlinks
- Add separate caches for stat() and lstat() results
- Remove incorrect follow_symlinks param from is_symlink()
- Add comprehensive docstring explaining purpose and design

When follow_symlinks=False, methods now correctly return False for
symlinks instead of following them. Previously all symlinks were
resolved, breaking symlink-aware backup operations.

Fixes #8
This commit is contained in:
2026-02-04 22:48:29 -08:00
parent 52b4fef814
commit 04ade89b96
2 changed files with 333 additions and 13 deletions

View File

@@ -29,37 +29,78 @@ class Actions(enum.Enum):
class PseudoDirEntry: class PseudoDirEntry:
"""
Duck-typed os.DirEntry for paths that don't exist yet or when you need
DirEntry-like interface for arbitrary paths.
Problem: os.DirEntry is created by os.scandir() and cannot be manually
constructed. But we need DirEntry-compatible objects for:
- Paths that will exist soon (new backup directories)
- Constructed paths (marker files)
- Uniform interface in functions accepting both real and future entries
Why not just use strings? Functions like rm_direntry(), copy_direntry()
accept Union[os.DirEntry, PseudoDirEntry] and call .is_dir(), .stat()
methods. Using this class avoids branching on type throughout the codebase.
Why not pathlib.Path? We heavily use os.scandir() which returns DirEntry
objects with cached stat info. PseudoDirEntry maintains API consistency
with minimal overhead.
Example usage:
# Create entry for future backup directory
cur_backup = PseudoDirEntry("/backups/20260204_120000")
os.mkdir(cur_backup.path)
set_backup_marker(cur_backup) # accepts DirEntry-like object
Caches stat results like real DirEntry to avoid repeated syscalls.
"""
def __init__(self, path): def __init__(self, path):
self.path = os.path.realpath(path) # Use abspath, not realpath - realpath resolves symlinks
self.path = os.path.abspath(path)
self.name = os.path.basename(self.path) self.name = os.path.basename(self.path)
self._is_dir = None self._is_dir = None
self._is_file = None self._is_file = None
self._is_symlink = None self._is_symlink = None
self._stat = None # Cache both stat and lstat separately
self._stat_follow = None
self._stat_nofollow = None
def __str__(self): def __str__(self):
return self.name return self.name
def is_dir(self, follow_symlinks: bool = True) -> bool: def is_dir(self, follow_symlinks: bool = True) -> bool:
if self._is_dir is None: if follow_symlinks:
self._is_dir = os.path.isdir(self.path) if self._is_dir is None:
return self._is_dir self._is_dir = os.path.isdir(self.path)
return self._is_dir
else:
# When not following symlinks, must return False if path is symlink
return os.path.isdir(self.path) and not os.path.islink(self.path)
def is_file(self, follow_symlinks: bool = True) -> bool: def is_file(self, follow_symlinks: bool = True) -> bool:
if self._is_file is None: if follow_symlinks:
self._is_file = os.path.isfile(self.path) if self._is_file is None:
return self._is_file self._is_file = os.path.isfile(self.path)
return self._is_file
else:
# When not following symlinks, must return False if path is symlink
return os.path.isfile(self.path) and not os.path.islink(self.path)
def is_symlink(self, follow_symlinks: bool = True) -> bool: def is_symlink(self) -> bool:
if self._is_symlink is None: if self._is_symlink is None:
self._is_symlink = os.path.islink(self.path) self._is_symlink = os.path.islink(self.path)
return self._is_symlink return self._is_symlink
def stat(self, follow_symlinks: bool = True): def stat(self, follow_symlinks: bool = True):
if self._stat is None: if follow_symlinks:
func = os.stat if follow_symlinks else os.lstat if self._stat_follow is None:
self._stat = func(self.path) self._stat_follow = os.stat(self.path)
return self._stat return self._stat_follow
else:
if self._stat_nofollow is None:
self._stat_nofollow = os.lstat(self.path)
return self._stat_nofollow
def _parse_rsync_output(line: str) -> Tuple[str, Actions, str]: def _parse_rsync_output(line: str) -> Tuple[str, Actions, str]:

View File

@@ -0,0 +1,279 @@
"""Tests for PseudoDirEntry class."""
import os
import pytest
from curateipsum import fs
class TestPseudoDirEntry:
"""Test PseudoDirEntry class behavior, especially follow_symlinks handling."""
def test_regular_file_is_file(self, tmp_path):
"""Regular file should report as file, not dir or symlink."""
filepath = tmp_path / "file.txt"
filepath.write_text("test")
entry = fs.PseudoDirEntry(str(filepath))
assert entry.is_file(follow_symlinks=True)
assert entry.is_file(follow_symlinks=False)
assert not entry.is_dir(follow_symlinks=True)
assert not entry.is_dir(follow_symlinks=False)
assert not entry.is_symlink()
def test_regular_dir_is_dir(self, tmp_path):
"""Regular directory should report as dir, not file or symlink."""
dirpath = tmp_path / "testdir"
dirpath.mkdir()
entry = fs.PseudoDirEntry(str(dirpath))
assert entry.is_dir(follow_symlinks=True)
assert entry.is_dir(follow_symlinks=False)
assert not entry.is_file(follow_symlinks=True)
assert not entry.is_file(follow_symlinks=False)
assert not entry.is_symlink()
def test_symlink_to_file_follow_true(self, tmp_path):
"""Symlink to file with follow_symlinks=True should report as file."""
filepath = tmp_path / "file.txt"
linkpath = tmp_path / "link.txt"
filepath.write_text("test")
linkpath.symlink_to(filepath)
entry = fs.PseudoDirEntry(str(linkpath))
assert entry.is_file(follow_symlinks=True)
assert entry.is_symlink()
def test_symlink_to_file_follow_false(self, tmp_path):
"""Symlink to file with follow_symlinks=False should not report as file."""
filepath = tmp_path / "file.txt"
linkpath = tmp_path / "link.txt"
filepath.write_text("test")
linkpath.symlink_to(filepath)
entry = fs.PseudoDirEntry(str(linkpath))
assert not entry.is_file(follow_symlinks=False)
assert not entry.is_dir(follow_symlinks=False)
assert entry.is_symlink()
def test_symlink_to_dir_follow_true(self, tmp_path):
"""Symlink to dir with follow_symlinks=True should report as dir."""
dirpath = tmp_path / "testdir"
linkpath = tmp_path / "linkdir"
dirpath.mkdir()
linkpath.symlink_to(dirpath)
entry = fs.PseudoDirEntry(str(linkpath))
assert entry.is_dir(follow_symlinks=True)
assert entry.is_symlink()
def test_symlink_to_dir_follow_false(self, tmp_path):
"""Symlink to dir with follow_symlinks=False should not report as dir."""
dirpath = tmp_path / "testdir"
linkpath = tmp_path / "linkdir"
dirpath.mkdir()
linkpath.symlink_to(dirpath)
entry = fs.PseudoDirEntry(str(linkpath))
assert not entry.is_dir(follow_symlinks=False)
assert not entry.is_file(follow_symlinks=False)
assert entry.is_symlink()
def test_matches_real_direntry_file(self, tmp_path):
"""PseudoDirEntry should match real DirEntry for regular file."""
filepath = tmp_path / "file.txt"
filepath.write_text("test")
pseudo = fs.PseudoDirEntry(str(filepath))
real = next(os.scandir(tmp_path))
assert pseudo.is_file(follow_symlinks=True) == real.is_file(follow_symlinks=True)
assert pseudo.is_file(follow_symlinks=False) == real.is_file(follow_symlinks=False)
assert pseudo.is_dir(follow_symlinks=True) == real.is_dir(follow_symlinks=True)
assert pseudo.is_dir(follow_symlinks=False) == real.is_dir(follow_symlinks=False)
assert pseudo.is_symlink() == real.is_symlink()
def test_matches_real_direntry_symlink(self, tmp_path):
"""PseudoDirEntry should match real DirEntry for symlink."""
filepath = tmp_path / "file.txt"
linkpath = tmp_path / "link.txt"
filepath.write_text("test")
linkpath.symlink_to(filepath)
pseudo = fs.PseudoDirEntry(str(linkpath))
real = [e for e in os.scandir(tmp_path) if e.name == "link.txt"][0]
assert pseudo.is_file(follow_symlinks=True) == real.is_file(follow_symlinks=True)
assert pseudo.is_file(follow_symlinks=False) == real.is_file(follow_symlinks=False)
assert pseudo.is_dir(follow_symlinks=True) == real.is_dir(follow_symlinks=True)
assert pseudo.is_dir(follow_symlinks=False) == real.is_dir(follow_symlinks=False)
assert pseudo.is_symlink() == real.is_symlink()
def test_stat_follow_symlinks_true(self, tmp_path):
"""stat(follow_symlinks=True) should return target's stat."""
filepath = tmp_path / "file.txt"
linkpath = tmp_path / "link.txt"
filepath.write_text("test content")
linkpath.symlink_to(filepath)
entry = fs.PseudoDirEntry(str(linkpath))
stat_result = entry.stat(follow_symlinks=True)
# should return file's size, not symlink size
assert stat_result.st_size == 12
def test_stat_follow_symlinks_false(self, tmp_path):
"""stat(follow_symlinks=False) should return symlink's stat."""
filepath = tmp_path / "file.txt"
linkpath = tmp_path / "link.txt"
filepath.write_text("test content")
linkpath.symlink_to(filepath)
entry = fs.PseudoDirEntry(str(linkpath))
stat_result = entry.stat(follow_symlinks=False)
# should return symlink size, not file size
assert stat_result.st_size != 12
def test_stat_caching_separate(self, tmp_path):
"""stat() should cache follow_symlinks=True/False separately."""
filepath = tmp_path / "file.txt"
linkpath = tmp_path / "link.txt"
filepath.write_text("test content")
linkpath.symlink_to(filepath)
entry = fs.PseudoDirEntry(str(linkpath))
stat_follow = entry.stat(follow_symlinks=True)
stat_nofollow = entry.stat(follow_symlinks=False)
# should return different results
assert stat_follow.st_size == 12
assert stat_nofollow.st_size != 12
# call again to verify caching works
assert entry.stat(follow_symlinks=True).st_size == 12
assert entry.stat(follow_symlinks=False).st_size != 12
def test_name_and_path_attributes(self, tmp_path):
"""Test that name and path attributes are set correctly."""
filepath = tmp_path / "file.txt"
filepath.write_text("test")
entry = fs.PseudoDirEntry(str(filepath))
assert entry.name == "file.txt"
assert entry.path == str(filepath)
def test_str_representation(self, tmp_path):
"""__str__ should return name."""
filepath = tmp_path / "file.txt"
filepath.write_text("test")
entry = fs.PseudoDirEntry(str(filepath))
assert str(entry) == "file.txt"
def test_abspath_not_realpath(self, tmp_path):
"""Constructor should use abspath, not realpath (preserves symlinks)."""
filepath = tmp_path / "file.txt"
linkpath = tmp_path / "link.txt"
filepath.write_text("test")
linkpath.symlink_to(filepath)
entry = fs.PseudoDirEntry(str(linkpath))
# path should point to symlink, not resolved target
assert entry.path == str(linkpath)
assert entry.name == "link.txt"
assert entry.is_symlink()
def test_caching_is_dir(self, tmp_path):
"""is_dir() should cache result for follow_symlinks=True."""
dirpath = tmp_path / "testdir"
dirpath.mkdir()
entry = fs.PseudoDirEntry(str(dirpath))
# first call caches
assert entry.is_dir(follow_symlinks=True)
assert entry._is_dir is True
# second call uses cache
assert entry.is_dir(follow_symlinks=True)
def test_caching_is_file(self, tmp_path):
"""is_file() should cache result for follow_symlinks=True."""
filepath = tmp_path / "file.txt"
filepath.write_text("test")
entry = fs.PseudoDirEntry(str(filepath))
# first call caches
assert entry.is_file(follow_symlinks=True)
assert entry._is_file is True
# second call uses cache
assert entry.is_file(follow_symlinks=True)
def test_caching_is_symlink(self, tmp_path):
"""is_symlink() should cache result."""
filepath = tmp_path / "file.txt"
linkpath = tmp_path / "link.txt"
filepath.write_text("test")
linkpath.symlink_to(filepath)
entry = fs.PseudoDirEntry(str(linkpath))
# first call caches
assert entry.is_symlink()
assert entry._is_symlink is True
# second call uses cache
assert entry.is_symlink()
def test_broken_symlink_is_symlink(self, tmp_path):
"""Broken symlink should still be detected as symlink."""
linkpath = tmp_path / "broken_link"
linkpath.symlink_to("/nonexistent/path")
entry = fs.PseudoDirEntry(str(linkpath))
assert entry.is_symlink()
assert not entry.is_file(follow_symlinks=False)
assert not entry.is_dir(follow_symlinks=False)
def test_relative_path_converted_to_absolute(self, tmp_path):
"""Relative paths should be converted to absolute."""
filepath = tmp_path / "file.txt"
filepath.write_text("test")
# change to tmp_path and use relative path
original_cwd = os.getcwd()
try:
os.chdir(tmp_path)
entry = fs.PseudoDirEntry("file.txt")
assert os.path.isabs(entry.path)
assert entry.name == "file.txt"
finally:
os.chdir(original_cwd)
def test_nonexistent_path_attributes(self, tmp_path):
"""PseudoDirEntry should work for paths that don't exist yet."""
filepath = tmp_path / "future_file.txt"
# path doesn't exist yet
assert not filepath.exists()
entry = fs.PseudoDirEntry(str(filepath))
# should still have name and path attributes
assert entry.name == "future_file.txt"
assert entry.path == str(filepath)