Add rsync func, logging and cli parser
This commit is contained in:
parent
50d2168b15
commit
d67550cb38
111
fs.py
Normal file
111
fs.py
Normal file
@ -0,0 +1,111 @@
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
from typing import Iterable
|
||||
|
||||
|
||||
def rsync_ext(src, dst, dry_run=False):
|
||||
"""Call external rsync command"""
|
||||
rsync_args = ["rsync"]
|
||||
if dry_run:
|
||||
rsync_args.append("-n")
|
||||
rsync_args.append("-a") # archive
|
||||
rsync_args.append("-z") # compress
|
||||
rsync_args.append("-h") # human-readable
|
||||
rsync_args.append("-v") # verbose
|
||||
rsync_args.append("-u") # don't touch new files on receiver
|
||||
rsync_args.append("--progress")
|
||||
rsync_args.append("--del") # delete during
|
||||
rsync_args.append(src)
|
||||
rsync_args.append(dst)
|
||||
res = subprocess.run(rsync_args)
|
||||
return res
|
||||
|
||||
|
||||
def scantree(path) -> Iterable[os.DirEntry]:
|
||||
"""Recursively yield DirEntry file objects for given directory."""
|
||||
entry: os.DirEntry
|
||||
for entry in os.scandir(path):
|
||||
if entry.is_dir(follow_symlinks=False):
|
||||
yield from scantree(entry.path)
|
||||
else:
|
||||
yield entry
|
||||
|
||||
|
||||
def rsync(src_dir, dst_dir=None):
|
||||
"""
|
||||
Do sync
|
||||
:param src_dir: source dir
|
||||
:param dst_dir: dest dir, create if not exists
|
||||
:return: nothing
|
||||
"""
|
||||
|
||||
logging.info(f"Rsync: {src_dir} -> {dst_dir}")
|
||||
src_abs = os.path.abspath(src_dir)
|
||||
dst_abs = os.path.abspath(dst_dir)
|
||||
|
||||
if not os.path.isdir(src_abs):
|
||||
raise RuntimeError(f"Error during reading source directory: {src_abs}")
|
||||
if os.path.exists(dst_abs):
|
||||
if not os.path.isdir(dst_abs):
|
||||
raise RuntimeError(f"Destination path is not a directory: {dst_abs}")
|
||||
else:
|
||||
os.mkdir(dst_abs)
|
||||
|
||||
for src_entry in scantree(src_abs):
|
||||
rel_path = src_entry.path.removeprefix(src_abs + "/")
|
||||
dst_path = os.path.join(dst_abs, rel_path)
|
||||
src_stat = os.lstat(src_entry.path)
|
||||
dst_stat = os.lstat(dst_path)
|
||||
|
||||
do_update = False
|
||||
# check file size
|
||||
if src_stat.st_size != dst_stat.st_size:
|
||||
do_update = True
|
||||
# check modification time (mtime)
|
||||
if src_stat.st_mtime > dst_stat.st_mtime:
|
||||
do_update = True
|
||||
|
||||
if do_update:
|
||||
logging.info("Updating %s", src_entry)
|
||||
|
||||
|
||||
def hardlink_dir(src_dir, dst_dir):
|
||||
"""
|
||||
Make hardlink for a directory with all its content.
|
||||
:param src_dir: path to source directory
|
||||
:param dst_dir: path to target directory
|
||||
:return: None
|
||||
"""
|
||||
logging.info(f"Recursive hardlinking: {src_dir} -> {dst_dir}")
|
||||
src_abs = os.path.abspath(src_dir)
|
||||
dst_abs = os.path.abspath(dst_dir)
|
||||
|
||||
def recursive_hardlink(src, dst):
|
||||
logging.debug(f"Creating directory: {src} -> {dst}")
|
||||
os.mkdir(dst)
|
||||
|
||||
with os.scandir(src) as it:
|
||||
ent: os.DirEntry
|
||||
for ent in it:
|
||||
ent_dst_path = os.path.join(dst, ent.name)
|
||||
if ent.is_dir(follow_symlinks=False):
|
||||
recursive_hardlink(ent.path, ent_dst_path)
|
||||
continue
|
||||
if ent.is_file(follow_symlinks=False) or ent.is_symlink():
|
||||
logging.debug(f"Hardlink file: {ent.path} -> {ent_dst_path}")
|
||||
os.link(ent.path, ent_dst_path, follow_symlinks=False)
|
||||
continue
|
||||
# something that is not a file, symlink or directory
|
||||
raise NotImplementedError(ent.path)
|
||||
|
||||
if not os.path.isdir(src_abs):
|
||||
logging.error(f"Error reading source directory: {src_dir}")
|
||||
raise RuntimeError(f"Error reading source directory: {src_dir}")
|
||||
|
||||
if os.path.exists(dst_abs):
|
||||
logging.error(f"Destination already exists: {dst_dir}")
|
||||
raise RuntimeError(f"Destination already exists: {dst_dir}")
|
||||
|
||||
recursive_hardlink(src_abs, dst_abs)
|
||||
return
|
||||
76
main.py
76
main.py
@ -1,60 +1,40 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import sys
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
|
||||
def hardlink_dir(src_dir, dst_dir):
|
||||
"""
|
||||
Make hardlink for a directory with all its content.
|
||||
:param src_dir: path to source directory
|
||||
:param dst_dir: path to target directory
|
||||
:return: None
|
||||
"""
|
||||
logging.info(f'Recursive hardlinking: {src_dir} -> {dst_dir}')
|
||||
src_abs = os.path.abspath(src_dir)
|
||||
dst_abs = os.path.abspath(dst_dir)
|
||||
|
||||
def recursive_hardlink(src, dst):
|
||||
logging.debug(f'Creating directory: {src} -> {dst}')
|
||||
os.mkdir(dst)
|
||||
|
||||
with os.scandir(src) as it:
|
||||
for ent in it:
|
||||
ent_dst_path = os.path.join(dst, ent.name)
|
||||
if ent.is_dir(follow_symlinks=False):
|
||||
recursive_hardlink(ent.path, ent_dst_path)
|
||||
continue
|
||||
if ent.is_file(follow_symlinks=False) or ent.is_symlink():
|
||||
logging.debug(f"Hardlink file: {ent.path} -> {ent_dst_path}")
|
||||
os.link(ent.path, ent_dst_path, follow_symlinks=False)
|
||||
continue
|
||||
# something that is not a file, symlink or directory
|
||||
raise NotImplementedError(f"{ent.path}")
|
||||
|
||||
if not os.path.isdir(src_abs):
|
||||
logging.error(f"Error reading source directory: {src_dir}")
|
||||
raise RuntimeError(f"Error reading source directory: {src_dir}")
|
||||
|
||||
if os.path.exists(dst_abs):
|
||||
logging.error(f"Destination already exists: {dst_dir}")
|
||||
raise RuntimeError(f'Destination already exists: {dst_dir}')
|
||||
|
||||
recursive_hardlink(src_abs, dst_abs)
|
||||
return
|
||||
import fs
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: %s SRC DST' % sys.argv[0])
|
||||
return 1
|
||||
formatter = logging.Formatter("{asctime}|{levelname}|{message}", style="{")
|
||||
console_handler = logging.StreamHandler()
|
||||
console_handler.setFormatter(formatter)
|
||||
|
||||
hardlink_dir(sys.argv[1], sys.argv[2])
|
||||
parser = argparse.ArgumentParser(
|
||||
prog="cura-te-ipsum",
|
||||
description="cura-te-ipsum, my personal backup software.",
|
||||
)
|
||||
parser.add_argument("-V", "--version", action="version", version="%(prog)s 0.1")
|
||||
parser.add_argument("-v", "--verbose", action="store_true", default=False,
|
||||
help="print verbose information")
|
||||
parser.add_argument("-b", type=pathlib.Path, dest="backup_dir",
|
||||
metavar="BACKUP_DIR", required=True,
|
||||
help="directory, where all backups will be stored")
|
||||
parser.add_argument("src_dirs", nargs="+", metavar="SRC_DIR", type=pathlib.Path,
|
||||
help="directory, which should be backed up")
|
||||
args = parser.parse_args()
|
||||
|
||||
loglevel = logging.DEBUG if args.verbose else logging.INFO
|
||||
logging.basicConfig(level=loglevel, handlers=[console_handler])
|
||||
|
||||
logging.info(args.src_dirs)
|
||||
|
||||
# fs.hardlink_dir(sys.argv[1], sys.argv[2])
|
||||
# fs.rsync(sys.argv[1], sys.argv[2])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
|
||||
Loading…
Reference in New Issue
Block a user