"""
Module with filesystem-related functions.
"""

import enum
import glob  # noqa: F401  (no longer used here; kept so existing imports of it keep working)
import logging
import os
import subprocess
import sys
from typing import Iterable, Optional, Tuple, Union

_lg = logging.getLogger(__name__)


class BackupCreationError(Exception):
    """ Raised when source/destination trees can't be used for syncing. """


class Actions(enum.Enum):
    """ Kinds of changes reported by rsync() and rsync_ext(). """
    NOTHING = enum.auto()
    DELETE = enum.auto()
    REWRITE = enum.auto()
    UPDATE_TIME = enum.auto()
    UPDATE_PERM = enum.auto()
    UPDATE_OWNER = enum.auto()
    CREATE = enum.auto()
    ERROR = enum.auto()


class PseudoDirEntry:
    """
    Minimal stand-in for os.DirEntry built from a plain path.

    The path is resolved with os.path.realpath(), so an entry created from
    a symlink describes the link's target. Results of the type checks and
    of stat() are cached after the first call.
    """

    def __init__(self, path):
        self.path = os.path.realpath(path)
        self.name = os.path.basename(self.path)
        self._is_dir = None
        self._is_file = None
        self._is_symlink = None
        # stat results, cached separately for follow_symlinks True/False
        self._stat = {}

    def __str__(self):
        return self.name

    def is_dir(self, follow_symlinks: bool = True) -> bool:
        """ Return True if path is a directory (follow_symlinks is ignored). """
        if self._is_dir is None:
            self._is_dir = os.path.isdir(self.path)
        return self._is_dir

    def is_file(self, follow_symlinks: bool = True) -> bool:
        """ Return True if path is a regular file (follow_symlinks is ignored). """
        if self._is_file is None:
            self._is_file = os.path.isfile(self.path)
        return self._is_file

    def is_symlink(self, follow_symlinks: bool = True) -> bool:
        """ Return True if path is a symlink (follow_symlinks is ignored). """
        if self._is_symlink is None:
            self._is_symlink = os.path.islink(self.path)
        return self._is_symlink

    def stat(self, follow_symlinks: bool = True):
        """ Return os.stat() (or os.lstat() if not follow_symlinks) of path. """
        key = bool(follow_symlinks)
        if key not in self._stat:
            func = os.stat if follow_symlinks else os.lstat
            self._stat[key] = func(self.path)
        return self._stat[key]


def _parse_rsync_output(line: str) -> Tuple[str, Actions, str]:
    """
    Parse one line of 'rsync --itemize-changes' output.

    :param line: decoded and stripped output line.
    :return: (relative path, action, error message) tuple,
      error message is always empty.
    :raise RuntimeError: if change string does not match any known action.
    """
    action = None
    change_string, relpath = line.split(' ', maxsplit=1)
    if change_string == "*deleting":
        # rsync pads "*deleting" with two spaces up to the width of
        # an itemize string, padding is not a part of the file name
        if relpath.startswith("  "):
            relpath = relpath[2:]
        return relpath, Actions.DELETE, ""

    update_type = change_string[0]
    entity_type = change_string[1]
    change_type = change_string[2:]

    if update_type == "c" and entity_type in {"d", "L"} and "+" in change_type:
        action = Actions.CREATE
    elif update_type == ">" and entity_type == "f" and "+" in change_type:
        action = Actions.CREATE
    elif entity_type == "f" and ("s" in change_type or "t" in change_type):
        action = Actions.REWRITE
    elif entity_type == "d" and "t" in change_type:
        action = Actions.UPDATE_TIME
    elif "p" in change_type:
        action = Actions.UPDATE_PERM
    elif "o" in change_type or "g" in change_type:
        action = Actions.UPDATE_OWNER

    if action is None:
        raise RuntimeError("Not parsed string: %s" % line)
    return relpath, action, ""


def rsync_ext(src, dst, dry_run=False) -> Iterable[Tuple[str, Actions, str]]:
    """
    Call external rsync command for syncing files from src to dst.
    Yield (path, action, error message) tuples.

    Output lines that are not valid UTF-8 are logged and skipped.
    """
    rsync_args = ["rsync"]
    if dry_run:
        rsync_args.append("--dry-run")
    rsync_args.extend([
        "--archive",
        "--whole-file",
        "--human-readable",
        "--delete-during",
        "--itemize-changes",
        f"{src}/",
        str(dst),
    ])

    _lg.info("Executing external command: %s", " ".join(rsync_args))
    process = subprocess.Popen(rsync_args,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT)
    try:
        with process.stdout:
            for raw_line in iter(process.stdout.readline, b""):
                _lg.debug("Rsync current line: %s", raw_line)
                try:
                    line = raw_line.decode("utf-8").strip()
                # some issues with cyrillic in filenames
                except UnicodeDecodeError:
                    _lg.error("Can't process rsync line: %s", raw_line)
                    continue
                _lg.debug("Rsync itemize line: %s", line)
                yield _parse_rsync_output(line)
    finally:
        # reap the child even if consumer stopped early or parsing failed
        process.wait()


