Rework hardlink_dir, create initiate_backup
This commit is contained in:
72
spqr/curateipsum/backup.py
Normal file
72
spqr/curateipsum/backup.py
Normal file
@@ -0,0 +1,72 @@
|
||||
"""
|
||||
Module with backup functions.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from spqr.curateipsum.fs import hardlink_dir, rsync
|
||||
|
||||
BACKUP_ENT_FMT = "%y%m%d_%H%M"
|
||||
_lg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _is_backup_entity(entity_path: pathlib.Path) -> bool:
|
||||
""" Check if entity_path is a single backup dir. """
|
||||
try:
|
||||
datetime.strptime(entity_path.name, BACKUP_ENT_FMT)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def _get_latest_backup(backup_dir: pathlib.Path) -> Optional[pathlib.Path]:
|
||||
""" Returns path to latest backup created in backup_dir or None. """
|
||||
backups = sorted(os.listdir(backup_dir), reverse=True)
|
||||
|
||||
for b_ent in backups:
|
||||
b_ent_abs = pathlib.Path(os.path.join(backup_dir, b_ent))
|
||||
|
||||
if not _is_backup_entity(b_ent_abs):
|
||||
continue
|
||||
|
||||
if not os.listdir(b_ent_abs):
|
||||
_lg.info("Removing empty backup entity: %s", b_ent_abs.name)
|
||||
_lg.debug("Removing directory %s", b_ent_abs)
|
||||
os.rmdir(b_ent_abs)
|
||||
continue
|
||||
|
||||
return b_ent_abs
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def initiate_backup(sources, backup_dir: pathlib.Path):
|
||||
""" Main backup function """
|
||||
|
||||
cur_backup = pathlib.Path(
|
||||
os.path.join(backup_dir, datetime.now().strftime(BACKUP_ENT_FMT))
|
||||
)
|
||||
_lg.debug("Current backup dir: %s", cur_backup)
|
||||
|
||||
latest_backup = _get_latest_backup(backup_dir)
|
||||
if cur_backup == latest_backup:
|
||||
_lg.warning(
|
||||
"Latest backup %s was created less than minute ago, exiting",
|
||||
latest_backup.name,
|
||||
)
|
||||
return
|
||||
|
||||
if latest_backup is None:
|
||||
_lg.info("Creating empty directory for current backup: %s", cur_backup.name)
|
||||
os.mkdir(cur_backup)
|
||||
else:
|
||||
_lg.info(
|
||||
"Copying data from latest backup %s to current backup %s",
|
||||
latest_backup.name,
|
||||
cur_backup.name,
|
||||
)
|
||||
hardlink_dir(latest_backup, cur_backup)
|
||||
@@ -1,5 +1,10 @@
|
||||
"""
|
||||
Module with filesystem-related functions.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import subprocess
|
||||
from typing import Iterable
|
||||
|
||||
@@ -29,6 +34,7 @@ def scantree(path) -> Iterable[os.DirEntry]:
|
||||
entry: os.DirEntry
|
||||
for entry in os.scandir(path):
|
||||
if entry.is_dir(follow_symlinks=False):
|
||||
yield entry
|
||||
yield from scantree(entry.path)
|
||||
else:
|
||||
yield entry
|
||||
@@ -84,14 +90,18 @@ def hardlink_dir(src_dir, dst_dir):
|
||||
dst_abs = os.path.abspath(dst_dir)
|
||||
|
||||
def recursive_hardlink(src, dst):
|
||||
_lg.debug(f"Creating directory: {src} -> {dst}")
|
||||
os.mkdir(dst)
|
||||
|
||||
with os.scandir(src) as it:
|
||||
ent: os.DirEntry
|
||||
for ent in it:
|
||||
ent_dst_path = os.path.join(dst, ent.name)
|
||||
if ent.is_dir(follow_symlinks=False):
|
||||
_lg.debug(f"Copying directory: {ent.path} -> {ent_dst_path}")
|
||||
os.mkdir(ent_dst_path)
|
||||
ent_stat = ent.stat(follow_symlinks=False)
|
||||
os.chown(ent_dst_path, ent_stat.st_uid, ent_stat.st_gid)
|
||||
os.chmod(ent_dst_path, ent_stat.st_mode)
|
||||
|
||||
# process directory children
|
||||
recursive_hardlink(ent.path, ent_dst_path)
|
||||
continue
|
||||
if ent.is_file(follow_symlinks=False) or ent.is_symlink():
|
||||
@@ -109,5 +119,7 @@ def hardlink_dir(src_dir, dst_dir):
|
||||
_lg.error(f"Destination already exists: {dst_dir}")
|
||||
raise RuntimeError(f"Destination already exists: {dst_dir}")
|
||||
|
||||
_lg.debug(f"Creating directory: {dst_abs}")
|
||||
os.mkdir(dst_abs)
|
||||
recursive_hardlink(src_abs, dst_abs)
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user