Fix symlinks issue in nest_hardlink function

This commit is contained in:
Maks Snegov 2021-11-15 23:36:20 +03:00
parent 2631b78d4d
commit ae95856a58

View File

@ -8,7 +8,7 @@ import logging
import os
import subprocess
import sys
from typing import Iterable, Tuple
from typing import Iterable, Tuple, Union
_lg = logging.getLogger(__name__)
@ -33,19 +33,32 @@ class PseudoDirEntry:
self.path = os.path.realpath(path)
self.name = os.path.basename(self.path)
self._is_dir = None
self._is_file = None
self._is_symlink = None
self._stat = None
def __str__(self):
return self.name
def is_dir(self) -> bool:
def is_dir(self, follow_symlinks: bool = True) -> bool:
if self._is_dir is None:
self._is_dir = os.path.isdir(self.path)
return self._is_dir
def stat(self):
def is_file(self, follow_symlinks: bool = True) -> bool:
if self._is_file is None:
self._is_file = os.path.isfile(self.path)
return self._is_file
def is_symlink(self, follow_symlinks: bool = True) -> bool:
if self._is_symlink is None:
self._is_symlink = os.path.islink(self.path)
return self._is_symlink
def stat(self, follow_symlinks: bool = True):
if self._stat is None:
self._stat = os.lstat(self.path)
func = os.stat if follow_symlinks else os.lstat
self._stat = func(self.path)
return self._stat
@ -140,7 +153,7 @@ def scantree(path, dir_first=True) -> Iterable[os.DirEntry]:
yield entry
def rm_direntry(entry: os.DirEntry):
def rm_direntry(entry: Union[os.DirEntry, PseudoDirEntry]):
""" Recursively delete DirEntry (dir, file or symlink). """
if entry.is_file(follow_symlinks=False) or entry.is_symlink():
os.unlink(entry.path)
@ -180,38 +193,7 @@ def copy_file(src, dst):
pass
def copy_entity(src_path: str, dst_path: str):
""" Non-recursive fs entity (file, dir or symlink) copy. """
src_stat = os.lstat(src_path)
is_symlink = os.path.islink(src_path)
if os.path.isdir(src_path):
os.mkdir(dst_path)
elif is_symlink:
link_target = os.readlink(src_path)
os.symlink(link_target, dst_path)
else:
copy_file(src_path, dst_path)
if is_symlink:
# change symlink attributes only if supported by OS
if os.chown in os.supports_follow_symlinks:
os.chown(dst_path, src_stat.st_uid, src_stat.st_gid,
follow_symlinks=False)
if os.chmod in os.supports_follow_symlinks:
os.chmod(dst_path, src_stat.st_mode, follow_symlinks=False)
if os.utime in os.supports_follow_symlinks:
os.utime(dst_path, (src_stat.st_atime, src_stat.st_mtime),
follow_symlinks=False)
else:
os.chown(dst_path, src_stat.st_uid, src_stat.st_gid)
os.chmod(dst_path, src_stat.st_mode)
os.utime(dst_path, (src_stat.st_atime, src_stat.st_mtime))
def copy_direntry(entry: os.DirEntry, dst_path):
def copy_direntry(entry: Union[os.DirEntry, PseudoDirEntry], dst_path):
""" Non-recursive DirEntry (file, dir or symlink) copy. """
src_stat = entry.stat(follow_symlinks=False)
if entry.is_dir():
@ -506,16 +488,16 @@ def nest_hardlink(src_dir: str, src_relpath: str, dst_dir: str):
"""
Hardlink entity from (src_dir + src_relpath) to dst_dir preserving dir structure.
"""
_lg.debug("Nested hardlinking: %s/%s -> %s", src_dir, src_relpath, dst_dir)
_lg.debug("Nested hardlinking: %s%s%s -> %s", src_dir, os.path.sep, src_relpath, dst_dir)
src_dir_abs = os.path.abspath(src_dir)
src_full_path = os.path.join(src_dir_abs, src_relpath)
dst_dir_abs = os.path.abspath(dst_dir)
dst_full_path = os.path.join(dst_dir_abs, src_relpath)
# check source entity and destination directory
if not os.path.exists(src_full_path):
if not os.path.lexists(src_full_path):
raise RuntimeError("Error reading source entity: %s" % src_full_path)
if os.path.exists(dst_dir_abs):
if os.path.lexists(dst_dir_abs):
if not os.path.isdir(dst_dir_abs):
raise RuntimeError("Destination path is not a directory: %s"
% dst_dir_abs)
@ -523,12 +505,13 @@ def nest_hardlink(src_dir: str, src_relpath: str, dst_dir: str):
os.mkdir(dst_dir_abs)
# if destination entity exists, check it points to source entity
if os.path.exists(dst_full_path):
dst_entry = PseudoDirEntry(dst_full_path)
if os.path.lexists(dst_entry.path):
src_stat = os.lstat(src_full_path)
if os.path.samestat(src_stat, os.lstat(dst_full_path)):
if os.path.samestat(src_stat, dst_entry.stat()):
return
# remove otherwise
os.unlink(dst_full_path)
rm_direntry(dst_entry)
src_cur_path = src_dir_abs
dst_cur_path = dst_dir_abs
@ -537,4 +520,4 @@ def nest_hardlink(src_dir: str, src_relpath: str, dst_dir: str):
dst_cur_path = os.path.join(dst_cur_path, rel_part)
if os.path.exists(dst_cur_path):
continue
copy_entity(src_cur_path, dst_cur_path)
copy_direntry(PseudoDirEntry(src_cur_path), dst_cur_path)