def scantree(path, dir_first=True) -> Iterable[os.DirEntry]:
    """
    Recursively yield DirEntry objects (dir/file/symlink) for given directory.

    Symlinks to directories are yielded as is and are not descended into.

    :param path: directory to scan.
    :param dir_first: yield directory before (True) or after (False)
      its content.
    """
    entry: os.DirEntry
    with os.scandir(path) as scan_it:
        for entry in scan_it:
            if entry.is_dir(follow_symlinks=False):
                if dir_first:
                    yield entry
                yield from scantree(entry.path, dir_first)
                if not dir_first:
                    yield entry
            else:
                yield entry


def rm_direntry(entry: Union[os.DirEntry, PseudoDirEntry]):
    """ Recursively delete DirEntry (dir/file/symlink). """
    if entry.is_file(follow_symlinks=False) or entry.is_symlink():
        os.unlink(entry.path)
    elif entry.is_dir(follow_symlinks=False):
        with os.scandir(entry.path) as it:
            child_entry: os.DirEntry
            for child_entry in it:
                rm_direntry(child_entry)
        os.rmdir(entry.path)


# os.O_BINARY exists on Windows only
O_BINARY = getattr(os, "O_BINARY", 0)
READ_FLAGS = os.O_RDONLY | O_BINARY
WRITE_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | O_BINARY
BUFFER_SIZE = 128 * 1024


def _close_quietly(fd: int):
    """ Close file descriptor, errors on close are ignored. """
    try:
        os.close(fd)
    except OSError:
        pass


def copy_file(src, dst):
    """
    Copy file content from src to dst. Faster than shutil.copy.

    Destination is created with the mode of source (subject to umask)
    or truncated if it exists already.
    """
    fin = os.open(src, READ_FLAGS)
    try:
        fstat = os.fstat(fin)
        fout = os.open(dst, WRITE_FLAGS, fstat.st_mode)
        try:
            for chunk in iter(lambda: os.read(fin, BUFFER_SIZE), b""):
                # os.write may write less than requested
                view = memoryview(chunk)
                while len(view) > 0:
                    written = os.write(fout, view)
                    view = view[written:]
        finally:
            _close_quietly(fout)
    finally:
        _close_quietly(fin)


def _set_owner(path, uid: int, gid: int, is_symlink: bool):
    """ chown path; symlink itself is changed only if supported by OS. """
    if not is_symlink:
        os.chown(path, uid, gid)
    elif os.chown in os.supports_follow_symlinks:
        os.chown(path, uid, gid, follow_symlinks=False)


def _set_mode(path, mode: int, is_symlink: bool):
    """ chmod path; symlink itself is changed only if supported by OS. """
    if not is_symlink:
        os.chmod(path, mode)
    elif os.chmod in os.supports_follow_symlinks:
        os.chmod(path, mode, follow_symlinks=False)


def _set_times(path, atime: float, mtime: float, is_symlink: bool):
    """ Set atime/mtime; symlink itself is changed only if supported by OS. """
    if not is_symlink:
        os.utime(path, (atime, mtime))
    elif os.utime in os.supports_follow_symlinks:
        os.utime(path, (atime, mtime), follow_symlinks=False)


def copy_direntry(entry: Union[os.DirEntry, PseudoDirEntry], dst_path):
    """
    Non-recursive DirEntry (file/dir/symlink) copy.

    Owner, permissions and access/modification times of the source
    are applied to the created entry.
    """
    src_stat = entry.stat(follow_symlinks=False)
    is_symlink = entry.is_symlink()

    # symlink check goes first: os.DirEntry.is_dir() follows symlinks,
    # so link to a directory must not be recreated as a real directory
    if is_symlink:
        link_target = os.readlink(entry.path)
        os.symlink(link_target, dst_path)
    elif entry.is_dir(follow_symlinks=False):
        os.mkdir(dst_path)
    else:
        copy_file(entry.path, dst_path)

    _set_owner(dst_path, src_stat.st_uid, src_stat.st_gid, is_symlink)
    _set_mode(dst_path, src_stat.st_mode, is_symlink)
    _set_times(dst_path, src_stat.st_atime, src_stat.st_mtime, is_symlink)


