import base64
import logging
import os
import pwd
import re
from typing import Dict, List, Tuple, Union
from urllib.parse import parse_qsl
from .. import helpers
from ..backup_backends_lib import (
BackupBase,
BaseResource,
FtpBackupBase,
NoSuchUserError,
TarFile,
TarResourceMixin,
UnsupportedBackupError,
tarfile_open,
)
# File-name suffix that identifies DirectAdmin backup archives
BACKUP_EXTENSION = '.tar.gz' # TODO: use file name regex pattern
# DirectAdmin's registry of scheduled (cron) backup jobs
CRONS_LIST = '/usr/local/directadmin/data/admin/backup_crons.list'
# One cron entry per line: "<id>=<urlencoded params>"
BACKUP_CRONS_PATTERN = re.compile(r'^(?P<id>\d+)=(?P<params>.*)$',
                                  re.MULTILINE)
# One "key=value" pair per line (used for backup/user.conf)
USER_CONF_PATTERN = re.compile(r'^(?P<key>\w+)=(?P<value>.*)$', re.MULTILINE)
logger = logging.getLogger(__name__)
class DomainsResource(TarResourceMixin, BaseResource):
    """
    User domains resource.

    Exposes the ``domains`` directory inside the user's home directory
    as a backup resource.
    """
    def __init__(self, path, user):
        # type: (str, pwd.struct_passwd) -> None
        """
        :param path: path to the backup archive
        :param user: pwd entry of the backup's owner
        """
        # Home-directory prefix to strip in _normalize_path()
        self.user_dir = user.pw_dir
        resource = os.path.join(user.pw_dir, 'domains')
        super().__init__(path, resource)
    def _normalize_path(self, path):
        # type: (str) -> str
        """
        Strip the user's home-directory prefix and any leading path
        separators from *path* (yielding the member name used inside
        the tar archive).
        """
        tar_path = path[len(self.user_dir):].lstrip(os.sep)
        return tar_path
class HomeResource(TarResourceMixin, BaseResource):
    """
    User home directory resource.

    The home-directory contents live in a nested archive,
    ``backup/home.tar.gz``, inside the outer backup tar; both archives
    are opened lazily on first access via _prep().
    """
    def __init__(self, path, user):
        # type: (str, pwd.struct_passwd) -> None
        """
        :param path: path to the backup archive
        :param user: pwd entry of the backup's owner
        """
        resource = user.pw_dir
        super().__init__(path, resource)
        # Handle on the outer archive; opened lazily in _prep()
        self.tar = None
    def _prep(self, path):
        """
        Ensure the outer archive and the nested home archive are open,
        then return *path* normalized for lookup in the nested archive.
        """
        if not self.tar:
            self.tar = tarfile_open(self.path)
        if not self.fileobj:
            # NOTE(review): self.fileobj appears to be declared/closed by
            # TarResourceMixin/BaseResource — confirm before changing
            home = self.tar.extractfile('backup/home.tar.gz')
            self.fileobj = tarfile_open(fileobj=home)
        return self._normalize_path(path)
    def _normalize_path(self, path):
        # type: (str) -> str
        """
        Strip the home-directory prefix (``self.resource``) and any
        leading path separators from *path*.
        """
        tar_path = path[len(self.resource):].lstrip(os.sep)
        return tar_path
    def close(self):
        # type: () -> None
        """
        Close the nested archive (via the mixin) and the outer tar.
        """
        super().close()
        if self.tar:
            self.tar.close()
            self.tar = None
# Any resource type a DirectAdmin backup can expose
Resource = Union[DomainsResource, HomeResource]
def _read_user_conf(tar):
    # type: (TarFile) -> Dict[str, str]
    """
    Parse the ``backup/user.conf`` member of a backup archive into a
    key/value mapping.
    """
    member = tar.extractfile('backup/user.conf')
    with member as conf_file:
        contents = conf_file.read().decode()
    pairs = USER_CONF_PATTERN.findall(contents)  # type: List[Tuple[str, str]]
    return dict(pairs)
