"""
Mostly copy-pasting from `safe_fileops.py` of the Imunify360 project
"""
import os
import shutil
import stat
from concurrent.futures import ProcessPoolExecutor
from contextlib import ExitStack, contextmanager, suppress
from functools import lru_cache, partial
from pathlib import Path
from typing import Optional, Union
import distro
R_FLAGS = os.O_RDONLY
W_FLAGS = os.O_TRUNC | os.O_CREAT | os.O_WRONLY
def drop(uid, gid, fun, *args):
if os.getuid() == 0:
os.setgroups([])
os.setgid(gid)
os.setuid(uid)
return fun(*args)
@contextmanager
def open_fd(path, flags, dir_fd=None):
"""
Context manager which wraps os.open and close file descriptor at the end
"""
fd = os.open(path, flags, dir_fd=dir_fd)
try:
yield fd
finally:
with suppress(OSError): # fd is already closed
os.close(fd)
@contextmanager
def open_dir_fd(name: str):
"""
Context manager to get a directory file descriptor
It also checks if a directory doesn't contain a symlink in the path
:param name: full directory name
"""
with open_fd(name, os.O_DIRECTORY) as dir_fd:
real = os.readlink("/proc/self/fd/{}".format(dir_fd))
if name != real:
raise PermissionError("Operations on symlinks are prohibited")
yield dir_fd
def open_fobj(f: Union[str, int], mode, flags=0, dir_fd: Optional[int] = None):
"""
Context manager to open file object from file name or from file descriptor
File object extended with 'st' attribute that contains os.stat_result of
the opened file
:param f: file name or file descriptor to open
:param mode: mode for built-in open
:param flags: flags for os.open, ignored if 'f' is a file descriptor
:param dir_fd: directory descriptor, ignored if 'f' is a file descriptor
"""
if isinstance(f, str):
f = os.open(f, flags=flags, dir_fd=dir_fd)
return open(f, mode=mode)
def make_readable(name: str, dir_fd: int):
# make a file readable by its owner
st = os.stat(name, dir_fd=dir_fd)
os.chmod(name, mode=st.st_mode | stat.S_IRUSR, dir_fd=dir_fd)
return st
@contextmanager
def safe_tuple(name: str, dir_fd: int, flags: int, is_safe: bool):
"""
If is_safe flag is True, open file descriptor using name and dir_fd
If is_safe is False, return name and dir_fd as is
"""
if is_safe:
with open_fd(name, dir_fd=dir_fd, flags=flags) as fd:
yield fd, None
else:
yield name, dir_fd
def _move(
src_fd: int,
src_mode: int,
dst_f: Union[int, str],
dst_dir_fd: Optional[int],
_race_dst,
):
with open_fobj(src_fd, "rb") as src_fo, open_fobj(
dst_f, "wb", W_FLAGS, dst_dir_fd
) as dst_fo:
if _race_dst:
_race_dst()
shutil.copyfileobj(src_fo, dst_fo)
if isinstance(dst_f, str):
# safe_dst == False
os.chmod(dst_fo.fileno(), mode=src_mode)
@lru_cache(1)
def _crontab_path() -> Path:
if "debian" in distro.like().split():
return Path("/var/spool/cron/crontabs")
return Path("/var/spool/cron")
def is_crontab(path: str) -> bool:
return Path(path).parent == _crontab_path()
def safe_move(src: str, dst: str, *, _race_src=None, _race_dst=None):
src_dir, src_name = os.path.split(src)
dst_dir, dst_name = os.path.split(dst)
with ExitStack() as stack:
src_dir_fd = stack.enter_context(open_dir_fd(src_dir))
src_st = make_readable(src_name, src_dir_fd)
src_fd = stack.enter_context(open_fd(src_name, R_FLAGS, src_dir_fd))
dst_dir_fd = stack.enter_context(open_dir_fd(dst_dir))
safe_dst = is_crontab(dst)
if safe_dst and os.stat(dst_dir_fd).st_mode & stat.S_ISVTX:
# If the parent directory has the sticky bit set,
# we can't overwrite an existing file, so delete it first
with suppress(FileNotFoundError):
os.unlink(dst_name, dir_fd=dst_dir_fd)
dst_f, dst_dir_fd = stack.enter_context(
safe_tuple(dst_name, dst_dir_fd, W_FLAGS, safe_dst)
)
fn = partial(
drop,
src_st.st_uid,
src_st.st_gid,
_move,
src_fd,
src_st.st_mode,
dst_f,
dst_dir_fd,
_race_dst,
)
with ProcessPoolExecutor(max_workers=1) as executor:
future = executor.submit(fn)
future.result()
if _race_src:
_race_src()
os.unlink(src_name, dir_fd=src_dir_fd)
if safe_dst:
os.chown(dst_f, src_st.st_uid, src_st.st_gid, dir_fd=dst_dir_fd)
os.chmod(dst_f, src_st.st_mode, dir_fd=dst_dir_fd)