def update_direntry(src_entry: os.DirEntry, dst_entry: os.DirEntry):
    """
    Make dst DirEntry (file/dir/symlink) same as src.
    If dst is directory, its content will be removed.
    Src dir content will not be copied into dst dir.
    """
    rm_direntry(dst_entry)
    copy_direntry(src_entry, dst_entry.path)


def _rewrite_reason(src_entry: os.DirEntry,
                    dst_entry: os.DirEntry) -> Optional[str]:
    """ Return why dst entry has to be recreated from src, None if it hasn't. """
    # different entry types
    if src_entry.is_file(follow_symlinks=False):
        if not dst_entry.is_file(follow_symlinks=False):
            return "src is a file, dst is not a file"
    if src_entry.is_dir(follow_symlinks=False):
        if not dst_entry.is_dir(follow_symlinks=False):
            return "src is a dir, dst is not a dir"
    if src_entry.is_symlink():
        if not dst_entry.is_symlink():
            return "src is a symlink, dst is not a symlink"

    src_stat = src_entry.stat(follow_symlinks=False)
    dst_stat = dst_entry.stat(follow_symlinks=False)

    # dst is a hard link to src (bad for backups). Inode numbers are unique
    # within one device only, so devices are compared too. Directories are
    # skipped: recreating one would drop its already synced content.
    if (not src_entry.is_dir(follow_symlinks=False)
            and src_entry.inode() == dst_entry.inode()
            and src_stat.st_dev == dst_stat.st_dev):
        return "hard link to source"

    # file with different size or mtime
    if src_entry.is_file(follow_symlinks=False):
        if src_stat.st_size != dst_stat.st_size:
            return "different size"
        if src_stat.st_mtime != dst_stat.st_mtime:
            return "different time"

    # symlink pointing somewhere else
    if src_entry.is_symlink():
        if os.readlink(src_entry.path) != os.readlink(dst_entry.path):
            return "different symlink target"

    return None


def _rewrite_entry(src_entry: os.DirEntry, dst_entry: os.DirEntry,
                   rel_path: str) -> Tuple[str, Actions, str]:
    """ Recreate dst entry from src one, return resulting rsync() item. """
    try:
        update_direntry(src_entry, dst_entry)
    except OSError as exc:
        return rel_path, Actions.ERROR, str(exc)
    return rel_path, Actions.REWRITE, ""


