From ae95856a582d165d7880943f5c4b4e5fb8eccb95 Mon Sep 17 00:00:00 2001 From: Maks Snegov Date: Mon, 15 Nov 2021 23:36:20 +0300 Subject: [PATCH] Fix symlinks issue in nest_hardlink function --- curateipsum/fs.py | 71 ++++++++++++++++++----------------------------- 1 file changed, 27 insertions(+), 44 deletions(-) diff --git a/curateipsum/fs.py b/curateipsum/fs.py index 12b0f92..ca57cdb 100644 --- a/curateipsum/fs.py +++ b/curateipsum/fs.py @@ -8,7 +8,7 @@ import logging import os import subprocess import sys -from typing import Iterable, Tuple +from typing import Iterable, Tuple, Union _lg = logging.getLogger(__name__) @@ -33,19 +33,32 @@ class PseudoDirEntry: self.path = os.path.realpath(path) self.name = os.path.basename(self.path) self._is_dir = None + self._is_file = None + self._is_symlink = None self._stat = None def __str__(self): return self.name - def is_dir(self) -> bool: + def is_dir(self, follow_symlinks: bool = True) -> bool: if self._is_dir is None: self._is_dir = os.path.isdir(self.path) return self._is_dir - def stat(self): + def is_file(self, follow_symlinks: bool = True) -> bool: + if self._is_file is None: + self._is_file = os.path.isfile(self.path) + return self._is_file + + def is_symlink(self, follow_symlinks: bool = True) -> bool: + if self._is_symlink is None: + self._is_symlink = os.path.islink(self.path) + return self._is_symlink + + def stat(self, follow_symlinks: bool = True): if self._stat is None: - self._stat = os.lstat(self.path) + func = os.stat if follow_symlinks else os.lstat + self._stat = func(self.path) return self._stat @@ -140,7 +153,7 @@ def scantree(path, dir_first=True) -> Iterable[os.DirEntry]: yield entry -def rm_direntry(entry: os.DirEntry): +def rm_direntry(entry: Union[os.DirEntry, PseudoDirEntry]): """ Recursively delete DirEntry (dir, file or symlink). """ if entry.is_file(follow_symlinks=False) or entry.is_symlink(): os.unlink(entry.path) @@ -180,38 +193,7 @@ def copy_file(src, dst): pass -def copy_entity(src_path: str, dst_path: str): - """ Non-recursive fs entity (file, dir or symlink) copy. """ - src_stat = os.lstat(src_path) - is_symlink = os.path.islink(src_path) - - if os.path.isdir(src_path): - os.mkdir(dst_path) - - elif is_symlink: - link_target = os.readlink(src_path) - os.symlink(link_target, dst_path) - - else: - copy_file(src_path, dst_path) - - if is_symlink: - # change symlink attributes only if supported by OS - if os.chown in os.supports_follow_symlinks: - os.chown(dst_path, src_stat.st_uid, src_stat.st_gid, - follow_symlinks=False) - if os.chmod in os.supports_follow_symlinks: - os.chmod(dst_path, src_stat.st_mode, follow_symlinks=False) - if os.utime in os.supports_follow_symlinks: - os.utime(dst_path, (src_stat.st_atime, src_stat.st_mtime), - follow_symlinks=False) - else: - os.chown(dst_path, src_stat.st_uid, src_stat.st_gid) - os.chmod(dst_path, src_stat.st_mode) - os.utime(dst_path, (src_stat.st_atime, src_stat.st_mtime)) - - -def copy_direntry(entry: os.DirEntry, dst_path): +def copy_direntry(entry: Union[os.DirEntry, PseudoDirEntry], dst_path): """ Non-recursive DirEntry (file, dir or symlink) copy. """ src_stat = entry.stat(follow_symlinks=False) if entry.is_dir(): @@ -506,16 +488,16 @@ def nest_hardlink(src_dir: str, src_relpath: str, dst_dir: str): """ Hardlink entity from (src_dir + src_relpath) to dst_dir preserving dir structure. """ - _lg.debug("Nested hardlinking: %s/%s -> %s", src_dir, src_relpath, dst_dir) + _lg.debug("Nested hardlinking: %s%s%s -> %s", src_dir, os.path.sep, src_relpath, dst_dir) src_dir_abs = os.path.abspath(src_dir) src_full_path = os.path.join(src_dir_abs, src_relpath) dst_dir_abs = os.path.abspath(dst_dir) dst_full_path = os.path.join(dst_dir_abs, src_relpath) # check source entity and destination directory - if not os.path.exists(src_full_path): + if not os.path.lexists(src_full_path): raise RuntimeError("Error reading source entity: %s" % src_full_path) - if os.path.exists(dst_dir_abs): + if os.path.lexists(dst_dir_abs): if not os.path.isdir(dst_dir_abs): raise RuntimeError("Destination path is not a directory: %s" % dst_dir_abs) @@ -523,12 +505,13 @@ def nest_hardlink(src_dir: str, src_relpath: str, dst_dir: str): os.mkdir(dst_dir_abs) # if destination entity exists, check it points to source entity - if os.path.exists(dst_full_path): + dst_entry = PseudoDirEntry(dst_full_path) + if os.path.lexists(dst_entry.path): src_stat = os.lstat(src_full_path) - if os.path.samestat(src_stat, os.lstat(dst_full_path)): + if os.path.samestat(src_stat, dst_entry.stat()): return # remove otherwise - os.unlink(dst_full_path) + rm_direntry(dst_entry) src_cur_path = src_dir_abs dst_cur_path = dst_dir_abs @@ -537,4 +520,4 @@ def nest_hardlink(src_dir: str, src_relpath: str, dst_dir: str): dst_cur_path = os.path.join(dst_cur_path, rel_part) if os.path.exists(dst_cur_path): continue - copy_entity(src_cur_path, dst_cur_path) + copy_direntry(PseudoDirEntry(src_cur_path), dst_cur_path)