import base64
import glob
import os
import pwd
from abc import abstractmethod
from pathlib import Path
from typing import Dict, Iterable, List, Union
from xml.etree import ElementTree
import pymysql
from Crypto.Cipher import AES
from .. import helpers
from ..backup_backends_lib import (
BackupBase,
BaseResource,
FtpBackupBase,
TarFile,
TarResourceMixin,
tarfile_open,
)
PLESK_CONFIG = '/etc/psa/psa.conf'
PLESK_CONFIG_DEFAULT = '/etc/psa/psa.conf.default'
PLESK_SECRET_KEY = '/etc/psa/private/secret_key'
PLESK_SHADOW = '/etc/psa/.psa.shadow'
# TODO implement other resources (not only from /var/www/vhosts)
class DomainUserDataResource(TarResourceMixin, BaseResource):
"""
User domain data resource
"""
root = '/var/www/vhosts'
def __init__(self, path, domain):
# type: (str, str) -> None
resource = os.path.join(self.root, domain)
super().__init__(path, resource)
def _normalize_path(self, path):
# type: (str) -> str
resource_path = path[len(self.resource):].lstrip(os.sep)
return resource_path
class DomainUserDataTarResource(DomainUserDataResource):
"""
User domain data resource in tar archive
"""
def __init__(self, path, domain, tar):
# type: (str, str, TarFile) -> None
super().__init__(path, domain)
fileobj = tar.extractfile(path)
self.fileobj = tarfile_open(fileobj=fileobj)
Resource = Union[DomainUserDataResource, DomainUserDataTarResource]
class BackupInfo:
def __init__(self, path, domain: str, resources: Iterable[str]):
self.path = path
self.domain = domain
self.resources = list(resources)
@classmethod
def parse_xml(cls, path, xml_content: str):
resources = []
tree = ElementTree.fromstring(xml_content)
domain_node = tree.find("domain")
# FIXME: domain can be None
domain: str = domain_node.get("name")
phosting = domain_node.find("phosting")
dir_name = os.path.dirname(path)
if phosting:
content = phosting.find("content")
if content:
for cid in content:
if (cid_type := cid.get("type")) != "user-data":
continue
cid_path = cid.get("path", default="")
filename = cid.find("content-file").text
resource_path = os.path.join(dir_name, cid_path, filename)
resources.append(resource_path)
return cls(path, domain, resources)
class PleskDomainBackup(BackupBase):
"""
Plesk backup of user domain
"""
def __init__(self, path, created):
# type: (str, helpers.DateTime) -> None
super().__init__(path, created)
with open(path) as xml:
xml_content = xml.read()
backup_info = BackupInfo.parse_xml(path, xml_content)
self.domain = backup_info.domain
self.resources: List[Resource] = [
DomainUserDataResource(resource_path, self.domain)
for resource_path in backup_info.resources
]
class PleskFtpBackupBase(FtpBackupBase):
"""
Base class for Plesk backups on FTP server
"""
FTP_DIR_NAME = '.ri-plesk-ftp'
def __init__(self, ftp, path, created, tmp_dir):
# type: (helpers.Ftp, str, helpers.DateTime, str) -> None
super().__init__(ftp, path, created, tmp_dir=tmp_dir)
self.tar = None
self._resources = None
@staticmethod
@abstractmethod
def _check_path(path):
""" Check that this is a path to this backup's xml description """
def _retrieve_resources(self):
self._resources = [] # type: List[Resource]
path = self._retrieve()
if path is None:
return
# FIXME: initialize in the __init__.
# If there is a reason for a lazy loading, decouple the laziness logic
# from the backup classes
self.tar = tarfile_open(path)
for path in self.tar.getnames():
if not self._check_path(path):
continue
with self.tar.extractfile(path) as xml:
xml_content = xml.read()
backup_info = BackupInfo.parse_xml(path, xml_content)
for resource_path in backup_info.resources:
self._resources.append(
DomainUserDataTarResource(
resource_path, backup_info.domain, self.tar
)
)
@property
def resources(self):
if self._resources is None:
self._retrieve_resources()
return self._resources
def close(self):
# type: () -> None
super().close()
self._resources = None
if self.tar:
self.tar.close()
self.tar = None
class PleskDomainFtpBackup(PleskFtpBackupBase):
"""
Plesk FTP backup of user domain
"""
@staticmethod
def _check_path(path):
# type: (str) -> bool
try:
_, = path.split('/')
except ValueError:
return False
if not path.endswith('.xml'):
return False
return True
class PleskFtpBackup(PleskFtpBackupBase):
"""
Plesk FTP all-in-one backup
"""
@staticmethod
def _check_path(path):
# type: (str) -> bool
try:
c, _, d, _, _ = path.split('/')
except ValueError:
return False
if (c, d) != ('clients', 'domains') or not path.endswith('.xml'):
return False
return True
def _backup_date(path):
# type: (str) -> helpers.DateTime
base, _ = os.path.splitext(os.path.basename(path))
_, created = base.rsplit('_', 1)
return helpers.DateTime('20' + created)
def _decrypt_password(password):
# type: (str) -> str
try:
_, encryption, iv, content = password.split("$")
except ValueError:
return password
if encryption != 'AES-128-CBC':
raise ValueError('Unsupported encryption: ' + encryption)
with open(PLESK_SECRET_KEY, 'rb') as f:
key = f.read()
iv = base64.b64decode(iv)
content = base64.b64decode(content)
cipher = AES.new(key, mode=AES.MODE_CBC, IV=iv)
plain = cipher.decrypt(content)
plain = plain.rstrip(b'\0')
return plain.decode()
def _is_true(s):
# type: (str) -> bool
return s == 'true'
PLESK_BACKUP_SETTINGS = {
'backup_ftp_settingactive': ('active', _is_true),
'backup_ftp_settinghost': ('host', str),
'backup_ftp_settinglogin': ('login', str),
'backup_ftp_settingpassword': ('password', _decrypt_password),
'backup_ftp_settingdirectory': ('directory', str),
'backup_ftp_settingpassive_mode': ('passive_mode', _is_true),
'backup_ftp_settinguse_ftps': ('use_ftps', _is_true),
# 'backup_ftp_settinguse_backup_password': ('use_backup_password', _is_true), # noqa: E501
# 'backup_ftp_settingbackup_password': ('backup_password', _decrypt_password) # noqa: E501
}
DomainBackupSettings = Dict[str, Union[bool, str]]
UserBackupSettings = Dict[str, DomainBackupSettings]
BackupSettings = Dict[str, UserBackupSettings]
def _get_backup_settings():
# type: () -> BackupSettings
query = (
'SELECT b.type, c.login, d.name, b.param, b.value '
'FROM BackupsSettings AS b '
'LEFT JOIN domains AS d ON b.id=d.id '
'LEFT JOIN clients as c ON d.cl_id=c.id '
)
with open(PLESK_SHADOW) as f:
password = f.read()
with pymysql.connect(user='admin', password=password, db='psa') as cur:
cur.execute(query)
settings = {}
for setting_type, login, domain, param, value in cur:
if param in PLESK_BACKUP_SETTINGS:
if setting_type == 'server':
login = 'admin'
domain = ''
key, value_type = PLESK_BACKUP_SETTINGS[param]
user = settings.setdefault(login, {})
domain_settings = user.setdefault(domain, {'active': True})
domain_settings[key] = value_type(value)
return settings
def _get_config():
# type: () -> Dict[str, str]
try:
return _parse_config(PLESK_CONFIG)
except FileNotFoundError:
return _parse_config(PLESK_CONFIG_DEFAULT)
def _parse_config(path):
# type: (str) -> Dict[str, str]
config = {}
with open(path) as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
key, value = line.split()
config[key] = value
return config
PleskBackup = Union[PleskDomainBackup, PleskDomainFtpBackup, PleskFtpBackup]
def _domains_backups(domains, until=None, usernames=()):
# type: (str, helpers.DateTime, Iterable[str]) -> List[PleskBackup]
backup_list = [] # type: List[PleskBackup]
if os.path.isdir(domains):
for domain in os.listdir(domains):
if usernames:
domain_home = os.path.join(DomainUserDataResource.root, domain)
domain_owner = _get_domain_owner(domain_home)
if domain_owner not in usernames:
continue
domain_dumps = os.path.join(domains, domain)
domain_dumps_xml = os.path.join(domain_dumps, '*.xml')
for backup_xml in glob.glob(domain_dumps_xml):
backup_date = _backup_date(backup_xml)
if until is None or until <= backup_date:
backup = PleskDomainBackup(backup_xml, backup_date)
if backup.resources:
backup_list.append(backup)
return backup_list
def backups_local(until=None, usernames=()):
# type: (helpers.DateTime, Iterable[str]) -> List[PleskBackup]
"""
Get list of local backups
"""
backup_list = [] # type: List[PleskBackup]
config = _get_config()
dump_d = config['DUMP_D']
dump_clients_d = os.path.join(dump_d, 'clients')
if os.path.isdir(dump_clients_d):
for client in os.listdir(dump_clients_d):
if usernames and client not in usernames:
continue
client_domains = os.path.join(dump_clients_d, client, 'domains')
client_backups = _domains_backups(client_domains, until=until)
backup_list.extend(client_backups)
dump_domains_d = os.path.join(dump_d, "domains")
domains_backups = _domains_backups(
dump_domains_d, until=until, usernames=usernames
)
backup_list.extend(domains_backups)
backup_list = sorted(backup_list, reverse=True)
return backup_list
def backups_ftp(until=None, tmp_dir=None):
# type: (helpers.DateTime) -> List[PleskBackup]
"""
Get list of remote backups
"""
backup_settings = _get_backup_settings()
backup_list = [] # type: List[PleskBackup]
for _, domains in backup_settings.items():
for domain, ftp_settings in domains.items():
if ftp_settings['active']:
backup_cls = PleskDomainFtpBackup if domain else PleskFtpBackup
try:
ftp = helpers.Ftp(**ftp_settings)
except TypeError: # missing required positional arguments
continue
try:
ftp.connect()
except helpers.FtpError:
helpers.warning('Error connecting to %s' % ftp)
continue
try:
ftp_dir = ftp.listdir(ftp_settings['directory'])
except helpers.FtpError:
helpers.warning('Error listing of %s/%s' %
(ftp, ftp_settings['directory']))
continue
for path in ftp_dir:
if path.endswith('.tar'):
backup_date = _backup_date(path)
if until is None or until <= backup_date:
backup = backup_cls(
ftp, path, backup_date, tmp_dir=tmp_dir
)
backup_list.append(backup)
backup_list = sorted(backup_list, reverse=True)
return backup_list
def _get_domain_owner(filename):
root = Path(DomainUserDataResource.root)
path = Path(filename)
try:
rel = path.relative_to(root)
domain = root / rel.parts[0]
st = domain.stat()
pw = pwd.getpwuid(st.st_uid)
return pw.pw_name
except Exception:
return None
def pre_backups(files, until=None):
usernames = []
for f in files:
user = _get_domain_owner(f)
if user is None:
return
usernames.append(user)
return {
'usernames': usernames,
}
def backups(until=None, *, usernames=(), tmp_dir=None):
# type: (helpers.DateTime, Iterable[str]) -> List[PleskBackup]
"""
Get list of all available backups
"""
backup_list = [] # type: List[PleskBackup]
backup_list.extend(backups_local(until, usernames))
backup_list.extend(backups_ftp(until, tmp_dir=tmp_dir))
backup_list = sorted(backup_list, reverse=True)
return backup_list
def cleanup():
# type: () -> None
"""
Remove all temp files
"""
helpers.Ftp.cache_clear()
def is_suitable():
return os.path.isfile(PLESK_CONFIG)