import functools
import gzip
import os
import pwd
from typing import BinaryIO, Dict, List, Tuple, Union
import yaml
from .. import helpers
from ..backup_backends_lib import (
BackupBase,
BaseResource,
FileResourceMixin,
NoSuchUserError,
TarResourceMixin,
)
CPANEL_BACKUP_CONFIG = '/var/cpanel/backups/config'
CPANEL_RESOURCE_EXTENSIONS = ('.gz', '.tar')
def is_suitable():
return (os.path.isfile('/usr/local/cpanel/cpanel')
and os.path.exists(CPANEL_BACKUP_CONFIG))
class SystemResource(BaseResource):
"""
Base class for system resources of cPanel backup
"""
def __init__(self, path):
# type: (str) -> None
system_resource = _strip_resource(path)
resource = system_resource.replace('_', os.sep)
super().__init__(path, resource)
# TODO implement other user related files (not only from homedir)
class AccountResource(TarResourceMixin, BaseResource):
"""
User's files resource (TARed and optionally GZIPped in one file)
"""
def __init__(self, path):
# type: (str) -> None
account_resource = _strip_resource(path)
try:
self.user = pwd.getpwnam(account_resource)
except KeyError:
raise NoSuchUserError()
resource = self.user.pw_dir
super().__init__(path, resource)
def _normalize_path(self, path):
# type: (str) -> str
home_path = path[len(self.user.pw_dir):].lstrip(os.sep)
tar_path = os.path.join(self.user.pw_name, 'homedir', home_path)
return tar_path
class SystemDirResource(TarResourceMixin, SystemResource):
"""
System directory resource (TARed and GZIPped in one file)
"""
def _normalize_path(self, path):
# type: (str) -> str
return path.lstrip(os.sep)
class SystemFileResource(FileResourceMixin, SystemResource):
"""
System file resource (GZIPped)
"""
@functools.lru_cache(maxsize=None)
def info(self, path):
# type: (str) -> Tuple[int, helpers.DateTime, int, int, int]
size = 0
for chunk in helpers.read(self.open(path)):
size += len(chunk)
stat = os.stat(self.path)
return (size, helpers.DateTime.fromtimestamp(self.fileobj.mtime),
stat.st_uid, stat.st_gid, stat.st_mode)
def open(self, path):
# type: (str) -> BinaryIO
if path != self.resource:
raise FileNotFoundError(path)
if not self.fileobj:
self.fileobj = gzip.open(self.path)
return self.fileobj
class PlainResource(BaseResource):
"""
Base class for cPanel backup resources with files stored w/o compression
"""
def close(self):
# type: () -> None
pass
@functools.lru_cache(maxsize=None)
def open(self, path):
# type: (str) -> BinaryIO
plain_path = self._normalize_path(path)
try:
return open(plain_path, 'rb')
except FileNotFoundError:
raise FileNotFoundError(repr(path))
class PlainSystemResource(PlainResource):
"""
Base class for plain system resources
"""
def __init__(self, path):
# type: (str) -> None
system_resource = os.path.basename(path)
resource = system_resource.replace('_', os.sep)
super().__init__(path, resource)
@functools.lru_cache(maxsize=None)
def info(self, path):
# type: (str) -> Tuple[int, helpers.DateTime, int, int, int]
plain_path = self._normalize_path(path)
stat = os.stat(plain_path)
return (stat.st_size, helpers.DateTime.utcfromtimestamp(stat.st_mtime),
stat.st_uid, stat.st_gid, stat.st_mode)
# TODO implement other user related files (not only from homedir)
class IAccountResource(PlainResource):
"""
User's files resource (stored in a plain directory)
"""
def __init__(self, path):
# type: (str) -> None
account_resource = os.path.basename(path)
try:
self.user = pwd.getpwnam(account_resource)
except KeyError:
raise NoSuchUserError()
resource = self.user.pw_dir
super().__init__(path, resource)
def _normalize_path(self, path):
# type: (str) -> str
resource_path = path[len(self.resource):].lstrip(os.sep)
real_path = os.path.join(self.path, 'homedir', resource_path)
return real_path
@functools.lru_cache(maxsize=None)
def info(self, path):
# type: (str) -> Tuple[int, helpers.DateTime, int, int, int]
plain_path = self._normalize_path(path)
stat = os.stat(plain_path)
return (stat.st_size, helpers.DateTime.utcfromtimestamp(stat.st_mtime),
self.user.pw_uid, self.user.pw_gid, stat.st_mode)
class ISystemDirResource(PlainSystemResource):
"""
System directory resource (stored as a plain directory)
"""
def _normalize_path(self, path):
# type: (str) -> str
resource_path = path[len(self.resource):].lstrip(os.sep)
real_path = os.path.join(self.path, resource_path)
return real_path
class ISystemFileResource(PlainSystemResource):
"""
System file resource (stored as a plain file)
"""
def _normalize_path(self, path):
# type: (str) -> str
if path != self.resource:
raise FileNotFoundError(path)
return self.path
Resource = Union[AccountResource, SystemDirResource, SystemFileResource,
IAccountResource, ISystemDirResource, ISystemFileResource]
CPANEL_BACKUP_RESOURCES = (
(
('accounts',),
AccountResource,
IAccountResource,
),
(
('system', 'dirs'),
SystemDirResource,
ISystemDirResource,
),
(
('system', 'files'),
SystemFileResource,
ISystemFileResource,
)
)
class CpanelBackup(BackupBase):
"""
Class for cPanel backup
"""
def __init__(self, path, created=None):
# type: (str, helpers.DateTime) -> None
super().__init__(path, created)
self.resources = [] # type: List[Resource]
for location, ResourceClass, IResourceClass in CPANEL_BACKUP_RESOURCES:
resource_path = os.path.join(path, *location)
if os.path.exists(resource_path):
for resource_name in os.listdir(resource_path):
resource_abs = os.path.join(resource_path, resource_name)
try:
if resource_abs.endswith(CPANEL_RESOURCE_EXTENSIONS):
resource = ResourceClass(resource_abs)
elif not resource_abs.endswith('-=-meta'):
resource = IResourceClass(resource_abs)
else:
continue
self.resources.append(resource)
except NoSuchUserError:
pass
def __lt__(self, other):
# if self.created == None (cPanel < v64 incremental backups)
try:
return super().__lt__(other)
except TypeError:
return False
def _get_backup_config(config_file=CPANEL_BACKUP_CONFIG):
# type: () -> Dict[str, Union[str, int]]
with open(config_file) as config:
return yaml.safe_load(config)
def _strip_resource(path):
# type: (str) -> str
resource = os.path.basename(path)
for extension in CPANEL_RESOURCE_EXTENSIONS:
resource = resource.rsplit(extension)[0]
return resource
def backups(until=None, path=None, tmp_dir=None):
# type: (helpers.DateTime, str) -> List[CpanelBackup]
"""
Get list of backups
"""
backup_config = _get_backup_config()
if path is None:
path = backup_config['BACKUPDIR']
incremental = backup_config['BACKUPTYPE'] == 'incremental'
backup_list = [] # type: List[CpanelBackup]
for prefix in ('', 'weekly', 'monthly'):
base = os.path.join(path, prefix)
if os.path.isdir(base):
if incremental: # cPanel < v64
backup_path = os.path.join(base, 'incremental')
if os.path.isdir(backup_path):
backup = CpanelBackup(backup_path)
backup_list.append(backup)
for backup_name in os.listdir(base):
try:
backup_date = helpers.DateTime(backup_name)
except ValueError:
continue
backup_path = os.path.join(base, backup_name)
if until is None or until <= backup_date:
backup = CpanelBackup(backup_path, backup_date)
backup_list.append(backup)
backup_list = sorted(backup_list, reverse=True)
return backup_list