cura-te-ipsum/tests/test_fs.py
2021-11-15 23:12:51 +03:00

297 lines
10 KiB
Python

import os
import os.path
import shutil
import socket
import string
import tempfile
import unittest
from curateipsum import fs
class CommonFSTestCase(unittest.TestCase):
def setUp(self):
self.tmp_dir_src = tempfile.TemporaryDirectory(prefix="source_")
self.tmp_dir_dst = tempfile.TemporaryDirectory(prefix="dest_")
self.src_dir = self.tmp_dir_src.name
self.dst_dir = self.tmp_dir_dst.name
def tearDown(self):
self.tmp_dir_src.cleanup()
self.tmp_dir_dst.cleanup()
@staticmethod
def create_file(parent_dir: str, prefix: str = None) -> str:
"""
Create file with random name in parent_dir.
Returns absolute path to created file.
"""
fd, path = tempfile.mkstemp(prefix=prefix, dir=parent_dir)
with open(fd, "w") as f:
f.write(string.printable)
return path
@staticmethod
def create_dir(parent_dir: str, prefix: str = None) -> str:
"""
Create directory with random name in parent_dir.
Returns absolute path to created directory.
"""
return tempfile.mkdtemp(prefix=prefix, dir=parent_dir)
def relpath(self, full_path: str) -> str:
""" Get relative path for entity in src/dst dirs. """
if full_path.startswith(self.src_dir):
p_dir = self.src_dir
elif full_path.startswith(self.dst_dir):
p_dir = self.dst_dir
else:
raise RuntimeError(f"Path {full_path} is not src_dir nor dst_dir")
return full_path[len(p_dir) + 1 :]
class TestHardlinkDir(CommonFSTestCase):
def setUp(self):
self.tmp_dir = tempfile.TemporaryDirectory(prefix="source_")
self.src_dir = self.tmp_dir.name
self.dst_dir = self.src_dir + ".copy"
@staticmethod
def check_directory_stats(d1_path: str, d2_path: str):
""" Check that directory was copied. Fails test, if not. """
dir1_stat = os.lstat(d1_path)
dir2_stat = os.lstat(d2_path)
assert dir1_stat.st_uid == dir2_stat.st_uid
assert dir1_stat.st_gid == dir2_stat.st_gid
assert dir1_stat.st_mode == dir2_stat.st_mode
assert dir1_stat.st_nlink == dir2_stat.st_nlink
assert dir1_stat.st_size == dir2_stat.st_size
assert dir1_stat.st_size == dir2_stat.st_size
# only mtime is checked
assert dir1_stat.st_mtime == dir2_stat.st_mtime
def test_common_file(self):
cf_path = self.create_file(self.src_dir)
cf_relpath = self.relpath(cf_path)
fs.hardlink_dir(self.src_dir, self.dst_dir)
src_stat = os.lstat(cf_path)
dst_stat = os.lstat(os.path.join(self.src_dir, cf_relpath))
assert os.path.samestat(src_stat, dst_stat)
assert src_stat.st_nlink == 2
def test_relative_symlink_to_common_file(self):
cf_relpath = self.relpath(self.create_file(self.src_dir))
sl2cf_relpath = "symlink_to_common_file"
os.chdir(self.src_dir)
os.symlink(cf_relpath, sl2cf_relpath)
fs.hardlink_dir(self.src_dir, self.dst_dir)
# check link
dst_sl2cf_path = os.path.join(self.dst_dir, sl2cf_relpath)
assert os.readlink(dst_sl2cf_path) == cf_relpath
# check stats
src_stat = os.lstat(os.path.join(self.dst_dir, sl2cf_relpath))
dst_stat = os.lstat(dst_sl2cf_path)
assert os.path.samestat(src_stat, dst_stat)
assert src_stat.st_nlink == 2
def test_absolute_symlink_to_common_file(self):
cf_path = self.create_file(self.src_dir)
sl2cf_relpath = "symlink_to_common_file"
sl2cf_path = os.path.join(self.src_dir, sl2cf_relpath)
os.symlink(cf_path, sl2cf_path)
fs.hardlink_dir(self.src_dir, self.dst_dir)
# check link
dst_sl2cf_path = os.path.join(self.dst_dir, sl2cf_relpath)
assert os.readlink(dst_sl2cf_path) == cf_path
# check stats
src_stat = os.lstat(os.path.join(self.dst_dir, sl2cf_relpath))
dst_stat = os.lstat(dst_sl2cf_path)
assert os.path.samestat(src_stat, dst_stat)
assert src_stat.st_nlink == 2
def test_hardlink_to_common_file(self):
cf_path = self.create_file(self.src_dir)
hl2cf_relpath = "hardlink_to_common_file"
hl2cf_path = os.path.join(self.src_dir, hl2cf_relpath)
os.link(cf_path, hl2cf_path)
fs.hardlink_dir(self.src_dir, self.dst_dir)
src_cf_stat = os.lstat(cf_path)
src_hl_stat = os.lstat(hl2cf_path)
dst_hl_stat = os.lstat(os.path.join(self.dst_dir, hl2cf_relpath))
assert os.path.samestat(src_cf_stat, dst_hl_stat)
assert os.path.samestat(src_hl_stat, dst_hl_stat)
assert src_cf_stat.st_nlink == 4
def test_nested_dir(self):
src_ndir_path = self.create_dir(self.src_dir)
src_nfile_path = self.create_file(src_ndir_path)
ndir_relpath = self.relpath(src_ndir_path)
nfile_relpath = self.relpath(src_nfile_path)
fs.hardlink_dir(self.src_dir, self.dst_dir)
self.check_directory_stats(src_ndir_path, os.path.join(self.dst_dir, ndir_relpath))
# check file in nested directory
src_fstat = os.lstat(src_nfile_path)
dst_fstat = os.lstat(os.path.join(self.dst_dir, nfile_relpath))
assert os.path.samestat(src_fstat, dst_fstat)
assert src_fstat.st_nlink == 2
def tearDown(self):
self.tmp_dir.cleanup()
shutil.rmtree(self.dst_dir, ignore_errors=True)
class TestRsync(CommonFSTestCase):
@staticmethod
def check_identical_file(f1_path: str, f2_path: str):
""" Check that files are identical. Fails test, if not. """
st1 = os.lstat(f1_path)
st2 = os.lstat(f2_path)
assert st1.st_uid == st2.st_uid
assert st1.st_gid == st2.st_gid
assert st1.st_mode == st2.st_mode
assert st1.st_mtime == st2.st_mtime
assert st1.st_size == st2.st_size
def test_dst_has_excess_file(self):
dst_fpath = self.create_file(self.dst_dir)
all(fs.rsync(self.src_dir, self.dst_dir))
assert not os.path.lexists(dst_fpath)
def test_dst_has_excess_symlink(self):
dst_lpath = os.path.join(self.dst_dir, 'nonexisting_file')
os.symlink('broken_symlink', dst_lpath)
all(fs.rsync(self.src_dir, self.dst_dir))
assert not os.path.lexists(dst_lpath)
def test_dst_has_excess_empty_dir(self):
dst_dpath = self.create_dir(self.dst_dir)
all(fs.rsync(self.src_dir, self.dst_dir))
assert not os.path.lexists(dst_dpath)
def test_dst_has_excess_nonempty_dir(self):
dst_dpath = self.create_dir(self.dst_dir)
self.create_file(dst_dpath)
all(fs.rsync(self.src_dir, self.dst_dir))
assert not os.path.lexists(dst_dpath)
def test_dst_has_excess_nonempty_recursive_dir(self):
dst_dpath = self.create_dir(self.dst_dir)
nested_dpath = self.create_dir(dst_dpath)
self.create_file(nested_dpath)
all(fs.rsync(self.src_dir, self.dst_dir))
assert not os.path.lexists(dst_dpath)
def test_different_types_src_file_dst_dir(self):
src_fpath = self.create_file(self.src_dir)
dst_path = os.path.join(self.dst_dir, self.relpath(src_fpath))
os.mkdir(dst_path)
all(fs.rsync(self.src_dir, self.dst_dir))
assert os.path.lexists(dst_path)
assert os.path.isfile(dst_path)
def test_different_types_src_file_dst_symlink(self):
src_fpath = self.create_file(self.src_dir)
dst_path = os.path.join(self.dst_dir, self.relpath(src_fpath))
os.symlink('broken_link', dst_path)
all(fs.rsync(self.src_dir, self.dst_dir))
assert os.path.lexists(dst_path)
assert os.path.isfile(dst_path)
def test_different_types_src_symlink_dst_file(self):
dst_path = self.create_file(self.dst_dir)
src_lpath = os.path.join(self.src_dir, self.relpath(dst_path))
os.symlink('broken_link', src_lpath)
all(fs.rsync(self.src_dir, self.dst_dir))
assert os.path.lexists(dst_path)
assert os.path.islink(dst_path)
def test_different_types_src_symlink_dst_dir(self):
dst_path = self.create_dir(self.dst_dir)
src_lpath = os.path.join(self.src_dir, self.relpath(dst_path))
os.symlink('broken_link', src_lpath)
all(fs.rsync(self.src_dir, self.dst_dir))
assert os.path.lexists(dst_path)
assert os.path.islink(dst_path)
def test_different_types_src_dir_dst_file(self):
src_dpath = self.create_dir(self.src_dir)
dst_path = os.path.join(self.dst_dir, self.relpath(src_dpath))
with open(dst_path, "w") as f:
f.write(string.printable)
all(fs.rsync(self.src_dir, self.dst_dir))
assert os.path.lexists(dst_path)
assert os.path.isdir(dst_path)
def test_different_types_src_dir_dst_symlink(self):
src_dpath = self.create_dir(self.src_dir)
dst_path = os.path.join(self.dst_dir, self.relpath(src_dpath))
os.symlink('broken_link', dst_path)
all(fs.rsync(self.src_dir, self.dst_dir))
assert os.path.lexists(dst_path)
assert os.path.isdir(dst_path)
def test_src_is_socket(self):
src_spath = self.create_file(self.src_dir)
dst_spath = os.path.join(self.dst_dir, self.relpath(src_spath))
os.unlink(src_spath)
sock = socket.socket(socket.AF_UNIX)
sock.bind(src_spath)
all(fs.rsync(self.src_dir, self.dst_dir))
assert not os.path.lexists(dst_spath)
def test_src_dst_same_inode(self):
src_fpath = self.create_file(self.src_dir)
dst_fpath = os.path.join(self.dst_dir, self.relpath(src_fpath))
os.link(src_fpath, dst_fpath)
all(fs.rsync(self.src_dir, self.dst_dir))
assert os.path.lexists(dst_fpath)
src_stat = os.lstat(src_fpath)
dst_stat = os.lstat(dst_fpath)
assert src_stat.st_nlink == 1
assert dst_stat.st_nlink == 1
assert src_stat.st_ino != dst_stat.st_ino
def test_src_dst_diff_size(self):
src_fpath = self.create_file(self.src_dir)
dst_fpath = os.path.join(self.dst_dir, self.relpath(src_fpath))
with open(dst_fpath, "w") as df:
df.write(string.printable * 2)
all(fs.rsync(self.src_dir, self.dst_dir))
assert os.path.lexists(dst_fpath)
self.check_identical_file(src_fpath, dst_fpath)
# TODO add tests for changing ownership
# TODO add tests for changing permissions
# TODO add tests for changing times (?)
# TODO add tests for symlink behaviour