def rsync(src_dir, dst_dir, dry_run=False) -> Iterable[Tuple[str, Actions, str]]:
    """
    Sync files/dirs/symlinks from src_dir to dst_dir.
    Yield (path, action, error message) tuples.
    Entries in dst_dir will be removed if not present in src_dir.
    Analog of 'rsync --delete -irltpog'.

    :param src_dir: source directory, should exist.
    :param dst_dir: destination directory, created if missing.
    :param dry_run: accepted for compatibility with rsync_ext(),
      not used by this implementation.
    :raise BackupCreationError: if src_dir is not a directory, dst_dir exists
      and is not a directory or obsolete dst entry can't be removed.
    """
    _lg.debug("Rsync: %s -> %s", src_dir, dst_dir)
    src_root_abs = os.path.abspath(src_dir)
    dst_root_abs = os.path.abspath(dst_dir)

    if not os.path.isdir(src_root_abs):
        raise BackupCreationError(
            "Error during reading source directory: %s" % src_root_abs
        )
    if os.path.exists(dst_root_abs):
        if not os.path.isdir(dst_root_abs):
            raise BackupCreationError(
                "Destination path is not a directory: %s" % dst_root_abs
            )
    else:
        os.mkdir(dst_root_abs)

    # Create source map {rel_path: dir_entry}
    src_files_map = {
        ent.path[len(src_root_abs) + 1:]: ent for ent in scantree(src_root_abs)
    }

    # process dst tree, children go before their directory
    for dst_entry in scantree(dst_root_abs, dir_first=False):
        rel_path = dst_entry.path[len(dst_root_abs) + 1:]

        # mark src entry as taken for processing
        src_entry = src_files_map.pop(rel_path, None)

        # remove dst entries not existing in source
        if src_entry is None:
            _lg.debug("Rsync, deleting: %s", rel_path)
            try:
                rm_direntry(dst_entry)
            except OSError as exc:
                raise BackupCreationError(exc) from exc
            yield rel_path, Actions.DELETE, ""
            continue

        # rewrite dst if it differs in type or content from src
        reason = _rewrite_reason(src_entry, dst_entry)
        if reason is not None:
            _lg.debug("Rsync, rewriting (%s): %s", reason, rel_path)
            yield _rewrite_entry(src_entry, dst_entry, rel_path)
            continue

        src_stat = src_entry.stat(follow_symlinks=False)
        dst_stat = dst_entry.stat(follow_symlinks=False)
        is_symlink = src_entry.is_symlink()

        # update permissions and ownership
        if src_stat.st_mode != dst_stat.st_mode:
            _lg.debug("Rsync, updating permissions: %s", rel_path)
            _set_mode(dst_entry.path, src_stat.st_mode, is_symlink)
            yield rel_path, Actions.UPDATE_PERM, ""

        if (src_stat.st_uid != dst_stat.st_uid
                or src_stat.st_gid != dst_stat.st_gid):
            _lg.debug("Rsync, updating owners: %s", rel_path)
            _set_owner(dst_entry.path, src_stat.st_uid, src_stat.st_gid,
                       is_symlink)
            yield rel_path, Actions.UPDATE_OWNER, ""

    # process remained source entries (new files/dirs/symlinks),
    # map keeps scantree order, so directories go before their content
    for rel_path, src_entry in src_files_map.items():
        dst_path = os.path.join(dst_root_abs, rel_path)
        _lg.debug("Rsync, creating: %s", rel_path)
        try:
            copy_direntry(src_entry, dst_path)
        except OSError as exc:
            yield rel_path, Actions.ERROR, str(exc)
        else:
            yield rel_path, Actions.CREATE, ""

    # restore dir mtimes in dst, updated by updating files
    for src_entry in scantree(src_root_abs, dir_first=True):
        if not src_entry.is_dir(follow_symlinks=False):
            continue
        rel_path = src_entry.path[len(src_root_abs) + 1:]
        dst_path = os.path.join(dst_root_abs, rel_path)
        src_stat = src_entry.stat(follow_symlinks=False)
        try:
            dst_stat = os.lstat(dst_path)
        except FileNotFoundError:
            # directory creation failed above and was reported as ERROR
            _lg.debug("Rsync, directory is missing in destination: %s",
                      dst_path)
            continue
        if src_stat.st_mtime != dst_stat.st_mtime:
            _lg.debug("Rsync, restoring directory mtime: %s", dst_path)
            os.utime(dst_path,
                     (src_stat.st_atime, src_stat.st_mtime),
                     follow_symlinks=False)

    # restore dst_root dir mtime
    src_root_stat = os.lstat(src_root_abs)
    dst_root_stat = os.lstat(dst_root_abs)
    if src_root_stat.st_mtime != dst_root_stat.st_mtime:
        _lg.debug("Rsync, restoring root directory mtime: %s", dst_root_abs)
        os.utime(dst_root_abs,
                 (src_root_stat.st_atime, src_root_stat.st_mtime),
                 follow_symlinks=False)


def _recursive_hardlink_ext(src: str, dst: str) -> bool:
    """
    Make hardlink for a directory using cp -al (gcp on macOS).
    Both src and dst should exist.

    :param src: absolute path to source directory.
    :param dst: absolute path to target directory.
    :return: success or not
    """
    if sys.platform == "darwin":
        cp = "gcp"
    else:
        cp = "cp"

    # scandir is used instead of glob: it lists hidden entries too
    # and does not interpret special characters in src
    with os.scandir(src) as scan_it:
        src_content = [entry.path for entry in scan_it]
    if not src_content:
        # nothing to link, cp would fail without source operands
        return True

    cmd = [cp, "--archive", "--verbose", "--link", *src_content, dst]
    _lg.info("Executing external command: %s", " ".join(cmd))
    process = subprocess.Popen(cmd,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT)
    with process.stdout:
        for line in iter(process.stdout.readline, b""):
            # output is only logged, so undecodable bytes are replaced
            _lg.debug("%s: %s", cp,
                      line.decode("utf-8", errors="replace").strip())
    exitcode = process.wait()
    return not bool(exitcode)