class DirectAdminBackup(BackupBase):
    """
    DirectAdmin local backup.

    Resolves the owning system user from ``backup/user.conf`` inside
    the archive and registers a resource for each part of the backup
    that is actually present (the ``domains`` tree and the nested
    ``backup/home.tar.gz`` home archive).
    """
    def __init__(self, path, created):
        # type: (str, helpers.DateTime) -> None
        """
        :param path: path to the ``.tar.gz`` backup archive
        :param created: creation time of the backup
        :raises UnsupportedBackupError: if user.conf is missing or lacks
            a username
        :raises NoSuchUserError: if the archive's owner is not a local
            system user
        """
        super().__init__(path, created)
        self.resources = []  # type: List[Resource]
        with tarfile_open(path) as tar:
            try:
                user_conf = _read_user_conf(tar)
                username = user_conf['username']
            except (KeyError, ValueError) as e:
                # Chain the original error for easier debugging
                raise UnsupportedBackupError(*e.args) from e
            try:
                user = pwd.getpwnam(username)
            except KeyError as e:
                raise NoSuchUserError(
                    'user not found: {}'.format(username)) from e
            # Register only the resources present in the archive; one
            # getnames() scan replaces repeated getmember() probes
            members = set(tar.getnames())
            if 'domains' in members:
                self.resources.append(DomainsResource(path, user))
            if 'backup/home.tar.gz' in members:
                self.resources.append(HomeResource(path, user))
class DirectAdminFtpBackup(FtpBackupBase):
    """
    DirectAdmin FTP backup.
    """
    FTP_DIR_NAME = '.ri-directadmin-ftp'
    # for DirectAdmin FTP backups we can't determine ctime
    # so using mtime instead
    def __init__(self, ftp, path, modified, tmp_dir):
        super().__init__(ftp, path, modified, tmp_dir=tmp_dir)
        # Lazily created local DirectAdminBackup; see resources
        self._backup = None
    @property
    def resources(self):
        """
        Resources of the underlying backup, downloading the archive
        from FTP on first access.

        :return: resource list, or ``[]`` when the archive cannot be
            retrieved or is unsupported
        """
        if self._backup is None:
            backup_path = self._retrieve()
            if backup_path is None:
                return []
            try:
                self._backup = DirectAdminBackup(backup_path, self.created)
            except (NoSuchUserError, UnsupportedBackupError):
                return []
        return self._backup.resources
    def close(self):
        super().close()
        # BUG FIX: _backup is still None when resources were never
        # accessed (or retrieval failed); the old code raised
        # AttributeError here in that case
        if self._backup is not None:
            self._backup.close()
            self._backup = None
def _read_backup_crons_list(path):
    """
    Parse DirectAdmin's backup cron registry into a mapping of
    cron id -> parameter dict.

    Each line has the form ``<id>=<urlencoded params>``; the params
    part is decoded with ``urllib.parse.parse_qsl``.
    """
    with open(path) as crons_file:
        contents = crons_file.read()
    parsed = {}
    for cron_id, query_string in BACKUP_CRONS_PATTERN.findall(contents):
        parsed[int(cron_id)] = dict(parse_qsl(query_string))
    return parsed
def backups_from_path(path, until=None):
    # type: (str, helpers.DateTime) -> List[DirectAdminBackup]
    """
    Recursively collect local DirectAdmin backups under *path*.

    :param path: directory to scan; a missing directory yields ``[]``
    :param until: when given, only backups created at or after this
        moment are returned
    :return: backups sorted newest-first
    """
    backup_list = []
    if os.path.exists(path):
        for root, _, files in os.walk(path):
            for name in files:
                if not name.endswith(BACKUP_EXTENSION):
                    continue
                backup_path = os.path.join(root, name)
                # BUG FIX: stat the backup file itself; the old code
                # stat()-ed the top-level *path* for every file, so all
                # backups shared the scan root's ctime
                timestamp = os.stat(backup_path).st_ctime
                created = helpers.DateTime.fromtimestamp(timestamp)
                if until is not None and until > created:
                    continue
                try:
                    backup = DirectAdminBackup(backup_path, created)
                except (NoSuchUserError, UnsupportedBackupError):
                    # Not a usable DirectAdmin archive — skip it
                    continue
                backup_list.append(backup)
    return sorted(backup_list, reverse=True)