def _recursive_hardlink(src: str, dst: str) -> bool:
    """
    Do hardlink directory recursively using python only.
    Both src and dst directories should exist.

    :param src: absolute path to source directory.
    :param dst: absolute path to target directory.
    :return: True if success, False otherwise.
    :raise NotImplementedError: if entry is not a file, symlink or directory.
    """
    with os.scandir(src) as it:
        ent: os.DirEntry
        for ent in it:
            ent_dst_path = os.path.join(dst, ent.name)
            if ent.is_dir(follow_symlinks=False):
                _lg.debug("Hardlink, copying directory: %s -> %s",
                          ent.path, ent_dst_path)
                os.mkdir(ent_dst_path)

                # process directory children
                _recursive_hardlink(ent.path, ent_dst_path)

                # save directory's metainfo, it is done after children
                # processing as that changes directory's mtime
                ent_stat = ent.stat(follow_symlinks=False)
                os.chown(ent_dst_path, ent_stat.st_uid, ent_stat.st_gid)
                os.chmod(ent_dst_path, ent_stat.st_mode)
                os.utime(ent_dst_path, (ent_stat.st_atime, ent_stat.st_mtime))
                continue

            if ent.is_file(follow_symlinks=False) or ent.is_symlink():
                _lg.debug("Hardlink, creating link for file: %s -> %s",
                          ent.path, ent_dst_path)
                os.link(ent.path, ent_dst_path, follow_symlinks=False)
                continue

            # something that is not a file, symlink or directory
            raise NotImplementedError(ent.path)

    return True


def hardlink_dir(src_dir, dst_dir, use_external: bool = False) -> bool:
    """
    Make hardlink for a directory with all its content.

    :param src_dir: path to source directory
    :param dst_dir: path to target directory
    :param use_external: whether to use external cp -al command
    :return: True if success, False otherwise.
    :raise RuntimeError: if src_dir is not a directory or dst_dir exists.
    """
    _lg.debug("Recursive hardlinking: %s -> %s", src_dir, dst_dir)
    src_abs = os.path.abspath(src_dir)
    dst_abs = os.path.abspath(dst_dir)

    if not os.path.isdir(src_abs):
        raise RuntimeError(f"Error reading source directory: {src_dir}")

    if os.path.exists(dst_abs):
        raise RuntimeError(f"Destination already exists: {dst_dir}")

    _lg.debug("Hardlink, creating directory: %s", dst_abs)
    os.mkdir(dst_abs)

    hardlink_func = (_recursive_hardlink_ext if use_external
                     else _recursive_hardlink)
    return hardlink_func(src_abs, dst_abs)


def nest_hardlink(src_dir: str, src_relpath: str, dst_dir: str):
    """
    Hardlink entity from (src_dir + src_relpath) to dst_dir preserving dir
    structure of src_relpath.

    Missing directories of src_relpath are created in dst_dir with metadata
    of the source ones. File or symlink is hard-linked; if link can't be
    created (e.g. dst_dir is on another device), it is copied instead.

    :raise RuntimeError: if source entity does not exist or dst_dir exists
      and is not a directory.
    """
    _lg.debug("Nested hardlinking: %s%s%s -> %s",
              src_dir, os.path.sep, src_relpath, dst_dir)
    src_dir_abs = os.path.abspath(src_dir)
    src_full_path = os.path.join(src_dir_abs, src_relpath)
    dst_dir_abs = os.path.abspath(dst_dir)
    dst_full_path = os.path.join(dst_dir_abs, src_relpath)

    # check source entity and destination directory
    if not os.path.lexists(src_full_path):
        raise RuntimeError("Error reading source entity: %s" % src_full_path)
    if os.path.lexists(dst_dir_abs):
        if not os.path.isdir(dst_dir_abs):
            raise RuntimeError("Destination path is not a directory: %s"
                               % dst_dir_abs)
    else:
        os.mkdir(dst_dir_abs)

    # if destination entity exists, check it points to source entity
    dst_entry = PseudoDirEntry(dst_full_path)
    if os.path.lexists(dst_entry.path):
        src_stat = os.lstat(src_full_path)
        if os.path.samestat(src_stat, dst_entry.stat()):
            return
        # remove otherwise
        rm_direntry(dst_entry)

    src_cur_path = src_dir_abs
    dst_cur_path = dst_dir_abs
    rel_parts = src_relpath.split(sep=os.path.sep)
    last_index = len(rel_parts) - 1
    for index, rel_part in enumerate(rel_parts):
        src_cur_path = os.path.join(src_cur_path, rel_part)
        dst_cur_path = os.path.join(dst_cur_path, rel_part)
        if os.path.exists(dst_cur_path):
            continue

        is_real_dir = (os.path.isdir(src_cur_path)
                       and not os.path.islink(src_cur_path))
        if index == last_index and not is_real_dir:
            # the entity itself: hardlink it, directories can't be linked
            try:
                os.link(src_cur_path, dst_cur_path, follow_symlinks=False)
                continue
            except (OSError, NotImplementedError) as exc:
                _lg.debug("Nested hardlinking, falling back to copy: %s (%s)",
                          dst_cur_path, exc)

        copy_direntry(PseudoDirEntry(src_cur_path), dst_cur_path)