def backups_from_ftp(ftp, path, until=None, tmp_dir=None):
    # type: (helpers.Ftp, str, helpers.DateTime) -> List[DirectAdminBackup]
    """
    Recursively collect DirectAdmin backups from an FTP directory.

    :param until: when given, only backups modified at or after this
        moment are returned
    :return: backups sorted newest-first
    """
    found = []
    for entry_name, facts in ftp.mlistdir(path):
        entry_type = facts['type']
        entry_path = os.path.join(path, entry_name)
        if entry_type == 'dir':
            # Descend into subdirectories
            found.extend(
                backups_from_ftp(ftp, entry_path, until, tmp_dir=tmp_dir))
            continue
        if entry_type != 'file' or not entry_name.endswith(BACKUP_EXTENSION):
            continue
        modified = helpers.DateTime(facts['modify'])
        if until is not None and until > modified:
            continue
        found.append(
            DirectAdminFtpBackup(ftp, entry_path, modified, tmp_dir=tmp_dir))
    return sorted(found, reverse=True)
# Parsed parameters of a single backup cron entry (key -> value)
CronData = Dict[str, str]
def backups_cron(until=None, tmp_dir=None):
    # type: (helpers.DateTime) -> List[DirectAdminBackup]
    """
    Get the list of local/FTP backups scheduled via cron.

    Reads DirectAdmin's backup cron registry and collects backups from
    each configured destination.  FTP errors are logged and the
    offending destination is skipped (best-effort).

    :param until: when given, only backups created/modified at or after
        this moment are returned
    :return: backups sorted newest-first
    """
    try:
        crons = _read_backup_crons_list(CRONS_LIST)
    except FileNotFoundError:
        # No cron registry — nothing is scheduled on this host
        return []
    backup_list = []  # type: List[DirectAdminBackup]
    for cron_data in crons.values():
        if cron_data['where'] == 'local':
            if cron_data['type'] == 'admin':
                path = cron_data['local_path']
                backup_list.extend(backups_from_path(path, until))
            elif cron_data['type'] == 'reseller':
                try:
                    user = pwd.getpwnam(cron_data['owner'])
                except KeyError:
                    # Owner no longer exists; skip this cron entry
                    continue
                path = os.path.join(user.pw_dir, 'user_backups')
                backup_list.extend(backups_from_path(path, until))
        elif cron_data['where'] == 'ftp':
            ftp_ip = cron_data['ftp_ip']
            ftp_username = cron_data['ftp_username']
            # The password is stored base64-encoded in the cron list
            ftp_password = base64.b64decode(cron_data['ftp_password']).decode()
            ftp_secure = cron_data['ftp_secure'] == 'yes'
            ftp_port = int(cron_data['ftp_port'])
            path = cron_data['ftp_path']
            ftp = helpers.Ftp(host=ftp_ip, login=ftp_username,
                              password=ftp_password, use_ftps=ftp_secure,
                              port=ftp_port)
            try:
                ftp.connect()
            except helpers.FtpError:
                logger.warning('Error connecting to %s', ftp)
                # BUG FIX: the old code fell through and attempted to
                # list the server anyway after a failed connect
                continue
            try:
                backup_list.extend(
                    backups_from_ftp(
                        ftp,
                        path,
                        until,
                        tmp_dir=tmp_dir)
                )
            except helpers.FtpError:
                logger.warning('Error listing %s/%s', ftp, path)
    return sorted(backup_list, reverse=True)
def backups_user(until=None):
    # type: (helpers.DateTime) -> List[DirectAdminBackup]
    """
    Collect backups from every system user's ``~/backups`` directory.

    :return: backups sorted newest-first
    """
    found = []  # type: List[DirectAdminBackup]
    for pw_entry in pwd.getpwall():
        user_backups_dir = os.path.join(pw_entry.pw_dir, 'backups')
        found.extend(backups_from_path(user_backups_dir, until))
    return sorted(found, reverse=True)
def is_suitable():
    """
    Report whether this host looks like a DirectAdmin server by
    checking for the DirectAdmin binary at its standard install path.
    """
    directadmin_binary = '/usr/local/directadmin/directadmin'
    return os.path.isfile(directadmin_binary)
def backups(until=None, tmp_dir=None):
    # type: (helpers.DateTime) -> List[DirectAdminBackup]
    """
    Get the list of all available backups: cron-scheduled destinations
    plus every user's own backup directory.

    :return: backups sorted newest-first
    """
    collected = []  # type: List[DirectAdminBackup]
    collected.extend(backups_cron(until, tmp_dir=tmp_dir))
    collected.extend(backups_user(until))
    return sorted(collected, reverse=True)
def cleanup():
    # type: () -> None
    """
    Clear the ``helpers.Ftp`` cache (module-level teardown hook).
    """
    helpers.Ftp.cache_clear()